Verify JWT with modulus and exponent (PHP)

JWT is the token format for id token published by OpenID Connect provider (Facebook, Twitter, Google, etc) or access token published by Azure Active Directory. (See here for JWT format. Id token must be JWT, but access token is no defined format in specification.)
Here I show you PHP sample code for JWT verification (validation) with modulus (n) and exponent (e).

I used Crypt_RSA package on phpseclib, and as I described here, you can run this package only with your web browser (with no software installation on your desktop) by using Azure App Service. You can also test with no extra cost !

<?php
include('Crypt/RSA.php');

// decode JWT (here, id token)
$id_token = 'eyJ0eXAiOiJKV1...';
$token_arr = explode('.', $id_token);
$headers_enc = $token_arr[0];
$claims_enc = $token_arr[1];
$sig_enc = $token_arr[2];
$sig = base64_url_decode($sig_enc);

// create Crypt_RSA
$rsa = new Crypt_RSA();

// load public key with modulus and exponent
$modulus = 'tVKUtcx_n9rt5a...';
$exponent = 'AQAB';
$public = [
  'n' => new Math_BigInteger(base64_url_decode($modulus), 256),
  'e' => new Math_BigInteger(base64_url_decode($exponent), 256),
];
$rsa->loadKey($public);

// set hash algorithm
$rsa->setHash('sha256');
$rsa->setSignatureMode(CRYPT_RSA_SIGNATURE_PKCS1);
$hash = new Crypt_Hash('sha256');

// verify !
echo $rsa->verify($headers_enc . '.' . $claims_enc, $sig) ? 'verified' : 'not verified';

function base64_url_decode($arg) {
  $res = $arg;
  $res = str_replace('-', '+', $res);
  $res = str_replace('_', '/', $res);
  switch (strlen($res) % 4) {
    case 0:
      break;
    case 2:
      $res .= "==";
      break;
    case 3:
      $res .= "=";
      break;
    default:
      break;
  }
  $res = base64_decode($res);
  return $res;
}
?>

 

Advertisements

Install Crypt_RSA on Azure Web App only using Web Browser

Here I show you how to install Crypt_RSA for php on Azure Website (Windows) with only web browser, and with no extra cost (free !).
You don’t need any software installation on your desktop, and it’s so useful to test Crypt_RSA.

Now let’s start.

First open Kudu console in Azure Web App.
Go to D:\home\site\wwwroot and run the following command for downloading go-pear.phar.

curl http://pear.php.net/go-pear.phar > go-pear.phar

Run go-pear.phar as follows.

php go-pear.phar

While it’s installing, you will be asked to install as system or local copy, and you must select “local” and proceed installation.

When you’re asked whether to change php.ini, please set “no“.

Now you can use pear command !
Install Crypt_RSA with the following command.

pear -c D:\home\site\wwwroot\pear.ini install Crypt_RSA

Or install Crypt_RSA on phpseclib (PHP Secure Communications Library) as follows.

pear -c D:\home\site\wwwroot\pear.ini channel-discover phpseclib.sourceforge.net
pear -c D:\home\site\wwwroot\pear.ini install phpseclib/Crypt_RSA

After the package is installed, you can edit your own code (php) using App Service Editor.

 

TensorFlow Performance by GPU and TPU (w/ screen recording)

Everybody knows the performance of deep AI workloads depends a lot on devices like GPUs. But how fast is it ? TPU has really robust performance compared with GPUs ?
Here I show you the insight for these concerns by simple experimentation.

For example, the following is the demonstration for running same TensorFlow training task (ResNet network for CIFAR-10 dataset) on both CPU (left side) and NVIDIA Tesla K80 (right side). Here we used “n1-highmem-8” VM instance on Google Compute Engine.
As you can easily see, it apparently needs a lot of time to train on CPU architecture. (It’s obvious without need to run detailed benchmark testing.)

In this post, I just simply show you its performance impact of single device with the same example of code for your intuition. (It will be very simple benchmarks.) I don’t discuss about detailed tuning techniques such as memory layouts or parallelism for model optimization, but I hope this can give you a sense for performance impact by devices.

Here I use CIFRA-10 (50,000 training images which size is 32 x 32 by 3 RGB channels) for dataset and train using ResNet-32 convolutional network (without bottleneck) which learning rate is scheduled : 0.1 (< 40,000 steps), 0.01 (< 60,000 steps), 0.001 (< 80,000 steps), 0.0001 (>= 80,000 steps).

The layers of this network is not so deep and data is not so large such as ImageNet, but it will be enough to understand how the device effects the performance here.

Performance (Steps per Second)

For performance comparison, I simply show you the transition of {the number of training steps}/{second} for the first thousands of steps with TensorBoard screen-capturing. (It’s not difficult and you can soon trace the same procedure.)

For example, the previous case (comparison with CPU and K80) achieves the following performance. As you can see, it is about 8x – 9x faster than CPU by using NVIDIA Tesla K80 utilized device.

Following is the result for the latest NVIDIA Tesla V100 (Volta architecture).
Currently Google doesn’t offer V100 instance as Compute VM and here I used Azure Virtual Machines NC6s_v3 for benchmarks.

As you can see, now it achieves about 50x faster compared with the general purpose CPU device (previous n1-highmem-8 machine).

The intuitive speed is like that :

Finally, I show you the result using Google Cloud TPU (TPUv2). Note that here I specified 1 replica, but 8 replicas are the expected behavior for using Cloud TPUs. (See “Cloud TPU – Troubleshooting and FAQ” for details.)
As you can see, the speed is about 2x faster than the latest Volta GPU (V100) for 1 device (not distributed).

In Conclusion

So which one is the best choice ?

As we saw earlier, this post shows that TPU is the fastest choice for TensorFlow, but it’s important to remember that TPU has some caveats, such as :

  • TPU supports only TensorFlow (incl. Keras on TensorFlow) and you cannot bring Theano, CNTK, Caffe or other major frameworks on TPU devices.
  • Currently the programming code depends on TPU and therefore you cannot debug your code on other devices.

For instance, it’s well known that Cognitive Toolkit (CNTK) is 2x – 5x faster than TensorFlow when using RNN (incl. LSTM). Depending on the real situations, it sometimes might be the same speed or faster to run CNTK on V100 rather than TensorFlow on TPU, because TPU is not so remarkable (it’s still close) compared with the latest GPU architecture.
Here I don’t go so far, but you can also take the latest multiple GPU communications’ technologies with NVIDIA software for speed-up.

Moreover here I showed only training performance, but, during inference (real-time predictions or online predictions), you can take other choices like TensorRT, Tensor Core architecture on V100 (NVIDIA says it’s faster than TPU), or Microsoft FPGA technologies (also Microsoft says it’s faster than TPU).

In conclusion, the latest P100, V100 or TPU has surely remarkable performance against old architectures, and these are helpful for your AI workloads with deep neural layers. But, for the latest V100 and TPUs, you must carefully consider which one is better depending on various aspects of real usage, such as the shape of generated graph, what framework you choose, human workloads in AI lifecycle, required numbers of distributions (devices), cost effectiveness, etc, etc.

Note that currently (April 2018) AWS EC2 p3.2xlarge is $3.06/hour (computing only), Azure VM NCv3 is $3.06/hour (computing only) and TPU is $6.50/hour (device consumption only).

End-to-End Example for Distributed TensorFlow

Current computing technologies for AI (by giant Google, Microsoft, etc) is focusing more practical IT infrastructures or services, which enables to run high-throughput and massive workloads with cloud infrastructure and device-acceleration (GPU or TPU) integrated. In such a situation, the programming for distributed computing is an important piece of technologies.

In this post I show you fundamentals for running distributed training with popular TensorFlow framework. I show the overview for your beginning through the development life cycle including provisioning, programming, running, and evaluation with the basic example.
Here we use regular Distributed TensorFlow with standard functions (only pure TensorFlow library without Keras or other helper functions), but it will also help you understand other distributed frameworks like Horovod, TensorFlowOnSpark, etc. First I use usual session (here I use MonitoredTrainingSession) for distributed training, and next I show you with new Estimator (high-level API) and see its benefits.

