ActiveMQ一些消费者如果到达生产者后面,他们不会完成任务
我只是从ActiveMQ开始,我似乎有一个奇怪的问题。 (来源如下)
有两种情况
消费者连接到代理,等待队列中的任务。 制片人稍后到达,放下任务清单,并由不同的消费者正确接受并执行。 这工作正常,我也模拟它。
生产者首先连接,删除任务列表。 目前没有消费者连接。 现在当我们说3个消费者--C1,C2和C3连接到经纪人时(按照这个顺序),我发现只有C1接受并完成了生产者放弃的任务。 C2和C3保持空闲状态。 为什么会发生?
关于第二种情况,我还注意到了另外一件事情 - 如果生产者继续在队列中放置任务,则C2和C3将完成任务,但如果生产者在之前(如前所述)放弃任务,则C2和C3不会做任何事情。
生产者代码
package com.activemq.apps; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import com.commons.helpers.Maths; public class Publisher implements MessageListener { private static String _URL; private static String _TOPIC_PUBLISH; private static String _TOPIC_CONSUME; public Publisher (String URL, String TOPIC) { _URL = URL; _TOPIC_PUBLISH = TOPIC + "_REQUESTS"; _TOPIC_CONSUME = TOPIC + "_RESPONSES"; } public void initialize() { try { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH); Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME); MessageProducer producer = session.createProducer(destinationProducer); MessageConsumer consumer = session.createConsumer(destinationConsumers); consumer.setMessageListener(this); int count = 0; System.out.println("Sending requests"); while (true) { int randomSleepTime = Maths.rand(1000, 5000); String messageToSend = count + "_" + randomSleepTime; TextMessage message = session.createTextMessage(messageToSend); producer.send(message); System.out.println("Job #" + count + " | " + (randomSleepTime/1000) + "s"); if (count++%10 == 0) Thread.sleep(10000); } // System.out.println("Waiting for responses"); // connection.close(); } catch (JMSException ex) { ex.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; try { String response = msg.getText(); String[] responseSplit = response.split("_"); String clientId = responseSplit[1]; String count = responseSplit[0]; System.out.println("Got response from " + clientId + " Job #" + count); } catch (JMSException e) { e.printStackTrace(); } } } }
消费者代码
package com.activemq.apps; import java.util.UUID; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer implements MessageListener { private static String _URL; private static String _TOPIC_PUBLISH; private static String _TOPIC_CONSUME; private static String _CLIENTID; private MessageProducer producer; private Session session; public Consumer (String URL, String TOPIC) { _URL = URL; _TOPIC_PUBLISH = TOPIC + "_RESPONSES"; _TOPIC_CONSUME = TOPIC + "_REQUESTS"; } public void initialize() { try { _CLIENTID = UUID.randomUUID().toString(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL); Connection connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH); Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME); producer = session.createProducer(destinationProducer); MessageConsumer consumer = session.createConsumer(destinationConsumers); consumer.setMessageListener(this); System.out.println("Client: " + _CLIENTID + "nWaiting to pick up tasks"); // connection.close(); } catch (JMSException ex) { ex.printStackTrace(); } } @Override public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; try { String[] messageSplits = msg.getText().split("_"); String count = messageSplits[0]; String timeString = messageSplits[1]; int sleepFor = Integer.parseInt(timeString); System.out.println("Job #" + count + " | Sleeping for " + (sleepFor/1000) + "s"); Thread.sleep(sleepFor); TextMessage sendToProducer = session.createTextMessage(count + "_" + _CLIENTID); producer.send(sendToProducer); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
你提到过
现在让我们说3个消费者 - C1,C2和C3连接到经纪人(按照这个顺序)
由于C1首先连接,因此在连接后立即开始获取队列中的所有消息。 这是预料之中的。 所以我没有看到任何问题。 C2,C3不是空闲的,但C1在C2和C3可以之前已经获得了消息。
我不确定生产者发送了多少条消息。 我假设消息的数量较少。 看看你期望什么尝试从制片人发送许多消息,如成千上万或数百万,然后启动消费者。 大量的消息是主观的,取决于内存,网络和其他资源。 你可能会看到你的期望。
我不认为这里有什么奇怪的。 队列表示应该只有一个用户的P2P模式。 在我们的案例中,我们有3个消费者,但并不禁止,但不能保证消费者将收到消息的任何特定顺序。
链接地址: http://www.djcxy.com/p/33051.html上一篇: ActiveMQ some consumers not picking up tasks if they arrive after producer