首页 > 解决方案 > 无法在Beam python中将csv文件拆分为多个csv文件

问题描述

我正在尝试根据列 value 将 csv 文件拆分为多个 csv 文件。但现在我正在使用下面的代码,但我能够根据过滤器值创建文件,但文件有空记录。你能请任何人帮我解决这个问题吗?

示例输入数据: 在此处输入图像描述 示例输出文件: 在此处输入图像描述 在此处 输入图像描述

请找到我遵循的步骤

  1. 在python3中将csv文件读取为dict。
  2. 从 dict 中提取键并将键的值转换为列表。
  3. 遍历列表并根据列表值过滤字典
  4. 写入单独的文件

`

import apache_beam as beam
import csv
import uuid
from apache_beam.options.pipeline_options import PipelineOptions
dict_reader = csv.DictReader(open(input_file,'r'))
insert_date_lst = []
for i in dict_reader:
    insert_date_lst.append(i.get("key"))

class TagData(beam.DoFn):
    def process(self, element):
        key = element.get('key')
        yield TaggedOutput(key, element)

with beam.Pipeline(options=PipelineOptions()) as p:

  data = p | "dict_read" >> beam.Create(dict_reader)
  for i in list(dict.fromkeys(insert_date_lst)):
      filter_data = data | "filter"+i >> beam.Filter(lambda x : x['key']==i)
   processed_tagged_log = filter_data | "tagged-data-by-key " >> beam.ParDo(TagData()).with_outputs(
       *list(dict.fromkeys(insert_date_lst)))
   for i in list(dict.fromkeys(insert_date_lst)):
      processed_tagged_log[i] | "save file %s" % uuid.uuid4() >> beam.io.WriteToText(output_file + i ,num_shards=0,shard_name_template="",file_name_suffix='.csv')                                                                                                                                                                                                                                              

标签: pythonapache-beam

解决方案


推荐阅读