What is the recommended way for Azure Stream Analytics to process/route IoT messages of different sizes and content from the same device?

Problem description

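First, we want to send (and log) calibration messages from one of our devices. Then, once it is fully calibrated, we want to send regular IoT telemetry messages.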

The workflow is: data flows from IoT Hub to Azure Stream Analytics (ASA), and then to an Azure SQL Database.

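The content of an IoT message can change depending on whether the device is calibrating. To mark a calibration message, we change the device's name field to contain "calibration". To send regular telemetry afterwards, the name field reverts to the device's actual name.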

So, in ASA, we decide whether a message is a calibration message by checking the content of the device name field.

The problem therefore needs to be solved with something like this:

if message-contents.device-name = 'calibration' then
 get these 10 fields from the message
 write them to Calibration DB
else
 get these different 5 fields from the message
 call a ML function with some of these input fields
 write result to Telemetry DB
end

I tried writing Azure Stream Analytics code that uses a CASE statement.

WITH ALLMESSAGES AS ( 
SELECT *
FROM iothubinput2018aug21 ),

QUERY1 AS (
SELECT try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(devicenumber as nvarchar(max)) as DEVICE_UID,
       try_cast(emptymax as nvarchar(max)) as EMPTYMAX,
       try_cast(emptymin as nvarchar(max)) as EMPTYMIN,
       try_cast(emptysum as nvarchar(max)) as EMPTYSUM,
       try_cast(fullmax as nvarchar(max)) as FULLMAX,
       try_cast(fullmin as nvarchar(max)) as FULLMIN,
       try_cast(fullsum as nvarchar(max)) as FULLSUM,
       try_cast(emptyresult as nvarchar(max)) as EMPTYRESULT,
       try_cast(fullresult as nvarchar(max)) as FULLRESULT,
       try_cast(fw as nvarchar(max)) as FIRMWARE_VERSION,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE DEVICE_NAME = 'calibration'),

QUERY2 AS (
SELECT
       try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(uid as nvarchar(max)) as DEVICE_UID,
       try_cast(weight as nvarchar(max)) as DEVICE_SENSOR_READINGS,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       MLFunctionBatch2019Apr10(DEVICE_MAC_ID, DEVICE_SENSOR_READINGS) AS RESULT,
       UDF.SUMOFREADINGS(DEVICE_SENSOR_READINGS) AS DEVICE_SENSOR_READINGS_SUM,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE DEVICE_NAME != 'calibration')

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       DEVICE_SENSOR_READINGS,
       ASA_POSTTIME, 
       CASE result."Scored Labels" WHEN '1' THEN 'FULL' ELSE 'EMPTY' END AS MLSVC_RESULT,
       REC_READ,
       DEVICE_SENSOR_READINGS_SUM
INTO OUT2SQLDBDEV                
FROM QUERY2
WHERE DEVICE_MAC_ID != 'calibration'

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       EMPTYMAX,
       EMPTYMIN,
       EMPTYSUM,
       FULLMAX,
       FULLMIN,
       FULLSUM,
       EMPTYRESULT,
       FULLRESULT,
       FIRMWARE_VERSION,
       ASA_POSTTIME,
       REC_READ
INTO OUT2SQLDBCALIBRATIONDEV
FROM QUERY1
WHERE DEVICE_MAC_ID = 'calibration'

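I expected the output to go to the correct database table, depending on whether the device name is "calibration" or something else. That did not happen: no output was produced at all. In another attempt, the ASA query failed because the database columns require NOT NULL values, but the query was sending NULL values.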

Tags: azure, azure-iot-hub, azure-stream-analytics

Solution


I had to coalesce the values to make sure they were not NULL, and then the query worked. See below.

WITH ALLMESSAGES AS ( 
SELECT *
FROM iothubinput2018aug21 ),

QUERY1 AS (
SELECT
       try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(uid as nvarchar(max)) as DEVICE_UID,
       try_cast(weight as nvarchar(max)) as DEVICE_SENSOR_READINGS,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       '1' AS RESULT,
        UDF.SUMOFREADINGS(try_cast(weight as nvarchar(max))) AS DEVICE_SENSOR_READINGS_SUM,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE COALESCE(deviceid,'NULL') != 'calibration'), 

QUERY2 AS (
SELECT try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME, 
       try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
       try_cast(devicenumber as nvarchar(max)) as DEVICE_UID,
       try_cast(emptymax as nvarchar(max)) as EMPTYMAX,
       try_cast(emptymin as nvarchar(max)) as EMPTYMIN,
       try_cast(emptysum as nvarchar(max)) as EMPTYSUM,
       try_cast(fullmax as nvarchar(max)) as FULLMAX,
       try_cast(fullmin as nvarchar(max)) as FULLMIN,
       try_cast(fullsum as nvarchar(max)) as FULLSUM,
       try_cast(emptyresult as nvarchar(max)) as EMPTYRESULT,
       try_cast(fullresult as nvarchar(max)) as FULLRESULT,
       try_cast(fw as nvarchar(max)) as FIRMWARE_VERSION,
       TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
       0 AS REC_READ
FROM ALLMESSAGES
WHERE COALESCE(deviceid,'NULL') = 'calibration')

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       DEVICE_SENSOR_READINGS,
       ASA_POSTTIME, 
       'FULL' AS MLSVC_RESULT,
       REC_READ,
       DEVICE_SENSOR_READINGS_SUM
INTO OUT2SQLDBDEV                
FROM QUERY1
WHERE COALESCE(DEVICE_NAME, 'NULL') != 'calibration'

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       DEVICE_SENSOR_READINGS,
       ASA_POSTTIME, 
       'FULL' AS MLSVC_RESULT,
       REC_READ,
       DEVICE_SENSOR_READINGS_SUM
INTO OUT2BLOB2018OCT24              
FROM QUERY1
WHERE COALESCE(DEVICE_NAME, 'NULL') != 'calibration'

SELECT DEVICE_NAME, 
       DEVICE_MAC_ID,
       DEVICE_UID,
       EMPTYMAX,
       EMPTYMIN,
       EMPTYSUM,
       FULLMAX,
       FULLMIN,
       FULLSUM,
       EMPTYRESULT,
       FULLRESULT,
       FIRMWARE_VERSION,
       ASA_POSTTIME,
       REC_READ
INTO OUT2SQLDBCALIBRATIONDEV
FROM QUERY2
WHERE COALESCE(DEVICE_NAME, 'NULL') = 'calibration'
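
The routing idea in the query above can be reduced to a smaller sketch, shown here only to isolate the two changes that appear to matter: filtering on the incoming field (deviceid) rather than on an output alias, and wrapping possibly missing values in COALESCE. The input and output names and the field names are reused from the query above. The column list is shortened, and the empty-string default for DEVICE_UID is an assumption for illustration, not something the original states.

```sql
-- Minimal routing sketch (assumptions: shortened column lists,
-- '' used as a placeholder default for a NOT NULL column).
WITH ALLMESSAGES AS (
    SELECT *
    FROM iothubinput2018aug21
)

-- Telemetry branch: every message whose deviceid is not 'calibration'.
-- COALESCE turns a missing deviceid into the literal 'NULL', so the
-- comparison yields true and the row is kept instead of being dropped.
SELECT TRY_CAST(deviceid AS nvarchar(max)) AS DEVICE_NAME,
       TRY_CAST(device AS nvarchar(max)) AS DEVICE_MAC_ID,
       COALESCE(TRY_CAST(uid AS nvarchar(max)), '') AS DEVICE_UID,
       TRY_CAST(weight AS nvarchar(max)) AS DEVICE_SENSOR_READINGS
INTO OUT2SQLDBDEV
FROM ALLMESSAGES
WHERE COALESCE(deviceid, 'NULL') != 'calibration'

-- Calibration branch: only messages explicitly marked as calibration.
SELECT TRY_CAST(deviceid AS nvarchar(max)) AS DEVICE_NAME,
       TRY_CAST(device AS nvarchar(max)) AS DEVICE_MAC_ID,
       TRY_CAST(emptymax AS nvarchar(max)) AS EMPTYMAX,
       TRY_CAST(fullmax AS nvarchar(max)) AS FULLMAX
INTO OUT2SQLDBCALIBRATIONDEV
FROM ALLMESSAGES
WHERE COALESCE(deviceid, 'NULL') = 'calibration'
```

Because each message satisfies exactly one of the two WHERE conditions, it reaches exactly one output. The extra WHERE clauses on the final SELECT statements in the full query repeat the filter already applied in QUERY1 and QUERY2, so they are probably redundant, though harmless.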
