聊聊artemisJMSBridge的QualityOfServiceMode

编程

QualityOfServiceMode

activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/QualityOfServiceMode.java

public enum QualityOfServiceMode {

AT_MOST_ONCE(0), DUPLICATES_OK(1), ONCE_AND_ONLY_ONCE(2);

private final int value;

QualityOfServiceMode(final int value) {

this.value = value;

}

public int intValue() {

return value;

}

public static QualityOfServiceMode valueOf(final int value) {

if (value == AT_MOST_ONCE.value) {

return AT_MOST_ONCE;

}

if (value == DUPLICATES_OK.value) {

return DUPLICATES_OK;

}

if (value == ONCE_AND_ONLY_ONCE.value) {

return ONCE_AND_ONLY_ONCE;

}

throw new IllegalArgumentException("invalid QualityOfServiceMode value: " + value);

}

}

  • QualityOfServiceMode定义了三个枚举值,分别是AT_MOST_ONCE、DUPLICATES_OK、ONCE_AND_ONLY_ONCE

sendBatchNonTransacted

activemq-artemis-2.11.0/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java

public final class JMSBridgeImpl implements JMSBridge {

//......

private void sendBatchNonTransacted() {

try {

if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || (qualityOfServiceMode == QualityOfServiceMode.AT_MOST_ONCE && maxBatchSize > 1)) {

// We client ack before sending

if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {

ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");

}

messages.getLast().acknowledge();

if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {

ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");

}

}

boolean exHappened;

do {

exHappened = false;

try {

sendMessages();

} catch (TransactionRolledbackException e) {

ActiveMQJMSBridgeLogger.LOGGER.transactionRolledBack(e);

exHappened = true;

}

}

while (exHappened);

if (maxBatchSize > 1) {

// The sending session is transacted - we need to commit it

if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {

ActiveMQJMSBridgeLogger.LOGGER.trace("Committing target session");

}

targetSession.commit();

if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {

ActiveMQJMSBridgeLogger.LOGGER.trace("Committed target session");

}

}

if (qualityOfServiceMode == QualityOfServiceMode.DUPLICATES_OK) {

// We client ack after sending

// Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here

// For a slightly less span delivery guarantee

if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {

ActiveMQJMSBridgeLogger.LOGGER.trace("Client acking source session");

}

messages.getLast().acknowledge();

if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {

ActiveMQJMSBridgeLogger.LOGGER.trace("Client acked source session");

}

}

} catch (Exception e) {

if (!stopping) {

ActiveMQJMSBridgeLogger.LOGGER.bridgeAckError(e, bridgeName);

}

// We don"t call failure otherwise failover would be broken with ActiveMQ

// We let the ExceptionListener to deal with failures

if (connectedSource) {

try {

sourceSession.recover();

} catch (Throwable ignored) {

}

}

} finally {

// Clear the messages

messages.clear();

}

}

//......

}

  • JMSBridgeImpl的sendBatchNonTransacted方法在qualityOfServiceMode为ONCE_AND_ONLY_ONCE或者AT_MOST_ONCE且maxBatchSize大于1的时候先执行messages.getLast().acknowledge();之后使用一个while循环执行sendMessages,循环在没有TransactionRolledbackException异常时会终止;最后在qualityOfServiceMode为DUPLICATES_OK的时候执行messages.getLast().acknowledge()

小结

QualityOfServiceMode定义了三个枚举值,分别是AT_MOST_ONCE、DUPLICATES_OK、ONCE_AND_ONLY_ONCE;JMSBridgeImpl的sendBatchNonTransacted方法在qualityOfServiceMode为ONCE_AND_ONLY_ONCE或者AT_MOST_ONCE(且maxBatchSize大于1)的时候在sendMessages之前先执行ack(如果异常在ack与sendMessages之间,则消息可能丢失;由于ONCE_AND_ONLY_ONCE需要local transaction或者JTA处理,在没有事务情况下与AT_MOST_ONCE相同);而对于qualityOfServiceMode为DUPLICATES_OK的在sendMessages之后执行ack(如果异常在sendMessages与ack之间,则异常之后,client端由于没有收到ack会再次发送消息,可能造成重复)

doc

  • JMSBridgeImpl

以上是 聊聊artemisJMSBridge的QualityOfServiceMode 的全部内容, 来源链接: utcz.com/z/513114.html

回到顶部