Problem reading a BigQuery table from a Dataproc workflow with PySpark

Problem description

I am trying to automate a process using GCP + Dataproc + PySpark. To do that, I created the following script:

from pyspark.sql import SparkSession

# Placeholders: replace with the actual project ID and dataset.table to read
data_project = 'project_name'
data_pop_table = 'dataset_name.table_name'

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName('workflow_segmentation') \
    .config('spark.local.dir', '/dev/spark') \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.17.2") \
    .getOrCreate()

data = spark.read \
    .format('com.google.cloud.spark.bigquery') \
    .option("project", data_project) \
    .option("table", data_pop_table) \
    .load()
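
A quick way to check whether the connector configuration actually reached the session is to print the relevant Spark properties before calling load(). This is only a diagnostic sketch, not part of the original script; it assumes the spark session created above:

# Diagnostic sketch: inspect which jars/packages the active session actually knows about.
# If spark.jars.packages comes back empty when the job runs through the workflow,
# the connector was never resolved and the BigQuery data source cannot be found.
conf = spark.sparkContext.getConf()
print("spark.jars.packages:", conf.get("spark.jars.packages", "<not set>"))
print("spark.jars:", conf.get("spark.jars", "<not set>"))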

This script is run by a Dataproc workflow that is created with the following bash script:

#Creating the job
gcloud dataproc workflow-templates create dataproc_job_name \
    --region=us-central1

#Setting up the job (selecting Python version & the source code to run)
gcloud dataproc workflow-templates add-job pyspark file:///root/folder/main.py \
    --workflow-template=dataproc_job_name \
    --step-id=id_1 \
    --region=us-central1

#Setting up the VM
gcloud dataproc workflow-templates set-managed-cluster dataproc_job_name \
    --cluster-name=automatic-dataproc-job \
    --single-node \
    --master-machine-type=n1-standard-32 \
    --image-version=1.4 \
    --region=us-central1 \
    --scopes cloud-platform \
    --metadata='PIP_PACKAGES=pandas numpy matplotlib google-cloud-storage' \
    --initialization-actions=gs://datastudio_ds/automations-prod/config_files/pip_install.sh

However, when I run the Dataproc job, I get the following error:

Traceback (most recent call last):
  File "/root/folder/main.py", line 16, in <module>
    fill_as_preprocessing=True)
  File "/root/folder/main.py", line 760, in data_adecuation
    .option("table",self.data_pop_table)\
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o643.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.google.cloud.spark.bigquery. Please find packages at http://spark.apache.org/third-party-projects.html

I have no idea why this error occurs. In fact, the same script runs fine when I execute it directly on a Dataproc cluster. If anyone has run into this issue before or knows how to solve it, I would really appreciate the help!

Tags: google-cloud-platform, pyspark, google-cloud-dataproc

Solution


For completeness: the problem was solved by adding the --jars flag to the add-job command. The --jars flag must point to the .jar file that contains the Spark connector for BigQuery. Passing the connector at submit time is more reliable than setting spark.jars.packages inside the SparkSession builder, which is not always honored once the job has already been launched through spark-submit. Below is the corrected bash script that creates the Dataproc job:

#Creating the job
gcloud dataproc workflow-templates create dataproc_job_name \
    --region=us-central1

#Setting up the job (selecting Python version & the source code to run)
gcloud dataproc workflow-templates add-job pyspark file:///root/folder/main.py \
    --workflow-template=dataproc_job_name \
    --step-id=id_1 \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar \
    --region=us-central1

#Setting up the VM
gcloud dataproc workflow-templates set-managed-cluster dataproc_job_name \
    --cluster-name=automatic-dataproc-job \
    --single-node \
    --master-machine-type=n1-standard-32 \
    --image-version=1.4 \
    --region=us-central1 \
    --scopes cloud-platform \
    --metadata='PIP_PACKAGES=pandas numpy matplotlib google-cloud-storage' \
    --initialization-actions=gs://datastudio_ds/automations-prod/config_files/pip_install.sh
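
With the connector jar supplied at submit time through --jars, the PySpark side can be simplified: the spark.jars.packages setting in the builder is no longer needed, and the connector also registers the short format name bigquery. The sketch below shows how the read might then look; project_name and dataset_name.table_name are placeholders:

from pyspark.sql import SparkSession

# The connector jar is already on the classpath (passed with --jars at submit time),
# so no spark.jars.packages configuration is needed here.
spark = SparkSession \
    .builder \
    .appName('workflow_segmentation') \
    .getOrCreate()

# The spark-bigquery connector also registers the short format name "bigquery".
data = spark.read \
    .format('bigquery') \
    .option('project', 'project_name') \
    .option('table', 'dataset_name.table_name') \
    .load()

data.printSchema()

Dropping the hard-coded .master('local[*]') also lets Dataproc decide where to run the job; on the single-node cluster above the behaviour is similar, but the same script then works unchanged on a multi-node cluster.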
