(翻译)RabbitMQJavaClient教程(三)发布/订阅

编程

在前一篇教程中我们创建了一个工作队列。工作队列假设每个消息只会分发给一个消费者。在本篇教程中我们将会做一些完全不同的事情——发送同一条消息给多个消费者。这种模式被称为“发布/订阅”。

为了描述这种模式,我们将会构建一个简单的日志系统。它包含两个程序——一个发送日志消息,另一个接收并打印这些消息。

在我们的日志系统中每一个接收程序都会获取到这些消息。通过这种方式我们可以启动一个消费者将日志记录到磁盘,然后启动另一个消费者将日志输出到屏幕上。

实际上,发布的日志消息将会广播给所有的消费者。

交换器

在前几篇教程中,我们都是直接通过队列发送和接收消息。现在是时候引入RabbitMQ中完整的消息传递模型了。

让我们快速回顾一下先前教程中介绍的内容:

  • 生产者是发送消息的用户程序
  • 队列是存储消息的缓冲区
  • 消费者是接收消息的用户程序

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。事实上,通常情况下生产者甚至不知道一条消息是否会被发送到一个队列。

相反,生产者只能将消息发送给交换器。交换器是一种非常简单的东西。一方面,它接收生产者发送的消息;另一方面,它把消息推送给队列。交换器必须明确的知道当收到一条消息的时候需要做什么。是否应该将消息发送给某个队列?还是将消息发送给多个队列?或者直接丢弃该消息?这些规则都通过交换器类型来定义。

可用的交换器类型有:direct, topic, headers和fanout。我们将关注最后一个——fanout。让我们创建一个此类型的交换器,并命名为“logs”:

channel.exchangeDeclare("logs", "fanout");

fanout交换器非常简单。正如你从它的名字中所猜测的那样,它只是把它收到的消息广播给所有它关联的队列。而这正好就是我们想要的。

列出所有交换器

通过rabbitmqctl命令你可以列出当前RabbitMQ中所有的交换器:

sudo rabbitmqctl list_exchanges

在列表中可能会有一些名为==amq.*==交换器和默认交换器(没有名字)。这些都是RabbitMQ默认创建的,但是目前你不太可能用到它们。

无名交换器

在之前的教程中,我们对交换器相关的只是一无所知,却仍向队列中发送了消息。我们之所以成功是因为我们使用了一个默认交换器,它使用空字符串("")标识。

回忆一下我们之前是怎么发送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是交换器的名字。空字符串表示默认的无名交换器:消息会根据routingKey参数配置的名字路由到指定的队列——如果存在的话。

现在,我们可以将我们的消息发布到指定名称的交换器了:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列

你可能还记得之前我们使用的带有具体名字的队列(比如hello和task_queue)。可以给队列起名字对于我们是十分重要的——我们需要将消费者指定到同一个队列。当你希望在生产者和消费者之间共享队列的时候,给队列起一个名字是十分重要的。

但是对我们的日志系统来说则不然。我们希望收到所有的日志消息,而非其中的某个子集。我们也只对当前正在发送的消息感兴趣,而对旧消息不感兴趣。要解决这个问题需要做两件事情。

首先,无论何时我们连接到RabbitMQ,我们都需要一个全新的、空的队列。为了实现这个需求,我们可以通过使用随机名称创建队列或者更直接点,让服务端为我们选择一个随机队列。

其次,一旦我们的消费者断开了与队列的连接,队列需要自动被删除。

在java客户端中,当我们使用没有任何参数的==queueDeclare()==方法时,我们创建了一个非持久化的、非排他的、自动删除的队列,而它的名字是自动生成的:

String queueName = channel.queueDeclare().getQueue();

你可以通过这个文档来学习更多关于==exclusive==参数和其他队列属性的知识。

在上述代码中,queueName是一个包含随机字符串的名字。比如==amq.gen-JzTY20BRgKO-HjmUJj0wLg==。

绑定

我们已经创建了一个fanout交换器和一个队列。现在我们需要告诉交换器把消息发送到我们的队列中。在交换器和队列之间的这种关系我们称作绑定(binding)。

channel.queueBind(queueName, "logs", "");

从现在开始==logs==交换器将会把消息发送给我们的队列。

列举绑定关系

你可以使用下列方式获取当前已有的绑定关系

rabbitmqctl list_bindings

完整代码

发送日志消息的生产者的代码,和前一篇教程几乎没什么差别。其中最大的差别就是我们指定了交换器的名字为==logs==而非无名交换器。当我们发送消息时,需要提供一个路由键( routingKey),但是在fanout交换器中,这个参数是无效的。下边是==EmitLog.java==的代码:

public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection();

Channel channel = connection.createChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String message = argv.length < 1 ? "info: Hello World!" :

String.join(" ", argv);

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent "" + message + """);

}

}

}

如你所见,在建立连接之后我们声明了一个交换器。因为我们不能发布消息到一个不存在的交换器,所以这一步非常重要。

如果交换器当前没有绑定任何队列,消息将会丢失,但是这对于我们来说是无所谓的——如果当前没有任何消费者,我们把消息丢掉也没什么关系。

==ReceiveLogs.java==的代码:

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received "" + message + """);

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

}

}

以上是 (翻译)RabbitMQJavaClient教程(三)发布/订阅 的全部内容, 来源链接: utcz.com/z/511130.html

回到顶部