Key shared功能官方说明

编程

Key shared功能说明


但在 Shared 模式下,相同 Key 下的消息有可能会丢失顺序。用户希望既能使用 Shared 模式并发地消费消息,又能保证同一 Key 下的消息顺序。





在该需求的推动下,2.4.0 版本新增了 Key_Shared 订阅模式,它扩展了 Shared 订阅模式,使一个分区可以有几个消费者并行地消费消息,但是具有相同 Key 的消息将只被路由给一个消费者。



https://github.com/apache/pulsar/wiki/PIP-34%3A-Add-new-subscribe-type-Key_shared

 

 

动机

当前,有3种订阅模式:独占,共享和故障转移。共享模式订阅的使用非常广泛,因为使用者可以有效地并行使用相同分区的消息。但是使用共享模式将不会保持相同密钥的消息顺序,最好使用共享模式,同时还要保持消息的顺序。

具有多分区的故障转移类型订阅可以部分解决此问题。但是使用者编号与分区编号相关联,因此我们无法添加使用者以加快消息分发。

本提案试图引入新的订阅类型Key_shared以扩展共享类型。通过Key_shred,一个分区可以有多个使用者来并行使用消息,而所有具有相同密钥的消息将仅分派给一个使用者。

提案与变更

主要思想是在新的Key_Shared调度程序中引入哈希路由层。Key_Shared订阅中的每个使用者都提供自己的哈希值范围。当需要将消息分发给使用者时,调度程序首先获取消息的Key,对密钥进行哈希处理,然后将此消息发送给提供此哈希值的使用者。

主要工作在哈希层和新调度程序上。

哈希层

就像在邮件讨论中一样,任何可以将密钥映射到使用者的哈希机制都应该在这里工作。我们将使散列机制可插入此提案中。

消息密钥的哈希值确定目标使用者。哈希层具有以下要求:

  1. 每个使用者提供固定范围的哈希值。
  2. 散列值的整个范围可以被所有使用者覆盖。
  3. 一旦消费者被删除,剩下的消费者仍然可以为整个范围服务。

这是一个示例哈希方法:在调度程序中,代理可以收集每个使用者的调度率。添加新使用者后,我们可以选择最繁忙的使用者并拆分其哈希范围,然后将哈希范围的一半分享给新使用者。当某个使用者关闭时,我们可以将其哈希范围分配给相邻的使用者,该散列范围的调度率较低。

这是一个显示粗略想法的图片: 

共有3种块,每个块代表一个消费者。垂直轴表示哈希范围值,而水平轴表示时间。

  1. 在时间0,创建此订阅,并添加第一个使用者-C1,所有哈希范围(0--1)由使用者C1服务。
  2. 在时间T1处,添加了一个新消费者-C2,并且C1仍为哈希范围(0--0.5)提供服务,而哈希范围(0.5--1)的另一半由新消费者C2服务。
  3. 在时间T2,添加了一个新使用者-C3,并且如果C2的调度程序速率大于C1,则拆分并共享C2的哈希范围。散列范围(0.5--0.75)仍由C2提供,而C3提供散列范围(0.75--1)。
  4. 在时间T3,C1关闭,因为C1与C2相邻,所以它的哈希范围将分配给C2。并且C2将投放(0--0.75)
  5. 在时间T4,C2关闭,其哈希范围分配给C3。C3将服务于整个范围(0--1)。

PulsarApi.proto中的更改

在CommandSubscribe.SubType中添加一个新的子类型。在MessageMetadata中添加一个新字段,该字段仅用于此功能。通过使用新的密钥,可以避免影响其他功能。例如,用户想使用原始密钥进行紧凑/分区路由,而用户想使用新密钥进行密钥排序。

message CommandSubscribe { enum SubType { Exclusive = 0; Shared = 1; Failover = 2; Key_Shared = 3; // add new type here < == } message MessageMetadata { ... optional string ordering_key = 18; 

在代理端添加新的调度程序。

PersistentStickyKeyDispatcherMultipleConsumers。主要方法包括:

void addConsumer(Consumer consumer) throws BrokerServiceException { // add consumer // and update the hash range of related consumer. } void removeConsumer(Consumer consumer) throws BrokerServiceException { // remove consumer // and update the hash range of related consumer. } Consumer getConsumer(String key) { // return the consumer that serves this hash key } // once complete read entries from BookKeeper, // dispatch messages to consumer according to the key value. void readEntriesComplete(List<Entry> entries, Object ctx) { // 1. fetch Key out of Entry. // 1. dispatch message to target consumer. // 1. if consumer not have available permit, add message to messagestoReplay. } 

将来的工作

批处理消息:由于消息是通过代理中的密钥调度的。如果要支持批处理消息,则应添加基于密钥的批处理。消费者优先级:由于当前是通过密钥调度的,因此不使用优先级。如果我们想使用它,我们将来可以添加它。



更多信息,参阅 PIP-34(https://github.com/apache/pulsar/wiki/PIP-34:-Add-new-subscribe-type-Key_shared)。

以上是 Key shared功能官方说明 的全部内容, 来源链接: utcz.com/z/516856.html

回到顶部