| # Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| # pylint: disable=protected-access |
| """Tests for saving/loading function for keras Model.""" |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import os |
| import shutil |
| |
| from absl.testing import parameterized |
| import numpy as np |
| |
| from tensorflow.python import keras |
| from tensorflow.python import tf2 |
| from tensorflow.python.client import session |
| from tensorflow.python.eager import context |
| from tensorflow.python.framework import dtypes |
| from tensorflow.python.framework import ops |
| from tensorflow.python.framework import tensor_spec |
| from tensorflow.python.framework import test_util |
| from tensorflow.python.keras import keras_parameterized |
| from tensorflow.python.keras import testing_utils |
| from tensorflow.python.keras.engine import training as model_lib |
| from tensorflow.python.keras.optimizer_v2 import adadelta |
| from tensorflow.python.keras.optimizer_v2 import rmsprop |
| from tensorflow.python.keras.saving import saved_model_experimental as keras_saved_model |
| from tensorflow.python.keras.utils import mode_keys |
| from tensorflow.python.keras.utils import tf_utils |
| from tensorflow.python.ops import array_ops |
| from tensorflow.python.platform import test |
| from tensorflow.python.saved_model import loader_impl |
| from tensorflow.python.saved_model import model_utils |
| from tensorflow.python.training import training as training_module |
| |
| |
| @keras_parameterized.run_all_keras_modes() |
| class TestModelSavingandLoading(parameterized.TestCase, test.TestCase): |
| |
| def _save_model_dir(self, dirname='saved_model'): |
| temp_dir = self.get_temp_dir() |
| self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True) |
| return os.path.join(temp_dir, dirname) |
| |
| def test_saving_sequential_model(self): |
| with self.cached_session(): |
| model = keras.models.Sequential() |
| model.add(keras.layers.Dense(2, input_shape=(3,))) |
| model.add(keras.layers.RepeatVector(3)) |
| model.add(keras.layers.TimeDistributed(keras.layers.Dense(3))) |
| model.compile( |
| loss=keras.losses.MSE, |
| optimizer=rmsprop.RMSprop(lr=0.0001), |
| metrics=[keras.metrics.categorical_accuracy], |
| sample_weight_mode='temporal', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3, 3)) |
| model.train_on_batch(x, y) |
| |
| ref_y = model.predict(x) |
| |
| saved_model_dir = self._save_model_dir() |
| keras_saved_model.export_saved_model(model, saved_model_dir) |
| |
| loaded_model = keras_saved_model.load_from_saved_model(saved_model_dir) |
| y = loaded_model.predict(x) |
| self.assertAllClose(ref_y, y, atol=1e-05) |
| |
| @test_util.run_in_graph_and_eager_modes |
| def test_saving_sequential_model_without_compile(self): |
| with self.cached_session(): |
| model = keras.models.Sequential() |
| model.add(keras.layers.Dense(2, input_shape=(3,))) |
| model.add(keras.layers.RepeatVector(3)) |
| model.add(keras.layers.TimeDistributed(keras.layers.Dense(3))) |
| |
| x = np.random.random((1, 3)) |
| ref_y = model.predict(x) |
| |
| saved_model_dir = self._save_model_dir() |
| keras_saved_model.export_saved_model(model, saved_model_dir) |
| loaded_model = keras_saved_model.load_from_saved_model(saved_model_dir) |
| |
| y = loaded_model.predict(x) |
| self.assertAllClose(ref_y, y, atol=1e-05) |
| |
| def test_saving_functional_model(self): |
| with self.cached_session(): |
| inputs = keras.layers.Input(shape=(3,)) |
| x = keras.layers.Dense(2)(inputs) |
| output = keras.layers.Dense(3)(x) |
| |
| model = keras.models.Model(inputs, output) |
| model.compile( |
| loss=keras.losses.MSE, |
| optimizer=rmsprop.RMSprop(lr=0.0001), |
| metrics=[keras.metrics.categorical_accuracy], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3)) |
| model.train_on_batch(x, y) |
| |
| ref_y = model.predict(x) |
| |
| saved_model_dir = self._save_model_dir() |
| keras_saved_model.export_saved_model(model, saved_model_dir) |
| loaded_model = keras_saved_model.load_from_saved_model(saved_model_dir) |
| |
| y = loaded_model.predict(x) |
| self.assertAllClose(ref_y, y, atol=1e-05) |
| |
| @test_util.run_in_graph_and_eager_modes |
| def test_saving_functional_model_without_compile(self): |
| with self.cached_session(): |
| inputs = keras.layers.Input(shape=(3,)) |
| x = keras.layers.Dense(2)(inputs) |
| output = keras.layers.Dense(3)(x) |
| |
| model = keras.models.Model(inputs, output) |
| |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3)) |
| |
| ref_y = model.predict(x) |
| |
| saved_model_dir = self._save_model_dir() |
| keras_saved_model.export_saved_model(model, saved_model_dir) |
| loaded_model = keras_saved_model.load_from_saved_model(saved_model_dir) |
| |
| y = loaded_model.predict(x) |
| self.assertAllClose(ref_y, y, atol=1e-05) |
| |
| @test_util.run_in_graph_and_eager_modes |
| def test_saving_with_tf_optimizer(self): |
| model = keras.models.Sequential() |
| model.add(keras.layers.Dense(2, input_shape=(3,))) |
| model.add(keras.layers.Dense(3)) |
| model.compile( |
| loss='mse', |
| optimizer=training_module.RMSPropOptimizer(0.1), |
| metrics=['acc']) |
| |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3)) |
| model.train_on_batch(x, y) |
| ref_y = model.predict(x) |
| |
| saved_model_dir = self._save_model_dir() |
| keras_saved_model.export_saved_model(model, saved_model_dir) |
| loaded_model = keras_saved_model.load_from_saved_model(saved_model_dir) |
| loaded_model.compile( |
| loss='mse', |
| optimizer=training_module.RMSPropOptimizer(0.1), |
| metrics=['acc'], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| y = loaded_model.predict(x) |
| self.assertAllClose(ref_y, y, atol=1e-05) |
| |
| # test that new updates are the same with both models |
| x = np.random.random((1, 3)) |
| y = np.random.random((1, 3)) |
| |
| ref_loss = model.train_on_batch(x, y) |
| loss = loaded_model.train_on_batch(x, y) |
| self.assertAllClose(ref_loss, loss, atol=1e-05) |
| |
| ref_y = model.predict(x) |
| y = loaded_model.predict(x) |
| self.assertAllClose(ref_y, y, atol=1e-05) |
| |
| # test saving/loading again |
| saved_model_dir2 = self._save_model_dir('saved_model_2') |
| keras_saved_model.export_saved_model(loaded_model, saved_model_dir2) |
| loaded_model = keras_saved_model.load_from_saved_model(saved_model_dir2) |
| y = loaded_model.predict(x) |
| self.assertAllClose(ref_y, y, atol=1e-05) |
| |
| def test_saving_subclassed_model_raise_error(self): |
| # For now, saving subclassed model should raise an error. It should be |
| # avoided later with loading from SavedModel.pb. |
| |
| class SubclassedModel(model_lib.Model): |
| |
| def __init__(self): |
| super(SubclassedModel, self).__init__() |
| self.layer1 = keras.layers.Dense(3) |
| self.layer2 = keras.layers.Dense(1) |
| |
| def call(self, inp): |
| return self.layer2(self.layer1(inp)) |
| |
| model = SubclassedModel() |
| |
| saved_model_dir = self._save_model_dir() |
| with self.assertRaises(NotImplementedError): |
| keras_saved_model.export_saved_model(model, saved_model_dir) |
| |
| |
| class LayerWithLearningPhase(keras.engine.base_layer.Layer): |
| |
| def build(self, input_shape): |
| self.input_spec = keras.layers.InputSpec(shape=[None] * len(input_shape)) |
| self.built = True |
| |
| def call(self, x, training=None): |
| if training is None: |
| training = keras.backend.learning_phase() |
| output = tf_utils.smart_cond( |
| training, lambda: x * 0, lambda: array_ops.identity(x)) |
| if not context.executing_eagerly(): |
| output._uses_learning_phase = True # pylint: disable=protected-access |
| return output |
| |
| def compute_output_shape(self, input_shape): |
| return input_shape |
| |
| |
| def functional_model(uses_learning_phase=True): |
| inputs = keras.layers.Input(shape=(3,)) |
| x = keras.layers.Dense(2)(inputs) |
| x = keras.layers.Dense(3)(x) |
| if uses_learning_phase: |
| x = LayerWithLearningPhase()(x) |
| return keras.models.Model(inputs, x) |
| |
| |
| def sequential_model(uses_learning_phase=True): |
| model = keras.models.Sequential() |
| model.add(keras.layers.Dense(2, input_shape=(3,))) |
| model.add(keras.layers.Dense(3)) |
| if uses_learning_phase: |
| model.add(LayerWithLearningPhase()) |
| return model |
| |
| |
| def sequential_model_without_input_shape(uses_learning_phase=True): |
| model = keras.models.Sequential() |
| model.add(keras.layers.Dense(2)) |
| model.add(keras.layers.Dense(3)) |
| if uses_learning_phase: |
| model.add(LayerWithLearningPhase()) |
| return model |
| |
| |
| class Subclassed(keras.models.Model): |
| |
| def __init__(self): |
| super(Subclassed, self).__init__() |
| self.dense1 = keras.layers.Dense(2) |
| self.dense2 = keras.layers.Dense(3) |
| |
| def call(self, inputs): |
| x = self.dense1(inputs) |
| x = self.dense2(x) |
| return x |
| |
| |
| def subclassed_model(): |
| return Subclassed() |
| |
| |
| def load_model(sess, path, mode): |
| tags = model_utils.EXPORT_TAG_MAP[mode] |
| sig_def_key = model_utils.SIGNATURE_KEY_MAP[mode] |
| |
| meta_graph_def = loader_impl.load(sess, tags, path) |
| inputs = { |
| k: sess.graph.get_tensor_by_name(v.name) |
| for k, v in meta_graph_def.signature_def[sig_def_key].inputs.items()} |
| outputs = { |
| k: sess.graph.get_tensor_by_name(v.name) |
| for k, v in meta_graph_def.signature_def[sig_def_key].outputs.items()} |
| return inputs, outputs, meta_graph_def |
| |
| |
| @test_util.run_all_in_graph_and_eager_modes |
| class TestModelSavedModelExport(test.TestCase, parameterized.TestCase): |
| |
| def _save_model_dir(self, dirname='saved_model'): |
| temp_dir = self.get_temp_dir() |
| self.addCleanup(shutil.rmtree, temp_dir, ignore_errors=True) |
| return os.path.join(temp_dir, dirname) |
| |
| @parameterized.parameters( |
| { |
| 'model_builder': functional_model, |
| 'uses_learning_phase': True, |
| 'optimizer_cls': adadelta.Adadelta, |
| 'train_before_export': True}, |
| { |
| 'model_builder': functional_model, |
| 'uses_learning_phase': True, |
| 'optimizer_cls': training_module.AdadeltaOptimizer, |
| 'train_before_export': False}, |
| { |
| 'model_builder': functional_model, |
| 'uses_learning_phase': False, |
| 'optimizer_cls': None, |
| 'train_before_export': False}, |
| { |
| 'model_builder': sequential_model, |
| 'uses_learning_phase': True, |
| 'optimizer_cls': training_module.AdadeltaOptimizer, |
| 'train_before_export': True}, |
| { |
| 'model_builder': sequential_model, |
| 'uses_learning_phase': True, |
| 'optimizer_cls': adadelta.Adadelta, |
| 'train_before_export': False}, |
| { |
| 'model_builder': sequential_model, |
| 'uses_learning_phase': False, |
| 'optimizer_cls': None, |
| 'train_before_export': False}, |
| { |
| 'model_builder': sequential_model_without_input_shape, |
| 'uses_learning_phase': True, |
| 'optimizer_cls': training_module.AdadeltaOptimizer, |
| 'train_before_export': False}) |
| def testSaveAndLoadSavedModelExport( |
| self, model_builder, uses_learning_phase, optimizer_cls, |
| train_before_export): |
| optimizer = None if optimizer_cls is None else optimizer_cls() |
| |
| saved_model_dir = self._save_model_dir() |
| |
| np.random.seed(130) |
| input_arr = np.random.random((1, 3)) |
| target_arr = np.random.random((1, 3)) |
| |
| model = model_builder(uses_learning_phase) |
| if optimizer is not None: |
| model.compile( |
| loss='mse', |
| optimizer=optimizer, |
| metrics=['mae']) |
| if train_before_export: |
| model.train_on_batch(input_arr, target_arr) |
| |
| ref_loss, ref_mae = model.evaluate(input_arr, target_arr) |
| |
| ref_predict = model.predict(input_arr) |
| |
| # Export SavedModel |
| keras_saved_model.export_saved_model(model, saved_model_dir) |
| |
| input_name = model.input_names[0] |
| output_name = model.output_names[0] |
| target_name = output_name + '_target' |
| |
| # Load predict graph, and test predictions |
| with session.Session(graph=ops.Graph()) as sess: |
| inputs, outputs, _ = load_model(sess, saved_model_dir, |
| mode_keys.ModeKeys.PREDICT) |
| |
| predictions = sess.run(outputs[output_name], |
| {inputs[input_name]: input_arr}) |
| self.assertAllClose(ref_predict, predictions, atol=1e-05) |
| |
| if optimizer: |
| # Load eval graph, and test predictions, loss and metric values |
| with session.Session(graph=ops.Graph()) as sess: |
| inputs, outputs, _ = load_model(sess, saved_model_dir, |
| mode_keys.ModeKeys.TEST) |
| |
| # First obtain the loss and predictions, and run the metric update op by |
| # feeding in the inputs and targets. |
| metrics_name = 'mae' if tf2.enabled() else 'mean_absolute_error' |
| metrics_update_op_key = 'metrics/' + metrics_name + '/update_op' |
| metrics_value_op_key = 'metrics/' + metrics_name + '/value' |
| |
| loss, predictions, _ = sess.run( |
| (outputs['loss'], outputs['predictions/' + output_name], |
| outputs[metrics_update_op_key]), { |
| inputs[input_name]: input_arr, |
| inputs[target_name]: target_arr |
| }) |
| |
| # The metric value should be run after the update op, to ensure that it |
| # reflects the correct value. |
| metric_value = sess.run(outputs[metrics_value_op_key]) |
| |
| self.assertEqual(int(train_before_export), |
| sess.run(training_module.get_global_step())) |
| self.assertAllClose(ref_loss, loss, atol=1e-05) |
| self.assertAllClose(ref_mae, metric_value, atol=1e-05) |
| self.assertAllClose(ref_predict, predictions, atol=1e-05) |
| |
| # Load train graph, and check for the train op, and prediction values |
| with session.Session(graph=ops.Graph()) as sess: |
| inputs, outputs, meta_graph_def = load_model( |
| sess, saved_model_dir, mode_keys.ModeKeys.TRAIN) |
| self.assertEqual(int(train_before_export), |
| sess.run(training_module.get_global_step())) |
| self.assertIn('loss', outputs) |
| self.assertIn(metrics_update_op_key, outputs) |
| self.assertIn(metrics_value_op_key, outputs) |
| self.assertIn('predictions/' + output_name, outputs) |
| |
| # Train for a step |
| train_op = loader_impl.get_train_op(meta_graph_def) |
| train_outputs, _ = sess.run( |
| [outputs, train_op], {inputs[input_name]: input_arr, |
| inputs[target_name]: target_arr}) |
| self.assertEqual(int(train_before_export) + 1, |
| sess.run(training_module.get_global_step())) |
| |
| if uses_learning_phase: |
| self.assertAllClose( |
| [[0, 0, 0]], train_outputs['predictions/' + output_name], |
| atol=1e-05) |
| else: |
| self.assertNotAllClose( |
| [[0, 0, 0]], train_outputs['predictions/' + output_name], |
| atol=1e-05) |
| |
| def testSaveAndLoadSavedModelWithCustomObject(self): |
| saved_model_dir = self._save_model_dir() |
| with session.Session(graph=ops.Graph()) as sess: |
| def relu6(x): |
| return keras.backend.relu(x, max_value=6) |
| inputs = keras.layers.Input(shape=(1,)) |
| outputs = keras.layers.Activation(relu6)(inputs) |
| model = keras.models.Model(inputs, outputs) |
| keras_saved_model.export_saved_model( |
| model, saved_model_dir, custom_objects={'relu6': relu6}) |
| with session.Session(graph=ops.Graph()) as sess: |
| inputs, outputs, _ = load_model(sess, saved_model_dir, |
| mode_keys.ModeKeys.PREDICT) |
| input_name = model.input_names[0] |
| output_name = model.output_names[0] |
| predictions = sess.run( |
| outputs[output_name], {inputs[input_name]: [[7], [-3], [4]]}) |
| self.assertAllEqual([[6], [0], [4]], predictions) |
| |
| def testAssertModelCloneSameObjectsIgnoreOptimizer(self): |
| input_arr = np.random.random((1, 3)) |
| target_arr = np.random.random((1, 3)) |
| |
| model_graph = ops.Graph() |
| clone_graph = ops.Graph() |
| |
| # Create two models with the same layers but different optimizers. |
| with session.Session(graph=model_graph): |
| inputs = keras.layers.Input(shape=(3,)) |
| x = keras.layers.Dense(2)(inputs) |
| x = keras.layers.Dense(3)(x) |
| model = keras.models.Model(inputs, x) |
| |
| model.compile(loss='mse', optimizer=training_module.AdadeltaOptimizer()) |
| model.train_on_batch(input_arr, target_arr) |
| |
| with session.Session(graph=clone_graph): |
| inputs = keras.layers.Input(shape=(3,)) |
| x = keras.layers.Dense(2)(inputs) |
| x = keras.layers.Dense(3)(x) |
| clone = keras.models.Model(inputs, x) |
| clone.compile(loss='mse', optimizer=keras.optimizers.RMSprop(lr=0.0001)) |
| clone.train_on_batch(input_arr, target_arr) |
| |
| keras_saved_model._assert_same_non_optimizer_objects( |
| model, model_graph, clone, clone_graph) |
| |
| def testAssertModelCloneSameObjectsThrowError(self): |
| input_arr = np.random.random((1, 3)) |
| target_arr = np.random.random((1, 3)) |
| |
| model_graph = ops.Graph() |
| clone_graph = ops.Graph() |
| |
| # Create two models with the same layers but different optimizers. |
| with session.Session(graph=model_graph): |
| inputs = keras.layers.Input(shape=(3,)) |
| x = keras.layers.Dense(2)(inputs) |
| x = keras.layers.Dense(3)(x) |
| model = keras.models.Model(inputs, x) |
| |
| model.compile(loss='mse', optimizer=training_module.AdadeltaOptimizer()) |
| model.train_on_batch(input_arr, target_arr) |
| |
| with session.Session(graph=clone_graph): |
| inputs = keras.layers.Input(shape=(3,)) |
| x = keras.layers.Dense(2)(inputs) |
| x = keras.layers.Dense(4)(x) |
| x = keras.layers.Dense(3)(x) |
| clone = keras.models.Model(inputs, x) |
| clone.compile(loss='mse', optimizer=keras.optimizers.RMSprop(lr=0.0001)) |
| clone.train_on_batch(input_arr, target_arr) |
| |
| def testSaveSequentialModelWithoutInputShapes(self): |
| model = sequential_model_without_input_shape(True) |
| # A Sequential model that hasn't been built should raise an error. |
| with self.assertRaisesRegexp( |
| ValueError, 'Weights for sequential model have not yet been created'): |
| keras_saved_model.export_saved_model(model, '') |
| |
| # Even with input_signature, the model's weights has not been created. |
| with self.assertRaisesRegexp( |
| ValueError, 'Weights for sequential model have not yet been created'): |
| saved_model_dir = self._save_model_dir() |
| keras_saved_model.export_saved_model( |
| model, |
| saved_model_dir, |
| input_signature=tensor_spec.TensorSpec( |
| shape=(10, 11, 12, 13, 14), dtype=dtypes.float32, |
| name='spec_input')) |
| |
| @parameterized.parameters( |
| { |
| 'model_builder': sequential_model_without_input_shape, |
| 'input_signature': [tensor_spec.TensorSpec(shape=[None, 3], |
| dtype=dtypes.float32)]}, |
| { |
| 'model_builder': subclassed_model, |
| 'input_signature': [tensor_spec.TensorSpec(shape=[None, 3], |
| dtype=dtypes.float32)]}) |
| def testServingOnly(self, model_builder, input_signature): |
| if context.executing_eagerly(): |
| saved_model_dir = self._save_model_dir() |
| input_arr = np.random.random((5, 3)).astype(np.float32) |
| model = model_builder() |
| ref_predict = model.predict(input_arr) |
| |
| keras_saved_model.export_saved_model( |
| model, |
| saved_model_dir, |
| serving_only=True, |
| input_signature=input_signature) |
| |
| # Load predict graph, and test predictions |
| with session.Session(graph=ops.Graph()) as sess: |
| inputs, outputs, _ = load_model(sess, saved_model_dir, |
| mode_keys.ModeKeys.PREDICT) |
| predictions = sess.run(outputs[next(iter(outputs.keys()))], |
| {inputs[next(iter(inputs.keys()))]: input_arr}) |
| self.assertAllClose(ref_predict, predictions, atol=1e-05) |
| |
| |
| if __name__ == '__main__': |
| test.main() |