firehose消息追踪

编程

代码示例

  • 开启 firehose

# firehose 按节点开启,需要在集群的每个节点上分别执行下面的命令

[root@rabbitmq1 sbin]# ./rabbitmqctl trace_on -p test

Starting tracing for vhost "test" ...

Trace enabled for vhost test

  • 测试代码

/**
 * Demo tests for RabbitMQ firehose message tracing.
 *
 * <p>Tracing must already be enabled for the {@code test} vhost
 * ({@code rabbitmqctl trace_on -p test}). The tests then:
 * <ul>
 *   <li>{@link #publish()} - publish one message to {@code firehose-exchange};</li>
 *   <li>{@link #consume()} - consume the business queue {@code firehose-queue};</li>
 *   <li>{@link #consume_trace()} - consume the copies the broker republishes to
 *       {@code amq.rabbitmq.trace}.</li>
 * </ul>
 *
 * <p>The broker addresses and credentials below are demo values for a local
 * three-node cluster; they are not meant for production use.
 */
public class T21_firehose {

    private static final String EXCHANGE = "firehose-exchange";
    private static final String QUEUE = "firehose-queue";
    private static final String ROUTING_KEY = "firehose";

    /** Built-in exchange the broker publishes trace copies to while tracing is on. */
    private static final String TRACE_EXCHANGE = "amq.rabbitmq.trace";
    private static final String TRACE_QUEUE = "queue";

    /** Maximum number of unacknowledged deliveries per consumer. */
    private static final int PREFETCH = 30;

    /** How long {@link #publish()} waits for the broker to confirm the message. */
    private static final long CONFIRM_TIMEOUT_MS = 5000L;

    /**
     * Publishes a single persistent message and waits for the broker's confirm.
     *
     * @throws IOException          on a connection/channel failure or a nacked message
     * @throws TimeoutException     if connecting or confirming times out
     * @throws InterruptedException if interrupted while waiting for the confirm
     */
    @Test
    public void publish() throws IOException, TimeoutException, InterruptedException {
        // try-with-resources: the connection (and its I/O thread) is released even on failure.
        try (Connection conn = connect()) {
            Channel channel = conn.createChannel();
            declareBusinessTopology(channel);

            // Enable publisher confirms so that the success message below is actually true.
            channel.confirmSelect();

            String message = "firehose";
            channel.basicPublish(EXCHANGE, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.waitForConfirmsOrDie(CONFIRM_TIMEOUT_MS);

            channel.close();
            System.out.println("confirm success");
        }
    }

    /**
     * Consumes {@code firehose-queue}, printing and acknowledging every delivery.
     * Blocks (sleeps) until the test run is interrupted.
     *
     * @throws IOException          on a connection/channel failure
     * @throws TimeoutException     if connecting times out
     * @throws InterruptedException when the blocking sleep is interrupted
     */
    @Test
    public void consume() throws IOException, TimeoutException, InterruptedException {
        try (Connection conn = connect()) {
            Channel channel = conn.createChannel();

            // Declare exchange, queue and binding so the test does not fail when they do not
            // exist yet; re-declaring with the same arguments does not affect the consumer.
            declareBusinessTopology(channel);

            channel.basicQos(PREFETCH);
            channel.basicConsume(QUEUE, new PrintingConsumer(channel,
                    "*********************************************************"));

            // Keep the test thread alive so the consumer keeps receiving deliveries.
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

    /**
     * Consumes the firehose trace copies: binds {@code queue} to
     * {@code amq.rabbitmq.trace} with the catch-all key {@code #} and prints and
     * acknowledges every delivery. Blocks (sleeps) until the test run is interrupted.
     *
     * @throws IOException          on a connection/channel failure
     * @throws TimeoutException     if connecting times out
     * @throws InterruptedException when the blocking sleep is interrupted
     */
    @Test
    public void consume_trace() throws IOException, TimeoutException, InterruptedException {
        try (Connection conn = connect()) {
            Channel channel = conn.createChannel();

            // The trace exchange is provided by the broker; only the receiving queue and its
            // binding need to be declared. "#" matches every trace routing key.
            channel.queueDeclare(TRACE_QUEUE, true, false, false, null);
            channel.queueBind(TRACE_QUEUE, TRACE_EXCHANGE, "#");

            channel.basicQos(PREFETCH);
            channel.basicConsume(TRACE_QUEUE, new PrintingConsumer(channel,
                    "*************************trace********************************"));

            // Keep the test thread alive so the consumer keeps receiving deliveries.
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

    /** Opens a connection to the demo cluster on vhost {@code test}. */
    private static Connection connect() throws IOException, TimeoutException {
        Address[] addresses = {
                new Address("192.168.32.61", 5672),
                new Address("192.168.32.62", 5672),
                new Address("192.168.32.63", 5672)
        };

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("sun");
        factory.setPassword("111111");
        factory.setVirtualHost("test");
        return factory.newConnection(addresses);
    }

    /** Declares the durable direct exchange, the durable queue and the binding between them. */
    private static void declareBusinessTopology(Channel channel) throws IOException {
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
    }

    /** Consumer that prints each delivery between two banner lines and then acknowledges it. */
    private static final class PrintingConsumer extends DefaultConsumer {

        private final String banner;

        PrintingConsumer(Channel channel, String banner) {
            super(channel);
            this.banner = banner;
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(banner);
            System.out.println("consumerTag:" + consumerTag);
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            System.out.println("body:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("Envelope:" + envelope);
            System.out.println(banner);

            // Send the acknowledgement for this delivery.
            getChannel().basicAck(envelope.getDeliveryTag(), true);
        }
    }
}

以上是 firehose消息追踪 的全部内容, 来源链接: utcz.com/z/515266.html

回到顶部