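python - How do I stop my processes from going idle or being killed?
Problem description
I need to process millions of users. I have millions of user_ids; for each one I fetch the user's data over an HTTP request and write it to a file.
I'm using multiprocessing to work through batches of these tasks, and inside each process I use multithreading to run the tasks of a batch. This significantly improved performance and lets me process more users at a faster rate.
The problem:
I've found that after a certain amount of time all of the processes become inactive. I know this from watching Activity Monitor. At first I can see them using a lot of CPU and running threads; after a while they appear idle and my program hangs.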
import os
import time
import logging
import multiprocessing
import config
import json
from google.cloud import storage
from pymongo import MongoClient, UpdateOne
from queue import Queue
import threading
from multiprocessing import Pool, cpu_count

PROCESSES = multiprocessing.cpu_count() - 1

def get_tweet_objects(user, counter, lock, proc):
    # Removed (calls an HTTP request and writes a JSON file to disk)
    lock.acquire()
    try:
        counter.value = counter.value + 1
    finally:
        lock.release()
    print("APP ID: {app_id}, REMAINING: {app_remaining}, TOTAL USERS: {total_users}, USER: {user_id}, NO OF TWEETS: {no_tweets}, TIME TAKEN: {time_taken}"
          .format(app_id=app.APP_ID, app_remaining=0, total_users=counter.value, user_id=user["user_id"], no_tweets=len(total_tweets), time_taken=round((end - start), 2)), threading.current_thread().name, proc)

def add_tasks(task_queue, tasks):
    for task in tasks:
        task_queue.put(task)
    return task_queue

def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while not task_queue.empty():
        try:
            user = task_queue.get()
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
    logger.info(f'Process {proc} completed successfully')
    return True

def manage_queue(task_queue, counter, lock, proc):
    while True:
        user = task_queue.get()
        get_tweet_objects(user, counter, lock, proc)
        task_queue.task_done()

def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""
    # Set the number of threads.
    number_of_threads = 5
    # Initializes the queue.
    task_queue = Queue()
    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
            task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    for batch in batches:
        task_queue.put(batch)
    task_queue.join()

def run():
    mongodb = MongoClient(host=config.MONGO_URI)["twitter"]
    existing_users = mongodb[SCREEN_NAME].find({}).limit(10000)
    batches = create_batches_of_100(existing_users)
    empty_task_queue = multiprocessing.Manager().Queue()
    full_task_queue = add_tasks(empty_task_queue, batches)
    processes = []
    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    print(f'Running with {PROCESSES} processes!')
    start = time.time()
    for w in range(PROCESSES):
        p = multiprocessing.Process(
            target=process_tasks, args=(full_task_queue, counter, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f'Time taken = {time.time() - start:.10f}')

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.ERROR)
    run()
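Solution
So there are multiple issues with the code. First of all, avoid infinite loops at all costs, such as the one in the manage_queue function. Note: I'm not saying "avoid while True:", because that alone doesn't make a loop infinite (for example, you can have a break inside it).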
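That said, the biggest problem (which we found during a long discussion in chat) is that the get_tweet_objects() function sometimes fails with an exception, and when that happens task_queue.task_done() is never called, so task_queue.join() never returns.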
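The effect of the missing task_done() call can be reproduced in isolation. The following is a minimal sketch, not the original program: flaky_task is a hypothetical stand-in for get_tweet_objects() that fails on some inputs, and the worker keeps the blocking get() of the original manage_queue but moves task_done() into a finally clause so that it runs whether or not the task raised.

```python
import threading
from queue import Queue

def flaky_task(item):
    # Hypothetical stand-in for get_tweet_objects(): fails on multiples of 3.
    if item % 3 == 0:
        raise ValueError(f"failed on {item}")
    return item * 2

def worker(task_queue, results, errors):
    while True:
        item = task_queue.get()  # blocking get, as in the original manage_queue
        try:
            results.append(flaky_task(item))
        except Exception as exc:
            errors.append(exc)
        finally:
            # Runs on success and on failure, so join() can account for every item.
            task_queue.task_done()

def run_demo(n_items=10, n_threads=3):
    task_queue = Queue()
    results, errors = [], []
    for _ in range(n_threads):
        threading.Thread(target=worker, args=(task_queue, results, errors),
                         daemon=True).start()
    for i in range(n_items):
        task_queue.put(i)
    task_queue.join()  # returns because every get() is matched by a task_done()
    return sorted(results), len(errors)
```

If task_done() sat after the call to flaky_task without the try/finally, the first failing item would leave the queue's unfinished-task count above zero and join() would block forever, which matches the idle processes described in the question.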
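Another issue is that mixing while not task_queue.empty(): with task_queue.get() is a race condition. What happens when two threads run in parallel and task_queue holds exactly one element? Both can see a non-empty queue, one of them takes the element, and the other blocks in get() forever. This should be replaced with task_queue.get(False) together with an appropriate catch of queue.Empty. It looks cosmetic, but the point is that the race is then handled inside the .get() call itself. With that change you also need to fill the queue before spawning the threads, otherwise a thread may find the queue empty and exit before any work has been added.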
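The single-element case described above can be tried directly. This is a small sketch under assumed names (drain and run_demo are illustrative, not part of the original code): two threads compete for a queue that holds exactly one item, and both terminate because the loser of the race gets queue.Empty instead of blocking.

```python
import threading
from queue import Queue, Empty

def drain(task_queue, handled):
    """Pull items until the queue is empty, without ever blocking."""
    while True:
        try:
            item = task_queue.get(block=False)
        except Empty:
            break  # the queue was drained, possibly by the other thread
        try:
            handled.append(item)
        finally:
            task_queue.task_done()

def run_demo():
    task_queue = Queue()
    task_queue.put("only-item")  # fill the queue BEFORE starting the threads
    handled = []
    threads = [threading.Thread(target=drain, args=(task_queue, handled))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=5)
    return handled, [t.is_alive() for t in threads]
```

Here get(block=False) is the keyword form of get(False). The timeout on join is only a safety net for the demonstration; with the non-blocking get neither thread should still be alive when it expires.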
All in all, here are the changes:
from queue import Empty

def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""
    # Set the number of threads.
    number_of_threads = 5
    # Initializes the queue and fills it before any thread starts.
    task_queue = Queue()
    for batch in batches:
        task_queue.put(batch)
    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
            task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    task_queue.join()

def manage_queue(task_queue, counter, lock, proc):
    while True:
        # Non-blocking get: leave the loop once the queue is drained.
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            get_tweet_objects(user, counter, lock, proc)
        except Exception as exc:
            print(exc)
        finally:
            # Always mark the task as done so task_queue.join() can return.
            task_queue.task_done()

def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
    logger.info(f'Process {proc} completed successfully')
    return True
That said, I strongly recommend using process/thread executors.
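As a rough illustration of the executor approach, here is a sketch built on concurrent.futures.ThreadPoolExecutor from the standard library. The names fetch_user and process_batch are hypothetical, and fetch_user merely simulates a request that sometimes fails; it is not the original get_tweet_objects().

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_user(user_id):
    # Hypothetical stand-in for the HTTP request; fails on multiples of 4.
    if user_id % 4 == 0:
        raise RuntimeError(f"request failed for user {user_id}")
    return {"user_id": user_id, "no_tweets": user_id * 10}

def process_batch(user_ids, max_workers=5):
    """Run fetch_user for every id in the batch on a pool of threads."""
    done, failed = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the user id it was submitted for.
        futures = {executor.submit(fetch_user, uid): uid for uid in user_ids}
        for future in as_completed(futures):
            uid = futures[future]
            try:
                done.append(future.result())  # re-raises the task's exception
            except Exception as exc:
                failed.append((uid, exc))
    return done, failed
```

The executor owns the queue and the worker threads, so there is no task_done()/join() bookkeeping to get wrong, and a failing task surfaces as an exception from future.result() rather than as a hang. ProcessPoolExecutor in the same module offers the same submit interface, so one possible layout is a process pool over batches with a thread pool like this one inside each batch.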