RabbitMQ topic 交换器测试

编程

package sun.example.rabbitmq;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import org.junit.Test;

/**
 * Manual integration tests that exercise a RabbitMQ topic exchange.
 *
 * <p>{@link #publish()} sends one message with routing key {@code error.log} to the
 * {@code topic-exchange} exchange. {@link #consumeOne()} binds a queue with the pattern
 * {@code error.*} and {@link #consumeTwo()} binds a queue with the pattern {@code *.log},
 * so both patterns match the published routing key.
 *
 * <p>The queues are declared non-durable and auto-delete, so a message published while no
 * consumer test is running has no bound queue to land in. Start the consumer tests first.
 *
 * <p>NOTE(review): broker address and credentials are hard-coded for a local test broker;
 * move them to configuration before reusing this code anywhere else.
 */
public class T2 {

    /** Name of the topic exchange shared by the publisher and both consumers. */
    private static final String EXCHANGE_NAME = "topic-exchange";

    /**
     * Publishes a single persistent text message with routing key {@code error.log} and then
     * closes the connection.
     *
     * @throws IOException if the broker cannot be reached or a channel operation fails
     * @throws TimeoutException if connecting to the broker times out
     * @throws InterruptedException never thrown by this implementation; retained so the
     *     method signature stays unchanged
     */
    @Test
    public void publish() throws IOException, TimeoutException, InterruptedException {
        Connection conn = newConnection();
        try {
            Channel channel = conn.createChannel();
            declareExchange(channel);

            String message = "hello";
            // Encode explicitly so the payload does not depend on the platform default charset.
            channel.basicPublish(
                    EXCHANGE_NAME,
                    "error.log",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));

            channel.close();
        } finally {
            // Closing the connection also closes any channel still open on it.
            closeIfOpen(conn);
        }
    }

    /**
     * Consumes from {@code error-queue}, bound with the pattern {@code error.*}, printing each
     * message body. Blocks until the thread is interrupted or the sleep elapses.
     *
     * @throws IOException if the broker cannot be reached or a channel operation fails
     * @throws TimeoutException if connecting to the broker times out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void consumeOne() throws IOException, TimeoutException, InterruptedException {
        consumeAndBlock("error-queue", "error.*", "[error]body:");
    }

    /**
     * Consumes from {@code log-queue}, bound with the pattern {@code *.log}, printing each
     * message body. Blocks until the thread is interrupted or the sleep elapses.
     *
     * @throws IOException if the broker cannot be reached or a channel operation fails
     * @throws TimeoutException if connecting to the broker times out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void consumeTwo() throws IOException, TimeoutException, InterruptedException {
        consumeAndBlock("log-queue", "*.log", "[log] body:");
    }

    /** Opens a connection to the test broker using the fixed host, port, vhost and credentials. */
    private static Connection newConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("sun");
        factory.setPassword("111111");
        factory.setVirtualHost("test");
        factory.setHost("192.168.32.61");
        factory.setPort(5672);
        return factory.newConnection();
    }

    /**
     * Declares the shared durable topic exchange. Every test declares it with the same
     * arguments so that the tests can be started in any order.
     */
    private static void declareExchange(Channel channel) throws IOException {
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
    }

    /** Closes the connection unless it has already been shut down. */
    private static void closeIfOpen(Connection conn) throws IOException {
        if (conn.isOpen()) {
            conn.close();
        }
    }

    /**
     * Declares and binds a non-durable, auto-delete queue, starts a manually-acknowledging
     * consumer that prints each body behind {@code logPrefix}, then parks the calling thread
     * so deliveries keep arriving.
     */
    private static void consumeAndBlock(
            final String queueName, final String bindingKey, final String logPrefix)
            throws IOException, TimeoutException, InterruptedException {
        Connection conn = newConnection();
        try {
            final Channel channel = conn.createChannel();

            // Without this, binding fails when the publisher has never created the exchange.
            declareExchange(channel);
            channel.queueDeclare(queueName, false, false, true, null);
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

            channel.basicConsume(queueName, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    System.out.println(logPrefix + new String(body, StandardCharsets.UTF_8));
                    // Send the acknowledgement for this delivery only (multiple = false).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });

            // Keep the test alive so the consumer callback can keep receiving messages.
            Thread.sleep(Integer.MAX_VALUE);
        } finally {
            closeIfOpen(conn);
        }
    }

}

以上是“RabbitMQ topic 交换器测试”的全部内容，来源链接：utcz.com/z/511958.html

回到顶部