首页 > 解决方案 > 遍历查询的复制结果的最佳方法是什么?

问题描述

我想做什么

我的目标是从 Postgres 复制数据,将其转换为 Parquet 并加载到数据湖中。在这篇文章中,我主要关注将数据从 asyncpg 加载到类似文件的对象,而不会耗尽内存。

目前,这是我从 Postgres 复制数据的方式:

async def postgres_copy(query: str, output_format: str = 'csv', **creds) -> pa.lib.Buffer:
    a_buffer = pa.BufferOutputStream()
    con = await asyncpg.connect(**creds)
    try:
        response = await con.copy_from_query(query=query, output=arrow_buffer, format=output_format, header=True)
        return arrow_buffer.getvalue()
    finally:
        await con.close()

什么不工作

有些表太大而无法放入缓冲区的内存中,这会导致内存不足异常。

我想做什么

为避免内存不足,我想通过查询结果流式传输或分页(服务器端)。换句话说,在内存中以块的形式处理查询结果。阅读此处的文档似乎可以遍历光标,例如:

async def iterate(con: Connection):
    async with con.transaction():
        # Create a Cursor object
        cur = await con.cursor('SELECT generate_series(0, 100)')

        # Fetch a list of the first 5 rows and print them
        print(await cur.fetch(5))

        # Move the cursor 5 rows forward and fetch the next
        await cur.forward(5)
        print(await cur.fetch(5))

我的问题:

标签: pythonpostgresqlasyncpg

解决方案


推荐阅读