带Spring Boot的简单嵌入式Kafka测试示例
编辑仅供参考:gitHub工作示例
我正在互联网上搜索,找不到嵌入式Kafka测试的工作示例。
我的设置是:
- Spring Boot
- 多个@KafkaListener在一个类中具有不同的主题
- 嵌入式Kafka可以很好地进行测试
- 使用发送到主题的Kafkatemplate进行测试,但是 @KafkaListener方法即使经过大量睡眠后仍无法接收任何内容
- 没有显示警告或错误,仅在日志中显示来自Kafka的垃圾信息
请帮我。大部分是配置过度或工程过度的示例。我相信这可以简单完成。谢谢你们!
@Controllerpublic class KafkaController {
private static final Logger LOG = getLogger(KafkaController.class);
@KafkaListener(topics = "test.kafka.topic")
public void receiveDunningHead(final String payload) {
LOG.debug("Receiving event with payload [{}]", payload);
//I will do database stuff here which i could check in db for testing
}
}
私有静态字符串SENDER_TOPIC =“ test.kafka.topic”;
@ClassRulepublic static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Thread.sleep(10000);
}
回答:
嵌入式Kafka测试适用于以下配置,
测试课注释
@EnableKafka@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false,
brokerProperties = {
"listeners=PLAINTEXT://localhost:3333",
"port=3333"
})
public class KafkaConsumerTest {
@Autowired
KafkaEmbedded kafkaEmbeded;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
注释之前的设置方法
@Beforepublic void setUp() throws Exception {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbeded.getPartitionsPerTopic());
}
}
注意:我不是@ClassRule
用来创建嵌入式Kafka而是自动装配
@Autowired embeddedKafka
@Testpublic void testReceive() throws Exception {
kafkaTemplate.send(topic, data);
}
希望这可以帮助!
编辑:测试配置类标记为 @TestConfiguration
@TestConfigurationpublic class TestConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
现在@Test
方法将自动连接KafkaTemplate并用于发送消息
kafkaTemplate.send(topic, data);
用上面的行更新了答案代码块
以上是 带Spring Boot的简单嵌入式Kafka测试示例 的全部内容, 来源链接: utcz.com/qa/432176.html