首页 > 解决方案 > 自定义估算器上的 Tensorboard 找到自定义标量,但不会显示

问题描述

我希望将 Tensorboard 用于https://github.com/BNN-UPC/GNNetworkingChallenge,以便深入了解 Graph Neural Network RouteNet 中循环的收敛。根据我的阅读,tf.estimators 应该无需修改就可以与 Tensorboard 一起使用(也许对日志挂钩做了一些事情)。但是对于 Routenet,虽然 Tensorboard 似乎为自定义标量选项卡找到了一些东西,但它不会显示任何东西。RouteNet 是一个不寻常的架构,但它的模型文件中确实包含 summary.scalars 和一个日志挂钩。有没有人暗示为什么 Tensorboard 没有显示时间序列或找到非自定义标量?

主文件

import configparser
import pandas as pd
import numpy as np
import tempfile
import os
from read_dataset import input_fn
from routenet_model import model_fn

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)


#Ana, for tensorboard
#import tensorflow as tf
#import datetime
#This is not a vanilla tensorboard because of the Estimator class being used to ineract with the model_fn
#on Estimator:
#You can run Estimator-based models on a local host or on a distributed 
# multi-server environment without changing your model. Furthermore, you can 
# run Estimator-based models on CPUs, GPUs, or TPUs without recoding your model.


def train_and_evaluate(train_dir, eval_dir, config, model_dir=None):
    """Trains and evaluates the model.

    Args:
        train_dir (string): Path of the training directory.
        eval_dir (string): Path of the evaluation directory.
        config (configparser): Config file containing the diferent configurations
                               and hyperparameters.
        model_dir (string): Directory where all outputs (checkpoints, event files, etc.) are written.
                            If model_dir is not set, a temporary directory is used.
    """

    my_checkpoint_config = tf.estimator.RunConfig(
        save_checkpoints_secs=int(config['RUN_CONFIG']['save_checkpoints_secs']),
        keep_checkpoint_max=int(config['RUN_CONFIG']['keep_checkpoint_max'])
    )

    estimator = tf.estimator.Estimator(
        model_fn=model_fn,
        model_dir=model_dir,
        config=my_checkpoint_config,
        params=config
    )

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: input_fn(train_dir, repeat=True, shuffle=True),
        max_steps=int(config['RUN_CONFIG']['train_steps'])
    )

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: input_fn(eval_dir, repeat=False, shuffle=False),
        throttle_secs=int(config['RUN_CONFIG']['throttle_secs'])
    )

    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)


def predict(test_dir, model_dir, config):
    """Generate the predictions given a model.

    Args:
        test_dir (string): Path of the test directory.
        model_dir (string): Directory with the trained model.
        config (configparser): Config file containing the diferent configurations
                               and hyperparameters.

    Returns:
        list: A list with the predicted values.
    """

    estimator = tf.estimator.Estimator(
        model_fn=model_fn,
        model_dir=model_dir,
        params=config
    )

    pred_results = estimator.predict(input_fn=lambda: input_fn(test_dir, repeat=False, shuffle=False))

    return [pred['predictions'] for pred in pred_results]


