Distributed TensorFlow: what is the chief worker's job?

Problem description

I am working with a version of the distributed TensorFlow example from https://www.tensorflow.org/deploy/distributed. Here is my code in "mnist_trainer.py":

import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.INFO)

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/home/anijsure/mnist_data",
                           "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

FLAGS = tf.app.flags.FLAGS

IMAGE_PIXELS = 28

def main(_):
  print "Starting"
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  print "Cluster starting"
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  print "Server starting"
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    print "Job : WORKER"

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
      mytask = tf.constant(FLAGS.task_index, name="mytask")

      mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
      dataset = tf.data.Dataset.from_tensor_slices((mnist.train.images, mnist.train.labels))
      # Create batches of data
      dataset = dataset.batch(FLAGS.batch_size)
      # Create an iterator, to go over the dataset
      iterator = dataset.make_initializable_iterator()
      X, Y = iterator.get_next()

      # Variables of the hidden layer
      hid_w = tf.Variable(
          tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                              stddev=1.0 / IMAGE_PIXELS), name="hid_w")
      hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

      # Variables of the softmax layer
      sm_w = tf.Variable(
          tf.truncated_normal([FLAGS.hidden_units, 10],
                              stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
          name="sm_w")
      sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

      hid_lin = tf.nn.xw_plus_b(X, hid_w, hid_b)
      hid = tf.nn.relu(hid_lin)

      y = tf.nn.xw_plus_b(hid, sm_w, sm_b)
      loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=Y, logits=y), name="loss")

      global_step = tf.train.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)


    # The StopAtStepHook handles stopping after running given steps.
    chiefhooks = [tf.train.StopAtStepHook(num_steps=25)]
    allhooks = [tf.train.LoggingTensorHook(
        tensors={"Task": "mytask", "loss": "loss", "Step": "global_step"},
        every_n_iter=1)]

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs_%d" % FLAGS.task_index,
                                           hooks=allhooks, chief_only_hooks=chiefhooks) as mon_sess:
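      # Initialize the dataset iterator on every task; each worker reads
      # its own copy of MNIST from its --data_dir.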
      mon_sess.run(iterator.initializer)
      while not mon_sess.should_stop():
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        # mon_sess.run handles AbortedError in case of preempted PS.

        _ = mon_sess.run([train_op])

if __name__ == "__main__":
  tf.app.run()

I run it like this:

HOSTS=<node0>:2222
WORKERS=<node1>:2222,<node1>:2223,<node1>:2224

python mnist_trainer.py --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=ps --task_index=0 &
python mnist_trainer.py --data_dir mnist_data --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=0 2>&1 | tee worker0.log &
python mnist_trainer.py --data_dir mnist_data_1 --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=1 2>&1 | tee worker1.log &
python mnist_trainer.py --data_dir mnist_data_2 --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=2 2>&1 | tee worker2.log &

I have tried this with 1 PS and 2 or 3 workers; both nodes are CPU machines. The PS is on node0, and the workers are all on node1 on different ports. With 2 or 3 workers, the chief worker (the task-0 worker) does not seem to be doing any updates at all. I set the StopAtStepHook to 25 on the chief worker only, yet training seems to stop at global_step=549 in the 2-worker case and at global_step=1098 in the 3-worker case. I am printing the worker task # with the LoggingTensorHook, and it shows only tasks 1 and 2 logging anything; only on the very last iteration does task 0 log the tensors.

Is this expected behavior? Is the chief worker only supposed to look after the monitored session, checkpointing, and so on?

Given that training does stop at these magic ~550 iterations, something on the chief worker must indeed be triggering the stop.

What is the chief worker doing, and how is it tracking the stopping step?

Tags: tensorflow

Solution


Usually the chief worker is responsible for housekeeping operations for the training cluster, such as initializing the graph and saving the model checkpoint.
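
Here is a rough sketch of what the is_chief flag in the tf.train.MonitoredTrainingSession call above changes in TF 1.x (an illustration, not the literal library internals; the real logic lives in tf.train.ChiefSessionCreator and tf.train.WorkerSessionCreator, and the make_session helper here is hypothetical):

import tensorflow as tf

def make_session(server, is_chief, hooks, chiefhooks):
  return tf.train.MonitoredTrainingSession(
      master=server.target,
      # is_chief=True  -> ChiefSessionCreator: run the variable
      #   initializers, restore from / save to the checkpoint dir, write
      #   summaries, and attach chief_only_hooks (the StopAtStepHook).
      # is_chief=False -> WorkerSessionCreator: skip initialization and
      #   block until the chief has initialized the shared PS variables,
      #   then just run training steps.
      is_chief=is_chief,
      checkpoint_dir="/tmp/train_logs",  # hypothetical shared directory
      hooks=hooks,
      chief_only_hooks=chiefhooks)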


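That would also explain the "magic" stopping step. tf.train.StopAtStepHook(num_steps=N) does not mean "N global steps in total": it arms itself N steps past the global step it observes when the session is created. A simplified paraphrase of its TF 1.x behavior (a sketch for illustration, not the verbatim library source):

import tensorflow as tf

class StopAtStepSketch(tf.train.SessionRunHook):
  def __init__(self, num_steps):
    self._num_steps = num_steps
    self._last_step = None

  def begin(self):
    # Grab the shared global step; it lives on the PS, so every worker
    # increments the same counter.
    self._global_step = tf.train.get_global_step()

  def after_create_session(self, session, coord):
    # Count num_steps from the step observed at session creation, not
    # from zero. A slow-starting chief may see a large value here.
    self._last_step = session.run(self._global_step) + self._num_steps

  def before_run(self, run_context):
    return tf.train.SessionRunArgs(self._global_step)

  def after_run(self, run_context, run_values):
    if run_values.results + 1 >= self._last_step:
      run_context.request_stop()

Under that reading the observed numbers line up: 549 - 25 = 524 and 1098 - 25 = 1073 would be the global step at the moment the chief finally finished creating its session, while tasks 1 and 2 had already been training against the shared parameter server. The chief then trains until the shared global step reaches the target and requests the stop, which is consistent with task 0 only showing up in the logs at the very end.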