How do I stop my processes from going idle or being killed?

Problem description

I need to process millions of users. I have millions of user_ids, and for each one I fetch the user's data with an HTTP request and write it to a file.

I am using multiprocessing to execute these tasks in batches, and within each process I use multithreading to work through a batch. This improved performance significantly and lets me process far more users at a much faster rate.

The problem

I find that after a certain amount of time all of the processes become inactive; I can tell from Activity Monitor. At the start I can see them using a lot of CPU and running threads, but after a while they appear idle and my program hangs.

import os
import time
import logging
import multiprocessing
import config
import json
from google.cloud import storage
from pymongo import MongoClient, UpdateOne
from queue import Queue
import threading
from multiprocessing import Pool, cpu_count

PROCESSES = multiprocessing.cpu_count() - 1

def get_tweet_objects(user, counter, lock, proc):

    # Body removed: makes an HTTP request and writes a JSON file to disk

    lock.acquire()
    try:
        counter.value = counter.value + 1
    finally:
        lock.release()

    print("APP ID: {app_id}, REMAINING: {app_remaining}, TOTAL USERS: {total_users}, USER: {user_id}, NO OF TWEETS: {no_tweets}, TIME TAKEN: {time_taken}"
          .format(app_id=app.APP_ID, app_remaining=0, total_users=counter.value, user_id=user["user_id"], no_tweets=len(total_tweets), time_taken=round((end - start), 2)), threading.current_thread().name, proc)


def add_tasks(task_queue, tasks):

    for task in tasks:
        task_queue.put(task)

    return task_queue


def process_tasks(task_queue, counter, lock):

    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while not task_queue.empty():
        try:
            user = task_queue.get()
            do_multithreading(user, counter, lock, proc)

        except Exception as e:
            logger.error(e)
        logger.info(f'Process {proc} completed successfully')
    return True


def manage_queue(task_queue, counter, lock, proc):

    while True:
        user = task_queue.get()
        get_tweet_objects(user, counter, lock, proc)
        task_queue.task_done()


def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initializes the queue.
    task_queue = Queue()

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()

    for batch in batches:
        task_queue.put(batch)
    task_queue.join()


def run():

    mongodb = MongoClient(host=config.MONGO_URI)["twitter"]

    existing_users = mongodb[SCREEN_NAME].find({}).limit(10000)

    batches = create_batches_of_100(existing_users)

    empty_task_queue = multiprocessing.Manager().Queue()
    full_task_queue = add_tasks(empty_task_queue, batches)
    processes = []

    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    print(f'Running with {PROCESSES} processes!')
    start = time.time()
    for w in range(PROCESSES):
        p = multiprocessing.Process(
            target=process_tasks, args=(full_task_queue, counter, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f'Time taken = {time.time() - start:.10f}')


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.ERROR)
    run()

Tags: python, python-3.x, python-multiprocessing, python-multithreading

Solution


So there are multiple problems with this code. First, avoid infinite loops at all costs, such as the one in the manage_queue function. Note: I am not saying "avoid while True:", because while True: by itself does not make a loop infinite (for example, you can break inside it).
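For illustration, a minimal sketch of a while True: loop that is not infinite, because every pass either produces work or hits a guaranteed exit path (the fixed manage_queue below has the same shape; drain and q are placeholder names):

from queue import Queue, Empty

def drain(q: Queue):
    while True:
        try:
            item = q.get(False)   # non-blocking get
        except Empty:
            break                 # guaranteed exit once the queue runs dry
        print(item)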

That said, the biggest problem (which we found during a long discussion in chat) is that the get_tweet_objects() function sometimes fails with an exception, and when that happens task_queue.task_done() is never called. As a result, task_queue.join() never returns and the program hangs.

Another problem is that combining while not task_queue.empty(): with task_queue.get() is a race condition. What happens when two parallel threads run and task_queue happens to have exactly one element? One of them will hang forever. This should be replaced with task_queue.get(False) and an appropriate queue.Empty catch. It may look cosmetic, but the race really does happen on the blocking get(): both threads can observe the queue as non-empty, one takes the last element, and the other blocks forever on an item that never arrives. With that change, you also need to fill the queue before spawning the threads.
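To make the race concrete, this is the problematic shape of the original loop; the empty() check and the get() call are two separate steps, so another thread can consume the last item in between:

while not task_queue.empty():    # threads A and B both see one item left
    user = task_queue.get()      # A takes it; B blocks here forever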

All in all, here are the changes:

from queue import Empty

def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initialize the queue and fill it BEFORE spawning the worker
    # threads, so a worker that sees it empty can safely exit.
    task_queue = Queue()
    for batch in batches:
        task_queue.put(batch)

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    task_queue.join()

def manage_queue(task_queue, counter, lock, proc):
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break

        try:
            get_tweet_objects(user, counter, lock, proc)
        except Exception as exc:
            print(exc)
        finally:
            task_queue.task_done()

def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
    # Log completion once, after the queue is drained, not once per batch.
    logger.info(f'Process {proc} completed successfully')
    return True

That said, I would strongly recommend using process/thread executors instead.
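A minimal sketch of that approach with concurrent.futures, where fetch_user is a hypothetical stand-in for get_tweet_objects without the counter/lock plumbing. The executors do the task bookkeeping internally, and exceptions are re-raised from future.result(), so a failing user can no longer deadlock the pool:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

PROCESSES = multiprocessing.cpu_count() - 1

def fetch_user(user):
    # Hypothetical stand-in: make the HTTP request and write the JSON
    # file to disk here, as get_tweet_objects does in the original code.
    return user["user_id"]

def process_batch(batch):
    # One worker process per batch; threads fan out over the users in it.
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(fetch_user, user) for user in batch]
        for future in as_completed(futures):
            try:
                future.result()    # re-raises any exception from the thread
            except Exception as exc:
                print(exc)         # logged and skipped instead of hanging

def run(batches):
    # map() blocks until every batch is finished; there is no manual
    # join()/task_done() bookkeeping left to get wrong.
    with ProcessPoolExecutor(max_workers=PROCESSES) as pool:
        list(pool.map(process_batch, batches))

if __name__ == '__main__':
    users = [{"user_id": i} for i in range(1000)]
    run([users[i:i + 100] for i in range(0, len(users), 100)])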

