apache-spark - Unable to read data from Kafka via PySpark readStream
Problem description
I am trying to read data from a Kafka topic into my local machine using pyspark readStream. I can call printSchema correctly; I just cannot see the data in any readable form.
I am using an Avro-format schema. I have also checked that I am using the correct packages. I built my own package with Maven, which is why you can see a CORE-SNAPSHOT jar below; for that I used spark-core_2.12:3.1.2.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars",
            "jar/JavaSpark-1.0-CORE-SNAPSHOT.jar,"
            "jar/spark-streaming-kafka-0-10_2.12-3.1.2.jar,"
            "jar/spark-sql-kafka-0-10_2.12-3.1.2.jar,"
            "jar/kafka-clients-2.6.0.jar,"
            "jar/spark-token-provider-kafka-0-10_2.12-3.1.2.jar,"
            "jar/spark-avro_2.12-3.1.2.jar") \
    .getOrCreate()
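Hand-listing local jars is error-prone because every jar must match the Spark and Scala build exactly. A sketch of an alternative, letting Spark resolve the same integration from Maven coordinates via `spark.jars.packages` (the coordinates below assume Spark 3.1.2 on Scala 2.12, matching the versions in the question; transitive dependencies such as kafka-clients are resolved automatically):

```python
# Maven coordinates for the Kafka and Avro integrations, instead of local jars.
# Versions assume Spark 3.1.2 / Scala 2.12, as used in the question.
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",
    "org.apache.spark:spark-avro_2.12:3.1.2",
]
packages_csv = ",".join(packages)
print(packages_csv)

# Commented out so the snippet runs without pyspark installed:
# from pyspark.sql import SparkSession
# spark = (SparkSession.builder
#          .appName("Python Spark SQL basic example")
#          .config("spark.jars.packages", packages_csv)
#          .getOrCreate())
```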
Below is my code for reading data from the stream.
.... kafka connection ....
ds1 = spark.readStream.format("kafka").option(<<--rest of kafka connection options-->>).load()
schema_String = '''{
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "double"
},
{
"name": "address",
"type": "string"
}
],
"name": "person_info",
"type": "record"
}'''
from pyspark.sql.avro.functions import from_avro  # required for from_avro

output = ds1.select(from_avro("value", schema_String).alias("user")).select("user.*")
output.printSchema()
output.writeStream.format("console").start()
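Before blaming `from_avro`, it is worth confirming that the schema string parses as a valid Avro record declaration at all. A minimal local sanity check using only the standard library (this only validates the JSON shape, not that it matches what the producer actually wrote):

```python
import json

# Same Avro record schema as in the code above.
schema_String = '''{
  "name": "person_info",
  "type": "record",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": "double"},
    {"name": "address", "type": "string"}
  ]
}'''

schema = json.loads(schema_String)  # raises ValueError if the JSON is malformed
assert schema["type"] == "record"
print([f["name"] for f in schema["fields"]])  # prints ['id', 'name', 'age', 'address']
```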
If I just run this, I can see the schema but no values. Instead I see this error:
KafkaOffsetReaderConsumer: Error in attempt 1 getting Kafka offsets:
java.lang.NullPointerException
at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:60)
at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:61)
at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded$(ConsumerStrategy.scala:60)
at org.apache.spark.sql.kafka010.SubscribeStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:102)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:106)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:82)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:533)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:578)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:577)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:531)
at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:48)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:531)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:293)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:148)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:145)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:392)
at scala.Option.getOrElse(Option.scala:189)
.....
If I instead try running it with
output.writeStream.format("console").start().awaitTermination()
then I see no error, but I get something like this:
NetworkClient: [Consumer clientId=consumer-spark-kafka-source-3e718355-6963-4f4b-837c-048c53ffb8a0-963583285-driver-0-1, groupId=spark-kafka-source-3e718355-6963-4f4b-837c-048c53ffb8a0-963583285-driver-0] Bootstrap broker broker--address--:port (id: -1 rack: null) disconnected
-- the same message repeats every second --
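The repeated `Bootstrap broker ... disconnected` message typically points at a connectivity or security-protocol mismatch between the client and the broker rather than at Spark itself. A quick TCP reachability check can rule out the network layer (host and port below are placeholders, not the real broker address from the question):

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a plain TCP connection to the broker endpoint succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Placeholder endpoint; substitute the address used in kafka.bootstrap.servers.
print(broker_reachable("localhost", 9092))
```

Note that a successful TCP connection only proves the port is open; a `security.protocol` mismatch (e.g. the broker expecting SSL while the client speaks plaintext) produces exactly this kind of repeated disconnect even when the socket connects.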
Solution