Nonblocking fifo

How can I make a fifo between two python processes, that allow dropping of lines if the reader is not able to handle the input?

  • If the reader tries to read or readline faster then the writer writes, it should block.
  • If the reader cannot work as fast as the writer writes, the writer should not block. Lines should not be buffered (except one line at a time) and only the last line written should be received by the reader on its next readline attempt.
  • Is this possible with a named fifo, or is there any other simple way for achiving this?


    The following code uses a named FIFO to allow communication between two scripts.

  • If the reader tries to read faster than the writer, it blocks.
  • If the reader cannot keep up with the writer, the writer does not block.
  • Operations are buffer oriented. Line oriented operations are not currently implemented.
  • This code should be considered a proof-of-concept. The delays and buffer sizes are arbitrary.
  • Code

    import argparse
    import errno
    import os
    from select import select
    import time
    
    class OneFifo(object):
        def __init__(self, name):
            self.name = name
    
        def __enter__(self):
            if os.path.exists(self.name):
                os.unlink(self.name)
            os.mkfifo(self.name)
            return self
    
        def __exit__(self, exc_type, exc_value, exc_traceback):
            if os.path.exists(self.name):
                os.unlink(self.name)
    
        def write(self, data):
            print "Waiting for client to open FIFO..."
            try:
                server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
            except OSError as exc:
                if exc.errno == errno.ENXIO:
                    server_file = None
                else:
                    raise
            if server_file is not None:
                print "Writing line to FIFO..."
                try:
                    os.write(server_file, data)
                    print "Done."
                except OSError as exc:
                    if exc.errno == errno.EPIPE:
                        pass
                    else:
                        raise
                os.close(server_file)
    
        def read_nonblocking(self):
            result = None
            try:
                client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
            except OSError as exc:
                if exc.errno == errno.ENOENT:
                    client_file = None
                else:
                    raise
            if client_file is not None:
                try:
                    rlist = [client_file]
                    wlist = []
                    xlist = []
                    rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
                    if client_file in rlist:
                        result = os.read(client_file, 1024)
                except OSError as exc:
                    if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
                        result = None
                    else:
                        raise
                os.close(client_file)
            return result
    
        def read(self):
            try:
                with open(self.name, 'r') as client_file:
                    result = client_file.read()
            except OSError as exc:
                if exc.errno == errno.ENOENT:
                    result = None
                else:
                    raise
            if not len(result):
                result = None
            return result
    
    def parse_argument():
        parser = argparse.ArgumentParser()
        parser.add_argument('-c', '--client', action='store_true',
                            help='Set this flag for the client')
        parser.add_argument('-n', '--non-blocking', action='store_true',
                            help='Set this flag to read without blocking')
        result = parser.parse_args()
        return result
    
    if __name__ == '__main__':
        args = parse_argument()
        if not args.client:
            with OneFifo('known_name') as one_fifo:
                while True:
                    one_fifo.write('one line')
                    time.sleep(0.1)
        else:
            one_fifo = OneFifo('known_name')
            while True:
                if args.non_blocking:
                    result = one_fifo.read_nonblocking()
                else:
                    result = one_fifo.read()
                if result is not None:
                    print result
    

    The server checks if the client has opened the FIFO. If the client has opened the FIFO, the server writes a line. Otherwise, the server continues running. I have implemented a non-blocking read because the blocking read causes a problem: If the server restarts, most of the time the client stays blocked and never recovers. With a non-blocking client , a server restart is more easily tolerated.

    Output

    [user@machine:~] python onefifo.py
    Waiting for client to open FIFO...
    Waiting for client to open FIFO...
    Writing line to FIFO...           
    Done.
    Waiting for client to open FIFO...
    Writing line to FIFO...
    Done.
    
    [user@machine:~] python onefifo.py -c
    one line
    one line
    

    Notes

    On startup, if the server detects that the FIFO already exists, it removes it. This is the easiest way to notify clients that the server has restarted. This notification is usually ignored by the blocking version of the client .


    Well, that's not actually a FIFO (queue) as far as I am aware - it's a single variable. I suppose it might be implementable if you set up a queue or pipe with a maximum size of 1, but it seems that it would work better to use a Lock on a single object in one of the processes, which the other process references via a proxy object. The reader would set it to None whenever it reads, and the writer would overwrite the contents every time it writes.

    You can get those to the other processes by passing the proxy of the object, and a proxy of the lock, as an argument to all relevant processes. To get it slightly more conveniently, you can use a Manager , which provides a single object with proxy that you can pass in, which contains and provides proxies for whatever other objects (including locks) you want to put in it. This answer provides a useful example of proper use of a Manager to pass objects into a new process.

    链接地址: http://www.djcxy.com/p/83160.html

    上一篇: 在单元测试期间渲染视图

    下一篇: 不阻塞fifo