RabbitMq在低并发下处理失效订单的应用

编程

RabbitMQ 基本概念

  • Message

    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  • Publisher

    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  • Exchange

    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Binding

    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Queue

    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Connection

    网络连接,比如一个TCP连接。

  • Channel

    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  • Consumer

    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  • Virtual Host

    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型

direct

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

topic

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。

消息队列的应用场景

消息通讯

消息队列最主要功能收发消息,其内部有高效的通讯机制,因此非常适合用于消息通讯。

我们可以基于消息队列开发点对点聊天系统,也可以开发广播系统,用于将消息广播给大量接收者。

异步处理

一般我们写的程序都是顺序执行(同步执行),比如一个用户注册函数,其执行顺序如下:

  • 1.写入用户注册数据。

  • 2.发送注册邮件。

  • 3.发送注册成功的短信通知。

  • 4.更新统计数据。

按照上面的执行顺序,要全部执行完毕,才能返回成功,但其实在第1步执行成功后,其他的步骤完全可以异步执行,我们可以将后面的逻辑发给消息队列,再由其他程序异步执行,如下所示:

使用消息队列进行异步处理,可以更快地返回结果,加快服务器的响应速度,提升了服务器的性能。

服务解耦

在我们的系统中,应用与应用之间的通讯是很常见的,一般我们应用之间直接调用,比如说应用A调用应用B的接口,这时候应用之间的关系是强耦合的。

如果应用B处于不可用的状态,那么应用A也会受影响。

在应用A与应用B之间引入消息队列进行服务解耦,如果应用B挂掉,也不会影响应用A的使用。

流量削峰

对于高并发的系统来说,在访问高峰时,突发的流量就像洪水般向应用系统涌过来,尤其是一些高并发写操作,随时会导致数据库服务器瘫痪,无法继续提供服务。

而引入消息队列则可以减少突发流量对应用系统的冲击。消息队列就像“水库”一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。

这方面最常见的例子就是秒杀系统,一般秒杀活动瞬间流量很高,如果流量全部涌向秒杀系统,会压垮秒杀系统,通过引入消息队列,可以有效缓冲突发流量,达到“削峰填谷”的作用。

RabbitMQ异步通知业务邮件信息

完整的项目代码地址为:https://gitee.com/jiansin/jjlang-exam/tree/master/exam-demo

rabbitmq设置邮件配置类

@Configuration

public class RabbitConfig {

private static final Logger log= LoggerFactory.getLogger(RabbitConfig.class);

@Autowired

private CachingConnectionFactory connectionFactory;

/**

* 日志队列

* @return

*/

@Bean

public Queue logQueue(){

return new Queue(DemoConstant.LOG_QUEUE_NAME,true);

}

/**

* 日志交换机

* @return

*/

@Bean

public DirectExchange logDirectExchange(){

return new DirectExchange(DemoConstant.LOG_EXCHANGE_NAME,true,false);

}

/**

* 绑定队列和交换机

* @return

*/

@Bean

public Binding logQueueExchangeBinding(){

return BindingBuilder.bind(logQueue()).to(logDirectExchange()).with(DemoConstant.LOG_ROUTING_KEY);

}

@Bean

public Jackson2JsonMessageConverter jackson2JsonMessageConverter (){

return new Jackson2JsonMessageConverter();

}

@Bean

public RabbitTemplate rabbitTemplate(){

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());

// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

//可以处理

log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);

}

});

// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);

}

});

return rabbitTemplate;

}

}

常量配置类

/**

* @description:

* @author: lilang

* @version:

* @modified By:1170370113@qq.com

*/

public class DemoConstant {

public static final String LOG_QUEUE_NAME = "rabbitmq-log-queue";

public static final String LOG_EXCHANGE_NAME = "rabbitmq-log-exchange";

public static final String LOG_ROUTING_KEY = "rabbtimq-log-routingkey";

}

邮件生产者进行消息生产

@Service

public class RabbitMqServiceImpl implements RabbitMqService {

private static final Logger logger= LoggerFactory.getLogger(RabbitMqServiceImpl.class);

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private ObjectMapper objectMapper;

@Override

public String sendInfo() {

try {

UserLog userLog = new UserLog("我是发送的日志消息信息","200");

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

rabbitTemplate.setExchange(DemoConstant.LOG_EXCHANGE_NAME);

rabbitTemplate.setRoutingKey(DemoConstant.LOG_ROUTING_KEY);

Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();

message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);

for (int i =0 ; i< 10000 ;i++){

rabbitTemplate.convertAndSend(message);

}

} catch (JsonProcessingException e) {

logger.info("消息发送异常",e);

}

return "发送成功";

}

}

邮件消费者进行消息消费

