apache-spark - StreamingQueryException:文本数据源仅支持单列
问题描述
我知道这个问题之前已经被问过很多次了,但没有一个答案对我有帮助。
下面是我的火花代码
class ParseLogs extends java.io.Serializable {
def formLogLine(logLine: String): (String,String,String,Int,String,String,String,Int,Float,String,String,Flo at,Int,String,Int,Float,String)={
//some logic
//return value
(recordKey._2.toString().replace("\"", ""),recordKey._3,recordKey._4,recordKey._5,recordKey._6,recordKey._8,sbcId,recordKey._10,recordKey._11,recordKey._12,recordKey._13.trim(),LogTransferTime,contentAccessed,OTT,dataTypeId,recordKey._14,logCaptureTime1)
}
}
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
val myDf = inputDf.selectExpr("CAST(value AS STRING)")
val df1 = myDf.map(line => new ParseLogs().formLogLine(line.get(0).toString()))
我得到以下错误
User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Text data source supports only a single column, and you have 17 columns.;
解决方案
使用 UDF 将 logLine 转换为您想要的。例如:
spark.sqlContext.udf.register("YOURLOGIC", (logLine: String) => {
//some logic
(recordKey._2.toString().replace("\"",""),recordKey._3,recordKey._4,recordKey._5,recordKey._6,recordKey._8,sbcId,recordKey._10,recordKey._11,recordKey._12,recordKey._13.trim(),LogTransferTime,contentAccessed,OTT,dataTypeId,recordKey._14,logCaptureTime1)
})
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
val myDf = inputDf.selectExpr("CAST(value AS STRING)")
val df1 = myDf.selectExpr("YOURLOGIC(value) as result")
val result = df1.select(
df1("result").getItem(0),
df1("result").getItem(1),
df1("result").getItem(2)),
df1("result").getItem(3)),
...if you have 17 item,then add to 17
df1("result").getItem(17))
推荐阅读
- jdbc - Ignite 2.11.0 无法使用 SqlFieldsQuery 和 DBeaver 使用 JDBC 或 REST 查看数据
- javascript - 如何同时为多个进度条设置动画?
- configuration - HAproxy - 后端何时抛出 50 倍错误?
- r - 使用 dplyr、group_by 与 mutate() 或 summarise() & str_c() 或 paste() & 折叠连接字符串/行,但保持 NA & 所有字符串
- c - C中的链表(中间插入)
- python - 在数据框中排序 CIDR,检查每个元素的文字等价和重叠
- spring-boot - thymleaf + spring-boot Validaiton 错误未显示在 html 上
- python - 如果后面没有另一个字符串,则删除字符串
- ms-access - 这些 MS Access 图标中的箭头是什么意思?
- android - 由于 startActivityForResult() 已被贬值。如何使用 kotlin 从 android 中的图库中获取图像?