python - 如何异步处理双向 grpc 流
问题描述
我有一个游戏或任何远程用户界面,它带有一个服务器和多个应该通过网络通信的客户端。 客户端和服务器都应该能够异步发送更新。
这似乎是一个很自然的服务定义,让我们用 grpc 管理会话。
syntax = "proto3";
package mygame;
service Game {
rpc participate(stream ClientRequest) returns (ServerResponse);
}
message ClientRequest {
// Fields for the initial request and further updates
}
message ServerResponse {
// Game updates
}
实现客户端是微不足道的(尽管下面的代码显然是不完整和简化的)。
class Client:
def __init__(self):
self.channel = grpc.insecure_channel("localhost:50051")
self.stub = game_pb2_grpc.GameStub(channel)
self.output_queue = queue.Queue()
def output_iter(self):
while True:
client_output_msg = self.output_queue.get()
self.output_queue.task_done()
yield client_output_msg
def do_work(self):
for response in self.stub.participate(self.output_iter()):
print(response) # handle update
with grpc.insecure_channel("localhost:50051") as channel:
client = Client()
client.do_work()
似乎很难实现服务器而不阻塞。
class Game(game_pb2_grpc.GameServicer):
def __init__(self):
self.pending_events = queue.Queue()
def participate(self, request_iter, context):
for client_update in request_iter:
print(client_update)
# !!!
# The next bit won't happen if the client has no updates
# !!!
try:
while True:
server_update = self.pending_events.get_nowait()
yield server_update
except queue.Empty:
pass
server = grpc.server(ThreadPoolExecutor(max_workers=100))
game_pb2_grpc.add_GameServicer_to_server(Game(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
如代码中所述,如果客户端不不断发送请求,它将不会收到更新。也许异步方法会更好,这也可能解决此设计中的其他问题。
PS:这个问题已经在 go here中用 grpc 解决了,但是我不知道如何将它翻译成 python 的 grpc 实现。
如果有任何帮助,我将非常高兴!
解决方案
我终于能够使用python asynio api让它工作。基本思想是使用asyncio.create_task将读写解耦为两个协程。对于任何感兴趣的人,这里有一个解决方案。
class Game(game_pb2_grpc.GameServicer):
async def read_client_requests(self, request_iter):
async for client_update in request_iter:
print("Recieved message from client:", client_update, end="")
async def write_server_responses(self, context):
for i in range(15):
await context.write(game_pb2.ServerResponse(dummy_value=str(i)))
await asyncio.sleep(0.5)
async def participate(self, request_iter, context):
read_task = asyncio.create_task(self.read_client_requests(request_iter))
write_task = asyncio.create_task(self.write_server_responses(context))
await read_task
await write_task
async def serve():
server = grpc.aio.server()
game_pb2_grpc.add_GameServicer_to_server(Game(), server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
asyncio.run(serve())
请注意,代替 write 协程,yield 也足够了。
推荐阅读
- c++ - QComboBox 与点击线编辑
- r - 使用一个列表中的位置从 R 中的另一个列表中读取文件
- python - ttk.Treeview 多选时某些行被预选失败
- json - JsonArray 不支持的类型:类 java.util.LinkedHashMap
- ember.js - 如何跟踪 Glimmer 组件中的嵌套属性?
- xamarin.forms - 如何通过在构造函数中传递参数来保留注册的服务使用?
- sql - BigQuery – 在应用 WHERE shop IN(从 X 中选择商店)时,集群字段可以扫描的行数是否有限?
- selenium - 很抱歉,您的计算机或网络在使用 Selenium 在自动测试环境中加载 Google API 时可能会发送自动查询错误
- c# - 有没有办法给网站信息,并得到一个情节?
- node.js - 无法在 Node.js 13.6 中导入 @google/pubsub 和其他 @google npm 模块