Writing corrupt data from a Kafka / JSON datasource in Spark Structured Streaming

Problem Description

In Spark batch jobs I usually have a JSON datasource read from files, and I can use the corrupt-record column feature of the DataFrame reader to write the corrupt data out to a separate location while the same job writes the valid data elsewhere. (The data is written as Parquet.)
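For reference, here is a minimal sketch of that batch approach, assuming PERMISSIVE mode with columnNameOfCorruptRecord; the schema fields and paths are placeholders for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder.appName("batch-corrupt-split").getOrCreate()

// the corrupt-record column must be declared in the schema for PERMISSIVE mode
val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)
  .add("_corrupt_record", StringType)

val parsed = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/data/in/json") // hypothetical input path
  .cache() // since Spark 2.3, queries touching only the corrupt column require caching first

// unparseable rows land in _corrupt_record; valid rows leave it null
parsed.filter(col("_corrupt_record").isNotNull)
  .select("_corrupt_record")
  .write.parquet("/data/out/corrupt")

parsed.filter(col("_corrupt_record").isNull)
  .drop("_corrupt_record")
  .write.parquet("/data/out/valid")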

But in Spark Structured Streaming I first read the stream in from Kafka as a string and then use from_json to get my DataFrame. from_json relies on JsonToStructs, which uses FailFast mode in the parser and does not return the unparsed string as a column in the DataFrame (see the note in the Ref below). How, then, can I write corrupt data that doesn't match my schema, and possibly invalid JSON, to another location using Structured Streaming?

Finally, in the batch case a single job can write both DataFrames, but Spark Structured Streaming requires special handling for multiple sinks. So in Spark 2.3.1 (my current version): how should both the valid and the corrupt streams be written out properly?

Ref: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html

// read the raw Kafka stream; the value column arrives as binary
val rawKafkaDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.broker)
  .option("kafka.ssl.truststore.location", path.toString)
  .option("kafka.ssl.truststore.password", config.pass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.security.protocol", "SSL")
  .option("subscribe", config.topic)
  .option("startingOffsets", "earliest")
  .load()

// cast the binary Kafka value to a string for JSON parsing
val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))

// from_json does not provide a corrupt-record column or any way to reach the unparsed string
jsonDataFrame.select(from_json(col("value"), schema).as("json")).select("json.*")

Tags: apache-spark, apache-spark-sql, spark-structured-streaming

Solution

When you convert from string to JSON, anything that cannot be parsed with the provided schema comes back as null. So you can filter out the null values and keep the original string, with something like this:

// parse the JSON string; rows that fail to parse get a null struct
val jsonDF = jsonDataFrame.withColumn("json", from_json(col("value"), schema))
// rows where parsing failed, keeping the raw string for a corrupt-data sink
val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
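To cover the multiple-sinks part of the question: in Spark 2.3.1 (before foreachBatch arrived in 2.4), each sink needs its own streaming query. A hedged sketch under that constraint is to split the DataFrame and start two writeStream queries; the output paths and checkpoint locations below are placeholders:

// valid rows: drop the raw string and flatten the parsed struct
val validJsonDF = jsonDF.filter(col("json").isNotNull).select("json.*")

// each query needs its own checkpoint location; paths are hypothetical
val validQuery = validJsonDF.writeStream
  .format("parquet")
  .option("path", "/data/out/valid")
  .option("checkpointLocation", "/chk/valid")
  .start()

val invalidQuery = invalidJsonDF.writeStream
  .format("parquet")
  .option("path", "/data/out/invalid")
  .option("checkpointLocation", "/chk/invalid")
  .start()

// block until either query stops
spark.streams.awaitAnyTermination()

Note that the two queries consume the Kafka topic independently, each tracking its own offsets; a single-pass split into two sinks only becomes possible with foreachBatch in Spark 2.4.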
