聊聊rocketmq-client-go的transactionProducer

编程

本文主要研究一下rocketmq-client-go的transactionProducer

transactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// transactionProducer pairs a defaultProducer with a transaction listener.
type transactionProducer struct {

// producer is the underlying producer; Start, Shutdown and SendSync calls
// are delegated to it, and its callbackCh is consumed for check-back requests.
producer *defaultProducer

// listener supplies the local transaction callbacks used by this type:
// ExecuteLocalTransaction and CheckLocalTransaction.
listener primitive.TransactionListener

}

  • transactionProducer定义了producer及listener属性

NewTransactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

// NewTransactionProducer builds a transactionProducer around a freshly created
// default producer.
//
// opts are forwarded unchanged to NewDefaultProducer. listener is stored as-is
// (it is not validated here) and is later used for the local transaction
// callbacks. If NewDefaultProducer fails, its error is wrapped and returned
// together with a nil producer.
func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
	base, err := NewDefaultProducer(opts...)
	if err != nil {
		return nil, errors.Wrap(err, "NewDefaultProducer failed.")
	}

	tp := &transactionProducer{
		producer: base,
		listener: listener,
	}
	return tp, nil
}

  • NewTransactionProducer方法实例化transactionProducer

Start

rocketmq-client-go-v2.0.0/producer/producer.go

// Start launches the transaction check-back loop in its own goroutine and then
// starts the underlying producer, returning that producer's Start result.
//
// The loop runs under primitive.WithRecover.
// NOTE(review): the goroutine is spawned before the underlying producer is
// started and is not stopped here if that start fails — confirm that
// callbackCh is closed on that path.
func (tp *transactionProducer) Start() error {
	go primitive.WithRecover(tp.checkTransactionState)

	return tp.producer.Start()
}

  • Start方法先异步执行tp.checkTransactionState(),然后执行tp.producer.Start()

checkTransactionState

rocketmq-client-go-v2.0.0/producer/producer.go

// checkTransactionState consumes tp.producer.callbackCh and answers every
// transaction check-back request it receives.
//
// For each *internal.CheckTransactionStateCallback it asks the listener for the
// local transaction state (CheckLocalTransaction), builds an
// EndTransactionRequestHeader from the callback header, and sends it to
// callback.Addr with client.InvokeOneWay as a ReqENDTransaction command. A send
// failure is logged and the loop continues. Values of any other type are logged
// and skipped. The loop keeps running until callbackCh is closed.
func (tp *transactionProducer) checkTransactionState() {
	for item := range tp.producer.callbackCh {
		cb, ok := item.(*internal.CheckTransactionStateCallback)
		if !ok {
			rlog.Error(fmt.Sprintf("unknown type %v", item), nil)
			continue
		}

		state := tp.listener.CheckLocalTransaction(cb.Msg)

		// Use the unique client message id property when present; otherwise
		// fall back to the message's MsgId.
		msgID := cb.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
		if msgID == "" {
			msgID = cb.Msg.MsgId
		}

		endHeader := &internal.EndTransactionRequestHeader{
			CommitLogOffset:      cb.Header.CommitLogOffset,
			ProducerGroup:        tp.producer.group,
			TranStateTableOffset: cb.Header.TranStateTableOffset,
			FromTransactionCheck: true,
			MsgID:                msgID,
			TransactionId:        cb.Header.TransactionId,
			CommitOrRollback:     tp.transactionState(state),
		}

		cmd := remote.NewRemotingCommand(internal.ReqENDTransaction, endHeader, nil)
		cmd.Remark = tp.errRemark(nil)

		if err := tp.producer.client.InvokeOneWay(context.Background(), cb.Addr.String(), cmd,
			tp.producer.options.SendMsgTimeout); err != nil {
			rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
				"callback":               cb.Addr.String(),
				"request":                cmd.String(),
				rlog.LogKeyUnderlayError: err,
			})
		}
	}
}

  • checkTransactionState方法遍历tp.producer.callbackCh,根据元素的类型做不同处理;目前仅支持CheckTransactionStateCallback:它先调用tp.listener.CheckLocalTransaction获取本地事务状态,再构造EndTransactionRequestHeader,并通过tp.producer.client.InvokeOneWay发送ReqENDTransaction请求;其他类型则只记录错误日志

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

// Shutdown shuts down the underlying producer and returns its result.
func (tp *transactionProducer) Shutdown() error {
	inner := tp.producer
	return inner.Shutdown()
}

  • Shutdown方法执行tp.producer.Shutdown()

SendMessageInTransaction

rocketmq-client-go-v2.0.0/producer/producer.go

// SendMessageInTransaction sends msg synchronously as a transactional message,
// runs the local transaction, and reports the outcome via tp.endTransaction.
//
// Before sending, PropertyTransactionPrepared is set to "true" and
// PropertyProducerGroup to the configured group name on msg (msg is mutated).
// If SendSync fails, its error is returned unchanged with a nil result.
// Otherwise the local transaction state is decided from the send status:
//   - SendOK: the listener's ExecuteLocalTransaction result is used;
//   - SendFlushDiskTimeout, SendFlushSlaveTimeout, SendSlaveNotAvailable:
//     RollbackMessageState;
//   - anything else: UnknowState.
//
// The returned TransactionSendResult carries the send result and that state.
func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) {
	msg.WithProperty(primitive.PropertyTransactionPrepared, "true")
	msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName)

	rsp, err := tp.producer.SendSync(ctx, msg)
	if err != nil {
		return nil, err
	}

	state := primitive.UnknowState
	if status := rsp.Status; status == primitive.SendOK {
		if len(rsp.TransactionID) > 0 {
			msg.WithProperty("__transactionId__", rsp.TransactionID)
		}

		// The unique client message id, when set, is used as msg.TransactionId.
		if uniqID := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex); len(uniqID) > 0 {
			msg.TransactionId = uniqID
		}

		state = tp.listener.ExecuteLocalTransaction(msg)
		if state != primitive.CommitMessageState {
			rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
				"localState": state,
				"message":    msg,
			})
		}
	} else if status == primitive.SendFlushDiskTimeout ||
		status == primitive.SendFlushSlaveTimeout ||
		status == primitive.SendSlaveNotAvailable {
		state = primitive.RollbackMessageState
	}

	// err is always nil here because of the early return above.
	// NOTE(review): any result of endTransaction is not inspected — confirm
	// this is intentional.
	tp.endTransaction(*rsp, err, state)

	return &primitive.TransactionSendResult{
		SendResult: rsp,
		State:      state,
	}, nil
}

  • SendMessageInTransaction方法先执行tp.producer.SendSync(ctx, msg),然后根据rsp.Status来做不同处理;对于primitive.SendOK执行tp.listener.ExecuteLocalTransaction来更新localTransactionState;对于primitive.SendFlushDiskTimeout、primitive.SendFlushSlaveTimeout、primitive.SendSlaveNotAvailable则更新localTransactionState为primitive.RollbackMessageState;最后执行tp.endTransaction

小结

transactionProducer定义了producer及listener属性;它提供了NewTransactionProducer、Start、Shutdown、SendMessageInTransaction方法

doc

  • producer

以上是 聊聊rocketmq-client-go的transactionProducer 的全部内容, 来源链接: utcz.com/z/518088.html

回到顶部