首页 > 解决方案 > 带有套接字 connect_ex 的异步 rabbitmq 消费者

问题描述

我想编写一个 python 程序,从 rabbitmq 服务器获取 ip 和 tcp 端口并扫描以检查端口是否打开,因为这些扫描有时会批量进行(可能 100 个端口,一次将 ip 对添加到队列中)我需要异步进行扫描以及时获得所有结果,即使我将超时时间降低到 1 秒,30 个关闭的端口每次都会保持扫描 30 秒!我尝试了 asyncio 和 aio_pika 来达到我的目标,但扫描仍在同步执行。

import asyncio
import aio_pika
import socket


async def tcp_check(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    await asyncio.sleep(1)
    result = sock.connect_ex((host,port))
    print (str(result))

async def main(loop):
    connection = await aio_pika.connect_robust("amqp://user:password@192.168.1.100/")
    async with connection:
        queue_name = "tcp_scans"
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    context = message.body.decode("utf-8").split(',')
                    await tcp_check(context[0], int(context[1]))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

更新:

我也使用了 asyncio.open_connection:

async def tcp_check(host, port):
    con = asyncio.open_connection(host, port, loop=loop)
    try:
        await asyncio.wait_for(con, timeout=1)
        print("{}:{} Connected".format(host, port))
    except asyncio.TimeoutError:
        print ("{}:{} Closeed".format(host, port))

仍然需要从每个项目中逐一测试......

标签: pythonsocketsrabbitmqpython-asyncio

解决方案


应避免在异步协程中调用同步长时间运行的函数。我建议使用 asyncio 替代 connect_ex,例如:

    try:
        await asyncio.open_connection(host, port)
    except Exception as e:
        print(e)

为了“即时”同时执行一些协程,您可以使用create_task“将协程包装到任务中并安排其执行”,因为它是在doc中编写的。在此之后,协程将很快执行,例如在下一次awaitasync for迭代之后,当控制流返回事件循环时。 create_task返回Task对象,您可以将其添加到列表中并等待它们全部使用asyncio.gatherwith flag完成return_exceptions=True。但在你的情况下,我认为在你的 main() 结束时替换和使用就足够await tcp_check()了,以保证所有 coro 都完成了。create_task(tcp_check())gather

...
asyncio.create_task(tcp_check(context[0], int(context[1])))
...

推荐阅读