聊聊rocketmq的AllocateMessageQueueConsistentHash

编程

本文主要研究一下rocketmq的AllocateMessageQueueConsistentHash

AllocateMessageQueueStrategy

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java

public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id.
     *
     * @param consumerGroup current consumer group
     * @param currentCID    current consumer id
     * @param mqAll         message queue set in current topic
     * @param cidAll        consumer set in current consumer group
     * @return the allocate result of the given strategy
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * Algorithm name.
     *
     * @return the strategy name
     */
    String getName();
}

  • AllocateMessageQueueStrategy定义了allocate、getName方法

AllocateMessageQueueConsistentHash

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java

/**
 * Queue allocation strategy that places every consumer id on a hash ring
 * (via {@code ConsistentHashRouter}) and hands the current consumer the queues
 * that the ring routes to it.
 */
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {

    private final InternalLogger log = ClientLogger.getLog();

    /** Number of virtual nodes requested per consumer when building the ring. */
    private final int virtualNodeCnt;

    /** Optional hash function; when {@code null} the router's own default is used. */
    private final HashFunction customHashFunction;

    /** Creates a strategy with 10 virtual nodes per consumer and no custom hash function. */
    public AllocateMessageQueueConsistentHash() {
        this(10);
    }

    /**
     * @param virtualNodeCnt virtual nodes per consumer; must not be negative
     */
    public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
        this(virtualNodeCnt, null);
    }

    /**
     * @param virtualNodeCnt     virtual nodes per consumer; must not be negative
     * @param customHashFunction hash function for the ring, or {@code null} for the router default
     * @throws IllegalArgumentException if {@code virtualNodeCnt} is negative
     */
    public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
        if (virtualNodeCnt < 0) {
            throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
        }
        this.virtualNodeCnt = virtualNodeCnt;
        this.customHashFunction = customHashFunction;
    }

    /**
     * Returns the queues from {@code mqAll} that the hash ring routes to {@code currentCID}.
     *
     * @return the queues owned by {@code currentCID}; an empty list when
     *         {@code currentCID} is not contained in {@code cidAll}
     * @throws IllegalArgumentException if {@code currentCID} is null/empty, or
     *                                  {@code mqAll} / {@code cidAll} is null or empty
     */
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.isEmpty()) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> assigned = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return assigned;
        }

        final ConsistentHashRouter<ClientNode> ring = buildRouter(cidAll);
        for (MessageQueue queue : mqAll) {
            ClientNode owner = ring.routeNode(queue.toString());
            if (owner == null || !currentCID.equals(owner.getKey())) {
                continue;
            }
            assigned.add(queue);
        }
        return assigned;
    }

    /** Builds the hash ring over all consumer ids, honoring the custom hash function if one was supplied. */
    private ConsistentHashRouter<ClientNode> buildRouter(List<String> cidAll) {
        Collection<ClientNode> physicalNodes = new ArrayList<ClientNode>();
        for (String consumerId : cidAll) {
            physicalNodes.add(new ClientNode(consumerId));
        }
        return customHashFunction != null
            ? new ConsistentHashRouter<ClientNode>(physicalNodes, virtualNodeCnt, customHashFunction)
            : new ConsistentHashRouter<ClientNode>(physicalNodes, virtualNodeCnt);
    }

    @Override
    public String getName() {
        return "CONSISTENT_HASH";
    }

    /** Ring node whose key is the consumer id it wraps. */
    private static class ClientNode implements Node {

        private final String clientID;

        public ClientNode(String clientID) {
            this.clientID = clientID;
        }

        @Override
        public String getKey() {
            return clientID;
        }
    }
}

  • AllocateMessageQueueConsistentHash实现了AllocateMessageQueueStrategy接口,它定义了virtualNodeCnt(默认为10)及customHashFunction属性;其allocate方法根据cidAll构造ClientNode列表,然后创建ConsistentHashRouter,最后遍历mqAll使用router.routeNode(mq.toString())选择clientNode,若clientNode不为null且currentCID.equals(clientNode.getKey()),则将该mq添加到results中,最后返回results;其getName方法返回的是CONSISTENT_HASH

ConsistentHashRouter

rocketmq-common-4.5.2-sources.jar!/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java

/**
 * Hash ring that maps string keys to physical nodes through virtual nodes.
 *
 * <p>The ring is a sorted map from hash value to virtual node. This class performs
 * no synchronization of its own.
 *
 * @param <T> physical node type
 */
public class ConsistentHashRouter<T extends Node> {

    /** Hash value -> virtual node, ordered by hash value. */
    private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();

    private final HashFunction hashFunction;

