RabbitMQ fanout 交换器测试
package sun.example.rabbitmq;import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Manual integration tests demonstrating a RabbitMQ {@code fanout} exchange.
 *
 * <p>{@link #publish()} sends one message to the {@code fanout-exchange} exchange, while
 * {@link #consumeOne()} and {@link #consumeTwo()} each bind their own queue to that exchange
 * and print every message they receive. The two consumer tests are meant to be started first
 * and left running; they block until the test thread is interrupted.
 *
 * <p>NOTE(review): the broker address and credentials are hard-coded below; these tests need a
 * reachable broker with a matching user and virtual host.
 */
public class T3_fanout_exchange {

    /** Name of the fanout exchange shared by the publisher and both consumers. */
    private static final String EXCHANGE_NAME = "fanout-exchange";

    /**
     * Charset used for message bodies in both directions, so that encoding does not depend on
     * the platform default. Referenced by its fully-qualified name to avoid a new import.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Opens a new connection to the broker using the fixed test settings.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws IOException if the connection cannot be established
     * @throws TimeoutException if the connection attempt times out
     */
    private static Connection openConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("sun");
        factory.setPassword("111111");
        factory.setVirtualHost("test");
        factory.setHost("192.168.32.61");
        factory.setPort(5672);
        return factory.newConnection();
    }

    /**
     * Declares the shared durable fanout exchange on the given channel.
     *
     * <p>Both the publisher and the consumers call this with identical arguments, so the tests
     * can be started in any order without the bind failing on a missing exchange.
     *
     * @param channel the channel to declare the exchange on
     * @throws IOException if the declaration fails
     */
    private static void declareExchange(Channel channel) throws IOException {
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
    }

    /**
     * Closes the connection if it is still open.
     *
     * <p>The open check avoids calling {@code close()} on a connection that has already shut
     * down, which would raise a second exception and hide the original failure.
     *
     * @param conn the connection to close
     * @throws IOException if closing the connection fails
     */
    private static void closeQuietlyIfOpen(Connection conn) throws IOException {
        if (conn.isOpen()) {
            conn.close();
        }
    }

    /**
     * Declares a non-durable, non-exclusive, auto-delete queue, binds it to the fanout exchange
     * and prints every delivered message body, acknowledging each delivery individually.
     *
     * <p>This method does not return normally in practice: after registering the consumer it
     * sleeps for {@code Integer.MAX_VALUE} milliseconds (roughly 24.8 days) so that the
     * consumer keeps receiving. The connection is closed when the sleep ends or the thread is
     * interrupted.
     *
     * @param queueName name of the queue to declare, bind and consume from
     * @param logPrefix text printed in front of each message body
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection attempt times out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void consumeUntilInterrupted(String queueName, final String logPrefix)
            throws IOException, TimeoutException, InterruptedException {
        Connection conn = openConnection();
        try {
            final Channel channel = conn.createChannel();
            declareExchange(channel);
            channel.queueDeclare(queueName, false, false, true, null);
            // The routing key is left empty; the exchange is declared as FANOUT.
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            // autoAck = false: deliveries are acknowledged explicitly in handleDelivery.
            channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(logPrefix + new String(body, UTF_8));
                    // Send the acknowledgement for this delivery only (multiple = false).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
            // Keep the test thread parked so the consumer callback can keep running.
            Thread.sleep(Integer.MAX_VALUE);
        } finally {
            closeQuietlyIfOpen(conn);
        }
    }

    /**
     * Publishes a single persistent text message ({@code "hello"}) to the fanout exchange and
     * then closes the connection, so the test finishes instead of blocking.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection attempt times out
     * @throws InterruptedException kept in the signature for compatibility; not thrown here
     */
    @Test
    public void publish() throws IOException, TimeoutException, InterruptedException {
        Connection conn = openConnection();
        try {
            Channel channel = conn.createChannel();
            String message = "hello";
            declareExchange(channel);
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(UTF_8));
        } finally {
            closeQuietlyIfOpen(conn);
        }
    }

    /**
     * Consumes from {@code error-queue}, printing each body with the {@code [error]} prefix.
     * Blocks until interrupted.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection attempt times out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void consumeOne() throws IOException, TimeoutException, InterruptedException {
        consumeUntilInterrupted("error-queue", "[error]body:");
    }

    /**
     * Consumes from {@code log-queue}, printing each body with the {@code [log]} prefix.
     * Blocks until interrupted.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection attempt times out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void consumeTwo() throws IOException, TimeoutException, InterruptedException {
        consumeUntilInterrupted("log-queue", "[log] body:");
    }
}
以上是“RabbitMQ fanout 交换器测试”的全部内容，来源链接：utcz.com/z/511957.html