I don’t go so far into topology or programming patterns for distributed computing ideas itself (I focus on the tutorial for your first understanding), but I hope this post helps you to start your distributed computing by popular TensorFlow.

Preparing your multiple machines (cluster)

Before starting, you must prepare your multiple machines with python and TensorFlow installed and configured (GPU-accelerated, etc).

Here I don’t explain how to setup your environment, but you can also use Google Cloud Machine Learning Engine (Cloud ML) or Azure Batch AI, which significantly simplifies your provisioning rather than the basic cloud infrastructure (IaaS) like Google Compute VM, Amazon EC2, or Azure VM. (Google Cloud ML is fully-managed and Azure Batch AI is IaaS based. For both Google Cloud ML and Azure Batch AI, you need to specify the following cluster spec and start the following MonitoredTrainingSession in your code.)

In this post I used Azure Batch AI for the following samples and here is useful resource for your first start of Azure Batch AI. In the practical execution, you can also use infini-band network for inter-node communications.

Topology (Brief Overview)

There exist a lot of resources (articles) explaining the topology and programming of Distributed TensorFlow, but let me summarize brief concepts before starting. (You need to know for running your code.)

Distributed TensorFlow consists of 3 types of computing nodes, called “parameter node”, “worker node”, and “master node”.

Computing session runs on multiple (or single) worker nodes. Computed parameters are kept by the parameter node and shared with workers. (You can also run multiple parameter nodes for each parameter blocks, which enables high-throughput IOs for writing and reading parameters. If you don’t specify ps (=parameter server) tasks for variables, the round-robin strategy over all ps tasks is applied.)
One of workers (among worker nodes) must be master node (chief) and master coordinates all workloads in each worker nodes.

Programming Sample

The following is our complete code for Distributed TensorFlow with usual session. (Later I show you using Estimator class instead of using low-level MonitoredTrainingSession.) Because I want to focus only on our concerns, here I use simple graph (neural network) with well-known MNIST (hand-writing digits) dataset.
I referred here (non-distributed training example) which only use standard TensorFlow functions without any other helper classes or functions and now I modified this original code for Distributed TensorFlow. (The highlighted line is modified for Distributed TensorFlow.) Please compare the following distributed one with the original non-distributed one.

Here we use asynchronous training, with which each nodes has independent training loop. Moreover I’m here using QueueRunner (FIFOQueue) for batch reading, but you can also use tf.data functions instead.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import argparse
import math

import tensorflow as tf

FLAGS = None
batch_size = 100

cluster = None
server = None
is_chief = False

def main(_):
  with tf.device(tf.train.replica_device_setter(
    worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index),
    cluster=cluster)):

    ###
    ### Training
    ###

    #
    # read training data
    #

    # image - 784 (=28 x 28) elements of grey-scaled integer value [0, 1]
    # label - digit (0, 1, ..., 9)
    train_queue = tf.train.string_input_producer(
      [FLAGS.train_file],
      num_epochs = 2) # data is repeated and it raises OutOfRange when data is over
    train_reader = tf.TFRecordReader()
    _, train_serialized_exam = train_reader.read(train_queue)
    train_exam = tf.parse_single_example(
      train_serialized_exam,
      features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
      })
    train_image = tf.decode_raw(train_exam['image_raw'], tf.uint8)
    train_image.set_shape([784])
    train_image = tf.cast(train_image, tf.float32) * (1. / 255)
    train_label = tf.cast(train_exam['label'], tf.int32)
    train_batch_image, train_batch_label = tf.train.batch(
      [train_image, train_label],
      batch_size=batch_size)

    #
    # define training graph
    #

    # define input
    plchd_image = tf.placeholder(
      dtype=tf.float32,
      shape=(batch_size, 784))
    plchd_label = tf.placeholder(
      dtype=tf.int32,
      shape=(batch_size))
      
    # define network and inference
    # (simple 2 fully connected hidden layer : 784->128->64->10)
    with tf.name_scope('hidden1'):
      weights = tf.Variable(
        tf.truncated_normal(
          [784, 128],
          stddev=1.0 / math.sqrt(float(784))),
        name='weights')
      biases = tf.Variable(
        tf.zeros([128]),
        name='biases')
      hidden1 = tf.nn.relu(tf.matmul(plchd_image, weights) + biases)
    with tf.name_scope('hidden2'):
      weights = tf.Variable(
        tf.truncated_normal(
          [128, 64],
          stddev=1.0 / math.sqrt(float(128))),
        name='weights')
      biases = tf.Variable(
        tf.zeros([64]),
        name='biases')
      hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
    with tf.name_scope('softmax_linear'):
      weights = tf.Variable(
        tf.truncated_normal(
          [64, 10],
          stddev=1.0 / math.sqrt(float(64))),
      name='weights')
      biases = tf.Variable(
        tf.zeros([10]),
        name='biases')
      logits = tf.matmul(hidden2, weights) + biases

    # define optimization
    global_step = tf.train.create_global_step() # start without checkpoint
    optimizer = tf.train.GradientDescentOptimizer(
      learning_rate=0.07)
    loss = tf.losses.sparse_softmax_cross_entropy(
      labels=plchd_label,
      logits=logits)
    train_op = optimizer.minimize(
      loss=loss,
      global_step=global_step)
    
    #
    # run session
    #

    with tf.train.MonitoredTrainingSession(
      master=server.target,
      checkpoint_dir=FLAGS.out_dir,
      is_chief=is_chief) as sess:
      
      # when data is over, OutOfRangeError occurs and ends with MonitoredSession
      
      local_step_value = 0
      array_image, array_label = sess.run(
        [train_batch_image, train_batch_label])
      while not sess.should_stop():
        feed_dict = {
          plchd_image: array_image,
          plchd_label: array_label
        }
        _, global_step_value, loss_value, array_image, array_label = sess.run(
          [train_op, global_step, loss, train_batch_image, train_batch_label],
          feed_dict=feed_dict)
        local_step_value += 1
        if local_step_value % 100 == 0: # You can also use tf.train.LoggingTensorHook for output
          print("Local Step %d, Global Step %d (Loss: %.2f)" % (local_step_value, global_step_value, loss_value))

    print('training finished')
            
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--train_file',
    type=str,
    default='/home/demouser/train.tfrecords',
    help='File path for the training data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  parser.add_argument(
    '--job_name',
    type=str,
    required=True,
    help='job name (parameter or worker) for cluster')
  parser.add_argument(
    '--task_index',
    type=int,
    required=True,
    help='index number in job for cluster')
  FLAGS, unparsed = parser.parse_known_args()

  # start server
  cluster = tf.train.ClusterSpec({
    'ps': ['10.0.0.6:2222'],
    'worker': [
      '10.0.0.4:2222',
      '10.0.0.5:2222'
    ]})
  server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    is_chief = (FLAGS.task_index == 0)
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

Now I pick up and explain about our Distributed TensorFlow example.

First, we start tf.train.Server in each nodes to communicate each other with gRPC protocol.
As you can see below, each server is having corresponding each role (later we set job_name and task_index using command line options), and here I assume one parameter node (10.0.0.6) and two worker nodes (10.0.0.4 and 10.0.0.5), in which 10.0.0.4 is master (chief) node.

def main(_):
  with tf.device(tf.train.replica_device_setter(
    worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index),
    cluster=cluster)):
    ...
            
if __name__ == '__main__':
  ...

  # start server
  cluster = tf.train.ClusterSpec({
    'ps': ['10.0.0.6:2222'],
    'worker': [
      '10.0.0.4:2222',
      '10.0.0.5:2222'
    ]})
  server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    is_chief = (FLAGS.task_index == 0)
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

In this example, we’re assigning each one task (role) for each node (computing machine), but you can also assign one task for each devices (CPUs or GPUs) on the single machine as follows.

