Reading from Google Cloud Storage in Dataproc

Problem description

I am trying to read a csv or txt file from GCS in a Dataproc pyspark application. I have tried many things. The most promising so far:

#!/usr/bin/python 
import os
import sys
import pyspark
from pyspark.sql import SQLContext
import pandas as pd
from pyspark import SparkContext, SparkConf

os.environ['SPARK_HOME'] = "/usr/lib/spark/"
sys.path.append("/usr/lib/spark/python/")

sc = SparkContext()
sql_sc = SQLContext(sc)

Either:

pandas_df = pd.read_csv('{BUCKET}/user2user_relations.csv')
s_df = sql_sc.createDataFrame(pandas_df)

or:

data = sc.textFile('gs://{BUCKET}/user2user_relations.csv')

I don't need the pandas workaround; I want to end up with an RDD to feed into Spark ALS recommendations. The error I get is:

Job [fdcad5bcf77343e2b8782097cd7450cb] submitted.
Waiting for job output...
18/08/08 16:50:20 INFO org.spark_project.jetty.util.log: Logging initialized @2662ms
18/08/08 16:50:20 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
18/08/08 16:50:20 INFO org.spark_project.jetty.server.Server: Started @2751ms
18/08/08 16:50:20 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@41335d47{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/08/08 16:50:21 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
18/08/08 16:50:21 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at new-try-gcs-pd-m/10.164.0.2:8032
18/08/08 16:50:24 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1533726146765_0018
dataproc-d3d8d55c-05b3-4211-adf2-2014ebdbc20c-europe-west4
go-de-internal
gs://dataproc-d3d8d55c-05b3-4211-adf2-2014ebdbc20c-europe-west4/
gs://dataproc-d3d8d55c-05b3-4211-adf2-2014ebdbc20c-europe-west4/user2user_relations.csv
Traceback (most recent call last):
  File "/tmp/fdcad5bcf77343e2b8782097cd7450cb/pyspark_gcs_acess.py", line 19, in <module>
    pandas_df = pd.read_csv(input_file)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 452, in parser_f
    return _read(filepath_or_buffer, kwds)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 234, in _read
    parser = TextFileReader(filepath_or_buffer, **kwds)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 542, in __init__
    self._make_engine(self.engine)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 679, in _make_engine
    self._engine = CParserWrapper(self.f, **self.options)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 1041, in __init__
    self._reader = _parser.TextReader(src, **kwds)
  File "parser.pyx", line 332, in pandas.parser.TextReader.__cinit__ (pandas/parser.c:3218)
  File "parser.pyx", line 559, in pandas.parser.TextReader._setup_parser_source (pandas/parser.c:5594)
IOError: File gs://{}/user2user_relations.csv does not exist
18/08/08 16:50:28 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@41335d47{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [fdcad5bcf77343e2b8782097cd7450cb] entered state [ERROR] while waiting for [DONE].

Thanks

Tags: google-cloud-platform, google-cloud-storage, google-cloud-dataproc

Solution

Pandas will not read directly from GCS with the URI you passed it, unlike Spark on Dataproc, which comes with the GCS connector installed by default.
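
Since Spark on Dataproc resolves gs:// paths out of the box, your sc.textFile attempt is the right direction. A minimal sketch of going straight from GCS to the RDD you want for ALS might look like this (the bucket name and the user,item,rating column layout are assumptions, not taken from your file):

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating

sc = SparkContext()

# The gs:// scheme is handled by the GCS connector preinstalled on Dataproc.
# 'your-bucket' is a placeholder; adjust the column layout to your csv.
lines = sc.textFile('gs://your-bucket/user2user_relations.csv')
ratings = lines.map(lambda line: line.split(',')) \
               .map(lambda f: Rating(int(f[0]), int(f[1]), float(f[2])))

# Train the ALS recommender directly on the RDD of Ratings.
model = ALS.train(ratings, rank=10, iterations=10)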

If you want to read the same blob from pandas, I would suggest you either:

  • Read it through the google-cloud-storage client library:

import io
import pandas as pd
from google.cloud import storage

client = storage.Client()
# https://console.cloud.google.com/storage/browser/[bucket-id]/
bucket = client.get_bucket('bucket-id-here')
blob = bucket.get_blob('remote/path/to/file.txt')

# read_csv expects a path or file-like object, not raw bytes,
# so wrap the downloaded contents in a BytesIO buffer
df = pd.read_csv(io.BytesIO(blob.download_as_string()))
  • Or first copy the file to a local directory, using gsutil cp gs://bucket/blob /local/folder/blob, and then read it from disk:

df = pd.read_csv('/local/folder/blob')
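
As a side note, recent pandas versions can also read gs:// URIs directly, provided the optional gcsfs package is installed (an extra dependency, not something the Dataproc image necessarily ships):

# Requires: pip install gcsfs
import pandas as pd

# pandas delegates the gs:// scheme to gcsfs under the hood
df = pd.read_csv('gs://your-bucket/user2user_relations.csv')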

Hope it helps.

