| # Lint as: python2, python3 |
| # Copyright 2019 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """Tests for lite.py functionality related to TensorFlow 2.0.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import os |
| |
| from absl.testing import parameterized |
| import numpy as np |
| from six.moves import range |
| from six.moves import zip |
| import tensorflow as tf |
| |
| from tensorflow.lite.python import lite |
| from tensorflow.lite.python import lite_v2_test_util |
| from tensorflow.lite.python.convert import mlir_quantize |
| from tensorflow.lite.python.interpreter import Interpreter |
| from tensorflow.lite.toco import types_pb2 as _types_pb2 |
| from tensorflow.python.framework import dtypes |
| from tensorflow.python.framework import ops |
| from tensorflow.python.framework import test_util |
| from tensorflow.python.lib.io import file_io |
| from tensorflow.python.platform import test |
| from tensorflow.python.saved_model import save_options |
| from tensorflow.python.saved_model import saved_model |
| from tensorflow.python.saved_model.loader_impl import parse_saved_model |
| from tensorflow.python.saved_model.save import save |
| from tensorflow.python.training.tracking import tracking |
| |
| |
| class FromConcreteFunctionTest(lite_v2_test_util.ModelTest): |
| |
| @test_util.run_v2_only |
| def testTypeInvalid(self): |
| root = self._getSimpleVariableModel() |
| with self.assertRaises(ValueError) as error: |
| _ = lite.TFLiteConverterV2.from_concrete_functions([root.f]) |
| self.assertIn('call get_concrete_function', str(error.exception)) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| @test_util.run_v2_only |
| def testFloat(self, enable_mlir_converter): |
| root = self._getSimpleVariableModel() |
| input_data = tf.constant(1., shape=[1]) |
| concrete_func = root.f.get_concrete_function(input_data) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| converter.experimental_new_converter = enable_mlir_converter |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.f(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value.numpy(), actual_value) |
| |
| @parameterized.named_parameters( |
| ('_INT8InputOutput', dtypes.int8), |
| ('_UINT8InputOutput', dtypes.uint8), |
| ('_INT16InputOutput', dtypes.int16)) |
| @test_util.run_v2_only |
| def testInvalidFloat(self, inference_input_output_type): |
| root = self._getSimpleVariableModel() |
| input_data = tf.constant(1., shape=[1]) |
| concrete_func = root.f.get_concrete_function(input_data) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| with self.assertRaises(ValueError) as error: |
| converter.inference_input_type = inference_input_output_type |
| converter.inference_output_type = inference_input_output_type |
| converter.convert() |
| self.assertEqual( |
| 'The inference_input_type and inference_output_type ' |
| 'must be tf.float32.', str(error.exception)) |
| |
| @test_util.run_v2_only |
| def testScalarInput(self): |
| root = self._getSimpleVariableModel() |
| input_data = tf.constant(1., shape=[]) |
| concrete_func = root.f.get_concrete_function(input_data) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.f(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value.numpy(), actual_value) |
| |
| @test_util.run_v2_only |
| def testMultiFunctionModel(self): |
| """Convert a single model in a multi-functional model.""" |
| root = self._getMultiFunctionModel() |
| input_data = tf.constant(1., shape=[1]) |
| concrete_func = root.add.get_concrete_function(input_data) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.add(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value.numpy(), actual_value) |
| |
| @test_util.run_v2_only |
| def testConvertMultipleFunctions(self): |
| """Convert multiple functions in a multi-functional model.""" |
| root = self._getMultiFunctionModel() |
| input_data = tf.constant(1., shape=[1]) |
| add_func = root.add.get_concrete_function(input_data) |
| sub_func = root.sub.get_concrete_function(input_data) |
| |
| # Try converting multiple functions. |
| converter = lite.TFLiteConverterV2.from_concrete_functions( |
| [add_func, sub_func]) |
| with self.assertRaises(ValueError) as error: |
| _ = converter.convert() |
| self.assertIn('can only convert a single ConcreteFunction', |
| str(error.exception)) |
| |
| def _getIntegerQuantizeModel(self): |
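    """Builds a small conv2d+relu model and a calibration data generator."""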
| np.random.seed(0) |
| |
| root = tracking.AutoTrackable() |
| |
| @tf.function( |
| input_signature=[tf.TensorSpec(shape=[1, 5, 5, 3], dtype=tf.float32)]) |
| def func(inp): |
| conv = tf.nn.conv2d( |
| inp, tf.ones([3, 3, 3, 16]), strides=[1, 1, 1, 1], padding='SAME') |
| output = tf.nn.relu(conv, name='output') |
| return output |
| |
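    # A representative dataset: the converter runs these samples through the
    # model during calibration to estimate activation ranges for quantization.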
| def calibration_gen(): |
| for _ in range(5): |
| yield [np.random.uniform(-1, 1, size=(1, 5, 5, 3)).astype(np.float32)] |
| |
| root.f = func |
| to_save = root.f.get_concrete_function() |
| return (to_save, calibration_gen) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirQuantizer', True), # enable mlir quantizer |
| ('DisableMlirQuantizer', False)) # disable mlir quantizer |
| def testPostTrainingCalibrateAndQuantize(self, mlir_quantizer): |
| func, calibration_gen = self._getIntegerQuantizeModel() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| float_tflite_model = float_converter.convert() |
| self.assertIsNotNone(float_tflite_model) |
| |
| # Convert quantized model. |
| quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| quantized_converter._experimental_new_quantizer = mlir_quantizer |
| quantized_tflite_model = quantized_converter.convert() |
| self.assertIsNotNone(quantized_tflite_model) |
| |
| # The default input and output types should be float. |
| interpreter = Interpreter(model_content=quantized_tflite_model) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite_model), len(float_tflite_model)) |
| |
| @parameterized.named_parameters( |
| ('_INT8InputOutput', dtypes.int8), |
| ('_UINT8InputOutput', dtypes.uint8), |
| ('_INT16InputOutput', dtypes.int16)) |
| @test_util.run_v2_only |
| def testInvalidPostTrainingDynamicRangeQuantization( |
| self, inference_input_output_type): |
| func, _ = self._getIntegerQuantizeModel() |
| |
| # Convert float model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Convert quantized model. |
| quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| with self.assertRaises(ValueError) as error: |
| quantized_converter.inference_input_type = inference_input_output_type |
| quantized_converter.inference_output_type = inference_input_output_type |
| quantized_converter.convert() |
| self.assertEqual( |
| 'The inference_input_type and inference_output_type ' |
| 'must be tf.float32.', str(error.exception)) |
| |
| @parameterized.named_parameters( |
| ('_Default', False, False, dtypes.float32), |
| ('_INT8InputOutput', False, False, dtypes.int8), |
| ('_UINT8InputOutput', False, False, dtypes.uint8), |
| ('_INT16Quantize', False, True, dtypes.float32), |
| ('_INT16Quantize_INT16InputOutput', False, True, dtypes.int16), |
| ('_IntOnly', True, False, dtypes.float32), |
| ('_IntOnly_INT8InputOutput', True, False, dtypes.int8), |
      ('_IntOnly_UINT8InputOutput', True, False, dtypes.uint8),
      ('_IntOnly_INT16Quantize', True, True, dtypes.float32),
      ('_IntOnly_INT16Quantize_INT16InputOutput', True, True, dtypes.int16))
| def testIntegerQuantization(self, is_int_only, is_int16_quantize, |
| inference_input_output_type): |
| func, calibration_gen = self._getIntegerQuantizeModel() |
| |
| # Convert float model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| # Convert quantized model. |
| quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| if is_int_only: |
| if is_int16_quantize: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.\ |
| EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8 |
| ] |
| else: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS_INT8 |
| ] |
| else: |
| if is_int16_quantize: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.\ |
| EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8, |
| lite.OpsSet.TFLITE_BUILTINS |
| ] |
| quantized_converter.inference_input_type = inference_input_output_type |
| quantized_converter.inference_output_type = inference_input_output_type |
| quantized_tflite_model = quantized_converter.convert() |
| self.assertIsNotNone(quantized_tflite_model) |
| |
| interpreter = Interpreter(model_content=quantized_tflite_model) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual(inference_input_output_type.as_numpy_dtype, |
| input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(inference_input_output_type.as_numpy_dtype, |
| output_details[0]['dtype']) |
| |
| # Ensure that the quantized tflite model is smaller. |
| self.assertLess(len(quantized_tflite_model), len(tflite_model)) |
| |
| @parameterized.named_parameters( |
| ('_INT16Quantize_INT8InputOutput', True, dtypes.int8)) |
| def testInvalidIntegerQuantization(self, is_int16_quantize, |
| inference_input_output_type): |
| func, calibration_gen = self._getIntegerQuantizeModel() |
| |
| # Convert quantized model. |
| quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| if is_int16_quantize: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.\ |
| EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8, |
| lite.OpsSet.TFLITE_BUILTINS |
| ] |
| with self.assertRaises(ValueError) as error: |
| quantized_converter.inference_input_type = dtypes.int8 |
| quantized_converter.inference_output_type = dtypes.int8 |
| quantized_converter.convert() |
| self.assertEqual( |
| "The inference_input_type and inference_output_type " |
| "must be in ['tf.float32', 'tf.int16'].", str(error.exception)) |
| |
| def testCalibrateAndQuantizeBuiltinInt16(self): |
| func, calibration_gen = self._getIntegerQuantizeModel() |
| |
| # Convert float model. |
| float_converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| float_tflite_model = float_converter.convert() |
| self.assertIsNotNone(float_tflite_model) |
| |
| converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| # TODO(b/156309549): We should add INT16 to the builtin types. |
| converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS_INT8 |
| ] |
| converter.representative_dataset = calibration_gen |
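    # Calibrate only, then invoke the MLIR quantizer directly so activations
    # can be quantized with an int16 inference type.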
| converter._experimental_calibrate_only = True |
| calibrated_tflite = converter.convert() |
| quantized_tflite_model = mlir_quantize( |
| calibrated_tflite, inference_type=_types_pb2.QUANTIZED_INT16) |
| |
| self.assertIsNotNone(quantized_tflite_model) |
| |
| # The default input and output types should be float. |
| interpreter = Interpreter(model_content=quantized_tflite_model) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite_model), len(float_tflite_model)) |
| |
| def _getTrainingTimeQuantizedModel(self): |
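    """Builds a Keras model containing training-time fake-quant ops."""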
| |
| class QLinear(tf.keras.layers.Layer): |
| |
| def __init__(self, units=3, **kwargs): |
| super(QLinear, self).__init__(**kwargs) |
| self.units = units |
| |
| def build(self, input_shape): |
| self.w = self.add_weight( |
| 'weight', |
| shape=(input_shape[-1], self.units), |
| initializer='random_normal', |
| trainable=True) |
| self.min_var = self.add_weight( |
| 'min', |
| initializer=tf.keras.initializers.Constant(-6.0), |
| trainable=False) |
| self.max_var = self.add_weight( |
| 'max', |
| initializer=tf.keras.initializers.Constant(6.0), |
| trainable=False) |
| |
| def call(self, inputs): |
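        # fake_quant_with_min_max_vars records the (min, max) ranges that the
        # converter later uses to derive real quantization parameters.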
| x = tf.quantization.fake_quant_with_min_max_vars( |
| inputs, self.min_var, self.max_var) |
| |
| w_fq = tf.quantization.fake_quant_with_min_max_vars( |
| self.w, self.min_var, self.max_var) |
| x = tf.matmul(x, w_fq) |
| |
| x = tf.quantization.fake_quant_with_min_max_vars( |
| x, self.min_var, self.max_var) |
| |
| return x |
| |
| return tf.keras.Sequential(QLinear(3, input_shape=(2,))) |
| |
| @parameterized.named_parameters( |
| ('_DefaultFLOAT32InputOutput', dtypes.float32), |
| ('_INT8InputOutput', dtypes.int8), |
| ('_UINT8InputOutput', dtypes.uint8)) |
| @test_util.run_v2_only |
| def testTrainingTimeQuantization(self, inference_input_output_type): |
| model = self._getTrainingTimeQuantizedModel() |
| |
| float_converter = lite.TFLiteConverterV2.from_keras_model(model) |
| float_tflite_model = float_converter.convert() |
| self.assertIsNotNone(float_tflite_model) |
| |
| quantized_converter = lite.TFLiteConverterV2.from_keras_model(model) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.inference_input_type = inference_input_output_type |
| quantized_converter.inference_output_type = inference_input_output_type |
| quantized_tflite_model = quantized_converter.convert() |
| self.assertIsNotNone(quantized_tflite_model) |
| |
| interpreter = Interpreter(model_content=quantized_tflite_model) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual(inference_input_output_type.as_numpy_dtype, |
| input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(inference_input_output_type.as_numpy_dtype, |
| output_details[0]['dtype']) |
| |
| # Ensure that the quantized tflite model is smaller. |
| self.assertLess(len(quantized_tflite_model), len(float_tflite_model)) |
| |
| @test_util.run_v2_only |
| def testNewQuantizer(self): |
| """Test the model quantized by the new converter.""" |
| func, calibration_gen = self._getIntegerQuantizeModel() |
| |
| quantized_converter = lite.TFLiteConverterV2.from_concrete_functions([func]) |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS_INT8 |
| ] |
| quantized_converter.representative_dataset = calibration_gen |
| |
    # Quantize using the default quantizer.
| quantized_converter._experimental_new_quantizer = False |
| old_tflite = quantized_converter.convert() |
| |
    # Quantize using the new MLIR-based quantizer.
| quantized_converter._experimental_new_quantizer = True |
| new_tflite = quantized_converter.convert() |
| |
| for _ in range(5): |
| input_data = tf.constant( |
| np.random.uniform(-1, 1, size=(1, 5, 5, 3)).astype(np.float32)) |
| old_value = self._evaluateTFLiteModel(old_tflite, [input_data]) |
| new_value = self._evaluateTFLiteModel(new_tflite, [input_data]) |
| self.assertAllClose(old_value, new_value, atol=1e-01) |
| |
| @parameterized.named_parameters( |
| ('EnableMlirConverter', True), # enable mlir |
| ('DisableMlirConverter', False)) # disable mlir |
| @test_util.run_v2_only |
| def testEmbeddings(self, enable_mlir_converter): |
| """Test model with embeddings.""" |
| input_data = tf.constant( |
| np.array(np.random.random_sample((20)), dtype=np.int32)) |
| |
| class EmbeddingModel(tf.keras.Model): |
| |
| def __init__(self): |
| super(EmbeddingModel, self).__init__() |
| self.shared_weights = self.add_weight( |
| 'weights', |
| shape=(2000, 300), |
| dtype=tf.float32, |
| initializer=tf.random_normal_initializer( |
| mean=0.0, stddev=300**(-0.5))) |
| |
| @tf.function(input_signature=[tf.TensorSpec(shape=(20), dtype=tf.int32)]) |
| def func(self, x): |
| return tf.gather(self.shared_weights, x) |
| |
| # Building the model. |
| root = EmbeddingModel() |
| concrete_func = root.func.get_concrete_function() |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| converter.experimental_new_converter = enable_mlir_converter |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.func(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertAllClose(expected_value.numpy(), actual_value[0], atol=1e-05) |
| |
| @test_util.run_v2_only |
| def testGraphDebugInfo(self): |
| """Test a concrete function has debug info captured.""" |
| root = tracking.AutoTrackable() |
| root.v1 = tf.Variable(3.) |
| root.f = tf.function(lambda x: root.v1 * x) |
| input_data = tf.constant(1., shape=[1]) |
| concrete_func = root.f.get_concrete_function(input_data) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| converter.convert() |
| self._assertValidDebugInfo(converter._debug_info) |
| |
| def _getIntegerQuantizationModelWithFlexOp(self): |
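    """Builds a model whose conv3d/erf ops need the Flex (TF select) path."""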
| np.random.seed(0) |
| |
| root = tracking.AutoTrackable() |
| |
| @tf.function(input_signature=[ |
| tf.TensorSpec(shape=[3, 3, 3, 3, 3], dtype=tf.float32) |
| ]) |
| def func(inp): |
| tanh = tf.math.tanh(inp) |
      # The Flex delegate will merge the consecutive conv3d and erf ops into
      # one delegate node.
| conv3d = tf.nn.conv3d( |
| tanh, |
| tf.ones([3, 3, 3, 3, 3]), |
| strides=[1, 1, 1, 1, 1], |
| padding='SAME') |
| erf = tf.math.erf(conv3d) |
| output = tf.math.tanh(erf) |
| return output |
| |
| def calibration_gen(): |
| for _ in range(5): |
| yield [ |
| np.random.uniform(-1, 1, size=(3, 3, 3, 3, 3)).astype(np.float32) |
| ] |
| |
| root.f = func |
| return (root.f.get_concrete_function(), calibration_gen) |
| |
| @parameterized.named_parameters( |
| ('_Default', False, False, dtypes.float32), |
| ('_INT8InputOutput', False, False, dtypes.int8), |
| ('_UINT8InputOutput', False, False, dtypes.uint8), |
| ('_INT16Quantize', False, True, dtypes.float32), |
| ('_INT16Quantize_INT16InputOutput', False, True, dtypes.int16), |
| ('_IntOnly', True, False, dtypes.float32), |
| ('_IntOnly_INT8InputOutput', True, False, dtypes.int8), |
| ('_IntOnly_UINT8InputOutput', True, False, dtypes.uint8), |
| ('_IntOnly_INT16Quantize', True, True, dtypes.float32), |
| ('_IntOnly_INT16Quantize_INT16InputOutput', True, True, dtypes.int16)) |
| @test_util.run_v2_only |
| def testIntegerQuantizationWithFlexOp(self, is_int_only, is_int16_quantize, |
| inference_input_output_type): |
| func, calibration_gen = self._getIntegerQuantizationModelWithFlexOp() |
| |
| quantized_converter = tf.lite.TFLiteConverter.from_concrete_functions( |
| [func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| if is_int_only: |
| if is_int16_quantize: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.\ |
| EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8, |
| lite.OpsSet.SELECT_TF_OPS |
| ] |
| else: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS_INT8, lite.OpsSet.SELECT_TF_OPS |
| ] |
| else: |
| if is_int16_quantize: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.\ |
| EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8, |
| lite.OpsSet.TFLITE_BUILTINS, |
| lite.OpsSet.SELECT_TF_OPS |
| ] |
| else: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS, lite.OpsSet.SELECT_TF_OPS |
| ] |
| |
| quantized_converter.inference_input_type = inference_input_output_type |
| quantized_converter.inference_output_type = inference_input_output_type |
| quantized_tflite_model = quantized_converter.convert() |
| self.assertIsNotNone(quantized_tflite_model) |
| |
| interpreter = Interpreter(model_content=quantized_tflite_model) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual(inference_input_output_type.as_numpy_dtype, |
| input_details[0]['dtype']) |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertEqual(inference_input_output_type.as_numpy_dtype, |
| output_details[0]['dtype']) |
| |
| def _getIntegerQuantizationModelWithUnsupportedOps(self): |
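    """Builds a model with ceil ops, which lack int8/int16 TFLite kernels."""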
| np.random.seed(0) |
| |
| root = tracking.AutoTrackable() |
| |
| @tf.function(input_signature=[ |
| tf.TensorSpec(shape=[3], dtype=tf.float32), |
| tf.TensorSpec(shape=[3], dtype=tf.float32) |
| ]) |
| def func(a, b): |
      # The ceil kernel supports neither int8 nor int16 types.
| left = tf.math.ceil(a) |
| right = tf.nn.tanh(b) |
| add = tf.math.add(left, right) |
      # The ceil kernel supports neither int8 nor int16 types.
| output = tf.math.ceil(add) |
| return (output, right) |
| |
| def calibration_gen(): |
| for _ in range(5): |
| yield [ |
| np.random.uniform(-1, 1, size=(3)).astype(np.float32), |
| np.random.uniform(-1, 1, size=(3)).astype(np.float32) |
| ] |
| |
| root.f = func |
| return (root.f.get_concrete_function(), calibration_gen) |
| |
| @parameterized.named_parameters( |
| ('_INT8InputOutput', False, False, dtypes.int8), |
| ('_UINT8InputOutput', False, False, dtypes.uint8), |
| ('_INT16Quantize_INT16InputOutput', False, True, dtypes.int16), |
| ('_IntOnly_INT8InputOutput', True, False, dtypes.int8), |
| ('_IntOnly_UINT8InputOutput', True, False, dtypes.uint8), |
| ('_IntOnly_INT16Quantize_INT16InputOutput', True, True, dtypes.int16)) |
| @test_util.run_v2_only |
| def testIntegerQuantizationWithUnsupportedOps(self, is_int_only, |
| is_int16_quantize, |
| inference_input_output_type): |
| func, calib_gen = self._getIntegerQuantizationModelWithUnsupportedOps() |
| |
| quantized_converter = tf.lite.TFLiteConverter.from_concrete_functions( |
| [func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calib_gen |
| if is_int_only: |
| if is_int16_quantize: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.\ |
| EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8, |
| lite.OpsSet.TFLITE_BUILTINS |
| ] |
| else: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS_INT8, lite.OpsSet.TFLITE_BUILTINS |
| ] |
| else: |
| if is_int16_quantize: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.\ |
| EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8, |
| lite.OpsSet.TFLITE_BUILTINS |
| ] |
| else: |
| quantized_converter.target_spec.supported_ops = [ |
| lite.OpsSet.TFLITE_BUILTINS |
| ] |
| |
| quantized_converter.inference_input_type = inference_input_output_type |
| quantized_converter.inference_output_type = inference_input_output_type |
| quantized_tflite_model = quantized_converter.convert() |
| self.assertIsNotNone(quantized_tflite_model) |
| |
| interpreter = Interpreter(model_content=quantized_tflite_model) |
| interpreter.allocate_tensors() |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 2) |
| # Allow float32 for fallback. |
| self.assertEqual(input_details[0]['dtype'], dtypes.float32) |
| self.assertEqual(input_details[1]['dtype'], |
| inference_input_output_type.as_numpy_dtype) |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 2) |
| # Allow float32 for fallback. |
| self.assertEqual(output_details[0]['dtype'], dtypes.float32) |
| self.assertEqual(output_details[1]['dtype'], |
| inference_input_output_type.as_numpy_dtype) |
| |
| |
| class FromSavedModelTest(lite_v2_test_util.ModelTest): |
| |
| def _createV1SavedModel(self, shape): |
| """Create a simple SavedModel.""" |
| saved_model_dir = os.path.join(self.get_temp_dir(), 'simple_savedmodel') |
| with tf.Graph().as_default(): |
| with tf.compat.v1.Session() as sess: |
| in_tensor_1 = tf.compat.v1.placeholder( |
| shape=shape, dtype=tf.float32, name='inputB') |
| in_tensor_2 = tf.compat.v1.placeholder( |
| shape=shape, dtype=tf.float32, name='inputA') |
| variable_node = tf.Variable(1.0, name='variable_node') |
| out_tensor = in_tensor_1 + in_tensor_2 * variable_node |
| inputs = {'x': in_tensor_1, 'y': in_tensor_2} |
| outputs = {'z': out_tensor} |
| sess.run(tf.compat.v1.variables_initializer([variable_node])) |
| saved_model.simple_save(sess, saved_model_dir, inputs, outputs) |
| return saved_model_dir |
| |
| @test_util.run_v2_only |
| def testV1SimpleModel(self): |
| """Test a SavedModel.""" |
| with tf.Graph().as_default(): |
| saved_model_dir = self._createV1SavedModel(shape=[1, 16, 16, 3]) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_saved_model(saved_model_dir) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| interpreter = Interpreter(model_content=tflite_model) |
| interpreter.allocate_tensors() |
| |
| input_details = interpreter.get_input_details() |
| self.assertLen(input_details, 2) |
| self.assertStartsWith(input_details[0]['name'], 'inputA') |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertAllEqual([1, 16, 16, 3], input_details[0]['shape']) |
| self.assertEqual((0., 0.), input_details[0]['quantization']) |
| |
    self.assertStartsWith(input_details[1]['name'], 'inputB')
| self.assertEqual(np.float32, input_details[1]['dtype']) |
    self.assertAllEqual([1, 16, 16, 3], input_details[1]['shape'])
| self.assertEqual((0., 0.), input_details[1]['quantization']) |
| |
| output_details = interpreter.get_output_details() |
| self.assertLen(output_details, 1) |
| self.assertStartsWith(output_details[0]['name'], 'add') |
| self.assertEqual(np.float32, output_details[0]['dtype']) |
    self.assertAllEqual([1, 16, 16, 3], output_details[0]['shape'])
| self.assertEqual((0., 0.), output_details[0]['quantization']) |
| |
| @test_util.run_v2_only |
| def testTF1HubFormattedModel(self): |
| """Test a TF1 hub formatted model.""" |
| saved_model_dir = self._createV1SavedModel(shape=[1, 16, 16, 3]) |
| |
    # TF1 Hub models are based on the V1 SavedModel format and omit the saved
    # model schema version setting.
| saved_model_proto = parse_saved_model(saved_model_dir) |
| saved_model_proto.saved_model_schema_version = 0 |
| |
| saved_model_pb_file_path = os.path.join(saved_model_dir, 'saved_model.pb') |
| with file_io.FileIO(saved_model_pb_file_path, 'wb') as writer: |
| writer.write(saved_model_proto.SerializeToString()) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_saved_model(saved_model_dir) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| @test_util.run_v2_only |
| def testConstModel(self): |
| """Test a basic model with functions to make sure functions are inlined.""" |
| input_data = tf.constant(1., shape=[1]) |
| root = tracking.AutoTrackable() |
| root.f = tf.function(lambda x: 2. * x) |
| to_save = root.f.get_concrete_function(input_data) |
| |
| save_dir = os.path.join(self.get_temp_dir(), 'saved_model') |
| save(root, save_dir, to_save) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_saved_model(save_dir) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.f(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value.numpy(), actual_value) |
| |
| @test_util.run_v2_only |
| def testVariableModel(self): |
| """Test a basic model with Variables with saving/loading the SavedModel.""" |
| root = self._getSimpleVariableModel() |
| input_data = tf.constant(1., shape=[1]) |
| to_save = root.f.get_concrete_function(input_data) |
| |
| save_dir = os.path.join(self.get_temp_dir(), 'saved_model') |
| save(root, save_dir, to_save) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_saved_model(save_dir) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.f(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value.numpy(), actual_value) |
| |
| @test_util.run_v2_only |
| def testSignatures(self): |
| """Test values for `signature_keys` argument.""" |
| root = self._getSimpleVariableModel() |
| input_data = tf.constant(1., shape=[1]) |
| to_save = root.f.get_concrete_function(input_data) |
| |
| save_dir = os.path.join(self.get_temp_dir(), 'saved_model') |
| save(root, save_dir, to_save) |
| |
| # Convert model with invalid `signature_keys`. |
| with self.assertRaises(ValueError) as error: |
| _ = lite.TFLiteConverterV2.from_saved_model( |
| save_dir, signature_keys=['INVALID']) |
| self.assertIn("Invalid signature key 'INVALID'", str(error.exception)) |
| |
| # Convert model with empty `signature_keys`. |
| converter = lite.TFLiteConverterV2.from_saved_model( |
| save_dir, signature_keys=[]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.f(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value.numpy(), actual_value) |
| |
| @test_util.run_v2_only |
| def testMultipleFunctionModel(self): |
| """Convert multiple functions in a multi-functional model.""" |
| root = self._getMultiFunctionModel() |
| input_data = tf.constant(1., shape=[1]) |
| add_func = root.add.get_concrete_function(input_data) |
| sub_func = root.sub.get_concrete_function(input_data) |
| |
| save_dir = os.path.join(self.get_temp_dir(), 'saved_model') |
| save(root, save_dir, {'add': add_func, 'sub': sub_func}) |
| |
| # Try converting multiple functions. |
| with self.assertRaises(ValueError) as error: |
| _ = lite.TFLiteConverterV2.from_saved_model(save_dir) |
| self.assertIn('Only support a single signature key.', str(error.exception)) |
| |
| @test_util.run_v2_only |
| def testNoConcreteFunctionModel(self): |
| root = self._getMultiFunctionModel() |
| |
| save_dir = os.path.join(self.get_temp_dir(), 'saved_model') |
| save(root, save_dir) |
| |
| with self.assertRaises(ValueError) as error: |
| _ = lite.TFLiteConverterV2.from_saved_model(save_dir) |
| self.assertIn('Only support a single signature key.', str(error.exception)) |
| |
| @test_util.run_v2_only |
| def testKerasSequentialModel(self): |
| """Test a simple sequential tf.Keras model.""" |
| input_data = tf.constant(1., shape=[1, 1]) |
| |
| x = np.array([[1.], [2.]]) |
| y = np.array([[2.], [4.]]) |
| |
| model = tf.keras.models.Sequential([ |
| tf.keras.layers.Dropout(0.2), |
| tf.keras.layers.Dense(1), |
| ]) |
| model.compile(optimizer='sgd', loss='mean_squared_error') |
| model.fit(x, y, epochs=1) |
| |
| save_dir = os.path.join(self.get_temp_dir(), 'saved_model') |
| save(model, save_dir) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_saved_model(save_dir) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = model.predict(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value, actual_value) |
| |
| @test_util.run_v2_only |
| def testGraphDebugInfo(self): |
| """Test a SavedModel has debug info captured.""" |
| input_data = tf.constant(1., shape=[1]) |
| root = tracking.AutoTrackable() |
| root.f = tf.function(lambda x: 2. * x) |
| to_save = root.f.get_concrete_function(input_data) |
| options = save_options.SaveOptions(save_debug_info=True) |
| save_dir = os.path.join(self.get_temp_dir(), 'saved_model') |
| save(root, save_dir, to_save, options) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_saved_model(save_dir) |
| converter.convert() |
| self._assertValidDebugInfo(converter._debug_info) |
| |
| @test_util.run_v2_only |
| def testFallbackPath(self): |
| """Test a SavedModel fallback path using old converter.""" |
| saved_model_dir = self._createV1SavedModel(shape=[1, 16, 16, 3]) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_saved_model(saved_model_dir) |
| converter.experimental_new_converter = False |
| tflite_model = converter.convert() |
| |
| self.assertTrue(tflite_model) |
| |
| @test_util.run_v2_only |
| def testNonStatefulConvLSTM2D(self): |
| """Test saved model with non stateful ConvLSTM2D keras layer.""" |
| # Create keras model |
| model = tf.keras.Sequential([ |
| tf.keras.layers.ConvLSTM2D( |
| 32, (3, 3), |
| padding='same', |
| return_sequences=True, |
| stateful=False, |
| batch_input_shape=(1, 1, 10, 10, 1)) |
| ]) |
| model.compile() |
| |
    # Export the Keras model to a SavedModel.
| saved_model_dir = os.path.join(self.get_temp_dir(), 'conv_lstm_2d') |
| model.save(saved_model_dir, save_format='tf', include_optimizer=False) |
| |
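    # ConvLSTM2D can lower to ops without TFLite builtin kernels, so allow the
    # Flex (SELECT_TF_OPS) fallback during conversion.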
| converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir) |
| converter.target_spec.supported_ops = [ |
| tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS |
| ] |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| |
| class FromKerasModelTest(lite_v2_test_util.ModelTest): |
| |
| @test_util.run_v2_only |
| def testSequentialModel(self): |
| """Test a simple sequential tf.Keras model.""" |
| input_data = tf.constant(1., shape=[1, 1]) |
| |
| # Create a simple Keras model. |
| x = np.array([[1.], [2.]]) |
| y = np.array([[2.], [4.]]) |
| |
| model = tf.keras.models.Sequential([ |
| tf.keras.layers.Dropout(0.2), |
| tf.keras.layers.Dense(units=1, input_shape=[1]) |
| ]) |
| model.compile(optimizer='sgd', loss='mean_squared_error') |
| model.fit(x, y, epochs=1) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = model.predict(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| self.assertEqual(expected_value, actual_value) |
| |
| @test_util.run_v2_only |
| def testSequentialMultiInputOutputModel(self): |
| """Test a tf.Keras model with multiple inputs and outputs.""" |
| left_input_data = tf.constant(1., shape=[1, 3]) |
| right_input_data = tf.constant(1., shape=[1, 3]) |
| |
| # Create a simple Keras model. |
| input_a_np = np.random.random((10, 3)) |
| input_b_np = np.random.random((10, 3)) |
| output_c_np = np.random.random((10, 3)) |
| output_d_np = np.random.random((10, 2)) |
| |
| input_a = tf.keras.layers.Input(shape=(3,), name='input_a') |
| input_b = tf.keras.layers.Input(shape=(3,), name='input_b') |
| |
| dense = tf.keras.layers.Dense(8, name='dense_1') |
| interm_a = dense(input_a) |
| interm_b = dense(input_b) |
| merged = tf.keras.layers.concatenate([interm_a, interm_b], name='merge') |
| |
| output_c = tf.keras.layers.Dense( |
| 3, activation='softmax', name='dense_2')( |
| merged) |
| output_d = tf.keras.layers.Dense( |
| 2, activation='softmax', name='dense_3')( |
| merged) |
| |
| model = tf.keras.models.Model( |
| inputs=[input_a, input_b], outputs=[output_c, output_d]) |
| model.compile(optimizer='sgd', loss='mean_squared_error') |
| model.fit([input_a_np, input_b_np], [output_c_np, output_d_np], epochs=1) |
| |
| # Convert model and ensure model is not None. |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| input_data = [left_input_data, right_input_data] |
| expected_value = model.predict(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, input_data) |
| for tf_result, tflite_result in zip(expected_value, actual_value): |
| self.assertAllClose(tf_result, tflite_result, atol=1e-05) |
| |
| @test_util.run_v2_only |
| def testGraphDebugInfo(self): |
| """Test a tf.Keras model has debug info captured.""" |
| # Create a simple Keras model. |
| x = [-1, 0, 1, 2, 3, 4] |
| y = [-3, -1, 1, 3, 5, 7] |
| model = tf.keras.models.Sequential( |
| [tf.keras.layers.Dense(units=1, input_shape=[1])]) |
| model.compile(optimizer='sgd', loss='mean_squared_error') |
| model.fit(x, y, epochs=1) |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| converter.convert() |
| self._assertValidDebugInfo(converter._debug_info) |
| |
| @test_util.run_v2_only |
| def testKerasFallbackPath(self): |
| """Test keras model which failed when exporting to the saved model.""" |
| input_data = tf.constant( |
| np.array(np.random.random_sample((20)), dtype=np.float32)) |
| |
| class Model(tf.keras.Model): |
| |
| def __init__(self): |
| super(Model, self).__init__() |
        # A None name will cause a failure when exporting to a SavedModel.
| self.shared_weights = self.add_weight( |
| name=None, |
| shape=(20, 1), |
| dtype=tf.float32, |
| initializer=tf.random_normal_initializer( |
| mean=0.0, stddev=300**(-0.5))) |
| |
| def call(self, x): |
| return tf.add(self.shared_weights, x) |
| |
| # Building the model. |
| model = Model() |
| model.compile(optimizer='sgd', loss='mean_squared_error') |
| model.fit(input_data, input_data, epochs=1) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| tflite_model = converter.convert() |
| self.assertTrue(tflite_model) |
| |
| |
| class ControlFlowTest(lite_v2_test_util.ModelTest): |
| |
| @test_util.run_v2_only |
| def testCond(self): |
| input_data = { |
| 'x': tf.constant([1., 2.], shape=[1, 2]), |
| 'b': tf.constant(True) |
| } |
| |
| weights = tf.Variable([[0.1, 0.2], [0.3, 0.4]], dtype=tf.float32) |
| |
| def true_fn(x): |
| return tf.matmul(x, weights) |
| |
| def false_fn(x): |
| return tf.add(x, weights) |
| |
| @tf.function(input_signature=[ |
| tf.TensorSpec(shape=[1, 2], dtype=tf.float32), |
| tf.TensorSpec(shape=(), dtype=tf.bool) |
| ]) |
| def model(x, b): |
| return tf.cond( |
| b, true_fn=lambda: true_fn(x), false_fn=lambda: false_fn(x)) |
| |
| concrete_func = model.get_concrete_function() |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = concrete_func(**input_data) |
| actual_value = self._evaluateTFLiteModel( |
| tflite_model, [input_data['x'], input_data['b']])[0] |
| self.assertAllClose(expected_value, actual_value) |
| |
| @test_util.run_v2_only |
| def testStaticRnn(self): |
| input_data = tf.constant( |
| np.array(np.random.random_sample((3, 10)), dtype=np.float32)) |
| |
| cell = tf.compat.v1.nn.rnn_cell.LSTMCell(10) |
| |
| @tf.function( |
| input_signature=[tf.TensorSpec(shape=[3, 10], dtype=tf.float32)]) |
| def model(x): |
| seq = tf.split(x, 3, 0) |
| return tf.compat.v1.nn.static_rnn( |
| cell, seq, dtype=tf.float32, sequence_length=[1]) |
| |
| concrete_func = model.get_concrete_function() |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = concrete_func(input_data)[0] |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| for expected, actual in zip(expected_value, actual_value): |
| self.assertAllClose(expected, actual) |
| |
| @test_util.run_v2_only |
| def testWhileLoop(self): |
| input_data = tf.constant([1., 2., 3., 4.], shape=[2, 2]) |
| |
| weights = tf.Variable([[0.1, 0.2], [0.3, 0.4]], dtype=tf.float32) |
| |
| def condition(x): |
| return tf.reduce_sum(x) < 100 |
| |
| def body(x): |
| return tf.add(x, weights) |
| |
| @tf.function( |
| input_signature=[tf.TensorSpec(shape=[2, 2], dtype=tf.float32)]) |
| def model(x): |
| return tf.while_loop(condition, body, [x]) |
| |
| concrete_func = model.get_concrete_function() |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = concrete_func(input_data)[0] |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])[0] |
| self.assertAllClose(expected_value, actual_value) |
| |
| @test_util.run_v2_only |
| def testDynamicRnn(self): |
| input_data = tf.constant( |
| np.array(np.random.random_sample((3, 10, 10)), dtype=np.float32)) |
| |
| cell = tf.compat.v1.nn.rnn_cell.LSTMCell(10) |
| |
| @tf.function( |
| input_signature=[tf.TensorSpec(shape=[3, 10, 10], dtype=tf.float32)]) |
| def model(x): |
| return tf.compat.v1.nn.dynamic_rnn(cell, x, dtype=tf.float32) |
| |
| concrete_func = model.get_concrete_function() |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = concrete_func(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data]) |
| for expected, actual in zip(expected_value, actual_value): |
| if not isinstance(expected, ops.EagerTensor): |
| expected = expected.c |
| self.assertAllClose(expected, actual) |
| |
| @parameterized.named_parameters(('LSTM', tf.keras.layers.LSTM), |
| ('SimpleRNN', tf.keras.layers.SimpleRNN), |
| ('GRU', tf.keras.layers.GRU)) |
| @test_util.run_v2_only |
| def testKerasRNN(self, rnn_layer): |
    # This relies on TFLiteConverter to rewrite the unknown batch size to 1.
    # The model will fail if the input is resized to a batch size other than 1.
| input_data = tf.constant( |
| np.array(np.random.random_sample((1, 10, 10)), dtype=np.float32)) |
| rnn_obj = rnn_layer(units=10, input_shape=(10, 10)) |
| model = tf.keras.models.Sequential([ |
| tf.keras.layers.Input(batch_size=1, shape=(10, 10), name='input'), |
| rnn_obj, |
| ]) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| tflite_model = converter.convert() |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])[0] |
| |
| # Check values from converted model. |
| expected_value = model.predict(input_data) |
| self.assertAllClose(expected_value, actual_value, atol=1e-05) |
| |
| @parameterized.named_parameters(('LSTM', tf.keras.layers.LSTM), |
| ('SimpleRNN', tf.keras.layers.SimpleRNN), |
| ('GRU', tf.keras.layers.GRU)) |
| @test_util.run_v2_only |
| def testKerasRNNMultiBatches(self, rnn_layer): |
| input_data = tf.constant( |
| np.array(np.random.random_sample((4, 10, 10)), dtype=np.float32)) |
    # Specify a fixed batch size (4) for the test model.
| x = tf.keras.layers.Input(batch_shape=(4, 10, 10)) |
| y = rnn_layer(units=10, input_shape=(10, 10))(x) |
| model = tf.keras.Model(inputs=[x], outputs=[y]) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| tflite_model = converter.convert() |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])[0] |
| |
| # Check values from converted model. |
| expected_value = model.predict(input_data) |
| self.assertAllClose(expected_value, actual_value, atol=1e-05) |
| |
| @test_util.run_v2_only |
| def testKerasBidirectionalRNNReturnSequence(self): |
| input_data = tf.constant( |
| np.array(np.random.random_sample((1, 10, 10)), dtype=np.float32)) |
| model = tf.keras.models.Sequential() |
| model.add(tf.keras.layers.Input(batch_size=1, shape=(10, 10), name='input')) |
| model.add( |
| tf.keras.layers.Bidirectional( |
| tf.keras.layers.LSTM(units=10, return_sequences=True), |
| input_shape=(10, 10))) |
| model.add(tf.keras.layers.Flatten()) |
| model.add(tf.keras.layers.Dense(5)) |
| model.add(tf.keras.layers.Activation('softmax')) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| tflite_model = converter.convert() |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])[0] |
| |
| # Check values from converted model. |
| expected_value = model.predict(input_data) |
| self.assertAllClose(expected_value, actual_value, atol=1e-05) |
| |
| @test_util.run_v2_only |
| def testKerasBidirectionalRNN(self): |
| input_data = tf.constant( |
| np.array(np.random.random_sample((1, 10, 10)), dtype=np.float32)) |
| model = tf.keras.models.Sequential() |
| model.add(tf.keras.layers.Input(batch_size=1, shape=(10, 10), name='input')) |
| model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=10))) |
| model.add(tf.keras.layers.Dense(5)) |
| model.add(tf.keras.layers.Activation('softmax')) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_keras_model(model) |
| tflite_model = converter.convert() |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])[0] |
| |
| # Check values from converted model. |
| expected_value = model.predict(input_data) |
| self.assertAllClose(expected_value, actual_value, atol=1e-05) |
| |
| |
| class GrapplerTest(lite_v2_test_util.ModelTest): |
| |
| @test_util.run_v2_only |
| def testConstantFolding(self): |
    # Constant folding handles the tf.broadcast_to operation, which was not
    # supported by TFLite at the time this test was added.
| input_data = tf.constant([1., 2., 3., 4., 5., 6., 7., 8., 9.], shape=[3, 3]) |
| |
| @tf.function |
| def func(x): |
| y_const = tf.constant([1., 2., 3.]) |
| y_broadcast = tf.broadcast_to(y_const, [3, 3]) |
| return tf.matmul(x, y_broadcast) |
| |
| root = tracking.AutoTrackable() |
| root.f = func |
| concrete_func = root.f.get_concrete_function(input_data) |
| |
| # Convert model. |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = root.f(input_data) |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])[0] |
| self.assertAllClose(expected_value, actual_value) |
| |
    # Enable hybrid quantization; the result should be the same.
| converter.optimizations = [lite.Optimize.DEFAULT] |
| tflite_model = converter.convert() |
| actual_value = self._evaluateTFLiteModel(tflite_model, [input_data])[0] |
| self.assertAllClose(expected_value, actual_value) |
| |
| |
| class UnknownShapes(lite_v2_test_util.ModelTest): |
| |
| @test_util.run_v2_only |
| def testMatMul(self): |
| input_data = tf.constant( |
| np.array(np.random.random_sample((10, 4)), dtype=np.float32)) |
| |
| @tf.function( |
| input_signature=[tf.TensorSpec(shape=[None, 4], dtype=tf.float32)]) |
| def model(in_tensor): |
| shape = tf.shape(in_tensor) |
| fill = tf.transpose(tf.fill(shape, 1.)) |
| return tf.matmul(fill, in_tensor) |
| |
| concrete_func = model.get_concrete_function() |
| |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = concrete_func(input_data) |
| actual_value = self._evaluateTFLiteModel( |
| tflite_model, [input_data], input_shapes=[([-1, 4], [10, 4])])[0] |
| self.assertAllClose(expected_value, actual_value, atol=1e-06) |
| |
| def _getIntegerQuantizeModelWithUnknownShapes(self): |
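    """Builds a matmul model with an unknown batch dimension."""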
| np.random.seed(0) |
| |
| @tf.function( |
| input_signature=[tf.TensorSpec(shape=[None, 33], dtype=tf.float32)]) |
| def model(input_tensor): |
| """Define a model with tf.MatMul and unknown shapes.""" |
| # We need the tensor to have more than 1024 elements for quantize_weights |
| # to kick in. Thus, the [33, 33] shape. |
| const_tensor = tf.constant( |
| np.random.uniform(low=-10., high=10., size=[33, 33]), |
| shape=[33, 33], |
| dtype=tf.float32, |
| name='inputB') |
| |
| shape = tf.shape(input_tensor) |
| fill = tf.transpose(tf.fill(shape, 1.)) |
| mult = tf.matmul(fill, input_tensor) |
| return tf.matmul(mult, const_tensor) |
| |
| root = tracking.AutoTrackable() |
| root.f = model |
| concrete_func = root.f.get_concrete_function() |
| |
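    # Vary the batch size across calibration samples to exercise the dynamic
    # (None) first dimension.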
| def calibration_gen(): |
| for batch in range(5, 20, 5): |
| for _ in range(5): |
| yield [np.random.uniform(-1, 1, size=(batch, 33)).astype(np.float32)] |
| |
| return concrete_func, calibration_gen |
| |
| @test_util.run_v2_only |
| def testMatMulQuantize(self): |
| concrete_func, _ = self._getIntegerQuantizeModelWithUnknownShapes() |
| float_converter = lite.TFLiteConverterV2.from_concrete_functions( |
| [concrete_func]) |
| float_tflite_model = float_converter.convert() |
| |
| quantized_converter = lite.TFLiteConverterV2.from_concrete_functions( |
| [concrete_func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_tflite_model = quantized_converter.convert() |
| |
| # The default input and output types should be float. |
| quantized_interpreter = Interpreter(model_content=quantized_tflite_model) |
| quantized_interpreter.allocate_tensors() |
| input_details = quantized_interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
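    # 'shape_signature' keeps the original dynamic dimension as -1.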
| self.assertAllEqual([-1, 33], input_details[0]['shape_signature']) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite_model), len(float_tflite_model)) |
| |
| @test_util.run_v2_only |
| def testMatMulCalibrateAndQuantize(self): |
    concrete_func, calibration_gen = (
        self._getIntegerQuantizeModelWithUnknownShapes())
| float_converter = lite.TFLiteConverterV2.from_concrete_functions( |
| [concrete_func]) |
| float_tflite_model = float_converter.convert() |
| |
| quantized_converter = lite.TFLiteConverterV2.from_concrete_functions( |
| [concrete_func]) |
| quantized_converter.optimizations = [lite.Optimize.DEFAULT] |
| quantized_converter.representative_dataset = calibration_gen |
| quantized_tflite_model = quantized_converter.convert() |
| |
| # The default input and output types should be float. |
| quantized_interpreter = Interpreter(model_content=quantized_tflite_model) |
| quantized_interpreter.allocate_tensors() |
| input_details = quantized_interpreter.get_input_details() |
| self.assertLen(input_details, 1) |
| self.assertEqual(np.float32, input_details[0]['dtype']) |
| self.assertAllEqual([-1, 33], input_details[0]['shape_signature']) |
| |
| # Ensure that the quantized weights tflite model is smaller. |
| self.assertLess(len(quantized_tflite_model), len(float_tflite_model)) |
| |
| def testBatchMatMul(self): |
| input_data_1 = tf.constant( |
| np.array(np.random.random_sample((1, 256, 256)), dtype=np.float32)) |
| input_data_2 = tf.constant( |
| np.array(np.random.random_sample((1, 256, 256)), dtype=np.float32)) |
| |
| @tf.function(input_signature=[ |
| tf.TensorSpec(shape=[None, 256, 256], dtype=tf.float32), |
| tf.TensorSpec(shape=[None, 256, 256], dtype=tf.float32) |
| ]) |
| def model(in_tensor_1, in_tensor_2): |
| return tf.matmul(in_tensor_1, in_tensor_2) |
| |
| concrete_func = model.get_concrete_function() |
| |
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| tflite_model = converter.convert() |
| |
| # Check values from converted model. |
| expected_value = concrete_func(input_data_1, input_data_2) |
| actual_value = self._evaluateTFLiteModel( |
| tflite_model, [input_data_1, input_data_2], |
| input_shapes=[([-1, 256, 256], [1, 256, 256])])[0] |
| self.assertAllClose(expected_value, actual_value, atol=4) |
| |
| def testSizeInvalid(self): |
| |
| @tf.function(input_signature=[ |
| tf.TensorSpec(shape=[1, None, 16, 3], dtype=tf.float32) |
| ]) |
| def model(in_tensor): |
| return in_tensor + in_tensor |
| |
| concrete_func = model.get_concrete_function() |
| |
    # Test an invalid shape: None after the 1st dimension. Run with TOCO in
    # order to invoke the shape-checking code.
| converter = lite.TFLiteConverterV2.from_concrete_functions([concrete_func]) |
| converter.experimental_new_converter = False |
| with self.assertRaises(ValueError) as error: |
| converter.convert() |
| self.assertEqual( |
| 'None is only supported in the 1st dimension. Tensor ' |
| '\'in_tensor\' has invalid shape \'[1, None, 16, 3]\'.', |
| str(error.exception)) |
| |
| |
| if __name__ == '__main__': |
| test.main() |