python - 自定义估算器上的 Tensorboard 找到自定义标量,但不会显示
问题描述
我希望将 Tensorboard 用于 https://github.com/BNN-UPC/GNNetworkingChallenge,以便深入了解图神经网络(Graph Neural Network)RouteNet 中各个循环的收敛情况。根据我读到的资料,tf.estimator 应该无需修改就可以与 Tensorboard 一起使用(也许需要对日志挂钩做一些处理)。但是对于 RouteNet,虽然 Tensorboard 似乎在“自定义标量”选项卡中找到了一些内容,但它不会显示任何东西。RouteNet 是一个不寻常的架构,但它的模型文件中确实包含 summary.scalar 调用和一个日志挂钩。有没有人能提示一下,为什么 Tensorboard 既不显示时间序列,也找不到非自定义标量?
主文件
import configparser
import os
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf

from read_dataset import input_fn
from routenet_model import model_fn
# Set the TensorFlow (compat.v1) logging verbosity to INFO.
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)
# Ana: for TensorBoard
#import tensorflow as tf
#import datetime
# This is not a vanilla TensorBoard setup because the Estimator class is used to interact with the model_fn.
# On Estimator:
# You can run Estimator-based models on a local host or on a distributed
# multi-server environment without changing your model. Furthermore, you can
# run Estimator-based models on CPUs, GPUs, or TPUs without recoding your model.
def train_and_evaluate(train_dir, eval_dir, config, model_dir=None):
    """Train the estimator built around ``model_fn`` and evaluate it periodically.

    Args:
        train_dir (string): Path of the training directory.
        eval_dir (string): Path of the evaluation directory.
        config (configparser): Configuration holding the different settings and
            hyperparameters. Its ``RUN_CONFIG`` section supplies
            ``save_checkpoints_secs``, ``keep_checkpoint_max``, ``train_steps``
            and ``throttle_secs``; the whole object is forwarded to the
            estimator as ``params``.
        model_dir (string): Directory where all outputs (checkpoints, event
            files, etc.) are written. If model_dir is not set, a temporary
            directory is used.
    """
    run_settings = config['RUN_CONFIG']

    def train_input():
        # Training data is repeated and shuffled.
        return input_fn(train_dir, repeat=True, shuffle=True)

    def eval_input():
        # Evaluation reads the data once, in order.
        return input_fn(eval_dir, repeat=False, shuffle=False)

    run_config = tf.estimator.RunConfig(
        save_checkpoints_secs=int(run_settings['save_checkpoints_secs']),
        keep_checkpoint_max=int(run_settings['keep_checkpoint_max']),
    )

    routenet_estimator = tf.estimator.Estimator(
        model_fn=model_fn,
        model_dir=model_dir,
        config=run_config,
        params=config,
    )

    training = tf.estimator.TrainSpec(
        input_fn=train_input,
        max_steps=int(run_settings['train_steps']),
    )
    evaluation = tf.estimator.EvalSpec(
        input_fn=eval_input,
        throttle_secs=int(run_settings['throttle_secs']),
    )

    tf.estimator.train_and_evaluate(routenet_estimator, training, evaluation)
def predict(test_dir, model_dir, config):
    """Run the trained model over a dataset and collect its predictions.

    Args:
        test_dir (string): Path of the test directory.
        model_dir (string): Directory with the trained model.
        config (configparser): Configuration holding the different settings and
            hyperparameters; forwarded to the estimator as ``params``.

    Returns:
        list: The ``'predictions'`` entry of every result yielded by the estimator.
    """
    def test_input():
        # Read the test data once, in order.
        return input_fn(test_dir, repeat=False, shuffle=False)

    routenet_estimator = tf.estimator.Estimator(
        model_fn=model_fn,
        model_dir=model_dir,
        params=config,
    )

    collected = []
    for result in routenet_estimator.predict(input_fn=test_input):
        collected.append(result['predictions'])
    return collected
