apache-kafka-streams - KafkaStream KTable 转储
问题描述
我的问题是关于在收到触发消息时转储其值符合特定标准的 KTable。
下面是这个问题的一个例子:
KTable - CurrentAccountBalance
John +10,
Joe -1,
Alice -2,
Jill +5,
我的要求是获取所有对传入事件具有负余额的记录:FETCH_NEGATIVE_BALANCE_ENTRIES
它到达不同的命令流。
我的想法是:如果我们使用命令流在 CurrentAccountBalance KTable 上执行 leftJoin ,我们可以转储 CurrentAccountBalance 的所有条目(可用于过滤器),但是,这不会发生。
leftJoin 的 ValueJoiner 只接收右边的命令和左边的 null (而不是 CurrentAccountBalance 的所有条目)。我错过了什么吗?
谢谢
解决方案
这是我为转储满足特定条件的 KTable 条目而实施的解决方案。在这里为其他寻求类似解决方案的人描述它。
- 创建了一个自定义映射器(FlatMapper)来生成满足条件的消息。我已将 state-store 作为构造函数参数传递给此映射器。
- 创建了一个名为“FLUSH_NEGATIVE_BALANCES”的事件并在一个名为 commandStream 的新 KStream 上读取它
- 当收到这个新的 FLUSH 命令时,我调用在 step1 中创建的 flatmapper 来生成满足条件的消息(在我的例子中是负余额)
我将 flatmapper 的输出写入接收器流。
/* 这是 flatmapper 的代码,即 Step1 在上面的描述*/
公共类 NegativeBalanceMapper 实现 KeyValueMapper>> { private final BusinessRuleValidator businessRuleValidator; 私有最终StoreHolder storeHolder;
public NegativeBalanceMapper(StoreHolder storeHolder, BusinessRuleValidator businessRuleValidator) { this.storeHolder = storeHolder; this.businessRuleValidator = businessRuleValidator; } public Iterable<KeyValue<String, BalanceEntry>> apply(String key, CommandEntry value) { List<KeyValue<String, BalanceEntry>> result = new ArrayList<>(); if (value == null) { return result; } final ReadOnlyKeyValueStore<String, BalanceEntry> ledgerStore = storeHolder.getLedgerStore(); if (ledgerStore != null) { KeyValueIterator<String, BalanceEntry> range = ledgerStore.all(); while (range.hasNext()) { KeyValue<String, BalanceEntry> next = range.next(); if (businessRuleValidator.isNegativeBalance(next.value)) { result.add(new KeyValue<>(next.key, next.value)); } } } return result; }
}
/这是接收到 FLUSH 命令时的调用方式-上面的步骤 2 / final KStreamnegativeBalances = filteredCommandStream.flatMap(negativeBalanceMapper);
/这里我把它推到上面的 sink-step4 /
负平衡.to(KafkaConstants.TOPIC_NEGATIVE_BALANCES,Produced.with(Serdes.String(), serdeRegistry.getBalanceEntrySerde()));
感谢马蒂亚斯的指导。
推荐阅读
- c# - VS Winforms: Components (images, buttons, labels) are resizing and stretching on build
- javascript - Javascript:定义为箭头函数的覆盖方法在父级中看不到
- c# - 如何为定制的浏览器构建 javascript 解释器?
- swift - 如何防止用户在 Swift 5 的 UITextView 中添加太多换行符?
- botframework - 如何将参数发送到回发对话框
- javascript - 我可以从 Redux 中的一个 reducer 获得多个状态吗?
- flutter - how to fix align issue on tabbar
- python - 没有 while True: ? 时无法启动输入命令。while True 的重要性和替代项:?
- perl - 用右侧的空格规范化列填充
- magento-2.0 - Magento 2 Create Shipping Method with Delivery Options