首页 > 解决方案 > 使用 keras 的自定义数据生成器功能预处理海量数据

问题描述

实际上,我正在构建一个 keras 模型,并且我有一个 msg 格式的数据集,其中包含超过 1000 万个实例和 40 个分类特征。目前我只使用它的一个样本,因为读取所有数据集并对其进行编码以适应内存是不可能的。这是我正在使用的部分代码:

import pandas as pd
from category_encoders import BinaryEncoder as be
from sklearn.preprocessing import StandardScaler


def model():
   model = Sequential()
   model.add(Dense(120, input_dim=233, kernel_initializer='uniform', activation='selu'))
   model.add(Dense(12, kernel_initializer='uniform', activation='sigmoid'))
   model.compile(SGD(lr=0.008),loss='mean_squared_error', metrics=['accuracy'])
   return model

def addrDataLoading():

   data=pd.read_msgpack('datum.msg')
   data=data.dropna(subset=['s_address','d_address'])
   data=data.sample(300000) # taking a sample of all the dataset to make the    encoding possible
   y=data[['s_address','d_address']]
   x=data.drop(['s_address','d_address'],1)

   encX = be().fit(x, y)
   numeric_X= encX.transform(x)
   encY=be().fit(y,y)
   numeric_Y=encY.transform(y)
   scaler=StandardScaler()
   X_all=scaler.fit_transform(numeric_X)
   x_train=X_all[0:250000,:]
   y_train=numeric_Y.iloc[0:250000,:]
   x_val=X_all[250000:,:]    
   y_val=numeric_Y.iloc[250000:,:]

   return x_train,y_train,x_val,y_val 



x_train,y_train,x_val,y_val=addrDataLoading()

model.fit(x_train, y_train,validation_data=(x_val,y_val),nb_epoch=20, batch_size=200)

所以我的问题是如何使用自定义数据生成器函数来读取和处理我拥有的所有数据而不仅仅是样本,然后使用 fit_generator() 函数来训练我的模型?

编辑

这是数据样本: netData

我认为从数据中提取不同的样本会导致不同的编码维度。

对于这个示例,有 16 个不同的类别:4 个地址(3 位)、4 个主机名(3 位)、1 个子网掩码(1 位)、5 个基础设施(3 位)、1 个访问区域(1 位),因此二进制编码将为我们提供11 位,数据的新维度是 11,之前是 5。所以假设address列中的另一个样本有 8 个不同的类别,这将给出 4 位二进制,我们让其他列中的类别数量相同,所以总体编码将产生 12 维。我相信是什么导致了问题。

标签: pythonkeraslarge-data

解决方案


稍微慢一点的解决方案(重复相同的动作)

编辑 - 在创建生成器之前适合 BinatyEncoder

首先删除 NA 并进一步使用干净的数据以避免重新分配数据帧。

data = pd.read_msgpack('datum.msg')
data.dropna(subset=['s_address','d_address']).to_msgpack('datum_clean.msg')

在此解决方案中,data_generator 可以多次处理相同的数据。如果它不重要,您可以使用此解决方案。

定义读取数据和拆分索引以进行训练和测试的函数。它不会消耗大量内存。

import pandas as pd
from category_encoders import BinaryEncoder as be
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np

def model():
   #some code defining the model


def train_test_index_split():
    # if there's enough memory to add one more column
    data = pd.read_msgpack('datum_cleaned.msg')
    train_idx, test_idx = train_test_split(data.index) 
    return data, train_idx, test_idx


data, train_idx, test_idx = train_test_index_split()

定义和初始化数据生成器,用于训练和验证

def data_generator(data, encX, encY,  bathc_size, n_steps, index):
    # EDIT: As the data was cleaned, you don't need dropna
    # data = data.dropna(subset=['s_address','d_address'])
    for i in range(n_steps):
        batch_idx = np.random.choice(index, batch_size)
        sample = data.loc[batch_idx]
        y = sample[['s_address', 'd_address']]
        x = sample.drop(['s_address', 'd_address'], 1)
        numeric_X = encX.transform(x)
        numeric_Y = encY.transform(y)
        scaler = StandardScaler()
        X_all = scaler.fit_transform(numeric_X)
        yield X_all, numeric_Y