def _spill_frames(frames, tmp_dir, it, df_files):
    """Write the buffered frames to one parquet file, record its path and empty the buffer."""
    chunk_path = os.path.join(tmp_dir, "tmp_df_" + str(it) + ".parquet")
    pd.concat(frames).to_parquet(chunk_path)
    df_files.append(chunk_path)
    frames.clear()


def predict_and_save(test_dir, model_dir, save_dir, filename, config):
    """Save a CSV with the real and the predicted delay and return the mean APE.

    The true delays are read from the dataset in chunks that are spilled to
    temporary parquet files, then reloaded into a single dataframe. The model
    predictions and the per-row absolute / absolute-percentage errors are added
    to that dataframe BEFORE it is written, so the CSV contains the columns
    ``Delay``, ``Predicted_Delay``, ``Absolute_Error`` and
    ``Absolute_Percentage_Error``.

    Args:
        test_dir (string): Path of the test directory.
        model_dir (string): Directory with the trained model.
        save_dir (string): Directory where the generated dataframe will be saved (in csv).
            It is created if it does not exist.
        filename (string): The filename of the dataframe.
        config (configparser): Configuration holding the different settings and
            hyperparameters.

    Returns:
        float: The Mean Absolute Percentage Error over all rows.

    Raises:
        ValueError: If the dataset yields no samples.
    """
    os.makedirs(save_dir, exist_ok=True)

    ds = input_fn(test_dir, repeat=False, shuffle=False)

    it = 0
    delays = np.array([])
    dataframes_to_concat = []
    df_files = []

    # The context manager removes the intermediate parquet files once the
    # final dataframe has been rebuilt, so no temporary data is left behind.
    with tempfile.TemporaryDirectory() as tmp_dir:
        for _predictors, target in ds:
            it += 1
            delays = np.append(delays, target)

            # Every 1000 samples move the accumulated delays into a dataframe...
            if it % 1000 == 0:
                dataframes_to_concat.append(pd.DataFrame({"Delay": delays}))
                delays = np.array([])

                # ...and every 3000 samples spill those dataframes to disk.
                if it % 3000 == 0:
                    _spill_frames(dataframes_to_concat, tmp_dir, it, df_files)

        # Flush whatever is left after the last full 3000-sample chunk.
        if it % 3000 != 0:
            if it % 1000 != 0:
                dataframes_to_concat.append(pd.DataFrame({"Delay": delays}))
            _spill_frames(dataframes_to_concat, tmp_dir, it, df_files)

        if not df_files:
            raise ValueError("No samples found in test directory: " + str(test_dir))

        df = pd.concat([pd.read_parquet(path) for path in df_files])

    predictions = predict(test_dir, model_dir, config)

    # NOTE(review): assumes predict() yields exactly one value per row of df,
    # in the same order as the dataset iteration above — confirm.
    df["Predicted_Delay"] = predictions
    df['Absolute_Error'] = np.abs(df["Delay"] - df["Predicted_Delay"])
    # A true delay of 0 produces an infinite/NaN percentage error here.
    df['Absolute_Percentage_Error'] = (df['Absolute_Error'] / np.abs(df["Delay"]))*100

    # Write the CSV only after the prediction and error columns exist.
    df.to_csv(os.path.join(save_dir, filename))

    return df['Absolute_Percentage_Error'].mean()
if __name__ == '__main__':
    # Script entry point: read the configuration, train/evaluate the model,
    # then generate and save the predictions for the test set.

    # ExtendedInterpolation is passed through the public constructor argument
    # instead of assigning the private ``_interpolation`` attribute afterwards.
    config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    config.read('../code/config.ini')

    # NOTE(review): ``_sections`` is a private ConfigParser attribute; it is kept
    # because it hands the model a plain nested dict as ``params``. Presumably the
    # values in it have not gone through interpolation — confirm before replacing it.
    train_and_evaluate(config['DIRECTORIES']['train'],
                       config['DIRECTORIES']['test'],
                       config._sections,
                       model_dir=config['DIRECTORIES']['logs'])

    # ``mre`` holds the value returned by predict_and_save (the mean absolute
    # percentage error); it is not used further in this script.
    mre = predict_and_save(config['DIRECTORIES']['test'],
                           config['DIRECTORIES']['logs'],
                           '../dataframes/',
                           'predictions.csv',
                           config._sections)
