RPC using EventMachine & RabbitMQ
I've started playing around with the RabbitMQ RPC sample code provided in the AMQP gem documentation, trying to write very simple code that performs synchronous remote calls:
require "amqp"
module RPC
# Shared plumbing for the RPC server and client below: opens an AMQP
# connection, a channel and the durable "rpc.queue" request queue, then hands
# all three to a caller-supplied callback through the EM::Deferrable protocol.
class Base
include EM::Deferrable
# Connects to the broker and fires +rabbit_callback+ with
# (connection, channel, requests_queue) from inside the queue block.
# Wraps the work in EM.run when no reactor is running; otherwise runs it
# directly inside the current reactor.
#
# @param rabbit_callback [Proc] registered as this object's Deferrable
#   success callback and triggered by the +succeed+ call below
def rabbit(rabbit_callback)
rabbit_loop = Proc.new {
AMQP.connect do |connection|
AMQP::Channel.new(connection) do |channel|
channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
# NOTE(review): the Deferrable state lives on this instance and is never
# reset here. On a second call to #rabbit on the same object the status is
# presumably still "succeeded", so +callback+ would fire at once with the
# arguments stored by the first call (the old connection/channel) -- which
# would match the ConnectionClosedError reported for a second sync_push.
# Confirm against the EM::Deferrable documentation.
self.callback(&rabbit_callback)
self.succeed(connection, channel, requests_queue)
end # requests_queue
end # AMQP.channel
end # AMQP.connect
# NOTE(review): +connection+ is a block parameter of AMQP.connect above and
# is not in scope in these handlers, so they would presumably raise NameError
# when a signal actually arrives -- verify.
Signal.trap("INT") { connection.close { EM.stop } }
Signal.trap("TERM") { connection.close { EM.stop } }
}
# Only start a reactor when none is running.
if !EM.reactor_running?
EM.run do
rabbit_loop.call
end
else
rabbit_loop.call
end
end
end
# RPC server: consumes requests from the shared request queue and answers
# each one with the current time.
class Server < Base
  # Starts consuming. Every delivery is logged, answered on the queue named in
  # its reply_to header (correlated by the request's message id), then acked.
  def run
    handle_requests = proc do |_connection, channel, requests_queue|
      AMQP::Consumer.new(channel, requests_queue).consume.on_delivery do |metadata, _payload|
        request_id = metadata.message_id
        reply_queue = metadata.reply_to

        puts "[requests] Got a request #{request_id}. Sending a reply to #{reply_queue}..."

        channel.default_exchange.publish(
          Time.now.to_s,
          :routing_key => reply_queue,
          :correlation_id => request_id,
          :mandatory => true
        )
        metadata.ack
      end
    end

    rabbit(handle_requests)
  end
end
# RPC client that performs one request/response round trip per call.
class Client < Base
  # Publishes +request+ to the shared request queue and returns the payload of
  # the reply whose correlation id equals the generated message id.
  #
  # @param request [String] payload to send to the server
  # @return [String, nil] the reply payload, or nil if none had been captured
  #   by the time +rabbit+ returned
  def sync_push(request)
    reply = nil

    round_trip = proc do |connection, channel, requests_queue|
      request_id = Kernel.rand(10_101_010).to_s
      reply_queue = channel.queue("", :exclusive => true, :auto_delete => true)

      reply_queue.subscribe do |headers, payload|
        # Ignore anything that is not the answer to this very request.
        next unless headers.correlation_id == request_id

        reply = payload
        connection.close { EM.stop }
      end

      # Publishing is delayed by 0.1 s. NOTE(review): presumably this gives the
      # broker time to finish declaring the reply queue before its name is
      # used as reply_to -- confirm.
      EM.add_timer(0.1) do
        puts "[request] Sending a request...#{request} with id #{request_id}"
        envelope = {
          :routing_key => requests_queue.name,
          :reply_to => reply_queue.name,
          :message_id => request_id
        }
        channel.default_exchange.publish(request, envelope)
      end
    end

    rabbit(round_trip)
    reply
  end
