rest - Vertx EventBus 回复“特定”消息
问题描述
我们有一个案例如下:
令人担忧的是,协调器从方法上下文发送消息并从另一个获取响应:
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(RESTClient.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the Listener verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
然后我有另一个事件总线消耗方法,我收到响应:
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
private void handleResponseMessage(Message msg) {
// how to reply the message received in the context of forwardToVWClient ??
}
forwardToVWClient
那么,当我在 中收到响应时,如何在 的上下文中回复消息handleResponseMessage
?
到目前为止的几个想法:
- 将消息放在顶点上下文中?
- 消息对象有一个字段:
.replyAddress()
返回一个 int,我将它保存在静态 ConcurrentHashMap 中并使用它来回复特定消息。我将发布更多详细信息作为答案。
有没有更好的办法?
解决方案
实现它的一种方法是保存replyAddress
消息的字段并使用它将消息发送回始发者。
下面是一些简化的代码,展示了如何:
public class VehicleStateCoordinatorVerticle extends AbstractVerticle {
final static String ADDRESS_REQUEST = "CoordinatorRequest";
final static String ADDRESS_RESPONSE = "CoordinatorResponse";
static ConcurrentHashMap<String, VWApiRequest> pendingCommands = new ConcurrentHashMap<>();
public void start() {
vertx.eventBus().consumer(ADDRESS_REQUEST, this::handleRequestMessage);
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
log.info("===== VehicleStateCoordinatorVerticle - bus consumer ready =====");
}
private void handleRequestMessage(Message msg) {
// .... omitted for brevity
// save the replyAddress and the command for later/callback
cmd.setReplyAddress(msg.replyAddress());
pendingCommands.put(cmd.getVwReference(), cmd);
forwardToVWClient(msg);
}
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the VWAPIServer verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
private void handleResponseMessage(Message msg) {
//..
VWApiRequest vwApiRequest = pendingCommands.get(vwReference);
if(vwApiRequest == null){
log.error("No pending vwApiRequest could be found!");
return;
}
/**
* Instead of targeting the RESTApi address,
* we use the replyAddress to target the specific message that is pending response.
*/
vertx.eventBus().send(vwApiRequest.getReplyAddress(), body, deliveryOptions, res -> {
if (res.succeeded()) {
// cheers!
}
else{
log.error("Error in handleResponseMessage {}", res.cause().getMessage());
}
});
}
推荐阅读
- mysql - MYSQL 6.3 中的 INNER JOIN 问题
- javascript - 在 JavaScript 中的特定索引处旋转数组
- python - python中的回文函数没有返回预期结果
- mysql - 在查询中使用联合连接两个表
- excel - 行删除脚本 - 日期之间的比较不起作用
- sql - 如何获取与 ID 关联的所有时间戳并将其聚合为总时间?
- azure-cosmosdb - 确保 Cosmos db 中 SP 中操作/事务的原子性
- postgresql - 防止可能的 PostgreSQL GUC 参数竞争条件?
- c - 只能将 Char* 写入 LPVOID 缓冲区吗?
- python - 从 excel 创建数据框