聊聊rocketmq的LitePullConsumer

编程

本文主要研究一下rocketmq的LitePullConsumer

LitePullConsumer

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java

public interface LitePullConsumer {

void start() throws MQClientException;

void shutdown();

void subscribe(final String topic, final String subExpression) throws MQClientException;

void subscribe(final String topic, final MessageSelector selector) throws MQClientException;

void unsubscribe(final String topic);

void assign(Collection<MessageQueue> messageQueues);

List<MessageExt> poll();

List<MessageExt> poll(long timeout);

void seek(MessageQueue messageQueue, long offset) throws MQClientException;

void pause(Collection<MessageQueue> messageQueues);

void resume(Collection<MessageQueue> messageQueues);

boolean isAutoCommit();

void setAutoCommit(boolean autoCommit);

Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;

Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;

void commitSync();

Long committed(MessageQueue messageQueue) throws MQClientException;

void registerTopicMessageQueueChangeListener(String topic,

TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;

}

  • LitePullConsumer接口定义了start、shutdown、subscribe、unsubscribe、assign、poll、seek、pause、resume、isAutoCommit、setAutoCommit、fetchMessageQueues、offsetForTimestamp、commitSync、committed、registerTopicMessageQueueChangeListener方法

DefaultLitePullConsumer

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {

private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;

private String consumerGroup;

private long brokerSuspendMaxTimeMillis = 1000 * 20;

private long consumerTimeoutMillisWhenSuspend = 1000 * 30;

private long consumerPullTimeoutMillis = 1000 * 10;

private MessageModel messageModel = MessageModel.CLUSTERING;

private MessageQueueListener messageQueueListener;

private OffsetStore offsetStore;

private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();

private boolean unitMode = false;

private boolean autoCommit = true;

private int pullThreadNums = 20;

private long autoCommitIntervalMillis = 5 * 1000;

private int pullBatchSize = 10;

private long pullThresholdForAll = 10000;

private int consumeMaxSpan = 2000;

private int pullThresholdForQueue = 1000;

private int pullThresholdSizeForQueue = 100;

private long pollTimeoutMillis = 1000 * 5;

private long topicMetadataCheckIntervalMillis = 30 * 1000;

public DefaultLitePullConsumer() {

this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);

}

public DefaultLitePullConsumer(final String consumerGroup) {

this(null, consumerGroup, null);

}

public DefaultLitePullConsumer(RPCHook rpcHook) {

this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);

}

public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {

this(null, consumerGroup, rpcHook);

}

public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {

this.namespace = namespace;

this.consumerGroup = consumerGroup;

defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);

}

@Override

public void start() throws MQClientException {

this.defaultLitePullConsumerImpl.start();

}

@Override

public void shutdown() {

this.defaultLitePullConsumerImpl.shutdown();

}

@Override

public void subscribe(String topic, String subExpression) throws MQClientException {

this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);

}

@Override

public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {

this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);

}

@Override

public void unsubscribe(String topic) {

this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));

}

@Override

public void assign(Collection<MessageQueue> messageQueues) {

defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));

}

@Override

public List<MessageExt> poll() {

return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());

}

@Override

public List<MessageExt> poll(long timeout) {

return defaultLitePullConsumerImpl.poll(timeout);

}

@Override

public void seek(MessageQueue messageQueue, long offset) throws MQClientException {

this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);

}

@Override

public void pause(Collection<MessageQueue> messageQueues) {

this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));

}

@Override

public void resume(Collection<MessageQueue> messageQueues) {

this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));

}

@Override

public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {

return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));

}

@Override

public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {

return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);

}

@Override

public void registerTopicMessageQueueChangeListener(String topic,

TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {

this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);

}

@Override

public void commitSync() {

this.defaultLitePullConsumerImpl.commitSync();

}

@Override

public Long committed(MessageQueue messageQueue) throws MQClientException {

return this.defaultLitePullConsumerImpl.committed(messageQueue);

}

@Override

public boolean isAutoCommit() {

return autoCommit;

}

@Override

public void setAutoCommit(boolean autoCommit) {

this.autoCommit = autoCommit;

}

//......

}

  • DefaultLitePullConsumer继承了ClientConfig,实现了LitePullConsumer接口,其构造器会创建DefaultLitePullConsumerImpl,LitePullConsumer接口定义的方法,其内部实现都委托给了DefaultLitePullConsumerImpl

小结

rocketmq6.0引入了LitePullConsumer,解决Add lite pull consumer support for RocketMQ #1388,提供了如下功能:

(1) Support consume messages in subscribe way with auto rebalance.

(2) Support consume messages in assign way with no auto rebalance support.

(3) Add seek/commit offset for a specified message queue.

doc

  • Add lite pull consumer support for RocketMQ #1388
  • [ISSUE #1388]Add lite pull consumer support for RocketMQ #1386
  • LitePullConsumer

以上是 聊聊rocketmq的LitePullConsumer 的全部内容, 来源链接: utcz.com/z/512175.html

回到顶部