| # Copyright 2019 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """Tests for lite.py functionality related to TensorFlow 2.0.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import os |
| import numpy as np |
| |
| from tensorflow.lite.python import lite |
| from tensorflow.lite.python.interpreter import Interpreter |
| from tensorflow.python import keras |
| from tensorflow.python.eager import def_function |
| from tensorflow.python.framework import constant_op |
| from tensorflow.python.framework import dtypes |
| from tensorflow.python.framework import tensor_spec |
| from tensorflow.python.framework import test_util |
| from tensorflow.python.ops import array_ops |
| from tensorflow.python.ops import gen_array_ops |
| from tensorflow.python.ops import init_ops |
| from tensorflow.python.ops import math_ops |
| from tensorflow.python.ops import nn_ops |
| from tensorflow.python.ops import variables |
| from tensorflow.python.platform import test |
| from tensorflow.python.saved_model.save import save |
| from tensorflow.python.training.tracking import tracking |
| |
| |
class TestModels(test_util.TensorFlowTestCase):
  """Base class holding model builders and checks shared by the test cases."""

  def _evaluateTFLiteModel(self, tflite_model, input_data):
    """Runs `tflite_model` on `input_data` and returns its first output.

    Args:
      tflite_model: Serialized model handed to the interpreter as
        `model_content`.
      input_data: Sequence of objects exposing `.numpy()`. They are paired
        positionally with the interpreter's input details.

    Returns:
      The tensor read back for the first entry of the output details.
    """
    runner = Interpreter(model_content=tflite_model)
    runner.allocate_tensors()

    inputs_meta = runner.get_input_details()
    outputs_meta = runner.get_output_details()

    # Feed every provided value into the matching interpreter input slot.
    for meta, value in zip(inputs_meta, input_data):
      runner.set_tensor(meta['index'], value.numpy())
    runner.invoke()

    first_output = outputs_meta[0]
    return runner.get_tensor(first_output['index'])

  def _getSimpleVariableModel(self):
    """Returns a trackable object whose `f` computes `v1 * v2 * x`."""
    model = tracking.AutoTrackable()
    model.v1 = variables.Variable(3.)
    model.v2 = variables.Variable(2.)
    model.f = def_function.function(lambda x: model.v1 * model.v2 * x)
    return model

  def _getMultiFunctionModel(self):
    """Returns a trackable object exposing two functions, `add` and `sub`."""

    class BasicModel(tracking.AutoTrackable):
      """Model whose variables are created lazily inside each function."""

      def __init__(self):
        self.y = None
        self.z = None

      @def_function.function
      def add(self, x):
        """Returns `x + y`, creating `y` (2.0) if it does not exist yet."""
        if self.y is None:
          self.y = variables.Variable(2.)
        return x + self.y

      @def_function.function
      def sub(self, x):
        """Returns `x - z`, creating `z` (3.0) if it does not exist yet."""
        if self.z is None:
          self.z = variables.Variable(3.)
        return x - self.z

    return BasicModel()

  def _assertValidDebugInfo(self, debug_info):
    """Checks that `debug_info.files` references this test file only.

    Args:
      debug_info: Object with a `files` iterable of file paths.
    """
    base_names = {os.path.basename(path) for path in debug_info.files}
    # To keep the test independent of how the nodes are created, only the
    # name of this test file is asserted.
    self.assertIn('lite_v2_test.py', base_names)
    self.assertNotIn('lite_test.py', base_names)
