RabbitMQ进阶

编程

消息何去何从:

mandatory 和 immediate 是 channel.basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ 提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。

mandatory 参数:

当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。

生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监听器实现。

使用 mandatory 参数的关键代码如下:

channel.basicPublish(EXCHAGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());

channel.addReturnListener(new ReturnListener(){

public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,

AMQP.BasicProperties basicProperties, byte[] body) throws IOException{

String message = new String(body);

System.out.println("Basic.Return 返回的结果是:" + message);

}

});

上面的代码中生产者没有成功地将消息路由到队列,此时 RabbitMQ 会通过 Basic.Return 返回 "mandatory test"这条消息,之后生产者客户端通过 ReturnListener 监听到了这个事件。 

immediate 参数:

当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。

概括来说,mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0 版本去掉了 immediate 参数的支持,对此官方的解释是:immediate 参数会影响镜像队列的性能,增加了代码的复杂性,建议采用 TTL 和 DLX的方法替代(后续再详细说明)。所以现在发送带 immediate 参数(immediate 参数设置为true)的Basic.Publish 客户端会报对应异常。

备份交换器:

备份交换器,英文名称为 Alternate Exchange,简称 AE。生产者在发送消息地时候如果不设置 mandatory 参数,那么消息在未被路由的情况下将会丢失;如果设置了 mandatory 参数,那么需要添加 RetrunListener 的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息。

可以通过在声明交换器(调用 channel.exchageDeclare 方法)的时候添加 alternate-exchange 参数来实现,也可以通过策略(Policy,后面详细介绍)的方式实现。如果两者同时使用,则前者的优先级会更高,会覆盖掉 Policy 的设置。

Map<String, Object> args = new HashMap<String,Object>();

args.put("alternate-exchange","my AE");

channel.exchangeDeclare("normalExchage","direct",true,false,args);

channel.exchangeDeclare("my AE","fanout",true,false,null);

channel.queueDeclare("normalQueue",true,false,fasle,null);

channel.queueBind("normalQueue","normalExchange","normalKey");

channel.queueDeclare("unroutedQueue",true,false,false,null);

channel.queueBind("unroutedQueue","my AE","");

注意 myAE的交换器类型为 fanout。

如果此时发送一条消息到 normalExchange 上,当路由键等于 normalKey 的时候,消息能正确路由到 normalQueue 这个队列中。如果路由键设为其他值,比如“errorKey”,即消息不能被正确路由到 与 normalExchange 绑定的任何队列上,此时就会发送给 myAE,进而发送到 unroutedQueue 这个队列。

同样,如果采用 Policy 的方式来设置备份交换器:

rabbitmqctl set_policy AE "^normalExchange$" "{"alternate-exchange": "myAE"}"

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为 fanout 类型,如若想设置为 direct 或者 topic 的类型也没有什么不妥。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。

考虑这样一种情况,如果备份交换器的类型是 direct,并且有一个与其绑定的队列,假设绑定的路由键是 key1,当某条携带路由键为 key2 的消息被转发到这个备份交换器的时候,备份交换器中没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为 key1,则可以存储到队列中。

对于备份交换器,总结了以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。

过期时间(TTL):

TTL,Time to Live 的简称,即过期时间。RabbitMQ 可以对消息和队列设置TTL。

设置消息的TTL: 

目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成“死信”(Dead Message),消费者将无法再收到该消息(这点不是绝对的)。

第一种通过队列属性设置消息 TTL 的方法是在 channel.queueDeclare 方法中加入 x-message-ttl 参数实现的,这个参数的单位是毫秒。

Map<String, Object> argss = new HashMap<String, Object>();

argss.put("x-message-ttl",6000);

channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);

同时也可以通过 Policy 的方式来设置 TTL ,如:

rabbitmqctl set_policy TTL ".*" "{"message-ttl":60000}" --apply-to queues

还可以通过调用 HTTP API 接口设置:略

如果不设置TTL,则表示此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代 RabbitMQ 3.0 版本之前的 immediate 参数,之所以部分代替,是因为 immediate 参数在投递失败时会用 Basic.Return 将消息返回(这个功能可以用死信队列来实现)

第二种针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration 的属性参数,单位为毫秒。

AMQP.basicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.deliveryMode(2);//持久化消息

builder.expiration("60000");//设置 TTL=60000ms

AMQP.BasicProperties properties = builder.build();

channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());

AMQP.BasicProperties properties = new AMQP.BasicProperties();

properties.setDeliveryMode(2);

properties.setExpiration("60000");

channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());

还可以通过 HTTP API 接口设置:略

对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。

设置队列的 TTL:

 

 

 

 

 

 

 

 

 

 

 

 

 

以上是 RabbitMQ进阶 的全部内容, 来源链接: utcz.com/z/512364.html

回到顶部