RabbitMQ中带有工作队列场景的通道闲置
基于这个例子我的架构:
RabbitMQ - 工作队列
建立:
我正在研究导致速度下降的实施中的瓶颈。 因为我使用noAck来重新排队失败的工人。 为了在我的工作线程中均匀地启用这个功能,我将prefetch设置为1.查看这个问题:RabbitMQ工作队列阻止消费者 - 他们看到我在下面的屏幕截图中看到的内容:
为了确保工作人员一次只能分配一条消息,我需要将预取设置为1,但也有人说这会导致工作人员顺序工作而不是并行工作。
跑步在渠道层面究竟意味着什么? 我看到队列和连接正常运行,但单个通道(每个线程一个)空闲。
编辑#1:关于将连接池传递给RabbitMQ连接的说明看起来很有希望。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool我使用Spring AMQP,但我认为可以在这里使用类似的方法:
/**
* Configure a large thread pool for concurrent channels on the physical Connection
*/
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
logger.info("Configuring connection factory");
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.rabbitMQProperties.getAddresses());
cf.setUsername(this.rabbitMQProperties.getUsername());
cf.setPassword(this.rabbitMQProperties.getPassword());
cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
//configure a large thread pool for the connection thread
int threads = 30;
logger.info(String.format("Configuring thread pool with %d threads", threads));
ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
cf.setExecutor(connectionPool);
logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
return cf;
}
@Bean
AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){
AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
在AMQP Spring春季中,单个物理TCP / IP连接的默认线程池默认为5个线程:
Spring AMQP
此外,在撰写本文时,rabbitmq-client库默认为每个连接(5个线程)创建一个固定的线程池。 在使用大量连接时,应考虑在CachingConnectionFactory上设置自定义执行程序。 然后,所有连接将使用相同的执行程序,并且它的线程可以共享。 执行程序的线程池应该是无界的,或者为预期的利用率设置适当的值(通常每个连接至少有一个线程)。 如果在每个连接上创建多个通道,则池大小将影响并发性,因此变量(或简单缓存)的线程池执行程序将是最合适的。
我能够通过改变分配给RabbitMQ连接池的线程数来复制这个数据:
/**
* Expand the number of concurrent threads for a single RabbitMQ connection
* http://docs.spring.io/spring-amqp/reference/htmlsingle/
* Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default.
* When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory.
*/
@Bean(name="channelPool")
@Scope("singleton")
MigrationPool rabbitConnectionPool(){
int channels = 50;
logger.info(String.format("Configuring connection pool with %d threads", channels));
return new MigrationPool(channels, channels, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Configure a large thread pool for concurrent channels on the physical Connection
*/
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory(@Qualifier("channelPool") MigrationPool connectionPool) {
logger.info("Configuring connection factory");
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.rabbitMQProperties.getAddresses());
cf.setUsername(this.rabbitMQProperties.getUsername());
cf.setPassword(this.rabbitMQProperties.getPassword());
cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
cf.setExecutor(connectionPool);
logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
logger.info(String.format("MQ thread pool: %d threads", connectionPool.getMaximumPoolSize()));
return cf;
}
在上面的代码中,我有每个连接的线程数量镜像虚拟通道的数量,即每个虚拟RabbitMQ通道的一个真实物理线程,因为每个通道都引用一个工作线程,每个线程处理一条消息。 这会导致通道不再阻塞默认的5个连接,而是充分利用扩展的线程数量:
操作可用于RabbitMQ连接的线程数将显示通道阻塞。 例如,设置为10个线程并打开50个通道。
链接地址: http://www.djcxy.com/p/34103.html上一篇: Channels idling in RabbitMQ with work queue scenario
下一篇: rabbitmq amqp version negotiation failing with Qpid 6.0.0