# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for `models.py` (model cloning, mainly)."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import functools
import os

from absl.testing import parameterized
import numpy as np

from tensorflow.python import keras
from tensorflow.python.eager import context
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.keras import backend as K
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import metrics
from tensorflow.python.keras import models
from tensorflow.python.keras import testing_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.platform import test
from tensorflow.python.training import adam


class TestModel(keras.Model):
  """A model subclass."""

  def __init__(self, n_outputs=4, trainable=True):
    """A test model with one dense layer and a variable number of outputs."""
    super(TestModel, self).__init__()
    self.layer1 = keras.layers.Dense(n_outputs)
    self.n_outputs = resource_variable_ops.ResourceVariable(
        n_outputs, trainable=trainable)

  def call(self, x):
    return self.layer1(x)
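

# Helpers used throughout the tests below. `_get_layers` builds a small
# Dense -> BatchNormalization -> Dropout -> Dense stack; the
# BatchNormalization layer is what provides the two update ops that the
# cloning tests assert on.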
def _get_layers(input_shape=(4,), add_input_layer=False):
  if add_input_layer:
    model_layers = [keras.layers.InputLayer(input_shape=input_shape),
                    keras.layers.Dense(4)]
  elif input_shape:
    model_layers = [keras.layers.Dense(4, input_shape=input_shape)]
  else:
    model_layers = [keras.layers.Dense(4)]
  model_layers += [
      keras.layers.BatchNormalization(),
      keras.layers.Dropout(0.5),
      keras.layers.Dense(4)]
  return model_layers


def _get_model(input_shape=(4,)):
  model_layers = _get_layers(input_shape=None, add_input_layer=False)
  return testing_utils.get_model_from_layers(
      model_layers, input_shape=input_shape)
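

# These tests exercise keras.models.clone_model (and the private
# _clone_sequential_model/_clone_functional_model helpers) both with freshly
# cloned weights and, via models.share_weights as layer_fn, with weights
# shared between the original model and the clone.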
class TestModelCloning(keras_parameterized.TestCase):

  @keras_parameterized.run_all_keras_modes
  @parameterized.named_parameters([
      {'testcase_name': 'has_input_layer',
       'input_shape': (4,),
       'add_input_layer': True,
       'share_weights': False},
      {'testcase_name': 'no_input_layer',
       'input_shape': None,
       'add_input_layer': False,
       'share_weights': False},
      {'testcase_name': 'has_input_layer_share_weights',
       'input_shape': (4,),
       'add_input_layer': True,
       'share_weights': True},
      {'testcase_name': 'no_input_layer_share_weights',
       'input_shape': None,
       'add_input_layer': False,
       'share_weights': True},
  ])
  def test_clone_sequential_model(
      self, input_shape, add_input_layer, share_weights):
    if share_weights:
      clone_fn = functools.partial(
          keras.models._clone_sequential_model, layer_fn=models.share_weights)
    else:
      clone_fn = keras.models.clone_model

    val_a = np.random.random((10, 4))
    model = models.Sequential(_get_layers(input_shape, add_input_layer))
    # Sanity check.
    self.assertEqual(
        isinstance(model._layers[0], keras.layers.InputLayer),
        add_input_layer)
    self.assertEqual(model._is_graph_network, add_input_layer)

    # With placeholder creation -- the cloned model should have an InputLayer
    # if the original model has one.
    new_model = clone_fn(model)
    self.assertEqual(
        isinstance(new_model._layers[0], keras.layers.InputLayer),
        add_input_layer)
    self.assertEqual(new_model._is_graph_network, model._is_graph_network)
    if input_shape:
      # Update ops from batch norm need to be included.
      self.assertEqual(len(new_model.get_updates_for(new_model.inputs)), 2)

    # On top of a new tensor -- the cloned model should always have an
    # InputLayer.
    input_a = keras.Input(shape=(4,))
    new_model = clone_fn(model, input_tensors=input_a)
    self.assertIsInstance(new_model._layers[0], keras.layers.InputLayer)
    self.assertTrue(new_model._is_graph_network)

    # On top of a new, non-Keras tensor -- the cloned model should always
    # have an InputLayer.
    if not context.executing_eagerly():
      # TODO(b/121277734): Skip eager contexts, as Input() layers raise an
      # error saying they should not be used with EagerTensors.
      input_a = keras.backend.variable(val_a)
      new_model = clone_fn(model, input_tensors=input_a)
      self.assertIsInstance(new_model._layers[0], keras.layers.InputLayer)
      self.assertTrue(new_model._is_graph_network)
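
  # Clones a two-input functional model (dense_1 is shared between both
  # branches, and one branch includes batch norm) and verifies that the clone
  # keeps the two update ops and remains trainable on new input tensors.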
  @keras_parameterized.run_all_keras_modes
  @parameterized.named_parameters([
      {'testcase_name': 'clone_weights', 'share_weights': False},
      {'testcase_name': 'share_weights', 'share_weights': True},
  ])
  def test_clone_functional_model(self, share_weights):
    if share_weights:
      clone_fn = functools.partial(
          keras.models._clone_functional_model, layer_fn=models.share_weights)
    else:
      clone_fn = keras.models.clone_model

    val_a = np.random.random((10, 4))
    val_b = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    input_a = keras.Input(shape=(4,))
    input_b = keras.Input(shape=(4,))
    dense_1 = keras.layers.Dense(4)
    dense_2 = keras.layers.Dense(4)

    x_a = dense_1(input_a)
    x_a = keras.layers.Dropout(0.5)(x_a)
    x_a = keras.layers.BatchNormalization()(x_a)
    x_b = dense_1(input_b)
    x_a = dense_2(x_a)
    outputs = keras.layers.add([x_a, x_b])
    model = keras.models.Model([input_a, input_b], outputs)

    # With placeholder creation.
    new_model = clone_fn(model)
    self.assertEqual(len(new_model.get_updates_for(new_model.inputs)), 2)
    new_model.compile(
        testing_utils.get_v2_optimizer('rmsprop'),
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    new_model.train_on_batch([val_a, val_b], val_out)

    # On top of new tensors.
    input_a = keras.Input(shape=(4,), name='a')
    input_b = keras.Input(shape=(4,), name='b')
    new_model = keras.models.clone_model(
        model, input_tensors=[input_a, input_b])
    self.assertEqual(len(new_model.get_updates_for(new_model.inputs)), 2)
    new_model.compile(
        testing_utils.get_v2_optimizer('rmsprop'),
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    new_model.train_on_batch([val_a, val_b], val_out)

    # On top of new, non-Keras tensors.
    if not context.executing_eagerly():
      # TODO(b/121277734): Skip eager contexts, as Input() layers raise an
      # error saying they should not be used with EagerTensors.
      input_a = keras.backend.variable(val_a)
      input_b = keras.backend.variable(val_b)
      new_model = clone_fn(model, input_tensors=[input_a, input_b])
      self.assertEqual(len(new_model.get_updates_for(new_model.inputs)), 2)
      new_model.compile(
          testing_utils.get_v2_optimizer('rmsprop'),
          'mse',
          run_eagerly=testing_utils.should_run_eagerly(),
          run_distributed=testing_utils.should_run_distributed())
      new_model.train_on_batch(None, val_out)
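
  # With kernel_initializer='one' the dense layer maps each unmasked timestep
  # 1. -> 1., and the Masking layer drops the all-zero timesteps from the
  # loss, so training on all-ones targets should give exactly zero loss --
  # i.e. cloning must preserve mask propagation.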
  @keras_parameterized.run_all_keras_modes
  @parameterized.named_parameters([
      {'testcase_name': 'clone_weights', 'share_weights': False},
      {'testcase_name': 'share_weights', 'share_weights': True},
  ])
  def test_clone_functional_with_masking(self, share_weights):
    if share_weights:
      clone_fn = functools.partial(
          keras.models._clone_functional_model, layer_fn=models.share_weights)
    else:
      clone_fn = keras.models.clone_model

    x = np.array([[[1.], [1.]], [[0.], [0.]]])
    inputs = keras.Input((2, 1))
    outputs = keras.layers.Masking(mask_value=0)(inputs)
    outputs = keras.layers.TimeDistributed(
        keras.layers.Dense(1, kernel_initializer='one'))(outputs)
    model = keras.Model(inputs, outputs)
    model = clone_fn(model)

    model.compile(
        loss='mse',
        optimizer=testing_utils.get_v2_optimizer('adam'),
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    y = np.array([[[1], [1]], [[1], [1]]])
    loss = model.train_on_batch(x, y)
    self.assertEqual(float(loss), 0.)
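
  # The private clone helpers should reject models of the wrong kind and
  # malformed input_tensors arguments with a ValueError.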
  def test_model_cloning_invalid_use_cases(self):
    seq_model = keras.models.Sequential()
    seq_model.add(keras.layers.Dense(4, input_shape=(4,)))

    x = keras.Input((4,))
    y = keras.layers.Dense(4)(x)
    fn_model = keras.models.Model(x, y)

    with self.assertRaises(ValueError):
      keras.models._clone_functional_model(seq_model)
    with self.assertRaises(ValueError):
      keras.models._clone_functional_model(None)
    with self.assertRaises(ValueError):
      keras.models._clone_sequential_model(fn_model)
    with self.assertRaises(ValueError):
      keras.models._clone_sequential_model(seq_model, input_tensors=[x, x])
    with self.assertRaises(ValueError):
      keras.models._clone_sequential_model(seq_model, input_tensors=y)
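
  # When cloning onto existing tensors, clone_model should feed those tensors
  # directly instead of creating new Placeholder ops in the target graph.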
  def test_functional_cloning_does_not_create_unnecessary_placeholders(self):
    with ops.Graph().as_default():
      x = keras.Input((4,))
      y = keras.layers.Dense(4)(x)
      model = keras.models.Model(x, y)
    graph = ops.Graph()
    with graph.as_default():
      x = array_ops.ones((10, 4))
      _ = keras.models.clone_model(model, input_tensors=[x])
      has_placeholder = _has_placeholder(graph)
      self.assertFalse(has_placeholder)

  def test_sequential_cloning_does_not_create_unnecessary_placeholders(self):
    with ops.Graph().as_default():
      model = keras.models.Sequential()
      model.add(keras.layers.Dense(4, input_shape=(4,)))
    graph = ops.Graph()
    with graph.as_default():
      x = array_ops.ones((10, 4))
      _ = keras.models.clone_model(model, input_tensors=[x])
      has_placeholder = _has_placeholder(graph)
      self.assertFalse(has_placeholder)


def _has_placeholder(graph):
  """Returns True if `graph` contains at least one Placeholder op."""
  ops_types = [op.type for op in graph.get_operations()]
  return any('Placeholder' in s for s in ops_types)
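

# save_weights/load_weights should round-trip optimizer state: the model
# checkpoint tracks the optimizer as a dependency, so the Adam beta1 power
# accumulator value set before saving is restored on load.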
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
class CheckpointingTests(keras_parameterized.TestCase):

  def test_optimizer_dependency(self):
    model = _get_model()
    opt = adam.AdamOptimizer(0.01)
    model.compile(
        optimizer=opt,
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    model.fit(
        x=np.array([[1., 2., 3., 4.]]),
        y=np.array([[1., 1., 1., 1.]]),
        epochs=2)
    save_prefix = os.path.join(self.get_temp_dir(), 'ckpt')
    beta1_power, _ = opt._get_beta_accumulators()
    self.evaluate(beta1_power.assign(12.))
    model.save_weights(save_prefix)
    self.evaluate(beta1_power.assign(13.))
    model.load_weights(save_prefix)
    self.assertEqual(12., self.evaluate(beta1_power))
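

# Regression test: building and compiling a model should still work when the
# backend floatx is switched to float64 (GitHub issue 19318). The original
# floatx is restored at the end so later tests are unaffected.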
@keras_parameterized.run_all_keras_modes
class TestModelBackend(keras_parameterized.TestCase):

  def test_model_backend_float64_use_cases(self):
    # Test case for GitHub issue 19318.
    floatx = keras.backend.floatx()
    keras.backend.set_floatx('float64')

    x = keras.Input((5,))
    y = keras.layers.Dense(1)(x)
    model = keras.models.Model(x, y)
    model.compile(
        testing_utils.get_v2_optimizer('rmsprop'),
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())

    keras.backend.set_floatx(floatx)
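

# clone_and_build_model clones a model and optionally compiles the clone with
# the original model's loss, optimizer, and metrics. The tests below cover
# both uncompiled and compiled source models across all model types.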
class TestCloneAndBuildModel(keras_parameterized.TestCase):

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_clone_and_build_non_compiled_model(self):
    inp = np.random.random((10, 4))
    out = np.random.random((10, 4))
    model = _get_model()

    with self.assertRaisesRegexp(ValueError, 'has not been compiled'):
      models.clone_and_build_model(model, compile_clone=True)

    is_subclassed = (testing_utils.get_model_type() == 'subclass')
    # With placeholder creation.
    new_model = models.clone_and_build_model(
        model, compile_clone=False, in_place_reset=is_subclassed)
    with self.assertRaisesRegexp(RuntimeError, 'must compile'):
      new_model.evaluate(inp, out)
    with self.assertRaisesRegexp(RuntimeError, 'must compile'):
      new_model.train_on_batch(inp, out)
    new_model.compile(
        testing_utils.get_v2_optimizer('rmsprop'),
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    new_model.train_on_batch(inp, out)

    # Create new tensors for inputs and targets.
    input_a = keras.Input(shape=(4,))
    target_a = keras.Input(shape=(4,))
    new_model = models.clone_and_build_model(
        model, input_tensors=input_a, target_tensors=[target_a],
        compile_clone=False, in_place_reset=is_subclassed)
    with self.assertRaisesRegexp(RuntimeError, 'must compile'):
      new_model.evaluate(inp, out)
    with self.assertRaisesRegexp(RuntimeError, 'must compile'):
      new_model.train_on_batch(inp, out)
    new_model.compile(
        testing_utils.get_v2_optimizer('rmsprop'),
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    new_model.train_on_batch(inp, out)

  def _assert_same_compile_params(self, model):
    """Asserts that `model` was compiled with the expected parameters."""
    self.assertEqual('mse', model.loss)
    self.assertIsInstance(
        model.optimizer,
        (keras.optimizers.RMSprop, keras.optimizer_v2.rmsprop.RMSprop))
    self.assertEqual(['acc', metrics.categorical_accuracy],
                     model._compile_metrics)

  def _clone_and_build_test_helper(self, model, model_type):
    inp = np.random.random((10, 4))
    out = np.random.random((10, 4))
    is_subclassed = (model_type == 'subclass')

    # With placeholder creation.
    new_model = models.clone_and_build_model(
        model, compile_clone=True, in_place_reset=is_subclassed)
    self._assert_same_compile_params(new_model)
    new_model.train_on_batch(inp, out)
    new_model.evaluate(inp, out)

    # Create new tensors for inputs and targets.
    input_a = keras.Input(shape=(4,), name='a')
    new_model = models.clone_and_build_model(
        model, input_tensors=input_a, compile_clone=True,
        in_place_reset=is_subclassed)
    self._assert_same_compile_params(new_model)
    new_model.train_on_batch(inp, out)
    new_model.evaluate(inp, out)

    target_a = keras.Input(shape=(4,), name='b')
    new_model = models.clone_and_build_model(
        model, input_tensors=input_a, target_tensors=[target_a],
        compile_clone=True, in_place_reset=is_subclassed)
    self._assert_same_compile_params(new_model)
    new_model.train_on_batch(inp, out)
    new_model.evaluate(inp, out)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_clone_and_build_compiled(self):
    model = _get_model()
    model.compile(
        testing_utils.get_v2_optimizer('rmsprop'),
        'mse',
        metrics=['acc', metrics.categorical_accuracy],
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())

    self._clone_and_build_test_helper(model, testing_utils.get_model_type())

  @keras_parameterized.run_all_keras_modes
  def test_clone_and_build_sequential_without_inputs_defined(self):
    model = models.Sequential(_get_layers(input_shape=None))
    model.compile(
        testing_utils.get_v2_optimizer('rmsprop'),
        'mse',
        metrics=['acc', metrics.categorical_accuracy],
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())
    self._clone_and_build_test_helper(model, 'sequential')

    inp = np.random.random((10, 4))
    out = np.random.random((10, 4))
    model.train_on_batch(inp, out)
    self._clone_and_build_test_helper(model, 'sequential')

  def assert_optimizer_iterations_increases(self, optimizer):
    model = _get_model()
    model.compile(
        optimizer,
        'mse',
        metrics=['acc', metrics.categorical_accuracy],
        run_eagerly=testing_utils.should_run_eagerly(),
        run_distributed=testing_utils.should_run_distributed())

    global_step = keras.backend.variable(123, dtype=dtypes.int64)
    clone_model = models.clone_and_build_model(
        model, compile_clone=True, optimizer_iterations=global_step,
        in_place_reset=(testing_utils.get_model_type() == 'subclass'))

    inp = np.random.random((10, 4))
    out = np.random.random((10, 4))
    clone_model.train_on_batch(inp, out)

    self.assertEqual(K.eval(global_step), 124)
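
  # `optimizer_iterations` lets the clone reuse an existing global-step
  # variable, so a single train step advances it from 123 to 124 in the
  # helper above. Both the v1 TF optimizer and the Keras optimizer paths
  # are exercised below.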
  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_replace_tf_optimizer_iterations_variable(self):
    self.assert_optimizer_iterations_increases(adam.AdamOptimizer(0.01))

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_replace_keras_optimizer_iterations_variable(self):
    if testing_utils.should_run_eagerly():
      # This needs to be updated to run with v2 optimizers.
      self.skipTest('b/120991591')
    self.assert_optimizer_iterations_increases('adam')
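
  # Compiling a clone in a new graph with the original optimizer instance
  # fails ('Cannot use the given session'); passing the optimizer's config
  # instead lets clone_and_build_model rebuild the optimizer in that graph.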
  def test_clone_optimizer_in_different_graph(self):
    with ops.Graph().as_default():
      with self.session():
        model = testing_utils.get_small_sequential_mlp(3, 4)
        optimizer = keras.optimizer_v2.adam.Adam()
        model.compile(
            optimizer, 'mse', metrics=['acc', metrics.categorical_accuracy])
        model.fit(
            x=np.array([[1., 2., 3., 4.]]),
            y=np.array([[1., 1., 1., 1.]]),
            epochs=1)
        optimizer_config = optimizer.get_config()
    with ops.Graph().as_default():
      with self.session():
        with self.assertRaisesRegexp(ValueError,
                                     'Cannot use the given session'):
          models.clone_and_build_model(model, compile_clone=True)
        # The optimizer_config object allows the model to be cloned in a
        # different graph.
        models.clone_and_build_model(model, compile_clone=True,
                                     optimizer_config=optimizer_config)


if __name__ == '__main__':
  test.main()