Azure Databricks: KafkaUtils createDirectStream raises Py4JNetworkError("Answer from Java side is empty")

Problem description

In Azure Databricks, I am trying to create a Kafka stream in a notebook and use it to build a Spark job. Databricks throws an error at the KafkaUtils.createDirectStream() line. The corresponding code is attached below.

from kazoo.client import KazooClient
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = spark.sparkContext
ssc = StreamingContext(sc, 30)
print('SSC created:: {}'.format(ssc))
zk = KazooClient(hosts=kafka_host)
print(kafka_host)
zk.start()
_offset_directory = "/" + topic + "/" + "DA_DAINT" + "/partitions"
print(_offset_directory)

if zk.exists(_offset_directory):
    partitions = zk.get_children(_offset_directory)
    print(partitions)
    partition_offsets_dict = {}
    for partition in partitions:
        offset, stat = zk.get((_offset_directory + '/' + partition))
        partition_offsets_dict[partition] = offset.decode()
    print(partition_offsets_dict)
    from_offset = {}
    for _partition in partitions:
        offset = partition_offsets_dict[_partition]
        topic_partition = TopicAndPartition(topic, int(_partition))
        from_offset[topic_partition] = int(offset)
    print(from_offset)
    print("\nCreate kafka direct stream ...")

    kafka_stream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list},
                                                 fromOffsets=from_offset)

The error stack trace is attached below.

Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
An error occurred while calling o581.createTopicAndPartition
Traceback (most recent call last):
  File "<command-3832551107104577>", line 77, in <module>
    fromOffsets=from_offset)
  File "/databricks/spark/python/pyspark/streaming/kafka.py", line 141, in createDirectStream
    v) for (k, v) in fromOffsets.items()])
  File "/databricks/spark/python/pyspark/streaming/kafka.py", line 141, in <listcomp>
    v) for (k, v) in fromOffsets.items()])
  File "/databricks/spark/python/pyspark/streaming/kafka.py", line 314, in _jTopicAndPartition
    return helper.createTopicAndPartition(self._topic, self._partition)
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o581.createTopicAndPartition

To use Kafka streams in a Python notebook on Azure Databricks, I installed the kafka-python and org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 libraries and added them as dependencies of the Spark job in Databricks.

Note 1: I am also able to receive data from Kafka when I use a simple Kafka consumer in the Databricks notebook.

from kafka import KafkaConsumer
if __name__ == "__main__":
    consumer_ = KafkaConsumer(group_id='test', bootstrap_servers=['my_kafka_server:9092'])
    print(consumer_.topics())
    consumer_.subscribe(topics=['dev_test'])
    for m in consumer_:
        print(m)

The problem occurs only when I try to create a Kafka direct stream with KafkaUtils.createDirectStream() in the Azure Databricks Python notebook.

Another minimal piece of code that reproduces the issue:

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

broker = "broker:9092"
topic = "dev_topic"

sc = spark.sparkContext
ssc = StreamingContext(sc, 30)
dks = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker})
print("Direct stream created...")
parsed = dks.map(lambda v: v[1])
summary_dstream = parsed.count().map(lambda x: 'Words in this batch: %s' % x)
print(summary_dstream)
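
(The snippet stops at stream creation because that is where the error is raised. For completeness, a sketch of the calls that would normally run the job afterwards; the pprint sink, the 60-second timeout, and the graceful stop are illustrative choices, not part of the original post.)

# Nothing is consumed until the streaming context is started.
summary_dstream.pprint()
ssc.start()
# In a notebook, run for a bounded time instead of blocking forever.
ssc.awaitTerminationOrTimeout(60)
ssc.stop(stopSparkContext=False, stopGraceFully=True)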

Note 2: Kafka version: 0.10, Scala version: 2.11, Spark version: 2.4.3.

Tags: apache-spark, pyspark, spark-streaming, azure-databricks

Solution


I still have not been able to find the root cause of the problem, but using the jar org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.3 resolved the issue.
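
(A quick way to check that the assembly is actually visible to the driver JVM is to load the Py4J helper class that KafkaUtils relies on. This is a sketch that mirrors the lookup pyspark.streaming.kafka performs internally for the 0-8 connector; the class name is taken from that module.)

# If the assembly jar is attached correctly, the helper class loads;
# otherwise a ClassNotFoundException surfaces as a Py4J error.
loader = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
print(loader.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper"))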

Update 1: Received the following update from the Microsoft support team:

Here is the update from Databricks engineering.

We see that the customer is using the DStreams API ( https://docs.microsoft.com/en-us/azure/databricks/spark/latest/rdd-streaming/ ), which is outdated and no longer supported. We strongly recommend that they switch to Structured Streaming, which they can do by following this documentation - https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/kafka
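
For reference, a minimal Structured Streaming sketch that reads the same topic, assuming the broker and topic variables from the reproduction snippet above (the console sink, the startingOffsets value, and the 30-second trigger are illustrative choices):

# Read from Kafka with Structured Streaming instead of the legacy DStreams API.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load())

# Kafka delivers key/value as binary; cast the payload to a string column.
parsed = df.selectExpr("CAST(value AS STRING) AS value")

query = (parsed.writeStream
         .format("console")
         .outputMode("append")
         .trigger(processingTime="30 seconds")
         .start())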

