mule - Mulesoft 与 Salesforce 流 API 使用 CDC
问题描述
我正在开发一个 Mule API 流来测试 Salesforce 事件流。我设置了连接器并订阅了流媒体频道。
当我创建/更新/删除联系人记录时,这工作得很好,事件通过,我通过将它们添加到另一个数据库来处理它们。
我对replayId
功能有点困惑。使用当前设置,我可以关闭 Mule 应用程序,在组织中创建联系人,然后当我将应用程序重新联机时,它会通过从中断处添加数据来恢复。完美的。
但是,我试图模拟如果骡子应用程序在处理事件时崩溃会发生什么。
我运行了一些 APEX 来创建 100 条随机联系人记录。一旦我看到它记录了我的应用程序中的第一个流,我就杀死了 mule 应用程序。我在这里的假设是,当我恢复应用程序时,它会知道它在哪里停止,就像在之前的测试中创建联系人之前它处于离线状态一样。
我注意到的是,它只处理在我关闭应用程序之前通过的少数联系人。
看起来事件可能在流输入中来得太快,以至于它已经到达replayId
流中的最后一个。但是,由于这些记录仍未添加到我的外部数据库中,因此我正在丢失这些记录。流完成了它应该做的事情,但由于应用程序仍在处理一批工作,我的 100 条记录没有像replayId
反映的那样被提交。
如果在应用程序崩溃之前有大量数据流,我该如何解决这个问题,以免最终丢失数据?commit
我记得在 Kafka 中,一旦将其插入数据库,您就必须能够获取该 id,以便它知道您正式处理的最后一个。Mule 中是否有这样的概念,我可以告诉它我在哪里正式停止并致力于 DB?
解决方案
协议 (CometD) 级别的可靠性意味着许多属性。其中最主要的是订阅者已收到消息的事务性 ACK(确认)。CometD 支持 ACK 作为扩展。Salesforce 的 CometD 实施不支持 ACK。即使确实如此,您仍然会遇到问题……但是风险的频率/损失可能会更低。
在您的情况下,您必须设计一个解决方案,该解决方案相当于查找和重放未提交到目标数据库的事件。您可以使用 Mule 中的自定义代码或接线适配器来执行此操作。不保证重播 ID 值对于连续事件是连续的,但它们将被排序。重放 ID 为 100 的事件 A 将跟随重放 ID 为 200 的事件 B。
您需要在数据库中存储一个重播 ID 值。然后,您可以在重新订阅时使用它(在订阅者失败后)从 SF 中检索数据库中缺少的事件。这仅在故障窗口足够小时才有效。对于标准平台事件许可证,Salesforce 事件保留窗口目前为 24 小时。更高级别的许可证允许更长的保留时间。
根据数据量、事件频率和其他过程参数,您可以使用Heroku Connect开箱即用地获得所有这些。它确实意味着 Heroku 上的 Postgres DB + HC 的许可成本和运营成本,但我们在类似情况下的大多数客户都认为这是值得的。
推荐阅读
- django - 使用 makemigrations 命令不会发生 Django 2.1.5 迁移
- python - 在 Windows 上使用 pgmagick 的 jpg2000 到 jpg 以灰度显示
- typescript - Typescript full-qualify namespace vs import 有什么区别
- c# - 急切地加载多个级别
- memory - 并行性:内存系统给人一种大、便宜和快速的错觉
- oauth-2.0 - 授权标头中是否支持多个不记名令牌
- java - 如何从平面 sql 选择结果集中创建嵌套映射
- database - 仅在 dynamodb 中为具有两种类型值的列创建索引的效果如何
- angular - 为 prod 构建时出现错误 Uncaught NullInjectorError: StaticInjectorError(a)[D -> v]
- java - Java延迟函数