scala - 使用 scala 和 spark 将 Stream Dataframe 转换为 Spark Dataframe
问题描述
我有以下流数据框。
+----------------------------------
|______value______________________|
| I am going to school |
| why are you crying |
| You are not very good my friend |
我使用以下代码创建了上述数据框
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("failOnDataLoss", false)
.option("subscribe", "myTopic.raw")
.load()
我想将相同的流数据帧存储到火花数据帧中。可以在scala和spark中转换吗?因为最后我想将火花数据帧转换为句子列表。流数据框的问题是我无法将其直接转换为可以迭代并执行一些数据处理操作的列表。
解决方案
您应该能够对从 Kafka 获得的流执行许多标准操作,但您需要考虑批处理和流处理之间语义上的差异 - 请参阅Spark 文档。
此外,当您从 Kafka 获取数据时,列集是固定的,并且您会获得一个二进制有效负载,您需要将value
列转换为字符串或类似的内容(请参阅文档):
val df = readStream.select($"value".cast("string").alias("sentences"))
之后,您将获得一个带有实际有效负载的数据帧,并开始处理。根据处理的复杂性,您可能需要恢复到foreachBatch功能,但这可能不是必需的 - 您需要提供有关您需要执行哪种处理的更多详细信息。
推荐阅读
- git - 在 master 中完成工作后如何 git push 到分支?
- powershell - 从文本文件中读取并添加多个防火墙规则
- angular - Angular 6.0 firebase 托管部署不起作用
- vue.js - 如何将可编辑数据传递给组件?
- javascript - 不会崩溃且运行良好的文件夹
- spring - 在 Spring Cloud Stream 消息正文中找到的嵌入式标头
- amazon-web-services - Amazon Rekognition for Video - getFaceSearch:索引号
- sympy - 乳胶的圆括号(矩阵(…))
- logging - 如何在 Carte 上运行的 Pentaho Kettle Job 中为数据库日志连接名称使用变量?
- javascript - Chrome - 即使在使用“不安全内联”之后,仍然“由于 CSP 而拒绝加载脚本”