scala - 在使用 scala spark 写入 Kafka 主题之前为 Dataframe 定义模式
问题描述
我有以下数据框(finalDataFrame)架构
root
|-- sentence: string (nullable = true)
|-- category: string (nullable = true)
|-- Id: string (nullable = true)
我已经定义了以下架构
def defineS3SinkSchema() : StructType = {
new StructType()
.add("payload", new StructType()
.add("sentence", StringType)
.add("Id", LongType)
.add("category", StringType)
)
}
我想将上述模式用于上面定义的数据框并写入 kafka 主题。但我不确定如何将定义的模式与数据框集成。以下是写入 kafka 主题的代码。
val jsonFormatData = finalDataFrame.select(col("key").cast("string").alias("key"),
to_json(struct(
col("sentence"),
col("category"),
col("key").as("Id")
)).alias("value"))
jsonFormatData.printSchema()
val writeStream = jsonFormatData
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "myTopic.val")
.option("checkpointLocation", "test_path")
.start()
writeStream.awaitTermination()
解决方案
推荐阅读
- jenkins - 使用 Jenkins 部署时如何检查 jar 是否成功启动?
- clang++ - 如何散列 Clang AST 节点以用作 C++ 无序映射中的键?
- html - 如何删除引导导航栏上的填充
- c++ - 使用 C++ 编译问题
- c - C在位置插入和替换字符串
- android-toolbar - 工具栏中的标题问题
- firebase - 如何使自定义声明与存储在 Firebase 数据库中的角色保持同步
- java - 使用增强的 For 循环遍历数组并将每个项目添加到输出变量
- python - 新闻 API - 将输出输入 Pandas DataFrame
- python - 使用 python tkinter 库时如何调整选项卡大小?