kafka Avro消息反序列化器,可用于多个主题

我正在尝试使用以下代码以avro格式反序列化kafka消息:[https](https://github.com/ivangfr/springboot-

kafka-debezium-ksql/blob/master/kafka-research-

consumer/src/main/java/com/mycompany/kafkaresearchconsumer/kafka/ReviewsConsumerConfig.java)

//github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-

research-consumer/src/main/java/ com / mycompany / kafkaresearchconsumer /

kafka / ReviewsConsumerConfig.java

上面的代码对我来说作为单个主题很好用,但是我必须侦听来自多个主题的消息并创建了多个AvroGenerated文件,但由于配置需要多种Avro类型对象,因此我陷入了配置困境。请考虑以下问题:

https://github.com/ivangfr/springboot-kafka-debezium-

ksql/issues/3

回答:

在配置中使用以下行:

props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());

props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );

代替:

props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );

props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class );

最终代码为:

package com.moglix.netsuite.kafka;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;

import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;

import java.util.Map;

@EnableKafka

@Configuration

public class ReviewsConsumerConfig

{

@Value( "${kafka.bootstrap-servers}" )

private String bootstrapServers;

@Value( "${kafka.schema-registry-url}" )

private String schemaRegistryUrl;

@Value( "${kafka.reviews.start-offset}" )

private String orderStartOffset;

@Value( "${kafka.reviews.max-poll-records}" )

private Integer maxPollRecords;

@Bean

public <T> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory()

{

ConcurrentKafkaListenerContainerFactory<String, T> factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory( consumerFactory1() );

factory.setBatchListener( true );

factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );

factory.getContainerProperties().setSyncCommits( true );

return factory;

}

@Bean

public <T> ConsumerFactory<String, T> consumerFactory1()

{

return new DefaultKafkaConsumerFactory<>( consumerConfigs1() );

}

@Bean

public Map<String, Object> consumerConfigs1()

{

Map<String, Object> props = new HashMap<>();

props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers );

props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );

props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );

props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset );

props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl );

props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );

props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());// This is main line for my problem solution

//props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );

props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords );

props.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );

return props;

}

}

以上是 kafka Avro消息反序列化器,可用于多个主题 的全部内容, 来源链接: utcz.com/qa/427886.html

回到顶部