google-cloud-platform - 尝试使用 PySpark 读取 Dataproc 工作流中的 BigQuery 表时出现问题
问题描述
我正在尝试使用 GCP + Dataproc + PySpark 自动化流程。为此,我创建了以下脚本:
data_project = project_name
data_pop_table = dataset_name.table_name
spark = SparkSession\
.builder\
.master('local[*]')\
.appName('workflow_segmentation')\
.config('spark.local.dir', '/dev/spark')\
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.17.2")\
.getOrCreate()
data = spark.read\
.format('com.google.cloud.spark.bigquery')\
.option("project", data_project)\
.option("table", data_pop_table)\
.load()
此脚本由使用以下 bash 脚本创建的 Dataproc 工作流使用:
#Creating the job
gcloud dataproc workflow-templates create dataproc_job_name \
--region=us-central1
#Setting up the job (selecting Python version & the source code to run)
gcloud dataproc workflow-templates add-job pyspark file:///root/folder/main.py \
--workflow-template=dataproc_job_name \
--step-id=id_1 \
--region=us-central1
#Setting up the VM
gcloud dataproc workflow-templates set-managed-cluster dataproc_job_name \
--cluster-name=automatic-dataproc-job \
--single-node \
--master-machine-type=n1-standard-32 \
--image-version=1.4 \
--region=us-central1 \
--scopes cloud-platform \
--metadata='PIP_PACKAGES=pandas numpy matplotlib google-cloud-storage' \
--initialization-actions=gs://datastudio_ds/automations-prod/config_files/pip_install.sh
但是,当我运行 DataProc 作业时,我收到以下错误:
Traceback (most recent call last):
File "/root/folder/main.py", line 16, in <module>
fill_as_preprocessing=True)
File "/root/folder/main.py", line 760, in data_adecuation
.option("table",self.data_pop_table)\
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o643.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.google.cloud.spark.bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
我不知道为什么会出现这个错误。事实上,我在 DataProc 集群中运行相同的脚本,它运行良好。如果有人过去遇到过这个问题或知道如何解决它,我将非常感激!
解决方案
为了完成,这个问题通过在 add-job 命令中设置 --jar 标志来解决。--jar 标志必须指定包含到 BigQuery 的 Spark 连接器的 .jar 文件的路径。下面是创建 Dataproc 作业的正确 bash 脚本:
#Creating the job
gcloud dataproc workflow-templates create dataproc_job_name \
--region=us-central1
#Setting up the job (selecting Python version & the source code to run)
gcloud dataproc workflow-templates add-job pyspark file:///root/folder/main.py \
--workflow-template=dataproc_job_name \
--step-id=id_1 \
--jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar \
--region=us-central1
#Setting up the VM
gcloud dataproc workflow-templates set-managed-cluster dataproc_job_name \
--cluster-name=automatic-dataproc-job \
--single-node \
--master-machine-type=n1-standard-32 \
--image-version=1.4 \
--region=us-central1 \
--scopes cloud-platform \
--metadata='PIP_PACKAGES=pandas numpy matplotlib google-cloud-storage' \
--initialization-actions=gs://datastudio_ds/automations-prod/config_files/pip_install.sh
推荐阅读
- python - 使用中间件在 Flask 请求中设置变量以在模板中使用
- excel - Delphi 程序不再启动 Excel
- c++ - 为什么字符数组输出没有空终止符的垃圾?
- python - 在 python 中使用 Panda 反转特殊字符
- android - 在上下文中使用一个Button:RecyclerView中的SecondActivity
- python - Python azure.identity 中 AADCredentials 的替代方案
- jenkins - ${env.VAR}、${VAR} 或 $VAR 定义 Jenkins 声明性管道环境变量的方式有什么区别?
- python - 在 Python 中使用“for”循环和使用星号运算符 (*) 解包来打印元组
- python - 如何将视频帧保存到新文件夹
- python - 由于编码问题,无法正确抓取网页