multithreading - 如何使用 PyQt 线程长时间睡眠?
问题描述
我有许多特定的对象需要以特定的不断变化的间隔运行特定的功能,一次又一次,直到他们决定完成。
例如,一个对象可能需要等待 30 秒,运行,等待 60 秒,运行,等待 10 秒,运行......你明白了,这可能会发生在 30-120 个不同的对象上,运行完全相同种功能。
我在想,简单地让一个函数在确切的时间间隔内休眠就可以解决我的问题,但是,如果我错了,请纠正我,我记得线程池在任何给定时间只能运行一定数量的线程(对我来说是 12 )。我如何绕过这个限制?
class Thing(object):
def getCurrentPeriod(self):
return random.randint(5, 30) # Some ever changing period of time
def refresh(self):
doThings() # A long running task that is disk and network intensive
def waitRefresh(self):
period = self.getCurrentPeriod()
time.sleep(period) # Wait that period out
self.refresh()
return self.needRefresh()
# Boolean if it needs to restart - Not sure about how to reschedule,
# or specifically where to connect the worker emit when it finishes
# to make sure this *specific* Thing obj gets it's waitRefresh func called again.
class App(QMainWindow):
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args, **kwargs)
self.threadpool = QThreadPool()
# Add initial objects to pool (other portions of app may add more over time)
for thing in self.acquireThings():
worker = Worker(thing.waitRefresh)
self.threadpool.start(worker)
不包括 WorkerSignals 类和 QRunnable 子类,这个例子包括我通常做的。该示例正在解决相同的问题,但以(很可能)低效的方式。
编辑:带有完整工作示例的新示例time.sleep
如何不暂停线程并允许其他人工作。我觉得这async
可能是唯一的实现,但有没有快速修复,所以我不必改变我的整个应用程序?
解决方案
当我决定实际尝试这QTimer
门课时,最终的解决方案出现了。也许有更优化的解决方案,但这个似乎打了所有的复选框,即使它简单得令人担忧。
import random
import time
import traceback
from functools import partial
from PyQt5.QtCore import *
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import *
class WorkerSignals(QObject):
"""
Represents the signals a Worker can emit.
"""
finished = pyqtSignal()
starting = pyqtSignal(int) # ID of thread
result = pyqtSignal(tuple) # Tuple refresh result, result and ID
class Worker(QRunnable):
"""
A worker designed to tell when it's starting, when it's finished and the result.
Designed to work around Thread.refresh().
"""
def __init__(self, fn, thread_id, *args, **kwargs):
super(Worker, self).__init__()
# Store constructor arguments (re-used for processing)
self.fn = fn
self.id = thread_id
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
@pyqtSlot()
def run(self):
"""
Runs a given method, and emits the result with the Worker's coordinated ID.
"""
try:
self.signals.starting.emit(self.id) # Thread is now finally ready to work.
result = self.fn(*self.args, **self.kwargs) # Refresh Thread!
self.signals.result.emit(result) # Thread is finished, emit result tuple.
except:
traceback.print_exc()
finally:
self.signals.finished.emit() # Done
class Thread(object):
"""
Basic Rules for a Thread Object:
Cannot store the next timestamp on the object (it's a database object, I don't believe it's good practice
to be creating sessions over and over to simply read/write the access time.
ID and Active are allowed as booleans.
"""
i = -1
def __init__(self):
self.id = Thread.nextID()
self.active = True
self.refreshes = 0
def refresh(self) -> tuple:
"""
'Refreshes' a thread. Waits a specific period, then decides whether Thread object should be deactivated or
returned from additional refreshes. Chance of deactivation lowers with each refresh.
:return: The refresh result, a tuple with a boolean and the thread's ID (for identifying it later)
"""
# Represents my SQL Alchemy Model's refresh() function
self.refreshes += 1
time.sleep(random.randint(2, 5))
if random.random() <= max(0.1, 1.0 - ((self.refreshes + 5) / 10)):
self.active = False
return self.active, self.id
@staticmethod
def getRefreshTime() -> float:
"""
Represents the amount of time before a thread should be refreshed.
Should NOT be used to determine whether the thread is still active or not.
:return: The time period that should be waited.
"""
return random.uniform(10, 300)
@staticmethod
def nextID() -> int:
"""
Returns integer thread IDs in sequence to remove possibility of duplicate IDs.
:return: Integer Thread ID
"""
Thread.i += 1
return Thread.i
def __repr__(self):
return f'Thread(id={self.id} active={self.active})'
class MainWindow(QMainWindow):
"""
GUI containing a Label, Button and ListWidget showing all the active sleeping/working threads.
Manages a threadpool, a number of background singleshot timers, etc.
"""
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args, **kwargs)
# Widgets Setup
layout = QVBoxLayout()
self.list = QListWidget()
self.l = QLabel("Total Active: 0")
self.button = QPushButton("Refresh List")
self.button.pressed.connect(self.refreshList)
self.button.setDisabled(True)
layout.addWidget(self.l)
layout.addWidget(self.button)
layout.addWidget(self.list)
w = QWidget()
w.setLayout(layout)
self.setCentralWidget(w)
self.show()
# Periodically add threads to the pool.
self.poolTimer = QTimer()
self.poolTimer.setInterval(5_000)
self.poolTimer.timeout.connect(self.addThreads)
# Threading Setup
self.threadpool = QThreadPool()
print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
self.active, self.threads = {}, {}
# Add a number of threads to start with.
for _ in range(random.randint(5, 16)):
self.setupThread(Thread())
self.poolTimer.start()
def refreshList(self):
"""
Refreshes the ListWidget in the GUI with all the active/sleeping/working threads.
"""
self.list.clear()
bold = QFont()
bold.setBold(True)
active = 0
for thread in self.threads.values():
item = QListWidgetItem(
f'Thread {thread.id}/{thread.refreshes}')
# Bold a thread if it's working
if self.active[thread.id]:
active += 1
item.setFont(bold)
self.list.addItem(item)
self.l.setText(f'Total Active: {active}/{len(self.threads)}')
def refreshResult(self, result) -> None:
"""
When a thread is finished, the result determines it's next course of action, which is either
to return to the pool again, or delete itself.
:param result: A tuple containing the result (bool) and the connected Thread ID.
"""
self.active[result[1]] = False
if result[0]:
print(f'Restarting Thread {result[1]}')
self.setupThread(self.threads[result[1]]) # Add by ID, which would normally be a database GET
else:
print(f'Thread {result[1]} shutting down.')
del self.active[result[1]]
del self.threads[result[1]]
self.refreshList()
def updateActivity(self, thread_id) -> None:
"""
Connected to the starting signal, helps signal when a thread is actually being refreshed.
:param thread_id: The Thread ID
"""
print(f'Thread {thread_id} is now active/working.')
self.active[thread_id] = True
def refresh(self, thread):
"""
Adds a new worker to the threadpool to be refreshed.
Can't be considered a real start to the thread.refresh function, as the pool has a max of 12 workers at any time.
The 'starting' signal can tell us when a specific thread is actually being refreshed, and is represented
as a Bold element in the list.
:param thread: A thread instance.
"""
print(f'Adding Thread {thread.id} to the pool.')
worker = Worker(thread.refresh, thread_id=thread.id)
worker.signals.result.connect(self.refreshResult)
worker.signals.starting.connect(self.updateActivity)
self.threadpool.start(worker)
# self.active[thread.id] = True
self.refreshList()
def setupThread(self, thread) -> None:
"""
Adds a new timer designated to start a specific thread.
:param thread: A thread instance.
"""
self.active[thread.id] = False
self.threads[thread.id] = thread
t = QTimer()
period = thread.getRefreshTime()
t.singleShot(period * 1000, partial(self.refresh, thread=thread))
print(f'Thread {thread.id} will start in {period} seconds.')
self.refreshList()
def addThreads(self):
"""
Adds a number of threads to the pool. Called automatically every couple seconds.
"""
add = max(0, 30 + random.randint(-5, 5) - len(self.threads))
if add > 0:
print(f'Adding {add} thread{"s" if add > 1 else ""}.')
for _ in range(add):
self.setupThread(Thread())
app = QApplication([])
window = MainWindow()
app.exec_()
当请求线程时,会创建一个 Timer 并singleShot
在一个额外的函数上触发,该函数会将其添加到线程池中。这个线程池可以处理多达 12 个不断刷新的“刷新”线程,并且信号允许 GUI 在发现更改的那一刻进行更新。
数以千计的“线程”对象可能正在等待,并且似乎singleShot
能够在需要时准确地将它们添加到池中。
信号有助于区分线程何时为sleeping
,working
和active
(但inactive
Thread 对象被立即删除)。
我能想到的关于这个程序的唯一警告是:
1) QThread 实现能打败它吗?
2)QTimer
一旦它的singleshot
函数被执行和触发,会发生什么?它们会被正确地 GC 处理,还是会在后台建立数千个消耗资源?
推荐阅读
- excel - 仅当用户通过远程桌面连接时才会引发远程过程调用 (RPC) 错误
- r - 当 for 循环中的 if 语句为 false 并且我不想对 false 进行任何操作然后直接测试下一个值时,我该怎么办?
- javascript - 无法从网络工作者那里获取数据
- python - 在 Python 中打印函数参数名称和值
- sql - 级联删除到另一个表的行
- yocto - yocto 中有哪些不同的功能,就像 do_compile_append 一样?
- blazor - 使用不与 Blazor 服务器一起使用的按钮事件
- php - Composer 不会安装我最新的标签,但总是会安装一个旧版本
- c++ - 将 *argv[] 中的偶数/奇数元素添加到数组
- python - Numpy 在数组的每个元素上应用条件