首页 > 解决方案 > 在 python asyncio 中等待 pubsub 回调

问题描述

我正在一个订阅 pubsub 主题并为每条新传入消息启动回调的新线程中创建一个 pubsub 订阅。

此回调函数需要等待对 Postgres 数据库的异步调用我正在使用 asyncpg 进行数据库调用

cannot perform operation: another operation is in progress尝试访问数据库时出现错误

import asyncio
import json
import traceback
from datetime import datetime
from typing import Callable

import google.cloud.pubsub_v1
import logging
from concurrent import futures

from google.cloud import pubsub_v1

async def write_to_database(i, pool):
    query = f"""
                        insert into table1 values('a','b');
                                                        """
    async with pool.acquire() as con:
        try:
            await con.execute(query)
        finally:
            await pool.release(con)


async def pubsub_callback(message):
    """Process the incoming message."""
    try:

        print((f'Received ID:{message.message_id} '
               f'PUBTIME:{message.publish_time} '
               f'ATTEMPT:{message.delivery_attempt} '
               f'Data: {message.data}'))

        await asyncio.sleep(1)
        await write_to_database(pool)
        print(
            f'Message ID:{message.message_id} Processed.')
    except ValueError:
        print(
            f'Message ID:{message.message_id} was not processed')
    finally:
        # pass
        message.ack()


async def receive_message():
    await asyncio.sleep(.5)
    loop = asyncio.new_event_loop()

    subscriber = google.cloud.pubsub_v1.SubscriberClient()
    project_id = 'project_id'
    subscription_id = 'subscription_id'
    with subscriber:
        subscription_path = subscriber.subscription_path(project_id,
                                                         subscription_id)

        try:
            print(f'Listening for messages on {subscription_path}')


            def create_pubsub_callback_task(message):
                """Callback handler for the subscription; schedule a task on the event loop"""
                Asyncio.run(pubsub_callback(message))
                # loop.create_task(pubsub_callback(message))

            streaming_pull_future = subscriber.subscribe(subscription_id,
                                                         callback=create_pubsub_callback_task)

            streaming_pull_future.result(timeout=None)

        except TimeoutError:
            streaming_pull_future.cancel()
        except KeyboardInterrupt:
            streaming_pull_future.cancel()

loop1 = asyncio.get_event_loop()
pool = loop1.run_until_complete(asyncpg.create_pool(url))
loop1.run_until_complete(receive_message())

标签: pythonpostgresqlpython-asyncioasyncpg

解决方案


推荐阅读