消息生产消费测试
package sun.example.rabbitmq;import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class T1 {
@Test
public void publish() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("sun");
factory.setPassword("111111");
factory.setVirtualHost("test");
factory.setHost("192.168.32.61");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("*************************************************");
System.out.println("success");
System.out.println("*************************************************");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("*************************************************");
System.out.println("error");
System.out.println("*************************************************");
}
});
channel.exchangeDeclare("hello-exchange", BuiltinExchangeType.DIRECT, true);
channel.queueDeclare("hello-queue", true, false, false, null);
channel.queueBind("hello-queue", "hello-exchange", "hello");
channel.confirmSelect();
channel.basicPublish("hello-exchange", "hello", MessageProperties.TEXT_PLAIN, "hello".getBytes());
Thread.sleep(Integer.MAX_VALUE);
}
@Test
public void consume() throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("sun");
factory.setPassword("111111");
factory.setVirtualHost("test");
factory.setHost("192.168.32.61");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare("hello-exchange", BuiltinExchangeType.DIRECT, true);
channel.queueDeclare("hello-queue", true, false, false, null);
channel.basicConsume("hello-queue", new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("*********************************************************");
System.out.println("consumerTag:" + consumerTag);
System.out.println("body:" + new String(body));
System.out.println("Envelope:" + envelope);
System.out.println("*********************************************************");
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
Thread.sleep(Integer.MAX_VALUE);
}
@Before
@After
public void setUp() throws Exception {
System.out.println("*************************************************");
System.out.println("*************************************************");
System.out.println("*************************************************");
}
}
以上是 消息生产消费测试 的全部内容, 来源链接: utcz.com/z/511959.html