春云流Kafka粘结剂:“尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的无效转换”

我正在尝试使用Spring云流+ Kafka绑定对Apache Kafka" title="Apache Kafka">Apache Kafka进行“恰好一个交付”概念的一些PoC。春云流Kafka粘结剂:“尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的无效转换”

我安装了Apache Kafka“kafka_2.11-1.0.0”,并在生产者中定义了“transactionIdPrefix”,我知道这是我在Spring Kafka中启用事务所需要做的唯一事情,但是当我这样做时,运行简单源代码&在同一个应用程序中接收器绑定,我看到一些消息在消费者中接收和打印,并且一些消息发生错误。

例如,消息#6接收:

[49] Received message [Payload String content=FromSource1 6][Headers={kafka_offset=1957, scst_nativeHeadersPresent=true, [email protected]695c9a9, kafka_timestampType=CREATE_TIME, my-transaction-id=my-id-6, id=302cf3ef-a154-fd42-6b43-983778e275dc, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=test10, kafka_receivedTimestamp=1514384106395, timestamp=1514384106419}] 

但消息#7有一个错误 “无效过渡从状态IN_TRANSACTION试图状态IN_TRANSACTION”:

2017-12-27 16:15:07.405 ERROR 7731 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafk[email protected]7d3bbc0b]; nested exception is org.apache.kafka.common.KafkaException: TransactionalId my-transaction-3: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, failedMessage=GenericMessage [payload=byte[13], headers={my-transaction-id=my-id-7, id=d31656af-3286-99b0-c736-d53aa57a5e65, contentType=application/json, timestamp=1514384107399}] 

at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)

at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:575)

  • 什么这个错误的意思?
  • 是我的配置缺少的东西?
  • 我是否需要在启用事务时以不同的方式实现Source或Sink?


UPDATE: 我打开这个项目的github上的问题,请参见讨论那里。


无法找到如何使用Spring云流与卡夫卡结合+ Trasanctions启用

要重现,需要创建了一个简单的Maven项目与弹簧引导版本为例“2.0.0.M5 “和‘弹簧云流依赖性’版本‘Elmhurst.M3’,并且创建了一个简单的应用程序使用此配置:

server: 

port: 8082

spring:

kafka:

producer:

retries: 5555

acks: "all"

cloud:

stream:

kafka:

binder:

autoAddPartitions: true

transaction:

transactionIdPrefix: my-transaction-

bindings:

output1:

destination: test10

group: test111

binder: kafka

input1:

destination: test10

group: test111

binder: kafka

consumer:

partitioned: true

我还创建简单源库类:

@EnableBinding(SampleSink.MultiInputSink.class) 

public class SampleSink {

@StreamListener(MultiInputSink.INPUT1)

public synchronized void receive1(Message<?> message) {

System.out.println("["+Thread.currentThread().getId()+"] Received message " + message);

}

public interface MultiInputSink {

String INPUT1 = "input1";

@Input(INPUT1)

SubscribableChannel input1();

}

}

和:

@EnableBinding(SampleSource.MultiOutputSource.class) 

public class SampleSource {

AtomicInteger atomicInteger = new AtomicInteger(1);

@Bean

@InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

public synchronized MessageSource<String> messageSource1() {

return new MessageSource<String>() {

public Message<String> receive() {

String message = "FromSource1 "+atomicInteger.getAndIncrement();

m.put("my-transaction-id","my-id-"+ UUID.randomUUID());

return new GenericMessage(message, new MessageHeaders(m));

}

};

}

public interface MultiOutputSource {

String OUTPUT1 = "output1";

@Output(OUTPUT1)

MessageChannel output1();

}

}

回答:

我上到项目的github上开了罚单。 请参考答案和讨论有:

https://github.com/spring-cloud/spring-cloud-stream/issues/1166

,但第一次的答案有:

粘结剂目前不支持生产商发起的交易。

处理器支持事务(消费者启动 事务和生产者参与该事务)。

当没有消费者时,您应该可以直接使用spring-kafka在生产者端启动一个 事务。

以上是 春云流Kafka粘结剂:“尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的无效转换” 的全部内容, 来源链接: utcz.com/qa/265847.html

回到顶部