end
end
The idea is pretty simple: I want to have a message queue always ready (this is handled by the `rabbit` method). Whenever the client wants to send a request, it starts by creating a temporary queue for the response along with a message id; it then publishes the request to the main message queue and waits for a response with the same message id in the temporary queue, in order to know when the answer for this specific request is ready. I guess that the `message_id` is somewhat redundant with the temporary queue (as the queue should also be unique).
I now run a dummy script using this client/server code:
# server session
>> server = RPC::Server.new
=> #<RPC::Server:0x007faaa23bb5b0>
>> server.run
Updating client properties
[requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0...
and
# client session
>> client = RPC::Client.new
=> #<RPC::Client:0x007ffb6be6aed8>
>> client.sync_push "test 1"
Updating client properties
[request] Sending a request...test 1 with id 3315740
=> "2012-11-02 21:58:45 +0100"
>> client.sync_push "test 2"
AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="\x002\x00\n\x00\x00\x00\f\x00\x00\x00\x00", @channel=1>
There are two points I really don't understand:
In the `Client` code, why do I have to call `EM.add_timer` if I want my message to actually be published? And why doesn't using `EM.next_tick` work? My understanding is that "everything" is supposed to be "ready" when publish is called here. Sadly, there is very little code available online dealing with EM/AMQP, so any help will be deeply appreciated! Any comment regarding the efficiency of this would also be much appreciated.
Digging through the documentation, I finally found that I actually needed the `once_declared` callback to be sure that the queue is ready when the client starts using it.
Regarding the connection problem, it seems that, somehow, using `EM::Deferrable` causes issues, so the (quite unsatisfactory) solution simply consists in not including `EM::Deferrable`.
require "amqp"
module RPC
# Mixin giving its includers a ready-to-use AMQP connection, channel and
# durable "rpc.queue" request queue.
module Base
  # Opens the AMQP session and, once the request queue has been declared,
  # calls +rabbit_callback+ with (connection, channel, requests_queue).
  #
  # Also records in @do_not_stop_reactor whether an EventMachine reactor was
  # already running when this method was entered, and installs INT/TERM
  # handlers that stop AMQP and then the reactor.
  #
  # @param rabbit_callback [#call] receives connection, channel and queue
  def rabbit(rabbit_callback)
    bootstrap = proc do
      AMQP.start do |connection|
        AMQP::Channel.new(connection) do |channel|
          queue_options = { :exclusive => false, :durable => true }
          channel.queue("rpc.queue", queue_options) do |requests_queue|
            requests_queue.once_declared do
              rabbit_callback.call(connection, channel, requests_queue)
            end
          end
        end
      end

      # Same shutdown sequence for both signals.
      shutdown = proc { AMQP.stop { EM.stop } }
      Signal.trap("INT", &shutdown)
      Signal.trap("TERM", &shutdown)
    end

    if EM.reactor_running?
      # Somebody else owns the reactor: just hook into it.
      @do_not_stop_reactor = true
      bootstrap.call
    else
      @do_not_stop_reactor = false
      EM.run(&bootstrap)
    end
  end
end
# RPC server: answers every request taken from the shared request queue with
# the current time.
class Server
  include Base

  # Sets up the connection through Base#rabbit and starts serving requests
  # once the request queue is ready.
  def run
    on_ready = proc do |_connection, channel, requests_queue|
      serve_requests(channel, requests_queue)
    end
    rabbit(on_ready)
  end

  private

  # Consumes +requests_queue+; each delivery is logged, answered on the queue
  # named in its reply_to header and then acknowledged.
  def serve_requests(channel, requests_queue)
    consumer = AMQP::Consumer.new(channel, requests_queue).consume
    consumer.on_delivery do |metadata, _payload|
      puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
      reply_headers = {
        :routing_key => metadata.reply_to,
        :correlation_id => metadata.message_id,
        :mandatory => true
      }
      channel.default_exchange.publish(Time.now.to_s, reply_headers)
      metadata.ack
    end
  end
end
# RPC client that performs one request/response round trip per call.
class Client
  include Base

  # Publishes +request+ to the shared request queue and returns the payload of
  # the reply whose correlation id equals the generated message id.
  #
  # Without a +timeout+ the call waits for as long as it takes for a reply to
  # arrive (it never returns if no server answers). With a +timeout+ the AMQP
  # session is shut down after that many seconds and nil is returned.
  #
  # NOTE(review): when a reactor is already running, Base#rabbit does not
  # start its own EM.run, so this method presumably returns nil before any
  # reply can arrive -- confirm before using it inside an existing reactor.
  #
  # @param request [String] payload to send to the server
  # @param timeout [Numeric, nil] seconds to wait for the reply; nil waits forever
  # @return [String, nil] the reply payload, or nil when none was received
  def sync_push(request, timeout = nil)
    result = nil
    sync_request = Proc.new do |_connection, channel, requests_queue|
      message_id = Kernel.rand(10101010).to_s

      # Shut the session down exactly once, whether the reply or the timeout
      # gets there first.
      finished = false
      finish = Proc.new do
        unless finished
          finished = true
          AMQP.stop { EM.stop unless @do_not_stop_reactor }
        end
      end

      timer = timeout ? EM.add_timer(timeout) { finish.call } : nil

      response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
      response_queue.subscribe do |headers, payload|
        # Ignore anything that is not the answer to this very request.
        next unless headers.correlation_id == message_id

        EM.cancel_timer(timer) if timer
        result = payload
        finish.call
      end

      # Publish only once the reply queue is declared.
      response_queue.once_declared do
        puts "[request] Sending a request...#{request} with id #{message_id}"
        channel.default_exchange.publish(request,
                                         :routing_key => requests_queue.name,
                                         :reply_to => response_queue.name,
                                         :message_id => message_id)
      end
    end
    rabbit(sync_request)
    result
  end
end
end
链接地址: http://www.djcxy.com/p/61220.html
上一篇: RabbitMQ RPC教程查询