python-3.x - apsheduler 关注点分离打破了它
问题描述
我对这个应用程序应该如何工作的心智模型是能够将调用 add_job 的进程与管理进程的进程分开。
当我在开始计划之前添加作业时,这工作正常,但当我尝试将它分成单独的函数时它不会。为什么?add_job 之后是否有提交函数我需要调用
我使用的 sqlite 和 BlockingScheduler 对我的目的更有意义,尽管它已转移到 postgresql 进行调试。
from datetime import datetime, timedelta
from time import sleep
import logging
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
import click
from crontab import CronTab
import pytz
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
jobstores = {
'default': SQLAlchemyJobStore(url='postgresql+psycopg2://myusername:mypassword@localhost/mydb')
}
executors = {
'default': ThreadPoolExecutor(5),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
sched = BackgroundScheduler(jobstores=jobstores, timezone=pytz.timezone('Australia/Sydney'))
def my_job(text):
now = datetime.now()
print(f'{now} text: {text}')
@click.group()
def cli():
pass
@click.command()
@click.option('--message', default='<BLANK>', help='to display when ')
@click.option('--crontab', default='*/1 * * * *', help='Timestamp of ')
def add_job(message, crontab):
# entry = CronTab('0 0 ? * TUE,THU')
entry = CronTab(crontab)
number_of_seconds = entry.next()
timestamp = datetime.now(pytz.timezone('Australia/Sydney')) + timedelta(seconds=number_of_seconds)
move_service_message = f'Service {message} will be moved @ {timestamp}'
sched.add_job(
my_job,
'date',
run_date=timestamp,
args=[move_service_message]
)
print('added job:' + move_service_message)
@click.command()
def start():
# this will wait forever
sched.start()
try:
# This is here to simulate application activity (which keeps the main thread alive).
while True:
sleep(10)
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but should be done if possible
sched.shutdown()
cli.add_command(start)
cli.add_command(add_job)
if __name__ == "__main__":
exit_code = 0 # assume it will be okay
time_started = datetime.now()
try:
cli()
except Exception as e:
print('Exception:', e)
exit_code = 1
finally:
exit(exit_code)
我的包裹是最新的,包括
APScheduler 3.6.0
SQLAlchemy 1.3.1
解决方案
似乎我认为它可能与FAQ中记录的不同。尽管只是修改了我的期望,但我仍然会使用它。
推荐阅读
- php - 如何使用 Laravel 生成报告票号?
- apache-kafka - 处理 kafka 消费者中的反序列化错误(死信队列)
- entity-framework - 添加数据时是否可以确保顺序?
- excel - 如何仅禁用 Excel 电子表格中特定列的填充句柄?
- python - 计算嵌套列表中的最大差异
- html - 如何在使用 Spacy NER 预测命名实体时从文本中删除 html 标签,并再次使用 html 标签以原始格式显示相同的文本?
- d3.js - d3 浮动分组条在时间轴中具有范围值
- c# - 如何强制 MVC 应用程序在启用 Windows 身份验证的情况下要求提供凭据
- javascript - 选择框选项列表更改检测其他选择框更改未检测到更改
- python - 如何在pyautogui中单击图像中心