首页 > 解决方案 > 连接到 PostgreSQL 数据库时 Python 打印加载“动画”

问题描述

(欢迎提供有关如何改写标题的提示。)

我正在使用 psycopg2 和 asyncio。我想要发生的是用户有一些迹象表明程序正在做某事。我有一个花哨的“\ | / -”动画,但为了简单起见,我只想说我希望它每秒打印一个点。

到目前为止我所拥有的:

import psycopg2
import asyncio


async def heartbeat(seconds):
    for sec in range(seconds):
        print(".")
        await asyncio.sleep(1)


async def conn_test():
    conn_params = {"database": "testdb", "user": "postgres",
                   "password": "my_pw"}
    conn = None

    print("Attempting to connect to database...")
    while not conn:
        try:
            conn = psycopg2.connect(**conn_params)

        except (Exception, psycopg2.Error) as error:
            print(error)

        finally:
            if conn:
                conn.close()
                print("Successfully connected to database. Now closing connection...")
            else:
                print("Could not connect to the database. Retrying...")


async def main():
    t1 = loop.create_task(heartbeat(10))
    t2 = conn_test()

    await asyncio.wait([t1, t2])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(main())
    except Exception as e:
        print(e)
    finally:
        loop.close()

我得到了什么(如果我故意阻止它连接)

.
Attempting to connect to database...

(直到 psycopg2 超时。)

我想要的是:

.
Attempting to connect to database...
.
.
.
.
.

(ETC。)

我试过psycopg2.connect(**conn_params, async_=True)了,但这并没有我想要的效果。我确实得到了点,但是由于返回了连接对象,即使无法建立连接,我的其余代码也无法按预期工作。我尝试进一步研究 async psycopg2。但这种方法似乎带来了很多额外的工作。我只是希望它异步连接,但是一旦连接存在,我execute()的 s 绝对不应该异步工作。

标签: pythonpython-3.xasynchronouspython-asynciopsycopg2

解决方案


按照 dirn 的建议,我使用 asyncpg 重写了代码:

import asyncpg
import asyncio


async def heartbeat(seconds):
    for sec in range(seconds):
        print(".")
        await asyncio.sleep(1)


async def conn_test():
    conn_params = {"database": "testdb", "user": "postgres",
                   "password": "my_pw"}
    conn = None

    print("Attempting to connect to database...")
    while not conn:
        try:
            conn = await asyncpg.connect(**conn_params)

        except TimeoutError:
            print("Could not connect to the database. Retrying...")

        finally:
            if conn:
                print("Successfully connected to database. "
                      "Now closing connection...")
                await conn.close()


async def main():
    t1 = loop.create_task(heartbeat(10))
    t2 = conn_test()

    await asyncio.wait([t1, t2])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(main())
    except Exception as e:
        print(e)
    finally:
        loop.close()

这在这个特定示例中产生了预期的结果。现在看看它如何与我的程序的其余部分一起工作......


推荐阅读