RabbitMQ topic 交换器测试
package sun.example.rabbitmq;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class T2 {
@Test
public void publish() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("sun");
factory.setPassword("111111");
factory.setVirtualHost("test");
factory.setHost("192.168.32.61");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String message = "hello";
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC, true);
channel.basicPublish("topic-exchange", "error.log", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Thread.sleep(Integer.MAX_VALUE);
}
@Test
public void consumeOne() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("sun");
factory.setPassword("111111");
factory.setVirtualHost("test");
factory.setHost("192.168.32.61");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("error-queue", false, false, true, null);
channel.queueBind("error-queue", "topic-exchange", "error.*");
channel.basicConsume("error-queue", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[error]body:" + new String(body));
//发送确认消息
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
Thread.sleep(Integer.MAX_VALUE);
}
@Test
public void consumeTwo() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("sun");
factory.setPassword("111111");
factory.setVirtualHost("test");
factory.setHost("192.168.32.61");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("log-queue", false, false, true, null);
channel.queueBind("log-queue", "topic-exchange", "*.log");
channel.basicConsume("log-queue", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[log] body:" + new String(body));
//发送确认消息
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
Thread.sleep(Integer.MAX_VALUE);
}
}
以上是 RabbitMQ topic 交换器测试的全部内容，来源链接: utcz.com/z/511958.html