apache-spark - 如何在结构化流中保持和重置状态?
问题描述
我有一个要求,我需要显示从当天开始到当前时间的特定类别的汇总计数。
我正在使用 Structure Streaming 进行分组。由于窗口不保持数据帧的状态,我不确定如何实现可以保持其状态并在先前状态上增加计数器的逻辑。另外,我将如何在新的一天开始时重置状态。
输入记录:
{"Floor_Id" : "Shop Floor 1",
"HaltRecord" : {
"HaltReason" : "Danahydraulic Error",
"Severity" : "Low",
"FaultErrorCategory" : "Docked",
"NonFaultErrorCategory" : null
},
"Description" : "Forklift",
"Category" : {
"Type" : "Halt",
"End_time" : NumberLong(2018-02-13T12:00:01),
"Start_time" : NumberLong(2018-02-13T12:00:00)
},
"Asset_Id" : 123,
"isError" : "y",
"Timestamp": 2018-02-13T12:00:01}
输出响应:
{
"Floor_Id": "Shop Floor 1",
"Error_Category": [
{
"Category": "Operator Error",
"DataPoints":
{
"NumberOfErrors": 20,
"Date": 2018-02-13
}
},
{
"Category": "Danahydraulic Error",
"DataPoints": {
"NumberOfErrors": 15,
"Date": 2018-02-13
}
}
]
}
任何帮助深表感谢。
解决方案
我没有使用结构化流的状态函数,但我知道它是 mapGroupWithState 函数,它提供了保持状态和执行计数逻辑的能力。
推荐阅读
- python - 如何覆盖 Outlook 日历事件
- python - ax.add_patch() 中的错误导致坐标缩放不一致/随机
- sparql - RDF4J 的 AST 是否允许 SPARQL 查询重写?
- python - 在数据帧行上应用具有多个操作的函数的有效方法
- php - Symfony 1.4:在维护模式下清除缓存时出错
- python - 未找到来自 conda 环境的模块
- python - 对类型为文字的参数使用字符串值
- git - 你如何克服 GitLab 到 GitHub 迁移问题?
- c++ - 将spmat犰狳稀疏矩阵转换为csr格式
- python - 使用 TensorFlow 在沙盒中运行 python 3