Concurrency, parallelism, and multiprocessing for a time-sensitive application

Problem description

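I need some guidance on program design. I'm a beginner, so I don't know whether there's a better way to approach this. I've tried both multiprocessing and asyncio to solve a timing problem. With multiprocessing, which I think is the best approach, my program runs a SQL query in a function in one process, and that function then passes the results through a Manager().list([]) to a second function running in another process.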
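
A Manager().list() proxy forwards every operation to the manager's server process as a separate request. If the writer clears the list and then appends items one at a time, a reader in another process can see it empty or half-filled in between (in the output further down, `final_list from MQTT publish` holds only 7 of the 15 rows printed under `remindersminus`, which is consistent with worker2 reading mid-refresh). A minimal sketch, assuming the hypothetical helper names `replace_contents` and `take_snapshot`: build the new list locally, publish it with one slice assignment, and have the reader copy the list once per check instead of iterating the proxy, which fetches elements one request at a time. A plain list stands in for the proxy so the demo runs without a manager; a ListProxy supports the same slice operations.

```python
from datetime import datetime, timezone


def replace_contents(shared, new_items):
    """Publish a fully built list in one step.

    On a Manager().list() proxy, slice assignment is a single __setitem__
    request to the manager process, so a reader should see either the old
    contents or the new ones, not an empty or half-filled list.
    """
    shared[:] = list(new_items)


def take_snapshot(shared):
    """Copy the shared list once; on a proxy, shared[:] is a single request."""
    return shared[:]


if __name__ == "__main__":
    shared = []  # stands in for manager.list() so the demo needs no server process
    # Writer: do the slow parsing/timezone work locally, then hand over the result.
    prepared = [[datetime(2021, 4, 6, 18, 51, tzinfo=timezone.utc), "test reminder"]]
    replace_contents(shared, prepared)
    # Reader: iterate over a local copy instead of the shared list itself.
    for when, text in take_snapshot(shared):
        print(when.isoformat(), text)
```

In the original code this would mean collecting the formatted rows in a local list and ending the worker with a single `final_list[:] = rows`, and starting worker2 with `appointments = final_list[:]`.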

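I'm using the schedule module to run the SQL query function every 10 seconds and the second function every second. (In the second function I need to compare the dates/times returned by the SQL query against the current date/time to decide on follow-up actions.) What I'm seeing is that the SQL function seems to interrupt the second, evaluating function. For example, in the time-evaluation function I can see the current date/time increasing second by second, but when the SQL query function runs I see a gap of one, two, or three seconds (and an appointment from the SQL query whose date/time falls in that gap does not trigger the desired action), after which the date/time starts increasing again.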
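
The check `i[0] == currentdatetime` only succeeds if an evaluation happens to run during exactly that second, so any delay of a second or more silently skips the appointment. A common alternative, sketched here assuming each appointment is a list whose first element is a timezone-aware datetime (as in `final_list`), is to remember when the previous check ran and fire everything in the half-open window (previous check, now]. A late check then catches up instead of missing entries, and trimming microseconds becomes unnecessary. The names `due_appointments` and `ReminderChecker` are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def due_appointments(appointments, last_check, now):
    """Return appointments whose time falls in the window (last_check, now]."""
    return [appt for appt in appointments if last_check < appt[0] <= now]


class ReminderChecker:
    """Remembers the previous check so a late or skipped tick is caught up."""

    def __init__(self, start):
        self.last_check = start

    def check(self, appointments, now):
        due = due_appointments(appointments, self.last_check, now)
        # Consecutive windows never overlap, so each appointment is returned at most once.
        self.last_check = now
        return due


if __name__ == "__main__":
    t0 = datetime(2021, 4, 6, 18, 51, 46, tzinfo=timezone.utc)
    appts = [[t0 + timedelta(seconds=1), "at :47"], [t0 + timedelta(seconds=5), "at :51"]]
    checker = ReminderChecker(start=t0)
    # The tick at :47 never happens; the next one runs at :48 and still finds the entry.
    print(checker.check(appts, t0 + timedelta(seconds=2)))
    print(checker.check(appts, t0 + timedelta(seconds=3)))
```

In a real loop the checker would be created once with `start=datetime.now(timezone.utc)` and called with the current time on every tick. pendulum's `DateTime` is a `datetime` subclass, so the same comparisons should also work with `pendulum.now()`.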

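I'm not sure whether multiprocessing is working as intended, whether there's a bug in my code, or whether I should try a different approach (open to any suggestions).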
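
Because the SQL query mostly waits on the network, it does not necessarily need its own process. A minimal alternative sketch, assuming a caller-supplied `fetch_rows` callable (hypothetical; in the original it would wrap `query_database()` plus the formatting steps): a background `threading.Thread` refreshes the appointment list and swaps it in under a `threading.Lock`, while the main thread runs the per-second check. No Manager process or cross-process pickling is involved, and the checker never waits for the query; it keeps using the previous list until the new one is ready.

```python
import threading
import time


class AppointmentStore:
    """Holds the latest appointment list; replaced wholesale on each refresh."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = []

    def replace(self, items):
        new_items = list(items)  # build the new list outside the lock
        with self._lock:
            self._items = new_items  # the swap itself is tiny

    def snapshot(self):
        with self._lock:
            return list(self._items)


def refresh_forever(store, fetch_rows, period, stop):
    """Re-run fetch_rows every `period` seconds until `stop` is set."""
    while not stop.is_set():
        try:
            store.replace(fetch_rows())
        except Exception as exc:  # keep the previous list if a query fails
            print(f"refresh failed: {exc}")
        stop.wait(period)


def start_refresher(store, fetch_rows, period=10.0):
    stop = threading.Event()
    thread = threading.Thread(
        target=refresh_forever, args=(store, fetch_rows, period, stop), daemon=True
    )
    thread.start()
    return stop, thread


if __name__ == "__main__":
    store = AppointmentStore()
    stop, thread = start_refresher(store, lambda: [["demo", time.time()]], period=0.05)
    time.sleep(0.2)
    print(store.snapshot())
    stop.set()
    thread.join()
```

The same shape should carry over to asyncio if the blocking database call is moved off the event loop with `asyncio.to_thread` (Python 3.9+).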

Thanks!!!!

I really wanted to avoid posting my code. I'm a true Python beginner and I know it's a mess (an incredible understatement!), and I'm a bit embarrassed by it...

from multiprocessing import Process, Manager
import pendulum
import schedule
import time
import mysql.connector
from mysql.connector import Error
import paho.mqtt.client as mqttc
import json

def worker(data, final_list):
    def create_connection(host_name, data_base, user_name, user_password):
        print('create_connection:111111111111111111111111111111111111111111111111111')
        connection = None
        try:
            connection = mysql.connector.connect(
                host=host_name,
                database=data_base,
                user=user_name,
                passwd=user_password,
                # Auto commit
                autocommit=True
            )
            print("Connection to MySQL DB from function successful")
        except Error as e:
            print(f"The error '{e}' occurred")
        return connection

    def query_database():
        print('running query_database:22222222222222222222222222222222222222222222222222222222222222222222')
        connection = create_connection("10.0.0.XXX", "webcalendar", "python2", "XXXXXXX")
        queryappointments = """

            SELECT DISTINCT
              webcal_entry.cal_id,
              webcal_entry.cal_date, 
              webcal_entry.cal_due_time,
              webcal_entry.cal_name,
              webcal_entry.cal_description,
              webcal_entry_user.cal_status,
              webcal_entry.cal_time
            FROM
              webcal_entry_user
              LEFT JOIN webcal_entry ON webcal_entry.cal_id = webcal_entry_user.cal_id
            WHERE 
             webcal_entry_user.cal_status = 'A'
            """

        cursor = connection.cursor()
        unformatted_appointments_in_list_of_tuples = None
        try:
            cursor.execute(queryappointments)
            unformatted_appointments_in_list_of_tuples = cursor.fetchall()
        except Error as e:
            print(f"The error '{e}' occurred")
        finally:
            if (connection.is_connected()):
                cursor.close()
                connection.close()
                print("MySQL connection is closed")
        return unformatted_appointments_in_list_of_tuples

    def convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists():
        unformatted_appointments_in_list_of_tuples = query_database()
        print('running convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists:333333333333333333333')
        unformatted_appointments_in_list_of_lists = [list(appointments) for appointments in
                                                     unformatted_appointments_in_list_of_tuples]
        print(unformatted_appointments_in_list_of_lists)
        return unformatted_appointments_in_list_of_lists

    def create_untimed_reminders_in_list_of_lists():
        # Webcalendar uses a -1 in webcal_entry.cal_time to indicate an untimed appointment
        # this function creates a list of lists for these entries
        print('running create_untimed_reminders_in_list_of_lists:444444444444444444444444444444444444444444444444')
        untimed_reminders = []
        unformatted_appointments_in_list_of_lists = convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists()
        for i in (range(len(unformatted_appointments_in_list_of_lists))):
            if unformatted_appointments_in_list_of_lists[i][6] == -1:
                untimed_reminders.append(unformatted_appointments_in_list_of_lists[i])
        print("untimed_reminders:")
        print(untimed_reminders)
        return untimed_reminders

    def remove_untimed_appointments_in_list_of_lists():
        # Webcalendar uses a -1 in webcal_entry.cal_time to indicate an untimed appointment
        # this function removes those entries
        print('running remove_untimed_appointments_in_list_of_lists:555555555555555555555555555555555555555555555')
        unformatted_appointments_in_list_of_lists = convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists()
        for i in reversed(range(len(unformatted_appointments_in_list_of_lists))):
            if unformatted_appointments_in_list_of_lists[i][6] == -1:
                del unformatted_appointments_in_list_of_lists[i]
        print("remindersminus:")
        print(unformatted_appointments_in_list_of_lists)
        return unformatted_appointments_in_list_of_lists

    def format_appointments_in_list_of_lists_with_date_time():
        unformatted_appointments_in_list_of_lists = remove_untimed_appointments_in_list_of_lists()
        print('running format_appointments_in_list_of_lists_with_date_time:6666666666666666666666666666666666666666')
        # Clear final_list (a Manager().list proxy) so we don't keep re-appending the same appointments
        final_list[:] = []

        for sub_list in unformatted_appointments_in_list_of_lists:
            # Webcalendar stores the appointment date as an integer in webcal_entry.cal_date and the appointment
            # time as an integer in webcal_entry.cal_due_time, we need to combine these two fields and add a "T"
            # between them to create an ISO 8601 Datetime string
            sub_list[0] = (str(sub_list[1])) + "T" + str('{:>06d}'.format(sub_list[2]))
            sub_list.remove(sub_list[1])
            sub_list.remove(sub_list[1])

        for sub_list in unformatted_appointments_in_list_of_lists:
            # Webcalendar stores datetimes as UTC, we need to convert the string we created to an UTC datetime objects
            sub_list[0] = (pendulum.parse(sub_list[0], tz='UTC'))

        for sub_list in unformatted_appointments_in_list_of_lists:
            # Since we want the MQTT messages to fire at the local date/time, we need to convert the datetime objects to
            # the local timezone
            sub_list[0] = sub_list[0].in_tz('America/Los_Angeles')
        for item in unformatted_appointments_in_list_of_lists:
            final_list.append(item)
        print('unformatted_appointments_in_list_of_lists:')
        print(unformatted_appointments_in_list_of_lists)
        print('final_list from worker:')
        print(final_list)

    format_appointments_in_list_of_lists_with_date_time()

def worker2(data, final_list):
    def evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal():
        client = mqttc.Client("SuperPython")
        client.connect("10.0.0.XXX", 1883)
        currentdatetime = pendulum.now().replace(microsecond=0)
        # Since schedule runs this job every 0.6 seconds (every(0.01).minutes), we need to trim off the microseconds
        # from the datetime object or it will miss entries, because Webcalendar entries have only whole minutes
        print(
            'running evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal:88888')
        print('final_list from MQTT publish:')
        print(final_list)
        for i in final_list:
            print('entry:')
            print(i[2])
            print('i[0]:')
            print(i[0].to_datetime_string())
            print('currentdatetime:')
            print(currentdatetime.to_datetime_string())
            if i[0] == currentdatetime:
                payload = {"text": i[2], "siteId": "default", "lang": "", "id": "", "sessionId": "", "volume": 1.0}
                #            print(json.dumps(payload))
                # Publish through the connected client instance; the mqttc module itself has no publish()
                client.publish(topic="hermes/tts/say", payload=json.dumps(payload))
        client.disconnect()

    evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal()


def run_schedule():
    while True:
        schedule.run_pending()
        time.sleep(0.1)  # short sleep instead of a busy-wait that keeps a CPU core at 100%

def run_worker2(input_list_one, final_list):
    schedule.every(0.01).minutes.do(worker2, input_list_one, final_list)
    run_schedule()

def run_worker(input_list_two, final_list):
    schedule.every(0.50).minutes.do(worker, input_list_two, final_list)
    run_schedule()


def run_job():
    p = Process(target=run_worker2, args=(input_list_one, final_list))
    c = Process(target=run_worker, args=(input_list_two, final_list))
    p.start()
    c.start()
    p.join()
#    c.join()

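if __name__ == "__main__":
    with Manager() as manager:
        lock = manager.Lock()
        # Use the manager opened above instead of starting a second Manager(),
        # and keep run_job() inside the with-block so the manager stays alive
        final_list = manager.list([])
        input_list_one = []
        input_list_two = []
        run_job()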
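
The three formatting loops in `format_appointments_in_list_of_lists_with_date_time` turn Webcalendar's integer `cal_date` (YYYYMMDD) and `cal_due_time` (HHMMSS, stored in UTC) into a local datetime. For comparison, here is a standard-library-only sketch of the same conversion in a single pass, assuming integer inputs as in the printed rows. `webcal_to_datetime` and `format_rows` are hypothetical names, and the target zone is passed in (for example `zoneinfo.ZoneInfo("America/Los_Angeles")` on Python 3.9+ with time zone data installed) rather than hard-coded; the demo uses a fixed UTC-7 offset so it runs without tz data.

```python
from datetime import datetime, timedelta, timezone


def webcal_to_datetime(cal_date, cal_due_time, tz):
    """Combine Webcalendar's YYYYMMDD and HHMMSS integers (UTC) into a datetime in `tz`."""
    # Zero-pad the time to six digits, like '{:>06d}' in the original: 70000 -> '070000'.
    stamp = f"{cal_date:08d}{cal_due_time:06d}"
    utc_dt = datetime.strptime(stamp, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
    return utc_dt.astimezone(tz)


def format_rows(rows, tz):
    """Build [local_datetime, name, description, status, cal_time] rows, skipping untimed (-1) entries."""
    formatted = []
    for _cal_id, cal_date, cal_due_time, name, description, status, cal_time in rows:
        if cal_time == -1:  # Webcalendar marks untimed appointments with -1
            continue
        local_dt = webcal_to_datetime(cal_date, cal_due_time, tz)
        formatted.append([local_dt, name, description, status, cal_time])
    return formatted


if __name__ == "__main__":
    pdt = timezone(timedelta(hours=-7))  # fixed-offset stand-in for America/Los_Angeles in summer
    row = (2, 20200331, 70000, "202003310:00", "<p>March 31st at 12 am</p>", "A", 70000)
    print(format_rows([row], pdt))
```

With the first row from the output, 20200331 / 70000 becomes 07:00 UTC, which is midnight on March 31 at UTC-7, matching the first `DateTime` in `final_list`.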

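Here is the relevant part of the output. Note that in this example the current-datetime print statement in the worker2 function jumps from 2021-04-06 11:51:46 to 2021-04-06 11:51:48, which means worker2 sometimes misses appointments that fall in that particular second. I've confirmed this by rerunning the program many times: sometimes everything works, because the worker function doesn't interrupt worker2's evaluation at the moment an appointment's time evaluates as true; other times, when the worker function runs at that exact moment, the appointment is missed. Sometimes the gap is even larger, depending on how long the worker function takes to run.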

currentdatetime:
2021-04-06 11:51:46
running remove_untimed_appointments_in_list_of_lists:555555555555555555555555555555555555555555555
running query_database:22222222222222222222222222222222222222222222222222222222222222222222
create_connection:111111111111111111111111111111111111111111111111111
Connection to MySQL DB from function successful
MySQL connection is closed
running convert_unformatted_appointments_in_list_of_tuples_to_list_of_lists:333333333333333333333
[[2, 20200331, 70000, '202003310:00', '<p>March 31st at 12 am</p>', 'A', 70000], [3, 20200331, 110000, 'hermes/tts/say', 'March 31st at 4 AM', 'A', 110000], [4, 20200331, 150000, '2020033108:00', '<p>March 31st at 8 am</p>', 'A', 150000], [5, 20200331, 190000, '2020033112:00', '<p>March 31st at 12PM noon</p>', 'A', 190000], [6, 20200331, 230000, '2020033116:00', '<p>March 31st at 4pm</p>', 'A', 230000], [7, 20200401, 10000, '2020033118:00', '<p>March 31st at 6pm</p>', 'A', 10000], [8, 20200401, 30000, '2020033120:00', '<p>March 31 at 8 pm</p>', 'A', 30000], [9, 20200331, 0, '20200331', '<p>March 31st Untimed event</p>', 'A', -1], [10, 20200331, 151000, '2020033108:10', '<p>March 31st 8:10am</p>', 'A', 151000], [11, 20200401, 0, '2020033117:00', 'March 31st at 5:00 pm???', 'A', 0], [13, 20200401, 1000, '2020033117:10', '<p>March 31st at 5:10pm</p>', 'A', 1000], [14, 20200407, 200000, '2020040713:00', '<p>April 7th at 1:00 pm</p>', 'A', 200000], [15, 20210222, 10000, 'json', 'json', 'A', 10000], [17, 20210226, 195000, 'test at 11:50 #1', 'test at 11:50 #1', 'A', 195000], [18, 20210302, 200000, 'test at noon', 'test at noon', 'A', 200000], [19, 20210404, 202900, 'Woo Hoo It really works four!', 'Woo Hoo It really works four!', 'A', 202900], [20, 20210305, 0, 'Untimed Event 3/5', 'Untimed Event 3/5', 'A', -1]]
remindersminus:
[[2, 20200331, 70000, '202003310:00', '<p>March 31st at 12 am</p>', 'A', 70000], [3, 20200331, 110000, 'hermes/tts/say', 'March 31st at 4 AM', 'A', 110000], [4, 20200331, 150000, '2020033108:00', '<p>March 31st at 8 am</p>', 'A', 150000], [5, 20200331, 190000, '2020033112:00', '<p>March 31st at 12PM noon</p>', 'A', 190000], [6, 20200331, 230000, '2020033116:00', '<p>March 31st at 4pm</p>', 'A', 230000], [7, 20200401, 10000, '2020033118:00', '<p>March 31st at 6pm</p>', 'A', 10000], [8, 20200401, 30000, '2020033120:00', '<p>March 31 at 8 pm</p>', 'A', 30000], [10, 20200331, 151000, '2020033108:10', '<p>March 31st 8:10am</p>', 'A', 151000], [11, 20200401, 0, '2020033117:00', 'March 31st at 5:00 pm???', 'A', 0], [13, 20200401, 1000, '2020033117:10', '<p>March 31st at 5:10pm</p>', 'A', 1000], [14, 20200407, 200000, '2020040713:00', '<p>April 7th at 1:00 pm</p>', 'A', 200000], [15, 20210222, 10000, 'json', 'json', 'A', 10000], [17, 20210226, 195000, 'test at 11:50 #1', 'test at 11:50 #1', 'A', 195000], [18, 20210302, 200000, 'test at noon', 'test at noon', 'A', 200000], [19, 20210404, 202900, 'Woo Hoo It really works four!', 'Woo Hoo It really works four!', 'A', 202900]]
running format_appointments_in_list_of_lists_with_date_time:6666666666666666666666666666666666666666
running evaluate_appointment_time_in_formatted_appointments_in_list_of_lists_to_current_time_and_publish_to_MQTT_if_equal:88888
final_list from MQTT publish:
[[DateTime(2020, 3, 31, 0, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '202003310:00', '<p>March 31st at 12 am</p>', 'A', 70000], [DateTime(2020, 3, 31, 4, 0, 0, tzinfo=Timezone('America/Los_Angeles')), 'hermes/tts/say', 'March 31st at 4 AM', 'A', 110000], [DateTime(2020, 3, 31, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033108:00', '<p>March 31st at 8 am</p>', 'A', 150000], [DateTime(2020, 3, 31, 12, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033112:00', '<p>March 31st at 12PM noon</p>', 'A', 190000], [DateTime(2020, 3, 31, 16, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033116:00', '<p>March 31st at 4pm</p>', 'A', 230000], [DateTime(2020, 3, 31, 18, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033118:00', '<p>March 31st at 6pm</p>', 'A', 10000], [DateTime(2020, 3, 31, 20, 0, 0, tzinfo=Timezone('America/Los_Angeles')), '2020033120:00', '<p>March 31 at 8 pm</p>', 'A', 30000]]
entry:
<p>March 31st at 12 am</p>
i[0]:
2020-03-31 00:00:00
currentdatetime:
2021-04-06 11:51:48
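
Part of the jitter can come from how the checks are scheduled: if each run is timed relative to when the previous one finished, a slow run (a blocking MQTT connect, a burst of printing, CPU contention from busy loops) pushes every later run back. A minimal sketch of a fixed-rate loop, assuming a hypothetical `run_ticks` helper rather than the schedule module: deadlines are computed from `time.monotonic()` so lateness does not accumulate, and the callback receives the wall-clock time so it can be paired with a window-style (previous check, now] test that tolerates an occasional late tick. `n_ticks` only exists to keep the demo finite; a real loop would run indefinitely.

```python
import time
from datetime import datetime, timezone


def run_ticks(callback, period, n_ticks):
    """Call callback(now) every `period` seconds on a fixed grid, n_ticks times.

    Deadlines come from time.monotonic(), so a slow callback delays only the
    current tick; if deadlines have already passed, the loop jumps to the next
    future grid point instead of firing a burst of catch-up calls.
    """
    next_deadline = time.monotonic()
    for _ in range(n_ticks):
        callback(datetime.now(timezone.utc))
        next_deadline += period
        now = time.monotonic()
        if next_deadline < now:
            # Fell behind: skip the grid points that are already in the past.
            missed = int((now - next_deadline) // period) + 1
            next_deadline += missed * period
        time.sleep(max(0.0, next_deadline - time.monotonic()))


if __name__ == "__main__":
    stamps = []
    run_ticks(stamps.append, period=0.05, n_ticks=5)
    print([s.strftime("%H:%M:%S.%f") for s in stamps])
```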

Again, I know this code is a nightmare, but I'll keep improving it as my knowledge grows. (I hope!!)

Tags: python, concurrency, parallel-processing, multiprocessing
