python - 带有额外参数的 Celery 自定义任务
问题描述
我目前正在从事一个有许多芹菜任务的项目,每个项目都有复杂的重试规则。
我们配置max_retries
和retry_backoffice
使用环境变量,如果max_retries
超出,我们将消息放在“后备队列”中,如example_task.apply_async(queue=settings.EXAMPLE_FALLBACK_QUEUE, kwargs=kwargs)
. 问题是我们正在复制大量代码并使用以下代码混淆我们的任务:
@app.task
def example_task(**kwargs):
try:
''' The real task business '''
...
except Exception as e:
# Here we have some logs
# Also some checks about the nature of e and how to proceed
# some cases we run:
example_task.retry(
exc=e, max_retries=max_retries, countdown=countdown
)
# and other cases we have
example_task.apply_async(queue=settings.example_task_fallback_queue, kwargs=kwargs)
我的想法是用一些额外的参数扩展celery.Task
类并做这样的事情:
@app.task(@app.task(ignore_result=False, bind=True, base=RetryTask(place_an_order_config))
def example_task(kwargs):
'''Just the real business here'''
并实现一个类,如:
@dataclass
class RetryConfig:
fallback_queue: str
max_retries: int
retry_backoff: int
event_name: str
traced_exceptions: List[Any]
class RetryTask(celery.Task):
def __init__(self, config: RetryConfig):
super().__init__()
self.config = config
def __call__(self, *args, **kwargs):
try:
self.run(*args, **kwargs)
except Exception as e:
if e in self.config.traced_exceptions:
# Logs
return
# And here implement the retry/fallback logic
# Using the parameterization attr `self.config` to
# setup
问题是 arg base
inapp.task
需要一个类(并且 celery 在内部实例化该类)而不是一个对象,所以我将RetryTask
类更改为如下所示:
class RetryTask: # Dont inhirit directly from celery.Task
def __init__(self, config: RetryConfig):
self = celery.Task()
self.config = config
def __call__(self, *args, **kwargs):
try:
self.run(*args, **kwargs)
except Exception as e:
if e in self.config.traced_exceptions:
# Logs
return
# And here implement the retry/fallback logic
# Using the parameterization attr `self.config` to
# setup
这样我就可以将RetryTask
对象传递给app.task
装饰器并参数化我的任务。不幸的是,上面的代码TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
在app.task
函数检查时抛出task = type(fun.__name__, (base,), dict(...)
。
所以我的问题是:如何以正确的方式参数化我的自定义RetryTask
类?
解决方案
推荐阅读
- python - 在饼图中显示三个最佳项目并将其余项目总结为其他
- tinymce - 如何从 TinyMCE 中的单元格/行属性对话框中删除字段
- modelica - TRANSFORM 库是否与 OpenModelica 兼容
- apache-kafka - 如何在 Kafka 中并行处理多个 CSV 文件?
- mongodb - 根据某些列找出 MongoDB 中两个集合中存在的公共行
- python - 安装 WebP,conan 错误
- go - 如何使用 golang 查询检索 LDAP 条目的所有属性?
- tensorflow - 如何在TensorFlow中给定稀疏矩阵数据计算余弦相似度?
- tensorflow - 使用 TensorBoard 从 tf_agents 可视化图形
- json - JSON-RPC 与 JSON 补丁