# blob: c946533498dca8d84f86adb99ba6e402f6b04220 [file] [log] [blame]
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for lite.py functionality related to TensorFlow 2.0."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import numpy as np
from tensorflow.lite.python import lite
from tensorflow.lite.python.interpreter import Interpreter
from tensorflow.python import keras
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gen_array_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.saved_model.save import save
from tensorflow.python.training.tracking import tracking
class TestModels(test_util.TensorFlowTestCase):
  """Base test case holding model builders and checks shared by the suites."""

  def _evaluateTFLiteModel(self, tflite_model, input_data):
    """Runs `tflite_model` on `input_data` and returns its first output.

    Args:
      tflite_model: Serialized model handed to the interpreter as
        `model_content`.
      input_data: Sequence of values exposing `.numpy()`; they are matched
        positionally with the interpreter's input details.

    Returns:
      The tensor stored at the first entry of the interpreter's output
      details. Additional outputs, if any, are not returned.
    """
    runner = Interpreter(model_content=tflite_model)
    runner.allocate_tensors()

    inputs = runner.get_input_details()
    outputs = runner.get_output_details()

    for detail, value in zip(inputs, input_data):
      runner.set_tensor(detail['index'], value.numpy())
    runner.invoke()

    first_output_index = outputs[0]['index']
    return runner.get_tensor(first_output_index)

  def _getSimpleVariableModel(self):
    """Returns a trackable whose `f` multiplies its input by `v1 * v2`."""
    model = tracking.AutoTrackable()
    model.v1 = variables.Variable(3.)
    model.v2 = variables.Variable(2.)
    model.f = def_function.function(lambda x: model.v1 * model.v2 * x)
    return model

  def _getMultiFunctionModel(self):
    """Returns a trackable exposing two functions, `add` and `sub`."""

    class BasicModel(tracking.AutoTrackable):
      """Trackable with two functions that each create a variable lazily."""

      def __init__(self):
        # Both variables are created on first use inside the functions below.
        self.y = None
        self.z = None

      @def_function.function
      def add(self, x):
        """Returns `x + y`, creating `y` (2.0) if it does not exist yet."""
        if self.y is None:
          self.y = variables.Variable(2.)
        return x + self.y

      @def_function.function
      def sub(self, x):
        """Returns `x - z`, creating `z` (3.0) if it does not exist yet."""
        if self.z is None:
          self.z = variables.Variable(3.)
        return x - self.z

    return BasicModel()

  def _assertValidDebugInfo(self, debug_info):
    """Checks that `debug_info.files` points at this test file.

    Args:
      debug_info: Object with a `files` iterable of file paths.
    """
    base_names = {os.path.basename(path) for path in debug_info.files}
    # Only the name of this test file is asserted so that the check does not
    # depend on how the individual nodes were created.
    self.assertIn('lite_v2_test.py', base_names)
    self.assertNotIn('lite_test.py', base_names)
