Nonblocking fifo
How can I make a FIFO between two Python processes that allows lines to be dropped if the reader cannot keep up with the input? If the reader calls read or readline faster than the writer writes, it should block. If the reader is slower than the writer, unread lines should be dropped rather than queued up for its next readline attempt. Is this possible with a named FIFO, or is there any other simple way of achieving this?
The following code uses a named FIFO to allow communication between two scripts. If the reader tries to read faster than the writer writes, it blocks.
Code
import argparse
import errno
import os
from select import select
import time


class OneFifo(object):
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        # Recreate the FIFO so that clients can notice a server restart.
        if os.path.exists(self.name):
            os.unlink(self.name)
        os.mkfifo(self.name)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if os.path.exists(self.name):
            os.unlink(self.name)

    def write(self, data):
        print("Waiting for client to open FIFO...")
        try:
            # A non-blocking open for writing fails with ENXIO
            # when no process has the FIFO open for reading.
            server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENXIO:
                server_file = None
            else:
                raise
        if server_file is not None:
            print("Writing line to FIFO...")
            try:
                os.write(server_file, data)
                print("Done.")
            except OSError as exc:
                # EPIPE: the reader closed the FIFO first, so drop the line.
                if exc.errno != errno.EPIPE:
                    raise
            finally:
                os.close(server_file)

    def read_nonblocking(self):
        result = None
        try:
            client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                client_file = None
            else:
                raise
        if client_file is not None:
            try:
                # Wait up to 10 ms for the server to write something.
                rlist, _, _ = select([client_file], [], [], 0.01)
                if client_file in rlist:
                    # An empty read (end of file) is reported as None.
                    result = os.read(client_file, 1024).decode() or None
            except OSError as exc:
                if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                    raise
            finally:
                os.close(client_file)
        return result

    def read(self):
        result = None
        try:
            # Opening for reading blocks until a writer opens the FIFO.
            with open(self.name, 'r') as client_file:
                result = client_file.read()
        except (IOError, OSError) as exc:
            if exc.errno != errno.ENOENT:
                raise
        # A missing FIFO and an empty read are both reported as None.
        return result or None


def parse_argument():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--client', action='store_true',
                        help='Set this flag for the client')
    parser.add_argument('-n', '--non-blocking', action='store_true',
                        help='Set this flag to read without blocking')
    result = parser.parse_args()
    return result


if __name__ == '__main__':
    args = parse_argument()
    if not args.client:
        # Server: create the FIFO and offer one line every 100 ms.
        with OneFifo('known_name') as one_fifo:
            while True:
                one_fifo.write(b'one line')
                time.sleep(0.1)
    else:
        # Client: read with or without blocking, depending on the flag.
        one_fifo = OneFifo('known_name')
        while True:
            if args.non_blocking:
                result = one_fifo.read_nonblocking()
            else:
                result = one_fifo.read()
            if result is not None:
                print(result)
The server checks whether the client has opened the FIFO. If the client has opened the FIFO, the server writes a line. Otherwise, the server continues running. I have implemented a non-blocking read because the blocking read causes a problem: if the server restarts, most of the time the client stays blocked and never recovers. With a non-blocking client, a server restart is more easily tolerated.
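The check for an attached reader can be tried in isolation. The following is a minimal sketch, assuming Python 3 on a POSIX system; try_write and demo are hypothetical helper names that are not part of the script above. It shows that a non-blocking open for writing fails with ENXIO while no reader has the FIFO open, so the line is dropped, and succeeds once a reader is attached.

```python
import errno
import os
import tempfile


def try_write(path, data):
    """Write data to the FIFO at path; return False if no reader has it open."""
    try:
        fd = os.open(path, os.O_WRONLY | os.O_NONBLOCK)
    except OSError as exc:
        if exc.errno == errno.ENXIO:  # nobody has the FIFO open for reading
            return False
        raise
    try:
        os.write(fd, data)
    finally:
        os.close(fd)
    return True


def demo():
    """Return (result without reader, result with reader, bytes received)."""
    directory = tempfile.mkdtemp()
    path = os.path.join(directory, 'demo_fifo')
    os.mkfifo(path)
    try:
        # No reader yet: the line is dropped instead of blocking the writer.
        dropped = try_write(path, b'lost line\n')
        # Attach a non-blocking reader, then write again.
        reader = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
        try:
            delivered = try_write(path, b'kept line\n')
            received = os.read(reader, 1024)
        finally:
            os.close(reader)
    finally:
        os.unlink(path)
        os.rmdir(directory)
    return dropped, delivered, received
```

Calling demo() exercises both cases in one process: the first write is dropped and the second one reaches the reader.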
Output
[user@machine:~] python onefifo.py
Waiting for client to open FIFO...
Waiting for client to open FIFO...
Writing line to FIFO...
Done.
Waiting for client to open FIFO...
Writing line to FIFO...
Done.
[user@machine:~] python onefifo.py -c
one line
one line
Notes
On startup, if the server detects that the FIFO already exists, it removes it. This is the easiest way to notify clients that the server has restarted. This notification is usually ignored by the blocking version of the client.
Well, that's not actually a FIFO (queue) as far as I am aware; it's a single variable. I suppose it might be implementable if you set up a queue or pipe with a maximum size of 1, but it seems that it would work better to use a Lock on a single object in one of the processes, which the other process references via a proxy object. The reader would set it to None whenever it reads, and the writer would overwrite the contents every time it writes.
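The "queue with a maximum size of 1" idea can be illustrated inside a single process. This is only a sketch of the intended semantics, assuming Python 3 and a single writer; it uses the standard queue module, which works between threads and not between processes, and put_latest and slot are hypothetical names.

```python
import queue


def put_latest(slot, line):
    """Store line in a one-item queue, discarding any unread line first."""
    try:
        slot.get_nowait()  # drop the stale line, if there is one
    except queue.Empty:
        pass
    # With a single writer the queue is empty here, so this cannot fail.
    slot.put_nowait(line)


slot = queue.Queue(maxsize=1)
for text in ('first', 'second', 'third'):
    put_latest(slot, text)  # the writer never blocks

# The reader would block here if nothing had been written yet.
latest = slot.get()
```

Only the most recently written line survives, and a reader that calls get on an empty queue blocks until the writer puts something in.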
You can get those to the other processes by passing the proxy of the object, and a proxy of the lock, as arguments to all relevant processes. To make this slightly more convenient, you can use a Manager, which provides a single object with a proxy that you can pass in, and which contains and provides proxies for whatever other objects (including locks) you want to put in it. This answer provides a useful example of proper use of a Manager to pass objects into a new process.
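A minimal sketch of the Lock and Manager approach might look like the following. It assumes Python 3 on a POSIX system and explicitly selects the fork start method; put_latest, take_latest, writer and demo are hypothetical names, and the shared "single variable" is modelled here as a one-key dict proxy.

```python
import multiprocessing


def put_latest(slot, lock, line):
    """Writer side: overwrite the slot without waiting for the reader."""
    with lock:
        slot['line'] = line


def take_latest(slot, lock):
    """Reader side: return the newest line (or None) and clear the slot."""
    with lock:
        line = slot['line']
        slot['line'] = None
    return line


def writer(slot, lock, count):
    """Runs in a child process and writes count lines in a row."""
    for i in range(count):
        put_latest(slot, lock, 'line %d' % i)


def demo():
    # Assumption: the fork start method is available (POSIX only).
    ctx = multiprocessing.get_context('fork')
    with ctx.Manager() as manager:
        slot = manager.dict(line=None)  # proxy for the shared single slot
        lock = manager.Lock()           # proxy for the lock guarding it
        proc = ctx.Process(target=writer, args=(slot, lock, 5))
        proc.start()
        proc.join()
        first = take_latest(slot, lock)   # only the last line is left
        second = take_latest(slot, lock)  # the slot has been cleared
    return first, second
```

Unlike the FIFO client, take_latest does not block when the slot is empty; a reader that needs to wait would have to poll or use an additional synchronization object.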