python - 无法在Beam python中将csv文件拆分为多个csv文件
问题描述
我正在尝试根据列 value 将 csv 文件拆分为多个 csv 文件。但现在我正在使用下面的代码,但我能够根据过滤器值创建文件,但文件有空记录。你能请任何人帮我解决这个问题吗?
示例输入数据: 在此处输入图像描述 示例输出文件: 在此处输入图像描述 在此处 输入图像描述
请找到我遵循的步骤
- 在python3中将csv文件读取为dict。
- 从 dict 中提取键并将键的值转换为列表。
- 遍历列表并根据列表值过滤字典
- 写入单独的文件
`
import apache_beam as beam
import csv
import uuid
from apache_beam.options.pipeline_options import PipelineOptions
dict_reader = csv.DictReader(open(input_file,'r'))
insert_date_lst = []
for i in dict_reader:
insert_date_lst.append(i.get("key"))
class TagData(beam.DoFn):
def process(self, element):
key = element.get('key')
yield TaggedOutput(key, element)
with beam.Pipeline(options=PipelineOptions()) as p:
data = p | "dict_read" >> beam.Create(dict_reader)
for i in list(dict.fromkeys(insert_date_lst)):
filter_data = data | "filter"+i >> beam.Filter(lambda x : x['key']==i)
processed_tagged_log = filter_data | "tagged-data-by-key " >> beam.ParDo(TagData()).with_outputs(
*list(dict.fromkeys(insert_date_lst)))
for i in list(dict.fromkeys(insert_date_lst)):
processed_tagged_log[i] | "save file %s" % uuid.uuid4() >> beam.io.WriteToText(output_file + i ,num_shards=0,shard_name_template="",file_name_suffix='.csv')
解决方案
推荐阅读
- javascript - 用 Lodash 分组
- javascript - Marker Clusterer 不是构造函数
- sql - 在 Oracle 中将对象存储到 SQL 数据库中
- python - python中的SQL查询
- gremlin - Cosmos DB:图形 - 查询所有嵌套的顶点和边
- ios - 在表格视图单元格中放置多个项目时遇到问题
- database - Amazon DocumentDB 无法连接并出现错误“SSL 对等证书验证失败”
- javascript - React JS纸牌游戏,计数状态值
- android - 调用相机应用程序(权限拒绝:启动 Intent )
- excel - While 函数 - 根据变量迭代地添加天/月,占每个月的最大天数