apache-spark - NullPointerException when writing to a Kafka sink
Problem description
I'm trying to write to a Kafka topic and I'm getting a NullPointerException. It happens after a few hundred records have been consumed. Also odd: every time I restart the process, the same records are consumed again before the exception reappears.
Posting what I believe are the relevant parts of the code from my notebook:
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType

df_tweets = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("security.protocol", "PLAINTEXT") \
    .option("subscribe", "Covid-topic") \
    .option("startingOffsets", "latest") \
    .load()
schema = StructType([
    StructField("loc", StringType(), True),
    StructField("id", StringType(), True),
    StructField("text", StringType(), True),
    StructField("createdOnDate", StringType(), True),
    StructField("lang", StringType(), True)
])
spark_lines = df_tweets \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as json") \
    .select("key", f.from_json("json", schema).alias("data")) \
    .select("key", "data.*") \
    .where("key is not NULL") \
    .where("id is not NULL") \
    .select("key", "id")
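The select/where chain above can be illustrated in plain Python (a sketch for clarity only; the real job expresses the same logic with `from_json` and `where()` on a streaming DataFrame, and `parse_and_filter` is a hypothetical helper, not part of any API):

```python
import json

def parse_and_filter(records):
    """records: iterable of (key, value_json_string) pairs, as they would
    look after casting the Kafka key/value bytes to strings."""
    out = []
    for key, value in records:
        if key is None or value is None:
            continue  # mirrors .where("key is not NULL")
        try:
            data = json.loads(value)
        except (TypeError, ValueError):
            continue  # from_json yields null for malformed JSON; drop it
        if data.get("id") is None:
            continue  # mirrors .where("id is not NULL")
        out.append((key, data["id"]))  # mirrors .select("key", "id")
    return out

sample = [
    ("k1", '{"id": "42", "text": "hello"}'),
    (None, '{"id": "43"}'),         # null key -> dropped
    ("k2", '{"text": "no id"}'),    # null id -> dropped
    ("k3", "not json"),             # malformed -> dropped
]
print(parse_and_filter(sample))  # [('k1', '42')]
```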
spark_writer = spark_lines \
    .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(id)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("security.protocol", "PLAINTEXT") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "Covid-spark") \
    .option("checkpointLocation", "/tmp/kafka-checkpoint") \
    .start()
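One thing worth noting about the sink configuration: Spark's Kafka integration passes only options prefixed with `kafka.` through to the underlying Kafka client, and it serializes the string/binary `key` and `value` columns itself, so the un-prefixed `key.serializer`/`value.serializer` (and `security.protocol`) entries above are most likely silently ignored rather than causing the NPE. A minimal sketch of the sink with those no-op options removed (same topic, servers, and checkpoint path as in the question; this cannot run without a live Kafka cluster):

```python
# Sketch: same sink, with the ignored serializer options dropped.
# Only "kafka."-prefixed options reach the Kafka producer; Spark itself
# handles serializing the "key" and "value" columns.
spark_writer = spark_lines \
    .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(id)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("kafka.security.protocol", "PLAINTEXT") \
    .option("topic", "Covid-spark") \
    .option("checkpointLocation", "/tmp/kafka-checkpoint") \
    .start()
```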
The full error message is:
> 21/07/05 06:45:24 ERROR MicroBatchExecution: Query [id = 1b60ba2e-ae2d-4feb-8253-296dd9ae7c6d, runId = c260aeeb-dd61-40f7-b9f6-b6549957ed93] terminated with error
> java.lang.NullPointerException
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
I've tried filtering out nulls and limiting the write to just two fields to try to narrow down what's going on. Any idea why this happens and how to fix it?
Solution