apache-spark - Getting column data into a nested JSON object in Spark Structured Streaming
Problem
In our application we use Spark SQL to extract field values as columns. I am trying to figure out how to put the column values into a nested JSON object and push it to Elasticsearch. Is there also a way to parameterize the value passed to selectExpr, e.g. the regular expression?
We are currently using the Spark Java API.
Dataset<Row> data = rowExtracted.selectExpr(
        "split(value,\"[|]\")[0] as channelId",
        "split(value,\"[|]\")[1] as country",
        "split(value,\"[|]\")[2] as product",
        "split(value,\"[|]\")[3] as sourceId",
        "split(value,\"[|]\")[4] as systemId",
        "split(value,\"[|]\")[5] as destinationId",
        "split(value,\"[|]\")[6] as batchId",
        "split(value,\"[|]\")[7] as orgId",
        "split(value,\"[|]\")[8] as businessId",
        "split(value,\"[|]\")[9] as orgAccountId",
        "split(value,\"[|]\")[10] as orgBankCode",
        "split(value,\"[|]\")[11] as beneAccountId",
        "split(value,\"[|]\")[12] as beneBankId",
        "split(value,\"[|]\")[13] as currencyCode",
        "split(value,\"[|]\")[14] as amount",
        "split(value,\"[|]\")[15] as processingDate",
        "split(value,\"[|]\")[16] as status",
        "split(value,\"[|]\")[17] as rejectCode",
        "split(value,\"[|]\")[18] as stageId",
        "split(value,\"[|]\")[19] as stageStatus",
        "split(value,\"[|]\")[20] as stageUpdatedTime",
        "split(value,\"[|]\")[21] as receivedTime",
        "split(value,\"[|]\")[22] as sendTime");

StreamingQuery query = data.writeStream()
        .outputMode(OutputMode.Append())
        .format("es")
        .option("checkpointLocation", "C:\\checkpoint")
        .start("spark_index/doc");
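On the parameterization question: since selectExpr takes plain strings, one option is to generate the split expressions from a field list instead of hand-writing each index, so the delimiter regex lives in a single place. A minimal sketch (the SelectExprBuilder class and the shortened field list below are hypothetical, not from the original code):

```java
import java.util.stream.IntStream;

public class SelectExprBuilder {
    // Hypothetical field list; the real job would list all 23 names in order.
    static final String[] FIELDS = {"channelId", "country", "product", "sourceId"};

    // Build one "split(value,\"<regex>\")[i] as <name>" expression per field.
    static String[] buildExprs(String delimiterRegex, String[] fields) {
        return IntStream.range(0, fields.length)
                .mapToObj(i -> String.format("split(value,\"%s\")[%d] as %s",
                        delimiterRegex, i, fields[i]))
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        for (String e : buildExprs("[|]", FIELDS)) {
            System.out.println(e);
        }
    }
}
```

The resulting array can then be passed directly, since selectExpr is variadic: rowExtracted.selectExpr(buildExprs("[|]", FIELDS)).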
Actual output:
{
  "_index": "spark_index",
  "_type": "doc",
  "_id": "test123",
  "_version": 1,
  "_score": 1,
  "_source": {
    "channelId": "test",
    "country": "SG",
    "product": "test",
    "sourceId": "",
    "systemId": "test123",
    "destinationId": "",
    "batchId": "",
    "orgId": "test",
    "businessId": "test",
    "orgAccountId": "test",
    "orgBankCode": "",
    "beneAccountId": "test",
    "beneBankId": "test",
    "currencyCode": "SGD",
    "amount": "53.0000",
    "processingDate": "",
    "status": "Pending",
    "rejectCode": "test",
    "stageId": "123",
    "stageStatus": "Comment",
    "stageUpdatedTime": "2019-08-05 18:11:05.999000",
    "receivedTime": "2019-08-05 18:10:12.701000",
    "sendTime": "2019-08-05 18:11:06.003000"
  }
}
We need the above columns under a node "txn_summary", as in the following JSON:
Expected output:
{
  "_index": "spark_index",
  "_type": "doc",
  "_id": "test123",
  "_version": 1,
  "_score": 1,
  "_source": {
    "txn_summary": {
      "channelId": "test",
      "country": "SG",
      "product": "test",
      "sourceId": "",
      "systemId": "test123",
      "destinationId": "",
      "batchId": "",
      "orgId": "test",
      "businessId": "test",
      "orgAccountId": "test",
      "orgBankCode": "",
      "beneAccountId": "test",
      "beneBankId": "test",
      "currencyCode": "SGD",
      "amount": "53.0000",
      "processingDate": "",
      "status": "Pending",
      "rejectCode": "test",
      "stageId": "123",
      "stageStatus": "Comment",
      "stageUpdatedTime": "2019-08-05 18:11:05.999000",
      "receivedTime": "2019-08-05 18:10:12.701000",
      "sendTime": "2019-08-05 18:11:06.003000"
    }
  }
}
Solution
Wrapping all of the columns in a single top-level struct should produce the expected output. In Scala:
import org.apache.spark.sql.functions._
data.select(struct(data.columns:_*).as("txn_summary"))
In Java, I suspect the column names first need to be wrapped into Column objects, since data.columns() returns a String[] that matches neither struct(Column...) nor struct(String, String...):
import java.util.Arrays;
import org.apache.spark.sql.Column;
import static org.apache.spark.sql.functions.*;

Column[] cols = Arrays.stream(data.columns()).map(c -> col(c)).toArray(Column[]::new);
data.select(struct(cols).as("txn_summary"));