apache-spark - Azure Databricks: KafkaUtils createDirectStream fails with Py4JNetworkError("Answer from Java side is empty")
Problem Description
In Azure Databricks, I am trying to create a Kafka stream in a notebook and use it in a Spark job. Databricks throws an error at the KafkaUtils.createDirectStream() line. The corresponding code is attached below.
from kazoo.client import KazooClient
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# kafka_host, topic and broker_list are defined earlier in the notebook
sc = spark.sparkContext
ssc = StreamingContext(sc, 30)
print('SSC created:: {}'.format(ssc))

zk = KazooClient(hosts=kafka_host)
print(kafka_host)
zk.start()

_offset_directory = "/" + topic + "/" + "DA_DAINT" + "/partitions"
print(_offset_directory)

if zk.exists(_offset_directory):
    partitions = zk.get_children(_offset_directory)
    print(partitions)

    # Read the stored offset for each partition from ZooKeeper
    partition_offsets_dict = {}
    for partition in partitions:
        offset, stat = zk.get(_offset_directory + '/' + partition)
        partition_offsets_dict[partition] = offset.decode()
    print(partition_offsets_dict)

    # Build the fromOffsets mapping expected by createDirectStream
    from_offset = {}
    for _partition in partitions:
        offset = partition_offsets_dict[_partition]
        topic_partition = TopicAndPartition(topic, int(_partition))
        from_offset[topic_partition] = int(offset)
    print(from_offset)

    print("\nCreate kafka direct stream ...")
    kafka_stream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": broker_list},
        fromOffsets=from_offset)
The error stack trace is attached below.
Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

An error occurred while calling o581.createTopicAndPartition

Traceback (most recent call last):
  File "<command-3832551107104577>", line 77, in <module>
    fromOffsets=from_offset)
  File "/databricks/spark/python/pyspark/streaming/kafka.py", line 141, in createDirectStream
    v) for (k, v) in fromOffsets.items()])
  File "/databricks/spark/python/pyspark/streaming/kafka.py", line 141, in <listcomp>
    v) for (k, v) in fromOffsets.items()])
  File "/databricks/spark/python/pyspark/streaming/kafka.py", line 314, in _jTopicAndPartition
    return helper.createTopicAndPartition(self._topic, self._partition)
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o581.createTopicAndPartition
To use a Kafka stream from a Python notebook in Azure Databricks, I installed the kafka-python and org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 libraries and added them as dependencies of the Spark job in Databricks.
Note 1: I am also able to receive data from Kafka when I use a simple Kafka consumer in the Databricks notebook.
from kafka import KafkaConsumer

if __name__ == "__main__":
    consumer_ = KafkaConsumer(group_id='test', bootstrap_servers=['my_kafka_server:9092'])
    print(consumer_.topics())
    consumer_.subscribe(topics=['dev_test'])
    for m in consumer_:
        print(m)
The problem occurs only when I try to create a Kafka direct stream with KafkaUtils.createDirectStream() in the Azure Databricks Python notebook.
Another minimal set of code that reproduces the issue:
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
broker = "broker:9092"
topic = "dev_topic"
sc = spark.sparkContext
ssc = StreamingContext(sc, 30)
dks = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker})
print("Direct stream created...")
parsed = dks.map(lambda v: v[1])
summary_dstream = parsed.count().map(lambda x: 'Words in this batch: %s' % x)
print(summary_dstream)
Note 2: Kafka version: 0.10, Scala version: 2.11, Spark version: 2.4.3
Solution
I still have not found the root cause of the problem, but using the jar org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.3 fixed it.
Update 1: I received the following update from the Microsoft support team:
Here is the update from Databricks engineering:
We saw that the customer is using the DStreams API (https://docs.microsoft.com/en-us/azure/databricks/spark/latest/rdd-streaming/), which is deprecated and no longer supported. We strongly recommend that they switch to Structured Streaming, which they can do by following this documentation: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/kafka
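Following that recommendation, a minimal Structured Streaming sketch for the same scenario might look like the following. This is an illustration only, not code from the question: the broker address and topic name are placeholders, and `spark` is assumed to be the SparkSession that Databricks notebooks provide.

```python
# Sketch: reading the Kafka topic with Structured Streaming instead of
# the deprecated DStreams API. Broker and topic values are placeholders.
kafka_options = {
    "kafka.bootstrap.servers": "broker:9092",  # placeholder broker list
    "subscribe": "dev_topic",                  # placeholder topic
    "startingOffsets": "earliest",             # or a JSON per-partition offset map
}

def build_kafka_reader(spark):
    # 'spark' is the SparkSession available in a Databricks notebook.
    reader = spark.readStream.format("kafka")
    for key, value in kafka_options.items():
        reader = reader.option(key, value)
    return reader

# In a notebook one would then run, for example:
# df = build_kafka_reader(spark).load()
# values = df.selectExpr("CAST(value AS STRING)")
# query = values.writeStream.format("console").start()
```

Note that Structured Streaming replaces the ZooKeeper offset bookkeeping from the original code: checkpointing handles offset tracking, and `startingOffsets` accepts a JSON map of per-partition offsets if specific starting positions are needed.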