首页 > 解决方案 > Python 线程 - 不断保存来自其他线程的结果

问题描述

我想通过线程从两个传感器获取数据。假设通过调用相应的函数可以获得这些数据。这些传感器应该被连续查询,但它们的值应该只在两次测量之间的时间差大于预定义的阈值时保存(仅在与上次保存数据的时间差大于 10 秒时保存数据)。由于数据收集的两个线程都在一个 while 循环中运行,因此我使用另一个线程来保存数据。但是,我必须以某种方式确保这些数据在保存时可用。所以保存的线程必须等待其他两个线程。目前,我使用time.sleep()哪个正在工作。但是有没有更好的方法来保存另外两个线程的数据呢?

import threading
import copy
import time

def thread1():
    global connected
    global data
    while connected:
        sensor_data = getSensorData()
        data['x'] = sensor_data.x
        data['y'] = sensor_data.y
        data['time'] = sensor_data.time 

def thread2():
    global connected
    global data
    while connected:
        other_sensor_data = getOtherSensorData()
        data['h'] = other_sensor_data.h

def save_data():
    global connected
    global data
    global results
    while connected:
        time.sleep(0.1)
        if len(results) == 0 and len(data) > 0:
            results.append(copy.deepcopy(data))
        elif len(results) >= 1:
            if data['time'] - results[-1]['time'] >= 10:
                results.append(copy.deepcopy(data))


if __name__ == '__main__':
    global connected
    global data
    global result

    connected = True
    data = {}
    if 'result' not in globals():
        result = []

    first_thread = threading.Thread(target=thread1)
    second_thread = threading.Thread(target=thread2)
    save_thread = threading.Thread(target=save_data)
    first_thread.start()
    second_thread.start()
    save_thread.start()

标签: pythonmultithreadingpython-multithreading

解决方案


我个人会使用内置模块队列,该队列旨在安全地从线程中检索数据。

使用 LIFO 功能创建堆然后清空它们。

这是一个工作示例:

import threading
import queue
import time

sensor_pile = queue.LifoQueue()
other_sensor_pile = queue.LifoQueue()


def getSensorData():
    time.sleep(0.05)

    class Test:
        x = "x " + str(time.time())
        y = "y " + str(time.time())
        time = time.time()
    return Test


def getOtherSensorData():
    time.sleep(0.01)

    class Test:
        h = "h " + str(time.time())
    return Test


def thread1():
    global connected
    while connected:
        sensor_pile.put(getSensorData())


def thread2():
    global connected
    while connected:
        other_sensor_pile.put(getOtherSensorData())


def save_data():
    global connected
    global results

    last_time = -float("inf")

    # Use other_sensor_pile.get() if the first h value should not be None
    h = None

    while connected:
        data = sensor_pile.get()  # Will wait to receive data.
        # As the main pile, the main while loop will keep it empty.

        try:
            # Update h to the latest value and then empty the pile.

            # get_nowait will get the data if it is available
            # or raise a queue.Empty error if it isn't.
            h = other_sensor_pile.get_nowait().h

            while True:
                other_sensor_pile.get_nowait()
        except queue.Empty:
            pass

        if data.time - last_time >= 0.2:  # Shortened for the sake of testing.
            last_time = data.time
            results.append({
                'time': last_time,
                'x': data.x,
                'y': data.y,
                'h': h
            })


if __name__ == '__main__':
    global connected
    global results

    connected = True
    if 'results' not in globals():
        results = []

    first_thread = threading.Thread(target=thread1)
    second_thread = threading.Thread(target=thread2)
    save_thread = threading.Thread(target=save_data)
    first_thread.start()
    second_thread.start()
    save_thread.start()

    time.sleep(1)

    connected = False
    save_thread.join()
    second_thread.join()
    first_thread.join()
    for line in results:
        print(line)

使用示例输出:

{'time': 1627652645.7188394, 'x': 'x 45.7188', 'y': 'y 45.7188', 'h': 'h 45.7032'}
{'time': 1627652645.9584458, 'x': 'x 45.9584', 'y': 'y 45.9584', 'h': 'h 45.9584'}
{'time': 1627652646.1991317, 'x': 'x 46.1991', 'y': 'y 46.1991', 'h': 'h 46.1991'}
{'time': 1627652646.4394188, 'x': 'x 46.4394', 'y': 'y 46.4394', 'h': 'h 46.4394'}
{'time': 1627652646.6793587, 'x': 'x 46.6793', 'y': 'y 46.6793', 'h': 'h 46.6793'}

堆的清空效率不高,如果数据放入的速度比清空的速度快,可能会导致问题。

可能还有其他一些方法可以做到这一点,但我不确定它是如何工作的。请参阅: 多线程 - 覆盖队列中的旧值?


推荐阅读