Python server: how to queue tasks continuously while ensuring only one task is processed at a time

Problem description

I have two tasks: one is called every two seconds, the other at random times. Both need to access an object that must not be called again before the previous call has finished (if that happens, I have to restart the hardware device manually).

The object is an instance of a class that communicates with the hardware device over a socket.
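For context, a minimal sketch of what such a class might look like (the class name, method, and parameters are hypothetical, not from the original post):

import socket


class Device:
    # hypothetical wrapper around the socket connection to the hardware
    def __init__(self, host: str, port: int):
        self.sock = socket.create_connection((host, port))

    def command(self, msg: bytes) -> bytes:
        # must not be entered by two callers at once; an overlapping call
        # forces a manual restart of the hardware
        self.sock.sendall(msg)
        return self.sock.recv(1024)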

To handle this, I created a thread class so that everything runs in the background and does not block other tasks. Inside this class I implemented a queue: two different functions put tasks into the queue, and a worker is supposed to execute the tasks, but NOT simultaneously.

Since the whole project is a server, it has to run continuously.

Here is my code, which obviously does not work. I would be glad if anyone knows how to solve this.

Update 26.10.2020: To make my question clearer, I have updated the code based on Artiom Kozyrev's answer.

import time
from threading import Lock, Thread
import threading
from queue import Queue


class ThreadWorker(Thread):
    def __init__(self, _lock: Lock, _queue: Queue, name: str):
        # daemon=False means the process waits until all threads have finished
        # (not only the main thread and the garbage collector)
        super().__init__(name=name, daemon=False)
        # the lock prevents several worker threads from doing work simultaneously
        self.lock = _lock
        # tasks are sent from the producer threads via the Queue
        self.queue = _queue

    def do_work(self, job):
        # the lock context manager prevents other worker threads from working at the same time
        with self.lock:
            time.sleep(3)
            print(f"{threading.current_thread().getName()}: {job * 10}")

    def run(self):
        while True:
            job = self.queue.get()
            # "poison pillow" - stop message from queue
            if not job:
                break
            self.do_work(job)

def TimeStamp(msg):
    tElapsed = (time.time() - tStart)  # Display Thread Info
    sElap = int(tElapsed)
    msElap = int((tElapsed - sElap) * 1000)
    usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
    print(msg , ': ',  sElap, 's', msElap, 'ms', usElap, 'us')

def f1():
    TimeStamp("f1 start")
    time.sleep(2)
    TimeStamp("f1 finished")

def f2():
    TimeStamp("f2 start")
    time.sleep(6)
    TimeStamp("f2 finished")

def insertf1():
    for i in range(10):
        q.put(f1())
        time.sleep(2)

def insertf2():
    for i in range(10):
        time.sleep(10)
        q.put(f2())



q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)]  # create workers
for w in workers:
    w.start()

tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()

The output is:

f1 start :  0 s 0 ms 0 us
f1 finished :  2 s 2 ms 515 us
f1 start :  4 s 9 ms 335 us
f1 finished :  6 s 9 ms 932 us
f1 start :  8 s 17 ms 428 us
f2 start :  10 s 12 ms 794 us
f1 finished :  10 s 28 ms 633 us
f1 start :  12 s 29 ms 182 us
f1 finished :  14 s 34 ms 411 us
f2 finished :  16 s 19 ms 330 us

f1 starts before f2 has finished, and that is exactly what needs to be avoided.

Tags: python, multithreading, object, queue

Solution


To achieve this, you need to combine a Queue with a Lock. The Lock prevents the worker threads from doing work at the same time. Find a code example below:

import time
from threading import Lock, Thread
import threading
from queue import Queue


class ThreadWorker(Thread):
    def __init__(self, _lock: Lock, _queue: Queue, name: str):
        # daemon=False means the process waits until all threads have finished
        # (not only the main thread and the garbage collector)
        super().__init__(name=name, daemon=False)
        # the lock prevents several worker threads from doing work simultaneously
        self.lock = _lock
        # tasks are sent from the main thread via the Queue
        self.queue = _queue

    def do_work(self, job):
        # the lock context manager prevents other worker threads from working at the same time
        with self.lock:
            time.sleep(3)
            print(f"{threading.current_thread().getName()}: {job * 10}")

    def run(self):
        while True:
            job = self.queue.get()
            # "poison pillow" - stop message from queue
            if not job:
                break
            self.do_work(job)


if __name__ == '__main__':
    q = Queue()
    lock = Lock()
    workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)]  # create workers
    for w in workers:
        w.start()
    # produce tasks
    for i in range(10):
        q.put(i)
    # stop tasks with "poison pillow"
    for i in range(len(workers)):
        q.put(None)
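Each worker exits as soon as it dequeues a None sentinel, which is why one None is put per worker. If the process should also wait for the workers to drain the queue and stop before exiting, they can be joined; a minimal sketch (not part of the original answer), to be appended inside the if __name__ == '__main__' block:

    for w in workers:
        w.join()  # returns once this worker has consumed its poison pill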

Edit in response to the addition to the question (locking added)

The main idea is that f1 and f2 should not be run without the Lock. In the updated question, q.put(f1()) calls f1 immediately in the producer thread and only puts its return value on the queue, so the function never runs inside the lock-protected worker; that is why f1 could start before f2 had finished. The short contrast below shows the difference.
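q.put(f1())  # wrong: executes f1 right away in the producer thread; only the result is queued
q.put(f1)    # right: queues the function object itself; a worker calls it later under the Lock

The full example: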

import time
from threading import Lock, Thread
import threading
from queue import Queue


class ThreadWorker(Thread):
    def __init__(self, _lock: Lock, _queue: Queue, name: str):
        # daemon=False means the process waits until all threads have finished
        # (not only the main thread and the garbage collector)
        super().__init__(name=name, daemon=False)
        # the lock prevents several worker threads from doing work simultaneously
        self.lock = _lock
        # tasks are sent from the producer threads via the Queue
        self.queue = _queue

    def do_work(self, f):
        # the lock context manager prevents other worker threads from working at the same time
        with self.lock:
            time.sleep(3)
            print(f"{threading.current_thread().getName()}: {f()}")

    def run(self):
        while True:
            job = self.queue.get()
            # "poison pillow" - stop message from queue
            if not job:
                break
            self.do_work(job)


def TimeStamp(msg):
    tElapsed = (time.time() - tStart)  # Display Thread Info
    sElap = int(tElapsed)
    msElap = int((tElapsed - sElap) * 1000)
    usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
    print(msg, ': ',  sElap, 's', msElap, 'ms', usElap, 'us')


def f1():
    TimeStamp("f1 start")
    time.sleep(1)
    TimeStamp("f1 finished")
    return f"Func-1-{threading.current_thread().getName()}"


def f2():
    TimeStamp("f2 start")
    time.sleep(3)
    TimeStamp("f2 finished")
    return f"Func-2-{threading.current_thread().getName()}"


def insertf1():
    for i in range(5):
        q.put(f1)  # do not run f1 here! Run it in worker thread with Lock


def insertf2():
    for i in range(5):
        q.put(f2) # do not run f2 here! Run it in worker thread with Lock


q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)]  # create workers
for w in workers:
    w.start()

tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
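A follow-up note on the design: the Lock is what serializes the jobs here while still allowing a pool of workers. If parallel execution is never wanted, a single worker thread serializes the jobs by construction and makes the Lock unnecessary; a minimal alternative sketch (my own variation, not from the original answer):

import threading
from queue import Queue


def single_worker(q: Queue):
    # a lone consumer executes the queued jobs strictly one at a time, in FIFO order
    while True:
        job = q.get()
        if job is None:  # poison pill
            break
        job()


q2 = Queue()
worker = threading.Thread(target=single_worker, args=(q2,), daemon=False)
worker.start()
q2.put(lambda: print("job 1"))
q2.put(lambda: print("job 2"))
q2.put(None)  # stop the worker
worker.join()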
