RabbitMQ + kombu: write/read to one

I am new to message queuing and have had trouble finding a proper guide for this task.

I need to organize a pool of queues so that:

  • A producer creates a random empty queue and writes a whole pack of messages to it (usually 100 messages).

  • A consumer finds a non-empty, non-locked queue, reads from it until it is empty, then deletes it and looks for the next one.

  • So my task is to work with messages in packs. I understand how to produce and consume using the same key on a single queue, but I can't find how to work with a pool of queues.

    We can have several producers and consumers running in parallel, and it does not matter which of them sends to which. We don't need to, and in fact can't, link a particular producer to a particular consumer.

    The general task: we have a lot of clients receiving push notifications, and we group the pushes by certain parameters to process them later as a group. Each such group should live in one RabbitMQ queue so it can be produced and consumed as a unit, but each group is independent of the others.

    Big thanks to Hannu for the help. The key idea of his simple and robust solution is to have one persistent queue with a known name, where the producer writes the names of the queues it creates and the consumer reads those names.

    To make his solution more readable and easier to work with for my task, I split publish_data() in the producer into two functions: one creates the random queue and writes its name to control_queue, and the other takes that random queue and fills it with messages. The same idea works for the consumer: one function processes a queue, another is called to process each message.


    I have done something like this, but with Pika. I had to clean up and "kombufy" an old code snippet for these examples. It is probably not very idiomatic kombu (this is the very first code snippet I have written with it), but this is how I would solve it. Basically, I would set up a control queue with a known name.

    Publishers create a random queue name for a pack of messages, dump N messages into it (in my case 42 messages, the numbers 0–41) and then post the queue name to the control queue. A consumer then receives the queue name, binds to that queue, reads messages until it is empty and then deletes the queue.

    This keeps things relatively simple: publishers do not need to figure out where they are allowed to publish their groups of data (every queue is new, with a random name), and receivers do not need to worry about timeouts or "all done" messages, since a receiver only gets a queue name once a whole group of data has been written to that queue and every message is already there waiting.

    There is also no need to tinker with locks, signalling or anything else that would complicate things. You can have as many consumers and producers as you want. And of course, using exchanges and routing keys, there could be different sets of consumers for different tasks, etc.

    Publisher

    from kombu import Connection
    import uuid
    from time import sleep


    def publish_data(conn):
        # Create a queue with a random, collision-free name and fill
        # it with a pack of messages.
        random_name = "q" + uuid.uuid4().hex
        random_queue = conn.SimpleQueue(random_name)
        for i in range(42):
            random_queue.put(i)
        random_queue.close()
        return random_name


    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        control_queue = conn.SimpleQueue('control_queue')
        _a = 0
        while True:
            # Publish a pack, then announce the queue name on the
            # control queue so a consumer can pick it up.
            y_name = publish_data(conn)
            control_queue.put(y_name)
            print('Sent: {0}'.format(y_name))
            _a += 1
            sleep(0.3)
            if _a > 20:
                break

        control_queue.close()
    

    Consumer

    from queue import Empty

    from kombu import Connection, Queue


    def process_msg(foo):
        # Drain the queue named by `foo`, then delete it.
        print(str(foo))
        with Connection("amqp://guest:guest@localhost:5672//") as _conn:
            sub_queue = _conn.SimpleQueue(str(foo))
            while True:
                try:
                    _msg = sub_queue.get(block=False)
                    print(_msg.payload)
                    _msg.ack()
                except Empty:
                    break
            sub_queue.close()
            # The pack is fully consumed; delete the now-empty queue.
            chan = _conn.channel()
            dq = Queue(name=str(foo), exchange="")
            bdq = dq(chan)
            bdq.delete()


    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        rec = conn.SimpleQueue('control_queue')
        while True:
            # Block until a producer announces a new queue name,
            # then process that queue.
            msg = rec.get(block=True)
            entry = msg.payload
            msg.ack()
            process_msg(entry)
    