首页 > 解决方案 > 具有共享变量的 Python 多线程

问题描述

我正在尝试并行化我的工作,但我是多线程的新手,所以对具体的实现感到困惑。

我有一个套接字侦听器,它将数据保存到缓冲区。当缓冲区达到他的容量时,我需要将其数据保存到数据库中。在一个线程上我想启动套接字侦听器,而在并行任务上我想检查缓冲区状态。

BufferQueue只是 python 的扩展list,具有允许检查列表是否达到指定大小的方法。

SocketManagerSTREAM_URL我正在收听的流媒体数据提供者。它使用回调函数来处理消息

但是当我使用回调来检索数据时,我不确定使用共享变量是一个正确且最佳的决定

buffer = BufferQueue(buffer_size=10000)

def start_listening_to_sokcet(client):
    s = SocketManager(client)
    s.start_socket(cb_new)
    s.start()

def cb_new(message):
    print("New message")
    global buffer
    for m in message:
        #save data to buffer

def is_buffer_ready(buffer):
    global buffer
    print("Buffer state")
    if buffer.ready():
         #save buffer data to db

如果您能帮我解决这个问题,我将不胜感激

标签: pythonmultithreadingsocketswebsocketmultiprocessing

解决方案


我认为您正在寻找的只是queue模块。

Aqueue.Queue是一个自同步队列,专为在线程之间传递对象而设计。

默认情况下,get对队列的调用将阻塞,直到对象可用,这是您通常想要做的——在网络应用程序中使用线程进行并发的要点是,您的线程看起来都像普通的同步代码,但大部分时间都在使用当他们无事可做时,他们在套接字、文件、队列或其他任何东西上等待的时间。但是您可以使用 来检查而不阻塞block=False,或者timeout等待。

您还可以maxsize在构造队列时指定 a 。然后,默认情况下,put将阻塞,直到队列不太满而无法接受新对象。但是,同样,如果它太满,你可以使用blockortimeout尝试失败。

所有同步都在内部处理getand put,因此您不需要 aLock来保证线程安全或Condition向服务员发出信号。

队列甚至可以为您处理关机。生产者可以只是put一个特殊的值,告诉消费者在看到它时退出get

对于生产者需要等待消费者完成的优雅关闭,您可以task_done在消费者处理完每个排队对象后使用可选方法,并让生产者阻塞该join方法。但是如果你不需要这个——或者有另一种方式来等待关闭,例如,加入消费者线程——你可以跳过这部分。


推荐阅读