首页 > 解决方案 > 使用 scala 和 spark 将 Stream Dataframe 转换为 Spark Dataframe

问题描述

我有以下流数据框。

+----------------------------------
|______value______________________| 
| I am going to school         |   
| why are you crying         | 
| You are not very good my friend |

我使用以下代码创建了上述数据框

val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("failOnDataLoss", false)
      .option("subscribe", "myTopic.raw")
      .load()

我想将相同的流数据帧存储到火花数据帧中。可以在scala和spark中转换吗?因为最后我想将火花数据帧转换为句子列表。流数据框的问题是我无法将其直接转换为可以迭代并执行一些数据处理操作的列表。

标签: scaladataframeapache-sparkspark-structured-streaming

解决方案


您应该能够对从 Kafka 获得的流执行许多标准操作,但您需要考虑批处理和流处理之间语义上的差异 - 请参阅Spark 文档

此外,当您从 Kafka 获取数据时,列集是固定的,并且您会获得一个二进制有效负载,您需要将value列转换为字符串或类似的内容(请参阅文档):

val df = readStream.select($"value".cast("string").alias("sentences"))

之后,您将获得一个带有实际有效负载的数据帧,并开始处理。根据处理的复杂性,您可能需要恢复到foreachBatch功能,但这可能不是必需的 - 您需要提供有关您需要执行哪种处理的更多详细信息。


推荐阅读