c++ - 从服务器端返回后调用 gRPC 调用
问题描述
我有这两个 gRPC 调用:
service Console {
rpc JoinNetwork(keyvaluestore.Request) returns (ClusterInfo) {}
rpc WatchHealth(HealthCheckRequest) returns (stream HealthCheckResponse) {}
}
由两个实体使用:主节点和存储节点。当一个存储节点第一次启动时,它调用JoinNetwork()
主节点,主节点返回一些信息给存储节点。
然后我想WatchHealth()
从主节点调用,它 ping 存储节点,返回不断地向服务器发送“心跳”消息,直到它崩溃。
我能想到将这两个调用链接在一起的唯一方法是在 JoinNetwork 的服务器实现中调用 WatchHealth。但我真的只想在 JoinNetwork 返回后调用 WatchHealth ,我不知道该怎么做。
加入网络(客户端):
bool ConsoleClient::JoinNetwork(const string& my_addr, ClusterInfo* cinfo) {
ClientContext ctx;
Request req;
Status status = stub_->JoinNetwork(&ctx, req, cinfo);
if (status.ok()) {
log("[C: JoinNetwork] Successfully joined cluster " + to_string(cinfo->cid()), VB);
return true;
} else {
log("[C: JoinNetwork] Error " + to_string(status.error_code()) + ": " + status.error_message(), VB);
return false;
}
}
加入网络(服务器):
Status ConsoleServiceImpl::JoinNetwork(ServerContext* ctx, const Request* req, ClusterInfo* res) {
MLOCK.lock();
int cid = (NUM_NODES++) % K;
CDIR[cid].insert(req->addr());
if (CPRIMES.find(cid) == CPRIMES.end()) {
CPRIMES.insert({ cid, req->addr() }); // If no primary exists, make it the primary
}
res->set_cid(cid);
res->set_sid(assign_sid(cid));
res->set_caddrs(pack_addrs(CDIR[cid]));
res->set_primary(CPRIMES[cid]);
log("[S: JoinNetwork] Storage node " + req->addr() + " has joined cluster " + to_string(cid), VB);
MLOCK.unlock();
return Status::OK;
}
WatchHealth(客户端):
bool ConsoleClient::WatchHealth(const string& addr) {
ClientContext ctx;
auto deadline = chrono::system_clock::now() + chrono::milliseconds(500);
ctx.set_deadline(deadline);
HealthCheckRequest req;
HealthCheckResponse res;
unique_ptr<ClientReader<HealthCheckResponse>> reader(stub_->WatchHealth(&ctx, req));
while (reader->Read(&res));
Status status = reader->Finish();
if (status.ok()) {
log("[C: WatchHealth] Storage node " + addr + " shutdown gracefully", VB);
return true;
} else {
log("[C: WatchHealth] Storage node " + addr + " crashed", VB);
return false;
}
}
WatchHealth(服务器):
Status ConsoleServiceImpl::WatchHealth(ServerContext* ctx, const HealthCheckRequest* req, ServerWriter<HealthCheckResponse>* writer) {
HealthCheckResponse res;
while (true) {
res.set_status(console::HealthCheckResponse::SERVING);
writer->Write(res);
}
res.set_status(console::HealthCheckResponse::SHUTTING_DOWN);
writer->Write(res);
return Status::OK;
}
解决方案
推荐阅读
- amazon-web-services - Nodejs AWS Textract 不适用于 lambda
- python - 将天数添加到 PySpark Dataframe 中的数组内的元素
- python - 在动态规划问题中得到错误答案或超过时间限制
- c# - 如何在 DHTMLX - Scheduler .NET 中更改 TimeLine 单元格的列宽
- sql - 如何在 SQL 提示中禁止位图索引?
- r - 为什么R函数解析用逗号代替分号?
- java - 为什么表格不会显示在本地主机(jsp 文件)中?
- javascript - 如何使用 jquery 将签入和签出时间数据保存在 mysql 中?
- python - 如何将多个转换应用于 Python 字典中的同一个键
- r - html输出显示每个函数的描述