如何测试ConsumerAwareRebalanceListener?

我使用Spring Boot 2.0.6

开发了一个@KafkaListener带有ConsumerAwareRebalanceListener接口的标签。我实现了该onPartitionsAssigned方法,在该方法中,将偏移的时间倒回固定的时间,例如60秒。

到目前为止,一切都很好。

如何使用Spring

Kafka给我的工具测试上述用例?我以为我需要启动一个Kafka代理(例如EmbeddedKafka),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取了过去60秒内到达的消息。

有人可以帮我吗?我在Google上搜索了一下,但没有找到任何东西。非常感谢。

回答:

@KafkaListener有:

/**

* The unique identifier of the container managing for this endpoint.

* <p>If none is specified an auto-generated one is provided.

* @return the {@code id} for the container managing for this endpoint.

* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)

*/

String id() default "";

属性,因此您可以MessageListenerContainer通过提到的对其进行访问,您可以将其KafkaListenerEndpointRegistry简单地@Autowired放入基于Spring

Testing

Framework的测试类中。然后,你才能真正stop()start()MessageListenerContainer在您的测试方法。

也要注意如何@KafkaListener也有一个autoStartup()属性。

以上是 如何测试ConsumerAwareRebalanceListener? 的全部内容, 来源链接: utcz.com/qa/427636.html

回到顶部