rabbitmq - 使用 Kombu 和 RabbitMQ 执行长时间运行的任务
问题描述
我正在尝试将 Kombu 用于长时间运行的任务(最长的任务可能需要 1 小时),但面临间歇性问题。我看到 OSError: Server unexpectedly closed connection 偶尔出现在客户端代码中。服务器日志显示连接因丢失心跳而关闭。我不确定为什么会发生这种情况,因为我正在将长时间运行的任务卸载到后台线程并保持主线程空闲,以便 kombu 与 rabbitmq 通信心跳。我怎样才能解决这个问题并使kombu在长时间运行的任务中表现良好?
这是我的消费者代码,
import config
from queue import Queue as qq
from kombu import Queue, Exchange
from kombu.mixins import ConsumerProducerMixin
from threading import Thread
class RequestConsumer(ConsumerProducerMixin):
def __init__(self):
self.connection = Connection(
hostname=config.RABBITMQ_HOST,
userid=config.RABBITMQ_USERNAME,
port=config.RABBITMQ_PORT,
password=config.RABBITMQ_PASSWORD,
virtual_host=config.RABBITMQ_VHOST,
heartbeat=2*60*60,
transport_options={"client_properties": {"connection_name": "spark-request-consumer"}}
)
self.request_exchange = Exchange(config.SPARK_REQUEST_EXCHANGE, type="fanout")
self.request_queue = Queue(config.SPARK_REQUEST_QUEUE, self.request_exchange)
self.result_exchange = Exchange(config.SPARK_RESULT_EXCHANGE, type="fanout")
self.result_queue = Queue(config.SPARK_RESULT_QUEUE, self.result_exchange)
self.task_queue = qq()
Thread(target=self.calculate_results).start()
def calculate_results(self):
while True:
try:
body, message = self.task_queue.get()
request = json.loads(body)
results = calculate(request) # long running task
if results:
self.push_to_result_queue(results)
message.ack()
logger.info("Request done!")
except Exception:
logger.error("Error while calculating results:", exc_info=True)
def push_to_result_queue(self, messages):
logger.debug("Pushing result to RabbitMQ...")
for message in messages:
body = json.dumps(message)
if message:
self.producer.publish(
body,
exchange=self.result_exchange,
routing_key="",
delivery_mode=2,
content_type="application/json",
content_encoding="utf-8",
)
def callback(self, body, message):
logger.info("Received a request!")
self.task_queue.put((body, message))
logger.info("Request put in internal queue!")
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[self.request_queue], callbacks=[self.callback], prefetch_count=1)]
解决方案
推荐阅读
- numpy - 使用 np.random-normal 而不是 tf.random_normal 的不同优化行为
- recover - 如何恢复调整大小的 btrfs lvm 分区
- c# - 如何将 MaterialDesignXamlToolkit 包含到 WPF 类库中?
- c# - 正确地将配置注入 ASP Core
- python - 如何在我的代码中修复此“NoneType”错误?
- c# - 在 Unity 编辑器中导入对象,运行构建项目时出错
- node.js - 无法使用 Nginx 登录 keyrock IDM
- javascript - Fullcalendar v4 customButton 中的 Datetimepicker-DIV
- php - 如何使用“时间”(来自数据库的数据,数据类型:时间戳)在 Chart JS 中绘制图形
- c# - 将字符串转换为 int[] 返回不同的值