apache-spark - Writing corrupt data from Kafka / JSON datasource in Spark Structured Streaming
Problem description
In Spark batch jobs I usually have a JSON datasource written to a file, and I can use the corrupt-record column feature of the DataFrame reader to write the corrupt data to a separate location, while another reader writes the valid data, both from the same job. (The data is written as Parquet.)
In Spark Structured Streaming, however, I first read the stream from Kafka as a string and then use from_json to get my DataFrame. from_json uses JsonToStructs, which runs the parser in FailFast mode and does not return the unparsed string in a column of the DataFrame (see the Note in Ref). How can I write corrupt data, meaning records that don't match my schema and possibly invalid JSON, to another location using Structured Streaming?
In a batch job, the same job can write both DataFrames, but Spark Structured Streaming requires special handling for multiple sinks. An answer for Spark 2.3.1 (my current version) should include details on how to write both the corrupt and the valid streams properly.
Ref: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html
import org.apache.spark.sql.functions.{col, from_json}

// spark, config, path and schema are defined elsewhere in the job
val rawKafkaDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.broker)
  .option("kafka.ssl.truststore.location", path.toString)
  .option("kafka.ssl.truststore.password", config.pass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.security.protocol", "SSL")
  .option("subscribe", config.topic)
  .option("startingOffsets", "earliest")
  .load()
// Kafka delivers the payload as binary, so cast it to a string first
val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
// does not provide a corrupt column or any way to work with corrupt records
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
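Solution
When you convert from a string to JSON with from_json, it returns null if the string cannot be parsed with the provided schema. You can filter on the null values and select the original string. Something like this: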
val jsonDF = jsonDataFrame.withColumn("json", from_json(col("value"), schema))
val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
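For the multiple-sinks part of the question, the null filter above can be paired with its complement, and each result written by its own streaming query. The following is a minimal sketch rather than a tested production job: the CorruptJsonSplit object, the split and writeBoth helpers, the two-field schema, and all output paths are hypothetical names introduced for illustration. It relies on from_json returning null for unparseable input, as described above for Spark 2.3.x; other Spark versions may handle malformed records differently, so check the behavior on your version.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object CorruptJsonSplit {
  // Hypothetical schema, for illustration only; use the real one from the job.
  val schema: StructType = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType)))

  // Parses the string column "value" and splits the rows on whether
  // from_json produced a null struct. Works on batch and streaming DataFrames.
  def split(jsonDataFrame: DataFrame, schema: StructType): (DataFrame, DataFrame) = {
    val parsed = jsonDataFrame.withColumn("json", from_json(col("value"), schema))
    val valid = parsed.filter(col("json").isNotNull).select("json.*")
    val invalid = parsed.filter(col("json").isNull).select("value")
    (valid, invalid)
  }

  // Starts one streaming query per output. The directory layout under
  // basePath is an assumption; each query gets its own checkpoint location.
  def writeBoth(valid: DataFrame, invalid: DataFrame, basePath: String): Unit = {
    valid.writeStream
      .format("parquet")
      .option("path", s"$basePath/valid")
      .option("checkpointLocation", s"$basePath/checkpoints/valid")
      .start()
    invalid.writeStream
      .format("parquet")
      .option("path", s"$basePath/invalid")
      .option("checkpointLocation", s"$basePath/checkpoints/invalid")
      .start()
  }
}
```

With the question's jsonDataFrame, this would be used as `val (valid, invalid) = CorruptJsonSplit.split(jsonDataFrame, schema)`, then `CorruptJsonSplit.writeBoth(valid, invalid, basePath)`, then `spark.streams.awaitAnyTermination()` to keep both queries running in the same application. Each query reads from Kafka independently, so the topic is consumed twice.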