首页 > 解决方案 > Python 异步(asyncio)服务器挂起

问题描述

我目前正在使用 Python 的 asyncio 异步服务器为一些客户端提供服务。服务器起初运行良好,但最终会挂起,看不出它是否还在接收客户端的请求。当我按下 Ctrl-C 时,它才显示收到了来自客户端的请求,而且并不会停止,只是继续处理这些请求。我不知道这个错误是从哪里来的。提前谢谢你

import asyncio, json
from collections import Coroutine
from typing import Any

import Engine
import struct

header_struct = struct.Struct("!Q")  # messages up to 2**64 - 1 in length


async def recvall(reader, length):
    """Read exactly `length` bytes from `reader` and return them as one bytes object.

    `reader.read()` may return fewer bytes than requested, so it is called
    repeatedly until the requested amount has been collected.

    Raises:
        EOFError: if `reader.read()` returns an empty result before `length`
            bytes have been received.
    """
    remaining = length
    pieces = []
    while remaining:
        piece = await reader.read(remaining)
        if not piece:
            raise EOFError('socket closed with {} bytes left in this block'.format(remaining))
        pieces.append(piece)
        remaining -= len(piece)
    return b''.join(pieces)


async def get_block(reader):
    """Read one length-prefixed block from `reader` and return its payload bytes.

    The block starts with a fixed-size header (`header_struct`) holding the
    payload length; the payload of that many bytes follows.
    """
    header = await recvall(reader, header_struct.size)
    payload_size = header_struct.unpack(header)[0]
    payload = await recvall(reader, payload_size)
    return payload


async def put_block(writer, message):
    """Send `message` to `writer` as one length-prefixed block.

    Args:
        writer: stream writer exposing `write()` and an awaitable `drain()`
            (e.g. `asyncio.StreamWriter`).
        message: payload bytes; its length is packed with `header_struct`
            and written before the payload itself.
    """
    block_length = len(message)
    writer.write(header_struct.pack(block_length))
    writer.write(message)
    # write() only appends to the transport buffer. Awaiting drain() applies
    # flow control, so a slow client cannot make the buffer grow without
    # bound, and it gives the event loop a chance to run other tasks.
    await writer.drain()


async def handle_conversation(reader, writer):
    """Serve one client connection until it closes.

    For every block received: decode it as JSON, pass its "Task" and
    "content" fields to `Engine.get_answer`, and send the returned bytes
    back as a block.

    Args:
        reader: stream reader for the client connection.
        writer: stream writer for the client connection.

    A closed or reset connection ends the loop quietly. Any other exception
    (malformed JSON, missing keys, handler errors) still propagates to the
    caller, but the writer is closed first in every case.
    """
    address__ = writer.get_extra_info("peername")
    print("Accepted connection from {}".format(address__))
    try:
        while True:
            try:
                block = await get_block(reader)
            except (EOFError, ConnectionError) as exc:
                # The peer closed or reset the connection: nothing more to
                # serve, so leave the loop instead of failing the task.
                print("Connection from {} closed: {}".format(address__, exc))
                break
            # decode the data
            decoded_data = json.loads(block.decode())
            answer = await Engine.get_answer(decoded_data["Task"], decoded_data["content"])
            await put_block(writer, answer)
            print(answer)
    finally:
        # Always release the transport, including when a handler error
        # propagates out of the loop.
        writer.close()


if __name__ == '__main__':
    # Address to listen on, taken from the command line.
    address = Engine.parse_command_line("asyncio server using coroutine")

    async def main():
        """Start the stream server on `address` and serve until cancelled."""
        listener = await asyncio.start_server(handle_conversation, *address)

        bound = listener.sockets[0].getsockname()
        print(f'Serving on {bound}')

        async with listener:
            await listener.serve_forever()

    asyncio.run(main(), debug=True)

引擎代码

import argparse
import json
import time
import upload_pic_and_search
import accept_connections
import connect
import opinion_poll
import share
import zmq
from jsonrpclib import Server

# Module-level ZeroMQ context, created once at import time.
context = zmq.Context()

# Task name -> handler lookup table. It is built here while `share` and
# `connect` still name the imported modules; functions with the same names
# defined further down in this file rebind those names afterwards.
aphorisms = dict(
    share=share.share_,
    poll=opinion_poll.add_poll,
    add_profile_pic=upload_pic_and_search.profile_pic,
    connect=connect.connect,
    accept_connection=accept_connections.accept_connection,
)


def sighn_up(doc):
    """Forward `doc` to the `sighn_up` JSON-RPC method at http://localhost:7002.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:7002').sighn_up(doc)


def Verification(doc):
    """Forward `doc` to the `verify` JSON-RPC method at http://localhost:7002.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:7002').verify(doc)


def login(doc):
    """Forward `doc` to the authentication JSON-RPC method at http://localhost:7002.

    Returns whatever the remote call returns.
    """
    # NOTE(review): the remote method name is spelled `autheticate`;
    # presumably it matches the server-side name, so it is left unchanged.
    return Server('http://localhost:7002').autheticate(doc)


def post(doc):
    """Forward `doc` to the `post` JSON-RPC method at http://localhost:6700.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:6700').post(doc)


def comment(doc):
    """Forward `doc` to the `comments_` JSON-RPC method at http://localhost:6701.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:6701').comments_(doc)


def reply(doc):
    """Forward `doc` to the `reply` JSON-RPC method at http://localhost:6702.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:6702').reply(doc)


def share(doc):
    """Forward `doc` to the `share` JSON-RPC method at http://localhost:6703.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:6703').share(doc)


