python-3.x - 芹菜工人在扭动跑步时挂起
问题描述
下面的代码片段是我的 celery 设置,这里我在每个子 celery worker 进程中运行一个扭曲的反应器。
import os
from threading import Thread
from celery import Celery
from twisted.internet import threads, reactor
from celery import signals
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
print('STARTED NEW WORKER', os.getpid())
@signals.worker_process_shutdown.connect()
def shutdown_reactor(**kwargs):
reactor.callFromThread(reactor.stop)
print('REACTOR SHUTDOWN', os.getpid())
class OrgUnitEventHandler():
@inlineCallbacks
def process(self, *args, **kwargs):
val = sum(args)
yield val
returnValue(val)
def inThread(x, y):
obj = OrgUnitEventHandler()
output = threads.blockingCallFromThread(reactor,
obj.process, x, y)
return output
@app.task
def add(x, y):
print('ADD --> '+str(os.getpid()))
result = inThread(x, y)
print("FINAL RESULT--> "+str(result)+"-->"+str(os.getpid()))
return result
使用上面的代码,我看到大部分时间芹菜工人在同一工人再次出现新任务时被挂起并被释放。请参阅下面的日志..
[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:12:26,481: WARNING/ForkPoolWorker-1] 11651
[2020-05-07 20:12:26,498: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:12:26,499: WARNING/ForkPoolWorker-2] 11653
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:12:26,515: WARNING/ForkPoolWorker-3] 11655
[2020-05-07 20:12:26,533: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:12:26,534: WARNING/ForkPoolWorker-4] 11659
[2020-05-07 20:12:32,239: WARNING/ForkPoolWorker-1] ADD --> 11651 # task-1 hang
[2020-05-07 20:12:36,611: WARNING/ForkPoolWorker-2] ADD --> 11653 # task-2 hang
[2020-05-07 20:13:00,858: WARNING/ForkPoolWorker-3] ADD --> 11655 # task-3 hang
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->11653 # task-2 Done
[2020-05-07 20:13:00,859: WARNING/ForkPoolWorker-3] FINAL RESULT--> 3-->11655 # task-3 done
Loopingcall
当我在运行反应器时使用时,我从来没有看到过芹菜挂起的问题。
...
class EventLoop(object):
def _startLoopingCall(self, reactor):
lc = LoopingCall(print)
lc.start(1, False)
def setup(self, reactor):
reactor.callFromThread(self._startLoopingCall, reactor)
thread = Thread(target=lambda: reactor.run(installSignalHandlers=False),name="reactor ").start()
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
EventLoop().setup(reactor)
#thread = Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
print('STARTED NEW WORKER', os.getpid())
...
确认 celery 工人从未使用上述代码挂起的日志:
[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] STARTED NEW WORKER
[2020-05-07 20:22:08,824: WARNING/ForkPoolWorker-1] 12302
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] STARTED NEW WORKER
[2020-05-07 20:22:08,841: WARNING/ForkPoolWorker-2] 12304
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] STARTED NEW WORKER
[2020-05-07 20:22:08,857: WARNING/ForkPoolWorker-3] 12306
[2020-05-07 20:22:08,873: WARNING/ForkPoolWorker-4] STARTED NEW WORKER
[2020-05-07 20:22:08,874: WARNING/ForkPoolWorker-4] 12308
[2020-05-07 20:22:12,446: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:12,826: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:15,489: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:15,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] ADD --> 12302
[2020-05-07 20:22:17,828: WARNING/ForkPoolWorker-1] FINAL RESULT--> 3-->12302
[2020-05-07 20:22:20,475: WARNING/ForkPoolWorker-2] ADD --> 12304
[2020-05-07 20:22:20,842: WARNING/ForkPoolWorker-2] FINAL RESULT--> 3-->12304
我无法弄清楚 LoopingCall 在这里用一个扭曲的反应器做什么才能使其正常工作。请分享你的想法
解决方案
推荐阅读
- javascript - 使用大型 jpeg 时 jsPDF 图像损坏
- actions-on-google - 报告状态时遇到内部错误
- javascript - 使用地图使用 Swiper 渲染多个 Select 组件不起作用 - Material UI React
- pine-script -
如何在某个条件下查找以前的闭包 - python-3.x - django 应用程序仅使用端口 8000
- websphere - 如何将 Cluster Queue Manager 状态从 running 更改为 inactive,以及从 inactive 更改为 running
- angular - 在 IIS 上保护 Angular Web 应用程序
- python - 未选择文件 此字段为必填项
- vue.js - vue.js + v-ymap:在所选城市显示标记。并将地图置于所选城市的中心
- cucumber-java - Cucumber-Java:避免多次登录并在多个独立场景中使用相同的会话