apache-spark - Spark Structured Streaming: aggregating rows into a column without a watermark
Problem description
I have the following two Parquet schemas:
users:
--------------------
|user_id | username|
|--------|---------|
| 1      | micky   |
| 2      | minnie  |
| 3      | donald  |
--------------------
orders:
--------------------
|user_id | order_id|
|--------|---------|
| 1      | 411     |
| 2      | 412     |
| 3      | 413     |
| 1      | 414     |
| 2      | 415     |
| 3      | 416     |
| 3      | 471     |
| 3      | 482     |
| 3      | 500     |
--------------------
What I want to do is collect all of each customer's orders into a single column:
users_and_orders
----------------------------------------------
|user_id | username| orders                 |
|--------|---------|------------------------|
| 1      | micky   | [411,414]              |
| 2      | minnie  | [412,415]              |
| 3      | donald  | [413,416,471,482,500]  |
----------------------------------------------
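For reference, on static (batch) DataFrames the target shape above can be produced with a plain groupBy plus collect_set; a minimal sketch, assuming users and orders are batch DataFrames with the column names from the tables above:

```scala
import org.apache.spark.sql.functions.{collect_set, sort_array}

// Join users to orders, then gather each user's order_ids
// into a single (sorted) array column.
val usersAndOrders = users
  .join(orders, Seq("user_id"))
  .groupBy("user_id", "username")
  .agg(sort_array(collect_set("order_id")) as "orders")
```

sort_array is optional; it just makes the array ordering deterministic for display.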
The code looks like this:
val users = sparkSession.readStream
  .option("checkpointLocation", "somelocation")
  .schema(userSchema.asInstanceOf[StructType])
  .format("parquet")
  .load(commandLineArguments.userPath)
  .distinct()

val orders = sparkSession.read
  .option("checkpointLocation", "someOtherLocation")
  .schema(orderSchema.asInstanceOf[StructType])
  .format("parquet")
  .load(commandLineArguments.orderPath)

val userJoinOrders = users.join(orders, Seq("user_id"))

val currentDate = new SimpleDateFormat("ddMM").format(new Date())
val currentTimeFormat = new SimpleDateFormat("hhmm")
val dateFormat = new SimpleDateFormat("YYMMDD")
val systemDateTimeFormat = new SimpleDateFormat("YYYYMMDDHHmmss")

// Original data does not have a watermark/timestamp
val usersWithOrders = userJoinOrders
  .withColumn("timestamp",
    lit(unix_timestamp(date_format(current_timestamp, "yyyy-MM-dd HH-mm"), "yyyy-MM-dd HH-mm").cast("timestamp")))
  .withWatermark("timestamp", "10 minutes")
  .groupBy("user_id", "username", "timestamp")
  .agg(collect_set(struct($"order_id")) as "orders")
  .map(account => {
    // Do whatever
  })
As I understand it, a watermark is required because I am aggregating streaming data. The original data does not carry any timestamp, so I had to fabricate one. But the code above does not work. When I run the streaming query, I see the following in the output:
Streaming query made progress: {
  "id" : "5f31d025-712e-4577-b6d7-8e8d450ac926",
  "runId" : "643fe17a-979a-4599-ac94-45be60136350",
  "name" : null,
  "timestamp" : "2019-01-21T11:07:22.922Z",
  "batchId" : 0,
  "numInputRows" : 856,
  "processedRowsPerSecond" : 52.42207116173679,
  "durationMs" : {
    "addBatch" : 15222,
    "getBatch" : 44,
    "getOffset" : 45,
    "queryPlanning" : 888,
    "triggerExecution" : 16329,
    "walCommit" : 51
  },
  "eventTime" : {
    "avg" : "2019-01-21T11:07:00.000Z",
    "max" : "2019-01-21T11:07:00.000Z",
    "min" : "2019-01-21T11:07:00.000Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 856,
    "numRowsUpdated" : 856,
    "memoryUsedBytes" : 805831,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 0,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 777031
    }
  }, {
    "numRowsTotal" : 856,
    "numRowsUpdated" : 856,
    "memoryUsedBytes" : 659463,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 0,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 630663
    }
  } ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/blah/blah/blah]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 856,
    "processedRowsPerSecond" : 52.42207116173679
  } ],
  "sink" : {
    "description" : "FileSink[/blah/blah/blah/user-orders]"
  }
}
My questions:
- What am I doing wrong?
- Is there a better way to turn the related rows into a column value without having to use an aggregation/groupBy?
Any help would be appreciated.
Solution
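The telling line in the progress log is "watermark" : "1970-01-01T00:00:00.000Z". A watermark is computed from the maximum event time seen in a trigger and only takes effect in the *following* trigger, so with a static Parquet directory that fits in a single micro-batch, the watermark never advances past the epoch and the aggregated state is never finalized for the append-mode FileSink: nothing is ever emitted. One way around this, if streaming execution is required, is foreachBatch (available since Spark 2.4), which hands each micro-batch to a batch function where collect_set needs neither a watermark nor an event-time column. A sketch under those assumptions (the output path is taken from the sink description in the log, the rest from the question's code):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_set

val query = users.join(orders, Seq("user_id"))
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Batch semantics inside the function: plain groupBy, no watermark.
    batch.groupBy("user_id", "username")
      .agg(collect_set("order_id") as "orders")
      .write.mode("overwrite").parquet("/blah/blah/blah/user-orders")
  }
  .option("checkpointLocation", "somelocation")
  .start()
```

Alternatively, since both inputs here are static Parquet directories, the simplest fix may be to drop readStream entirely and run the same join/groupBy as an ordinary batch job.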