Spring整合RabbitMQ04MessageListenerAdapter

编程

RabbitConfig

package com.wyg.rabbitmq.springamqp;

import com.wyg.rabbitmq.springamqp.convert.MyPngMesssageConvert;

import com.wyg.rabbitmq.springamqp.convert.MyPDFMessageConvert;

import org.springframework.amqp.core.AcknowledgeMode;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitAdmin;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

import org.springframework.amqp.support.ConsumerTagStrategy;

import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import com.wyg.rabbitmq.springamqp.convert.MyTextMessageConvert;

/**

* RabbitAdmin

*

* @author wyg0405@gmail.com

* @date 2019-11-25 15:11

* @since JDK1.8

* @version V1.0

*/

@Configuration

public class RabbitConfig {

@Bean

public ConnectionFactory connectionFactory() {

CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();

cachingConnectionFactory.setAddresses("localhost:5672");

cachingConnectionFactory.setUsername("guest");

cachingConnectionFactory.setPassword("guest");

cachingConnectionFactory.setVirtualHost("/");

return cachingConnectionFactory;

}

/**

* RabbitAdmin注入容器

*

* @param connectionFactory

* @return

* @throws @author

* wyg0405@gmail.com

* @date 2019/11/25 16:35

*/

@Bean

public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {

RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

/*

* autoStartup 必须要设为 true ,否则Spring容器不会加载RabbitAdmin类

*/

rabbitAdmin.setAutoStartup(true);

return rabbitAdmin;

}

/**

* RabbitTemplate注入

*

* @param connectionFactory

* @return

* @throws @author

* wyg0405@gmail.com

* @date 2019/11/25 16:37

*/

public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

return rabbitTemplate;

}

/**

* SimpleMessageListenerContainer注入

*

* @param connectionFactory

* @return

* @throws @author

* wyg0405@gmail.com

* @date 2019/11/25 17:16

*/

@Bean

public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

// 监听多个queue

container.addQueueNames("test01");

//container.addQueueNames("test01", "test02", "test03");

// 设置当前消费者数量

container.setConcurrentConsumers(1);

// 设置最大的消费者数量

container.setMaxConcurrentConsumers(5);

// 设置不要重回队列

container.setDefaultRequeueRejected(false);

// 设置自动签收

container.setAcknowledgeMode(AcknowledgeMode.AUTO);

// 设置消费端tag策略

container.setConsumerTagStrategy(new ConsumerTagStrategy() {

@Override

public String createConsumerTag(String queue) {

return queue + "_" + System.currentTimeMillis();

}

});

// 方式二,使用适配器

MessageListenerAdapter adapter = new MessageListenerAdapter(new MyMessageListenerDelegate());

// 自定处理消息方法,不设置默认为handleMessage

adapter.setDefaultListenerMethod("consumeMsg");

// 自定义消息转换器

adapter.setMessageConverter(new MyTextMessageConvert());

container.setMessageListener(adapter);

return container;

}

}

MyMessageListenerDelegate,自定义适配器

package com.wyg.rabbitmq.springamqp;

import java.io.File;

import java.util.Map;

import com.wyg.rabbitmq.springamqp.convert.Order;

import com.wyg.rabbitmq.springamqp.convert.User;

/**

*

* @author wyg0405@gmail.com

* @date 2019-11-29 14:23

* @since JDK1.8

* @version V1.0

*/

public class MyMessageListenerDelegate {

// 默认方法

public void handleMessage(byte[] body) {

System.out.println("默认处理方法,message:" + new String(body));

}

// 自定义处理方法

public void consumeMsg(byte[] msgBody) {

System.out.println("自定义处理方法,message:" + new String(msgBody));

}

// 自定义处理String类型消息方法

public void consumeMsg(String msgBody) {

System.out.println("自定义处理String消息方法,message:" + new String(msgBody));

}

}

RabbitConfigTest 单元测试

package com.wyg.rabbitmq.springamqp;

import java.io.*;

import java.lang.reflect.Field;

import java.nio.file.Files;

import java.nio.file.Path;

import java.nio.file.Paths;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.core.*;

import org.springframework.amqp.rabbit.core.RabbitAdmin;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.fasterxml.jackson.databind.json.JsonMapper;

import com.wyg.rabbitmq.springamqp.convert.Order;

import com.wyg.rabbitmq.springamqp.convert.User;

@RunWith(SpringRunner.class)

@SpringBootTest

public class RabbitConfigTest {

@Autowired

RabbitAdmin rabbitAdmin;

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private SimpleMessageListenerContainer simpleMessageListenerContainer;

@Test

public void sendTextMsg() {

// 分别向 队列 "test01", "test02", "test03" 发消息,"test01", "test02",

// "test03"与springdemo.direct已经绑定,routingKey都为orderRoutingKey

for (int i = 0; i < 3; i++) {

MessageProperties messageProperties = new MessageProperties();

messageProperties.setContentType("application/text");

String body = "第" + i + "条消息";

Message msg = new Message(body.getBytes(), messageProperties);

rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);

}

}

}

以上是 Spring整合RabbitMQ04MessageListenerAdapter 的全部内容, 来源链接: utcz.com/z/511387.html

回到顶部