# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for saving utility functions."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import numpy as np
from tensorflow.python import keras
from tensorflow.python import tf2
from tensorflow.python.client import session as session_lib
from tensorflow.python.eager import backprop
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.feature_column import feature_column_v2
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import test_util
from tensorflow.python.keras import backend as K
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.optimizer_v2 import gradient_descent
from tensorflow.python.keras.saving import saving_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import test
from tensorflow.python.saved_model import loader
from tensorflow.python.saved_model import save as save_lib
from tensorflow.python.saved_model import signature_constants
from tensorflow.python.saved_model import tag_constants
from tensorflow.python.training import rmsprop


class TraceModelCallTest(keras_parameterized.TestCase):

  def _assert_all_close(self, expected, actual):
    if not context.executing_eagerly():
      with self.cached_session() as sess:
        K._initialize_variables(sess)
        self.assertAllClose(expected, actual)
    else:
      self.assertAllClose(expected, actual)
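
  # Tracing should fail while the model's input shapes are unknown and
  # succeed once `_set_inputs` has been called; the traced outputs must
  # match a direct call to the model.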
  @keras_parameterized.run_with_all_model_types
  @test_util.run_in_graph_and_eager_modes
  def test_trace_model_outputs(self):
    input_dim = 5 if testing_utils.get_model_type() == 'functional' else None
    model = testing_utils.get_small_mlp(10, 3, input_dim)
    inputs = array_ops.ones((8, 5))

    if input_dim is None:
      with self.assertRaisesRegexp(ValueError,
                                   'input shapes have not been set'):
        saving_utils.trace_model_call(model)
      model._set_inputs(inputs)

    fn = saving_utils.trace_model_call(model)
    signature_outputs = fn(inputs)
    expected_outputs = {model.output_names[0]: model(inputs)}

    self._assert_all_close(expected_outputs, signature_outputs)
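
  # After `fit` the input shapes are known, so tracing should work for all
  # model types without an explicit `_set_inputs` call.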
  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_trace_model_outputs_after_fitting(self):
    input_dim = 5 if testing_utils.get_model_type() == 'functional' else None
    model = testing_utils.get_small_mlp(10, 3, input_dim)
    model.compile(
        optimizer='sgd',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    model.fit(x=np.random.random((8, 5)),
              y=np.random.random((8, 3)), epochs=2)

    inputs = array_ops.ones((8, 5))

    fn = saving_utils.trace_model_call(model)
    signature_outputs = fn(inputs)
    expected_outputs = {model.output_names[0]: model(inputs)}

    self._assert_all_close(expected_outputs, signature_outputs)
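
  # Multi-input/multi-output models should trace to a signature that maps
  # each output name to the corresponding output tensor.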
  @keras_parameterized.run_with_all_model_types(exclude_models='sequential')
  @keras_parameterized.run_all_keras_modes
  def test_trace_multi_io_model_outputs(self):
    input_dim = 5
    num_classes = 3
    num_classes_b = 4
    input_a = keras.layers.Input(shape=(input_dim,), name='input_a')
    input_b = keras.layers.Input(shape=(input_dim,), name='input_b')
    dense = keras.layers.Dense(num_classes, name='dense')
    dense2 = keras.layers.Dense(num_classes_b, name='dense2')
    dropout = keras.layers.Dropout(0.5, name='dropout')
    branch_a = [input_a, dense]
    branch_b = [input_b, dense, dense2, dropout]

    model = testing_utils.get_multi_io_model(branch_a, branch_b)

    input_a_np = np.random.random((10, input_dim)).astype(np.float32)
    input_b_np = np.random.random((10, input_dim)).astype(np.float32)

    if testing_utils.get_model_type() == 'subclass':
      with self.assertRaisesRegexp(ValueError,
                                   'input shapes have not been set'):
        saving_utils.trace_model_call(model)

    model.compile(
        optimizer='sgd',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    model.fit(x=[np.random.random((8, input_dim)).astype(np.float32),
                 np.random.random((8, input_dim)).astype(np.float32)],
              y=[np.random.random((8, num_classes)).astype(np.float32),
                 np.random.random((8, num_classes_b)).astype(np.float32)],
              epochs=2)

    fn = saving_utils.trace_model_call(model)
    signature_outputs = fn([input_a_np, input_b_np])
    outputs = model([input_a_np, input_b_np])
    expected_outputs = {model.output_names[0]: outputs[0],
                        model.output_names[1]: outputs[1]}

    self._assert_all_close(expected_outputs, signature_outputs)
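
  # Models whose first layer is a `DenseFeatures` layer take a dict of
  # feature tensors; the traced signature should accept the same dict.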
  @test_util.run_in_graph_and_eager_modes
  def test_trace_features_layer(self):
    columns = [feature_column_v2.numeric_column('x')]
    model = sequential.Sequential(
        [feature_column_v2.DenseFeatures(columns)])
    model_input = {'x': constant_op.constant([[1.]])}
    model.predict(model_input, steps=1)
    fn = saving_utils.trace_model_call(model)
    self.assertAllClose({'output_1': [[1.]]}, fn({'x': [[1.]]}))

    columns = [feature_column_v2.numeric_column('x'),
               feature_column_v2.numeric_column('y')]
    model = sequential.Sequential(
        [feature_column_v2.DenseFeatures(columns)])
    model_input = {'x': constant_op.constant([[1.]]),
                   'y': constant_op.constant([[2.]])}
    model.predict(model_input, steps=1)
    fn = saving_utils.trace_model_call(model)
    self.assertAllClose({'output_1': [[1., 2.]]},
                        fn({'x': [[1.]], 'y': [[2.]]}))
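
  # Passing an explicit `TensorSpec` list lets a model without known input
  # shapes be traced anyway.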
  @test_util.run_in_graph_and_eager_modes
  def test_specify_input_signature(self):
    model = testing_utils.get_small_sequential_mlp(10, 3, None)
    inputs = array_ops.ones((8, 5))

    with self.assertRaisesRegexp(ValueError, 'input shapes have not been set'):
      saving_utils.trace_model_call(model)

    fn = saving_utils.trace_model_call(
        model, [tensor_spec.TensorSpec(shape=[None, 5], dtype=dtypes.float32)])
    signature_outputs = fn(inputs)
    expected_outputs = {model.output_names[0]: model(inputs)}
    self._assert_all_close(expected_outputs, signature_outputs)
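
  # If `call` is already a `tf.function` with an input signature, tracing
  # should reuse that signature instead of requiring `_set_inputs`.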
  @test_util.run_in_graph_and_eager_modes
  def test_subclassed_model_with_input_signature(self):

    class Model(keras.Model):

      def __init__(self):
        super(Model, self).__init__()
        self.dense = keras.layers.Dense(3, name='dense')

      @def_function.function(
          input_signature=[[tensor_spec.TensorSpec([None, 5], dtypes.float32),
                            tensor_spec.TensorSpec([None], dtypes.float32)]],)
      def call(self, inputs, *args):
        x, y = inputs
        return self.dense(x) + y

    model = Model()
    fn = saving_utils.trace_model_call(model)
    x = array_ops.ones((8, 5), dtype=dtypes.float32)
    y = array_ops.ones((3,), dtype=dtypes.float32)
    expected_outputs = {'output_1': model([x, y])}
    signature_outputs = fn([x, y])
    self._assert_all_close(expected_outputs, signature_outputs)
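
  # Regression test for b/132783590: tracing after a custom `tf.function`
  # training loop must still produce a batch-agnostic input signature.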
  @keras_parameterized.run_with_all_model_types
  @test_util.run_in_graph_and_eager_modes
  def test_model_with_fixed_input_dim(self):
    """Ensure that the batch_dim is removed when saving.

    When serving or retraining, it is important to reset the batch dim.
    This can be an issue inside of tf.function. See b/132783590 for context.
    """
    model = testing_utils.get_small_mlp(10, 3, 5)

    loss_object = keras.losses.MeanSquaredError()
    optimizer = gradient_descent.SGD()

    @def_function.function
    def train_step(data, labels):
      with backprop.GradientTape() as tape:
        predictions = model(data)
        loss = loss_object(labels, predictions)
      gradients = tape.gradient(loss, model.trainable_variables)
      optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    x = np.random.random((8, 5))
    y = np.random.random((8, 3))

    train_step(x, y)

    fn = saving_utils.trace_model_call(model)
    self.assertEqual(fn.input_signature[0].shape.as_list(),
                     tensor_shape.TensorShape([None, 5]).as_list())
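

# Helper for ModelSaveTest below: loads the SavedModel with the TF 1.x loader
# and feeds/fetches tensors by the names recorded in the serving signature.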
def _import_and_infer(save_dir, inputs):
  """Import a SavedModel into a TF 1.x-style graph and run its signature."""
  graph = ops.Graph()
  with graph.as_default(), session_lib.Session() as session:
    model = loader.load(session, [tag_constants.SERVING], save_dir)
    signature = model.signature_def[
        signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY]
    assert set(inputs.keys()) == set(signature.inputs.keys())
    feed_dict = {}
    for arg_name in inputs.keys():
      feed_dict[graph.get_tensor_by_name(signature.inputs[arg_name].name)] = (
          inputs[arg_name])
    output_dict = {}
    for output_name, output_tensor_info in signature.outputs.items():
      output_dict[output_name] = graph.get_tensor_by_name(
          output_tensor_info.name)
    return session.run(output_dict, feed_dict=feed_dict)
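

# ModelSaveTest exercises the full export path. A minimal sketch of a roughly
# equivalent explicit form (assuming input shapes are already set) might be:
#
#   fn = saving_utils.trace_model_call(model)      # traced serving function
#   save_lib.save(model, save_dir, signatures=fn)  # export with that signature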
class ModelSaveTest(keras_parameterized.TestCase):

  @keras_parameterized.run_with_all_model_types
  @test_util.run_v2_only
  def test_model_save(self):
    input_dim = 5
    model = testing_utils.get_small_mlp(10, 3, input_dim)
    inputs = array_ops.ones((8, 5))

    if testing_utils.get_model_type() == 'subclass':
      model._set_inputs(inputs)

    save_dir = os.path.join(self.get_temp_dir(), 'saved_model')
    save_lib.save(model, save_dir)

    self.assertAllClose(
        {model.output_names[0]: model.predict_on_batch(inputs)},
        _import_and_infer(save_dir, {model.input_names[0]: np.ones((8, 5))}))
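

# `extract_model_metrics` returns None for an uncompiled model; after
# `compile`, it returns the compiled metric objects keyed by name.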
class ExtractModelMetricsTest(keras_parameterized.TestCase):

  @keras_parameterized.run_all_keras_modes
  def test_extract_model_metrics(self):
    a = keras.layers.Input(shape=(3,), name='input_a')
    b = keras.layers.Input(shape=(3,), name='input_b')
    dense = keras.layers.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = keras.layers.Dropout(0.5, name='dropout')(c)
    model = keras.models.Model([a, b], [d, e])
    extract_metrics = saving_utils.extract_model_metrics(model)
    self.assertEqual(None, extract_metrics)

    extract_metric_names = [
        'dense_binary_accuracy', 'dropout_binary_accuracy',
        'dense_mean_squared_error', 'dropout_mean_squared_error'
    ]
    if tf2.enabled():
      extract_metric_names.extend(['dense_mae', 'dropout_mae'])
    else:
      extract_metric_names.extend(
          ['dense_mean_absolute_error', 'dropout_mean_absolute_error'])
    model_metric_names = ['loss', 'dense_loss', 'dropout_loss'
                         ] + extract_metric_names
    model.compile(
        loss='mae',
        metrics=[
            keras.metrics.BinaryAccuracy(), 'mae',
            keras.metrics.mean_squared_error
        ],
        optimizer=rmsprop.RMSPropOptimizer(learning_rate=0.01),
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    extract_metrics = saving_utils.extract_model_metrics(model)
    self.assertEqual(set(model_metric_names), set(model.metrics_names))
    self.assertEqual(set(extract_metric_names), set(extract_metrics.keys()))


if __name__ == '__main__':
  test.main()