A look at api.go in rocketmq-client-go

This post walks through api.go in rocketmq-client-go, the file that declares the client's public interfaces and constructors.

Producer

rocketmq-client-go-v2.0.0/api.go

type Producer interface {
	Start() error
	Shutdown() error
	SendSync(ctx context.Context, mq ...*primitive.Message) (*primitive.SendResult, error)
	SendAsync(ctx context.Context, mq func(ctx context.Context, result *primitive.SendResult, err error),
		msg ...*primitive.Message) error
	SendOneWay(ctx context.Context, mq ...*primitive.Message) error
}

func NewProducer(opts ...producer.Option) (Producer, error) {
	return producer.NewDefaultProducer(opts...)
}

  • The Producer interface defines the Start, Shutdown, SendSync, SendAsync, and SendOneWay methods; NewProducer creates a Producer by delegating to producer.NewDefaultProducer
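To make the Start, SendSync, Shutdown lifecycle concrete, here is a minimal sketch. It does not import the real library: Message, SendResult, and fakeProducer are local stand-ins invented for illustration, and the Producer interface below mirrors only the subset of methods it uses. Against a real broker you would obtain the producer from NewProducer instead.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message and SendResult are simplified local stand-ins for
// primitive.Message and primitive.SendResult (assumption: the real
// types carry more fields than shown here).
type Message struct {
	Topic string
	Body  []byte
}

type SendResult struct {
	MsgID string
}

// Producer mirrors only the subset of the api.go interface used below.
type Producer interface {
	Start() error
	Shutdown() error
	SendSync(ctx context.Context, mq ...*Message) (*SendResult, error)
}

// fakeProducer is a hypothetical in-memory implementation for illustration.
type fakeProducer struct {
	started bool
	sent    int
}

func (p *fakeProducer) Start() error    { p.started = true; return nil }
func (p *fakeProducer) Shutdown() error { p.started = false; return nil }

func (p *fakeProducer) SendSync(ctx context.Context, mq ...*Message) (*SendResult, error) {
	if !p.started {
		return nil, errors.New("producer not started")
	}
	if len(mq) == 0 {
		return nil, errors.New("no message to send")
	}
	p.sent += len(mq)
	return &SendResult{MsgID: fmt.Sprintf("fake-%d", p.sent)}, nil
}

// sendOne shows the lifecycle: start the producer, send synchronously,
// and shut down when done.
func sendOne(p Producer, topic, body string) (string, error) {
	if err := p.Start(); err != nil {
		return "", err
	}
	defer p.Shutdown()

	res, err := p.SendSync(context.Background(), &Message{Topic: topic, Body: []byte(body)})
	if err != nil {
		return "", err
	}
	return res.MsgID, nil
}

func main() {
	id, err := sendOne(&fakeProducer{}, "demo-topic", "hello")
	fmt.Println(id, err)
}
```

Because sendOne depends only on the interface, the same function works with any implementation, which is the main benefit of api.go exposing interfaces rather than concrete types.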

TransactionProducer

rocketmq-client-go-v2.0.0/api.go

type TransactionProducer interface {
	Start() error
	Shutdown() error
	SendMessageInTransaction(ctx context.Context, mq *primitive.Message) (*primitive.TransactionSendResult, error)
}

func NewTransactionProducer(listener primitive.TransactionListener, opts ...producer.Option) (TransactionProducer, error) {
	return producer.NewTransactionProducer(listener, opts...)
}

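  • The TransactionProducer interface defines the Start, Shutdown, and SendMessageInTransaction methods; NewTransactionProducer takes a primitive.TransactionListener and creates a TransactionProducer by delegating to producer.NewTransactionProducer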
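NewTransactionProducer needs a listener, so it helps to see what a listener might look like. The sketch below is an assumption-heavy illustration: TxState, TxListener, orderListener, and sendInTransaction are hypothetical local names, not the library's types, and the exact method set of primitive.TransactionListener should be checked in the primitive package. The idea shown is only the general one: run the local transaction, report commit or rollback, and answer later status checks.

```go
package main

import (
	"errors"
	"fmt"
)

// TxState is a local stand-in for the state a listener reports
// (assumption: the real library defines its own type and constants).
type TxState int

const (
	Commit TxState = iota
	Rollback
	Unknown
)

// Message is a simplified stand-in for primitive.Message.
type Message struct {
	Topic string
	Body  []byte
}

// TxListener is a hypothetical listener shape: one method runs the local
// transaction, the other answers a later status check.
type TxListener interface {
	ExecuteLocal(msg *Message) TxState
	CheckLocal(msg *Message) TxState
}

// orderListener commits only if the local business step succeeds.
type orderListener struct {
	save func(body []byte) error
	done map[string]bool
}

func newOrderListener(save func([]byte) error) *orderListener {
	return &orderListener{save: save, done: make(map[string]bool)}
}

func (l *orderListener) ExecuteLocal(msg *Message) TxState {
	if err := l.save(msg.Body); err != nil {
		return Rollback
	}
	l.done[string(msg.Body)] = true
	return Commit
}

func (l *orderListener) CheckLocal(msg *Message) TxState {
	if l.done[string(msg.Body)] {
		return Commit
	}
	return Unknown
}

// failingSave is a helper that always fails, to exercise the rollback path.
func failingSave([]byte) error { return errors.New("save failed") }

// sendInTransaction imitates the producer side in the simplest way:
// a real producer would first send a half message to the broker, then
// call the listener and forward the resulting state.
func sendInTransaction(l TxListener, msg *Message) TxState {
	return l.ExecuteLocal(msg)
}

func main() {
	msg := &Message{Topic: "orders", Body: []byte("order-1")}
	ok := newOrderListener(func([]byte) error { return nil })
	bad := newOrderListener(failingSave)

	fmt.Println(sendInTransaction(ok, msg) == Commit)
	fmt.Println(sendInTransaction(bad, msg) == Rollback)
	fmt.Println(bad.CheckLocal(msg) == Unknown)
}
```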

PushConsumer

rocketmq-client-go-v2.0.0/api.go

type PushConsumer interface {
	// Start the PullConsumer for consuming message
	Start() error

	// Shutdown the PullConsumer, all offset of MessageQueue will be sync to broker before process exit
	Shutdown() error

	// Subscribe a topic for consuming
	Subscribe(topic string, selector consumer.MessageSelector,
		f func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error

	// Unsubscribe a topic
	Unsubscribe(topic string) error
}

func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
	return consumer.NewPushConsumer(opts...)
}

  • The PushConsumer interface defines the Start, Shutdown, Subscribe, and Unsubscribe methods; NewPushConsumer creates a PushConsumer by delegating to consumer.NewPushConsumer
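The callback passed to Subscribe is the heart of the push model: the client hands a batch of messages to your function and acts on the result you return. The following sketch imitates that flow with local stand-ins only. ConsumeResult, MessageExt, Handler, fakePushConsumer, and deliver are all hypothetical names for illustration, and the selector argument of the real Subscribe is left out to keep the example short.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ConsumeResult is a local stand-in for consumer.ConsumeResult.
type ConsumeResult int

const (
	ConsumeSuccess ConsumeResult = iota
	ConsumeRetryLater
)

// MessageExt is a simplified stand-in for primitive.MessageExt.
type MessageExt struct {
	Topic string
	Body  []byte
}

// Handler has the same shape as the callback in PushConsumer.Subscribe.
type Handler func(ctx context.Context, msgs ...*MessageExt) (ConsumeResult, error)

// fakePushConsumer is a hypothetical in-memory consumer: it only keeps
// one handler per topic (the real Subscribe also takes a selector).
type fakePushConsumer struct {
	handlers map[string]Handler
}

func newFakePushConsumer() *fakePushConsumer {
	return &fakePushConsumer{handlers: make(map[string]Handler)}
}

func (c *fakePushConsumer) Subscribe(topic string, h Handler) error {
	if h == nil {
		return errors.New("handler must not be nil")
	}
	c.handlers[topic] = h
	return nil
}

func (c *fakePushConsumer) Unsubscribe(topic string) error {
	delete(c.handlers, topic)
	return nil
}

// deliver simulates the client pushing a batch to the registered callback.
func (c *fakePushConsumer) deliver(topic string, bodies ...string) (ConsumeResult, error) {
	h, ok := c.handlers[topic]
	if !ok {
		return ConsumeRetryLater, errors.New("no subscription for topic " + topic)
	}
	msgs := make([]*MessageExt, 0, len(bodies))
	for _, b := range bodies {
		msgs = append(msgs, &MessageExt{Topic: topic, Body: []byte(b)})
	}
	return h(context.Background(), msgs...)
}

// countingHandler returns a handler that counts the messages it receives.
func countingHandler(n *int) Handler {
	return func(ctx context.Context, msgs ...*MessageExt) (ConsumeResult, error) {
		*n += len(msgs)
		return ConsumeSuccess, nil
	}
}

func main() {
	n := 0
	c := newFakePushConsumer()
	if err := c.Subscribe("demo-topic", countingHandler(&n)); err != nil {
		fmt.Println("subscribe failed:", err)
		return
	}
	res, err := c.deliver("demo-topic", "a", "b")
	fmt.Println(res == ConsumeSuccess, err, n)
}
```

Note that the handler receives a variadic batch rather than a single message, so it should be written to loop over msgs.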

PullConsumer

rocketmq-client-go-v2.0.0/api.go

type PullConsumer interface {
	// Start the PullConsumer for consuming message
	Start() error

	// Shutdown the PullConsumer, all offset of MessageQueue will be commit to broker before process exit
	Shutdown() error

	// Subscribe a topic for consuming
	Subscribe(topic string, selector consumer.MessageSelector) error

	// Unsubscribe a topic
	Unsubscribe(topic string) error

	// MessageQueues get MessageQueue list about for a given topic. This method will issue a remote call to the server
	// if it does not already have any MessageQueue about the given topic.
	MessageQueues(topic string) []primitive.MessageQueue

	// Pull message for the topic specified. It is an error to not have subscribed to any topics before pull for message
	//
	// Specified numbers of messages is returned if message greater that numbers, and the offset will auto forward.
	// It means that if you meeting messages consuming failed, you should process failed messages by yourself.
	Pull(ctx context.Context, topic string, numbers int) (*primitive.PullResult, error)

	// Pull message for the topic specified from a specified MessageQueue and offset. It is an error to not have
	// subscribed to any topics before pull for message. the method will not affect the offset recorded
	//
	// Specified numbers of messages is returned.
	PullFrom(ctx context.Context, mq primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

	// Lookup offset for the given message queue by timestamp. The returned offset for the message queue is the
	// earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
	// queue.
	//
	// Timestamp must be millisecond level, if you want to lookup the earliest offset of the mq, you could set the
	// timestamp 0, and if you want to the latest offset the mq, you could set the timestamp math.MaxInt64.
	Lookup(ctx context.Context, mq primitive.MessageQueue, timestamp int64) (int64, error)

	// Commit the offset of specified mqs to broker, if auto-commit is disable, you must commit the offset manually.
	Commit(ctx context.Context, mqs ...primitive.MessageQueue) (int64, error)

	// CommittedOffset return the offset of specified Message
	CommittedOffset(mq primitive.MessageQueue) (int64, error)

	// Seek set offset of the mq, if you wanna re-consuming your message form one position, the method may help you.
	// if you want re-consuming from one time, you cloud Lookup() then seek it.
	Seek(mq primitive.MessageQueue, offset int64) error

	// Pause consuming for specified MessageQueues, after pause, client will not fetch any message from the specified
	// message queues
	//
	// Note that this method does not affect message queue subscription. In particular, it does not cause a group
	// rebalance.
	//
	// if a MessageQueue belong a topic that has not been subscribed, an error will be returned
	//Pause(mqs ...primitive.MessageQueue) error

	// Resume specified message queues which have been paused with Pause, if a MessageQueue that not paused,
	// it will be ignored. if not subscribed, an error will be returned
	//Resume(mqs ...primitive.MessageQueue) error
}

// The PullConsumer has not implemented completely, if you want have an experience of PullConsumer, you could use
// consumer.NewPullConsumer(...), but it may changed in the future.
//
// The PullConsumer will be supported in next release
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
	return nil, errors.New("pull consumer has not supported")
}

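  • The PullConsumer interface defines the Start, Shutdown, Subscribe, Unsubscribe, MessageQueues, Pull, PullFrom, Lookup, Commit, CommittedOffset, and Seek methods (Pause and Resume are commented out); in v2.0.0 NewPullConsumer is not supported yet and returns nil together with an error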
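The Seek comment suggests calling Lookup first and then Seek to re-consume from a point in time. Since NewPullConsumer is not usable in this version, the sketch below models that idea on a hypothetical in-memory queue instead of the real client. fakeQueue, lookup, seek, and rewindTo are invented names, and treating "one past the last message" as the latest offset is an assumption of this model, not documented library behavior.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// fakeQueue is a hypothetical stand-in for one MessageQueue: index i is
// the offset and stamps[i] is the store time in milliseconds, ascending.
type fakeQueue struct {
	stamps []int64
	next   int64 // offset the next pull would start from
}

// lookup returns the earliest offset whose timestamp is >= ts, following
// the wording of the Lookup comment. If no message qualifies it returns
// len(stamps), which this model treats as the latest offset.
func (q *fakeQueue) lookup(ts int64) int64 {
	i := sort.Search(len(q.stamps), func(i int) bool { return q.stamps[i] >= ts })
	return int64(i)
}

// seek moves the consume position, rejecting offsets outside the queue.
func (q *fakeQueue) seek(offset int64) error {
	if offset < 0 || offset > int64(len(q.stamps)) {
		return fmt.Errorf("offset %d out of range [0, %d]", offset, len(q.stamps))
	}
	q.next = offset
	return nil
}

// rewindTo combines the two calls: find the offset for a point in time,
// then move the consume position there.
func rewindTo(q *fakeQueue, ts int64) (int64, error) {
	off := q.lookup(ts)
	if err := q.seek(off); err != nil {
		return 0, err
	}
	return off, nil
}

func main() {
	q := &fakeQueue{stamps: []int64{1000, 2000, 2000, 3500}, next: 4}

	off, err := rewindTo(q, 2000)
	fmt.Println(off, err) // 1 <nil>

	// Timestamp 0 finds the earliest offset, math.MaxInt64 the latest.
	fmt.Println(q.lookup(0), q.lookup(math.MaxInt64)) // 0 4
}
```

With the real interface, callers should also check the error from NewPullConsumer before touching the returned value, because in v2.0.0 that value is always nil.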

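Summary

api.go in rocketmq-client-go defines four interfaces, Producer, TransactionProducer, PushConsumer, and PullConsumer, each paired with a constructor. Of these, PullConsumer is not implemented yet in v2.0.0: NewPullConsumer always returns an error.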

