quarkus - 如何正确使用 quarkus 和 smallrye 做事件驱动的微服务
问题描述
亲爱的,我正在尝试做一些事件驱动的微服务。目前,当使用 Quarkus 和 Smallrye-Reactive 消息传递扩展接收到消息时,我能够使用来自 Kafka 的消息并更新数据库记录。我想要进一步实现的是能够在成功的情况下向其他主题发送消息,否则向错误主题发送消息。我知道我们可以使用 return 和 @outgoing 注释来发出新消息,但我认为它不适合我的用例。如果在使用消息时发生错误,我需要一个指导。我应该将消息返回到原始主题(通过不确认消息)还是应该使用它并向不同的主题产生错误消息以回滚原始事务。
这是我的代码:
@Incoming("new-payment")
public void newMessage(String msg) {
LOG.info("New payment has been received.");
LOG.info("Payload is {}", msg);
PaymentEvent pe = jsob.fromJson(msg, PaymentEvent.class);
mysqlPool.preparedQuery("select totalBuyers from Book where isbn = ? ",
Tuple.of(pe.getIsbn()))
.thenApply(rs -> {
RowIterator<Row> iterator = rs.iterator();
if (iterator.hasNext()) {
return iterator.next().getInteger(0) + 1;
} else {
return Integer.valueOf(0);
}
})
.thenApply(totalCount -> {
return mysqlPool.preparedQuery("update Book set totalBuyers = ?",
Tuple.of(totalCount));
})
.whenComplete((rs, err) -> {
if (err != null) {
//Emit an error to error topic.
} else {
//Emit a msg to other service.
}
});
}
另外,如果您有更好的代码,请提交,我还是反应式编程的新手 :)。
解决方案
我多年来一直在做企业集成,我认为你会想要两者都做。
我应该将消息返回到原始主题(通过不确认消息)还是应该使用它并向不同的主题产生错误消息以回滚原始事务。
该事件应保留在主题上,以供另一个实例可能拾取和处理。并且应该将错误消息记录为事件。也许同一个消费者可以成功接收并重新处理该事件。
EDA(事件驱动架构)可能会提供不同的方法来处理此问题,但在 ESB 上,消息将被标记为已尝试。通常,经过三次尝试会将其发送到死信队列,以便稍后对其进行更正和重新处理。
我们的企业也开始使用 EDA 设计和构建应用程序,所以我有兴趣阅读其他人对这个问题的看法。感谢您专注于 Quarkus。我相信这是我见过的最好的 Redhat 技术之一!
推荐阅读
- python - 是否有使用 sphinx.ext.napoleon 在 Sphinx 中记录函数类型参数的标准格式?
- discord - 如何停止播放机器人音乐?(不和谐.js)
- angular - 从 Flask 获取 JSON 到 Angular
- php - 如何在登录时配置用户禁止更多 maxAttempts?
- swiftui - SwiftUI - 预览在一台特定设备上停止工作
- r - 两个数据集满足 R 中的一个条件
- java - 带有注释处理器的 AspectJ maven 插件 - 源生成两次
- php - 捕获 Twilio API 异常 - PHP
- python - 为什么我的 numpy 文件比使用相同数组生成的 PNG 大?
- mysql - 我可以散列/加密数据库 TEXT 列吗?