Akka Streams KillSwitch在alpakka jms
我有一个场景,我开始使用alpakka多个jmsSource(对于不同的队列)。我还需要在任何时候卸下队列。所以我已经添加KillSwitch到jms阿卡流,如下所示: -Akka Streams KillSwitch在alpakka jms
trait MessageListener { lazy val jmsPipeline = jmsSource
.map { x => log.info(s"Received message ${x} from ${queue}"); x }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) })
(Keep.both)
.run()
def start(): Unit = {
log.info("Invoking listener : {}", queue)
jmsPipeline
log.info("listener : {} started", queue)
}
def stop():Unit = jmsPipeline._1.shutdown()
def queue: String
}
object ListenerA extends MessageListener {
override def queue: String = "Queue_A"
}
object ListenerB extends MessageListener {
override def queue: String = "Queue_B"
}
..等等。
启动应用程序后,所有的队列连接并正常工作。但是,当我尝试使用停止方法分离队列时,并非所有队列都断开连接并且行为是随机的。我还检查了killSwitch对所有听众都不同。
有人可以告诉我这里有什么问题吗?
回答:
您的日志支持您连接到具有不同流的多个队列的错觉,但是您有多个可能连接到同一队列的流。在这两个监听器对象中,记录器都会记录覆盖的queue
名称,但该队列名称不用于配置jmsSource
。
您没有显示jmsSource
的定义;显然它是在MessageListener
性状之外的某处定义的,在这种情况下,ListenerA
和ListenerB
都使用相同的jmsSource
。换言之,而ListenerA
和ListenerB
具有jmsPipeline
不同实例(这就是为什么杀开关是不同的),这两个jmsPipeline
实例由相同jmsSource
实例衍生(除非jmsSource
是def
即在每次调用创建一个不同的Source
,但即使情况如此,基本问题仍然存在:queue
未在配置中使用)。
在Alpakka,JMS队列上JmsSourceSettings
配置,所以jmsSource
可能看起来像下面这样:
val jmsSource: Source[String, NotUsed] = JmsSource.textSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("MyQueue")
) // the queue is configured here^
当ListenerA.start()
,例如,被调用时,以下记录:
Invoking listener : Queue_A listener : Queue_A started
再次,在以上日志语句中的"Queue_A"
是ListenerA
中被重写的def queue: String
成员的值;它不一定是在jmsSource
(上例中的"MyQueue"
)中实际配置的队列。与ListenerB
以及您在map
组合器中登录的消息一样。
一个简单的解决方法是把的jmsSource
及其JmsSourceSettings
的定义MessageListener
特质内部和这些设置实际使用queue
。
以上是 Akka Streams KillSwitch在alpakka jms 的全部内容, 来源链接: utcz.com/qa/261143.html