springboot项目中使用rabbitmq开发应用

编程

AMQP介绍

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。

Exchange有Direct、Fanout、Topic、Headers,最常用的是Direct、Fanout、Topic三种类型。

“生产/消费”消息模型
生产者发送消息到broker server(RabbitMQ)。在Broker内部,用户创建Exchange/Queue,
通过Binding规则将两者联系在一起。Exchange分发消息,根据类型/binding的不同分发策略有区别。
消息最后来到Queue中,等待消费者取走。

AMQP messaging 中的基本概念
Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。
Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

A、SpringAMQP声明
SpringAMQP声明即在rabbit基础API里面声明一个exchange、Bingding、queue。使用SpringAMQP去声明,就需要使用@bean的声明方式。

B、RabbitAdmin
(1)RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
(2)autoStartup必须设置true,否则Spring容器不会加载RabbitAdmin类
(3)RabbitAdmin底层实现就是从Spring容器中获取Exchagge,Bingding,RoutingKey以及Queue的@Bean声明
(4)然后使用RabbitTemplate的execute方法执行对应声明,修改,删除等操作

C、RabbitTemplate
Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

D、SimpleMessageListenerContailer
(1)简单消息监听容器:这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
(2)设置事务特性,事务管理器,事务属性,事务容量,事务开启等
(3)设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数
(4)设置消费者标签生成策略,是否独占模式,消费者属性等
(5)simpleMessageListenerContailer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小,接收消息的模式等

E、MessageListenerAdapter
1)可以把一个没有实现MessageListener和ChannelAwareMessageListener接口的类适配成一个可以处理消息的处理器
2)默认的方法名称为:handleMessage,可以通过setDefaultListenerMethod设置新的消息处理方法
3)MessageListenerAdapter支持不同的队列交给不同的方法去执行。使用setQueueOrTagToMethodName方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。

F、MessageConverter
(1)消息转换器
(2)在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要MessageConverter
(3)实现MessageConverter接口,重写toMessage(java对象转换为Message) fromMessage(Message对象转换为java对象)
(4)json转换器,自定义二进制转换器(比如图片类型,pdf,ppt,流媒体)

 

1、安装并启动 amqp,方法如:https://my.oschina.net/lion1220/blog/3151027

     登录amqp,界面如下

2、创建项目,并导入 amqp jar包

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

3、properties文件中添加配置项

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=admin

spring.rabbitmq.password=admin

spring.rabbitmq.cache.channel.size=100

#spring.rabbitmq.publisher-confirms=true

spring.rabbitmq.virtual-host=/

#生产者

mq.producer.x.exchangeName=test_x_exchange

mq.producer.x.exchangeType=direct

mq.producer.x.queueName=test_producer_x_queue

mq.producer.x.routeKey=test_producer_x_queue

mq.producer.x.durable=true

#消费者

mq.consumer.x.exchangeName=test_x_exchange

mq.consumer.x.exchangeType=direct

mq.consumer.x.queueName=test_consumer_x_queue

mq.consumer.x.routeKey=test_consumer_x_queue

mq.consumer.x.durable=true

#用来发送通知

mq.notify.x.exchangeName=test_x_exchange

mq.notify.x.exchangeType=fanout

mq.notify.x.queueName=test_notify_x_queue

mq.notify.x.routeKey=test_notify_x_queue

mq.notify.x.durable=true

4、amqp 配置类

@Configuration

