python - Python服务器:如何在确保一次只处理一个任务的同时连续排队任务
问题描述
我有两项任务,其中一项每两秒调用一次,另一项随机调用一次。两者都需要在上一个调用完成之前访问一个无法调用的对象(如果发生这种情况,我需要手动重新启动硬件设备)。
该对象来自一个允许通过套接字与硬件设备通信的类。
为此,我创建了一个线程类,以便在后台运行所有内容并且不会阻止其他任务。在这个类中,我实现了一个队列:两个不同的函数将任务放入队列中,一个工作人员应该执行任务!!不!同时。
由于整个项目是一个服务器,它应该连续运行。
好吧,这是我的代码,它显然不起作用。如果有人知道如何解决这个问题,我会很高兴。
更新:26.10.2020 为了让我的问题更清楚,我根据 Artiom Kozyrev 的回答更新了代码。
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg , ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(2)
TimeStamp("f1 finished")
def f2():
TimeStamp("f2 start")
time.sleep(6)
TimeStamp("f2 finished")
def insertf1():
for i in range(10):
q.put(f1())
time.sleep(2)
def insertf2():
for i in range(10):
time.sleep(10)
q.put(f2())
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
输出是:
f1 开始:0 秒 0 毫秒 0 秒
f1 完成:2 秒 2 毫秒 515 微秒
f1 开始:4 秒 9 毫秒 335 微秒
f1 完成:6 秒 9 毫秒 932 微秒
f1 开始:8 秒 17 毫秒 428 微秒
f2 开始:10 秒 12 毫秒 794 微秒
f1 完成:10 s 28 ms 633 us
f1 开始:12 秒 29 毫秒 182 微秒
f1完成:14 s 34 ms 411 us
f2完成:16 s 19 ms 330 us
f1 在 f2 完成之前就开始了,这是需要避免的。
解决方案
为此,您需要结合Queue
和Lock
。锁将阻止工作线程同时工作。在下面找到代码示例:
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
if __name__ == '__main__':
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
# produce tasks
for i in range(10):
q.put(i)
# stop tasks with "poison pillow"
for i in range(len(workers)):
q.put(None)
根据对问题的补充进行编辑(已添加锁定)
主要思想是不应该在没有 Lock 的情况下运行 f1 和 f2。
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, f):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {f()}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg, ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(1)
TimeStamp("f1 finished")
return f"Func-1-{threading.current_thread().getName()}"
def f2():
TimeStamp("f2 start")
time.sleep(3)
TimeStamp("f2 finished")
return f"Func-2-{threading.current_thread().getName()}"
def insertf1():
for i in range(5):
q.put(f1) # do not run f1 here! Run it in worker thread with Lock
def insertf2():
for i in range(5):
q.put(f2) # do not run f2 here! Run it in worker thread with Lock
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
推荐阅读
- azure - Azure AD Go SDK 守护程序应用程序列表用户返回“访问令牌丢失或格式错误”
- java - 如何在特定会话/连接中配置 MySQL 连接字符串“useAffectedRows”?
- javascript - 单击按钮在列表元素之间切换类
- ubuntu - exec.Command() 的 Golang cmd.Output() 在 systemd 服务(ubuntu)中引发错误
- java - 将值传递给变量
- pandas - 如何从数据框中删除库存日期列
- java - Java-如何将对象列表添加到另一个对象列表中
- javascript - 我无法在任何浏览器中加载 javascript 散点图
- c++ - 为什么使用 `std::map::find` 来检查地图是否有键?
- html - 使用 JSP 和 HTML 读取文件并更改显示选项?