celery flask PicklingError: attribute lookup on flask_security.core failed

Problem description

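I am using celery in Flask to run tasks periodically, following the demo from the documentation. The debug messages show that the tasks are being sent, but I never see the printed output.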

The broker is redis, and I create the celery instance using the approach from the documentation.

My manage.py file

import ...

app = create_app()
migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)
celery = make_celery(app)


@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )


@celery.task
def test(arg):
    print(arg)

And the make_celery function

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    return celery

I use this command to start celery beat

celery -A manage.celery beat --loglevel=DEBUG

The create_app function

def create_app():
    from apps.models import db
    from admin import admin, security, user_datastore

    app = Flask(__name__, static_folder=settings.ProductSetting.STATIC_FOLDER)
    app.register_blueprint(blueprint)
    app.config.from_object(settings.ProductSetting)
    db.init_app(app) 
    ckeditor = CKEditor(app)
    security_ctx = security.init_app(app, user_datastore)
    login_manager = LoginManager(app)

    @login_manager.user_loader
    def load_user(user_id):
        return db.session.query(User).get(user_id)

    # define a context processor for merging flask-admin's template context into the
    # flask-security views.
    @security_ctx.context_processor
    def security_context_processor():
        return dict(
            admin_base_template=admin.base_template,
            admin_view=admin.index_view,
            h=admin_helpers,
            get_url=url_for
        )

    admin.init_app(app)

    def my_access_control_function():
        """
        :return: True if the user is allowed to access the filemanager, otherwise False
        """
        # You can do whatever permission check you need here
        if not current_user.is_active or not current_user.is_authenticated:
            return False

        if current_user.has_role('superuser'):
            return True

        return False

    flaskfilemanager.init(app, access_control_function=my_access_control_function)
    return app

Output:

Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%DEBUG
    . maxinterval -> 5.00 minutes (300s)
[2019-02-12 11:27:41,800: DEBUG/MainProcess] Setting default socket timeout to 30
[2019-02-12 11:27:41,801: INFO/MainProcess] beat: Starting...
[2019-02-12 11:27:41,818: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: manage.test('world') manage.test('world') <freq: 30.00 seconds>
<ScheduleEntry: add every 10 manage.test('hello') <freq: 10.00 seconds>
<ScheduleEntry: manage.test('Happy Mondays!') manage.test('Happy Mondays!') <crontab: 30 7 1 * * (m/h/d/dM/MY)>
[2019-02-12 11:27:41,818: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2019-02-12 11:27:42,842: INFO/MainProcess] Scheduler: Sending due task add every 10 (manage.test)
[2019-02-12 11:27:44,900: DEBUG/MainProcess] beat: Synchronizing schedule...
[2019-02-12 11:27:44,911: DEBUG/MainProcess] manage.test sent. id->acc7f805-d8d5-4ae3-af35-1b5da1798749
[2019-02-12 11:27:44,912: DEBUG/MainProcess] beat: Waking up in 8.33 seconds.
[2019-02-12 11:27:53,256: INFO/MainProcess] Scheduler: Sending due task manage.test('world') (manage.test)
[2019-02-12 11:27:53,257: DEBUG/MainProcess] manage.test sent. id->1c691bd7-0ba1-4215-8c56-fbc59d5ca775
[2019-02-12 11:27:53,257: INFO/MainProcess] Scheduler: Sending due task add every 10 (manage.test)
[2019-02-12 11:27:53,258: DEBUG/MainProcess] manage.test sent. id->709fe5fd-83c8-4fca-a8e0-0c1ca8d25e84
[2019-02-12 11:27:53,258: DEBUG/MainProcess] beat: Waking up in 9.99 seconds.

Output after starting the worker

Command

celery -A manage.celery worker -l info
- ** ---------- [config]
- ** ---------- .> app:         apps:0x27466765c88
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . manage.test

[2019-02-12 15:05:13,190: CRITICAL/MainProcess] Unrecoverable error: PicklingError("Can't pickle <function <lambda> at 0x000002687E93EA60>: attribute lookup <lambda> on flask_security.core failed",)
Traceback (most recent call last):
  File "e:\env\games\lib\site-packages\celery\worker\worker.py", line 205, in start
    self.blueprint.start(self)
  File "e:\env\games\lib\site-packages\celery\bootsteps.py", line 119, in start
    step.start(parent)
  File "e:\env\games\lib\site-packages\celery\bootsteps.py", line 369, in start
    return self.obj.start()
  File "e:\env\games\lib\site-packages\celery\concurrency\base.py", line 131, in start
    self.on_start()
  File "e:\env\games\lib\site-packages\celery\concurrency\prefork.py", line 112, in on_start
    **self.options)
  File "e:\env\games\lib\site-packages\billiard\pool.py", line 1007, in __init__
    self._create_worker_process(i)
  File "e:\env\games\lib\site-packages\billiard\pool.py", line 1116, in _create_worker_process
    w.start()
  File "e:\env\games\lib\site-packages\billiard\process.py", line 124, in start
    self._popen = self._Popen(self)
  File "e:\env\games\lib\site-packages\billiard\context.py", line 383, in _Popen
    return Popen(process_obj)
  File "e:\env\games\lib\site-packages\billiard\popen_spawn_win32.py", line 79, in __init__
    reduction.dump(process_obj, to_child)
  File "e:\env\games\lib\site-packages\billiard\reduction.py", line 99, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x000002687E93EA60>: attribute lookup <lambda> on flask_security.core failed

Tags: python, flask, celery

Solution


The problem is this line:

celery.conf.update(app.config)

It takes every Flask configuration value (including those set by Flask-Security) and adds them all to the celery configuration.
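Since the worker dies with a PicklingError, one way to see which of the copied entries are responsible is to try pickling each value in turn. The following is only a minimal sketch: `unpicklable_keys` is a hypothetical helper, a plain dict stands in for `app.config`, the key `EXAMPLE_CALLBACK` is made up, and it assumes that what the standard `pickle` module rejects is a fair proxy for what the worker's pickler rejects.

```python
import pickle


def unpicklable_keys(config):
    """Return, sorted, the keys whose values the standard pickle module rejects."""
    bad = []
    for key, value in config.items():
        try:
            pickle.dumps(value)
        except (pickle.PicklingError, AttributeError, TypeError):
            bad.append(key)
    return sorted(bad)


# A plain dict standing in for app.config; the second key is hypothetical.
sample_config = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "EXAMPLE_CALLBACK": lambda: None,  # a lambda cannot be looked up by name
}

print(unpicklable_keys(sample_config))
```

Flask's config object is a dict subclass, so in the real application the same helper could be called as `unpicklable_keys(app.config)`.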

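In Flask-Security, some configuration variables are actually lambdas (and some are LocalProxies). Naturally these cannot be pickled, and some of them depend on the context they are evaluated in. Why do you need all of them in the celery configuration?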
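One possible way to act on this is to hand celery only the settings it needs instead of the whole Flask config. The sketch below is an illustration under assumptions, not a definitive fix: `celery_settings` is a hypothetical helper, the `CELERY_` prefix is assumed from the two keys used in the question (`CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND`), and a plain dict stands in for `app.config`.

```python
def celery_settings(flask_config, prefix="CELERY_"):
    """Return only the entries whose key starts with the given prefix."""
    return {
        key: value
        for key, value in flask_config.items()
        if key.startswith(prefix)
    }


# A plain dict standing in for app.config; values are illustrative.
flask_config = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_RESULT_BACKEND": "redis://localhost:6379/0",
    "EXAMPLE_CALLBACK": lambda: None,  # hypothetical key holding a lambda
}

selected = celery_settings(flask_config)
print(sorted(selected))
```

In `make_celery`, this would mean calling `celery.conf.update(celery_settings(app.config))` in place of `celery.conf.update(app.config)`, so the lambdas and proxies never reach the celery configuration.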

