apache-spark - 使用 foreachRDD 和 paho 将 PySpark 结果发送到 MQTT 代理
问题描述
我正在尝试将带有计算结果的 DStream 发送到 MQTT 代理,但 foreachRDD 不断崩溃。
我正在运行带有 Bahir 的 Spark 2.4.3,用于从 git master 编译的 MQTT 订阅。到目前为止一切正常。在尝试使用 MQTT 发布我的结果之前,我尝试了 saveAsFiles(),这很有效(但不完全是我想要的)。
def sendPartition(part):
# code for publishing with MQTT here
return 0
mydstream = MQTTUtils.createStream(ssc, brokerUrl, topic)
mydstream = packets.map(change_format) \
.map(lambda mac: (mac, 1)) \
.reduceByKey(lambda a, b: a + b)
mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition)) # line 56
我得到的错误是这样的:
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/path/to/my/code.py", line 56, in <lambda>
mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 806, in foreachPartition
self.mapPartitions(func).count() # Force evaluation
File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/SPARK_HOME/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/SPARK_HOME/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.: java.lang.IllegalArgumentException: Unsupported class file major version 55
有很多java错误,但我怀疑错误在我的代码中。
解决方案
你能运行其他 Spark 命令吗?在堆栈跟踪的末尾,您会看到java.lang.IllegalArgumentException: Unsupported class file major version 55
. 这表明您在不受支持的 Java 版本上运行 Spark。
Spark 尚不兼容 Java 11(我认为是由于 Scala 施加的限制)。尝试将 spark 配置为使用 Java 8。具体情况会根据您使用的平台而有所不同。您可能需要安装 Java 8,并更改JAVA_HOME
环境变量以指向新安装。
推荐阅读
- django - handling duplicate objects with custom slugs via mixins
- html - 如何向屏幕阅读器指示 span 元素?
- java - 无法读取 azure 上应用程序资源文件夹中的 json 文件
- jquery - Kendo UI DataSource/Grid 更改客户端过滤器参数
- visual-studio - Visual Studio 中 TFS 警报的电子邮件替代方案?
- windows - Windows 应用程序将什么交给 Windows 子系统进行打印作业?是文字吗?
- python - Match a 2-digit with separated comma (float digits not included)
- git - 如何使用 Jenkins Pipeline SCM 脚本支持多分支发布?
- javascript - 我的作业中出现无限循环错误;不知道怎么解决
- python-3.x - Pillow & Tkinter 不显示具有透明度的 PNG