聊聊rocketmq-client-go的QueueSelector

编程

本文主要研究一下rocketmq-client-go的QueueSelector

QueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

// QueueSelector chooses one MessageQueue for a Message.
type QueueSelector interface {

// Select receives the message and the candidate queues and returns the chosen queue.
Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue

}

  • QueueSelector接口,定义了Select方法

manualQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

// manualQueueSelector is a stateless QueueSelector whose Select returns the queue already set on the message.
type manualQueueSelector struct{}

// NewManualQueueSelector returns a QueueSelector backed by a
// manualQueueSelector.
func NewManualQueueSelector() QueueSelector {
	return &manualQueueSelector{}
}

// Select ignores the candidate queues and returns the queue stored in the
// message's Queue field.
func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	chosen := message.Queue
	return chosen
}

  • manualQueueSelector的Select方法直接返回message.Queue

randomQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

// randomQueueSelector picks a queue pseudo-randomly using its own seeded
// generator.
type randomQueueSelector struct {
	// mu serializes access to rander: a *rand.Rand created with rand.New is
	// not safe for concurrent use by multiple goroutines.
	mu     sync.Mutex
	rander *rand.Rand
}

// NewRandomQueueSelector returns a QueueSelector that chooses among the
// candidate queues at random. The generator is seeded from the current time.
func NewRandomQueueSelector() QueueSelector {
	s := new(randomQueueSelector)
	s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
	return s
}

// Select returns a pseudo-randomly chosen element of queues, or nil when
// queues is empty. The message itself is not inspected.
func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	if len(queues) == 0 {
		// Intn panics when its argument is <= 0.
		return nil
	}

	r.mu.Lock()
	i := r.rander.Intn(len(queues))
	r.mu.Unlock()

	return queues[i]
}

  • NewRandomQueueSelector方法先创建randomQueueSelector,然后设置其rander;Select方法通过r.rander.Intn(len(queues))随机选择index,然后从queues中取出对应的队列

roundRobinQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

// roundRobinQueueSelector cycles through the candidate queues, keeping a separate counter per topic.
type roundRobinQueueSelector struct {

// Locker is held while entries are added to indexer.
sync.Locker

// indexer maps a topic name to its counter; Select advances the counter with atomic.AddInt32.
indexer map[string]*int32

}

// NewRoundRobinQueueSelector returns a round-robin QueueSelector with a fresh
// mutex and an empty per-topic counter table.
func NewRoundRobinQueueSelector() QueueSelector {
	return &roundRobinQueueSelector{
		Locker:  &sync.Mutex{},
		indexer: make(map[string]*int32),
	}
}

// Select returns the next queue in round-robin order for the message's topic,
// or nil when queues is empty. Each topic has its own counter in r.indexer.
func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	if len(queues) == 0 {
		// Nothing to choose from; also avoids a modulo-by-zero panic below.
		return nil
	}

	t := message.Topic

	// Look up (or create) the topic's counter entirely under the lock: reading
	// a map while another goroutine may be inserting into it is a data race.
	r.Lock()
	index, exist := r.indexer[t]
	if !exist {
		index = new(int32)
		r.indexer[t] = index
	}
	r.Unlock()

	i := atomic.AddInt32(index, 1)
	if i < 0 {
		// The counter wrapped past the largest int32. Restart from zero rather
		// than negating: negating the smallest int32 yields itself, which
		// would produce a negative slice index.
		atomic.StoreInt32(index, 0)
		i = 0
	}

	qIndex := int(i) % len(queues)
	return queues[qIndex]
}

  • roundRobinQueueSelector在indexer中为每个topic维护一个int32计数器;Select方法通过atomic.AddInt32递增该计数器得到i,再以int(i) % len(queues)作为qIndex,从而在queues中轮询选择

hashQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

// hashQueueSelector picks a queue from the hash of the message's sharding key.
type hashQueueSelector struct {

// random is the selector Select delegates to when the message has no sharding key.
random QueueSelector

}

// NewHashQueueSelector returns a hash-based QueueSelector whose fallback
// selector is created with NewRandomQueueSelector.
func NewHashQueueSelector() QueueSelector {
	s := new(hashQueueSelector)
	s.random = NewRandomQueueSelector()
	return s
}

// Select chooses a queue by hashing the message's sharding key with 32-bit
// FNV-1a, so equal keys map to the same index for a given number of queues.
// When the message has no sharding key, the choice is delegated to h.random.
// It returns nil when queues is empty or the key cannot be hashed.
func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	if len(queues) == 0 {
		// Nothing to choose from; also avoids a modulo-by-zero panic below.
		return nil
	}

	key := message.GetShardingKey()
	if len(key) == 0 {
		return h.random.Select(message, queues)
	}

	hasher := fnv.New32a()
	if _, err := hasher.Write([]byte(key)); err != nil {
		return nil
	}

	queueID := int(hasher.Sum32()) % len(queues)
	if queueID < 0 {
		// Where int is 32 bits wide, the conversion above can yield a negative
		// value, and so can the remainder.
		queueID = -queueID
	}
	return queues[queueID]
}

  • hashQueueSelector的Select方法先通过message.GetShardingKey()获取key;key为空时委托给random选择器,否则用fnv.New32a()计算key的hash,再通过int(hasher.Sum32()) % len(queues)来计算queue的index

小结

rocketmq-client-go的selector.go定义了QueueSelector接口,以及manualQueueSelector、randomQueueSelector、roundRobinQueueSelector、hashQueueSelector四种实现

doc

  • selector.go

以上是 聊聊rocketmq-client-go的QueueSelector 的全部内容, 来源链接: utcz.com/z/518024.html

回到顶部