首页 > 解决方案 > 使用 Kombu 和 RabbitMQ 执行长时间运行的任务

问题描述

我正在尝试将 Kombu 用于长时间运行的任务(最长的任务可能需要 1 小时),但面临间歇性问题。我看到 OSError: Server unexpectedly closed connection 偶尔出现在客户端代码中。服务器日志显示连接因丢失心跳而关闭。我不确定为什么会发生这种情况,因为我正在将长时间运行的任务卸载到后台线程并保持主线程空闲,以便 kombu 与 rabbitmq 通信心跳。我怎样才能解决这个问题并使kombu在长时间运行的任务中表现良好?

这是我的消费者代码,

import config
from queue import Queue as qq

from kombu import Queue, Exchange
from kombu.mixins import ConsumerProducerMixin
from threading import Thread

class RequestConsumer(ConsumerProducerMixin):

    def __init__(self):
        self.connection = Connection(
                hostname=config.RABBITMQ_HOST,
                userid=config.RABBITMQ_USERNAME,
                port=config.RABBITMQ_PORT,
                password=config.RABBITMQ_PASSWORD,
                virtual_host=config.RABBITMQ_VHOST,
                heartbeat=2*60*60,
                transport_options={"client_properties": {"connection_name": "spark-request-consumer"}}
            )

        self.request_exchange = Exchange(config.SPARK_REQUEST_EXCHANGE, type="fanout")
        self.request_queue = Queue(config.SPARK_REQUEST_QUEUE, self.request_exchange)
        self.result_exchange = Exchange(config.SPARK_RESULT_EXCHANGE, type="fanout")
        self.result_queue = Queue(config.SPARK_RESULT_QUEUE, self.result_exchange)
        self.task_queue = qq()
        Thread(target=self.calculate_results).start()

    def calculate_results(self):
        while True:
            try:
                body, message = self.task_queue.get()
                request = json.loads(body)
                results = calculate(request)  # long running task
                if results:
                    self.push_to_result_queue(results)
                message.ack()
                logger.info("Request done!")
            except Exception:
                logger.error("Error while calculating results:", exc_info=True)

    def push_to_result_queue(self, messages):
        logger.debug("Pushing result to RabbitMQ...")
        for message in messages:
            body = json.dumps(message)
            if message:
                self.producer.publish(
                    body,
                    exchange=self.result_exchange,
                    routing_key="",
                    delivery_mode=2,
                    content_type="application/json",
                    content_encoding="utf-8",
                )

    def callback(self, body, message):
        logger.info("Received a request!")
        self.task_queue.put((body, message))
        logger.info("Request put in internal queue!")

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.request_queue], callbacks=[self.callback], prefetch_count=1)]

标签: rabbitmqceleryamqpkombu

解决方案


推荐阅读