python - 我是否正确使用 aiohttp 和 psycopg2 ?
问题描述
我对使用 asyncio/aiohttp 很陌生,但是我有一个 Python 脚本,它从 Postgres 表中读取一批 URL:s,下载 URL:s,在每次下载时运行一个处理函数(与问题无关) ,并将处理结果保存到表中。
在简化形式中,它看起来像这样:
import asyncio
import psycopg2
from aiohttp import ClientSession, TCPConnector
BATCH_SIZE = 100
def _get_pgconn():
return psycopg2.connect()
def db_conn(func):
def _db_conn(*args, **kwargs):
with _get_pgconn() as conn:
with conn.cursor() as cur:
return func(cur, *args, **kwargs)
conn.commit()
return _db_conn
async def run():
async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
while True:
count = await run_batch(session)
if count == 0:
break
async def run_batch(session):
tasks = []
for url in get_batch():
task = asyncio.ensure_future(process_url(url, session))
tasks.append(task)
await asyncio.gather(*tasks)
results = [task.result() for task in tasks]
save_batch_result(results)
return len(results)
async def process_url(url, session):
try:
async with session.get(url, timeout=15) as response:
body = await response.read()
return process_body(body)
except:
return {...}
@db_conn
def get_batch(cur):
sql = "SELECT id, url FROM db.urls WHERE processed IS NULL LIMIT %s"
cur.execute(sql, (BATCH_SIZE,))
return cur.fetchall()
@db_conn
def save_batch_result(cur, results):
sql = "UPDATE db.urls SET a = %(a)s, processed = true WHERE id = %(id)s"
cur.executemany(sql, tuple(results))
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
但我有一种感觉,我必须在这里遗漏一些东西。脚本运行,但似乎每批都变得越来越慢。process_url
特别是随着时间的推移,对函数的调用似乎变得更慢。使用的内存也在不断增长,所以我猜可能有一些东西我在运行之间无法正确清理?
我也有增加批量大小的问题,如果我超过 200,我似乎从调用session.get
. 我尝试使用limit
TCPConnector 的参数,将其设置得更高和更低,但我看不出它有多大帮助。也尝试在几个不同的服务器上运行它,但它似乎是一样的。有什么方法可以考虑如何更有效地设置这些值?
将不胜感激一些指示我在这里可能做错的事情!
解决方案
推荐阅读
- javascript - d3.js 折线图工具提示内容未显示
- html - 如何在需要时使这个内部 div 垂直滚动?
- firebase - curl:(60)SSL证书问题:证书链中的自签名证书
- html - 如何下载带有 css 叠加效果的图像?
- prolog - prolog 数据文件上的多用户 Web 应用程序
- java-8 - 通过 SSLSocketFactory 连接时 Java 客户端支持哪些密码?
- php - 为什么在使用“composer create-project”创建项目后,依赖项不是最新的可用版本?
- java - 无法在 android studio 中的 child() 中为参数“pathString”传递 null
- javascript - 正确使用 insertBefore() 的方法
- python - 将评估表达式传递给 Python 中的递归的问题