python - 如何将 aiopg 与 aiohttp 一起使用
问题描述
我有一个应用程序,它遍历来自 Postgres 表的批量 URL:s,下载 URL,在每次下载时运行处理函数并将处理结果保存到表中。
我使用 aiopg 和 aiohttp 编写了它以使其异步运行。在简化形式中,它看起来像:
import asyncio
import aiopg
from aiohttp import ClientSession, TCPConnector
BATCH_SIZE = 100
dsn = "dbname=events user={} password={} host={}".format(DB_USER, DB_PASSWORD, DB_HOST)
async def run():
async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
async with aiopg.create_pool(dsn) as pool:
while True:
count = await run_batch(session, pool)
if count == 0:
break
async def run_batch(session, db_pool):
tasks = []
async for url in get_batch(db_pool):
task = asyncio.ensure_future(process_url(url, session, db_pool))
tasks.append(task)
await asyncio.gather(*tasks)
async def get_batch(db_pool):
sql = "SELECT id, url FROM db.urls ... LIMIT %s"
async with db_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, (BATCH_SIZE,))
for row in cur:
yield row
async def process_url(url, session, db_pool):
async with session.get(url, timeout=15) as response:
body = await response.read()
data = process_body(body)
await save_data(db_pool, data)
async def process_body(body):
...
return data
async def save_data(db_pool, data):
sql = "UPDATE db.urls ..."
async with db_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, (data,))
但有些不对劲。脚本运行的时间越长,运行的越慢,调用session.get
. 我的猜测是我使用 Postgres 连接的方式有问题,但我不知道出了什么问题。任何帮助将非常感激!
解决方案
推荐阅读
- android-fragments - Android导航:共享元素转换在片段之间不起作用
- php - 如何扩展php内存限制?
- c# - 如何在 EF Core Like 函数中使用通配符
- python-3.x - 创建菜单时找不到模块
- php - 一段时间后,RabbitMQ 消费者翻倍
- python - 如何导入大型 csv 文件并执行操作
- reactjs - 如何修复 '_react["default"].memo 不是函数。(在 React native 的 '_react["default"].memo(connectFunction)' 错误中?
- python - 如何修复在lora中创建应用程序
- angular - Angular 7 cli不读取自定义ts文件
- ansible - 使用“azure_rm_deployment”时,资源组是在“我们西部”的特定位置创建的吗?