tensorflow - 分布式张量流:首席工作者的工作是什么?
问题描述
我正在使用分布式张量流示例的一个版本https://www.tensorflow.org/deploy/distributed 这是我在“mnist_trainer.py”中的代码。
import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.INFO)
# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
"Comma-separated list of hostname:port pairs")
# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
"Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/home/anijsure/mnist_data",
"Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")
FLAGS = tf.app.flags.FLAGS
IMAGE_PIXELS = 28
def main(_):
print "Starting"
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
print "Cluster starting"
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
print "Server starting"
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
print "Job : WORKER"
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
mytask = tf.constant(FLAGS.task_index, name="mytask")
mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
dataset = tf.data.Dataset.from_tensor_slices((mnist.train.images, mnist.train.labels))
# Create batches of data
dataset = dataset.batch(FLAGS.batch_size)
# Create an iterator, to go over the dataset
iterator = dataset.make_initializable_iterator()
X,Y = iterator.get_next()
# Variables of the hidden layer
hid_w = tf.Variable(
tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
# Variables of the softmax layer
sm_w = tf.Variable(
tf.truncated_normal([FLAGS.hidden_units, 10],
stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
hid_lin = tf.nn.xw_plus_b(X, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)
y = tf.nn.xw_plus_b(hid, sm_w, sm_b)
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=Y, logits=y), name="loss")
global_step = tf.train.get_or_create_global_step()
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)
# The StopAtStepHook handles stopping after running given steps.
chiefhooks=[tf.train.StopAtStepHook(num_steps=25)]
allhooks=[tf.train.LoggingTensorHook(tensors={"Task": "mytask","loss":"loss", "Step":"global_step"}, every_n_iter=1)]
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir="/tmp/train_logs_%d" % FLAGS.task_index,
hooks=allhooks, chief_only_hooks=chiefhooks) as mon_sess:
mon_sess.run(iterator.initializer)
while not mon_sess.should_stop():
# Run a training step asynchronously.
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.
# mon_sess.run handles AbortedError in case of preempted PS.
_ = mon_sess.run([train_op])
if __name__ == "__main__":
tf.app.run()
我像这样运行它:
HOSTS=<node0>:2222
WORKERS=<node1>:2222,<node1>:2223,<node1>:2224
python mnist_trainer.py --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=ps --task_index=0 &
python mnist_trainer.py --data_dir mnist_data --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=0 2>&1 | tee worker0.log &
python mnist_trainer.py --data_dir mnist_data_1 --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=1 2>&1 | tee worker1.log &
python mnist_trainer.py --data_dir mnist_data_2 --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=2 2>&1 | tee worker2.log &
我已经用 1 个 PS 和 2 或 3 个工人尝试过这个 - 两个节点都是 CPU 机器。PS在node0上,workers在node1上都是不同的端口。在 2 或 3 名工人的情况下,首席工人(task0 工人)似乎根本没有进行任何更新。我只在首席工人上将 StopatStepHook 设置为 25。然而,培训似乎停止在 global_step=549 和 2 个工人案例和 global_step=1098 和 3 个工人案例。我正在使用 LoggingTensorHook 打印工作任务#,它只显示任务 1 和 2 记录任何内容。只有在最后一次迭代中,任务 0 才会记录张量。
这是预期的行为吗?首席员工是否应该只跟踪监控会话、检查点等?
考虑到训练确实停止在这个神奇的 550 次迭代,首席工人身上的某些东西确实触发了停止。
首席工人在做什么以及如何跟踪停止步骤?
解决方案
通常由首席工作者负责训练集群的操作initialize graph
。save model checkpoint
推荐阅读
- javascript - 尽管凭据 ='include',但每次 CORS POST 的会话 ID 都会更改
- python-2.7 - ConflictRefiner 不提供任何输出
- reactjs - Codesandbox “Preview on edit”不适用于 Aurelia,但适用于其他框架和原生 HTML
- java - 如何使 module-info.java 保持最新
- jquery - 当我在申请中使用凭据调用 HTTP.GET 时,角度 6 中的标题不起作用
- laravel - 如何通过具有最具体的关系来排序 Eloquent 中的行?
- c - C - 查找元素的频率失败,数字为 0
- javascript - 2 个具有最小和最大日期的依赖日期选择器
- c# - 从 EF 中的集合中删除项目 - 奇怪的问题
- r - 在 CentOS 的 fftwtools 上安装 R 包需要 fftw3