TensorFlow Performance by GPU and TPU (w/ screen recording)

Everybody knows the performance of deep AI workloads depends a lot on devices like GPUs. But how fast is it ? TPU has really robust performance compared with GPUs ?
Here I show you the insight for these concerns by simple experimentation.

For example, the following is the demonstration for running same TensorFlow training task (ResNet network for CIFAR-10 dataset) on both CPU (left side) and NVIDIA Tesla K80 (right side). Here we used “n1-highmem-8” VM instance on Google Compute Engine.
As you can easily see, it apparently needs a lot of time to train on CPU architecture. (It’s obvious without need to run detailed benchmark testing.)

In this post, I just simply show you its performance impact of single device with the same example of code for your intuition. (It will be very simple benchmarks.) I don’t discuss about detailed tuning techniques such as memory layouts or parallelism for model optimization, but I hope this can give you a sense for performance impact by devices.

Here I use CIFRA-10 (50,000 training images which size is 32 x 32 by 3 RGB channels) for dataset and train using ResNet-32 convolutional network (without bottleneck) which learning rate is scheduled : 0.1 (< 40,000 steps), 0.01 (< 60,000 steps), 0.001 (< 80,000 steps), 0.0001 (>= 80,000 steps).

The layers of this network is not so deep and data is not so large such as ImageNet, but it will be enough to understand how the device effects the performance here.

Performance (Steps per Second)

For performance comparison, I simply show you the transition of {the number of training steps}/{second} for the first thousands of steps with TensorBoard screen-capturing. (It’s not difficult and you can soon trace the same procedure.)

For example, the previous case (comparison with CPU and K80) achieves the following performance. As you can see, it is about 8x – 9x faster than CPU by using NVIDIA Tesla K80 utilized device.

Following is the result for the latest NVIDIA Tesla V100 (Volta architecture).
Currently Google doesn’t offer V100 instance as Compute VM and here I used Azure Virtual Machines NC6s_v3 for benchmarks.

As you can see, now it achieves about 50x faster compared with the general purpose CPU device (previous n1-highmem-8 machine).

The intuitive speed is like that :

Finally, I show you the result using Google Cloud TPU (TPUv2). Note that here I specified 1 replica, but 8 replicas are the expected behavior for using Cloud TPUs. (See “Cloud TPU – Troubleshooting and FAQ” for details.)
As you can see, the speed is about 2x faster than the latest Volta GPU (V100) for 1 device (not distributed).

In Conclusion

So which one is the best choice ?

As we saw earlier, this post shows that TPU is the fastest choice for TensorFlow, but it’s important to remember that TPU has some caveats, such as :

  • TPU supports only TensorFlow (incl. Keras on TensorFlow) and you cannot bring Theano, CNTK, Caffe or other major frameworks on TPU devices.
  • Currently the programming code depends on TPU and therefore you cannot debug your code on other devices.

For instance, it’s well known that Cognitive Toolkit (CNTK) is 2x – 5x faster than TensorFlow when using RNN (incl. LSTM). Depending on the real situations, it sometimes might be the same speed or faster to run CNTK on V100 rather than TensorFlow on TPU, because TPU is not so remarkable (it’s still close) compared with the latest GPU architecture.
Here I don’t go so far, but you can also take the latest multiple GPU communications’ technologies with NVIDIA software for speed-up.

Moreover here I showed only training performance, but, during inference (real-time predictions or online predictions), you can take other choices like TensorRT, Tensor Core architecture on V100 (NVIDIA says it’s faster than TPU), or Microsoft FPGA technologies (also Microsoft says it’s faster than TPU).

In conclusion, the latest P100, V100 or TPU has surely remarkable performance against old architectures, and these are helpful for your AI workloads with deep neural layers. But, for the latest V100 and TPUs, you must carefully consider which one is better depending on various aspects of real usage, such as the shape of generated graph, what framework you choose, human workloads in AI lifecycle, required numbers of distributions (devices), cost effectiveness, etc, etc.

Note that currently (April 2018) AWS EC2 p3.2xlarge is $3.06/hour (computing only), Azure VM NCv3 is $3.06/hour (computing only) and TPU is $6.50/hour (device consumption only).

Advertisements

End-to-End Example for Distributed TensorFlow

Current computing technologies for AI (by giant Google, Microsoft, etc) is focusing more practical IT infrastructures or services, which enables to run high-throughput and massive workloads with cloud infrastructure and device-acceleration (GPU or TPU) integrated. In such a situation, the programming for distributed computing is an important piece of technologies.

In this post I show you fundamentals for running distributed training with popular TensorFlow framework. I show the overview for your beginning through the development life cycle including provisioning, programming, running, and evaluation with the basic example.
Here we use regular Distributed TensorFlow with standard functions (only pure TensorFlow library without Keras or other helper functions), but it will also help you understand other distributed frameworks like Horovod, TensorFlowOnSpark, etc. First I use usual session (here I use MonitoredTrainingSession) for distributed training, and next I show you with new Estimator (high-level API) and see its benefits.

I don’t go so far into topology or programming patterns for distributed computing ideas itself (I focus on the tutorial for your first understanding), but I hope this post helps you to start your distributed computing by popular TensorFlow.

Preparing your multiple machines (cluster)

Before starting, you must prepare your multiple machines with python and TensorFlow installed and configured (GPU-accelerated, etc).

Here I don’t explain how to setup your environment, but you can also use Google Cloud Machine Learning Engine (Cloud ML) or Azure Batch AI, which significantly simplifies your provisioning rather than the basic cloud infrastructure (IaaS) like Google Compute VM, Amazon EC2, or Azure VM. (Google Cloud ML is fully-managed and Azure Batch AI is IaaS based. For both Google Cloud ML and Azure Batch AI, you need to specify the following cluster spec and start the following MonitoredTrainingSession in your code.)

In this post I used Azure Batch AI for the following samples and here is useful resource for your first start of Azure Batch AI. In the practical execution, you can also use infini-band network for inter-node communications.

Topology (Brief Overview)

There exist a lot of resources (articles) explaining the topology and programming of Distributed TensorFlow, but let me summarize brief concepts before starting. (You need to know for running your code.)

Distributed TensorFlow consists of 3 types of computing nodes, called “parameter node”, “worker node”, and “master node”.

Computing session runs on multiple (or single) worker nodes. Computed parameters are kept by the parameter node and shared with workers. (You can also run multiple parameter nodes for each parameter blocks, which enables high-throughput IOs for writing and reading parameters. If you don’t specify ps (=parameter server) tasks for variables, the round-robin strategy over all ps tasks is applied.)
One of workers (among worker nodes) must be master node (chief) and master coordinates all workloads in each worker nodes.

Programming Sample

The following is our complete code for Distributed TensorFlow with usual session. (Later I show you using Estimator class instead of using low-level MonitoredTrainingSession.) Because I want to focus only on our concerns, here I use simple graph (neural network) with well-known MNIST (hand-writing digits) dataset.
I referred here (non-distributed training example) which only use standard TensorFlow functions without any other helper classes or functions and now I modified this original code for Distributed TensorFlow. (The highlighted line is modified for Distributed TensorFlow.) Please compare the following distributed one with the original non-distributed one.

Here we use asynchronous training, with which each nodes has independent training loop. Moreover I’m here using QueueRunner (FIFOQueue) for batch reading, but you can also use tf.data functions instead.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import argparse
import math

import tensorflow as tf

FLAGS = None
batch_size = 100

cluster = None
server = None
is_chief = False

