MongoDB学习7:ChangeStrean [数据库教程]

database

1.什么是Change Stream?

Change Stream是MongoDB用于实现变更追踪的解决方案,类似于关系型数据库的触发器,但原理不完全相同

Change Stream

触发器

触发方式

异步

同步(事务保证)

触发位置

应用回调事件

数据库触发器

触发次数

每个订阅事件的客户端

1次(触发器)

故障恢复

从上一次断点重新触发

事务回滚

2.Change Stream实现原理

Change Stream是基于oplog实现的。它在oplog上开启一个tailable cursor来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数

被追踪的变更事件主要包括:

  • insert/update/delete:插入、更新、删除
  • drop:集合被删除
  • rename:集合被重命名
  • dropDatabase:数据库被删除
  • invalidate:drop/rename/dropDatabase将导致invalidate被触发,并关闭change stream

3.Change Stream与可重复读

Change Stream只推送已经在大多数节点上提交的变更操作。级“可重复度”的变更,这个验证是通过{readConcern:"majority"}实现的,因此

  • 未开启 readConcern:"majority" 的集群无法使用 Change Stream
  • 当集群无法满足{w:"majority"}时,不会触发 Change Stream(例如PSA架构中的S因故障宕机)

4.Change Stream 变更过滤

如果只对某些类型的变更时间感兴趣,可以使用聚合管道的过滤步骤过滤事件

var cs = db.collection.watch([{

$match:{

operationType:{

$in:{"insert","delete"}

}

}

}])

5.开启Change Stream功能

默认是没有开启Change Stream功能的,在配置文件中设置

replication:

enableMajorityReadConcern: true

6.Change Stream 故障恢复

假如在一系列写入操作的过程中,订阅Change Stream的应用在接收到“写3”之后与 t0 时刻崩溃,重启后后续变更怎么办?


想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的_id即可


上图所示是一次Change Stream回调所返回的数据。没跳这样的数据都带有一个_id,这个_id可以用于断点恢复,例如:

var cs = db.collection.watch([],{resumeAfter:<_id>})

即可从上一条通知中断处继续获取后续的变更通知

7.Change Steam使用场景

  • 跨集群的变更复制--在源集群中订阅 Change Stream,一旦得到任何变更立即写入目标集群
  • 微服务联动--当一个微服务变更数据库时,其他微服务得到通知并做出相应的变更
  • 其他任何需要系统联动的场景

8.Change Steam 注意事项

  • Change Steam依赖于oplog,因此中断时间不可超过oplog回收的最大时间
  • 在执行update操作时,如果只更新了部分数据,那么Change Steam通知的也是增量部分
  • 同理,删除数据时也通知的仅是删除数据的_id

MongoDB学习7:Change Strean

以上是 MongoDB学习7:ChangeStrean [数据库教程] 的全部内容, 来源链接: utcz.com/z/535194.html

回到顶部