routenet_model.py
from __future__ import print_function
import tensorflow as tf
#Ana, for tensorboard
import tensorflow as tf
import datetime
class RouteNetModel(tf.keras.Model):
    """Custom Keras model implementing RouteNet's message passing and readout.

    Args:
        config (dict): Python dictionary containing the different configurations
            and hyperparameters.
        output_units (int): Output units for the last readout's layer.

    Attributes:
        config (dict): Python dictionary containing the different configurations
            and hyperparameters.
        link_update (GRUCell): Link GRU Cell used in the Message Passing step.
        path_update (GRUCell): Path GRU Cell used in the Message Passing step.
        readout (Keras Model): Readout Neural Network. It expects as input the
            path states and outputs the per-path delay.
    """

    def __init__(self, config, output_units=1):
        """Build the two GRU cells and the readout network from ``config``."""
        super(RouteNetModel, self).__init__()

        # All the hyperparameters live in the HYPERPARAMETERS section of config.ini.
        self.config = config
        hparams = self.config['HYPERPARAMETERS']

        # GRU cells used in the message-passing step.
        self.link_update = tf.keras.layers.GRUCell(int(hparams['link_state_dim']))
        self.path_update = tf.keras.layers.GRUCell(int(hparams['path_state_dim']))

        # Readout network: two hidden dense layers (selu, then relu) followed by
        # a linear output layer, every kernel carrying an L2 regularizer.
        readout_layers = [
            tf.keras.layers.Input(shape=int(hparams['path_state_dim'])),
            tf.keras.layers.Dense(
                int(hparams['readout_units']),
                activation=tf.nn.selu,
                kernel_regularizer=tf.keras.regularizers.l2(float(hparams['l2'])),
            ),
            tf.keras.layers.Dense(
                int(hparams['readout_units']),
                activation=tf.nn.relu,
                kernel_regularizer=tf.keras.regularizers.l2(float(hparams['l2'])),
            ),
            tf.keras.layers.Dense(
                output_units,
                kernel_regularizer=tf.keras.regularizers.l2(float(hparams['l2_2'])),
            ),
        ]
        self.readout = tf.keras.Sequential(readout_layers)

    def call(self, inputs, training=False):
        """Run the message passing iterations and the readout.

        Args:
            inputs (dict): Features used to make the predictions. The keys read
                here are 'links', 'paths', 'sequences', 'n_links', 'n_paths',
                'link_capacity' and 'bandwith'.
            training (bool): Whether the model is training or not; forwarded
                to the readout network.

        Returns:
            tensor: A tensor containing the per-path delay.
        """
        hparams = self.config['HYPERPARAMETERS']
        link_state_dim = int(hparams['link_state_dim'])
        path_state_dim = int(hparams['path_state_dim'])
        iterations = int(hparams['t'])

        links = inputs['links']
        paths = inputs['paths']
        seqs = inputs['sequences']
        n_links = inputs['n_links']
        n_paths = inputs['n_paths']

        # Initial link state: the link capacity followed by zeros.
        link_padding_shape = tf.stack([n_links, link_state_dim - 1], axis=0)
        link_state = tf.concat(
            [
                tf.expand_dims(inputs['link_capacity'], axis=1),
                tf.zeros(link_padding_shape),
            ],
            axis=1,
        )

        # Initial path state: the path bandwidth followed by zeros.
        path_padding_shape = tf.stack([n_paths, path_state_dim - 1], axis=0)
        path_state = tf.concat(
            [
                tf.expand_dims(inputs['bandwith'], axis=1),
                tf.zeros(path_padding_shape),
            ],
            axis=1,
        )

        # Iterate t times doing the message passing.
        for _ in range(iterations):
            # Build a [n_paths, max_len_path, link_state_dim] tensor that is all
            # zeros except for the hidden states of the links on each path.
            link_states_per_hop = tf.gather(link_state, links)
            scatter_ids = tf.stack([paths, seqs], axis=1)
            longest_path = tf.reduce_max(seqs) + 1
            sequence_shape = tf.stack([n_paths, longest_path, link_state_dim])
            path_lengths = tf.math.segment_sum(data=tf.ones_like(paths),
                                               segment_ids=paths)
            link_inputs = tf.scatter_nd(scatter_ids, link_states_per_hop, sequence_shape)

            # RNN wrapper around the path GRU cell for the links-to-paths pass.
            path_rnn = tf.keras.layers.RNN(self.path_update,
                                           return_sequences=True,
                                           return_state=True)

            # First message passing: update the path_state.
            hop_outputs, path_state = path_rnn(inputs=link_inputs,
                                               initial_state=path_state,
                                               mask=tf.sequence_mask(path_lengths))

            # For every link, gather and sum the hidden states of the paths
            # that traverse it.
            link_messages = tf.gather_nd(hop_outputs, scatter_ids)
            link_messages = tf.math.unsorted_segment_sum(link_messages, links, n_links)

            # Second message passing: update the link_state.
            link_state, _ = self.link_update(link_messages, [link_state])

        # Feed the final path states to the readout network.
        return self.readout(path_state, training=training)
