apache-spark - kafkashaded.org.apache.kafka.common.errors.TimeoutException:60000 毫秒后更新元数据失败
问题描述
我目前正在研究用例,我正在将 pyspark 数据框写入 confluent-kafka 主题。
def write_data(rows):
rows.selectExpr("to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxx.aws.confluent.cloud:9092") \
.option("topic", "test_topic") \
.save()
dataframe.foreachPartition(write_data)
下面是我得到的错误。
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
File "<command-2315>", line 32, in write_data
AttributeError: 'itertools.chain' object has no attribute 'selectExpr'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:514)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:650)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:633)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:468)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
在该主题上启用的身份验证是 `SASL PLAIN。我想知道,我将数据帧写入 confluent-kafka 主题的方法是否正确?还是我还需要添加其他配置。
我是新来的火花。任何帮助,将不胜感激。
解决方案
推荐阅读
- android - 找不到 Azure Pipeline gradle build local.properties'
- spring - 试图在两个服务之间传递 DTO,但出现异常:没有合适的 HttpMessageConverter(尽管显然有)
- android - 反应电容器本机应用程序中的数据存储多个同步
- nim-lang - 如何在 nim 中创建不同过程的查找表?
- java - java中的struct.unpack('f', self.infile.read(4))[0] python代码对应的是哪个?
- java - HttpSession 属性:getAttribute() 仅传递/获取循环中的最后一个值
- git - 无论进行什么合并,Github 上的分支都无法达到奇偶校验
- lua - 更改另一个脚本 lua 中的所有变量名
- html - R Markdown 不使用 R Studio 编织到 Word
- activerecord - 将左连接查询转换为 Yii2 db 活动记录