Kafka Avro message deserializer for multiple topics
I am trying to deserialize Kafka messages in Avro format using the following code:
https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaresearchconsumer/kafka/ReviewsConsumerConfig.java
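The code above works fine for a single topic, but I have to listen to messages from multiple topics and have created multiple Avro-generated classes. I am stuck on the configuration, because it has to handle several Avro object types rather than one. Please see the following issue:
https://github.com/ivangfr/springboot-kafka-debezium-ksql/issues/3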
Answer:
Use the following lines in the configuration:
props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName() );
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
instead of:
props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class );
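For context, a subject name strategy decides which Schema Registry subject a schema is filed under. The sketch below is plain Java with no Confluent dependency; it only mimics the naming rules of the three built-in strategies for record values, as I understand them. The class `SubjectNameSketch` and its methods are hypothetical helpers. The topic name `mysql.researchdb.institutes` is an assumption inferred from the generated class name, and the `articles` pair is a made-up second table.

```java
// Stdlib-only sketch of how the three Confluent subject name strategies
// derive the Schema Registry subject for a record VALUE.
// This class is illustrative and is not part of any Confluent library.
final class SubjectNameSketch {

    // TopicNameStrategy (the default): one subject per topic.
    static String topicName(String topic) {
        return topic + "-value";
    }

    // RecordNameStrategy: one subject per fully qualified record name.
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: one subject per (topic, record name) pair.
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        // Assumed topic/record pairs; "articles" is hypothetical.
        String[][] pairs = {
            {"mysql.researchdb.institutes", "mysql.researchdb.institutes.Value"},
            {"mysql.researchdb.articles", "mysql.researchdb.articles.Value"},
        };
        for (String[] pair : pairs) {
            System.out.println("TopicNameStrategy:       " + topicName(pair[0]));
            System.out.println("RecordNameStrategy:      " + recordName(pair[1]));
            System.out.println("TopicRecordNameStrategy: " + topicRecordName(pair[0], pair[1]));
        }
    }
}
```

With the topic-and-record form, every topic and record type combination gets its own subject, which fits a consumer that reads several topics carrying different Avro types.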
The final code is:
package com.moglix.netsuite.kafka;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReviewsConsumerConfig
{
    @Value( "${kafka.bootstrap-servers}" )
    private String bootstrapServers;

    @Value( "${kafka.schema-registry-url}" )
    private String schemaRegistryUrl;

    @Value( "${kafka.reviews.start-offset}" )
    private String orderStartOffset;

    @Value( "${kafka.reviews.max-poll-records}" )
    private Integer maxPollRecords;

    // Batch listener factory with manual, synchronous offset commits.
    @Bean
    public <T> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String, T> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory1() );
        factory.setBatchListener( true );
        factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
        factory.getContainerProperties().setSyncCommits( true );
        return factory;
    }

    @Bean
    public <T> ConsumerFactory<String, T> consumerFactory1()
    {
        return new DefaultKafkaConsumerFactory<>( consumerConfigs1() );
    }

    @Bean
    public Map<String, Object> consumerConfigs1()
    {
        Map<String, Object> props = new HashMap<>();
        props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers );
        props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
        props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset );
        props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl );
        props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );
        // This is the key line that solved my problem.
        props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName() );
        // Previously the deserializer was bound to a single record type:
        //props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );
        props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords );
        props.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
        return props;
    }
}
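
Once a single consumer configuration serves several topics, the listener can receive values of different generated classes, so it has to branch on the runtime type. The following is only a minimal sketch of that dispatch step in plain Java, assuming the deserializer hands back an instance of the generated class that matches each message. `RecordDispatcherSketch`, `InstituteValue` and `ArticleValue` are hypothetical stand-ins, not classes from the project or from any Kafka library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Routes each deserialized value to a handler registered for its exact class.
final class RecordDispatcherSketch {

    // Hypothetical stand-ins for two Avro-generated value classes.
    static final class InstituteValue {
        final String name;
        InstituteValue(String name) { this.name = name; }
    }

    static final class ArticleValue {
        final String title;
        ArticleValue(String title) { this.title = title; }
    }

    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

    // Registers a typed handler; the cast is safe because dispatch()
    // looks handlers up by the value's exact runtime class.
    <T> void register(Class<T> type, Consumer<T> handler) {
        handlers.put(type, value -> handler.accept(type.cast(value)));
    }

    // Returns true if a handler consumed the value, false for null values
    // (for example tombstones) or for types nobody registered.
    boolean dispatch(Object value) {
        if (value == null) {
            return false;
        }
        Consumer<Object> handler = handlers.get(value.getClass());
        if (handler == null) {
            return false;
        }
        handler.accept(value);
        return true;
    }

    public static void main(String[] args) {
        RecordDispatcherSketch dispatcher = new RecordDispatcherSketch();
        dispatcher.register(InstituteValue.class, v -> System.out.println("institute " + v.name));
        dispatcher.register(ArticleValue.class, v -> System.out.println("article " + v.title));

        // In a real batch listener these would be the values of the polled records.
        Object[] batch = { new InstituteValue("Example Institute"), new ArticleValue("Example Article") };
        for (Object value : batch) {
            dispatcher.dispatch(value);
        }
    }
}
```

An alternative is one listener method per topic, each declaring its own value type; the map-based dispatch is just one way to keep a single batch listener.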