首页 > 解决方案 > 为 WritetoFiles 设置文件名

问题描述

我的流程将文件存储在磁盘上,我需要设置文件名,以便找回内容。

默认命名是窗口时间戳和计数器,这对我没有帮助。文档对我来说不够清楚。(https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html?highlight=default_file_naming

fileio.WriteToFiles(archive_storage, file_naming=beam.io.fileio.destination_prefix_naming())

我想命名<HASH>.jsonHASH 是文件内数据的文件。

标签: python-3.xgoogle-cloud-dataflowapache-beam

解决方案


由于这个例子,我能够得到一个工作片段。在这种情况下,我们将根据它们的散列为每条记录指定不同destination的值,因为我们希望将每个元素写入不同的文件。此外,我们将传递我们的自定义命名函数,称为hash_naming

data = [{'id': 0, 'message': 'hello'},
        {'id': 1, 'message': 'world'}]

(p
  | 'Create Events' >> beam.Create(data) \
  | 'JSONify' >> beam.Map(json.dumps) \
  | 'Print Hashes' >> beam.ParDo(PrintHashFn()) \
  | 'Write Files' >> fileio.WriteToFiles(
      path='./output',
      destination=lambda record: hash(record),
      sink=lambda dest: JsonSink(),
      file_naming=hash_naming))

我们PrintHashFn将使用每个哈希记录每个元素:

logging.info("Element: %s with hash %s", element, hash(element))

这样,对于我们的数据,我们将得到:

INFO:root:Element: {"message": "hello", "id": 0} with hash -1885604661473532601
INFO:root:Element: {"message": "world", "id": 1} with hash 9144125507731048840

可能有更好的方法,但我发现调用fileio.destination_prefix_naming()(*args)我们可以-1885604661473532601从默认命名方案( )中检索目标( -1885604661473532601----00000-00001):

def hash_naming(*args):
  file_name = fileio.destination_prefix_naming()(*args)  # -1885604661473532601----00000-00001
  destination = file_name.split('----')[0]  # -1885604661473532601
  return '{}.json'.format(destination)  # -1885604661473532601.json

请注意,如果您在混合中添加窗口,则获取子字符串的拆分可能会有所不同。

使用 2.16.0 SDK 运行脚本,DirectRunner我得到以下输出:

$ ls output/
-1885604661473532601.json  9144125507731048840.json
$ cat output/-1885604661473532601.json 
"{\"message\": \"hello\", \"id\": 0}"

在这里更新了完整的代码。


推荐阅读