| # Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """Tests for compiled Model subclassing.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import os |
| |
| import numpy as np |
| |
| from tensorflow.python import keras |
| from tensorflow.python.data.ops import dataset_ops |
| from tensorflow.python.keras import keras_parameterized |
| from tensorflow.python.keras import model_subclassing_test_util as model_util |
| from tensorflow.python.keras import testing_utils |
| from tensorflow.python.ops import math_ops |
| from tensorflow.python.platform import test |
| |
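# h5py is optional; the HDF5-format weight-saving checks in test_saving are
# skipped when it is unavailable.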
| try: |
| import h5py # pylint:disable=g-import-not-at-top |
| except ImportError: |
| h5py = None |
| |
| |
| @keras_parameterized.run_all_keras_modes |
| class ModelSubclassCompiledTest(keras_parameterized.TestCase): |
| |
| def test_single_io_workflow_with_np_arrays(self): |
| num_classes = 2 |
| num_samples = 100 |
| input_dim = 50 |
| |
| model = model_util.SimpleTestModel( |
| num_classes=num_classes, use_dp=True, use_bn=True) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| metrics=['acc', keras.metrics.CategoricalAccuracy()], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| |
| x = np.ones((num_samples, input_dim)) |
| y = np.zeros((num_samples, num_classes)) |
| |
| model.fit(x, y, epochs=2, batch_size=32, verbose=0) |
| _ = model.evaluate(x, y, verbose=0) |
| |
| def test_multi_io_workflow_with_np_arrays(self): |
| num_classes = (2, 3) |
| num_samples = 1000 |
| input_dim = 50 |
| |
| model = model_util.MultiIOTestModel( |
| num_classes=num_classes, use_dp=True, use_bn=True) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| metrics=['acc'], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| |
| x1 = np.ones((num_samples, input_dim)) |
| x2 = np.ones((num_samples, input_dim)) |
| y1 = np.zeros((num_samples, num_classes[0])) |
| y2 = np.zeros((num_samples, num_classes[1])) |
| |
| model.fit([x1, x2], [y1, y2], epochs=2, batch_size=32, verbose=0) |
| _ = model.evaluate([x1, x2], [y1, y2], verbose=0) |
| |
| def test_single_io_workflow_with_datasets(self): |
| num_classes = 2 |
| num_samples = 10 |
| input_dim = 50 |
| |
| with self.cached_session(): |
| model = model_util.SimpleTestModel( |
| num_classes=num_classes, use_dp=True, use_bn=True) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| |
| x = np.ones((num_samples, input_dim), dtype=np.float32) |
| y = np.zeros((num_samples, num_classes), dtype=np.float32) |
| dataset = dataset_ops.Dataset.from_tensor_slices((x, y)) |
| dataset = dataset.repeat(100) |
| dataset = dataset.batch(10) |
| |
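      # The repeated, batched dataset yields far more batches than the
      # 2 epochs x 10 steps consumed below.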
| model.fit(dataset, epochs=2, steps_per_epoch=10, verbose=0) |
| _ = model.evaluate(dataset, steps=10, verbose=0) |
| |
| def test_attributes(self): |
    # Tests these Model attributes: layers, weights, trainable_weights,
    # non_trainable_weights, inputs, outputs.
| |
| num_classes = (2, 3) |
| num_samples = 100 |
| input_dim = 50 |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| |
| x1 = np.ones((num_samples, input_dim)) |
| x2 = np.ones((num_samples, input_dim)) |
| y1 = np.zeros((num_samples, num_classes[0])) |
| y2 = np.zeros((num_samples, num_classes[1])) |
| |
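    # Before being called, the subclassed model is unbuilt and has no weights.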
| self.assertEqual(model.name, 'test_model') |
    self.assertFalse(model.built)
| self.assertEqual(len(model.weights), 0) |
| |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| model.train_on_batch([x1, x2], [y1, y2]) |
| |
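    # A single train step builds the model and creates all of its weights.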
    self.assertTrue(model.built)
| self.assertEqual(len(model.layers), 4) |
| self.assertEqual(len(model.weights), 10) |
| self.assertEqual(len(model.trainable_weights), 8) |
| self.assertEqual(len(model.non_trainable_weights), 2) |
| self.assertEqual(len(model.inputs), 2) |
| self.assertEqual(len(model.outputs), 2) |
| |
| def test_updates(self): |
    # Tests that BatchNormalization update ops are run during training.
| num_samples = 100 |
| input_dim = 50 |
| |
| class BNNet(keras.Model): |
| |
| def __init__(self): |
| super(BNNet, self).__init__() |
| self.bn = keras.layers.BatchNormalization(beta_initializer='ones', |
| gamma_initializer='ones') |
| |
| def call(self, inputs): |
| return self.bn(inputs) |
| |
| x = np.ones((num_samples, input_dim)) |
| y = np.ones((num_samples, input_dim)) |
| |
| model = BNNet() |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| y_ref = model.predict(x) |
| |
| model.train_on_batch(x, y) |
| y_new = model.predict(x) |
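    # Training updates BN's moving statistics, so inference output changes.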
| self.assertGreater(np.sum(np.abs(y_ref - y_new)), 0.1) |
| |
| def test_training_and_inference_behavior(self): |
    # Tests that dropout is applied during training but not during inference.
| |
| num_samples = 100 |
| input_dim = 50 |
| |
| class DPNet(keras.Model): |
| |
| def __init__(self): |
| super(DPNet, self).__init__() |
| self.dp = keras.layers.Dropout(0.5) |
| self.dense = keras.layers.Dense(1, |
| use_bias=False, |
| kernel_initializer='ones') |
| |
| def call(self, inputs): |
| x = self.dp(inputs) |
| return self.dense(x) |
| |
| model = DPNet() |
| x = np.ones((num_samples, input_dim)) |
| y = model.predict(x) |
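    # predict runs in inference mode, where dropout is a no-op; with an
    # all-ones kernel, each output is the sum of its input row, so the
    # totals match exactly.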
| self.assertEqual(np.sum(y), np.sum(x)) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| loss = model.train_on_batch(x, y) |
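    # Dropout is active during training, so predictions deviate from the
    # dropout-free targets and the loss is clearly nonzero.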
| self.assertGreater(loss, 0.1) |
| |
| def test_training_methods(self): |
    # Tests fit and train_on_batch on different input types: list and dict.
| |
| num_classes = (2, 3) |
| num_samples = 100 |
| input_dim = 50 |
| |
| x1 = np.ones((num_samples, input_dim)) |
| x2 = np.ones((num_samples, input_dim)) |
| y1 = np.zeros((num_samples, num_classes[0])) |
| y2 = np.zeros((num_samples, num_classes[1])) |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| model.fit([x1, x2], [y1, y2], epochs=2, batch_size=32, verbose=0) |
    model.fit({'input_1': x1, 'input_2': x2},
              {'output_1': y1, 'output_2': y2},
              epochs=2, batch_size=32, verbose=0)
| model.fit([x1, x2], [y1, y2], epochs=2, batch_size=32, verbose=0, |
| validation_data=([x1, x2], [y1, y2])) |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| model.train_on_batch([x1, x2], [y1, y2]) |
| model.train_on_batch({'input_1': x1, 'input_2': x2}, |
| {'output_1': y1, 'output_2': y2}) |
| |
| def test_inference_methods(self): |
    # Tests predict, evaluate, test_on_batch, and predict_on_batch
    # on different input types: list and dict.
| num_classes = (2, 3) |
| num_samples = 100 |
| input_dim = 50 |
| |
| x1 = np.ones((num_samples, input_dim)) |
| x2 = np.ones((num_samples, input_dim)) |
| y1 = np.zeros((num_samples, num_classes[0])) |
| y2 = np.zeros((num_samples, num_classes[1])) |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| model.evaluate([x1, x2], [y1, y2]) |
| model.test_on_batch([x1, x2], [y1, y2]) |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| model.predict([x1, x2]) |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| model.predict_on_batch([x1, x2]) |
| |
| def test_saving(self): |
| num_classes = (2, 3) |
| num_samples = 100 |
| input_dim = 50 |
| |
| x1 = np.ones((num_samples, input_dim)) |
| x2 = np.ones((num_samples, input_dim)) |
| y1 = np.zeros((num_samples, num_classes[0])) |
| y2 = np.zeros((num_samples, num_classes[1])) |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| model.fit([x1, x2], [y1, y2], epochs=2, batch_size=32, verbose=0) |
| y_ref_1, y_ref_2 = model.predict([x1, x2]) |
| |
| tf_format_name = os.path.join(self.get_temp_dir(), 'ckpt') |
| model.save_weights(tf_format_name) |
| if h5py is not None: |
| hdf5_format_name = os.path.join(self.get_temp_dir(), 'weights.h5') |
| model.save_weights(hdf5_format_name) |
| |
| model = model_util.MultiIOTestModel(num_classes=num_classes, use_bn=True) |
| |
| if h5py is not None: |
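      # The fresh model has not been called and has no variables yet, so
      # loading HDF5-format weights is expected to fail.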
| with self.assertRaises(ValueError): |
| model.load_weights(hdf5_format_name) |
| |
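    # TF-format checkpoints support deferred restoration: loading into the
    # unbuilt model succeeds, and values are matched when it is first called.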
| model.load_weights(tf_format_name) |
| |
| y1, y2 = model.predict([x1, x2]) |
| self.assertAllClose(y_ref_1, y1, atol=1e-5) |
| self.assertAllClose(y_ref_2, y2, atol=1e-5) |
| |
| if h5py is not None: |
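      # The model is built now, so loading the HDF5-format weights succeeds.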
| model.load_weights(hdf5_format_name) |
| |
| y1, y2 = model.predict([x1, x2]) |
| self.assertAllClose(y_ref_1, y1, atol=1e-5) |
| self.assertAllClose(y_ref_2, y2, atol=1e-5) |
| |
| def test_subclass_nested_in_subclass(self): |
| num_classes = 2 |
| num_samples = 100 |
| input_dim = 50 |
| |
| model = model_util.NestedTestModel1(num_classes=num_classes) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| metrics=['acc'], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| |
| x = np.ones((num_samples, input_dim)) |
| y = np.zeros((num_samples, num_classes)) |
| |
| model.fit(x, y, epochs=2, batch_size=32, verbose=0) |
| _ = model.evaluate(x, y, verbose=0) |
| |
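    # The outer model contributes 8 weights of its own (6 trainable, 2
    # non-trainable) on top of those of the nested test_net.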
| self.assertEqual(len(model.weights), 8 + len(model.test_net.weights)) |
| self.assertEqual(len(model.non_trainable_weights), |
| 2 + len(model.test_net.non_trainable_weights)) |
| self.assertEqual(len(model.trainable_weights), |
| 6 + len(model.test_net.trainable_weights)) |
| |
| def test_graph_nested_in_subclass(self): |
| num_classes = 2 |
| num_samples = 100 |
| input_dim = 50 |
| |
| model = model_util.NestedTestModel2(num_classes=num_classes) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| metrics=['acc'], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| |
| x = np.ones((num_samples, input_dim)) |
| y = np.zeros((num_samples, num_classes)) |
| |
| model.fit(x, y, epochs=2, batch_size=32, verbose=0) |
| _ = model.evaluate(x, y, verbose=0) |
| |
| self.assertEqual(len(model.weights), 8 + len(model.test_net.weights)) |
| self.assertEqual(len(model.non_trainable_weights), |
| 2 + len(model.test_net.non_trainable_weights)) |
| self.assertEqual(len(model.trainable_weights), |
| 6 + len(model.test_net.trainable_weights)) |
| |
| def test_subclass_nested_in_graph(self): |
| num_classes = 2 |
| num_samples = 100 |
| input_dim = 50 |
| |
| model = model_util.get_nested_model_3( |
| input_dim=input_dim, num_classes=num_classes) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| metrics=['acc'], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| |
| x = np.ones((num_samples, input_dim)) |
| y = np.zeros((num_samples, num_classes)) |
| |
| model.fit(x, y, epochs=2, batch_size=32, verbose=0) |
| _ = model.evaluate(x, y, verbose=0) |
| |
| self.assertEqual(len(model.weights), 16) |
| self.assertEqual(len(model.non_trainable_weights), 4) |
| self.assertEqual(len(model.trainable_weights), 12) |
| |
| def test_subclass_nested_in_sequential(self): |
| num_classes = 2 |
| num_samples = 100 |
| input_dim = 50 |
| |
| class Inner(keras.Model): |
| |
| def __init__(self): |
| super(Inner, self).__init__() |
| self.dense1 = keras.layers.Dense(32, activation='relu') |
| self.dense2 = keras.layers.Dense(num_classes, activation='relu') |
| self.bn = keras.layers.BatchNormalization() |
| |
| def call(self, inputs): |
| x = self.dense1(inputs) |
| x = self.dense2(x) |
| return self.bn(x) |
| |
| model = keras.Sequential([Inner()]) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| metrics=['acc'], |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| |
| x = np.ones((num_samples, input_dim)) |
| y = np.zeros((num_samples, num_classes)) |
| model.fit(x, y, epochs=2, batch_size=32, verbose=0) |
| _ = model.evaluate(x, y, verbose=0) |
| |
| self.assertEqual(len(model.weights), 8) |
| self.assertEqual(len(model.non_trainable_weights), 2) |
| self.assertEqual(len(model.trainable_weights), 6) |
| |
| def test_support_for_manual_training_arg(self): |
    # In most cases, the `training` argument is left unspecified, in which
    # case it defaults to the value implied by the Model method being used
    # (fit -> True, predict -> False, etc.).
    # If the user writes their model's `call` method to take an explicit
    # `training` argument, we must check that the correct value is being
    # passed to the model for each method call.
| |
| class DPNet(keras.Model): |
| |
| def __init__(self): |
| super(DPNet, self).__init__() |
| self.dp = keras.layers.Dropout(0.5) |
| self.dense = keras.layers.Dense(1, |
| use_bias=False, |
| kernel_initializer='ones') |
| |
| def call(self, inputs, training=False): |
| x = self.dp(inputs, training=training) |
| return self.dense(x) |
| |
| model = DPNet() |
| x = np.ones((10, 10)) |
| y = model.predict(x) |
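    # predict passes training=False, so dropout is inactive and the all-ones
    # kernel preserves the input sum.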
| self.assertEqual(np.sum(y), np.sum(x)) |
| model.compile( |
| loss='mse', |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| loss = model.train_on_batch(x, y) |
| self.assertGreater(loss, 0.1) |
| |
| def test_no_loss_in_compile(self): |
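    # A model whose loss comes entirely from `add_loss` calls inside `call`
    # can be compiled and trained without a `loss` argument.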
| |
| class InternalLossModel(keras.Model): |
| |
| def __init__(self): |
| super(InternalLossModel, self).__init__() |
| self.dense = keras.layers.Dense(1) |
| |
| def call(self, inputs): |
| out = self.dense(inputs) |
| self.add_loss(math_ops.reduce_sum(out)) |
| return out |
| |
| model = InternalLossModel() |
| x = np.ones((10, 10)) |
| model.predict(x) |
| model.compile( |
| optimizer='rmsprop', |
| run_eagerly=testing_utils.should_run_eagerly(), |
| experimental_run_tf_function=testing_utils.should_run_tf_function()) |
| model.fit(x) |
| model.evaluate(x) |
| |
| |
| if __name__ == '__main__': |
| test.main() |