End-to-End Example for Distributed TensorFlow

Current AI computing efforts by major vendors (Google, Microsoft, etc.) are focusing on practical IT infrastructures and services that run high-throughput, massive workloads on cloud infrastructure with integrated device acceleration (GPU or TPU). In this situation, programming for distributed computing is an important piece of the technology stack.

In this post I show you the fundamentals of running distributed training with the popular TensorFlow framework. Using a basic example, I give you an overview of the development life cycle, including provisioning, programming, running, and evaluation.
Here we use regular Distributed TensorFlow with standard functions (pure TensorFlow only, without Keras or other helper libraries), but this will also help you understand other distributed frameworks such as Horovod and TensorFlowOnSpark. First I use an ordinary session (here, MonitoredTrainingSession) for distributed training, and then I show the new Estimator (high-level API) and its benefits.

I don't go deep into topologies or programming patterns for distributed computing itself (I focus on a tutorial for your first understanding), but I hope this post helps you start distributed computing with the popular TensorFlow.

Preparing your multiple machines (cluster)

Before starting, you must prepare multiple machines with Python and TensorFlow installed and configured (GPU-accelerated, etc.).

Here I don't explain how to set up your environment, but you can also use Google Cloud Machine Learning Engine (Cloud ML) or Azure Batch AI, which simplify provisioning significantly compared with basic cloud infrastructure (IaaS) such as Google Compute Engine VMs, Amazon EC2, or Azure VMs. (Google Cloud ML is fully managed and Azure Batch AI is IaaS-based. For both Google Cloud ML and Azure Batch AI, you need to specify the following cluster spec and start the following MonitoredTrainingSession in your code.)

In this post I used Azure Batch AI for the following samples; the official getting-started documentation is a useful resource for your first steps with Azure Batch AI. For practical runs, you can also use an InfiniBand network for inter-node communication.

Topology (Brief Overview)

Many resources (articles) explain the topology and programming of Distributed TensorFlow, but let me briefly summarize the concepts before starting. (You need to know them in order to run your code.)

Distributed TensorFlow consists of three types of computing nodes: "parameter node", "worker node", and "master node".

A computing session runs on one or more worker nodes. Computed parameters are kept by the parameter node and shared with the workers. (You can also run multiple parameter nodes, one for each block of parameters, which enables high-throughput I/O for writing and reading parameters. If you don't explicitly assign variables to ps (parameter server) tasks, a round-robin strategy over all ps tasks is applied.)
One of the workers must be the master node (chief); the master coordinates the workloads on all worker nodes.
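To make the round-robin strategy concrete, here is a plain-Python sketch (no TensorFlow required). The helper function is hypothetical and only mimics what tf.train.replica_device_setter does by default:

```python
# Illustrative sketch of round-robin variable placement across ps tasks.
# round_robin_placement is a made-up helper, not a TensorFlow API.
def round_robin_placement(num_ps_tasks, variable_names):
    """Assign each variable to the next ps task in turn, mimicking the
    default strategy of tf.train.replica_device_setter."""
    return {
        name: "/job:ps/task:%d" % (i % num_ps_tasks)
        for i, name in enumerate(variable_names)
    }

placement = round_robin_placement(
    2, ["hidden1/weights", "hidden1/biases", "hidden2/weights", "hidden2/biases"])
print(placement["hidden1/weights"])  # /job:ps/task:0
print(placement["hidden1/biases"])   # /job:ps/task:1
```

With two ps tasks, consecutive variables alternate between task 0 and task 1, spreading the parameter I/O across both parameter servers.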

Programming Sample

The following is the complete code for Distributed TensorFlow with an ordinary session. (Later I show how to use the Estimator class instead of the low-level MonitoredTrainingSession.) To focus only on the distribution concerns, I use a simple graph (neural network) with the well-known MNIST (handwritten digits) dataset.
I started from a non-distributed training example that uses only standard TensorFlow functions without any helper classes or functions, and modified that original code for Distributed TensorFlow. Please compare the following distributed version with the original non-distributed one.

Here we use asynchronous training, in which each node has an independent training loop. I'm also using QueueRunner (FIFOQueue) for batch reading, but you can use the tf.data API instead.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import argparse
import math

import tensorflow as tf

FLAGS = None
batch_size = 100

cluster = None
server = None
is_chief = False

def main(_):
  with tf.device(tf.train.replica_device_setter(
    worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index),
    cluster=cluster)):

    ###
    ### Training
    ###

    #
    # read training data
    #

    # image - 784 (= 28 x 28) bytes of grayscale pixel data, scaled to [0, 1] floats below
    # label - digit (0, 1, ..., 9)
    train_queue = tf.train.string_input_producer(
      [FLAGS.train_file],
      num_epochs = 2) # the data repeats twice; OutOfRangeError is raised when it is exhausted
    train_reader = tf.TFRecordReader()
    _, train_serialized_exam = train_reader.read(train_queue)
    train_exam = tf.parse_single_example(
      train_serialized_exam,
      features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
      })
    train_image = tf.decode_raw(train_exam['image_raw'], tf.uint8)
    train_image.set_shape([784])
    train_image = tf.cast(train_image, tf.float32) * (1. / 255)
    train_label = tf.cast(train_exam['label'], tf.int32)
    train_batch_image, train_batch_label = tf.train.batch(
      [train_image, train_label],
      batch_size=batch_size)

    #
    # define training graph
    #

    # define input
    plchd_image = tf.placeholder(
      dtype=tf.float32,
      shape=(None, 784))
    plchd_label = tf.placeholder(
      dtype=tf.int32,
      shape=(None))
      
    # define network and inference
    # (simple 2 fully connected hidden layer : 784->128->64->10)
    with tf.name_scope('hidden1'):
      weights = tf.Variable(
        tf.truncated_normal(
          [784, 128],
          stddev=1.0 / math.sqrt(float(784))),
        name='weights')
      biases = tf.Variable(
        tf.zeros([128]),
        name='biases')
      hidden1 = tf.nn.relu(tf.matmul(plchd_image, weights) + biases)
    with tf.name_scope('hidden2'):
      weights = tf.Variable(
        tf.truncated_normal(
          [128, 64],
          stddev=1.0 / math.sqrt(float(128))),
        name='weights')
      biases = tf.Variable(
        tf.zeros([64]),
        name='biases')
      hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
    with tf.name_scope('softmax_linear'):
      weights = tf.Variable(
        tf.truncated_normal(
          [64, 10],
          stddev=1.0 / math.sqrt(float(64))),
        name='weights')
      biases = tf.Variable(
        tf.zeros([10]),
        name='biases')
      logits = tf.matmul(hidden2, weights) + biases

    # define optimization
    global_step = tf.train.create_global_step() # start without checkpoint
    optimizer = tf.train.GradientDescentOptimizer(
      learning_rate=0.07)
    loss = tf.losses.sparse_softmax_cross_entropy(
      labels=plchd_label,
      logits=logits)
    train_op = optimizer.minimize(
      loss=loss,
      global_step=global_step)
    
    #
    # run session
    #

    with tf.train.MonitoredTrainingSession(
      master=server.target,
      checkpoint_dir=FLAGS.out_dir,
      is_chief=is_chief) as sess:
      
      # when the input data is exhausted, OutOfRangeError is raised and the MonitoredTrainingSession stops
      
      local_step_value = 0
      array_image, array_label = sess.run(
        [train_batch_image, train_batch_label])
      while not sess.should_stop():
        feed_dict = {
          plchd_image: array_image,
          plchd_label: array_label
        }
        _, global_step_value, loss_value, array_image, array_label = sess.run(
          [train_op, global_step, loss, train_batch_image, train_batch_label],
          feed_dict=feed_dict)
        local_step_value += 1
        if local_step_value % 100 == 0: # You can also use tf.train.LoggingTensorHook for output
          print("Local Step %d, Global Step %d (Loss: %.2f)" % (local_step_value, global_step_value, loss_value))

    print('training finished')
            
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--train_file',
    type=str,
    default='/home/demouser/train.tfrecords',
    help='File path for the training data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  parser.add_argument(
    '--job_name',
    type=str,
    required=True,
    help='job name (ps or worker) for cluster')
  parser.add_argument(
    '--task_index',
    type=int,
    required=True,
    help='index number in job for cluster')
  FLAGS, unparsed = parser.parse_known_args()

  # start server
  cluster = tf.train.ClusterSpec({
    'ps': ['10.0.0.6:2222'],
    'worker': [
      '10.0.0.4:2222',
      '10.0.0.5:2222'
    ]})
  server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    is_chief = (FLAGS.task_index == 0)
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

Now let me walk through the key parts of our Distributed TensorFlow example.

First, we start tf.train.Server on each node so that the nodes communicate with each other over the gRPC protocol.
As you can see below, each server takes its corresponding role (we set job_name and task_index with command-line options later). Here I assume one parameter node (10.0.0.6) and two worker nodes (10.0.0.4 and 10.0.0.5), where 10.0.0.4 is the master (chief) node.

def main(_):
  with tf.device(tf.train.replica_device_setter(
    worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index),
    cluster=cluster)):
    ...
            
if __name__ == '__main__':
  ...

  # start server
  cluster = tf.train.ClusterSpec({
    'ps': ['10.0.0.6:2222'],
    'worker': [
      '10.0.0.4:2222',
      '10.0.0.5:2222'
    ]})
  server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    is_chief = (FLAGS.task_index == 0)
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

In this example, we assign one task (role) to each node (computing machine), but you can also assign one task to each device (CPU or GPU) on a single machine, as follows.

...
worker_device = "/job:%s/task:%d/gpu:%d" % (FLAGS.job_name, FLAGS.task_index, 0)
with tf.device(tf.train.replica_device_setter(
  worker_device=worker_device,
  ps_device="/job:ps/cpu:0",
  cluster=cluster)):
  ...

In this example, we set num_epochs to 2 for reading data (train.tfrecords). Therefore, after the data has been read (iterated) twice, OutOfRangeError is raised and the session (MonitoredTrainingSession) is closed correctly.

...
train_queue = tf.train.string_input_producer(
  [FLAGS.train_file],
  num_epochs = 2)
...
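For a rough sense of when that happens: assuming the standard 60,000-image MNIST training set (the figure used in the max_steps computation of the later Estimator example) and a batch size of 100, two epochs means 1,200 batches per worker before OutOfRangeError; since each of the two workers reads its own copy of the file, the global step advances to roughly twice that in total. A quick back-of-envelope check:

```python
# Back-of-envelope batch count per worker and approximate total global steps.
# 60000 is the standard MNIST training-set size assumed here.
num_examples = 60000
num_epochs = 2
batch_size = 100
num_workers = 2

batches_per_worker = num_examples * num_epochs // batch_size
print(batches_per_worker)                # 1200 batches before OutOfRangeError
print(batches_per_worker * num_workers)  # ~2400 global steps across both workers
```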

After the graph is constructed, we run the distributed session using tf.train.MonitoredTrainingSession. tf.train.MonitoredTrainingSession can also be used for non-distributed training, but it is a distribution-aware session and eliminates a lot of boilerplate code for distributed training. The original (non-distributed) sample manually started a queue runner (QueueRunner) for batch reading, but tf.train.MonitoredTrainingSession launches the queue threads in the background, so we don't need to start the queue runner explicitly. (You can also use tf.train.Supervisor for a distributed session, but tf.train.MonitoredTrainingSession is now recommended.)
In the session we read train_batch_image and train_batch_label (100 rows of features and labels per batch) sequentially, run train_op, and estimate loss.

...
with tf.train.MonitoredTrainingSession(
  master=server.target,
  checkpoint_dir=FLAGS.out_dir,
  is_chief=is_chief) as sess:
  
  local_step_value = 0
  array_image, array_label = sess.run(
    [train_batch_image, train_batch_label])
  while not sess.should_stop():
    feed_dict = {
      plchd_image: array_image,
      plchd_label: array_label
    }
    _, global_step_value, loss_value, array_image, array_label = sess.run(
      [train_op, global_step, loss, train_batch_image, train_batch_label],
      feed_dict=feed_dict)
    local_step_value += 1
    if local_step_value % 100 == 0: # You can also use tf.train.LoggingTensorHook for output
      print("Local Step %d, Global Step %d (Loss: %.2f)" % (local_step_value, global_step_value, loss_value))
...

With the asynchronous approach, you can also detect the completion of each node automatically with an enqueue/dequeue mechanism, and then proceed with the subsequent steps.

Run and Check Our Model

Now let's start the program with the following commands.
As I mentioned earlier, we assume one parameter server (10.0.0.6) and two worker servers (10.0.0.4 and 10.0.0.5).

Parameter Node (10.0.0.6)

python mnist_tf_dist.py \
  --job_name=ps \
  --task_index=0

Worker Node 1 (10.0.0.4)

python mnist_tf_dist.py \
  --job_name=worker \
  --task_index=0

Worker Node 2 (10.0.0.5)

python mnist_tf_dist.py \
  --job_name=worker \
  --task_index=1

Once the parameter server has started and all workers have entered the MonitoredTrainingSession, the training (the loop over train_op) starts on the workers, and each node independently reads the data located on that node.
If you use shared NFS storage or cloud storage, all nodes can read the same data. (You don't need to deploy the data to each node separately.)

Output (2 workers – 10.0.0.4 and 10.0.0.5)

As you can see in the source code, we set checkpoint_dir in the session. With this setting, the checkpoint data (tensor objects, variables, etc.) is saved in that directory, and you can restore and resume your training later. You can use the collected checkpoint data as needed (retraining, testing, etc.).
For example, after copying the checkpoint data into a specific local folder, you can test the model's accuracy with the following local (non-distributed) Python code. (Note that the meta file is saved on the worker node, while the index and data files are saved on the parameter node. You must gather this data in one location, or use a shared (mounted) checkpoint directory, before running the following code.)
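Incidentally, the global step of the latest checkpoint is encoded in the checkpoint path itself (e.g. a path ending in model.ckpt-1200), so it can be recovered with plain string parsing. A minimal sketch with a hypothetical example path:

```python
def step_from_checkpoint_path(path):
    """Extract the trailing global-step suffix from a checkpoint path
    such as '/home/demouser/out/model.ckpt-1200' (illustrative helper)."""
    return int(path.split('/')[-1].split('-')[-1])

# Hypothetical path for illustration; any '-<step>' suffix parses the same way.
print(step_from_checkpoint_path('/home/demouser/out/model.ckpt-1200'))  # 1200
```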

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import argparse
import math

import tensorflow as tf

FLAGS = None
batch_size = 100
                 
def main(_):
  #
  # define graph (to be restored !)
  #

  # define input
  plchd_image = tf.placeholder(
    dtype=tf.float32,
    shape=(batch_size, 784))
  plchd_label = tf.placeholder(
    dtype=tf.int32,
    shape=(batch_size))
    
  # define network and inference
  with tf.name_scope('hidden1'):
    weights = tf.Variable(
      tf.truncated_normal(
        [784, 128],
        stddev=1.0 / math.sqrt(float(784))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([128]),
      name='biases')
    hidden1 = tf.nn.relu(tf.matmul(plchd_image, weights) + biases)
  with tf.name_scope('hidden2'):
    weights = tf.Variable(
      tf.truncated_normal(
        [128, 64],
        stddev=1.0 / math.sqrt(float(128))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([64]),
      name='biases')
    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
  with tf.name_scope('softmax_linear'):
    weights = tf.Variable(
      tf.truncated_normal(
        [64, 10],
        stddev=1.0 / math.sqrt(float(64))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([10]),
      name='biases')
    logits = tf.matmul(hidden2, weights) + biases

  #
  # Restore and Testing
  #

  ckpt = tf.train.get_checkpoint_state(FLAGS.out_dir)
  last_step = int(ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1]) # global step of the latest checkpoint
  
  saver = tf.train.Saver()
  
  with tf.Session() as sess:
    # restore graph
    saver.restore(sess, ckpt.model_checkpoint_path)
    graph = tf.get_default_graph()

    # add to graph - read test data
    test_queue = tf.train.string_input_producer(
      [FLAGS.test_file],
      num_epochs = 1) # when data is over, it raises OutOfRange
    test_reader = tf.TFRecordReader()
    _, test_serialized_exam = test_reader.read(test_queue)
    test_exam = tf.parse_single_example(
      test_serialized_exam,
      features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
      })
    test_image = tf.decode_raw(test_exam['image_raw'], tf.uint8)
    test_image.set_shape([784])
    test_image = tf.cast(test_image, tf.float32) * (1. / 255)
    test_label = tf.cast(test_exam['label'], tf.int32)
    test_batch_image, test_batch_label = tf.train.batch(
      [test_image, test_label],
      batch_size=batch_size)

    # add to graph - test (evaluate) graph
    array_correct = tf.nn.in_top_k(logits, plchd_label, 1)
    test_op = tf.reduce_sum(tf.cast(array_correct, tf.int32))
    
    # run
    sess.run(tf.local_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord) # for data batching    
    num_test = 0
    num_true = 0
    array_image, array_label = sess.run(
      [test_batch_image, test_batch_label])
    try:
      while True:
        feed_dict = {
          plchd_image: array_image,
          plchd_label: array_label
        }
        batch_num_true, array_image, array_label = sess.run(
          [test_op, test_batch_image, test_batch_label],
          feed_dict=feed_dict)
        num_true += batch_num_true
        num_test += batch_size
    except tf.errors.OutOfRangeError:
      print('Scoring done !')
    precision = float(num_true) / num_test
    print('Accuracy: %0.04f (Num of samples: %d)' %
      (precision, num_test))         

if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--test_file',
    type=str,
    default='/home/demouser/test.tfrecords',
    help='File path for the test data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  FLAGS, unparsed = parser.parse_known_args()
  
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

 

Simplify with new Estimator class

You can also use the high-level TensorFlow API – the tf.estimator.Estimator class – to run Distributed TensorFlow with good modularity. (Recent official samples are often written with this API.)
Let's look at a brief example.

Before you run your code, you must set the following TF_CONFIG environment variable. As you will see later, with this environment variable your cluster configuration settings are not embedded in your logic code.

Note that this differs from the previous tf.train.ClusterSpec settings: the chief node is specified explicitly, and the worker list does not include the chief node.
As in the previous example, with these settings the chief worker also runs the training task, just like the other workers.

Parameter Node (10.0.0.6)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "ps"
  }
}')

Worker Node 1 (10.0.0.4)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "chief"
  }
}')

Worker Node 2 (10.0.0.5)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "worker"
  }
}')
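A node can determine its own role simply by parsing this JSON. The following plain-Python helper is purely illustrative (the real parsing is done internally by RunConfig); it reads the chief node's TF_CONFIG from the example above:

```python
import json

def parse_tf_config(raw):
    """Report this task's role from a TF_CONFIG JSON string
    (illustrative helper, not a TensorFlow API)."""
    conf = json.loads(raw)
    task = conf.get("task", {})
    return {
        "jobs": sorted(conf.get("cluster", {}).keys()),
        "type": task.get("type"),
        "index": task.get("index"),
        "is_chief": task.get("type") == "chief",
    }

chief_config = """
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {"index": 0, "type": "chief"}
}
"""
print(parse_tf_config(chief_config))
# {'jobs': ['chief', 'ps', 'worker'], 'type': 'chief', 'index': 0, 'is_chief': True}
```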

Now we rewrite the previous code with tf.estimator.Estimator, as follows. In this code we declaratively define our input function and model function for the Estimator, and run training and evaluation with the tf.estimator.train_and_evaluate() function. (Here we define a custom Estimator, but you can also use a pre-made Estimator that inherits from the Estimator class, such as LinearClassifier or DNNClassifier.)

As you can see, this code has no differences between the distributed and non-distributed cases.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
 
import sys
import argparse
import math
 
import tensorflow as tf

FLAGS = None
batch_size = 100

def _my_input_fn(filepath, num_epochs):
  # image - 784 (=28 x 28) elements of grey-scaled integer value [0, 1]
  # label - digit (0, 1, ..., 9)
  data_queue = tf.train.string_input_producer(
    [filepath],
    num_epochs = num_epochs) # the data repeats num_epochs times; OutOfRangeError is raised when it is exhausted
  data_reader = tf.TFRecordReader()
  _, serialized_exam = data_reader.read(data_queue)
  data_exam = tf.parse_single_example(
    serialized_exam,
    features={
      'image_raw': tf.FixedLenFeature([], tf.string),
      'label': tf.FixedLenFeature([], tf.int64)
    })
  data_image = tf.decode_raw(data_exam['image_raw'], tf.uint8)
  data_image.set_shape([784])
  data_image = tf.cast(data_image, tf.float32) * (1. / 255)
  data_label = tf.cast(data_exam['label'], tf.int32)
  data_batch_image, data_batch_label = tf.train.batch(
    [data_image, data_label],
    batch_size=batch_size)
  return data_batch_image, data_batch_label

def _get_input_fn(filepath, num_epochs):
  return lambda: _my_input_fn(filepath, num_epochs)

def _my_model_fn(features, labels, mode):
  # with tf.device(...): # You can set device if using GPUs
  
  # define network and inference
  # (simple 2 fully connected hidden layer : 784->128->64->10)
  with tf.name_scope('hidden1'):
    weights = tf.Variable(
      tf.truncated_normal(
        [784, 128],
        stddev=1.0 / math.sqrt(float(784))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([128]),
      name='biases')
    hidden1 = tf.nn.relu(tf.matmul(features, weights) + biases)
  with tf.name_scope('hidden2'):
    weights = tf.Variable(
      tf.truncated_normal(
        [128, 64],
        stddev=1.0 / math.sqrt(float(128))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([64]),
      name='biases')
    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
  with tf.name_scope('softmax_linear'):
    weights = tf.Variable(
      tf.truncated_normal(
        [64, 10],
        stddev=1.0 / math.sqrt(float(64))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([10]),
      name='biases')
    logits = tf.matmul(hidden2, weights) + biases

  # compute evaluation matrix
  predicted_indices = tf.argmax(input=logits, axis=1)
  if mode != tf.estimator.ModeKeys.PREDICT:
    label_indices = tf.cast(labels, tf.int32) 
    accuracy = tf.metrics.accuracy(label_indices, predicted_indices)
    tf.summary.scalar('accuracy', accuracy[1]) # output to TensorBoard

  # compute loss
  loss = tf.losses.sparse_softmax_cross_entropy(
    labels=labels,
    logits=logits)

  # define operations
  if mode == tf.estimator.ModeKeys.TRAIN:
    #global_step = tf.train.create_global_step()
    #global_step = tf.contrib.framework.get_or_create_global_step()
    global_step = tf.train.get_or_create_global_step()    
    optimizer = tf.train.GradientDescentOptimizer(
      learning_rate=0.07)
    train_op = optimizer.minimize(
      loss=loss,
      global_step=global_step)
    return tf.estimator.EstimatorSpec(
      mode,
      loss=loss,
      train_op=train_op)
  if mode == tf.estimator.ModeKeys.EVAL:
    eval_metric_ops = {
      'accuracy': accuracy
    }
    return tf.estimator.EstimatorSpec(
      mode,
      loss=loss,
      eval_metric_ops=eval_metric_ops)
  if mode == tf.estimator.ModeKeys.PREDICT:
    probabilities = tf.nn.softmax(logits, name='softmax_tensor')
    predictions = {
      'classes': predicted_indices,
      'probabilities': probabilities
    }
    export_outputs = {
      'prediction': tf.estimator.export.PredictOutput(predictions)
    }
    return tf.estimator.EstimatorSpec(
      mode,
      predictions=predictions,
      export_outputs=export_outputs)

def main(_):
  # read the TF_CONFIG environment variable
  run_config = tf.estimator.RunConfig()

  # define
  mnist_fullyconnected_classifier = tf.estimator.Estimator(
    model_fn=_my_model_fn,
    model_dir=FLAGS.out_dir,
    config=run_config)
  train_spec = tf.estimator.TrainSpec(
    input_fn=_get_input_fn(FLAGS.train_file, 2),
    max_steps=60000 * 2 // batch_size)  # 2 epochs over 60,000 training images
  eval_spec = tf.estimator.EvalSpec(
    input_fn=_get_input_fn(FLAGS.test_file, 1),
    steps=10000 * 1 // batch_size,  # 1 epoch over 10,000 test images
    start_delay_secs=0)
    
  # run !
  tf.estimator.train_and_evaluate(
    mnist_fullyconnected_classifier,
    train_spec,
    eval_spec
  )
             
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--train_file',
    type=str,
    default='/home/demouser/train.tfrecords',
    help='File path for the training data.')
  parser.add_argument(
    '--test_file',
    type=str,
    default='/home/demouser/test.tfrecords',
    help='File path for the test data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  FLAGS, unparsed = parser.parse_known_args()
 
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

You cannot see the training loop (the while loop over mini-batches) in this code, but you can use a custom tf.train.SessionRunHook or tf.train.LoggingTensorHook to run your own code (e.g., debug printing) inside the training loop.

Now you can run this code with the following command. As we saw earlier, there are no cluster-specific settings in the program code, so you can run the same command (without extra command-line options) on all nodes.

Run on all nodes (10.0.0.4 – 10.0.0.6)

python mnist_tf_dist_estimator.py

The output is written to the model directory (model_dir of the Estimator), and you can predict (infer) with the predict() method of tf.estimator.Estimator instead of using a session loop.

 

Works with TensorBoard

You can also view the summarized results in the model directory with the TensorBoard UI by starting the TensorBoard server with the following command, where logdir is your model directory. (As noted earlier for checkpoints, the results are saved separately on the nodes; you must gather them into one directory or use a shared model directory.)

tensorboard --logdir=/home/demouser/out

Open the TensorBoard UI (http://localhost:6006) in your web browser, and you can see how the accuracy changes over time, because we collected accuracy values in the sample code.

Note: When you use a remote terminal (Windows, Mac, etc.), open the port in your server's firewall settings (accessed via http://{your server name}:6006), or configure an SSH tunnel with your terminal client such as PuTTY (accessed via http://localhost:6006).
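On Linux or Mac terminals, an equivalent SSH tunnel can be opened with one command. The host and user below are the example values from this post; substitute your own:

```shell
# Forward local port 6006 to TensorBoard running on the chief node (10.0.0.4),
# then browse http://localhost:6006 on your local machine.
ssh -L 6006:localhost:6006 demouser@10.0.0.4
```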

 

Save Model for Production

When you have finished optimizing in your training experiments, you can save your final graph into a model file as follows:

def _my_serving_input_fn():
  inputs = {'inputs': tf.placeholder(tf.float32, [None, 784])}
  return tf.estimator.export.ServingInputReceiver(inputs, inputs)

# save model and variables
model_dir = mnist_fullyconnected_classifier.export_savedmodel(
  export_dir_base = './model',
  serving_input_receiver_fn = _my_serving_input_fn)
print('model is saved ', model_dir)

Once you have your model, you can load it and serve predictions (inference) on a single node without the Estimator.
(Here we use the contrib library (tf.contrib.predictor); you can also serve the saved model over gRPC with TensorFlow Serving.)

# Read test data by tensor
dataset = tf.data.TFRecordDataset('/home/demouser/test.tfrecords')
iterator = dataset.make_one_shot_iterator()
data_org = iterator.get_next()
data_exam = tf.parse_single_example(
  data_org,
  features={
    'image_raw': tf.FixedLenFeature([], tf.string),
    'label': tf.FixedLenFeature([], tf.int64)
  })
data_image = tf.decode_raw(data_exam['image_raw'], tf.uint8)
data_image.set_shape([784])
data_image = tf.cast(data_image, tf.float32) * (1. / 255)
data_label = tf.cast(data_exam['label'], tf.int32)

# Run tensor and generate 3 top data as array
with tf.Session() as sess:
  image_arr = []
  label_arr = []
  for i in range(3):
    image, label = sess.run([data_image, data_label])
    image_arr.append(image)
    label_arr.append(label)

# Predict 3 top data
pred_fn = tf.contrib.predictor.from_saved_model(model_dir)
pred = pred_fn({'inputs': image_arr})

# Output and Compare
print('Predicted Values:', pred['classes'])
print('Real Values:', label_arr)

 