    /**
     * Builds a ring using the default MD5-based hash function.
     *
     * @param pNodes     collection of physical nodes; {@code null} yields an empty ring
     * @param vNodeCount number of virtual nodes per physical node
     */
    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
        this(pNodes, vNodeCount, new MD5Hash());
    }

    /**
     * @param pNodes       collection of physical nodes; {@code null} yields an empty ring
     * @param vNodeCount   number of virtual nodes per physical node
     * @param hashFunction hash function used to hash node keys and object keys
     * @throws NullPointerException     if {@code hashFunction} is null
     * @throws IllegalArgumentException if {@code vNodeCount} is negative and {@code pNodes} is non-empty
     */
    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
        if (hashFunction == null) {
            throw new NullPointerException("Hash Function is null");
        }
        this.hashFunction = hashFunction;
        if (pNodes != null) {
            for (T pNode : pNodes) {
                addNode(pNode, vNodeCount);
            }
        }
    }

    /**
     * Adds a physical node to the hash ring with some virtual nodes.
     *
     * @param pNode      physical node to add to the hash ring
     * @param vNodeCount number of virtual nodes to add for the physical node; must be >= 0
     * @throws IllegalArgumentException if {@code vNodeCount} is negative
     */
    public void addNode(T pNode, int vNodeCount) {
        if (vNodeCount < 0) {
            throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
        }
        // Offset the index passed to each new virtual node by the number already on the ring.
        int existingReplicas = getExistingReplicas(pNode);
        for (int i = 0; i < vNodeCount; i++) {
            VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
            ring.put(hashFunction.hash(vNode.getKey()), vNode);
        }
    }

    /**
     * Removes every virtual node of the given physical node from the hash ring.
     *
     * @param pNode physical node to remove
     */
    public void removeNode(T pNode) {
        // Iterate the values view directly; Iterator.remove() drops the backing ring entry.
        Iterator<VirtualNode<T>> it = ring.values().iterator();
        while (it.hasNext()) {
            if (it.next().isVirtualNodeOf(pNode)) {
                it.remove();
            }
        }
    }

    /**
     * Routes a key to the nearest node in the current hash ring: the first virtual node
     * whose hash is >= the key's hash, wrapping around to the ring's first entry when none is.
     *
     * @param objectKey the object key to find the nearest node for
     * @return the owning physical node, or {@code null} if the ring is empty
     */
    public T routeNode(String objectKey) {
        if (ring.isEmpty()) {
            return null;
        }
        Long hashVal = hashFunction.hash(objectKey);
        SortedMap<Long, VirtualNode<T>> tailMap = ring.tailMap(hashVal);
        Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
        return ring.get(nodeHashVal).getPhysicalNode();
    }

    /**
     * Counts the virtual nodes currently on the ring that belong to the given physical node.
     *
     * @param pNode physical node to count virtual nodes for
     * @return number of matching virtual nodes
     */
    public int getExistingReplicas(T pNode) {
        int replicas = 0;
        for (VirtualNode<T> vNode : ring.values()) {
            if (vNode.isVirtualNodeOf(pNode)) {
                replicas++;
            }
        }
        return replicas;
    }

    /**
     * Default hash function: the first four bytes of the MD5 digest of the key,
     * combined big-endian into a non-negative {@code long}.
     *
     * <p>NOTE(review): a single {@link MessageDigest} is reused across calls and is not
     * synchronized here; confirm callers never share one router between threads.
     */
    private static class MD5Hash implements HashFunction {

        private final MessageDigest instance;

        /**
         * @throws IllegalStateException if the JVM provides no MD5 implementation
         */
        public MD5Hash() {
            try {
                instance = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                // Fail fast with the cause instead of leaving a null digest that would
                // surface later as an unexplained NullPointerException in hash().
                throw new IllegalStateException("MD5 MessageDigest is not available", e);
            }
        }

        @Override
        public long hash(String key) {
            instance.reset();
            // Use a fixed charset so every JVM hashes the same key to the same ring position,
            // regardless of its platform default charset.
            instance.update(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            byte[] digest = instance.digest();

            long h = 0;
            for (int i = 0; i < 4; i++) {
                h <<= 8;
                h |= ((int) digest[i]) & 0xFF;
            }
            return h;
        }
    }
}

  • ConsistentHashRouter使用virtual node来进行一致性哈希,默认的hashFunction为MD5Hash(取MD5摘要的前4个字节拼成一个非负的long值);它提供了addNode、removeNode、routeNode、getExistingReplicas方法;其routeNode方法在ring为空时直接返回null,否则使用hashFunction.hash(objectKey)计算hashVal,然后使用ring.tailMap(hashVal)获取key大于等于hashVal的tailMap,若tailMap不为空则取tailMap.firstKey(),否则回绕到环的起点取ring.firstKey()作为nodeHashVal,最后根据该值取ring中的VirtualNode再取其physicalNode

小结

AllocateMessageQueueConsistentHash实现了AllocateMessageQueueStrategy接口,它定义了virtualNodeCnt(默认为10)及customHashFunction属性;其allocate方法根据cidAll构造ClientNode列表,然后创建ConsistentHashRouter,最后遍历mqAll使用router.routeNode(mq.toString())选择clientNode,若clientNode不为null且currentCID.equals(clientNode.getKey()),则将该mq添加到results中,最后返回results;其getName方法返回的是CONSISTENT_HASH

doc

  • AllocateMessageQueueConsistentHash

以上是 聊聊rocketmq的AllocateMessageQueueConsistentHash 的全部内容, 来源链接: utcz.com/z/511241.html

回到顶部