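azure - Removing duplicate events from an Event Hub in Azure Stream Analytics
Problem description
I created an Azure Stream Analytics job that reads input data from an Event Hub and writes it to Cosmos DB and Blob storage.
Sometimes the data coming from the Event Hub contains duplicates, so the duplicated data is written to Cosmos DB and Blob storage as well.
Sample input data from the Event Hub to Stream Analytics is shown below.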
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"04XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00026XXX03",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
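In the sample above, the event with idnum 00086XXX02 is duplicated three times (it appears four times in total).
I am running the following query and getting duplicated output.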
WITH temp AS (
SELECT
input.idnum AS IDNUM,
input.basetime AS BASETIME,
input.time AS TIME,
ROUND(input.sig1,5) AS SIG1,
flatArrayElement as SIG2,
udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL -- UDF to process the signals in input
FROM [input01] as input
CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
WHERE GetArrayLength(input.sig2) >=1
),
SIGNALS AS (
SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3
)
--Insert SIG2 to COSMOS Container
SELECT
t.IDNUM,
t.BASETIME,
t.TIME,
t.SIG1,
t.SIG2.ArrayValue.id AS ID,
t.SIG2.ArrayValue.sig3 AS SIG3,
t.SGNL
INTO [CosmosTbl]
FROM SIGNALS PARTITION BY PartitionId
The output looks like the following, with duplicate events for "idnum": "00086XXX02":
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"000000",
"id":61
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
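The expected output contains no duplicate events (for the sample provided, "idnum": "00086XXX02" should not appear more than once per sig2 element).
I want to remove the duplicate events before the data is written to storage. Can this be done from Stream Analytics?
Creating the Cosmos DB collection with a unique ID would solve this on the Cosmos side, but the table already exists here. Is there anything we can do on the Stream Analytics side?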
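To make the unique-ID idea mentioned above concrete, here is a minimal Python sketch (not Stream Analytics code). It assumes that one output row is identified by idnum, basetime, time and the flattened sig2 element, and that the sink overwrites documents that share an id; both are assumptions for illustration. The function names `document_id` and `upsert_all` are hypothetical, and a plain dict stands in for the container.

```python
import hashlib
import json


def document_id(event: dict, sig2_item: dict) -> str:
    """Derive a deterministic id from the fields assumed to identify one row."""
    key = [
        event["idnum"],
        event["basetime"],
        event["time"],
        sig2_item["id"],
        sig2_item["sig3"],
    ]
    return hashlib.sha256(json.dumps(key).encode("utf-8")).hexdigest()


def upsert_all(events: list) -> dict:
    """Simulate an upsert-by-id container with a plain dict (assumption)."""
    container = {}
    for event in events:
        for item in event["sig2"]:
            doc = {
                "id": document_id(event, item),
                "IDNUM": event["idnum"],
                "TIME": event["time"],
                "ID": item["id"],
                "SIG3": item["sig3"],
            }
            # A repeated event produces the same id, so it overwrites
            # the earlier copy instead of adding a new document.
            container[doc["id"]] = doc
    return container


event = {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}],
}
stored = upsert_all([event, dict(event), dict(event)])
print(len(stored))
```

With the same event submitted three times, only one document per sig2 element remains. Whether this fits depends on which fields really define "the same event" in your data.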
Solution
You can use COUNT(DISTINCT ...) to remove duplicate events. Online documentation is available: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#remove-duplicate-events-in-a-window
Example:
With Temp AS (
SELECT
COUNT(DISTINCT Time) AS CountTime,
Value,
DeviceId
FROM Input TIMESTAMP BY Time
GROUP BY Value, DeviceId, SYSTEM.TIMESTAMP()
)
SELECT
AVG(Value) AS AverageValue,
DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId,TumblingWindow(minute, 5)
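The two steps of the query above can be mirrored in plain Python to see why the grouping removes duplicates. This is only a sketch of the logic, not Stream Analytics code: the event field names (`device_id`, `value`, `time` in seconds) and the function names are assumptions, and the window buckets are simplified to `[start, start + 300)`, which may not match the service's own window boundary rules.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # stands in for TumblingWindow(minute, 5)


def dedupe(events):
    """Step 1: collapse events sharing (value, device_id, time),
    like GROUP BY Value, DeviceId, SYSTEM.TIMESTAMP()."""
    seen = set()
    unique = []
    for e in events:
        key = (e["value"], e["device_id"], e["time"])
        if key not in seen:
            seen.add(key)
            unique.append(e)
    return unique


def average_per_window(events):
    """Step 2: average the deduplicated values per device and window."""
    buckets = defaultdict(list)
    for e in dedupe(events):
        # Simplified bucketing: window identified by its start time.
        window_start = (e["time"] // WINDOW_SECONDS) * WINDOW_SECONDS
        buckets[(e["device_id"], window_start)].append(e["value"])
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}


events = [
    {"device_id": "d1", "value": 10.0, "time": 10},
    {"device_id": "d1", "value": 10.0, "time": 10},  # duplicate delivery
    {"device_id": "d1", "value": 20.0, "time": 20},
]
print(average_per_window(events))
```

Without the first step the duplicate reading would pull the average toward 10; with it, each distinct reading counts once.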