| |
| |
class FromConcreteFunctionTest(TestModels):
  """Tests for `TFLiteConverterV2.from_concrete_functions`."""

  @test_util.run_v2_only
  def testTypeInvalid(self):
    """Passing a function that is not a ConcreteFunction raises ValueError."""
    root = self._getSimpleVariableModel()
    with self.assertRaises(ValueError) as error:
      _ = lite.TFLiteConverterV2.from_concrete_functions([root.f])
    self.assertIn('call from_concrete_function', str(error.exception))

  @test_util.run_v2_only
  def testFloat(self):
    """Converts the simple variable model with a rank-1 float input."""
    root = self._getSimpleVariableModel()
    input_data = constant_op.constant(1., shape=[1])
    concrete_func = root.f.get_concrete_function(input_data)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.f(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    self.assertEqual(expected_value.numpy(), actual_value)

  @test_util.run_v2_only
  def testScalarInput(self):
    """Converts the simple variable model with a scalar (rank-0) input."""
    root = self._getSimpleVariableModel()
    input_data = constant_op.constant(1., shape=[])
    concrete_func = root.f.get_concrete_function(input_data)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.f(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    self.assertEqual(expected_value.numpy(), actual_value)

  @test_util.run_v2_only
  def testMultiFunctionModel(self):
    """Convert a single model in a multi-functional model."""
    root = self._getMultiFunctionModel()
    input_data = constant_op.constant(1., shape=[1])
    concrete_func = root.add.get_concrete_function(input_data)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.add(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    self.assertEqual(expected_value.numpy(), actual_value)

  @test_util.run_v2_only
  def testConvertMultipleFunctions(self):
    """Convert multiple functions in a multi-functional model."""
    root = self._getMultiFunctionModel()
    input_data = constant_op.constant(1., shape=[1])
    add_func = root.add.get_concrete_function(input_data)
    sub_func = root.sub.get_concrete_function(input_data)

    # Try converting multiple functions.
    converter = lite.TFLiteConverterV2.from_concrete_functions(
        [add_func, sub_func])
    with self.assertRaises(ValueError) as error:
      _ = converter.convert()
    self.assertIn('can only convert a single ConcreteFunction',
                  str(error.exception))

  def _getCalibrationQuantizeModel(self):
    """Builds a conv + relu function and a calibration data generator.

    Returns:
      A `(concrete_function, calibration_gen)` tuple. `calibration_gen` is a
      callable yielding five single-element lists, each holding a float32
      array of shape (1, 5, 5, 3) drawn uniformly from [-1, 1).
    """
    # Seed NumPy so the calibration samples are reproducible.
    np.random.seed(0)

    root = tracking.AutoTrackable()

    @def_function.function(input_signature=[
        tensor_spec.TensorSpec(shape=[1, 5, 5, 3], dtype=dtypes.float32)
    ])
    def func(inp):
      conv = nn_ops.conv2d(
          inp,
          filter=array_ops.ones([3, 3, 3, 16]),
          strides=[1, 1, 1, 1],
          padding='SAME')
      output = nn_ops.relu(conv, name='output')
      return output

    def calibration_gen():
      for _ in range(5):
        yield [np.random.uniform(-1, 1, size=(1, 5, 5, 3)).astype(np.float32)]

    root.f = func
    to_save = root.f.get_concrete_function()
    return (to_save, calibration_gen)

  def _assertFloatInputAndOutput(self, tflite_model):
    """Asserts the model has a single float32 input and a float32 output."""
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    self.assertEqual(1, len(input_details))
    self.assertEqual(np.float32, input_details[0]['dtype'])
    output_details = interpreter.get_output_details()
    self.assertEqual(1, len(output_details))
    self.assertEqual(np.float32, output_details[0]['dtype'])

  # Restricted to TF 2.x like every other test in this file, since it drives
  # `TFLiteConverterV2`.
  @test_util.run_v2_only
  def testPostTrainingCalibrateAndQuantize(self):
    """Quantizes via `optimizations` plus a representative dataset."""
    func, calibration_gen = self._getCalibrationQuantizeModel()

    # Convert float model.
    float_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert quantized model.
    quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    quantized_converter.optimizations = [lite.Optimize.DEFAULT]
    quantized_converter.representative_dataset = calibration_gen
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    # The default input and output types should be float.
    self._assertFloatInputAndOutput(quantized_tflite)

    # Ensure that the quantized weights tflite model is smaller.
    self.assertLess(len(quantized_tflite), len(float_tflite))

  # Restricted to TF 2.x like every other test in this file, since it drives
  # `TFLiteConverterV2`.
  @test_util.run_v2_only
  def testCalibrateAndQuantizeBuiltinInt8(self):
    """Quantizes by targeting the `TFLITE_BUILTINS_INT8` op set."""
    func, calibration_gen = self._getCalibrationQuantizeModel()

    # Convert float model.
    float_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    float_tflite = float_converter.convert()
    self.assertTrue(float_tflite)

    # Convert model by specifying target spec (instead of optimizations), since
    # when targeting an integer only backend, quantization is mandatory.
    quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func])
    quantized_converter.target_spec.supported_ops = [
        lite.OpsSet.TFLITE_BUILTINS_INT8
    ]
    quantized_converter.representative_dataset = calibration_gen
    quantized_tflite = quantized_converter.convert()
    self.assertTrue(quantized_tflite)

    # The default input and output types should be float.
    self._assertFloatInputAndOutput(quantized_tflite)

    # Ensure that the quantized weights tflite model is smaller.
    self.assertLess(len(quantized_tflite), len(float_tflite))

  @test_util.run_v2_only
  def testEmbeddings(self):
    """Test model with embeddings."""
    vocab_size = 2000
    embedding_dim = 300
    num_lookups = 20

    # Draw genuine row indices in [0, vocab_size). Casting the output of
    # `random_sample` (floats in [0, 1)) to int32 truncates every value to 0,
    # which would only ever gather the first embedding row.
    input_data = constant_op.constant(
        np.random.randint(0, vocab_size, size=num_lookups).astype(np.int32))

    class EmbeddingModel(keras.Model):

      def __init__(self):
        super(EmbeddingModel, self).__init__()
        self.shared_weights = self.add_weight(
            'weights',
            shape=(vocab_size, embedding_dim),
            dtype=dtypes.float32,
            initializer=init_ops.random_normal_initializer(
                mean=0.0, stddev=embedding_dim**(-0.5)))

      @def_function.function(input_signature=[
          tensor_spec.TensorSpec(shape=(num_lookups,), dtype=dtypes.int32)
      ])
      def func(self, x):
        return array_ops.gather(self.shared_weights, x)

    # Building the model.
    root = EmbeddingModel()
    concrete_func = root.func.get_concrete_function()

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = root.func(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    np.testing.assert_almost_equal(expected_value.numpy(), actual_value, 5)

  @test_util.run_v2_only
  def testGraphDebugInfo(self):
    """Test a concrete function has debug info captured."""
    root = tracking.AutoTrackable()
    root.v1 = variables.Variable(3.)
    root.f = def_function.function(lambda x: root.v1 * x)
    input_data = constant_op.constant(1., shape=[1])
    concrete_func = root.f.get_concrete_function(input_data)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func])
    converter.convert()
    self._assertValidDebugInfo(converter._debug_info)
