RabbitMQ by Example:多个线程,通道和队列
我只是阅读RabbitMQ的Java API文档,并发现它非常翔实和直截了当。 如何设置用于发布/消费的简单Channel
的示例非常易于理解。 但这是一个非常简单/基本的例子,它给我留下了一个重要的问题: 我如何设置1+个Channels
来发布/消费多个队列?
比方说,我有一个RabbitMQ服务器,上面有3个队列: logging
, security_events
和customer_orders
。 所以我们要么需要一个Channel
才能够发布/使用所有3个队列,或者更有可能有3个独立的Channels
,每个Channels
专用于一个队列。
除此之外,RabbitMQ的最佳实践规定我们为每个消费者线程设置1个Channel
。 对于这个例子,假设只有1个消费者线程, security_events
没有问题,但是logging
和customer_order
都需要5个线程来处理卷。 所以,如果我理解正确,那是否意味着我们需要:
Channel
和1个消费者线程,用于发布/消费security_events
; 和 Channels
和5个消费者线程,用于发布logging
和消耗logging
; 和 Channels
和5个消费者线索用于发布/消费customer_orders
? 如果我的理解被误导了,请首先纠正我。 无论哪种方式,一些厌倦了RabbitMQ的老兵能否帮助我通过一个体面的代码示例“连接点”来设置符合我的要求的出版商/消费者? 提前致谢!
我认为你最初的理解有几个问题。 坦率地说,看到以下内容我有点惊讶: both need 5 threads to handle the volume
。 你怎么确定你需要这个确切的数字? 你有任何保证5线程就足够了?
RabbitMQ经过调试和时间测试,因此全部都是关于正确的设计和高效的消息处理。
让我们试着回顾一下问题并找到一个合适的解决方案。 顺便说一句,消息队列本身不会提供任何保证,你有真正的好解决方案。 你必须了解你在做什么,并做一些额外的测试。
你一定知道有很多布局可能:
我将使用布局B
作为以示出最简单的方法1
生产者N
消费者问题。 由于你对吞吐量非常担心。 顺便说一句,正如你可能期望RabbitMQ表现得相当好(来源)。 请注意prefetchCount
,稍后我会解决它:
因此消息处理逻辑可能是确保您拥有足够吞吐量的合适位置。 当然,每当你需要处理一条消息时,你都可以跨越一个新的线程,但最终这种方法会杀死你的系统。 基本上,更多的线程会让你获得更大的延迟(如果你愿意,你可以检查Amdahl的法则)。
(参见Amdahl法则)
提示#1:小心线程,使用ThreadPools(细节)
线程池可以描述为Runnable对象(工作队列)的集合和运行线程的连接。 这些线程一直在运行,正在检查工作查询是否有新的工作。 如果有新工作要完成,则执行此Runnable。 Thread类自身提供了一个方法,例如execute(Runnable r)将新的Runnable对象添加到工作队列中。
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
提示#2:注意消息处理开销
我会说这是显而易见的优化技术。 您可能会发送小而简单的邮件处理。 整个方法是关于更小的消息被连续设置和处理。 大消息最终会发挥不好的笑话,所以最好避免这种情况。
所以最好发送一些微小的信息,但是如何处理呢? 每次提交工作时都会有开销。 批量处理对于高传入消息速率可能非常有用。
例如,假设我们有简单的消息处理逻辑,并且每次处理消息时我们不希望线程特定的开销。 为了优化, CompositeRunnable can be introduced
非常简单的CompositeRunnable can be introduced
:
class CompositeRunnable implements Runnable {
protected Queue<Runnable> queue = new LinkedList<>();
public void add(Runnable a) {
queue.add(a);
}
@Override
public void run() {
for(Runnable r: queue) {
r.run();
}
}
}
或者通过收集待处理的消息,以稍微不同的方式做同样的事情:
class CompositeMessageWorker<T> implements Runnable {
protected Queue<T> queue = new LinkedList<>();
public void add(T message) {
queue.add(message);
}
@Override
public void run() {
for(T message: queue) {
// process a message
}
}
}
通过这种方式,您可以更有效地处理消息。
提示#3:优化消息处理
尽管您知道可以并行处理消息( Tip #1
)并减少了处理开销( Tip #2
),但您必须尽快完成所有任务。 冗余处理步骤,繁重循环等可能会影响性能。 请看有趣的案例研究:
通过选择正确的XML解析器,将消息队列吞吐量提高十倍
提示#4:连接和通道管理
(资源)
请注意,所有提示都完美协同工作。 随时让我知道,如果你需要更多的细节。
完整的消费者例子(来源)
请注意以下事项:
prefetchCount
可能非常有用: 该命令允许用户选择预取窗口,该窗口指定准备接收的未确认消息的数量。 通过将预取计数设置为非零值,代理将不会向消费者传递任何违反该限制的消息。 为了向前移动窗口,消费者必须确认收到消息(或一组消息)。
例:
static class Worker extends DefaultConsumer {
String name;
Channel channel;
String queue;
int processed;
ExecutorService executorService;
public Worker(int prefetch, ExecutorService threadExecutor,
, Channel c, String q) throws Exception {
super(c);
channel = c;
queue = q;
channel.basicQos(prefetch);
channel.basicConsume(queue, false, this);
executorService = threadExecutor;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
Runnable task = new VariableLengthTask(this,
envelope.getDeliveryTag(),
channel);
executorService.submit(task);
}
}
您还可以检查以下内容:
我如何设置1 +多个频道来发布/消费多个队列?
您可以使用线程和通道来实现。 您所需要的只是一种对事物进行分类的方法,即登录中的所有队列项目,来自security_events的所有队列元素等。使用routingKey可以实现分类。
即:每次当你添加一个项目到队列中时,指定路由密钥。 它将作为属性元素添加。 通过这个,你可以从一个特定的事件获得值,例如日志记录。
以下代码示例说明了如何在客户端完成它。
例如:
使用路由键标识通道的类型并检索类型。
例如,如果您需要获取有关类型登录的所有渠道,那么您必须将路由键指定为登录名或其他关键字来标识。
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
string routingKey="login";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
你可以在这里找到关于分类的更多细节。
线程部分
一旦发布部分结束,您可以运行线程部分..
在这部分中,您可以根据类别获取已发布的数据。 即; 路由密钥,在你的情况是记录,security_events和customer_orders等。
请查看示例以了解如何在线程中检索数据。
例如:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//**The threads part is as follows**
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
// This part will biend the queue with the severity (login for eg:)
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
现在创建一个处理类型登录(路由键)队列中的数据的线程。 通过这种方式你可以创建多个线程。 每个服务不同的目的。
在这里查看关于线程部分的更多细节..
为什么要自己实施一切?
尝试使用某种集成框架。 假设骆驼已经有了一堆连接到各种系统的连接器,Rabbit MQ包括了Camel Rabbit MQ。
你必须定义你的路线。 例如:
您希望将来自5个并发使用者的记录队列中的消息记录到文件中。
from("rabbitmq://localhost/Logging ?concurrentConsumers=5")
.to("file://yourLoggingFile")
如何设置文件使用者有许多选项。 正如你所看到的,你可以通过在你的URI中加入concurrentConsumers=5
来定义应该产生多少个消费者。 如果你想要你可以通过实现处理器接口来创建你自己的制作者或消费者。
它是非常灵活和强大的框架,只需使用提供的组件即可完成大量工作。 项目网站包含大量的例子和文档。
链接地址: http://www.djcxy.com/p/46359.html上一篇: RabbitMQ by Example: Multiple Threads, Channels and Queues