Controlling whether Kafka consumers are enabled or disabled in Spring Boot

I have several Kafka consumers configured in Spring Boot. This is what kafka.properties looks like (only one consumer's configuration is listed here):

kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=

Here is the configuration:

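import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    // Builds the consumer factory from the values in kafka.properties.
    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();
        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));
        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    // Container factory referenced by name from @KafkaListener.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}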

Here is the consumer:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}

Can I use the property "kafka.enabled" to control whether this consumer is created, or whether it retrieves messages? Thank you very much!

Answer:

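You can do this with the autoStartup attribute (true / false) of @KafkaListener on your consumer, as in the following snippet. When autoStartup resolves to false, the listener container is still registered but does not start polling for messages. The snippet reads the flag from a property named listen.auto.start and falls back to false when that property is not set; use kafka.enabled in the placeholder instead to match the question.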

@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory", autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
    // System.out.println("Consumed message: " + message);
}
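
Applied to the consumer from the question, the same idea might look like the sketch below. It assumes spring-kafka 2.2 or later (where autoStartup is available on @KafkaListener) and that kafka.enabled holds true or false. The class names ToggleableKafkaConsumer and KafkaListenerSwitch and the listener id "toggleableConsumer" are hypothetical and do not come from the original post. The second class shows one possible way to start or stop the same listener at runtime through KafkaListenerEndpointRegistry.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
class ToggleableKafkaConsumer {

    // Hypothetical listener id, used only to look the container up later.
    static final String LISTENER_ID = "toggleableConsumer";

    @KafkaListener(
            id = LISTENER_ID,
            // By default the id would replace group.id; keep the group from the consumer factory.
            idIsGroup = false,
            topics = "#{'${kafka.topics}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory",
            // Assumption: kafka.enabled is "true" or "false"; the listener starts when it is unset.
            autoStartup = "${kafka.enabled:true}")
    public void consumeJson(String message) {
        // processing message
    }
}

@Component
class KafkaListenerSwitch {

    private final KafkaListenerEndpointRegistry registry;

    KafkaListenerSwitch(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Starts or stops the listener container registered under LISTENER_ID.
    public void setEnabled(boolean enabled) {
        MessageListenerContainer container =
                registry.getListenerContainer(ToggleableKafkaConsumer.LISTENER_ID);
        if (container == null) {
            return; // no listener registered under that id
        }
        if (enabled && !container.isRunning()) {
            container.start();
        } else if (!enabled && container.isRunning()) {
            container.stop();
        }
    }
}
```

This only controls message retrieval: the listener bean and its container are still created. If the goal is to skip creating the bean entirely, Spring Boot's @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") on the component class is another option. Conditions are evaluated early, so a flag loaded through @PropertySource may not be visible to them, and defining it in application.properties is likely the safer choice.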
