python-3.x - 为 WritetoFiles 设置文件名
问题描述
我的流程将文件存储在磁盘上,我需要设置文件名,以便找回内容。
默认命名是窗口时间戳和计数器,这对我没有帮助。文档对我来说不够清楚。(https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html?highlight=default_file_naming)
fileio.WriteToFiles(archive_storage, file_naming=beam.io.fileio.destination_prefix_naming())
我想命名<HASH>.json
HASH 是文件内数据的文件。
解决方案
由于这个例子,我能够得到一个工作片段。在这种情况下,我们将根据它们的散列为每条记录指定不同destination
的值,因为我们希望将每个元素写入不同的文件。此外,我们将传递我们的自定义命名函数,称为hash_naming
:
data = [{'id': 0, 'message': 'hello'},
{'id': 1, 'message': 'world'}]
(p
| 'Create Events' >> beam.Create(data) \
| 'JSONify' >> beam.Map(json.dumps) \
| 'Print Hashes' >> beam.ParDo(PrintHashFn()) \
| 'Write Files' >> fileio.WriteToFiles(
path='./output',
destination=lambda record: hash(record),
sink=lambda dest: JsonSink(),
file_naming=hash_naming))
我们PrintHashFn
将使用每个哈希记录每个元素:
logging.info("Element: %s with hash %s", element, hash(element))
这样,对于我们的数据,我们将得到:
INFO:root:Element: {"message": "hello", "id": 0} with hash -1885604661473532601
INFO:root:Element: {"message": "world", "id": 1} with hash 9144125507731048840
可能有更好的方法,但我发现调用fileio.destination_prefix_naming()(*args)
我们可以-1885604661473532601
从默认命名方案( )中检索目标( -1885604661473532601----00000-00001
):
def hash_naming(*args):
file_name = fileio.destination_prefix_naming()(*args) # -1885604661473532601----00000-00001
destination = file_name.split('----')[0] # -1885604661473532601
return '{}.json'.format(destination) # -1885604661473532601.json
请注意,如果您在混合中添加窗口,则获取子字符串的拆分可能会有所不同。
使用 2.16.0 SDK 运行脚本,DirectRunner
我得到以下输出:
$ ls output/
-1885604661473532601.json 9144125507731048840.json
$ cat output/-1885604661473532601.json
"{\"message\": \"hello\", \"id\": 0}"
在这里更新了完整的代码。
推荐阅读
- sql - sql server 连接或替换,哪个更好(更快)
- python - 无法使用 Pycharm 创建 Conda 环境
- java - 如果 Connection 稳定,则使用 Android WorkManager 运行函数
- java - 构建包含另一个模块的模块时,不存在带有包的 Maven 构建失败
- javascript - React - 父组件更改时更新子组件中的状态
- asp.net - “无法在对象中插入重复的键行”与“违反 UNIQUE KEY 约束”之间的区别
- date - 如何在 TWIG 中显示“今天”而不是今天的日期?
- flutter - 如何使用 dartio HttpClient 从 http 标头获取最后修改
- json - 正确访问特定练习的方法 可汗学院
- python - 如何读取 csv 标头