首页 > 解决方案 > 附加数据文件(.csv、.json)作为要在 Dataflow 上使用的设置包的一部分

问题描述

我正在尝试使用数据流来完成需要使用 .csv 和 .json 文件的任务。据我了解,我应该能够创建一个 setup.py 文件,其中将包含这些文件并将它们分发给多个工作人员。

这是我的文件的布局方式:

pipline.py
setup.py
utils /
  -->__init__.py
  -->**CSV.csv**
  -->**JSON.json**

这是我的 setup.py 文件:

import setuptools

setuptools.setup(name='utils',
                 version='0.0.1',
                 description='utils',
                 packages=setuptools.find_packages(),
                 package_data={'utils': ['**CSV.csv**', '**JSON.json**']},
                 include_package_data=True)

这是我的 bean.DoFn 函数:

class DoWork(beam.DoFn):
    def process(self, element):

        import pandas as pd

        df_csv = pd.read_csv('**CSV.csv**')
        df_json = pd.read_json('**JSON.json**')

        Do other stuff with dataframes

        yield [stuff]

我的管道设置如下:

dataflow_options = ['--job_name=pipline',
                    '--project=pipeline',
                    '--temp_location=gs://pipeline/temp',
                    '--staging_location=gs://pipeline/stage',
                    '--setup_file=./setup.py']

options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'DataflowRunner'

with beam.Pipeline(options=options) as p:
    update = p | beam.Create(files) | beam.ParDo(DoWork())

基本上我不断得到:

IOError: File CSV.csv does not exist

它认为 .json 文件也不存在,只是在到达该步骤之前出错。这些文件可能没有进入数据流,或者我在 DoFn 中错误地引用了它们。我是否应该将文件放入 setup 函数的 data_files 参数而不是 package_data 中?

标签: pythongoogle-cloud-platformgoogle-cloud-dataflowapache-beam

解决方案


您需要在 gs 中上传输入文件并提供 gs 位置而不是CSV。我认为您在本地运行代码,将 csv 文件与代码放在同一目录中。但是使用 DataflowRunner 运行它需要 gs 中的文件。


推荐阅读