首页 > 解决方案 > 从服务器端返回后调用 gRPC 调用

问题描述

我有这两个 gRPC 调用:

service Console {
  rpc JoinNetwork(keyvaluestore.Request) returns (ClusterInfo) {}
  rpc WatchHealth(HealthCheckRequest) returns (stream HealthCheckResponse) {}
}

由两个实体使用:主节点和存储节点。当一个存储节点第一次启动时,它调用JoinNetwork()主节点,主节点返回一些信息给存储节点。

然后我想WatchHealth()从主节点调用,它 ping 存储节点,返回不断地向服务器发送“心跳”消息,直到它崩溃。

我能想到将这两个调用链接在一起的唯一方法是在 JoinNetwork 的服务器实现中调用 WatchHealth。但我真的只想在 JoinNetwork 返回调用 WatchHealth ,我不知道该怎么做。

加入网络(客户端):

bool ConsoleClient::JoinNetwork(const string& my_addr, ClusterInfo* cinfo) {
  ClientContext ctx;
  Request req;
  Status status = stub_->JoinNetwork(&ctx, req, cinfo);
  if (status.ok()) {
    log("[C: JoinNetwork] Successfully joined cluster " + to_string(cinfo->cid()), VB);
    return true;
  } else {
    log("[C: JoinNetwork] Error " + to_string(status.error_code()) + ": " + status.error_message(), VB);
    return false;
  }
}

加入网络(服务器):

Status ConsoleServiceImpl::JoinNetwork(ServerContext* ctx, const Request* req, ClusterInfo* res) {
  MLOCK.lock();
  int cid = (NUM_NODES++) % K;
  CDIR[cid].insert(req->addr());
  if (CPRIMES.find(cid) == CPRIMES.end()) {
    CPRIMES.insert({ cid, req->addr() }); // If no primary exists, make it the primary
  }
  res->set_cid(cid);
  res->set_sid(assign_sid(cid));
  res->set_caddrs(pack_addrs(CDIR[cid]));
  res->set_primary(CPRIMES[cid]);
  log("[S: JoinNetwork] Storage node " + req->addr() + " has joined cluster " + to_string(cid), VB);
  MLOCK.unlock();

  return Status::OK;
}

WatchHealth(客户端):

bool ConsoleClient::WatchHealth(const string& addr) {
  ClientContext ctx;
  auto deadline = chrono::system_clock::now() + chrono::milliseconds(500);
  ctx.set_deadline(deadline);
  HealthCheckRequest req;
  HealthCheckResponse res;
  unique_ptr<ClientReader<HealthCheckResponse>> reader(stub_->WatchHealth(&ctx, req));
  while (reader->Read(&res));
  Status status = reader->Finish();
  if (status.ok()) {
    log("[C: WatchHealth] Storage node " + addr + " shutdown gracefully", VB);
    return true;
  } else {
    log("[C: WatchHealth] Storage node " + addr + " crashed", VB);
    return false;
  }
}

WatchHealth(服务器):

Status ConsoleServiceImpl::WatchHealth(ServerContext* ctx, const HealthCheckRequest* req, ServerWriter<HealthCheckResponse>* writer) {
  HealthCheckResponse res;
  while (true) {
    res.set_status(console::HealthCheckResponse::SERVING);
    writer->Write(res);
  }
  res.set_status(console::HealthCheckResponse::SHUTTING_DOWN);
  writer->Write(res);
  return Status::OK;
}

标签: c++grpc

解决方案


推荐阅读