JMS-从一个消费者到多个消费者

我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。

我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。

显然,这将涉及在生产者和消费者方面修改当前的客户代码。

我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在两个不同队列之间而不是一个队列之间平衡负载,这可能会对性能产生积极影响。

我想就您可能会看到的这些选项和缺点/优点获得建议。任何反馈都非常感谢。

回答:

如您所说,您有一些选择。

如果将其转换为主题以达到相同的效果,则需要使使用者成为永久使用者。如果您的消费者还活着,那么队列提供的一件事就是持久性。这将取决于您使用的MQ系统。

如果要坚持使用队列,则将为每个使用者和将在原始队列上侦听的调度程序创建一个队列。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1

-> Queue_Consumer_2 <- Consumer_2

-> Queue_Consumer_3 <- Consumer_3

  • 动态添加新消费者更容易。所有消费者将无需任何工作即可收到新消息。
  • 您可以创建循环主题,以便Consumer_1会收到一条消息,然后是Consumer_2,然后是Consumer_3。
  • 可以向消费者推送新消息,而不必查询队列以使它们具有响应性。

  • 除非您的代理支持此配置,否则消息不是持久的。如果使用者下线然后返回,则可能会丢失消息,除非设置了持久使用者。
  • 难以允许Consumer_1和Consumer_2接收消息,但不能接收Consumer_3。使用分派器和队列,分派器无法将消息放入Consumer_3的队列中。

  • 消息是持久的,直到使用者将其删除
  • 调度程序可以通过不将消息放入相应的使用者队列来过滤哪些使用者获得哪些消息。不过,这可以通过过滤器通过主题来完成。

  • 需要创建其他队列以支持多个使用者。在动态环境中,这不会有效。

在开发消息系统时,我更喜欢主题,因为它给了我最大的权力,但是看到您已经在使用队列,就需要您更改系统实现主题的工作方式。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1

-> Queue_Consumer_2 <- Consumer_2

-> Queue_Consumer_3 <- Consumer_3

请记住,您还需要处理其他事情,例如问题异常处理,重新连接以及在丢失连接时重新排队等。这只是为了让您了解如何完成我的工作描述。

在实际系统中,我可能不会在第一个例外退出。我将允许系统继续以最佳状态运行并记录错误。如此代码所示,如​​果将消息放入单个使用者队列失败,则整个调度程序将停止。

分派器

/*

* To change this template, choose Tools | Templates

* and open the template in the editor.

*/

package stackoverflow_4615895;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueSession;

import javax.jms.Session;

public class Dispatcher {

private static long QUEUE_WAIT_TIME = 1000;

private boolean mStop = false;

private QueueConnectionFactory mFactory;

private String mSourceQueueName;

private String[] mConsumerQueueNames;

/**

* Create a dispatcher

* @param factory

* The QueueConnectionFactory in which new connections, session, and consumers

* will be created. This is needed to ensure the connection is associated

* with the correct thread.

* @param source

*

* @param consumerQueues

*/

public Dispatcher(

QueueConnectionFactory factory,

String sourceQueue,

String[] consumerQueues) {

mFactory = factory;

mSourceQueueName = sourceQueue;

mConsumerQueueNames = consumerQueues;

}

public void start() {

Thread thread = new Thread(new Runnable() {

public void run() {

Dispatcher.this.run();

}

});

thread.setName("Queue Dispatcher");

thread.start();

}

public void stop() {

mStop = true;

}

private void run() {

QueueConnection connection = null;

MessageProducer producer = null;

MessageConsumer consumer = null;

QueueSession session = null;

try {

// Setup connection and queues for receiving the messages

connection = mFactory.createQueueConnection();

session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);

Queue sourceQueue = session.createQueue(mSourceQueueName);

consumer = session.createConsumer(sourceQueue);

// Create a null producer allowing us to send messages

// to any queue.

producer = session.createProducer(null);

// Create the destination queues based on the consumer names we

// were given.

Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];

for (int index = 0; index < mConsumerQueueNames.length; ++index) {

destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);

}

connection.start();

while (!mStop) {

// Only wait QUEUE_WAIT_TIME in order to give

// the dispatcher a chance to see if it should

// quit

Message m = consumer.receive(QUEUE_WAIT_TIME);

if (m == null) {

continue;

}

// Take the message we received and put

// it in each of the consumers destination

// queues for them to process

for (Queue q : destinationQueues) {

producer.send(q, m);

}

}

} catch (JMSException ex) {

// Do wonderful things here

} finally {

if (producer != null) {

try {

producer.close();

} catch (JMSException ex) {

}

}

if (consumer != null) {

try {

consumer.close();

} catch (JMSException ex) {

}

}

if (session != null) {

try {

session.close();

} catch (JMSException ex) {

}

}

if (connection != null) {

try {

connection.close();

} catch (JMSException ex) {

}

}

}

}

}

Main.java

    QueueConnectionFactory factory = ...;

Dispatcher dispatcher =

new Dispatcher(

factory,

"Queue_Original",

new String[]{

"Consumer_Queue_1",

"Consumer_Queue_2",

"Consumer_Queue_3"});

dispatcher.start();

以上是 JMS-从一个消费者到多个消费者 的全部内容, 来源链接: utcz.com/qa/412888.html

回到顶部