python - 对时间敏感的应用程序的并发、并行和多处理
问题描述
需要一些程序设计的指导。我是初学者,所以我不知道是否有更好的方法来解决这个问题。我尝试使用 multiprocessing 和 asyncio 来解决时间问题。在我认为是最好的方法的多处理下,我的程序在一个进程中的一个函数中运行 sql 查询,然后该函数通过在另一个进程中运行的 Manager().list([]) 将结果传递给第二个函数。
我正在使用 schedule 模块每 10 秒运行一次 sql 查询函数,并且每秒运行第二个函数。(我需要根据第二个函数中的当前日期/时间评估从 sql 查询返回的日期/时间以进行后续操作。)我看到的是 sql 函数似乎中断了第二次评估函数,例如我将在时间评估函数中看到当前日期/时间增量(以秒为单位),但是当 sql 查询函数运行时,我看到一、二或三秒的间隔(以及来自 sql 查询的匹配日期/时间到当前日期/时间不会触发期望动作),然后日期/时间开始再次增加。
我不确定多处理是否按预期工作,或者我的代码是否有错误,或者我是否应该尝试另一种方法(广泛接受建议)
谢谢!!!!
真的希望避免发布我的代码,我是一个真正的 python 初学者,我知道这是一团糟(难以置信的轻描淡写!)而且我对此感到有点尴尬......
from multiprocessing import Process, Manager
import pendulum
import schedule
import time
import mysql.connector
from mysql.connector import Error
import paho.mqtt.client as mqttc
import json
def worker(data, final_list):
def create_connection(host_name, data_base, user_name, user_password):
print('create_connection:111111111111111111111111111111111111111111111111111')
connection = None
try:
connection = mysql.connector.connect(
host=host_name,
database=data_base,
user=user_name,
passwd=user_password,
# Auto commit
autocommit=True
)
print("Connection to MySQL DB from function successful")
except Error as e:
print(f"The error '{e}' occurred")
return connection
def query_database():
print('running query_database:22222222222222222222222222222222222222222222222222222222222222222222')
connection = create_connection("10.0.0.XXX", "webcalendar", "python2", "XXXXXXX")
queryappointments = """
SELECT DISTINCT
webcal_entry.cal_id,
webcal_entry.cal_date,
webcal_entry.cal_due_time,
webcal_entry.cal_name,
webcal_entry.cal_description,
webcal_entry_user.cal_status,
webcal_entry.cal_time
FROM
webcal_entry_user
LEFT JOIN webcal_entry ON webcal_entry.cal_id = webcal_entry_user.cal_id
WHERE
webcal_entry_user.cal_status = 'A'
"""
cursor = connection.cursor()
unformatted_appointments_in_list_of_tuples = None
try:
cursor.execute(queryappointments)
unformatted_appointments_in_list_of_tuples = cursor.fetchall()
except Error as e:
print(f"The error '{e}' occurred")
finally:
if (connection.is_connected()):
cursor.close()
connection.close()
print("MySQL connection is closed")
return unformatted_appointments_in_list_of_tuples
def convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists():
unformatted_appointments_in_list_of_tuples = query_database()
print('running convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists:333333333333333333333')
unformatted_appointments_in_list_of_lists = [list(appointments) for appointments in
unformatted_appointments_in_list_of_tuples]
print(unformatted_appointments_in_list_of_lists)
return unformatted_appointments_in_list_of_lists
def create_untimed_reminders_in_list_of_lists():
# Webcalendar uses a -1 in webcal_entry.cal_time to indicate an untimed appointment
# this function creates a list of lists for these entries
print('running create_untimed_reminders_in_list_of_lists:444444444444444444444444444444444444444444444444')
untimed_reminders = []
unformatted_appointments_in_list_of_lists = convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists()
for i in (range(len(unformatted_appointments_in_list_of_lists))):
if unformatted_appointments_in_list_of_lists[i][6] == -1:
untimed_reminders.append(unformatted_appointments_in_list_of_lists[i])
print("untimed_reminders:")
print(untimed_reminders)
return untimed_reminders
def remove_untimed_appointments_in_list_of_lists():
# Webcalendar uses a -1 in webcal_entry.cal_time to indicate an untimed appointment
# this function removes those entries
print('running remove_untimed_appointments_in_list_of_lists:555555555555555555555555555555555555555555555')
unformatted_appointments_in_list_of_lists = convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists()
for i in reversed(range(len(unformatted_appointments_in_list_of_lists))):
if unformatted_appointments_in_list_of_lists[i][6] == -1:
del unformatted_appointments_in_list_of_lists[i]
print("remindersminus:")
print(unformatted_appointments_in_list_of_lists)
return unformatted_appointments_in_list_of_lists
def format_appointments_in_list_of_lists_with_date_time():
unformatted_appointments_in_list_of_lists = remove_untimed_appointments_in_list_of_lists()
print('running format_appointments_in_list_of_lists_with_date_time:6666666666666666666666666666666666666666')
# Clear final_list = Manager().list([]) so we don't keep reappending same appointments
final_list[:] = []
final_list[:]
for sub_list in unformatted_appointments_in_list_of_lists:
# Webcalendar stores the appointment date as an integer in webcal_entry.cal_date and the appointment
# time as an integer in webcal_entry.cal_due_time, we need to combine these two fields and add a "T"
# between them to create an ISO 8601 Datetime string
sub_list[0] = (str(sub_list[1])) + "T" + str('{:>06d}'.format(sub_list[2]))
sub_list.remove(sub_list[1])
sub_list.remove(sub_list[1])
for sub_list in unformatted_appointments_in_list_of_lists:
# Webcalendar stores datetimes as UTC, we need to convert the string we created to an UTC datetime objects
sub_list[0] = (pendulum.parse(sub_list[0], tz='UTC'))
for sub_list in unformatted_appointments_in_list_of_lists:
# Since we want the MQTT messages to fire at the local date/time, we need to convert the datetime objects to
# the local timezone
sub_list[0] = sub_list[0].in_tz('America/Los_Angeles')
for item in unformatted_appointments_in_list_of_lists:
final_list.append(item)
print('unformatted_appointments_in_list_of_lists:')
print(unformatted_appointments_in_list_of_lists)
print('final_list from worker:')
print(final_list)
format_appointments_in_list_of_lists_with_date_time()
def worker2(data, final_list):
def evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal():
client = mqttc.Client("SuperPython")
client.connect("10.0.0.XXX", 1883)
currentdatetime = pendulum.now().replace(microsecond=0)
# Since Pendulum is scheduled to run the job function every .6 seconds, we need to trim off the microseconds from
# the datetime object or it will miss entries because Webcalendar entries have only whole minutes
print(
'running evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal:88888')
print('final_list from MQTT publish:')
print(final_list)
for i in final_list:
print('entry:')
print(i[2])
print('i[0]:')
print(i[0].to_datetime_string())
print('currentdatetime:')
print(currentdatetime.to_datetime_string())
if i[0] == currentdatetime:
payload = {"text": i[2], "siteId": "default", "lang": "", "id": "", "sessionId": "", "volume": 1.0}
# print(json.dumps(payload))
mqttc.publish(topic="hermes/tts/say", payload=json.dumps(payload))
client.disconnect()
evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal()
def run_schedule():
while True:
schedule.run_pending()
# time.sleep(1)
def run_worker2(input_list_one, final_list):
schedule.every(0.01).minutes.do(worker2, input_list_one, final_list)
run_schedule()
def run_worker(input_list_two, final_list):
schedule.every(0.50).minutes.do(worker, input_list_two, final_list)
run_schedule()
def run_job():
p = Process(target=run_worker2, args=(input_list_one, final_list))
c = Process(target=run_worker, args=(input_list_two, final_list))
p.start()
c.start()
p.join()
# c.join()
if __name__ == "__main__":
with Manager() as manager:
lock = manager.Lock()
final_list = Manager().list([])
input_list_one = []
input_list_two = []
run_job()
这是输出的相关部分,请注意,在此示例中,当前日期时间打印语句 (2021-04-06 11:51:46 -> 2021-04-06 11:51:48) 在worker2 函数,这意味着 worker2 函数有时会错过属于该特定时间段的约会。我已经通过重新运行程序多次验证了这一点;有时一切正常,因为当约会的时间评估为真时,工作函数不会中断worker2评估,其他时候,当工作函数在该确切时间运行时,约会会丢失,有时甚至更大间隙,取决于工作函数运行多长时间。
currentdatetime:
2021-04-06 11:51:46
running remove_untimed_appointments_in_list_of_lists:555555555555555555555555555555555555555555555
running query_database:22222222222222222222222222222222222222222222222222222222222222222222
create_connection:111111111111111111111111111111111111111111111111111
Connection to MySQL DB from function successful
MySQL connection is closed
running convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists:333333333333333333333
[[2, 20200331, 70000, '202003310:00', '<p>March 31st at 12 am</p>', 'A', 70000], [3, 20200331, 110000, 'hermes/tts/say', 'March 31st at 4 AM', 'A', 110000], [4, 20200331, 150000, '2020033108:00', '<p>March 31st at 8 am</p>', 'A', 150000], [5, 20200331, 190000, '2020033112:00', '<p>March 31st at 12PM noon</p>', 'A', 190000], [6, 20200331, 230000, '2020033116:00', '<p>March 31st at 4pm</p>', 'A', 230000], [7, 20200401, 10000, '2020033118:00', '<p>March 31st at 6pm</p>', 'A', 10000], [8, 20200401, 30000, '2020033120:00', '<p>March 31 at 8 pm</p>', 'A', 30000], [9, 20200331, 0, '20200331', '<p>March 31st Untimed event</p>', 'A', -1], [10, 20200331, 151000, '2020033108:10', '<p>March 31st 8:10am</p>', 'A', 151000], [11, 20200401, 0, '2020033117:00', 'March 31st at 5:00 pm???', 'A', 0], [13, 20200401, 1000, '2020033117:10', '<p>March 31st at 5:10pm</p>', 'A', 1000], [14, 20200407, 200000, '2020040713:00', '<p>April 7th at 1:00 pm</p>', 'A', 200000], [15, 20210222, 10000, 'json', 'json', 'A', 10000], [17, 20210226, 195000, 'test at 11:50 #1', 'test at 11:50 #1', 'A', 195000], [18, 20210302, 200000, 'test at noon', 'test at noon', 'A', 200000], [19, 20210404, 202900, 'Woo Hoo It really works four!', 'Woo Hoo It really works four!', 'A', 202900], [20, 20210305, 0, 'Untimed Event 3/5', 'Untimed Event 3/5', 'A', -1]]
remindersminus:
[[2, 20200331, 70000, '202003310:00', '<p>March 31st at 12 am</p>', 'A', 70000], [3, 20200331, 110000, 'hermes/tts/say', 'March 31st at 4 AM', 'A', 110000], [4, 20200331, 150000, '2020033108:00', '<p>March 31st at 8 am</p>', 'A', 150000], [5, 20200331, 190000, '2020033112:00', '<p>March 31st at 12PM noon</p>', 'A', 190000], [6, 20200331, 230000, '2020033116:00', '<p>March 31st at 4pm</p>', 'A', 230000], [7, 20200401, 10000, '2020033118:00', '<p>March 31st at 6pm</p>', 'A', 10000], [8, 20200401, 30000, '2020033120:00', '<p>March 31 at 8 pm</p>', 'A', 30000], [10, 20200331, 151000, '2020033108:10', '<p>March 31st 8:10am</p>', 'A', 151000], [11, 20200401, 0, '2020033117:00', 'March 31st at 5:00 pm???', 'A', 0], [13, 20200401, 1000, '2020033117:10', '<p>March 31st at 5:10pm</p>', 'A', 1000], [14, 20200407, 200000, '2020040713:00', '<p>April 7th at 1:00 pm</p>', 'A', 200000], [15, 20210222, 10000, 'json', 'json', 'A', 10000], [17, 20210226, 195000, 'test at 11:50 #1', 'test at 11:50 #1', 'A', 195000], [18, 20210302, 200000, 'test at noon', 'test at noon', 'A', 200000], [19, 20210404, 202900, 'Woo Hoo It really works four!', 'Woo Hoo It really works four!', 'A', 202900]]
running format_appointments_in_list_of_lists_with_date_time:6666666666666666666666666666666666666666
running evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal:88888
final_list from MQTT publish:
[[DateTime(2020, 3, 31, 0, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '202003310:00', '<p>March 31st at 12 am</p>', 'A', 70000], [DateTime(2020, 3, 31, 4, 0, 0, tzinfo=Timezone('America/Los_Angeles')), 'hermes/tts/say', 'March 31st at 4 AM', 'A', 110000], [DateTime(2020, 3, 31, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033108:00', '<p>March 31st at 8 am</p>', 'A', 150000], [DateTime(2020, 3, 31, 12, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033112:00', '<p>March 31st at 12PM noon</p>', 'A', 190000], [DateTime(2020, 3, 31, 16, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033116:00', '<p>March 31st at 4pm</p>', 'A', 230000], [DateTime(2020, 3, 31, 18, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033118:00', '<p>March 31st at 6pm</p>', 'A', 10000], [DateTime(2020, 3, 31, 20, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033120:00', '<p>March 31 at 8 pm</p>', 'A', 30000]]
entry:
<p>March 31st at 12 am</p>
i[0]:
2020-03-31 00:00:00
currentdatetime:
2021-04-06 11:51:48
同样,我知道这段代码是一场噩梦,但随着我知识的增长,我会不断改进它。(我希望!!)
解决方案
推荐阅读
- rundeck - Rundeck - 生成新令牌后在哪里更新 rundeck 身份验证令牌?
- javascript - Javascript:根据嵌套的 start_date 和 end_date 对数据进行排序
- monit - M/Monit 更改/重置管理员密码
- docker - 如何找出卷属于哪个容器?
- python - 是否可以通过 cypari2 使用 Pari 并行化 python 代码?
- python - 如何从 BigQuery 表中提取所有列名的列表?
- r - 计算时间差。R
- python - Python中的平行向量点
- python - python bs4如何在一行中组合多个图像url?
- java - Java 属性或类变量或类字段等中的术语是否正确?非常感谢