google-cloud-dataflow - Beam 将 PTransform 应用于值,同时保留键
问题描述
我似乎在 Beam 中与这种模式作斗争。这是一个流式传输管道。
在高水平:
- 消息进入兔子
- 消息内容包括一个 ID 和 N 个 S3 文件路径
- 我想在列出的所有 S3 文件中生成一些聚合,但结果应该由原始消息键入
- 使用聚合结果将消息写回rabbit,每个传入消息一个
不可避免地,我最终得到了一些PCollection[KV[MessageId, S3FilePaths]]
,并想在 s 上应用一堆PTransform
sS3FilePaths
但不要失去它们最初是由 键控的事实MessageId
。
我似乎找不到通用的“映射 KV pcollection 的值但保留密钥”功能,我相信我必须使用 PTransform (而不是 a DoFn
),因为文件 IO 已全部实现为 PTransforms。
我从根本上认为这是错误的方式吗?任何帮助深表感谢。
更新:对不起,细节很清楚。在令人沮丧的星期五结束时发布此消息是我自己的错。
我有一些基本的绊脚石:
- 我已经意识到这
PCollection[KV
真的是为了组合已经加载的数据。试图将每个操作隔离V
为一组单独的管道操作并没有真正与 API 相结合 - 我没有为手头的任务适当地设置全局窗口/触发。此外,我的转换并不总是保留我假设的窗口/窗格语义。
- 我为每条消息输入了不同的 s3 文件路径,但由于诸如https://issues.apache.org/jira/browse/BEAM-7753之类的问题,
FileIO
API 以PTransform
s 为导向,这不容易让我标记结果使用传入的消息 ID,并且无法ReadableFile
直接实例化(它是包私有的),我无法将它拼凑在一起。我最终将 Java 的 S3 客户端包装在一个自定义PTransform
中,该自定义保留了原始MessageId
值以及每个返回值。
在这一点上,我有一些端到端的工作。我的代码实际上是用scio编写的,所以分享起来有点棘手,但在高层次上:
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
在固定的 1 秒窗口中使用触发器从 RabbitMQ 读取,并小心使用ProcessContext.output
以在整个过程中保留窗口和时间戳PCollection[(MessageId, V)]
始终使用(scalaTuple2
syntax)的一般形式。什么时候V
是 S3 文件的路径,我利用路径中的一个PTransform
发出文件内容的路径(这是一种不受支持的数据格式)- 聚合在分组到之后完成
PCollection[KV[(MessageId, FileElementId), FileElement]]
,然后再归约到,PCollection[MessageId, FileElement]
以便保留每个传入消息的归约语义。
2号让我有点失望。我希望能够利用光束文件系统函数从文件中读取并将每个输出与它指定的消息 id 结合起来。但我现在处于一个好位置。
解决方案
KV<KeyT, ValueT>
如果它们只接受 aKeyT
或,则无法将转换应用于 a ValueT
。如果您需要在对值应用转换时保留密钥,那么推荐的方法是编写自己的DoFns
可以接受KV
但忽略密钥的方法,或者重组管道,以便您不依赖于转换的输出需要放下钥匙。
推荐阅读
- c# - 如何获取LiveCharts系列中鼠标选中点的坐标?
- c - 我如何优化这个(这是一个我的世界着色器)
- python - 如何使用嵌入式 python 解释器(pybind11)构建和执行 cpp 文件
- c# - 从 JSON 文件在 C# 中创建一个类
- c++ - 此代码显示错误,弹出窗口“调试断言失败”
- .net - 程序作为控制台运行但不作为 Windows 服务运行
- r - dplyr / tidy 方法来过滤基于子字符串的向量?
- javascript - 我在状态中有一个 T 和 Z 格式的数据,React 输入接受 HH:MM ss 格式,所以如何转换
- ios - 在较旧的 iOS 设备上安装 IPA 时出错:无法为 64 位 Mach-O 输入文件找到匹配的拱门
- github - 文件 ios/Flutter/Flutter.framework/Flutter 为 351.71 MB;这超出了 GitHub 的文件大小限制 100.00 MB