How to manually acknowledge and remove messages from a RabbitMQ queue using JmsTemplate

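I am using RabbitMQ (with JMS) through jmsTemplate. I can consume messages from the RabbitMQ queue, but they are acknowledged automatically.

I have searched the API but could not find a way to change this.

How do I set up manual acknowledgment?

In the code below, when a message is consumed from the queue, I want to call a web service with that message and, depending on the response, delete the message from the queue. I created one project that uses a listener, and another project that reads messages from the queue on request.

First project: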

package com.es.jms.listener;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.listener.SimpleMessageListenerContainer;

import com.rabbitmq.jms.admin.RMQConnectionFactory;

@Configuration
public class RabbitMqMessageListener {

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("Username");
        connectionFactory.setPassword("Password");
        connectionFactory.setVirtualHost("vhostname");
        connectionFactory.setHost("hostname");
        return connectionFactory;
    }

    @Bean
    public MessageListener msgListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println(message.toString());
                if (message instanceof TextMessage) {
                    try {
                        String msg = ((TextMessage) message).getText();
                        System.out.println("Received message: " + msg);
                        // call web service here and depends on web service response
                        // if 200 then delete msg from queue else keep msg in queue
                    } catch (JMSException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }
        };
    }

    @Bean
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(jmsConnectionFactory());
        container.setDestinationName("test");
        container.setMessageListener(msgListener());
        return container;
    }
}

Second project:

package com.rabbitmq.jms.consumer.controller;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import javax.jms.ConnectionFactory;

import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.rabbitmq.jms.admin.RMQConnectionFactory;

import redis.clients.jedis.Jedis;

@Controller
public class ReceiverController {

    @Autowired
    JmsTemplate jmsTemplate;

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("Username");
        connectionFactory.setPassword("Password");
        connectionFactory.setVirtualHost("vhostname");
        connectionFactory.setHost("hostname");
        return connectionFactory;
    }

    @CrossOrigin
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @RequestMapping(method = RequestMethod.GET, value = "/getdata")
    @ResponseBody
    public ResponseEntity<String> fecthDataFromRedis()
            throws JSONException, InterruptedException, JmsException, ExecutionException, TimeoutException {
        System.out.println("in controller");
        jmsTemplate.setReceiveTimeout(500L);
        // jmsTemplate.
        String message = (String) jmsTemplate.receiveAndConvert("test");
        // call web service here and depends on web service response
        // if 200 then delete msg from queue else keep msg in queue
        System.out.println(message);
        return new ResponseEntity(message, HttpStatus.OK);
    }
}

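How can I achieve this?

Thanks in advance.

Answer:

You are not using a JmsTemplate; you are using a SimpleMessageListenerContainer to receive the message.

If you were using the template, you would have to use the execute method with a SessionCallback, since the acknowledgment must occur within the scope of the session in which the message was received.

However, with the SimpleMessageListenerContainer, you simply set the sessionAcknowledgeMode to Session.CLIENT_ACKNOWLEDGE. See the container javadocs: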

/**
 * Message listener container that uses the plain JMS client API's
 * {@code MessageConsumer.setMessageListener()} method to
 * create concurrent MessageConsumers for the specified listeners.
 *
 * <p>This is the simplest form of a message listener container.
 * It creates a fixed number of JMS Sessions to invoke the listener,
 * not allowing for dynamic adaptation to runtime demands. Its main
 * advantage is its low level of complexity and the minimum requirements
 * on the JMS provider: Not even the ServerSessionPool facility is required.
 *
 * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
 * on acknowledge modes and transaction options. Note that this container
 * exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode:
 * that is, automatic message acknowledgment after listener execution,
 * with no redelivery in case of a user exception thrown but potential
 * redelivery in case of the JVM dying during listener execution.
 *
 * <p>For a different style of MessageListener handling, through looped
 * {@code MessageConsumer.receive()} calls that also allow for
 * transactional reception of messages (registering them with XA transactions),
 * see {@link DefaultMessageListenerContainer}.
...
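One detail of the listener-container route is worth spelling out. As far as I know from the AbstractMessageListenerContainer javadoc, in CLIENT_ACKNOWLEDGE mode the container itself acknowledges the message once the listener returns normally, so a listener that wants to keep the message has to throw instead of returning. The sketch below shows only that decision, in plain Java. `AckDecision`, `processOrThrow`, and the `ToIntFunction` standing in for the web service call are hypothetical names, not part of Spring or the RabbitMQ JMS client. Whether and when an unacknowledged message is redelivered depends on the Spring version and the provider, so treat this as an assumption to verify.

```java
import java.util.function.ToIntFunction;

/** Hypothetical helper: decides whether a listener should return normally or throw. */
public class AckDecision {

    /**
     * Calls the (stand-in) web service with the message body.
     * Returns normally on HTTP 200, so the container can acknowledge the message;
     * throws on any other status, so the listener does not complete successfully.
     */
    public static void processOrThrow(String body, ToIntFunction<String> webService) {
        int status = webService.applyAsInt(body);
        if (status != 200) {
            throw new IllegalStateException(
                    "Web service returned " + status + "; message not acknowledged");
        }
    }

    public static void main(String[] args) {
        // Stand-in web service that always answers 200: the call returns normally.
        processOrThrow("hello", body -> 200);
        System.out.println("200: listener returns normally");

        // Stand-in web service that answers 503: the call throws.
        try {
            processOrThrow("hello", body -> 503);
        } catch (IllegalStateException ex) {
            System.out.println("503: " + ex.getMessage());
        }
    }
}
```

Inside onMessage, the equivalent call would pass ((TextMessage) message).getText() and the real web service client in place of the lambdas.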

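When using a JmsTemplate, you must do your work within the scope of a session. Here's how...

First, you have to enable client acknowledge in your template...

this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

Then, use the execute method with a SessionCallback...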

Boolean result = this.jmsTemplate.execute(session -> {
    MessageConsumer consumer = session.createConsumer(
            this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, "bar", false));
    try {
        Message received = consumer.receive(5000);
        if (received != null) {
            // Convert the JMS message to its payload.
            String payload = (String) this.jmsTemplate.getMessageConverter().fromMessage(received);

            // Do some stuff here.

            received.acknowledge();
            return true;
        }
        // Nothing arrived within the receive timeout.
        return false;
    }
    catch (Exception e) {
        return false;
    }
    finally {
        consumer.close();
    }
}, true);
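To make the reasoning concrete: in JMS client-acknowledge mode, a message that was received but never acknowledged is not removed from the queue, and it becomes available again once the session is closed or recovered. The toy model below illustrates that receive, decide, acknowledge flow with plain Java collections. It is only a sketch under stated assumptions: `ClientAckModel`, `ToyQueue`, `ToySession`, and `receiveAndMaybeAck` are hypothetical names, nothing here talks to RabbitMQ, and the web service is a stand-in function that returns an HTTP status.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToIntFunction;

/** Toy in-memory model of client acknowledgement; not a JMS or RabbitMQ API. */
public class ClientAckModel {

    /** Stand-in for a broker queue. */
    public static final class ToyQueue {
        public final Deque<String> ready = new ArrayDeque<>();
    }

    /** Stand-in for a session in client-acknowledge mode. */
    public static final class ToySession implements AutoCloseable {
        private final ToyQueue queue;
        private final List<String> unacked = new ArrayList<>();

        public ToySession(ToyQueue queue) {
            this.queue = queue;
        }

        /** Takes the next message, or returns null if the queue is empty. */
        public String receive() {
            String msg = queue.ready.poll();
            if (msg != null) {
                unacked.add(msg);
            }
            return msg;
        }

        /** Acknowledges everything received on this session so far. */
        public void acknowledge() {
            unacked.clear();
        }

        /** Closing puts unacknowledged messages back at the front of the queue. */
        @Override
        public void close() {
            for (int i = unacked.size() - 1; i >= 0; i--) {
                queue.ready.addFirst(unacked.get(i));
            }
            unacked.clear();
        }
    }

    /** Receives one message and acknowledges it only if the web service answers 200. */
    public static boolean receiveAndMaybeAck(ToyQueue queue, ToIntFunction<String> webService) {
        try (ToySession session = new ToySession(queue)) {
            String msg = session.receive();
            if (msg == null) {
                return false;
            }
            if (webService.applyAsInt(msg) == 200) {
                session.acknowledge();
                return true;
            }
            return false; // not acknowledged: close() returns it to the queue
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.ready.add("order-1");
        // A failing web service leaves the message on the queue.
        System.out.println(receiveAndMaybeAck(queue, m -> 500) + " " + queue.ready.size());
        // A 200 response acknowledges it, and the queue is then empty.
        System.out.println(receiveAndMaybeAck(queue, m -> 200) + " " + queue.ready.size());
    }
}
```

In the real template code, the web service call would go where the "Do some stuff here" comment sits, with received.acknowledge() called only on a 200 response.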
