python - 如何使用 Apache Beam Python 将输出写入动态路径
问题描述
我对 apache 梁很陌生。我的情况如下所示,
我有多个 json 格式的事件。在每个事件中,event_time 列表示该事件的创建时间,我正在使用 event_time 计算它们的创建日期。我想将这些事件分别写在他们的日期分区下。我的代码就像
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import TaggedOutput
import json
import time
class EventFormatter(beam.DoFn):
def process(self, element, *args, **kwargs):
tmp_dict = {}
for i in range(len(element['properties'])):
tmp_dict['messageid'] = element['messageid']
tmp_dict['userid'] = element['userid']
tmp_dict['event_time'] = element['event_time']
tmp_dict['productid'] = element['properties'][i]['productid']
yield tmp_dict
class DateParser(beam.DoFn):
def process(self, element, *args, **kwargs):
key = time.strftime('%Y-%m-%d', time.localtime(element.get('event_time')))
print(key, element)
yield TaggedOutput(time.strftime('%Y-%m-%d', time.localtime(element.get('event_time'))), element)
with beam.Pipeline() as pipeline:
events = (
pipeline
| 'Sample Events' >> beam.Create([
{"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
{"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
])
| beam.ParDo(EventFormatter())
| beam.ParDo(DateParser())
)
output = events | "Parse Date" >> WriteToText('/Users/oguz.aydin/Desktop/event_folder/date={}/'.format(....))
我无法找到我应该如何完成格式块。当我运行代码打印结果时,它给出了
('2020-08-27', {'productid': 'product-173', 'userid': 'user-78', 'event_time': 1598516997, 'messageid': '6b1291ea-e50d-425b-9940-44c2aff089c1'})
('2020-08-25', {'productid': 'product-143', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
('2020-08-25', {'productid': 'product-144', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
例如。我想在 date=2020-08-25 文件夹下写 2 个事件,另外一个 date=2020-08-27。
在一天结束时,我想在他们的创建日期文件夹下写下每个事件。
我怎样才能做到这一点?
谢谢你的帮助,
奥古兹。
解决方案
在您的代码中,您正在使用多个输出。这意味着将 DoFn(ParDo)的输出连接到另一个 DoFn,这对于整个管道是静态的。
如果你想根据你拥有的内容将数据转储到不同的文件中,你必须实现一个仅用于写入的 DoFn。
像这样的东西:
class WriteByKey(apache_beam.DoFn):
def process(self, kv):
key, value = kv
with beam.io.gcp.gcsio.GcsIO().open(f'gs://bucket/path/{key}.extension', 'a') as fp:
fp.write(value)
您应该更改 DataParser DoFn 以生成元组 (date, value) 而不是 TaggedOut,并将管道更改为如下所示:
with beam.Pipeline() as pipeline:
events = (
pipeline
| 'Sample Events' >> beam.Create([
{"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
{"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
])
| beam.ParDo(EventFormatter())
| beam.ParDo(DateParser()) | beam.ParDo(WriteByKey())
)
推荐阅读
- python - 多次调用 compute() 时出现 Dask 错误
- javascript - 有条件的问题决定何时激活传递给组件的每个属性的功能
- google-chrome - 无法提交内部错误 - Chrome 商店扩展提交
- javascript - 使用循环从数组中删除第一个元素,但在 Javascript 中存储“未定义”的最后一个元素
- reactjs - 将环境变量添加到 React 项目
- amazon-web-services - Kinesis Data Firehose 源“Direct PUT”与“Kinesis Data Stream”
- java - 如果配置文件设置不更改为 application-default.properties,如何忽略 application.properties 值
- r - 如何获取多个链接的文本
- video - 在m210 v2中,根据usb模式收不到视频
- laravel - 将数组传递给 Laravel Blade 指令