首页 > 解决方案 > 结合 groupByKey 和未绑定的流

问题描述

管道是:

  1. PCollections<String> readTopic = PubSubIO.readString() ...
  2. PCollection<String> windowSession = readTopic.apply(Window.<String>into(Sessions
                                .withGapDuration(Duration.standardHours(1))));
  3.PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new 
     ParseEventFn());
  4. PCollection<KV<post, Iterable<user>> iterableKV= 
                        KVparsedPosts.apply(GroupByKey.create())
  5.PCollectionList<KV<post, Iterable<user>>listPosts = 
        iterableKV.apply(Combine.globally(new CombinePosts()).withoutDefaults())
  6.listPosts.apply(ParDo.of(new writePosts())

输入是一条消息,它代表一个用户,其中包含与该用户相关的一系列帖子,帖子不是唯一的(同一个帖子可能与许多用户相关)我的问题是第 5 步组合,我不确定如何组合 KV<。 ..> 列出以便writePost将写入数据库批次的帖子而不仅仅是一个帖子,

示例:假设我们有用户:A、B、C 和帖子:1、2、3、4,并且有我们从 PubSub 读取的消息:

{user:a, posts:[1,2,4]}
{user:b, posts:[2,3]}
{user:c, posts:[2,4]}

我想这样组合:

{post:1, user:[a]}
{post:2, user:[a,b,c]}
{post:3, user:[b]}
{post:4, user:[a,c]}

标签: google-cloud-dataflowapache-beamgoogle-cloud-pubsub

解决方案


如果我正确理解了您的问题,那么您似乎已经知道如何修改您的元素以获得示例中的预期结果,而您唯一剩下的问题是如何将多个帖子一起批处理,对吗?

我将为您的示例(与您现有的代码相匹配)以及多个帖子的批量处理建议的方法。

第 1 步:从自定义 DoFn 创建 ParDo,将用户和帖子列表反转为 <post, user> 的 KV。例如:

{user:a, posts:[1,2,4]} -> {post:1, user:a}, {post:2, user:a}, {post:4, user:a}

你似乎已经在这条线上这样做了:3. PCollection<KV<post, user> KVparsedPosts= windowSession.apply(ParDo.of(new ParseEventFn());

第 2 步:执行 GroupByKey,将帖子作为 Key。例如:

{post:2, user:a}, {post:2, user:b}, {post:2, user:c} -> {post:2, user:[a,b,c]}

你似乎在这条线上这样做:4. PCollection<KV<post, Iterable<user>> iterableKV= KVparsedPosts.apply(GroupByKey.create())

第 3 步(可选):使用上面的代码,您可以一次将一篇文章写入数据库。如果要批量处理多个帖子,则需要将多个帖子分组,这可以通过GroupIntoBatches转换来完成。为了使用此转换,您需要为所有元素分配一个占位符键来批处理它们,这可以使用WithKeys完成。例如:

PCollection<KV<post, Iterable<user>> iterableKV =
        KVparsedPosts.apply(GroupByKey.create());

// Apply a key of 0 to every element.
PCollection<KV<Integer, KV<post, Iterable<user>>>> keyedPosts =
        iterableKV.apply(WithKeys.of(0));

// Group all elements into batches of 5 posts.
PCollection<KV<Integer, Iterable<KV<post, Iterable<user>>>>> batchedPosts =
        keyedPosts.apply(GroupIntoBatches.<Integer, KV<post, Iterable<user>>>ofSize(5));

批处理后,可以删除占位符键,并可以完成任何后续工作。


推荐阅读