python - Python 线程 - 不断保存来自其他线程的结果
问题描述
我想通过线程从两个传感器获取数据。假设通过调用相应的函数可以获得这些数据。这些传感器应该被连续查询,但它们的值应该只在两次测量之间的时间差大于预定义的阈值时保存(仅在与上次保存数据的时间差大于 10 秒时保存数据)。由于数据收集的两个线程都在一个 while 循环中运行,因此我使用另一个线程来保存数据。但是,我必须以某种方式确保这些数据在保存时可用。所以保存的线程必须等待其他两个线程。目前,我使用time.sleep()
哪个正在工作。但是有没有更好的方法来保存另外两个线程的数据呢?
import threading
import copy
import time
def thread1():
global connected
global data
while connected:
sensor_data = getSensorData()
data['x'] = sensor_data.x
data['y'] = sensor_data.y
data['time'] = sensor_data.time
def thread2():
global connected
global data
while connected:
other_sensor_data = getOtherSensorData()
data['h'] = other_sensor_data.h
def save_data():
global connected
global data
global results
while connected:
time.sleep(0.1)
if len(results) == 0 and len(data) > 0:
results.append(copy.deepcopy(data))
elif len(results) >= 1:
if data['time'] - results[-1]['time'] >= 10:
results.append(copy.deepcopy(data))
if __name__ == '__main__':
global connected
global data
global result
connected = True
data = {}
if 'result' not in globals():
result = []
first_thread = threading.Thread(target=thread1)
second_thread = threading.Thread(target=thread2)
save_thread = threading.Thread(target=save_data)
first_thread.start()
second_thread.start()
save_thread.start()
解决方案
我个人会使用内置模块队列,该队列旨在安全地从线程中检索数据。
使用 LIFO 功能创建堆然后清空它们。
这是一个工作示例:
import threading
import queue
import time
sensor_pile = queue.LifoQueue()
other_sensor_pile = queue.LifoQueue()
def getSensorData():
time.sleep(0.05)
class Test:
x = "x " + str(time.time())
y = "y " + str(time.time())
time = time.time()
return Test
def getOtherSensorData():
time.sleep(0.01)
class Test:
h = "h " + str(time.time())
return Test
def thread1():
global connected
while connected:
sensor_pile.put(getSensorData())
def thread2():
global connected
while connected:
other_sensor_pile.put(getOtherSensorData())
def save_data():
global connected
global results
last_time = -float("inf")
# Use other_sensor_pile.get() if the first h value should not be None
h = None
while connected:
data = sensor_pile.get() # Will wait to receive data.
# As the main pile, the main while loop will keep it empty.
try:
# Update h to the latest value and then empty the pile.
# get_nowait will get the data if it is available
# or raise a queue.Empty error if it isn't.
h = other_sensor_pile.get_nowait().h
while True:
other_sensor_pile.get_nowait()
except queue.Empty:
pass
if data.time - last_time >= 0.2: # Shortened for the sake of testing.
last_time = data.time
results.append({
'time': last_time,
'x': data.x,
'y': data.y,
'h': h
})
if __name__ == '__main__':
global connected
global results
connected = True
if 'results' not in globals():
results = []
first_thread = threading.Thread(target=thread1)
second_thread = threading.Thread(target=thread2)
save_thread = threading.Thread(target=save_data)
first_thread.start()
second_thread.start()
save_thread.start()
time.sleep(1)
connected = False
save_thread.join()
second_thread.join()
first_thread.join()
for line in results:
print(line)
使用示例输出:
{'time': 1627652645.7188394, 'x': 'x 45.7188', 'y': 'y 45.7188', 'h': 'h 45.7032'}
{'time': 1627652645.9584458, 'x': 'x 45.9584', 'y': 'y 45.9584', 'h': 'h 45.9584'}
{'time': 1627652646.1991317, 'x': 'x 46.1991', 'y': 'y 46.1991', 'h': 'h 46.1991'}
{'time': 1627652646.4394188, 'x': 'x 46.4394', 'y': 'y 46.4394', 'h': 'h 46.4394'}
{'time': 1627652646.6793587, 'x': 'x 46.6793', 'y': 'y 46.6793', 'h': 'h 46.6793'}
堆的清空效率不高,如果数据放入的速度比清空的速度快,可能会导致问题。
可能还有其他一些方法可以做到这一点,但我不确定它是如何工作的。请参阅: 多线程 - 覆盖队列中的旧值?
推荐阅读
- c# - 如何在视图中显示单个记录(当存储在控制器中使用 lambda 的变量中时)
- java - 在画布中绘制一个矩形
- php - 如何让按钮在其自己的 div 中单击获取数据属性?
- javascript - 如何验证正则表达式本身是否有效?
- c# - 哪个结构更好,将事件处理程序作为 arg 传递,或监听事件触发?
- ios - 用于颤振的 GoogleMaps 插件仅显示 ios 模拟器上的标记
- php - 将数据作为数组传递 - PHP
- excel - SSRS 报告导出到 HH:mm:ss 格式的 excel 问题
- .net - 无法使用 office js 读取锁定的单元格值
- biztalk - BizTalk 中的 MSI 导出文件与预期不同