




public interface AllocateMessageQueueStrategy {


* Allocating by consumer id


* @param consumerGroup current consumer group

* @param currentCID current consumer id

* @param mqAll message queue set in current topic

* @param cidAll consumer set in current consumer group

* @return The allocate result of given strategy


List<MessageQueue> allocate(

final String consumerGroup,

final String currentCID,

final List<MessageQueue> mqAll,

final List<String> cidAll



* Algorithm name


* @return The strategy name


String getName();


  • AllocateMessageQueueStrategy定义了allocate、getName方法



public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {

private final InternalLogger log = ClientLogger.getLog();

private final int virtualNodeCnt;

private final HashFunction customHashFunction;

public AllocateMessageQueueConsistentHash() {



public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {

this(virtualNodeCnt, null);


public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {

if (virtualNodeCnt < 0) {

throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);


this.virtualNodeCnt = virtualNodeCnt;

this.customHashFunction = customHashFunction;



public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,

List<String> cidAll) {

if (currentCID == null || currentCID.length() < 1) {

throw new IllegalArgumentException("currentCID is empty");


if (mqAll == null || mqAll.isEmpty()) {

throw new IllegalArgumentException("mqAll is null or mqAll empty");


if (cidAll == null || cidAll.isEmpty()) {

throw new IllegalArgumentException("cidAll is null or cidAll empty");


List<MessageQueue> result = new ArrayList<MessageQueue>();

if (!cidAll.contains(currentCID)) {

log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",




return result;


Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();

for (String cid : cidAll) {

cidNodes.add(new ClientNode(cid));


final ConsistentHashRouter<ClientNode> router; //for building hash ring

if (customHashFunction != null) {

router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);

} else {

router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);


List<MessageQueue> results = new ArrayList<MessageQueue>();

for (MessageQueue mq : mqAll) {

ClientNode clientNode = router.routeNode(mq.toString());

if (clientNode != null && currentCID.equals(clientNode.getKey())) {




return results;



public String getName() {



private static class ClientNode implements Node {

private final String clientID;

public ClientNode(String clientID) {

this.clientID = clientID;



public String getKey() {

return clientID;




  • AllocateMessageQueueConsistentHash实现了AllocateMessageQueueStrategy接口,它定义了virtualNodeCnt(默认为10)及customHashFunction属性;其allocate方法根据cidAll构造ClientNode列表,然后创建ConsistentHashRouter,最后遍历mqAll使用router.routeNode(mq.toString())选择clientNode,若clientNode不为null且currentCID.equals(clientNode.getKey()),则将该mq添加到results中,最后返回results;其getName方法返回的是CONSISTENT_HASH



public class ConsistentHashRouter<T extends Node> {

private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();

private final HashFunction hashFunction;

public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {

this(pNodes, vNodeCount, new MD5Hash());



* @param pNodes collections of physical nodes

* @param vNodeCount amounts of virtual nodes

* @param hashFunction hash Function to hash Node instances


public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {

if (hashFunction == null) {

throw new NullPointerException("Hash Function is null");


this.hashFunction = hashFunction;

if (pNodes != null) {

for (T pNode : pNodes) {

addNode(pNode, vNodeCount);





* add physic node to the hash ring with some virtual nodes


* @param pNode physical node needs added to hash ring

* @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0


public void addNode(T pNode, int vNodeCount) {

if (vNodeCount < 0)

throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);

int existingReplicas = getExistingReplicas(pNode);

for (int i = 0; i < vNodeCount; i++) {

VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);

ring.put(hashFunction.hash(vNode.getKey()), vNode);




* remove the physical node from the hash ring


public void removeNode(T pNode) {

Iterator<Long> it = ring.keySet().iterator();

while (it.hasNext()) {

Long key = it.next();

VirtualNode<T> virtualNode = ring.get(key);

if (virtualNode.isVirtualNodeOf(pNode)) {






* with a specified key, route the nearest Node instance in the current hash ring


* @param objectKey the object key to find a nearest Node


public T routeNode(String objectKey) {

if (ring.isEmpty()) {

return null;


Long hashVal = hashFunction.hash(objectKey);

SortedMap<Long, VirtualNode<T>> tailMap = ring.tailMap(hashVal);

Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();

return ring.get(nodeHashVal).getPhysicalNode();


public int getExistingReplicas(T pNode) {

int replicas = 0;

for (VirtualNode<T> vNode : ring.values()) {

if (vNode.isVirtualNodeOf(pNode)) {




return replicas;


//default hash function

private static class MD5Hash implements HashFunction {

MessageDigest instance;

public MD5Hash() {

try {

instance = MessageDigest.getInstance("MD5");

} catch (NoSuchAlgorithmException e) {




public long hash(String key) {



byte[] digest = instance.digest();

long h = 0;

for (int i = 0; i < 4; i++) {

h <<= 8;

h |= ((int) digest[i]) & 0xFF;


return h;




  • ConsistentHashRouter使用virtual node来进行一致性哈希,默认的hashFunction为MD5Hash;它提供了addNode、removeNode、routeNode、getExistingReplicas方法;其routeNode方法使用hashFunction.hash(objectKey)计算hashVal,然后使用ring.tailMap(hashVal)获取tailMap,若tailMap不为空则取tailMap.firstKey(),否则取ring.firstKey()作为nodeHashVal,最后根据该值取ring中的VirtualNode再取其physicalNode




  • AllocateMessageQueueConsistentHash

以上是 聊聊rocketmq的AllocateMessageQueueConsistentHash 的全部内容, 来源链接: utcz.com/z/511241.html
