首页 > 解决方案 > 在 Siddhi 中相互比较长度批次的数据

问题描述

我在下面定义了输入流。Datetime 字符串是 like 2010-09-01 06:59:00.000,结果是 double like 157,382,而 UnixDateTime 的类型是 long like 1283324340111

define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);

我想为显示result列平均值的 100 个事件制作长度批次,并且我想将这些批次相互比较。我想对接下来的 5 个批次(每个批次都包含 100 个事件)进行滑动比较。所以我想比较第一批(0-100 事件)和第二批(101-200),直到第六批(501-600)。我希望第二批比较到第七批。我想通过比较实现的是,当 4 个或更多(来自 5 个)批次的批次平均结果全部大于或全部小于 1(与原始批次的平均结果相比)时,我想记录有关原始批次的信息。

我的代码如下。问题我不知道确切的语法。我查看了 WSO2 和 Siddhi 的教程和文档,但我无法解决问题。

@info(name = 'MovingAverageQuery')
from every e1=HStream, e2=HStream[e1.avg(Result) <= avg(Result))+, e2=HStream[e2[last].avg(Result) <= avg(Result)]
select ID, DateTime, Result, 
avg(Result), UnixDateTime
output last every 100 events
insert into OutputStream;

@sink(type='log', prefix='LOGGER')
define stream OutputStream(Nr ID, DateTime String, Result double, Avg double, UnixDateTime long);

标签: wso2siddhistream-analytics

解决方案


您必须对需求使用两个查询,一个是计算平均值(Average100Query),另一个是比较平均值(IdentifyIncreaseingTrend)。

@App:name("AverageSequence")
@App:description("Identify the average increase trend")

define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);

@sink(type='log', prefix='LOGGER')
define stream OutputStream(ID int, DateTime String, avgResult double, UnixDateTime long);

@info(name = 'Average100Query')
from HStream#window.lengthBatch(100)
select ID, DateTime, avg(Result) as avgResult, UnixDateTime 
insert into AverageStream;

@info(name='IdentifyIncreaseingTrend')
from every e1=AverageStream, e2=AverageStream[e2.avgResult >= (e1.avgResult + 1)],  e3=AverageStream[e3.avgResult >= (e2.avgResult + 1)],  e4=AverageStream[e4.avgResult >= (e3.avgResult + 1)], e5=AverageStream[e5.avgResult >= (e4.avgResult + 1)]
select e1.ID, e1.DateTime, e1.avgResult, e1.UnixDateTime 
insert into OutputStream;

我注意到的一些语法问题是在计算完成后,例如 sum(result) 你必须使用as关键字将该属性命名为sum(result) as totalResult. 在序列中,您不能使用平均函数,因为它需要为多个事件完成,但您可以使用重命名的属性totalResult


推荐阅读