def likes(doc):
    """Push `doc` as JSON onto the PUSH socket endpoint tcp://127.0.0.1:6704.

    Args:
        doc: JSON-serialisable object to enqueue.

    Returns:
        The fixed acknowledgement dict {"Task": "like", "like": True}.
    """
    # Reuse the module-level `context` instead of building a new zmq.Context
    # on every call; the per-call context and socket were never closed.
    osock = context.socket(zmq.PUSH)
    try:
        osock.connect("tcp://127.0.0.1:6704")
        osock.send_json(doc)
    finally:
        # NOTE(review): relies on the socket's linger setting to flush the
        # queued message after close() - confirm if ZMQ_LINGER is customised.
        osock.close()
    return {"Task": "like", "like": True}


def follow(doc):
    """Push `doc` as JSON onto the PUSH socket endpoint tcp://127.0.0.1:6705.

    Args:
        doc: JSON-serialisable object to enqueue.

    Returns:
        None.
    """
    # Reuse the module-level `context` instead of building a new zmq.Context
    # on every call; the per-call context and socket were never closed.
    osock = context.socket(zmq.PUSH)
    try:
        osock.connect("tcp://127.0.0.1:6705")
        osock.send_json(doc)
    finally:
        # NOTE(review): relies on the socket's linger setting to flush the
        # queued message after close() - confirm if ZMQ_LINGER is customised.
        osock.close()


def connect(doc):
    """Push `doc` as JSON onto the PUSH socket endpoint tcp://127.0.0.1:6706.

    Args:
        doc: JSON-serialisable object to enqueue.

    Returns:
        None.
    """
    # Reuse the module-level `context` instead of building a new zmq.Context
    # on every call; the per-call context and socket were never closed.
    osock = context.socket(zmq.PUSH)
    try:
        osock.connect("tcp://127.0.0.1:6706")
        osock.send_json(doc)
    finally:
        # NOTE(review): relies on the socket's linger setting to flush the
        # queued message after close() - confirm if ZMQ_LINGER is customised.
        osock.close()


def accept_connection(doc):
    """Push `doc` as JSON onto the PUSH socket endpoint tcp://127.0.0.1:6707.

    Args:
        doc: JSON-serialisable object to enqueue.

    Returns:
        None.
    """
    # Reuse the module-level `context` instead of building a new zmq.Context
    # on every call; the per-call context and socket were never closed.
    osock = context.socket(zmq.PUSH)
    try:
        osock.connect("tcp://127.0.0.1:6707")
        osock.send_json(doc)
    finally:
        # NOTE(review): relies on the socket's linger setting to flush the
        # queued message after close() - confirm if ZMQ_LINGER is customised.
        osock.close()


def add_profile_pic(doc):
    """Forward `doc` to the `profile_pic` JSON-RPC method at http://localhost:7006.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:7006').profile_pic(doc)


def search(doc):
    """Forward `doc` to the `search` JSON-RPC method at http://localhost:7006.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:7006').search(doc)


def profile(doc):
    """Forward `doc` to the `profile` JSON-RPC method at http://localhost:7006.

    Returns whatever the remote call returns.
    """
    return Server('http://localhost:7006').profile(doc)


async def get_answer(aphorism, content):
    """Run the handler named by `aphorism` on `content` and return the encoded reply.

    Args:
        aphorism: expression string that is evaluated to obtain the handler
            callable (e.g. the name of a function in this module).
        content: single argument passed to the handler.

    Returns:
        The handler's result serialised by `send()` (JSON encoded to bytes).

    The handlers visible in this module are ordinary synchronous functions
    that perform network calls. Calling them directly inside this coroutine
    would block the event loop, and with it every other client connection,
    until the call returns. They are therefore run in the default executor.
    """
    # SECURITY: `aphorism` is evaluated with eval(). If it comes from an
    # untrusted client this allows arbitrary code execution; it should be
    # replaced by an explicit allow-list of task names. Left in place here so
    # that existing task names keep resolving exactly as before.
    function = eval(aphorism)

    # Local import: asyncio is not among this module's visible imports.
    import asyncio

    loop = asyncio.get_running_loop()
    answer = await loop.run_in_executor(None, function, content)
    return send(answer)


def send(data):
    """Serialise `data` to JSON text and return it encoded as bytes."""
    return json.dumps(data).encode()


def parse_command_line(description):
    """Parse the command line and return a ``(host, port)`` address tuple.

    `host` is a required positional argument; the port comes from ``-p``
    and defaults to 1060.
    """
    cli = argparse.ArgumentParser(description=description)
    cli.add_argument('host', help="IP or hostname")
    cli.add_argument(
        "-p",
        metavar='port',
        type=int,
        default=1060,
        help="TCP port (default 1060)",
    )
    parsed = cli.parse_args()
    return parsed.host, parsed.p


def recv_untill(sock, suffix):
    """Read from `sock` until the accumulated bytes end with `suffix`.

    Returns everything received, including the suffix.

    Raises:
        EOFError: if the very first `recv()` returns no data.
        IOError: if `recv()` returns no data after some bytes were received
            but before the suffix was seen.
    """
    buffer = sock.recv(4096)
    if not buffer:
        raise EOFError("Socket closed")
    while True:
        if buffer.endswith(suffix):
            return buffer
        chunk = sock.recv(4096)
        if not chunk:
            raise IOError('received {!r} then socket closed'.format(buffer))
        buffer += chunk

标签: python-asyncio

解决方案


推荐阅读