RabbitMQ fanout 交换器测试

编程

package sun.example.rabbitmq;

import com.rabbitmq.client.*;

import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Manual integration tests that demonstrate a RabbitMQ fanout exchange.
 *
 * <p>{@link #publish()} sends one message to the exchange {@code fanout-exchange};
 * {@link #consumeOne()} and {@link #consumeTwo()} each bind their own queue to that
 * exchange and print whatever is delivered. The consumer tests block until the test
 * thread is interrupted, so they are meant to be started by hand (consumers first,
 * then the publisher) against a reachable broker.
 */
public class T3_fanout_exchange {

    // NOTE(review): broker coordinates and credentials are hard-coded for a local
    // test broker; move them to configuration before reusing this code elsewhere.
    private static final String BROKER_HOST = "192.168.32.61";
    private static final int BROKER_PORT = 5672;
    private static final String BROKER_USERNAME = "sun";
    private static final String BROKER_PASSWORD = "111111";
    private static final String BROKER_VIRTUAL_HOST = "test";

    private static final String EXCHANGE_NAME = "fanout-exchange";

    /**
     * Declares the durable fanout exchange and publishes the text {@code "hello"} to it
     * as a persistent plain-text message, then closes the connection.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection cannot be established in time
     * @throws InterruptedException never thrown by this method; kept so the signature
     *         stays unchanged
     */
    @Test
    public void publish() throws IOException, TimeoutException, InterruptedException {
        Connection conn = openConnection();
        try {
            Channel channel = conn.createChannel();
            String message = "hello";
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
            // Encode explicitly as UTF-8 so the payload does not depend on the
            // platform default charset of the publishing host.
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));
        } finally {
            closeQuietlyIfOpen(conn);
        }
    }

    /**
     * Consumes from {@code error-queue}, printing each body prefixed with
     * {@code [error]body:}. Blocks until the test thread is interrupted.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection cannot be established in time
     * @throws InterruptedException if the waiting test thread is interrupted
     */
    @Test
    public void consumeOne() throws IOException, TimeoutException, InterruptedException {
        consumeForever("error-queue", "[error]body:");
    }

    /**
     * Consumes from {@code log-queue}, printing each body prefixed with
     * {@code [log] body:}. Blocks until the test thread is interrupted.
     *
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if the connection cannot be established in time
     * @throws InterruptedException if the waiting test thread is interrupted
     */
    @Test
    public void consumeTwo() throws IOException, TimeoutException, InterruptedException {
        consumeForever("log-queue", "[log] body:");
    }

    /** Opens a new connection to the test broker using the hard-coded settings above. */
    private static Connection openConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(BROKER_USERNAME);
        factory.setPassword(BROKER_PASSWORD);
        factory.setVirtualHost(BROKER_VIRTUAL_HOST);
        factory.setHost(BROKER_HOST);
        factory.setPort(BROKER_PORT);
        return factory.newConnection();
    }

    /** Closes the connection if it is still open, so cleanup cannot mask an earlier failure. */
    private static void closeQuietlyIfOpen(Connection conn) throws IOException {
        if (conn.isOpen()) {
            conn.close();
        }
    }

    /**
     * Binds a non-durable, non-exclusive, auto-delete queue to the fanout exchange and
     * prints every delivered body with the given prefix, acknowledging each message.
     * Keeps the calling thread asleep so deliveries can keep arriving; the connection
     * is closed when the sleep ends or any step fails.
     *
     * @param queueName name of the queue to declare, bind and consume from
     * @param logPrefix text printed in front of each message body
     */
    private static void consumeForever(String queueName, final String logPrefix)
            throws IOException, TimeoutException, InterruptedException {
        Connection conn = openConnection();
        try {
            final Channel channel = conn.createChannel();
            // Declare the exchange with the same arguments as publish() so binding
            // works even when a consumer is started before the publisher.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
            channel.queueDeclare(queueName, false, false, true, null);
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            channel.basicConsume(queueName, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode as UTF-8 to match the publisher's explicit encoding.
                    System.out.println(logPrefix + new String(body, StandardCharsets.UTF_8));
                    // Send the acknowledgement for this delivery only.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
            // Keep the test thread alive; deliveries are handled on client threads.
            Thread.sleep(Integer.MAX_VALUE);
        } finally {
            closeQuietlyIfOpen(conn);
        }
    }

}

以上是“RabbitMQ fanout 交换器测试”的全部内容，来源链接：utcz.com/z/511957.html

回到顶部