Unable to read data from Kafka using PySpark readStream

Problem Description

I'm trying to read data from a Kafka topic to my local machine using PySpark readStream. printSchema works correctly; I just can't view the data in a readable format.

I'm using an Avro format schema. I've also checked that I'm using the correct packages. I built my own package with Maven, which you can see as the CORE-SNAPSHOT jar below. For this I used spark-core_2.12:3.1.2.

from pyspark.sql import SparkSession

# Local jars supplying the Kafka source, the Kafka client, and Avro support
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars",
            "jar/JavaSpark-1.0-CORE-SNAPSHOT.jar,"
            "jar/spark-streaming-kafka-0-10_2.12-3.1.2.jar,"
            "jar/spark-sql-kafka-0-10_2.12-3.1.2.jar,"
            "jar/kafka-clients-2.6.0.jar,"
            "jar/spark-token-provider-kafka-0-10_2.12-3.1.2.jar,"
            "jar/spark-avro_2.12-3.1.2.jar") \
    .getOrCreate()
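
Note: instead of listing local jars by hand, the same Kafka and Avro dependencies can usually be resolved from Maven Central via spark.jars.packages (kafka-clients and the token provider come in transitively); a minimal sketch using the same versions as above:

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,"
            "org.apache.spark:spark-avro_2.12:3.1.2") \
    .getOrCreate()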

Below is the code I use to read data from the stream.

.... kafka connection ....

ds1 = spark.readStream.format("kafka").option(<<--rest of kafka connection options-->>).load()
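
For reference, the Kafka source options elided on that line typically look like the sketch below; the broker address and topic name here are placeholders, not the real values:

ds1 = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker-host:9092") \
    .option("subscribe", "person_topic") \
    .option("startingOffsets", "earliest") \
    .load()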

schema_String = '''{
    "fields": [
      {
        "name": "id",
        "type": "string"
      },
      {
        "name": "name",
        "type": "string"
      },
      {
        "name": "age",
        "type": "double"
      },
      {
        "name": "address",
        "type": "string"
      }
    ],
    "name": "person_info",
    "type": "record"
}'''
from pyspark.sql.avro.functions import from_avro

output = ds1.select(from_avro("value", schema_String).alias("user")).select("user.*")
output.printSchema()
output.writeStream.format("console").start()
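
One way to narrow this kind of problem down is to skip from_avro entirely and dump the raw Kafka records to the console, which confirms whether any bytes are arriving before decoding becomes a question; a minimal debugging sketch:

# Inspect the raw records (key/value bytes plus metadata) without Avro decoding
raw = ds1.selectExpr("CAST(key AS STRING) AS key", "value", "topic", "partition", "offset")
raw.writeStream.format("console").option("truncate", "false").start()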

If I just try to run this, I can see the schema but not any of the values; instead I see this error:

KafkaOffsetReaderConsumer: Error in attempt 1 getting Kafka offsets: 
java.lang.NullPointerException
    at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:60)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:61)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded$(ConsumerStrategy.scala:60)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:102)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:106)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:82)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:533)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:578)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:577)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:531)
    at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:48)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:531)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:293)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:148)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:145)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:392)
    at scala.Option.getOrElse(Option.scala:189)
.....

If I instead try

output.writeStream.format("console").start().awaitTermination()

then I don't see any errors, but I see something like this:

NetworkClient: [Consumer clientId=consumer-spark-kafka-source-3e718355-6963-4f4b-837c-048c53ffb8a0-963583285-driver-0-1, groupId=spark-kafka-source-3e718355-6963-4f4b-837c-048c53ffb8a0-963583285-driver-0] Bootstrap broker broker--address--:port (id: -1 rack: null) disconnected

-- the same message repeats every second --
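
The repeated "Bootstrap broker ... disconnected" message usually means the driver cannot hold a connection to the address given in kafka.bootstrap.servers. A quick TCP-level check from the same machine can rule out basic reachability; the host and port below are placeholders:

import socket

# Placeholders: substitute the real broker host and port
broker_host, broker_port = "broker-host", 9092
try:
    with socket.create_connection((broker_host, broker_port), timeout=5):
        print("TCP connection to the broker succeeded")
except OSError as exc:
    print(f"Cannot reach the broker: {exc}")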

Tags: apache-spark, pyspark, apache-kafka, spark-structured-streaming

Solution

