首页 > 解决方案 > 我是否正确使用 aiohttp 和 psycopg2 ?

问题描述

我对使用 asyncio/aiohttp 很陌生,但是我有一个 Python 脚本,它从 Postgres 表中读取一批 URL:s,下载 URL:s,在每次下载时运行一个处理函数(与问题无关) ,并将处理结果保存到表中。

在简化形式中,它看起来像这样:

import asyncio
import psycopg2
from aiohttp import ClientSession, TCPConnector

BATCH_SIZE = 100

def _get_pgconn():
    return psycopg2.connect()

def db_conn(func):
    def _db_conn(*args, **kwargs):
        with _get_pgconn() as conn:
            with conn.cursor() as cur:
                return func(cur, *args, **kwargs)
            conn.commit()
    return _db_conn

async def run():
    async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
        while True:
            count = await run_batch(session)
            if count == 0:
                break

async def run_batch(session):
    tasks = []
    for url in get_batch():
        task = asyncio.ensure_future(process_url(url, session))
        tasks.append(task)

    await asyncio.gather(*tasks)
    results = [task.result() for task in tasks]
    save_batch_result(results)
    return len(results)

async def process_url(url, session):
    try:
        async with session.get(url, timeout=15) as response:
            body = await response.read()
            return process_body(body)
    except:
        return {...}

@db_conn
def get_batch(cur):
    sql = "SELECT id, url FROM db.urls WHERE processed IS NULL LIMIT %s"
    cur.execute(sql, (BATCH_SIZE,))
    return cur.fetchall()


@db_conn
def save_batch_result(cur, results):
    sql = "UPDATE db.urls SET a = %(a)s, processed = true WHERE id = %(id)s"
    cur.executemany(sql, tuple(results))


loop = asyncio.get_event_loop()
loop.run_until_complete(run())

但我有一种感觉,我必须在这里遗漏一些东西。脚本运行,但似乎每批都变得越来越慢。process_url特别是随着时间的推移,对函数的调用似乎变得更慢。使用的内存也在不断增长,所以我猜可能有一些东西我在运行之间无法正确清理?

我也有增加批量大小的问题,如果我超过 200,我似乎从调用session.get. 我尝试使用limitTCPConnector 的参数,将其设置得更高和更低,但我看不出它有多大帮助。也尝试在几个不同的服务器上运行它,但它似乎是一样的。有什么方法可以考虑如何更有效地设置这些值?

将不胜感激一些指示我在这里可能做错的事情!

标签: pythonpsycopg2aiohttp

解决方案


您的代码的问题是将异步aiohttp库与同步 psycopg2客户端混合在一起。

因此,对 DB 的调用会阻塞事件循环,从而完全影响所有其他并行任务。

要解决它,您需要使用异步 DB 客户端:aiopgpsycopg2异步模式的包装器)或asyncpg(它具有不同的 API,但运行速度更快)。


推荐阅读