python - 附加数据文件(.csv、.json)作为要在 Dataflow 上使用的设置包的一部分
问题描述
我正在尝试使用数据流来完成需要使用 .csv 和 .json 文件的任务。据我了解,我应该能够创建一个 setup.py 文件,其中将包含这些文件并将它们分发给多个工作人员。
这是我的文件的布局方式:
pipline.py
setup.py
utils /
-->__init__.py
-->**CSV.csv**
-->**JSON.json**
这是我的 setup.py 文件:
import setuptools
setuptools.setup(name='utils',
version='0.0.1',
description='utils',
packages=setuptools.find_packages(),
package_data={'utils': ['**CSV.csv**', '**JSON.json**']},
include_package_data=True)
这是我的 bean.DoFn 函数:
class DoWork(beam.DoFn):
def process(self, element):
import pandas as pd
df_csv = pd.read_csv('**CSV.csv**')
df_json = pd.read_json('**JSON.json**')
Do other stuff with dataframes
yield [stuff]
我的管道设置如下:
dataflow_options = ['--job_name=pipline',
'--project=pipeline',
'--temp_location=gs://pipeline/temp',
'--staging_location=gs://pipeline/stage',
'--setup_file=./setup.py']
options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'DataflowRunner'
with beam.Pipeline(options=options) as p:
update = p | beam.Create(files) | beam.ParDo(DoWork())
基本上我不断得到:
IOError: File CSV.csv does not exist
它认为 .json 文件也不存在,只是在到达该步骤之前出错。这些文件可能没有进入数据流,或者我在 DoFn 中错误地引用了它们。我是否应该将文件放入 setup 函数的 data_files 参数而不是 package_data 中?
解决方案
您需要在 gs 中上传输入文件并提供 gs 位置而不是CSV。我认为您在本地运行代码,将 csv 文件与代码放在同一目录中。但是使用 DataflowRunner 运行它需要 gs 中的文件。
推荐阅读
- c# - 如何同时运行 2 个成功的操作?
- dialogflow-es - Google 上的帐户关联操作
- linux - 如何解决 Cron 作业中的预期 EOF
- indexing - RavenDB:如何从 MultiMapIndex 中正确查询/过滤嵌套值?
- reactjs - 在 recharts 中有条件地将边界半径放在 Bar 上
- flutter - 导航到其他页面时保留按钮的状态
- iis - 是否可以使用 IIS 重写模块在服务器而不是客户端上重写 URL
- asp.net - 网格视图内的网格视图 网格视图内的网格视图
- bootstrap-4 - Bootstrap 侧视图在小屏幕中移至底部
- c# - 关于在 C# 中获取后台进程