(翻译)RabbitMQJavaClient教程(四)路由
在前一篇教程中我们构建了一个日志系统。现在我们知道如何把日志消息广播发送给多个接受者了。
在本篇教程中我们将增加一个功能——允许订阅者只获取它感兴趣得一部分消息。比如我们仅把严重错误的日志存储到日志文件,而将所有的日志消息都打印到控制台。
绑定
在之前的章节中我们已经创建过交换器和队列的绑定。你可能还记得下边的代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定表示交换器和队列之间的一种关系。这可以简单描述为:队列只对这个交换器的消息感兴趣。
绑定关系可以使用一个额外的==routingKey==参数。为了避免与==basic_publish==中的参数混淆,在这里我们叫它==binding key==。下边是我们使用一个key参数创建绑定关系的代码:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key的作用于交换器的类型有关。我们之前使用的fanout交换器,会直接忽略该参数。
Direct 交换器
我们前一篇教程的日志系统会广播消息给所有的消费者。现在我们要扩展这个系统,使它可以根据消息的严重程度进行过滤。比如我们仅想要将严重的错误记录到磁盘上,而不想那些警告或提示信息浪费磁盘空间。
我们现在使用的fanout交换器,没有给我们提供较大的灵活性——它只能无脑的广播消息。
我们将使用==direct==交换器替代它。==direct==交换器使用的路由策略很简单——消息仅会发送到==binding key==与==routing key==相同的队列中。
为了说明这一点,请看下图:
在图片中我们可以看到一个绑定了两个队列的名为“x”的direct交换器。第一个队列的binding key是orange,而第二个则有black和green两个绑定。
在上边的配置中,一个发送到交换器且routing key为orange的消息会被路由到Q1队列。routing key为black和green的消息会被路由到Q2队列。routing key为其他的所有消息都会被丢弃。
多重绑定
用相同的绑定键绑定多个队列是完全可行的。在我们刚才的例子中,我们在交换器X和队列Q1之间增加了一个binding key为black的绑定关系。此时,这个direct交换器的行为就会和fanout交换器相同——它会广播消息给所有的消费者。routing key为black的消息会同时发送给Q1和Q2。
发送日志
在我们的日志系统中,我们将使用direct交换器替换掉fanout交换器。我们可以使用routing key来表示日志的严重级别。通过这种方式,消费者就可以自由选择它想要的日志。我们先来看如何发送日志吧。
像往常一样,我们需要先创建一个交换器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后我们开始发送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化模型,我们假设日志级别只有“info”、“warning”和“error“三种。
订阅
接收消息的代码和前一篇教程几乎相同,除了我们将为我们感兴趣的每个日志级别创建一个新的绑定关系:
String queueName = channel.queueDeclare().getQueue();for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
完整代码
==EmitLogDirect.java==
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent "" + severity + "":"" + message + """);
}
}
//..
}
==ReceiveLogsDirect.java==
import com.rabbitmq.client.*;public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received "" +
delivery.getEnvelope().getRoutingKey() + "":"" + message + """);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
以上是 (翻译)RabbitMQJavaClient教程(四)路由 的全部内容, 来源链接: utcz.com/z/511164.html