分散TensorFlowでロジスティック回帰 -Distributed TensorFlow- その1の続きです。
前回TensorFlowでシングル・ノード版のロジスティック回帰を実装し、MNISTの分類を行いました。
今回はロジスティック回帰を並列実行します。
並列実行環境
並列実行環境として物理マシンを並べても良いのですが、お手軽にDockerコンテナを使います。
CentOS7にDockerを入れ、DockerHubからTensorFlowコンテナイメージを持ってきます。
yum -y install docker # dockerをサービスとして開始 systemctl start docker systemctl enable docker # DockerHubからtensorflowイメージをpull docker pull gcr.io/tensorflow/tensorflow
このイメージはTensorFlowのバージョンが低いので、Dockerfileを作成してTensorFlowをアップグレードします。
vi Dockerfile
~
FROM gcr.io/tensorflow/tensorflow
MAINTAINER x1 <viva008@gmail.com>
EXPOSE 8888 2222
# make directory -> mounted on host
RUN mkdir -p /var/data/shared
# upgrade pip
RUN pip install --upgrade pip
# upgrade tensorflow 0.8 to 0.9
ENV TF_BINARY_URL https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.9.0-cp27-none-linux_x86_64.whl
RUN pip -q uninstall -y tensorflow && \
pip -q install --ignore-installed --upgrade $TF_BINARY_URL
~
コンテナ間でチェック・ポイントを共有するための、共有マウント用ディレクトリ(/var/data/shared)の作成もこのDockerfileで行いました。
docker buildしてイメージの生成を行います。
docker build -t tensorflow:0.9.0 .
TensorFlowの分散処理には下記のロールが必要になります。
- パラメータの計算を並列化するパラメータ・サーバ
- 勾配・損失計算を並列化するワーカー・ホスト
並列化の様子を観察するため、パラメータ・サーバ2台とワーカー・ホスト2台を起動します。
# パラメータ・サーバ1 ps_93 docker run --privileged -td -p 3223:3223 -v /var/data/shared:/var/data/shared --add- host="tensorflow.x1.com:172.xx.xx.xx" --name ps_93 tensorflow:0.9.0 # tensorflow.x1.com はDockerホストのホスト名、172.xx.xx.xxはDockerホストのIPです。 # 各ホストに割り当てるポートを開放しておきます。 # ホストの/var/data/sharedにコンテナの/var/data/sharedをマウントします。 # パラメータ・サーバ2 ps_94 docker run --privileged -td -p 3224:3224 -v /var/data/shared:/var/data/shared --add- host="tensorflow.x1.com:172.xx.xx.xx" --name ps_94 tensorflow:0.9.0 # ワーカー・ホスト1 wk_93 docker run --privileged -td -p 2223:2223 -v /var/data/shared:/var/data/shared --add- host="tensorflow.x1.com:172.xx.xx.xx" --name wk_93 tensorflow:0.9.0 # ワーカー・ホスト3 wk_94 docker run --privileged -td -p 2224:2224 -v /var/data/shared:/var/data/shared --add- host="tensorflow.x1.com:172.xx.xx.xx" --name wk_94 tensorflow:0.9.0
並列実行版ソースコード
公式のDistributed TensorFlow r0.9にはグラフ間レプリケーション・非同期訓練のスケルトンが掲載されているのですが、面倒な勾配の集約をフレームワークに任せたかったのでグラフ間レプリケーション・同期訓練で作成しました。
LogisticRegression.py
#!/usr/bin/env python
#-*- encoding: utf-8 -*-
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
from __future__ import print_function
import tensorflow as tf
import sys
import time
from tensorflow.examples.tutorials.mnist import input_data
# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
"Comma-separated list of hostname:port pairs")
# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
# config
batch_size = 100
learning_rate = 0.001
training_epochs = 10
# ホスト間で共有可能なディレクトリを指定します。
board_path = "/var/data/shared/board"
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
worker_num = len(worker_hosts)
# cluster を作成します。
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# ローカル・タスクを実行するサーバを開始します。
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
is_chief = (FLAGS.task_index == 0)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
# Between-graph replication
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# 更新数のカウンター
global_step = tf.get_variable('global_step', [],
initializer = tf.constant_initializer(0),
trainable = False)
with tf.name_scope('input'):
x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input") # mnist data image of shape 28*28=784
y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input") # 0〜9 10 classes
with tf.name_scope("weights"):
W = tf.Variable(tf.zeros([784, 10]))
with tf.name_scope("biases"):
b = tf.Variable(tf.zeros([10]))
with tf.name_scope("softmax"):
y = tf.nn.softmax(tf.matmul(x, W) + b)
with tf.name_scope('cross_entropy'):
#cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
cross_entropy = -tf.reduce_sum(y_*tf.log(y))
# optimizer
with tf.name_scope('train'):
grad_op = tf.train.GradientDescentOptimizer(learning_rate)
# SyncReplicasOptimizerを使うと、同期的に勾配を集約してオプティマイザに渡すことができます。
rep_op = tf.train.SyncReplicasOptimizer(grad_op,
replicas_to_aggregate=worker_num,
replica_id=FLAGS.task_index,
total_num_replicas=worker_num,
use_locking=True)
train_op = rep_op.minimize(cross_entropy, global_step=global_step)
#train_op = grad_op.minimize(cross_entropy, global_step=global_step)
init_token_op = rep_op.get_init_tokens_op()
chief_queue_runner = rep_op.get_chief_queue_runner()
with tf.name_scope('Accuracy'):
# accuracy
correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
tf.scalar_summary("cost", cross_entropy)
tf.scalar_summary("accuracy", accuracy)
#saver = tf.train.Saver()
summary_op = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
print("Variables initialized ...")
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
global_step=global_step,
init_op=init_op)
begin_time = time.time()
frequency = 100
with sv.prepare_or_wait_for_session(server.target, config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)) as sess:
# is chief
if is_chief:
sv.start_queue_runners(sess, [chief_queue_runner])
sess.run(init_token_op)
writer = tf.train.SummaryWriter(board_path, graph=tf.get_default_graph())
start_time = time.time()
for epoch in range(training_epochs):
batch_count = int(mnist.train.num_examples/batch_size)
count = 0
for i in range(batch_count):
batch_x, batch_y = mnist.train.next_batch(batch_size)
if i % worker_num == FLAGS.task_index:
continue
_, cost, summary, step = sess.run(
[train_op, cross_entropy, summary_op, global_step],
feed_dict={x: batch_x, y_: batch_y})
writer.add_summary(summary, step)
count += 1
if count % frequency == 0 or i+1 == batch_count:
elapsed_time = time.time() - start_time
start_time = time.time()
print("Step: %d," % (step+1),
" Epoch: %2d," % (epoch+1),
" Batch: %3d of %3d," % (i+1, batch_count),
" Cost: %.4f," % cost,
" AvgTime: %3.2fms" % float(elapsed_time*1000/frequency))
count = 0
print("Test-Accuracy: %2.2f" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
print("Total Time: %3.2fs" % float(time.time() - begin_time))
print("Final Cost: %.4f" % cost)
#sv.stop()
if is_chief:
sv.request_stop()
else:
sv.stop()
print("done")
if __name__ == "__main__":
tf.app.run()
このソースコードはDISTRIBUTED TENSORFLOW EXAMPLEを大変参考にさせて頂きました。
実行
dockerコンテナに入り上記のLogisticRegression.pyを実行します。
パラメータ・サーバから起動します。
docker exec -it ps_93 /bin/bash root@2acc72e5e58b: python /var/data/shared/LogisticRegression.py --ps_hosts=tensorflow.x1.com:3223,tensorflow.x1.com:3224 --worker_hosts=tensorflow.x1.com:2223,tensorflow.x1.com:2224 --job_name=ps --task_index=0 docker exec -it ps_94 /bin/bash root@08e6eb86c897: python /var/data/shared/LogisticRegression.py --ps_hosts=tensorflow.x1.com:3223,tensorflow.x1.com:3224 --worker_hosts=tensorflow.x1.com:2223,tensorflow.x1.com:2224 --job_name=ps --task_index=1
ワーカー・ホストはマスターノードから起動します。
docker exec -it wk_93 /bin/bash root@a5e04c49edec: python /var/data/shared/LogisticRegression.py --ps_hosts=tensorflow.x1.com:3223,tensorflow.x1.com:3224 --worker_hosts=tensorflow.x1.com:2223,tensorflow.x1.com:2224 --job_name=worker --task_index=0 docker exec -it wk_94 /bin/bash root@bde152d25ffd: python /var/data/shared/LogisticRegression.py --ps_hosts=tensorflow.x1.com:3223,tensorflow.x1.com:3224 --worker_hosts=tensorflow.x1.com:2223,tensorflow.x1.com:2224 --job_name= worker --task_index=1
結果
実行結果です。epoch=10で実行しました。
Test-Accuracy: 0.92 Total Time: 25.24s Final Cost: 47.4627
シングルノード版よりも少し精度が良いという以外な結果に…?
各ノードのtopの様子です。
すべてのCPUが使われているのがわかります。
ソースコードはこちらです。
DistributedTensorFlow.ipynb


