Using google-cloud-dataflow beam.io.avroio.WriteToAvro

Problem description

I am using google-cloud-dataflow / Cloud Composer to convert CSV to Avro, and everything runs fine in my local environment. When I try to read the .avsc file containing the Avro schema from a Cloud Storage bucket, I keep getting: IOError: [Errno 2] No such file or directory: 'gs://my-bucket/xxx.avsc'

Code:

from __future__ import absolute_import
import argparse
import logging
import ntpath
import re
import avro.schema
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime


class RowTransformer(object):
    def __init__(self, delimiter, header, filename):
        self.delimiter = delimiter
        self.keys = re.split(',', header)
        self.filename = filename

    def parse(self, row):
        self.load_dt = datetime.utcnow()
        split_row = row.split(self.delimiter)
        # Cast the numeric columns to float; empty values default to 0.
        for i in range(8, 18):
            split_row[i] = float(split_row[i] if split_row[i] else '0')
        # Columns 18 and 19 stay strings; empty values default to '0'.
        for i in (18, 19):
            split_row[i] = split_row[i] if split_row[i] else '0'
        split_row.append(self.filename)
        split_row.append(self.load_dt.strftime('%Y-%m-%d %H:%M:%S.%f')) 
        decode_row = [i.decode('UTF-8') if isinstance(i, basestring) else i for i in split_row]
        row = dict(zip(self.keys, decode_row))
        return row

def run(argv=None):
    """The main function which creates the pipeline and runs it."""

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=False,
        help='Input file to read.  This can be a local file or '
             'a file in a Google Storage Bucket.',
        default='gs://my-bucket/receive/xxx.txt')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output Avro to Cloud Storage',
                        default='gs://my-bucket/')
    parser.add_argument('--schema', dest='schema', required=False,
                        help='Avro Schema',
                        default='gs://my-bucket/xxx.avsc')
    parser.add_argument('--delimiter', dest='delimiter', required=False,
                        help='Delimiter to split input records.',
                        default='|')
    parser.add_argument('--fields', dest='fields', required=False,
                        help='list of field names expected',
                        default='Col1,Col2...etc')
    known_args, pipeline_args = parser.parse_known_args(argv)
    row_transformer = RowTransformer(delimiter=known_args.delimiter,
                                     header=known_args.fields,
                                     filename=ntpath.basename(known_args.input))
    p_opts = pipeline_options.PipelineOptions(pipeline_args)

    with beam.Pipeline(options=p_opts) as pipeline:
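        # The next line is where the IOError is raised: the built-in open()
        # cannot resolve gs:// paths.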
        schema_file = avro.schema.parse(open(known_args.schema, "rb").read())
        rows = pipeline | "Read from text file" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
        dict_records = rows | "Convert to Avro" >> beam.Map(lambda r: row_transformer.parse(r))
        dict_records | "Write to Cloud Storage as Avro" >> beam.io.avroio.WriteToAvro(known_args.output, schema=schema_file)

if __name__ == '__main__':
    run()

Tags: google-cloud-dataflow, apache-beam

Solution


The error comes from the schema-loading line, not from the pipeline transforms: the built-in open() can only read local files, so it fails on a gs:// path. Use the apache_beam.io.gcp.gcsio module to read the .avsc file from Cloud Storage instead. See https://beam.apache.org/documentation/sdks/pydoc/2.6.0/apache_beam.io.gcp.gcsio.html
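A minimal sketch of the fix, assuming the GCP extras are installed (pip install apache-beam[gcp]) and reusing the schema_file name from the question:

import avro.schema
from apache_beam.io.gcp import gcsio

# Open the .avsc file through Beam's GCS client instead of the local filesystem.
gcs = gcsio.GcsIO()
f = gcs.open(known_args.schema)
try:
    schema_file = avro.schema.parse(f.read())
finally:
    f.close()

Alternatively, apache_beam.io.filesystems.FileSystems.open() resolves both local paths and gs:// URLs, which keeps the same script runnable locally and on Dataflow without branching on the path scheme.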

