首页 > 解决方案 > 流分析 - 如何处理参考输入中的 json

问题描述

我有一个 Azure 流分析 (ASA) 作业,它处理来自事件中心的设备遥测数据。流应该与来自 sql 表的参考数据连接,以使用额外的设备元数据增强每条消息。合并的条目应存储在 CosmosDb 中。

为设备元数据提供服务的 sql 数据库:

CREATE TABLE [dbo].[MyTable]
(
  [DeviceId] NVARCHAR(20) NOT NULL PRIMARY KEY, 
  [MetaData] NVARCHAR(MAX) NULL   /* this stores json, which can vary per record */
)

在 ASA 中,我使用一个简单的查询配置了参考数据输入:

SELECT DeviceId, JSON_QUERY(MetaData) FROM [dbo].[MyTable]

我有执行连接的主要 ASA 查询:

WITH temptable AS (
SELECT * FROM [telemetry-input] TD PARTITION BY PartitionId
LEFT OUTER JOIN [metadata-input] MD
ON TD.DeviceId = MD.DeviceId
)

SELECT TD.*, MD.MetaData 
INTO [cosmos-db-output] 
FROM temptable PARTITION BY PartitionId

这一切正常,合并的数据存储在 CosmosDb 中。但是,来自 sql 的 Metadata 列的值被视为字符串,并使用引号和转义字符存储在 comos 中。例子:

{ "DeviceId" : "abc1234", … , "MetaData" : "{ \"TestKey\": \"test value\" }" };

有没有办法将元数据中的 json 存储为适当的 Json 对象,即

{ "DeviceId" : "abc1234", … , "MetaData" : { "TestKey": "test value" } };

标签: azure-sql-databaseazure-cosmosdbazure-stream-analyticsstream-analytics

解决方案


我找到了在 ASA 中实现它的方法 - 你需要创建javascript 用户函数

function parseJson(strjson){
          return JSON.parse(strjson);
}

并在您的查询中调用它:

...
SELECT TD.*, udf.parseJson(MD.MetaData)
...

推荐阅读