RabbitMQ中带有工作队列场景的通道闲置

基于这个例子我的架构:

RabbitMQ - 工作队列

建立:

  • 工人一次只能收到一条消息
  • 每个工人下载一个文件,需要几秒钟
  • 一旦工作人员成功下载文档,它会确认该消息
  • 如果工作人员未能下载文档,则不会删除重新排队的消息(最多三次重新尝试)
  • 我正在研究导致速度下降的实施中的瓶颈。 因为我使用noAck来重新排队失败的工人。 为了在我的工作线程中均匀地启用这个功能,我将prefetch设置为1.查看这个问题:RabbitMQ工作队列阻止消费者 - 他们看到我在下面的屏幕截图中看到的内容:

    哎/秒

    渠道

    为了确保工作人员一次只能分配一条消息,我需要将预取设置为1,但也有人说这会导致工作人员顺序工作而不是并行工作。

    跑步在渠道层面究竟意味着什么? 我看到队列和连接正常运行,但单个通道(每个线程一个)空闲。

    编辑#1:关于将连接池传递给RabbitMQ连接的说明看起来很有希望。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool我使用Spring AMQP,但我认为可以在这里使用类似的方法:

         /**
         * Configure a large thread pool for concurrent channels on the physical Connection
         */
        @Bean
        public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
            logger.info("Configuring connection factory");
            CachingConnectionFactory cf = new CachingConnectionFactory();
            cf.setAddresses(this.rabbitMQProperties.getAddresses());
            cf.setUsername(this.rabbitMQProperties.getUsername());
            cf.setPassword(this.rabbitMQProperties.getPassword());
            cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
            //configure a large thread pool for the connection thread
            int threads = 30;
            logger.info(String.format("Configuring thread pool with %d threads", threads));
            ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
            cf.setExecutor(connectionPool);
            logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
            logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
            logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
            return cf;
        }
    
        @Bean
        AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){
            AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    

    在AMQP Spring春季中,单个物理TCP / IP连接的默认线程池默认为5个线程:

    Spring AMQP

    此外,在撰写本文时,rabbitmq-client库默认为每个连接(5个线程)创建一个固定的线程池。 在使用大量连接时,应考虑在CachingConnectionFactory上设置自定义执行程序。 然后,所有连接将使用相同的执行程序,并且它的线程可以共享。 执行程序的线程池应该是无界的,或者为预期的利用率设置适当的值(通常每个连接至少有一个线程)。 如果在每个连接上创建多个通道,则池大小将影响并发性,因此变量(或简单缓存)的线程池执行程序将是最合适的。

    我能够通过改变分配给RabbitMQ连接池的线程数来复制这个数据:

    /**
         * Expand the number of concurrent threads for a single RabbitMQ connection
         * http://docs.spring.io/spring-amqp/reference/htmlsingle/
         * Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default. 
         * When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory.
         */
        @Bean(name="channelPool")
        @Scope("singleton")
        MigrationPool rabbitConnectionPool(){
            int channels = 50;
            logger.info(String.format("Configuring connection pool with %d threads", channels));
            return new MigrationPool(channels, channels, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
        }
    
        /**
         * Configure a large thread pool for concurrent channels on the physical Connection
         */
        @Bean
        public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory(@Qualifier("channelPool") MigrationPool connectionPool) {
            logger.info("Configuring connection factory");
            CachingConnectionFactory cf = new CachingConnectionFactory();
            cf.setAddresses(this.rabbitMQProperties.getAddresses());
            cf.setUsername(this.rabbitMQProperties.getUsername());
            cf.setPassword(this.rabbitMQProperties.getPassword());
            cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
            cf.setExecutor(connectionPool);
            logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
            logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
            logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
            logger.info(String.format("MQ thread pool: %d threads", connectionPool.getMaximumPoolSize()));
            return cf;
        }
    

    在上面的代码中,我有每个连接的线程数量镜像虚拟通道的数量,即每个虚拟RabbitMQ通道的一个真实物理线程,因为每个通道都引用一个工作线程,每个线程处理一条消息。 这会导致通道不再阻塞默认的5个连接,而是充分利用扩展的线程数量:

    渠道不再阻塞

    操作可用于RabbitMQ连接的线程数将显示通道阻塞。 例如,设置为10个线程并打开50个通道。

    链接地址: http://www.djcxy.com/p/34103.html

    上一篇: Channels idling in RabbitMQ with work queue scenario

    下一篇: rabbitmq amqp version negotiation failing with Qpid 6.0.0