首页 > 解决方案 > python中列的并行处理

问题描述

我有一个大的 csv 文件,大约 12 GB,大约有 1200 列。我拉出每一列,并进行一些处理,虚拟分类变量,标准化数字变量并将每个处理的列连接到最终数据帧中。cols_list 是一个包含 csv 中所有列名的列表。

for column in cols_list:
    df_column = pd.read_csv('df_sample.csv', usecols = [column],delimiter = ',')
    df_column = df_column.replace(r'^\s*$', np.nan, regex=True)

    ######################################### DATA PROCESSING #####################################
    print('*'*10 + "\nWorking on column:  " + str(column))
    print(datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'))
    # Check if CAT and number of unique categories <=100.
    if (((attribute_dict[column] == 'CAT') & (df_column[column].unique().size <= 100))==True):

       # Add condition to replace $ with '' to fix unit8 error
        if (('$' in str(df_column[column].unique()))==True):
            df_column[column] = df_column[column].astype('str')
            df_column[column] = df_column[column].str.replace(',', '')
            df_column[column] = df_column[column].replace({'\$':''}, regex = True)
            df_column[column] = df_column[column].replace({'\<':''}, regex = True)
            unsigned_int_list.append(column)

        df_target_attribute = pd.get_dummies(df_column[column], dummy_na=True,prefix=column)
        # Check and remove duplicate columns if any:
        df_target_attribute = df_target_attribute.loc[:,~df_target_attribute.columns.duplicated()]
        df_target_matrix = df_target_attribute.values
        df_target_values = df_target_matrix.astype(int)
        df_target_columns = df_target_attribute.columns.values

        ################################# FIX uint8 error #####################################
        # Create the target_attribute df from values and column names arrays
        df_target_attribute = pd.DataFrame(data = df_target_values,columns = df_target_columns)
        df_target_attribute.fillna(0)

        for target_column in list(df_target_attribute.columns):
            # If variance of the dummy created is zero : append it to a list and print to log file.
            if ((np.var(df_target_attribute[[target_column]])[0] != 0)==True):
                df_final[target_column] = df_target_attribute[target_column]
            else:
                zero_var_list.append(target_column)


    elif (attribute_dict[column] == 'NUM'):
        df_target_attribute = df_column
        #Let's impute with 0 for numeric variables:
        df_target_attribute.fillna(value=0,inplace=True)

        # Check for variables having zero variance and skip those if true.
        if ((np.var(df_target_attribute[[column]])[0] != 0)==True):
            df_final[column] = df_target_attribute
        else:
            zero_var_list.append(column)


    elif (attribute_dict[column] == 'TARGET'):

        df_target_attribute = df_column
        df_target_attribute.fillna(value=0,inplace=True)

        if ((np.var(df_target_attribute[[column]])[0] != 0)==True):
            df_final[column] = df_target_attribute
        else:
            zero_var_list.append(column)

attribute_dict 是一个字典,其中包含变量名称作为键,其数据类型作为值。这用于确定要对列进行的处理类型。记录数约为 300 万,列数为 1200。由于数据的大小,此过程大约需要 22 小时才能完成并给出最终的数据帧 - df_final。

我想知道,是否可以通过以下方案使用多处理来加快速度: 1. 一次处理所有数字列。2. 对于 CAT,将 40 列的块放入一个进程中,因此对于 1200 列,将有 30 个进程并行运行。3、对于Target列,由于是单列,也可以单独处理。因此总共将有 32 个进程并行运行。1. 1 代表所有 NUM 列。2. 30 表示 1 个进程中的 40 个 CAT 变量块。3. 1 为目标列。

我从未在 python 中做过这种并行处理,但想学习如何去做,如果在这种情况下会有帮助的话。

我的假设可能是完全错误的,即并行处理将有助于减少运行时间,在这种情况下,请原谅我的幼稚问题。

我可以提供一些样本数据来复制,但我不确定我是否在这个正确的轨道上。

有人可以帮助我吗?

标签: pythonpandasparallel-processingmultiprocessing

解决方案


推荐阅读