首页 > 解决方案 > apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 biquery python

问题描述

我想每小时设置一个管道来解析 GCS 存储桶的不同文件夹中的 2000 个原始 protobuf 格式文件,并将数据加载到大查询中。到目前为止,我能够成功解析原始数据。

我知道读取文件夹中所有文件的通配符方法,但我现在不想这样做,因为我有来自不同文件夹的数据,我想像并行一样更快地运行它,而不是按顺序运行

像下面

for x,filename enumerate(file_separted_comma):
    --read data from prto
    --load data to bigquery 

现在我想知道以下方法是否是从 apache Beam 中的不同文件夹解析多个文件并将数据加载到大查询中的最佳或推荐方法。

还有一件事,从 proto 解析后的每条记录,我都将其转换为 JSON 记录以加载到大查询中,但不知道这也是将数据加载到大查询而不是直接加载反序列化(解析)的好方法原型数据。

我正在从 Hadoop 作业转移到数据流,以通过设置此管道来降低成本。

我是 apache-beam 的新手,不知道什么是优缺点,因此有人可以看看代码并在这里帮助我制定更好的生产方法

import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems


def get_deserialized_log(serialized_log):
    log = rtbtracker_log_pb2.RtbTrackerLogProto()
    log.ParseFromString(serialized_log)
    return log


def print_row(message):
    message=message[3]
    message = message.replace('_', '/');
    message = message.replace('*', '=');
    message = message.replace('-', '+');
    #finalbunary=base64.b64decode(message.decode('UTF-8'))
    finalbunary=base64.b64decode(message)
    msg=get_deserialized_log(finalbunary)

    jsonObj = MessageToDict(msg)
    #jsonObj = MessageToJson(msg)
    return jsonObj

def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter='\t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
    return line



def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    app_args, pipeline_args = parser. parse_known_args()

    with beam.Pipeline(options=PipelineOptions()) as p:
        input_list=app_args.input
        file_list = input_list.split(",")
        res_list = ["/home/file_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

        for i,file in enumerate(file_list):
            onesec=p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
            parsingProtoFile=onesec | 'Parse file{}'.format(i) >> beam.Map(parse_file)
            printFileConetent=parsingProtoFile | 'Print output {}'.format(i) >>beam.Map(print_row)
        
            #i want to load to bigquery here
            ##LOAD DATA TO BIGQUERY

            #secondsec=printFileConetent | "Write TExt {}".format(i) >> ##beam.io.WriteToText("/home/file_{}".format(i),file_name_suffix=".json", 
###num_shards=1 , 
##append_trailing_newlines = True)
        

if __name__ == '__main__':
    run()

在本地运行下面的代码

python3 another_main.py --input=tracker_one.gz,tracker_two.gz

输出路径我没有提到,因为我不想将数据保存到 gcs,因为我会将它加载到 bigquery

就像下面在 dataflowrunner 中运行

python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing

注意到两个作业将针对同一个作业名称中的单个输入文件运行,并且不知道为什么会发生这种情况,并且 PFA 屏幕截图相同在此处输入图像描述

标签: pythonprotocol-buffersgoogle-cloud-dataflowapache-beam

解决方案


这种读取文件的方法很好(只要输入文件的数量不太大)。但是,如果您可以将要读取的文件集表示为通配符表达式(可以匹配多个文件夹),则可能会执行得更好,并且 Dataflow 将并行读取与该模式匹配的所有文件。

要写入 BigQuery,最好使用内置的 BigQuery sink。默认行为是以 JSON 格式创建临时文件,然后将其加载到 BigQuery 中,但您也可以改用 Avro,这样会更有效。您还可以使用Flatten将所有输入合并到一个 PCollection 中,这样您的管道中只需要一个 BigQuery 接收器。


推荐阅读