ActiveMQ一些消费者如果到达生产者后面,他们不会完成任务

我只是从ActiveMQ开始,我似乎有一个奇怪的问题。 (来源如下)

有两种情况

  • 消费者连接到代理,等待队列中的任务。 制片人稍后到达,放下任务清单,并由不同的消费者正确接受并执行。 这工作正常,我也模拟它。

  • 生产者首先连接,删除任务列表。 目前没有消费者连接。 现在当我们说3个消费者--C1,C2和C3连接到经纪人时(按照这个顺序),我发现只有C1接受并完成了生产者放弃的任务。 C2和C3保持空闲状态。 为什么会发生?

  • 关于第二种情况,我还注意到了另外一件事情 - 如果生产者继续在队列中放置任务,则C2和C3将完成任务,但如果生产者在之前(如前所述)放弃任务,则C2和C3不会做任何事情。

    生产者代码

    package com.activemq.apps;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.commons.helpers.Maths;
    
    public class Publisher implements MessageListener {
    
        private static String _URL;
        private static String _TOPIC_PUBLISH;
        private static String _TOPIC_CONSUME;
    
        public Publisher (String URL, String TOPIC) {
    
            _URL = URL;
            _TOPIC_PUBLISH = TOPIC + "_REQUESTS";
            _TOPIC_CONSUME = TOPIC + "_RESPONSES";
    
        }
    
        public void initialize() {
    
            try
            {
    
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
                Connection connection = connectionFactory.createConnection();
                connection.start();
    
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
                Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);
    
                MessageProducer producer = session.createProducer(destinationProducer);
                MessageConsumer consumer = session.createConsumer(destinationConsumers);
    
                consumer.setMessageListener(this);
    
                int count = 0;
    
                System.out.println("Sending requests");
    
                while (true)
                {
                    int randomSleepTime = Maths.rand(1000, 5000);
    
                    String messageToSend = count + "_" + randomSleepTime;
    
                    TextMessage message = session.createTextMessage(messageToSend);
    
                    producer.send(message);
    
                    System.out.println("Job #" + count + " | " + (randomSleepTime/1000) + "s");
    
                    if (count++%10 == 0)
                        Thread.sleep(10000);
    
                }
    
    //          System.out.println("Waiting for responses");
    
    //          connection.close();
            }
    
            catch (JMSException ex)
            {
                ex.printStackTrace();
            }
    
            catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
        @Override
        public void onMessage(Message message) {
    
            if (message instanceof TextMessage)
            {
                TextMessage msg = (TextMessage) message;
    
                try {
    
                    String response = msg.getText();
                    String[] responseSplit = response.split("_");
    
                    String clientId = responseSplit[1];
                    String count = responseSplit[0];
    
                    System.out.println("Got response from " + clientId + " Job #" + count);
                } 
    
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

    消费者代码

    package com.activemq.apps;
    
    import java.util.UUID;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer implements MessageListener {
    
        private static String _URL;
        private static String _TOPIC_PUBLISH;
        private static String _TOPIC_CONSUME;
        private static String _CLIENTID;
    
        private MessageProducer producer;
        private Session session;
    
        public Consumer (String URL, String TOPIC) {
    
            _URL = URL;
            _TOPIC_PUBLISH = TOPIC + "_RESPONSES";
            _TOPIC_CONSUME = TOPIC + "_REQUESTS";
    
        }
    
        public void initialize() {
    
            try
            {
    
                _CLIENTID = UUID.randomUUID().toString();
    
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
                Connection connection = connectionFactory.createConnection();
                connection.start();
    
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
                Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);
    
                producer = session.createProducer(destinationProducer);
                MessageConsumer consumer = session.createConsumer(destinationConsumers);
    
                consumer.setMessageListener(this);
    
                System.out.println("Client: " + _CLIENTID + "nWaiting to pick up tasks");
    
    //          connection.close();
            }
    
            catch (JMSException ex)
            {
                ex.printStackTrace();
            }
    
        }
    
        @Override
        public void onMessage(Message message) {
    
            if (message instanceof TextMessage)
            {
                TextMessage msg = (TextMessage) message;
    
                try 
                {
    
                    String[] messageSplits = msg.getText().split("_");
    
                    String count = messageSplits[0];
                    String timeString = messageSplits[1];
    
                    int sleepFor = Integer.parseInt(timeString);
    
                    System.out.println("Job #" + count + " | Sleeping for " + (sleepFor/1000) + "s");
    
                    Thread.sleep(sleepFor);
    
                    TextMessage sendToProducer = session.createTextMessage(count + "_" + _CLIENTID);
    
                    producer.send(sendToProducer);
                } 
    
                catch (JMSException e) {
                    e.printStackTrace();
                } 
    
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

    你提到过

    现在让我们说3个消费者 - C1,C2和C3连接到经纪人(按照这个顺序)

    由于C1首先连接,因此在连接后立即开始获取队列中的所有消息。 这是预料之中的。 所以我没有看到任何问题。 C2,C3不是空闲的,但C1在C2和C3可以之前已经获得了消息。

    我不确定生产者发送了多少条消息。 我假设消息的数量较少。 看看你期望什么尝试从制片人发送许多消息,如成千上万或数百万,然后启动消费者。 大量的消息是主观的,取决于内存,网络和其他资源。 你可能会看到你的期望。


    我不认为这里有什么奇怪的。 队列表示应该只有一个用户的P2P模式。 在我们的案例中,我们有3个消费者,但并不禁止,但不能保证消费者将收到消息的任何特定顺序。

    链接地址: http://www.djcxy.com/p/33051.html

    上一篇: ActiveMQ some consumers not picking up tasks if they arrive after producer

    下一篇: Exception in apache http client