Sending PySpark results to an MQTT broker with foreachRDD and paho

Problem description

I'm trying to send a DStream containing my computed results to an MQTT broker, but foreachRDD keeps crashing.

I'm running Spark 2.4.3 with Bahir (compiled from git master) for the MQTT subscription, and so far everything works. Before trying to publish my results over MQTT, I tried saveAsFiles(), which worked (but isn't exactly what I want).


def sendPartition(part):
    # code for publishing with MQTT here
    return 0


packets = MQTTUtils.createStream(ssc, brokerUrl, topic)
# count occurrences per key coming out of change_format
mydstream = packets.map(change_format) \
    .map(lambda mac: (mac, 1)) \
    .reduceByKey(lambda a, b: a + b)
mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition)) # line 56
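
For reference, what I have in mind for sendPartition is roughly the following paho-mqtt sketch; the host, port, and topic below are placeholders, not values from my setup:

import paho.mqtt.client as mqtt

def sendPartition(part):
    # Open one MQTT connection per partition and reuse it for every record.
    client = mqtt.Client()
    client.connect("broker.example.com", 1883)  # placeholder host and port
    client.loop_start()  # background network loop so publish() gets flushed
    for mac, count in part:
        client.publish("results/topic", "%s %d" % (mac, count))  # placeholder topic
    client.loop_stop()
    client.disconnect()
    return 0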

The error I get looks like this:

org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/path/to/my/code.py", line 56, in <lambda>
    mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
  File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 806, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/SPARK_HOME/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/SPARK_HOME/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/SPARK_HOME/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.: java.lang.IllegalArgumentException: Unsupported class file major version 55

There are a lot of Java errors, but I suspect the mistake is in my own code.

Tags: apache-spark, pyspark

Solution


Can you run other Spark commands at all? At the end of the stack trace you can see java.lang.IllegalArgumentException: Unsupported class file major version 55. Class file major version 55 corresponds to Java 11, which indicates that you are running Spark on an unsupported Java version.

Spark is not yet compatible with Java 11 (I believe due to constraints imposed by Scala). Try configuring Spark to use Java 8 instead. The details vary depending on your platform, but you will likely need to install Java 8 and point the JAVA_HOME environment variable at the new installation.
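
On a typical Linux installation that amounts to something like the lines below; the Java 8 path is only an example, so adjust it to wherever Java 8 is installed on your system:

# In $SPARK_HOME/conf/spark-env.sh, or exported in the shell before running spark-submit:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# Check which JVM Spark will now pick up; it should report a 1.8.x version:
$JAVA_HOME/bin/java -version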

