消息生产消费测试

编程

package sun.example.rabbitmq;

import com.rabbitmq.client.*;

import org.junit.After;

import org.junit.Before;

import org.junit.Test;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class T1 {

@Test

public void publish() throws IOException, TimeoutException, InterruptedException {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("sun");

factory.setPassword("111111");

factory.setVirtualHost("test");

factory.setHost("192.168.32.61");

Connection conn = factory.newConnection();

Channel channel = conn.createChannel();

channel.addConfirmListener(new ConfirmListener() {

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("*************************************************");

System.out.println("success");

System.out.println("*************************************************");

}

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.println("*************************************************");

System.out.println("error");

System.out.println("*************************************************");

}

});

channel.exchangeDeclare("hello-exchange", BuiltinExchangeType.DIRECT, true);

channel.queueDeclare("hello-queue", true, false, false, null);

channel.queueBind("hello-queue", "hello-exchange", "hello");

channel.confirmSelect();

channel.basicPublish("hello-exchange", "hello", MessageProperties.TEXT_PLAIN, "hello".getBytes());

Thread.sleep(Integer.MAX_VALUE);

}

@Test

public void consume() throws IOException, TimeoutException, InterruptedException {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("sun");

factory.setPassword("111111");

factory.setVirtualHost("test");

factory.setHost("192.168.32.61");

Connection conn = factory.newConnection();

Channel channel = conn.createChannel();

channel.exchangeDeclare("hello-exchange", BuiltinExchangeType.DIRECT, true);

channel.queueDeclare("hello-queue", true, false, false, null);

channel.basicConsume("hello-queue", new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("*********************************************************");

System.out.println("consumerTag:" + consumerTag);

System.out.println("body:" + new String(body));

System.out.println("Envelope:" + envelope);

System.out.println("*********************************************************");

channel.basicAck(envelope.getDeliveryTag(), true);

}

});

Thread.sleep(Integer.MAX_VALUE);

}

@Before

@After

public void setUp() throws Exception {

System.out.println("*************************************************");

System.out.println("*************************************************");

System.out.println("*************************************************");

}

}

以上是 消息生产消费测试 的全部内容, 来源链接: utcz.com/z/511959.html

回到顶部