google-cloud-platform - 我将如何根据数百条规则合并 apache beam / dataflow 中的相关记录?
问题描述
我有必须在记录级别加入的数据。例如,有关用户的数据来自不同的源系统,但没有共同的主键或用户标识符
示例数据
Source System 1:
{userid = 123, first_name="John", last_name="Smith", many other columns...}
Source System 2:
{userid = EFCBA-09DA0, fname="J.", lname="Smith", many other columns...}
- 我可以使用大约 100 条规则将一条记录与另一条记录进行比较,以查看源系统 1 中的客户是否与源系统 2 相同。
- 某些规则可能能够推断记录值并将数据添加到有关客户的主记录中。
- 由于某些规则可能会向任何特定记录推断/添加数据,因此当记录更改时必须再次重新应用这些规则。
- 我们每天有数百万条记录需要统一
Apache Beam / 数据流实现
- Apache Beam DAG 根据定义是非循环的,但我可以通过 pubsub 将数据重新发布到同一个 DAG 以使其成为循环算法。
- 我可以创建一个哈希图的 PCollection,它不断地对所有其他元素进行自连接,但这似乎是一种效率低下的方法
- 如果我想在通过规则时不断修改事物,PCollection 的不变性是一个问题。这听起来像
Flink Gelly
或Spark GraphX
您可能知道数据流中有什么方法可以有效地处理此类问题吗?
其他想法
- Prolog:我尝试使用规则子集在这些数据的子集上运行,但 swi-prolog 似乎没有可扩展性,我无法弄清楚如何将结果持续发送到其他进程。
- JDrools/Jess/Rete:前向链接对于推理和高效的部分应用来说是完美的,但这种算法更多的是对单个记录应用许多规则,而不是从可能相关的记录中推断记录信息。
- 图形数据库:类似
neo4j
或datomic
会很好,因为连接是在记录级别而不是行/列扫描,但我不知道是否可以在梁中做类似的事情 - BigQuery 或 Spanner:在 SQL 中强制执行这些规则并对每条记录进行全表扫描真的很慢。最好将所有记录的图表保存在内存中并在内存中计算。我们还可以尝试连接所有列并在所有列上运行多个比较和更新
或者也许有一种更标准的方法来解决这类问题。
解决方案
到目前为止,很难说哪种解决方案最适合您。我会尝试进一步分解问题并尝试分别解决不同的方面。
据我了解,目标是将代表不同来源中相同事物的匹配记录组合在一起:
- 记录来自多个来源:
- 它在逻辑上是相同的数据,但格式不同;
- 有规则可以判断记录是否代表同一实体:
- 规则的集合是静态的;
所以,逻辑大概是这样的:
- 阅读记录;
- 尝试查找现有的匹配记录;
- 如果找到匹配记录:
- 用新数据更新它;
- 否则保存记录以备将来匹配;
- 重复;
对我来说,这看起来非常高级,在这个细节级别上可能没有单一的“正确”解决方案。
我可能会尝试通过首先更详细地理解它来解决这个问题(也许你已经这样做了),很少有想法:
- 数据的属性是什么?
- 有图案吗?例如,当一个系统发布某些内容时,您是否期望其他系统发布其他内容?
- 一般有什么要求?
- 延迟、一致性、可用性等;
- 如何从源中读取数据?
- 所有系统都可以在文件中批量发布记录,将它们提交到 PubSub,您的解决方案是否需要轮询它们等?
- 可以并行读取数据还是单个流?
- 那么在不同的假设和要求下,如何有效匹配记录的主要问题也可能会有所不同。例如,我会考虑:
- 你能把所有数据都放在内存里吗?
- 你的规则是动态的。他们有没有改变,当他们改变时会发生什么;
- 你能把数据分成可以单独存储和有效匹配的类别吗?例如,如果你知道你可以尝试通过 id 字段匹配一些东西,通过某些东西的哈希等来匹配其他一些东西;
- 您是否需要匹配所有历史/现有数据?
- 你能有一些快速排除逻辑来不做昂贵的检查吗?
- 解决方案的输出是什么?对输出有什么要求?
推荐阅读
- c# - Linq 语句有 10 个错误
- node.js - 在heroku上部署后反应网络错误
- javascript - 为轮播绘制一行内容
- oracle - 调用用户定义的异常,但是当引发其他异常时
- mysql - 计算 MySQL 中的最新 dup 值
- pytorch - 有没有办法计算两个二维数组彼此之间的差异?
- php - 您如何在 Google Cloud Vision(PHP SDK)中向 ImageAnnotatorClient 进行身份验证(或传递身份验证文件)
- python - 在 PyQt5 中上下移动行
- r - 从重复曝光和参与者中创建新的数据框,并且只添加新数据
- javascript - 查询数据已接收但无法从 GraphQL API 访问