| |
| |
class FromSavedModelTest(TestModels):
  """Tests for `TFLiteConverterV2.from_saved_model`."""

  @test_util.run_v2_only
  def testConstModel(self):
    """Test a basic model with functions to make sure functions are inlined."""
    model = tracking.AutoTrackable()
    model.f = def_function.function(lambda x: 2. * x)
    sample = constant_op.constant(1., shape=[1])
    signature = model.f.get_concrete_function(sample)

    export_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save(model, export_dir, signature)

    # Convert the exported SavedModel.
    tflite_model = lite.TFLiteConverterV2.from_saved_model(export_dir).convert()

    # The converted model must reproduce the TensorFlow result.
    tf_output = model.f(sample)
    tflite_output = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(tf_output.numpy(), tflite_output)

  @test_util.run_v2_only
  def testVariableModel(self):
    """Test a basic model with Variables with saving/loading the SavedModel."""
    model = self._getSimpleVariableModel()
    sample = constant_op.constant(1., shape=[1])
    signature = model.f.get_concrete_function(sample)

    export_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save(model, export_dir, signature)

    # Convert the exported SavedModel.
    tflite_model = lite.TFLiteConverterV2.from_saved_model(export_dir).convert()

    # The converted model must reproduce the TensorFlow result.
    tf_output = model.f(sample)
    tflite_output = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(tf_output.numpy(), tflite_output)

  @test_util.run_v2_only
  def testSignatures(self):
    """Test values for `signature_keys` argument."""
    model = self._getSimpleVariableModel()
    sample = constant_op.constant(1., shape=[1])
    signature = model.f.get_concrete_function(sample)

    export_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save(model, export_dir, signature)

    # An unknown key in `signature_keys` is rejected.
    with self.assertRaises(ValueError) as raised:
      _ = lite.TFLiteConverterV2.from_saved_model(
          export_dir, signature_keys=['INVALID'])
    self.assertIn("Invalid signature key 'INVALID'", str(raised.exception))

    # An empty `signature_keys` list still yields a convertible model.
    converter = lite.TFLiteConverterV2.from_saved_model(
        export_dir, signature_keys=[])
    tflite_model = converter.convert()

    # The converted model must reproduce the TensorFlow result.
    tf_output = model.f(sample)
    tflite_output = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(tf_output.numpy(), tflite_output)

  @test_util.run_v2_only
  def testMultipleFunctionModel(self):
    """Convert multiple functions in a multi-functional model."""
    model = self._getMultiFunctionModel()
    sample = constant_op.constant(1., shape=[1])
    signatures = {
        'add': model.add.get_concrete_function(sample),
        'sub': model.sub.get_concrete_function(sample),
    }

    export_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save(model, export_dir, signatures)

    # Ensure the converter picked up both saved functions.
    converter = lite.TFLiteConverterV2.from_saved_model(export_dir)
    self.assertEqual(len(converter._funcs), 2)

    # Converting more than one function at once is not supported.
    with self.assertRaises(ValueError) as raised:
      _ = converter.convert()
    self.assertIn('This converter can only convert a single ConcreteFunction',
                  str(raised.exception))

  @test_util.run_v2_only
  def testKerasSequentialModel(self):
    """Test a simple sequential tf.Keras model."""
    sample = constant_op.constant(1., shape=[1, 1])

    features = np.array([[1.], [2.]])
    targets = np.array([[2.], [4.]])

    layers = [
        keras.layers.Dropout(0.2),
        keras.layers.Dense(1),
    ]
    model = keras.models.Sequential(layers)
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit(features, targets, epochs=1)

    export_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save(model, export_dir)

    # Convert the exported SavedModel.
    tflite_model = lite.TFLiteConverterV2.from_saved_model(export_dir).convert()

    # The converted model must reproduce the Keras prediction.
    tf_output = model.predict(sample)
    tflite_output = self._evaluateTFLiteModel(tflite_model, [sample])
    self.assertEqual(tf_output, tflite_output)

  @test_util.run_v2_only
  def testGraphDebugInfo(self):
    """Test a SavedModel has debug info captured."""
    model = tracking.AutoTrackable()
    model.f = def_function.function(lambda x: 2. * x)
    sample = constant_op.constant(1., shape=[1])
    signature = model.f.get_concrete_function(sample)

    export_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save(model, export_dir, signature)

    # Convert, then inspect the debug info recorded on the converter.
    converter = lite.TFLiteConverterV2.from_saved_model(export_dir)
    converter.convert()
    self._assertValidDebugInfo(converter._debug_info)
