google-cloud-dataflow - 数据流:如何从另一个管道喷出的现有 PCollection 创建管道
问题描述
我正在尝试将我的管道拆分为许多较小的管道,以便它们执行得更快。我正在对谷歌云存储 blob (PCollection) 的 PCollection 进行分区,以便得到一个
PCollectionList<Blob> collectionList
从那里我希望能够做到:
Pipeline p2 = Pipeline.create(collectionList.get(0));
.apply(stuff)
.apply(stuff)
Pipeline p3 = Pipeline.create(collectionList.get(1));
.apply(stuff)
.apply(stuff)
但是我还没有找到任何关于从已经存在的 PCollection 创建初始 PCollection 的文档,如果有人能指出正确的方向,我将不胜感激。谢谢!
解决方案
您应该研究将Partition
PCollection 拆分为 N 个较小的转换。您可以提供 PartitionFn 来定义拆分的完成方式。您可以在Beam 编程指南中找到以下示例:
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
students.apply(Partition.of(10, new PartitionFn<Student>() {
public int partitionFor(Student student, int numPartitions) {
return student.getPercentile() // 0..99
* numPartitions / 100;
}}));
// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
推荐阅读
- mongodb - MongoDB:为什么使用 {field_name:" 查找查询
"} 不同于 {field_name: ^ $}? - swift - 如何知道 SKAction.repeat(action, count) 何时完成
- cordova - 带有 cordova 和 ionic2 的 ar.js - 视频未显示,但 3d AR 对象显示
- r - R plotly极坐标图中刻度标签的注释/文本
- c# - JWT 验证在 .NET Core 中有效,在 .NET Framework 中失败
- tcsh - TCSH - most compact sytax for checking if value in an array/list?
- google-bigquery - 支持使用 KNIME 的 Google BigQuery JDBC 驱动程序
- html - div 消失,移除高度限制
- applescript - 如何处理从 AppleScript 中的结果返回的列表?
- sql - 如何在将 EntityManager 与 Hibernate 一起使用时使用 IN 子句