@Slf4j

@Component

public class BusinessMessageReceiver {

private static final Logger logger = LoggerFactory.getLogger(BusinessMessageReceiver.class);

@Autowired

private ObjectMapper objectMapper;

/**

* 监听消费用户日志

*

* @param message

*/

@RabbitListener(queues = DemoConstant.LOG_QUEUE_NAME)

public void consumeUserLogQueue(Message message,Channel channel) throws IOException {

try {

channel.basicQos(1);

byte []messageBody = message.getBody();

UserLog userLog = objectMapper.readValue(messageBody, UserLog.class);

logger.info("监听消费用户日志 监听到消息: {} ", userLog);

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

//TODO:记录进行发送邮件信息

} catch (Exception e) {

logger.info("消息处理出现异常", e);

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

}

}

死信队列: DLX,dead-letter-exchange

死信消息原因

1.消息被拒绝 (basic.reject / basic.nack) 并且 reQueue=false

2.消息 TTL 过期

当这个队列中有死信时,rabbitMQ 就会自动的将这个消息重新发布到设置的 死信exchange 上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理。

死信队列应用

  • 保证消息百分百投递

  • 处理需要延时处理的任务,比如订单失效处理(也可用定时任务进行处理)

死信队列在秒杀场景中处理失效订单应用

  • 处理失效订单的方式有多种方式进行处理,比如定时任务等,如下采用mq进行处理。

  • 下面实际用途容易将数据库打挂,同样可以采用MQ进行消峰处理,如下主要是对低并发下失效订单的处理。

常量配置类:

/**

* @description:

* @author: lilang

* @version:

* @modified By:1170370113@qq.com

*/

public class DemoConstant {

//订单超时未支付自动失效-死信队列消息模型

//设置死信队列交换机

public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_EXCHANGE = "kill.item.success.kill.dead.exchange";

//设置死信队列路由key

public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_ROUTING_KEY = "kill.item.success.kill.dead.routing.key";

//设置死信队列queue名称

public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_REAL_QUEUE="kill.item.success.kill.dead.real.queue";

//设置正常秒杀队列名称

public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_QUEUE = "kill.item.success.kill.dead.queue";

//设置正常秒杀队列交换机

public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_EXCHANGE="kill.item.success.kill.dead.prod.exchange";

//正常秒杀队列交换机和正常秒杀queue之间的路由键

public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_ROUTING_KEY="kill.item.success.kill.dead.prod.routing.key";

//毫秒作为单位

public static final String KILL_SUCESS_DEAD_EXPIRE = "30000";

}

rabbitmq队列配置类:

@Configuration

public class RabbitConfig {

private static final Logger log= LoggerFactory.getLogger(RabbitConfig.class);

@Autowired

private CachingConnectionFactory connectionFactory;

@Bean

public Jackson2JsonMessageConverter jackson2JsonMessageConverter (){

return new Jackson2JsonMessageConverter();

}

@Bean

public RabbitTemplate rabbitTemplate(){

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());

// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

//可以处理

log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);

}

});

// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);

}

});

return rabbitTemplate;

}

//构建秒杀成功之后-订单超时未支付的死信队列消息模型

@Bean

public Queue successKillDeadQueue(){

Map<String, Object> argsMap= Maps.newHashMap();

argsMap.put("x-dead-letter-exchange",DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_EXCHANGE);

argsMap.put("x-dead-letter-routing-key",DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_ROUTING_KEY);

return new Queue(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_QUEUE,true,false,false,argsMap);

}

//基本交换机

@Bean

public TopicExchange successKillDeadProdExchange(){

return new TopicExchange(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_EXCHANGE,true,false);

}

//创建基本交换机+基本路由 -> 死信队列 的绑定

@Bean

public Binding successKillDeadProdBinding(){

return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_ROUTING_KEY);

}

//死信队列queue

@Bean

public Queue successKillRealQueue(){

return new Queue(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_REAL_QUEUE,true);

}

//死信交换机

@Bean

public TopicExchange successKillDeadExchange(){

return new TopicExchange(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_EXCHANGE,true,false);

}

//死信交换机+死信路由->真正队列 的绑定

@Bean

public Binding successKillDeadBinding(){

return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_ROUTING_KEY);

}

}

控制层:

@Controller

@RequestMapping("/kill")

@Api("高并发测试控制类")

public class KillDemoController {

@Autowired

private KillService killService;

/***

* 商品秒杀核心业务逻辑-用于压力测试

*/

@RequestMapping(value ="/iphone",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)

@ResponseBody

public CommonResult killItemLockByRedisson(@RequestBody @Validated KillDto dto){

//基于Redisson的分布式锁进行控制

Boolean res=killService.killItemLockByRedisson(dto.getKillId(),dto.getUserId());

return CommonResult.success(res);

}

}

