apache-spark - Spark Streaming:文本数据源仅支持单列
问题描述
我正在使用Kafka
数据,然后将数据流式传输到HDFS
.
存储在Kafka
主题trial
中的数据如下:
hadoop
hive
hive
kafka
hive
但是,当我提交代码时,它会返回:
线程“主”中的异常
org.apache.spark.sql.streaming.StreamingQueryException: Text data source supports only a single column, and you have 7 columns.;
=== Streaming Query ===
Identifier: [id = 2f3c7433-f511-49e6-bdcf-4275b1f1229a, runId = 9c0f7a35-118a-469c-990f-af00f55d95fb]
Current Committed Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":13}}}
Current Available Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":14}}}
我的问题是:如上所示,存储的数据Kafka
只包含一列,为什么程序说有7 columns
?
任何帮助表示赞赏。
我的spark-streaming
代码:
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder.master("local[4]")
.appName("SpeedTester")
.config("spark.driver.memory", "3g")
.getOrCreate()
val ds = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.95.20:9092")
.option("subscribe", "trial")
.option("startingOffsets" , "earliest")
.load()
.writeStream
.format("text")
.option("path", "hdfs://192.168.95.21:8022/tmp/streaming/fixed")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
.awaitTermination()
}
解决方案
结构化流 + Kafka 集成指南中对此进行了解释:
源中的每一行都具有以下架构:
列类型
密钥二进制
值二进制
主题字符串
分区整数
偏移长
时间戳长
时间戳类型 int
这正好给出了七列。如果您只想写入有效负载(值),请选择它并转换为字符串:
spark.readStream
...
.load()
.selectExpr("CAST(value as string)")
.writeStream
...
.awaitTermination()
推荐阅读
- r - 鉴于最新的 R 版本,无法安装软件包
- javascript - 如何在 Firebase 上添加子项,然后获取并显示信息?
- javascript - 如何通过单击使用循环动态创建的按钮来获取按钮的值
- android - 从 ViewHolder 类更新项目数据
- javascript - 求和嵌套值(如果存在)
- javascript - 如何为数组上的每个元素运行函数
- parsing - 阅读器宏与普通解析器
- c# - 使用 Azure 函数始终加密 - 尝试通过静态变量避免重复初始化
- rust - 是否有针对 Rc 或 Arc 的操作克隆基础值并将其返回给调用者?
- javascript - Whats wrong with my javascript canvas code?