def main(_):
  with tf.device(tf.train.replica_device_setter(
    worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index),
    cluster=cluster)):

    ###
    ### Training
    ###

    #
    # read training data
    #

    # image - 784 (=28 x 28) elements of grey-scaled integer value [0, 1]
    # label - digit (0, 1, ..., 9)
    train_queue = tf.train.string_input_producer(
      [FLAGS.train_file],
      num_epochs = 2) # data is repeated and it raises OutOfRange when data is over
    train_reader = tf.TFRecordReader()
    _, train_serialized_exam = train_reader.read(train_queue)
    train_exam = tf.parse_single_example(
      train_serialized_exam,
      features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
      })
    train_image = tf.decode_raw(train_exam['image_raw'], tf.uint8)
    train_image.set_shape([784])
    train_image = tf.cast(train_image, tf.float32) * (1. / 255)
    train_label = tf.cast(train_exam['label'], tf.int32)
    train_batch_image, train_batch_label = tf.train.batch(
      [train_image, train_label],
      batch_size=batch_size)

    #
    # define training graph
    #

    # define input
    plchd_image = tf.placeholder(
      dtype=tf.float32,
      shape=(batch_size, 784))
    plchd_label = tf.placeholder(
      dtype=tf.int32,
      shape=(batch_size))
      
    # define network and inference
    # (simple 2 fully connected hidden layer : 784->128->64->10)
    with tf.name_scope('hidden1'):
      weights = tf.Variable(
        tf.truncated_normal(
          [784, 128],
          stddev=1.0 / math.sqrt(float(784))),
        name='weights')
      biases = tf.Variable(
        tf.zeros([128]),
        name='biases')
      hidden1 = tf.nn.relu(tf.matmul(plchd_image, weights) + biases)
    with tf.name_scope('hidden2'):
      weights = tf.Variable(
        tf.truncated_normal(
          [128, 64],
          stddev=1.0 / math.sqrt(float(128))),
        name='weights')
      biases = tf.Variable(
        tf.zeros([64]),
        name='biases')
      hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
    with tf.name_scope('softmax_linear'):
      weights = tf.Variable(
        tf.truncated_normal(
          [64, 10],
          stddev=1.0 / math.sqrt(float(64))),
      name='weights')
      biases = tf.Variable(
        tf.zeros([10]),
        name='biases')
      logits = tf.matmul(hidden2, weights) + biases

    # define optimization
    global_step = tf.train.create_global_step() # start without checkpoint
    optimizer = tf.train.GradientDescentOptimizer(
      learning_rate=0.07)
    loss = tf.losses.sparse_softmax_cross_entropy(
      labels=plchd_label,
      logits=logits)
    train_op = optimizer.minimize(
      loss=loss,
      global_step=global_step)
    
    #
    # run session
    #

    with tf.train.MonitoredTrainingSession(
      master=server.target,
      checkpoint_dir=FLAGS.out_dir,
      is_chief=is_chief) as sess:
      
      # when data is over, OutOfRangeError occurs and ends with MonitoredSession
      
      local_step_value = 0
      array_image, array_label = sess.run(
        [train_batch_image, train_batch_label])
      while not sess.should_stop():
        feed_dict = {
          plchd_image: array_image,
          plchd_label: array_label
        }
        _, global_step_value, loss_value, array_image, array_label = sess.run(
          [train_op, global_step, loss, train_batch_image, train_batch_label],
          feed_dict=feed_dict)
        local_step_value += 1
        if local_step_value % 100 == 0: # You can also use tf.train.LoggingTensorHook for output
          print("Local Step %d, Global Step %d (Loss: %.2f)" % (local_step_value, global_step_value, loss_value))

    print('training finished')
            
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--train_file',
    type=str,
    default='/home/demouser/train.tfrecords',
    help='File path for the training data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  parser.add_argument(
    '--job_name',
    type=str,
    required=True,
    help='job name (parameter or worker) for cluster')
  parser.add_argument(
    '--task_index',
    type=int,
    required=True,
    help='index number in job for cluster')
  FLAGS, unparsed = parser.parse_known_args()

  # start server
  cluster = tf.train.ClusterSpec({
    'ps': ['10.0.0.6:2222'],
    'worker': [
      '10.0.0.4:2222',
      '10.0.0.5:2222'
    ]})
  server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    is_chief = (FLAGS.task_index == 0)
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

Now I pick up and explain about our Distributed TensorFlow example.

First, we start tf.train.Server in each nodes to communicate each other with gRPC protocol.
As you can see below, each server is having corresponding each role (later we set job_name and task_index using command line options), and here I assume one parameter node (10.0.0.6) and two worker nodes (10.0.0.4 and 10.0.0.5), in which 10.0.0.4 is master (chief) node.

def main(_):
  with tf.device(tf.train.replica_device_setter(
    worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index),
    cluster=cluster)):
    ...
            
if __name__ == '__main__':
  ...

  # start server
  cluster = tf.train.ClusterSpec({
    'ps': ['10.0.0.6:2222'],
    'worker': [
      '10.0.0.4:2222',
      '10.0.0.5:2222'
    ]})
  server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    is_chief = (FLAGS.task_index == 0)
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

In this example, we’re assigning each one task (role) for each node (computing machine), but you can also assign one task for each devices (CPUs or GPUs) on the single machine as follows.

...
worker_device = "/job:%s/task:%d/gpu:%d" % (FLAGS.job_name, FLAGS.task_index, 0)
with tf.device(tf.train.replica_device_setter(
  worker_device=worker_device,
  ps_device="/job:ps/cpu:0",
  cluster=cluster)):
  ...

In this example, we set 2 as num_epochs for reading data (train.tfrecords). Therefore, after data is read (iterated) twice by cycle, OutOfRangeError occurs and session (MonitoredTrainingSession) is correctly closed.

...
train_queue = tf.train.string_input_producer(
  [FLAGS.train_file],
  num_epochs = 2)
...

After the graph is constructed, we run the distributed session using tf.train.MonitoredTrainingSession. tf.train.MonitoredTrainingSession can also be used in non-distributed training, but it’s distribution-aware session and enables us to eliminate several boilerplate code for distributed training. As you can see, the original sample (non-distributed one) manually started queue runner (QueueRunner) for batch-reading, but tf.train.MonitoredTrainingSession launches queue thread in the background and we don’t need to start queue runner explicitly. (You can also use tf.train.Supervisor for distributed session, but tf.train.MonitoredTrainingSession is recommended now.)
In the session we read train_batch_image and train_batch_label (each 100 rows of features and labels) sequentially and run train_op and estimate loss .

...
with tf.train.MonitoredTrainingSession(
  master=server.target,
  checkpoint_dir=FLAGS.out_dir,
  is_chief=is_chief) as sess:
  
  local_step_value = 0
  array_image, array_label = sess.run(
    [train_batch_image, train_batch_label])
  while not sess.should_stop():
    feed_dict = {
      plchd_image: array_image,
      plchd_label: array_label
    }
    _, global_step_value, loss_value, array_image, array_label = sess.run(
      [train_op, global_step, loss, train_batch_image, train_batch_label],
      feed_dict=feed_dict)
    local_step_value += 1
    if local_step_value % 100 == 0: # You can also use tf.train.LoggingTensorHook for output
      print("Local Step %d, Global Step %d (Loss: %.2f)" % (local_step_value, global_step_value, loss_value))
...

In asynchronous approach, you can also implement to detect the completion of each nodes automatically with enqueue/dequeue mechanism and you can proceed the consequent steps as follows.

Run and Check Our Model

Now let’s start our program with the following command.
As I mentioned earlier, here we’re assuming one parameter server (10.0.0.6) and two worker serves (10.0.0.4 and 10.0.0.5).

Parameter Node (10.0.0.6)

python mnist_tf_dist.py \
  --job_name=ps \
  --task_index=0

Worker Node 1 (10.0.0.4)

python mnist_tf_dist.py \
  --job_name=worker \
  --task_index=0

Worker Node 2 (10.0.0.5)

python mnist_tf_dist.py \
  --job_name=worker \
  --task_index=1

Once the parameter server is started and all workers enter in MonitoredTrainingSession, the training (the loop of train_op) starts in multiple workers and each nodes respectively (independently) read data on corresponding node.
If you use the shared NFS storage or cloud storage, all nodes can read the same data respectively. (You don’t need to deploy data in multiple nodes respectively.)

Output (2 workers – 10.0.0.4 and 10.0.0.5)

As you can see in our source code, we’re setting checkpoint_dir in session. By this setting, the checkpoint data (tensor objects, variables, etc) are all saved in the directory and you can restore and restart your training later again. You can use gathered checkpoint data as you need. (retrain, test, etc)
For example, when you copy these checkpoint data in the specific local folder, you can test the model’s accuracy using the following local (non-distributed) python code. (Note that the meta is saved in worker node, but the index and data is saved in parameter node separately. You must gather these data in one location or use shared (mounted) checkpoint directory for running the following code.)

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import argparse
import math

import tensorflow as tf

FLAGS = None
batch_size = 100
                 
