首页 > 解决方案 > Mulesoft 与 Salesforce 流 API 使用 CDC

问题描述

我正在开发一个 Mule API 流来测试 Salesforce 事件流。我设置了连接器并订阅了流媒体频道。

当我创建/更新/删除联系人记录时,这工作得很好,事件通过,我通过将它们添加到另一个数据库来处理它们。

在此处输入图像描述

我对replayId功能有点困惑。使用当前设置,我可以关闭 Mule 应用程序,在组织中创建联系人,然后当我将应用程序重新联机时,它会通过从中断处添加数据来恢复。完美的。

但是,我试图模拟如果骡子应用程序在处理事件时崩溃会发生什么。

我运行了一些 APEX 来创建 100 条随机联系人记录。一旦我看到它记录了我的应用程序中的第一个流,我就杀死了 mule 应用程序。我在这里的假设是,当我恢复应用程序时,它会知道它在哪里停止,就像在之前的测试中创建联系人之前它处于离线状态一样。

我注意到的是,它只处理在我关闭应用程序之前通过的少数联系人。

看起来事件可能在流输入中来得太快,以至于它已经到达replayId流中的最后一个。但是,由于这些记录仍未添加到我的外部数据库中,因此我正在丢失这些记录。流完成了它应该做的事情,但由于应用程序仍在处理一批工作,我的 100 条记录没有像replayId反映的那样被提交。

如果在应用程序崩溃之前有大量数据流,我该如何解决这个问题,以免最终丢失数据?commit我记得在 Kafka 中,一旦将其插入数据库,您就必须能够获取该 id,以便它知道您正式处理的最后一个。Mule 中是否有这样的概念,我可以告诉它我在哪里正式停止并致力于 DB?

标签: muleesbcometddata-streamchange-data-capture

解决方案


协议 (CometD) 级别的可靠性意味着许多属性。其中最主要的是订阅者已收到消息的事务性 ACK(确认)。CometD 支持 ACK 作为扩展。Salesforce 的 CometD 实施不支持 ACK。即使确实如此,您仍然会遇到问题……但是风险的频率/损失可能会更低。

在您的情况下,您必须设计一个解决方案,该解决方案相当于查找和重放未提交到目标数据库的事件。您可以使用 Mule 中的自定义代码或接线适配器来执行此操作。不保证重播 ID 值对于连续事件是连续的,但它们将被排序。重放 ID 为 100 的事件 A 将跟随重放 ID 为 200 的事件 B。

您需要在数据库中存储一个重播 ID 值。然后,您可以在重新订阅时使用它(在订阅者失败后)从 SF 中检索数据库中缺少的事件。这仅在故障窗口足够小时才有效。对于标准平台事件许可证,Salesforce 事件保留窗口目前为 24 小时。更高级别的许可证允许更长的保留时间。

根据数据量、事件频率和其他过程参数,您可以使用Heroku Connect开箱即用地获得所有这些。它确实意味着 Heroku 上的 Postgres DB + HC 的许可成本和运营成本,但我们在类似情况下的大多数客户都认为这是值得的。


推荐阅读