首页 > 解决方案 > Apache Flink 中的有状态函数

问题描述

我研究了 Apache Flink 的新 Stateful Functions 2.0 API。我阅读了以下文档链接https://ci.apache.org/projects/flink/flink-statefun-docs-stable/。我还在 Git 存储库中运行了示例。(https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples)关于实施我有几个问题。

https://flink.apache.org/stateful-functions.html --> 页面末尾有一个例子是欺诈检测的交易评分。

第一个问题是关于状态 TTL。我怎样才能把状态交给 TTL?示例说:30 天后,“欺诈计数”功能将收到一条过期消息(来自自身)并清除其状态。我应该做这本手册还是有其他功能?我该如何做这本手册?

关于keyedstream的第二个问题。示例说:将存在多个“欺诈计数”实例 - 例如,每个客户帐户一个。我应该把价值观PersistedTable<K,V>吗?例如<customerid,count>. 我可以清除特定键的状态吗?

最后一个问题是关于窗口和水印。如何将这些功能实现到 Stateful Functions 2.0?

标签: stateapache-flinkflink-streamingstream-processingflink-statefun

解决方案


第一个问题是关于状态 TTL。我怎样才能把状态交给 TTL?示例说:30 天后,“欺诈计数”功能将收到一条过期消息(来自自身)并清除其状态。我应该手动执行此操作还是有其他功能?我该如何做这本手册?

您可以使用延迟消息手动执行此操作。实际上,您可以通过向自己发送延迟消息来创建回调触发器。此消息是持久的,并且在失败的情况下不会丢失。如果您查看模型服务示例中的欺诈计数函数,您会发现它正是这样做的。当收到一个值时,将发送一条 ttl 消息,延迟 30 天。当收到该消息时,计数会递减。

关于keyedstream的第二个问题。示例说:将存在多个“欺诈计数”实例 - 例如,每个客户帐户一个。我应该将值放入 PersistedTable 吗?例如 。我可以清除特定键的状态吗?

所有函数实例都是“键控”的,因为用户代码总是在键的范围内调用,并且所有 Persisted 字段的范围都限定为该键。关键是地址的“id”部分。在您的示例中,您可以有一个函数“CustomerFunction”来跟踪您的业务中每个客户的信息。当您想与该客户交互时,您将向其发送消息,指定客户 uid 作为地址的“id”。

new Address(new FunctionType("ns", "customer"), "customer-id-1");

如果您要跟踪每个客户的计数,则只需要一个 PersistedValue,因为它已经限定为该客户 ID。回到欺诈计数示例,该功能的范围是“账户 ID”,它跟踪每个银行账户的欺诈交易数量。

最后一个问题是关于窗口和水印。如何将这些功能实现到 Stateful Functions 2.0?

statefun 2.0 不直接支持这些功能。windows的原因是它们主要适用于数据处理,而不是应用程序开发。对于这些用例,使用 Flink 的 DataStream 和 Table API 可能会更好,尽管可以在用户代码中自己实现它们。

活动时间很棘手。事件时间在后台使用“水印”来跟踪系统内的时间进程。它们依赖于与水印相关的有序数据。这意味着如果事件 x 在 2:00 的水印前以 1:59 的时间戳被摄取,它必须始终保持在该水印的前面。否则,该准时记录将被错误地标记为迟到。

有状态函数基于迭代和任意消息传递。因为记录可以通过数据流向任何方向移动,所以事件时间没有很好地定义。


推荐阅读