...
worker_device = "/job:%s/task:%d/gpu:%d" % (FLAGS.job_name, FLAGS.task_index, 0)
with tf.device(tf.train.replica_device_setter(
  worker_device=worker_device,
  ps_device="/job:ps/cpu:0",
  cluster=cluster)):
  ...

In this example, we set 2 as num_epochs for reading data (train.tfrecords). Therefore, after data is read (iterated) twice by cycle, OutOfRangeError occurs and session (MonitoredTrainingSession) is correctly closed.

...
train_queue = tf.train.string_input_producer(
  [FLAGS.train_file],
  num_epochs = 2)
...

After the graph is constructed, we run the distributed session using tf.train.MonitoredTrainingSession. tf.train.MonitoredTrainingSession can also be used in non-distributed training, but it’s distribution-aware session and enables us to eliminate several boilerplate code for distributed training. As you can see, the original sample (non-distributed one) manually started queue runner (QueueRunner) for batch-reading, but tf.train.MonitoredTrainingSession launches queue thread in the background and we don’t need to start queue runner explicitly. (You can also use tf.train.Supervisor for distributed session, but tf.train.MonitoredTrainingSession is recommended now.)
In the session we read train_batch_image and train_batch_label (each 100 rows of features and labels) sequentially and run train_op and estimate loss .

...
with tf.train.MonitoredTrainingSession(
  master=server.target,
  checkpoint_dir=FLAGS.out_dir,
  is_chief=is_chief) as sess:
  
  local_step_value = 0
  array_image, array_label = sess.run(
    [train_batch_image, train_batch_label])
  while not sess.should_stop():
    feed_dict = {
      plchd_image: array_image,
      plchd_label: array_label
    }
    _, global_step_value, loss_value, array_image, array_label = sess.run(
      [train_op, global_step, loss, train_batch_image, train_batch_label],
      feed_dict=feed_dict)
    local_step_value += 1
    if local_step_value % 100 == 0: # You can also use tf.train.LoggingTensorHook for output
      print("Local Step %d, Global Step %d (Loss: %.2f)" % (local_step_value, global_step_value, loss_value))
...

In asynchronous approach, you can also implement to detect the completion of each nodes automatically with enqueue/dequeue mechanism and you can proceed the consequent steps as follows.

Run and Check Our Model

Now let’s start our program with the following command.
As I mentioned earlier, here we’re assuming one parameter server (10.0.0.6) and two worker serves (10.0.0.4 and 10.0.0.5).

Parameter Node (10.0.0.6)

python mnist_tf_dist.py \
  --job_name=ps \
  --task_index=0

Worker Node 1 (10.0.0.4)

python mnist_tf_dist.py \
  --job_name=worker \
  --task_index=0

Worker Node 2 (10.0.0.5)

python mnist_tf_dist.py \
  --job_name=worker \
  --task_index=1

Once the parameter server is started and all workers enter in MonitoredTrainingSession, the training (the loop of train_op) starts in multiple workers and each nodes respectively (independently) read data on corresponding node.
If you use the shared NFS storage or cloud storage, all nodes can read the same data respectively. (You don’t need to deploy data in multiple nodes respectively.)

Output (2 workers – 10.0.0.4 and 10.0.0.5)

As you can see in our source code, we’re setting checkpoint_dir in session. By this setting, the checkpoint data (tensor objects, variables, etc) are all saved in the directory and you can restore and restart your training later again. You can use gathered checkpoint data as you need. (retrain, test, etc)
For example, when you copy these checkpoint data in the specific local folder, you can test the model’s accuracy using the following local (non-distributed) python code. (Note that the meta is saved in worker node, but the index and data is saved in parameter node separately. You must gather these data in one location or use shared (mounted) checkpoint directory for running the following code.)

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import argparse
import math

import tensorflow as tf

FLAGS = None
batch_size = 100
                 
def main(_):
  #
  # define graph (to be restored !)
  #

  # define input
  plchd_image = tf.placeholder(
    dtype=tf.float32,
    shape=(batch_size, 784))
  plchd_label = tf.placeholder(
    dtype=tf.int32,
    shape=(batch_size))
    
  # define network and inference
  with tf.name_scope('hidden1'):
    weights = tf.Variable(
      tf.truncated_normal(
        [784, 128],
        stddev=1.0 / math.sqrt(float(784))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([128]),
      name='biases')
    hidden1 = tf.nn.relu(tf.matmul(plchd_image, weights) + biases)
  with tf.name_scope('hidden2'):
    weights = tf.Variable(
      tf.truncated_normal(
        [128, 64],
        stddev=1.0 / math.sqrt(float(128))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([64]),
      name='biases')
    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
  with tf.name_scope('softmax_linear'):
    weights = tf.Variable(
      tf.truncated_normal(
        [64, 10],
        stddev=1.0 / math.sqrt(float(64))),
    name='weights')
    biases = tf.Variable(
      tf.zeros([10]),
      name='biases')
    logits = tf.matmul(hidden2, weights) + biases

  #
  # Restore and Testing
  #

  ckpt = tf.train.get_checkpoint_state(FLAGS.out_dir)
  idex = int(ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1])
  
  saver = tf.train.Saver()
  
  with tf.Session() as sess:
    # restore graph
    saver.restore(sess, ckpt.model_checkpoint_path)
    graph = tf.get_default_graph()

    # add to graph - read test data
    test_queue = tf.train.string_input_producer(
      [FLAGS.test_file],
      num_epochs = 1) # when data is over, it raises OutOfRange
    test_reader = tf.TFRecordReader()
    _, test_serialized_exam = test_reader.read(test_queue)
    test_exam = tf.parse_single_example(
      test_serialized_exam,
      features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
      })
    test_image = tf.decode_raw(test_exam['image_raw'], tf.uint8)
    test_image.set_shape([784])
    test_image = tf.cast(test_image, tf.float32) * (1. / 255)
    test_label = tf.cast(test_exam['label'], tf.int32)
    test_batch_image, test_batch_label = tf.train.batch(
      [test_image, test_label],
      batch_size=batch_size)

    # add to graph - test (evaluate) graph
    array_correct = tf.nn.in_top_k(logits, plchd_label, 1)
    test_op = tf.reduce_sum(tf.cast(array_correct, tf.int32))
    
    # run
    sess.run(tf.initialize_local_variables())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord) # for data batching    
    num_test = 0
    num_true = 0
    array_image, array_label = sess.run(
      [test_batch_image, test_batch_label])
    try:
      while True:
        feed_dict = {
          plchd_image: array_image,
          plchd_label: array_label
        }
        batch_num_true, array_image, array_label = sess.run(
          [test_op, test_batch_image, test_batch_label],
          feed_dict=feed_dict)
        num_true += batch_num_true
        num_test += batch_size
    except tf.errors.OutOfRangeError:
      print('Scoring done !')
    precision = float(num_true) / num_test
    print('Accuracy: %0.04f (Num of samples: %d)' %
      (precision, num_test))         

if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--test_file',
    type=str,
    default='/home/demouser/test.tfrecords',
    help='File path for the test data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  FLAGS, unparsed = parser.parse_known_args()
  
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

 

Simplify with new Estimator class

You can also use high-level TensorFlow API – tf.estimator.Estimator class – for running Distributed TensorFlow with good modularity. (Recent official samples are often written by this API.)
Let’s see the brief example.

Before you run your code, you must set the following TF_CONFIG environment. As you see later, your cluster configuration settings is not involved in your logic code by this environment settings.

I note that it’s different from the previous tf.train.ClusterSpec settings. As you can see, the chief node is explicitly specified and the worker doesn’t include the chief node.
Same as previous example, chief worker also run the training task, similar to other workers, with this settings.

Parameter Node (10.0.0.6)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "ps"
  }
}')

Worker Node 1 (10.0.0.4)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "chief"
  }
}')

Worker Node 2 (10.0.0.5)

