AnyEvent::RabbitMQ issues with closed channels
I'm writing a master program for publishing message into a message queue (RabbitMQ). The program is written in Perl 5 and is using AnyEvent::RabbitMQ for the communication to RabbitMQ.
The following minimal example (for the issue I ran into) will fail on a second command send via the same channel with the error "Channel closed".
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main {
_log( debug => 'main' );
my $condvar = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new;
$ar->load_xml_spec;
_log( debug => 'Connecting to RabbitMQ...' );
$ar->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
on_success => sub { _on_connect_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
$condvar->recv;
$ar->close;
return;
}
############################################################################
sub _on_connect_success {
my ( $condvar, $ar, $new_ar ) = @_;
_log( debug => 'Connected to RabbitMQ.' );
_open_channel( $condvar, $new_ar );
return;
}
############################################################################
sub _open_channel {
my ( $condvar, $ar ) = @_;
_log( debug => 'Opening RabbitMQ channel...' );
$ar->open_channel(
on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_open_channel_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
_declare_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _declare_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ queue...' );
$channel->declare_queue(
queue => 'test',
auto_delete => 1,
passive => 0,
durable => 0,
exclusive => 0,
no_ack => 1,
ticket => 0,
on_success =>
sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ queue.' );
_bind_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ queue...' );
$channel->bind_queue(
queue => 'test',
exchange => '',
routing_key => '',
on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_bind_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ queue.' );
_log( info => 'Master ready to publish messages.' );
_publish_message( $condvar, $ar, $channel, 'Hello, world!' );
return;
}
############################################################################
sub _publish_message {
my ( $condvar, $ar, $channel, $message ) = @_;
_log( debug => "Publishing RabbitMQ message ($message)..." );
$channel->publish(
queue => 'test',
exchange => '',
routing_key => '',
body => $message,
header => {},
mandatory => 0,
immediate => 0,
on_success =>
sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
on_ack => sub { _error( $condvar, $ar, 'ack', @_ ) },
on_nack => sub { _error( $condvar, $ar, 'nack', @_ ) },
);
return;
}
############################################################################
sub _on_publish_message_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => "Published RabbitMQ message." );
sleep 1;
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
return;
}
############################################################################
sub _error {
my ( $condvar, $ar, $type, @error ) = @_;
_log( error => sprintf '%s - %s', $type, join ', ', @error );
$condvar->send( $condvar, $ar, $type, @error );
return;
}
############################################################################
sub _log {
my ( $level, $message ) = @_;
my @time = gmtime time;
$time[5] += 1900;
$time[4] += 1;
my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
my @caller0 = caller(0);
my @caller1 = caller(1);
my $subroutine = $caller1[3];
$subroutine =~ s/^$caller0[0]:://;
print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])n";
return;
}
This program should:
This program (master program) should not consume messages. There are other programs out there to do this job.
The minimal example (see above) will produce the following output:
2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)
Why does AnyEvent::RabbitMQ
or RabbitMQ itself closes the channel (not the connection or did I miss something)?
If you take a look at the RabbitMQ server logs you will see something like this:
{amqp_error,access_refused,"operation not permitted on the default exchange",'queue.bind'}
Apparently it doesn't let you bind a queue on the default exchange. So you need to declare and bind your own exchange first.
sub _declare_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ exchange...' );
$channel->declare_exchange(
exchange => 'testest',
type => 'fanout',
on_success =>
sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_exchange_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ exchange.' );
_bind_exchange( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ exchange...' );
$channel->bind_exchange(
source => 'testest',
destination => 'testest',
routing_key => '',
on_success => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
Once you've set those subs up, tell your program to use this custom exchange.
sub _on_open_channel_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
$channel->confirm;
_declare_exchange( $condvar, $ar, $channel );
return;
}
The $channel->confirm
is necessary to make RabbitMQ answer with a confirmation when you send a message to the queue. If you do not do that, the success handler will never get called because there are no success responses coming back.
Then in your _bind_queue
you need to add the exchange to the bind_queue()
call.
$channel->bind_queue(
queue => 'test',
exchange => 'testest', # <-- here
routing_key => '',
# ...
);
The same needs to be done in the _publish_message
with the publish()
call. There you should also replace the on_ack
handler with something that actually deals with the acknowledgement. I think you intended to do that but had a copy/paste error1.
$channel->publish(
queue => 'test',
exchange => 'testest', # <-- here
routing_key => '',
# ...
on_ack => sub {
_on_publish_message_success( $condvar, $ar, $channel, @_ );
},
);
One more thing is that the sleep
call in _on_publish_message_success
is not a good idea when you are working with AnyEvent as that will stop the whole program. Use an AE::timer
instead.
my $t;
$t = AE::timer(1,0,sub {
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
undef $t;
});
Here is the full code with all the changes.
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main {
_log( debug => 'main' );
my $condvar = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new;
$ar->load_xml_spec;
_log( debug => 'Connecting to RabbitMQ...' );
$ar->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/guest',
timeout => 1,
tls => 0,
on_success => sub { _on_connect_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
$condvar->recv;
$ar->close;
return;
}
############################################################################
sub _on_connect_success {
my ( $condvar, $ar, $new_ar ) = @_;
_log( debug => 'Connected to RabbitMQ.' );
_open_channel( $condvar, $new_ar );
return;
}
############################################################################
sub _open_channel {
my ( $condvar, $ar ) = @_;
_log( debug => 'Opening RabbitMQ channel...' );
$ar->open_channel(
on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_open_channel_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
$channel->confirm;
_declare_exchange( $condvar, $ar, $channel );
return;
}
############################################################################
sub _declare_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ exchange...' );
$channel->declare_exchange(
exchange => 'testest',
type => 'fanout',
on_success =>
sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_exchange_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ exchange.' );
_bind_exchange( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ exchange...' );
$channel->bind_exchange(
source => 'testest',
destination => 'testest',
routing_key => '',
on_success => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_bind_exchange_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ exchange.' );
_declare_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _declare_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ queue...' );
$channel->declare_queue(
queue => 'test',
auto_delete => 1,
passive => 0,
durable => 0,
exclusive => 0,
no_ack => 1,
ticket => 0,
on_success =>
sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ queue.' );
_bind_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ queue...' );
$channel->bind_queue(
queue => 'test',
exchange => 'testest',
routing_key => '',
on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_bind_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ queue.' );
_log( info => 'Master ready to publish messages.' );
_publish_message( $condvar, $ar, $channel, 'Hello, world!' );
return;
}
############################################################################
sub _publish_message {
my ( $condvar, $ar, $channel, $message ) = @_;
_log( debug => "Publishing RabbitMQ message ($message)..." );
$channel->publish(
queue => 'test',
exchange => 'testest',
routing_key => '',
body => $message,
header => {},
mandatory => 0,
immediate => 0,
on_success =>
sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
on_ack => sub {
_on_publish_message_success( $condvar, $ar, $channel, @_ );
# _error( $condvar, $ar, 'ack', @_ )
},
on_nack => sub { _error( $condvar, $ar, 'nack', @_ ) },
);
return;
}
############################################################################
sub _on_publish_message_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => "Published RabbitMQ message." );
my $t; $t=AE::timer(1,0,sub {
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
undef $t;
});
return;
}
############################################################################
sub _error {
my ( $condvar, $ar, $type, @error ) = @_;
_log( error => sprintf '%s - %s', $type, join ', ', @error );
$condvar->send( $condvar, $ar, $type, @error );
return;
}
############################################################################
sub _log {
my ( $level, $message ) = @_;
my @time = gmtime time;
$time[5] += 1900;
$time[4] += 1;
my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
my @caller0 = caller(0);
my @caller1 = caller(1);
my $subroutine = $caller1[3];
$subroutine =~ s/^$caller0[0]:://;
print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])n";
return;
}
1) In some places you need to buy your colleagues a beer for those :)
链接地址: http://www.djcxy.com/p/59238.html