azure - Azure 流分析处理/路由来自同一设备的不同大小和内容的 IoT 消息的推荐方法是什么
问题描述
首先,我们想从我们的一个设备发送(和记录)校准消息。然后,在它完全校准后,我们要发送常规的物联网遥测消息。
工作流程是:数据从 IoT 中心传输到 Azure 流分析,然后传输到 Azure Sql 数据库。
物联网消息的内容可能会根据设备是否正在校准而改变。为了识别设备校准,我们将该设备的名称字段更改为包含“校准”。然后,为了发送常规遥测数据,设备的名称将恢复为包含其名称。
因此,在 ASA 中,要确定消息是否为校准消息:我们检查设备名称字段的内容。
所以这个问题需要用这样的东西来解决:
if message-contents.device-name = 'calibration' then
get these 10 fields from the message
write them to Calibration DB
else
get these different 5 fields from the message
call a ML function with some of these input fields
write result to Telemetry DB
end
我尝试编写使用 CASE 语句的 Azure 流分析代码。
WITH ALLMESSAGES AS (
SELECT *
FROM iothubinput2018aug21 ),
QUERY1 AS (
SELECT try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME,
try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
try_cast(devicenumber as nvarchar(max)) as DEVICE_UID,
try_cast(emptymax as nvarchar(max)) as EMPTYMAX,
try_cast(emptymin as nvarchar(max)) as EMPTYMIN,
try_cast(emptysum as nvarchar(max)) as EMPTYSUM,
try_cast(fullmax as nvarchar(max)) as FULLMAX,
try_cast(fullmin as nvarchar(max)) as FULLMIN,
try_cast(fullsum as nvarchar(max)) as FULLSUM,
try_cast(emptyresult as nvarchar(max)) as EMPTYRESULT,
try_cast(fullresult as nvarchar(max)) as FULLRESULT,
try_cast(fw as nvarchar(max)) as FIRMWARE_VERSION,
TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
0 AS REC_READ
FROM ALLMESSAGES
WHERE DEVICE_NAME = 'calibration'),
QUERY2 AS (
SELECT
try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME,
try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
try_cast(uid as nvarchar(max)) as DEVICE_UID,
try_cast(weight as nvarchar(max)) as DEVICE_SENSOR_READINGS,
TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
MLFunctionBatch2019Apr10(DEVICE_MAC_ID, DEVICE_SENSOR_READINGS) AS RESULT,
UDF.SUMOFREADINGS(DEVICE_SENSOR_READINGS) AS DEVICE_SENSOR_READINGS_SUM,
0 AS REC_READ
FROM ALLMESSAGES
WHERE DEVICE_NAME != 'calibration')
SELECT DEVICE_NAME,
DEVICE_MAC_ID,
DEVICE_UID,
DEVICE_SENSOR_READINGS,
ASA_POSTTIME,
CASE result."Scored Labels" WHEN '1' THEN 'FULL' ELSE 'EMPTY' END AS MLSVC_RESULT,
REC_READ,
DEVICE_SENSOR_READINGS_SUM
INTO OUT2SQLDBDEV
FROM QUERY2
WHERE DEVICE_MAC_ID != 'calibration'
SELECT DEVICE_NAME,
DEVICE_MAC_ID,
DEVICE_UID,
EMPTYMAX,
EMPTYMIN,
EMPTYSUM,
FULLMAX,
FULLMIN,
FULLSUM,
EMPTYRESULT,
FULLRESULT,
FIRMWARE_VERSION,
ASA_POSTTIME,
REC_READ
INTO OUT2SQLDBCALIBRATIONDEV
FROM QUERY1
WHERE DEVICE_MAC_ID = 'calibration'
我希望输出到正确的数据库表,具体取决于设备名称是校准还是其他。这没有发生。没有输出出来。在另一次尝试中,ASA 查询失败,因为数据库字段需要 NOT NULL 值,但查询发送的是 NULL 值。
解决方案
我必须对这些值进行一些合并以确保它们不是 NULL,然后查询才起作用。请看下文。
WITH ALLMESSAGES AS (
SELECT *
FROM iothubinput2018aug21 ),
QUERY1 AS (
SELECT
try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME,
try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
try_cast(uid as nvarchar(max)) as DEVICE_UID,
try_cast(weight as nvarchar(max)) as DEVICE_SENSOR_READINGS,
TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
'1' AS RESULT,
UDF.SUMOFREADINGS(try_cast(weight as nvarchar(max))) AS DEVICE_SENSOR_READINGS_SUM,
0 AS REC_READ
FROM ALLMESSAGES
WHERE COALESCE(deviceid,'NULL') != 'calibration'),
QUERY2 AS (
SELECT try_cast(deviceid as nvarchar(max)) AS DEVICE_NAME,
try_cast(device as nvarchar(max)) as DEVICE_MAC_ID,
try_cast(devicenumber as nvarchar(max)) as DEVICE_UID,
try_cast(emptymax as nvarchar(max)) as EMPTYMAX,
try_cast(emptymin as nvarchar(max)) as EMPTYMIN,
try_cast(emptysum as nvarchar(max)) as EMPTYSUM,
try_cast(fullmax as nvarchar(max)) as FULLMAX,
try_cast(fullmin as nvarchar(max)) as FULLMIN,
try_cast(fullsum as nvarchar(max)) as FULLSUM,
try_cast(emptyresult as nvarchar(max)) as EMPTYRESULT,
try_cast(fullresult as nvarchar(max)) as FULLRESULT,
try_cast(fw as nvarchar(max)) as FIRMWARE_VERSION,
TRY_CAST(SYSTEM.TIMESTAMP AS DATETIME) AS ASA_POSTTIME,
0 AS REC_READ
FROM ALLMESSAGES
WHERE COALESCE(deviceid,'NULL') = 'calibration')
SELECT DEVICE_NAME,
DEVICE_MAC_ID,
DEVICE_UID,
DEVICE_SENSOR_READINGS,
ASA_POSTTIME,
'FULL' AS MLSVC_RESULT,
REC_READ,
DEVICE_SENSOR_READINGS_SUM
INTO OUT2SQLDBDEV
FROM QUERY1
WHERE COALESCE(DEVICE_NAME, 'NULL') != 'calibration'
SELECT DEVICE_NAME,
DEVICE_MAC_ID,
DEVICE_UID,
DEVICE_SENSOR_READINGS,
ASA_POSTTIME,
'FULL' AS MLSVC_RESULT,
REC_READ,
DEVICE_SENSOR_READINGS_SUM
INTO OUT2BLOB2018OCT24
FROM QUERY1
WHERE COALESCE(DEVICE_NAME, 'NULL') != 'calibration'
SELECT DEVICE_NAME,
DEVICE_MAC_ID,
DEVICE_UID,
EMPTYMAX,
EMPTYMIN,
EMPTYSUM,
FULLMAX,
FULLMIN,
FULLSUM,
EMPTYRESULT,
FULLRESULT,
FIRMWARE_VERSION,
ASA_POSTTIME,
REC_READ
INTO OUT2SQLDBCALIBRATIONDEV
FROM QUERY2
WHERE COALESCE(DEVICE_NAME, 'NULL') = 'calibration'
推荐阅读
- python - AWS Lambda 记录到一个 JSON 行
- powershell - Powershell MessageBox 将不需要的数据添加到我的变量中
- sql - 查询 NULL 数、MinValue、MaxValue 和 AvgValue 没有返回结果(不正确)
- mysql - 使用 mysql 的 GROUP BY 自定义奇数列的输出
- javascript - :: 一次绑定的表示法不适用于 AnguarJS 1.6.4
- java - 控制器中的 Spring Boot 覆盖功能
- angular - 在 Angular CLI 项目中找不到源文件
- javascript - How to load external script with optimized require.js?
- spring-boot - 在 Kotlin 中使用 Jpa 注释从基类继承父属性
- java - Spring Integration DSL ScatterGather 流块