apache-spark - DataFrame to Dataset conversion (Scala)
Problem description
I'm trying to unpack Kafka message values into case class instances. (I put the messages in on the other side.)
This code:
import ss.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, Encoders}

val enc: Encoder[TextRecord] = Encoders.product[TextRecord]

// DefSer is a custom (de)serialization helper; a sketch follows below.
ss.udf.register("deserialize", (bytes: Array[Byte]) => {
  DefSer.deserialize(bytes).asInstanceOf[TextRecord]
})

val inputStream = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

inputStream.printSchema

val records = inputStream
  .selectExpr(s"deserialize(value) AS record")

records.printSchema

val rec2 = records.as(enc)
rec2.printSchema
produces this output:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- record: struct (nullable = true)
| |-- eventTime: timestamp (nullable = true)
| |-- lineLength: integer (nullable = false)
| |-- windDirection: float (nullable = false)
| |-- windSpeed: float (nullable = false)
| |-- gustSpeed: float (nullable = false)
| |-- waveHeight: float (nullable = false)
| |-- dominantWavePeriod: float (nullable = false)
| |-- averageWavePeriod: float (nullable = false)
| |-- mWaveDirection: float (nullable = false)
| |-- seaLevelPressure: float (nullable = false)
| |-- airTemp: float (nullable = false)
| |-- waterSurfaceTemp: float (nullable = false)
| |-- dewPointTemp: float (nullable = false)
| |-- visibility: float (nullable = false)
| |-- pressureTendency: float (nullable = false)
| |-- tide: float (nullable = false)
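For reference, a TextRecord case class consistent with the printed schema would look roughly like this (reconstructed from the schema above; the actual definition isn't shown in the question):

// Reconstructed from the printed schema; field types follow the
// timestamp/integer/float columns shown above.
case class TextRecord(
  eventTime: java.sql.Timestamp,
  lineLength: Int,
  windDirection: Float,
  windSpeed: Float,
  gustSpeed: Float,
  waveHeight: Float,
  dominantWavePeriod: Float,
  averageWavePeriod: Float,
  mWaveDirection: Float,
  seaLevelPressure: Float,
  airTemp: Float,
  waterSurfaceTemp: Float,
  dewPointTemp: Float,
  visibility: Float,
  pressureTendency: Float,
  tide: Float
)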
When I get to the sink:
val debugOut = rec2.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugOut.awaitTermination()
Catalyst complains:
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`eventTime`' given input columns: [record];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
I've tried a number of ways to "pull the TextRecord up", such as calling rec2.map(r => r.getAs[TextRecord](0)), explode("record"), and so on, but I keep running into ClassCastExceptions.
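For context, DefSer is never shown in the question; it is presumably a small helper that round-trips objects through plain Java serialization. A minimal hypothetical stand-in under that assumption (the real implementation may differ):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for DefSer, assuming plain Java serialization.
// Case classes are Serializable by default, so this works for TextRecord.
object DefSer {
  // Write a Serializable object out to a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray
  }

  // Rebuild the object from a byte array produced by serialize.
  def deserialize(bytes: Array[Byte]): AnyRef = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject() finally ois.close()
  }
}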
Solution
The simplest way is to map the inputStream Row instances directly to TextRecord, assuming it is a case class, using the map function:
import ss.implicits._

val inputStream = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

val records = inputStream.map(row =>
  DefSer.deserialize(row.getAs[Array[Byte]]("value")).asInstanceOf[TextRecord]
)
records will then directly be a Dataset[TextRecord].
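Because records is now a typed Dataset, the usual typed operators compile directly against the TextRecord fields. A quick illustration (the windSpeed threshold here is made up):

// Typed filter on a case-class field, then stream to the console sink.
val strongWinds = records.filter(r => r.windSpeed > 15.0f)

strongWinds.writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()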
Also, as long as you import the SparkSession implicits, you don't need to provide an encoder explicitly for your case class; Scala derives one for you implicitly.
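As for why the original approach failed: as[TextRecord] expects the case-class fields (eventTime, windSpeed, and so on) as top-level columns, but the UDF packed them into a single struct column named record, which is exactly what Catalyst's "cannot resolve '`eventTime`' given input columns: [record]" is saying. If you did want to keep the UDF, one sketch of a fix (untested) is to flatten the struct before applying the encoder:

// Expand the struct's fields into top-level columns so their names
// line up with what the TextRecord encoder expects.
val rec2 = records.select($"record.*").as[TextRecord]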