在PHP中使用进程分叉的Rabbitmq队列
我有基于PHP的标准AMQP类的简单队列工作。 它与RabbitMQ一起作为服务器。 我有用于初始化AMQP连接的队列类,以及RabbitMQ。 一切工作正常与下面的代码:
$queue = new Queue('myQueue');
while($envelope = $queue->getEnvelope()) {
$command = unserialize($envelope->getBody());
if ($command instanceof QueueCommand) {
try {
if ($command->execute()) {
$queue->ack($envelope->getDeliveryTag());
}
} catch (Exception $exc) {
// an error occurred so do some processing to deal with it
}
}
}
不过,我想fork队列命令执行,但在这种情况下,队列中的第一个命令一遍又一遍地无止境。 我无法确认RabbitMQ收到的消息是$ queue-> ack(); 我的分支版本(为了测试而简化,只有一个孩子)看起来像这样:
$queue = new Queue('myQueue');
while($envelope = $queue->getEnvelope()) {
$command = unserialize($envelope->getBody());
if ($command instanceof QueueCommand) {
$pid = pcntl_fork();
if ($pid) {
//parent proces
//wait for child
pcntl_waitpid($pid, $status, WUNTRACED);
if($status > 0) {
// an error occurred so do some processing to deal with it
} else {
//remove Command from queue
$queue->ack($envelope->getDeliveryTag());
}
} else {
//child process
try {
if ($command->execute()) {
exit(0);
}
} catch (Exception $exc) {
exit(1);
}
}
}
}
任何帮助将不胜感激...
我终于解决了这个问题! 我不得不从子进程运行ack命令,它以这种方式工作! 这是正确的代码:
$queue = new Queue('myQueue');
while($envelope = $queue->getEnvelope()) {
$command = unserialize($envelope->getBody());
if ($command instanceof QueueCommand) {
$pid = pcntl_fork();
if ($pid) {
//parent proces
//wit for child
pcntl_waitpid($pid, $status, WUNTRACED);
if($status > 0) {
// an error occurred so do some processing to deal with it
} else {
// sucess
}
} else {
//child process
try {
if ($command->execute()) {
$queue->ack($envelope->getDeliveryTag());
exit(0);
}
} catch (Exception $exc) {
exit(1);
}
}
}
}
链接地址: http://www.djcxy.com/p/34165.html