编辑部分现在训练二进制编码器。您应该对您的数据进行二次抽样,以便为编码器创建有代表性的训练集。我猜数据形状的错误是由不正确的训练BinaryEncoderError when checking input: expected dense_9_input to have shape (233,) but got array with shape (234,))引起的:

def get_minimal_unique_frame(df):
    return (pd.Series([df[column].unique() for column in df], index=df.columns)  
           .apply(pd.Series)  # tranform list on unique values to pd.Series
           .T  # transope frame: columns is columns again
           .fillna(method='ffill'))  # fill NaNs with last value

x = get_minimal_unique_frame(data.drop(['s_address', 'd_address'], 1))
y = get_minimal_unique_frame(data[['s_address', 'd_address']])

注意:我从未使用过 category_encoders 并且系统配置不兼容,因此无法安装和检查。因此,以前的代码可能会引发问题。在这种情况下,我想,你应该比较xy数据帧的长度并使其相同,并可能更改数据帧的索引。

encX = be().fit(x, y)
encY = be().fit(y, y)
batch_size = 200
train_steps = 100000  
val_steps = 5000

train_gen = data_generator(data, encX, encY, batch_size, train_steps, train_idx)
test_gen = data_generator(data, encX, encY, batch_size, test_steps, test_idx)

编辑请提供一个示例x_sample,运行train_gen并保存输出,然后发布x_samplesy_smaples

x_samples = []
y_samples = []
for i in range(10):
    x_sample, y_sample = next(train_gen)
    x_samples.append(x_sample)
    y_samples.append(y_sample)

注意:数据生成器不会自行停止。但它会被train_steps方法阻止fit_generator

使用发电机拟合模型:

model.fit_generator(generator=train_gen, steps_per_epoch=train_steps, epochs=1,
                    validation_data=test_gen, validation_steps=val_steps)

据我所知,python如果你不明确地这样做,就不会复制熊猫数据框copy()。因此,两个生成器都使用相同的对象。但是如果你使用 Jupyter Notebook,可能会发生数据泄漏/未收集的数据,并且随之而来的是内存问题。

更高效的解决方案——scketch

清理您的数据

data = pd.read_msgpack('datum.msg')
data.dropna(subset=['s_address','d_address']).to_msgpack('datum_clean.msg')

如果您有足够的磁盘空间,则创建训练/测试拆分,对其进行预处理并存储为 numpy 数组。

data, train_idx, test_idx = train_test_index_split()

def data_preprocessor(data, path, index):
    # data = data.dropna(subset=['s_address','d_address'])
    sample = data.loc[batch_idx]
    y = sample[['s_address', 'd_address']]
    x = sample.drop(['s_address', 'd_address'], 1)
    encX = be().fit(x, y)
    numeric_X = encX.transform(x)
    encY = be().fit(y, y)
    numeric_Y = encY.transform(y)
    scaler = StandardScaler()
    X_all = scaler.fit_transform(numeric_X)
    np.save(path + '_X', X_all)
    np.save(path + '_y', numeric_Y)

data_preprocessor(data, 'train', train_idx)
data_preprocessor(data, 'test', test_idx)

删除不必要的数据:

del data

加载您的文件并使用以下生成器:

train_X = np.load('train_X.npy')
train_y = np.load('train_y.npy')

test_X = np.load('test_X.npy')
test_y = np.load('test_y.npy')

def data_generator(X, y, batch_size, n_steps):
    idxs = np.arange(len(X))
    np.random.shuffle(idxs)
    ptr = 0

    for _ in range(n_steps):
        batch_idx = idxs[ptr:ptr+batch_size]
        x_sample = X[batch_idx]
        y_sample = y[batch_idx]
        ptr += batch_size
        if ptr > len(X):
            ptr = 0
        yield x_sapmple, y_sample

准备生成器:

train_gen = data_generator(train_X, train_y, batch_size, train_steps)
test_gen = data_generator(test_X, test_y, batch_size, test_steps)

最后拟合模型。希望其中一个解决方案会有所帮助。至少如果 python 确实通过数组和数据框购买参考,而不是按价值。Stackoverflow 对此的回答


推荐阅读