KafkaConsumer 0.10 Java API错误消息:没有当前分区分配
我正在使用KafkaConsumer 0.10 Java
api。我想从特定的分区和特定的偏移量中消费。我抬起头,发现有一个搜索方法,但是抛出异常。任何人都有类似的用例或解决方案?
码:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);consumer.seek(new TopicPartition("mytopic", 1), 4);
例外
java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)
回答:
你可以之前seek()
,你首先需要subscribe()
一个主题
assign()
主题,以消费者的分区。也请记住,这subscribe()
和assign()
懒惰-
这样,你也需要做一个“虚拟来电”,以poll()
才可以使用seek()
。
注意:从Kafka 2.0开始,新版本
poll(Duration
timeout)是异步的,不能保证您在
poll
返回时可以完成完整的分配。因此,您可能需要在使用前检查您的分配,seek()
并poll
再次刷新该分配。(有关详细信息,请参见KIP-266)
如果您使用subscribe()
,则使用组管理:这样,您可以使用同一组启动多个使用者,group.id
并且该主题的所有分区将自动在组内的所有使用者上平均分配(每个分区将分配给组中的单个使用者)
。
如果要读取特定的分区,则需要通过进行手动分配assign()
。这使您可以执行所需的任何作业。
顺便说一句:KafkaConsumer
非常长的JavaDoc类,包括示例。值得一读。
以上是 KafkaConsumer 0.10 Java API错误消息:没有当前分区分配 的全部内容, 来源链接: utcz.com/qa/407527.html