google-cloud-dataflow - 结合 groupByKey 和未绑定的流
问题描述
管道是:
1. PCollections<String> readTopic = PubSubIO.readString() ...
2. PCollection<String> windowSession = readTopic.apply(Window.<String>into(Sessions
.withGapDuration(Duration.standardHours(1))));
3.PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new
ParseEventFn());
4. PCollection<KV<post, Iterable<user>> iterableKV=
KVparsedPosts.apply(GroupByKey.create())
5.PCollectionList<KV<post, Iterable<user>>listPosts =
iterableKV.apply(Combine.globally(new CombinePosts()).withoutDefaults())
6.listPosts.apply(ParDo.of(new writePosts())
输入是一条消息,它代表一个用户,其中包含与该用户相关的一系列帖子,帖子不是唯一的(同一个帖子可能与许多用户相关)我的问题是第 5 步组合,我不确定如何组合 KV<。 ..> 列出以便writePost
将写入数据库批次的帖子而不仅仅是一个帖子,
示例:假设我们有用户:A、B、C 和帖子:1、2、3、4,并且有我们从 PubSub 读取的消息:
{user:a, posts:[1,2,4]}
{user:b, posts:[2,3]}
{user:c, posts:[2,4]}
我想这样组合:
{post:1, user:[a]}
{post:2, user:[a,b,c]}
{post:3, user:[b]}
{post:4, user:[a,c]}
解决方案
如果我正确理解了您的问题,那么您似乎已经知道如何修改您的元素以获得示例中的预期结果,而您唯一剩下的问题是如何将多个帖子一起批处理,对吗?
我将为您的示例(与您现有的代码相匹配)以及多个帖子的批量处理建议的方法。
第 1 步:从自定义 DoFn 创建 ParDo,将用户和帖子列表反转为 <post, user> 的 KV。例如:
{user:a, posts:[1,2,4]} -> {post:1, user:a}, {post:2, user:a}, {post:4, user:a}
你似乎已经在这条线上这样做了:3. PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new ParseEventFn());
第 2 步:执行 GroupByKey,将帖子作为 Key。例如:
{post:2, user:a}, {post:2, user:b}, {post:2, user:c} -> {post:2, user:[a,b,c]}
你似乎在这条线上这样做:4. PCollection<KV<post, Iterable<user>> iterableKV= KVparsedPosts.apply(GroupByKey.create())
第 3 步(可选):使用上面的代码,您可以一次将一篇文章写入数据库。如果要批量处理多个帖子,则需要将多个帖子分组,这可以通过GroupIntoBatches转换来完成。为了使用此转换,您需要为所有元素分配一个占位符键来批处理它们,这可以使用WithKeys完成。例如:
PCollection<KV<post, Iterable<user>> iterableKV =
KVparsedPosts.apply(GroupByKey.create());
// Apply a key of 0 to every element.
PCollection<KV<Integer, KV<post, Iterable<user>>>> keyedPosts =
iterableKV.apply(WithKeys.of(0));
// Group all elements into batches of 5 posts.
PCollection<KV<Integer, Iterable<KV<post, Iterable<user>>>>> batchedPosts =
keyedPosts.apply(GroupIntoBatches.<Integer, KV<post, Iterable<user>>>ofSize(5));
批处理后,可以删除占位符键,并可以完成任何后续工作。
推荐阅读
- search - 排名上的 SharePoint 搜索
- javascript - React Native / Javascript - 获取导入模块的名称
- mongodb - MongoDB - JSON 模式验证
- python - Bonobo ETL _name 参数被 __call__ 方法覆盖
- grafana - 初始加载时出现 Grafana 错误:无法找到应用程序文件错误
- vb.net - 将图像加载到图片框时,Hw t 对图像进行固定缩放
- amazon-web-services - 在 Firebase、AWS 等后端服务器上接收电子邮件、解析内容和验证
- python - 如果任何附近的像素更亮,OpenCV(Python)擦除一个像素
- node.js - 通过第二个自己的 ip 地址服务器向目标 api 发送请求
- python - 用于图像分割评估的骰子系数