google-cloud-dataflow - Using google-cloud-dataflow beam.io.avroio.WriteToAvro(
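Problem description
I'm using google-cloud-dataflow/Cloud Composer to convert CSV to Avro, and everything works in my local environment. However, when I try to read the .avsc file containing the Avro schema from a Cloud Storage bucket, I keep getting: IOError: [Errno 2] No such file or directory: 'gs://my-bucket/xxx.avsc'
Code: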
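The error can be reproduced outside Beam: Python's built-in open() knows nothing about the gs:// scheme, so it passes the URL to the operating system as a relative local path and fails with errno 2 (ENOENT). A minimal sketch, using a hypothetical helper name `builtin_open_fails`:

```python
import errno


def builtin_open_fails(path):
    """Return True if the built-in open() reports 'No such file or directory'."""
    try:
        with open(path, "rb"):
            return False
    except IOError as exc:  # on Python 3 this catches FileNotFoundError (an OSError)
        return exc.errno == errno.ENOENT


# The gs:// URL is treated as a local relative path, which does not exist.
print(builtin_open_fails("gs://my-bucket/xxx.avsc"))
```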
```python
from __future__ import absolute_import
import argparse
import logging
import ntpath
import re  # needed by RowTransformer.__init__ (missing in the original)

import avro.schema
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime


class RowTransformer(object):
    def __init__(self, delimiter, header, filename):
        self.delimiter = delimiter
        self.keys = re.split(',', header)
        self.filename = filename

    def parse(self, row):
        self.load_dt = datetime.utcnow()
        split_row = row.split(self.delimiter)
        # Cast anything that is not a string into the proper type
        split_row[8] = float('0' if not split_row[8] else split_row[8])
        split_row[9] = float('0' if not split_row[9] else split_row[9])
        split_row[10] = float('0' if not split_row[10] else split_row[10])
        split_row[11] = float('0' if not split_row[11] else split_row[11])
        split_row[12] = float('0' if not split_row[12] else split_row[12])
        split_row[13] = float('0' if not split_row[13] else split_row[13])
        split_row[14] = float('0' if not split_row[14] else split_row[14])
        split_row[15] = float('0' if not split_row[15] else split_row[15])
        split_row[16] = float('0' if not split_row[16] else split_row[16])
        split_row[17] = float('0' if not split_row[17] else split_row[17])
        split_row[18] = str('0' if not split_row[18] else split_row[18])
        split_row[19] = str('0' if not split_row[19] else split_row[19])
        split_row.append(self.filename)
        split_row.append(self.load_dt.strftime('%Y-%m-%d %H:%M:%S.%f'))
        # Python 2 code: basestring and str.decode() do not exist in Python 3
        decode_row = [i.decode('UTF-8') if isinstance(i, basestring) else i for i in split_row]
        row = dict(zip(self.keys, decode_row))
        return row


def run(argv=None):
    """The main function which creates the pipeline and runs it."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=False,
                        help='Input file to read. This can be a local file or '
                             'a file in a Google Storage Bucket.',
                        default='gs://my-bucket/receive/xxx.txt')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output Avro to Cloud Storage',
                        default='gs://my-bucket/')
    parser.add_argument('--schema', dest='schema', required=False,
                        help='Avro Schema',
                        default='gs://my-bucket/xxx.avsc')
    parser.add_argument('--delimiter', dest='delimiter', required=False,
                        help='Delimiter to split input records.',
                        default='|')
    parser.add_argument('--fields', dest='fields', required=False,
                        help='list of field names expected',
                        default='Col1,Col2...etc')
    known_args, pipeline_args = parser.parse_known_args(argv)
    row_transformer = RowTransformer(delimiter=known_args.delimiter,
                                     header=known_args.fields,
                                     filename=ntpath.basename(known_args.input))
    p_opts = pipeline_options.PipelineOptions(pipeline_args)
    with beam.Pipeline(options=p_opts) as pipeline:
        # This line raises the IOError: the built-in open() cannot read gs:// paths
        schema_file = avro.schema.parse(open(known_args.schema, "rb").read())
        rows = pipeline | "Read from text file" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
        dict_records = rows | "Convert to Avro" >> beam.Map(lambda r: row_transformer.parse(r))
        dict_records | "Write to Cloud Storage as Avro" >> beam.io.avroio.WriteToAvro(known_args.output, schema=schema_file)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
```
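For reference, the record-building logic in RowTransformer.parse() can be sketched in Python 3 as a standalone function. WriteToAvro writes each element as one Avro record, so parse() has to produce a dict whose keys match the schema's field names. The function name `parse_row` and the `now` parameter (added only so the timestamp can be fixed in tests) are hypothetical; the column positions are copied from the question, and no decode step is needed because Python 3 text lines are already `str`.

```python
from datetime import datetime

# Column positions copied from the question's RowTransformer.parse().
FLOAT_COLUMNS = range(8, 18)        # columns 8..17: empty -> 0.0, else float
ZERO_DEFAULT_STR_COLUMNS = (18, 19)  # columns 18..19: empty -> "0", kept as str


def parse_row(row, keys, filename, delimiter="|", now=None):
    """Turn one delimited line into the dict that WriteToAvro writes as a record."""
    values = row.split(delimiter)
    for i in FLOAT_COLUMNS:
        values[i] = float(values[i] or "0")
    for i in ZERO_DEFAULT_STR_COLUMNS:
        values[i] = values[i] or "0"
    load_dt = now or datetime.utcnow()
    values.append(filename)
    values.append(load_dt.strftime("%Y-%m-%d %H:%M:%S.%f"))
    # zip() pairs each value with a --fields name; the names must match the schema.
    return dict(zip(keys, values))
```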
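Solution
You need to use the apache_beam.io.gcp.gcsio class to read the schema file instead of Python's built-in open(), which only reads local files (beam.io.ReadFromText itself already handles gs:// paths): https://beam.apache.org/documentation/sdks/pydoc/2.6.0/apache_beam.io.gcp.gcsio.html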
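A minimal sketch of the fix, assuming the schema file is small enough to read in one call. The hypothetical helper `load_schema_text` reads gs:// paths through `gcsio.GcsIO().open()` and falls back to the built-in open() for local paths, so the same pipeline code can run both locally and on Dataflow. The gcsio import is done lazily so local runs do not require the GCP extras.

```python
import json


def load_schema_text(path):
    """Return the text of an Avro schema (.avsc) file from GCS or local disk."""
    if path.startswith("gs://"):
        # Lazy import: only needed when the schema actually lives in GCS.
        from apache_beam.io.gcp import gcsio
        f = gcsio.GcsIO().open(path, "r")
    else:
        # Built-in open() is fine for local paths.
        f = open(path, "rb")
    try:
        data = f.read()
    finally:
        f.close()
    text = data.decode("utf-8")
    json.loads(text)  # fail early if the file is not valid JSON
    return text
```

In run(), the schema line would then become `schema_file = avro.schema.parse(load_schema_text(known_args.schema))`. `apache_beam.io.filesystems.FileSystems.open()` is an alternative that picks the right filesystem from the path scheme by itself.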