(翻译)RabbitMQJavaClient教程(四)路由

编程

在前一篇教程中我们构建了一个日志系统。现在我们知道如何把日志消息广播发送给多个接受者了。

在本篇教程中我们将增加一个功能——允许订阅者只获取它感兴趣得一部分消息。比如我们仅把严重错误的日志存储到日志文件,而将所有的日志消息都打印到控制台。

绑定

在之前的章节中我们已经创建过交换器和队列的绑定。你可能还记得下边的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定表示交换器和队列之间的一种关系。这可以简单描述为:队列只对这个交换器的消息感兴趣。

绑定关系可以使用一个额外的==routingKey==参数。为了避免与==basic_publish==中的参数混淆,在这里我们叫它==binding key==。下边是我们使用一个key参数创建绑定关系的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的作用于交换器的类型有关。我们之前使用的fanout交换器,会直接忽略该参数。

Direct 交换器

我们前一篇教程的日志系统会广播消息给所有的消费者。现在我们要扩展这个系统,使它可以根据消息的严重程度进行过滤。比如我们仅想要将严重的错误记录到磁盘上,而不想那些警告或提示信息浪费磁盘空间。

我们现在使用的fanout交换器,没有给我们提供较大的灵活性——它只能无脑的广播消息。

我们将使用==direct==交换器替代它。==direct==交换器使用的路由策略很简单——消息仅会发送到==binding key==与==routing key==相同的队列中。

为了说明这一点,请看下图:

在图片中我们可以看到一个绑定了两个队列的名为“x”的direct交换器。第一个队列的binding key是orange,而第二个则有black和green两个绑定。

在上边的配置中,一个发送到交换器且routing key为orange的消息会被路由到Q1队列。routing key为black和green的消息会被路由到Q2队列。routing key为其他的所有消息都会被丢弃。

多重绑定

用相同的绑定键绑定多个队列是完全可行的。在我们刚才的例子中,我们在交换器X和队列Q1之间增加了一个binding key为black的绑定关系。此时,这个direct交换器的行为就会和fanout交换器相同——它会广播消息给所有的消费者。routing key为black的消息会同时发送给Q1和Q2。

发送日志

在我们的日志系统中,我们将使用direct交换器替换掉fanout交换器。我们可以使用routing key来表示日志的严重级别。通过这种方式,消费者就可以自由选择它想要的日志。我们先来看如何发送日志吧。

像往常一样,我们需要先创建一个交换器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后我们开始发送消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化模型,我们假设日志级别只有“info”、“warning”和“error“三种。

订阅

接收消息的代码和前一篇教程几乎相同,除了我们将为我们感兴趣的每个日志级别创建一个新的绑定关系:

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){

channel.queueBind(queueName, EXCHANGE_NAME, severity);

}

完整代码

==EmitLogDirect.java==

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection();

Channel channel = connection.createChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String severity = getSeverity(argv);

String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent "" + severity + "":"" + message + """);

}

}

//..

}

==ReceiveLogsDirect.java==

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1) {

System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");

System.exit(1);

}

for (String severity : argv) {

channel.queueBind(queueName, EXCHANGE_NAME, severity);

}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received "" +

delivery.getEnvelope().getRoutingKey() + "":"" + message + """);

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

}

}

以上是 (翻译)RabbitMQJavaClient教程(四)路由 的全部内容, 来源链接: utcz.com/z/511164.html

回到顶部