public class AmqpConfig {

private static final Logger LOG = LoggerFactory.getLogger(AmqpConfig.class);

@Autowired

private Globals globals;

public final static String RP_FACTORY_NAME = "rpMQContainerFactory";

public final static String BANK_FACTORY_BEAN_NAME = "bankMQContainerFactory";

@Value("${spring.rabbitmq.host:127.0.0.1}")

private String notifyMqHost;

@Value("${spring.rabbitmq.port:5672}")

private Integer notifyMqPort;

@Value("${spring.rabbitmq.username:admin}")

private String notifyMqUsername;

@Value("${spring.rabbitmq.password:admin}")

private String notifyMqPassword;

@Value("${spring.rabbitmq.channelCacheSize:1024}")

private Integer notifyMqChannelCacheSize;

@Value("${spring.rabbitmq.publisherConfirms:true}")

private String notifyMqPublisherConfirms;

/** ********* ConnectionFactory **************** */

@Bean(name = "factoryProvider")

@Primary

public CachingConnectionFactory factoryProvider(){

CachingConnectionFactory factoryProvider = new CachingConnectionFactory();

factoryProvider.setHost(notifyMqHost);

factoryProvider.setPort(notifyMqPort);

factoryProvider.setUsername(notifyMqUsername);

factoryProvider.setPassword(notifyMqPassword);

factoryProvider.setVirtualHost("/");

return factoryProvider;

}

@Bean(name = "factoryConsumerCgt")

public CachingConnectionFactory factoryConsumerCgt(){

CachingConnectionFactory factoryConsumer = new CachingConnectionFactory();

factoryConsumer.setHost(notifyMqHost);

factoryConsumer.setPort(notifyMqPort);

factoryConsumer.setUsername(notifyMqUsername);

factoryConsumer.setPassword(notifyMqPassword);

factoryConsumer.setVirtualHost("/bank");

return factoryConsumer;

}

/** ******************************************** */

@Bean

public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {

return new Jackson2JsonMessageConverter();

}

/** ********************************************* */

/**

* rabbitTemplate

*

* @param rabbitAdmin

* @return

*/

@Bean

public AmqpTemplate rabbitTemplate(RabbitAdmin rabbitAdmin) {

RabbitTemplate rabbitTemplate = rabbitAdmin.getRabbitTemplate();

rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());

return rabbitTemplate;

}

@Bean

public RabbitAdmin rabbitAdmin(CachingConnectionFactory factoryProvider) {

return new RabbitAdmin(factoryProvider);

}

/** *********** 消费者工厂 ************ */

/*消费者工厂 */

@Bean(RP_FACTORY_NAME)

public SimpleRabbitListenerContainerFactory rpRabbitListenerContainerFactory(

SimpleRabbitListenerContainerFactoryConfigurer configurer,

@Qualifier("factoryProvider")ConnectionFactory factoryProvider) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(factoryProvider);

factory.setMessageConverter(jackson2JsonMessageConverter());

//单次从队列获取数量

factory.setPrefetchCount(globals.getDefaultPrefetchNumber());

//#默认消费者线程数

factory.setConcurrentConsumers(globals.getMinConsumerNumber());

//最大消费者线程数

factory.setMaxConcurrentConsumers(globals.getMaxConsumerNumber());

//消息格式转换

factory.setMessageConverter(jackson2JsonMessageConverter());

configurer.configure(factory, factoryProvider);

return factory;

}

@Bean(BANK_FACTORY_BEAN_NAME)

public SimpleRabbitListenerContainerFactory bankRabbitListenerContainerFactory(

@Qualifier("factoryConsumerCgt")ConnectionFactory factoryConsumerCgt) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(factoryConsumerCgt);

factory.setMessageConverter(jackson2JsonMessageConverter());

return factory;

}

/** ********** Queue *********** */

@ConfigurationProperties(prefix = "mq.producer.bank")

@Bean

public QueueSetting bankProducerQueueSett() {

return new QueueSetting();

}

@ConfigurationProperties(prefix = "mq.consumer.bank")

@Bean

public QueueSetting bankConsumerQueueSett() {

return new QueueSetting();

}

@ConfigurationProperties(prefix = "mq.notify.bank")

@Bean

public QueueSetting bankNotifyQueueSett() {

return new QueueSetting();

}

/** ************* Exchange ******************** */

/**

* 发送

* @param rabbitAdmin

* @return

*/

@Bean

public Exchange bankProducerExchange(RabbitAdmin rabbitAdmin, @Qualifier("bankProducerQueueSett") QueueSetting bankProducerQueueSett){

Exchange exchange = new DirectExchange(bankProducerQueueSett().getExchangeName());

rabbitAdmin.declareExchange(exchange);

return exchange;

}

/**

* 发送

* @param rabbitAdmin

* @return

*/

