apache-kafka - 如何使用 writeStream 将 Spark 流传递给 kafka 主题
问题描述
我正在使用提供流的推特流功能。我需要使用 Spark writeStream 函数,例如:writeStream function link
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
“df”需要是流数据集/数据帧。如果 df 是一个普通的 DataFrame,它会给出错误,显示 'writeStream' 只能在流数据集/DataFrame 上调用;
我已经完成了: 1. 从 twitter 获取流 2. 过滤并映射它以获取每个 twitt 的标签(正、负、自然)
最后一步是 groupBy 标记和计数,并将其传递给 Kafka。
你们知道如何将 Dstream 转换为流数据集/数据帧吗?
编辑:ForeachRDD 函数确实将 Dstream 更改为普通 DataFrame。但是“writeStream”只能在流数据集/数据帧上调用。(上面提供了writeStream链接)
org.apache.spark.sql.AnalysisException:“writeStream”只能在流数据集/数据帧上调用;
解决方案
如何将 Dstream 转换为流数据集/DataFrame?
DStream
是一系列 RDD 的抽象。
流式传输Dataset
是一系列Dataset
s 的“抽象”(我使用引号,因为流式传输和批处理 s 之间的区别是 s的Dataset
属性)。isStreaming
Dataset
可以将 a 转换DStream
为流式传输Dataset
以保持DStream
.
我认为你并不真的想要它。
您所需要的只是使用推文DStream
并将它们保存到 Kafka 主题(并且您认为您需要结构化流)。我认为您只需要 Spark SQL(结构化流的底层引擎)。
伪代码如下(抱歉,自从我使用老式的 Spark Streaming 以来已经有一段时间了):
val spark: SparkSession = ...
val tweets = DStream...
tweets.foreachRDD { rdd =>
import spark.implicits._
rdd.toDF.write.format("kafka")...
}
推荐阅读
- python - GCP 上的多线程 Python 脚本中的 IO 速度突然下降
- amazon-web-services - 解决 dynamodb 的 terraform 中的“ValidationException:TimeToLive 已禁用”错误
- docker - SSH无法在带有traefik的docker上使用Gitea
- utf-8 - 在 colab 中解码 gb-2312 文件
- apache-spark - 如何将 MariaDB Connector/J 与 Pyspark 一起用于 JDBC?
- powershell - 使用 PowerShell 从 .csv 更新 Active Directory 管理器属性
- c# - 以编程方式关闭与 IIS 的 HTTP/2 连接
- google-apps-script - 根据单元格值将行移动到另一个工作表的第一行
- google-apps-script - 将两个不同工作表的不同范围导出到一个 PDF 文件
- php - 用户元 - 其他配置文件字段 - 添加新用户时不显示 - Wordpress