python - 带有套接字 connect_ex 的异步 rabbitmq 消费者
问题描述
我想编写一个 python 程序,从 rabbitmq 服务器获取 ip 和 tcp 端口并扫描以检查端口是否打开,因为这些扫描有时会批量进行(可能 100 个端口,一次将 ip 对添加到队列中)我需要异步进行扫描以及时获得所有结果,即使我将超时时间降低到 1 秒,30 个关闭的端口每次都会保持扫描 30 秒!我尝试了 asyncio 和 aio_pika 来达到我的目标,但扫描仍在同步执行。
import asyncio
import aio_pika
import socket
async def tcp_check(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
await asyncio.sleep(1)
result = sock.connect_ex((host,port))
print (str(result))
async def main(loop):
connection = await aio_pika.connect_robust("amqp://user:password@192.168.1.100/")
async with connection:
queue_name = "tcp_scans"
channel = await connection.channel()
queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
context = message.body.decode("utf-8").split(',')
await tcp_check(context[0], int(context[1]))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
更新:
我也使用了 asyncio.open_connection:
async def tcp_check(host, port):
con = asyncio.open_connection(host, port, loop=loop)
try:
await asyncio.wait_for(con, timeout=1)
print("{}:{} Connected".format(host, port))
except asyncio.TimeoutError:
print ("{}:{} Closeed".format(host, port))
仍然需要从每个项目中逐一测试......
解决方案
应避免在异步协程中调用同步长时间运行的函数。我建议使用 asyncio 替代 connect_ex,例如:
try:
await asyncio.open_connection(host, port)
except Exception as e:
print(e)
为了“即时”同时执行一些协程,您可以使用create_task
“将协程包装到任务中并安排其执行”,因为它是在doc中编写的。在此之后,协程将很快执行,例如在下一次await
或async for
迭代之后,当控制流返回事件循环时。
create_task
返回Task
对象,您可以将其添加到列表中并等待它们全部使用asyncio.gather
with flag完成return_exceptions=True
。但在你的情况下,我认为在你的 main() 结束时替换和使用就足够await tcp_check()
了,以保证所有 coro 都完成了。create_task(tcp_check())
gather
...
asyncio.create_task(tcp_check(context[0], int(context[1])))
...
推荐阅读
- dynamics-crm - 无法在内部部署 MS CRM 2016 中删除系统工作流程步骤
- laravel - 关于 laravel 项目中的邮件格式验证
- javascript - 如何在 js 中将缓冲的 base64 转换为 Tiff 图像?
- angular - 如何检查子路由是否存在 - Angular
- mysql - Sequelize - 插入长记录的最佳方式
- javascript - 如何使用纯 JS 删除最后一个输入元素
- javascript - 不要选择 menuItem on icon click in material ui select
- entity-framework-6 - 使用 Entity Framework 6 和 npgsql 抛出错误时如何创建新连接?
- java - 将 8kHz mulaw 实时转换为 16KHz PCM
- neural-network - 用 C 语言训练的神经网络模型?