javascript - 如何使用流分析存储传入事件的 UDF 返回值?
问题描述
我需要将下面的 C# 代码翻译成 Azure 流分析可以使用的东西。
我有一个类似于以下内容的 C# 应用程序:
var inputEvents = new List<Event>();
foreach (var file in files){
(List<Event> events, DateTime maxDate) = ProcessEvents(file, inputEvents);
inputEvents = events.Where(e => e.Duration == null).ToList();
}
ProcessEvents() 将 inputEvents 传递给其他辅助方法的位置
我需要使用流分析来实现整个代码。var 文件部分是通过使用 Collect() 收集一堆事件来实现的。每个批次都发送到充当ProcessEvents()的 UDF 。但是,ProcessEvents() 返回下一次迭代所需的其他事件。由于 UDF 是无状态的,因此下一批将无法使用上一批返回的事件。
如何在流分析中重写上面的 C# 代码?
我尝试了以下方法:
- 使用 UDA 存储返回的事件。失败,因为由于某种原因它无法存储 JSON 数组并不断修改它。
- 使用参考数据输入来存储返回的事件。失败,因为它们只能在使用 JOIN 的流分析中使用,而不能在 UDF 中使用。
流分析 T-SQL 查询:
WITH eventsCollection AS (SELECT COLLECT() AS allEvents
FROM EventHubStreamMessage
GROUP BY SessionWindow(minute,2,4)),
step1 AS (
SELECT UDF.SampleProcessEvents(allEvents) as Source
FROM eventsCollection
)
SELECT *
INTO [StorageTable]
FROM step1
流分析 UDF 代码(短版):
function main(allEvents){
allEvents = JSON.stringify(allEvents);
var inputEvents = new Array ();
return processEvents(JSON.parse(allEvents), inputEvents)
}
function processEvents(allEvents, inputEvents){
for (i=0l i<allEvents.length; i++){
if (allEvents[i].Event =="ON"){
powerOn(inputEvents);
}
}
}
function powerOn(inputEvents){
return true;
}
解决方案
推荐阅读
- typescript - 在 Typesript 中创建 FixedUpdate
- java - 如何在 Java Spark 中读取 avro 文件作为对象列表
- html - 如何使用 watir 和 children / xpath 来单击按钮?
- android - 无法让 expo 在 android 模拟器 (macOS) 上运行
- java - 数组越界异常,Mac上的JTable但不是windows?
- php - 如何从 bash 文件运行 Laravel 项目?
- postgresql - PostgreSQL,Postgis 几何字段
- javascript - 无法使用 JQuery 显示隐藏元素
- tensorflow - TensorFlow js VS TensorFlow 精简版
- android - 为什么Android在添加新片段时会调用片段的onDestroyView?