带Spring Boot的简单嵌入式Kafka测试示例

编辑仅供参考:gitHub工作示例


我正在互联网上搜索,找不到嵌入式Kafka测试的工作示例。

我的设置是:

  • Spring Boot
  • 多个@KafkaListener在一个类中具有不同的主题
  • 嵌入式Kafka可以很好地进行测试
  • 使用发送到主题的Kafkatemplate进行测试,但是 @KafkaListener方法即使经过大量睡眠后仍无法接收任何内容
  • 没有显示警告或错误,仅在日志中显示来自Kafka的垃圾信息

请帮我。大部分是配置过度或工程过度的示例。我相信这可以简单完成。谢谢你们!

@Controller

public class KafkaController {

private static final Logger LOG = getLogger(KafkaController.class);

@KafkaListener(topics = "test.kafka.topic")

public void receiveDunningHead(final String payload) {

LOG.debug("Receiving event with payload [{}]", payload);

//I will do database stuff here which i could check in db for testing

}

}


私有静态字符串SENDER_TOPIC =“ test.kafka.topic”;

@ClassRule

public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test

public void testSend() throws InterruptedException, ExecutionException {

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();

producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();

producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();

Thread.sleep(10000);

}

回答:

嵌入式Kafka测试适用于以下配置,

测试课注释

@EnableKafka

@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config

@EmbeddedKafka(

partitions = 1,

controlledShutdown = false,

brokerProperties = {

"listeners=PLAINTEXT://localhost:3333",

"port=3333"

})

public class KafkaConsumerTest {

@Autowired

KafkaEmbedded kafkaEmbeded;

@Autowired

KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

注释之前的设置方法

@Before

public void setUp() throws Exception {

for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {

ContainerTestUtils.waitForAssignment(messageListenerContainer,

kafkaEmbeded.getPartitionsPerTopic());

}

}

注意:我不是@ClassRule用来创建嵌入式Kafka而是自动装配

@Autowired embeddedKafka

@Test

public void testReceive() throws Exception {

kafkaTemplate.send(topic, data);

}

希望这可以帮助!

编辑:测试配置类标记为 @TestConfiguration

@TestConfiguration

public class TestConfig {

@Bean

public ProducerFactory<String, String> producerFactory() {

return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));

}

@Bean

public KafkaTemplate<String, String> kafkaTemplate() {

KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());

kafkaTemplate.setDefaultTopic(topic);

return kafkaTemplate;

}

现在@Test方法将自动连接KafkaTemplate并用于发送消息

kafkaTemplate.send(topic, data);

用上面的行更新了答案代码块

以上是 带Spring Boot的简单嵌入式Kafka测试示例 的全部内容, 来源链接: utcz.com/qa/432176.html

回到顶部