RabbitMQ客户端开发向导
AMQP 协议层面的操作通过 Channel 接口实现。Connection 是用来开启 Channel(信道)的,可以注册事件处理器,也可以在应用结束时关闭连接。与 RabbitMQ 相关的开发工作,基本上也是围绕 Connection 和 Channel 这两个类展开的。
连接 RabbitMQ
方式一:
ConnectionFactory factory = new ConnectionFactory();factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(ip_address);
factory.setPort(Port);
Connection conn = factory.newConnection();
方式二:
ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
注意要点:
Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程开辟一个 Channel。
多线程共享 Channel 实例是非线程安全的。
Channel 或者 Connection 中有个 isOpen 方法可以用来检测其是否已处于开启状态。但并不推荐在生产环境的代码上使用 isOpen 方法,这个方法的返回值依赖于 shutdownCause 的存在,有可能会产生竞争。
使用交换器和队列:
交换器和队列是 AMQP 中 high-level 层面的构建模块,应用程序需确保在使用他们的时候就已经存在了,在使用之前需要先声明(declare) 他们。
// 声明一个交换器channel.exchangeDeclare(exchangeName, "direct", true);
// 声明一个队列并获取队列的名字
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeNeme, routingKey);
上面创建了一个持久化的、非自动删除的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列。这里的交换器和队列也都没有设置特殊的参数。
上面声明的队列具备如下特性:只对当前应用中同一个 Connection 层面可用,同一个 Conenction 的不同 Channel 可共用,并且也会在应用连接断开时自动删除。
channel.exchangeDeclare(exchangeName, "direct", true);channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
这里的队列被声明为持久化的、非排他的、非自动删除的,而且也被分配另一个确定的已知的名称(由客户端分配而非 RabbitMQ 自动生成)。
生产者和消费者都可以声明一个交换器或者队列。如果尝试声明另一个已经存在的交换器或者队列,只要声明的参数完全匹配现存的交换器或者队列,RabbitMQ 就可以什么都不做,并成功返回。如果声明的参数不匹配则会抛出异常。
生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。
exchangeBind 方法:
我们不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定,两者的用法如出一辙。
绑定后,消息从source 交换器转发到destination 交换器,某种程度上来说 destination 交换器可以看作一个队列。
channel.exchangeDeclare("source","direct",false,true,null);channel.exchangeDeclare("destination","fanout",false,true,null);
channel.exchangeBind("destination","source","exKey");
channel.queueDeclare("queue",false,false,true,null);
channel.queueBind("queue","destination","");
channel.basicPublish("source","exKey",null,"exToExDemo".getBytes());
生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination, 并把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue 中。
何时创建:
RabbitMQ 的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如果要衡量 RabbitMQ 当前的 QPS(每秒查询率) 只需看队列的即可。在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。
预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。很多时候由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失;或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。当然可以匹配 mandatory 参数或者备份交换器来提高程序的健壮性。
如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务的灵活性,也完全可以在业务程序中声明队列。
使用预先分配创建资源的静态方式还是动态方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。
发送消息:
如果要发送一个消息,可以使用 Channel 类的 basicPublish 方法
byte[] messageBodyBytes = "Hello, World!".getBytes();channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
为了更好地控制发送,可以使用 mandatory 这个参数,或者可以发送一些特定属性的信息:
channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
也可以自己设定消息的属性:
channle.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder()
.contentType("text/plan")
.deliveryMode(2)
.priority(1)
.userId("hidden")
.build()),
messageBodyBytes();
这条消息的投递模式(delivery mode) 设置为2,即消息会被持久化(即存入磁盘)在服务器中。同时这条消息的优先级(priority) 设置为1,content-type 为 "text/plan"。
也可以发送一条带有 headers 的消息:
Map<String, Object> headers = new HashMap<String,Object>();header.put("location","here");
header.put("time","tody");
channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder().headers(headers).build(),messageBodyBytes);
还可以发送一条带有过期时间(expiration) 的消息:
channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder().expriation("60000").buld(),messageBodyBytes);
对于 basicPublish 而言,有几个重载方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublic(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)throws IOExceprion;
对应的具体参数解释如下所述:
- exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。
- routingKey: 路由键,交换器根据路由键将消息存储到相应的队列中。
- props: 消息的基本属性集,其包含 14 个属性成员,分别有 contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。
- byte[] body: 消息体(payload),真正需要发送的消息。
- mandatory 和 immediate 后面详细介绍。
消费消息:
RabbitMQ 的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用 Basic.Consumer 进行消费,而拉模式则是调用 Basic.Get 进行消费。
推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。好处很明显,消费者总是有一堆在内存中待处理的消息,所以效率高。缺点是缓冲区可能会溢出。
拉模式在消费者需要时采取消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。
选择推模式还是拉模式需要考虑使用场景。
推模式:
在推模式中,可以通过持续订阅的方式来消费消息
接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。当调用与 Consumer 相关的API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分
boolean autoAck = false;channel.basicQos(64);
channel.basicConsumer(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties
properties, byte[] body) throws IOException{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
});
注意,上面代码中显示地设置 autoAck 为 false,然后在接收到消息之后进行显示 ack 操作(channel.basicAck),对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。
Channel 类中 basicConsumer 方法有如下几种形式:
- String basicConsume(String queue, Consumer callback) throws IOException;
- String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
- String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
- String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, Consumer callback) throws IOException;
对应的参数如下所述:
- queue: 队列的名称;
- autoAck: 设置是否自动确认。建议设成 false,即不自动确认;
- consumerTag: 消费者标签,用来区分多个消费者;
- noLocal: 设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者;
- exclusive: 设置是否排他;
- arguments: 设置消费者的其他参数;
- callback: 设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer,使用时需要客户端重写(override)其中的方法。
对于消费者客户端来说重写 handleDelivery 方法是十分方便的。更复杂的消费者客户端会重写更多方法,如下:
- void handleConsumeOk(String consumerTag);
- void handleCancelOk(String consumerTag);
- void handleCancel(String consumerTag) throws IOException;
- void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) ;
- void handleRecoverOk(String consumerTag);
handleShutdownSignal 方法,当 Channel 或者 Connection 关闭的时候会调用。再者,handleConsumeOk 方法会在其他方法之前调用,返回消费者标签。
重写 handleCancelOk 和 handleCancel 方法,这样消费端可以在显示地或者隐示地取消订阅的时候调用。也可以通过 channel.basicCancel 方法来显示地取消一个消费者的订阅:channel.basicCancel(consumerTag);
注意上面这行代码会首先出发 handleConsumerOk 方法,之后触发 handleDelivery 方法,最后才触发 handleCancelOk 方法。
和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法。
每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者,也就是意味着消费者彼此之间没有任何关联。当然也可以在一个 Channel 中维持多个消费者,但是要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他消费者的 callback 会被 "耽搁"。
拉模式:
通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetResponse。Channel 类的 basicGet 方法没有其他重载方法,只有:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
其中 queue 代表队列的名称,如果设置 autoAck 为 false , 那么同样需要调用 channel.basicAck 来确认消息已被成功接收。
GetResponse response = channel.basicGet(QUEUE_NAME, false);System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
注意要点:
Basic.Consume 将信道(Channel) 置为投递模式,直到取消队列的订阅为止。在投递模式期间,RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是受到 Basic.Qos 的限制。如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume, 这样做会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。
消费端的确认与拒绝:
为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时,RabbitMQ 会等待消费者显示地回复确认信号后才从内存(或者磁盘)中移去消息(实质上时先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显示调用 Basic.Ack 命令为止。
当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
RabbitMQ 的 Web 管理平台上可以看到当前队列中的 "Ready" 状态和 "Unacknowledged" 状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数,也可以通过相应的命令来查看:
rabbitmqctl list_queues name message_ready message_unacknowledged
在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
Channel 类中的 basicReject 方法定义如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
其中 deliveryTag 可以看作消息的编号,它是一个 64 位的长整型值,最大值是 9223372036854775807 。如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置位 false,则 RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。
Basic.Reject 命令一次只能拒绝一个条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
multiple 参数设置为 false 则表示拒绝编号为 deliveryTag 的这一条消息,这时候 basicNack 和 basicReject 方法一样;multiple 参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。
注意要点:
将 channel.basicReject 或者 channel.basicNack 中的 requeue 设置为 false,可以启用"死信队列"的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。
对于 requeue ,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性:
Basic.RecoverOk basicRecover() throws IOException;Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
这个 channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息。如果 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果 requeue 参数设置为 false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置 requeue 这个参数,相当于 channel.basicRecover(true),即 requeue 默认为 true。
以上是 RabbitMQ客户端开发向导 的全部内容, 来源链接: utcz.com/z/512203.html