首页 > 解决方案 > 如何为 Kombu 中的每个消费者分别建立数据库连接(psycopg2)?

问题描述

我想创建三个 Kombu 消费者,每个消费者都有一个到数据库的连接,每个连接都独立于其余连接。

我想把任务函数中的占位字符串“connection database of worker”替换为正在处理该任务的消费者自己的数据库连接。

感谢你们对我的帮助。

这是我的消费者:

from __future__ import with_statement

from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu.utils import reprcall

from dashboard.rabbitmq import conf
from kombu import Exchange, Queue


# Module-level logger, created with kombu's get_logger helper and named after this module.
logger = get_logger(__name__)


class Worker(ConsumerMixin):
    """Kombu worker that executes task messages taken from the ``viewer`` queue.

    Each message body is expected to be a mapping with the keys ``fun``
    (a callable), ``args`` (positional arguments) and ``kwargs`` (keyword
    arguments); the callable is invoked with those arguments.
    """

    def __init__(self, connection):
        """Store the broker connection on the instance.

        :param connection: broker connection supplied by the caller.
        """
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Build the consumers for this worker.

        :param Consumer: consumer factory supplied by the caller.
        :param channel: channel supplied by the caller (not used here).
        :returns: list of three identically configured consumers, all bound
            to the ``viewer`` queue on the direct exchange ``aghigh`` and all
            dispatching to :meth:`process_task`.
        """
        # NOTE(review): 'pickle' is in the accepted content types, so message
        # bodies may carry arbitrary Python objects. That is only safe when
        # every publisher on this broker is trusted -- confirm.
        return [
            Consumer(
                queues=[Queue('viewer', Exchange('aghigh', type='direct'), routing_key='viewer')],
                accept=['pickle', 'json'],
                prefetch_count=1,
                callbacks=[self.process_task],
            )
            for _ in range(3)
        ]

    def process_task(self, body, message):
        """Run the callable described by ``body`` and acknowledge ``message``.

        :param body: mapping with the keys ``fun``, ``args`` and ``kwargs``.
        :param message: the received message; it is acknowledged whether or
            not the task raised an ``Exception``.
        :raises KeyError: if ``body`` lacks one of the expected keys (the
            message is then not acknowledged).
        """
        fun = body['fun']
        args = body['args']
        kwargs = body['kwargs']
        logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwargs)
        except Exception as exc:
            # exc_info=True keeps the traceback in the log; without it only
            # the exception repr is recorded and failures are hard to trace.
            logger.error('task raised exception: %r', exc, exc_info=True)
        message.ack()


if __name__ == '__main__':
    # Imported here so Connection is only loaded when run as a script.
    from kombu import Connection

    # Broker credentials are read from the project's conf module.
    credentials = dict(userid=conf.RABBIT_MQ_USER, password=conf.RABBIT_MQ_PASS)

    with Connection(**credentials) as broker:
        try:
            worker = Worker(broker)
            worker.run()
        except KeyboardInterrupt:
            # Ctrl-C stops the worker; say goodbye instead of showing a traceback.
            print('bye bye')

这是我的任务:

def task(*args, **kwargs):
    """Record one activity view and bump the activity's view counter.

    Inserts a row into ``activity_activitystate`` and increments
    ``view_count`` on the matching ``activity_activity`` row, committing both
    statements together.

    :param args: unused positional arguments.
    :param kwargs: must contain ``viewer_ip``, ``activity_id``, ``viewer_id``
        and ``created_at``.

    Failures while executing or committing are logged and rolled back; they
    are not re-raised, so the caller never sees them.
    """
    import logging

    log = logging.getLogger(__name__)

    print("Hello %s" % kwargs)
    # Placeholder: replace this string with the database connection that
    # belongs to the consumer running this task. While it is a plain string,
    # the conn.cursor() call below raises AttributeError.
    conn = "connection database of worker"
    cur = conn.cursor()
    try:
        cur.execute("INSERT INTO activity_activitystate (viewer_ip, activity_id, viewer_id, created_at) VALUES(%s, %s, %s, %s )", (
            kwargs['viewer_ip'],
            kwargs['activity_id'],
            kwargs['viewer_id'],
            kwargs['created_at'],
        ))

        # The id is passed as a bound parameter rather than formatted into
        # the SQL text, so a crafted activity_id cannot inject SQL.
        cur.execute(
            "UPDATE activity_activity SET view_count = view_count + 1 WHERE id = %s",
            (kwargs['activity_id'],),
        )

        conn.commit()
    except Exception:
        log.exception('failed to record activity view')
        # Roll back so a long-lived connection is not left inside a failed
        # transaction, which would make later statements on it fail as well.
        try:
            conn.rollback()
        except Exception:
            log.exception('rollback failed')
    finally:
        cur.close()

这是数据库连接的设置:

from dashboard.singleton import Singleton
from dashboard.rabbitmq import conf
import psycopg2


@Singleton
class DbConnection(object):
    """Holder for a psycopg2 connection built from ``conf.DATABASE``.

    NOTE(review): the class is wrapped by the project's ``Singleton``
    decorator; presumably that makes every ``DbConnection()`` call return the
    same instance -- confirm against ``dashboard.singleton``.
    """

    # Class-level default, so the attribute exists before __init__ assigns it.
    __co = None

    def __init__(self):
        """Open the database connection unless one is already stored."""
        if self.__co is not None:
            return

        self.__co = psycopg2.connect(
            host=conf.DATABASE['HOST'],
            database=conf.DATABASE['NAME'],
            user=conf.DATABASE['USER'],
            password=conf.DATABASE['PASSWORD']
        )

    def get_connection(self):
        """Return the stored psycopg2 connection."""
        return self.__co


# Module-level connection, obtained once when this module is first imported.
conn = DbConnection().get_connection()

标签: python rabbitmq kombu

解决方案


推荐阅读