def predict_and_save(test_dir, model_dir, save_dir, filename, config):
    """Generates and saves a Pandas Dataframe in CSV format with the real and the predicted delay.
    It also computes the MAPE (Mean Absolute Percentage Error) of all the samples in the dataset
    and computes its mean.

    Args:
        test_dir (string): Path of the test directory.
        model_dir (string): Directory with the trained model.
        save_dir (string): Directory where the generated dataframe will be saved (in csv).
        filename (string): The filename of the dataframe.
        config (configparser): Config file containing the diferent configurations
                               and hyperparameters.

    Returns:
        float: The Mean Absolute Percentage Error.
    """

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    tmp_dir = tempfile.mkdtemp()

    ds = input_fn(test_dir, repeat=False, shuffle=False)

    dataframes_to_concat = []

    it = 0

    df_files = []
    delays = np.array([])
    for predictors, target in ds:

        it += 1
        delays = np.append(delays, target)

        if it % 1000 == 0:

            aux_df = pd.DataFrame({
                "Delay": delays
            })

            dataframes_to_concat.append(aux_df)
            delays = np.array([])

            if it % 3000 == 0:
                df = pd.concat(dataframes_to_concat)
                file = os.path.join(tmp_dir, "tmp_df_" + str(it) + ".parquet")
                df.to_parquet(file)
                df_files.append(file)
                dataframes_to_concat = []


    if it % 3000 != 0:
        if it % 1000 != 0:

            aux_df = pd.DataFrame({
                "Delay": delays
            })

            dataframes_to_concat.append(aux_df)

        df = pd.concat(dataframes_to_concat)
        file = os.path.join(tmp_dir, "tmp_df_" + str(it) + ".parquet")
        df.to_parquet(file)
        df_files.append(file)

    df_list = []

    for file in df_files:
        df_list.append(pd.read_parquet(os.path.join(file)))

    df = pd.concat(df_list)

    file = os.path.join(save_dir, filename)
    df.to_csv(file)

    predictions = predict(test_dir, model_dir, config)

    df["Predicted_Delay"] = predictions
    df['Absolute_Error'] = np.abs(df["Delay"] - df["Predicted_Delay"])
    df['Absolute_Percentage_Error'] = (df['Absolute_Error'] / np.abs(df["Delay"]))*100

    return df['Absolute_Percentage_Error'].mean()


if __name__ == '__main__':
    config = configparser.ConfigParser()
    config._interpolation = configparser.ExtendedInterpolation()
    config.read('../code/config.ini')

    train_and_evaluate(config['DIRECTORIES']['train'],
                       config['DIRECTORIES']['test'],
                       config._sections,
                       model_dir=config['DIRECTORIES']['logs'])

    mre = predict_and_save(config['DIRECTORIES']['test'],
                           config['DIRECTORIES']['logs'],
                           '../dataframes/',
                           'predictions.csv',
                           config._sections)

路由网络模型.py

from __future__ import print_function
import tensorflow as tf


#Ana, for tensorboard
import tensorflow as tf
import datetime

