TensorFlow-Keras generators: turn off auto-sharding or switch auto_shard_policy to DATA

Problem description

While training my model I ran into the issue described in the post Tensorflow - Keras: Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. My question now is: does the solution mentioned by @Graham501617 also work with generators? Here is some dummy code illustrating what I use so far:

from tensorflow.keras.utils import Sequence

class BatchGenerator(Sequence):

    def __init__(self, some_args):
        ...

    def __len__(self):
        num_batches_in_sequence = ...
        return num_batches_in_sequence

    def __getitem__(self, _):
        data, labels = get_one_batch(self.some_args)
        return data, labels

In the main script I then do the following:

train_generator = BatchGenerator(some_args)
valid_generator = BatchGenerator(some_args)

cross_device_ops = tf.distribute.HierarchicalCopyAllReduce(num_packs=2)
strategy = tf.distribute.MirroredStrategy(cross_device_ops=cross_device_ops)
with strategy.scope():
    model = some_model

model.compile(some_args)

history = model.fit(
    x=train_generator,
    validation_data=valid_generator,
    ...
)

I suppose I will have to modify the __getitem__ function somehow, right?

I appreciate your support!

Tags: tensorflow, keras, generator, sharding, policy

Solution


You have to wrap your generator into a function...

The example below assumes your data is stored as .npy numpy arrays of type float64, named 0_x.npy, 1_x.npy, 2_x.npy, and so on, with matching 0_y.npy, 1_y.npy, ... files for the labels.


import os
from pathlib import Path
import tensorflow as tf
import numpy as np

# Your new generator as a function rather than an object you need to instantiate
def getNextBatch(stop, data_dir):
    i = 0
    # Arguments arrive from tf.data as tensors, so the directory path comes through as bytes
    data_dir = data_dir.decode('ascii')
    while True:
        while i < stop:
            x = np.load(str(Path(data_dir + "/" + str(i) + "_x.npy")))
            y = np.load(str(Path(data_dir + "/" + str(i) + "_y.npy")))
            yield x, y
            i += 1
        i = 0

# Make a dataset given the directory and strategy
def makeDataset(generator_func, dir, strategy=None):

    # Get the number of (x, y) file pairs in the directory
    data_size = int(len([name for name in os.listdir(dir) if os.path.isfile(os.path.join(dir, name))]) / 2)

    # Make a dataset from the generator. MAKE SURE TO SPECIFY THE DATA TYPE!!!
    ds = tf.data.Dataset.from_generator(generator_func, args=[data_size, dir], output_types=(tf.float64, tf.float64))

    # Turn off auto-sharding, which is what triggers the warning in the first place
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
    ds = ds.with_options(options)

    # Optional: Make it a distributed dataset if you're using a strategy
    if strategy is not None:
        ds = strategy.experimental_distribute_dataset(ds)

    return ds



training_ds = makeDataset(getNextBatch, str(Path(data_dir + "/training")), None)
validation_ds = makeDataset(getNextBatch, str(Path(data_dir + "/validation")), None)

model.fit(training_ds,
          epochs=epochs,
          callbacks=callbacks,
          validation_data=validation_ds)
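
The question's title also mentions switching auto_shard_policy to DATA instead of turning sharding off entirely. That variant is not shown in the answer above, but as a sketch of my own (not part of the original answer), it would only change the option value set inside makeDataset:

# Assumed alternative to AutoShardPolicy.OFF: with DATA, each worker reads
# every element and keeps only its own shard, instead of sharding by files
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
ds = ds.with_options(options)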

You might need to pass the number of steps per epoch in your fit() call, in which case you can reuse the generator you have already made.
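
For instance, because getNextBatch loops forever, the dataset has no finite cardinality and Keras cannot infer the epoch length by itself. A minimal sketch, where countBatches is a hypothetical helper assuming the same file layout as above:

# Hypothetical helper: each batch index i contributes an i_x.npy and an i_y.npy file
def countBatches(dir):
    files = [name for name in os.listdir(dir) if os.path.isfile(os.path.join(dir, name))]
    return len(files) // 2

model.fit(training_ds,
          epochs=epochs,
          callbacks=callbacks,
          validation_data=validation_ds,
          steps_per_epoch=countBatches(str(Path(data_dir + "/training"))),
          validation_steps=countBatches(str(Path(data_dir + "/validation"))))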

