让卡夫卡消费者一次只阅读一条消息

我们有Kafka安装程序能够通过多台服务器并行处理消息。 但每条消息只能处理一次(并且只能由一台服务器处理)。 我们已经启动并运行,并且工作正常。

现在,我们面临的问题是,卡夫卡消费者批量阅读邮件以获得最大效率。 如果/当处理失败,服务器关闭或任何事情时,这会导致问题,因为那样我们就会丢失即将被处理的数据。

有没有办法让消费者一次只阅读消息让卡夫卡保留未处理的消息? 就像是; 消费者拉取一条消息 - >进程 - >完成时提交偏移,重复。 使用Kafka这可行吗? 任何想法/想法?

谢谢!


您可以尝试将max.poll.records设置为1。


你提到只有一个处理,但是你担心丢失数据。 我假设当你的一台服务器出现故障时,你只是担心边缘情况? 你失去了数据?

我不认为有一种方法可以一次完成一条消息。 仔细观察消费者配置,似乎只能设置消费者可从卡夫卡获取的最大字节数,而不是消息数。

fetch.message.max.bytes

但是,如果您担心完全丢失数据,如果您从未提交抵消卡夫卡不会标记为承诺,并且不会丢失。 通过Kafka文档了解交付语义,

因此,Kafka有效地保证默认情况下至少一次交付,并且允许用户在交付之前通过禁用生产者重试并在处理一批消息之前提交其偏移量来实现最多一次交付。 正确的一次交付需要与目标存储系统的合作,但Kafka提供了这种直接实现的抵销。

所以要实现 - 一次处理不是Kafka默认启用的。 只要您将处理输出写入存储,它就要求您实施存储偏移量。

但是,通过简单地让消费者将其偏移量存储在与其输出相同的位置上,可以更简单地处理这种情况...作为示例,我们在HDFS中填充数据的Hadoop ETL将其偏移量存储在HDFS中读取以确保数据和偏移量都被更新或者两者都不是。

我希望有所帮助。


这取决于你要使用的客户端。 对于C ++和Python,有可能消耗每次一个消息。

对于python,我使用https://github.com/mumrah/kafka-python。 以下代码每次可以使用一条消息:

message = self.__consumer.get_message(block=False, timeout=self.IterTimeout, get_partition_info=True )

__消费者是SimpleConsumer的对象。

在这里看到我的问题和答案:如何在程序中阻止Python Kafka Consumer?

对于C ++,我使用https://github.com/edenhill/librdkafka。 以下代码每次可以使用一条消息。

214         while( m_bRunning )
215         {
216                 // Start to read messages from the local queue.
217                 RdKafka::Message *msg = m_consumer->consume(m_topic, m_partition, 1000);
218                 msg_consume(msg);
219                 delete msg;
220                 m_consumer->poll(0);
221         }

m_consumer是指向C ++ Consumer对象(C ++ API)的指针。

希望这个帮助。

链接地址: http://www.djcxy.com/p/87449.html

上一篇: Having a Kafka Consumer read a single message at a time

下一篇: Issue with Robolectric with new version of Google Play services