首页 > 解决方案 > 加入两个大卷的 PCollection 有性能问题

问题描述

使用 CoGroupsByKey 方法加入两个 Pcollection,需要数小时才能执行 8+ 百万条记录。从另一个 stackoverflow 帖子中注意到CoGbkResult 有超过 10000 个元素,需要重复(可能很慢) “CoGbkResult 有超过 10000 个元素,需要重复(可能很慢)”。

使用此方法提高此性能的任何建议。

这是代码片段,

PCollection<TableRow> pc1 = ...;
PCollection<TableRow> pc2 = ...;

WithKeys<String, TableRow> withKeyValue = 
  WithKeys.of((TableRow row) -> String.format("%s",row.get("KEYNAME")))
          .withKeyType(TypeDescriptors.strings());

PCollection<KV<String,TableRow>> keyed_pc1 =
  pc1.apply("WithKeys", withKeyValue );

PCollection<KV<String,TableRow>> keyed_pc2 = 
  pc2.apply("WithKeys", withKeyValue );

// (org.apache.beam.sdk.extensions.joinlibrary.Join class)
PCollection<KV<String,KV<TableRow,TableRow>>> joinedCollection = 
  Join.innerJoin(keyed_pc1, keyed_pc2); 

标签: javagoogle-cloud-dataflowapache-beam

解决方案


Apache Beam 规范没有定义连接的执行,除了 SDK 之外,没有其他更快的方法可以自己编写内部连接。因此,这个问题的答案取决于执行连接的对象,即哪个运行器。我不知道 Flink 或 Spark 跑步者,所以这个答案将特定于 Dataflow 跑步者。

如果您还没有,请查看有关此主题的博客文章。在博文中,它描述了可以手动启用的 Dataflow Shuffle Service。此服务是比当前默认更好的实现,通常可以更快地执行,尤其是对于连接。

要启用 Dataflow Shuffle 服务,请传入以下标志

--experiments=shuffle_mode=service
--region=<allowed region>

其中允许洗牌的区域是:“us-central1”、“europe-west1”、“europe-west4”、“asia-northeast1”。


推荐阅读