首页 > 解决方案 > 如何在谷歌作曲家上安装 dask

问题描述

我试图在谷歌作曲家(气流)上安装 dask。我使用 pypi (GCP UI) 添加 dask 和以下所需的包(不确定是否需要所有的谷歌,但找不到 require.txt):

 dask
 toolz
 partd
 cloudpickle
 google-cloud
 google-cloud-storage
 google-auth
 google-auth-oauthlib
 decorator

当我运行具有 dd.read_csv("a gcp bucket") 的 DAG 时,它在气流日志中显示以下错误:

    [2018-10-24 22:25:12,729] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 350, in get_fs_token_paths
    [2018-10-24 22:25:12,733] {base_task_runner.py:98} INFO - Subtask:     fs, fs_token = get_fs(protocol, options)
    [2018-10-24 22:25:12,735] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/bytes/core.py", line 473, in get_fs
    [2018-10-24 22:25:12,740] {base_task_runner.py:98} INFO - Subtask:     "Need to install `gcsfs` library for Google Cloud Storage support\n"
    [2018-10-24 22:25:12,741] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/dask/utils.py", line 94, in import_required
    [2018-10-24 22:25:12,748] {base_task_runner.py:98} INFO - Subtask:     raise RuntimeError(error_msg)
    [2018-10-24 22:25:12,751] {base_task_runner.py:98} INFO - Subtask: RuntimeError: Need to install `gcsfs` library for Google Cloud Storage support
    [2018-10-24 22:25:12,756] {base_task_runner.py:98} INFO - Subtask:     conda install gcsfs -c conda-forge
    [2018-10-24 22:25:12,758] {base_task_runner.py:98} INFO - Subtask:     or
    [2018-10-24 22:25:12,762] {base_task_runner.py:98} INFO - Subtask:     pip install gcsfs

所以我尝试使用 pypi 安装 gcsfs,但出现以下气流错误:

{
  insertId:  "17ks763f726w1i"  
  logName:  "projects/xxxxxxxxx/logs/airflow-worker"  
  receiveTimestamp:  "2018-10-25T15:42:24.935880717Z"  
  resource: {…}  
  severity:  "ERROR"  
  textPayload:  "Traceback (most recent call last):
  File "/usr/local/bin/gcsfuse", line 7, in <module>
   from gcsfs.cli.gcsfuse import main
  File "/usr/local/lib/python2.7/site- 
    packages/gcsfs/cli/gcsfuse.py", line 3, in <module>
     fuse import FUSE
    ImportError: No module named fuse
 "  
  timestamp:  "2018-10-25T15:41:53Z"  
}

似乎它被困在所需包裹的循环中!不确定我是否在这里错过了什么?有什么想法吗?

标签: airflowdaskgoogle-cloud-composer

解决方案


你不需要在你的 PyPi 包中添加存储,它已经安装好了。我运行了一个 dag (image-version:composer-1.3.0-airflow-1.10.0) 记录预装包的版本,它似乎是 1.13.0。为了复制您的案例,我还在我的 dag 中添加了以下内容:

import dask.dataframe as dd
def read_csv_dask():
    df = dd.read_csv('gs://gcs_path/data.csv')
    logging.info("csv from gs://gcs_path/ read alright")

在此之前,我通过 UI 添加了以下依赖项:

dask==0.20.0
toolz==0.9.0
partd==0.3.9
cloudpickle==0.6.1

相应的任务失败并显示与您相同的消息(“需要安装gcsfs库以支持 Google 云存储”),此时我返回 UI 并尝试添加gcsfs==0.1.2. 这从未成功。但是,我没有收到您所做的错误,而是因为“Composer Backend timed out”而反复失败。

此时,您可以考虑以下替代方案:

1) 在 BashOperator 中使用 pip 安装 gcsfs。这不是最优的,因为每次运行 dag 时都会安装 gcsfs。

2)使用另一个库。你在用这个 csv 做什么?如果您将其上传到gs://composer_gcs_bucket/data/目录(检查此处),您可以使用例如 csv 标准库来读取它,如下所示:

import csv
def read_csv():
    f=open('/home/airflow/gcs/data/data.csv', 'rU')
    reader = csv.reader(f)

推荐阅读