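python - PySpark - Loop inside foreachBatch causes "SparkContext should only be created and accessed on the driver" error
Problem description
I am trying to read data transformations from a Python dictionary, apply them to a DataFrame inside foreachBatch, and then write the data out in PySpark.
I call writeStream in Spark Structured Streaming as follows: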
pipeline.writeStream \
    .queryName(self.pipeline_name) \
    .foreachBatch(self.transformation_chain()) \
    .start()
where transformation_chain is this method:
def transformation_chain(self):
    def chain(df, epoch_id):
        df_in_edit = df
        df_in_edit = filter_out_null(df_in_edit)
        df_in_edit = split_one_stream_into_two(df_in_edit, "direction", "velocity")
        # for activity in self.data_transformations_workflow:
        #     df_in_edit = activity["transformation"](df_in_edit, *activity["args"])
        df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')
    return chain
However, I want to apply a generic list of data transformations, each of which takes a DataFrame as input and returns a DataFrame, so I specify:
data_transformations_workflow = [
    {
        "transformation": filter_out_sensor_columns_presence_validation,
        "args": ("value", "direction velocity")
    },
    {
        "transformation": split_one_stream_into_two,
        "args": ("direction", "velocity")
    }
]
and then iterate over it like this:
def transformation_chain(self):
    def chain(df, epoch_id):
        df_in_edit = df
        for activity in self.data_transformations_workflow:
            df_in_edit = activity["transformation"](df_in_edit, *activity["args"])
        df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')
    return chain
But I get this error in PySpark:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-30b1f701-8caa-47db-b5fe-5408ccfe09b2;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
found org.apache.kafka#kafka-clients;2.4.1 in central
found com.github.luben#zstd-jni;1.4.4-3 in central
found org.lz4#lz4-java;1.7.1 in central
found org.xerial.snappy#snappy-java;1.1.7.5 in central
found org.slf4j#slf4j-api;1.7.30 in central
found org.spark-project.spark#unused;1.0.0 in central
found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 697ms :: artifacts dl 13ms
:: modules in use:
com.github.luben#zstd-jni;1.4.4-3 from central in [default]
org.apache.commons#commons-pool2;2.6.2 from central in [default]
org.apache.kafka#kafka-clients;2.4.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in [default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from central in [default]
org.lz4#lz4-java;1.7.1 from central in [default]
org.slf4j#slf4j-api;1.7.30 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 9 | 0 | 0 | 0 || 9 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-30b1f701-8caa-47db-b5fe-5408ccfe09b2
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/16ms)
21/08/18 09:40:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/usr/local/spark/python/lib/pyspark.zip/pyspark/shell.py:42: UserWarning: Failed to initialize Spark session.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shell.py", line 38, in <module>
    spark = SparkSession._create_shell_session() # type: ignore
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 543, in _create_shell_session
    return SparkSession.builder\
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 384, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
    SparkContext._assert_on_driver()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/context.py", line 1277, in _assert_on_driver
    raise Exception("SparkContext should only be created and accessed on the driver.")
Exception: SparkContext should only be created and accessed on the driver.
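It seems that Spark is being re-initialized on every iteration. I only get the error when the CSV write happens after the for loop. If I put the write before the for loop it works (but that is obviously not what I want to achieve).
So it looks like the combination of this loop, which performs exactly the same functions as before, and the CSV write causes the error?
I would appreciate any further hints! I don't find the error message very helpful, because I am not actually using the context in an executor.
Many thanks and best regards!
Update 1: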
def transformation_chain(self):
    def chain(df, epoch_id):
        df_in_edit = df
        data_transformations_workflow_local = [
            {
                "transformation": filter_out_sensor_columns_presence_validation,
                "args": ("value", "direction velocity")
            },
            {
                "transformation": split_one_stream_into_two,
                "args": ("direction", "velocity")
            }
        ]
        for activity in data_transformations_workflow_local:
            df_in_edit = activity["transformation"](df_in_edit, *activity["args"])
        df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')
    return chain
Declaring the variable locally did not help. self is actually an instance of a class I wrote myself for this pipeline:
class DataPipelineGenerator:
    pipeline_name = "pipeline"
    subscribed_topic = "subscribed"
    data_transformations_workflow = [
        {
            "transformation": filter_out_sensor_columns_presence_validation,
            "args": ("value", "direction velocity")
        },
        {
            "transformation": split_one_stream_into_two,
            "args": ("direction", "velocity")
        }
    ]
    output_path = "/home/jovyan/work/notebooks/pipelines/"
    output_format = "csv"
    read_stream = []

    def __init__(self, pipeline_name, subscribed_topic, data_transformations_workflow, output_path, output_format, spark):
        self.pipeline_name = pipeline_name
        self.subscribed_topic = subscribed_topic
        self.output_path = output_path
        self.output_format = output_format
        self.read_stream = self.init_read_stream(spark)

    def init_read_stream(self, spark):
        return spark \
            .readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', bootstrap_servers) \
            .option("startingOffsets", "earliest") \
            .option('subscribe', self.subscribed_topic) \
            .load() \
            .selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)') \
            .select(json_tuple(col("value"), "ts", "key", "value")) \
            .selectExpr("c0 as ts", "c1 as key", "c2 as value") \
            .withColumn("key_temp", expr("substring(key, {}, length(key))".format(clipping_prefix))) \
            .drop("key") \
            .withColumnRenamed("key_temp", "key")

    def init_write_stream(self):
        return self.read_stream.writeStream \
            .queryName(self.pipeline_name) \
            .foreachBatch(self.transformation_chain()) \
            .start()

    def transformation_chain(self):
        def chain(df, epoch_id):
            df_in_edit = df
            data_transformations_workflow_local = [
                {
                    "transformation": filter_out_sensor_columns_presence_validation,
                    "args": ("value", "direction velocity")
                },
                {
                    "transformation": split_one_stream_into_two,
                    "args": ("direction", "velocity")
                }
            ]
            for activity in data_transformations_workflow_local:
                df_in_edit = activity["transformation"](df_in_edit, *activity["args"])
            df_in_edit.coalesce(1).write.save(path=self.output_path, format=self.output_format, mode='append')
        return chain
Update 2: I think I may have found the problem. The Python file from which I import the data transformations contains the following call:
emp_RDD = spark.sparkContext.emptyRDD()
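So every time this file is imported or used, it tries to create a new context. However, I am still not sure why the problem did not occur when I used the methods from that file individually (that is, calling them one after another instead of in a for loop).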
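If the module-level call is indeed the trigger, one option is to move it into a function so that nothing touches the SparkContext when the module is merely imported. A minimal sketch, assuming a hypothetical helper named empty_rdd in a hypothetical module transformations.py; only spark.sparkContext.emptyRDD() comes from the question:

```python
# transformations.py (hypothetical module name)

def empty_rdd(spark):
    """Create the empty RDD on demand, using a session passed in by the caller.

    Importing this module no longer accesses spark.sparkContext; the access
    happens only when driver-side code calls this function explicitly.
    """
    return spark.sparkContext.emptyRDD()
```

Driver-side code would then call empty_rdd(spark) at the point where the empty RDD is actually needed.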
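Solution
When you pass an object field or method to a Spark function, the Spark driver serializes the whole object. The object referenced by self probably also holds a SparkContext as a field, which is how the SparkContext ends up on the Spark executors.
The solution is therefore to make the data_transformations_workflow list a local variable inside the chain function. In general, a good way to avoid this error is to place such data structures and user-defined functions in a separate module.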
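The capture described here can be inspected in plain Python, without Spark. The sketch below is only an illustration under assumptions: Pipeline, FakeContext and captured_names are hypothetical names, and FakeContext merely stands in for a driver-only handle such as a SparkContext. It shows that a closure reading self.steps keeps a reference to the whole object, while a closure over a local copy references only the list.

```python
class FakeContext:
    """Hypothetical stand-in for a driver-only handle such as SparkContext."""


class Pipeline:
    """Hypothetical, simplified version of the class in the question."""

    def __init__(self, steps, context):
        self.steps = steps
        self.context = context

    def chain_capturing_self(self):
        def chain(batch, epoch_id):
            for step in self.steps:  # reading self.steps captures all of self
                batch = step(batch)
            return batch
        return chain

    def chain_capturing_locals(self):
        steps = list(self.steps)  # local copy; self is not referenced below

        def chain(batch, epoch_id):
            for step in steps:
                batch = step(batch)
            return batch
        return chain


def captured_names(fn):
    """Map each free variable of a closure to the object it references."""
    cells = fn.__closure__ or ()
    return dict(zip(fn.__code__.co_freevars, (c.cell_contents for c in cells)))


pipeline = Pipeline([lambda rows: [r for r in rows if r is not None]], FakeContext())
print(sorted(captured_names(pipeline.chain_capturing_self())))    # ['self']
print(sorted(captured_names(pipeline.chain_capturing_locals())))  # ['steps']
```

Note that the chain function in Update 1 still reads self.output_path and self.output_format, so it continues to reference self even though the workflow list is local.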
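Addition after further information from the OP: the safest approach, in line with the general approach I mentioned above:
- Put the chain function in a separate Python module and import it in the module shown above
- Pass the chain function as: .foreachBatch(chain)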
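One way the two steps above could look is sketched below. This is a variant under assumptions, not the answerer's exact code: make_chain and the module name are hypothetical, and the factory is used so that the output path and format can be passed in as plain values instead of being read from self. The body uses only the DataFrame calls that already appear in the question (coalesce and write.save).

```python
# chain_module.py (hypothetical module name)

def make_chain(workflow, output_path, output_format):
    """Build a foreachBatch callback that closes over plain values only."""
    # Copy the workflow into simple (function, args) pairs up front.
    steps = [(activity["transformation"], tuple(activity["args"])) for activity in workflow]

    def chain(df, epoch_id):
        # Apply each transformation in order; each takes and returns a DataFrame.
        for transformation, args in steps:
            df = transformation(df, *args)
        df.coalesce(1).write.save(path=output_path, format=output_format, mode="append")

    return chain
```

In the pipeline class this could be used as .foreachBatch(make_chain(self.data_transformations_workflow, self.output_path, self.output_format)), so that the callback itself holds no reference to self.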