python - Python - 调用函数并继续主程序
问题描述
我正在以设定的频率(例如 8hz)收集数据,这些数据被修改、存储,然后偶尔发送出去进行写入。
由于流式传输/写入数据,我遇到了时间问题。当程序写入数据(每 5 秒)时,它需要超过 1/8hz (0.125s) 的时间。这会延迟我的数据采集时间。
我想要做的是调用我的 write 函数并让它运行,但也让我的主程序继续运行,这样时间就不会延迟。
我尝试使用几种不同的方法,但运气不佳:线程、多处理和异步。不过,我很有可能错误地使用它们。
我正在做的一个非常简化的版本:
def main():
while True:
curTime = datetime.datetime.now()
while curTime < nextTime:
continue
data = collectData() #collect data (serial port, tcp, etc.)
pdata = processData(data) #process data
hdata = holdData(hdata) #store data stream for occasional writing
if len(hdata) > 8*5:
writeData(hdata) #send data to be written - takes too long and causes delay in next sample > 0.125s from previous.
nextTime = curTime + datetime.timedelta(microsecond = 125000) #adjust next time for measurement - 0.125s after last time data was collected.
在上面的代码中。我想调用 writeData 并让该函数执行此操作,但要保持我的主要函数继续前进并收集更多数据。假设它比我的写入间隔快,writeData 可以花尽可能长的时间;它现在是。
我正在使用python3。
希望这是足够的信息来提供一些指导。
任何帮助深表感谢。
解决方案
您正在尝试通过使用异步编程来解决您的问题。threading
Python 中的异步编程本身就很棘手,因为使用线程 ( )、进程 ( multiprocessing
) 或协程 ( )实现的并发存在主要差异asyncio
。没有“正确”的方法,您选择最适合当前用例的方法。
您的问题同时具有 IO 绑定(数据获取和写入)和 CPU 绑定(数据处理)任务,它们可以并行独立运行。这是你可以做到的。也许这不是最优雅的解决方案,但它会向您展示如何解决此类问题的想法。
在我们的解决方案中,我们将线程用于 IO 密集型任务,将进程用于 CPU 密集型任务。就个人而言,我更喜欢对所有任务使用线程,但在这种情况下,由于GIL,我们将无法释放现代多核 CPU 的所有功能来并行化数据处理。
首先,让我们在可执行脚本中导入所需的模块:
import time
import random
import signal
from threading import Thread
from multiprocessing.pool import Pool
from queue import Queue, Empty
我们解决的问题是生产者-消费者问题。主线程以固定的时间间隔获取数据并将其放入队列中。处理器线程从队列中获取数据并将其提交给工作池进行处理,然后收集结果并将它们放入另一个队列中。该队列不断被写入线程读取,最终保存数据。现在,我们添加一些常量——并行运行的工作进程数量和以秒为单位的数据获取间隔:
WORKERS = 4
FETCH_INTERVAL = 1
下面是负责FETCH_INTERVAL
在无限循环中每秒钟获取数据的主线程:
def main():
raw_data = Queue()
processor = Thread(target=process, args=(raw_data,))
processor.start()
i = 0
try:
while True:
t_fetch = time.time()
# Simulate the data fetching:
time.sleep(0.5)
data = i, random.random()
print("[main] Fetched raw data:", data)
raw_data.put(data)
t_elapsed = time.time() - t_fetch
if t_elapsed < FETCH_INTERVAL:
time.sleep(FETCH_INTERVAL - t_elapsed)
else:
print("[error] The fetch interval is too short!")
i = i + 1
except KeyboardInterrupt:
print("shutting down...")
finally:
raw_data.put(None)
processor.join()
if __name__ == "__main__":
main()
我们首先定义一个raw_data
队列来存储获取的数据,然后启动一个processor
线程,该线程运行一个process
以raw_data
队列为参数的函数。请注意,我们不只是FETCH_INTERVAL
在每次数据获取后休眠秒,而是考虑到数据获取引起的延迟,因为它也是一个 IO 绑定任务。该脚本无限期地运行,直到Ctrl-C
被按下。一旦中断,我们就将None
其放入队列中以向线程发出处理结束的信号并等待processor
线程完成。现在,我们添加一个由线程process
运行的函数的定义:processor
def process(raw_data):
proc_data = Queue()
writer = Thread(target=write, args=(proc_data,))
writer.start()
with Pool(WORKERS, init_worker) as pool:
while True:
data_batch = dequeue_data(raw_data, batch_size=WORKERS)
if not data_batch:
time.sleep(0.5)
continue
results = pool.map(process_data, data_batch)
print("[processor] Processed raw data:", results)
for r in results:
proc_data.put(r)
if None in data_batch:
break
print("joining the writer thread...")
writer.join()
在这里,我们创建一个proc_data
队列来保存writer
线程的数据处理结果。writer
线程运行一个write
我们稍后会定义的函数。一旦writer
线程启动,我们创建一个pool
进程WORKERS
。在这里,我们使用init_worker
函数作为Pool
进程初始化程序,以便在工作进程中忽略键盘中断,因为它们是在主线程中处理的:
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
一旦创建了进程池,我们就会进入一个无限循环,通过调用我们将在下面定义raw_data
的函数,不断地从队列中取出数据批次。dequeue_data
然后将数据批次提交到工作池进行处理。该process_data
函数将在下面定义。然后,我们收集结果并将它们放入线程proc_data
读取的队列中。writer
如果None
数据批处理中有,则处理被中断,我们等待writer
线程完成。dequeue_data
函数定义如下:
def dequeue_data(data_queue, batch_size):
items = []
for _ in range(batch_size):
try:
item = data_queue.get(block=False)
except (KeyboardInterrupt, Empty):
break
items.append(item)
return items
在这里,您会看到它只是尝试batch_size
从data_queue
. 如果没有数据,则返回一个空列表。该process_data
函数什么都不做,只是休眠 1-5 秒:
def process_data(data):
if data is None:
return
# Simulate the data processing:
time.sleep(random.randint(1, 5))
return data
最后,我们定义在线程write
中运行的函数:writer
def write(proc_data):
while True:
data = proc_data.get()
if data is None:
break
# Simulate the data writing:
time.sleep(random.randint(1, 2))
print("[writer] Wrote processed data:", data)
无限循环None
从proc_data
队列中取出后停止。现在,我们将所有提供的代码保存在一个脚本中,然后运行并检查其输出:
[main] Fetched raw data: (0, 0.8092310624924178)
[main] Fetched raw data: (1, 0.8594148294409398)
[main] Fetched raw data: (2, 0.9059856675215566)
[main] Fetched raw data: (3, 0.5653361157057876)
[main] Fetched raw data: (4, 0.8966396309003691)
[main] Fetched raw data: (5, 0.5772344067614918)
[processor] Processed raw data: [(0, 0.8092310624924178)]
[main] Fetched raw data: (6, 0.4614411399877961)
^Cshutting down...
[writer] Wrote processed data: (0, 0.8092310624924178)
[processor] Processed raw data: [(1, 0.8594148294409398), (2, 0.9059856675215566), (3, 0.5653361157057876), (4, 0.8966396309003691)]
[writer] Wrote processed data: (1, 0.8594148294409398)
[writer] Wrote processed data: (2, 0.9059856675215566)
[processor] Processed raw data: [(5, 0.5772344067614918), (6, 0.4614411399877961), None]
joining the writer thread...
[writer] Wrote processed data: (3, 0.5653361157057876)
[writer] Wrote processed data: (4, 0.8966396309003691)
[writer] Wrote processed data: (5, 0.5772344067614918)
[writer] Wrote processed data: (6, 0.4614411399877961)
线程以固定的main
时间间隔获取数据,同时processor
并行批量处理数据,并writer
保存结果。一旦我们命中线程停止获取数据,然后线程完成处理剩余Ctrl-C
的获取数据并开始等待线程完成将数据写入磁盘。main
processor
writer
推荐阅读
- ios - 基于视图的中心标签
- mysql - Windows 10 更新后 MySql 5.7 服务未启动
- python - API 网关返回额外数据错误
- javascript - ReactJS 的实时 API 监听器
- mongodb - MongoDB 聚合:$Project(如何在同一投影管道的另一个字段上使用一个字段)
- c# - 在多级继承的情况下如何使用虚拟关键字
- c# - 如何编写代码以在 Destructor/Dispose 方法中释放托管/非托管资源
- ios - ios PDFKit displaymode = singlepage 只显示pdf的第一页
- python - 我怎样才能明确地看到'self'在python中做了什么?
- go - 配置文件类型使用