ActiveMQ: some consumers do not pick up tasks if they connect after the producer

I'm just getting started with ActiveMQ and I seem to have a weird problem (source below).

There are 2 scenarios

  • Consumers connect to the broker and wait for tasks on the queue. The producer arrives later and drops a list of tasks, which are correctly picked up by the different consumers and performed. This works fine; I have simulated it as well.

  • The producer connects first and drops the list of tasks. No consumers are connected at this point. Now when let's say 3 consumers - C1, C2 and C3 connect to the broker (in that order), I see that only C1 picks up and performs the tasks dropped by the producer. C2 and C3 stay idle. Why does this happen?

  • I've also noticed one more thing regarding the 2nd scenario: if the producer keeps dropping tasks into the queue, C2 and C3 do pick up tasks, but if the producer dropped its tasks before the consumers connected (as described above), C2 and C3 don't do anything.

Producer code:

    package com.activemq.apps;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.commons.helpers.Maths;
    
    public class Publisher implements MessageListener {
    
        private static String _URL;
        private static String _TOPIC_PUBLISH;
        private static String _TOPIC_CONSUME;
    
        public Publisher (String URL, String TOPIC) {
    
            _URL = URL;
            _TOPIC_PUBLISH = TOPIC + "_REQUESTS";
            _TOPIC_CONSUME = TOPIC + "_RESPONSES";
    
        }
    
        public void initialize() {
    
            try
            {
    
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
                Connection connection = connectionFactory.createConnection();
                connection.start();
    
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
                Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);
    
                MessageProducer producer = session.createProducer(destinationProducer);
                MessageConsumer consumer = session.createConsumer(destinationConsumers);
    
                consumer.setMessageListener(this);
    
                int count = 0;
    
                System.out.println("Sending requests");
    
                while (true)
                {
                    int randomSleepTime = Maths.rand(1000, 5000);
    
                    String messageToSend = count + "_" + randomSleepTime;
    
                    TextMessage message = session.createTextMessage(messageToSend);
    
                    producer.send(message);
    
                    System.out.println("Job #" + count + " | " + (randomSleepTime/1000) + "s");
    
                    if (count++%10 == 0)
                        Thread.sleep(10000);
    
                }
    
    //          System.out.println("Waiting for responses");
    
    //          connection.close();
            }
    
            catch (JMSException ex)
            {
                ex.printStackTrace();
            }
    
            catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
        @Override
        public void onMessage(Message message) {
    
            if (message instanceof TextMessage)
            {
                TextMessage msg = (TextMessage) message;
    
                try {
    
                    String response = msg.getText();
                    String[] responseSplit = response.split("_");
    
                    String clientId = responseSplit[1];
                    String count = responseSplit[0];
    
                    System.out.println("Got response from " + clientId + " Job #" + count);
                } 
    
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

Consumer code:

    package com.activemq.apps;
    
    import java.util.UUID;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer implements MessageListener {
    
        private static String _URL;
        private static String _TOPIC_PUBLISH;
        private static String _TOPIC_CONSUME;
        private static String _CLIENTID;
    
        private MessageProducer producer;
        private Session session;
    
        public Consumer (String URL, String TOPIC) {
    
            _URL = URL;
            _TOPIC_PUBLISH = TOPIC + "_RESPONSES";
            _TOPIC_CONSUME = TOPIC + "_REQUESTS";
    
        }
    
        public void initialize() {
    
            try
            {
    
                _CLIENTID = UUID.randomUUID().toString();
    
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
                Connection connection = connectionFactory.createConnection();
                connection.start();
    
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
                Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);
    
                producer = session.createProducer(destinationProducer);
                MessageConsumer consumer = session.createConsumer(destinationConsumers);
    
                consumer.setMessageListener(this);
    
                System.out.println("Client: " + _CLIENTID + "\nWaiting to pick up tasks");
    
    //          connection.close();
            }
    
            catch (JMSException ex)
            {
                ex.printStackTrace();
            }
    
        }
    
        @Override
        public void onMessage(Message message) {
    
            if (message instanceof TextMessage)
            {
                TextMessage msg = (TextMessage) message;
    
                try 
                {
    
                    String[] messageSplits = msg.getText().split("_");
    
                    String count = messageSplits[0];
                    String timeString = messageSplits[1];
    
                    int sleepFor = Integer.parseInt(timeString);
    
                    System.out.println("Job #" + count + " | Sleeping for " + (sleepFor/1000) + "s");
    
                    Thread.sleep(sleepFor);
    
                    TextMessage sendToProducer = session.createTextMessage(count + "_" + _CLIENTID);
    
                    producer.send(sendToProducer);
                } 
    
                catch (JMSException e) {
                    e.printStackTrace();
                } 
    
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    

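You mentioned:

    Now when let's say 3 consumers - C1, C2 and C3 connect to the broker (in that order)

Since C1 connects first, it starts receiving the messages already on the queue as soon as it connects. That is expected, so I don't see an issue here. ActiveMQ pushes messages to a consumer in batches up to its prefetch limit (1000 by default for queues), so a small backlog is dispatched entirely to C1 before C2 and C3 have connected. C2 and C3 are not broken; C1 simply got hold of the messages before they could.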
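To illustrate why a small backlog can end up entirely with the first consumer, here is a toy model of prefetch-style dispatch. It is not ActiveMQ code: the class `PrefetchSimulation` and its `simulate` method are made up for this sketch, and it assumes that the broker hands a newly connected consumer as many waiting messages as its prefetch limit allows, then refills one message per acknowledgement.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Toy model of prefetch dispatch on a queue (hypothetical, not ActiveMQ code). */
class PrefetchSimulation {

    /**
     * Returns how many messages each consumer processes when `backlog`
     * messages are already queued and the consumers connect one after another.
     */
    static int[] simulate(int backlog, int consumers, int prefetch) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < backlog; i++) {
            queue.add(i);
        }

        int[] buffered = new int[consumers];   // dispatched but not yet processed
        int[] processed = new int[consumers];  // processed and acknowledged

        // Consumers connect in order; each one is immediately sent
        // up to `prefetch` of the messages still waiting on the queue.
        for (int c = 0; c < consumers; c++) {
            while (buffered[c] < prefetch && !queue.isEmpty()) {
                queue.poll();
                buffered[c]++;
            }
        }

        // Each round, every consumer holding work processes one message;
        // the acknowledgement lets the broker send it one more, if any remain.
        boolean workDone = true;
        while (workDone) {
            workDone = false;
            for (int c = 0; c < consumers; c++) {
                if (buffered[c] > 0) {
                    buffered[c]--;
                    processed[c]++;
                    workDone = true;
                    if (!queue.isEmpty()) {
                        queue.poll();
                        buffered[c]++;
                    }
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Large prefetch: C1 buffers the whole backlog.
        System.out.println(Arrays.toString(simulate(30, 3, 1000))); // [30, 0, 0]
        // Prefetch of 1: the backlog is shared.
        System.out.println(Arrays.toString(simulate(30, 3, 1)));    // [10, 10, 10]
    }
}
```

In this model a prefetch of 1000 sends all 30 queued messages to the first consumer, while a prefetch of 1 spreads them evenly. A real broker batches acknowledgements and differs in detail, so treat the numbers as an illustration only.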

I am not sure how many messages the producer sent; I assume it was a small number. To see the behavior you expect, try sending many more messages from the producer (thousands or more) and then start the consumers. How many is enough depends on memory, network and other resources, but with a large enough backlog you should see C2 and C3 pick up work as well.
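If sending a larger backlog is not practical, another option is to lower the consumer prefetch limit (the number of messages the broker sends a consumer ahead of processing) so that C1 cannot buffer the whole queue. ActiveMQ documents two ways to set it: the `jms.prefetchPolicy.queuePrefetch` option on the connection URL and the `consumer.prefetchSize` option on the destination name. The sketch below only builds those strings; the helper class `PrefetchOptions`, the broker address `tcp://localhost:61616` and the queue name `JOBS_REQUESTS` are assumptions for illustration, and the helper assumes a plain URL rather than a composite one such as `failover:(...)`.

```java
/** Builds ActiveMQ option strings for a lower prefetch (hypothetical helper). */
class PrefetchOptions {

    /** Appends the connection-wide queue prefetch option to a broker URL. */
    static String withQueuePrefetch(String brokerUrl, int prefetch) {
        String separator = brokerUrl.contains("?") ? "&" : "?";
        return brokerUrl + separator + "jms.prefetchPolicy.queuePrefetch=" + prefetch;
    }

    /** Appends the per-consumer prefetch option to a queue name. */
    static String withConsumerPrefetch(String queueName, int prefetch) {
        String separator = queueName.contains("?") ? "&" : "?";
        return queueName + separator + "consumer.prefetchSize=" + prefetch;
    }

    public static void main(String[] args) {
        // Assumed broker address; would be given to the connection factory.
        System.out.println(withQueuePrefetch("tcp://localhost:61616", 1));
        // Assumed queue name; would be given to session.createQueue(...).
        System.out.println(withConsumerPrefetch("JOBS_REQUESTS", 1));
    }
}
```

The first string would be passed to `new ActiveMQConnectionFactory(...)` and the second to `session.createQueue(...)` in the consumer; either one should be enough on its own. A prefetch of 1 suits slow tasks like the sleeping jobs here, at the cost of more round trips to the broker.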


I don't think there is anything weird here. Queues implement point-to-point (P2P) messaging, in which each message is delivered to exactly one consumer. Having 3 consumers on the same queue is not forbidden, but there is no guarantee about how the messages are distributed among them or in which order the consumers receive them.
