python - 芹菜工人从哪个目录开始
问题描述
我需要一些关于芹菜工人的帮助。我尤其无法理解需要从哪里(哪个目录)触发 celery worker 命令,以及它背后的概念是什么以及与导入有关的一些事情。
所以说我有以下目录结构:
.
├── __init__.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
.
根目录是当前工作目录,名为 project
每个 taskn.py(task1.py、task2.py 等)都是单独的任务。每个任务文件如下所示:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
以下是 的内容entry.py
:
import json
from flask_cors import CORS
from flask import Flask, Response, render_template
from flask import request, jsonify, redirect
from functools import wraps
<what would be the import statement to import all the tasks>
_name_ = "project_x"
app = Flask(_name_)
@app.route("/api1", methods=['POST'])
def api1():
req = request.jsonify
if not req:
return jsonify(success=False, msg="Missing request parameters", code="1")
else:
param1 = req.get('p1')
param2 = req.get('p2')
tId = startTask()
return jsonify(success="True", msg="All Good", taskId=tId)
def startTask():
tId = "abcd123"
created_task = state1.family1.task1.subtask(queue='q1')
created_task.delay()
return tId
if __name__ == '__main__':
app.run(debug=True, host="192.168.1.7", port="4444")
entry.py 是触发 api1 的烧瓶应用程序,然后根据我想要启动特定任务的参数。
现在这是我的问题:
entry.py
导入文件中所有任务的导入语句是什么- 我从哪里开始工人。我的意思是我应该从哪个目录启动
Celery -A <directory name> worker -l info
命令,为什么? - 在许多示例中,我看到任务和 CeleryApp 文件之间存在明显的隔离。有人可以建议安排我的任务和芹菜配置等的更好方法吗?上述两个问题如何与这个新提议的结构保持一致?
解决方案
好的,希望这可能会有所帮助。我会按照你的要求反过来回答。
在许多示例中,我看到任务和 CeleryApp 文件之间存在明显的隔离。有人可以建议安排我的任务和芹菜配置等的更好方法吗?上述两个问题如何与这个新提议的结构保持一致?
我在您添加的片段中看到的第一个问题,taskn.py
您拥有的每个片段都有他自己的celery
. 您需要在每个taskn.py
. 我推荐的是创建一个celery_app.py
my_app
├── __init__.py
├── entry.py
├── celery_app.py
│ ├── ...
在此文件中,您将创建 celery 实例
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
celery_app.conf.imports = [
'state1.family1.task1',
'my_app.state1.family1.task2', # Or Maybe
...
]
然后在每一个taskn.py
你可以导入这个实例,每个任务都将注册在同一个 celery 应用程序下
from my_app.celery_app import celapp
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
我从哪里开始工人。我的意思是我应该从哪个目录启动 Celery -A worker -l info 命令,为什么?
然后你应该很容易调用Celery -A my_app.celery_app worker -l info
,因为你的 celery 实例将在模块 my_app,子模块 celery_app
导入entry.py中所有任务的导入语句是什么
最后,entry.py
您可以执行import state1.family1.task1 import t1
和调用t1.delay()
或任何已注册的任务。
推荐阅读
- python-3.x - pycharm 错误:包“pandas”需要不同的 Python:3.5.2 不在“>=3.5.3”中
- android - 从适配器类删除后从 sqlite 数据库更新 RecyclerView
- java - Java 游戏石头剪刀布。无法弄清楚主要方法用户控制循环继续播放
- python - 您可以在处于自由层的同时从并行或异步函数中写入 dynamodb 吗?
- python - 将每日数据重新格式化为 pandas 中的每周数据,同时保留与每周日期相对应的多行
- javascript - 应该保存暗模式设置的 Javascript 在 Firefox 的子文件夹中不起作用。如何解决这个问题?
- xcode - 不能将 GROUP 或 BUTTON 放到 Watchkit App 的情节提要“主机控制器”上?
- tensorflow - 如果我使用 y_pred 作为 keras 中的自定义损失函数会发生什么?这不应该总是有效,因为 y_pred 总是可以区分的吗?
- arrays - 无论数据帧中数组中所需变量的长度如何,如何在其中动态创建包含数组的字典?
- java - 为什么在我第二次输入内容之前循环中断