export TF_CONFIG=$(cat <<<'
{
  "cluster": {
    "chief": ["10.0.0.4:2222"],
    "ps": ["10.0.0.6:2222"],
    "worker": ["10.0.0.5:2222"]
  },
  "task": {
    "index": 0,
    "type": "worker"
  }
}')

Now we change our previous code using tf.estimator.Estimator as follows. In this code, we statistically define our input function and model function using Estimator, and run training and evaluation with tf.estimator.train_and_evaluate() function. (Here we define custom Estimator, but you can also use pre-made Estimator, which inherits Estimator class, like LinearClassifier, DNNClassifier, etc.)

As you can see, this code has no difference between distributed and non-distributed.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
 
import sys
import argparse
import math
 
import tensorflow as tf

FLAGS = None
batch_size = 100

def _my_input_fn(filepath, num_epochs):
  # image - 784 (=28 x 28) elements of grey-scaled integer value [0, 1]
  # label - digit (0, 1, ..., 9)
  data_queue = tf.train.string_input_producer(
    [filepath],
    num_epochs = num_epochs) # data is repeated and it raises OutOfRange when data is over
  data_reader = tf.TFRecordReader()
  _, serialized_exam = data_reader.read(data_queue)
  data_exam = tf.parse_single_example(
    serialized_exam,
    features={
      'image_raw': tf.FixedLenFeature([], tf.string),
      'label': tf.FixedLenFeature([], tf.int64)
    })
  data_image = tf.decode_raw(data_exam['image_raw'], tf.uint8)
  data_image.set_shape([784])
  data_image = tf.cast(data_image, tf.float32) * (1. / 255)
  data_label = tf.cast(data_exam['label'], tf.int32)
  data_batch_image, data_batch_label = tf.train.batch(
    [data_image, data_label],
    batch_size=batch_size)
  return data_batch_image, data_batch_label

def _get_input_fn(filepath, num_epochs):
  return lambda: _my_input_fn(filepath, num_epochs)

def _my_model_fn(features, labels, mode):
  # with tf.device(...): # You can set device if using GPUs
  
  # define network and inference
  # (simple 2 fully connected hidden layer : 784->128->64->10)
  with tf.name_scope('hidden1'):
    weights = tf.Variable(
      tf.truncated_normal(
        [784, 128],
        stddev=1.0 / math.sqrt(float(784))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([128]),
      name='biases')
    hidden1 = tf.nn.relu(tf.matmul(features, weights) + biases)
  with tf.name_scope('hidden2'):
    weights = tf.Variable(
      tf.truncated_normal(
        [128, 64],
        stddev=1.0 / math.sqrt(float(128))),
      name='weights')
    biases = tf.Variable(
      tf.zeros([64]),
      name='biases')
    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
  with tf.name_scope('softmax_linear'):
    weights = tf.Variable(
      tf.truncated_normal(
        [64, 10],
        stddev=1.0 / math.sqrt(float(64))),
    name='weights')
    biases = tf.Variable(
      tf.zeros([10]),
      name='biases')
    logits = tf.matmul(hidden2, weights) + biases

  # compute evaluation matrix
  label_indices = tf.cast(labels, tf.int32)
  predicted_indices = tf.argmax(input=logits, axis=1)   
  accuracy = tf.metrics.accuracy(label_indices, predicted_indices)
  tf.summary.scalar('accuracy', accuracy[1]) # output to TensorBoard

  # compute loss
  loss = tf.losses.sparse_softmax_cross_entropy(
    labels=labels,
    logits=logits)

  # define operations
  if mode == tf.estimator.ModeKeys.TRAIN:
    #global_step = tf.train.create_global_step()
    #global_step = tf.contrib.framework.get_or_create_global_step()
    global_step = tf.train.get_or_create_global_step()    
    optimizer = tf.train.GradientDescentOptimizer(
      learning_rate=0.07)
    train_op = optimizer.minimize(
      loss=loss,
      global_step=global_step)
    return tf.estimator.EstimatorSpec(
      mode,
      loss=loss,
      train_op=train_op)
  if mode == tf.estimator.ModeKeys.EVAL:
    eval_metric_ops = {
      'accuracy': accuracy
    }
    return tf.estimator.EstimatorSpec(
      mode,
      loss=loss,
      eval_metric_ops=eval_metric_ops)
  if mode == tf.estimator.ModeKeys.INFER:
    probabilities = tf.nn.softmax(logits, name='softmax_tensor')
    predictions = {
      'classes': predicted_indices,
      'probabilities': probabilities
    }
    export_outputs = {
      'prediction': tf.estimator.export.PredictOutput(predictions)
    }
    return tf.estimator.EstimatorSpec(
      mode,
      predictions=predictions,
      export_outputs=export_outputs)

def main(_):
  # read TF_CONFIG
  run_config = tf.contrib.learn.RunConfig()

  # define
  mnist_fullyconnected_classifier = tf.estimator.Estimator(
    model_fn=_my_model_fn,
    model_dir=FLAGS.out_dir,
    config=run_config)
  train_spec = tf.estimator.TrainSpec(
    input_fn=_get_input_fn(FLAGS.train_file, 2),
    max_steps=60000 * 2 / batch_size)
  eval_spec = tf.estimator.EvalSpec(
    input_fn=_get_input_fn(FLAGS.test_file, 1),
    steps=10000 * 1 / batch_size,
    start_delay_secs=0)
    
  # run !
  tf.estimator.train_and_evaluate(
    mnist_fullyconnected_classifier,
    train_spec,
    eval_spec
  )
             
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
    '--train_file',
    type=str,
    default='/home/demouser/train.tfrecords',
    help='File path for the training data.')
  parser.add_argument(
    '--test_file',
    type=str,
    default='/home/demouser/test.tfrecords',
    help='File path for the test data.')
  parser.add_argument(
    '--out_dir',
    type=str,
    default='/home/demouser/out',
    help='Dir path for the model and checkpoint output.')
  FLAGS, unparsed = parser.parse_known_args()
 
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

You cannot see the train loop (while clause for mini-batch) in this code, but you can use custom tf.train.SessionRunHook or tf.train.LoggingTensorHook for your custom code (ex : debug printing, etc) in the train loop.

Now you can run this code with the following command. As we saw earlier, we don’t have cluster specific settings in our programming code, then you can run the same command (without specific command options) in all nodes as follows.

Run on all nodes (10.0.0.4 – 10.0.0.6)

python mnist_tf_dist_estimator.py

The output result is written in model directory (model_dir in Estimator) and you can predict (infer) with predict() method in tf.estimator.Estimator instead of using session loop.

 

You can also see the summarised results in model directory with TensorBoard UI by running the TensorBoard server with the following command. Here logdir is your model directory. (Same as previous note in our example, the results are saved in the separate nodes and you must locate these results in one directory or use shared model directory.)

tensorboard --logdir=/home/demouser/out

Open TensorBoard UI (http://localhost:6006) using your web browser, and you can see the transition of accuracy, because here we collected the accuracy values in our sample code.

Note : When you use remote terminal (Windows, Mac, etc), please open the port on your server’s firewall settings (accessed by http://{your server name}:6006), or configure the SSH tunnel with your terminal client (accessed by http://localhost:6006). The following is the setting example in PuTTY client.

 

Azure Site Recovery 概要と、可能なこと・不可能なこと

ASR – その意義と概要

Public Cloud として Azure を選択するメリット (Advantage) の 1 つに、Microsoft が長年培ってきたオンプレミス・テクノロジーとの融合 (ハイブリッド技術) がある。クラウドへの本格参入ができない古い体質の業界や企業では、オンプレミスにある資産を Public Cloud に移すのは不可能かもしれないが、こうした安価でスケーラブルなクラウドが、いざというときのリカバリー サイトとして利用できるなら現実味があるだろう。
Azure Site Recovery (ASR) は、そうした「現実解」のニーズに対応した Public Cloud のサービスと言って良い。

Azure Site Recovery (ASR) を介することで、オンプレミスのマシン イメージを Azure 上に Replication し、いざオンプレミス環境に障害が発生した際には、このバックアップされたイメージ (.vhd) を元に、すぐに Azure 上にマシンを稼働して (Failover して)、VPN や閉域網などを経由して企業内から継続的にサービスを利用可能にする。
Primary の復旧が完了したら、Reverse Replicate といって、稼働中の (Azure 上の) リカバリー サイトを元の Primary サイトへ戻すための仕組み (要するに “Failback”) も提供する。


(From Microsoft Hybrid Cloud Blog)

この Azure Site Recovery (ASR) について、よくある疑問点 (利用時の初歩的な質問) を以下の 3 つの観点でまとめたので、これから導入を検討される方は参考にしていただきたい。
後述の通り、ネットワークまわりをどう設計するかは、充分な考察が必要だ。

ASR – よくある疑問 (まとめ)

対象のソース

Primary として選べるソースタイプとしては、以下が利用可能。
なお、現在、Windows Server 2016 の Physical Server はサポートされていないので注意してほしい。(いずれ対応されると思うが)

  • Physical Machine (On-Premise)
  • Hyper-V 上の Guest VM (On-Premise)
  • VMWare 上の VM (On-Premise)
  • Azure 上の VM

いまでは Azure 上の VM などもリカバリー可能だが、ASR は、当初はオンプレミス上の環境をクラウドにリカバリーして事業継続するためのサービスとして登場しており、ASR を中継することで、On-Premise to Azure だけでなく、Azure を中継した On-Premise to On-Premise のリカバリーに使うことも可能だ。

ネットワーク

ASR ではいろいろなアーキテクチャが考えられるが、ここでは話が混乱しないよう On-premises to Azure で考えてみよう。

上述の通り、Failover 後は、企業内からリカバリー先のマシン (今回の場合、Azure 上のマシン) を利用するため、Azure 上の Recovery Site と On-Premise との間はあらかじめ同じネットワークに入れることになる。(あらかじめ Azure の Site-to-Site VPN や ExpressRoute で Recovery 先と接続しておく。)
このように、ASR 利用時は、DR (Disaster Recovery) 時も踏まえた充分なトポロジー設計が必要になる。

例えば、Azure では、同一ネットワーク上に同一のアドレス空間を持たせることはできないので、Recovery された VM は一般に IP アドレスが変更される。このため、リカバリー対象のマシンで DNS サーバーとの接続が必要になり、Azure の Virtual Network (VNet) 上の DNS Server としてオンプレ側の DNS Server を指定したり、あるいは Failover の際に DNS Server も Replication するなど方針を決める必要があるだろう。(Failover 先の Azure VM で IP Address を同じにすることも不可能ではないが、ASR がこうした仕組みを備えているわけでないため、自身でこうした Subnet Failover (転換処理) を実装することになる。)

ASR は、上述の通りイメージ ベースでの Recovery になるので、すべてを ASR に頼るのではなく、データベース (RDB), AD (Active Directory) などでは、下記のようなコンポーネントが持つ Recovery テクノロジーを組み合わせることも検討する。(Multi-Tier アプリケーションのように複数台のマシンで状態の一貫性が必要になる場合には、Multi VM Consistency という機能を有効化することで一貫性を維持できる。)

また、オンプレミスから Azure への Replication (delta による差分反映など) では一般にインターネット (Azure に向けての inbound の https ポート) が使われるが、以下も考慮しておこう。

  • トラフィック送信にインターネットを使わず、Azure ExpressRoute Public Peering を使うよう構成することも可能
  • ただし、現在、トラフィック送信に Site-to-Site VPN を使うことは不可能

一方、Failback では、一般にインターネット回線は利用できないので注意してほしい。Failback の場合は、Azure 側から On-Premise への https による inbound アクセスがおこなわれるため、Azure Network (Azure VNet) からの VPN か ExpressRoute を介して On-Premise につなぐことになるためだ。(Failback も計画に含めるなら、戻す際のネットワーク構成も考慮する必要がある。)

ネットワーク帯域については、ASR Capacity Planner が使用できる。(この ASR Capacity Planner は、Azure 上の ASR 設定の途中でリンクが表示され、ダウンロードを促すようになっている。)

運用手順

ASR では Failover は自動でおこなわれないので、運用・監視は自身でおこない (または、他製品を組み合わせて)、UI (Azure Portal), スクリプト, SDK などから能動的に Failover を呼び出す。(Failover を呼び出すと、Recovery 先でリカバリー イメージを元に VM が起動する。)
なお、OMS と組み合わせた場合は、既に構成のための機能が OMS に Built-in されている。

下記の 3 種類の Failover が提供されていて、いわゆる障害発生時に呼び出す「Unplanned Failover」は、他の 2 つと異なり Failover 時に最新の Delta Replication は作成しない。(その時点でとられている最新のイメージを元に Failover する。)

  • Test Failover
  • Planned Failover
  • Unplanned Failover

Failover 後の処理は、Azure Automation Runbook を使うことができる。VNet 等の Azure Resource の構成や、VM 内でのセットアップスクリプトの実行などは、Azure Automation Runbook を組み合わせて構成する。

Microsoft Azure : Add Azure Automation runbooks to recovery plans
https://docs.microsoft.com/en-us/azure/site-recovery/site-recovery-runbook-automation

 

ここでは ASR 導入の際の初歩的な疑問を中心にまとめたが、細かなトポロジーや FAQ は以下が参考になるので是非参照してほしい。

Azure Site Recovery support matrix (On-premises to Azure)
https://docs.microsoft.com/en-us/azure/site-recovery/site-recovery-support-matrix-to-azure

Azure Site Recovery FAQ
https://docs.microsoft.com/en-us/azure/site-recovery/site-recovery-faq

 

Azure Machine Learning Services ステップ・バイ・ステップ

ここでは、はじめて使う人のために、新しい Azure Machine Learning Services が何を解決し、どのような使い方であるか CLI コマンドにそって見ていく。

新しい Azure Machine Learning Services は、これまでの Azure Machine Learning Studio とは違うまったく新しい概念だ。
従来の Azure Machine Learning Studio を使った人はわかると思うが、GUI で構築できるのは良いが、細かなモデルや学習フローを実現しようとすると python や R が持っている膨大なパッケージに勝るものはなく、Studio で構築したフローから無理やり python や R の外部コードを呼び出して作るといった本末転倒な使い方 (何のために Studio を使っているのかわからないケース) も多々と思う。
新しい Azure Machine Learning Services は、こうした課題を解決すべく、コード (Python, 今後は R も) に基づくオープンなライフサイクルを支援する考え方に刷新されている。(Microsoft Ignite のセッションを見てもらうと、実際そうしたコンセプトが語られている。)

データの準備 (整形など) をおこない、学習をして (作った学習用のコードはローカル環境だけなく、Azure 上の GPU インスタンスや Apache Spark 上などで短時間に学習させ)、出来上がったモデルを従来の Azure Machine Learning Studio のように REST のエンドポイントとして展開するといった、分析・学習 (Analyze) から本番展開 (Operationalize) までのライフサイクル全般をサポートするフレームワークであり、ここでは話をむずかしくしないよう、ローカルで学習したモデルを Web Service 化するという基本の流れに沿って解説したい。

新しい Azure Machine Learning Services は基本的にコマンドから利用するため (実際、Azure Portal からは一部の機能しか使えない)、このコマンドをシナリオに沿って順に見くことで、新しい Azure Machine Learning がどのようなものか確認できるだろう。

なお、このあと見ていくように、ざっくりと Experimentation と Model Management という独立した 2 つの作業にわかれているので、まずは頭の片隅におぼえながら見ていってほしい。

準備

ここで紹介する az ml コマンドを使うには、現時点では、後述の Azure Machine Learning Workbench をインストールするか、Azure の Data Science Virtual Machine (DSVM) を使用する必要がある。
Azure CLI のインストールや azure-cli-ml モジュールの追加インストールでは使えないようなので注意してほしい。

CLI コマンドのヘルプを見るには -h オプションを使用する。
この投稿では、コマンドを難解にしない目的から必要最小限の引数しか指定しないが、細かな設定については下記のように確認してほしい。

# show what resource can be used
az ml -h
# show what command can be used for workspace
az ml workspace -h
# show what option can be used for workspace creation
az ml workspace create -h

また、以降を進める前に、CLI から、あらかじめ Azure にログインをしておこう。(az login を実行する。)

Experimentation Account と Workspace の作成

新しい Azure ML のアカウントには Experimentation Account と Model Management Account がある。
モデル構築時は Experimentation Account を作成し、ここに Workspace を作成して、Workspace 内で個々の Project を作成する。

下記は、CLI コマンドを使って、リソースグループ myRG01 に Experimentation Account (名前 exacc01) と Workspace (名前 testws01) を作成している。(なお、Experimentation Account と Workspace は Azure Portal からも作成できる。)

# Create experimentation account
az ml account experimentation create -n "exacc01" -g "myRG01"
# Create workspace
az ml workspace create -a "exacc01" -n "testws01" -g "myRG01"

Training フェーズ (モデルの作成)

Python のコード作成、トレーニング、モデル作成はオープンな環境 (PyCharm, Jupyter, DataBricks など) を使っても良く、普段使い慣れたエディターでコード編集をして下記の Experimentation コマンドと併用したり、あるいは Experimentation はいっさい使わずにモデルを構築し、それを後述する Model Management で展開するような使い方も充分ありだ。

ただ、上記で作成した Azure Machine Learning Experimentation Account で az ml experimentation submit コマンドを使って Training 用のコードを展開 (および実行) することで、作成したコード (Python または pyspark) のさまざまな条件下でのイテレート (繰り返し実行) とその実行結果 (作成されたモデルとその精度など) を管理できるようになっており、例えば、コードを作成してまずはローカル環境で実行し、Epoch や Learning Rate を変えて実行時間や Accuracy、Regularization Rate を評価し (それを何度も繰り返し)、さらに GPU 環境や Spark Cluster などコンピューティング環境を変えて評価して最終的に実運用に適した条件を決定していく過程を、Cloud 上のサービス (Azure Machine Learning Experimentation) を通して管理できるようになっている。(下図参照)

この Experimentation は、前述のコマンドライン (az ml experimentation submit コマンド) 以外に Azure Machine Learning Workbench (Windows 版、Mac 版) や Visual Studio Code Tools for AI といったデスクトップ ツール (IDE) からも利用できるようになっている。(これらを使うには、もちろん、あらかじめ前述の Experimentation Account を作っておく必要がある。これらの IDE を起動する際に Experimentation Account を確認する。)

aml_workbench

ここではこうした IDE の使い方は説明しないので “チュートリアル : Classify Iris” を参照してほしいが、主に下記のようなステップを支援する。

  • 上記で作成した Workspace に Project を新規作成
  • Data Set を準備・整形 (dataprep)
  • Python で Training のコードを作成 (学習済モデルの作成)
  • 実行と結果の確認

(補足 : Project は単なるローカル環境のフォルダではなく、Azure 上のリソースとして作成される。カレントフォルダーの Project が Active Project として認識され、例えば、Active Project 上で az ml account experimentation show と入力すると、その Project の Experimentation が表示される。)

なお、Azure Machine Learning Workbench にはいくつか魅力的な機能が備わっているので以下に軽く紹介しておこう。私のお気に入りの機能は以下だ。

  • Data の準備では、元のデータを変えることなく、Data cleaning のための細かなステップをエディタでビジュアルに定義できる。(下図の「step」参照。Power BI などで使われていた手法だ。) メトリクスごとのデータ プロファイルをみながら、不要なステップを削ったり、入れ替えたりなどして編集できる。
  • PROSE と呼ばれる AI による Data Wrangling テクノロジにより、ちょっとしたデータ整形なら、Excel の Autofill のように賢く自動解釈する。例えば、ログのようなつながった文字を分解する際も、単なる文字数や文字による一致、正規表現マッチングなどではなく、文のコンテキストを解釈して意味的に分解できる。(“Diving deep into what’s new with Azure Machine Learning” 参照)
    あまりに複雑なデータ加工の場合は、最終的にスクリプトを書くことも可能。
  • 上図のように Experimentation で必要になる config などの構成をサポートする。ローカルマシン以外に、DSVM や HDInsight Spark Cluster などを容易に使えるので、学習の際、IDE 上から、Azure 上の GPU 等の高尚なコンピューティング環境と (まるでローカル環境で構築しているかのように) 容易に連携できる。(“Configure compute environment” 参照)

Workbench 上で整形したデータ (正確には、上述の通り「データ整形の手順」) は .dprep の拡張子を持つファイルとして保存され、下記の通り Pandas や Spark DataFrame など必要な形式に変換してロードできる。

from azureml.dataprep.package import run

iris = run('iris.dprep', dataflow_idx=0, spark=False)

Workbench には、他にも Jupyter Notebook とのインテグレーション、ROC 曲線などの IDE 上での可視化など、多くのサポート機能を備えている。
また、「コード エディタは自分がよく使うものにしたい」といった場合は、Workbench のコード編集として PyCharm や Visual Studio Code を設定することで連携 (コード編集のみ外部エディタを使用) も可能だ。

Python のコードを作成して学習が完了したら、学習済モデルを Pickle 形式のファイル (.pkl) として保存し (Workbench の場合、[Run] ボタンを押す)、このモデルをこのあとの作業 (Scoring フェーズ) で使用する。
なお、この際も、Azure Machine Learning Workbench を使うと、作成されたモデルの [Promote] をおこなうことで (下図)、このモデルの link が取得され、このあとの Model Management で容易に扱えるようになっている。

2018/05 追記 : なお、Build 2018 のタイミングで発表された Azure ML SDK for Python では、通常の notebook における visualize など、Workbench 以外でも開発生産性を高める仕組みがいくつか提供された。(方向性変更 ?)

Scoring (Prediction) フェーズ

学習済モデル ファイル (今回の場合 .pkl と仮定する) ができたら、これを元に Prediction (または Scoring) のコードを作成して、これを Web Service として発行する。

以下は、付属している Iris の Prediction のサンプルだ。(あやめの種類を区別する Logistic Regression のモデルを扱っている。)
この中の以下の処理に注目してほしい。

  • カレント フォルダから、前述で学習したモデル (下記の model.pkl) をロードしている。
  • 下記の通り、このあとの Web Service 化にそなえて、init() (初期化の処理), run() (Web Service 本体の処理) を定義し、これらを main から呼び出してデバッグする。
  • Azure Machine Learning が提供する generate_schema() 関数で、Input (入力引数), Output などの Web Service の仕様を json で出力している。
# Import data collection library. Only supported for docker mode.
# Functionality will be ignored when package isn't found
try:
  from azureml.datacollector import ModelDataCollector
except ImportError:
  print("Data collection is currently only supported in docker mode. May be disabled for local mode.")
  # Mocking out model data collector functionality
  class ModelDataCollector(object):
    def nop(*args, **kw): pass
    def __getattr__(self, _): return self.nop
    def __init__(self, *args, **kw): return None
  pass

import os

# Prepare the web service definition by authoring
# init() and run() functions. Test the functions
# before deploying the web service.
def init():
  global inputs_dc, prediction_dc
  from sklearn.externals import joblib

  # load the model file
  global model
  model = joblib.load('model.pkl')

  inputs_dc = ModelDataCollector("model.pkl", identifier="inputs")
  prediction_dc = ModelDataCollector("model.pkl", identifier="prediction")

def run(input_df):
  import json

  # append 40 random features just like the training script does it.
  import numpy as np
  n = 40
  random_state = np.random.RandomState(0)
  n_samples, n_features = input_df.shape
  input_df = np.c_[input_df, random_state.randn(n_samples, n)]
  inputs_dc.collect(input_df)

  pred = model.predict(input_df)
  prediction_dc.collect(pred)
  return json.dumps(str(pred[0]))

def main():
  from azureml.api.schema.dataTypes import DataTypes
  from azureml.api.schema.sampleDefinition import SampleDefinition
  from azureml.api.realtime.services import generate_schema
  import pandas

  df = pandas.DataFrame(data=[[3.0, 3.6, 1.3, 0.25]], columns=['sepal length', 'sepal width','petal length','petal width'])

  # Turn on data collection debug mode to view output in stdout
  os.environ["AML_MODEL_DC_DEBUG"] = 'true'

  # Test the output of the functions
  init()
  input1 = pandas.DataFrame([[3.0, 3.6, 1.3, 0.25]])
  print("Result: " + run(input1))

  inputs = {"input_df": SampleDefinition(DataTypes.PANDAS, df)}

  #Genereate the schema
  generate_schema(run_func=run, inputs=inputs, filepath='./outputs/service_schema.json')
  print("Schema generated")

if __name__ == "__main__":
  main()

ここまでで、学習済モデル ファイル (上記の model.pkl)、Web Service の仕様を定義した Json ファイル (上記の service_schema.json)、そしてこの Scoring 用の Python コード (score_iris.py とする) が準備できた。

Computing Environment の作成と設定

以上で必要なファイルは整ったので、Web Service 化して展開 (deploy) する。
現実の開発ではさまざまなバージョンのモデルを展開してリバイズ (見直し) をおこなうなど、Model や環境の管理 (バージョン管理など) が欠かせないが、ここからは Azure ML の Model Management を使ってこうした作業を効率化する。なお、必要なファイルさえ揃っていれば、ここまで使ってきた Experimentation Account は必要ない。(Experimentation Account と Model Management Account は完全に分離されている。)

まず、前提知識をいくつか記載する。

新しい Azure Machine Learning を使った Web Service 化では、展開の際に AKS (Azure Container Service の Kubernetes cluster) と ACR (Azure Container Registry) が使われる。このため、あらかじめ、コマンド az provider register --namespace Microsoft.ContainerRegistry によって Azure Container Registry プロバイダーを登録しておく必要がある。(既に登録済の場合は OK)

また、Web Service 化は、ここで紹介する方法以外に、単一のコマンドですべて生成することもできるが、ここでは構成などを理解できるように、あえて複数コマンドでわけて展開をおこなう。(構造が理解できてきたら、単一コマンドで省力化しても良いだろう。)

まず、展開の前に、展開環境である Computing Environment を下記の通り新規作成する。
ここでは、testenv01 という名前の Computing Environment を East US 2 に作成する。(Kubernetes クラスタの名前は「testenv01」になる。) 例えば、環境を Staging 用、Production 用の複数作成しておき、staging 環境のテストが完了したら同じ image を Production に展開するといった使い方が可能だ。

下記の -c (--cluster) オプションで Azure Container Service (AKS) が使用され 、オプションを指定しない場合は local の docker container に展開される。(CLI コマンドを使って local と AKS を切り替えることも可能。) クラスタ構成の場合、既定で Kubernetes 1 pod 内に 2 台の agent を構成するが、agent の台数も環境ごとにオプションで指定できる。

az ml env setup -n testenv01 -l eastus2 -c

上記のように -g (--resource-group) オプションを省略すると、このコマンドによって testenv01rg と testenv01rg-azureml-cxxxx の 2 つのリソースグループが新規作成され、後者のリソースグループに Azure Container Service の各ノード (VM) が展開される。(xxxx は数字。今回は testenv01rg-azureml-c7294 だった。)
これらのリソースグループはこのあとのコマンドで使うのでおぼえておく。

下記のコマンドで Computing Environment がすべて表示されるので、作成中の環境の provisioning state が Succeeded になるまで待つ。(特に新規に Azure Container Service を作成する場合は、最初のノード作成に時間がかかる。)

az ml env list

Succeeded になったら、下記コマンドを実行して、この作成した環境を Active Compute Environment として設定する。

az ml env set -n testenv01 -g testenv01rg

なお、作成された Compute Environment の情報は Azure 上に保存されるが、この Active Compute Environment はその作業環境上のみ (使用しているクライアント上のみ) の設定なので注意。
以下のコマンドで Active (Current) の環境を確認できる。

az ml env show

Model Management Account の作成と設定

次に、下記コマンドで Model Management Account を新規作成する。(このあと登録するモデルは、ここでバージョン等が管理される。)

az ml account modelmanagement create -l eastus2 -n testmg01 -g myRG01

つぎに、下記コマンドによって、上記で作成した Model Management Account  を使用中の作業環境 (クライアント) における Active な Model Management Account として設定する。

az ml account modelmanagement set -n testmg01 -g myRG01

Active な Model Management Account は下記コマンドで確認できる。

az ml account modelmanagement show

Web Service としての展開 (Deploy)

下記コマンドで、前述で作成した学習済モデル ファイル (model.pkl) を登録する。(何度も登録するとバージョンが 2, 3 とあがっていくので注意。)
この際、登録されたモデルの ID (model id) が出力されるので、これをコピーしておく。(以降では model id を 08ceeb5842eb4fa4a94d47533d7e3aab とする。)

az ml model register -m model.pkl -n model.pkl

つぎに、下記コマンドで、上記で準備した Web Service 用の Scoring のソース (score_iris.py) と仕様 (service_schema.json) を登録して Manifest を作成する。ここで、-i (--model-id) オプションは、上記で登録したモデルの ID である。つまり、使用するモデルは都度 変更できる。
(なお、何度も登録すると Manifest のバージョンがあがっていくので注意。)

この Manifest 作成時に manifest id が出力されるので、これをコピーしておく。(以降では manifest id を 789c3c18-b385-423f-b88c-429a2973b99b とする。)

az ml manifest create -n testmanifest1110 -f score_iris.py -r python -i 08ceeb5842eb4fa4a94d47533d7e3aab -s service_schema.json

この Manifest を使って、下記の通り Image を新規作成して登録する。Image の実体は、ACR (Azure Container Registry) に登録される。(この ACR は、上記の az ml env setup コマンドで新規作成されたリソースグループの中にある。)
作成時、イメージの Id が出力されるので、これをコピーしておく。(以降では image id を a61ba643-884d-4181-a7cb-b1b9c6c48b3e とする。)

az ml image create -n testimage1110 --manifest-id 789c3c18-b385-423f-b88c-429a2973b99b

あとは、この登録されたイメージを使って、下記の通り Web Service を展開して起動する。
今回は使用しないが、追加の Package が必要な場合には -c オプションで Conda Dependencies File (.yml) を指定できる。
なお、下記で “realtime” と指定しているが、今後、”batch” (Azure Batch 展開) も可能になるようだ。BUILD 2018 のタイミングで、Azure Batch AI, Azure Container Instance (ACI), IoT Edge への展開がサポートされた。(2018/05 更新)

az ml service create realtime --image-id a61ba643-884d-4181-a7cb-b1b9c6c48b3e -n testapp1110 --collect-model-data true

下記の通り出力されるので、出力された service id (testapp1110.testenv01-90e88070.eastus2) をおぼえておく。(このサービスを利用する際に使用する。)

Creating service............................Done
Service ID: testapp1110.testenv01-90e88070.eastus2
Usage for cmd: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 -d "{\"input_df\": [{\"petal length\": 1.3, \"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0}]}"
Usage for powershell: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 --% -d "{\"input_df\": [{\"petal length\": 1.3, \"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0}]}"
Additional usage information: 'az ml service usage realtime -i testapp1110.testenv01-90e88070.eastus2'

今回は Azure CLI のみで image を展開しているが、普通に docker コマンドを使っても良いし、展開先も IoT Edge など docker をサポートする他の環境に展開できる。こうした点もブラックボックス化されておらず、あくまでオープンなプラットフォームをベースに作業を簡素化しているだけだ。
また、image を変更 (新しい image を登録) した際には、CLI コマンドを使ってゼロダウンタイムで既存の起動中のサービスを新しい image に変更できる。

Web Service の利用

展開された Web Service (REST) の URL, Port, エンベロープなどは、下記コマンドで確認できる。

az ml service usage realtime -i testapp1110.testenv01-90e88070.eastus2
Scoring URL:
  http://13.68.115.32:80/api/v1/service/testapp1110/score

Headers:
  Content-Type: application/json
  Authorization: Bearer
    ( can be found by running 'az ml service keys realtime -i testapp1110.testenv01-90e88070.eastus2')

Swagger URL:
  http://13.68.115.32:80/api/v1/service/testapp1110/swagger.json

Sample CLI command:
  Usage for cmd: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 -d "{\"input_df\": [{\"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0, \"petal length\": 1.3}]}"
  Usage for powershell: az ml service run realtime -i testapp1110.testenv01-90e88070.eastus2 --% -d "{\"input_df\": [{\"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0, \"petal length\": 1.3}]}"

Sample CURL call:
  curl -X POST -H "Content-Type:application/json" -H "Authorization:Bearer " --data "{\"input_df\": [{\"petal width\": 0.25, \"sepal width\": 3.6, \"sepal length\": 3.0, \"petal length\": 1.3}]}" http://13.68.115.32:80/api/v1/service/testapp1110/score

Get debug logs by calling:
  az ml service logs realtime -i testapp1110.testenv01-90e88070.eastus2

Get STDOUT/STDERR or Request/Response logs in App Insights:
  https://analytics.applicationinsights.io/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourcegroups/testenv01rg-azureml-c7294/components/mlcrpaib1917249512d#/discover/home?apptype=Other%20(preview)

ここで認証用に使用する key は、下記コマンドで出力できる。

az ml service keys realtime -i testapp1110.testenv01-90e88070.eastus2
PrimaryKey: 6d452f09a4xxxxxxxxxxxxxxxxxxxxxx
SecondaryKey: ec7453b683xxxxxxxxxxxxxxxxxxxxxx

このため、Fiddler などを使って下記の通り HTTP Request をおこなう。
今回は、あやめの情報 (ガクの幅など) を渡すことで、そのあやめの種類をモデルから推測して返している。

POST http://13.68.115.32:80/api/v1/service/testapp1110/score
Content-Type: application/json
Authorization: Bearer 6d452f09a4xxxxxxxxxxxxxxxxxxxxxx

{
  "input_df": [
    {
      "petal width": 0.25,
      "sepal width": 3.6,
      "sepal length": 3.0,
      "petal length": 1.3
    }
  ]
}
HTTP/1.1 200 OK
content-type: application/json

"\"Iris-setosa\""

Scaling

前述の通り Kubernetes の Cluster が使用されるため、以降は kubectl コマンドで管理できる。
しかし、よく使う Agent Node (Virtual Machine) や Replica の一部の管理タスクは Azure ML のコマンドも使えるので最後に紹介しておこう。

例えば、下記は、Azure ML のコマンドを使って Agent Node (Agent の Virtual Machine) を 3 台に変更する。

az acs scale -g testenv01rg-azureml-c7294 -n testenv01 --new-agent-count 3

下記は、Autoscale の設定を解除して、この Agent 上に動いている Replica を 3 つに設定している。(下記の通り kubectl コマンドを使わずに設定できる。)

az ml service update realtime -i testapp1110.testenv01-90e88070.eastus2 --autoscale-enabled false
az ml service update realtime -i testapp1110.testenv01-90e88070.eastus2 -z 3

 

ご覧のように、新しい Azure Machine Learning を使うと Python で構築した Score (Test) の処理をそのまま Web Service として発行できるが、同じようなことが可能な Server Platform (オンプレミスの製品) として Machine Learning Server というものが使えるのをご存じだろうか。旧 R Server と呼んでいたもので、現在は Python も使えるようになっている。
また、現在 Model Management のみ R もサポートされているらしく、今後、R の Experimentation やドキュメントも出てくることだろう。
つまり、これらは Python と R で同じ Experience を提供し、Machine Learning Team Blog で書いているように、これらのプラットフォーム (クラウド版の Azure ML とオンプレの ML Server) は今後 統一化されていく予定らしい。

 

参考にした情報 :

Azure Machine Learning Tutorial – Classify Iris :
https://docs.microsoft.com/en-gb/azure/machine-learning/preview/tutorial-classifying-iris-part-1

Azure Machine Learning Reference :
https://docs.microsoft.com/en-us/azure/machine-learning/desktop-workbench/experimentation-service-configuration-reference

 

R で日本語 OCR (tesseract)

今回実際に必要になった例だが、大量の伝票画像を電子化 (文字化) して、そこから情報を得るようなケースを考えてみたい。
伝票のヘッダー・明細や、項目の位置などは DNN を使ってあらかた学習できるかもしれないが、問題なのは、そこから文字を読み取る作業だ。

MNIST のサンプルなど、文字認識はいまや使い古された画像認識の手法だが、アルファベットや数字とは違い、日本語の場合、漢字も含め相当な数の文字があるため、一から教師あり画像を準備してトレーニングするのは (実際のビジネスでは) 現実的ではないからだ。

そこで、今回、tesseract の R のパッケージを使うことを検討してみた。

使い方

使い方は簡単だ。(今回は、Mac または PC 上の環境を想定する。)
まず、R コンソールなどから、tesseract のパッケージ (バイナリ) をインストールする。

install.packages("tesseract")

既定のバイナリには英語のデータしか入っていないので、Github から日本語の Trained データを落とせば良いが注意がある。
現在 Github にある最新は tesseract 4 のデータだが、CRAN のリポジトリにある tesseract 1.4 パッケージ (R のパッケージ) は tesseract 3.0X がベースになっているので (つまり、tesseract 4 ではない)、tesseract 3.04 ベースの Trained データを落としてくる必要があるのだ。
このため、現在は、以下の通り Branch を指定してダウンロード (Clone) する。

git clone --branch 3.04.00 https://github.com/tesseract-ocr/tessdata.git

下図の通り、各言語ごとの .traineddata がダウンロードされるので、この中の jpn.traineddata を、tesseract のインストール・ディレクトリである %userprofile%\Documents\R\win-library\3.4\tesseract\tessdata にコピーする。

以上で準備完了だ。

今回は Windows や Mac 上でのセットアップ方法だが、本番運用などで Linux に環境を作る際には下記を参照してほしい。

https://github.com/ropensci/tesseract

プログラミング

今回は、下図のサンプル画像を読み込ませよう。

下記の通り、R を使って、日本語用の Trained データ (エンジン) をロードし、これで OCR を行えば良い。

library(tesseract)
tseng <- tesseract(language = "jpn")
text <- ocr(
  "C:\\tmp\\sample.png",
  engine = tseng)
cat(text)

実行結果は、一見、良い感じだ。(下図参照)
今回学習済みデータ (.traineddata) をダウンロードして使ったが、オープンソースなので元データのカスタマイズなども可能らしい。

が、しかし、、、

いろいろやってみたが、今回のような機械で出力した伝票のようなケース (大きさも均一で、一定精度以上のフォント画像) には使えるが、精度の問題で、フリーな手書きの認識などにはまだ耐えられない感じだ。

例えば、文字の大きさ (解像度) を変えると認識率が大きく変わってしまうので、上記の例も、画像サイズによっては誤った結果が返ってくるので注意が必要だ。
また、下図のような手書きのデータもひらがなの「すばる」をまったく認識してくれない状況だった。(解像度を変えると、このひらがなの箇所の結果がいろいろ変化する状況。。。)

日本語については、現状はまだ用途を限定して使う感じだ。(英語の精度はすこぶる良いが)