Broadcasting message to all clients using Pika + sockjs
I'm new to realtime apps based on WebSockets, and stucked at one point. My app has following components:
The problem is that I can't figure out how to pass data received from queue to all clients. I've done a pub/sub exchange, so when user connects to server, new connection is established with RabbitMQ for every user, but that's not what I want. Below is what I've got as far.
common/pika_client.py :
import logging
import pika
from pika.adapters.tornado_connection import TornadoConnection
class PikaClient(object):
def __init__(self, exchange, host='localhost', port=5672, vhost=''):
# Default values
self.connected = False
self.connecting = False
self.connection = None
self.channel = None
self.host = host
self.port = port
self.vhost = vhost
self.exchange = exchange
# preparing logger
self.log = logging.getLogger(__name__)
self.set_log_level()
def set_log_level(self, log_level=logging.WARNING):
self.log.setLevel(log_level)
def connect(self):
self.log.info("CONNECTING")
if self.connecting:
self.log.info('%s: Already connecting to RabbitMQ' %
self.__class__.__name__)
return
self.log.info('%s: Connecting to RabbitMQ on localhost:5672' %
self.__class__.__name__)
self.connecting = True
param = pika.ConnectionParameters(
host=self.host,
port=self.port,
virtual_host=self.vhost
)
self.connection = TornadoConnection(param,
on_open_callback=self.on_connected)
self.connection.add_on_close_callback(self.on_closed)
def on_connected(self, connection):
self.log.info('%s: Connected to RabbitMQ on localhost:5672' %
self.__class__.__name__)
self.connected = True
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_channel_open(self, channel):
self.log.info('%s: Channel Open, Declaring Exchange %s' %
(self.__class__.__name__, self.exchange))
self.channel = channel
self.channel.exchange_declare(
exchange=self.exchange,
type="fanout",
callback=self.on_exchange_declared
)
def on_exchange_declared(self, frame):
self.log.info('%s: Exchange Declared, Declaring Queue' %
(self.__class__.__name__))
self.channel.queue_declare(exclusive=True,
callback=self.on_queue_declared)
def on_queue_declared(self, frame):
self.log.info('%s: Queue Declared, Binding Queue %s' %
(self.__class__.__name__, frame.method.queue))
self.queue_name = frame.method.queue
self.channel.queue_bind(
exchange=self.exchange,
queue=frame.method.queue,
callback=self.on_queue_bound
)
def on_queue_bound(self, frame):
self.log.info('%s: Queue Bound. To receive messages implement
on_queue_bound method' % self.__class__.__name__)
def on_closed(self, connection):
self.log.info('%s: Connection Closed' % self.__class__.__name__)
self.connected = False
self.connection = None
self.connecting = False
self.channel = None
self.connection = self.connect()
def add_message_handler(self, handler):
self.message_handler = handler
tracker.py
from sockjs.tornado import SockJSConnection
import settings
from common.pika_client import PikaClient
class QueueReceiver(PikaClient):
"""Receives messages from RabbitMQ """
def on_queue_bound(self, frame):
self.log.info('Consuming on queue %s' % self.queue_name)
self.channel.basic_consume(consumer_callback=self.message_handler,
queue=self.queue_name
)
class TrackerConnection(SockJSConnection):
def on_open(self, info):
self.queue = QueueReceiver('clt')
self.queue.add_message_handler(self.on_queue_message)
self.queue.set_log_level(settings.LOG_LEVEL)
self.queue.connect()
def on_queue_message(self, channel, method, header, body):
self.send(body)
self.queue.channel.basic_ack(delivery_tag=method.delivery_tag)
It works, but like I mentioned, I'd like to have only one connection to queue, receive messages, do some stuff and broadcast results to clients using broadcast() method. Thanks in advance for any help.
链接地址: http://www.djcxy.com/p/44770.html