class FromConcreteFunctionTest(TestModels):
  """Tests for `TFLiteConverterV2.from_concrete_functions`."""

  @test_util.run_v2_only
  def testTypeInvalid(self):
    """Passing a function that is not concrete raises `ValueError`."""
    root = self._getSimpleVariableModel()
    with self.assertRaises(ValueError) as error:
      _ = lite.TFLiteConverterV2.from_concrete_functions([root.f])
    self.assertIn('call from_concrete_function', str(error.exception))

  @test_util.run_v2_only
  def testFloat(self):
    """Converts the simple variable model with a shape-[1] float input."""
    root = self._getSimpleVariableModel()
    input_data = constant_op.constant(1., shape=[1])
    concrete_func = root.f.get_concrete_function(input_data)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.f(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    self.assertEqual(expected_value.numpy(), actual_value)

  @test_util.run_v2_only
  def testScalarInput(self):
    """Converts the simple variable model with a scalar (shape-[]) input."""
    root = self._getSimpleVariableModel()
    input_data = constant_op.constant(1., shape=[])
    concrete_func = root.f.get_concrete_function(input_data)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.f(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    self.assertEqual(expected_value.numpy(), actual_value)

  @test_util.run_v2_only
  def testMultiFunctionModel(self):
    """Convert a single model in a multi-functional model."""
    root = self._getMultiFunctionModel()
    input_data = constant_op.constant(1., shape=[1])
    concrete_func = root.add.get_concrete_function(input_data)

    # Convert model and ensure model is not None.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.add(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    self.assertEqual(expected_value.numpy(), actual_value)

  @test_util.run_v2_only
  def testConvertMultipleFunctions(self):
    """Convert multiple functions in a multi-functional model."""
    root = self._getMultiFunctionModel()
    input_data = constant_op.constant(1., shape=[1])
    add_func = root.add.get_concrete_function(input_data)
    sub_func = root.sub.get_concrete_function(input_data)

    # Try converting multiple functions.
    converter = lite.TFLiteConverterV2.from_concrete_functions(
        [add_func, sub_func])
    with self.assertRaises(ValueError) as error:
      _ = converter.convert()
    self.assertIn('can only convert a single ConcreteFunction',
                  str(error.exception))

  def _getCalibrationQuantizeModel(self):
    """Builds a conv + relu function and a matching calibration generator.

    Returns:
      A `(concrete_function, calibration_gen)` tuple. The concrete function
      takes a float32 tensor of shape [1, 5, 5, 3]; `calibration_gen` yields
      five single-element lists of random float32 arrays of that shape.
    """
    # Seed the global NumPy RNG so the calibration samples are reproducible.
    np.random.seed(0)

    root = tracking.AutoTrackable()

    @def_function.function(input_signature=[
        tensor_spec.TensorSpec(shape=[1, 5, 5, 3], dtype=dtypes.float32)
    ])
    def func(inp):
      conv = nn_ops.conv2d(
          inp,
          filter=array_ops.ones([3, 3, 3, 16]),
          strides=[1, 1, 1, 1],
          padding='SAME')
      output = nn_ops.relu(conv, name='output')
      return output

    def calibration_gen():
      for _ in range(5):
        yield [np.random.uniform(-1, 1, size=(1, 5, 5, 3)).astype(np.float32)]

    root.f = func
    to_save = root.f.get_concrete_function()
    return (to_save, calibration_gen)

  def _assertFloatInterfaceAndSmaller(self, quantized_tflite, float_tflite):
    """Checks a quantized model keeps float I/O and is smaller than float.

    Args:
      quantized_tflite: Serialized quantized model.
      float_tflite: Serialized float model converted from the same function.
    """
    # The default input and output types should be float.
    interpreter = Interpreter(model_content=quantized_tflite)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual(np.float32, input_details[0]['dtype'])
    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual(np.float32, output_details[0]['dtype'])

    # Ensure that the quantized weights tflite model is smaller.
    self.assertLess(len(quantized_tflite), len(float_tflite))

  def testPostTrainingCalibrateAndQuantize(self):
    """Quantizes via `optimizations` plus a representative dataset."""
    func, calibration_gen = self._getCalibrationQuantizeModel()

    # Convert float model.
    float_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert quantized model.
    quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_converter.representative_dataset = calibration_gen
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    self._assertFloatInterfaceAndSmaller(quantized_tflite, float_tflite)

  def testCalibrateAndQuantizeBuiltinInt8(self):
    """Quantizes by targeting the int8 builtin op set."""
    func, calibration_gen = self._getCalibrationQuantizeModel()

    # Convert float model.
    float_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert model by specifying target spec (instead of optimizations), since
    # when targeting an integer only backend, quantization is mandatory.
    quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    quantized_converter.target_spec.supported_ops = [
        lite.OpsSet.TFLITE_BUILTINS_INT8
    ]
    quantized_converter.representative_dataset = calibration_gen
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    self._assertFloatInterfaceAndSmaller(quantized_tflite, float_tflite)

  @test_util.run_v2_only
  def testEmbeddings(self):
    """Test model with embeddings."""
    vocab_size = 2000
    embedding_dim = 300
    num_indices = 20

    # Use integer indices spread over the whole table. Float samples from
    # [0, 1) cast to int32 all truncate to 0, which would only ever gather
    # row 0. A local RandomState keeps the indices reproducible without
    # touching the global NumPy RNG.
    indices = np.random.RandomState(0).randint(
        0, vocab_size, size=(num_indices,)).astype(np.int32)
    input_data = constant_op.constant(indices)

    class EmbeddingModel(keras.Model):
      """Keras model whose `func` gathers rows from one weight matrix."""

      def __init__(self):
        super(EmbeddingModel, self).__init__()
        self.shared_weights = self.add_weight(
            'weights',
            shape=(vocab_size, embedding_dim),
            dtype=dtypes.float32,
            initializer=init_ops.random_normal_initializer(
                mean=0.0, stddev=embedding_dim**(-0.5)))

      @def_function.function(input_signature=[
          tensor_spec.TensorSpec(shape=(num_indices,), dtype=dtypes.int32)
      ])
      def func(self, x):
        return array_ops.gather(self.shared_weights, x)

    # Building the model.
    root = EmbeddingModel()
    concrete_func = root.func.get_concrete_function()

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.func(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    np.testing.assert_almost_equal(expected_value.numpy(), actual_value, 5)

  @test_util.run_v2_only
  def testGraphDebugInfo(self):
    """Test a concrete function has debug info captured."""
    root = tracking.AutoTrackable()
    root.v1 = variables.Variable(3.)
    root.f = def_function.function(lambda x: root.v1 * x)
    input_data = constant_op.constant(1., shape=[1])
    concrete_func = root.f.get_concrete_function(input_data)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    converter.convert()
    self._assertValidDebugInfo(converter._debug_info)
class FromSavedModelTest(TestModels):
  """Tests for `TFLiteConverterV2.from_saved_model`."""

  def _exportSavedModel(self, obj, signatures=None):
    """Saves `obj` under the test temp dir and returns the export path.

    Args:
      obj: Trackable object or Keras model handed to `save`.
      signatures: Optional signatures argument forwarded to `save`.

    Returns:
      Path of the `saved_model` directory inside `self.get_temp_dir()`.
    """
    export_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save(obj, export_dir, signatures)
    return export_dir

  @test_util.run_v2_only
  def testConstModel(self):
    """Checks a variable-free model (`2 * x`) after a SavedModel round trip."""
    sample = constant_op.constant(1., shape=[1])
    trackable = tracking.AutoTrackable()
    trackable.f = def_function.function(lambda x: 2. * x)
    concrete_func = trackable.f.get_concrete_function(sample)
    export_dir = self._exportSavedModel(trackable, concrete_func)

    # Run the conversion from the exported directory.
    tflite_model = lite.TFLiteConverterV2.from_saved_model(
        export_dir).convert()

    # The converted model must reproduce the TensorFlow result.
    want = trackable.f(sample)
    got = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(want.numpy(), got)

  @test_util.run_v2_only
  def testVariableModel(self):
    """Checks a model holding Variables after a SavedModel round trip."""
    trackable = self._getSimpleVariableModel()
    sample = constant_op.constant(1., shape=[1])
    concrete_func = trackable.f.get_concrete_function(sample)
    export_dir = self._exportSavedModel(trackable, concrete_func)

    # Run the conversion from the exported directory.
    tflite_model = lite.TFLiteConverterV2.from_saved_model(
        export_dir).convert()

    # The converted model must reproduce the TensorFlow result.
    want = trackable.f(sample)
    got = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(want.numpy(), got)

  @test_util.run_v2_only
  def testSignatures(self):
    """Exercises invalid and empty values of `signature_keys`."""
    trackable = self._getSimpleVariableModel()
    sample = constant_op.constant(1., shape=[1])
    concrete_func = trackable.f.get_concrete_function(sample)
    export_dir = self._exportSavedModel(trackable, concrete_func)

    # An unknown signature key is expected to be rejected up front.
    with self.assertRaises(ValueError) as error:
      _ = lite.TFLiteConverterV2.from_saved_model(
          export_dir, signature_keys=['INVALID'])
    self.assertIn("Invalid signature key 'INVALID'", str(error.exception))

    # An empty key list must still yield a convertible model.
    converter = lite.TFLiteConverterV2.from_saved_model(
        export_dir, signature_keys=[])
    tflite_model = converter.convert()

    # The converted model must reproduce the TensorFlow result.
    want = trackable.f(sample)
    got = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(want.numpy(), got)

  @test_util.run_v2_only
  def testMultipleFunctionModel(self):
    """Converting a SavedModel that exports two functions must fail."""
    trackable = self._getMultiFunctionModel()
    sample = constant_op.constant(1., shape=[1])
    signatures = {
        'add': trackable.add.get_concrete_function(sample),
        'sub': trackable.sub.get_concrete_function(sample),
    }
    export_dir = self._exportSavedModel(trackable, signatures)

    # The converter should have picked up both exported functions.
    converter = lite.TFLiteConverterV2.from_saved_model(export_dir)
    self.assertEqual(len(converter._funcs), 2)

    # Converting more than one function at once is expected to raise.
    with self.assertRaises(ValueError) as error:
      _ = converter.convert()
    self.assertIn('This converter can only convert a single ConcreteFunction',
                  str(error.exception))

  @test_util.run_v2_only
  def testKerasSequentialModel(self):
    """Checks a small sequential tf.Keras model saved as a SavedModel."""
    sample = constant_op.constant(1., shape=[1, 1])
    features = np.array([[1.], [2.]])
    targets = np.array([[2.], [4.]])

    layers = [
        keras.layers.Dropout(0.2),
        keras.layers.Dense(1),
    ]
    model = keras.models.Sequential(layers)
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit(features, targets, epochs=1)

    export_dir = self._exportSavedModel(model)

    # Run the conversion from the exported directory.
    tflite_model = lite.TFLiteConverterV2.from_saved_model(
        export_dir).convert()

    # The converted model must reproduce the Keras prediction.
    want = model.predict(sample)
    got = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(want, got)

  @test_util.run_v2_only
  def testGraphDebugInfo(self):
    """Checks that converting a SavedModel captures debug info."""
    sample = constant_op.constant(1., shape=[1])
    trackable = tracking.AutoTrackable()
    trackable.f = def_function.function(lambda x: 2. * x)
    concrete_func = trackable.f.get_concrete_function(sample)
    export_dir = self._exportSavedModel(trackable, concrete_func)

    # Debug info is inspected on the converter after conversion has run.
    converter = lite.TFLiteConverterV2.from_saved_model(export_dir)
    converter.convert()
    self._assertValidDebugInfo(converter._debug_info)
class FromKerasModelTest(TestModels):
  """Tests for `TFLiteConverterV2.from_keras_model`."""

  @test_util.run_v2_only
  def testSequentialModel(self):
    """Checks conversion of a small sequential tf.Keras model."""
    sample = constant_op.constant(1., shape=[1, 1])

    # Build and briefly train a Dropout + Dense model.
    features = np.array([[1.], [2.]])
    targets = np.array([[2.], [4.]])
    layers = [
        keras.layers.Dropout(0.2),
        keras.layers.Dense(units=1, input_shape=[1])
    ]
    model = keras.models.Sequential(layers)
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit(features, targets, epochs=1)

    # Convert directly from the in-memory Keras model.
    tflite_model = lite.TFLiteConverterV2.from_keras_model(model).convert()

    # The converted model must reproduce the Keras prediction.
    want = model.predict(sample)
    got = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(want, got)

  @test_util.run_v2_only
  def testSequentialMultiInputOutputModel(self):
    """Checks conversion of a tf.Keras model with two inputs and outputs."""
    left_sample = constant_op.constant(1., shape=[1, 3])
    right_sample = constant_op.constant(1., shape=[1, 3])

    # Random training data: two 3-wide inputs, one 3-wide and one 2-wide
    # target.
    train_a = np.random.random((10, 3))
    train_b = np.random.random((10, 3))
    train_c = np.random.random((10, 3))
    train_d = np.random.random((10, 2))

    # Both inputs go through the same Dense layer before being merged.
    input_a = keras.layers.Input(shape=(3,), name='input_a')
    input_b = keras.layers.Input(shape=(3,), name='input_b')
    shared_dense = keras.layers.Dense(8, name='dense_1')
    branch_a = shared_dense(input_a)
    branch_b = shared_dense(input_b)
    merged = keras.layers.concatenate([branch_a, branch_b], name='merge')

    head_c = keras.layers.Dense(3, activation='softmax', name='dense_2')
    output_c = head_c(merged)
    head_d = keras.layers.Dense(2, activation='softmax', name='dense_3')
    output_d = head_d(merged)

    model = keras.models.Model(
        inputs=[input_a, input_b], outputs=[output_c, output_d])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit([train_a, train_b], [train_c, train_d], epochs=1)

    # Convert directly from the in-memory Keras model.
    tflite_model = lite.TFLiteConverterV2.from_keras_model(model).convert()

    # Compare the Keras prediction with the converted model.
    samples = [left_sample, right_sample]
    want = model.predict(samples)
    got = self._evaluateTFLiteModel(tflite_model, samples)
    # NOTE(review): `_evaluateTFLiteModel` returns only the interpreter's
    # first output tensor, so this zip pairs the first Keras output with the
    # rows of that single tensor; the second output does not appear to be
    # compared. Confirm whether that is intended.
    for tf_result, tflite_result in zip(want, got):
      np.testing.assert_almost_equal(tf_result[0], tflite_result, 5)

  @test_util.run_v2_only
  def testGraphDebugInfo(self):
    """Checks that converting a tf.Keras model captures debug info."""
    # Build and briefly train a single Dense layer model.
    features = [-1, 0, 1, 2, 3, 4]
    targets = [-3, -1, 1, 3, 5, 7]
    dense = keras.layers.Dense(units=1, input_shape=[1])
    model = keras.models.Sequential([dense])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit(features, targets, epochs=1)

    # Debug info is inspected on the converter after conversion has run.
    converter = lite.TFLiteConverterV2.from_keras_model(model)
    converter.convert()
    self._assertValidDebugInfo(converter._debug_info)
class GrapplerTest(TestModels):
  """Tests that rely on graph optimizations applied during conversion."""

  @test_util.run_v2_only
  def testConstantFolding(self):
    """Converts a function whose `broadcast_to` operates on a constant."""
    # Constant folding handles the tf.broadcast_to operation which was not
    # supported by the TFLite at the time this test was added.
    values = [float(i) for i in range(1, 10)]
    sample = constant_op.constant(values, shape=[3, 3])

    @def_function.function
    def func(x):
      row = constant_op.constant([1., 2., 3.])
      tiled = gen_array_ops.broadcast_to(row, [3, 3])
      return math_ops.matmul(x, tiled)

    trackable = tracking.AutoTrackable()
    trackable.f = func
    concrete_func = trackable.f.get_concrete_function(sample)

    # Run the conversion on the traced function.
    tflite_model = lite.TFLiteConverterV2.from_concrete_functions(
        [concrete_func]).convert()

    # The converted model must reproduce the TensorFlow result exactly.
    want = trackable.f(sample)
    got = self._evaluateTFLiteModel(tflite_model, [sample])
    np.testing.assert_array_equal(want.numpy(), got)
# Run the test suites in this file when it is executed as a script.
if __name__ == '__main__':
  test.main()