深度學習Tensorflow生產環境部署(下·模型部署篇)
前一篇講過環境的部署篇,這一次就講講從程式碼角度如何匯出pb模型,如何進行服務呼叫。
1 hello world篇
部署完docker後,如果是cpu環境,可以直接拉取tensorflow/serving,如果是GPU環境則麻煩點,具體參考前一篇,這裡就不再贅述了。
cpu版本的可以直接拉取tensorflow/serving,docker會自動拉取latest版本:
docker pull tensorflow/serving
如果想要指定tensorflow的版本,可以去這裡檢視:https://hub.docker.com/r/tensorflow/serving/tags/
比如我需要的是1.12.0版本的tf,那麼也可以拉取指定的版本:
docker pull tensorflow/serving:1.12.0
拉取完映象,需要下載一個hello world的程式程式碼。
mkdir -p /tmp/tfserving
cd /tmp/tfserving
git clone https://github.com/tensorflow/serving
tensorflow/serving的github中有對應的測試模型,模型其實就是 y = 0.5 * x + 2。即輸入一個數,輸出是對應的y。
執行下面的命令,在docker中部署服務:
docker run -p 8501:8501 --mount type=bind,source=/tmp/tfserving/serving/tensorflow_serving/servables/tensorflow/testdata/saved_model_half_plus_two_cpu,target=/models/half_plus_two -e MODEL_NAME=half_plus_two -t tensorflow/serving &
上面的命令中,把前面clone下來的/tmp/tfserving/serving/tensorflow_serving/servables/tensorflow/testdata/saved_model_half_plus_two_cpu路徑掛載到容器內的/models/half_plus_two,這樣tensorflow_serving就可以載入models下的模型了,然後通過-p 8501:8501開放內部8501的http介面。
執行docker ps
檢視服務列表:
➜ ~ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7decb4286057 tensorflow/serving "/usr/bin/tf_serving…" 7 seconds ago Up 6 seconds 8500/tcp, 0.0.0.0:8501->8501/tcp eager_dewdney
傳送一個http請求測試一下:
curl -d '{"instances": [1.0, 2.0, 5.0]}' -X POST http://localhost:8501/v1/models/half_plus_two:predict
{
"predictions": [2.5, 3.0, 4.5
]
}%
2 mnist篇
由於前面的例子,serving工程下只有pb模型,沒有模型的訓練和匯出,因此看不出其中的門道。這一部分就直接基於手寫體識別的例子,展示一下如何從tensorflow訓練程式碼匯出模型,又如何通過grpc服務進行模型的呼叫。
訓練和匯出:
#! /usr/bin/env python
"""
Train and export a softmax regression model; the trained model is exported
with SavedModel and signatures are attached to it.
"""
from __future__ import print_function
import os
import sys
# This is a placeholder for a Google-internal import.
import tensorflow as tf
import ssl
# Replaces the default HTTPS context factory with the unverified one, i.e.
# certificate verification is switched off for the whole process.
# NOTE(review): presumably a workaround so the MNIST download succeeds on a
# machine with certificate problems -- confirm; do not keep this in production.
ssl._create_default_https_context = ssl._create_unverified_context
import basic.mnist_input_data as mnist_input_data
# Define the model parameters as command-line flags.
tf.app.flags.DEFINE_integer('training_iteration', 10, 'number of training iterations.')
tf.app.flags.DEFINE_integer('model_version', 2, 'version number of the model.')
tf.app.flags.DEFINE_string('work_dir', './tmp', 'Working directory.')
FLAGS = tf.app.flags.FLAGS
def main(_):
    """Train a softmax-regression MNIST model and export it as a SavedModel.

    The model is trained for ``FLAGS.training_iteration`` mini-batches of 50
    images and written to ``<export_dir>/<FLAGS.model_version>`` with two
    signatures: ``predict_images`` (Predict API) and the default serving
    signature (Classify API).

    ``export_dir`` is the last positional command-line argument when one is
    given (e.g. ``mnist_saved_model.py /tmp/save_model``); otherwise the
    original hard-coded default path is used, so existing invocations keep
    working. Flag values are not validated here.

    Args:
        _: Argument list handed over by ``tf.app.run()``. Presumably
            ``[program_name, *positional_args]`` with parsed flags already
            removed -- confirm for the TensorFlow version in use. ``None`` or
            an empty list selects the default export directory.
    """
    default_export_base = "/Users/xingoo/PycharmProjects/ml-in-action/實踐-tensorflow/01-官方文件-學習和使用ML/save_model"
    positional_args = list(_[1:]) if _ else []
    export_path_base = positional_args[-1] if positional_args else default_export_base

    # Train model
    print('Training model...')
    mnist = mnist_input_data.read_data_sets(FLAGS.work_dir, one_hot=True)
    # InteractiveSession installs itself as the default session, which is
    # what lets `train_step.run(...)` below work without an explicit session.
    sess = tf.InteractiveSession()
    try:
        # The graph input is a batch of serialized tf.Example protos; each one
        # carries a 784-float feature named 'x'.
        serialized_tf_example = tf.placeholder(tf.string, name='tf_example')
        feature_configs = {'x': tf.FixedLenFeature(shape=[784], dtype=tf.float32), }
        tf_example = tf.parse_example(serialized_tf_example, feature_configs)
        x = tf.identity(tf_example['x'], name='x')  # use tf.identity() to assign name
        y_ = tf.placeholder('float', shape=[None, 10])
        w = tf.Variable(tf.zeros([784, 10]))
        b = tf.Variable(tf.zeros([10]))
        sess.run(tf.global_variables_initializer())
        y = tf.nn.softmax(tf.matmul(x, w) + b, name='y')
        cross_entropy = -tf.reduce_sum(y_ * tf.log(y))
        train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)

        # Classification outputs: all 10 scores in descending order plus the
        # matching class names ("0".."9") looked up from the index table.
        values, indices = tf.nn.top_k(y, 10)
        table = tf.contrib.lookup.index_to_string_table_from_tensor(
            tf.constant([str(i) for i in range(10)]))
        prediction_classes = table.lookup(tf.to_int64(indices))

        for _step in range(FLAGS.training_iteration):
            batch = mnist.train.next_batch(50)
            # Feeding `x` directly bypasses the tf.Example parsing ops.
            train_step.run(feed_dict={x: batch[0], y_: batch[1]})

        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))
        print('training accuracy %g' % sess.run(
            accuracy, feed_dict={
                x: mnist.test.images,
                y_: mnist.test.labels
            }))
        print('Done training!')

        # Export model
        # WARNING(break-tutorial-inline-code): The following code snippet is
        # in-lined in tutorials, please update tutorial documents accordingly
        # whenever code changes.
        export_path = os.path.join(
            tf.compat.as_bytes(export_path_base),
            tf.compat.as_bytes(str(FLAGS.model_version)))
        print('Exporting trained model to', export_path)

        # Configure the export location and create the SavedModel builder.
        builder = tf.saved_model.builder.SavedModelBuilder(export_path)

        # Build the signature_def_map.
        # A TensorInfo records the dtype, shape and name of a tensor.
        classification_inputs = tf.saved_model.utils.build_tensor_info(
            serialized_tf_example)
        classification_outputs_classes = tf.saved_model.utils.build_tensor_info(
            prediction_classes)
        classification_outputs_scores = tf.saved_model.utils.build_tensor_info(values)

        # Classification signature: method name + input + outputs
        # (class names and their scores).
        classification_signature = (
            tf.saved_model.signature_def_utils.build_signature_def(
                inputs={
                    tf.saved_model.signature_constants.CLASSIFY_INPUTS:
                        classification_inputs
                },
                outputs={
                    tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES:
                        classification_outputs_classes,
                    tf.saved_model.signature_constants.CLASSIFY_OUTPUT_SCORES:
                        classification_outputs_scores
                },
                method_name=tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME))

        tensor_info_x = tf.saved_model.utils.build_tensor_info(x)
        tensor_info_y = tf.saved_model.utils.build_tensor_info(y)

        # Prediction signature: input x ('images') and output y ('scores').
        prediction_signature = (
            tf.saved_model.signature_def_utils.build_signature_def(
                inputs={'images': tensor_info_x},
                outputs={'scores': tensor_info_y},
                method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME))

        # Record the graph and variable values. Arguments:
        #   sess                 session holding the trained variables
        #   tags                 meta-graph tags; predefined ones are serving,
        #                        train, eval, gpu, tpu
        #   signature_def_map    signatures exposed to clients
        #   main_op              here the table initializer; presumably run
        #                        when the model is loaded -- see the TF docs
        #   strip_default_attrs  presumably drops default-valued node
        #                        attributes from the export -- see the TF docs
        #
        # 'predict_images' is the signature name clients request explicitly;
        # the default serving key is used when a request names no signature.
        builder.add_meta_graph_and_variables(
            sess, [tf.saved_model.tag_constants.SERVING],
            signature_def_map={
                'predict_images':
                    prediction_signature,
                tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
                    classification_signature,
            },
            main_op=tf.tables_initializer(),
            strip_default_attrs=True)

        # Write the SavedModel to disk.
        builder.save()
        print('Done exporting!')
    finally:
        # Release the session's resources even if training or export fails.
        sess.close()