class RouteNetModel(tf.keras.Model):
    """ Init method for the custom model.

    Args:
        config (dict): Python dictionary containing the diferent configurations
                       and hyperparameters.
        output_units (int): Output units for the last readout's layer.

    Attributes:
        config (dict): Python dictionary containing the diferent configurations
                       and hyperparameters.
        link_update (GRUCell): Link GRU Cell used in the Message Passing step.
        path_update (GRUCell): Path GRU Cell used in the Message Passing step.
        readout (Keras Model): Readout Neural Network. It expects as input the
                               path states and outputs the per-path delay.
    """

    def __init__(self, config, output_units=1):
        super(RouteNetModel, self).__init__()

        # Configuration dictionary. It contains the needed Hyperparameters for the model.
        # All the Hyperparameters can be found in the config.ini file
        self.config = config

        # GRU Cells used in the Message Passing step
        self.link_update = tf.keras.layers.GRUCell(int(self.config['HYPERPARAMETERS']['link_state_dim']))
        self.path_update = tf.keras.layers.GRUCell(int(self.config['HYPERPARAMETERS']['path_state_dim']))

        # Readout Neural Network. It expects as input the path states and outputs the per-path delay
        self.readout = tf.keras.Sequential([
            tf.keras.layers.Input(shape=int(self.config['HYPERPARAMETERS']['path_state_dim'])),
            tf.keras.layers.Dense(int(self.config['HYPERPARAMETERS']['readout_units']),
                                  activation=tf.nn.selu,
                                  kernel_regularizer=tf.keras.regularizers.l2(
                                      float(self.config['HYPERPARAMETERS']['l2'])),
                                  ),
            tf.keras.layers.Dense(int(self.config['HYPERPARAMETERS']['readout_units']),
                                  activation=tf.nn.relu,
                                  kernel_regularizer=tf.keras.regularizers.l2(
                                      float(self.config['HYPERPARAMETERS']['l2']))),
            tf.keras.layers.Dense(output_units,
                                  kernel_regularizer=tf.keras.regularizers.l2(
                                      float(self.config['HYPERPARAMETERS']['l2_2'])))
        ])

    def call(self, inputs, training=False):
        """This function is execution each time the model is called

        Args:
            inputs (dict): Features used to make the predictions.
            training (bool): Whether the model is training or not. If False, the
                             model does not update the weights.

        Returns:
            tensor: A tensor containing the per-path delay.
        """

        f_ = inputs

        links = f_['links']
        paths = f_['paths']
        seqs = f_['sequences']

        # Compute the shape for the  all-zero tensor for link_state
        shape = tf.stack([
            f_['n_links'],
            int(self.config['HYPERPARAMETERS']['link_state_dim']) - 1
        ], axis=0)

        # Initialize the initial hidden state for links
        link_state = tf.concat([
            tf.expand_dims(f_['link_capacity'], axis=1),
            tf.zeros(shape)
        ], axis=1)

        # Compute the shape for the  all-zero tensor for path_state
        shape = tf.stack([
            f_['n_paths'],
            int(self.config['HYPERPARAMETERS']['path_state_dim']) - 1
        ], axis=0)

        # Initialize the initial hidden state for paths
        path_state = tf.concat([
            tf.expand_dims(f_['bandwith'], axis=1),
            tf.zeros(shape)
        ], axis=1)

        # Iterate t times doing the message passing
        for _ in range(int(self.config['HYPERPARAMETERS']['t'])):

            # The following lines generate a tensor of dimensions [n_paths, max_len_path, dimension_link] with all 0
            # but the link hidden states
            h_tild = tf.gather(link_state, links)

            ids = tf.stack([paths, seqs], axis=1)
            max_len = tf.reduce_max(seqs) + 1
            shape = tf.stack([
                f_['n_paths'],
                max_len,
                int(self.config['HYPERPARAMETERS']['link_state_dim'])])

            lens = tf.math.segment_sum(data=tf.ones_like(paths),
                                       segment_ids=paths)

            # Generate the aforementioned tensor [n_paths, max_len_path, dimension_link]
            link_inputs = tf.scatter_nd(ids, h_tild, shape)

            # Define the RNN used for the message passing links to paths
            gru_rnn = tf.keras.layers.RNN(self.path_update,
                                          return_sequences=True,
                                          return_state=True)

            # First message passing: update the path_state
            outputs, path_state = gru_rnn(inputs=link_inputs,
                                          initial_state=path_state,
                                          mask=tf.sequence_mask(lens))

            # For every link, gather and sum the sequence of hidden states of the paths that contain it
            m = tf.gather_nd(outputs, ids)
            m = tf.math.unsorted_segment_sum(m, links, f_['n_links'])

            # Second message passing: update the link_state
            link_state, _ = self.link_update(m, [link_state])

        # Call the readout ANN and return its predictions
        r = self.readout(path_state, training=training)

        return r


def r_squared(labels, predictions):
    """Computes the R^2 score.

        Args:
            labels (tf.Tensor): True values
            labels (tf.Tensor): This is the second item returned from the input_fn passed to train, evaluate, and predict.
                                If mode is tf.estimator.ModeKeys.PREDICT, labels=None will be passed.

        Returns:
            tf.Tensor: Mean R^2
        """

    total_error = tf.reduce_sum(tf.square(labels - tf.reduce_mean(labels)))
    unexplained_error = tf.reduce_sum(tf.square(labels - predictions))
    r_sq = 1.0 - tf.truediv(unexplained_error, total_error)

    # Needed for tf2 compatibility.
    m_r_sq, update_rsq_op = tf.compat.v1.metrics.mean(r_sq)

    return m_r_sq, update_rsq_op


