(翻译)RabbitMQJavaClient教程(五)主题
在前一篇教程中我们改进了我们的日志系统。我们使用direct交换器替换掉了只能广播消息的fanout交换器,使消费者可以选择想要的日志级别。
不过我们虽然使用了direct交换器改进了系统,但是它还是有一些局限性——他不能基于多个条件做路由。
在我们的日志系统中我们可能不仅仅想要根据日志级别订阅消息,还想要根据日志的发送方来判断。你可能在unix工具syslog中听过类似的概念,它就会根据日志级别(info/warn/crit...)和来源(auth/cron/kern...)来路由日志。
这将会给我们提供很大的灵活性——我们可能仅想要接受来自“ cron ”的error日志,但是来自“kern”的日志则全部接收。
我们实现这个功能,我们需要学习一个更复杂的交换器“topic”。
Topic交换器
发往 topic交换器的消息的routing_key是使用点间隔的一系列单词,而不能随意赋值。 这些单词随意选择,但通常它们代表着与消息有关的某些特性。一些合法的routing key如下所示:"stock.usd.nyse"、 "nyse.vmw"、"quick.orange.rabbit"。在routing key中你可以使用任意数量的单词,但是不能超过255个字节。
binding key的格式也必须与routing key相同。 topic交换器与direct交换器的内部逻辑相同——一个带有特定routing key的消息将会发往所有binding key匹配的队列中。然而,对于binding key来说有两点需要注意的:
- “*”代表有且只有一个单词
- “#”代表有0个或多个单词
通过一个例子我们可以轻松的解释这些概念:
在这个例子中,我们将会发送的消息都代表着某种动物。每条消息都有一个三个单词(和两个点)组成的routing key。其中第一个单词表示速度,第二个单词表示颜色,第三个单词表示种类,“ <speed>.<colour>.<species> ”。
我们创建了三个绑定:Q1队列的binding key为“ *.orange.* ”,Q2队列的binding key为“ *.*.rabbit ”和“ lazy.# ”。
这些绑定关系总结如下:
- Q1对所有的橘色动物感兴趣
- Q2想要知道关于兔子和懒惰的动物的所有信息
一个routing key为“ quick.orange.rabbit ”的消息将会同时发往两个队列。routing key为“ lazy.orange.elephant ”的消息也会被同时发往两个队列。另一方面“ quick.orange.fox ”只会被发往Q1,而” lazy.brown.fox “只会被发往Q2。注意即使” lazy.pink.rabbit “匹配了Q2的两个binding key,它也只会被发送一次。” quick.brown.fo “没有任何匹配的key,所以会被丢弃。
如果我们不按照规定来,而是发送一个或四个单词,比如“ orange ”或者“ quick.orange.male.rabbit ”的时候会发生什么呢?这些消息会因为匹配不到任何队列而被丢弃。
然而,虽然“ lazy.orange.male.rabbit ”有四个单词,但它可以和Q2相匹配,所以它会发送Q2。
Topic交换器
Topic交换器非常强大,它可以当做其他的交换器使用。
当一个队列的binding key为“#”的时候——他将会无视routing key并接收所有的消息,就像fanout交换器那样。
当binding key中不包含“#”和“*”的时候,topic交换器的行为就与direct交换器相同。
完整代码
我们将在我们的日志系统中使用topic交换器。我们将假设每条日志消息的routing key都由两个单词组成“ <facility>.<severity> ”。
代码几乎和前一篇教程相同。
==EmitLogTopic.java==
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent "" + routingKey + "":"" + message + """);
}
}
//..
}
==ReceiveLogsTopic.java==
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received "" +
delivery.getEnvelope().getRoutingKey() + "":"" + message + """);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
以上是 (翻译)RabbitMQJavaClient教程(五)主题 的全部内容, 来源链接: utcz.com/z/511163.html