if __name__ == "__main__":
    # Let TensorFlow parse the command-line flags, then invoke main().
    tf.app.run(main=main)
執行後,在export_path_base指定的save_model目錄下,會按model_version建立子目錄(例如save_model/2),其中儲存了對應版本的pb模型檔案。
然後基於grpc部署服務(--mount的source必須指向上面的save_model目錄,請依自己的實際路徑調整):
docker run -p 8500:8500 --mount type=bind,source=/Users/xingoo/PycharmProjects/ml-in-action/01-實踐-tensorflow/01-官方文件-學習和使用ML/save_model,target=/models/mnist -e MODEL_NAME=mnist -t tensorflow/serving &
服務部署成功,檢視一下docker列表:
➜ ~ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
39a06cc35961 tensorflow/serving "/usr/bin/tf_serving…" 4 seconds ago Up 3 seconds 0.0.0.0:8500->8500/tcp, 8501/tcp hardcore_galileo
然後編寫對應的client程式碼:
import tensorflow as tf
import basic.mnist_input_data as mnist_input_data
import grpc
import numpy as np
import sys
import threading
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
# Command-line flags of the test client.
tf.app.flags.DEFINE_integer('concurrency', 1, 'maximum number of concurrent inference requests')
tf.app.flags.DEFINE_integer('num_tests', 100, 'Number of test images')
tf.app.flags.DEFINE_string('server', 'localhost:8500', 'PredictionService host:port')
tf.app.flags.DEFINE_string('work_dir', './tmp', 'Working directory. ')
FLAGS = tf.app.flags.FLAGS
# Load the MNIST test split from the working directory.
# NOTE(review): FLAGS is read here without a prior tf.app.run(); presumably
# the TF 1.x flags wrapper parses sys.argv on first access -- confirm.
test_data_set = mnist_input_data.read_data_sets(FLAGS.work_dir).test
# Plain-text (non-TLS) gRPC channel to the serving container and the
# PredictionService stub used to issue requests.
channel = grpc.insecure_channel(FLAGS.server)
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
class _ResultCounter(object):
    """Thread-safe counter for the prediction results.

    Tracks how many requests are in flight, how many have completed and how
    many failed. All state is guarded by a single condition variable, which
    is also used to block callers in ``throttle`` and ``get_error_rate``.

    Args:
        num_tests: Total number of requests that will be issued.
        concurrency: Maximum number of requests allowed in flight at once.
    """

    def __init__(self, num_tests, concurrency):
        self._num_tests = num_tests
        self._concurrency = concurrency
        self._error = 0
        self._done = 0
        self._active = 0
        self._condition = threading.Condition()

    def inc_error(self):
        """Record one failed request."""
        with self._condition:
            self._error += 1

    def inc_done(self):
        """Record one completed request and wake any waiters."""
        with self._condition:
            self._done += 1
            # Two different predicates (done count, active count) share this
            # condition, so wake every waiter and let each re-check its own;
            # a single notify() could wake only the waiter that cannot proceed.
            self._condition.notify_all()

    def dec_active(self):
        """Mark one in-flight request as finished and wake any waiters."""
        with self._condition:
            self._active -= 1
            self._condition.notify_all()

    def get_error_rate(self):
        """Block until all requests are done, then return the failure ratio.

        Returns:
            ``errors / num_tests`` as a float, or ``0.0`` when ``num_tests``
            is 0 (nothing was requested, so nothing failed).
        """
        with self._condition:
            while self._done != self._num_tests:
                self._condition.wait()
            if self._num_tests == 0:
                # Avoid ZeroDivisionError for an empty run.
                return 0.0
            return self._error / float(self._num_tests)

    def throttle(self):
        """Block while the concurrency limit is reached, then claim a slot."""
        with self._condition:
            while self._active == self._concurrency:
                self._condition.wait()
            self._active += 1
