Spring Kafka的Spring Boot Rest API

我设计了一个Spring Boot REST API,其中包含ADD(新增)和GET(查询)两个方法:

    /**
     * REST controller for products. Every request is delegated to the injected
     * {@link IProductProducer}; the controller holds no logic of its own.
     */
    @RestController("ProductV1Controller")
    public class ProductController {

        private final IProductProducer _productProducer;

        /**
         * @param productProducer the producer every endpoint delegates to
         */
        public ProductController(IProductProducer productProducer) {
            _productProducer = productProducer;
        }

        /**
         * Handles POST requests: validates the request body and hands it to the producer.
         *
         * @param product the product described by the request body
         */
        @PostMapping()
        void AddProduct(@Valid @RequestBody ProductViewModel product) {
            _productProducer.AddProduct(product);
        }

        /**
         * Handles GET requests by returning whatever the producer reports.
         *
         * <p>The producer is called exactly once per request: each call to
         * {@code GetProducts()} publishes a Kafka record, so a second, discarded call
         * would emit a duplicate request.
         *
         * @return the list produced by {@link IProductProducer#GetProducts()}
         */
        @GetMapping()
        List<ProductViewModel> Products() {
            return _productProducer.GetProducts();
        }
    }

服务层

@Service

public class ProductProducer implements IProductProducer{

private final KafkaTemplate<String, Object> _template;

public ProductProducer(KafkaTemplate<String, Object> _template) {

this._template = _template;

}

@Override

public List<ProductViewModel> GetProducts() {

this._template.send(ProductTopicConstants.GET_PRODUCTS,null);

return List.of(new ProductViewModel("","",0,"")); --> Need to return the value from the kafka

}

@Override

public void AddProduct(ProductViewModel product) {

this._template.send(ProductTopicConstants.ADD_PRODUCT, product);

}

}

Kafka监听器

 /**
  * Kafka listener bound to {@link ProductTopicConstants#GET_PRODUCTS} (the same constant
  * is used as the listener id and as the topic).
  *
  * @return everything {@code _productRepository.findAll()} yields
  */
 @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
 public List<Product> GetProducts() {
     final List<Product> allProducts = _productRepository.findAll();
     return allProducts;
 }

在服务层的GetProducts()方法中,我需要返回由_productRepository.findAll()查询得到的产品列表。

将Spring Kafka用于REST API的最佳方式是什么?

回答:

您需要使用ReplyingKafkaTemplate将结果返回给REST控制器。

参见ReplyingKafkaTemplate。

2.1.3版引入了KafkaTemplate的一个子类,用于提供请求/回复语义。该类名为ReplyingKafkaTemplate,除了继承自超类的方法之外,它还新增了一个方法(sendAndReceive)。

该方法的返回值是一个ListenableFuture,它会被异步地填入回复结果(或一个超时异常)。返回值还带有一个sendFuture属性,它是调用KafkaTemplate.send()所得到的结果;您可以使用这个future来确定发送操作的结果。

该文档有一个示例。

/**
 * Single-class request/reply demo: the REST endpoint publishes a request record through a
 * {@link ReplyingKafkaTemplate} and waits for the list returned by the listener declared
 * in this same class.
 */
@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    /** Topic the request is published to; also used as the listener id. */
    private static final String REQUEST_TOPIC = "so63058608-1";

    /** Topic consumed by the reply container; also used as its consumer group id. */
    private static final String REPLY_TOPIC = "so63058608-2";

    /** Upper bound, in seconds, for waiting on the send result and on the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    // Field injection is kept on purpose: this class also declares the bean being injected.
    @Autowired
    private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    /**
     * Sends an empty request (null key, null value) to partition 0 of the request topic,
     * logs the metadata of the send, then blocks for the reply.
     *
     * @return the value of the reply record
     * @throws Exception if the send or the reply fails or does not complete in time
     */
    @GetMapping(path = "/get")
    public List<String> getThem() throws Exception {
        ProducerRecord<String, String> request = new ProducerRecord<>(REQUEST_TOPIC, 0, null, null);
        RequestReplyFuture<String, String, List<String>> pending = this.replyTemplate.sendAndReceive(request);

        String sendMetadata = pending.getSendFuture()
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .getRecordMetadata()
                .toString();
        LOG.info(sendMetadata);

        List<String> reply = pending.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).value();
        return reply;
    }

    /**
     * Listener side of the exchange: ignores the (optional) payload and returns a fixed
     * three-element list; routing of the return value is left to the bare {@code @SendTo}.
     *
     * @param payload the request payload; may be absent
     * @return a mutable list containing "foo", "bar" and "baz"
     */
    @KafkaListener(id = REQUEST_TOPIC, topics = REQUEST_TOPIC, splitIterables = false)
    @SendTo
    public List<String> returnList(@Payload(required = false) String payload) {
        List<String> answer = new ArrayList<>();
        answer.add("foo");
        answer.add("bar");
        answer.add("baz");
        return answer;
    }

    /**
     * Builds the replying template. The container factory is first given a plain
     * {@link KafkaTemplate} as its reply template, then the reply container is obtained
     * and wrapped.
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        return new ReplyingKafkaTemplate<>(pf, replyContainer(containerFactory));
    }

    /**
     * Creates the container that consumes the reply topic, with its group id set to the
     * same name and a {@link BatchLoggingErrorHandler} installed.
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {
        ConcurrentMessageListenerContainer<String, List<String>> replies =
                containerFactory.createContainer(REPLY_TOPIC);
        replies.getContainerProperties().setGroupId(REPLY_TOPIC);
        replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return replies;
    }

    /** Plain template built on the supplied producer factory. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    /** Declares the request topic with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(REQUEST_TOPIC).partitions(1).replicas(1).build();
    }

    /** Declares the reply topic with one partition and one replica. */
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name(REPLY_TOPIC).partitions(1).replicas(1).build();
    }
}

spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

$ curl localhost:8080/get

["foo","bar","baz"]

下面是返回某种对象的列表(而不是字符串列表)的版本…

/**
 * Request/reply demo returning a list of {@link Foo}: the REST endpoint publishes a
 * request record through a {@link ReplyingKafkaTemplate} and waits for the list returned
 * by the listener declared in this same class.
 */
@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    /** Topic the request is published to; also used as the listener id. */
    private static final String REQUEST_TOPIC = "so63058608-1";

    /** Topic consumed by the reply container; also used as its consumer group id. */
    private static final String REPLY_TOPIC = "so63058608-2";

    /** Upper bound, in seconds, for waiting on the send result and on the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    // Field injection is kept on purpose: this class also declares the bean being injected.
    @Autowired
    private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    /**
     * Sends an empty request (null key, null value) to partition 0 of the request topic,
     * logs the metadata of the send, blocks for the reply and logs it as well.
     *
     * @return the value of the reply record
     * @throws Exception if the send or the reply fails or does not complete in time
     */
    @GetMapping(path = "/get")
    public List<Foo> getThem() throws Exception {
        ProducerRecord<String, String> request = new ProducerRecord<>(REQUEST_TOPIC, 0, null, null);
        RequestReplyFuture<String, String, List<Foo>> pending = this.replyTemplate.sendAndReceive(request);

        String sendMetadata = pending.getSendFuture()
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .getRecordMetadata()
                .toString();
        LOG.info(sendMetadata);

        List<Foo> result = pending.get(TIMEOUT_SECONDS, TimeUnit.SECONDS).value();
        LOG.info(result.toString());
        return result;
    }

    /**
     * Listener side of the exchange: ignores the (optional) payload and returns three
     * fixed {@link Foo} instances; routing of the return value is left to the bare
     * {@code @SendTo}.
     *
     * @param payload the request payload; may be absent
     * @return a mutable list of {@code Foo("foo")}, {@code Foo("bar")}, {@code Foo("baz")}
     */
    @KafkaListener(id = REQUEST_TOPIC, topics = REQUEST_TOPIC, splitIterables = false)
    @SendTo
    public List<Foo> returnList(@Payload(required = false) String payload) {
        List<Foo> answer = new ArrayList<>();
        answer.add(new Foo("foo"));
        answer.add(new Foo("bar"));
        answer.add(new Foo("baz"));
        return answer;
    }

    /**
     * Builds the replying template. The container factory is first given a plain
     * {@link KafkaTemplate} as its reply template, then the reply container is obtained
     * and wrapped.
     */
    @Bean
    public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        return new ReplyingKafkaTemplate<>(pf, replyContainer(containerFactory));
    }

    /**
     * Creates the container that consumes the reply topic, with its group id set to the
     * same name and a {@link BatchLoggingErrorHandler} installed.
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {
        ConcurrentMessageListenerContainer<String, List<Foo>> replies =
                containerFactory.createContainer(REPLY_TOPIC);
        replies.getContainerProperties().setGroupId(REPLY_TOPIC);
        replies.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return replies;
    }

    /** Plain template built on the supplied producer factory. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    /** Declares the request topic with one partition and one replica. */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(REQUEST_TOPIC).partitions(1).replicas(1).build();
    }

    /** Declares the reply topic with one partition and one replica. */
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name(REPLY_TOPIC).partitions(1).replicas(1).build();
    }

    /**
     * Describes the value type as a list of {@link Foo}; neither argument is inspected.
     *
     * @param data    raw record bytes (unused)
     * @param headers record headers (unused)
     * @return the collection-like type {@code List<Foo>}
     */
    public static JavaType returnType(byte[] data, Headers headers) {
        TypeFactory typeFactory = TypeFactory.defaultInstance();
        return typeFactory.constructCollectionLikeType(List.class, Foo.class);
    }
}

/**
 * Minimal mutable bean carrying a single {@code bar} string.
 */
class Foo {

    private String bar;

    /**
     * Creates an instance with the given value.
     *
     * @param bar initial value of the property
     */
    public Foo(String bar) {
        this.bar = bar;
    }

    /**
     * Creates an instance whose {@code bar} is left unset ({@code null}).
     * NOTE(review): presumably required by the JSON deserializer; confirm before removing.
     */
    public Foo() {
    }

    /** @param bar the new value of the property */
    public void setBar(String bar) {
        this.bar = bar;
    }

    /** @return the current value of the property, possibly {@code null} */
    public String getBar() {
        return this.bar;
    }

    /** @return {@code "Foo [bar=<value>]"} */
    @Override
    public String toString() {
        return new StringBuilder("Foo [bar=")
                .append(this.bar)
                .append("]")
                .toString();
    }
}

spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType

[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]

以上是 Spring Kafka的Spring Boot Rest API 的全部内容, 来源链接: utcz.com/qa/420874.html

回到顶部