python - 允许多个python线程同时使用资源
问题描述
我正在编写一个应用程序,我希望允许多个线程同时使用相同的资源。我不明白这是怎么做到的。此刻,我已经走到了这一步:我有两个线程线程 a 和 b 以及两个锁。首先线程a运行,然后线程b。我也想有第三个线程——这不是问题——但我希望线程 a 和 b 同时执行,并且在它们完成后,允许第三个线程承担对公共资源的控制。
from threading import Lock, Thread
import logging
import time
class SynchronizedThreads:
def __init__(self):
# Initialize a list for threads.
self.threads = []
# Initialize lock objects.
self.lock_a = Lock()
self.lock_b = Lock()
# Set the logging format.
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
def thread_a(self):
logging.info('Thread A is starting ...')
logging.info('Thread A is waiting to acquire lock A.')
self.lock_a.acquire()
logging.info('Thread A has acquired lock A, performing some calculation...')
time.sleep(2)
logging.info('Thread A is waiting to acquire lock B.')
self.lock_b.acquire()
logging.info('Thread A has acquired lock B, performing some calculation...')
time.sleep(2)
logging.info('Thread A is releasing both locks.')
self.lock_a.release()
self.lock_b.release()
def thread_b(self):
logging.info('Thread B is starting...')
logging.info('Thread B is waiting to acquire lock B.')
self.lock_a.acquire()
logging.info('Thread B has acquired lock B, performing some calculation...')
time.sleep(5)
logging.info('Thread B is waiting to acquire lock A.')
self.lock_b.acquire()
logging.info('Thread B has acquired lock A, performing some calculation...')
time.sleep(5)
logging.info('Thread B is releasing both locks.')
self.lock_b.release()
self.lock_a.release()
def start_threads(self):
for thread_func in [self.thread_a, self.thread_b]:
self.threads.append(Thread(target=thread_func))
self.threads[-1].start()
def join_threads(self):
for thread in self.threads:
thread.join()
def main():
sync_threads = SynchronizedThreads()
sync_threads.start_threads()
sync_threads.join_threads()
logging.info('Finished')
if __name__ == '__main__':
main()
解决方案
一种可能的解决方案是使用信号量。它可以被认为是一个线程安全的计数器,所以你有两件事需要做,让第三个线程在信号量上等待两次。如果线程 A 和 B 同时以某种方式完成,它会工作,但在这个简单的示例中,它将安全地等待它们一个接一个地完成:
from threading import Lock, Thread, Semaphore
import logging
import time
class SynchronizedThreads:
def __init__(self):
# Initialize a list for threads.
self.threads = []
# Initialize lock objects.
self.lock_a = Lock()
self.lock_b = Lock()
self.sem = Semaphore(0)
# Set the logging format.
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")
def thread_a(self):
logging.info('Thread A is starting ...')
logging.info('Thread A is waiting to acquire lock A.')
self.lock_a.acquire()
logging.info('Thread A has acquired lock A, performing some calculation...')
time.sleep(2)
logging.info('Thread A is waiting to acquire lock B.')
self.lock_b.acquire()
logging.info('Thread A has acquired lock B, performing some calculation...')
time.sleep(2)
logging.info('Thread A is releasing both locks.')
self.lock_a.release()
self.lock_b.release()
logging.info("Thread A is signaling that it's done to the semaphore")
self.sem.release()
def thread_b(self):
logging.info('Thread B is starting...')
logging.info('Thread B is waiting to acquire lock B.')
self.lock_a.acquire()
logging.info('Thread B has acquired lock B, performing some calculation...')
time.sleep(5)
logging.info('Thread B is waiting to acquire lock A.')
self.lock_b.acquire()
logging.info('Thread B has acquired lock A, performing some calculation...')
time.sleep(5)
logging.info('Thread B is releasing both locks.')
self.lock_b.release()
self.lock_a.release()
logging.info("Thread B is signaling that it's done to the semaphore")
self.sem.release()
def thread_c(self):
logging.info('Thread C is starting...')
# Two workers, wait for both of them
expected_workers = 2
for _ in range(expected_workers):
logging.info('Thread C is waiting for the semaphore...')
self.sem.acquire()
logging.info("Thread C got the semaphore")
logging.info("Thread C doing some work...")
time.sleep(5)
logging.info("Thread C all done")
def start_threads(self):
for thread_func in [self.thread_a, self.thread_b, self.thread_c]:
self.threads.append(Thread(target=thread_func))
self.threads[-1].start()
def join_threads(self):
for thread in self.threads:
thread.join()
def main():
sync_threads = SynchronizedThreads()
sync_threads.start_threads()
sync_threads.join_threads()
logging.info('Finished')
if __name__ == '__main__':
main()
推荐阅读
- java - 无法在 HandlerInterceptor postHandle() 方法中读取请求正文
- javascript - 将嵌套数组中的对象数组分组并将其总数添加到新数组中
- c# - 在 webapi .net core 3.1 上添加 app.useauthentication 时 Cors 失败
- apache-poi - Apache POI Excelsheet 到 pdf 文件
- git - 如何解决 GitHub 和 VisualStudio 上的烂摊子
- java - 如果我使用 STS 使用 java 8,那么 java 中 List.of() 的替代方法是什么
- asp.net-core-webapi - 并行文件下载 asp.net core
- abp - Abp vNext Rowaction 下拉按钮文本
- jenkins - TriggerRemoteJob - 连接到远程服务器失败
- parquet - 增量加载的 parquet 文件如何提高性能?