def _create_rpc_callback(label, result_counter):
    """Build the completion callback for one asynchronous Predict call.

    Args:
        label: Ground-truth label of the image that was sent; it is only
            printed next to the prediction.
        result_counter: Counter object whose ``inc_error``, ``inc_done`` and
            ``dec_active`` methods are called to record the outcome.

    Returns:
        A one-argument function taking the finished result future.
    """
    def _on_done(result_future):
        failure = result_future.exception()
        if not failure:
            # The index of the highest score is the predicted digit.
            scores = result_future.result().outputs['scores'].float_val
            predicted = np.argmax(np.array(scores))
            sys.stdout.write("%s - %s\n" % (label, predicted))
            sys.stdout.flush()
        else:
            # Only failed RPCs are counted as errors.
            result_counter.inc_error()
            print(failure)
        # Success or failure, the request is finished and frees its slot.
        result_counter.inc_done()
        result_counter.dec_active()
    return _on_done
# Send FLAGS.num_tests asynchronous Predict requests, keeping at most
# FLAGS.concurrency of them in flight, then print the failure ratio.
result_counter = _ResultCounter(FLAGS.num_tests, FLAGS.concurrency)
for i in range(FLAGS.num_tests):
    # Take one test image together with its label.
    image, label = test_data_set.next_batch(1)
    pixels = image[0]

    # Address the 'predict_images' signature of the 'mnist' model.
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'mnist'
    request.model_spec.signature_name = 'predict_images'
    tensor_proto = tf.contrib.util.make_tensor_proto(pixels, shape=[1, pixels.size])
    request.inputs['images'].CopyFrom(tensor_proto)

    # Wait for a free slot before issuing the non-blocking call.
    result_counter.throttle()
    result_future = stub.Predict.future(request, timeout=5.0)  # 5 seconds
    result_future.add_done_callback(_create_rpc_callback(label[0], result_counter))

# Blocks until every callback has reported completion.
print(result_counter.get_error_rate())
得到對應的輸出(每行為「真實標籤 - 預測結果」;最後一行是請求失敗的比例,程式只統計RPC異常,分類錯誤不計入,所以即使有預測錯誤仍為0.0):
3 - 3
6 - 6
9 - 9
3 - 3
1 - 1
4 - 9
1 - 5
7 - 9
6 - 6
9 - 9
0.0