apache-flink - Flink SQL 中跳跃窗口上的指数衰减移动平均值:铸造时间
问题描述
现在我们在 Flink 中有了带有花哨窗口的 SQL,我试图让衰减的移动平均线被“在未来的 Flink 版本中对于 Table API 和 SQL 都可能发生的事情”引用。来自他们的SQL 路线图/预览 2017-03 帖子:
table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)
这是我的尝试(也受到方解石衰变示例的启发):
SELECT
lb_index one_key,
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
SUM(Y *
EXP(
proctime -
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
时间是处理时间,我们通过从 AppendStream 表创建 write_position 获得的 proctime 为:
tEnv.registerTable(
"write_position",
tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))
我收到此错误:
Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'.
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'
我已经尝试将 proctime 转换为我所知道的所有其他类型(试图达到 NUMERIC 的应许之地),但我只是找不到如何使它工作。
我错过了什么吗?proctime 是一种您无法转换的非常特殊的“系统更改编号”时间吗?如果是这样,仍然必须有某种方法将其与 HOP_START(proctime,...) 值进行比较。
解决方案
您可以使用 timestampDiff 减去两个时间点(请参阅文档)。你像这样使用它
TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
其中 timepointunit 可以是 SECOND、MINUTE、HOUR、DAY、MONTH 或 YEAR。
我没有尝试过处理时间,但它确实适用于事件时间字段,所以希望它会。
推荐阅读
- excel - Exel 2016 用户表单文本框属性
- c++ - 为什么重叠比较总是在以下情况下评估为真
- android - 如何知道何时开始和结束服务?
- reactjs - 如何配置 Selenium 以在 Chrome 中预装扩展程序?
- javascript - 为什么当我将“3”传递给它时,我的函数只返回正确的值?
- javascript - 忽略正则表达式中的html标签
- objective-c - 存在多个键时如何更改特定的值范围?
- javascript - React 每隔几秒钟将图像移动到另一个图像上
- ios - 本机代码捕获图像在 IOS 11 上引发异常?
- c# - 如何将通用功能添加到列表并执行它们?