def main(_):
  #
  # define graph (to be restored !)
  #

  # define input
  plchd_image = tf.placeholder(
    dtype=tf.float32,
    shape=(batch_size, 784))
  plchd_label = tf.placeholder(
    dtype=tf.int32,
    shape=(batch_size))
    
  # define network and inference
  with tf.name_scope('hidden1'):
    weights = tf.Variable(
      tf.truncated_normal(
        [784, 128],
        stddev=1.0 / math.sqrt(float(784))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([128]),
      name='biases')
    hidden1 = tf.nn.relu(tf.matmul(plchd_image, weights) + biases)
  with tf.name_scope('hidden2'):
    weights = tf.Variable(
      tf.truncated_normal(
        [128, 64],
        stddev=1.0 / math.sqrt(float(128))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([64]),
      name='biases')
    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
  with tf.name_scope('softmax_linear'):
    weights = tf.Variable(
      tf.truncated_normal(
        [64, 10],
        stddev=1.0 / math.sqrt(float(64))),
    name='weights')
    biases = tf.Variable(
      tf.zeros([10]),
      name='biases')
    logits = tf.matmul(hidden2, weights) + biases

  #
  # Restore and Testing
  #

  ckpt = tf.train.get_checkpoint_state(FLAGS.out_dir)
  idex = int(ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1])
  
  saver = tf.train.Saver()
  
  with tf.Session() as sess:
    # restore graph
    saver.restore(sess, ckpt.model_checkpoint_path)
    graph = tf.get_default_graph()

    # add to graph - read test data
    test_queue = tf.train.string_input_producer(
      [FLAGS.test_file],
      num_epochs = 1) # when data is over, it raises OutOfRange
    test_reader = tf.TFRecordReader()
    _, test_serialized_exam = test_reader.read(test_queue)
    test_exam = tf.parse_single_example(
      test_serialized_exam,
      features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
      })
    test_image = tf.decode_raw(test_exam['image_raw'], tf.uint8)
    test_image.set_shape([784])
    test_image = tf.cast(test_image, tf.float32) * (1. / 255)
    test_label = tf.cast(test_exam['label'], tf.int32)
    test_batch_image, test_batch_label = tf.train.batch(
      [test_image, test_label],
      batch_size=batch_size)

    # add to graph - test (evaluate) graph
    array_correct = tf.nn.in_top_k(logits, plchd_label, 1)
    test_op = tf.reduce_sum(tf.cast(array_correct, tf.int32))
    
    # run
    sess.run(tf.initialize_local_variables())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord) # for data batching    
    num_test = 0
    num_true = 0
    array_image, array_label = sess.run(
      [test_batch_image, test_batch_label])
    try:
      while True:
        feed_dict = {
          plchd_image: array_image,
          plchd_label: array_label
        }
        batch_num_true, array_image, array_label = sess.run(
          [test_op, test_batch_image, test_batch_label],
          feed_dict=feed_dict)
        num_true += batch_num_true
        num_test += batch_size
    except tf.errors.OutOfRangeError:
      print('Scoring done !')
    precision = float(num_true) / num_test
    print('Accuracy: %0.04f (Num of samples: %d)' %
      (precision, num_test))         

if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--test_file',
    type=str,
    default='/home/demouser/test.tfrecords',
    help='File path for the test data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  FLAGS, unparsed = parser.parse_known_args()
  
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

 

Simplify with new Estimator class

You can also use high-level TensorFlow API – tf.estimator.Estimator class – for running Distributed TensorFlow with good modularity. (Recent official samples are often written by this API.)
Let’s see the brief example.

Before you run your code, you must set the following TF_CONFIG environment. As you see later, your cluster configuration settings is not involved in your logic code by this environment settings.

I note that it’s different from the previous tf.train.ClusterSpec settings. As you can see, the chief node is explicitly specified and the worker doesn’t include the chief node.
Same as previous example, chief worker also run the training task, similar to other workers, with this settings.

Parameter Node (10.0.0.6)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "ps"
  }
}')

Worker Node 1 (10.0.0.4)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "chief"
  }
}')

Worker Node 2 (10.0.0.5)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "worker"
  }
}')

Now we change our previous code using tf.estimator.Estimator as follows. In this code, we statistically define our input function and model function using Estimator, and run training and evaluation with tf.estimator.train_and_evaluate() function. (Here we define custom Estimator, but you can also use pre-made Estimator, which inherits Estimator class, like LinearClassifier, DNNClassifier, etc.)

As you can see, this code has no difference between distributed and non-distributed.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
 
import sys
import argparse
import math
 
import tensorflow as tf

FLAGS = None
batch_size = 100

def _my_input_fn(filepath, num_epochs):
  # image - 784 (=28 x 28) elements of grey-scaled integer value [0, 1]
  # label - digit (0, 1, ..., 9)
  data_queue = tf.train.string_input_producer(
    [filepath],
    num_epochs = num_epochs) # data is repeated and it raises OutOfRange when data is over
  data_reader = tf.TFRecordReader()
  _, serialized_exam = data_reader.read(data_queue)
  data_exam = tf.parse_single_example(
    serialized_exam,
    features={
      'image_raw': tf.FixedLenFeature([], tf.string),
      'label': tf.FixedLenFeature([], tf.int64)
    })
  data_image = tf.decode_raw(data_exam['image_raw'], tf.uint8)
  data_image.set_shape([784])
  data_image = tf.cast(data_image, tf.float32) * (1. / 255)
  data_label = tf.cast(data_exam['label'], tf.int32)
  data_batch_image, data_batch_label = tf.train.batch(
    [data_image, data_label],
    batch_size=batch_size)
  return data_batch_image, data_batch_label

def _get_input_fn(filepath, num_epochs):
  return lambda: _my_input_fn(filepath, num_epochs)

