如何测试ConsumerAwareRebalanceListener?
我使用Spring Boot 2.0.6
开发了一个@KafkaListener带有ConsumerAwareRebalanceListener接口的标签。我实现了该onPartitionsAssigned方法,在该方法中,将偏移的时间倒回固定的时间,例如60秒。
到目前为止,一切都很好。
如何使用Spring
Kafka给我的工具测试上述用例?我以为我需要启动一个Kafka代理(例如EmbeddedKafka),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取了过去60秒内到达的消息。
有人可以帮我吗?我在Google上搜索了一下,但没有找到任何东西。非常感谢。
回答:
该@KafkaListener有:
/** * The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
属性,因此您可以MessageListenerContainer通过提到的对其进行访问,您可以将其KafkaListenerEndpointRegistry简单地@Autowired放入基于Spring
Testing
Framework的测试类中。然后,你才能真正stop()和start()那MessageListenerContainer在您的测试方法。
也要注意如何@KafkaListener也有一个autoStartup()属性。
以上是 如何测试ConsumerAwareRebalanceListener? 的全部内容, 来源链接: utcz.com/qa/427636.html
