(翻译)RabbitMQJavaClient教程(二)工作队列
在第一篇教程中我们编写了发送消息到队列并从中接收消息的程序。在本篇教程中我们将会创建一个可以在多个消费者中分发耗时任务的工作队列(Work Queue)。
工作队列(也叫任务队列)的主要作用是你不需要立即运行一个耗时的任务并等待它完成。相反,我们可以将这个任务延后运行。我们将一个任务(task)封装成消息并发送到一个队列。一个后台运行的工作进程(worker process)将会取出这个任务并运行它。当你运行了多个工作线程时,任务会在它们之中进行分配。
如果你需要在一个较短的HTTP请求窗口中处理一个复杂任务,这个概念会显得尤为重要。
准备工作
在上一篇教程中我们发送了一条包含" Hello World! "的消息。这次我们将发送代表着复杂任务的字符串。由于我们并没有诸如调整图片大小或者渲染PDF文件这些真正耗时的任务,所以我们使用==Thread.sleep()==来模拟。我们将字符串中"."的数量作为它的复杂度,每一个点表示一秒钟的“工作”。比如,一个用==Hello...==描述的任务将会消耗三秒钟。
为了通过命令行发送任意的消息,我们将对上一篇教程中的Send.java的代码进行一些修改。这段程序将会把我们的任务放入工作队列,所以我们叫它==NewTask.java==。
String message = String.join(" ", argv);channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
我们之前的Recv.java代码也需要进行一些修改:它需要把消息体中的每个“.”当做一秒钟的任务耗时。它将会接收消息并运行任务,所以我们把它叫做==Worker.java==。
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received "" + message + """);
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
我们模拟任务耗时的代码如下所示:
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) {
if (ch == ".") Thread.sleep(1000);
}
}
轮询分发
使用任务队列的优势之一就是就可以将任务并行化。如果我们的任务无法及时处理导致积压,可以很轻松的通过增加更多工作线程来解决。
首先,让我们同时开启两个工作线程。它们将同时从队列中获取消息,然后我们看下具体情况如何。
你需要打开三个命令行窗口(译者注:你可以IDE完成下列操作,不需要命令行)。其中两个用来跑工作线程。这些控制台称作我们的消费者——C1和C2.
# shell 1java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
在第三个窗口中我们发布新的任务。一旦你启动好了消费者,你就可以开始发布一些消息。
# shell 3java -cp $CP NewTask First message.
# => [x] Sent "First message."
java -cp $CP NewTask Second message..
# => [x] Sent "Second message.."
java -cp $CP NewTask Third message...
# => [x] Sent "Third message..."
java -cp $CP NewTask Fourth message....
# => [x] Sent "Fourth message...."
java -cp $CP NewTask Fifth message.....
# => [x] Sent "Fifth message....."
让我们看下我们的消费者收到了什么东西:
java -cp $CP Worker# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "First message."
# => [x] Received "Third message..."
# => [x] Received "Fifth message....."
java -cp $CP Worker# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received "Second message.."
# => [x] Received "Fourth message...."
默认情况下,RabbitMQ会依次将消息发给每个消费者。一般情况下每个消费者收到的消息数量是相同的。这种分发消息的方式我们叫做轮询(round-robin)。你可以试下开启三个或者更多工作线程会有什么结果。
消息确认
任务的运行可能需要花费一些时间。你可能想要知道当某个消费者运行了一个耗时很长的任务但是中途失败的时候会发生什么。在我们当前的代码中,一旦RabbitMQ将某个消息发送给了消费者,它会立即将该消息标记为删除。在这种情况下,如果你杀掉了那个进程我们就会丢失掉它正在处理的消息。同样的,我们会丢失所有发送到当前进程但还未处理的消息。
但是我们并不想损失任何消息。当一个工作进程挂掉之后,我们希望其他的工作进程可以接受这个任务。
为了保证我们的每一条消息都不会丢失,RabbitMQ支持消息确认机制。确认消息是消费者用来告诉RabbitMQ某个消息已经被正确收到并处理,可以随时被删除。
如果某个消费者因为挂掉(channel关闭,connection关闭或者TCP连接断开)而没有发送确认消息,RabbitMQ会认为这条信息未处理完成并将它重新入队。如果此时有其他消费者存活,它会将消息转发给对应消费者。通过这种方式你可以保证消息永远不会丢失,即使消费者会随机的挂掉。
当前的消息没有任何超时设置:RabbitMQ在消费者挂掉的时候始终会重新转发消息。 即使处理一条消息花费非常非常长的时间也没关系。
手动消息确认默认是开启状态。在前一章的例子中我们通过设置==autoAck=true==手动关闭了它。现在是时候把它设置为false,并在任务完成后发送合适的确认消息了。
channel.basicQos(1); // accept only one unack-ed message at a time (see below)DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received "" + message + """);
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
通过这段代码我们可以保证即使在任务处理消息的时候使用CTRL+C关闭了进程,也不会有任何消息丢失。工作线程挂掉后不久,所有未确认的消息会被重新发送。
确认消息的发送必须与消息的接收位于同一个channel中。尝试使用一个不同的channel发送确认消息将会收到一个channel级别的协议异常。详情可以查看该文档。
忘记确认
忘记调用basicAck进行确认是一种很常见的错误。这个错误很简单,但是后果很严重。消息会在你的客户端退出的时候重新发送(看起来像是随机发送),但是RabbitMQ会因为一直持有这些未回复的消息而占用大量的内存。
你可以使用==rabbitmqctl==打印==messages_unacknowledged==参数来调试这个错误
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在windwos上,使用下列命令
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学习了当消费者挂掉时如何保证消息不丢失。但是当RabbitMQ服务端挂掉时,消息依然会丢失。
当RabbitMQ退出或崩溃时,默认会丢掉所有的队列和消息。如果想要保证消息不丢失我们需要做两件事:将队列和消息都设置为持久化。
首先,我们需要保证RabbitMQ永远不会丢掉我们的队列。为了达到这个目的,我们需要把队列声明为持久化的:
boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);
虽然上边的代码本身是正确的,但是却无法在我们当前的设置下使用。因为我们已经定义了名叫hello的非持久化队列。RabbitMQ不允许使用不同的参数重新声明已经存在的队列,并且会返回一个错误。最简单的解决方法就是,重新声明一个名字不同的队列,比如==task_queue==:
boolean durable = true;channel.queueDeclare("task_queue", durable, false, false, null);
生产者和消费者都需要修改队列声明的代码。
现在我们可以保证即使RabbitMQ重启,==task_queue==队列也不会丢失。然后我们需要将我们的消息也设置为持久化的——通过设置MessageProperties的PERSISTENT_TEXT_PLAIN参数:
import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
消息持久化的注意事项
将消息设置为持久化的并不意味着它一定不会丢失。虽然它告诉RabbitMQ将这条消息保存到磁盘上,但是从接收消息到存到磁盘之间仍有一个短暂的时间窗口。同样的,RabbitMQ也不会每个消息都调用==fsync(2)==——此时消息只会被保存在系统缓存而不是真正的写入磁盘。所以消息的持久化并非万无一失,但是对于我们简单的任务队列来说是足够了。如果你想要一个更加安全的持久化你可以参考生产者确认
公平消息分发
你可能已经注意到消息分发并不像我们想象中的那样工作。考虑这样一个场景,有两个消费者,RabbitMQ发出的单数消息都很复杂,而双数的消息都很简单,这就会导致一个消费者始终处于忙碌状态,而另一个消费者几乎无事可做。然而,RabbitMQ并不知道这种情况,他依然会均匀的分发消息。
发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。 它并不会关注消费者回复确认消息的数量。 它只是盲目地将每第n条消息发送给第n个消费者。
为了杜绝这种情况发生,我们可以使用==basicQos==方法设置prefetchCount=1。这会告诉RabbitMQ一次最多只能给消费者一条消息。或者换句话说,在消费者处理完消息并回复确认之前,不然发送新的消息。这样的话,RabbitMQ就会将消息分发到另一个空闲的消费者。
int prefetchCount = 1;channel.basicQos(prefetchCount);
注意队列的容量
如果所有的消费者都处于忙碌状态,你的队列可能被填满。你需要注意这一点,或者增加更多消费者,或者使用其他策略。
完整代码
==NewTask.java==
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent "" + message + """);
}
}
}
==Worker.java==
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received "" + message + """);
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == ".") {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
通过消息确认机制和 prefetchCount参数你可以创建一个工作队列。而持久化选项使得即使 RabbitMQ重启,消息依然会存在。
如果你想要知道关于 Channel类和 MessageProperties的更多信息,可以查阅JavaDocs。
以上是 (翻译)RabbitMQJavaClient教程(二)工作队列 的全部内容, 来源链接: utcz.com/z/511122.html