How do I use our own custom code (converting JSON to CSV) with Google Cloud Dataflow?

Problem description

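I have my own code that converts JSON to CSV. I need to put this code into GCP and build a Dataflow job that reads the JSON from Cloud Storage, converts it to CSV, and writes the result back to Cloud Storage.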

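import argparse
import ast

import apache_beam as beam
import pandas as pd
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class to_csv(beam.DoFn):
    def process(self, line):
        # ReadFromText emits the file *contents* line by line, not a file path,
        # so parse the element directly instead of calling open() on it
        # (open() cannot read gs:// paths anyway). Like the original
        # readlines()[0], this assumes one complete record per line.
        dd = ast.literal_eval(line)

        # Adding values to the table structure: build one dict per record and
        # create the DataFrame once (DataFrame.append was removed in pandas 2.0)
        rows = []
        for record in dd['data']:
            row = {"col1": record["id"]}
            end_time = None  # avoids a NameError when a record has no end_time
            for element in record["values"]:
                for key, value in element.items():
                    if key == "value":
                        if isinstance(value, int):
                            row["value"] = value
                        elif isinstance(value, dict):
                            # One column per key of the nested dict
                            row.update(value)
                    elif key == "end_time":
                        end_time = value
            row["end_time"] = end_time
            rows.append(row)

        # Columns are the union of all keys seen, so no separate
        # column-collecting pass is needed
        sample = pd.DataFrame(rows)
        # Emit the CSV text; WriteToText appends the final newline itself
        yield sample.to_csv(index=False).rstrip("\r\n")


def run(argv=None):
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    # Unknown flags (--runner, --project, --temp_location, ...) go to Beam
    _, pipeline_args = parser.parse_known_args(argv)

    # The "with" block builds the whole graph first and runs it on exit.
    # The original called p.run() before adding any transforms, which
    # runs an empty pipeline.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        # loading the data from the source file
        data_from_source = p | 'ReadMyFile' >> ReadFromText("sourcepath")
        (data_from_source
         | 'Convert To Csv' >> beam.ParDo(to_csv())
         | 'exportresult' >> WriteToText('outputpath'))


if __name__ == '__main__':
    run()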
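The flattening step itself does not need pandas. Below is a minimal standard-library sketch, assuming the record shape the code above implies (a top-level "data" list whose items carry an "id" and a "values" list of dicts with "value" and "end_time" keys). The function name flatten_to_csv is hypothetical.

```python
import csv
import io


def flatten_to_csv(dd):
    """Turn a parsed {"data": [...]} structure into CSV text.

    One row per item in dd["data"]: the item's id goes in col1, an int
    "value" is kept as-is, a dict "value" is spread into one column per
    key, and the last "end_time" seen for the item is kept.
    """
    rows = []
    for record in dd["data"]:
        row = {"col1": record["id"]}
        end_time = None
        for element in record["values"]:
            value = element.get("value")
            if isinstance(value, dict):
                row.update(value)
            elif isinstance(value, int):
                row["value"] = value
            if "end_time" in element:
                end_time = element["end_time"]
        row["end_time"] = end_time
        rows.append(row)

    # Union of all keys, in first-seen order, becomes the header
    fieldnames = list(dict.fromkeys(k for row in rows for k in row))
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)  # keys missing from a row become empty fields
    return buf.getvalue()
```

Inside the DoFn this would be used as `yield flatten_to_csv(ast.literal_eval(line)).rstrip("\n")`, which also removes the pandas dependency from the workers.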

Tags: python, google-cloud-dataflow

Solution


Beam TextIO supports both reading from and writing to GCS. So, to read from GCS, replace 'ReadFromText("sourcepath")' with 'ReadFromText("gs://my-bucket/sourcepath")'.

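Writing a single file is a bit harder. You can replace 'WriteToText('outputpath')' with 'WriteToText('gs://my-bucket/outprefix')', which writes one file per shard, for example gs://my-bucket/outprefix-00001-of-00262. If you need a single output file, Cloud Functions is one option for merging the output files afterwards.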

