python - 如何将我们的自定义代码(将 json 转换为 csv)用于 Google 云数据流?
问题描述
我有自己的代码将 json 转换为 csv ..我需要将此代码放入 gcp 并开发一个数据流作业(它将从云存储中读取 json 并转换为 csv 并再次放入云存储中)
p = beam.Pipeline(options=PipelineOptions())
class to_csv(beam.DoFn):
def process(self,f):
columns_list = ["col1",]
with open(f,"r") as f1:
dd= ast.literal_eval(f1.readlines()[0])
for each in dd['data']:
for ELEMENT in each["values"]:
for KEY,VALUE in ELEMENT.items():
if KEY=="value" and type(VALUE)==dict:
columns_list.extend(VALUE.keys())
new_col_list = list(set(columns_list))
sample = pd.DataFrame(columns=new_col_list)
#Adding values to the table structure
for each in range(len(dd['data'])):
empty_dict = {}
empty_dict["col1"] = dd['data'][each]["id"]
["description"]
for ELEMENT in dd['data'][each]["values"]:
for KEY,VALUE in ELEMENT.items():
if KEY=="value":
if type(VALUE)==int:
empty_dict["value"]=VALUE
elif type(VALUE)==dict:
temp_df = pd.DataFrame().from_dict(VALUE,orient="index").T
for ind in temp_df.columns:
empty_dict[ind] = temp_df[ind][0]
elif KEY=="end_time":
end_time_lis = VALUE
empty_dict["end_time"] = end_time_lis
sample = sample.append(empty_dict,ignore_index=True)
parser = argparse.ArgumentParser(description=__doc__,formatter_class=argparse.RawDescriptionHelpFormatter)
result =p.run()
#loading the data from the source file
data_from_source = (p | 'ReadMyFile' >> ReadFromText("sourcepath"))
data_from_source | 'Convert To Csv' >> beam.ParDo(to_csv())| 'exportresult'>>WriteToText('outputpath')
解决方案
Beam TextIO 支持从 GCS 读取和写入。因此,要从 GCS 读取,请将 'ReadFromText("sourcepath")' 替换为 'ReadFromText("gs://my-bucket/sourcepath")'。
如果你想要一个文件,写起来会有点困难。您可以将“WriteToText('outputpath')”替换为“WriteToText('gs://my-bucket/outprefix')”,它会为每个分片写入一个文件 gs://my-bucket/outprefix-0001-of -00262。如果您需要单个文件输出,Cloud Functions 是合并输出文件的一种选择。
推荐阅读
- javascript - 循环时数组操作不起作用
- jquery - 使用 jquery 选择器检测 pnotify 的显示或加载
- c++ - 在复合运算符上使用 static_cast
- odata - UI5:如何将 Edm.DateTime (V2) 类型的日期添加到 XML 视图?
- graph-databases - 如何使用联合/项目/选择在 gremlin 中获得组合顶点结果?
- mysql - Docker WordPress 图像无法连接到数据库
- vba - 添加新字段后 MS Access 错误 2105“您无法转到指定的记录”
- powershell - 创建使用 PowerShell 脚本的 GitHub 操作
- php - 将简单图像挤压到 .obj
- google-sheets - 有没有办法根据输出范围的条件进行 SUMIFS?