python-3.x - 将列表转换为 PCollection
问题描述
我目前有一个DoFn
查看存储桶并查看该存储桶和 dir 前缀中的所有文件。这DoFn
将返回一个列表而不是一个PCollection
. 我如何将此列表转换为PCollection
可以由 使用的DoFn
ConvertFileNames
?
# List all the files within a subdir
class ListBlobs(beam.DoFn):
def start_bundle(self):
self.storage_client = storage.Client()
def process(self, prefix):
bucket = self.storage_client.bucket('xxx')
return list(self.bucket.list_blobs(prefix=prefix))
# Convert Blobs into filenames as patterns
class ConvertFileNames(beam.DoFn):
def process(self, blob):
return 'gs://' + blob.bucket.name + blob.name
解决方案
如梁文档中所述,Beam DoFn 的处理方法返回一个可迭代的元素以放置到下游 PCollection 中。所以,在你的例子中,如果我有一个前缀的 PCollection,调用它prefix_pcoll
,那么我可以写
blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())
并将blobs_pcoll
包含具有此前缀的 blob 列表(即list(self.bucket.list_blobs(prefix=prefix))
所有前缀的连接)。然后你可以写
converted = blobs_pcoll | beam.ParDo(ConvertFileNames())
你也可以写
converted = blobs_pcoll | beam.Map(
lambda blob: 'gs://' + blob.bucket.name + blob.name)
您可能还想查看apache_beam.io.fileio.MatchAll。
推荐阅读
- html - 在最小宽度无响应背景中居中徽标
- java - 将现有 gradle 项目导入 IntelliJ IDEA 时出错
- c# - 使用 Caliburn.Micro 在 MVVM WPF 中绑定
- php - Laravel Artisan 命令 - “找不到类”
- reactjs - 如何为 React Gutenberg 属性分配 iframe url 查询参数值
- javascript - 使用 HtmlWebackPlugin 时如何在里面添加某些脚本标签和标签
- css - CSS。定位和溢出
- drupal-8 - 如何在 Drupal 8 中创建和保存用户配置文件字段?
- python - DRF - `write_only=True` 显示在响应模式中
- mysql - SQL 查询为多个 AND 条件返回 0 行