python - PySpark 2.x:以编程方式将 Maven JAR 坐标添加到 Spark
问题描述
以下是我的 PySpark 启动片段,非常可靠(我已经使用了很长时间)。今天我添加了spark.jars.packages
选项中显示的两个 Maven 坐标(有效地“插入”了 Kafka 支持)。现在通常会触发依赖项下载(由 Spark 自动执行):
import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
# ------------------------------------------
# Note: Row() in .../pyspark/sql/types.py
# isn't included in '__all__' list(), so
# we must import it by name here.
# ------------------------------------------
num_cpus = multiprocessing.cpu_count() # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None) # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_POST', None) # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME', None) # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP', None) # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre' # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'
# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to autimatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join(['org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([('spark.master', 'local[{}]'.format(num_cpus)),
('spark.app.name', 'myApp'),
('spark.submit.deployMode', 'client'),
('spark.ui.showConsoleProgress', 'true'),
('spark.eventLog.enabled', 'false'),
('spark.logConf', 'false'),
('spark.jars.repositories', 'file:/' + JARS_IVY_REPO),
('spark.jars.ivy', JARS_IVY_REPO),
('spark.jars.packages', spark_jars_packages), ])
spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt = spark_sesn.sparkContext
spark_reader = spark_sesn.read
spark_streamReader = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")
但是,当我运行片段(例如)时,插件没有下载和/或加载./python -i init_spark.py
,因为它们应该。
这种机制曾经起作用,但后来停止了。我错过了什么?
先感谢您!
解决方案
这是一种问题比答案更有价值的帖子,因为上面的代码有效,但在 Spark 2.x 文档或示例中找不到。
以上是我通过 Maven Coordinates 以编程方式向 Spark 2.x 添加功能的方式。我有这个工作,但后来它停止工作。为什么?
当我在 a 中运行上述代码时jupyter notebook
,笔记本已经——在幕后——已经通过我的PYTHONSTARTUP
脚本运行了相同的代码片段。该PYTHONSTARTUP
脚本具有与上述相同的代码,但省略了 maven 坐标(按意图)。
那么,这个微妙的问题是如何出现的:
spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()
因为 Spark Session 已经存在,所以上面的语句只是重用了现有的 session (.getOrCreate()),它没有加载 jars/libraries(同样,因为我的 PYTHONSTARTUP 脚本故意省略了它们)。这就是为什么将打印语句放在 PYTHONSTARTUP 脚本中(否则它们是静默的)是一个好主意的原因。
最后,我只是忘记了这样做:$ unset PYTHONSTARTUP
在启动JupyterLab / Notebook
守护进程之前。
我希望这个问题对其他人有所帮助,因为这就是如何以编程方式向 Spark 2.x(在本例中为 Kafka)添加功能。请注意,您需要 Internet 连接才能从 Maven Central 一次性下载指定的 jar 和递归依赖项。
推荐阅读
- angular - Angular,如何在 FormArray 的特定 FormControl 中设置焦点
- typescript - 如何在渲染页面之前等待firebase加载数据
- javascript - 使用 Object Javascript 在 foreach 循环中的最后一次迭代
- postman - 邮递员在收集开始时定义变量列表一次
- javascript - 如何从“日期范围选择器”禁用每天的“点击”事件?
- embedded - 使用 srec_cat 加入三个二进制文件并填充漏洞
- ember.js - Ember-cli-build,排除组件 ember 插件
- python - 如何从 Process- 或 Thread 实例返回值?
- javascript - Axios 在控制台上打印值但返回未定义
- linux - 在 @INC 中找不到 Parallel/ForkManager.pm