def model_fn(features, labels, mode, params):
    """model_fn used by the estimator, which, given inputs and a number of other parameters,
       returns the ops necessary to perform training, evaluation, or predictions.

    Args:
        features (dict): This is the first item returned from the input_fn passed to train, evaluate, and predict.
        labels (tf.Tensor): This is the second item returned from the input_fn passed to train, evaluate, and predict.
                            If mode is tf.estimator.ModeKeys.PREDICT, labels=None will be passed.
        mode (tf.estimator.ModeKeys): Specifies if this is training, evaluation or prediction.
        params (dict): Dict of hyperparameters. Will receive what is passed to Estimator in params parameter.

    Returns:
        tf.estimator.EstimatorSpec: Ops and objects returned from a model_fn and passed to an Estimator.
    """

    # Create the model.
    model = RouteNetModel(params)

    # Execute the call function and obtain the predictions.
    predictions = model(features, training=(mode == tf.estimator.ModeKeys.TRAIN))

    predictions = tf.squeeze(predictions)

    # If we are performing predictions.
    if mode == tf.estimator.ModeKeys.PREDICT:
        # Return the predicted values.
        return tf.estimator.EstimatorSpec(
            mode, predictions={
                'predictions': predictions
            })

    # Define the loss function.
    loss_function = tf.keras.losses.MeanSquaredError()

    # Obtain the regularization loss of the model.
    regularization_loss = sum(model.losses)

    # Compute the loss defined previously.
    loss = loss_function(labels, predictions)

    # Compute the total loss.
    total_loss = loss + regularization_loss

    tf.summary.scalar('loss', loss)
    tf.summary.scalar('regularization_loss', regularization_loss)
    tf.summary.scalar('total_loss', total_loss)

    # If we are performing evaluation.
    if mode == tf.estimator.ModeKeys.EVAL:
        # Define the different evaluation metrics
        label_mean = tf.keras.metrics.Mean()
        _ = label_mean.update_state(labels)
        prediction_mean = tf.keras.metrics.Mean()
        _ = prediction_mean.update_state(predictions)
        mae = tf.keras.metrics.MeanAbsoluteError()
        _ = mae.update_state(labels, predictions)
        mre = tf.keras.metrics.MeanRelativeError(normalizer=tf.abs(labels))
        _ = mre.update_state(labels, predictions)

        return tf.estimator.EstimatorSpec(
            mode, loss=loss,
            eval_metric_ops={
                'label/mean': label_mean,
                'prediction/mean': prediction_mean,
                'mae': mae,
                'mre': mre,
                'r-squared': r_squared(labels, predictions)
            }
        )

    # If we are performing training.
    assert mode == tf.estimator.ModeKeys.TRAIN

    # Compute the gradients.
    grads = tf.gradients(total_loss, model.trainable_variables)

    summaries = [tf.summary.histogram(var.op.name, var) for var in model.trainable_variables]
    summaries += [tf.summary.histogram(g.op.name, g) for g in grads if g is not None]

    # Define an exponential decay schedule.
    decayed_lr = tf.keras.optimizers.schedules.ExponentialDecay(float(params['HYPERPARAMETERS']['learning_rate']),
                                                                int(params['HYPERPARAMETERS']['decay_steps']),
                                                                float(params['HYPERPARAMETERS']['decay_rate']),
                                                                staircase=True)

    # Define an Adam optimizer using the defined exponential decay.
    optimizer = tf.keras.optimizers.Adam(learning_rate=decayed_lr)

    # Manually assign tf.compat.v1.global_step variable to optimizer.iterations
    # to make tf.compat.v1.train.global_step increased correctly.
    optimizer.iterations = tf.compat.v1.train.get_or_create_global_step()

    # Apply the processed gradients using the optimizer.
    train_op = optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Define the logging hook. It returns the loss, the regularization loss and the
    # total loss every 10 iterations.
    logging_hook = tf.estimator.LoggingTensorHook(
        {"Loss": loss,
         "Regularization loss": regularization_loss,
         "Total loss": total_loss}, every_n_iter=10)

    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      train_op=train_op,
                                      training_hooks=[logging_hook]
                                      )

到目前为止,我已经查看: 如何将 tensorboard 与 tf.estimator.Estimator 一起使用,https://towardsdatascience.com/an-advanced-example-of-tensorflow-estimators-part-1-3-c9ffba3bff03,如何添加 Tensorboard到 Tensorflow 估计器过程如何将 tensorboard 可视化集成到 tf.Estimator?,如何使用 estimator API 在 tensorboard 上添加更多细节,如何使用 estimator API 在 tensorboard 上添加更多细节, https://developers.googleblog.com/2017/12/creating-custom-estimators-in-tensorflow.html

标签: pythontensorflowmachine-learningtensorboardtensorflow-estimator

解决方案


推荐阅读