def _my_model_fn(features, labels, mode):
  # with tf.device(...): # You can set device if using GPUs
  
  # define network and inference
  # (simple 2 fully connected hidden layer : 784->128->64->10)
  with tf.name_scope('hidden1'):
    weights = tf.Variable(
      tf.truncated_normal(
        [784, 128],
        stddev=1.0 / math.sqrt(float(784))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([128]),
      name='biases')
    hidden1 = tf.nn.relu(tf.matmul(features, weights) + biases)
  with tf.name_scope('hidden2'):
    weights = tf.Variable(
      tf.truncated_normal(
        [128, 64],
        stddev=1.0 / math.sqrt(float(128))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([64]),
      name='biases')
    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
  with tf.name_scope('softmax_linear'):
    weights = tf.Variable(
      tf.truncated_normal(
        [64, 10],
        stddev=1.0 / math.sqrt(float(64))),
    name='weights')
    biases = tf.Variable(
      tf.zeros([10]),
      name='biases')
    logits = tf.matmul(hidden2, weights) + biases

  # compute evaluation matrix
  label_indices = tf.cast(labels, tf.int32)
  predicted_indices = tf.argmax(input=logits, axis=1)   
  accuracy = tf.metrics.accuracy(label_indices, predicted_indices)
  tf.summary.scalar('accuracy', accuracy[1]) # output to TensorBoard

  # compute loss
  loss = tf.losses.sparse_softmax_cross_entropy(
    labels=labels,
    logits=logits)

  # define operations
  if mode == tf.estimator.ModeKeys.TRAIN:
    #global_step = tf.train.create_global_step()
    #global_step = tf.contrib.framework.get_or_create_global_step()
    global_step = tf.train.get_or_create_global_step()    
    optimizer = tf.train.GradientDescentOptimizer(
      learning_rate=0.07)
    train_op = optimizer.minimize(
      loss=loss,
      global_step=global_step)
    return tf.estimator.EstimatorSpec(
      mode,
      loss=loss,
      train_op=train_op)
  if mode == tf.estimator.ModeKeys.EVAL:
    eval_metric_ops = {
      'accuracy': accuracy
    }
    return tf.estimator.EstimatorSpec(
      mode,
      loss=loss,
      eval_metric_ops=eval_metric_ops)
  if mode == tf.estimator.ModeKeys.INFER:
    probabilities = tf.nn.softmax(logits, name='softmax_tensor')
    predictions = {
      'classes': predicted_indices,
      'probabilities': probabilities
    }
    export_outputs = {
      'prediction': tf.estimator.export.PredictOutput(predictions)
    }
    return tf.estimator.EstimatorSpec(
      mode,
      predictions=predictions,
      export_outputs=export_outputs)

def main(_):
  # read TF_CONFIG
  run_config = tf.contrib.learn.RunConfig()

  # define
  mnist_fullyconnected_classifier = tf.estimator.Estimator(
    model_fn=_my_model_fn,
    model_dir=FLAGS.out_dir,
    config=run_config)
  train_spec = tf.estimator.TrainSpec(
    input_fn=_get_input_fn(FLAGS.train_file, 2),
    max_steps=60000 * 2 / batch_size)
  eval_spec = tf.estimator.EvalSpec(
    input_fn=_get_input_fn(FLAGS.test_file, 1),
    steps=10000 * 1 / batch_size,
    start_delay_secs=0)
    
  # run !
  tf.estimator.train_and_evaluate(
    mnist_fullyconnected_classifier,
    train_spec,
    eval_spec
  )
             
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--train_file',
    type=str,
    default='/home/demouser/train.tfrecords',
    help='File path for the training data.')
  parser.add_argument(
    '--test_file',
    type=str,
    default='/home/demouser/test.tfrecords',
    help='File path for the test data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  FLAGS, unparsed = parser.parse_known_args()
 
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

You cannot see the train loop (while clause for mini-batch) in this code, but you can use custom tf.train.SessionRunHook or tf.train.LoggingTensorHook for your custom code (ex : debug printing, etc) in the train loop.

Now you can run this code with the following command. As we saw earlier, we don’t have cluster specific settings in our programming code, then you can run the same command (without specific command options) in all nodes as follows.

Run on all nodes (10.0.0.4 – 10.0.0.6)

python mnist_tf_dist_estimator.py

The output result is written in model directory (model_dir in Estimator) and you can predict (infer) with predict() method in tf.estimator.Estimator instead of using session loop.

 

You can also see the summarised results in model directory with TensorBoard UI by running the TensorBoard server with the following command. Here logdir is your model directory. (Same as previous note in our example, the results are saved in the separate nodes and you must locate these results in one directory or use shared model directory.)

tensorboard --logdir=/home/demouser/out

Open TensorBoard UI (http://localhost:6006) using your web browser, and you can see the transition of accuracy, because here we collected the accuracy values in our sample code.

Note : When you use remote terminal (Windows, Mac, etc), please open the port on your server’s firewall settings (accessed by http://{your server name}:6006), or configure the SSH tunnel with your terminal client (accessed by http://localhost:6006). The following is the setting example in PuTTY client.

 

Azure Site Recovery 概要と、可能なこと・不可能なこと

ASR – その意義と概要

Public Cloud として Azure を選択するメリット (Advantage) の 1 つに、Microsoft が長年培ってきたオンプレミス・テクノロジーとの融合 (ハイブリッド技術) がある。クラウドへの本格参入ができない古い体質の業界や企業では、オンプレミスにある資産を Public Cloud に移すのは不可能かもしれないが、こうした安価でスケーラブルなクラウドが、いざというときのリカバリー サイトとして利用できるなら現実味があるだろう。
Azure Site Recovery (ASR) は、そうした「現実解」のニーズに対応した Public Cloud のサービスと言って良い。

Azure Site Recovery (ASR) を介することで、オンプレミスのマシン イメージを Azure 上に Replication し、いざオンプレミス環境に障害が発生した際には、このバックアップされたイメージ (.vhd) を元に、すぐに Azure 上にマシンを稼働して (Failover して)、VPN や閉域網などを経由して企業内から継続的にサービスを利用可能にする。
Primary の復旧が完了したら、Reverse Replicate といって、稼働中の (Azure 上の) リカバリー サイトを元の Primary サイトへ戻すための仕組み (要するに “Failback”) も提供する。


(From Microsoft Hybrid Cloud Blog)

この Azure Site Recovery (ASR) について、よくある疑問点 (利用時の初歩的な質問) を以下の 3 つの観点でまとめたので、これから導入を検討される方は参考にしていただきたい。
後述の通り、ネットワークまわりをどう設計するかは、充分な考察が必要だ。

ASR – よくある疑問 (まとめ)

対象のソース

Primary として選べるソースタイプとしては、以下が利用可能。
なお、現在、Windows Server 2016 の Physical Server はサポートされていないので注意してほしい。(いずれ対応されると思うが)

  • Physical Machine (On-Premise)
  • Hyper-V 上の Guest VM (On-Premise)
  • VMWare 上の VM (On-Premise)
  • Azure 上の VM

いまでは Azure 上の VM などもリカバリー可能だが、ASR は、当初はオンプレミス上の環境をクラウドにリカバリーして事業継続するためのサービスとして登場しており、ASR を中継することで、On-Premise to Azure だけでなく、Azure を中継した On-Premise to On-Premise のリカバリーに使うことも可能だ。

ネットワーク

ASR ではいろいろなアーキテクチャが考えられるが、ここでは話が混乱しないよう On-premises to Azure で考えてみよう。

上述の通り、Failover 後は、企業内からリカバリー先のマシン (今回の場合、Azure 上のマシン) を利用するため、Azure 上の Recovery Site と On-Premise との間はあらかじめ同じネットワークに入れることになる。(あらかじめ Azure の Site-to-Site VPN や ExpressRoute で Recovery 先と接続しておく。)
このように、ASR 利用時は、DR (Disaster Recovery) 時も踏まえた充分なトポロジー設計が必要になる。

例えば、Azure では、同一ネットワーク上に同一のアドレス空間を持たせることはできないので、Recovery された VM は一般に IP アドレスが変更される。このため、リカバリー対象のマシンで DNS サーバーとの接続が必要になり、Azure の Virtual Network (VNet) 上の DNS Server としてオンプレ側の DNS Server を指定したり、あるいは Failover の際に DNS Server も Replication するなど方針を決める必要があるだろう。(Failover 先の Azure VM で IP Address を同じにすることも不可能ではないが、ASR がこうした仕組みを備えているわけでないため、自身でこうした Subnet Failover (転換処理) を実装することになる。)

ASR は、上述の通りイメージ ベースでの Recovery になるので、すべてを ASR に頼るのではなく、データベース (RDB), AD (Active Directory) などでは、下記のようなコンポーネントが持つ Recovery テクノロジーを組み合わせることも検討する。(Multi-Tier アプリケーションのように複数台のマシンで状態の一貫性が必要になる場合には、Multi VM Consistency という機能を有効化することで一貫性を維持できる。)

また、オンプレミスから Azure への Replication (delta による差分反映など) では一般にインターネット (Azure に向けての inbound の https ポート) が使われるが、以下も考慮しておこう。

  • トラフィック送信にインターネットを使わず、Azure ExpressRoute Public Peering を使うよう構成することも可能
  • ただし、現在、トラフィック送信に Site-to-Site VPN を使うことは不可能

一方、Failback では、一般にインターネット回線は利用できないので注意してほしい。Failback の場合は、Azure 側から On-Premise への https による inbound アクセスがおこなわれるため、Azure Network (Azure VNet) からの VPN か ExpressRoute を介して On-Premise につなぐことになるためだ。(Failback も計画に含めるなら、戻す際のネットワーク構成も考慮する必要がある。)

ネットワーク帯域については、ASR Capacity Planner が使用できる。(この ASR Capacity Planner は、Azure 上の ASR 設定の途中でリンクが表示され、ダウンロードを促すようになっている。)

運用手順

ASR では Failover は自動でおこなわれないので、運用・監視は自身でおこない (または、他製品を組み合わせて)、UI (Azure Portal), スクリプト, SDK などから能動的に Failover を呼び出す。(Failover を呼び出すと、Recovery 先でリカバリー イメージを元に VM が起動する。)
なお、OMS と組み合わせた場合は、既に構成のための機能が OMS に Built-in されている。

下記の 3 種類の Failover が提供されていて、いわゆる障害発生時に呼び出す「Unplanned Failover」は、他の 2 つと異なり Failover 時に最新の Delta Replication は作成しない。(その時点でとられている最新のイメージを元に Failover する。)

  • Test Failover
  • Planned Failover
  • Unplanned Failover

Failover 後の処理は、Azure Automation Runbook を使うことができる。VNet 等の Azure Resource の構成や、VM 内でのセットアップスクリプトの実行などは、Azure Automation Runbook を組み合わせて構成する。

Microsoft Azure : Add Azure Automation runbooks to recovery plans
https://docs.microsoft.com/en-us/azure/site-recovery/site-recovery-runbook-automation

 

ここでは ASR 導入の際の初歩的な疑問を中心にまとめたが、細かなトポロジーや FAQ は以下が参考になるので是非参照してほしい。

Azure Site Recovery support matrix (On-premises to Azure)
https://docs.microsoft.com/en-us/azure/site-recovery/site-recovery-support-matrix-to-azure

Azure Site Recovery FAQ
https://docs.microsoft.com/en-us/azure/site-recovery/site-recovery-faq

 

Azure Machine Learning Services ステップ・バイ・ステップ

ここでは、はじめて使う人のために、新しい Azure Machine Learning Services が何を解決し、どのような使い方であるか CLI コマンドにそって見ていく。

新しい Azure Machine Learning Services は、これまでの Azure Machine Learning Studio とは違うまったく新しい概念だ。
従来の Azure Machine Learning Studio を使った人はわかると思うが、GUI で構築できるのは良いが、細かなモデルや学習フローを実現しようとすると python や R が持っている膨大なパッケージに勝るものはなく、Studio で構築したフローから無理やり python や R の外部コードを呼び出して作るといった本末転倒な使い方 (何のために Studio を使っているのかわからないケース) も多々と思う。
新しい Azure Machine Learning Services は、こうした課題を解決すべく、コード (Python, 今後は R も) に基づくオープンなライフサイクルを支援する考え方に刷新されている。(Microsoft Ignite のセッションを見てもらうと、実際そうしたコンセプトが語られている。)

データの準備 (整形など) をおこない、学習をして (作った学習用のコードはローカル環境だけなく、Azure 上の GPU インスタンスや Apache Spark 上などで短時間に学習させ)、出来上がったモデルを従来の Azure Machine Learning Studio のように REST のエンドポイントとして展開するといった、分析・学習 (Analyze) から本番展開 (Operationalize) までのライフサイクル全般をサポートするフレームワークであり、ここでは話をむずかしくしないよう、ローカルで学習したモデルを Web Service 化するという基本の流れに沿って解説したい。

新しい Azure Machine Learning Services は基本的にコマンドから利用するため (実際、Azure Portal からは一部の機能しか使えない)、このコマンドをシナリオに沿って順に見くことで、新しい Azure Machine Learning がどのようなものか確認できるだろう。

なお、このあと見ていくように、ざっくりと Experimentation と Model Management という独立した 2 つの作業にわかれているので、まずは頭の片隅におぼえながら見ていってほしい。

準備

ここで紹介する az ml コマンドを使うには、現時点では、後述の Azure Machine Learning Workbench をインストールするか、Azure の Data Science Virtual Machine (DSVM) を使用する必要がある。
Azure CLI のインストールや azure-cli-ml モジュールの追加インストールでは使えないようなので注意してほしい。

CLI コマンドのヘルプを見るには -h オプションを使用する。
この投稿では、コマンドを難解にしない目的から必要最小限の引数しか指定しないが、細かな設定については下記のように確認してほしい。

# show what resource can be used
az ml -h
# show what command can be used for workspace
az ml workspace -h
# show what option can be used for workspace creation
az ml workspace create -h

また、以降を進める前に、CLI から、あらかじめ Azure にログインをしておこう。(az login を実行する。)

Experimentation Account と Workspace の作成

新しい Azure ML のアカウントには Experimentation Account と Model Management Account がある。
モデル構築時は Experimentation Account を作成し、ここに Workspace を作成して、Workspace 内で個々の Project を作成する。

下記は、CLI コマンドを使って、リソースグループ myRG01 に Experimentation Account (名前 exacc01) と Workspace (名前 testws01) を作成している。(なお、Experimentation Account と Workspace は Azure Portal からも作成できる。)

# Create experimentation account
az ml account experimentation create -n "exacc01" -g "myRG01"
# Create workspace
az ml workspace create -a "exacc01" -n "testws01" -g "myRG01"

Training フェーズ (モデルの作成)

Python のコード作成、トレーニング、モデル作成はオープンな環境 (PyCharm, Jupyter, DataBricks など) を使っても良く、普段使い慣れたエディターでコード編集をして下記の Experimentation コマンドと併用したり、あるいは Experimentation はいっさい使わずにモデルを構築し、それを後述する Model Management で展開するような使い方も充分ありだ。

ただ、上記で作成した Azure Machine Learning Experimentation Account で az ml experimentation submit コマンドを使って Training 用のコードを展開 (および実行) することで、作成したコード (Python または pyspark) のさまざまな条件下でのイテレート (繰り返し実行) とその実行結果 (作成されたモデルとその精度など) を管理できるようになっており、例えば、コードを作成してまずはローカル環境で実行し、Epoch や Learning Rate を変えて実行時間や Accuracy、Regularization Rate を評価し (それを何度も繰り返し)、さらに GPU 環境や Spark Cluster などコンピューティング環境を変えて評価して最終的に実運用に適した条件を決定していく過程を、Cloud 上のサービス (Azure Machine Learning Experimentation) を通して管理できるようになっている。(下図参照)

この Experimentation は、前述のコマンドライン (az ml experimentation submit コマンド) 以外に Azure Machine Learning Workbench (Windows 版、Mac 版) や Visual Studio Code Tools for AI といったデスクトップ ツール (IDE) からも利用できるようになっている。(これらを使うには、もちろん、あらかじめ前述の Experimentation Account を作っておく必要がある。これらの IDE を起動する際に Experimentation Account を確認する。)

aml_workbench

ここではこうした IDE の使い方は説明しないので “チュートリアル : Classify Iris” を参照してほしいが、主に下記のようなステップを支援する。

  • 上記で作成した Workspace に Project を新規作成
  • Data Set を準備・整形 (dataprep)
  • Python で Training のコードを作成 (学習済モデルの作成)
  • 実行と結果の確認

(補足 : Project は単なるローカル環境のフォルダではなく、Azure 上のリソースとして作成される。カレントフォルダーの Project が Active Project として認識され、例えば、Active Project 上で az ml account experimentation show と入力すると、その Project の Experimentation が表示される。)

なお、Azure Machine Learning Workbench にはいくつか魅力的な機能が備わっているので以下に軽く紹介しておこう。私のお気に入りの機能は以下だ。

  • Data の準備では、元のデータを変えることなく、Data cleaning のための細かなステップをエディタでビジュアルに定義できる。(下図の「step」参照。Power BI などで使われていた手法だ。) メトリクスごとのデータ プロファイルをみながら、不要なステップを削ったり、入れ替えたりなどして編集できる。
  • PROSE と呼ばれる AI による Data Wrangling テクノロジにより、ちょっとしたデータ整形なら、Excel の Autofill のように賢く自動解釈する。例えば、ログのようなつながった文字を分解する際も、単なる文字数や文字による一致、正規表現マッチングなどではなく、文のコンテキストを解釈して意味的に分解できる。(“Diving deep into what’s new with Azure Machine Learning” 参照)
    あまりに複雑なデータ加工の場合は、最終的にスクリプトを書くことも可能。
  • 上図のように Experimentation で必要になる config などの構成をサポートする。ローカルマシン以外に、DSVM や HDInsight Spark Cluster などを容易に使えるので、学習の際、IDE 上から、Azure 上の GPU 等の高尚なコンピューティング環境と (まるでローカル環境で構築しているかのように) 容易に連携できる。(“Configure compute environment” 参照)

Workbench 上で整形したデータ (正確には、上述の通り「データ整形の手順」) は .dprep の拡張子を持つファイルとして保存され、下記の通り Pandas や Spark DataFrame など必要な形式に変換してロードできる。

from azureml.dataprep.package import run

iris = run('iris.dprep', dataflow_idx=0, spark=False)

Workbench には、他にも Jupyter Notebook とのインテグレーション、ROC 曲線などの IDE 上での可視化など、多くのサポート機能を備えている。
また、「コード エディタは自分がよく使うものにしたい」といった場合は、Workbench のコード編集として PyCharm や Visual Studio Code を設定することで連携 (コード編集のみ外部エディタを使用) も可能だ。

Python のコードを作成して学習が完了したら、学習済モデルを Pickle 形式のファイル (.pkl) として保存し (Workbench の場合、[Run] ボタンを押す)、このモデルをこのあとの作業 (Scoring フェーズ) で使用する。
なお、この際も、Azure Machine Learning Workbench を使うと、作成されたモデルの [Promote] をおこなうことで (下図)、このモデルの link が取得され、このあとの Model Management で容易に扱えるようになっている。

2018/05 追記 : なお、Build 2018 のタイミングで発表された Azure ML SDK for Python では、通常の notebook における visualize など、Workbench 以外でも開発生産性を高める仕組みがいくつか提供された。(方向性変更 ?)

Scoring (Prediction) フェーズ

学習済モデル ファイル (今回の場合 .pkl と仮定する) ができたら、これを元に Prediction (または Scoring) のコードを作成して、これを Web Service として発行する。

以下は、付属している Iris の Prediction のサンプルだ。(あやめの種類を区別する Logistic Regression のモデルを扱っている。)
この中の以下の処理に注目してほしい。

  • カレント フォルダから、前述で学習したモデル (下記の model.pkl) をロードしている。
  • 下記の通り、このあとの Web Service 化にそなえて、init() (初期化の処理), run() (Web Service 本体の処理) を定義し、これらを main から呼び出してデバッグする。
  • Azure Machine Learning が提供する generate_schema() 関数で、Input (入力引数), Output などの Web Service の仕様を json で出力している。
# Import data collection library. Only supported for docker mode.
# Functionality will be ignored when package isn't found
try:
  from azureml.datacollector import ModelDataCollector
except ImportError:
  print("Data collection is currently only supported in docker mode. May be disabled for local mode.")
  # Mocking out model data collector functionality
  class ModelDataCollector(object):
    def nop(*args, **kw): pass
    def __getattr__(self, _): return self.nop
    def __init__(self, *args, **kw): return None
  pass

import os

# Prepare the web service definition by authoring
# init() and run() functions. Test the functions
# before deploying the web service.
def init():
  global inputs_dc, prediction_dc
  from sklearn.externals import joblib

  # load the model file
  global model
  model = joblib.load('model.pkl')

  inputs_dc = ModelDataCollector("model.pkl", identifier="inputs")
  prediction_dc = ModelDataCollector("model.pkl", identifier="prediction")

def run(input_df):
  import json

  # append 40 random features just like the training script does it.
  import numpy as np
  n = 40
  random_state = np.random.RandomState(0)
  n_samples, n_features = input_df.shape
  input_df = np.c_[input_df, random_state.randn(n_samples, n)]
  inputs_dc.collect(input_df)

  pred = model.predict(input_df)
  prediction_dc.collect(pred)
  return json.dumps(str(pred[0]))

def main():
  from azureml.api.schema.dataTypes import DataTypes
  from azureml.api.schema.sampleDefinition import SampleDefinition
  from azureml.api.realtime.services import generate_schema
  import pandas

  df = pandas.DataFrame(data=[[3.0, 3.6, 1.3, 0.25]], columns=['sepal length', 'sepal width','petal length','petal width'])

  # Turn on data collection debug mode to view output in stdout
  os.environ["AML_MODEL_DC_DEBUG"] = 'true'

  # Test the output of the functions
  init()
  input1 = pandas.DataFrame([[3.0, 3.6, 1.3, 0.25]])
  print("Result: " + run(input1))

  inputs = {"input_df": SampleDefinition(DataTypes.PANDAS, df)}

  #Genereate the schema
  generate_schema(run_func=run, inputs=inputs, filepath='./outputs/service_schema.json')
  print("Schema generated")

if __name__ == "__main__":
  main()

ここまでで、学習済モデル ファイル (上記の model.pkl)、Web Service の仕様を定義した Json ファイル (上記の service_schema.json)、そしてこの Scoring 用の Python コード (score_iris.py とする) が準備できた。

Computing Environment の作成と設定

以上で必要なファイルは整ったので、Web Service 化して展開 (deploy) する。
現実の開発ではさまざまなバージョンのモデルを展開してリバイズ (見直し) をおこなうなど、Model や環境の管理 (バージョン管理など) が欠かせないが、ここからは Azure ML の Model Management を使ってこうした作業を効率化する。なお、必要なファイルさえ揃っていれば、ここまで使ってきた Experimentation Account は必要ない。(Experimentation Account と Model Management Account は完全に分離されている。)

まず、前提知識をいくつか記載する。

新しい Azure Machine Learning を使った Web Service 化では、展開の際に AKS (Azure Container Service の Kubernetes cluster) と ACR (Azure Container Registry) が使われる。このため、あらかじめ、コマンド az provider register --namespace Microsoft.ContainerRegistry によって Azure Container Registry プロバイダーを登録しておく必要がある。(既に登録済の場合は OK)

また、Web Service 化は、ここで紹介する方法以外に、単一のコマンドですべて生成することもできるが、ここでは構成などを理解できるように、あえて複数コマンドでわけて展開をおこなう。(構造が理解できてきたら、単一コマンドで省力化しても良いだろう。)

まず、展開の前に、展開環境である Computing Environment を下記の通り新規作成する。
ここでは、testenv01 という名前の Computing Environment を East US 2 に作成する。(Kubernetes クラスタの名前は「testenv01」になる。) 例えば、環境を Staging 用、Production 用の複数作成しておき、staging 環境のテストが完了したら同じ image を Production に展開するといった使い方が可能だ。

下記の -c (--cluster) オプションで Azure Container Service (AKS) が使用され 、オプションを指定しない場合は local の docker container に展開される。(CLI コマンドを使って local と AKS を切り替えることも可能。) クラスタ構成の場合、既定で Kubernetes 1 pod 内に 2 台の agent を構成するが、agent の台数も環境ごとにオプションで指定できる。

az ml env setup -n testenv01 -l eastus2 -c

上記のように -g (--resource-group) オプションを省略すると、このコマンドによって testenv01rg と testenv01rg-azureml-cxxxx の 2 つのリソースグループが新規作成され、後者のリソースグループに Azure Container Service の各ノード (VM) が展開される。(xxxx は数字。今回は testenv01rg-azureml-c7294 だった。)
これらのリソースグループはこのあとのコマンドで使うのでおぼえておく。

下記のコマンドで Computing Environment がすべて表示されるので、作成中の環境の provisioning state が Succeeded になるまで待つ。(特に新規に Azure Container Service を作成する場合は、最初のノード作成に時間がかかる。)

az ml env list

Succeeded になったら、下記コマンドを実行して、この作成した環境を Active Compute Environment として設定する。

az ml env set -n testenv01 -g testenv01rg

なお、作成された Compute Environment の情報は Azure 上に保存されるが、この Active Compute Environment はその作業環境上のみ (使用しているクライアント上のみ) の設定なので注意。
以下のコマンドで Active (Current) の環境を確認できる。

az ml env show

Model Management Account の作成と設定

次に、下記コマンドで Model Management Account を新規作成する。(このあと登録するモデルは、ここでバージョン等が管理される。)

az ml account modelmanagement create -l eastus2 -n testmg01 -g myRG01

つぎに、下記コマンドによって、上記で作成した Model Management Account  を使用中の作業環境 (クライアント) における Active な Model Management Account として設定する。

az ml account modelmanagement set -n testmg01 -g myRG01

Active な Model Management Account は下記コマンドで確認できる。

az ml account modelmanagement show

Web Service としての展開 (Deploy)

下記コマンドで、前述で作成した学習済モデル ファイル (model.pkl) を登録する。(何度も登録するとバージョンが 2, 3 とあがっていくので注意。)
この際、登録されたモデルの ID (model id) が出力されるので、これをコピーしておく。(以降では model id を 08ceeb5842eb4fa4a94d47533d7e3aab とする。)

az ml model register -m model.pkl -n model.pkl

つぎに、下記コマンドで、上記で準備した Web Service 用の Scoring のソース (score_iris.py) と仕様 (service_schema.json) を登録して Manifest を作成する。ここで、-i (--model-id) オプションは、上記で登録したモデルの ID である。つまり、使用するモデルは都度 変更できる。
(なお、何度も登録すると Manifest のバージョンがあがっていくので注意。)

この Manifest 作成時に manifest id が出力されるので、これをコピーしておく。(以降では manifest id を 789c3c18-b385-423f-b88c-429a2973b99b とする。)

az ml manifest create -n testmanifest1110 -f score_iris.py -r python -i 08ceeb5842eb4fa4a94d47533d7e3aab -s service_schema.json

この Manifest を使って、下記の通り Image を新規作成して登録する。Image の実体は、ACR (Azure Container Registry) に登録される。(この ACR は、上記の az ml env setup コマンドで新規作成されたリソースグループの中にある。)
作成時、イメージの Id が出力されるので、これをコピーしておく。(以降では image id を a61ba643-884d-4181-a7cb-b1b9c6c48b3e とする。)

az ml image create -n testimage1110 --manifest-id 789c3c18-b385-423f-b88c-429a2973b99b

あとは、この登録されたイメージを使って、下記の通り Web Service を展開して起動する。
今回は使用しないが、追加の Package が必要な場合には -c オプションで Conda Dependencies File (.yml) を指定できる。
なお、下記で “realtime” と指定しているが、今後、”batch” (Azure Batch 展開) も可能になるようだ。BUILD 2018 のタイミングで、Azure Batch AI, Azure Container Instance (ACI), IoT Edge への展開がサポートされた。(2018/05 更新)

az ml service create realtime --image-id a61ba643-884d-4181-a7cb-b1b9c6c48b3e -n testapp1110 --collect-model-data true

下記の通り出力されるので、出力された service id (testapp1110.testenv01-90e88070.eastus2) をおぼえておく。(このサービスを利用する際に使用する。)

Creating service............................Done
Service ID: testapp1110.testenv01-90e88070.eastus2
Usage for cmd: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 -d "{\"input_df\": [{\"petal length\": 1.3, \"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0}]}"
Usage for powershell: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 --% -d "{\"input_df\": [{\"petal length\": 1.3, \"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0}]}"
Additional usage information: 'az ml service usage realtime -i testapp1110.testenv01-90e88070.eastus2'

今回は Azure CLI のみで image を展開しているが、普通に docker コマンドを使っても良いし、展開先も IoT Edge など docker をサポートする他の環境に展開できる。こうした点もブラックボックス化されておらず、あくまでオープンなプラットフォームをベースに作業を簡素化しているだけだ。
また、image を変更 (新しい image を登録) した際には、CLI コマンドを使ってゼロダウンタイムで既存の起動中のサービスを新しい image に変更できる。

Web Service の利用

展開された Web Service (REST) の URL, Port, エンベロープなどは、下記コマンドで確認できる。

az ml service usage realtime -i testapp1110.testenv01-90e88070.eastus2
Scoring URL:
  http://13.68.115.32:80/api/v1/service/testapp1110/score

Headers:
  Content-Type: application/json
  Authorization: Bearer
    ( can be found by running 'az ml service keys realtime -i testapp1110.testenv01-90e88070.eastus2')

Swagger URL:
  http://13.68.115.32:80/api/v1/service/testapp1110/swagger.json

Sample CLI command:
  Usage for cmd: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 -d "{\"input_df\": [{\"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0, \"petal length\": 1.3}]}"
  Usage for powershell: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 --% -d "{\"input_df\": [{\"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0, \"petal length\": 1.3}]}"

Sample CURL call:
  curl -X POST -H "Content-Type:application/json" -H "Authorization:Bearer " --data "{\"input_df\": [{\"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0, \"petal length\": 1.3}]}" http://13.68.115.32:80/api/v1/service/testapp1110/score

Get debug logs by calling:
  az ml service logs realtime -i testapp1110.testenv01-90e88070.eastus2

Get STDOUT/STDERR or Request/Response logs in App Insights:
  https://analytics.applicationinsights.io/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourcegroups/testenv01rg-azureml-c7294/components/mlcrpaib1917249512d#/discover/home?apptype=Other%20(preview)

ここで認証用に使用する key は、下記コマンドで出力できる。

az ml service keys realtime -i testapp1110.testenv01-90e88070.eastus2
PrimaryKey: 6d452f09a4xxxxxxxxxxxxxxxxxxxxxx
SecondaryKey: ec7453b683xxxxxxxxxxxxxxxxxxxxxx

このため、Fiddler などを使って下記の通り HTTP Request をおこなう。
今回は、あやめの情報 (ガクの幅など) を渡すことで、そのあやめの種類をモデルから推測して返している。

POST http://13.68.115.32:80/api/v1/service/testapp1110/score
Content-Type: application/json
Authorization: Bearer 6d452f09a4xxxxxxxxxxxxxxxxxxxxxx

{
  "input_df": [
    {
      "petal width": 0.25,
      "sepal width": 3.6,
      "sepal length": 3.0,
      "petal length": 1.3
    }
  ]
}
HTTP/1.1 200 OK
content-type: application/json

"\"Iris-setosa\""

Scaling

前述の通り Kubernetes の Cluster が使用されるため、以降は kubectl コマンドで管理できる。
しかし、よく使う Agent Node (Virtual Machine) や Replica の一部の管理タスクは Azure ML のコマンドも使えるので最後に紹介しておこう。

例えば、下記は、Azure ML のコマンドを使って Agent Node (Agent の Virtual Machine) を 3 台に変更する。

az acs scale -g testenv01rg-azureml-c7294 -n testenv01 --new-agent-count 3

下記は、Autoscale の設定を解除して、この Agent 上に動いている Replica を 3 つに設定している。(下記の通り kubectl コマンドを使わずに設定できる。)

az ml service update realtime -i testapp1110.testenv01-90e88070.eastus2 --autoscale-enabled false
az ml service update realtime -i testapp1110.testenv01-90e88070.eastus2 -z 3

 

ご覧のように、新しい Azure Machine Learning を使うと Python で構築した Score (Test) の処理をそのまま Web Service として発行できるが、同じようなことが可能な Server Platform (オンプレミスの製品) として Machine Learning Server というものが使えるのをご存じだろうか。旧 R Server と呼んでいたもので、現在は Python も使えるようになっている。
また、現在 Model Management のみ R もサポートされているらしく、今後、R の Experimentation やドキュメントも出てくることだろう。
つまり、これらは Python と R で同じ Experience を提供し、Machine Learning Team Blog で書いているように、これらのプラットフォーム (クラウド版の Azure ML とオンプレの ML Server) は今後 統一化されていく予定らしい。

 

参考にした情報 :

Azure Machine Learning Tutorial – Classify Iris :
https://docs.microsoft.com/en-gb/azure/machine-learning/preview/tutorial-classifying-iris-part-1

Azure Machine Learning Reference :
https://docs.microsoft.com/en-us/azure/machine-learning/desktop-workbench/experimentation-service-configuration-reference

 

ngrok を ASP.NET で使う (Debug と Capture)

昨今の WebHook や BOT 開発のように、インターネット上で Inbound 呼び出しされる Web アプリの Debug や HTTP Protocol (Raw) Capture をする場合に欠かせない ngrok を ASP.NET と組み合わせて使う場合に必要な設定と動作確認方法を簡単に紹介する。

ASP.NET プロジェクトの設定

ngrok を使うには、まず、ASP.NET のプロジェクトに以下の設定をおこなっておく。

まず、Visual Studio のプロジェクトのプロパティで、[すべてのユーザーにサーバー設定を適用] (Apply server settings to all users) が選択されていることを確認する。(既定では選択されているはず)

apply_settings

開いている Visual Studio ソリューションのフォルダーの .vs\config にある applicationhost.config (IIS Express が使用) を開き、下記の binding を追加しておく。

<configuration>
  . . .

  <system.applicationHost>
    . . .

    <sites>
      . . .

      <site name="WebApplication1" id="2">
        <application path="/" applicationPool="Clr4IntegratedAppPool">
          <virtualDirectory path="/" physicalPath="C:\Demo\WebApplication1\WebApplication1" />
        </application>
        <bindings>
          <binding protocol="http" bindingInformation="*:60587:localhost" />
          <binding protocol="http" bindingInformation="*:60587:*" />
        </bindings>
      </site>
      . . .

ngrok の実行と動作確認

上記の ASP.NET Web アプリケーションを localhost で Debug 実行したら、ngrok をダウンロード (および展開) して、コマンド プロンプトから下記の通り実行する。(60587 は、この ASP.NET Web アプリケーションが実行されている IIS Express でのポート番号)
特に ASP.NET では、Host ヘッダーをちゃんと指定しないと Bad Request (400) になるので、下記の通り指定する。

ngrok http 60587 --host-header="localhost:60587"

上記の実行結果として下記の通り表示されるが、これは、インターネット上の http://ae8b8019.ngrok.io のアドレスが Tunnel されて http://localhost:60587 に行くよ、という意味。

Tunnel Status                 online
Version                       2.0.25/2.1.1
Region                        United States (us)
Web Interface                 http://127.0.0.1:4040
Forwarding                    http://ae8b8019.ngrok.io -> localhost:60587
Forwarding                    https://ae8b8019.ngrok.io -> localhost:60587

Connections                   ttl     opn     rt1     rt5     p50     p90
                              0       0       0.00    0.00    0.00    0.00

そこで、あとは、ブレークポイントを置いて、http://ae8b8019.ngrok.io に接続すると、Tunnel されてきて、ちゃんとブレークポイントで止まる。

HTTP Capture

Web ブラウザを使って上記出力の http://127.0.0.1:4040 (http://localhost:4040) を表示すると、下図の通り HTTP Protocol (Raw) を Capture できる。
WebHook や BOT で、どのような request が来ているか簡単に確認可能だ。

HTTP_Capture

BOT 開発のように双方向連携をおこなうものは、この ngrok と Fiddler を一緒に使えば、inbound と outbound の双方がキャプチャーできるので、最強のデバッグ環境を手に入れることができる。

 

Azure VM (Ubuntu) の MySQL 環境構築

Azure Virtual Machine (Azure 仮想マシン) の Ubuntu (LINUX) を使って、MySQL 環境 (リモート接続環境、データベース作成まで) を構築する手順のメモ MySQL のインストール Azure Virtual Machine を構築したら、まずは、MySQL をインストール。 SSH, Putty などのターミナルから (リモートで) ログインをおこない、以下のコマンドを入力すれば OK。

sudo -s
sudo apt-get install mysql-server

下記の通り入力 (MySQL に root でログイン) して、正しくインストールされたかどうか確認。

// login to mysql
mysql -u root -p
// check status
mysql> status

MySQL の停止と起動は下記の通り入力する。

sudo service mysql stop
sudo service mysql start
(または sudo service mysql restart)

Database の作成 つぎに、MySQL にログインして Database を作成する。

// login to mysql
mysql -u root -p
// create database
mysql> create database testdb character set utf8;

作成されたデータベースを確認するには以下の通り。

mysql> show databases;

Remote 接続の設定 つぎに、MySQL Workbench、コマンドユーティリティ (mysql コマンド等) からリモートで管理できるように、リモート接続の設定をおこなう。 まず、作成した Azure Virtual Machine で [Settings] – [Endpoints] を選択し、MySQL の Port 番号 3306 の Endpoint を追加する。(この Endpoint を開けておく。) set_endpoints つぎに、MySQL 側でリモート接続ができるように、vi /etc/mysql/my.cnf  で下記の箇所を編集 (変更) する。 Host (Server) の IP Address は、ifconfig で確認できる。(inet addr と書かれている箇所) 変更前

bind-address = 127.0.0.1

変更後

bind-address = [Host の IP Address]

MySQL にログインし、今回は root で Remote Access できるように、下記の通り権限付与しておきます。 今回は、root に、すべてのホストからのすべての権限を付与しています。

grant all privileges on testdb.* to root@"%" identified by
  '[root password]' with grant option;

下記を実行すると、mysql.user テーブルに root@% が登録されているのがわかります。

select user, host from mysql.user;

+-------+-----------------+
| user  | host            |
+-------+-----------------+
| root  | %               |
| root  | 127.0.0.1       |
| root  | ::1             |
| root  | localhost       |
| ...   | ...             |
|       |                 |
+-------+-----------------+

以上で、Remote の環境から、MySQL のコマンド ユーティリティ (mysql コマンド等) や MySQL Workbench を使って接続できるようになります。 例えば、下記の通り入力して、あるデータベースから、別のデータベースに、オブジェクト (テーブル、データなど) を移行できます。

cd [mysql dir]/bin
mysqldump -u [username] -p[password] -h[hostname] [database name]
  --add-drop-table > [save file path]
mysql -u[username] -p[password] -h[hostname] --default-character-set=utf8
  [database name] < [read file path]
cd C:\Program Files\MySQL\MySQL Server 5.6\bin
mysqldump -u root -pP@ssw0rd -hmyserver01.cloudapp.net testdb01
  --add-drop-table > C:\tmp\db.sql
mysql -uroot -pP@ssw0rd -hmyserver02.cloudapp.net
  --default-character-set=utf8 testdb02 < C:\tmp\db.sql

Bing Maps の Geocode、逆 Geocode のプログラミング

Bing Maps の Geocode (住所から地図を取得)、Reverse-Geocode (地図から住所を取得) の JavaScript プログラミング (サンプル コード) をメモしておく。

例えば、下記の UI を持つ Web ページ (html) と仮定する。

Bing_sample

Geocoding のプログラミング

まず、上図のテキスト ボックスに日本の住所 (文字列) を入力して Pin アイコン (左上) を押すと、 Bing Maps の地図を更新するサンプル コードは、下記の通り。(説明は、コメントを見てほしい。) なお、あらかじめ、Bing Maps のサイトからアクセス キーを取得し、 下記の <access key> に設定しておく。

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=Edge"/>
  <script src="http://ajax.aspnetcdn.com/ajax/jquery/jquery-1.9.0.min.js"></script>
  <script src="http://dev.virtualearth.net/mapcontrol/mapcontrol.ashx?
    v=6.3&mkt=ja-jp"></script>
  <script>
  $(document).ready(function () {
    map = new VEMap('map');
    map.LoadMap();
    map.SetCenterAndZoom(new VELatLong(35.70, 139.7), 13);

    //
    // GeoCoding sample
    //
    $('#checkloc').click(function () {
      var addrval = $('#addresstxt').val();
      var requestUri =
        'http://dev.virtualearth.net/REST/v1/Locations?'
        + 'output=json&countryRegion=JP&addressLine='
        + encodeURI(addrval) + '&key=<access key>&c=ja-jp';
      $.ajax({
        url: requestUri,
        dataType: 'jsonp',
        jsonp: 'jsonp',
        beforeSend: function (xhr) {
          $('#msgline').html('changing map ...');
        },
        success: function (data, status) {
          $('#msgline').html('got location !');
          if (data &&
            data.resourceSets &&
            data.resourceSets.length > 0 &&
            data.resourceSets[0].resources &&
            data.resourceSets[0].resources.length > 0) {

            // 返された bbox (位置情報) の内容から地図を更新
            var bbox = data.resourceSets[0].resources[0].bbox;
            map.SetMapView(new VELatLongRectangle(
              new VELatLong(bbox[0], bbox[1]),
              new VELatLong(bbox[2], bbox[3])));

            // Bing Map にプッシュピンを設定
            var lat = new VELatLong(
              data.resourceSets[0].resources[0].point.coordinates[0],
              data.resourceSets[0].resources[0].point.coordinates[1]);
            var pin = new VEShape(VEShapeType.Pushpin, lat);
            map.AddShape(pin);
          }
        },
        error: function () {
          $('#msgline').html('error.');
        }
      }); // ajax
    }); // click

    $('#msgline').html('ready !');

  }); // ready
  </script>
</head>
<body>
  <p>
    <div>
      <a href="#" id="checkloc">
      <img src="pin.png" width="50" border="0" />
      </a>
    </div>
    <div>
      <input type="text" id="addresstxt" style="width:400px" />
    </div>
  </p>
  <div id="map" style="position:relative;width:400px;height:300px;"> 
  </div>
  <p id="msgline" style="background-color:gray;">initializing ...</p>
</body>
</html>

また、都道府県名や市区町村名など、入力欄をわける場合があるが (下図)、この場合には、下記の通りプログラミングすると良い。

Bing_sample2

<script>
 $(document).ready(function () {
  ... skip code ...

  var requestUri =
    'http://dev.virtualearth.net/REST/v1/Locations?output=json&countryRegion=JP'
    + '&adminDistrict='
    + encodeURI($('#districttxt').val())
    + '&locality='
    + encodeURI($('#localitytxt').val())
    + '&addressLine='
    + encodeURI($('#linetxt').val())
    + '&key=<access key>&c=ja-jp';
  $.ajax({
    url: requestUri,
    dataType: 'jsonp',
    jsonp: 'jsonp',
    ... skip code ...

</script>

... skip code ...

<div>
  都道府県
  <input type="text" id="districttxt" /><br />
  市区町村
  <input type="text" id="localitytxt" /><br />
  番地など
  <input type="text" id="linetxt" /><br />
</div>

... skip code ...

Reverse-Geocoding のプログラミング

今度は逆に、Bing Maps の地図をマウスで右クリックすると、 その位置の住所をテキスト ボックスに設定するサンプルだ。

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=Edge"/>
  <script src="http://ajax.aspnetcdn.com/ajax/jquery/jquery-1.9.0.min.js"></script>
  <script src="http://dev.virtualearth.net/mapcontrol/mapcontrol.ashx?
    v=6.3&mkt=ja-jp"></script>
  <script>
  $(document).ready(function () {
    map = new VEMap('map');
    map.LoadMap();
    map.SetCenterAndZoom(new VELatLong(35.70, 139.7), 13);

    //
    // Reverse-GeoCoding sample
    //
    map.AttachEvent("onclick", function (e) {
      // マウスの右クリック以外の場合は、何もしない
      if (!e.rightMouseButton)
        return;

      $('#msgline').html('getting location ...');

      // マウスの選択位置から latlong を取得
      var lat;
      if (e.latLong) {
        lat = e.latLong;
      } else {
        var pxl = new VEPixel(e.mapX, e.mapY);
        lat = map.PixelToLatLong(pxl);
      }

      // Bing Map にプッシュピンを設定
      map.Clear()
      var shape = new VEShape(VEShapeType.Pushpin, lat);
      map.AddShape(shape);

      // latlong から場所の詳細情報を取得
      map.FindLocations(lat, function (loc) {
        $('#addresstxt').val(loc[0].Name);
        $('#msgline').html('location changed !');
      });
    }); // AttachEvent

    $('#msgline').html('ready !');

  }); // ready
  </script>
</head>
<body>
  <p>
    <div>
      <a href="#" id="checkloc">
      <img src="pin.png" width="50" border="0" />
      </a>
    </div>
    <div>
      <input type="text" id="addresstxt" style="width:400px" />
    </div>
  </p>
  <div id="map" style="position:relative;width:400px;height:300px;"> 
  </div>
  <p id="msgline" style="background-color:gray;">initializing ...</p>
</body>
</html>

なお、https (SSL) のサイトから、http のサイトのサービス (jsonp など) を呼び出すことはセキュリティー上の理由から許可されていない。(ブラウザーが警告を表示する。)
Bing Maps では https (SSL) による呼び出しもサポートしているため、このような場合には、下記の通り、「s=1」のクエリー文字列を入れて https (SSL) を使用する。(これを入れておかないと、内部で使用される jsonp 呼び出しなどで http が使用されてしまうので注意。)

<script type=”text/javascript” src=”https://dev.virtualearth.net/mapcontrol/mapcontrol.ashx?v=6.3&mkt=ja-jp&s=1“></script>