首页 > 解决方案 > 如何使用 writeStream 将 Spark 流传递给 kafka 主题

问题描述

我正在使用提供流的推特流功能。我需要使用 Spark writeStream 函数,例如:writeStream function link

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

“df”需要是流数据集/数据帧。如果 df 是一个普通的 DataFrame,它会给出错误,显示 'writeStream' 只能在流数据集/DataFrame 上调用;

我已经完成了: 1. 从 twitter 获取流 2. 过滤并映射它以获取每个 twitt 的标签(正、负、自然)

最后一步是 groupBy 标记和计数,并将其传递给 Kafka。

你们知道如何将 Dstream 转换为流数据集/数据帧吗?

编辑:ForeachRDD 函数确实将 Dstream 更改为普通 DataFrame。但是“writeStream”只能在流数据集/数据帧上调用。(上面提供了writeStream链接)

org.apache.spark.sql.AnalysisException:“writeStream”只能在流数据集/数据帧上调用;

标签: apache-kafkaspark-streamingspark-structured-streaming

解决方案


如何将 Dstream 转换为流数据集/DataFrame?

DStream是一系列 RDD 的抽象。

流式传输Dataset是一系列Datasets 的“抽象”(我使用引号,因为流式传输和批处理 s 之间的区别是 s的Dataset属性)。isStreamingDataset

可以将 a 转换DStream为流式传输Dataset以保持DStream.

我认为你并不真的想要它。

您所需要的只是使用推文DStream并将它们保存到 Kafka 主题(并且您认为您需要结构化流)。我认为您只需要 Spark SQL(结构化流的底层引擎)。

伪代码如下(抱歉,自从我使用老式的 Spark Streaming 以来已经有一段时间了):

val spark: SparkSession = ...
val tweets = DStream...
tweets.foreachRDD { rdd =>
  import spark.implicits._
  rdd.toDF.write.format("kafka")...
}

推荐阅读