python - 如何与kombu中的任何消费者建立数据库连接(psycopg2)?
问题描述
我想创建三个 Kombu 消费者,每个消费者都有一个到数据库的连接,每个连接都独立于其余连接。
我想替换任务函数中的“连接数据库的工作者”这句话,正在进行的消费者数据库的连接。
感谢你们对我的帮助。
它是我的消费者:
from __future__ import with_statement
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu.utils import reprcall
from dashboard.rabbitmq import conf
from kombu import Exchange, Queue
logger = get_logger(__name__)
class Worker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
_list_consumer = []
for item in range(0, 3):
_list_consumer.append(
Consumer(
queues=[Queue('viewer', Exchange('aghigh', type='direct'), routing_key='viewer')],
accept=['pickle', 'json'],
prefetch_count=1,
callbacks=[self.process_task]
)
)
return _list_consumer
def process_task(self, body, message):
fun = body['fun']
args = body['args']
kwargs = body['kwargs']
logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
try:
fun(*args, **kwargs)
except Exception as exc:
logger.error('task raised exception: %r', exc)
message.ack()
if __name__ == '__main__':
from kombu import Connection
with Connection(userid=conf.RABBIT_MQ_USER, password=conf.RABBIT_MQ_PASS) as conn:
try:
Worker(conn).run()
except KeyboardInterrupt:
print('bye bye')
和我的任务:
def task(*args, **kwargs):
print("Hello %s" % kwargs)
conn = "connection database of worker"
cur = conn.cursor()
try:
cur.execute("INSERT INTO activity_activitystate (viewer_ip, activity_id, viewer_id, created_at) VALUES(%s, %s, %s, %s )", (
kwargs['viewer_ip'],
kwargs['activity_id'],
kwargs['viewer_id'],
kwargs['created_at'],
))
cur.execute("UPDATE activity_activity SET view_count = view_count + 1 WHERE id = {0}".format(kwargs['activity_id']))
conn.commit()
except:
pass
cur.close()
此数据库连接设置:
from dashboard.singleton import Singleton
from dashboard.rabbitmq import conf
import psycopg2
@Singleton
class DbConnection(object):
__co = None
def __init__(self):
if self.__co is None:
__co = psycopg2.connect(
host=conf.DATABASE['HOST'],
database=conf.DATABASE['NAME'],
user=conf.DATABASE['USER'],
password=conf.DATABASE['PASSWORD']
)
self.__co = __co
def get_connection(self):
return self.__co
conn = DbConnection().get_connection()
解决方案
推荐阅读
- c# - DataTemplate 中的 Xamarin.Forms CollectionView 项目“背后”是什么(关于背景颜色)?
- python - confluent-kafka-python 在生产者的飞行中没有捕捉到超时的元数据请求
- php - 如何在 Laravel 8 Jetstream 中验证注册路由?
- go - 如何将字符串转换为 Golang 结构中的切片
- c++ - 创建位图文件的程序
- c++ - C++ 中的多线程服务器
- windows - 如何在没有管理员权限的情况下将文件复制/下载到 Program Files 中?
- mongodb - 使用大小为 600mb 的 json 文件将集合插入 MongoDB
- java - 如何使用 Spring Boot RestTemplate 调用受 SSL 保护的 api
- c# - 字符串来自变量时未设置图像源