| |
| |
class FromKerasModelTest(TestModels):
  """Tests for `TFLiteConverterV2.from_keras_model`."""

  @test_util.run_v2_only
  def testSequentialModel(self):
    """Test a simple sequential tf.Keras model."""
    input_data = constant_op.constant(1., shape=[1, 1])

    # Create a simple Keras model.
    x = np.array([[1.], [2.]])
    y = np.array([[2.], [4.]])

    model = keras.models.Sequential([
        keras.layers.Dropout(0.2),
        keras.layers.Dense(units=1, input_shape=[1])
    ])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit(x, y, epochs=1)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_keras_model(model)
    tflite_model = converter.convert()

    # Check values from converted model.
    expected_value = model.predict(input_data)
    actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])
    self.assertEqual(expected_value, actual_value)

  @test_util.run_v2_only
  def testSequentialMultiInputOutputModel(self):
    """Test a tf.Keras model with multiple inputs and outputs."""
    left_input_data = constant_op.constant(1., shape=[1, 3])
    right_input_data = constant_op.constant(1., shape=[1, 3])

    # Create a simple Keras model.
    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))
    output_c_np = np.random.random((10, 3))
    output_d_np = np.random.random((10, 2))

    input_a = keras.layers.Input(shape=(3,), name='input_a')
    input_b = keras.layers.Input(shape=(3,), name='input_b')

    dense = keras.layers.Dense(8, name='dense_1')
    interm_a = dense(input_a)
    interm_b = dense(input_b)
    merged = keras.layers.concatenate([interm_a, interm_b], name='merge')

    output_c = keras.layers.Dense(
        3, activation='softmax', name='dense_2')(
            merged)
    output_d = keras.layers.Dense(
        2, activation='softmax', name='dense_3')(
            merged)

    model = keras.models.Model(
        inputs=[input_a, input_b], outputs=[output_c, output_d])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit([input_a_np, input_b_np], [output_c_np, output_d_np], epochs=1)

    # Convert model.
    converter = lite.TFLiteConverterV2.from_keras_model(model)
    tflite_model = converter.convert()

    # Check values from converted model.
    input_data = [left_input_data, right_input_data]
    expected_value = model.predict(input_data)

    # Run the interpreter here instead of through `_evaluateTFLiteModel`:
    # that helper returns only the first output, which previously left the
    # second model output unverified.
    interpreter = Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()
    # Both inputs have the same shape and contents, so pairing them with the
    # interpreter inputs positionally gives the same result in either order.
    for input_tensor, tensor_data in zip(interpreter.get_input_details(),
                                         input_data):
      interpreter.set_tensor(input_tensor['index'], tensor_data.numpy())
    interpreter.invoke()

    # Key the results by shape. The two outputs have different widths (3 and
    # 2), so the comparison does not rely on the interpreter listing outputs
    # in the same order as the Keras model.
    tflite_results = {}
    for output_tensor in interpreter.get_output_details():
      result = interpreter.get_tensor(output_tensor['index'])
      tflite_results[tuple(result.shape)] = result

    self.assertEqual(len(expected_value), len(tflite_results))
    for tf_result in expected_value:
      np.testing.assert_almost_equal(
          tf_result, tflite_results[tuple(tf_result.shape)], 5)

  @test_util.run_v2_only
  def testGraphDebugInfo(self):
    """Test a tf.Keras model has debug info captured."""
    # Create a simple Keras model.
    x = [-1, 0, 1, 2, 3, 4]
    y = [-3, -1, 1, 3, 5, 7]
    model = keras.models.Sequential(
        [keras.layers.Dense(units=1, input_shape=[1])])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.fit(x, y, epochs=1)
    converter = lite.TFLiteConverterV2.from_keras_model(model)
    converter.convert()
    self._assertValidDebugInfo(converter._debug_info)
