首页 > 解决方案 > Apache Flink 中的有状态函数容错消息分发

问题描述

我正在尝试使用 apache flink 有状态函数来实现消息传递场景。

按照设计,我需要从传入消息中计算一些统计数据并将它们存储在状态中。之后,场景函数将访问这些状态和消息并在它们上运行业务规则。但是我们每条消息可能有几十个场景,每个场景都应该只运行一次。

代码或多或少如下

@Override
    public void configure(MatchBinder binder) {
        binder
            .predicate(Transaction.class,this::updateTransactionStatAndSendToScenatioManager)
}

    private void updateTransactionStatAndSendToScenatioManager(Context context, Transaction transaction){
        // state update
        context.send(FnScenarioManager.TYPE,  String.valueOf(transaction.id()) , transaction);
    }

FnScenarioManager:

@Override
    public void configure(MatchBinder binder) {
    binder
        .predicate(Transaction.class,this::runTransactionScenarios);
}


private void runTransactionScenarios(Context context, Transaction transaction){
   context.send(Scenario1.TYPE,String.valueOf(transaction.id()),transaction);
   context.send(Scenario2.TYPE,String.valueOf(transaction.id()),transaction);
   context.send(Scenario3.TYPE,String.valueOf(transaction.id()),transaction);
   ...
   context.send(ScenarioN.TYPE,String.valueOf(transaction.id()),transaction);
}

我的问题是如果集群在runTransactionScenarios中间崩溃会发生什么?

标签: stateapache-flinkstatefulflink-statefun

解决方案


有状态函数(以及一般的 Apache Flink)支持完全一次的状态语义。这意味着在失败的情况下,运行时将始终以模拟完全无故障执行的方式回滚状态和消息。

这意味着消息可能会被重播,但内部状态将回滚到收到消息之前的时间点。只要您的业务规则仅修改 statefun 状态并通过出口与外界交互,您就可以将系统视为具有仅一次的属性。


推荐阅读