python - 在 python asyncio 中等待 pubsub 回调
问题描述
我正在一个订阅 pubsub 主题并为每条新传入消息启动回调的新线程中创建一个 pubsub 订阅。
此回调函数需要等待对 Postgres 数据库的异步调用我正在使用 asyncpg 进行数据库调用
cannot perform operation: another operation is in progress
尝试访问数据库时出现错误
import asyncio
import json
import traceback
from datetime import datetime
from typing import Callable
import google.cloud.pubsub_v1
import logging
from concurrent import futures
from google.cloud import pubsub_v1
async def write_to_database(i, pool):
query = f"""
insert into table1 values('a','b');
"""
async with pool.acquire() as con:
try:
await con.execute(query)
finally:
await pool.release(con)
async def pubsub_callback(message):
"""Process the incoming message."""
try:
print((f'Received ID:{message.message_id} '
f'PUBTIME:{message.publish_time} '
f'ATTEMPT:{message.delivery_attempt} '
f'Data: {message.data}'))
await asyncio.sleep(1)
await write_to_database(pool)
print(
f'Message ID:{message.message_id} Processed.')
except ValueError:
print(
f'Message ID:{message.message_id} was not processed')
finally:
# pass
message.ack()
async def receive_message():
await asyncio.sleep(.5)
loop = asyncio.new_event_loop()
subscriber = google.cloud.pubsub_v1.SubscriberClient()
project_id = 'project_id'
subscription_id = 'subscription_id'
with subscriber:
subscription_path = subscriber.subscription_path(project_id,
subscription_id)
try:
print(f'Listening for messages on {subscription_path}')
def create_pubsub_callback_task(message):
"""Callback handler for the subscription; schedule a task on the event loop"""
Asyncio.run(pubsub_callback(message))
# loop.create_task(pubsub_callback(message))
streaming_pull_future = subscriber.subscribe(subscription_id,
callback=create_pubsub_callback_task)
streaming_pull_future.result(timeout=None)
except TimeoutError:
streaming_pull_future.cancel()
except KeyboardInterrupt:
streaming_pull_future.cancel()
loop1 = asyncio.get_event_loop()
pool = loop1.run_until_complete(asyncpg.create_pool(url))
loop1.run_until_complete(receive_message())
解决方案
推荐阅读
- javascript - 如何使用量角器删除 DOM 的元素
- amazon-web-services - 未反映上传到 Amazon S3 存储桶的最新 html 文件
- pandas - 如何仅删除不同标签的行(NaN 和 Empty 仍然存在)
- html - 在 NodeJS 中使用 Cheerio 将文本替换为 HTML
- python - 将变量传递给进程python
- mongodb - 在两个集合之间映射代码值,使用其他集合更新值
- shell - 如何在 shell 中获取 {} 的值?
- html - 如何在角度 4 中的任何 html 元素的标题属性上设置 html 字符串?
- html - 将屏幕分成两个单独的相等部分 ionic 3
- php - 将2个数组变成1个数组