首页 > 解决方案 > KafkaStream KTable 转储

问题描述

我的问题是关于在收到触发消息时转储其值符合特定标准的 KTable。

下面是这个问题的一个例子:

KTable - CurrentAccountBalance

   John    +10,
   Joe     -1,
   Alice   -2,
   Jill    +5,

我的要求是获取所有对传入事件具有负余额的记录:FETCH_NEGATIVE_BALANCE_ENTRIES它到达不同的命令流。

我的想法是:如果我们使用命令流在 CurrentAccountBalance KTable 上执行 leftJoin ,我们可以转储 CurrentAccountBalance 的所有条目(可用于过滤器),但是,这不会发生。

leftJoin 的 ValueJoiner 只接收右边的命令和左边的 null (而不是 CurrentAccountBalance 的所有条目)。我错过了什么吗?

谢谢

标签: apache-kafka-streams

解决方案


这是我为转储满足特定条件的 KTable 条目而实施的解决方案。在这里为其他寻求类似解决方案的人描述它。

  1. 创建了一个自定义映射器(FlatMapper)来生成满足条件的消息。我已将 state-store 作为构造函数参数传递给此映射器。
  2. 创建了一个名为“FLUSH_NEGATIVE_BALANCES”的事件并在一个名为 commandStream 的新 KStream 上读取它
  3. 当收到这个新的 FLUSH 命令时,我调用在 step1 中创建的 flatmapper 来生成满足条件的消息(在我的例子中是负余额)
  4. 我将 flatmapper 的输出写入接收器流。

    /* 这是 flatmapper 的代码,即 Step1 在上面的描述*/

    公共类 NegativeBalanceMapper 实现 KeyValueMapper>> { private final BusinessRuleValidator businessRuleValidator; 私有最终StoreHolder storeHolder;

    public NegativeBalanceMapper(StoreHolder storeHolder, BusinessRuleValidator businessRuleValidator)
    {
        this.storeHolder = storeHolder;
        this.businessRuleValidator = businessRuleValidator;
    }
    
    public Iterable<KeyValue<String, BalanceEntry>> apply(String key, CommandEntry value)
    {
        List<KeyValue<String, BalanceEntry>> result = new ArrayList<>();
        if (value == null)
        {
            return result;
        }
    
        final ReadOnlyKeyValueStore<String, BalanceEntry> ledgerStore = storeHolder.getLedgerStore();
        if (ledgerStore != null)
        {
            KeyValueIterator<String, BalanceEntry> range = ledgerStore.all();
            while (range.hasNext())
            {
                KeyValue<String, BalanceEntry> next = range.next();
                if (businessRuleValidator.isNegativeBalance(next.value))
                {
                    result.add(new KeyValue<>(next.key, next.value));
                }
            }    
        }
        return result;
    }
    

    }

    /这是接收到 FLUSH 命令时的调用方式-上面的步骤 2 / final KStreamnegativeBalances = filteredCommandStream.flatMap(negativeBalanceMapper);

    /这里我把它推到上面的 sink-step4 /

    负平衡.to(KafkaConstants.TOPIC_NEGATIVE_BALANCES,Produced.with(Serdes.String(), serdeRegistry.getBalanceEntrySerde()));

感谢马蒂亚斯的指导。


推荐阅读