def r_squared(labels, predictions):
    """Build the streaming mean of the R^2 score as a TF1-style metric.

    Args:
        labels (tf.Tensor): True values.
        predictions (tf.Tensor): Values predicted by the model.

    Returns:
        tuple: The ``(value, update_op)`` pair produced by
        ``tf.compat.v1.metrics.mean`` over the R^2 score.
    """
    # Total sum of squares around the mean of the labels.
    label_mean = tf.reduce_mean(labels)
    total_error = tf.reduce_sum(tf.square(labels - label_mean))

    # Residual sum of squares between labels and predictions.
    unexplained_error = tf.reduce_sum(tf.square(labels - predictions))

    score = 1.0 - tf.truediv(unexplained_error, total_error)

    # The compat.v1 metric is needed for tf2 compatibility.
    return tf.compat.v1.metrics.mean(score)
def model_fn(features, labels, mode, params):
    """model_fn used by the estimator, which, given inputs and a number of other parameters,
    returns the ops necessary to perform training, evaluation, or predictions.

    Summaries are written with ``tf.compat.v1.summary`` rather than the TF2
    ``tf.summary`` API: the compat.v1 ops are added to the graph's summary
    collection, which is what the Estimator saves to the event files, whereas
    the TF2 ops need a default summary writer and step that are not set up here.

    Args:
        features (dict): This is the first item returned from the input_fn passed to train, evaluate, and predict.
        labels (tf.Tensor): This is the second item returned from the input_fn passed to train, evaluate, and predict.
            If mode is tf.estimator.ModeKeys.PREDICT, labels=None will be passed.
        mode (tf.estimator.ModeKeys): Specifies if this is training, evaluation or prediction.
        params (dict): Dict of hyperparameters. Will receive what is passed to Estimator in params parameter.

    Returns:
        tf.estimator.EstimatorSpec: Ops and objects returned from a model_fn and passed to an Estimator.
    """
    # Create the model.
    model = RouteNetModel(params)

    # Execute the call function and obtain the predictions.
    predictions = model(features, training=(mode == tf.estimator.ModeKeys.TRAIN))
    predictions = tf.squeeze(predictions)

    # If we are performing predictions, return the predicted values.
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(
            mode, predictions={
                'predictions': predictions
            })

    # Define the loss function.
    loss_function = tf.keras.losses.MeanSquaredError()

    # Obtain the regularization loss of the model.
    regularization_loss = sum(model.losses)

    # Compute the loss defined previously.
    loss = loss_function(labels, predictions)

    # Compute the total loss.
    total_loss = loss + regularization_loss

    # Graph-mode (compat.v1) summaries so the Estimator writes them for TensorBoard.
    # NOTE(review): the Estimator also records the spec's loss under its own
    # 'loss' tag, so one of the two may appear with a numeric suffix — confirm
    # in TensorBoard.
    tf.compat.v1.summary.scalar('loss', loss)
    tf.compat.v1.summary.scalar('regularization_loss', regularization_loss)
    tf.compat.v1.summary.scalar('total_loss', total_loss)

    # If we are performing evaluation.
    if mode == tf.estimator.ModeKeys.EVAL:
        # Define the different evaluation metrics.
        label_mean = tf.keras.metrics.Mean()
        _ = label_mean.update_state(labels)
        prediction_mean = tf.keras.metrics.Mean()
        _ = prediction_mean.update_state(predictions)
        mae = tf.keras.metrics.MeanAbsoluteError()
        _ = mae.update_state(labels, predictions)
        mre = tf.keras.metrics.MeanRelativeError(normalizer=tf.abs(labels))
        _ = mre.update_state(labels, predictions)

        return tf.estimator.EstimatorSpec(
            mode, loss=loss,
            eval_metric_ops={
                'label/mean': label_mean,
                'prediction/mean': prediction_mean,
                'mae': mae,
                'mre': mre,
                'r-squared': r_squared(labels, predictions)
            }
        )

    # If we are performing training.
    assert mode == tf.estimator.ModeKeys.TRAIN

    # Compute the gradients.
    grads = tf.gradients(total_loss, model.trainable_variables)

    # Histogram summaries of every trainable variable and of every gradient that exists.
    for var in model.trainable_variables:
        tf.compat.v1.summary.histogram(var.op.name, var)
    for g in grads:
        if g is not None:
            tf.compat.v1.summary.histogram(g.op.name, g)

    # Define an exponential decay schedule.
    decayed_lr = tf.keras.optimizers.schedules.ExponentialDecay(float(params['HYPERPARAMETERS']['learning_rate']),
                                                                int(params['HYPERPARAMETERS']['decay_steps']),
                                                                float(params['HYPERPARAMETERS']['decay_rate']),
                                                                staircase=True)

    # Define an Adam optimizer using the defined exponential decay.
    optimizer = tf.keras.optimizers.Adam(learning_rate=decayed_lr)

    # Manually assign tf.compat.v1.global_step variable to optimizer.iterations
    # to make tf.compat.v1.train.global_step increased correctly.
    optimizer.iterations = tf.compat.v1.train.get_or_create_global_step()

    # Apply the processed gradients using the optimizer.
    train_op = optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Define the logging hook. It returns the loss, the regularization loss and the
    # total loss every 10 iterations.
    logging_hook = tf.estimator.LoggingTensorHook(
        {"Loss": loss,
         "Regularization loss": regularization_loss,
         "Total loss": total_loss}, every_n_iter=10)

    return tf.estimator.EstimatorSpec(mode,
                                      loss=loss,
                                      train_op=train_op,
                                      training_hooks=[logging_hook]
                                      )
