首页 > 解决方案 > 带有 keras.utils.Sequence 对象或 tf.data.Dataset 的输入管道?

问题描述

我目前正在使用一个tf.keras.utils.Sequence对象为 CNN 生成图像批次。我正在使用 Tensorflow 2.2 和Model.fit模型的方法。use_multiprocessing=True当我拟合模型时,当我设置时,每个时期都会引发以下警告tf.keras.model.fit(...)

WARNING:tensorflow:multiprocessing can interact badly with TensorFlow,
 causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended

正如文档和我使用Sequence基于 - 的生成器的事实所预期的那样,该模型正在优化。但是,如果use_multiprocessing要使用已弃用的功能代替tf.data对象,我希望使用最新的输入管道。我目前使用以下tf.keras.utils.Sequence基于生成器的生成器,该生成器受这篇关于大型数据集分区良好实践的文章的启发: https ://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, list_IDs, labels, data_dir, batch_size=32, dim=(128,128), n_channels=1,
             n_classes=2, shuffle=True, **augmentation_kwargs):
    'Initialization'
    self.dim = dim
    self.batch_size = batch_size
    self.labels = labels
    self.list_IDs = list_IDs
    self.data_dir = data_dir
    self.n_channels = n_channels
    self.n_classes = n_classes
    self.shuffle = shuffle
    self.on_epoch_end()
    self.augmentor = keras.preprocessing.image.ImageDataGenerator(**augmentation_kwargs)

def __len__(self):
    'Denotes the number of batches per epoch'
    return int(np.floor(len(self.list_IDs) / self.batch_size))

def __getitem__(self, index):
    'Generate one batch of data'
    # Generate indexes of the batch
    indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

    # Find list of IDs
    list_IDs_temp = [self.list_IDs[k] for k in indexes]

    # Generate data
    X, y = self.__data_generation(list_IDs_temp)

    return X, y

def on_epoch_end(self):
    'Updates indexes after each epoch'
    self.indexes = np.arange(len(self.list_IDs))
    if self.shuffle == True:
        np.random.shuffle(self.indexes)

def __data_generation(self, list_IDs_temp):
    'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
    # Initialization
    X = np.empty((self.batch_size, *self.dim))
    y = np.empty((self.batch_size), dtype=int)

    # Generate data
    for i, ID in enumerate(list_IDs_temp):

        # Store sample
        X[i,] = np.load(self.data_dir +'/{}_stars.npy'.format(ID))

        # Store class
        y[i] = self.labels[ID]

    # Reshape and apply augmentation to sample
    X,y = self.augmentor.flow(X.reshape(self.batch_size,*self.dim,1),y=y,
                              shuffle=False,batch_size=self.batch_size)[0]

    return X, y

所有类的所有数据都在data_dir目录中,并存储为单独的.npy文件。ID 来自字符串列表。类标签取自以 ID 为键的字典——如文章中所述。

Sequence我真的很喜欢发电机设置的直觉。我还可以轻松地生成随机批次来检查它的行为是否符合我的预期。但是我怎样才能重现这个设置tf.data呢?如何Sequence使用interleaveprefetch方法重现生成器的多处理批量生成tf.data.Dataset?和/或我可以简单地使用该方法摄取这个Sequence基于生成器吗?tf.data.Dataset.from_generator()

提前谢谢了。

标签: tensorflowtf.kerastensorflow2.x

解决方案


回答可能为时已晚,但我所做的对我来说效果很好;1-我的课是这样的;

class DataGen(Sequence):
    def __init__(self, df, sr=8000, seconds=3, batch_size=16, shuffle=True):
        self.files = np.array(df.filepath)
        self.label = np.array(df.label)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.sr = sr
        self.seconds = seconds
        self.dim = self.sr*self.seconds
        self.on_epoch_end()
    
    def __len__():
        return len(self.label)//self.batch_size
    
    def __getitem__(self, x):
        indexs = self.indexs[np.arange(x, x+self.batch_size)]
        return self.__getBatch__(indexs)
        
    def __getBatch__(self, indexs):
        X, y = [], []
        for i in indexs:
            wav = self.__loadFile__(self.files[i])
            X.append(librosa.feature.mfcc(wav, self.sr).T)
            y.append(self.label[i])
        return tf.convert_to_tensor(X), to_categorical(y, num_classes=2)
        
    def __loadFile__(self, file):
        y, sr = librosa.load(file, sr=8000, mono=True)
        if len(y)>self.dim:
            return y[:self.dim]
        return np.pad(y, (0, self.dim-len(y)), 'constant', constant_values=0)
        
    def on_epoch_end(self):
        self.indexs = np.arange(len(self.label))
        if self.shuffle:
            np.random.shuffle(self.indexs)

2-比我更改为以下功能;

def gen(sr=8000, seconds=3, batch_size=16, shuffle=True):
    dim = sr*seconds
    def loadFile(file):
        wav, _ = librosa.load(file, sr=sr, mono=True)
        if len(wav)>dim:
            return wav[:dim]
        return np.pad(wav, (0, dim-len(wav)), 'constant', constant_values=0)
    
    while True:
        indexs = np.arange(len(df))
        if shuffle:
            np.random.shuffle(indexs)
        
        for x in range(len(df)//batch_size):
            X, y = [], []
            for i in indexs[np.arange(x*batch_size, (x+1)*batch_size)]:
                X.append(librosa.feature.mfcc(loadFile(df.filepath[i]), sr).T)
                y.append(df.label[i])
                
            yield tf.convert_to_tensor(X), to_categorical(y, num_classes=2)

3-并且工作正常:

dataset = tf.data.Dataset.from_generator(gen, (tf.dtypes.float32, tf.dtypes.int32))

推荐阅读