| |
| |
class GrapplerTest(TestModels):
  """Tests that rely on Grappler optimizations during conversion."""

  @test_util.run_v2_only
  def testConstantFolding(self):
    """Converts a function whose `broadcast_to` op acts on a constant."""
    # Constant folding handles the tf.broadcast_to operation which was not
    # supported by the TFLite at the time this test was added.
    matrix = constant_op.constant([1., 2., 3., 4., 5., 6., 7., 8., 9.],
                                  shape=[3, 3])

    @def_function.function
    def func(x):
      row = constant_op.constant([1., 2., 3.])
      tiled = gen_array_ops.broadcast_to(row, [3, 3])
      return math_ops.matmul(x, tiled)

    model = tracking.AutoTrackable()
    model.f = func
    traced = model.f.get_concrete_function(matrix)

    # Convert the traced function.
    tflite_model = lite.TFLiteConverterV2.from_concrete_functions(
        [traced]).convert()

    # The converted model must reproduce the TensorFlow result exactly.
    tf_output = model.f(matrix)
    tflite_output = self._evaluateTFLiteModel(tflite_model, [matrix])
    np.testing.assert_array_equal(tf_output.numpy(), tflite_output)
| |
| |
if __name__ == '__main__':
  # Run the tests in this module when the file is executed as a script.
  test.main()