业务控制层:

@Service

public class KillServiceImpl implements KillService {

private static final Logger logger= LoggerFactory.getLogger(KillService.class);

private SnowFlake snowFlake=new SnowFlake(2,3);

@Autowired

private KillDao killDao;

@Qualifier("redissonSingle")

@Autowired

private RedissonClient redissonClient;

@Autowired

private RabbitMqService rabbitMqService;

@Override

public Boolean killItemLockByZookeeper(Integer killId, Integer userId) {

Boolean result=false;

InterProcessMutex mutex=new InterProcessMutex(curatorFramework,pathPrefix+killId+userId+"-lock");

try {

//限时等待

if (mutex.acquire(10L,TimeUnit.SECONDS)){

//核心业务逻辑的处理

//根据用户id,订单id查询是否已经秒杀了

if (killDao.countByKillUserId(killId,userId) <= 0){

//查询对应的产品信息

//获取秒杀商品详细信息

ItemKill itemKill=killDao.selectItemById(killId);

if (itemKill!=null && 1 == itemKill.getCanKill() && itemKill.getTotal()>0){

//扣减库存

int res=killDao.updateKillItemById(killId);

if (res>0){

//是-生成秒杀成功的订单

this.insertKillSuccessInfo(itemKill,userId);

result=true;

}

}else {

logger.error("产品已经卖完了,谢谢!");

}

}else{

logger.error("redisson-您已经抢购过该商品了!");

}

}

}catch (Exception e){

logger.info("秒杀出现了异常 。。。。",e);

}finally {

if (mutex!=null){

try {

mutex.release();

} catch (Exception e) {

logger.info("秒杀出现了异常 。。。。",e);

}

}

}

return result;

}

}

    /**

* 通用的方法-记录用户秒杀成功后生成的订单-并进行异步邮件消息的通知

* @param kill

* @param userId

* @throws Exception

*/

private void insertKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{

//记录抢购成功后生成的秒杀订单记录

ItemKillSuccess entity=new ItemKillSuccess();

String orderNo=String.valueOf(snowFlake.nextId());

entity.setCode(orderNo); //雪花算法

entity.setItemId(kill.getItemId());

entity.setKillId(kill.getId());

entity.setUserId(userId.toString());

entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue());

entity.setCreateTime(DateTime.now());

int res=killDao.insertSucessKill(entity);

if (res>0){

//入死信队列,用于 “失效” 超过指定的TTL时间时仍然未支付的订单

rabbitMqService.sendKillSuccessOrderExpireMsg(orderNo);

}

}

mq发送消息:

    @Override

public void sendKillSuccessOrderExpireMsg(String orderCode) {

try {

if (StringUtils.isNotBlank(orderCode)){

KillSuccessUserInfo info=killDao.selectByCode(orderCode);

if (info!=null){

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

rabbitTemplate.setExchange(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_EXCHANGE);

rabbitTemplate.setRoutingKey(DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_ROUTING_KEY);

rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

MessageProperties mp=message.getMessageProperties();

//消息设置持久化 消息失效时间到了以后进入死信队列更新订单状态

mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);

//设置消息失效时间 消息一旦失效就会进入死信队列

mp.setExpiration(DemoConstant.KILL_SUCESS_DEAD_EXPIRE);

return message;

}

});

}

}

}catch (Exception e){

logger.error("秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单-发生异常,消息为:{}",orderCode,e);

}

}

消息消费者:

@Component

public class KillMessageReceiver {

private static final Logger logger = LoggerFactory.getLogger(BusinessMessageReceiver.class);

@Autowired

private KillDao killDao;

@Autowired

private ObjectMapper objectMapper;

/**

* 用户秒杀成功后超时未支付-监听者

*/

@RabbitListener(queues = DemoConstant.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_REAL_QUEUE)

public void consumeExpireOrder(Message message, Channel channel){

try {

byte []messageBody = message.getBody();

KillSuccessUserInfo info = objectMapper.readValue(messageBody, KillSuccessUserInfo.class);

logger.info("用户秒杀成功后超时未支付-监听者-接收消息:{}",info);

if (info!=null){

ItemKillSuccess entity=killDao.selectByPrimaryKey(info.getCode());

if (entity!=null && entity.getStatus().intValue()==0){

killDao.expireOrder(info.getCode());

}

}

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}catch (Exception e){

logger.error("用户秒杀成功后超时未支付-监听者-发生异常:",e);

}

}

}

以上是 RabbitMq在低并发下处理失效订单的应用 的全部内容, 来源链接: utcz.com/z/517075.html

回到顶部