python - 读取 csv 文件,清理它,然后使用 Apache Beam 数据流将结果写成 csv
问题描述
我想读取一个 csv 文件,清理它,然后使用 Apache Beam 数据流将结果写成 csv。目的是使文件可加载到 BigQuery 中。清理规则是简单地用双引号转义双引号。我的清洁规则有效。我无法将其合并到管道中。我正在寻求有关我的清洁功能应该返回什么以及如何通过管道调用它的建议。
import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
lines = p | ReadFromText(file_pattern="gs://dev/clean_input/input01.csv")
def parse_method(line):
CSV_PARSING_KWARGS = {
'doublequote': True,
'escapechar': '\\',
'quotechar': '"',
'delimiter': ','
}
reader = csv.reader(csv_file, CSV_PARSING_KWARGS)
for rec in reader:
cw = csv.writer(out_file, escapechar='"', quoting=csv.QUOTE_MINIMAL)
cw.writerow(rec)
return rec
def run(region, project, bucket, temploc ):
argv = [
# Passed in args
'--region={}'.format(region),
'--project={}'.format(project),
'--temp_location={}'.format(temploc),
# Constructs
'--staging_location=gs://{}/clean_input/stg/'.format(bucket),
# Mandatory constants
'--job_name=cleammycsv',
'--runner=DataflowRunner'
]
options = PipelineOptions(
flags=argv
)
pipeline = beam.Pipeline(options=options)
clean_csv = (pipeline
lines = lines| 'Read' >> beam.Map(parse_method)
line = lines | 'Output to file' >> WriteToText(file_pattern="gs://dev/clean_output/output_file.csv")
)
pipeline.run()
if __name__ == '__main__':
import argparse
# Create the parser
parser = argparse.ArgumentParser(description='Run the CSV cleaning pipeline')
parser.add_argument('-r','--region', help='Region ID where data flow job to run', default='australia-southeast1')
parser.add_argument('-p','--project', help='Unique project ID', required=True)
parser.add_argument('-b','--bucket', help='Bucket name', required=True)
parser.add_argument('-t','--temploc', help='Bucket name and folder', required=True)
# Execute the parse_args() method
args = vars(parser.parse_args())
run(project=args['project'], bucket=args['bucket'], region=args['region'],temploc=args['temploc'])
解决方案
我终于得到了可以完成这项工作的工作。
import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
def parse_file(element):
for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL):
line = [s.replace('\"', '') for s in line]
clean_line = '","'.join(line)
final_line = '"'+ clean_line +'"'
return final_line
def run(region, project, bucket, temploc ):
argv = [
# Passed in args
'--region={}'.format(region),
'--project={}'.format(project),
'--temp_location={}'.format(temploc),
# Constructs
'--staging_location=gs://{}/clean_input/stg/'.format(bucket),
# Mandatory constants
'--job_name=cleammycsv',
'--runner=DataflowRunner'
]
filename_in = 'gs://{}/clean_input/IN_FILE.csv'.format(bucket)
files_output = 'gs://{}/clean_output/OUT_FILE.csv'.format(bucket)
options = PipelineOptions(
flags=argv
)
pipeline = beam.Pipeline(options=options)
clean_csv = (pipeline
| 'Read input file' >> beam.io.ReadFromText(filename_in)
| 'Parse file' >> beam.Map(parse_file)
| 'writecsv' >> beam.io.WriteToText(files_output,num_shards=10)
)
pipeline.run()
if __name__ == '__main__':
import argparse
# Create the parser
parser = argparse.ArgumentParser(description='Run the CSV cleaning pipeline')
parser.add_argument('-r','--region', help='Region ID where data flow job to run', required=True)
parser.add_argument('-p','--project', help='Unique project ID', required=True)
parser.add_argument('-b','--bucket', help='Bucket name', required=True)
parser.add_argument('-t','--temploc', help='Bucket name and folder', required=True)
# Execute the parse_args() method
args = vars(parser.parse_args())
run(project=args['project'], bucket=args['bucket'], region=args['region'],temploc=args['temploc'])
推荐阅读
- javascript - 使用javascript在数组中已经存在值时如何显示警报消息
- sql - 函数调用转换的本机查询问题
- ajax - 在jsp中的ajax调用中以JSON形式接收响应
- mysql - Mysql 如何使用 UTF8 字符集将列添加为 varchar(21884)?
- javascript - 页面加载时从浏览器打开 android 应用
- sql - 如何根据指示重复的列创建重复记录
- scala - 迭代数据框并在 Spark SQL 语句中使用这些值
- javascript - jquery select html 不包含选择标签
- angular - 在同一文件中的另一个组件中使用一个组件的变量
- javascript - 像翻页一样反应滚动或滚动一次一定量