A Look at the ACLInterceptor in rocketmq-client-go
Preface
This article examines the ACLInterceptor in rocketmq-client-go.
ACLInterceptor
rocketmq-client-go-v2.0.0/internal/remote/interceptor.go
func ACLInterceptor(credentials primitive.Credentials) primitive.Interceptor {
	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
		cmd := req.(*RemotingCommand)
		m := make(map[string]string)
		order := make([]string, 1)
		m[accessKey] = credentials.AccessKey
		order[0] = accessKey
		if credentials.SecurityToken != "" {
			m[securityToken] = credentials.SecurityToken
		}
		for k, v := range cmd.ExtFields {
			m[k] = v
			order = append(order, k)
		}
		sort.Slice(order, func(i, j int) bool {
			return strings.Compare(order[i], order[j]) < 0
		})
		content := ""
		for idx := range order {
			content += m[order[idx]]
		}
		buf := make([]byte, len(content)+len(cmd.Body))
		copy(buf, []byte(content))
		copy(buf[len(content):], cmd.Body)
		cmd.ExtFields[signature] = calculateSignature(buf, []byte(credentials.SecretKey))
		cmd.ExtFields[accessKey] = credentials.AccessKey
		// The SecurityToken value is unnecessary, user can choose this one.
		if credentials.SecurityToken != "" {
			cmd.ExtFields[securityToken] = credentials.SecurityToken
		}
		err := next(ctx, req, reply)
		return err
	}
}
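- ACLInterceptor builds the content to sign from the access key and the ExtFields values (sorted by key) followed by the command body, calls calculateSignature to sign it, writes the signature, the access key, and the optional security token into cmd.ExtFields, and then invokes next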
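The ordering step described above can be sketched in isolation. This is a minimal illustration, not the library's code: the function name buildSignContent and the literal key "AccessKey" are assumptions made for the example, and the security token is left out.

```go
package main

import (
	"fmt"
	"sort"
)

// buildSignContent is a hypothetical helper that mirrors the quoted logic:
// collect the keys, sort them, concatenate the values in key order,
// then append the request body.
func buildSignContent(accessKey string, extFields map[string]string, body []byte) []byte {
	// "AccessKey" is an assumed key name used only for this sketch.
	m := map[string]string{"AccessKey": accessKey}
	keys := []string{"AccessKey"}
	for k, v := range extFields {
		m[k] = v
		keys = append(keys, k)
	}
	// Byte-wise ordering, so upper-case keys sort before lower-case ones.
	sort.Strings(keys)

	var content []byte
	for _, k := range keys {
		content = append(content, m[k]...)
	}
	return append(content, body...)
}

func main() {
	ext := map[string]string{"topic": "T", "group": "G"}
	fmt.Printf("%s\n", buildSignContent("ak", ext, []byte("|body")))
}
```

Sorting the keys matters because Go map iteration order is not deterministic; without it, the client and the verifying side could concatenate the same fields in different orders and compute different signatures.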
calculateSignature
rocketmq-client-go-v2.0.0/internal/remote/interceptor.go
func calculateSignature(data, sk []byte) string {
	mac := hmac.New(func() hash.Hash {
		return sha1.New()
	}, sk)
	mac.Write(data)
	return base64.StdEncoding.EncodeToString(mac.Sum(nil))
}
- calculateSignature computes an HMAC-SHA1 over data keyed with sk, then Base64-encodes the result of mac.Sum into a string
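To make the signing step concrete, here is a small sketch that signs a payload the same way and then checks the signature. The sign and verify functions are illustrative names, and verify is only an assumption about how a receiving side might check the value; it is not taken from the broker's code.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// sign computes HMAC-SHA1 over data and returns it Base64-encoded,
// following the same steps as the quoted calculateSignature.
func sign(data, secretKey []byte) string {
	mac := hmac.New(sha1.New, secretKey)
	mac.Write(data)
	return base64.StdEncoding.EncodeToString(mac.Sum(nil))
}

// verify is a hypothetical counterpart: it recomputes the MAC and
// compares it with the decoded signature in constant time.
func verify(data, secretKey []byte, signature string) bool {
	got, err := base64.StdEncoding.DecodeString(signature)
	if err != nil {
		return false
	}
	mac := hmac.New(sha1.New, secretKey)
	mac.Write(data)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	data := []byte("akGT|body")
	sig := sign(data, []byte("secret"))
	fmt.Println(len(sig), verify(data, []byte("secret"), sig), verify(data, []byte("other"), sig))
}
```

A SHA-1 digest is 20 bytes, so the Base64 string is always 28 characters long. Comparing with hmac.Equal rather than == avoids leaking timing information about where two MACs differ.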
SetCredentials
rocketmq-client-go-v2.0.0/internal/namesrv.go
func (s *namesrvs) SetCredentials(credentials primitive.Credentials) {
	s.nameSrvClient.RegisterInterceptor(remote.ACLInterceptor(credentials))
}
- The SetCredentials method of namesrvs wraps the credentials in remote.ACLInterceptor and registers the result through s.nameSrvClient.RegisterInterceptor
RegisterInterceptor
rocketmq-client-go-v2.0.0/internal/remote/remote_client.go
func (c *remotingClient) RegisterInterceptor(interceptors ...primitive.Interceptor) {
	c.interceptor = primitive.ChainInterceptors(interceptors...)
}
- RegisterInterceptor combines the given interceptors with primitive.ChainInterceptors and assigns the result to c.interceptor
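Because the quoted method assigns to a single field, a later registration replaces an earlier one rather than adding to it. The sketch below illustrates that consequence with stand-in types; client, register, invoke, and tag are hypothetical names, and the way invoke consults the field is an assumption, not the library's actual call path.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local stand-ins with the same shape as the primitive package's types.
type Invoker func(ctx context.Context, req, reply interface{}) error
type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error

// client is a hypothetical stand-in for a remoting client.
type client struct{ interceptor Interceptor }

// register overwrites the stored interceptor, like the quoted assignment.
func (c *client) register(i Interceptor) { c.interceptor = i }

// invoke routes the call through the interceptor when one is set (assumed behaviour).
func (c *client) invoke(ctx context.Context, req interface{}, send Invoker) error {
	if c.interceptor == nil {
		return send(ctx, req, nil)
	}
	return c.interceptor(ctx, req, nil, send)
}

// tag returns an interceptor that records its name and then continues.
func tag(name string, seen *[]string) Interceptor {
	return func(ctx context.Context, req, reply interface{}, next Invoker) error {
		*seen = append(*seen, name)
		return next(ctx, req, reply)
	}
}

// lastRegisteredWins registers two interceptors in separate calls and
// reports which ones actually ran.
func lastRegisteredWins() string {
	var seen []string
	c := &client{}
	c.register(tag("first", &seen))
	c.register(tag("second", &seen))
	send := func(ctx context.Context, req, reply interface{}) error {
		seen = append(seen, "send")
		return nil
	}
	_ = c.invoke(context.Background(), "request", send)
	return strings.Join(seen, " ")
}

func main() { fmt.Println(lastRegisteredWins()) }
```

Under these assumptions only the second interceptor runs, which suggests that interceptors meant to work together should be passed in one RegisterInterceptor call.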
ChainInterceptors
rocketmq-client-go-v2.0.0/primitive/interceptor.go
func ChainInterceptors(interceptors ...Interceptor) Interceptor {
	if len(interceptors) == 0 {
		return nil
	}
	if len(interceptors) == 1 {
		return interceptors[0]
	}
	return func(ctx context.Context, req, reply interface{}, invoker Invoker) error {
		return interceptors[0](ctx, req, reply, getChainedInterceptor(interceptors, 0, invoker))
	}
}
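- ChainInterceptors returns nil when there are no interceptors and the interceptor itself when there is exactly one; otherwise it returns an interceptor that runs interceptors[0] and passes it the Invoker built by getChainedInterceptor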
getChainedInterceptor
rocketmq-client-go-v2.0.0/primitive/interceptor.go
func getChainedInterceptor(interceptors []Interceptor, cur int, finalInvoker Invoker) Invoker {
	if cur == len(interceptors)-1 {
		return finalInvoker
	}
	return func(ctx context.Context, req, reply interface{}) error {
		return interceptors[cur+1](ctx, req, reply, getChainedInterceptor(interceptors, cur+1, finalInvoker))
	}
}
- getChainedInterceptor recursively calls getChainedInterceptor(interceptors, cur+1, finalInvoker) until cur == len(interceptors)-1, at which point it returns finalInvoker
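The effect of this recursion is easiest to see by running a chain. The sketch below reimplements the two quoted functions against local stand-in types (chain and nextInvoker are illustrative names, and the single-interceptor shortcut is omitted) and records the order in which three interceptors are entered and left.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local stand-ins with the same shape as the primitive package's types.
type Invoker func(ctx context.Context, req, reply interface{}) error
type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error

// chain follows the structure of the quoted ChainInterceptors.
func chain(interceptors ...Interceptor) Interceptor {
	if len(interceptors) == 0 {
		return nil
	}
	return func(ctx context.Context, req, reply interface{}, final Invoker) error {
		return interceptors[0](ctx, req, reply, nextInvoker(interceptors, 0, final))
	}
}

// nextInvoker follows the structure of the quoted getChainedInterceptor.
func nextInvoker(interceptors []Interceptor, cur int, final Invoker) Invoker {
	if cur == len(interceptors)-1 {
		return final
	}
	return func(ctx context.Context, req, reply interface{}) error {
		return interceptors[cur+1](ctx, req, reply, nextInvoker(interceptors, cur+1, final))
	}
}

// named returns an interceptor that records when it is entered and left.
func named(name string, log *[]string) Interceptor {
	return func(ctx context.Context, req, reply interface{}, next Invoker) error {
		*log = append(*log, name+":in")
		err := next(ctx, req, reply)
		*log = append(*log, name+":out")
		return err
	}
}

// runChain executes a three-interceptor chain and returns the recorded order.
func runChain() string {
	var log []string
	send := func(ctx context.Context, req, reply interface{}) error {
		log = append(log, "send")
		return nil
	}
	c := chain(named("a", &log), named("b", &log), named("c", &log))
	_ = c(context.Background(), nil, nil, send)
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(runChain()) // prints "a:in b:in c:in send c:out b:out a:out"
}
```

The interceptors nest like middleware: the first one registered is outermost, and the final invoker runs only after every interceptor has called next.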
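Summary
ACLInterceptor calls calculateSignature to sign the request, stores the signature in the command's ExtFields, and then invokes next; calculateSignature computes an HMAC-SHA1 with the secret key and Base64-encodes the result of mac.Sum into a string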
doc
- interceptor