首页 > 解决方案 > postgres DB 中的大 INSERTS 的 asyncpg 问题

问题描述

在 postgres 中,我试图将大量数据从一个表插入到另一个表 - 大约 3000 万条记录。完成整个过程大约需要 40 分钟,所以我考虑将插入分成一系列 30 个较小的插入,并使用 asyncpg 之类的库异步执行它们。例如,每个 INSERT INTO... SELECT 需要 1.5 分钟来插入大约 100 万条记录,但是在使用 asyncpg 时,我没有看到任何异步行为。看起来好像 INSERTS 只是连续发生,一个接一个。示例,稍微简化,代码如下。任何人都可以发现我做错了什么或建议一种替代方法。甚至有可能做到这一点吗?在此先感谢您的帮助。

import asyncpg
import asyncio

async def test():
    
    conn = await asyncpg.connect(user='myuser', password='mypassword',
                                 database='mydb', host='myhost')
    values = await conn.fetch('''select count(*) from t1''')
    rows=values[0][0]
    await conn.close()

# get a list of 30 non-overlapping start/end rows for the SELECTS
    lst=range(rows)
    nums=(lst[i+1:i + int(len(lst)/30)] for i in range(0, len(lst), int(len(lst)/30)))

    pool =   await asyncpg.create_pool(user='myuser', password='mypassword',
                                 database='mydb', host='myhost')

# The idea is that the below for loop should fire off 
# around 30 INSERT... SELECTS  asynchronously
    for num in nums:
        start=num[0]
        end=num[len(num)-1]+1

        sqlstr=f"""
            insert  into t2
            select 
                   data
            from
            (
                select data , row_number() over() rn
                from t1
            ) as foo where rn between {start} and {end}
        """

        async with pool.acquire() as con:
        await con.execute(sqlstr)

    await pool.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(test())

标签: pythonpostgresqlasynchronousasyncpg

解决方案


推荐阅读