卡夫卡消费者会议超时

我们有一个消费者读取消息的应用程序,并且该线程执行许多操作,包括在消息生成到另一个主题之前进行数据库访问。 在线程上消费和生成消息之间的时间可能需要几分钟的时间。 一旦将消息生成为新主题,就会进行提交以表明我们已完成消费者队列消息的工作。 由于这个原因,自动提交被禁用。

我使用高级消费者,我注意到zookeeper和kafka会话超时,因为在我们对消费者队列执行任何操作之前花费的时间太长,因此每当线程返回以从消费者那里读取更多内容时,卡夫卡就会重新平衡队列,并且消费者在一段时间后读取新消息之前需要很长时间。

我可以设置动物园管理员会话超时非常高,不会造成问题,但是我必须相应地调整重新平衡参数,并且卡夫卡不会在一段时间内吸收新的消费者。

我有什么选择来解决这个问题? 有没有办法让卡夫卡和动物园管理员保持高兴? 如果我使用简单的消费者,是否仍然有这些相同的问题?


这听起来像是你的问题归结为依靠高级消费者来管理最后读取的抵消。 使用简单的消费者可以解决该问题,因为您控制该偏移量的持久性。 请注意,所有高级消费者提交所做的是将最后一次读取偏移量存储在zookeeper中。 没有采取其他措施,您刚刚读取的消息仍然存在于分区中,并且可供其他消费者阅读。

通过卡夫卡简单的消费者,您可以更好地控制何时以及如何进行抵消存储。 你甚至可以坚持抵消除Zookeeper(例如数据库)以外的其他地方。

坏消息是,虽然简单的消费者本身比高级消费者简单,但你需要做更多的工作才能实现代码。 您还必须编写代码才能访问多个分区 - 高级消费者对您来说非常合适。


我认为问题是消费者的投票方法触发消费者的心跳要求。 而当你增加session.timeout。 消费者的心跳不会到达协调者。 由于这种心跳跳跃,协调员标记消费者死亡。 而且消费者重新加入的速度非常缓慢,特别是在单一消费者的情况下。

我遇到了类似的问题,并解决了我必须更改消费者配置属性中的以下参数

session.timeout.ms = request.timeout.ms =超过会话超时

您还必须在kafka broker节点的server.properties中添加以下属性。 group.max.session.timeout.ms =

您可以看到以下链接以获取更多详细信息。 http://grokbase.com/t/kafka/users/16324waa50/session-timeout-ms-limit

链接地址: http://www.djcxy.com/p/23907.html

上一篇: kafka consumer sessions timing out

下一篇: date() is malfunctioning