NullPointerException when writing to a Kafka sink

Problem description

I am trying to write to a Kafka topic and I am getting a NullPointerException. It happens after a few hundred records have been consumed. What is also odd is that every time I restart the process, the same records are consumed again before the exception pops up again.
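(As background on the "same records again after each restart" behaviour: a Structured Streaming query restarts from the offsets recorded in its checkpoint and retries the micro-batch that failed, so seeing the same records replayed is expected. Below is a minimal diagnostic sketch, not part of the original notebook, for peeking at those checkpointed offsets; it assumes the /tmp/kafka-checkpoint location used by the writer further down and a locally readable filesystem.)

    # Diagnostic sketch (assumption: the checkpoint sits on a locally readable filesystem).
    # Each file under <checkpointLocation>/offsets is the offset log entry for one micro-batch.
    import os

    offsets_dir = "/tmp/kafka-checkpoint/offsets"
    for name in sorted(os.listdir(offsets_dir)):
        with open(os.path.join(offsets_dir, name)) as fh:
            print(f"--- batch {name} ---")
            print(fh.read())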

Posting what I think are the relevant parts of the code, taken from my notebook:


    from pyspark.sql import functions as f
    from pyspark.sql.types import StructType, StructField, StringType

    df_tweets = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_servers) \
        .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
        .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
        .option("security.protocol", "PLAINTEXT") \
        .option("subscribe", "Covid-topic") \
        .option("startingOffsets", "latest") \
        .load()

    schema = StructType([
        StructField("loc", StringType(), True),
        StructField("id", StringType(), True),
        StructField("text", StringType(), True),
        StructField("createdOnDate", StringType(), True),
        StructField("lang", StringType(), True)
    ])

    spark_lines = df_tweets\
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as json") \
        .select("key", f.from_json("json", schema).alias("data")) \
        .select("key", "data.*")\
        .where("key is not NULL")\
        .where("id is not NULL")\
        .select("key", "id")

    spark_writer = spark_lines\
        .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(id)) AS value")\
        .writeStream \
        .format("kafka") \
        .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")\
        .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")\
        .option("security.protocol", "PLAINTEXT")\
        .option("kafka.bootstrap.servers", kafka_servers) \
        .option("topic", "Covid-spark") \
        .option("checkpointLocation", "/tmp/kafka-checkpoint") \
        .start()
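For comparison only, and not claimed to be the cause of the exception: Spark's Kafka source and sink take keys and values from string/binary columns rather than through the Kafka client's key.serializer / value.serializer settings, and Kafka client properties are normally passed with a kafka. prefix (for example kafka.security.protocol). A sketch of the same read/transform/write pipeline written that way, reusing the spark session, the kafka_servers variable and the topic names from the code above, would look roughly like this:

    # Sketch only: Kafka client properties under the "kafka." prefix, and no serializer
    # options (Spark's Kafka source/sink reads and writes the key/value columns itself).
    from pyspark.sql import functions as f
    from pyspark.sql.types import StructType, StructField, StringType

    df_tweets = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_servers) \
        .option("kafka.security.protocol", "PLAINTEXT") \
        .option("subscribe", "Covid-topic") \
        .option("startingOffsets", "latest") \
        .load()

    schema = StructType([
        StructField("loc", StringType(), True),
        StructField("id", StringType(), True),
        StructField("text", StringType(), True),
        StructField("createdOnDate", StringType(), True),
        StructField("lang", StringType(), True)
    ])

    spark_lines = df_tweets \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) AS json") \
        .select("key", f.from_json("json", schema).alias("data")) \
        .select("key", "data.*") \
        .where("key IS NOT NULL AND id IS NOT NULL") \
        .select("key", "id")

    spark_writer = spark_lines \
        .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(id)) AS value") \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_servers) \
        .option("kafka.security.protocol", "PLAINTEXT") \
        .option("topic", "Covid-spark") \
        .option("checkpointLocation", "/tmp/kafka-checkpoint") \
        .start()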

The full error message is:

> 21/07/05 06:45:24 ERROR MicroBatchExecution: Query [id = 1b60ba2e-ae2d-4feb-8253-296dd9ae7c6d, runId = c260aeeb-dd61-40f7-b9f6-b6549957ed93] terminated with error
> java.lang.NullPointerException
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)

I have already tried filtering out null values and limiting the write to just two fields in order to understand what is going on. Any idea why this happens and how to fix it?
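One way to narrow this down further (a diagnostic sketch, not part of the notebook above) is to run the same transformed stream against the console sink with a fresh, hypothetical checkpoint location, which takes the Kafka write path out of the picture: if this query runs cleanly past the point where the Kafka sink failed, the problem is more likely in the write stage or its options than in the parsing and filtering.

    # Diagnostic sketch: identical payload, console sink instead of Kafka, and a
    # separate (hypothetical) checkpoint directory so existing offsets are not reused.
    debug_query = spark_lines \
        .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(id)) AS value") \
        .writeStream \
        .format("console") \
        .option("truncate", "false") \
        .option("checkpointLocation", "/tmp/console-checkpoint") \
        .start()

    debug_query.awaitTermination()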

Tags: apache-spark, pyspark, apache-spark-sql, spark-streaming, apache-kafka-streams

Solution

