消息中间件二之AMQP实战(下)RabbitMQspringboot实践

编程

直接上代码

配置类

@Configuration

public class RabbitConfig {

// mq地址

@Bean(value = "connectionFactory")

@Primary

public ConnectionFactory connectionFactory(

@Value("${spring.rabbitmq.host}") String host,

@Value("${spring.rabbitmq.port}") int port,

@Value("${spring.rabbitmq.username}") String username,

@Value("${spring.rabbitmq.password}") String password,

@Value("${spring.rabbitmq.virtual-host}") String virtualHost) {

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

connectionFactory.setHost(host);

connectionFactory.setPort(port);

connectionFactory.setUsername(username);

connectionFactory.setPassword(password);

connectionFactory.setVirtualHost(virtualHost);

return connectionFactory;

}

@Bean

public RetryTemplate retryTemplate() {

RetryTemplate retryTemplate = new RetryTemplate();

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();

backOffPolicy.setInitialInterval(500);

backOffPolicy.setMultiplier(10.0);

backOffPolicy.setMaxInterval(10000);

retryTemplate.setBackOffPolicy(backOffPolicy);

return retryTemplate;

}

// mq发送

@Bean

public AmqpTemplate myMQTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {

RabbitTemplate template = new RabbitTemplate(connectionFactory);

template.setRetryTemplate(retryTemplate());

template.setMessageConverter(new Jackson2JsonMessageConverter());

return template;

}

}

MQ生产者

@Component

public class MQSender {

@Autowired

private AmqpTemplate myMQTemplate;

public void send(String exchangeName, String routingKey, Object object) {

myMQTemplate.convertAndSend(exchangeName, routingKey, object);

}

}

需要注意, 这里没有生成exchange, 需要手动创建, 如果需要程序自动创建, 则需要将exchange声明为bean即可.

执行测试代码

@Autowired

private MQSender mqSender;

@Test

public void send() {

mqSender.send("e.send", "r.send", "send a message");

}

如果你观察的及时, 估计还能看见exchange收到消息的曲线波动~

因为没有消费者, 所以exchange在接收到信息后直接将消息丢弃了, 现在我们创建对应的队列, 并绑定.

再次执行就可以看到队列中有消息了.

下面是消费者

首先我们要在配置类中增加监听配置, 一个自动ack, 一个手动ack

// 自动ack

@Bean(value = "listenerFactoryWithAutoAck")

public SimpleRabbitListenerContainerFactory listenerFactoryWithAutoAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(newRentConnectionFactory);

factory.setMessageConverter(new Jackson2JsonMessageConverter());

factory.setConcurrentConsumers(3);

factory.setMaxConcurrentConsumers(10);

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

return factory;

}

// 手动ack

@Bean(value = "listenerFactoryWithManualAck")

public SimpleRabbitListenerContainerFactory listenerFactoryWithManualAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) {

SimpleRabbitListenerContainerFactory factory = listenerFactory(newRentConnectionFactory);

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

return factory;

}

如果你使用的springboot版本是2.2.*的话, 可以更方便的在监听的注解上设置ackMode, 并且会覆盖在配置监听工厂的配置方式, 这里使用的版本是2.1.8所以只能在配置工厂时设置.

@Component

public class MQListener {

@RabbitListener(containerFactory = "listenerFactoryWithManualAck",

queues = "q.send")

public void consume(Message message, Channel channel,

@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

System.out.println("message: " + message.toString());

String msg = new String(message.getBody(), StandardCharsets.UTF_8);

System.out.println("message body: " + msg);

}

}

执行后看到消息并没有被消费, 这是因为我们使用手动应答监听, 但是没有发送应答, 服务器将消息重新入队列.

在监听中加入代码, false表示只是响应这条信息, true表示所有信息.

channel.basicAck(tag, false);

启动程序可以看到信息被消费了.

在程序抛异常时, 可能需要手动处理异常, 拒绝消息. true表示消息重新入队列, 还可以被消费; false表示直接丢弃消息

channel.basicReject(tag, true);

以上就是RabbitMQ在springboot中的简单实用

以上是 消息中间件二之AMQP实战(下)RabbitMQspringboot实践 的全部内容, 来源链接: utcz.com/z/514280.html

回到顶部