import datetime from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger def aps_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', hour=23, minute=36) scheduler.add_job(func=aps_test, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3) # 上面的写法无法pyinstaller到windows # 建议写: scheduler.add_job(func=aps_test, args=('定时任务',), trigger=CronTrigger(hour=23, minute=37)) scheduler.add_job(func=aps_test, args=('一次性任务',), trigger=DateTrigger(datetime.datetime.now() + datetime.timedelta(seconds=12))) scheduler.add_job(func=aps_test, args=('循环任务',), trigger=IntervalTrigger(seconds=3)) scheduler.start() # 开始任务,上面的任务并行处理 # 两个有用的参数: # 每小时(上下浮动120秒区间内)运行`aps_test` scheduler.add_job(aps_test, 'interval', hours=1, jitter=120) # 丢失任务的执行与合并 # 有时,任务会由于一些问题没有被执行。错过执行时间后,调度器才打开了。这时,调度器会检查每个任务的misfire_grace_time参数int值,延迟上限,来确定是否还执行失败的任务(这个参数可以全局设定的或者是为每个任务单独设定)。此时,一个哑弹任务,就可能会被连续执行多次。 # 如果在规定的时间未能执行任务,那么在120秒内允许继续运行。如果不加这个参数,服务器忙的话,可能会错过任务的执行。 scheduler.add_job(aps_test, 'interval', hours=1, misfire_grace_time=120) # 但这就可能导致一个问题,有些未完成任务实际上并不需要被执行多次。coalescing合并参数就能把一个多次的未完成当作一次未完成的任务。 # 也就是说,coalescing为True能把多个排队执行的同样任务,变成一个,而不会触发异常事件。 # 这个是给高频任务用的,比如果说一分钟执行一次,甚至多个进程。我还没用到。