到目前为止,我已经查看了以下资料:如何将 tensorboard 与 tf.estimator.Estimator 一起使用;https://towardsdatascience.com/an-advanced-example-of-tensorflow-estimators-part-1-3-c9ffba3bff03;如何将 Tensorboard 添加到 Tensorflow 估计器流程;如何将 tensorboard 可视化集成到 tf.Estimator?;如何使用 estimator API 在 tensorboard 上添加更多细节;如何使用 estimator API 在 tensorboard 上添加更多细节;https://developers.googleblog.com/2017/12/creating-custom-estimators-in-tensorflow.html
解决方案
推荐阅读
- python - 无法在 spark 独立集群上完成 spark 作业
- macos - 如何检测 Mac 应用中的 VoiceOver 状态变化
- java - AWT 图形未正确翻译
- nginx - 如何使用特定端口从外部访问 nginx 服务器
- regex - 使用正则表达式从 DF 中提取完全匹配
- curl - 需要有关 MarkLogic Curl 命令的解决方案
- spring - Spring/Hibernate/JPA:防止将孩子重新分配给另一个父母
- c++ - 如何在c ++中按降序对基于第二个元素的对列表进行排序
- java - 我的代码中的错误问题:loadLibrary 找不到符号
- javascript - 表单提交角帖子不起作用