@Bean

public Exchange bankConsumerExchange(RabbitAdmin rabbitAdmin, @Qualifier("bankConsumerQueueSett") QueueSetting bankConsumerQueueSett){

Exchange exchange = new DirectExchange(bankConsumerQueueSett().getExchangeName());

rabbitAdmin.declareExchange(exchange);

return exchange;

}

/**

* 广播结果

* @param rabbitAdmin

* @return

*/

@Bean

public Exchange bankNotifyExchange(RabbitAdmin rabbitAdmin, @Qualifier("bankNotifyQueueSett") QueueSetting bankNotifyQueueSett){

Exchange exchange = new FanoutExchange(bankNotifyQueueSett().getExchangeName());

rabbitAdmin.declareExchange(exchange);

return exchange;

}

/** ******************************************* */

public String getNotifyMqHost() {

return notifyMqHost;

}

public void setNotifyMqHost(String notifyMqHost) {

this.notifyMqHost = notifyMqHost;

}

public Integer getNotifyMqPort() {

return notifyMqPort;

}

public void setNotifyMqPort(Integer notifyMqPort) {

this.notifyMqPort = notifyMqPort;

}

public String getNotifyMqUsername() {

return notifyMqUsername;

}

public void setNotifyMqUsername(String notifyMqUsername) {

this.notifyMqUsername = notifyMqUsername;

}

public String getNotifyMqPassword() {

return notifyMqPassword;

}

public void setNotifyMqPassword(String notifyMqPassword) {

this.notifyMqPassword = notifyMqPassword;

}

public Integer getNotifyMqChannelCacheSize() {

return notifyMqChannelCacheSize;

}

public void setNotifyMqChannelCacheSize(Integer notifyMqChannelCacheSize) {

this.notifyMqChannelCacheSize = notifyMqChannelCacheSize;

}

public String getNotifyMqPublisherConfirms() {

return notifyMqPublisherConfirms;

}

public void setNotifyMqPublisherConfirms(String notifyMqPublisherConfirms) {

this.notifyMqPublisherConfirms = notifyMqPublisherConfirms;

}

}

5、广播返回结果

@Slf4j

@Component

public class MQProvider {

@Autowired

private AmqpTemplate rabbitTemplate;

@Autowired

private QueueSetting bankNotifyQueueSett;

public void send(String msg) {

log.info("MQ广播消息:", msg);

rabbitTemplate.convertAndSend(bankNotifyQueueSett.getExchangeName(), bankNotifyQueueSett.getRouteKey(), msg);

}

}

6、消费MQ数据

@Slf4j

@Component

public class MQConsumer {

/**

* 消费数据发到MQ

*

* @param messageSource

* @param channel

* @param tag

* @throws IOException

* @throws InterruptedException

*/

@RabbitListener(containerFactory = AmqpConfig.BANK_FACTORY_BEAN_NAME,

bindings = @QueueBinding(value = @Queue(value = "${mq.consumer.bank.queueName}", durable = "true",

exclusive = "false", autoDelete = "false"),

exchange = @Exchange(value = "${mq.consumer.bank.exchangeName}", durable = "${mq.consumer.bank.durable}",

type = ExchangeTypes.DIRECT), key = "${mq.consumer.bank.routeKey}"), admin = "rabbitAdmin")

public void process(byte[] messageSource, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag){

try {

String message = new String(messageSource, "UTF-8");

log.info("接收[${mq.queueNamePre}]的消息:{}" ,message);

if (StringUtils.isEmpty(message)) {

log.error("接收 message 为空");

}

//TODO 处理业务数据

} catch (UnsupportedEncodingException e) {

log.error("接收 message UnsupportedEncodingException",e);

} finally {

try {

channel.basicAck(tag, false);

} catch (IOException e) {

log.error("channel.basicAck Exception",e);

}

}

}

}

7、测试发送消息

8、查看发送数据

源码示例:https://gitee.com/lion123/springboot-rabbitmq-demo

 

以上是 springboot项目中使用rabbitmq开发应用 的全部内容, 来源链接: utcz.com/z/514589.html

回到顶部