Writing a dataset to multiple directories with modin and Ray stalls inexplicably

Problem

I am trying to perform IO operations against multiple directories using ray, modin (with the ray backend) and Python. The file writing pauses: memory and disk usage do not change at all, and the program just blocks.

设置

I have a Ray actor set up like this:

import os
os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray

import ray
import modin.pandas as mpd
from numpy.core import numeric
from tqdm import tqdm


@ray.remote
class DatasetHelper:
    # Class Variables (static) are to be written here

    @ray.method(num_returns=1)
    def get_dataset(self):
        return self.dataset

    @ray.method(num_returns=1)
    def generate_dataset(self):
        # generates some dataset and returns a dictionary. 
        return {'status': 1,
                'data_dir': self.data_dir}

    @ray.method(num_returns=1)
    def get_config(self):
        return {
            "data_dir": self.data_dir,
            "data_map_dir": self.data_map_dir,
            "out_dir": self.out_dir
        }

    def _validate_initialization(self):
        # Logic here isn't relevant
        if self.data_dir == "" or self.data_map == "" or self.nRows == 42:
            return False
        return True
    
    def __init__(self, data_dir, data_map_dir, nRows, out_dir):
        self.data = {}
        self.data_map = {}
        self.dataset = mpd.DataFrame()
        self.timestamp = []
        self.first = True
        self.out_dir = out_dir

        self.data_dir = data_dir
        self.data_map_dir = data_map_dir
        self.nRows = nRows

    def _extract_data(self):
        print('Reading data ...')
        for each in os.listdir(self.data_dir):
            self.data[each.split('.')[0]] = mpd.read_csv(os.path.join(self.data_dir, each),
                                                         header=None,
                                                         nrows=self.nRows)
        print('Data read successfully ...')
        print('Validating times for monotonicity and uniqueness ... ')

        for each in tqdm(self.data):
            if mpd.to_datetime(self.data[each][0]).is_monotonic and mpd.to_datetime(self.data[each][0]).is_unique:
                pass
            else:
                print('Validation failed for uuid: {}'.format(each))
                return

    def _extract_data_maps(self):
        self.data_map = mpd.read_pickle(self.data_map_dir)
        print('Data-Map unpickled successfully ...')

The main logic is structured as follows:

from functools import cached_property
import os
import threading
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from DatasetHelper import DatasetHelper
import gc
import json
import logging
from multiprocessing import Process
import asyncio

import ray
ray.init(
    # Limiting the object memory store used by ray.put()
    # object_store_memory=20000000000,

    # Limiting the memory usage of each worker.
    # _memory = (1024.0 * 3) * 0.5,

    # Specifying custom directories for temp and object spilling
    _temp_dir=os.path.join("/project/bhavaraj/Anaheim/ray_tmp"),
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {
                "directory_path": "/project/bhavaraj/Anaheim/ray_plasma"}},
        )
    },

    logging_level=logging.DEBUG,
    ignore_reinit_error=True,
    num_gpus=1,
    num_cpus=40,
    dashboard_port=8265
)

write_lock = threading.Lock()

def cache_dataset(loc):
    from datetime import datetime

    params = ray.get(loc.get_config.remote())
    params['out_dir'] = os.getcwd() if params['out_dir'] is None else params['out_dir']
    if os.path.exists(params['out_dir']) is False:
        os.mkdir(params['out_dir'])
    
    dataset_name = datetime.now().strftime("%H:%M:%S") + \
        "_{}_Cache.csv".format(id(params['out_dir']))
    print("Writing to file in {}".format(params['out_dir']))
    print("Acquiring Lock")
    with write_lock:
        print("Lock acquired ...")

        ray.get(loc.get_dataset.remote()).to_csv(os.path.join(params['out_dir'], dataset_name))
    
    print("Writing to file finished at {}".format(params['out_dir']))

R_DATA_DIR: str = '/data/intermediate/R/'
R_DATA_MAP: str = '/data/external/DataMap/R.pkl'

G_DATA_DIR: str = '/data/intermediate/G/'
G_DATA_MAP: str = '/data/external/DataMap/G.pkl'

B_DATA_DIR: str = '/data/intermediate/B/'
B_DATA_MAP: str = '/data/external/DataMap/B.pkl'

C_DATA_DIR: str = '/data/intermediate/C/'
C_DATA_MAP: str = '/data/external/DataMap/C.pkl'

Z_DATA_DIR: str = '/data/intermediate/Z/'
Z_DATA_MAP: str = '/data/external/DataMap/Z.pkl'

objs_refs = []

n = 50000
b = DatasetHelper.remote(B_DATA_DIR, B_DATA_MAP, n,"./CB")
r = DatasetHelper.remote(R_DATA_DIR, R_DATA_MAP, n, "./LR")
c = DatasetHelper.remote(C_DATA_DIR, C_DATA_MAP, n, "./CC")
g = DatasetHelper.remote(G_DATA_DIR, G_DATA_MAP, n, "./AG")

objs_refs.append(b.generate_dataset.remote())
objs_refs.append(r.generate_dataset.remote())
objs_refs.append(c.generate_dataset.remote())
objs_refs.append(r.generate_dataset.remote())
objs_refs.append(g.generate_dataset.remote())

generate_outs = ray.get([x for x in objs_refs])

print("Printing dataset generation results...")
for each in generate_outs:
    print(each)

# I also tried placing these methods inside the actor but the same issue persists
cache_dataset(b)
cache_dataset(r)
cache_dataset(c)
cache_dataset(g)

I also tried decorating the cache_dataset() method with @ray.remote and calling it like this:

locs = [b, r, c, g]
ray.get([cache_dataset.remote(each) for each in locs])
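
For reference, the decorated variant would look roughly like the following sketch (a reconstruction, not the exact code from the run; the threading lock is dropped here because a threading.Lock only synchronizes threads within a single process and cannot coordinate separate Ray worker processes):

import os
from datetime import datetime

import ray


@ray.remote
def cache_dataset(loc):
    # Roughly the same body as cache_dataset() above, but runnable as a Ray task.
    params = ray.get(loc.get_config.remote())
    out_dir = params['out_dir'] if params['out_dir'] is not None else os.getcwd()
    os.makedirs(out_dir, exist_ok=True)

    dataset_name = datetime.now().strftime("%H:%M:%S") + \
        "_{}_Cache.csv".format(id(out_dir))
    print("Writing to file in {}".format(out_dir))
    # Fetch the actor's dataset and write it out from this worker.
    ray.get(loc.get_dataset.remote()).to_csv(os.path.join(out_dir, dataset_name))
    print("Writing to file finished at {}".format(out_dir))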

Output

The file writing raises no errors, but the program pauses and stops making progress.

2021-09-20 08:32:53,024 DEBUG node.py:890 -- Process STDOUT and STDERR is being redirected to /project/bhavaraj/Anaheim/ray_tmp/session_2021-09-20_08-32-53_008570_36561/logs.
2021-09-20 08:32:53,172 DEBUG services.py:652 -- Waiting for redis server at 127.0.0.1:6379 to respond...
2021-09-20 08:32:53,334 DEBUG services.py:652 -- Waiting for redis server at 127.0.0.1:44291 to respond...
2021-09-20 08:32:53,340 DEBUG services.py:1043 -- Starting Redis shard with 10.0 GB max memory.
2021-09-20 08:33:01,212 INFO services.py:1263 -- View the Ray dashboard at http://127.0.0.1:8265
2021-09-20 08:33:01,216 DEBUG node.py:911 -- Process STDOUT and STDERR is being redirected to /project/bhavaraj/Anaheim/ray_tmp/session_2021-09-20_08-32-53_008570_36561/logs.
2021-09-20 08:33:01,221 DEBUG services.py:1788 -- Determine to start the Plasma object store with 76.48 GB memory using /dev/shm.
2021-09-20 08:33:01,314 DEBUG services.py:652 -- Waiting for redis server at 10.2.1.35:6379 to respond...
(pid=36906) Dataset shape: (100340, 41)
(pid=36913) Dataset shape: (150692, 40)
(pid=36902) Dataset shape: (103949, 41)
(pid=36910) Dataset shape: (420269, 41)
Printing dataset generation results... # prints the results correctly
Writing to file in ./CB
Acquiring Lock
Lock acquired ...
Writing to file finished at ./CB
Writing to file in ./LR
Acquiring Lock
Lock acquired ...
2021-09-20 08:43:02,612 DEBUG (unknown file):0 -- gc.collect() freed 115 refs in 0.23721289704553783 seconds

Hypotheses

  1. I think the Ray engine is stopping before all tasks finish executing, but I don't know how to prove or verify this hypothesis (one way to check it is sketched after this list). I also understand that ray.get is supposed to block until all tasks have finished executing.
  2. There is a deadlock-like situation somewhere.
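
One way to probe hypothesis 1 is to replace the blocking ray.get() with ray.wait() plus a timeout, which reports how many of the submitted tasks actually completed. A minimal sketch, assuming the actor handles b, r, c and g from the setup above:

import ray

# Kick off the actor calls and collect the ObjectRefs.
refs = [loc.get_dataset.remote() for loc in (b, r, c, g)]

# Wait up to 60 seconds for all of them instead of blocking indefinitely.
ready, not_ready = ray.wait(refs, num_returns=len(refs), timeout=60.0)
print("{} of {} tasks finished within the timeout".format(len(ready), len(refs)))

# Refs still in not_ready after a generous timeout point to a hang rather
# than slow-but-progressing work.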

References

  1. https://docs.ray.io/en/latest/actors.html
  2. https://towardsdatascience.com/writing-your-first-distributed-python-application-with-ray-4248ebc07f41

Tags: python, python-3.x, ray, modin

Solution


For any future readers,

modin.DataFrame.to_csv() stalls for unknown reasons, but modin.DataFrame.to_pickle() with the same logic does not.

There is also a significant performance gain in read/write times when the data is stored as .pkl files.
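
A minimal sketch of the workaround, assuming the same DatasetHelper actor handles as in the question (the helper name cache_dataset_pickle and the file naming are illustrative, not from the original post):

import os
from datetime import datetime

import ray


def cache_dataset_pickle(loc):
    # Same flow as cache_dataset() in the question, but serializing with
    # to_pickle() instead of to_csv(), which did not stall.
    params = ray.get(loc.get_config.remote())
    out_dir = params['out_dir'] if params['out_dir'] is not None else os.getcwd()
    os.makedirs(out_dir, exist_ok=True)

    file_name = datetime.now().strftime("%H:%M:%S") + "_Cache.pkl"
    ray.get(loc.get_dataset.remote()).to_pickle(os.path.join(out_dir, file_name))


# Usage: cache_dataset_pickle(b); cache_dataset_pickle(r); ...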

