KafkaConsumer 0.10 Java API错误消息:没有当前分区分配

我正在使用KafkaConsumer 0.10 Java

api。我想从特定的分区和特定的偏移量中消费。我抬起头,发现有一个搜索方法,但是抛出异常。任何人都有类似的用例或解决方案?

码:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);

consumer.seek(new TopicPartition("mytopic", 1), 4);

例外

java.lang.IllegalStateException: No current assignment for partition mytopic-1

at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)

at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)

at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)

at xx.xxx.xxx.Test.main(Test.java:182)

回答:

你可以之前seek(),你首先需要subscribe()一个主题

assign()主题,以消费者的分区。也请记住,这subscribe()assign()懒惰-

这样,你也需要做一个“虚拟来电”,以poll()才可以使用seek()

注意:从Kafka 2.0开始,新版本poll(Duration

timeout)是异步的,不能保证您在poll返回时可以完成完整的分配。因此,您可能需要在使用前检查您的分配,seek()poll再次刷新该分配。(有关详细信息,请参见KIP-266)

如果您使用subscribe(),则使用组管理:这样,您可以使用同一组启动多个使用者,group.id并且该主题的所有分区将自动在组内的所有使用者上平均分配(每个分区将分配给组中的单个使用者)

如果要读取特定的分区,则需要通过进行手动分配assign()。这使您可以执行所需的任何作业。

顺便说一句:KafkaConsumer非常长的JavaDoc类,包括示例。值得一读。

以上是 KafkaConsumer 0.10 Java API错误消息:没有当前分区分配 的全部内容, 来源链接: utcz.com/qa/407527.html

回到顶部