RabbitMQ queue with process forking in PHP
I have a simple queue worker based on the standard AMQP class from PHP, with RabbitMQ as the server. A Queue class initializes the AMQP connection to RabbitMQ. Everything works fine with the code below:
$queue = new Queue('myQueue');
while ($envelope = $queue->getEnvelope()) {
    $command = unserialize($envelope->getBody());
    if ($command instanceof QueueCommand) {
        try {
            if ($command->execute()) {
                $queue->ack($envelope->getDeliveryTag());
            }
        } catch (Exception $exc) {
            // an error occurred so do some processing to deal with it
        }
    }
}
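For readers following along, here is a minimal sketch of what the unserialize() and instanceof check in the loop above expects. The QueueCommand interface is not shown in the post, so its shape here is an assumption, and EchoCommand is a hypothetical command used only for illustration. No AMQP connection is needed to run it.

```php
<?php
// Hypothetical contract: the original post does not show QueueCommand.
interface QueueCommand
{
    /** Returns true when the job succeeded and the message may be acked. */
    public function execute(): bool;
}

// Hypothetical example command, used only for illustration.
final class EchoCommand implements QueueCommand
{
    private $text;

    public function __construct(string $text)
    {
        $this->text = $text;
    }

    public function execute(): bool
    {
        echo $this->text, PHP_EOL;
        return true;
    }
}

// Producer side: the message body is just the serialized command object.
$body = serialize(new EchoCommand('hello'));

// Worker side: restrict unserialize() to the classes you expect, then
// apply the same instanceof check the worker loop uses.
$command = unserialize($body, ['allowed_classes' => [EchoCommand::class]]);
$isCommand = $command instanceof QueueCommand;
```

Passing allowed_classes is optional, but it keeps the worker from instantiating arbitrary classes if an unexpected payload ends up in the queue.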
However, I wanted to fork the command execution, and in that case the queue loops endlessly, delivering the first command over and over again. I can't acknowledge to RabbitMQ that the message was received with $queue->ack(). My forked version (simplified to a single child for testing) looks like this:
$queue = new Queue('myQueue');
while ($envelope = $queue->getEnvelope()) {
    $command = unserialize($envelope->getBody());
    if ($command instanceof QueueCommand) {
        $pid = pcntl_fork();
        if ($pid) {
            // parent process
            // wait for child
            pcntl_waitpid($pid, $status, WUNTRACED);
            if ($status > 0) {
                // an error occurred so do some processing to deal with it
            } else {
                // remove command from queue
                $queue->ack($envelope->getDeliveryTag());
            }
        } else {
            // child process
            try {
                if ($command->execute()) {
                    exit(0);
                }
            } catch (Exception $exc) {
                exit(1);
            }
        }
    }
}
Any help will be appreciated.
I finally solved the problem! I had to call ack from the child process; it works this way. This is the correct code:
$queue = new Queue('myQueue');
while ($envelope = $queue->getEnvelope()) {
    $command = unserialize($envelope->getBody());
    if ($command instanceof QueueCommand) {
        $pid = pcntl_fork();
        if ($pid) {
            // parent process
            // wait for child
            pcntl_waitpid($pid, $status, WUNTRACED);
            if ($status > 0) {
                // an error occurred so do some processing to deal with it
            } else {
                // success
            }
        } else {
            // child process
            try {
                if ($command->execute()) {
                    $queue->ack($envelope->getDeliveryTag());
                    exit(0);
                }
            } catch (Exception $exc) {
                exit(1);
            }
            // execute() returned false: exit here too, so the child
            // never falls back into the parent's while loop
            exit(1);
        }
    }
}
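The `$status > 0` check above works for the simple case, but the raw wait status encodes both the exit code and signal information, and with WUNTRACED pcntl_waitpid() can also return for a child that was stopped rather than terminated. Below is a minimal sketch of a stricter check, assuming the pcntl extension is loaded. The helper names childSucceeded and runInChild are hypothetical and not part of the original code.

```php
<?php
// Hypothetical helper: true only when the child ended with a clean exit(0).
function childSucceeded(int $status): bool
{
    // pcntl_wifexited() is false when the child was killed by a signal.
    return pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0;
}

// Hypothetical helper: runs $job in a forked child and reports the outcome.
function runInChild(callable $job): bool
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork failed');
    }
    if ($pid === 0) {
        // Child: always exit so it never falls back into the caller's loop.
        try {
            exit($job() ? 0 : 1);
        } catch (Throwable $e) {
            exit(1);
        }
    }
    // Parent: block until this child terminates.
    pcntl_waitpid($pid, $status);
    return childSucceeded($status);
}

$ok = runInChild(function () { return true; });
$failed = runInChild(function () { return false; });
```

In the worker loop, the job callable would wrap $command->execute() and the ack call, and the parent would branch on the boolean result instead of comparing $status to zero.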