blob: 303741ba89dff7992ee377e54fd983d47f16dc48 [file] [log] [blame]
# Lint as: python2, python3
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tempfile
import numpy as np
from six.moves import range
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
from tensorflow.python.framework import test_util
from tensorflow.python.platform import test
# Number of steps to train model.
# Dial to 0 means no training at all, all the weights will be just using their
# initial values. This can help make the test smaller.
TRAIN_STEPS = 0
CONFIG = tf.ConfigProto(device_count={"GPU": 0})
class UnidirectionalSequenceLstmTest(test_util.TensorFlowTestCase):
def setUp(self):
tf.reset_default_graph()
# Import MNIST dataset
self.mnist = input_data.read_data_sets(
"/tmp/data/", fake_data=True, one_hot=True)
# Define constants
# Unrolled through 28 time steps
self.time_steps = 28
# Rows of 28 pixels
self.n_input = 28
# Learning rate for Adam optimizer
self.learning_rate = 0.001
# MNIST is meant to be classified in 10 classes(0-9).
self.n_classes = 10
# Batch size
self.batch_size = 16
# Lstm Units.
self.num_units = 16
def buildLstmLayer(self):
return tf.keras.layers.StackedRNNCells([
tf.lite.experimental.nn.TFLiteLSTMCell(
self.num_units, use_peepholes=True, forget_bias=1.0, name="rnn1"),
tf.lite.experimental.nn.TFLiteLSTMCell(
self.num_units, num_proj=8, forget_bias=1.0, name="rnn2"),
tf.lite.experimental.nn.TFLiteLSTMCell(
self.num_units // 2,
use_peepholes=True,
num_proj=8,
forget_bias=0,
name="rnn3"),
tf.lite.experimental.nn.TFLiteLSTMCell(
self.num_units, forget_bias=1.0, name="rnn4")
])
def buildModel(self, lstm_layer, is_dynamic_rnn):
"""Build Mnist recognition model.
Args:
lstm_layer: The lstm layer either a single lstm cell or a multi lstm cell.
is_dynamic_rnn: Use dynamic_rnn or not.
Returns:
A tuple containing:
- Input tensor of the model.
- Prediction tensor of the model.
- Output class tensor of the model.
"""
# Weights and biases for output softmax layer.
out_weights = tf.Variable(
tf.random.normal([self.num_units, self.n_classes]))
out_bias = tf.Variable(tf.random.normal([self.n_classes]))
# input image placeholder
x = tf.placeholder(
"float", [None, self.time_steps, self.n_input], name="INPUT_IMAGE")
# x is shaped [batch_size,time_steps,num_inputs]
if is_dynamic_rnn:
lstm_input = tf.transpose(x, perm=[1, 0, 2])
outputs, _ = tf.lite.experimental.nn.dynamic_rnn(
lstm_layer, lstm_input, dtype="float32")
outputs = tf.unstack(outputs, axis=0)
else:
lstm_input = tf.unstack(x, self.time_steps, 1)
outputs, _ = tf.nn.static_rnn(lstm_layer, lstm_input, dtype="float32")
# Compute logits by multiplying outputs[-1] of shape [batch_size,num_units]
# by the softmax layer's out_weight of shape [num_units,n_classes]
# plus out_bias
prediction = tf.matmul(outputs[-1], out_weights) + out_bias
output_class = tf.nn.softmax(prediction, name="OUTPUT_CLASS")
return x, prediction, output_class
def trainModel(self, x, prediction, output_class, sess):
"""Train the model.
Args:
x: The input tensor.
prediction: The prediction class tensor.
output_class: The output tensor.
sess: The graph session.
"""
# input label placeholder
y = tf.placeholder("float", [None, self.n_classes])
# Loss function
loss = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(logits=prediction, labels=y))
# Optimization
opt = tf.train.AdamOptimizer(
learning_rate=self.learning_rate).minimize(loss)
# Initialize variables
init = tf.global_variables_initializer()
sess.run(init)
for _ in range(TRAIN_STEPS):
batch_x, batch_y = self.mnist.train.next_batch(
batch_size=self.batch_size, fake_data=True)
batch_x = np.array(batch_x)
batch_y = np.array(batch_y)
batch_x = batch_x.reshape((self.batch_size, self.time_steps,
self.n_input))
sess.run(opt, feed_dict={x: batch_x, y: batch_y})
def saveAndRestoreModel(self, lstm_layer, sess, saver, is_dynamic_rnn):
"""Saves and restores the model to mimic the most common use case.
Args:
lstm_layer: The lstm layer either a single lstm cell or a multi lstm cell.
sess: Old session.
saver: Saver created by tf.compat.v1.train.Saver()
is_dynamic_rnn: Use dynamic_rnn or not.
Returns:
A tuple containing:
- Input tensor of the restored model.
- Prediction tensor of the restored model.
- Output tensor, which is the softwmax result of the prediction tensor.
- new session of the restored model.
"""
model_dir = tempfile.mkdtemp()
saver.save(sess, model_dir)
# Reset the graph.
tf.reset_default_graph()
x, prediction, output_class = self.buildModel(lstm_layer, is_dynamic_rnn)
new_sess = tf.compat.v1.Session(config=CONFIG)
saver = tf.train.Saver()
saver.restore(new_sess, model_dir)
return x, prediction, output_class, new_sess
def getInferenceResult(self, x, output_class, sess):
"""Get inference result given input tensor and output tensor.
Args:
x: The input tensor.
output_class: The output tensor.
sess: Current session.
Returns:
A tuple containing:
- Input of the next batch, batch size is 1.
- Expected output.
"""
b1, _ = self.mnist.train.next_batch(batch_size=1, fake_data=True)
b1 = np.array(b1, dtype=np.dtype("float32"))
sample_input = np.reshape(b1, (1, self.time_steps, self.n_input))
expected_output = sess.run(output_class, feed_dict={x: sample_input})
return sample_input, expected_output
def tfliteInvoke(self,
sess,
test_inputs,
input_tensor,
output_tensor,
use_mlir_converter=False):
"""Get tflite inference result.
This method will convert tensorflow from session to tflite model then based
on the inputs, run tflite inference and return the results.
Args:
sess: Current tensorflow session.
test_inputs: The test inputs for tflite.
input_tensor: The input tensor of tensorflow graph.
output_tensor: The output tensor of tensorflow graph.
use_mlir_converter: Whether or not to use MLIRConverter to convert the
model.
Returns:
The tflite inference result.
"""
converter = tf.lite.TFLiteConverter.from_session(sess, [input_tensor],
[output_tensor])
converter.experimental_new_converter = use_mlir_converter
tflite = converter.convert()
interpreter = tf.lite.Interpreter(model_content=tflite)
try:
interpreter.allocate_tensors()
except ValueError:
assert False
input_index = (interpreter.get_input_details()[0]["index"])
interpreter.set_tensor(input_index, test_inputs)
interpreter.invoke()
output_index = (interpreter.get_output_details()[0]["index"])
result = interpreter.get_tensor(output_index)
# Reset all variables so it will not pollute other inferences.
interpreter.reset_all_variables()
return result
def testStaticRnnMultiRnnCell(self):
sess = tf.compat.v1.Session(config=CONFIG)
x, prediction, output_class = self.buildModel(
self.buildLstmLayer(), is_dynamic_rnn=False)
self.trainModel(x, prediction, output_class, sess)
saver = tf.train.Saver()
x, prediction, output_class, new_sess = self.saveAndRestoreModel(
self.buildLstmLayer(), sess, saver, is_dynamic_rnn=False)
test_inputs, expected_output = self.getInferenceResult(
x, output_class, new_sess)
# Test Toco-converted model.
result = self.tfliteInvoke(new_sess, test_inputs, x, output_class, False)
self.assertTrue(np.allclose(expected_output, result, rtol=1e-6, atol=1e-2))
# Test MLIR-Converted model.
result = self.tfliteInvoke(new_sess, test_inputs, x, output_class, True)
self.assertTrue(np.allclose(expected_output, result, rtol=1e-6, atol=1e-2))
@test_util.enable_control_flow_v2
def testDynamicRnnMultiRnnCell(self):
sess = tf.compat.v1.Session(config=CONFIG)
x, prediction, output_class = self.buildModel(
self.buildLstmLayer(), is_dynamic_rnn=True)
self.trainModel(x, prediction, output_class, sess)
saver = tf.train.Saver()
x, prediction, output_class, new_sess = self.saveAndRestoreModel(
self.buildLstmLayer(), sess, saver, is_dynamic_rnn=True)
test_inputs, expected_output = self.getInferenceResult(
x, output_class, new_sess)
# Test Toco-converted model.
result = self.tfliteInvoke(new_sess, test_inputs, x, output_class, False)
self.assertTrue(np.allclose(expected_output, result, rtol=1e-6, atol=1e-2))
# Test MLIR-converted model.
result = self.tfliteInvoke(new_sess, test_inputs, x, output_class, True)
self.assertTrue(np.allclose(expected_output, result, rtol=1e-6, atol=1e-2))
if __name__ == "__main__":
test.main()