首页 > 解决方案 > django - apscheduler 如何在 uwsgi 重新启动时保持工作

问题描述

django - 2.2.12

调度程序 - 3.6.3

我想将通知设置为在用户执行特定任务两周后发送。

调度程序.py

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler

logger = logging.getLogger(__name__)

class JobLauncher:

    _sched = None

    def __init__(self):
        JobLauncher._sched = BackgroundScheduler()
        JobLauncher._sched.start()

    def __str__(self):
        return "JobLauncher"

    def run(self, job):
        return self.run_job(job)

    def stop(self, job):
        JobLauncher._sched.remove_job(job.name)

    def shutdown(self):
        if JobLauncher._sched.running():
            logger.debug('Scheduler is shutting down.')
            JobLauncher._sched.shutdown()
        else:
            logger.warn("Cannot shutdown scheduler because scheduler is not running at the moment. please check scheduler status.")

    def run_job(self, job):
        if JobLauncher._sched.get_job(job.name) is None:
          _job = JobLauncher._sched.add_job(func=job.runMethod, trigger='date', id=job.name, args=job.job_params,run_date = job.job_date)
          return True
        return False

class CommonJob:

    def __str__(self):
        return "Job Infos : {name : %s, job_params : %s}" % (self.name, self.job_params)

    @property
    def name(self):
        return self._name
      
    @name.setter 
    def name(self, new_name):
        self._name = new_name
      
    @property
    def job_date(self):
        return self._job_date
    
    @job_date.setter 
    def job_date(self, new_job_date):
        self._job_date = new_job_date
       
    @property
    def job_params(self):
        return self._job_params   
        
    @job_params.setter 
    def job_params(self, new_job_params):
        self._job_params = new_job_params
        
    @property
    def runMethod(self):
        return self._runMethod
    
    @runMethod.setter 
    def runMethod(self, new_runMethod):
        self._runMethod = new_runMethod
    


class JobLauncherHolder:

    _launcher = None

    @staticmethod
    def getInstance():
        if not JobLauncherHolder._launcher:
            JobLauncherHolder._launcher = JobLauncher()

        return JobLauncherHolder._launcher

添加工作代码

from utils.scheduler import JobLauncherHolder,CommonJob

def event(self,userUID):
    launcher = JobLauncherHolder.getInstance()
    if launcher:
       job = CommonJob()
       job.name = str(userUID) + 'alarm'
       date =  datetime.now()  + timedelta(days=14)
       job.job_date = date
       job.runMethod = self.testAlarm
       job.job_params = [userUID]
       launcher.run(job)

def testAlarm(self,userUID):
    sendTestFCM(userUID)

在 20 秒而不是 14 天后更改它作为测试代码并运行它,将正常收到通知。

但是当我在 3 分钟后更改它并运行sudo service uwsgi restart时,作业没有运行。为了在实际服务中使用它,即使重新启动以进行更新,也必须维护作业。

我试过商店redis

 def __init__(self):
        jobstores = {
        'default': RedisJobStore(jobs_key='dispatched_trips_jobs', 
        run_times_key='dispatched_trips_running', host='localhost', port=6379)
        }
        JobLauncher._sched = BackgroundScheduler(jobstores=jobstores)
        JobLauncher._sched.start()

引发错误

File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/core/handlers/exception.py", line 34, in inner
response = get_response(request)
 File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/core/handlers/base.py", line 115, in _get_response
    response = self.process_exception_by_middleware(e, request)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/core/handlers/base.py", line 113, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/django/views/decorators/csrf.py", line 54, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/viewsets.py", line 116, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/views.py", line 495, in dispatch
    response = self.handle_exception(exc)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/views.py", line 455, in handle_exception
    self.raise_uncaught_exception(exc)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/rest_framework/views.py", line 492, in dispatch
    response = handler(request, *args, **kwargs)
  File "./post/views.py", line 826, in scheduleTest
    launcher.run(job)
  File "./utils/scheduler.py", line 30, in run
    return self.run_job(job)
  File "./utils/scheduler.py", line 44, in run_job
    _job = JobLauncher._sched.add_job(func=job.runMethod, trigger='date', id=job.name, args=job.job_params,run_date = job.job_date)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 443, in add_job
    self._real_add_job(job, jobstore, replace_existing)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 867, in _real_add_job
    store.add_job(job)
  File "/home/ubuntu/myApp/venv/lib/python3.7/site-packages/apscheduler/jobstores/redis.py", line 82, in add_job
    self.pickle_protocol))
TypeError: can't pickle uwsgi._Input objects

请告诉我在重新启动时保持工作的最佳方法uwsgimemory redis mysql可以使用三种方法。

标签: djangoapscheduler

解决方案


发生泡菜错误是因为您试图uwsgi._Input在其参数中添加一个包含不可序列化对象(在本例中)的持久作业。当调度的函数实际上是一个实例方法并且该实例包含一个不可序列化的对象时,这有时可能会无意中发生。

要解决此问题,您需要检查传递给作业的参数,以查看其中是否包含uwsgi._Input对象。确保您还检查了目标函数。如果它是一个实例方法,请确保该实例也不包含这样的对象作为成员。


推荐阅读