首页 > 解决方案 > 如何获得多处理队列的确切大小???文档说它不可靠。有什么办法可以解决这个问题?

问题描述

我有这段代码,它有两个进程,一个是入队,另一个是出队。

我需要每 60 秒检查一次队列的大小。q.size() 确实给出了结果,但我希望结果应该是准确的。那么还有其他方法吗?

确切地说,我需要监控每分钟队列的输入和输出以及每分钟队列的大小。

from multiprocessing import Queue, Process
import os
import time
import datetime as dt
import statsd
import random

statsd_client = statsd.StatsClient(host="localhost", port=8125, 
prefix=None, maxudpsize=512, ipv6=False)


q = Queue()


#put_timer = statsd_client.timer('put')
def queue_add_proc1():
    print("process 1 Id :", os.getpid())
    print("adding items to queue")
    x = 0
    upload_time = time.time()

    enque_count=0

    while x < 10000:
        #put_timer.start()
        curr_time = time.time()
        if curr_time - upload_time > 60:
            statsd_client.incr('enque_count_everyMinute', enque_count)
            statsd_client.incr('queue_size_enqueing', q.qsize())

            print("metric sent")
            enque_count = 0
            upload_time = curr_time
        q.put(x*2)
        #put_timer.stop(send=True)
        print("added to queue")
        x =x+ 1
        enque_count+=1
        time.sleep(0.014)
        print("done")


#pop_timer = statsd_client.timer('get')
def queue_pop_proc2():
    print("Process 2 ID :",os.getpid())
    print("popping values from queue")
    upload_time = time.time()
    deque_count = 0
    while not q.empty():
        curr_time = time.time()
        if curr_time - upload_time > 60:
            # upload dequed count
            statsd_client.incr('deque_count_everyMinute', deque_count)
            statsd_client.incr('queue_size_dequeing', q.qsize())
            print("metric sent")
            deque_count = 0
            upload_time = curr_time

        print(" popped item ", q.get())
        print("dequeued")
        deque_count += 1
        time.sleep(0.03)


if __name__ == '__main__':
    msgs_added_each_minute = list()
    msgs_popped_each_minute = list()

    print("Main process ID :", os.getpid())

    p1 = Process(target=queue_add_proc1)

    p2 = Process(target=queue_pop_proc2)

    p1.start()
    p2.start()

    p2.join()
    p1.join()

标签: python-3.xtimequeuemultiprocessing

解决方案


的结果queue.qsize()是准确的。不准确之处在于用户经常使用它的方式。

由于Queue在不同线程/进程之间同时共享,当进程 A 检查其大小时,进程 B 可能会交错执行并更改它。这可能会导致逻辑问题。

考虑以下示例:

queue.put("something")

def process_a(queue):
    """Does something if queue has at least one element."""
    if queue.qsize() > 0:
        # now, Process B takes over and steals the only element in the queue
        element = queue.get()  # UNEXPECTED: process A will block here 
        do_something(element)

def process_b(queue):
    """Gets an element from the queue."""
    queue.get()

a = Process(process_a, args=(queue,))
b = Process(process_b, args=(queue,))
a.start()
b.start()

这里的问题是逻辑依赖于队列特定的状态并对其进行操作。然而,队列状态可能在并发场景中的任何时候发生变化。因此,上述逻辑不会按预期工作。

对于您的特定用例,这应该不是问题,因为您正在做的是在某个时间点监视队列状态。因此,您不必关心队列在一毫秒后是否具有不同的大小。您关心的是队列的平均吞吐量。


推荐阅读