Spring Boot配置多个ActiveMQ实例

我需要将消息从一个ActiveMQ实例上的队列移动到另一个ActiveMQ实例上。是否可以使用Spring

Boot配置连接到两个不同的ActiveMQ实例?

我需要创建多个connectionFactories吗?如果是这样,那么JmsTemplate如何知道要连接到哪个ActiveMQ实例?

  @Bean

public ConnectionFactory connectionFactory() {

return new ActiveMQConnectionFactory(JMS_BROKER_URL);

}

任何帮助和代码示例将很有用。

提前致谢。通用汽车

回答:

除了@Chris的响应之外,您还必须使用不同的端口创建不同的BrokerService实例,并创建不同的ConnectionFactory以连接到每个代理,并使用这些不同的工厂向不同的代理发送消息,从而创建不同的JmsTemplate。

例如 :

import javax.jms.ConnectionFactory;

import javax.jms.QueueConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.activemq.broker.BrokerService;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Primary;

import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

import org.springframework.jms.config.JmsListenerContainerFactory;

import org.springframework.jms.core.JmsTemplate;

@Configuration

public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward {

public static final String LOCAL_Q = "localQ";

public static final String REMOTE_Q = "remoteQ";

@Bean

public BrokerService broker() throws Exception {

final BrokerService broker = new BrokerService();

broker.addConnector("tcp://localhost:5671");

broker.setBrokerName("broker");

broker.setUseJmx(false);

return broker;

}

@Bean

public BrokerService broker2() throws Exception {

final BrokerService broker = new BrokerService();

broker.addConnector("tcp://localhost:5672");

broker.setBrokerName("broker2");

broker.setUseJmx(false);

return broker;

}

@Bean

@Primary

public ConnectionFactory jmsConnectionFactory() {

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5671");

return connectionFactory;

}

@Bean

public QueueConnectionFactory jmsConnectionFactory2() {

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5672");

return connectionFactory;

}

@Bean

@Primary

public JmsTemplate jmsTemplate() {

JmsTemplate jmsTemplate = new JmsTemplate();

jmsTemplate.setConnectionFactory(jmsConnectionFactory());

jmsTemplate.setDefaultDestinationName(LOCAL_Q);

return jmsTemplate;

}

@Bean

public JmsTemplate jmsTemplate2() {

JmsTemplate jmsTemplate = new JmsTemplate();

jmsTemplate.setConnectionFactory(jmsConnectionFactory2());

jmsTemplate.setDefaultDestinationName(REMOTE_Q);

return jmsTemplate;

}

@Bean

public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory,

DefaultJmsListenerContainerFactoryConfigurer configurer) {

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

configurer.configure(factory, connectionFactory);

return factory;

}

@Bean

public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(

@Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory,

DefaultJmsListenerContainerFactoryConfigurer configurer) {

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

configurer.configure(factory, connectionFactory);

return factory;

}

}

请注意,通过下面的示例,您不能在要从其转发消息的队列上拥有多个使用者,因为Camel或JmsBridgeConnectors会使用该消息并进行转发。如果只希望转发邮件的副本,则可以采用以下解决方案:1-将队列转换为主题,通过持久订阅或追溯使用者来管理脱机使用者的邮件。2-将您的队列转换为复合队列,并使用DestinationsInterceptors将消息复制到另一个队列。3-将NetworkConnector用于经纪人网络

@Bean

public BrokerService broker() throws Exception {

final BrokerService broker = new BrokerService();

broker.addConnector("tcp://localhost:5671");

SimpleJmsQueueConnector simpleJmsQueueConnector = new SimpleJmsQueueConnector();

OutboundQueueBridge bridge = new OutboundQueueBridge();

bridge.setLocalQueueName(LOCAL_Q);

bridge.setOutboundQueueName(REMOTE_Q);

OutboundQueueBridge[] outboundQueueBridges = new OutboundQueueBridge[] { bridge };

simpleJmsQueueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);

simpleJmsQueueConnector.setOutboundQueueBridges(outboundQueueBridges);

simpleJmsQueueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());

simpleJmsQueueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2());

JmsConnector[] jmsConnectors = new JmsConnector[] { simpleJmsQueueConnector };

broker.setJmsBridgeConnectors(jmsConnectors);

broker.setBrokerName("broker");

broker.setUseJmx(false);

return broker;

}

@Bean

public CamelContext camelContext() throws Exception {

CamelContext context = new DefaultCamelContext();

context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671"));

context.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672"));

context.addRoutes(new RouteBuilder() {

public void configure() {

from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q);

}

});

context.start();

return context;

}

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.boot.CommandLineRunner;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.stereotype.Component;

@Component

public class Producer implements CommandLineRunner {

@Autowired

private JmsTemplate jmsTemplate;

@Autowired

@Qualifier("jmsTemplate2")

private JmsTemplate jmsTemplate2;

@Override

public void run(String... args) throws Exception {

send("Sample message");

}

public void send(String msg) {

this.jmsTemplate.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q, msg);

this.jmsTemplate2.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, msg);

}

}

import javax.jms.Session;

import org.apache.activemq.ActiveMQSession;

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

@Component

public class Consumer {

@JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2")

public void receiveQueue(Session session, String text) {

System.out.println(((ActiveMQSession) session).getConnection().getBrokerInfo());

System.out.println(text);

}

}

以上是 Spring Boot配置多个ActiveMQ实例 的全部内容, 来源链接: utcz.com/qa/415640.html

回到顶部