| # Lint as: python2, python3 |
| # Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """Tests for lite.py.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import io |
| import logging |
| import os |
| import tempfile |
| |
| from absl.testing import parameterized |
| import numpy as np |
| import six |
| from six.moves import range |
| |
| from tensorflow.lite.python import lite |
| from tensorflow.lite.python import lite_constants |
| from tensorflow.lite.python.convert import ConverterError |
| from tensorflow.lite.python.convert import mlir_quantize |
| from tensorflow.lite.python.interpreter import Interpreter |
| from tensorflow.python import keras |
| from tensorflow.python.client import session |
| from tensorflow.python.eager import context |
| from tensorflow.python.eager import def_function |
| from tensorflow.python.framework import constant_op |
| from tensorflow.python.framework import dtypes |
| from tensorflow.python.framework import ops |
| from tensorflow.python.framework import test_util |
| from tensorflow.python.ops import array_ops |
| from tensorflow.python.ops import gen_array_ops |
| from tensorflow.python.ops import math_ops |
| from tensorflow.python.ops import nn_ops |
| from tensorflow.python.ops import variable_scope |
| from tensorflow.python.ops import variables |
| from tensorflow.python.ops.variables import global_variables_initializer as _global_variables_initializer |
| from tensorflow.python.platform import gfile |
| from tensorflow.python.platform import resource_loader |
| from tensorflow.python.platform import test |
| from tensorflow.python.saved_model import saved_model |
| from tensorflow.python.training.training_util import write_graph |
| |
| |
| class LiteTest(test_util.TensorFlowTestCase): |
| """Base class of all the tests in this module.""" |
| |
| def setUp(self): |
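    # Save the module-level converter flag so tests that mutate it cannot
    # leak state into other tests; tearDown restores it.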
| self._original_use_experimental_new_converter = ( |
| lite._USE_EXPERIMENTAL_NEW_CONVERTER) |
| super(LiteTest, self).setUp() |
| |
| def tearDown(self): |
| super(LiteTest, self).tearDown() |
| lite._USE_EXPERIMENTAL_NEW_CONVERTER = ( |
| self._original_use_experimental_new_converter) |
| |
| |
| class TestModels(LiteTest): |
| |
| def assertValidDebugInfo(self, debug_info): |
| """Verify the DebugInfo is valid.""" |
| file_names = set() |
| for file_path in debug_info.files: |
| file_names.add(os.path.basename(file_path)) |
    # To make the test independent of how the nodes are created, we only
    # assert the name of this test file.
| self.assertIn('lite_test.py', file_names) |
| self.assertNotIn('lite_v2_test.py', file_names) |
| |
| |
| class FromConstructor(TestModels): |
| |
| # Tests invalid constructors using a dummy value for the GraphDef. |
| def testInvalidConstructor(self): |
| message = ('If input_tensors and output_tensors are None, both ' |
| 'input_arrays_with_shape and output_arrays must be defined.') |
| |
| # `output_arrays` is not defined. |
| with self.assertRaises(ValueError) as error: |
| lite.TFLiteConverter( |
| None, None, [], input_arrays_with_shape=[('input', [3, 9])]) |
| self.assertEqual(message, str(error.exception)) |
| |
| # `input_arrays_with_shape` is not defined. |
| with self.assertRaises(ValueError) as error: |
| lite.TFLiteConverter(None, [], None, output_arrays=['output']) |
| self.assertEqual(message, str(error.exception)) |
| |
| # Tests valid constructors using a dummy value for the GraphDef. |
| def testValidConstructor(self): |
| converter = lite.TFLiteConverter( |
| None, |
| None, |
| None, |
| input_arrays_with_shape=[('input', [3, 9])], |
| output_arrays=['output']) |
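    # No input or output tensors were provided, so the converter relies on the
    # given array names and shapes instead.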
| self.assertFalse(converter._has_valid_tensors()) |
| self.assertEqual(converter.get_input_arrays(), ['input']) |
| |
| with self.assertRaises(ValueError) as error: |
| converter._set_batch_size(1) |
| self.assertEqual( |
| 'The batch size cannot be set for this model. Please use ' |
| 'input_shapes parameter.', str(error.exception)) |
| |
| converter = lite.TFLiteConverter(None, ['input_tensor'], ['output_tensor']) |
| self.assertTrue(converter._has_valid_tensors()) |
| |
| |
| class FromSessionTest(TestModels, parameterized.TestCase): |
| |
| def testFloat(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testForgottenCallToAllocateTensors(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| input_index = interpreter.get_input_details()[0]['index'] |
| dummy_tensor = np.ones(shape=[1, 16, 16, 3], dtype=np.float32) |
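    # Setting a tensor before allocate_tensors() is called should fail.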
| with self.assertRaises(ValueError): |
| interpreter.set_tensor(input_index, dummy_tensor) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testString(self, enable_mlir): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder(shape=[4], dtype=dtypes.string) |
| out_tensor = array_ops.reshape(in_tensor, shape=[2, 2]) |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = enable_mlir |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.string_, input_details[0]['dtype']) |
| self.assertTrue(([4] == input_details[0]['shape']).all()) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('Reshape', output_details[0]['name']) |
| self.assertEqual(np.string_, output_details[0]['dtype']) |
| self.assertTrue(([2, 2] == output_details[0]['shape']).all()) |
| # TODO(b/122659643): Test setting/getting string data via the python |
| # interpreter API after support has been added. |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testQuantization(self, enable_mlir): |
| with ops.Graph().as_default(): |
| in_tensor_1 = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputA') |
| in_tensor_2 = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputB') |
| out_tensor = array_ops.fake_quant_with_min_max_args( |
| in_tensor_1 + in_tensor_2, min=0., max=1., name='output') |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, |
| [in_tensor_1, in_tensor_2], |
| [out_tensor]) |
| converter.inference_type = lite_constants.QUANTIZED_UINT8 |
| converter.quantized_input_stats = { |
| 'inputA': (0., 1.), |
| 'inputB': (0., 1.) |
| } # mean, std_dev |
| converter.experimental_new_converter = enable_mlir |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(2, len(input_details)) |
| self.assertEqual('inputA', input_details[0]['name']) |
| self.assertEqual(np.uint8, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((1., 0.), |
| input_details[0]['quantization']) # scale, zero_point |
| |
| self.assertEqual('inputB', input_details[1]['name']) |
| self.assertEqual(np.uint8, input_details[1]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all()) |
| self.assertEqual((1., 0.), |
| input_details[1]['quantization']) # scale, zero_point |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual(np.uint8, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertTrue(output_details[0]['quantization'][0] > 0) # scale |
| |
| def testQuantizedInput(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.inference_input_type = lite_constants.QUANTIZED_UINT8 |
| converter.inference_type = lite_constants.FLOAT |
| converter.quantized_input_stats = {'Placeholder': (0., 1.)} # mean, std_dev |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.uint8, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((1., 0.), |
| input_details[0]['quantization']) # scale, zero_point |
| |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) # float |
| |
| def testQuantizationInvalid(self): |
| with ops.Graph().as_default(): |
| in_tensor_1 = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputA') |
| in_tensor_2 = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32, name='inputB') |
| out_tensor = array_ops.fake_quant_with_min_max_args( |
| in_tensor_1 + in_tensor_2, min=0., max=1., name='output') |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, |
| [in_tensor_1, in_tensor_2], |
| [out_tensor]) |
| converter.inference_type = lite_constants.QUANTIZED_UINT8 |
| converter.quantized_input_stats = {'inputA': (0., 1.)} # mean, std_dev |
| with self.assertRaises(ValueError) as error: |
| converter.convert() |
| self.assertEqual( |
| 'Quantization input stats are not available for input tensors ' |
| '\'inputB\'.', str(error.exception)) |
| |
| def testIntermediateInputArray(self): |
| """Convert a model from an intermediate input array.""" |
| with ops.Graph().as_default(): |
| in_tensor_init = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| in_tensor_final = in_tensor_init + in_tensor_init |
| out_tensor = in_tensor_final + in_tensor_final |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor_final], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('add', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add_1', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testSizeNoneInvalid(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder(dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Test None as shape when dynamic shapes are disabled. Run with TOCO in |
| # order to invoke shape checking code. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = False |
| with self.assertRaises(ValueError) as error: |
| converter.convert() |
| self.assertEqual('Provide an input shape for input array \'Placeholder\'.', |
| str(error.exception)) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testScalarValid(self, enable_mlir): |
| # Construct a graph using a scalar (empty shape) input. |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder(dtype=dtypes.float32, shape=[]) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Test conversion with the scalar input shape. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = enable_mlir |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([] == input_details[0]['shape']).all()) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
    self.assertTrue(([] == output_details[0]['shape']).all())
| |
| # Validate inference using the scalar inputs/outputs. |
| test_input = np.array(4.0, dtype=np.float32) |
| expected_output = np.array(8.0, dtype=np.float32) |
| interpreter.set_tensor(input_details[0]['index'], test_input) |
| interpreter.invoke() |
| |
| output_data = interpreter.get_tensor(output_details[0]['index']) |
| self.assertTrue((expected_output == output_data).all()) |
| |
| def testSizeInvalid(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, None, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Test invalid shape. None after 1st dimension. Run with TOCO in order to |
| # invoke shape checking code. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = False |
| with self.assertRaises(ValueError) as error: |
| converter.convert() |
| self.assertEqual( |
| 'None is only supported in the 1st dimension. Tensor ' |
| '\'Placeholder\' has invalid shape \'[1, None, 16, 3]\'.', |
| str(error.exception)) |
| |
| def testSizeNone(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, None, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Test None after 1st dimension. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = True |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 1, 16, 3] == input_details[0]['shape']).all()) |
| self.assertTrue(([1, -1, 16, |
| 3] == input_details[0]['shape_signature']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| # Resize tensor with strict checking. |
| with self.assertRaises(RuntimeError) as error: |
| interpreter.resize_tensor_input(0, [3, 16, 16, 3], strict=True) |
| self.assertIn( |
| 'ResizeInputTensorStrict only allows mutating unknown dimensions ' |
| 'identified by -1.', str(error.exception)) |
| |
| # Resize tensor and invoke. |
| interpreter.resize_tensor_input(0, [1, 16, 16, 3], strict=True) |
| interpreter.allocate_tensors() |
| interpreter.invoke() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertTrue(([1, -1, 16, |
| 3] == input_details[0]['shape_signature']).all()) |
| |
| output_details = interpreter.get_output_details() |
| self.assertTrue(([1, -1, 16, |
| 3] == output_details[0]['shape_signature']).all()) |
| |
| def testResizeTensorInputStrict(self): |
| # Ensures that resize_tensor_input(strict=True) works as expected. |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| |
| # Resize incorrect value. |
| with self.assertRaises(RuntimeError) as error: |
| interpreter.resize_tensor_input(0, [3, 16, 16, 3], strict=True) |
| self.assertIn( |
| 'ResizeInputTensorStrict only allows mutating unknown dimensions ' |
| 'identified by -1.', str(error.exception)) |
| |
| # Resize correct value. |
| interpreter.resize_tensor_input(0, [1, 16, 16, 3], strict=True) |
| interpreter.allocate_tensors() |
| |
| def testBatchSizeValid(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[None, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testBatchSizeNonZero(self): |
| with ops.Graph().as_default(): |
| in_tensor_1 = array_ops.placeholder( |
| shape=[None, 4], dtype=dtypes.float32, name='input1') |
| in_tensor_2 = array_ops.placeholder( |
| shape=[4, 10], dtype=dtypes.float32, name='input2') |
| out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2) |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, |
| [in_tensor_1, in_tensor_2], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 2) |
| self.assertEqual('input1', input_details[0]['name']) |
| self.assertTrue(([1, 4] == input_details[0]['shape']).all()) |
| self.assertEqual('input2', input_details[1]['name']) |
| self.assertTrue(([4, 10] == input_details[1]['shape']).all()) |
| |
| def testFreezeGraph(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| var = variable_scope.get_variable( |
| 'weights', shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| # Get the second output to ensure freezing properly processes tensor names |
| # like 'X:1'. |
| out_tensor = nn_ops.top_k(in_tensor + var, name='top_k')[1] |
| sess = session.Session() |
| sess.run(_global_variables_initializer()) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('top_k:1', output_details[0]['name']) |
| self.assertEqual(np.int32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 1] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testGraphviz(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.output_format = lite_constants.GRAPHVIZ_DOT |
| graphviz_output = converter.convert() |
| self.assertTrue(graphviz_output) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testDumpGraphviz(self, enable_mlir): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = enable_mlir |
| graphviz_dir = self.get_temp_dir() |
| converter.dump_graphviz_dir = graphviz_dir |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Ensure interpreter is able to allocate and check graphviz data. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| num_items_graphviz = len(os.listdir(graphviz_dir)) |
| self.assertTrue(num_items_graphviz) |
| self.assertTrue( |
| os.path.exists(os.path.join(graphviz_dir, 'toco_AT_IMPORT.dot'))) |
| self.assertTrue( |
| os.path.exists( |
| os.path.join(graphviz_dir, 'toco_AFTER_TRANSFORMATIONS.dot'))) |
| |
    # The new converter does not support the `dump_graphviz_video` flag.
| if not enable_mlir: |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = enable_mlir |
| graphviz_dir = self.get_temp_dir() |
| converter.dump_graphviz_dir = graphviz_dir |
| converter.dump_graphviz_video = True |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Ensure graphviz folder has more data after using video flag. |
| num_items_graphviz_video = len(os.listdir(graphviz_dir)) |
| self.assertGreater(num_items_graphviz_video, num_items_graphviz) |
| |
| def testDumpConversionSummary(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| log_dir = self.get_temp_dir() |
| converter.conversion_summary_dir = log_dir |
    # Conversion logs are only generated when the MLIR converter is enabled.
| converter.experimental_new_converter = True |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| num_items_conversion_summary = len(os.listdir(log_dir)) |
| self.assertTrue(num_items_conversion_summary) |
| |
| def testDumpConversionSummaryWithOldConverter(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = False |
| log_dir = self.get_temp_dir() |
| converter.conversion_summary_dir = log_dir |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| # Check nothing is generated under the conversion summary path. |
| num_items_conversion_summary = len(os.listdir(log_dir)) |
| self.assertEqual(num_items_conversion_summary, 0) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testInferenceInputType(self, enable_mlir): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = enable_mlir |
| converter.inference_input_type = lite_constants.QUANTIZED_UINT8 |
| converter.quantized_input_stats = {'Placeholder': (0., 1.)} # mean, std_dev |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.uint8, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((1., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| |
| def testDefaultRangesStats(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.inference_type = lite_constants.QUANTIZED_UINT8 |
| converter.quantized_input_stats = {'Placeholder': (0., 1.)} # mean, std_dev |
| converter.default_ranges_stats = (0, 6) # min, max |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.uint8, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((1., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.uint8, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertTrue(output_details[0]['quantization'][0] > 0) # scale |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testPostTrainingQuantizeDeprecatedAttribute(self, enable_mlir): |
| with ops.Graph().as_default(): |
| in_tensor_1 = array_ops.placeholder( |
| shape=[33, 33], dtype=dtypes.float32, name='inputA') |
| in_tensor_2 = constant_op.constant( |
| np.random.uniform(low=-10., high=10., size=(33, 33)), |
| shape=[33, 33], |
| dtype=dtypes.float32, |
| name='inputB') |
| out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2, name='output') |
| sess = session.Session() |
| |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [in_tensor_1], [out_tensor]) |
| self.assertFalse(quantized_converter.post_training_quantize) |
| quantized_converter.experimental_new_converter = enable_mlir |
| |
| quantized_converter.post_training_quantize = True |
| self.assertTrue(quantized_converter.post_training_quantize) |
| self.assertEqual(quantized_converter.optimizations, [lite.Optimize.DEFAULT]) |
| |
| quantized_tflite = quantized_converter.convert() |
| self.assertTrue(quantized_tflite) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testPostTrainingQuantize(self, enable_mlir): |
| np.random.seed(0) |
| with ops.Graph().as_default(): |
| # We need the tensor to have more than 1024 elements for quantize_weights |
| # to kick in. Thus, the [33, 33] shape. |
| in_tensor_1 = array_ops.placeholder( |
| shape=[33, 33], dtype=dtypes.float32, name='inputA') |
| in_tensor_2 = constant_op.constant( |
| np.random.uniform(low=-10., high=10., size=(33, 33)), |
| shape=[33, 33], |
| dtype=dtypes.float32, |
| name='inputB') |
| out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2, name='output') |
| sess = session.Session() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverter.from_session(sess, [in_tensor_1], |
| [out_tensor]) |
| float_converter.experimental_new_converter = enable_mlir |
| float_tflite = float_converter.convert() |
| self.assertTrue(float_tflite) |
| |
| # Convert quantized weights model. |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [in_tensor_1], [out_tensor]) |
| |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_tflite = quantized_converter.convert() |
| self.assertTrue(quantized_tflite) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
    self.assertLess(len(quantized_tflite), len(float_tflite))
| |
| def _getCalibrationQuantizeModel(self): |
| np.random.seed(0) |
| inp = array_ops.placeholder( |
| dtype=dtypes.float32, shape=(1, 5, 5, 3), name='input') |
| conv = nn_ops.conv2d( |
| inp, |
| filter=array_ops.ones([3, 3, 3, 16]), |
| strides=[1, 1, 1, 1], |
| padding='SAME') |
| output = nn_ops.relu(conv, name='output') |
| |
| def calibration_gen(): |
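      # Yield a handful of random inputs to drive post-training calibration.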
| for _ in range(5): |
| yield [np.random.uniform(-1, 1, size=(1, 5, 5, 3)).astype(np.float32)] |
| |
| return (inp, output, calibration_gen) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testPostTrainingCalibrateAndQuantize(self, enable_mlir): |
| with ops.Graph().as_default(): |
| inp, output, calibration_gen = self._getCalibrationQuantizeModel() |
| sess = session.Session() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output]) |
| float_tflite = float_converter.convert() |
| self.assertTrue(float_tflite) |
| |
| # Convert quantized model. |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [inp], [output]) |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| quantized_tflite = quantized_converter.convert() |
| self.assertTrue(quantized_tflite) |
| |
| # The default input and output types should be float. |
| interpreter = Interpreter(model_content=quantized_tflite) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite), len(float_tflite)) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testCalibrateAndQuantizeBuiltinInt8(self, enable_mlir): |
| with ops.Graph().as_default(): |
| inp, output, calibration_gen = self._getCalibrationQuantizeModel() |
| sess = session.Session() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output]) |
| float_converter.experimental_new_converter = enable_mlir |
| float_tflite = float_converter.convert() |
| self.assertTrue(float_tflite) |
| |
    # Convert the model by specifying the target spec (instead of
    # optimizations), since quantization is mandatory when targeting an
    # integer-only backend.
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [inp], [output]) |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS_INT8 |
| ] |
| quantized_converter.representative_dataset = calibration_gen |
| quantized_tflite = quantized_converter.convert() |
| self.assertTrue(quantized_tflite) |
| |
| # The default input and output types should be float. |
| interpreter = Interpreter(model_content=quantized_tflite) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite), len(float_tflite)) |
| |
| @parameterized.named_parameters( |
| # Quantize to Float16 even if rep data provided. |
| ('UseRepresentativeData', True, False, True, False, False, False), |
| # Quantize to Float16 if no rep data provided. |
| ('NoRepresentativeData', False, False, True, False, False, False), |
| # Post training quantization if both rep data and int8 included. |
| ('UseSampleDataIncludeInt8', True, True, False, False, True, False), |
| |
| # Quantize to Float16 even if rep data provided with mlir. |
| ('UseRepresentativeDataMlir', True, False, True, False, False, True), |
| # Quantize to Float16 if no rep data provided with mlir. |
| ('NoRepresentativeDataMlir', False, False, True, False, False, True), |
| # Post training quantization if both rep data and int8 included with mlir. |
| ('SampleDataIncludeInt8Mlir', True, True, False, False, True, True)) |
| def testQuantizeFloat16(self, use_rep_data, include_int8, |
| is_float16_quantized, is_error, |
| is_post_training_quantized, enable_mlir): |
| with ops.Graph().as_default(): |
| inp, output, calibration_gen = self._getCalibrationQuantizeModel() |
| sess = session.Session() |
| |
| idx = 1 if enable_mlir else 0 |
| node_name = 'Conv2D' if enable_mlir else 'Conv2D_bias' |
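    # The two converters emit different tensor orderings and constant names,
    # so the expected tensor index and name depend on the converter in use.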
| # Convert float model. |
| float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output]) |
| float_converter.experimental_new_converter = enable_mlir |
| float_tflite = float_converter.convert() |
| self.assertTrue(float_tflite) |
| interpreter = Interpreter(model_content=float_tflite) |
| interpreter.allocate_tensors() |
| self.assertEqual(interpreter.get_tensor_details()[idx]['name'], node_name) |
| self.assertEqual(interpreter.get_tensor_details()[idx]['dtype'], |
| lite.constants.FLOAT) |
| # Convert model to quantized version |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [inp], [output]) |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.target_spec.supported_types = [lite.constants.FLOAT16] |
| if include_int8: |
| quantized_converter.target_spec.supported_types.append( |
| lite.constants.INT8) |
| if use_rep_data: |
| quantized_converter.representative_dataset = calibration_gen |
| |
| if is_error: |
| with self.assertRaises(ValueError) as error: |
| quantized_converter.convert() |
| self.assertEqual( |
| 'representative_dataset is required when specifying ' |
| 'TFLITE_BUILTINS_INT8 or INT8 supported types.', str(error.exception)) |
| |
| else: |
| quantized_tflite = quantized_converter.convert() |
| self.assertTrue(quantized_tflite) |
| interpreter = Interpreter(model_content=quantized_tflite) |
| interpreter.allocate_tensors() |
| self.assertEqual(interpreter.get_tensor_details()[idx]['name'], node_name) |
| |
| if is_float16_quantized: |
        # Verify that the bias constant is of float16 type.
| self.assertEqual(interpreter.get_tensor_details()[idx]['dtype'], |
| lite.constants.FLOAT16) |
| elif is_post_training_quantized: |
        # Verify that the bias constant is of int32 type.
| self.assertEqual(interpreter.get_tensor_details()[idx]['dtype'], |
| lite.constants.INT32) |
| else: |
| raise ValueError('Invalid test options.') |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testInvalidQuantizeFloat16(self, enable_mlir): |
| with ops.Graph().as_default(): |
| inp, output, _ = self._getCalibrationQuantizeModel() |
| sess = session.Session() |
| |
| # Specify float16 quantization |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [inp], [output]) |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.target_spec.supported_types = [lite.constants.FLOAT16] |
| # Specify only int8 builtin ops |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS_INT8 |
| ] |
| with self.assertRaises(ValueError) as error: |
| quantized_converter.convert() |
| self.assertEqual( |
| 'TFLITE_BUILTINS_INT8 requires smallest supported type to be INT8.', |
| str(error.exception)) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testInvalidPostTrainingQuantize(self, enable_mlir): |
| np.random.seed(0) |
| with ops.Graph().as_default(): |
| # We need the tensor to have more than 1024 elements for quantize_weights |
| # to kick in. Thus, the [33, 33] shape. |
| in_tensor_1 = array_ops.placeholder( |
| shape=[33, 33], dtype=dtypes.float32, name='inputA') |
| in_tensor_2 = constant_op.constant( |
| np.random.uniform(low=-10., high=10., size=(33, 33)), |
| shape=[33, 33], |
| dtype=dtypes.float32, |
| name='inputB') |
| out_tensor = math_ops.matmul(in_tensor_1, in_tensor_2, name='output') |
| sess = session.Session() |
| |
| # Attempt to convert to quantized weights model. |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [in_tensor_1], [out_tensor]) |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| # Restricting to int8 type only |
| quantized_converter.target_spec.supported_types = [lite.constants.INT8] |
| # A representative dataset is required for full fixed point quantization. |
| with self.assertRaises(ValueError) as error: |
| quantized_converter.convert() |
| self.assertEqual( |
| 'representative_dataset is required when specifying ' |
| 'TFLITE_BUILTINS_INT8 or INT8 supported types.', str(error.exception)) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testPostTrainingCalibrateAndQuantizeFloatNotAllowed(self, enable_mlir): |
| with ops.Graph().as_default(): |
| inp, output, calibration_gen = self._getCalibrationQuantizeModel() |
| sess = session.Session() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output]) |
| float_converter.experimental_new_converter = enable_mlir |
| float_tflite = float_converter.convert() |
| self.assertTrue(float_tflite) |
| |
| # Convert quantized model. |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [inp], [output]) |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| quantized_converter.target_spec.supported_types = [lite.constants.INT8] |
| quantized_tflite = quantized_converter.convert() |
| self.assertTrue(quantized_tflite) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite), len(float_tflite)) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testPostTrainingCalibrateAndQuantizeInt8Inputs(self, enable_mlir): |
| with ops.Graph().as_default(): |
| inp, output, calibration_gen = self._getCalibrationQuantizeModel() |
| sess = session.Session() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output]) |
| float_converter.experimental_new_converter = enable_mlir |
| float_tflite = float_converter.convert() |
| self.assertTrue(float_tflite) |
| |
| # Convert quantized weights model. |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [inp], [output]) |
| quantized_converter.experimental_new_converter = enable_mlir |
| quantized_converter.inference_input_type = lite_constants.INT8 |
| quantized_converter.inference_output_type = lite_constants.INT8 |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| quantized_tflite = quantized_converter.convert() |
| self.assertTrue(quantized_tflite) |
| |
| # The input and output types should be int8. |
| interpreter = Interpreter(model_content=quantized_tflite) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual(np.int8, input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual(np.int8, output_details[0]['dtype']) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite), len(float_tflite)) |
| |
| @parameterized.named_parameters( |
| ('InferenceType_INT8', lite_constants.INT8), |
      ('InferenceType_QUANTIZED_UINT8', lite_constants.QUANTIZED_UINT8))
| def testRequiresInputStatsForTrainingTimeQuantization(self, quantized_type): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = array_ops.fake_quant_with_min_max_args( |
| in_tensor + in_tensor, min=0., max=1.) |
| sess = session.Session() |
| |
| quantized_converter = lite.TFLiteConverter.from_session( |
| sess, [in_tensor], [out_tensor]) |
| |
| with self.assertRaises(ValueError) as error: |
| quantized_converter.inference_type = quantized_type |
| quantized_converter.convert() |
| self.assertEqual( |
| 'std_dev and mean must be defined when inference_type or ' |
| 'inference_input_type is QUANTIZED_UINT8 or INT8.', |
| str(error.exception)) |
| |
| with self.assertRaises(ValueError) as error: |
| quantized_converter.inference_type = lite_constants.FLOAT |
| quantized_converter.inference_input_type = quantized_type |
| quantized_converter.convert() |
| self.assertEqual( |
| 'std_dev and mean must be defined when inference_type or ' |
| 'inference_input_type is QUANTIZED_UINT8 or INT8.', |
| str(error.exception)) |
| |
| quantized_converter.inference_type = quantized_type |
| quantized_converter.inference_input_type = quantized_type |
| |
| input_arrays = quantized_converter.get_input_arrays() |
| quantized_converter.quantized_input_stats = { |
| input_arrays[0]: (0., 1.) |
| } |
| quantized_converter.convert() |
| |
| def testTrainingTimeAndPostTrainingCalibrateAndQuantize(self): |
| with ops.Graph().as_default(): |
| inp, output, calibration_gen = self._getCalibrationQuantizeModel() |
| sess = session.Session() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverter.from_session(sess, [inp], [output]) |
| float_tflite = float_converter.convert() |
| self.assertTrue(float_tflite) |
| |
| converter = lite.TFLiteConverter.from_session(sess, [inp], [output]) |
| |
    # Extra flags to trigger training-time quantization conversion.
| converter.inference_type = lite_constants.INT8 |
| converter.inference_input_type = lite_constants.FLOAT |
| converter.inference_output_type = lite_constants.FLOAT |
| input_arrays = converter.get_input_arrays() |
| converter.quantized_input_stats = { |
| input_arrays[0]: (0., 1.) |
| } |
    # Trigger post-training quantization.
| converter.optimizations = [lite.Optimize.DEFAULT] |
| converter.representative_dataset = calibration_gen |
| converter._experimental_new_quantizer = True |
| quantized_tflite = converter.convert() |
| self.assertTrue(quantized_tflite) |
| self.assertLess(len(quantized_tflite), len(float_tflite)) |
| |
    # Calibration-only API.
| converter._experimental_calibrate_only = True |
| calibrated_tflite = converter.convert() |
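    # convert() only calibrates here; mlir_quantize applies the actual integer
    # quantization to the calibrated model.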
| quantized_tflite = mlir_quantize(calibrated_tflite, fully_quantize=True) |
| interpreter = Interpreter(model_content=quantized_tflite) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertEqual(np.int8, input_details[0]['dtype']) |
| self.assertEqual((1., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(np.int8, output_details[0]['dtype']) |
| |
  def testFloatTocoConverter(self):
    """Tests the deprecated TocoConverter."""
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TocoConverter.from_session(sess, [in_tensor], [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Ensure the interpreter is able to load. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
  def testMultipleOutputNodeNames(self):
    """Tests converting a graph with an op that has multiple outputs."""
| with ops.Graph().as_default(): |
| input_tensor = array_ops.placeholder(shape=[4], dtype=dtypes.float32) |
| out0, out1, out2, out3 = array_ops.split( |
| input_tensor, [1, 1, 1, 1], axis=0) |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [input_tensor], |
| [out0, out1, out2, out3]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| interpreter.set_tensor(input_details[0]['index'], |
| np.asarray([1.0, 2.0, 3.0, 4.0], dtype=np.float32)) |
| interpreter.invoke() |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(4, len(output_details)) |
| self.assertEqual(1.0, interpreter.get_tensor(output_details[0]['index'])) |
| self.assertEqual(2.0, interpreter.get_tensor(output_details[1]['index'])) |
| self.assertEqual(3.0, interpreter.get_tensor(output_details[2]['index'])) |
| self.assertEqual(4.0, interpreter.get_tensor(output_details[3]['index'])) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| @test_util.run_in_graph_and_eager_modes |
| def testFunctions(self, enable_mlir): |
| """Tests tf.function in 1.X.""" |
| |
| @def_function.function |
| def plus_placeholder(x, placeholder): |
| return x + placeholder |
| |
| with ops.Graph().as_default(): |
| placeholder = array_ops.placeholder( |
| dtype=dtypes.float32, shape=[1], name='input') |
| variable_node = variables.Variable(1.0, name='variable_node') |
| defun_node = plus_placeholder(variable_node, placeholder) |
| output_node = math_ops.multiply(defun_node, 2.0, name='output_node') |
| |
| # Initialize variables in the model. |
| sess = session.Session() |
| sess.run(variables.variables_initializer([variable_node])) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [placeholder], |
| [output_node]) |
| converter.experimental_new_converter = enable_mlir |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('input', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('output_node', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testInferenceInputOutputTypeFloatDefault(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| |
| def testInferenceInputOutputTypeQuantizedUint8Default(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = array_ops.fake_quant_with_min_max_args( |
| in_tensor + in_tensor, min=0., max=1., name='output') |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.inference_type = lite_constants.QUANTIZED_UINT8 |
| converter.quantized_input_stats = {'Placeholder': (0., 1.)} # mean, std_dev |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.uint8, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('output', output_details[0]['name']) |
| self.assertEqual(np.uint8, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| |
| def testReusingConverterWithDifferentPostTrainingQuantization(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| out_tensor = array_ops.fake_quant_with_min_max_args( |
| in_tensor + in_tensor, min=0., max=1., name='output') |
| sess = session.Session() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| |
| converter.post_training_quantize = True |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| converter.post_training_quantize = False |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| def testResizeWithShape(self): |
| with ops.Graph().as_default(): |
| # Construct a graph with a dynamically shaped input and an internal node
| # that depends on that input's runtime shape.
| in_tensor = array_ops.placeholder( |
| shape=[None, None], dtype=dtypes.float32) |
| in_tensor2 = [[1, 2], [3, 4]] |
| out_tensor = array_ops.reshape(in_tensor2, array_ops.shape(in_tensor)) |
| sess = session.Session() |
| |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| converter.experimental_new_converter = True |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
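| # 'shape_signature' preserves the original dynamic shape (-1 for dimensions
| # that were None), while 'shape' holds the concrete default shape used until
| # the tensor is resized.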
| self.assertTrue(([1, 1] == input_details[0]['shape']).all()) |
| self.assertTrue(([-1, -1] == input_details[0]['shape_signature']).all()) |
| |
| # Resize tensor and invoke. |
| interpreter.resize_tensor_input(0, [4]) |
| interpreter.allocate_tensors() |
| interpreter.invoke() |
| |
| # The output should be reshaped properly according to the resized input. |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(np.int32, output_details[0]['dtype']) |
| self.assertTrue(([4] == output_details[0]['shape']).all()) |
| output_data = interpreter.get_tensor(output_details[0]['index']) |
| self.assertTrue(([1, 2, 3, 4] == output_data).all()) |
| |
| def testResizingIntermediateDynamicTensor(self): |
| # This is a regression test for the case where the shape of dynamic output
| # tensors changes between invocations.
| # See also https://github.com/tensorflow/tensorflow/issues/26549 |
| with ops.Graph().as_default(): |
| input_tensor = array_ops.placeholder(shape=[1, 1], dtype=dtypes.float32) |
| input2_tensor = array_ops.placeholder(shape=[1], dtype=dtypes.float32) |
| |
| # The bug is triggered only when the dynamic tensor is intermediate, so we
| # put some other ops around it.
| neg = math_ops.negative(input2_tensor) |
| padding = array_ops.placeholder(shape=[2, 2], dtype=dtypes.int32) |
| output_tensor = array_ops.pad(input_tensor, padding) + neg |
| |
| sess = session.Session() |
| |
| converter = lite.TFLiteConverter.from_session( |
| sess, [input_tensor, padding, input2_tensor], [output_tensor]) |
| tflite_model = converter.convert() |
| |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| interpreter.set_tensor(input_details[1]['index'], |
| np.array([[1, 1], [1, 1]], dtype=np.int32)) |
| interpreter.invoke() |
| |
| # Without the fix, invocation will fail when changing the shape of |
| # intermediate dynamic tensors. |
| interpreter.set_tensor(input_details[1]['index'], |
| np.array([[2, 2], [2, 2]], dtype=np.int32)) |
| interpreter.invoke() |
| |
| def testGraphDebugInfo(self): |
| """Test a session has debug info captured.""" |
| |
| @def_function.function |
| def plus_placeholder(x, placeholder): |
| return x + placeholder |
| |
| with ops.Graph().as_default(): |
| placeholder = array_ops.placeholder( |
| dtype=dtypes.float32, shape=[1], name='input') |
| variable_node = variables.Variable(1.0, name='variable_node') |
| defun_node = plus_placeholder(variable_node, placeholder) |
| output_node = math_ops.multiply(defun_node, 2.0, name='output_node') |
| |
| # Initialize variables in the model. |
| sess = session.Session() |
| sess.run(variables.variables_initializer([variable_node])) |
| |
| converter = lite.TFLiteConverter.from_session(sess, [placeholder], |
| [output_node]) |
| converter.convert() |
| self.assertValidDebugInfo(converter._debug_info) |
| |
| # Check that the add node in the inlined function is included.
| func = sess.graph.as_graph_def().library.function[0].signature.name |
| self.assertIn(('add@' + six.ensure_str(func)), converter._debug_info.traces) |
| |
| |
| class FromFrozenGraphFile(LiteTest): |
| |
| def testFloat(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| _ = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Write graph to file. |
| graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb') |
| write_graph(sess.graph_def, '', graph_def_file, False) |
| sess.close() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_frozen_graph(graph_def_file, |
| ['Placeholder'], ['add']) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testFloatWithShapesArray(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| _ = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Write graph to file. |
| graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb') |
| write_graph(sess.graph_def, '', graph_def_file, False) |
| sess.close() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_frozen_graph( |
| graph_def_file, ['Placeholder'], ['add'], |
| input_shapes={'Placeholder': [1, 16, 16, 3]}) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| |
| def testFreezeGraph(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| var = variable_scope.get_variable( |
| 'weights', shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| _ = in_tensor + var |
| sess = session.Session() |
| |
| # Write graph to file. |
| graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb') |
| write_graph(sess.graph_def, '', graph_def_file, False) |
| sess.close() |
| |
| # Ensure the graph with variables cannot be converted. |
| with self.assertRaises(ValueError) as error: |
| lite.TFLiteConverter.from_frozen_graph(graph_def_file, ['Placeholder'], |
| ['add']) |
| self.assertEqual('Please freeze the graph using freeze_graph.py.', |
| str(error.exception)) |
| |
| def testPbtxt(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| _ = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Write graph to file. |
| graph_def_file = os.path.join(self.get_temp_dir(), 'model.pbtxt') |
| write_graph(sess.graph_def, '', graph_def_file, True) |
| sess.close() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_frozen_graph(graph_def_file, |
| ['Placeholder'], ['add']) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('add', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testInvalidFileNotFound(self): |
| with self.assertRaises(IOError) as error: |
| lite.TFLiteConverter.from_frozen_graph('invalid_file', ['Placeholder'], |
| ['add']) |
| self.assertEqual('File \'invalid_file\' does not exist.', |
| str(error.exception)) |
| |
| def testInvalidFileBadData(self): |
| graph_def_file = os.path.join(self.get_temp_dir(), 'invalid_file') |
| with gfile.Open(graph_def_file, 'wb') as temp_file: |
| temp_file.write('bad data') |
| temp_file.flush() |
| |
| # Attempts to convert the invalid model. |
| with self.assertRaises(IOError) as error: |
| lite.TFLiteConverter.from_frozen_graph(graph_def_file, ['Placeholder'], |
| ['add']) |
| self.assertEqual( |
| 'Unable to parse input file \'{}\'.'.format(graph_def_file), |
| str(error.exception)) |
| |
| def testFloatTocoConverter(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| _ = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Write graph to file. |
| graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb') |
| write_graph(sess.graph_def, '', graph_def_file, False) |
| sess.close() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TocoConverter.from_frozen_graph(graph_def_file, |
| ['Placeholder'], ['add']) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Ensure the model is able to load. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| def testGraphDebugInfo(self): |
| """Test a frozen graph doesn't have debug info captured.""" |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder( |
| shape=[1, 16, 16, 3], dtype=dtypes.float32) |
| _ = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Write graph to file. |
| graph_def_file = os.path.join(self.get_temp_dir(), 'model.pb') |
| write_graph(sess.graph_def, '', graph_def_file, False) |
| sess.close() |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TocoConverter.from_frozen_graph(graph_def_file, |
| ['Placeholder'], ['add']) |
| converter.convert() |
| # GraphDebugInfo should be None for a frozen graph.
| self.assertFalse(converter._debug_info)
| |
| |
| class FromFrozenGraphObjectDetection(LiteTest): |
| |
| def _initObjectDetectionArgs(self): |
| # Initializes the arguments required for the object detection model.
| # Looks for the model file, which is saved in different locations internally
| # and externally.
| filename = resource_loader.get_path_to_datafile('testdata/tflite_graph.pb') |
| if not os.path.exists(filename): |
| filename = os.path.join( |
| resource_loader.get_root_dir_with_all_resources(), |
| '../tflite_mobilenet_ssd_quant_protobuf/tflite_graph.pb') |
| if not os.path.exists(filename): |
| raise IOError("File '{0}' does not exist.".format(filename)) |
| |
| self._graph_def_file = filename |
| self._input_arrays = ['normalized_input_image_tensor'] |
| self._output_arrays = [ |
| 'TFLite_Detection_PostProcess', 'TFLite_Detection_PostProcess:1', |
| 'TFLite_Detection_PostProcess:2', 'TFLite_Detection_PostProcess:3' |
| ] |
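| # The four PostProcess outputs are detection boxes, classes, scores, and the
| # number of detections, in that order.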
| self._input_shapes = {'normalized_input_image_tensor': [1, 300, 300, 3]} |
| |
| def testTFLiteGraphDef(self): |
| # Tests the object detection model that cannot be loaded in TensorFlow. |
| self._initObjectDetectionArgs() |
| |
| converter = lite.TFLiteConverter.from_frozen_graph(self._graph_def_file, |
| self._input_arrays, |
| self._output_arrays, |
| self._input_shapes) |
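| # TFLite_Detection_PostProcess is a custom op, so conversion requires
| # allowing custom ops.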
| converter.allow_custom_ops = True |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('normalized_input_image_tensor', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 300, 300, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(4, len(output_details)) |
| self.assertEqual('TFLite_Detection_PostProcess', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 10, 4] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| self.assertEqual('TFLite_Detection_PostProcess:1', |
| output_details[1]['name']) |
| self.assertTrue(([1, 10] == output_details[1]['shape']).all()) |
| self.assertEqual('TFLite_Detection_PostProcess:2', |
| output_details[2]['name']) |
| self.assertTrue(([1, 10] == output_details[2]['shape']).all()) |
| self.assertEqual('TFLite_Detection_PostProcess:3', |
| output_details[3]['name']) |
| self.assertTrue(([1] == output_details[3]['shape']).all()) |
| |
| |
| class FromSavedModelTest(TestModels): |
| |
| def _createSavedModel(self, shape): |
| """Create a simple SavedModel.""" |
| saved_model_dir = os.path.join(self.get_temp_dir(), 'simple_savedmodel') |
| with ops.Graph().as_default(): |
| with session.Session() as sess: |
| in_tensor_1 = array_ops.placeholder( |
| shape=shape, dtype=dtypes.float32, name='inputB') |
| in_tensor_2 = array_ops.placeholder( |
| shape=shape, dtype=dtypes.float32, name='inputA') |
| out_tensor = in_tensor_1 + in_tensor_2 |
| inputs = {'x': in_tensor_1, 'y': in_tensor_2} |
| outputs = {'z': out_tensor} |
| saved_model.simple_save(sess, saved_model_dir, inputs, outputs) |
| return saved_model_dir |
| |
| def testSimpleModel(self): |
| """Test a SavedModel.""" |
| saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3]) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_saved_model(saved_model_dir) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(2, len(input_details)) |
| self.assertStartsWith(input_details[0]['name'], 'inputA') |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| self.assertStartsWith(input_details[1]['name'], 'inputB') |
| self.assertEqual(np.float32, input_details[1]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[1]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertStartsWith(output_details[0]['name'], 'add') |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testOldConverterWarning(self): |
| """Test if the warning message when using TOCO is logged.""" |
| saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3]) |
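| # logging writes native strings: bytes on Python 2 and unicode on Python 3.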
| log = io.BytesIO() if six.PY2 else io.StringIO() |
| handler = logging.StreamHandler(log) |
| logging.root.addHandler(handler) |
| warning_message = 'Please consider switching to the new converter' |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_saved_model(saved_model_dir) |
| converter.experimental_new_converter = False |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| self.assertIn(warning_message, log.getvalue()) |
| logging.root.removeHandler(handler) |
| |
| def testNewConverterOptOut(self): |
| """Test if the opt out message when using New converter is logged.""" |
| saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3]) |
| log = io.BytesIO() if six.PY2 else io.StringIO() |
| handler = logging.StreamHandler(log) |
| logging.root.addHandler(handler) |
| optout_message = ('Using experimental converter: ' |
| 'If you encountered a problem') |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverter.from_saved_model(saved_model_dir) |
| converter.experimental_new_converter = True |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| self.assertIn(optout_message, log.getvalue()) |
| logging.root.removeHandler(handler) |
| |
| def testNoneBatchSize(self): |
| """Test a SavedModel, with None in input tensor's shape.""" |
| saved_model_dir = self._createSavedModel(shape=[None, 16, 16, 3]) |
| |
| converter = lite.TFLiteConverter.from_saved_model(saved_model_dir) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(2, len(input_details)) |
| self.assertStartsWith(input_details[0]['name'], 'inputA') |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| self.assertStartsWith(input_details[1]['name'], 'inputB') |
| self.assertEqual(np.float32, input_details[1]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[1]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertStartsWith(output_details[0]['name'], 'add') |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testOrderInputArrays(self): |
| """Test a SavedModel ordering of input arrays.""" |
| saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3]) |
| |
| converter = lite.TFLiteConverter.from_saved_model( |
| saved_model_dir, input_arrays=['inputB', 'inputA']) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(2, len(input_details)) |
| self.assertStartsWith(input_details[0]['name'], 'inputA') |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| self.assertStartsWith(input_details[1]['name'], 'inputB') |
| self.assertEqual(np.float32, input_details[1]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == input_details[1]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[1]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertStartsWith(output_details[0]['name'], 'add') |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 16, 16, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| def testSubsetInputArrays(self): |
| """Test a SavedModel with a subset of the input array names of the model.""" |
| saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3]) |
| |
| # Check case where input shape is given. |
| converter = lite.TFLiteConverter.from_saved_model( |
| saved_model_dir, |
| input_arrays=['inputA'], |
| input_shapes={'inputA': [1, 16, 16, 3]}) |
| |
| # Since we only partially specify the input, this is not allowed. |
| with self.assertRaises(ConverterError): |
| _ = converter.convert() |
| |
| # Check case where input shape is None. |
| converter = lite.TFLiteConverter.from_saved_model( |
| saved_model_dir, input_arrays=['inputA'], input_shapes={'inputA': None}) |
| |
| # Since we only partially specify the input, this is not allowed. |
| with self.assertRaises(ConverterError): |
| _ = converter.convert() |
| |
| def testSimpleModelTocoConverter(self): |
| """Test a SavedModel with deprecated TocoConverter.""" |
| saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3]) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TocoConverter.from_saved_model(saved_model_dir) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Ensure the model is able to load. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| def testGraphDebugInfo(self): |
| """Test a SavedModel has debug info captured.""" |
| saved_model_dir = self._createSavedModel(shape=[1, 16, 16, 3]) |
| converter = lite.TFLiteConverter.from_saved_model(saved_model_dir) |
| converter.convert() |
| self.assertValidDebugInfo(converter._debug_info) |
| |
| |
| class MyAddLayer(keras.layers.Layer): |
| |
| def __init__(self, increment, **kwargs): |
| super(MyAddLayer, self).__init__(**kwargs) |
| self._increment = increment |
| |
| def call(self, inputs): |
| return inputs + self._increment |
| |
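| # get_config must round-trip the constructor arguments so the layer can be
| # re-created when the saved model is loaded with custom_objects.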
| def get_config(self): |
| config = super(MyAddLayer, self).get_config() |
| config['increment'] = self._increment |
| return config |
| |
| |
| class FromKerasFile(TestModels, parameterized.TestCase): |
| |
| def setUp(self): |
| super(FromKerasFile, self).setUp() |
| self._keras_file = None |
| self._custom_objects = None |
| if not context.executing_eagerly(): |
| keras.backend.clear_session() |
| |
| def tearDown(self): |
| if self._keras_file: |
| os.remove(self._keras_file) |
| super(FromKerasFile, self).tearDown() |
| |
| def _getSequentialModel(self, include_custom_layer=False): |
| model = keras.models.Sequential() |
| model.add(keras.layers.Dense(2, input_shape=(3,))) |
| if include_custom_layer: |
| model.add(MyAddLayer(1.0)) |
| model.add(keras.layers.RepeatVector(3)) |
| model.add(keras.layers.TimeDistributed(keras.layers.Dense(3))) |
| model.compile( |
| loss=keras.losses.MSE, |
| optimizer='sgd', |
| metrics=[keras.metrics.categorical_accuracy], |
| sample_weight_mode='temporal') |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3, 3)) |
| model.train_on_batch(x, y) |
| model.predict(x) |
| |
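| # mkstemp returns an open OS-level file descriptor; close it explicitly in
| # the finally block so the handle is not leaked.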
| try: |
| fd, self._keras_file = tempfile.mkstemp('.h5') |
| keras.models.save_model(model, self._keras_file) |
| finally: |
| os.close(fd) |
| |
| if include_custom_layer: |
| self._custom_objects = {'MyAddLayer': MyAddLayer} |
| |
| @parameterized.named_parameters(('_graph', context.graph_mode), |
| ('_eager', context.eager_mode)) |
| def testSequentialModel(self, test_context): |
| """Test a Sequential tf.keras model with default inputs.""" |
| with test_context(): |
| self._getSequentialModel() |
| |
| converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check tensor details of converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEndsWith(input_details[0]['name'], 'dense_input') |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 3, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| # Check inference of converted model. |
| input_data = np.array([[1, 2, 3]], dtype=np.float32) |
| interpreter.set_tensor(input_details[0]['index'], input_data) |
| interpreter.invoke() |
| tflite_result = interpreter.get_tensor(output_details[0]['index']) |
| |
| keras_model = keras.models.load_model(self._keras_file) |
| keras_result = keras_model.predict(input_data) |
| |
| np.testing.assert_almost_equal(tflite_result, keras_result, 5) |
| |
| @parameterized.named_parameters(('_graph', context.graph_mode), |
| ('_eager', context.eager_mode)) |
| def testCustomLayer(self, test_context): |
| """Test a Sequential tf.keras model with default inputs.""" |
| with test_context(): |
| self._getSequentialModel(include_custom_layer=True) |
| |
| converter = lite.TFLiteConverter.from_keras_model_file( |
| self._keras_file, custom_objects=self._custom_objects) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check tensor details of converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| output_details = interpreter.get_output_details() |
| |
| # Check inference of converted model. |
| input_data = np.array([[1, 2, 3]], dtype=np.float32) |
| interpreter.set_tensor(input_details[0]['index'], input_data) |
| interpreter.invoke() |
| tflite_result = interpreter.get_tensor(output_details[0]['index']) |
| |
| keras_model = keras.models.load_model( |
| self._keras_file, custom_objects=self._custom_objects) |
| keras_result = keras_model.predict(input_data) |
| |
| np.testing.assert_almost_equal(tflite_result, keras_result, 5) |
| |
| def testSequentialModelInputArray(self): |
| """Test a Sequential tf.keras model testing input arrays argument.""" |
| ops.disable_eager_execution() |
| self._getSequentialModel() |
| |
| # Invalid input array raises error. |
| with self.assertRaises(ValueError) as error: |
| lite.TFLiteConverter.from_keras_model_file( |
| self._keras_file, input_arrays=['invalid-input']) |
| self.assertEqual("Invalid tensors 'invalid-input' were found.", |
| str(error.exception)) |
| |
| # Valid input array. |
| converter = lite.TFLiteConverter.from_keras_model_file( |
| self._keras_file, input_arrays=['dense_input']) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| def testSequentialModelInputShape(self): |
| """Test a Sequential tf.keras model testing input shapes argument.""" |
| self._getSequentialModel() |
| |
| # Passing in the shape of an invalid input array raises an error.
| with self.assertRaises(ValueError) as error: |
| converter = lite.TFLiteConverter.from_keras_model_file( |
| self._keras_file, input_shapes={'invalid-input': [2, 3]}) |
| self.assertEqual( |
| "Invalid tensor 'invalid-input' found in tensor shapes map.", |
| str(error.exception)) |
| |
| # Passing in the shape of a valid input array.
| converter = lite.TFLiteConverter.from_keras_model_file( |
| self._keras_file, input_shapes={'dense_input': [2, 3]}) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check input shape from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEndsWith(input_details[0]['name'], 'dense_input') |
| self.assertTrue(([2, 3] == input_details[0]['shape']).all()) |
| |
| def testSequentialModelOutputArray(self): |
| """Test a Sequential tf.keras model testing output arrays argument.""" |
| ops.disable_eager_execution() |
| self._getSequentialModel() |
| |
| # Invalid output array raises error. |
| with self.assertRaises(ValueError) as error: |
| lite.TFLiteConverter.from_keras_model_file( |
| self._keras_file, output_arrays=['invalid-output']) |
| self.assertEqual("Invalid tensors 'invalid-output' were found.", |
| str(error.exception)) |
| |
| # Valid output array. |
| converter = lite.TFLiteConverter.from_keras_model_file( |
| self._keras_file, output_arrays=['time_distributed/Reshape_1']) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| @parameterized.named_parameters(('_graph', context.graph_mode), |
| ('_eager', context.eager_mode)) |
| def testFunctionalModel(self, test_context): |
| """Test a Functional tf.keras model with default inputs.""" |
| with test_context(): |
| inputs = keras.layers.Input(shape=(3,), name='input') |
| x = keras.layers.Dense(2)(inputs) |
| output = keras.layers.Dense(3)(x) |
| |
| model = keras.models.Model(inputs, output) |
| model.compile( |
| loss=keras.losses.MSE, |
| optimizer='sgd', |
| metrics=[keras.metrics.categorical_accuracy]) |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3)) |
| model.train_on_batch(x, y) |
| |
| model.predict(x) |
| fd, self._keras_file = tempfile.mkstemp('.h5') |
| try: |
| keras.models.save_model(model, self._keras_file) |
| finally: |
| os.close(fd) |
| |
| # Convert to TFLite model. |
| converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check tensor details of converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual('input', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| # Check inference of converted model. |
| input_data = np.array([[1, 2, 3]], dtype=np.float32) |
| interpreter.set_tensor(input_details[0]['index'], input_data) |
| interpreter.invoke() |
| tflite_result = interpreter.get_tensor(output_details[0]['index']) |
| |
| keras_model = keras.models.load_model(self._keras_file) |
| keras_result = keras_model.predict(input_data) |
| |
| np.testing.assert_almost_equal(tflite_result, keras_result, 5) |
| |
| def testFunctionalModelMultipleInputs(self): |
| """Test a Functional tf.keras model with multiple inputs and outputs.""" |
| a = keras.layers.Input(shape=(3,), name='input_a') |
| b = keras.layers.Input(shape=(3,), name='input_b') |
| dense = keras.layers.Dense(4, name='dense') |
| c = dense(a) |
| d = dense(b) |
| e = keras.layers.Dropout(0.5, name='dropout')(c) |
| |
| model = keras.models.Model([a, b], [d, e]) |
| model.compile( |
| loss=keras.losses.MSE, |
| optimizer='sgd', |
| metrics=[keras.metrics.mae], |
| loss_weights=[1., 0.5]) |
| |
| input_a_np = np.random.random((10, 3)) |
| input_b_np = np.random.random((10, 3)) |
| output_d_np = np.random.random((10, 4)) |
| output_e_np = np.random.random((10, 4)) |
| model.train_on_batch([input_a_np, input_b_np], [output_d_np, output_e_np]) |
| |
| model.predict([input_a_np, input_b_np], batch_size=5) |
| fd, self._keras_file = tempfile.mkstemp('.h5') |
| try: |
| keras.models.save_model(model, self._keras_file) |
| finally: |
| os.close(fd) |
| |
| # Convert to TFLite model. |
| converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 2) |
| self.assertEndsWith(input_details[0]['name'], 'input_a') |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| self.assertEndsWith(input_details[1]['name'], 'input_b') |
| self.assertEqual(np.float32, input_details[1]['dtype']) |
| self.assertTrue(([1, 3] == input_details[1]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[1]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 2) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 4] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| self.assertEqual(np.float32, output_details[1]['dtype']) |
| self.assertTrue(([1, 4] == output_details[1]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[1]['quantization']) |
| |
| def testFunctionalSequentialModel(self): |
| """Test a Functional tf.keras model containing a Sequential model.""" |
| model = keras.models.Sequential() |
| model.add(keras.layers.Dense(2, input_shape=(3,))) |
| model.add(keras.layers.RepeatVector(3)) |
| model.add(keras.layers.TimeDistributed(keras.layers.Dense(3))) |
| model = keras.models.Model(model.input, model.output) |
| |
| model.compile( |
| loss=keras.losses.MSE, |
| optimizer='sgd', |
| metrics=[keras.metrics.categorical_accuracy], |
| sample_weight_mode='temporal') |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3, 3)) |
| model.train_on_batch(x, y) |
| |
| model.predict(x) |
| fd, self._keras_file = tempfile.mkstemp('.h5') |
| try: |
| keras.models.save_model(model, self._keras_file) |
| finally: |
| os.close(fd) |
| |
| # Convert to TFLite model. |
| converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Check tensor details of converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEndsWith(input_details[0]['name'], 'dense_input') |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([1, 3] == input_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([1, 3, 3] == output_details[0]['shape']).all()) |
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| # Check inference of converted model. |
| input_data = np.array([[1, 2, 3]], dtype=np.float32) |
| interpreter.set_tensor(input_details[0]['index'], input_data) |
| interpreter.invoke() |
| tflite_result = interpreter.get_tensor(output_details[0]['index']) |
| |
| keras_model = keras.models.load_model(self._keras_file) |
| keras_result = keras_model.predict(input_data) |
| |
| np.testing.assert_almost_equal(tflite_result, keras_result, 5) |
| |
| def testSequentialModelTocoConverter(self): |
| """Test a Sequential tf.keras model with deprecated TocoConverter.""" |
| self._getSequentialModel() |
| |
| converter = lite.TocoConverter.from_keras_model_file(self._keras_file) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Ensure the model is able to load. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| @parameterized.named_parameters(('_graph', context.graph_mode), |
| ('_eager', context.eager_mode)) |
| def testGraphDebugInfo(self, test_context): |
| """Test a Sequential tf.keras model has debug info captured.""" |
| with test_context(): |
| self._getSequentialModel() |
| converter = lite.TFLiteConverter.from_keras_model_file(self._keras_file) |
| converter.convert() |
| self.assertValidDebugInfo(converter._debug_info) |
| |
| def testExperimentalSparsifyModel(self): |
| self._getSequentialModel() |
| |
| converter = lite.TocoConverter.from_keras_model_file(self._keras_file) |
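| # _experimental_sparsify_model is a private flag that asks the converter to
| # store eligible weight tensors in a sparse format (experimental).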
| converter._experimental_sparsify_model = True |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| |
| class GrapplerTest(TestModels, parameterized.TestCase): |
| |
| def testConstantFolding(self): |
| ops.disable_eager_execution() |
| # Constant folding handles the tf.broadcast_to operation, which was not
| # supported by TFLite at the time this test was added.
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder(shape=[3, 3], dtype=dtypes.float32) |
| y_const = constant_op.constant([1., 2., 3.]) |
| y_broadcast = gen_array_ops.broadcast_to(y_const, [3, 3]) |
| out_tensor = math_ops.matmul(in_tensor, y_broadcast, name='output') |
| sess = session.Session() |
| |
| # Convert model. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertEqual(1, len(input_details)) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertTrue(([3, 3] == input_details[0]['shape']).all()) |
| |
| output_details = interpreter.get_output_details() |
| self.assertEqual(1, len(output_details)) |
| self.assertEqual('output', output_details[0]['name']) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| self.assertTrue(([3, 3] == output_details[0]['shape']).all()) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| def testInputNodeIsNotFolded(self, enable_mlir): |
| ops.disable_eager_execution() |
| # Constant folding handles the tf.broadcast_to operation, which was not
| # supported by TFLite at the time this test was added.
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder(shape=[3], dtype=dtypes.float32) |
| y_const = constant_op.constant([1., 2., 3.]) |
| y_add = y_const + y_const |
| out_tensor = in_tensor * y_add |
| sess = session.Session() |
| |
| # Convert model. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor, y_const], |
| [out_tensor]) |
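| # experimental_new_converter selects the MLIR-based converter when True and
| # the TOCO-based converter when False.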
| converter.experimental_new_converter = enable_mlir |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 2) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| self.assertEqual('Const', input_details[1]['name']) |
| |
| def testGrapplerConstFolding(self): |
| # Constant folding converts the following add operation to a tf.broadcast_to
| # operation, which was not supported by TFLite at the time this test was
| # added.
| @def_function.function |
| def plus_placeholder(x, placeholder): |
| return x + placeholder |
| |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder(shape=[2, 2], dtype=dtypes.float32) |
| out_tensor = plus_placeholder( |
| array_ops.zeros([2, 2, 2]), |
| array_ops.reshape(in_tensor, shape=[2, 2])) |
| sess = session.Session() |
| |
| # Convert model. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| # Use the new (MLIR) converter; the TOCO path does not support the graph
| # produced by this constant folding.
| converter.experimental_new_converter = True |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual('Placeholder', input_details[0]['name']) |
| |
| |
| class ImportOpsUtilTest(LiteTest): |
| |
| def testGetPotentiallySupportedOps(self): |
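| # get_potentially_supported_ops returns a collection describing TF ops the
| # converter may be able to handle; the test only checks it is not None.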
| self.assertIsNotNone(lite.get_potentially_supported_ops()) |
| |
| |
| class DefaultConverterAttrsTest(LiteTest): |
| |
| def testAttrs(self): |
| with ops.Graph().as_default(): |
| in_tensor = array_ops.placeholder(shape=[2, 2], dtype=dtypes.float32) |
| out_tensor = in_tensor + in_tensor |
| sess = session.Session() |
| |
| # Convert model. |
| converter = lite.TFLiteConverter.from_session(sess, [in_tensor], |
| [out_tensor]) |
| |
| # Assert output format. |
| self.assertEqual(converter.output_format, lite_constants.TFLITE) |
| |
| # Assert the default inference type is float. |
| self.assertEqual(converter.inference_type, lite_constants.FLOAT) |
| |
| # Assert the default inference type overrides are None. |
| self.assertIsNone(converter.inference_input_type) |
| self.assertIsNone(converter.inference_output_type) |
| |
| # Assert the default quantization options are not set. |
| self.assertEqual(converter.quantized_input_stats, {}) |
| self.assertIsNone(converter.default_ranges_stats) |
| self.assertFalse(converter.reorder_across_fake_quant) |
| self.assertFalse(converter.change_concat_input_ranges) |
| |
| # Assert dropping control dependency is enabled by default. |
| self.assertTrue(converter.drop_control_dependency) |
| |
| # Assert dumping extra information is disabled by default. |
| self.assertIsNone(converter.dump_graphviz_dir) |
| self.assertFalse(converter.dump_graphviz_video) |
| self.assertIsNone(converter.conversion_summary_dir) |
| |
| |
| if __name__ == '__main__': |
| test.main() |