airflow - 如何在谷歌作曲家上安装 dask
问题描述
我试图在谷歌作曲家(气流)上安装 dask。我使用 pypi (GCP UI) 添加 dask 和以下所需的包(不确定是否需要所有的谷歌,但找不到 require.txt):
dask
toolz
partd
cloudpickle
google-cloud
google-cloud-storage
google-auth
google-auth-oauthlib
decorator
当我运行具有 dd.read_csv("a gcp bucket") 的 DAG 时,它在气流日志中显示以下错误:
[2018-10-24 22:25:12,729] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 350, in get_fs_token_paths
[2018-10-24 22:25:12,733] {base_task_runner.py:98} INFO - Subtask: fs, fs_token = get_fs(protocol, options)
[2018-10-24 22:25:12,735] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 473, in get_fs
[2018-10-24 22:25:12,740] {base_task_runner.py:98} INFO - Subtask: "Need to install `gcsfs` library for Google Cloud Storage support\n"
[2018-10-24 22:25:12,741] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/dask/utils.py", line 94, in import_required
[2018-10-24 22:25:12,748] {base_task_runner.py:98} INFO - Subtask: raise RuntimeError(error_msg)
[2018-10-24 22:25:12,751] {base_task_runner.py:98} INFO - Subtask: RuntimeError: Need to install `gcsfs` library for Google Cloud Storage support
[2018-10-24 22:25:12,756] {base_task_runner.py:98} INFO - Subtask: conda install gcsfs -c conda-forge
[2018-10-24 22:25:12,758] {base_task_runner.py:98} INFO - Subtask: or
[2018-10-24 22:25:12,762] {base_task_runner.py:98} INFO - Subtask: pip install gcsfs
所以我尝试使用 pypi 安装 gcsfs,但出现以下气流错误:
{
insertId: "17ks763f726w1i"
logName: "projects/xxxxxxxxx/logs/airflow-worker"
receiveTimestamp: "2018-10-25T15:42:24.935880717Z"
resource: {…}
severity: "ERROR"
textPayload: "Traceback (most recent call last):
File "/usr/local/bin/gcsfuse", line 7, in <module>
from gcsfs.cli.gcsfuse import main
File "/usr/local/lib/python2.7/site-
packages/gcsfs/cli/gcsfuse.py", line 3, in <module>
fuse import FUSE
ImportError: No module named fuse
"
timestamp: "2018-10-25T15:41:53Z"
}
似乎它被困在所需包裹的循环中!不确定我是否在这里错过了什么?有什么想法吗?
解决方案
你不需要在你的 PyPi 包中添加存储,它已经安装好了。我运行了一个 dag (image-version:composer-1.3.0-airflow-1.10.0) 记录预装包的版本,它似乎是 1.13.0。为了复制您的案例,我还在我的 dag 中添加了以下内容:
import dask.dataframe as dd
def read_csv_dask():
df = dd.read_csv('gs://gcs_path/data.csv')
logging.info("csv from gs://gcs_path/ read alright")
在此之前,我通过 UI 添加了以下依赖项:
dask==0.20.0
toolz==0.9.0
partd==0.3.9
cloudpickle==0.6.1
相应的任务失败并显示与您相同的消息(“需要安装gcsfs
库以支持 Google 云存储”),此时我返回 UI 并尝试添加gcsfs==0.1.2
. 这从未成功。但是,我没有收到您所做的错误,而是因为“Composer Backend timed out”而反复失败。
此时,您可以考虑以下替代方案:
1) 在 BashOperator 中使用 pip 安装 gcsfs。这不是最优的,因为每次运行 dag 时都会安装 gcsfs。
2)使用另一个库。你在用这个 csv 做什么?如果您将其上传到gs://composer_gcs_bucket/data/
目录(检查此处),您可以使用例如 csv 标准库来读取它,如下所示:
import csv
def read_csv():
f=open('/home/airflow/gcs/data/data.csv', 'rU')
reader = csv.reader(f)
推荐阅读
- c++ - 如何在 C++ 中创建类型列表的笛卡尔积?
- mongoose - 如何在通过调用 mongoose save() 方法更新某些行时停止制作版本
- openstreetmap - 为 openstreetmap 获取实用的 maxspeed
- javascript - VueJS:当用户搜索某些东西时过滤对象数组
- python - XPath 或 BeautifulSoup(或其他方式)来选择和解析某个 div 块
- geode - java连接被拒绝错误与spring boot数据geode远程定位器
- java - OptaPlanner:如何在没有 .xml 配置文件的情况下构建求解器?
- angular - 如何在Angular中@import CSS文件?
- html - 如果未选中复选框,则禁用按钮
- java - 未捕获事务异常