首页 > 解决方案 > 芹菜工人在扭动跑步时挂起

问题描述

下面的代码片段是我的 celery 设置,这里我在每个子 celery worker 进程中运行一个扭曲的反应器。

import os
from threading import Thread
from celery import Celery
from twisted.internet import threads, reactor
from celery import signals
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
    print('STARTED NEW WORKER', os.getpid())

@signals.worker_process_shutdown.connect()
def shutdown_reactor(**kwargs):
    reactor.callFromThread(reactor.stop)
    print('REACTOR SHUTDOWN', os.getpid())

class OrgUnitEventHandler():
    @inlineCallbacks
    def process(self, *args, **kwargs):
        val = sum(args)
        yield val
        returnValue(val)

def inThread(x, y): 
    obj = OrgUnitEventHandler()
    output = threads.blockingCallFromThread(reactor,
                                        obj.process, x, y)
    return output

@app.task
def add(x, y):
    print('ADD --> '+str(os.getpid()))
    result = inThread(x, y)
    print("FINAL RESULT--> "+str(result)+"-->"+str(os.getpid()))
    return result

使用上面的代码,我看到大部分时间芹菜工人在同一工人再次出现新任务时被挂起并被释放。请参阅下面的日志..

[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] 11651
[2020-05-07 20:12:26,498: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:12:26,499: WARNING/ForkPoolWorker-2] 11653
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] 11655
[2020-05-07 20:12:26,533: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:12:26,534: WARNING/ForkPoolWorker-4] 11659
[2020-05-07 20:12:32,239: WARNING/ForkPoolWorker-1] ADD --> 11651   # task-1 hang
[2020-05-07 20:12:36,611: WARNING/ForkPoolWorker-2] ADD --> 11653   # task-2 hang
[2020-05-07 20:13:00,858: WARNING/ForkPoolWorker-3] ADD --> 11655   # task-3 hang
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->11653  # task-2 Done
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-3] FINAL RESULT--> 3-->11655 # task-3 done

Loopingcall当我在运行反应器时使用时,我从来没有看到过芹菜挂起的问题。

...


class EventLoop(object):
    def _startLoopingCall(self, reactor):
        lc = LoopingCall(print)
        lc.start(1, False)

    def setup(self, reactor):
        reactor.callFromThread(self._startLoopingCall, reactor)
        thread = Thread(target=lambda: reactor.run(installSignalHandlers=False),name="reactor ").start()

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    EventLoop().setup(reactor)
    #thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
    print('STARTED NEW WORKER', os.getpid())

...

确认 celery 工人从未使用上述代码挂起的日志:

[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] 12302
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] 12304
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] 12306
[2020-05-07 20:22:08,873: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:22:08,874: WARNING/ForkPoolWorker-4] 12308


[2020-05-07 20:22:12,446: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:12,826: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:15,489: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:15,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:20,475: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:20,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304

我无法弄清楚 LoopingCall 在这里用一个扭曲的反应器做什么才能使其正常工作。请分享你的想法

标签: python-3.xcelerytwistedreactortwisted.internet

解决方案


推荐阅读