firehose消息追踪
代码示例
- 开启 firehose
# 在所有节点都要执行
[root@rabbitmq1 sbin]# ./rabbitmqctl trace_on -p test
Starting tracing for vhost "test" ...
Trace enabled for vhost test
- 测试代码
/**
 * Demonstrates the RabbitMQ firehose tracer on virtual host {@code test}.
 *
 * <p>Tracing must first be enabled on the broker with
 * {@code rabbitmqctl trace_on -p test}. The three tests are meant to be run
 * separately:
 * <ul>
 *   <li>{@link #publish()} sends one persistent message to the demo exchange;</li>
 *   <li>{@link #consume()} consumes the business messages from the demo queue;</li>
 *   <li>{@link #consume_trace()} consumes the copies that the broker republishes
 *       to {@code amq.rabbitmq.trace} while tracing is on.</li>
 * </ul>
 *
 * <p>NOTE(review): broker addresses and credentials are hard-coded for the demo
 * cluster; externalize them before reusing this code anywhere else.
 */
public class T21_firehose {

    /** Exchange, queue and routing key used for the business message. */
    private static final String EXCHANGE = "firehose-exchange";
    private static final String QUEUE = "firehose-queue";
    private static final String ROUTING_KEY = "firehose";

    /** Built-in trace exchange and the queue this demo binds to it. */
    private static final String TRACE_EXCHANGE = "amq.rabbitmq.trace";
    private static final String TRACE_QUEUE = "queue";

    /** Maximum number of unacknowledged deliveries per consumer channel. */
    private static final int PREFETCH = 30;

    /** How long {@link #publish()} waits for the broker to confirm the message. */
    private static final long CONFIRM_TIMEOUT_MS = 5000L;

    /** Explicit charset so payloads do not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Publishes a single persistent message and waits for the broker to confirm it.
     *
     * @throws IOException          on a connection or channel failure, or if the broker nacks the message
     * @throws TimeoutException     if connecting, closing or the confirm wait times out
     * @throws InterruptedException if interrupted while waiting for the confirm
     */
    @Test
    public void publish() throws IOException, TimeoutException, InterruptedException {
        Connection conn = connect();
        try {
            Channel channel = conn.createChannel();
            declareTopology(channel);

            // Enable publisher confirms so that "confirm success" below is actually true.
            channel.confirmSelect();
            channel.basicPublish(EXCHANGE, ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN, "firehose".getBytes(UTF_8));
            channel.waitForConfirmsOrDie(CONFIRM_TIMEOUT_MS);

            channel.close();
            System.out.println("confirm success");
        } finally {
            closeIfOpen(conn);
        }
    }

    /**
     * Consumes business messages from the demo queue until the thread is interrupted.
     *
     * @throws IOException          on a connection or channel failure
     * @throws TimeoutException     if connecting or closing times out
     * @throws InterruptedException when the blocking wait is interrupted
     */
    @Test
    public void consume() throws IOException, TimeoutException, InterruptedException {
        Connection conn = connect();
        try {
            Channel channel = conn.createChannel();
            // Declare exchange/queue/binding so the consumer does not fail when it is
            // started before the publisher; re-declaring with identical arguments is harmless.
            declareTopology(channel);
            consumeUntilInterrupted(channel, QUEUE,
                    "*********************************************************");
        } finally {
            closeIfOpen(conn);
        }
    }

    /**
     * Consumes the traced copies of messages from {@code amq.rabbitmq.trace}
     * until the thread is interrupted.
     *
     * <p>The binding key {@code #} matches every routing key on the trace exchange,
     * so every traced message reaches the queue. Per the RabbitMQ firehose
     * documentation the routing keys have the form {@code publish.<exchange>}
     * and {@code deliver.<queue>}.
     *
     * @throws IOException          on a connection or channel failure
     * @throws TimeoutException     if connecting or closing times out
     * @throws InterruptedException when the blocking wait is interrupted
     */
    @Test
    public void consume_trace() throws IOException, TimeoutException, InterruptedException {
        Connection conn = connect();
        try {
            Channel channel = conn.createChannel();
            // Only the queue has to be declared here: amq.rabbitmq.trace is a built-in exchange.
            channel.queueDeclare(TRACE_QUEUE, true, false, false, null);
            channel.queueBind(TRACE_QUEUE, TRACE_EXCHANGE, "#");
            consumeUntilInterrupted(channel, TRACE_QUEUE,
                    "*************************trace********************************");
        } finally {
            closeIfOpen(conn);
        }
    }

    /** Opens a connection to the first reachable node of the demo cluster on vhost {@code test}. */
    private static Connection connect() throws IOException, TimeoutException {
        Address[] addresses = {
                new Address("192.168.32.61", 5672),
                new Address("192.168.32.62", 5672),
                new Address("192.168.32.63", 5672)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("sun");
        factory.setPassword("111111");
        factory.setVirtualHost("test");
        return factory.newConnection(addresses);
    }

    /** Declares the durable direct exchange, the durable queue and the binding between them. */
    private static void declareTopology(Channel channel) throws IOException {
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
    }

    /**
     * Registers a manually-acknowledging consumer that prints each delivery
     * between two {@code banner} lines, then blocks the calling thread.
     */
    private static void consumeUntilInterrupted(final Channel channel, String queue, final String banner)
            throws IOException, InterruptedException {
        channel.basicQos(PREFETCH);
        // autoAck=false: each delivery is acknowledged explicitly in the callback.
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(banner);
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("body:" + new String(body, UTF_8));
                System.out.println("Envelope:" + envelope);
                System.out.println(banner);
                // multiple=true acknowledges this delivery and every earlier unacked one on the channel.
                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        });
        // Keep the test thread alive so the consumer keeps receiving deliveries.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Closes the connection (and its channels) unless it has already been shut down. */
    private static void closeIfOpen(Connection conn) throws IOException {
        if (conn.isOpen()) {
            conn.close();
        }
    }
}
以上是 firehose消息追踪 的全部内容, 来源链接: utcz.com/z/515266.html