首页 > 解决方案 > 如何使用流分析存储传入事件的 UDF 返回值?

问题描述

我需要将下面的 C# 代码翻译成 Azure 流分析可以使用的东西。

我有一个类似于以下内容的 C# 应用程序:

var inputEvents = new List<Event>();
foreach (var file in files){
    (List<Event> events, DateTime maxDate) = ProcessEvents(file, inputEvents);
    inputEvents = events.Where(e => e.Duration == null).ToList();
}

ProcessEvents() 将 inputEvents 传递给其他辅助方法的位置

我需要使用流分析来实现整个代码。var 文件部分是通过使用 Collect() 收集一堆事件来实现的。每个批次都发送到充当ProcessEvents()的 UDF 。但是,ProcessEvents() 返回下一次迭代所需的其他事件。由于 UDF 是无状态的,因此下一批将无法使用上一批返回的事件。

如何在流分析中重写上面的 C# 代码?

我尝试了以下方法:

流分析 T-SQL 查询:

WITH eventsCollection AS (SELECT COLLECT() AS allEvents
FROM EventHubStreamMessage
GROUP BY SessionWindow(minute,2,4)),

step1 AS (
    SELECT UDF.SampleProcessEvents(allEvents) as Source
    FROM eventsCollection
)

SELECT *
INTO [StorageTable]
FROM step1 

流分析 UDF 代码(短版):

function main(allEvents){
    allEvents = JSON.stringify(allEvents);
    var inputEvents = new Array ();

    return processEvents(JSON.parse(allEvents), inputEvents)
}

function processEvents(allEvents, inputEvents){
    for (i=0l i<allEvents.length; i++){
        if (allEvents[i].Event =="ON"){
            powerOn(inputEvents);
        }
    }
}

function powerOn(inputEvents){
    return true;
}

标签: javascriptc#user-defined-functionsazure-stream-analyticsstream-analytics

解决方案


推荐阅读