Channels idling in RabbitMQ with work queue scenario
Basing my architecture off of this example:
RabbitMQ - Work Queues
Setup:
I'm looking into bottlenecks in my implementation that are causing slowdowns. I use noAck to re-queue messages from failed workers, and to distribute work evenly across my worker threads I've set prefetch to 1. Looking at this question: RabbitMQ work queue is blocking consumers - they are seeing what I see in the screenshots below:
To make sure each worker is assigned only one message at a time, I need prefetch set to 1, but others say this causes workers to work sequentially rather than in parallel.
What does Running actually mean at the channel level? I see the queue and connection are running properly, but individual channels (one per thread) are idling.
EDIT #1: This note about passing a thread pool (an ExecutorService) to the RabbitMQ connection looks promising: https://www.rabbitmq.com/api-guide.html#consumer-thread-pool I'm using Spring AMQP, but I think a similar approach can be used here:
/**
 * Configure a large thread pool for concurrent channels on the physical Connection
 */
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
    logger.info("Configuring connection factory");
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.rabbitMQProperties.getAddresses());
    cf.setUsername(this.rabbitMQProperties.getUsername());
    cf.setPassword(this.rabbitMQProperties.getPassword());
    cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());

    // Configure a large thread pool for the connection's consumer threads
    int threads = 30;
    logger.info(String.format("Configuring thread pool with %d threads", threads));
    ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
    cf.setExecutor(connectionPool);

    logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
    logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
    logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
    return cf;
}

@Bean
AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory) {
    AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    return rabbitTemplate;
}
Apparently, in Spring AMQP the thread pool for a single physical TCP/IP connection defaults to 5 threads:
Spring AMQP
Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default. When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory. Then, the same executor will be used by all connections and its threads can be shared. The executor's thread pool should be unbounded, or set appropriately for the expected utilization (usually, at least one thread per connection). If multiple channels are created on each connection then the pool size will affect the concurrency, so a variable (or simple cached) thread pool executor would be most suitable.
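To see why the quoted passage recommends an unbounded or cached executor when many channels share a connection, here is a minimal sketch using only java.util.concurrent. It does not touch RabbitMQ or Spring AMQP at all: each task simply stands in for one channel's consumer, and the class and method names (ExecutorChoiceDemo, allRunAtOnce) are made up for illustration. The check passes only if every task manages to run at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: each task plays the role of one channel's consumer.
class ExecutorChoiceDemo {

    // Returns true only if all tasks were running simultaneously before the timeout.
    static boolean allRunAtOnce(ExecutorService pool, int tasks, long timeoutMillis) {
        CountDownLatch started = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                started.countDown();          // signal "I am running"
                try {
                    started.await();          // hold the thread until every task has started
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();               // release any tasks still blocked
        }
    }

    public static void main(String[] args) {
        // A cached pool adds a thread whenever no idle one is available.
        System.out.println("cached pool, 50 tasks: "
                + allRunAtOnce(Executors.newCachedThreadPool(), 50, 5000));
        // A fixed pool of 5 can never have more than 5 tasks running.
        System.out.println("fixed pool of 5, 50 tasks: "
                + allRunAtOnce(Executors.newFixedThreadPool(5), 50, 500));
    }
}
```

With the fixed pool of 5, the remaining 45 tasks sit in the executor's queue, which is roughly the situation I believe my idle channels are in.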
I was able to replicate this by changing the number of threads assigned to RabbitMQ's connection pool:
/**
 * Expand the number of concurrent threads for a single RabbitMQ connection
 * http://docs.spring.io/spring-amqp/reference/htmlsingle/
 * Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default.
 * When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory.
 */
@Bean(name="channelPool")
@Scope("singleton")
MigrationPool rabbitConnectionPool() {
    int channels = 50;
    logger.info(String.format("Configuring connection pool with %d threads", channels));
    return new MigrationPool(channels, channels, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

/**
 * Configure a large thread pool for concurrent channels on the physical Connection
 */
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory(@Qualifier("channelPool") MigrationPool connectionPool) {
    logger.info("Configuring connection factory");
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.rabbitMQProperties.getAddresses());
    cf.setUsername(this.rabbitMQProperties.getUsername());
    cf.setPassword(this.rabbitMQProperties.getPassword());
    cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
    cf.setExecutor(connectionPool);

    logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
    logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
    logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
    logger.info(String.format("MQ thread pool: %d threads", connectionPool.getMaximumPoolSize()));
    return cf;
}
In the above code, the number of threads per connection mirrors the number of virtual channels, i.e. one real thread for each virtual RabbitMQ channel, since each channel belongs to a worker thread that processes one message at a time. As a result, the channels no longer block on the default 5 threads and instead take full advantage of the expanded thread pool:
Reducing the number of threads available to RabbitMQ's connection makes the channels block again, for example when setting the pool to 10 threads and opening 50 channels.
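The 10-threads/50-channels case can be approximated without a broker. The sketch below is an assumption-laden simulation, not RabbitMQ code: a fixed pool plays the connection's executor, each submitted task plays one channel handling one message, and ChannelStallDemo and peakConcurrency are hypothetical names. It records the highest number of tasks that were ever busy at the same moment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation: a fixed pool stands in for the connection's executor.
class ChannelStallDemo {

    // Runs one simulated message per channel and returns the peak number busy at once.
    static int peakConcurrency(int threads, int channels, long workMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(channels);

        for (int i = 0; i < channels; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);   // remember the highest value seen
                try {
                    Thread.sleep(workMillis);            // simulated message handling
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await();                                // wait for every simulated channel
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        int channels = 50;
        int peak = peakConcurrency(10, channels, 50);
        System.out.println("busy at peak: " + peak
                + ", waiting at that moment: at least " + (channels - peak));
    }
}
```

The peak can never exceed the pool size, so with 10 threads at least 40 of the 50 simulated channels are waiting at any given time, which matches the idle channels I see in the management UI.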