user-defined-functions - 创建需要 STREAM 并提供 BATCH (Python) 的 Kapacitor UDF
问题描述
我在制作需要 STREAM 并提供 BATCH 的 UDF 时遇到了麻烦。
这边走:
def info(self):
response = udf_pb2.Response()
response.info.wants = udf_pb2.STREAM
response.info.provides = udf_pb2.BATCH
response.info.options['field'].valueTypes.append(udf_pb2.STRING)
return response
有人有示例代码吗?我在网上搜索(论坛、文档),但所有示例都是针对 BATCH-BACH、STREAM-STREAM 或 BATCH-STREAM。
我在示例中看到,在编写对 Kapacitor 的响应时,在“end_batch(self,end_req)”方法中,有必要“传达”BATCH 已经结束,在一个示例中,这是这样制作的:
def end_batch(self, end_req):
# Send begin batch with count of outliers
self._begin_response.begin.size = len(self._batch)
self._agent.write_response(self._begin_response)
response = udf_pb2.Response()
...
# Send an identical end batch back to Kapacitor
# HERE
response.end.CopyFrom(end_req)
self._agent.write_response(response)
为了发送 BATCH,我必须从“point(self,point)”方法发送它,但无法访问 end_req 对象并且不知道如何创建一个。
提前致谢!再见!
解决方案
希望这仍然是相关的,我会制作一个 STREAM-STREAM UDF 并将其通过管道传输到一个窗口节点中。您可以保留数据窗口的副本,就像在他们的移动平均示例中一样,并对其进行任何批量分析。如果您知道如何编写 STREAM-BATCH UDF,我很乐意看到它,但比我的答案更难看。
编辑
jdv 绝对是正确的,我的最后一个答案肯定是更多的评论。这是 python 中的STREAM-BATCH UDF,它只是回显批量流中传入的数据。它仍然有点破,因为它无法序列化处理程序快照方法中的点类。因此,每当它需要拍摄快照时,它就会崩溃,可以通过使用不同的序列化方法(如酸洗)或通过为点编写 JSON 编码器/解码器来解决。我会在某个时候解决这个问题,但我的工作周快结束了。制作 STREAM-BATCH UDF 需要做的主要事情是构造批处理开始和结束消息,这分别在 createEndBatch 和 createStartBatch 方法中完成。
编辑 2
通过使用 protobufs 方法和 json 的组合来修复序列化。
推荐阅读
- spring - 如何在新事务中保存实体?
- c - Scala <-> C 和 Scala <-> Rust 互操作性。如何?
- flutter - 从小部件列表中删除小部件 [Flutter]
- sql - 计算字段后返回值不正确
- c - 如何使用 H5LTget_attribute_string 函数?
- elasticsearch - 如何使用正则表达式在elasticsearch中通过match_phrase查询搜索子对象的值字段
- powershell - MD5-Checksum hashing with powershell for a whole directory
- orm - RepoDb 找不到映射配置
- c# - 数据合同名称未应用于响应
- java - 使用 ConfigurationProperties 加载的嵌套地图的键以其各自的索引为前缀