python-asyncio - 挂在python中的异步服务器
问题描述
我目前正在使用python异步服务器来为一些客户端提供服务,服务器起初运行良好,然后最终挂起,没有显示它是否正在接收客户端的请求,当我按下 ctr-c 时,它显示它接收到来自客户端的请求并且它不会停止,它只是继续审查 requests 。我不知道这个错误是从哪里来的。提前谢谢你
import asyncio, json
from collections import Coroutine
from typing import Any
import Engine
import struct
header_struct = struct.Struct("!Q") # messages up to 2**64 - 1 in length
async def recvall(reader, length):
blocks = []
while length:
block = await reader.read(length)
if not block:
raise EOFError('socket closed with {} bytes left in this block'.format(length))
length -= len(block)
blocks.append(block)
return b''.join(blocks)
async def get_block(reader):
data = await recvall(reader, header_struct.size)
(block_length,) = header_struct.unpack(data)
return await recvall(reader, block_length)
async def put_block(writer, message):
block_length = len(message)
writer.write(header_struct.pack(block_length))
writer.write(message)
# await writer.drain()
async def handle_conversation(reader, writer):
address__ = writer.get_extra_info("peername")
print("Accepted connection from {}".format(address__))
while True:
# ************************try to check if there data to send*********************************
try:
block = await get_block(reader)
# decode the data
data = block.decode()
decoded_data = json.loads(data)
# dont forget to make this synchronous
answer = await Engine.get_answer(decoded_data["Task"], decoded_data["content"])
# don't forget to check in there is necessary data to push and making sure data is conveyed
await put_block(writer, answer)
print(answer)
except Exception as e:
raise
if __name__ == '__main__':
address = Engine.parse_command_line("asyncio server using coroutine")
# loop = asyncio.get_event_loop()
# coro = asyncio.start_server(handle_conversation, *address)
async def main():
server = await asyncio.start_server(
handle_conversation, *address)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main(), debug=True)
引擎代码
import argparse
import json
import time
import upload_pic_and_search
import accept_connections
import connect
import opinion_poll
import share
import zmq
from jsonrpclib import Server
context = zmq.Context()
aphorisms = {"share": share.share_,
"poll": opinion_poll.add_poll,
"add_profile_pic": upload_pic_and_search.profile_pic,
"connect": connect.connect,
"accept_connection": accept_connections.accept_connection}
def sighn_up(doc):
"""this function will connect to sighn up """
proxy = Server('http://localhost:7002')
answer = proxy.sighn_up(doc)
return answer
def Verification(doc):
"""Code verification routine"""
proxy = Server('http://localhost:7002')
answer = proxy.verify(doc)
return answer
def login(doc):
"""This function handkes authetication"""
proxy = Server('http://localhost:7002')
answer = proxy.autheticate(doc)
return answer
def post(doc):
"""connect to server that handles posts"""
proxy = Server('http://localhost:6700')
answer = proxy.post(doc)
return answer
def comment(doc):
"""connect to the server that stores comments"""
proxy = Server('http://localhost:6701')
answer = proxy.comments_(doc)
return answer
def reply(doc):
"""store the reply"""
proxy = Server('http://localhost:6702')
answer = proxy.reply(doc)
return answer
def share(doc):
"""share the post"""
proxy = Server('http://localhost:6703')
answer = proxy.share(doc)
return answer
def likes(doc):
"""connect to the likes queue"""
zcontext = zmq.Context()
osock = zcontext.socket(zmq.PUSH)
osock.connect("tcp://127.0.0.1:6704")
osock.send_json(doc)
return {"Task": "like", "like": True}
def follow(doc):
"""handles the follow coroutine"""
zcontext = zmq.Context()
osock = zcontext.socket(zmq.PUSH)
osock.connect("tcp://127.0.0.1:6705")
osock.send_json(doc)
def connect(doc):
"""connect to routine for connection"""
zcontext = zmq.Context()
osock = zcontext.socket(zmq.PUSH)
osock.connect("tcp://127.0.0.1:6706")
osock.send_json(doc)
def accept_connection(doc):
"""the queue responsible accepting connections"""
zcontext = zmq.Context()
osock = zcontext.socket(zmq.PUSH)
osock.connect("tcp://127.0.0.1:6707")
osock.send_json(doc)
def add_profile_pic(doc):
"""Add the profile pic of the user"""
proxy = Server('http://localhost:7006')
answer = proxy.profile_pic(doc)
return answer
def search(doc):
"""search the user in the database"""
proxy = Server('http://localhost:7006')
answer = proxy.search(doc)
return answer
def profile(doc):
"""search the user in the database"""
proxy = Server('http://localhost:7006')
answer = proxy.profile(doc)
return answer
async def get_answer(aphorism, content):
"""Return the response to particular question"""
# time.sleep(0.0)
# fetch responsible function
# function = aphorisms.get(aphorism, "Error:Unknown aphorism.")
function = eval(aphorism)
answer = function(content)
return send(answer)
def send(data):
"""Prepare the data to be sent via socket"""
json_data = json.dumps(data)
data_bytes = json_data.encode()
return data_bytes
def parse_command_line(description):
"""arse command line and return a socket address."""
parser = argparse.ArgumentParser(description=description)
parser.add_argument('host', help="IP or hostname")
parser.add_argument("-p", metavar='port', type=int, default=1060, help="TCP port (default 1060)")
args = parser.parse_args()
address = (args.host, args.p)
return address
def recv_untill(sock, suffix):
"""Receive bytes over socket `sock` until we receive the `suffix`."""
message = sock.recv(4096)
if not message:
raise EOFError("Socket closed")
while not message.endswith(suffix):
data = sock.recv(4096)
if not data:
raise IOError('received {!r} then socket closed'.format(message))
message += data
return message
解决方案
推荐阅读
- xamarin.ios - 在 VS2017 中,如何显示 Xamarin IOS Assets 的图标大小?
- javascript - 此 Cookie 适用于 Localhost 但不适用于主站点
- c# - 我的 C# 项目中哪个引用的 DLL 正在尝试访问 Internet
- kotlin - Kotlin supplyAsync 与 executor
- macos - X11(xquartz)窗口堆叠
- c# - 创建任务并“等待”它与同步运行某些东西或等待异步方法相同吗?
- android - Proguard 不剥离木材原木
- laravel - 如何在 laravel 中保存 HTML img 标签中的数据
- java - 从字符串到日期的转换不会在 JAVA 中发生
- java - Java计算数组中的字符串索引0