不阻塞fifo

如何在两个python进程之间创建fifo,如果读者无法处理输入,那么允许删除行?

  • 如果读者试图readreadline那么作家写快,应该可以挡住。
  • 如果读者不能像写作者写作那样快,作者就不应该阻止。 行不应该被缓存(除了在一次一行),并只写了应该由读者在下次收到的最后一行readline尝试。
  • 这是可能的一个命名的先进先出,或有任何其他简单的方法来实现这个?


    以下代码使用命名的FIFO来允许两个脚本之间的通信。

  • 如果读者尝试read比写作者更快,则会阻止。
  • 如果读者不能跟上作家,作家不会阻止。
  • 操作是面向缓冲区的。 面向行的操作目前尚未实现。
  • 这个代码应该被认为是一个概念验证。 延迟和缓冲区大小是任意的。
  • import argparse
    import errno
    import os
    from select import select
    import time
    
    class OneFifo(object):
        def __init__(self, name):
            self.name = name
    
        def __enter__(self):
            if os.path.exists(self.name):
                os.unlink(self.name)
            os.mkfifo(self.name)
            return self
    
        def __exit__(self, exc_type, exc_value, exc_traceback):
            if os.path.exists(self.name):
                os.unlink(self.name)
    
        def write(self, data):
            print "Waiting for client to open FIFO..."
            try:
                server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
            except OSError as exc:
                if exc.errno == errno.ENXIO:
                    server_file = None
                else:
                    raise
            if server_file is not None:
                print "Writing line to FIFO..."
                try:
                    os.write(server_file, data)
                    print "Done."
                except OSError as exc:
                    if exc.errno == errno.EPIPE:
                        pass
                    else:
                        raise
                os.close(server_file)
    
        def read_nonblocking(self):
            result = None
            try:
                client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
            except OSError as exc:
                if exc.errno == errno.ENOENT:
                    client_file = None
                else:
                    raise
            if client_file is not None:
                try:
                    rlist = [client_file]
                    wlist = []
                    xlist = []
                    rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
                    if client_file in rlist:
                        result = os.read(client_file, 1024)
                except OSError as exc:
                    if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
                        result = None
                    else:
                        raise
                os.close(client_file)
            return result
    
        def read(self):
            try:
                with open(self.name, 'r') as client_file:
                    result = client_file.read()
            except OSError as exc:
                if exc.errno == errno.ENOENT:
                    result = None
                else:
                    raise
            if not len(result):
                result = None
            return result
    
    def parse_argument():
        parser = argparse.ArgumentParser()
        parser.add_argument('-c', '--client', action='store_true',
                            help='Set this flag for the client')
        parser.add_argument('-n', '--non-blocking', action='store_true',
                            help='Set this flag to read without blocking')
        result = parser.parse_args()
        return result
    
    if __name__ == '__main__':
        args = parse_argument()
        if not args.client:
            with OneFifo('known_name') as one_fifo:
                while True:
                    one_fifo.write('one line')
                    time.sleep(0.1)
        else:
            one_fifo = OneFifo('known_name')
            while True:
                if args.non_blocking:
                    result = one_fifo.read_nonblocking()
                else:
                    result = one_fifo.read()
                if result is not None:
                    print result
    

    server检查client是否打开了FIFO。 如果client打开了FIFO, server将写入一行。 否则, server继续运行。 我已经实现了非阻塞式读取,因为阻塞式读取会导致一个问题:如果server重新启动,大部分时间client一直处于阻塞状态并且不会恢复。 使用非阻塞clientserver重新启动更容易被容忍。

    产量

    [user@machine:~] python onefifo.py
    Waiting for client to open FIFO...
    Waiting for client to open FIFO...
    Writing line to FIFO...           
    Done.
    Waiting for client to open FIFO...
    Writing line to FIFO...
    Done.
    
    [user@machine:~] python onefifo.py -c
    one line
    one line
    

    笔记

    启动时,如果server检测到FIFO已经存在,则将其删除。 这是通知clients server已重新启动的最简单方法。 该通知通常会被client的阻止版本忽略。


    那么,据我所知,这实际上并不是一个FIFO(队列) - 它是一个单一的变量。 如果你设置了一个最大大小为1的队列或者管道,我想这可能是可实现的,但是似乎在其中一个进程中的一个对象上使用Lock效果更好,而另一个进程通过代理对象。 读者在读取时会将其设置为None ,并且每次写入时作者都会覆盖内容。

    您可以通过将对象的代理和锁的代理作为所有相关进程的参数传递给其他进程。 为了更方便地获得它,你可以使用一个Manager ,它提供了一个可以传入代理的单个对象,它包含并提供了你想要放入其中的任何其他对象(包括锁)的代理。 这个答案提供了正确使用管理器将对象传递到新流程的有用示例。

    链接地址: http://www.djcxy.com/p/83159.html

    上一篇: Nonblocking fifo

    下一篇: Does a python fifo have to be read with os.open?