springboot项目中使用rabbitmq开发应用
AMQP介绍
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
Exchange有Direct、Fanout、Topic、Headers,最常用的是Direct、Fanout、Topic三种类型。
“生产/消费”消息模型
生产者发送消息到broker server(RabbitMQ)。在Broker内部,用户创建Exchange/Queue,
通过Binding规则将两者联系在一起。Exchange分发消息,根据类型/binding的不同分发策略有区别。
消息最后来到Queue中,等待消费者取走。
AMQP messaging 中的基本概念
Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。
Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
A、SpringAMQP声明
SpringAMQP声明即在rabbit基础API里面声明一个exchange、Bingding、queue。使用SpringAMQP去声明,就需要使用@bean的声明方式。
B、RabbitAdmin
(1)RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
(2)autoStartup必须设置true,否则Spring容器不会加载RabbitAdmin类
(3)RabbitAdmin底层实现就是从Spring容器中获取Exchagge,Bingding,RoutingKey以及Queue的@Bean声明
(4)然后使用RabbitTemplate的execute方法执行对应声明,修改,删除等操作
C、RabbitTemplate
Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作
D、SimpleMessageListenerContailer
(1)简单消息监听容器:这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
(2)设置事务特性,事务管理器,事务属性,事务容量,事务开启等
(3)设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数
(4)设置消费者标签生成策略,是否独占模式,消费者属性等
(5)simpleMessageListenerContailer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小,接收消息的模式等
E、MessageListenerAdapter
1)可以把一个没有实现MessageListener和ChannelAwareMessageListener接口的类适配成一个可以处理消息的处理器
2)默认的方法名称为:handleMessage,可以通过setDefaultListenerMethod设置新的消息处理方法
3)MessageListenerAdapter支持不同的队列交给不同的方法去执行。使用setQueueOrTagToMethodName方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。
F、MessageConverter
(1)消息转换器
(2)在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要MessageConverter
(3)实现MessageConverter接口,重写toMessage(java对象转换为Message) fromMessage(Message对象转换为java对象)
(4)json转换器,自定义二进制转换器(比如图片类型,pdf,ppt,流媒体)
1、安装并启动 amqp,方法如:https://my.oschina.net/lion1220/blog/3151027
登录amqp,界面如下
2、创建项目,并导入 amqp jar包
<dependency> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、properties文件中添加配置项
spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.cache.channel.size=100
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
#生产者
mq.producer.x.exchangeName=test_x_exchange
mq.producer.x.exchangeType=direct
mq.producer.x.queueName=test_producer_x_queue
mq.producer.x.routeKey=test_producer_x_queue
mq.producer.x.durable=true
#消费者
mq.consumer.x.exchangeName=test_x_exchange
mq.consumer.x.exchangeType=direct
mq.consumer.x.queueName=test_consumer_x_queue
mq.consumer.x.routeKey=test_consumer_x_queue
mq.consumer.x.durable=true
#用来发送通知
mq.notify.x.exchangeName=test_x_exchange
mq.notify.x.exchangeType=fanout
mq.notify.x.queueName=test_notify_x_queue
mq.notify.x.routeKey=test_notify_x_queue
mq.notify.x.durable=true
4、amqp 配置类
@Configurationpublic class AmqpConfig {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConfig.class);
@Autowired
private Globals globals;
public final static String RP_FACTORY_NAME = "rpMQContainerFactory";
public final static String BANK_FACTORY_BEAN_NAME = "bankMQContainerFactory";
@Value("${spring.rabbitmq.host:127.0.0.1}")
private String notifyMqHost;
@Value("${spring.rabbitmq.port:5672}")
private Integer notifyMqPort;
@Value("${spring.rabbitmq.username:admin}")
private String notifyMqUsername;
@Value("${spring.rabbitmq.password:admin}")
private String notifyMqPassword;
@Value("${spring.rabbitmq.channelCacheSize:1024}")
private Integer notifyMqChannelCacheSize;
@Value("${spring.rabbitmq.publisherConfirms:true}")
private String notifyMqPublisherConfirms;
/** ********* ConnectionFactory **************** */
@Bean(name = "factoryProvider")
@Primary
public CachingConnectionFactory factoryProvider(){
CachingConnectionFactory factoryProvider = new CachingConnectionFactory();
factoryProvider.setHost(notifyMqHost);
factoryProvider.setPort(notifyMqPort);
factoryProvider.setUsername(notifyMqUsername);
factoryProvider.setPassword(notifyMqPassword);
factoryProvider.setVirtualHost("/");
return factoryProvider;
}
@Bean(name = "factoryConsumerCgt")
public CachingConnectionFactory factoryConsumerCgt(){
CachingConnectionFactory factoryConsumer = new CachingConnectionFactory();
factoryConsumer.setHost(notifyMqHost);
factoryConsumer.setPort(notifyMqPort);
factoryConsumer.setUsername(notifyMqUsername);
factoryConsumer.setPassword(notifyMqPassword);
factoryConsumer.setVirtualHost("/bank");
return factoryConsumer;
}
/** ******************************************** */
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/** ********************************************* */
/**
* rabbitTemplate
*
* @param rabbitAdmin
* @return
*/
@Bean
public AmqpTemplate rabbitTemplate(RabbitAdmin rabbitAdmin) {
RabbitTemplate rabbitTemplate = rabbitAdmin.getRabbitTemplate();
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public RabbitAdmin rabbitAdmin(CachingConnectionFactory factoryProvider) {
return new RabbitAdmin(factoryProvider);
}
/** *********** 消费者工厂 ************ */
/*消费者工厂 */
@Bean(RP_FACTORY_NAME)
public SimpleRabbitListenerContainerFactory rpRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("factoryProvider")ConnectionFactory factoryProvider) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(factoryProvider);
factory.setMessageConverter(jackson2JsonMessageConverter());
//单次从队列获取数量
factory.setPrefetchCount(globals.getDefaultPrefetchNumber());
//#默认消费者线程数
factory.setConcurrentConsumers(globals.getMinConsumerNumber());
//最大消费者线程数
factory.setMaxConcurrentConsumers(globals.getMaxConsumerNumber());
//消息格式转换
factory.setMessageConverter(jackson2JsonMessageConverter());
configurer.configure(factory, factoryProvider);
return factory;
}
@Bean(BANK_FACTORY_BEAN_NAME)
public SimpleRabbitListenerContainerFactory bankRabbitListenerContainerFactory(
@Qualifier("factoryConsumerCgt")ConnectionFactory factoryConsumerCgt) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(factoryConsumerCgt);
factory.setMessageConverter(jackson2JsonMessageConverter());
return factory;
}
/** ********** Queue *********** */
@ConfigurationProperties(prefix = "mq.producer.bank")
@Bean
public QueueSetting bankProducerQueueSett() {
return new QueueSetting();
}
@ConfigurationProperties(prefix = "mq.consumer.bank")
@Bean
public QueueSetting bankConsumerQueueSett() {
return new QueueSetting();
}
@ConfigurationProperties(prefix = "mq.notify.bank")
@Bean
public QueueSetting bankNotifyQueueSett() {
return new QueueSetting();
}
/** ************* Exchange ******************** */
/**
* 发送
* @param rabbitAdmin
* @return
*/
@Bean
public Exchange bankProducerExchange(RabbitAdmin rabbitAdmin, @Qualifier("bankProducerQueueSett") QueueSetting bankProducerQueueSett){
Exchange exchange = new DirectExchange(bankProducerQueueSett().getExchangeName());
rabbitAdmin.declareExchange(exchange);
return exchange;
}
/**
* 发送
* @param rabbitAdmin
* @return
*/
@Bean
public Exchange bankConsumerExchange(RabbitAdmin rabbitAdmin, @Qualifier("bankConsumerQueueSett") QueueSetting bankConsumerQueueSett){
Exchange exchange = new DirectExchange(bankConsumerQueueSett().getExchangeName());
rabbitAdmin.declareExchange(exchange);
return exchange;
}
/**
* 广播结果
* @param rabbitAdmin
* @return
*/
@Bean
public Exchange bankNotifyExchange(RabbitAdmin rabbitAdmin, @Qualifier("bankNotifyQueueSett") QueueSetting bankNotifyQueueSett){
Exchange exchange = new FanoutExchange(bankNotifyQueueSett().getExchangeName());
rabbitAdmin.declareExchange(exchange);
return exchange;
}
/** ******************************************* */
public String getNotifyMqHost() {
return notifyMqHost;
}
public void setNotifyMqHost(String notifyMqHost) {
this.notifyMqHost = notifyMqHost;
}
public Integer getNotifyMqPort() {
return notifyMqPort;
}
public void setNotifyMqPort(Integer notifyMqPort) {
this.notifyMqPort = notifyMqPort;
}
public String getNotifyMqUsername() {
return notifyMqUsername;
}
public void setNotifyMqUsername(String notifyMqUsername) {
this.notifyMqUsername = notifyMqUsername;
}
public String getNotifyMqPassword() {
return notifyMqPassword;
}
public void setNotifyMqPassword(String notifyMqPassword) {
this.notifyMqPassword = notifyMqPassword;
}
public Integer getNotifyMqChannelCacheSize() {
return notifyMqChannelCacheSize;
}
public void setNotifyMqChannelCacheSize(Integer notifyMqChannelCacheSize) {
this.notifyMqChannelCacheSize = notifyMqChannelCacheSize;
}
public String getNotifyMqPublisherConfirms() {
return notifyMqPublisherConfirms;
}
public void setNotifyMqPublisherConfirms(String notifyMqPublisherConfirms) {
this.notifyMqPublisherConfirms = notifyMqPublisherConfirms;
}
}
5、广播返回结果
@Slf4j@Component
public class MQProvider {
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
private QueueSetting bankNotifyQueueSett;
public void send(String msg) {
log.info("MQ广播消息:", msg);
rabbitTemplate.convertAndSend(bankNotifyQueueSett.getExchangeName(), bankNotifyQueueSett.getRouteKey(), msg);
}
}
6、消费MQ数据
@Slf4j@Component
public class MQConsumer {
/**
* 消费数据发到MQ
*
* @param messageSource
* @param channel
* @param tag
* @throws IOException
* @throws InterruptedException
*/
@RabbitListener(containerFactory = AmqpConfig.BANK_FACTORY_BEAN_NAME,
bindings = @QueueBinding(value = @Queue(value = "${mq.consumer.bank.queueName}", durable = "true",
exclusive = "false", autoDelete = "false"),
exchange = @Exchange(value = "${mq.consumer.bank.exchangeName}", durable = "${mq.consumer.bank.durable}",
type = ExchangeTypes.DIRECT), key = "${mq.consumer.bank.routeKey}"), admin = "rabbitAdmin")
public void process(byte[] messageSource, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag){
try {
String message = new String(messageSource, "UTF-8");
log.info("接收[${mq.queueNamePre}]的消息:{}" ,message);
if (StringUtils.isEmpty(message)) {
log.error("接收 message 为空");
}
//TODO 处理业务数据
} catch (UnsupportedEncodingException e) {
log.error("接收 message UnsupportedEncodingException",e);
} finally {
try {
channel.basicAck(tag, false);
} catch (IOException e) {
log.error("channel.basicAck Exception",e);
}
}
}
}
7、测试发送消息
8、查看发送数据
源码示例:https://gitee.com/lion123/springboot-rabbitmq-demo
以上是 springboot项目中使用rabbitmq开发应用 的全部内容, 来源链接: utcz.com/z/514589.html