python - Python多处理队列没有得到所有要处理的元素
问题描述
对于所有活动的活动,我必须查询 TSDB API 的日期以获取每个活动 ID 的数据。所以我从数据库中获取所有活动 ID 并将其放入队列。在 Db 中,我有 430 个活动的活动 ID。
但是 python 代码在大约 100 个条目后终止,不知道原因,有人可以在这里指导我,但是如果我删除了 API 查询获取代码并只打印队列值 get(q.get()),Id 值获取 API 即将到来。
下面是代码
import mysql.connector
from datetime import datetime,timedelta
from datetime import date
import requests
import json
from collections import OrderedDict
from multiprocessing import Pool, Queue
from os import getpid
from time import sleep
from random import random
db = mysql.connector.connect(
host='HOSTNAME',
database='DB',
user='ROOT',
password='PASSWORD',
port='PORT'
)
print("Connection ID:", db.connection_id)
MAX_WORKERS=10
class Testing_mp(object):
def __init__(self):
"""
Initiates a queue, a pool and a temporary buffer, used only
when the queue is full.
"""
self.q = Queue()
self.pool = Pool(processes=MAX_WORKERS, initializer=self.worker_main,)
self.temp_buffer = []
def add_to_queue(self, msg):
"""
If queue is full, put the message in a temporary buffer.
If the queue is not full, adding the message to the queue.
If the buffer is not empty and that the message queue is not full,
putting back messages from the buffer to the queue.
"""
if self.q.full():
print("QISFULL",msg)
self.temp_buffer.append(msg)
else:
self.q.put(msg)
if len(self.temp_buffer) > 0:
add_to_queue(self.temp_buffer.pop())
def write_to_queue(self):
"""
This function writes some messages to the queue.
"""
mycursor = db.cursor()
mycursor.execute("select Id from Campaign where Status='ACTIVE' order by Id desc")
myresult = mycursor.fetchall()
for x in myresult:
self.add_to_queue(x[3])
sleep(random()*2)
db.close() # close the connection
def worker_main(self):
"""
Waits indefinitely for an item to be written in the queue.
Finishes when the parent process terminates.
"""
print "Process {0} started".format(getpid())
while True:
# If queue is not empty, pop the next element and do the work.
# If queue is empty, wait indefinitly until an element get in the queue.
item = self.q.get(block=True,timeout=None)
start_date=datetime.today()
start_date=start_date.date()
end_date = start_date - timedelta(days=8)
start_date = start_date - timedelta(days=1)
print "{0} retrieved: {1}".format(getpid(), item)
#print("STARTDATE",type(start_date))
start_date_ft=start_date.strftime('%Y/%m/%d')
##print("ENDDATE",end_date)
end_date_ft=end_date.strftime('%Y/%m/%d')
url = "http://tsdb.metrics.com:4343/api/query"
if item is not None:
querystring = {"start":end_date_ft,"end":start_date_ft,"m":"avg:1d-avg:percentization{campaign="+str(item)+",type=seen}"}
print(querystring)
response = requests.request("GET", url,params=querystring)
print(response.text)
if response and response.text is not None:
loaded_json = json.loads(response.text,object_pairs_hook=OrderedDict)
for x in loaded_json:
for attribute, value in x.items():
if attribute is not None and attribute=="dps":
dps_data=loaded_json[0][attribute]
perValue=[]
if len(dps_data)>0:
for key,val in dps_data.items():
perValue.append(str(val))
print(str(item)+"==ITEM=="+key+"="+str(val))
print(perValue)
# simulate some random length operations
sleep(random()*1)
# Warning from Python documentation:
# Functionality within this package requires that the __main__ module be
# importable by the children. This means that some examples, such as the
# multiprocessing.Pool examples will not work in the interactive interpreter.
if __name__ == '__main__':
mp_class = Testing_mp()
mp_class.write_to_queue()
# Waits a bit for the child processes to do some work
# because when the parent exits, childs are terminated.
sleep(5)
解决方案
推荐阅读
- android - 我可以使用除我开发的 Android 应用程序以外的任何浏览器限制访问我的网站吗?
- openstreetmap - 如何在一个行政区域内找到所有方法
- python - 使用带有构造函数的 MySQL 连接器
- c - 为什么在 MASM 汇编中使用 FPU x87 指令集执行操作时得到无意义的数字?
- javascript - 不调用 ES6 导入
- wordpress - WordPress 自定义查询字符串未被 Google 索引
- css - HighCharts 组织节点重叠
- javascript - HttpRequest 和 XMLHttpRequest 之间的真正区别
- python-3.x - 如何将时间序列数据的多个文件重新采样为长度相同(观察次数相同)
- bash - 如何创建一个 bash 脚本来删除 mac 上的帐户