Remove duplicates from EventHub in Azure Stream Analytics

Problem description

I created an Azure Stream Analytics job that takes input data from EventHub and writes it to Cosmos DB and Blob storage.

Sometimes the data coming from EventHub is duplicated, so the duplicates are also written to Cosmos DB and Blob storage.

Sample input data from EventHub to Stream Analytics is shown below.

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00026XXX03",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

In the sample above, the event with idnum 00086XXX02 appears four times (the original plus three duplicates).

I am running the following query and getting duplicated output.

WITH temp AS (
    SELECT
        input.idnum AS IDNUM,
        input.basetime AS BASETIME,
        input.time AS TIME,
        ROUND(input.sig1,5) AS SIG1,
        flatArrayElement as SIG2,
        udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL -- UDF that processes the signals in the input
    FROM [input01] as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 ),
SIGNALS AS (
  SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3 
)

--Insert SIG2 to COSMOS Container
SELECT 
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS PARTITION BY PartitionId

The output looks like the following, with duplicate events for "idnum": "00086XXX02":

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"000000",
               "id":61
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},                           
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

The expected output contains no duplicate events (for the sample provided, there should be no duplicates for "idnum": "00086XXX02").

I want to remove the duplicate events before writing the data to storage. Can this be done from Stream Analytics?

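Creating the Cosmos DB collection with a unique ID would be a solution on the Cosmos side, but here the table already exists. Is there anything we can do from the Stream Analytics side?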

Tags: azure, azure-cosmosdb, azure-stream-analytics, stream-analytics

Solution


You can use DISTINCT to remove duplicate events within a window. The pattern is covered in the online documentation: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#remove-duplicate-events-in-a-window

Example:

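WITH Temp AS (
    -- Step 1: events that share the same Value, DeviceId and timestamp collapse into one row
    SELECT
        COUNT(DISTINCT Time) AS CountTime,
        Value,
        DeviceId
    FROM Input TIMESTAMP BY Time
    GROUP BY Value, DeviceId, SYSTEM.TIMESTAMP()
)
-- Step 2: aggregate the deduplicated rows per device over 5-minute tumbling windows
SELECT
    AVG(Value) AS AverageValue,
    DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId, TumblingWindow(minute, 5)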
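
To see why the two-step query above changes the result, here is a rough sketch of its logic in plain Python rather than Stream Analytics query language, so it can be run locally. It is only an illustration under assumptions: the field names (`device_id`, `value`, `time` in seconds), the sample events, and the helper functions are hypothetical, and the window bucketing is simplified compared with how Stream Analytics assigns events to tumbling windows.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # mirrors TumblingWindow(minute, 5)


def deduplicate(events):
    """Keep one row per (device_id, value, time), like the first step of the query."""
    seen = set()
    unique = []
    for event in events:
        key = (event["device_id"], event["value"], event["time"])
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique


def average_per_window(events):
    """Average value per (device_id, window start), like the second step."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of values, count]
    for event in events:
        # Simplified bucketing: round the event time down to the window start.
        window_start = event["time"] // WINDOW_SECONDS * WINDOW_SECONDS
        bucket = totals[(event["device_id"], window_start)]
        bucket[0] += event["value"]
        bucket[1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


# Hypothetical sample data: device A sends the same reading twice.
events = [
    {"device_id": "A", "value": 10.0, "time": 0},
    {"device_id": "A", "value": 10.0, "time": 0},   # exact duplicate
    {"device_id": "A", "value": 20.0, "time": 60},
    {"device_id": "B", "value": 5.0, "time": 30},
]

unique_events = deduplicate(events)
averages = average_per_window(unique_events)
print(averages)
```

With the duplicate removed, device A averages 15.0 for the window. Without the first step, the repeated reading would be counted twice and pull the average down to about 13.3. For the events in the question, the equivalent key would presumably be something like idnum plus time, applied before sig2 is flattened.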
