python-3.x - 如何获得多处理队列的确切大小???文档说它不可靠。有什么办法可以解决这个问题?
问题描述
我有这段代码,它有两个进程,一个是入队,另一个是出队。
我需要每 60 秒检查一次队列的大小。q.size() 确实给出了结果,但我希望结果应该是准确的。那么还有其他方法吗?
确切地说,我需要监控每分钟队列的输入和输出以及每分钟队列的大小。
from multiprocessing import Queue, Process
import os
import time
import datetime as dt
import statsd
import random
statsd_client = statsd.StatsClient(host="localhost", port=8125,
prefix=None, maxudpsize=512, ipv6=False)
q = Queue()
#put_timer = statsd_client.timer('put')
def queue_add_proc1():
print("process 1 Id :", os.getpid())
print("adding items to queue")
x = 0
upload_time = time.time()
enque_count=0
while x < 10000:
#put_timer.start()
curr_time = time.time()
if curr_time - upload_time > 60:
statsd_client.incr('enque_count_everyMinute', enque_count)
statsd_client.incr('queue_size_enqueing', q.qsize())
print("metric sent")
enque_count = 0
upload_time = curr_time
q.put(x*2)
#put_timer.stop(send=True)
print("added to queue")
x =x+ 1
enque_count+=1
time.sleep(0.014)
print("done")
#pop_timer = statsd_client.timer('get')
def queue_pop_proc2():
print("Process 2 ID :",os.getpid())
print("popping values from queue")
upload_time = time.time()
deque_count = 0
while not q.empty():
curr_time = time.time()
if curr_time - upload_time > 60:
# upload dequed count
statsd_client.incr('deque_count_everyMinute', deque_count)
statsd_client.incr('queue_size_dequeing', q.qsize())
print("metric sent")
deque_count = 0
upload_time = curr_time
print(" popped item ", q.get())
print("dequeued")
deque_count += 1
time.sleep(0.03)
if __name__ == '__main__':
msgs_added_each_minute = list()
msgs_popped_each_minute = list()
print("Main process ID :", os.getpid())
p1 = Process(target=queue_add_proc1)
p2 = Process(target=queue_pop_proc2)
p1.start()
p2.start()
p2.join()
p1.join()
解决方案
的结果queue.qsize()
是准确的。不准确之处在于用户经常使用它的方式。
由于Queue
在不同线程/进程之间同时共享,当进程 A 检查其大小时,进程 B 可能会交错执行并更改它。这可能会导致逻辑问题。
考虑以下示例:
queue.put("something")
def process_a(queue):
"""Does something if queue has at least one element."""
if queue.qsize() > 0:
# now, Process B takes over and steals the only element in the queue
element = queue.get() # UNEXPECTED: process A will block here
do_something(element)
def process_b(queue):
"""Gets an element from the queue."""
queue.get()
a = Process(process_a, args=(queue,))
b = Process(process_b, args=(queue,))
a.start()
b.start()
这里的问题是逻辑依赖于队列特定的状态并对其进行操作。然而,队列状态可能在并发场景中的任何时候发生变化。因此,上述逻辑不会按预期工作。
对于您的特定用例,这应该不是问题,因为您正在做的是在某个时间点监视队列状态。因此,您不必关心队列在一毫秒后是否具有不同的大小。您关心的是队列的平均吞吐量。
推荐阅读
- css - 颜色选择器弹出窗口超出当前视图
- json - 在 OData Web api 中控制 JSON 日期时间序列化
- oauth - Google Actions 帐户关联:在测试中不起作用
- snort - Snort 间歇性地在数据包上丢失警报
- c++ - Eigen::VectorXd::operator += 似乎比通过 std::vector 循环慢约 69%
- python - 狮身人面像警告:文档不包含在任何目录树中
- google-sheets - 谷歌表格公式根据条件将单元格留空
- python - 如何在python中打开svg url
- apache-flink - 使用 Log4j2 进行 Flink
- php - PHP - 当键包含小数位时如何检查数组键是否存在?