Kafka KStreams-处理超时

我试图用<KStream>.process()一个TimeWindows.of("name", 30000)批量一些

值,并送他们。似乎30秒钟超出了使用者超时间隔,在此间隔之后,Kafka认为该使用者已失效并释放了分区。

我尝试提高 和 的频率来避免这种情况:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");

config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

不幸的是,这些错误仍在发生:

(很多)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

其次是:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1

WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

显然,我需要更频繁地将心跳发送回服务器。怎么样?

我的拓扑是:

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

KTable<Windowed<String>, String> kt = lines.aggregateByKey(

new DBAggregateInit(),

new DBAggregate(),

TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

该 是关键,每30秒分组值。在

我调用context.schedule(30000)

提供的实例 。这是

的实现,其中提供了所有替代。他们所做的只是LOG,所以我知道每当被击中时。

这是一个非常简单的拓扑,但是很明显,我在某处缺少了一步。


回答:

我知道我可以在服务器端进行调整,但是我希望有一个客户端解决方案。我喜欢在客户端退出/死亡时很快使分区可用的概念。


回答:

为了简化问题,我从图中删除了聚合步骤。现在只是消费者->

processor()。(如果我直接将消费者发送给.print()它,则v可以快速运行,所以我知道可以)。(类似地,如果我通过.print()它输出聚合(KTable)似乎也可以)。

我发现.process()- 应该.punctuate()每30秒调用一次-

实际上阻​​塞了可变的时间长度,并且输出有些随机(如果有的话)。

  • 主程序
  • 调试输出
  • 处理器供应商
  • 处理器

回答:

我将调试级别设置为“调试”并重新运行。我看到很多消息:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

但是该.punctuate()函数的断点没有被触发。因此,它正在做很多工作,但没有给我使用它的机会。

回答:

一些澄清:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG是提交间隔的下限,即,在提交之后,下次提交不会在此时间之前进行。基本上,Kafka Stream会在这段时间过后尝试尽快提交,但是并不能保证执行下一次提交实际上需要多长时间。
  • StreamsConfig.POLL_MS_CONFIG用于内部KafkaConsumer#poll()呼叫,以指定呼叫的最大阻塞时间poll()

因此,这两个值都不会更频繁地帮助心跳。

Kafka

Streams在处理记录时遵循“深度优先”策略。这意味着,在poll()每个记录之后,将执行拓扑的所有运算符。假设您有三个连续的映射,则在处理下一个/第二个记录之前,将为第一条记录调用所有三个映射。

这样,在完全处理poll()完第一个记录的所有记录之后,将进行下一个调用poll()。如果您想更频繁地发送心跳信号,则需要确保单个poll()调用可获取较少的记录,从而处理所有记录所需的时间更少,并且下一个记录poll()将更早触发。

您可以使用配置参数KafkaConsumer来指定通过它StreamsConfig来完成此操作(请参阅https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX,VALUE);

  • max.poll.records:如果减小此值,将轮询较少的记录
  • session.timeout.ms:如果增加此值,则会有更多的时间来处理数据(为了完整性起见,添加此时间是因为它实际上是客户端设置,而不是服务器/经纪人端配置-即使您知道此解决方案并且不喜欢它: ))

从Kafka开始0.10.1,可以(并建议)在stream

config中为consumer和procuder配置添加前缀。这避免了参数冲突,因为某些参数名称用于使用者和生产者,并且不能以其他方式区分(并且将同时应用于使用者

生产者)。要给参数加上前缀,可以分别使用StreamsConfig#consumerPrefix()StreamsConfig#producerPrefix()。例如:

streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER),

VALUE);

要添加的另一件事:这个问题中描述的方案是一个已知问题,并且已经有KIP-62引入了用于KafkaConsumer发送心跳的后台线程,从而将心跳与poll()呼叫分离。Kafka

Streams将在即将发布的版本中利用此新功能。

以上是 Kafka KStreams-处理超时 的全部内容, 来源链接: utcz.com/qa/407543.html

回到顶部