(翻译)RabbitMQJavaClient教程(七)发布者确认
“发布者确认”是RabbitMQ用来实现可靠地消息发布的一个扩展。如果一个channel上开启了发布者确认,当RabbitMQ妥善保存了消息之后,它会给对应的客户端回复一个确认消息。
总览
在本篇教程中我们将使用“发布者确认”来保证发布的消息安全抵达RabbitMQ。我们将介绍多种确认策略并说明各自的优缺点。
在Channel上开启发布者确认
发布者确认是RabbitMQ对 AMQP 0.9.1协议的扩展,所以它们默认是关闭的。可以通过调用Channel的 confirmSelect方法来开启:
Channel channel = connection.createChannel();channel.confirmSelect();
每个希望开启发布者确认的channel都需要调用这个方法。但是每个channel只调用一次就够了,不需要一条消息调用一次。
策略一:单独发送消息
让我们从最简单的方法开始——发布一条消息然后同步等待RabbitMQ确认:
while (thereAreMessagesToPublish()) { byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}
在上述代码中我们跟平常一样发布了一条消息然后使用“Channel#waitForConfirmsOrDie(long)”来等待确认。当消息被确认之后这个方法会立即返回。 如果未在超时时间内确认该消息或该消息没有被确认 (这意味着RabbitMQ因为某些原因不能讲消息妥善保管),此方法会抛出一个异常。 异常的处理通常包括记录错误消息和/或重试发送消息。
不同的客户端实现对于发布者确认的同步处理也是不同的,所以请仔细阅读所有客户端的说明文档。
这种方法非常简单,但也有一个致命缺点: 它显著降低了消息发布的速度, 因为等待消息的确认会阻止所有后续消息的发布。这种方法无法提供每秒超过数百条消息的吞吐量。不过,对于某些应用来说,这种吞吐量也足够了。
发布者消息确认不是异步的吗?
我们在一开始的确提到了发布者消息确认是异步的,但是在上边的代码中我们是同步等待消息确认的。实际上客户端是异步接收的确认消息,然后取消了waitForConfirmsOrDie的阻塞调用。 可以将waitForConfirmsOrDie视为一个依赖于后台异步通知的同步助手。
策略二:批量发布消息
为了改进前面的示例,我们可以批量发布消息,并等待整个批次被确认。下边的代码将100条组成一批:
int batchSize = 100;int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
与每条消息都等待确认相比,同时确认一批消息可以大幅提高吞吐量(使用非本地的RabbitMQ节点,提高了20~30倍)。唯一的缺点就是当有异常发生时,我们不知道到底发生了什么,为此我们需要 将整个批处理保存在内存中以记录一些有意义的内容或重新发布消息。这个解决方案仍然是同步的,所以它仍会阻塞消息的发布。
策略三:异步处理发布确认
RabbitMQ异步确认已发布的消息,我们只需在客户端上注册一个回调即可收到这些确认的通知:
Channel channel = connection.createChannel();channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});
这里有两个回调方法:一个用来接收确认消息,一个用来接收没收到的消息。每个方法有两个参数:
- sequence number: 一个用来标识确认或否认消息的数字。我们稍后会看到如何将它与发布的消息关联起来。
- multiple:这是一个布尔值。如果是false,每次只能确认或否认一条消息;如果是true,所有序列号不大于sequence number的消息都会被确认/否认
序列号可以通过Channel#getNextPublishSeqNo()在消息发布之前获取:
int sequenceNumber = channel.getNextPublishSeqNo());ch.basicPublish(exchange, queue, properties, body);
将消息与序列号关联的一种简单方法是使用map。 假设我们要发布字符串,之所以使用字符串是因为它们很容易转换成用于发布的字节数组。下边一个使用map将发布序列号与消息的字符串主体相关联的代码示例:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());
现在代码将使用map跟踪发布的消息。我们需要在消息确认到达时清理这个map,然后当消息被否认时做一些诸如记录一个警告日志之类的事情:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code
上面的代码包含一个回调方法,它会在消息确认到达时清理map中的消息。注意这个回调同时支持单笔和批量确认。这个回调方法是用来处理消息确认的情况的( Channel#addConfirmListener的第一个参数)。否认消息的回调方法将取出消息的本体并发出一个警告。然后,它使用刚才的回调方法来清除未完成确认的消息(无论消息是被确认还是否认,它都需要从map中清除)
如何跟踪未完成的消息确认?
我们的示例使用 ConcurrentNavigableMap来跟踪未完成的确认。这个数据结构可以提供如下便利。它允许将消息和序列号轻松的关联起来(无论消息的内容是什么),并且可以轻松将map清理到指定的序列号(用来处理批量的确认/否认)。最后,他直接并发访问,因为确认的回调方法是由客户端的某个线程调用的,而客户端会维护在不同的发布者的线程中。
除了使用复杂的map实现之外,你还可以使用一个简单的 concurrent hash map和一个 跟踪发布序列下限的变量来实现对消息确认的跟踪。 但它们通常涉及更多知识,而这些知识超出了教程的范畴。
来总结一下,异步处理发布确认通常需要如下步骤:
- 想办法将消息和发布序列号关联起来
- 在 channel上注册一个监听器,当消息被确认/否认的时候执行合适的操作,比如记录日志或者重新发布被否认的消息。消息与序列号之前的关联关系也需要在本环节做一些清理。
- 在发布消息之前记录发布序列号
重新发布被否认的消息?
从相应的回调中重新发布nack-ed消息可能很诱人,但应避免这种情况,因为消息确认的回调方法是在I/O线程中调度的,他不应该用来做业务操作。 更好的解决方案是将消息放入由发布线程轮询的内存队列中。 诸如ConcurrentLinkedQueue之类的类将是在确认回调和发布线程之间传输消息的理想选择。
总结
在某些应用程序中,确保RabbitMQ正确接收到消息非常重要。发布者确认正式RabbitMQ用来实现这个需求的功能。 发布者确认本质上是异步的,但也可以同步处理它们。实现发布者确认的方式多种多样,通常根据系统的实际情况来选择。比如:
- 分别发送每条消息,然后同步等待确认——简单,但是非常影响吞吐量
- 批量发送消息,然后同步等待批量的回复——简单,吞吐量影响较小,但是在出现异常时很难定位原因
- 异步处理——性能最好的方案,对于错误也有良好的处理,但是需要保证代码的正确性
以上是 (翻译)RabbitMQJavaClient教程(七)发布者确认 的全部内容, 来源链接: utcz.com/z/511346.html