(翻译)RabbitMQJavaClient教程(五)主题

编程

在前一篇教程中我们改进了我们的日志系统。我们使用direct交换器替换掉了只能广播消息的fanout交换器,使消费者可以选择想要的日志级别。

不过我们虽然使用了direct交换器改进了系统,但是它还是有一些局限性——他不能基于多个条件做路由。

在我们的日志系统中我们可能不仅仅想要根据日志级别订阅消息,还想要根据日志的发送方来判断。你可能在unix工具syslog中听过类似的概念,它就会根据日志级别(info/warn/crit...)和来源(auth/cron/kern...)来路由日志。

这将会给我们提供很大的灵活性——我们可能仅想要接受来自“ cron ”的error日志,但是来自“kern”的日志则全部接收。

我们实现这个功能,我们需要学习一个更复杂的交换器“topic”。

Topic交换器

发往 topic交换器的消息的routing_key是使用点间隔的一系列单词,而不能随意赋值。 这些单词随意选择,但通常它们代表着与消息有关的某些特性。一些合法的routing key如下所示:"stock.usd.nyse"、 "nyse.vmw"、"quick.orange.rabbit"。在routing key中你可以使用任意数量的单词,但是不能超过255个字节。

binding key的格式也必须与routing key相同。 topic交换器与direct交换器的内部逻辑相同——一个带有特定routing key的消息将会发往所有binding key匹配的队列中。然而,对于binding key来说有两点需要注意的:

  • “*”代表有且只有一个单词
  • “#”代表有0个或多个单词

通过一个例子我们可以轻松的解释这些概念:

在这个例子中,我们将会发送的消息都代表着某种动物。每条消息都有一个三个单词(和两个点)组成的routing key。其中第一个单词表示速度,第二个单词表示颜色,第三个单词表示种类,“ <speed>.<colour>.<species> ”。

我们创建了三个绑定:Q1队列的binding key为“ *.orange.* ”,Q2队列的binding key为“ *.*.rabbit ”和“ lazy.# ”。

这些绑定关系总结如下:

  • Q1对所有的橘色动物感兴趣
  • Q2想要知道关于兔子和懒惰的动物的所有信息

一个routing key为“ quick.orange.rabbit ”的消息将会同时发往两个队列。routing key为“ lazy.orange.elephant ”的消息也会被同时发往两个队列。另一方面“ quick.orange.fox ”只会被发往Q1,而” lazy.brown.fox “只会被发往Q2。注意即使” lazy.pink.rabbit “匹配了Q2的两个binding key,它也只会被发送一次。” quick.brown.fo “没有任何匹配的key,所以会被丢弃。

如果我们不按照规定来,而是发送一个或四个单词,比如“ orange ”或者“ quick.orange.male.rabbit ”的时候会发生什么呢?这些消息会因为匹配不到任何队列而被丢弃。

然而,虽然“ lazy.orange.male.rabbit ”有四个单词,但它可以和Q2相匹配,所以它会发送Q2。

Topic交换器

Topic交换器非常强大,它可以当做其他的交换器使用。

当一个队列的binding key为“#”的时候——他将会无视routing key并接收所有的消息,就像fanout交换器那样。

当binding key中不包含“#”和“*”的时候,topic交换器的行为就与direct交换器相同。

完整代码

我们将在我们的日志系统中使用topic交换器。我们将假设每条日志消息的routing key都由两个单词组成“ <facility>.<severity> ”。

代码几乎和前一篇教程相同。

==EmitLogTopic.java==

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection();

Channel channel = connection.createChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String routingKey = getRouting(argv);

String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent "" + routingKey + "":"" + message + """);

}

}

//..

}

==ReceiveLogsTopic.java==

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1) {

System.err.println("Usage: ReceiveLogsTopic [binding_key]...");

System.exit(1);

}

for (String bindingKey : argv) {

channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received "" +

delivery.getEnvelope().getRoutingKey() + "":"" + message + """);

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

}

}

以上是 (翻译)RabbitMQJavaClient教程(五)主题 的全部内容, 来源链接: utcz.com/z/511163.html

回到顶部