消息队列整合SpringBoot和RabbitMQ

阿粉之前已经是教给大家如何安装RabbitMQ,如何写一个生产者,如何写一个消费者,而接下来的这篇文章,详细讲解一下如何使用队列和交换机进行不同的发布消息以及消费消息,以及怎么整合SpringBoot和RabbitMQ。

<–more–>

1. 简单消息模式

下面阿粉就只用一个项目把所有类型的交换机全部都加入到一个SpringBoot项目中来,我们首先需要创建的就是一个Config类,而这个Config类中将会把所有的工作模式集中在这里。

创建个SpringBoot项目,然后配置pom.xml文件,在中间加入我们的依赖Jar。

未命名

1

2

3

4

5

6

7

8

9

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

<dependency>

<groupId>com.alibaba.mq-amqp</groupId>

<artifactId>mq-amqp-client</artifactId>

<version>1.0.5</version>

</dependency>

加入依赖环境,配置启动配置。

未命名

然后我们就开始写个简单消息模式的配置

未命名

简单发消息的Producer和Consumer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

@Component

public class Producer {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 发生消息到RabbitMQ,使用SpringBoot默认的交换机

*

* @param message

*/

public void sendMessage(String message) {

rabbitTemplate.setConfirmCallback(confirmCallback);

rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_QUEUE_NAME, message);

}

//回调函数: confirm确认 此处的confirm确认是发送的ACK确认

final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if(ack){

//如果confirm返回成功 则进行更新

System.out.println("confirm确认消息已经发出");

} else {

//失败则进行具体的后续操作:重试 或者补偿等手段

System.err.println("异常处理...");

}

}

};

}

阿粉不知不觉的调用了竟然有7次,

未命名

这么多了,那么咱们赶紧都给他消费了吧。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

    @Autowired

private RabbitTemplate rabbitTemplate;

//记住这个,这Template是SpringBoot里面整合RabbitMQ自带的,不是我自己专门写的哈,大家不要一看到这个注入,就寻思是个自己写的。

/**

* 普通默认点对点的消息消费者

* @param message

*/

@RabbitListener(queues = {RabbitMQConfig.SIMPLE_QUEUE_NAME})

@RabbitHandler

public void receiveMessage(String message, Channel channel, @Headers Map<String,Object> headers) { //

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

//处理监听得到的消息

String message = null;

try {

message = new String(body, "UTF-8");

//消息处理逻辑

sendMessage(message);

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

channel.abort();

} finally {

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

};

boolean autoAck = false;

//消息消费完成确认

try {

channel.basicConsume(RabbitMQConfig.SIMPLE_QUEUE_NAME, autoAck, consumer);

}catch (Exception e){

}

System.out.println("消费者进行消息消费:消费主体是-----"+ message);

}

未命名

未命名

这一口气怎么都给我消费光了呢?是不是有点不厚道。

@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列,而我们给的这个队列名字也很简单SIMPLE_QUEUE_NAME,发现里面有消息,我知道了,我就要开始消费了,你给我一条,我给你消费一条。

未命名

上图也是官网上给出的简单模式的消息队列,就是你发一条,我消费一条。

Work(工作)模式

未命名

工作模式这个其实更简单,就是,一个生产者,然后整出了2个消费者去消费,大家可以在消费者里面输出内容,然后循环调用生产者,一口气给他生成个100条,然后看消费者是怎么调用的,阿粉相信大家对这个模式肯定理解是非常到位的,因为,确实是随机的,如果你不相信,大家可以看看。

未命名

1

2

3

4

5

        //消息发送

for (int i = 0;i < 100 ;i++){

producer.sendMessage("生产消息消息"+i);

}

消费者一模一样的复制出一份,在消费消息的时候打印一下就可以啦

在工作模式的时候,大家一定要注意一点,那就是一个队列中一条消息,只能被一个消费者消费

发布订阅模式

接下来我们先吧配置给大家放上,大家一定是希望看到这个内容,毕竟拿过来就可以用的。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

/**

* Fanout型交换机MQ模型:订阅模式,消息到达交换机会转发到与该交换机绑定的队列

*/

/**

* 与fanout绑定的第一个队列

*/

public static final String FIRST_FANOUT_QUEUE_NAME = "First_Fanout_Queue";

/**

* 与fanout交换机绑定的第二个队列

*/

public static final String SECOND_FANOUT_QUEUE_NAME = "Send_Fanout_Queue";

/**

* fanout 交换机

*/

public static final String FANOUT_EXCHANGE_NAME = "Fanout_Exchange";

/**

* FanoutExchange,持久化、非自动删除

*

* @return

*/

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange(FANOUT_EXCHANGE_NAME);

}

@Bean

public Queue firstFanoutQueue() {

return new Queue(FIRST_FANOUT_QUEUE_NAME);

}

@Bean

public Queue secondFanoutQueue() {

return new Queue(SECOND_FANOUT_QUEUE_NAME);

}

@Bean

public Binding firstFanoutBinding() {

return BindingBuilder.bind(firstFanoutQueue()).to(fanoutExchange());

}

@Bean

public Binding secondFanoutBinding() {

return BindingBuilder.bind(secondFanoutQueue()).to(fanoutExchange());

}

这种模式比较有意思,和之前的工作模式是不一样的,因为发布了一条消息,两个消费者那是都能消费的,为什么呢?那就是上面的一个bind方法,在这个方法里面,实际上相当于把消息发给了交换机,而交换机帮我们做了一件事情,那就是根据绑定来发送,我们再来试试看。

代码写出来

Consumer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

  /**

* Fanout型交换机的MQ模型消费消息

* @param message

*/

@RabbitListener(queues = {RabbitMQConfig.FIRST_FANOUT_QUEUE_NAME})

@RabbitHandler

public void receiveFanoutMessage(String message) {

System.out.println("11111Fanout消费者进行消息消费:消费主体是-----"+ message);

}

/**

* Fanout型交换机的MQ模型消费消息

* @param message

*/

@RabbitListener(queues = {RabbitMQConfig.SECOND_FANOUT_QUEUE_NAME})

@RabbitHandler

public void receiveFanoutMessage2(String message) {

System.out.println("2222Fanout消费者进行消息消费:消费主体是-----"+ message);

}

Producer

1

2

3

4

5

6

7

8

9

/**

* 发送消息至fanout交换机,由于fanout只关注订阅关系,所以routing key随便指定都可以

* @param message

*/

public void sendFanoutMessage(String message){

rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME, "------", message);

}

大家肯定也都很好奇,为什么呢?之前阿粉还专门说了一句话一个队列中一条消息,只能被一个消费者消费,但是你现在仔细看一下,这种发布订阅模式,他发给了谁?是不是交换机,那交换机中你自定义绑定了什么?是不是两个不同的队列,一个是队列First,一个是队列Second,所以,对于生产者来说,他只是发布了一条消息,但是他把消息发布到了交换机中,而交换机是根据你绑定队列数来进行消息的消费的,这样想的话,是不是就很明确了。

今天阿粉也不给大家讲后三种了,因为一次学的太多了,敲代码容易记不住,阿粉下一篇文章会继续带大家认识路由模式,Topic模式,还有RPC模式,如果大家迫不及待的想学习,欢迎大家来点个赞。

文献参考

《RabbitMQ官方文档》

《如何学习RabbitMQ的六种工作模式》

以上是 消息队列整合SpringBoot和RabbitMQ 的全部内容, 来源链接: utcz.com/a/129571.html

回到顶部