blob: a83689cd8ab4f9c1176511e799941f8a22077422 [file] [log] [blame]
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for training routines."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import io
import logging
import re
import sys
from absl.testing import parameterized
import numpy as np
import six
from tensorflow.python import keras
from tensorflow.python import tf2
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.eager import function
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import test_util as tf_test_util
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import losses
from tensorflow.python.keras import metrics as metrics_module
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.callbacks import Callback
from tensorflow.python.keras.engine import training_utils
from tensorflow.python.keras.optimizer_v2 import gradient_descent
from tensorflow.python.keras.utils import data_utils
from tensorflow.python.keras.utils import np_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import variables as variables_lib
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training.rmsprop import RMSPropOptimizer
try:
import scipy.sparse as scipy_sparse # pylint: disable=g-import-not-at-top
except ImportError:
scipy_sparse = None
class CompileTest(keras_parameterized.TestCase):
  """Tests for `Model.compile`: loss wiring and argument validation."""

  def _get_multi_output_model(self):
    """Builds a one-input functional model with two single-unit outputs."""
    input_a = keras.layers.Input(shape=(3,), name='input_a')
    output_a = keras.layers.Dense(1, name='dense_1')(input_a)
    output_b = keras.layers.Dense(1, name='dense_2')(input_a)
    return keras.models.Model(input_a, [output_a, output_b])

  def _do_test_compile_with_model_and_single_loss(self, model, loss):
    """Compiles `model` with a single `loss` and checks the wrapped fns."""
    model.compile(
        optimizer='adam',
        loss=loss,
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    self.assertEqual(model.loss, loss)

    loss = losses.get(loss)
    if not isinstance(loss, list):
      # A single loss is broadcast to every model output.
      loss_list = [loss] * len(model.outputs)

    self.assertEqual(len(model.loss_functions), len(loss_list))
    for i in range(len(loss_list)):
      self.assertIsInstance(model.loss_functions[i], losses.LossFunctionWrapper)
      if not isinstance(loss_list[i], losses.LossFunctionWrapper):
        self.assertEqual(model.loss_functions[i].fn, loss_list[i])
    self.assertAllEqual(model._loss_weights_list, [1.] * len(loss_list))

  def test_respect_run_functions_eagerly(self):
    """`Model.run_eagerly` must follow the global run-functions flag."""
    with context.eager_mode():
      model = testing_utils.get_small_sequential_mlp(
          num_hidden=10, num_classes=2, input_dim=3)
      model.compile('sgd', 'mse')
      def_function.run_functions_eagerly(True)
      try:
        self.assertTrue(model.run_eagerly)
      finally:
        # Always restore the process-wide flag, even if the assertion above
        # fails, so eager execution cannot leak into unrelated tests.
        def_function.run_functions_eagerly(False)
      self.assertFalse(model.run_eagerly)

  @keras_parameterized.run_all_keras_modes
  @parameterized.named_parameters(('loss_string', 'mse'),
                                  ('loss_function', losses.mean_squared_error),
                                  ('loss_instance', losses.MeanSquaredError()))
  def test_compile_with_single_output(self, loss):
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=2, input_dim=3)
    self._do_test_compile_with_model_and_single_loss(model, loss)

  @keras_parameterized.run_all_keras_modes
  @parameterized.named_parameters(('loss_string', 'mse'),
                                  ('loss_function', losses.mean_squared_error),
                                  ('loss_instance', losses.MeanSquaredError()))
  def test_compile_with_multi_output(self, loss):
    model = self._get_multi_output_model()
    self._do_test_compile_with_model_and_single_loss(model, loss)

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_multi_output_and_multi_loss(self):
    model = self._get_multi_output_model()
    # Test loss is a list.
    loss = ['mse', 'mae']
    model.compile(
        optimizer='adam',
        loss=loss,
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    self.assertEqual(model.loss_functions[0].fn, losses.mean_squared_error)
    self.assertEqual(model.loss_functions[1].fn, losses.mean_absolute_error)
    self.assertAllEqual(model._loss_weights_list, [1., 1.])

    # Test loss is a dict keyed by output layer name.
    loss = {'dense_1': 'mae', 'dense_2': 'mse'}
    model.compile(
        optimizer='adam',
        loss=loss,
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    self.assertEqual(model.loss_functions[0].fn, losses.mean_absolute_error)
    self.assertEqual(model.loss_functions[1].fn, losses.mean_squared_error)
    self.assertAllEqual(model._loss_weights_list, [1., 1.])

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_multi_output_and_loss_weights_list(self):
    model = self._get_multi_output_model()
    loss_weights = [1., 2.]
    model.compile(
        optimizer='adam',
        loss='mse',
        loss_weights=loss_weights,
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    self.assertAllEqual(model._loss_weights_list, [1., 2.])

  def test_compile_with_multi_output_and_loss_weights_dict(self):
    with context.graph_mode():
      model = self._get_multi_output_model()
      loss_weights = {'dense_1': 1., 'dense_2': 2.}
      model.compile(optimizer='adam', loss='mse', loss_weights=loss_weights)
      self.assertAllEqual(model._loss_weights_list, [1., 2.])

      input_np = np.random.random((10, 3))
      output_a_np = np.random.random((10, 1))
      output_b_np = np.random.random((10, 1))

      with self.cached_session() as sess:
        sess.run(variables_lib.global_variables_initializer())
        total_loss, y_preds = sess.run(
            [model.total_loss, model.outputs],
            feed_dict={
                'input_a:0': input_np,
                'dense_1_target:0': output_a_np,
                'dense_2_target:0': output_b_np
            })
        # The second output's squared error is scaled by its weight of 2.
        self.assertAllClose(
            total_loss,
            np.mean(
                np.add((output_a_np - y_preds[0])**2,
                       2 * (output_b_np - y_preds[1])**2)))

  # NOTE: the error tests below use `assertRaisesRegex` (not the deprecated
  # `assertRaisesRegexp` alias) for consistency with the rest of this file.

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_incorrect_loss_size(self):
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=2, input_dim=3)
    with self.assertRaisesRegex(ValueError, 'The model has 1 outputs'):
      model.compile(
          optimizer='adam',
          loss=['mse', 'mae'],
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_incorrect_loss_key(self):
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=2, input_dim=3)
    with self.assertRaisesRegex(
        ValueError,
        r'Unknown entries in loss dictionary: \[\'unknown_output\'\]. '
        r'Only expected following keys: \[\'dense_1\'\]'):
      model.compile(
          optimizer='adam',
          loss={'unknown_output': 'mse'},
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_incorrect_loss_weights_size(self):
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=2, input_dim=3)
    with self.assertRaisesRegex(ValueError,
                                'it should have one entry per model output'):
      model.compile(
          optimizer='adam',
          loss='mse',
          loss_weights=[1., 2.],
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_incorrect_loss_weights_key(self):
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=2, input_dim=3)
    with self.assertRaisesRegex(
        ValueError,
        r'Unknown entries in loss_weights dictionary: \[\'unknown_output\'\]. '
        r'Only expected following keys: \[\'dense_1\'\]'):
      model.compile(
          optimizer='adam',
          loss='mse',
          loss_weights={'unknown_output': 1.},
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

  @keras_parameterized.run_all_keras_modes
  def test_compile_with_incorrect_sample_weight_mode(self):
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=2, input_dim=3)
    with self.assertRaisesRegex(
        ValueError,
        r'Unknown entries in sample_weight_mode dictionary: \[\'unknown\'\]. '
        r'Only expected following keys: \[\'dense_1\'\]'):
      model.compile(
          optimizer='adam',
          loss='mse',
          sample_weight_mode={'unknown': 'temporal'},
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

  @tf_test_util.run_deprecated_v1
  def test_compile_with_session_kwargs(self):
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=2, input_dim=3)

    # Test that unknown arguments are not accepted
    with self.assertRaisesRegex(
        TypeError,
        r'Invalid keyword argument'):
      model.compile(
          optimizer='adam',
          loss='mse',
          foo=True)
class TrainingTest(keras_parameterized.TestCase):
  """Integration tests for `Model.fit`, `evaluate` and `predict`."""

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_fit_training_arg(self):
    """`fit` must invoke layers with `training=True`."""

    class TrainingProbe(keras.layers.Layer):
      """Adds 100 to its input in training mode and 0 otherwise."""

      def call(self, inputs, training):
        offset = [100] if training else [0]
        return inputs + array_ops.constant(offset, 'float32')

    probe_model = keras.Sequential([TrainingProbe()])
    probe_model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    history = probe_model.fit(x=np.array([0.]), y=np.array([0.]))
    # mse((0 + 100), 0) == 10000, which only happens in training mode.
    self.assertAllClose(history.history['loss'][0], 10000)
  @keras_parameterized.run_all_keras_modes
  def test_fit_and_validate_learning_phase(self):
    """The learning phase must differ between training and validation.

    The layer emits ones in the train phase and zeros otherwise, so with
    `mae` against all-ones targets the training loss is 0.0 and the
    validation loss is 1.0.
    """

    class ReturnTraining(keras.layers.Layer):

      def call(self, inputs):
        # Output reveals the active phase: ones while training, else zeros.
        return keras.backend.in_train_phase(
            lambda: array_ops.ones_like(inputs),
            lambda: array_ops.zeros_like(inputs))

    model = keras.Sequential([ReturnTraining(input_shape=(2,))])
    model.compile(
        'sgd',
        loss='mae',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    inputs = np.ones((40, 2), dtype=np.float32)
    targets = np.ones((40, 1), dtype=np.float32)

    # Test correctness with batched dataset inputs for both training and
    # validation.
    train_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    val_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    history = model.fit(
        train_dataset, epochs=2, verbose=1, validation_data=val_dataset)

    # The training loss should be 0.0
    self.assertAllClose(history.history['loss'][0], 0.0)
    # The validation loss should be 1.0.
    self.assertAllClose(history.history['val_loss'][0], 1.0)
  @keras_parameterized.run_all_keras_modes
  def test_fit_and_validate_training_arg(self):
    """As above, but the phase flows through an explicit `training` arg."""

    class ReturnTraining(keras.layers.Layer):

      def call(self, inputs, training=None):
        # `training` is forwarded so the framework-provided value decides
        # which branch of `in_train_phase` runs.
        return keras.backend.in_train_phase(
            lambda: array_ops.ones_like(inputs),
            lambda: array_ops.zeros_like(inputs),
            training=training)

    model = keras.Sequential([ReturnTraining(input_shape=(2,))])
    model.compile(
        'sgd',
        loss='mae',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    inputs = np.ones((40, 2), dtype=np.float32)
    targets = np.ones((40, 1), dtype=np.float32)

    # Test correctness with batched dataset inputs for both training and
    # validation.
    train_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    val_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    history = model.fit(
        train_dataset, epochs=2, verbose=1, validation_data=val_dataset)

    # The training loss should be 0.0
    self.assertAllClose(history.history['loss'][0], 0.0)
    # The validation loss should be 1.0.
    self.assertAllClose(history.history['val_loss'][0], 1.0)
@keras_parameterized.run_all_keras_modes
@keras_parameterized.run_with_all_model_types
def test_target_dtype_matches_output(self):
def _loss_fn(labels, preds):
self.assertEqual(labels.dtype, preds.dtype)
return labels - preds
layers = [keras.layers.Dense(10, dtype=np.float64),
keras.layers.Dense(10, dtype=np.float64)]
model = testing_utils.get_model_from_layers(layers, input_shape=(1,))
inputs = np.ones(10, dtype=np.float64)
targets = np.ones(10, dtype=np.float64)
model.compile(
'sgd',
loss=_loss_fn,
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.train_on_batch(inputs, targets)
model.test_on_batch(inputs, targets)
self.assertEqual(model.predict(inputs).dtype, np.float64)
  @keras_parameterized.run_all_keras_modes
  def test_fit_and_validate_nested_training_arg(self):
    """The learning phase must also propagate into nested layers."""

    class NestedReturnTraining(keras.layers.Layer):

      def call(self, inputs, training=None):
        return keras.backend.in_train_phase(
            lambda: array_ops.ones_like(inputs),
            lambda: array_ops.zeros_like(inputs),
            training=training)

    class ReturnTraining(keras.layers.Layer):

      def __init__(self, input_shape=None, **kwargs):
        super(ReturnTraining, self).__init__(input_shape=input_shape, **kwargs)
        self._nested_layer = None

      def build(self, input_shape):
        # The inner layer is created lazily in `build`; note the outer
        # `call` does not forward a `training` argument, so the phase must
        # reach the nested layer through the framework.
        self._nested_layer = NestedReturnTraining()
        self.built = True

      def call(self, inputs):
        return self._nested_layer(inputs)

    model = keras.Sequential([ReturnTraining(input_shape=(2,))])
    model.compile(
        'sgd',
        loss='mae',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    inputs = np.ones((40, 2), dtype=np.float32)
    targets = np.ones((40, 1), dtype=np.float32)

    # Test correctness with batched dataset inputs for both training and
    # validation.
    train_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    val_dataset = dataset_ops.Dataset.from_tensor_slices(
        (inputs, targets)).batch(10)
    history = model.fit(
        train_dataset, epochs=2, verbose=1, validation_data=val_dataset)

    # The training loss should be 0.0
    self.assertAllClose(history.history['loss'][0], 0.0)
    # The validation loss should be 1.0.
    self.assertAllClose(history.history['val_loss'][0], 1.0)
  @keras_parameterized.run_with_all_model_types(exclude_models='sequential')
  @keras_parameterized.run_all_keras_modes
  def test_fit_on_arrays(self):
    """Exercises `fit`/`train_on_batch` across many input/loss formats.

    Covers: verbosity levels, validation data and validation split,
    dictionary inputs (functional models only), losses/metrics/weights as
    lists and dicts, several invalid-input cases that must raise
    `ValueError`, and inputs given as (nested) lists of Python scalars.
    """
    input_a = keras.layers.Input(shape=(3,), name='input_a')
    input_b = keras.layers.Input(shape=(3,), name='input_b')

    dense = keras.layers.Dense(4, name='dense')
    dropout = keras.layers.Dropout(0.5, name='dropout')
    branch_a = [input_a, dense]
    branch_b = [input_b, dense, dropout]

    model = testing_utils.get_multi_io_model(branch_a, branch_b)

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    loss = 'mse'
    loss_weights = [1., 0.5]
    model.compile(
        optimizer,
        loss,
        metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
        loss_weights=loss_weights,
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))

    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))

    # Test fit at different verbosity
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=1,
        batch_size=5,
        verbose=0)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=1,
        batch_size=5,
        verbose=1)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=2,
        batch_size=5,
        verbose=2)
    model.train_on_batch([input_a_np, input_b_np], [output_d_np, output_e_np])

    # Test with validation data
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        validation_data=([input_a_np, input_b_np], [output_d_np,
                                                    output_e_np]),
        epochs=1,
        batch_size=5,
        verbose=0)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        validation_data=([input_a_np, input_b_np], [output_d_np,
                                                    output_e_np]),
        epochs=2,
        batch_size=5,
        verbose=1)
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        validation_data=([input_a_np, input_b_np], [output_d_np,
                                                    output_e_np]),
        epochs=2,
        batch_size=5,
        verbose=2)
    # Test with validation split
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=2,
        batch_size=5,
        verbose=0,
        validation_split=0.2)

    if testing_utils.get_model_type() == 'functional':
      # Test with dictionary inputs (only functional models have the named
      # inputs/outputs needed for dict keys).
      model.fit(
          {
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          },
          epochs=1,
          batch_size=5,
          verbose=0)
      model.fit(
          {
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          },
          epochs=1,
          batch_size=5,
          verbose=1)
      model.fit(
          {
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          },
          validation_data=({
              'input_a': input_a_np,
              'input_b': input_b_np
          }, {
              'dense': output_d_np,
              'dropout': output_e_np
          }),
          epochs=1,
          batch_size=5,
          verbose=0)
      model.train_on_batch({
          'input_a': input_a_np,
          'input_b': input_b_np
      }, {
          'dense': output_d_np,
          'dropout': output_e_np
      })

    # Test with lists for loss, metrics
    loss = ['mae', 'mse']
    model.compile(
        optimizer,
        loss,
        metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=1,
        batch_size=5,
        verbose=0)

    # Test with dictionaries for loss, metrics, loss weights
    if testing_utils.get_model_type() == 'functional':
      loss = {'dense': 'mse', 'dropout': 'mae'}
      loss_weights = {'dense': 1., 'dropout': 0.5}
      metrics = {
          'dense': 'mse',
          'dropout': metrics_module.CategoricalAccuracy()
      }
      model.compile(
          optimizer,
          loss,
          metrics=metrics,
          loss_weights=loss_weights,
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.fit(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        epochs=1,
        batch_size=5,
        verbose=0)

    # Invalid use cases
    with self.assertRaises(ValueError):
      # Dict input is missing 'input_b'.
      model.train_on_batch({'input_a': input_a_np},
                           [output_d_np, output_e_np])
    with self.assertRaises(ValueError):
      # validation_data is not a valid (x, y) or (x, y, sample_weight) tuple.
      model.fit(
          [input_a_np, input_b_np], [output_d_np, output_e_np],
          epochs=1,
          validation_data=([input_a_np, input_b_np], 0, 0),
          verbose=0)
    with self.assertRaises(ValueError):
      # Only one of the two required inputs is provided.
      model.train_on_batch([input_a_np], [output_d_np, output_e_np])
    with self.assertRaises(ValueError):
      # A bare scalar is not a valid input.
      model.train_on_batch(1, [output_d_np, output_e_np])
    with self.assertRaises(ValueError):
      model.train_on_batch(input_a_np, [output_d_np, output_e_np])
    with self.assertRaises(ValueError):
      # Mismatched batch dimension (11 vs 10) between the two inputs.
      bad_input = np.random.random((11, 3))
      model.train_on_batch([bad_input, input_b_np],
                           [output_d_np, output_e_np])
    with self.assertRaises(ValueError):
      # Mismatched batch dimension between inputs and targets.
      bad_target = np.random.random((11, 4))
      model.train_on_batch([input_a_np, input_b_np],
                           [bad_target, output_e_np])

    # Build single-input model
    x = keras.layers.Input(shape=(3,), name='input_a')
    y = keras.layers.Dense(4)(x)
    model = keras.models.Model(x, y)
    model.compile(
        optimizer,
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    # This will work
    model.fit([input_a_np], output_d_np, epochs=1)

    # TODO(gsundeep) Test only works in eager, file ticket
    if testing_utils.should_run_eagerly() and context.executing_eagerly():
      with self.assertRaises(ValueError):
        model.fit([input_a_np, input_a_np], output_d_np, epochs=1)

    # Test model on a list of floats
    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 4))

    # Test execution on inputs that are lists of scalars.
    # TF2 and TF1 have slightly different semantics:
    if (testing_utils.should_run_tf_function()
        or testing_utils.should_run_eagerly()):
      # In TF2 to avoid any ambiguity when there are nested lists
      # the entire input gets converted to a
      # single numpy array (& it only works in the case of a single io model)
      model.fit(np.ndarray.tolist(input_a_np),
                np.ndarray.tolist(input_b_np),
                epochs=2,
                batch_size=5,
                verbose=2)
    else:
      # In TF1 there was logic to try disambiguating between the individual
      # inputs when lists are nested. This allowed multi-io functional models
      # to support lists of scalars as input, but it caused ambiguity issues
      # for subclass models & made it trickier to pass multi-dimensional inputs
      # as lists of scalars to single io models. This was an excessive amount
      # of complexity for what boiled down to a convenience method we were
      # mainly just using for writing tests.
      model.fit([np.ndarray.tolist(input_a_np)],
                [np.ndarray.tolist(input_b_np)],
                epochs=2,
                batch_size=5,
                verbose=2)
  @keras_parameterized.run_all_keras_modes
  def test_evaluate_predict_on_arrays(self):
    """Exercises `evaluate`/`predict` with list and dict inputs.

    The compiled model has 2 outputs, each with a loss and 2 metrics, so
    `evaluate` returns 7 scalars: total loss + 2 per-output losses +
    4 per-output metrics.
    """
    a = keras.layers.Input(shape=(3,), name='input_a')
    b = keras.layers.Input(shape=(3,), name='input_b')

    dense = keras.layers.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = keras.layers.Dropout(0.5, name='dropout')(c)

    model = keras.models.Model([a, b], [d, e])

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    loss = 'mse'
    loss_weights = [1., 0.5]
    model.compile(
        optimizer,
        loss,
        metrics=['mae', metrics_module.CategoricalAccuracy()],
        loss_weights=loss_weights,
        sample_weight_mode=None,
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))

    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))

    # Test evaluate at different verbosity
    out = model.evaluate(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        batch_size=5,
        verbose=0)
    self.assertEqual(len(out), 7)
    out = model.evaluate(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        batch_size=5,
        verbose=1)
    self.assertEqual(len(out), 7)
    out = model.evaluate(
        [input_a_np, input_b_np], [output_d_np, output_e_np],
        batch_size=5,
        verbose=2)
    self.assertEqual(len(out), 7)
    out = model.test_on_batch([input_a_np, input_b_np],
                              [output_d_np, output_e_np])
    self.assertEqual(len(out), 7)

    # Test evaluate with dictionary inputs
    model.evaluate(
        {
            'input_a': input_a_np,
            'input_b': input_b_np
        }, {
            'dense': output_d_np,
            'dropout': output_e_np
        },
        batch_size=5,
        verbose=0)
    model.evaluate(
        {
            'input_a': input_a_np,
            'input_b': input_b_np
        }, {
            'dense': output_d_np,
            'dropout': output_e_np
        },
        batch_size=5,
        verbose=1)

    # Test predict: one array per model output, for every input format.
    out = model.predict([input_a_np, input_b_np], batch_size=5)
    self.assertEqual(len(out), 2)
    out = model.predict({'input_a': input_a_np, 'input_b': input_b_np})
    self.assertEqual(len(out), 2)
    out = model.predict_on_batch({
        'input_a': input_a_np,
        'input_b': input_b_np
    })
    self.assertEqual(len(out), 2)
  def _make_sequence_input_functions(self, input_type):
    """Builds factories producing (x, y) and x-only inputs of a given type.

    Args:
      input_type: One of 'dataset', 'generator', or 'sequence'.

    Returns:
      A pair `(xy_factory, x_factory)`. Each factory takes a
      `use_namedtuple` flag and returns batches either as plain tuples /
      arrays or wrapped in a namedtuple with the same fields.
    """
    # train and test
    xy_namedtuple = collections.namedtuple('xy_namedtuple', ['x', 'y'])

    # predict
    x_namedtuple = collections.namedtuple('x_namedtuple', ['x'])

    if input_type == 'dataset':
      dataset = dataset_ops.Dataset.range(16).map(
          lambda _: array_ops.ones(shape=(1,)))

      xy_dataset = dataset_ops.Dataset.zip((dataset, dataset)).batch(4)
      x_dataset = dataset.batch(4)

      def xy_function(use_namedtuple):
        # `map(xy_namedtuple)` rewraps each (x, y) batch in the namedtuple.
        return xy_dataset.map(xy_namedtuple) if use_namedtuple else xy_dataset

      def x_function(use_namedtuple):
        return x_dataset.map(x_namedtuple) if use_namedtuple else x_dataset

      return xy_function, x_function

    elif input_type == 'generator':

      def xy_generator(use_namedtuple):
        x, y = np.ones((4, 1)), np.ones((4, 1))
        for _ in range(4):
          if use_namedtuple:
            yield xy_namedtuple(x, y)
          else:
            yield x, y

      def x_generator(use_namedtuple):
        x = np.ones((4, 1))
        for _ in range(4):
          if use_namedtuple:
            yield x_namedtuple(x)
          else:
            yield x

      return xy_generator, x_generator

    elif input_type == 'sequence':

      class XYSequence(data_utils.Sequence):

        def __init__(self, use_namedtuple):
          self._use_namedtuple = use_namedtuple
          super(XYSequence, self).__init__()

        def __getitem__(self, idx):
          x, y = np.ones((4, 1)), np.ones((4, 1))
          if self._use_namedtuple:
            return xy_namedtuple(x, y)
          return x, y

        def __len__(self):
          return 4

      class XSequence(data_utils.Sequence):

        def __init__(self, use_namedtuple):
          self._use_namedtuple = use_namedtuple
          super(XSequence, self).__init__()

        def __getitem__(self, idx):
          x = np.ones((4, 1))
          if self._use_namedtuple:
            return x_namedtuple(x)
          return x

        def __len__(self):
          return 4

      # NOTE: unlike the other branches, the Sequence classes themselves are
      # returned; callers construct them with `use_namedtuple`.
      return XYSequence, XSequence
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  @keras_parameterized.run_with_all_model_types
  @parameterized.named_parameters(
      ('dataset', 'dataset'),
      ('generator', 'generator'),
      ('sequence', 'sequence'),
  )
  def test_sequence_input_types(self, input_type):
    """Ensure that namedtuples and tuples are plumbed identically."""
    if not testing_utils.should_run_tf_function():
      self.skipTest('Improved checking is only present in data_adapter.')

    xy_function, x_function = self._make_sequence_input_functions(input_type)
    fit_kwargs, evaluate_kwargs, predict_kwargs = {}, {}, {}
    if input_type == 'generator':
      # Generators have no length, so the step counts must be explicit.
      fit_kwargs['steps_per_epoch'] = 4
      evaluate_kwargs['steps'] = 4
      predict_kwargs['steps'] = 4

    model = testing_utils.get_small_mlp(1, 1, 1)
    model.compile(
        loss='mse',
        optimizer='sgd',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    # Plain tuple inputs must be accepted by all three endpoints.
    model.fit(xy_function(use_namedtuple=False), **fit_kwargs)
    model.evaluate(xy_function(use_namedtuple=False), **evaluate_kwargs)
    model.predict(x_function(use_namedtuple=False), **predict_kwargs)

    # Namedtuple inputs must raise with a descriptive message.
    xy_pattern = re.escape(
        "Received namedtuple (<class '__main__.xy_namedtuple'>) with fields "
        "`('x', 'y')` as input.")
    x_pattern = re.escape(
        "Received namedtuple (<class '__main__.x_namedtuple'>) with fields "
        "`('x',)` as input.")

    with self.assertRaisesRegex(ValueError, xy_pattern):
      model.fit(xy_function(use_namedtuple=True), **fit_kwargs)
    with self.assertRaisesRegex(ValueError, xy_pattern):
      model.evaluate(xy_function(use_namedtuple=True), **evaluate_kwargs)
    with self.assertRaisesRegex(ValueError, x_pattern):
      model.predict(x_function(use_namedtuple=True), **predict_kwargs)
@keras_parameterized.run_all_keras_modes
@keras_parameterized.run_with_all_model_types
def test_activity_regularizer_fit(self):
loss = {}
for reg in [None, 'l2']:
layers = [
keras.layers.Dense(
10, activation='relu', activity_regularizer=reg,
kernel_initializer='ones', use_bias=False),
keras.layers.Dense(
1, activation='sigmoid', kernel_initializer='ones',
use_bias=False),
]
model = testing_utils.get_model_from_layers(
layers, input_shape=(10,))
x = np.ones((10, 10), 'float32')
y = np.ones((10, 1), 'float32')
optimizer = RMSPropOptimizer(learning_rate=0.001)
model.compile(
optimizer,
'binary_crossentropy',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.fit(x, y, batch_size=2, epochs=5)
loss[reg] = model.evaluate(x, y)
self.assertLess(loss[None], loss['l2'])
@keras_parameterized.run_all_keras_modes
@keras_parameterized.run_with_all_model_types
def test_activity_regularizer_loss_value(self):
layer = keras.layers.Dense(
1, kernel_initializer=keras.initializers.zeros(),
bias_initializer=keras.initializers.ones(), activity_regularizer='l2')
model = testing_utils.get_model_from_layers([layer], input_shape=(10,))
x = np.ones((10, 10), 'float32')
y = np.ones((10, 1), 'float32')
optimizer = RMSPropOptimizer(learning_rate=0.001)
model.compile(
optimizer,
'binary_crossentropy',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
loss = model.test_on_batch(x, y)
self.assertAlmostEqual(0.01, loss, places=4)
@keras_parameterized.run_all_keras_modes
def test_activity_regularizer_batch_independent(self):
inputs = keras.layers.Input(shape=(10,))
x = keras.layers.Dense(
10, activation='relu', activity_regularizer='l2')(
inputs)
outputs = keras.layers.Dense(1, activation='sigmoid')(x)
model = keras.Model(inputs, outputs)
optimizer = RMSPropOptimizer(learning_rate=0.001)
model.compile(
optimizer,
'binary_crossentropy',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
x = np.ones((10, 10), 'float32')
y = np.ones((10, 1), 'float32')
loss_small_batch = model.test_on_batch(x, y)
x2 = np.ones((20, 10), 'float32')
y2 = np.ones((20, 1), 'float32')
loss_big_batch = model.test_on_batch(x2, y2)
self.assertAlmostEqual(loss_small_batch, loss_big_batch, places=4)
@keras_parameterized.run_all_keras_modes
def test_activity_regularizer_in_model_call(self):
class MyModel(keras.Model):
def call(self, inputs):
self.add_loss(inputs)
return inputs
x = ops.convert_to_tensor(1.)
model = MyModel()
_ = model(x)
self.assertEqual(1, len(model.losses))
@keras_parameterized.run_all_keras_modes
def test_custom_mapping_in_config(self):
class MyModel(keras.Model):
def call(self, inputs):
return inputs
def get_config(self):
self.a = {}
return {'a': self.a}
model = MyModel()
self.assertIn('{"a": {}}', model.to_json())
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_training_on_sparse_data_with_dense_placeholders(self):
if scipy_sparse is None:
return
test_inputs = [
scipy_sparse.random(6, 3, density=0.25).tocsr() for _ in range(2)
]
test_outputs = [
scipy_sparse.random(6, i, density=0.25).tocsr() for i in range(3, 5)
]
in1 = keras.layers.Input(shape=(3,))
in2 = keras.layers.Input(shape=(3,))
out1 = keras.layers.Dropout(0.5, name='dropout')(in1)
out2 = keras.layers.Dense(4, name='dense_1')(in2)
model = keras.Model([in1, in2], [out1, out2])
model.experimental_run_tf_function = testing_utils.should_run_tf_function()
with self.assertRaisesRegexp(ValueError, 'Please densify'):
model.predict(test_inputs, batch_size=2)
optimizer = 'rmsprop'
model.compile(
optimizer,
'mse',
metrics=['mae', metrics_module.CategoricalAccuracy()],
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
with self.assertRaisesRegexp(ValueError, 'Please densify'):
model.fit(test_inputs, test_outputs,
epochs=1, batch_size=2)
with self.assertRaisesRegexp(ValueError, 'Please densify'):
model.evaluate(test_inputs, test_outputs, batch_size=2)
@tf_test_util.run_deprecated_v1
def test_training_on_sparse_data_with_dense_placeholders_v1(self):
if scipy_sparse is None:
return
test_inputs = [
scipy_sparse.random(6, 3, density=0.25).tocsr() for _ in range(2)
]
test_outputs = [
scipy_sparse.random(6, i, density=0.25).tocsr() for i in range(3, 5)
]
in1 = keras.layers.Input(shape=(3,))
in2 = keras.layers.Input(shape=(3,))
out1 = keras.layers.Dropout(0.5, name='dropout')(in1)
out2 = keras.layers.Dense(4, name='dense_1')(in2)
model = keras.Model([in1, in2], [out1, out2])
model.predict(test_inputs, batch_size=2)
optimizer = 'rmsprop'
model.compile(
optimizer,
'mse',
metrics=['mae', metrics_module.CategoricalAccuracy()])
model.fit(test_inputs, test_outputs,
epochs=1, batch_size=2, validation_split=0.5)
model.evaluate(test_inputs, test_outputs, batch_size=2)
@keras_parameterized.run_all_keras_modes
def test_compile_with_sparse_placeholders(self):
input_layer = keras.layers.Input(shape=(10,), sparse=True)
weights = variables_lib.Variable(
np.ones((10, 1)).astype(np.float32), name='weights')
weights_mult = lambda x: sparse_ops.sparse_tensor_dense_matmul(x, weights)
output_layer = keras.layers.Lambda(weights_mult)(input_layer)
model = keras.Model([input_layer], output_layer)
model.compile(
loss='binary_crossentropy',
optimizer='adam',
metrics=['accuracy'],
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
  @keras_parameterized.run_all_keras_modes
  def test_that_trainable_disables_updates(self):
    """Toggling `trainable` on the model/layer gates BatchNorm update ops."""
    val_a = np.random.random((10, 4))
    val_out = np.random.random((10, 4))

    a = keras.layers.Input(shape=(4,))
    layer = keras.layers.BatchNormalization(input_shape=(4,))
    b = layer(a)
    model = keras.Model(a, b)

    # Frozen model: no update ops, and training must not change predictions.
    model.trainable = False
    assert not model.updates

    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    assert not model.updates

    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    self.assertAllClose(x1, x2, atol=1e-7)

    # Unfrozen: updates exist and a train step changes the predictions.
    model.trainable = True
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    assert model.updates

    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    assert np.abs(np.sum(x1 - x2)) > 1e-5

    # Freezing just the layer must have the same effect as freezing the model.
    layer.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    assert not model.updates

    x1 = model.predict(val_a)
    model.train_on_batch(val_a, val_out)
    x2 = model.predict(val_a)
    self.assertAllClose(x1, x2, atol=1e-7)
def test_weight_deduplication_in_methods(self):
inp = keras.layers.Input(shape=(1,))
bn = keras.layers.BatchNormalization()
d = keras.layers.Dense(1)
m0 = keras.models.Model(inp, d(bn(inp)))
m1 = keras.models.Model(inp, d(bn(inp)))
x0 = m0(inp)
x1 = m1(inp)
x = keras.layers.Add()([x0, x1])
model = keras.models.Model(inp, x)
self.assertLen(model.trainable_weights, 4)
self.assertLen(model.non_trainable_weights, 2)
self.assertLen(model.weights, 6)
  @keras_parameterized.run_all_keras_modes
  def test_weight_deduplication(self):
    """Double-tracked weights must receive only a single gradient update."""

    class WatchingLayer(keras.layers.Layer):

      def __init__(self, dense_to_track):
        # This will cause the kernel and bias to be double counted, effectively
        # doubling the learning rate if weights are not deduped.
        self._kernel = dense_to_track.kernel
        self._bias = dense_to_track.bias
        super(WatchingLayer, self).__init__()

    inp = keras.layers.Input(shape=(1,))
    dense_layer = keras.layers.Dense(1)
    dense_output = dense_layer(inp)  # This will build the dense kernel

    # Deterministically set weights to make the test repeatable.
    dense_layer.set_weights([np.ones((1, 1)), np.zeros((1,))])
    output = WatchingLayer(dense_layer)(dense_output)

    model = keras.models.Model(inp, output)

    # 0.25 is the edge of the radius of convergence for the double apply case.
    # At lr=0.24, the double apply case will very slowly descend while the
    # correct case will drop very quickly.
    model.compile(loss='mse', optimizer=gradient_descent.SGD(0.24),
                  run_eagerly=testing_utils.should_run_eagerly())

    x = np.ones((64 * 2,))
    y = 4.5 * x - 3.

    history = model.fit(x, y, batch_size=64, epochs=2, verbose=2)

    # If the gradient apply is duplicated then the loss after 2 epochs will
    # be ~0.15, compared to the correct answer of O(1e-7).
    self.assertLess(history.history['loss'][-1], 1e-6)
  def test_logs_passed_to_callbacks(self):
    """Callbacks receive the expected keys in batch/epoch `logs` dicts."""
    with self.cached_session():
      input_dim = 5
      num_classes = 1

      class TestCallback(Callback):
        """Records the last `logs` dict and call counts of the end hooks."""

        def __init__(self):
          super(TestCallback, self).__init__()
          self.epoch_end_logs = None
          self.batch_end_logs = None
          self.epoch_end_call_count = 0
          self.batch_end_call_count = 0

        def on_epoch_end(self, epoch, logs=None):
          self.epoch_end_logs = logs
          self.epoch_end_call_count += 1

        def on_batch_end(self, batch, logs=None):
          self.batch_end_logs = logs
          self.batch_end_call_count += 1

      model = testing_utils.get_small_sequential_mlp(
          num_hidden=10, num_classes=num_classes, input_dim=input_dim)
      model.compile(
          loss='binary_crossentropy',
          metrics=['acc'],
          weighted_metrics=['mae'],
          optimizer=RMSPropOptimizer(learning_rate=0.01))

      np.random.seed(1337)
      (x_train, y_train), (_, _) = testing_utils.get_test_data(
          train_samples=10,
          test_samples=10,
          input_shape=(input_dim,),
          num_classes=num_classes)

      test_callback = TestCallback()
      model.fit(
          x_train,
          y_train,
          batch_size=2,
          epochs=2,
          verbose=0,
          callbacks=[test_callback],
          validation_data=(x_train, y_train))
      # 10 samples / batch_size 2 = 5 batches per epoch, over 2 epochs.
      self.assertEqual(test_callback.batch_end_call_count, 10)
      self.assertEqual(test_callback.epoch_end_call_count, 2)

      # The weighted-MAE metric is named differently in TF1 vs TF2.
      weighted_metric = ('mae'
                         if tf2.enabled() else 'weighted_mean_absolute_error')
      self.assertSetEqual(
          set(test_callback.batch_end_logs.keys()),
          set(['batch', 'size', 'acc', 'loss', weighted_metric]))
      self.assertSetEqual(
          set(test_callback.epoch_end_logs.keys()),
          set([
              'acc', 'loss', weighted_metric, 'val_acc', 'val_loss',
              'val_' + weighted_metric
          ]))
  @keras_parameterized.run_all_keras_modes
  def test_mismatched_output_shape_and_target_shape(self):
    """Sparse CE accepts integer targets with one fewer dim than the output."""
    model = keras.Sequential([
        keras.layers.Dense(2, input_shape=(3, 4)),
        keras.layers.Dense(5),
    ])
    model.compile(
        RMSPropOptimizer(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    # Test with Numpy data
    x_train = np.random.random((10, 3, 4))
    y_train = np.random.randint(0, 5, size=(10, 3))
    model.fit(x_train, y_train, batch_size=5, epochs=1)

    # Test with iterator
    dataset = dataset_ops.Dataset.from_tensor_slices((x_train, y_train))
    dataset = dataset.repeat(10)
    dataset = dataset.batch(10)
    model.fit(dataset, epochs=1, steps_per_epoch=2)

    if context.executing_eagerly():
      # Test with eager execution
      model.compile(RMSPropOptimizer(learning_rate=0.001),
                    loss='sparse_categorical_crossentropy',
                    run_eagerly=True)
      model.fit(x_train, y_train, batch_size=5, epochs=1)

      # Test with eager execution and iterator
      model.fit(dataset, epochs=1, steps_per_epoch=2)
def test_losses_in_defun(self):
with context.eager_mode():
layer = keras.layers.Dense(1, kernel_regularizer='l1')
layer(array_ops.ones([1, 10]))
@function.defun
def get_losses():
return layer.losses
self.assertAllEqual(
self.evaluate(layer.losses), self.evaluate(get_losses()))
@keras_parameterized.run_all_keras_modes
def test_logging(self):
mock_stdout = io.BytesIO() if six.PY2 else io.StringIO()
model = keras.models.Sequential()
model.add(keras.layers.Dense(10, activation='relu'))
model.add(keras.layers.Dense(1, activation='sigmoid'))
model.compile(
RMSPropOptimizer(learning_rate=0.001),
loss='binary_crossentropy',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
with test.mock.patch.object(sys, 'stdout', mock_stdout):
model.fit(
np.ones((10, 10), 'float32'), np.ones((10, 1), 'float32'), epochs=10)
self.assertTrue('Epoch 5/10' in mock_stdout.getvalue())
  @tf_test_util.run_in_graph_and_eager_modes
  def test_training_with_loss_instance(self):
    """compile() accepts a Loss object plus loss_weights for multi-output."""
    a = keras.layers.Input(shape=(3,), name='input_a')
    b = keras.layers.Input(shape=(3,), name='input_b')
    dense = keras.layers.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = keras.layers.Dropout(0.5, name='dropout')(c)

    model = keras.models.Model([a, b], [d, e])
    loss_weights = [1., 0.5]
    model.compile(
        RMSPropOptimizer(learning_rate=0.001),
        loss=keras.losses.MeanSquaredError(),
        metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
        loss_weights=loss_weights)

    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))

    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))

    # Smoke test: a full fit step must run without error.
    model.fit([input_a_np, input_b_np], [output_d_np, output_e_np],
              epochs=1,
              batch_size=5)
  @tf_test_util.run_in_graph_and_eager_modes
  def test_static_batch_in_input_layer(self):
    """A static batch size on the Input layer overrides fit()'s default."""

    class Counter(keras.callbacks.Callback):
      """Counts how many batches fit() actually ran."""

      def __init__(self):
        self.batches = 0

      def on_batch_end(self, batch, logs=None):
        self.batches += 1

    x, y = np.ones((64, 10), 'float32'), np.ones((64, 1), 'float32')

    # 64 samples: no static batch => 2 batches; static batch 4 => 16 batches.
    for batch_size, expected_batches in [(None, 2), (4, 16)]:
      # Functional-API model.
      inputs = keras.Input(batch_size=batch_size, shape=(10,))
      outputs = keras.layers.Dense(1, activation='sigmoid')(inputs)
      model = keras.Model(inputs, outputs)
      model.compile(keras.optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
      counter = Counter()
      model.fit(x, y, callbacks=[counter])
      self.assertEqual(counter.batches, expected_batches)

      # Sequential model via batch_input_shape.
      model = keras.Sequential(
          [keras.layers.Dense(1, batch_input_shape=(batch_size, 10))])
      model.compile(keras.optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
      counter = Counter()
      model.fit(x, y, callbacks=[counter])
      self.assertEqual(counter.batches, expected_batches)
@tf_test_util.run_in_graph_and_eager_modes
def test_static_batch_in_input_layer_consistency_checks(self):
x, y = np.ones((64, 10), 'float32'), np.ones((64, 1), 'float32')
inputs = keras.Input(batch_size=2, shape=(10,))
outputs = keras.layers.Dense(1, activation='sigmoid')(inputs)
model = keras.Model(inputs, outputs)
model.compile(keras.optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
with self.assertRaisesRegexp(ValueError,
'incompatible with the specified batch size'):
model.fit(x, y, batch_size=4)
@tf_test_util.run_in_graph_and_eager_modes
def test_compatible_batch_size_functional_model(self):
class MyLayer(keras.layers.Layer):
def call(self, inputs):
return array_ops.concat(inputs, axis=0)
input1 = keras.Input(batch_size=2, shape=(10,))
input2 = keras.Input(batch_size=3, shape=(10,))
outputs = MyLayer()([input1, input2])
with self.assertRaisesRegexp(ValueError,
'specified batch sizes of the Input Layers'):
keras.Model([input1, input2], outputs)
@tf_test_util.run_in_graph_and_eager_modes
def test_calling_subclass_model_on_different_datasets(self):
class SubclassedModel(keras.models.Model):
def call(self, inputs):
return inputs * 2
model = SubclassedModel()
dataset_one = dataset_ops.Dataset.range(2).batch(2)
dataset_two = dataset_ops.Dataset.range(3, 10).batch(2)
self.assertAllEqual([[0], [2]], model.predict(dataset_one, steps=1))
self.assertAllEqual([[6], [8], [10], [12]],
model.predict(dataset_two, steps=2))
  def test_training_on_sparse_categorical_crossentropy_loss_with_softmax(self):
    """Graph-function and eager paths give the same sparse-CE training loss."""
    with context.eager_mode():
      np.random.seed(1337)
      train_x = np.ones((100, 4))
      train_y = np.random.randint(0, 1, size=(100, 1))

      reference_model = testing_utils.get_small_sequential_mlp(16, 2,
                                                               input_dim=4)
      reference_model.compile(loss='sparse_categorical_crossentropy',
                              optimizer=RMSPropOptimizer(learning_rate=0.001),
                              run_eagerly=True)
      fixed_weights = reference_model.get_weights()
      reference_model_loss = reference_model.train_on_batch(train_x, train_y)

      # Same architecture and initial weights, but not run eagerly.
      test_model = testing_utils.get_small_sequential_mlp(16, 2, input_dim=4)
      test_model.compile(loss='sparse_categorical_crossentropy',
                         optimizer=RMSPropOptimizer(learning_rate=0.001),
                         run_eagerly=False)
      test_model.set_weights(fixed_weights)
      test_model_loss = test_model.train_on_batch(train_x, train_y)
      self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)
  def test_training_on_categorical_crossentropy_loss_with_softmax(self):
    """Graph-function and eager paths give the same categorical-CE loss."""
    with context.eager_mode():
      np.random.seed(1337)
      train_x = np.ones((100, 4))
      train_y = np_utils.to_categorical(
          np.random.randint(0, 1, size=(100, 1)), 2)

      reference_model = testing_utils.get_small_sequential_mlp(16, 2,
                                                               input_dim=4)
      reference_model.compile(loss='categorical_crossentropy',
                              optimizer=RMSPropOptimizer(learning_rate=0.001),
                              run_eagerly=True)
      fixed_weights = reference_model.get_weights()
      reference_model_loss = reference_model.train_on_batch(train_x, train_y)

      # Same architecture and initial weights, but not run eagerly.
      test_model = testing_utils.get_small_sequential_mlp(16, 2, input_dim=4)
      test_model.compile(loss='categorical_crossentropy',
                         optimizer=RMSPropOptimizer(learning_rate=0.001),
                         run_eagerly=False)
      test_model.set_weights(fixed_weights)
      test_model_loss = test_model.train_on_batch(train_x, train_y)
      self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)
  def test_training_on_binary_crossentropy_loss(self):
    """Graph-function and eager paths give the same binary-CE training loss."""
    with context.eager_mode():
      train_x = np.ones((100, 4), dtype=np.float32)
      train_y = np.ones((100, 1), dtype=np.float32)

      reference_model = testing_utils.get_small_sequential_mlp(16, 1,
                                                               input_dim=4)
      reference_model.compile(loss='binary_crossentropy',
                              optimizer=RMSPropOptimizer(learning_rate=0.001),
                              run_eagerly=True)
      fixed_weights = reference_model.get_weights()
      reference_model_loss = reference_model.train_on_batch(train_x, train_y)

      # Same architecture and initial weights, but not run eagerly.
      test_model = testing_utils.get_small_sequential_mlp(16, 1, input_dim=4)
      test_model.compile(loss='binary_crossentropy',
                         optimizer=RMSPropOptimizer(learning_rate=0.001),
                         run_eagerly=False)
      test_model.set_weights(fixed_weights)
      test_model_loss = test_model.train_on_batch(train_x, train_y)
      self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)
  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  @parameterized.named_parameters(
      ('default', 1, 4), ('integer_two', 2, 2), ('integer_four', 4, 1),
      ('simple_list', [1, 3, 4], 3), ('duplicated_list', [4, 2, 2], 2))
  def test_validation_freq(self, validation_freq, expected_runs):
    """`validation_freq` (int or epoch list) controls validation frequency."""
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model = testing_utils.get_small_mlp(2, 1, 10)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    class ValCounter(keras.callbacks.Callback):
      """Counts how many times evaluation begins during fit()."""

      def __init__(self):
        self.val_runs = 0

      def on_test_begin(self, logs=None):
        self.val_runs += 1

    val_counter = ValCounter()
    model.fit(
        x,
        y,
        epochs=4,
        validation_data=(x, y),
        validation_freq=validation_freq,
        callbacks=[val_counter])
    self.assertEqual(val_counter.val_runs, expected_runs)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_validation_steps_without_data(self):
x, y = np.ones((10, 10)), np.ones((10, 1))
model = testing_utils.get_small_mlp(2, 1, 10)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
with self.assertRaisesRegexp(
ValueError, '`validation_steps` should not be specified if '
'`validation_data` is None.'):
model.fit(x, y, epochs=4, validation_data=None, validation_steps=3)
  @keras_parameterized.run_all_keras_modes
  def test_add_loss_correctness(self):
    """add_loss terms are summed with the compile() loss as expected."""
    inputs = keras.Input(shape=(1,))
    targets = keras.Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = keras.Model([inputs, targets], outputs)

    # Two extra MAE terms (one scaled by 2) on top of the compiled MAE,
    # so the reported loss is 4x the plain MAE.
    model.add_loss(2 * math_ops.reduce_mean(
        keras.losses.mean_absolute_error(targets, outputs)))
    model.add_loss(keras.losses.MeanAbsoluteError()(targets, outputs))

    model.compile(
        keras.optimizer_v2.gradient_descent.SGD(0.025),
        loss=keras.losses.MeanAbsoluteError(),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    x = np.array([[0.], [1.], [2.]])
    y = np.array([[0.5], [2.], [3.5]])
    history = model.fit([x, y], y, batch_size=3, epochs=5)
    self.assertAllClose(history.history['loss'], [4., 3.6, 3.2, 2.8, 2.4], 1e-3)
  @keras_parameterized.run_all_keras_modes
  def test_add_loss_with_sample_weight_correctness(self):
    """add_loss terms honor sample weights like the compile() loss does."""
    inputs = keras.Input(shape=(1,))
    targets = keras.Input(shape=(1,))
    sw = keras.Input(shape=(1,))
    outputs = testing_utils.Bias()(inputs)
    model = keras.Model([inputs, targets, sw], outputs)

    # Two sample-weighted MAE terms added on top of the compiled MAE.
    model.add_loss(2 * math_ops.reduce_mean(
        sw * keras.losses.mean_absolute_error(targets, outputs)))
    model.add_loss(keras.losses.MeanAbsoluteError()(targets, outputs, sw))

    model.compile(
        keras.optimizer_v2.gradient_descent.SGD(0.025),
        loss=keras.losses.MeanAbsoluteError(),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    x = np.array([[0.], [1.], [2.]])
    y = np.array([[0.5], [2.], [3.5]])
    w = np.array([1.25, 0.5, 1.25])
    history = model.fit([x, y, w], y, batch_size=3, epochs=5, sample_weight=w)
    self.assertAllClose(history.history['loss'], [4., 3.6, 3.2, 2.8, 2.4], 1e-3)
  @keras_parameterized.run_all_keras_modes
  def test_unconditional_add_loss_correctness(self):
    """Unconditional layer losses are included in the training loss."""

    class MyLayer(keras.layers.Layer):

      def call(self, inputs, training=None):
        # Reachable from the inputs but marked as unconditional.
        self.add_loss(math_ops.reduce_sum(inputs))
        return inputs

    inputs = keras.Input((3,))
    layer = MyLayer()
    outputs = layer(inputs)
    model = keras.Model(inputs, outputs)
    self.assertEqual(len(model.losses), 1)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
    # MSE is 0 (identity model), so the loss is just sum(ones((2, 3))) = 6.
    self.assertEqual(loss, 2 * 3)
  @keras_parameterized.run_all_keras_modes
  def test_clear_losses(self):
    """Eager per-call losses are cleared; graph losses stay per-input."""

    class LayerWithSharedNestedLossLayer(keras.layers.Layer):

      def __init__(self):
        super(LayerWithSharedNestedLossLayer, self).__init__()
        self.loss_layer = keras.layers.ActivityRegularization(l2=0.001)
        self.add_weight(shape=(1,), regularizer='l2')

      def call(self, x):
        # Apply the same regularizing layer twice: two activity losses.
        x = self.loss_layer(x)
        return self.loss_layer(x)

    inputs = keras.Input(shape=(1,))
    outputs = LayerWithSharedNestedLossLayer()(inputs)
    model = keras.Model(inputs, outputs)

    # Weight loss + 2 activity losses.
    self.assertEqual(len(model.losses), 3)

    x = array_ops.ones((1, 1))
    model(x)
    y = array_ops.ones((1, 1))
    model(y)

    if context.executing_eagerly():
      # Eager losses are cleared every `__call__`.
      self.assertEqual(len(model.losses), 3)
    else:
      # Graph mode keeps conditional losses keyed by the input tensor; the
      # weight regularization loss is unconditional (keyed by None).
      self.assertEqual(len(model.get_losses_for(x)), 2)
      self.assertEqual(len(model.get_losses_for(y)), 2)
      self.assertEqual(len(model.get_losses_for(None)), 1)
  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_layer_with_variable_output(self):
    """A layer whose output is its own variable can still be trained through."""

    class VariableOutputLayer(keras.layers.Layer):

      def build(self, input_shape):
        self.v = self.add_weight('output_var', shape=(2, 5), initializer='ones')

      def call(self, inputs):
        # The output is the variable itself, independent of `inputs`.
        return self.v

    model = testing_utils.get_model_from_layers(
        [VariableOutputLayer(), keras.layers.Dense(1)], input_shape=(10,))
    # TODO(omalleyt): Make this work with `run_eagerly=True`.
    model.compile('sgd', 'mse', run_eagerly=False)
    model.fit(np.ones((10, 10)), np.ones((10, 1)), batch_size=2, epochs=5)

    # output_var + Dense kernel + Dense bias.
    self.assertLen(model.trainable_variables, 3)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
@testing_utils.enable_v2_dtype_behavior
def test_model_dtype(self):
class AssertTypeLayer(keras.layers.Layer):
def call(self, inputs):
assert inputs.dtype.name == self.dtype, (
'Input tensor has type %s which does not match assert type %s' %
(inputs.dtype.name, self.assert_type))
return inputs + 1.
for dtype in ('float16', 'float32', 'float64'):
model = testing_utils.get_model_from_layers(
[AssertTypeLayer(dtype=dtype)], input_shape=(10,))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
x = np.ones((10, 10))
y = np.ones((10, 10))
model.fit(x, y)
model.test_on_batch(x, y)
model(x)
  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  @testing_utils.enable_v2_dtype_behavior
  def test_model_input_dtype(self):
    """Model inputs keep the model dtype even when fed float64 data."""
    model = testing_utils.get_small_mlp(1, 10, 10)
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.ones((10, 10)).astype(np.float64)
    y = np.ones((10, 10)).astype(np.float64)
    dataset = dataset_ops.Dataset.from_tensor_slices((x, y)).batch(2)
    model.fit(dataset)
    self.assertEqual(model._compute_dtype, 'float32')
    # Input dtype should match the model dtype, even if the inputs passed to the
    # model have a different dtype.
    self.assertEqual(model.inputs[0].dtype, 'float32')
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_subclassed_model_with_training_arg(self):
    """fit() forwards the correct `training` value through nested calls."""

    class LayerWithTrainingArg(keras.layers.Layer):

      def call(self, inputs, training=None):
        # Record the `training` value this layer was called with.
        self.training = training
        return inputs

    class ModelWithTrainingArg(keras.Model):

      def __init__(self):
        super(ModelWithTrainingArg, self).__init__()
        self.l1 = LayerWithTrainingArg()

      def call(self, inputs, training=None):
        # Record `training` and forward it to the nested layer.
        self.training = training
        inputs = self.l1(inputs, training=training)
        return inputs

    x = np.zeros((1, 2))
    model = ModelWithTrainingArg()
    model.compile(
        loss='mse',
        optimizer='sgd',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.fit(x, x, epochs=1)

    # Eager / tf.function paths pass a literal True; the legacy graph path
    # passes the symbolic learning-phase tensor.
    if (testing_utils.should_run_eagerly() or
        testing_utils.should_run_tf_function()):
      expected_training_arg = True
    else:
      expected_training_arg = keras.backend.symbolic_learning_phase()

    self.assertIs(model.training, expected_training_arg)
    self.assertIs(model.l1.training, expected_training_arg)
@keras_parameterized.run_all_keras_modes
def test_error_when_model_is_not_compiled(self):
inputs = keras.Input(shape=(1,))
outputs = keras.layers.Dense(1)(inputs)
model = keras.Model(inputs, outputs)
with self.assertRaisesRegex(RuntimeError, 'must compile your model'):
model.fit(np.ones((1, 1)), np.ones((1, 1)))
class MyModel(keras.Model):
def call(self, x):
self.add_loss(math_ops.reduce_sum(x))
return x
model = MyModel()
with self.assertRaisesRegex(RuntimeError, 'must compile your model'):
model.fit(np.random.random((32, 1)), epochs=2)
class TestExceptionsAndWarnings(keras_parameterized.TestCase):
  """Covers error messages and warnings raised by compile/fit/predict."""

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_invalid_batch_dimension(self):
    """A layer that changes the batch dimension should fail predict()."""

    def custom_reshape(inputs):
      # Folds the channel dim into the batch dim, doubling the batch size.
      return keras.backend.reshape(inputs, (-1, 8, 8, 3))

    layer_1 = keras.layers.Lambda(custom_reshape)
    layer_2 = keras.layers.Conv2D(32, (3, 3))
    model = testing_utils.get_model_from_layers([layer_1, layer_2],
                                                input_shape=(8, 8, 6))
    model.compile('sgd', loss='mse')

    with self.assertRaisesRegex(
        ValueError,
        'Mismatch between expected batch size and model output batch size. '
        r'Output shape = \(20, 6, 6, 32\), expected output shape = '
        r'shape \(10, 6, 6, 32\)'):
      model.predict(np.ones((10, 8, 8, 6)), batch_size=10)

  @keras_parameterized.run_all_keras_modes
  def test_invalid_loss(self):
    """Target/loss mismatches raise ValueError; loss=None is rejected."""
    num_classes = 5
    train_samples = 1000
    test_samples = 1000
    input_dim = 5

    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=num_classes, input_dim=input_dim)
    optimizer = RMSPropOptimizer(learning_rate=0.001)
    model.compile(optimizer, loss='categorical_crossentropy')
    np.random.seed(1337)
    (x_train, y_train), (_, _) = testing_utils.get_test_data(
        train_samples=train_samples,
        test_samples=test_samples,
        input_shape=(input_dim,),
        num_classes=num_classes)

    # Targets with the wrong last dimension are rejected.
    with self.assertRaises(ValueError):
      model.fit(x_train, np.concatenate([y_train, y_train], axis=-1))

    if not context.executing_eagerly():
      # TODO(psv): Investigate these use cases in eager mode.
      # Integer labels with a categorical (one-hot) loss.
      with self.assertRaises(ValueError):
        model.fit(x_train, y_train)

      # Compiling with no loss at all.
      with self.assertRaises(ValueError):
        model.compile(
            optimizer,
            loss=None,
            run_eagerly=testing_utils.should_run_eagerly(),
            experimental_run_tf_function=testing_utils.should_run_tf_function())

  @keras_parameterized.run_all_keras_modes
  def test_compile_warning_for_loss_missing_output(self):
    """Omitting an output from the loss dict logs a warning, not an error."""
    with self.cached_session():
      inp = keras.layers.Input(shape=(16,), name='input_a')
      out_1 = keras.layers.Dense(8, name='dense_1')(inp)
      out_2 = keras.layers.Dense(3, activation='softmax', name='dense_2')(out_1)
      model = keras.models.Model(inputs=[inp], outputs=[out_1, out_2])
      optimizer = RMSPropOptimizer(learning_rate=0.001)

      with test.mock.patch.object(logging, 'warning') as mock_log:
        model.compile(
            optimizer,
            loss={
                'dense_2': 'categorical_crossentropy',
            },
            metrics={
                'dense_2': 'categorical_accuracy',
                'dense_1': metrics_module.CategoricalAccuracy(),
            },
            run_eagerly=testing_utils.should_run_eagerly(),
            experimental_run_tf_function=testing_utils.should_run_tf_function())
        msg = ('Output dense_1 missing from loss dictionary. We assume this '
               'was done on purpose. The fit and evaluate APIs will not be '
               'expecting any data to be passed to dense_1.')
        self.assertRegexpMatches(str(mock_log.call_args), msg)

  @keras_parameterized.run_all_keras_modes
  def test_invalid_steps_per_epoch_usage(self):
    """Passing `steps` alongside array data logs a warning."""
    x = keras.layers.Input(shape=(1,))
    y = keras.layers.Dense(1)(x)

    model = keras.Model(x, y)
    model.compile(
        'sgd',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=False)
    err_msg = 'When passing input data as arrays, do not specify'

    with test.mock.patch.object(logging, 'warning') as mock_log:
      model._standardize_user_data(
          np.zeros((100, 1)), np.ones((100, 1)), check_steps=True, steps=4)
      self.assertRegexpMatches(str(mock_log.call_args), err_msg)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_invalid_batch_size_argument_with_sequence_input(self):
    """`batch_size` is rejected when the input is already a Sequence."""

    class DummySequence(data_utils.Sequence):

      def __getitem__(self, idx):
        return np.zeros([10, 2]), np.ones([10, 4])

      def __len__(self):
        return 10

    model = testing_utils.get_small_mlp(
        num_hidden=10, num_classes=1, input_dim=10)

    model.compile(
        'adam',
        'binary_crossentropy',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    # fit, evaluate, and predict must all reject the argument.
    with self.assertRaisesRegexp(
        ValueError, 'The `batch_size` argument must not be specified'):
      model.fit(DummySequence(), batch_size=2, epochs=2)
    with self.assertRaisesRegexp(
        ValueError, 'The `batch_size` argument must not be specified'):
      model.evaluate(DummySequence(), batch_size=2)

    with self.assertRaisesRegexp(
        ValueError, 'The `batch_size` argument must not be specified'):
      model.predict(DummySequence(), batch_size=2)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes(always_skip_v1=True)
  def test_non_returning_sequence(self):
    """A Sequence whose __getitem__ returns None yields a clear error."""
    if not testing_utils.should_run_tf_function():
      self.skipTest('This case is only handled in the new execution path.')

    class DummySequence(data_utils.Sequence):

      def __getitem__(self, idx):
        # Deliberately returns None to trigger the error path.
        return

      def __len__(self):
        return 10

    model = testing_utils.get_small_mlp(
        num_hidden=10, num_classes=1, input_dim=10)
    model.compile(
        'adam',
        'binary_crossentropy',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    with self.assertRaisesRegexp(IndexError, 'Could not infer batch size'):
      model.fit(DummySequence(), epochs=2)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_sparse_op_with_op_layer(self):
    """Raw sparse ops cannot be auto-wrapped into functional-model layers."""
    inputs = keras.layers.Input(shape=(2,), sparse=True, name='sparse_tensor')
    output = sparse_ops.sparse_minimum(inputs, inputs)
    with self.assertRaisesRegexp(
        ValueError,
        'Sparse ops are not supported with functional models with built-in '
        'layer wrapping'
    ):
      keras.Model([inputs], output)
class LossWeightingTest(keras_parameterized.TestCase):
@keras_parameterized.run_all_keras_modes
def test_class_weights(self):
num_classes = 5
batch_size = 5
epochs = 10
weighted_class = 3
weight = 10.
train_samples = 1000
test_samples = 1000
input_dim = 5
learning_rate = 0.001
model = testing_utils.get_small_sequential_mlp(
num_hidden=10, num_classes=num_classes, input_dim=input_dim)
model.compile(
loss='categorical_crossentropy',
metrics=['acc', metrics_module.CategoricalAccuracy()],
weighted_metrics=['mae', metrics_module.CategoricalAccuracy()],
optimizer=RMSPropOptimizer(learning_rate=learning_rate),
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
np.random.seed(1337)
(x_train, y_train), (x_test, y_test) = testing_utils.get_test_data(
train_samples=train_samples,
test_samples=test_samples,
input_shape=(input_dim,),
num_classes=num_classes)
int_y_test = y_test.copy()
int_y_train = y_train.copy()
# convert class vectors to binary class matrices
y_train = np_utils.to_categorical(y_train, num_classes)
y_test = np_utils.to_categorical(y_test, num_classes)
test_ids = np.where(int_y_test == np.array(weighted_class))[0]
class_weight = dict([(i, 1.) for i in range(num_classes)])
class_weight[weighted_class] = weight
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=epochs // 3,
verbose=0,
class_weight=class_weight,
validation_data=(x_train, y_train))
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=epochs // 2,
verbose=0,
class_weight=class_weight)
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=epochs // 2,
verbose=0,
class_weight=class_weight,
validation_split=0.1)
model.train_on_batch(
x_train[:batch_size], y_train[:batch_size], class_weight=class_weight)
ref_score = model.evaluate(x_test, y_test, verbose=0)
score = model.evaluate(
x_test[test_ids, :], y_test[test_ids, :], verbose=0)
self.assertLess(score[0], ref_score[0])
  @keras_parameterized.run_all_keras_modes
  def test_sample_weights(self):
    """Up-weighting one class's samples lowers its evaluation loss."""
    num_classes = 5
    batch_size = 5
    epochs = 10
    weighted_class = 3
    weight = 10.
    train_samples = 1000
    test_samples = 1000
    input_dim = 5
    learning_rate = 0.001

    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=num_classes, input_dim=input_dim)
    model.compile(
        RMSPropOptimizer(learning_rate=learning_rate),
        metrics=['acc', metrics_module.CategoricalAccuracy()],
        weighted_metrics=['mae', metrics_module.CategoricalAccuracy()],
        loss='categorical_crossentropy',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    np.random.seed(43)
    (x_train, y_train), (x_test, y_test) = testing_utils.get_test_data(
        train_samples=train_samples,
        test_samples=test_samples,
        input_shape=(input_dim,),
        num_classes=num_classes)
    # Keep integer labels to build per-sample weights and the test-id mask.
    int_y_test = y_test.copy()
    int_y_train = y_train.copy()
    # convert class vectors to binary class matrices
    y_train = np_utils.to_categorical(y_train, num_classes)
    y_test = np_utils.to_categorical(y_test, num_classes)
    test_ids = np.where(int_y_test == np.array(weighted_class))[0]

    # 10x weight on every sample belonging to the chosen class.
    sample_weight = np.ones((y_train.shape[0]))
    sample_weight[int_y_train == weighted_class] = weight

    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs // 3,
        verbose=0,
        sample_weight=sample_weight)
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs // 3,
        verbose=0,
        sample_weight=sample_weight,
        validation_split=0.1)

    model.train_on_batch(
        x_train[:batch_size],
        y_train[:batch_size],
        sample_weight=sample_weight[:batch_size])
    model.test_on_batch(
        x_train[:batch_size],
        y_train[:batch_size],
        sample_weight=sample_weight[:batch_size])
    # Loss on the up-weighted class should beat the overall test loss.
    ref_score = model.evaluate(
        x_test, y_test, verbose=0, sample_weight=sample_weight)
    score = model.evaluate(
        x_test[test_ids, :],
        y_test[test_ids, :],
        verbose=0,
        sample_weight=sample_weight[test_ids])
    self.assertLess(score[0], ref_score[0])
  @keras_parameterized.run_all_keras_modes
  def test_temporal_sample_weights(self):
    """Per-timestep weights (sample_weight_mode='temporal') work end-to-end."""
    num_classes = 5
    batch_size = 5
    epochs = 10
    weighted_class = 3
    weight = 10.
    train_samples = 1000
    test_samples = 1000
    input_dim = 5
    timesteps = 3
    learning_rate = 0.001

    with self.cached_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(num_classes),
              input_shape=(timesteps, input_dim)))
      model.add(keras.layers.Activation('softmax'))

      np.random.seed(1337)
      (x_train, y_train), (x_test, y_test) = testing_utils.get_test_data(
          train_samples=train_samples,
          test_samples=test_samples,
          input_shape=(input_dim,),
          num_classes=num_classes)
      # Keep integer labels to build per-sample weights and the test-id mask.
      int_y_test = y_test.copy()
      int_y_train = y_train.copy()
      # convert class vectors to binary class matrices
      y_train = np_utils.to_categorical(y_train, num_classes)
      y_test = np_utils.to_categorical(y_test, num_classes)
      test_ids = np.where(int_y_test == np.array(weighted_class))[0]

      sample_weight = np.ones((y_train.shape[0]))
      sample_weight[int_y_train == weighted_class] = weight

      # Tile the static data (and weights) across the time dimension.
      temporal_x_train = np.reshape(x_train, (len(x_train), 1,
                                              x_train.shape[1]))
      temporal_x_train = np.repeat(temporal_x_train, timesteps, axis=1)
      temporal_x_test = np.reshape(x_test, (len(x_test), 1, x_test.shape[1]))
      temporal_x_test = np.repeat(temporal_x_test, timesteps, axis=1)

      temporal_y_train = np.reshape(y_train, (len(y_train), 1,
                                              y_train.shape[1]))
      temporal_y_train = np.repeat(temporal_y_train, timesteps, axis=1)
      temporal_y_test = np.reshape(y_test, (len(y_test), 1, y_test.shape[1]))
      temporal_y_test = np.repeat(temporal_y_test, timesteps, axis=1)

      temporal_sample_weight = np.reshape(sample_weight, (len(sample_weight),
                                                          1))
      temporal_sample_weight = np.repeat(
          temporal_sample_weight, timesteps, axis=1)

      model.compile(
          RMSPropOptimizer(learning_rate=learning_rate),
          loss='categorical_crossentropy',
          metrics=['acc', metrics_module.CategoricalAccuracy()],
          weighted_metrics=['mae', metrics_module.CategoricalAccuracy()],
          sample_weight_mode='temporal',
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

      model.fit(
          temporal_x_train,
          temporal_y_train,
          batch_size=batch_size,
          epochs=epochs // 3,
          verbose=0,
          sample_weight=temporal_sample_weight)
      model.fit(
          temporal_x_train,
          temporal_y_train,
          batch_size=batch_size,
          epochs=epochs // 3,
          verbose=0,
          sample_weight=temporal_sample_weight,
          validation_split=0.1)

      model.train_on_batch(
          temporal_x_train[:batch_size],
          temporal_y_train[:batch_size],
          sample_weight=temporal_sample_weight[:batch_size])
      model.test_on_batch(
          temporal_x_train[:batch_size],
          temporal_y_train[:batch_size],
          sample_weight=temporal_sample_weight[:batch_size])
      ref_score = model.evaluate(temporal_x_test, temporal_y_test, verbose=0)
      if not context.executing_eagerly():
        # Loss on the up-weighted class should beat the overall test loss.
        score = model.evaluate(
            temporal_x_test[test_ids], temporal_y_test[test_ids], verbose=0)
        self.assertLess(score[0], ref_score[0])
@keras_parameterized.run_all_keras_modes
@keras_parameterized.run_with_all_model_types(exclude_models='sequential')
def test_fit_with_incorrect_weights(self):
  """Unknown output names in sample/class weight dicts raise ValueError."""
  shared_dense = keras.layers.Dense(2, name='output_1')
  first_input = keras.layers.Input(shape=(3,), name='input_a')
  second_input = keras.layers.Input(shape=(3,), name='input_b')
  model = testing_utils.get_multi_io_model(
      [first_input, shared_dense],
      [second_input, shared_dense,
       keras.layers.Dropout(0.5, name='output_2')])
  model.compile(
      optimizer='adam',
      loss='mse',
      run_eagerly=testing_utils.should_run_eagerly(),
      experimental_run_tf_function=testing_utils.should_run_tf_function())
  inputs = np.random.random((10, 3))
  targets = np.random.random((10, 2))

  # A sample_weight dict keyed by a name that is not a model output must
  # be rejected with a message naming both the bad and the valid keys.
  with self.assertRaisesRegexp(
      ValueError,
      r'Unknown entries in sample_weight dictionary: \[\'unknown\'\]. '
      r'Only expected following keys: \[\'output_1\', \'output_2\'\]'):
    model.fit([inputs, inputs], [targets, targets],
              epochs=1,
              sample_weight={'unknown': 'something'})

  # Same contract for class_weight.
  with self.assertRaisesRegexp(
      ValueError,
      r'Unknown entries in class_weight dictionary: \[\'unknown\'\]. '
      r'Only expected following keys: \[\'output_1\', \'output_2\'\]'):
    model.fit([inputs, inputs], [targets, targets],
              epochs=1,
              class_weight={'unknown': 'something'})
@keras_parameterized.run_all_keras_modes
def test_class_weight_invalid_use_case(self):
  """Invalid class_weight / sample_weight arguments raise clear errors."""
  num_classes = 5
  train_samples = 1000
  test_samples = 1000
  input_dim = 5
  timesteps = 3
  learning_rate = 0.001

  with self.cached_session():
    model = keras.models.Sequential()
    model.add(
        keras.layers.TimeDistributed(
            keras.layers.Dense(num_classes),
            input_shape=(timesteps, input_dim)))
    model.add(keras.layers.Activation('softmax'))
    optimizer = RMSPropOptimizer(learning_rate=learning_rate)
    model.compile(
        optimizer,
        loss='binary_crossentropy',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    (x_train, y_train), _ = testing_utils.get_test_data(
        train_samples=train_samples,
        test_samples=test_samples,
        input_shape=(input_dim,),
        num_classes=num_classes)
    # convert class vectors to binary class matrices
    y_train = np_utils.to_categorical(y_train, num_classes)

    # A class_weight dict that is missing one of the classes is invalid.
    # epochs=0: the error is expected to fire during argument validation,
    # before any training runs.
    class_weight = dict([(i, 1.) for i in range(num_classes)])
    del class_weight[1]
    with self.assertRaises(ValueError):
      model.fit(x_train, y_train,
                epochs=0, verbose=0, class_weight=class_weight)

    # An empty list is not a valid sample_weight_mode at compile time.
    with self.assertRaises(ValueError):
      model.compile(
          optimizer,
          loss='binary_crossentropy',
          sample_weight_mode=[],
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

    # Build multi-output model
    x = keras.Input((3,))
    y1 = keras.layers.Dense(4, name='1')(x)
    y2 = keras.layers.Dense(4, name='2')(x)
    model = keras.models.Model(x, [y1, y2])
    model.compile(
        optimizer,
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x_np = np.random.random((10, 3))
    y_np = np.random.random((10, 4))
    w_np = np.random.random((10,))
    # This will work: weights for a subset of outputs, keyed by output name.
    model.fit(x_np, [y_np, y_np], epochs=1,
              sample_weight={'1': w_np})
    # These will not
    with self.assertRaises(ValueError):
      # A bare list of weights is rejected for this multi-output model.
      model.fit(x_np, [y_np, y_np], epochs=1,
                sample_weight=[w_np])
    with self.assertRaises(TypeError):
      # A single unwrapped array is a TypeError (not ValueError).
      model.fit(x_np, [y_np, y_np], epochs=1,
                sample_weight=w_np)
    with self.assertRaises(ValueError):
      # Weight vector length disagrees with the number of samples (11 vs 10).
      bad_w_np = np.random.random((11,))
      model.fit(x_np, [y_np, y_np], epochs=1,
                sample_weight={'1': bad_w_np})
    with self.assertRaises(ValueError):
      # 2D weights are rejected: the model was compiled without
      # sample_weight_mode='temporal'.
      bad_w_np = np.random.random((10, 2))
      model.fit(x_np, [y_np, y_np], epochs=1,
                sample_weight={'1': bad_w_np})
    with self.assertRaises(ValueError):
      # Rank-3 weights are rejected outright.
      bad_w_np = np.random.random((10, 2, 2))
      model.fit(x_np, [y_np, y_np], epochs=1,
                sample_weight={'1': bad_w_np})
@keras_parameterized.run_all_keras_modes
def test_default_sample_weight(self):
  """Verifies that fit works without having to set sample_weight."""
  num_classes = 5
  input_dim = 5
  timesteps = 3
  learning_rate = 0.001

  with self.cached_session():
    model = keras.models.Sequential()
    model.add(
        keras.layers.TimeDistributed(
            keras.layers.Dense(num_classes),
            input_shape=(timesteps, input_dim)))

    x = np.random.random((10, timesteps, input_dim))
    y = np.random.random((10, timesteps, num_classes))
    optimizer = RMSPropOptimizer(learning_rate=learning_rate)

    # Every supported spelling of sample_weight_mode — as a list, as a
    # dict keyed by output name, and as a bare value, each with both the
    # default (None) and 'temporal' modes. fit() must work for all of
    # them without an explicit sample_weight argument.
    for mode in ([None],
                 ['temporal'],
                 {'time_distributed': None},
                 {'time_distributed': 'temporal'},
                 None,
                 'temporal'):
      model.compile(
          optimizer,
          loss='mse',
          sample_weight_mode=mode,
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())
      model.fit(x, y, epochs=1, batch_size=10)
def test_sample_weight_tensor(self):
  """Tests that sample weight may be defined as a tensor in the graph."""
  with context.graph_mode():
    # Create a simple pass-through model: the loss reduces to the weighted
    # mean absolute error between fed inputs and targets.
    input_layer = keras.layers.Input(shape=1, name='input_layer')
    model = keras.Model(inputs=input_layer, outputs=input_layer)
    model.compile(
        loss='mean_absolute_error',
        optimizer='adam')

    # Prepare sample weights iterator tensor: each get_next() yields the
    # next row of per-sample weights.
    sample_weights = array_ops.constant(
        [[0, .4, 1, 1], [2, .4, .3, 1]])
    dataset = dataset_ops.Dataset.from_tensor_slices(sample_weights)
    sample_weights = dataset_ops.make_one_shot_iterator(dataset).get_next()
    sample_weights = training_utils.standardize_sample_weights(
        sample_weights, model.output_names)

    # Update model loss with sample weight tensor.
    model._compile_weights_loss_and_weighted_metrics(sample_weights)

    feeds = {'input_layer:0': [[0], [0], [0], [0]],
             'input_layer_target:0': [[1], [1], [1], [1]]}
    with self.cached_session() as sess:
      # Inputs are all 0 and targets all 1, so each per-sample absolute
      # error is 1 and total_loss is the mean of the weight row. Each
      # sess.run advances the one-shot iterator, so the two calls see the
      # first and second weight rows respectively — order matters here.
      self.assertAllClose(
          (.4 + 1 + 1) / 4, sess.run(model.total_loss, feed_dict=feeds))
      self.assertAllClose(
          (2+ .4 + .3 + 1) / 4, sess.run(model.total_loss, feed_dict=feeds))
def test_prepare_sample_weights(self):
  """Checks shape/count validation in Model._prepare_sample_weights.

  `sample_weight_mode` fixes the expected rank of each weight tensor
  (rank 1 for 'samplewise', rank 2 for 'temporal'), and the number of
  weight tensors must match the number of model outputs.
  """
  # Regex patterns are raw strings so `\[` / `\]` are real regex escapes;
  # in a non-raw string they are invalid Python escape sequences
  # (DeprecationWarning), which previously required pylint
  # anomalous-backslash-in-string disables around this test.
  input_layer = keras.layers.Input(shape=1, name='input_layer')
  model = keras.Model(inputs=input_layer, outputs=[input_layer, input_layer])
  sample_weights = array_ops.constant([0, .4, 1, 1])
  temporal_weights = array_ops.constant([[1, 2], [3, 4], [5, 6]])

  # With the default mode, passing weights to _prepare_sample_weights
  # trips an internal assertion.
  model.compile(
      loss='mean_absolute_error',
      optimizer='adam',
      sample_weight_mode=None)
  with self.assertRaises(AssertionError):
    model._prepare_sample_weights([sample_weights, sample_weights])

  # 'temporal' mode expects rank-2 ([None, None]) weights, one per output.
  model.compile(loss='mean_absolute_error', optimizer='adam',
                sample_weight_mode='temporal')
  model._prepare_sample_weights([temporal_weights, temporal_weights])
  with self.assertRaisesRegexp(ValueError, r'Expected shape \[None, None\]'):
    model._prepare_sample_weights([sample_weights, sample_weights])
  with self.assertRaisesRegexp(ValueError,
                               'sample weights must have same length as the '
                               'number of outputs'):
    model._prepare_sample_weights([temporal_weights])

  # 'samplewise' mode expects rank-1 ([None]) weights.
  model.compile(loss='mean_absolute_error', optimizer='adam',
                sample_weight_mode='samplewise')
  model._prepare_sample_weights([sample_weights, sample_weights])
  with self.assertRaisesRegexp(ValueError, r'Expected shape \[None\]'):
    model._prepare_sample_weights([temporal_weights, temporal_weights])
@keras_parameterized.run_all_keras_modes
class MaskingTest(keras_parameterized.TestCase):
  """Tests that masked timesteps do not contribute to training loss."""

  def _get_model(self, input_shape=None):
    """Builds a Masking -> TimeDistributed(Dense(1)) model compiled with MSE.

    Args:
      input_shape: Optional input shape. When None, the model is built in
        deferred mode (shape known only once data is seen).

    Returns:
      A compiled model.
    """
    layers = [
        keras.layers.Masking(mask_value=0),
        keras.layers.TimeDistributed(
            keras.layers.Dense(1, kernel_initializer='one'))
    ]
    model = testing_utils.get_model_from_layers(layers, input_shape)
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(learning_rate=0.001),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    return model

  @keras_parameterized.run_with_all_model_types
  def test_masking(self):
    """Loss must be exactly 0: the only mismatching sample is fully masked."""
    model = self._get_model(input_shape=(2, 1))
    # Second sample is all zeros, i.e. entirely masked out by mask_value=0.
    x = np.array([[[1], [1]], [[0], [0]]])
    y = np.array([[[1], [1]], [[1], [1]]])
    loss = model.train_on_batch(x, y)
    self.assertEqual(loss, 0)

  @keras_parameterized.run_with_all_model_types(exclude_models='functional')
  def test_masking_deferred(self):
    """Same as test_masking, but with a deferred-build model (no input_shape)."""
    model = self._get_model()
    x = np.array([[[1], [1]], [[0], [0]]])
    y = np.array([[[1], [1]], [[1], [1]]])
    loss = model.train_on_batch(x, y)
    self.assertEqual(loss, 0)

  def test_mask_argument_in_layer(self):
    # Test that the mask argument gets correctly passed to a layer in the
    # functional API.

    class CustomMaskedLayer(keras.layers.Layer):
      """Pass-through layer that fails loudly if no mask is propagated."""

      def __init__(self):
        super(CustomMaskedLayer, self).__init__()
        self.supports_masking = True

      def call(self, inputs, mask=None):
        # The whole point of the test: the framework must supply `mask`.
        assert mask is not None
        return inputs

      def compute_output_shape(self, input_shape):
        return input_shape

    x = np.random.random((5, 3))
    inputs = keras.layers.Input((3,))
    masked = keras.layers.Masking(mask_value=0)(inputs)
    outputs = CustomMaskedLayer()(masked)
    model = keras.Model(inputs, outputs)
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(learning_rate=0.001),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    y = np.random.random((5, 3))
    model.train_on_batch(x, y)
@keras_parameterized.run_all_keras_modes
class TestDynamicTrainability(keras_parameterized.TestCase):
  """Tests toggling `trainable` on layers and models around compile time."""

  def test_trainable_warning(self):
    """Training after flipping `trainable` post-compile should warn."""
    x = np.random.random((5, 3))
    y = np.random.random((5, 2))

    model = keras.models.Sequential()
    model.add(keras.layers.Dense(2, input_dim=3))
    model.trainable = False
    model.compile(
        'rmsprop',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.trainable = True
    model.train_on_batch(x, y)
    # NOTE(review): `assertRaises` called with no callable and outside a
    # `with` statement returns a context manager and asserts nothing, so
    # this line is a no-op — presumably a warning check around the
    # `train_on_batch` call above was intended. Confirm intent before
    # relying on this test to catch the warning.
    self.assertRaises(Warning)

  def test_trainable_argument(self):
    """A layer created with trainable=False keeps its weights fixed."""
    with self.cached_session():
      x = np.random.random((5, 3))
      y = np.random.random((5, 2))

      model = keras.models.Sequential()
      model.add(keras.layers.Dense(2, input_dim=3, trainable=False))
      model.compile(
          'rmsprop',
          'mse',
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())
      out = model.predict(x)
      model.train_on_batch(x, y)
      out_2 = model.predict(x)
      # Training changed nothing: predictions before and after must match.
      self.assertAllClose(out, out_2)

      # test with nesting: the frozen model used as a layer inside another.
      inputs = keras.layers.Input(shape=(3,))
      output = model(inputs)
      model = keras.models.Model(inputs, output)
      model.compile(
          'rmsprop',
          'mse',
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())
      out = model.predict(x)
      model.train_on_batch(x, y)
      out_2 = model.predict(x)
      self.assertAllClose(out, out_2)

  def test_layer_trainability_switch(self):
    """`trainable` works via constructor or attribute, in both APIs."""
    # with constructor argument, in Sequential
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(2, trainable=False, input_dim=1))
    self.assertListEqual(model.trainable_weights, [])

    # by setting the `trainable` argument, in Sequential
    model = keras.models.Sequential()
    layer = keras.layers.Dense(2, input_dim=1)
    model.add(layer)
    self.assertListEqual(model.trainable_weights, layer.trainable_weights)
    layer.trainable = False
    self.assertListEqual(model.trainable_weights, [])

    # with constructor argument, in Model
    x = keras.layers.Input(shape=(1,))
    y = keras.layers.Dense(2, trainable=False)(x)
    model = keras.models.Model(x, y)
    self.assertListEqual(model.trainable_weights, [])

    # by setting the `trainable` argument, in Model
    x = keras.layers.Input(shape=(1,))
    layer = keras.layers.Dense(2)
    y = layer(x)
    model = keras.models.Model(x, y)
    self.assertListEqual(model.trainable_weights, layer.trainable_weights)
    layer.trainable = False
    self.assertListEqual(model.trainable_weights, [])

  def test_model_trainability_switch(self):
    """Freezing a whole model empties its trainable_weights."""
    # a non-trainable model has no trainable weights
    x = keras.layers.Input(shape=(1,))
    y = keras.layers.Dense(2)(x)
    model = keras.models.Model(x, y)
    model.trainable = False
    self.assertListEqual(model.trainable_weights, [])

    # same for Sequential
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(2, input_dim=1))
    model.trainable = False
    self.assertListEqual(model.trainable_weights, [])

  def test_nested_model_trainability(self):
    """Inner-model/layer trainability propagates through every nesting combo."""
    # a Sequential inside a Model
    inner_model = keras.models.Sequential()
    inner_model.add(keras.layers.Dense(2, input_dim=1))
    x = keras.layers.Input(shape=(1,))
    y = inner_model(x)
    outer_model = keras.models.Model(x, y)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    # Freezing an individual inner layer is also visible from the outside.
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

    # a Sequential inside a Sequential
    inner_model = keras.models.Sequential()
    inner_model.add(keras.layers.Dense(2, input_dim=1))
    outer_model = keras.models.Sequential()
    outer_model.add(inner_model)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

    # a Model inside a Model
    x = keras.layers.Input(shape=(1,))
    y = keras.layers.Dense(2)(x)
    inner_model = keras.models.Model(x, y)
    x = keras.layers.Input(shape=(1,))
    y = inner_model(x)
    outer_model = keras.models.Model(x, y)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

    # a Model inside a Sequential
    x = keras.layers.Input(shape=(1,))
    y = keras.layers.Dense(2)(x)
    inner_model = keras.models.Model(x, y)
    outer_model = keras.models.Sequential()
    outer_model.add(inner_model)
    self.assertListEqual(outer_model.trainable_weights,
                         inner_model.trainable_weights)
    inner_model.trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])
    inner_model.trainable = True
    inner_model.layers[-1].trainable = False
    self.assertListEqual(outer_model.trainable_weights, [])

  def test_gan_workflow(self):
    """A shared layer may be frozen in one model and trainable in another.

    The same BatchNormalization layer is compiled into model1 while frozen
    and into model2 while trainable; training each model must respect the
    flag value seen at its own compile.
    """
    shared_layer = keras.layers.BatchNormalization()

    inputs1 = keras.Input(10)
    outputs1 = shared_layer(inputs1)
    model1 = keras.Model(inputs1, outputs1)
    shared_layer.trainable = False
    model1.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    inputs2 = keras.Input(10)
    outputs2 = shared_layer(inputs2)
    model2 = keras.Model(inputs2, outputs2)
    shared_layer.trainable = True
    model2.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    x, y = np.ones((10, 10)), np.ones((10, 10))

    # model1 (frozen at compile): training leaves predictions unchanged.
    out1_0 = model1.predict_on_batch(x)
    model1.train_on_batch(x, y)
    out1_1 = model1.predict_on_batch(x)
    self.assertAllClose(out1_0, out1_1)

    # model2 (trainable at compile): training changes predictions.
    out2_0 = model2.predict_on_batch(x)
    model2.train_on_batch(x, y)
    out2_1 = model2.predict_on_batch(x)
    self.assertNotAllClose(out2_0, out2_1)

  def test_toggle_value(self):
    """Flipping `trainable` after compile does not affect an ongoing session."""
    input_0 = keras.layers.Input(shape=(1,))
    dense_0 = keras.layers.Dense(1, kernel_initializer='ones',
                                 bias_initializer='ones')
    dense_1 = keras.layers.Dense(1, kernel_initializer='ones',
                                 bias_initializer='ones')
    result = keras.layers.Add()([dense_0(input_0), dense_1(input_0)])
    model = keras.models.Model(input_0, result)
    dense_0.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

    x = np.ones((10, 1))
    y = 5 * x + 2
    model.train_on_batch(x, y)
    dense_0.trainable = True
    model.train_on_batch(x, y)
    # dense_0 was frozen when the model was compiled; its weights are
    # expected to stay at their initial ones even after the flag flips.
    kernel, bias = dense_0.get_weights()
    self.assertAllEqual([kernel[0, 0], bias[0]], [1., 1.])

    # dense_1 was trainable throughout and has moved off its initial ones.
    kernel, bias = dense_1.get_weights()
    self.assertAllClose([kernel[0, 0], bias[0]], [1.1176, 1.1176])
class TestTrainingWithDataTensors(keras_parameterized.TestCase):
  """Tests fit/evaluate/predict when inputs/targets are symbolic tensors."""

  @tf_test_util.run_deprecated_v1
  def test_training_and_eval_methods_on_symbolic_tensors_single_io(self):
    """All training/eval entry points accept symbolic tensors (single IO)."""
    x = keras.layers.Input(shape=(3,), name='input')
    y = keras.layers.Dense(4, name='dense')(x)
    model = keras.Model(x, y)

    optimizer = RMSPropOptimizer(learning_rate=0.001)
    loss = 'mse'
    model.compile(
        optimizer,
        loss,
        metrics=['mae', metrics_module.CategoricalAccuracy()])

    inputs = keras.backend.zeros(shape=(10, 3))
    targets = keras.backend.zeros(shape=(10, 4))

    # Symbolic data requires steps/steps_per_epoch rather than batch_size.
    model.fit(inputs, targets, epochs=1, steps_per_epoch=2, verbose=0)
    model.evaluate(inputs, targets, steps=2, verbose=0)
    model.predict(inputs, steps=2)
    model.train_on_batch(inputs, targets)
    model.test_on_batch(inputs, targets)
    model.fit(inputs, targets,
              epochs=1, steps_per_epoch=2, verbose=0,
              validation_data=(inputs, targets), validation_steps=2)

    # Test with dynamic shape
    inputs = array_ops.placeholder_with_default(
        np.zeros((2, 3)), shape=tensor_shape.TensorShape([None, 3]))
    targets = array_ops.placeholder_with_default(
        np.zeros((2, 4)), shape=tensor_shape.TensorShape([None, 4]))
    # Leading (batch) dimension is unknown.
    self.assertEqual(inputs.shape.dims[0].value, None)
    model.fit(inputs, targets, epochs=1, steps_per_epoch=2, verbose=0)
    model.evaluate(inputs, targets, steps=2, verbose=0)
    model.predict(inputs, steps=2)
    model.train_on_batch(inputs, targets)
    model.test_on_batch(inputs, targets)
    model.fit(inputs, targets,
              epochs=1, steps_per_epoch=2, verbose=0,
              validation_data=(inputs, targets), validation_steps=2)

  @tf_test_util.run_deprecated_v1
  def test_training_and_eval_methods_on_symbolic_tensors_multi_io(self):
    """All training/eval entry points accept symbolic tensors (multi IO)."""
    a = keras.layers.Input(shape=(3,), name='input_a')
    b = keras.layers.Input(shape=(3,), name='input_b')

    dense = keras.layers.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = keras.layers.Dropout(0.5, name='dropout')(c)

    model = keras.models.Model([a, b], [d, e])

    optimizer = 'rmsprop'
    loss = 'mse'
    loss_weights = [1., 0.5]
    model.compile(
        optimizer,
        loss,
        metrics=['mae', metrics_module.CategoricalAccuracy()],
        loss_weights=loss_weights)

    input_a_tf = keras.backend.zeros(shape=(10, 3))
    input_b_tf = keras.backend.zeros(shape=(10, 3))

    output_d_tf = keras.backend.zeros(shape=(10, 4))
    output_e_tf = keras.backend.zeros(shape=(10, 4))

    model.fit(
        [input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
        epochs=1,
        steps_per_epoch=2,
        verbose=0)
    # batch_size is not meaningful for symbolic inputs; steps_per_epoch
    # must be specified instead.
    with self.assertRaisesRegexp(ValueError,
                                 'should specify the `steps_per_epoch`'):
      model.fit(
          [input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
          epochs=1,
          batch_size=5,
          verbose=0)
    model.train_on_batch([input_a_tf, input_b_tf], [output_d_tf, output_e_tf])

    # Test with dictionary inputs
    model.fit(
        {'input_a': input_a_tf,
         'input_b': input_b_tf},
        {'dense': output_d_tf,
         'dropout': output_e_tf},
        epochs=1,
        steps_per_epoch=2,
        verbose=0)
    model.fit(
        {'input_a': input_a_tf,
         'input_b': input_b_tf},
        {'dense': output_d_tf,
         'dropout': output_e_tf},
        validation_data=({'input_a': input_a_tf,
                          'input_b': input_b_tf},
                         {'dense': output_d_tf,
                          'dropout': output_e_tf}),
        epochs=1,
        steps_per_epoch=2,
        validation_steps=2,
        verbose=0)
    model.train_on_batch(
        {'input_a': input_a_tf,
         'input_b': input_b_tf},
        {'dense': output_d_tf,
         'dropout': output_e_tf})

    # Test with validation data
    model.fit(
        [input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
        validation_data=([input_a_tf, input_b_tf],
                         [output_d_tf, output_e_tf]),
        epochs=1,
        steps_per_epoch=2,
        validation_steps=2,
        verbose=0)
    # Test with validation split: invalid for symbolic tensors, since the
    # data cannot be sliced.
    with self.assertRaisesRegexp(ValueError,
                                 'you cannot use `validation_split`'):
      model.fit(
          [input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
          epochs=2,
          steps_per_epoch=2,
          verbose=0,
          validation_split=0.2,
          validation_steps=2)

    # Test evaluation / prediction methods
    model.evaluate([input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
                   steps=2, verbose=0)
    model.predict([input_a_tf, input_b_tf], steps=2)
    model.test_on_batch([input_a_tf, input_b_tf], [output_d_tf, output_e_tf])

  @tf_test_util.run_deprecated_v1
  def test_model_with_input_feed_tensor(self):
    """We test building a model with a TF variable as input.

    We should be able to call fit, evaluate, predict,
    by only passing them data for the placeholder inputs
    in the model.
    """
    with self.cached_session():
      input_a_np = np.random.random((10, 3))
      input_b_np = np.random.random((10, 3))

      output_a_np = np.random.random((10, 4))
      output_b_np = np.random.random((10, 3))

      # Input `a` is fed from a variable rather than a placeholder.
      input_v = keras.backend.variables_module.Variable(
          input_a_np, dtype='float32')
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = keras.Input(tensor=input_v)
      b = keras.Input(shape=(3,), name='input_b')

      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      dp = keras.layers.Dropout(0.5, name='dropout')
      b_2 = dp(b)

      model = keras.models.Model([a, b], [a_2, b_2])
      model.summary()

      optimizer = 'rmsprop'
      loss = 'mse'
      loss_weights = [1., 0.5]
      model.compile(optimizer, loss, metrics=['mean_squared_error'],
                    loss_weights=loss_weights,
                    sample_weight_mode=None)

      # test train_on_batch: only data for the placeholder input `b` is
      # passed, as an array or as a dict keyed by input name.
      out = model.train_on_batch(input_b_np,
                                 [output_a_np, output_b_np])
      out = model.train_on_batch({'input_b': input_b_np},
                                 [output_a_np, output_b_np])
      out = model.test_on_batch({'input_b': input_b_np},
                                [output_a_np, output_b_np])
      out = model.predict_on_batch({'input_b': input_b_np})

      # test fit
      out = model.fit({'input_b': input_b_np},
                      [output_a_np, output_b_np], epochs=1, batch_size=10)
      out = model.fit(input_b_np,
                      [output_a_np, output_b_np], epochs=1, batch_size=10)

      # test evaluate
      out = model.evaluate({'input_b': input_b_np},
                           [output_a_np, output_b_np], batch_size=10)
      out = model.evaluate(input_b_np,
                           [output_a_np, output_b_np], batch_size=10)

      # test predict
      out = model.predict({'input_b': input_b_np}, batch_size=10)
      out = model.predict(input_b_np, batch_size=10)
      self.assertEqual(len(out), 2)

      # Now test a model with a single input
      # i.e. we don't pass any data to fit the model.
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = keras.Input(tensor=input_v)
      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      a_2 = keras.layers.Dropout(0.5, name='dropout')(a_2)
      model = keras.models.Model(a, a_2)
      model.summary()

      optimizer = 'rmsprop'
      loss = 'mse'
      model.compile(optimizer, loss, metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.test_on_batch(None,
                                output_a_np)
      out = model.predict_on_batch(None)
      # Empty list/dict inputs are accepted the same way as None.
      out = model.train_on_batch([],
                                 output_a_np)
      out = model.train_on_batch({},
                                 output_a_np)

      # test fit
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=3)
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=3)

      # test evaluate
      _ = model.evaluate(None, output_a_np, steps=3)
      _ = model.evaluate(None, output_a_np, steps=3)

      # test predict
      out = model.predict(None, steps=3)
      out = model.predict(None, steps=3)
      # 3 steps over the 10-row variable input -> 30 predictions.
      self.assertEqual(out.shape, (10 * 3, 4))

      # Same, without learning phase
      # i.e. we don't pass any data to fit the model.
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = keras.Input(tensor=input_v)
      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      model = keras.models.Model(a, a_2)
      model.summary()

      optimizer = 'rmsprop'
      loss = 'mse'
      model.compile(optimizer, loss, metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.train_on_batch(None,
                                 output_a_np)
      out = model.test_on_batch(None,
                                output_a_np)
      out = model.predict_on_batch(None)
      out = model.train_on_batch([],
                                 output_a_np)
      out = model.train_on_batch({},
                                 output_a_np)

      # test fit
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=10)
      _ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=10)

      # test evaluate
      _ = model.evaluate(None, output_a_np, steps=10)
      _ = model.evaluate(None, output_a_np, steps=10)

      # test predict
      out = model.predict(None, steps=3)
      out = model.predict(None, steps=3)
      self.assertEqual(out.shape, (10 * 3, 4))

  @keras_parameterized.run_all_keras_modes
  def test_model_with_partial_loss(self):
    """A loss defined for only a subset of the outputs is valid."""
    with self.cached_session():
      a = keras.Input(shape=(3,), name='input_a')
      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      dp = keras.layers.Dropout(0.5, name='dropout')
      a_3 = dp(a_2)
      model = keras.models.Model(a, [a_2, a_3])

      optimizer = 'rmsprop'
      # Only the 'dropout' output contributes to the loss.
      loss = {'dropout': 'mse'}
      model.compile(optimizer, loss, metrics=['mae'])

      input_a_np = np.random.random((10, 3))
      output_a_np = np.random.random((10, 4))

      # test train_on_batch
      _ = model.train_on_batch(input_a_np, output_a_np)
      _ = model.test_on_batch(input_a_np, output_a_np)
      # fit
      _ = model.fit(input_a_np, [output_a_np])
      # evaluate
      _ = model.evaluate(input_a_np, [output_a_np])

      # Same without dropout.
      a = keras.Input(shape=(3,), name='input_a')
      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      a_3 = keras.layers.Dense(4, name='dense_2')(a_2)
      model = keras.models.Model(a, [a_2, a_3])

      optimizer = 'rmsprop'
      loss = {'dense_2': 'mse'}
      model.compile(optimizer, loss, metrics={'dense_1': 'mae'})

      # test train_on_batch
      _ = model.train_on_batch(input_a_np, output_a_np)
      _ = model.test_on_batch(input_a_np, output_a_np)
      # fit
      _ = model.fit(input_a_np, [output_a_np])
      # evaluate
      _ = model.evaluate(input_a_np, [output_a_np])

  @tf_test_util.run_deprecated_v1
  def test_model_with_external_loss(self):
    """Models whose loss comes only from regularizers/add_loss still train."""
    with self.cached_session():
      # None loss, only regularization loss.
      a = keras.Input(shape=(3,), name='input_a')
      a_2 = keras.layers.Dense(4, name='dense_1',
                               kernel_regularizer='l1',
                               bias_regularizer='l2')(a)
      dp = keras.layers.Dropout(0.5, name='dropout')
      a_3 = dp(a_2)

      model = keras.models.Model(a, [a_2, a_3])

      optimizer = 'rmsprop'
      loss = None
      model.compile(optimizer, loss, metrics=['mae'])

      input_a_np = np.random.random((10, 3))

      # test train_on_batch: no targets are needed (y is None).
      out = model.train_on_batch(input_a_np, None)
      out = model.test_on_batch(input_a_np, None)
      # fit
      out = model.fit(input_a_np, None)
      # evaluate
      out = model.evaluate(input_a_np, None)

      # No dropout, external loss.
      a = keras.Input(shape=(3,), name='input_a')
      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      a_3 = keras.layers.Dense(4, name='dense_2')(a)

      model = keras.models.Model(a, [a_2, a_3])
      # Loss provided entirely via add_loss.
      model.add_loss(keras.backend.mean(a_3 + a_2))

      optimizer = 'rmsprop'
      loss = None
      model.compile(optimizer, loss, metrics=['mae'])

      # test train_on_batch
      out = model.train_on_batch(input_a_np, None)
      out = model.test_on_batch(input_a_np, None)
      # fit
      out = model.fit(input_a_np, None)
      # evaluate
      out = model.evaluate(input_a_np, None)

      # Test model with no external data at all.
      input_v = keras.backend.variables_module.Variable(
          input_a_np, dtype='float32')
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = keras.Input(tensor=input_v)
      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      a_2 = keras.layers.Dropout(0.5, name='dropout')(a_2)
      model = keras.models.Model(a, a_2)
      model.add_loss(keras.backend.mean(a_2))

      model.compile(optimizer='rmsprop',
                    loss=None,
                    metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None, None)
      out = model.test_on_batch(None, None)
      out = model.predict_on_batch(None)

      # test fit: with no external data, batch_size is invalid and
      # steps_per_epoch is required.
      with self.assertRaises(ValueError):
        out = model.fit(None, None, epochs=1, batch_size=10)
      out = model.fit(None, None, epochs=1, steps_per_epoch=1)

      # test fit with validation data
      with self.assertRaises(ValueError):
        out = model.fit(None, None, epochs=1,
                        steps_per_epoch=None,
                        validation_steps=2)
      out = model.fit(None, None, epochs=1,
                      steps_per_epoch=2,
                      validation_steps=2)

      # test evaluate
      with self.assertRaises(ValueError):
        out = model.evaluate(None, None, batch_size=10)
      out = model.evaluate(None, None, steps=3)

      # test predict
      with self.assertRaises(ValueError):
        out = model.predict(None, batch_size=10)
      out = model.predict(None, steps=3)
      # 3 steps over the 10-row variable input -> 30 predictions.
      self.assertEqual(out.shape, (10 * 3, 4))

      # Test multi-output model with no external data at all.
      self.evaluate(variables_lib.variables_initializer([input_v]))
      a = keras.Input(tensor=input_v)
      a_1 = keras.layers.Dense(4, name='dense_1')(a)
      a_2 = keras.layers.Dropout(0.5, name='dropout')(a_1)
      model = keras.models.Model(a, [a_1, a_2])
      model.add_loss(keras.backend.mean(a_2))

      model.compile(optimizer='rmsprop',
                    loss=None,
                    metrics=['mean_squared_error'])

      # test train_on_batch
      out = model.train_on_batch(None, None)
      out = model.test_on_batch(None, None)
      out = model.predict_on_batch(None)

      # test fit
      with self.assertRaises(ValueError):
        out = model.fit(None, None, epochs=1, batch_size=10)
      out = model.fit(None, None, epochs=1, steps_per_epoch=1)

      # test evaluate
      with self.assertRaises(ValueError):
        out = model.evaluate(None, None, batch_size=10)
      out = model.evaluate(None, None, steps=3)

      # test predict
      with self.assertRaises(ValueError):
        out = model.predict(None, batch_size=10, verbose=1)
      out = model.predict(None, steps=3)
      self.assertEqual(len(out), 2)
      self.assertEqual(out[0].shape, (10 * 3, 4))
      self.assertEqual(out[1].shape, (10 * 3, 4))

  @tf_test_util.run_deprecated_v1
  def test_target_tensors(self):
    """Targets can be supplied at compile time via `target_tensors`."""
    with self.cached_session():
      # single-output, as list
      model = keras.models.Sequential()
      model.add(keras.layers.Dense(4, input_shape=(4,), name='dense'))
      input_val = np.random.random((10, 4))
      target_val = np.random.random((10, 4))
      target = keras.backend.variable(target_val)
      model.compile(optimizer='rmsprop', loss='mse', target_tensors=[target])
      # No in-memory target is passed: the target tensor supplies it.
      model.train_on_batch(input_val, None)

      # single-output, as single tensor
      model.compile(optimizer='rmsprop', loss='mse', target_tensors=target)
      model.train_on_batch(input_val, None)

      # single-output, as dict
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors={'dense': target})
      model.train_on_batch(input_val, None)

      # test invalid arguments
      # A set is unordered, hence not a valid container of target tensors.
      with self.assertRaises(TypeError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors=set())
      # More target tensors than outputs.
      with self.assertRaises(ValueError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors=[target, target])
      # Dict key that is not an output name.
      with self.assertRaises(ValueError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors={'dense2': None})
      # Providing both a target tensor and in-memory target data is an
      # error.
      with self.assertRaises(ValueError):
        model.compile(optimizer='rmsprop', loss='mse',
                      target_tensors=[target])
        model.train_on_batch(input_val, target_val)

      # multi-output, as list
      input_val = np.random.random((10, 4))
      target_val_a = np.random.random((10, 4))
      target_val_b = np.random.random((10, 4))
      target_a = keras.backend.variable(target_val_a)
      target_b = keras.backend.variable(target_val_b)

      inputs = keras.layers.Input(shape=(4,))
      output_a = keras.layers.Dense(4, name='dense_a')(inputs)
      output_b = keras.layers.Dense(4, name='dense_b')(inputs)
      model = keras.models.Model(inputs, [output_a, output_b])
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors=[target_a, target_b])
      model.train_on_batch(input_val, None)

      # multi-output, as dict
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors={'dense_a': target_a,
                                    'dense_b': target_b})
      model.train_on_batch(input_val, None)

      # test with sample weights
      model.compile(
          optimizer='rmsprop',
          loss='mse',
          metrics=['mae', metrics_module.CategoricalAccuracy()],
          target_tensors=[target_a, target_b])
      model.train_on_batch(input_val, None,
                           sample_weight={'dense_a': np.random.random((10,))})

  @tf_test_util.run_deprecated_v1
  def test_model_custom_target_tensors(self):
    """Custom placeholders can serve as target tensors, by list or dict."""
    with self.cached_session():
      a = keras.Input(shape=(3,), name='input_a')
      b = keras.Input(shape=(3,), name='input_b')

      a_2 = keras.layers.Dense(4, name='dense_1')(a)
      dp = keras.layers.Dropout(0.5, name='dropout')
      b_2 = dp(b)

      y = keras.backend.placeholder([10, 4], name='y')
      y1 = keras.backend.placeholder([10, 3], name='y1')
      # y2 is deliberately shape-incompatible; used only in invalid cases.
      y2 = keras.backend.placeholder([7, 5], name='y2')
      model = keras.models.Model([a, b], [a_2, b_2])

      optimizer = 'rmsprop'
      loss = 'mse'
      loss_weights = [1., 0.5]

      # test list of target tensors
      # Three target tensors for a two-output model is invalid.
      with self.assertRaises(ValueError):
        model.compile(optimizer, loss, metrics=[], loss_weights=loss_weights,
                      sample_weight_mode=None, target_tensors=[y, y1, y2])
      model.compile(optimizer, loss, metrics=[], loss_weights=loss_weights,
                    sample_weight_mode=None, target_tensors=[y, y1])
      input_a_np = np.random.random((10, 3))
      input_b_np = np.random.random((10, 3))

      output_a_np = np.random.random((10, 4))
      output_b_np = np.random.random((10, 3))

      # Third positional argument is the per-output sample_weight dict.
      _ = model.train_on_batch([input_a_np, input_b_np],
                               [output_a_np, output_b_np], {
                                   'dense_1': np.random.random((10,)),
                                   'dropout': np.random.random((10,))
                               })
      # test dictionary of target_tensors
      # Key must match an output name.
      with self.assertRaises(ValueError):
        model.compile(optimizer, loss,
                      metrics=[],
                      loss_weights=loss_weights,
                      sample_weight_mode=None,
                      target_tensors={'does_not_exist': y2})
      # test dictionary of target_tensors
      model.compile(optimizer, loss,
                    metrics=[],
                    loss_weights=loss_weights,
                    sample_weight_mode=None,
                    target_tensors={'dense_1': y, 'dropout': y1})
      _ = model.train_on_batch([input_a_np, input_b_np],
                               [output_a_np, output_b_np], {
                                   'dense_1': np.random.random((10,)),
                                   'dropout': np.random.random((10,))
                               })

      # test with custom TF placeholder as target
      pl_target_a = keras.backend.array_ops.placeholder('float32',
                                                        shape=(None, 4))
      model.compile(optimizer='rmsprop', loss='mse',
                    target_tensors={'dense_1': pl_target_a})
      model.train_on_batch([input_a_np, input_b_np],
                           [output_a_np, output_b_np])
class TestTrainingWithMetrics(keras_parameterized.TestCase):
  """Training tests related to metrics."""

  @keras_parameterized.run_all_keras_modes
  def test_metrics_names(self):
    """Metric names should be stable before and after training."""
    a = keras.layers.Input(shape=(3,), name='input_a')
    b = keras.layers.Input(shape=(3,), name='input_b')
    dense = keras.layers.Dense(4, name='dense')
    c = dense(a)
    d = dense(b)
    e = keras.layers.Dropout(0.5, name='dropout')(c)
    model = keras.models.Model([a, b], [d, e])
    optimizer = RMSPropOptimizer(learning_rate=0.001)
    metrics = ['mse', metrics_module.BinaryAccuracy()]
    model.compile(
        optimizer,
        loss='mae',
        metrics=metrics,
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    # The string 'mse' resolves to a different canonical name in TF2 vs TF1.
    mse_metric = 'mse' if tf2.enabled() else 'mean_squared_error'
    reference_metric_names = [
        'loss', 'dense_loss', 'dropout_loss', 'dense_' + mse_metric,
        'dense_binary_accuracy', 'dropout_' + mse_metric,
        'dropout_binary_accuracy'
    ]
    self.assertEqual(reference_metric_names, model.metrics_names)
    # Verify that model metric names are not altered during training.
    input_a_np = np.random.random((10, 3))
    input_b_np = np.random.random((10, 3))
    output_d_np = np.random.random((10, 4))
    output_e_np = np.random.random((10, 4))
    model.fit([input_a_np, input_b_np], [output_d_np, output_e_np],
              epochs=1,
              batch_size=5)
    self.assertEqual(reference_metric_names, model.metrics_names)

  @keras_parameterized.run_all_keras_modes
  def test_metric_state_reset_between_fit_and_evaluate(self):
    """Stateful metric objects must be reset when switching fit -> evaluate."""
    model = keras.Sequential()
    model.add(keras.layers.Dense(3, activation='relu', input_dim=4))
    model.add(keras.layers.Dense(1, activation='sigmoid'))
    acc_obj = metrics_module.BinaryAccuracy()
    model.compile(
        loss='mae',
        metrics=[acc_obj],
        optimizer=RMSPropOptimizer(learning_rate=0.001),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x_train = np.random.random((100, 4))
    y_train = np.random.random((100, 1))
    model.fit(x_train, y_train, batch_size=5, epochs=2)
    # `count` reflects only the last epoch: 100 training samples.
    self.assertEqual(self.evaluate(acc_obj.count), 100)
    x_test = np.random.random((10, 4))
    y_test = np.random.random((10, 1))
    model.evaluate(x_test, y_test, batch_size=5)
    # Evaluation starts from a reset state: only the 10 eval samples remain.
    self.assertEqual(self.evaluate(acc_obj.count), 10)

  @keras_parameterized.run_with_all_model_types(exclude_models=['sequential'])
  @keras_parameterized.run_all_keras_modes
  def test_metrics_valid_compile_input_formats(self):
    """All supported `metrics`/`weighted_metrics` structures compile cleanly."""
    inp_1 = keras.layers.Input(shape=(1,), name='input_1')
    inp_2 = keras.layers.Input(shape=(1,), name='input_2')
    x = keras.layers.Dense(3, kernel_initializer='ones', trainable=False)
    out_1 = keras.layers.Dense(
        1, kernel_initializer='ones', name='output_1', trainable=False)
    out_2 = keras.layers.Dense(
        1, kernel_initializer='ones', name='output_2', trainable=False)
    branch_a = [inp_1, x, out_1]
    branch_b = [inp_2, x, out_2]
    model = testing_utils.get_multi_io_model(branch_a, branch_b)
    # list of metrics.
    model.compile(
        optimizer='rmsprop',
        loss='mse',
        metrics=[keras.metrics.MeanSquaredError()],
        weighted_metrics=[keras.metrics.MeanSquaredError()],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    # list of list of metrics.
    model.compile(
        optimizer='rmsprop',
        loss='mse',
        metrics=[
            keras.metrics.MeanSquaredError(),
            [keras.metrics.MeanSquaredError(),
             keras.metrics.Accuracy()]
        ],
        weighted_metrics=[
            keras.metrics.MeanSquaredError(),
            [keras.metrics.MeanSquaredError(),
             keras.metrics.Accuracy()]
        ],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    # dict of metrics.
    model.compile(
        optimizer='rmsprop',
        loss='mse',
        metrics={
            'output_1':
                keras.metrics.MeanSquaredError(),
            'output_2': [
                keras.metrics.MeanSquaredError(),
                keras.metrics.Accuracy()
            ],
        },
        weighted_metrics={
            'output_1':
                keras.metrics.MeanSquaredError(),
            'output_2': [
                keras.metrics.MeanSquaredError(),
                keras.metrics.Accuracy()
            ],
        },
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())

  @keras_parameterized.run_all_keras_modes
  def test_invalid_metrics(self):
    """Malformed `metrics` arguments raise clear errors at compile time."""
    num_classes = 5
    input_dim = 5
    model = testing_utils.get_small_sequential_mlp(
        num_hidden=10, num_classes=num_classes, input_dim=input_dim)
    # A bare metric object (not wrapped in a list/dict) is rejected.
    with self.assertRaisesRegex(
        TypeError, 'Type of `metrics` argument not understood. '
        'Expected a list or dictionary, found: '):
      model.compile(
          RMSPropOptimizer(learning_rate=0.001),
          loss='categorical_crossentropy',
          metrics=metrics_module.CategoricalAccuracy(),
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())
    inp = keras.layers.Input(shape=(1,))
    x = keras.layers.Dense(3, activation='relu')(inp)
    out_1 = keras.layers.Dense(1, activation='sigmoid', name='output_1')(x)
    out_2 = keras.layers.Dense(1, activation='sigmoid', name='output_2')(x)
    model = keras.models.Model(inp, [out_1, out_2])
    # A list of lists must have one inner list per output.
    with self.assertRaisesRegex(
        ValueError, 'When passing a list of lists as `metrics`, '
        'it should have one entry per model output. '
        'The model has 2 outputs, but you passed metrics='):
      model.compile('rmsprop', loss='mse', metrics=[['mse']])
    # Dict keys must reference existing output names.
    with self.assertRaisesRegex(
        ValueError,
        r'Unknown entries in metrics dictionary: \[\'output_3\'\]. Only '
        r'expected following keys: \[\'output_1\', \'output_2\'\]'):
      model.compile(
          optimizer='rmsprop',
          loss='mse',
          metrics={
              'output_1': 'mse',
              'output_3': 'mse',
          },
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())
    with self.assertRaisesRegex(
        ValueError,
        r'Unknown entries in metrics dictionary: \[\'output_3\'\]. Only '
        r'expected following keys: \[\'output_1\', \'output_2\'\]'):
      model.compile(
          optimizer='rmsprop',
          loss='mse',
          weighted_metrics={
              'output_1': 'mse',
              'output_3': 'mse',
          },
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())

  @keras_parameterized.run_all_keras_modes
  def test_metrics_masking(self):
    """Masked timesteps are excluded from weighted metrics."""
    np.random.seed(1337)
    model = keras.models.Sequential()
    model.add(keras.layers.Masking(mask_value=0, input_shape=(2, 1)))
    model.add(
        keras.layers.TimeDistributed(
            keras.layers.Dense(1, kernel_initializer='ones')))
    model.compile(
        RMSPropOptimizer(learning_rate=0.001),
        loss='mse',
        weighted_metrics=['accuracy'],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    # verify that masking is applied.
    x = np.array([[[1], [1]], [[1], [1]], [[0], [0]]])
    y = np.array([[[1], [1]], [[0], [1]], [[1], [1]]])
    scores = model.train_on_batch(x, y)
    self.assertArrayNear(scores, [0.25, 0.75], 0.1)
    # verify that masking is combined with sample weights.
    w = np.array([3, 2, 4])
    scores = model.train_on_batch(x, y, sample_weight=w)
    self.assertArrayNear(scores, [0.3328, 0.8], 0.001)

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_with_tensor_on_model(self):
    """`Model.add_metric` with a symbolic tensor flows through fit/evaluate."""
    x = keras.layers.Input(shape=(1,))
    y = keras.layers.Dense(1, kernel_initializer='ones')(x)
    model = keras.models.Model(x, y)
    model.add_metric(
        math_ops.reduce_sum(y), name='metric_1', aggregation='mean')
    if context.executing_eagerly():
      # This is not a use case in v1 graph mode.
      mean_result = metrics_module.Mean()(y)
      with self.assertRaisesRegex(
          ValueError, 'Expected a symbolic Tensor for the metric value'):
        model.add_metric(mean_result, name='metric_2')
      with self.assertRaisesRegex(
          ValueError, 'Using the result of calling a `Metric` object '):
        with keras.backend.get_graph().as_default():
          model.add_metric(metrics_module.Mean(name='metric_2')(y))
    model.compile(
        'sgd',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    inputs = np.ones(shape=(10, 1))
    targets = np.ones(shape=(10, 1))
    history = model.fit(
        inputs,
        targets,
        epochs=2,
        batch_size=5,
        validation_data=(inputs, targets))
    # With ones weights/inputs, reduce_sum over a batch of 5 gives 5.
    self.assertEqual(history.history['metric_1'][-1], 5)
    self.assertEqual(history.history['val_metric_1'][-1], 5)
    eval_results = model.evaluate(inputs, targets, batch_size=5)
    self.assertEqual(eval_results[-1], 5)
    model.predict(inputs, batch_size=5)
    model.train_on_batch(inputs, targets)
    model.test_on_batch(inputs, targets)

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_in_model_call(self):
    """`add_metric` called inside a subclassed model's `call`."""

    class TestModel(keras.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = keras.layers.Dense(2, kernel_initializer='ones')
        self.mean = metrics_module.Mean(name='metric_1')

      def call(self, x):
        self.add_metric(
            math_ops.reduce_sum(x), name='metric_2', aggregation='mean')
        # Provide same name as in the instance created in __init__
        # for eager mode
        self.add_metric(self.mean(x), name='metric_1')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
    self.assertAlmostEqual(history.history['metric_1'][-1], 1, 0)
    self.assertAlmostEqual(history.history['val_metric_1'][-1], 1, 0)
    self.assertAlmostEqual(history.history['metric_2'][-1], 5, 0)
    self.assertAlmostEqual(history.history['val_metric_2'][-1], 5, 0)
    eval_results = model.evaluate(x, y, batch_size=5)
    self.assertAlmostEqual(eval_results[1], 1, 0)
    self.assertAlmostEqual(eval_results[2], 5, 0)
    model.predict(x, batch_size=5)
    model.train_on_batch(x, y)
    model.test_on_batch(x, y)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_add_metric_in_layer_call(self):
    """`add_metric` called inside a layer's `call`."""

    class TestLayer(keras.layers.Layer):

      def build(self, input_shape):
        # `add_weight` is the non-deprecated spelling of `add_variable`.
        self.a = self.add_weight(
            'a', (1, 1), initializer='ones', trainable=False)
        self.built = True

      def call(self, inputs):
        self.add_metric(
            math_ops.reduce_sum(inputs), name='metric_1', aggregation='mean')
        return inputs + 1

    layers = [
        TestLayer(input_shape=(1,)),
        keras.layers.Dense(2, kernel_initializer='ones')
    ]
    model = testing_utils.get_model_from_layers(layers, input_shape=(1,))
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
    self.assertEqual(history.history['metric_1'][-1], 5)
    self.assertAlmostEqual(history.history['val_metric_1'][-1], 5, 0)

  @keras_parameterized.run_all_keras_modes
  def test_model_metrics_list(self):
    """`model.metrics` contains compile metrics plus nested add_metric ones."""

    class LayerWithAddMetric(keras.layers.Layer):

      def __init__(self):
        super(LayerWithAddMetric, self).__init__()
        self.dense = keras.layers.Dense(1, kernel_initializer='ones')

      def __call__(self, inputs):
        outputs = self.dense(inputs)
        self.add_metric(
            math_ops.reduce_sum(outputs), name='metric_1', aggregation='mean')
        return outputs

    class LayerWithNestedAddMetricLayer(keras.layers.Layer):

      def __init__(self):
        super(LayerWithNestedAddMetricLayer, self).__init__()
        self.layer = LayerWithAddMetric()

      def call(self, inputs):
        outputs = self.layer(inputs)
        self.add_metric(
            math_ops.reduce_sum(outputs), name='metric_2', aggregation='mean')
        return outputs

    x = keras.layers.Input(shape=(1,))
    y = LayerWithNestedAddMetricLayer()(x)
    model = keras.models.Model(x, y)
    model.add_metric(
        math_ops.reduce_sum(y), name='metric_3', aggregation='mean')
    if context.executing_eagerly():
      # This is not a use case in v1 graph mode.
      mean_result = metrics_module.Mean()(y)
      with self.assertRaisesRegex(
          ValueError, 'Expected a symbolic Tensor for the metric value'):
        model.add_metric(mean_result, name='metric_4')
      with self.assertRaisesRegex(
          ValueError, 'Using the result of calling a `Metric` object '):
        with keras.backend.get_graph().as_default():
          model.add_metric(metrics_module.Mean(name='metric_4')(y))
    model.compile(
        'sgd',
        loss='mse',
        metrics=[metrics_module.Accuracy('metric_4')],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    # Verify that the metrics added using `compile` and `add_metric` API are
    # included
    self.assertEqual([m.name for m in model._compile_metrics], ['metric_4'])
    self.assertEqual([m.name for m in model.metrics],
                     ['metric_4', 'metric_2', 'metric_1', 'metric_3'])

  @keras_parameterized.run_all_keras_modes
  def test_model_metrics_list_in_call(self):
    """`model.metrics` includes metrics added in a subclassed `call`."""

    class TestModel(keras.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = keras.layers.Dense(2, kernel_initializer='ones')

      def call(self, x):
        self.add_metric(
            math_ops.reduce_sum(x), name='metric_1', aggregation='mean')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        metrics=[metrics_module.Accuracy('acc')],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
    self.assertEqual([m.name for m in model._compile_metrics], ['acc'])
    self.assertEqual([m.name for m in model.metrics], ['acc', 'metric_1'])

  @keras_parameterized.run_all_keras_modes
  def test_multiple_add_metric_calls(self):
    """Several `add_metric` calls in one `call` are all tracked."""

    class TestModel(keras.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = keras.layers.Dense(2, kernel_initializer='ones')
        self.mean1 = metrics_module.Mean(name='metric_1')
        self.mean2 = metrics_module.Mean(name='metric_2')

      def call(self, x):
        self.add_metric(self.mean2(x), name='metric_2')
        self.add_metric(self.mean1(x), name='metric_1')
        self.add_metric(
            math_ops.reduce_sum(x), name='metric_3', aggregation='mean')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
    self.assertAlmostEqual(history.history['metric_1'][-1], 1, 0)
    self.assertAlmostEqual(history.history['metric_2'][-1], 1, 0)
    self.assertAlmostEqual(history.history['metric_3'][-1], 5, 0)
    eval_results = model.evaluate(x, y, batch_size=5)
    self.assertArrayNear(eval_results[1:4], [1, 1, 5], 0.1)
    model.predict(x, batch_size=5)
    model.train_on_batch(x, y)
    model.test_on_batch(x, y)

  @keras_parameterized.run_with_all_model_types
  @keras_parameterized.run_all_keras_modes
  def test_invalid_metric_tensor(self):
    """Aggregated tensors not produced by a `Metric` object are rejected."""

    class TestLayer(keras.layers.Layer):

      def build(self, input_shape):
        self.built = True

      def call(self, inputs):
        # No `aggregation` given and the value is a raw tensor -> invalid.
        self.add_metric(math_ops.reduce_mean(inputs), name='metric_1')
        return inputs + 1

    layers = [TestLayer(input_shape=(1,))]
    layers.append(keras.layers.Dense(2, kernel_initializer='ones'))
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    with self.assertRaisesRegex(
        ValueError,
        'We do not support adding an aggregated metric result tensor that is '
        'not the output of a `tf.keras.metrics.Metric` metric instance.'):
      model = testing_utils.get_model_from_layers(layers, input_shape=(1,))
      model.compile(
          loss='mse',
          optimizer=RMSPropOptimizer(0.01),
          run_eagerly=testing_utils.should_run_eagerly(),
          experimental_run_tf_function=testing_utils.should_run_tf_function())
      model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))

  @keras_parameterized.run_all_keras_modes
  def test_duplicate_metric_name_in_add_metric(self):
    """Two tracked metrics sharing one name raise at fit time."""

    class TestModel(keras.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = keras.layers.Dense(2, kernel_initializer='ones')
        self.mean = metrics_module.Mean(name='metric_1')
        self.mean2 = metrics_module.Mean(name='metric_1')

      def call(self, x):
        self.add_metric(self.mean(x), name='metric_1')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    with self.assertRaisesRegex(
        ValueError,
        'Please provide different names for the metrics you have added. '
        'We found 2 metrics with the name: "metric_1"'):
      model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_without_name(self):
    """`add_metric` on a raw tensor without a name raises at fit time."""

    class TestModel(keras.Model):

      def __init__(self):
        super(TestModel, self).__init__(name='test_model')
        self.dense1 = keras.layers.Dense(2, kernel_initializer='ones')

      def call(self, x):
        self.add_metric(math_ops.reduce_sum(x), aggregation='mean')
        return self.dense1(x)

    model = TestModel()
    model.compile(
        loss='mse',
        optimizer=RMSPropOptimizer(0.01),
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.ones(shape=(10, 1))
    y = np.ones(shape=(10, 2))
    with self.assertRaisesRegex(ValueError,
                                'Please provide a name for your metric like'):
      model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_correctness(self):
    """Metrics added three different ways all report the same MAE values."""
    inputs = keras.Input(shape=(1,))
    targets = keras.Input(shape=(1,))

    class Bias(keras.layers.Layer):

      def build(self, input_shape):
        # `add_weight` is the non-deprecated spelling of `add_variable`.
        self.bias = self.add_weight('bias', (1,), initializer='zeros')
        self.mae = metrics_module.MeanAbsoluteError(name='mae_1')

      def call(self, inputs):
        inputs, targets = inputs
        outputs = inputs + self.bias
        self.add_metric(self.mae(targets, outputs), name='mae_1')
        return outputs

    outputs = Bias()([inputs, targets])
    model = keras.Model([inputs, targets], outputs)
    model.add_metric(
        metrics_module.mean_absolute_error(targets, outputs),
        name='mae_2',
        aggregation='mean')
    model.compile(
        loss='mae',
        optimizer=keras.optimizer_v2.gradient_descent.SGD(0.1),
        metrics=[metrics_module.MeanAbsoluteError(name='mae_3')],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x = np.array([[0.], [1.], [2.]])
    y = np.array([[0.5], [2.], [3.5]])
    history = model.fit([x, y], y, batch_size=3, epochs=5)
    # MAE is 1.0 initially and the bias closes 0.1 of the gap per epoch.
    expected_val = [1., 0.9, 0.8, 0.7, 0.6]
    for key in ['loss', 'mae_1', 'mae_2', 'mae_3']:
      self.assertAllClose(history.history[key], expected_val, 1e-3)

  @keras_parameterized.run_all_keras_modes
  def test_add_metric_order(self):
    """Metrics from nested layers and the model itself are all reported."""

    class MyLayer(keras.layers.Layer):

      def call(self, inputs, training=None, mask=None):
        self.add_metric(
            array_ops.ones([32]) * 2.0, name='two', aggregation='mean')
        return inputs

    class MyModel(keras.Model):

      def __init__(self, **kwargs):
        super(MyModel, self).__init__(**kwargs)
        self._sampler = MyLayer(name='sampler')

      def call(self, inputs, training=None, mask=None):
        z = self._sampler(inputs)
        self.add_metric(
            array_ops.ones([32]) * 1.0, name='one', aggregation='mean')
        self.add_metric(
            array_ops.ones([32]) * 3.0, name='three', aggregation='mean')
        return z

    xdata = np.random.uniform(size=[32, 16]).astype(np.float32)
    dataset_train = dataset_ops.Dataset.from_tensor_slices((xdata, xdata))
    dataset_train = dataset_train.batch(32, drop_remainder=True)
    model = MyModel()
    model.compile(
        optimizer='sgd',
        loss='mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    history = model.fit(dataset_train, epochs=3)
    # Identity model on identity targets: loss is zero; each metric is the
    # constant it was constructed from.
    self.assertDictEqual(
        history.history, {
            'loss': [0.0, 0.0, 0.0],
            'three': [3.0, 3.0, 3.0],
            'two': [2.0, 2.0, 2.0],
            'one': [1.0, 1.0, 1.0]
        })

  @keras_parameterized.run_all_keras_modes
  def test_model_with_nested_compiled_model(self):
    """Outer model aggregates metrics from a nested compiled model."""

    class LayerWithAddMetric(keras.layers.Layer):

      def __init__(self):
        super(LayerWithAddMetric, self).__init__()
        self.dense = keras.layers.Dense(1, kernel_initializer='ones')

      def call(self, inputs):
        outputs = self.dense(inputs)
        self.add_metric(
            math_ops.reduce_sum(outputs), name='mean', aggregation='mean')
        return outputs

    x = keras.layers.Input(shape=(1,))
    y = LayerWithAddMetric()(x)
    inner_model = keras.models.Model(x, y)
    inner_model.add_metric(
        math_ops.reduce_sum(y), name='mean1', aggregation='mean')
    inner_model.compile(
        'sgd',
        loss='mse',
        metrics=[metrics_module.Accuracy('acc')],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    self.assertEqual([m.name for m in inner_model.metrics],
                     ['acc', 'mean', 'mean1'])
    x = keras.layers.Input(shape=[1])
    y = inner_model(x)
    outer_model = keras.Model(x, y)
    outer_model.add_metric(
        math_ops.reduce_sum(y), name='mean2', aggregation='mean')
    outer_model.compile(
        'sgd',
        loss='mse',
        metrics=[metrics_module.Accuracy('acc2')],
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    self.assertEqual([m.name for m in outer_model.metrics],
                     ['acc2', 'mean', 'mean1', 'mean2'])
class BareUpdateLayer(keras.layers.Layer):
  """Layer that bumps a non-trainable counter directly inside `call`."""

  def build(self, input_shape):
    # Scalar int32 state; kept out of the optimizer's hands.
    self.counter = self.add_weight(
        'counter', shape=(), dtype='int32', initializer='zeros',
        trainable=False)

  def call(self, inputs):
    # A bare assign op; the framework should track it as a layer update.
    state_ops.assign_add(self.counter, 1)
    return inputs * math_ops.cast(self.counter, inputs.dtype)
class LambdaUpdateLayer(keras.layers.Layer):
  """Layer that registers its counter increment via `add_update`."""

  def build(self, input_shape):
    # Scalar int32 state; not trained.
    self.counter = self.add_weight(
        'counter', shape=(), dtype='int32', initializer='zeros',
        trainable=False)

  def call(self, inputs):
    # Registered as a lambda so the update isn't run twice per step.
    self.add_update(lambda: state_ops.assign_add(self.counter, 1))
    return inputs * math_ops.cast(self.counter, inputs.dtype)
class NestedUpdateLayer(keras.layers.Layer):
  """Layer that delegates to an inner `BareUpdateLayer`.

  Exercises update tracking when the update is created by a nested layer.
  """

  def build(self, input_shape):
    # Build the inner layer eagerly so its counter variable exists.
    self.layer = BareUpdateLayer()
    self.layer.build(input_shape)

  @property
  def counter(self):
    # Surface the inner layer's counter so tests can read it uniformly.
    return self.layer.counter

  def call(self, inputs):
    return self.layer(inputs)
class SubgraphUpdateLayer(keras.layers.Layer):
  """Layer whose state update lives inside a training-only branch."""

  def build(self, input_shape):
    # Scalar int32 state; not trained.
    self.counter = self.add_weight(
        'counter', shape=(), dtype='int32', initializer='zeros',
        trainable=False)

  def call(self, inputs, training=None):
    if training is None:
      # Fall back to the global learning phase when not told explicitly.
      training = keras.backend.learning_phase()
    if training:
      self.counter.assign(self.counter + 1)
    return inputs
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
class TestAutoUpdates(keras_parameterized.TestCase):
  """Tests that layer-created updates run exactly once per training step."""

  @keras_parameterized.run_with_all_model_types
  @parameterized.named_parameters(('bare_update', BareUpdateLayer()),
                                  ('lambda_update', LambdaUpdateLayer()),
                                  ('nested_update', NestedUpdateLayer()))
  def test_updates_in_model(self, layer):
    # 10 samples / batch_size 2 -> 5 steps, so the counter must end at 5.
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model = testing_utils.get_model_from_layers(
        [layer, keras.layers.Dense(1)], input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.fit(x, y, batch_size=2, epochs=1)
    self.assertEqual(self.evaluate(layer.counter), 5)

  @keras_parameterized.run_with_all_model_types
  def test_lambda_updates_trainable_false(self):
    # `add_update`-style updates must stop running once the layer is frozen.
    x, y = np.ones((10, 10)), np.ones((10, 1))
    layer = LambdaUpdateLayer()
    model = testing_utils.get_model_from_layers(
        [layer, keras.layers.Dense(1)], input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.fit(x, y, batch_size=2, epochs=1)
    self.assertEqual(self.evaluate(layer.counter), 5)
    layer.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.fit(x, y, batch_size=2, epochs=1)
    # Counter unchanged: the frozen layer's update did not run.
    self.assertEqual(self.evaluate(layer.counter), 5)

  @keras_parameterized.run_with_all_model_types
  def test_subgraph_updates_in_model(self):
    # Updates defined inside a training-phase branch still run during fit.
    layer = SubgraphUpdateLayer()
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model = testing_utils.get_model_from_layers(
        [layer, keras.layers.Dense(1)], input_shape=(10,))
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    model.fit(x, y, batch_size=2, epochs=1)
    self.assertEqual(self.evaluate(layer.counter), 5)

  @parameterized.named_parameters(('bare_update', BareUpdateLayer()),
                                  ('lambda_update', LambdaUpdateLayer()),
                                  ('nested_update', NestedUpdateLayer()))
  def test_updates_standalone_layer(self, layer):
    # Calling a layer directly (outside a model) runs its update once.
    y = layer(np.ones((10, 10)))
    self.evaluate(layer.counter.initializer)
    self.evaluate(y)
    self.assertEqual(self.evaluate(layer.counter), 1)

  def test_trainable_false_standalone_layer(self):
    layer = LambdaUpdateLayer()
    y = layer(np.ones((10, 10)))
    self.evaluate(layer.counter.initializer)
    self.evaluate(y)
    self.assertEqual(self.evaluate(layer.counter), 1)
    layer.trainable = False
    y = layer(np.ones((10, 10)))
    self.evaluate(y)
    # Frozen layer: calling it again must not advance the counter.
    self.assertEqual(self.evaluate(layer.counter), 1)

  @keras_parameterized.run_with_all_model_types
  def test_batchnorm_trainable_false(self):
    # Frozen BatchNormalization must not update its moving statistics.
    bn = keras.layers.BatchNormalization()
    model = testing_utils.get_model_from_layers([bn, keras.layers.Dense(1)],
                                                input_shape=(10,))
    bn.trainable = False
    model.compile(
        'sgd',
        'mse',
        run_eagerly=testing_utils.should_run_eagerly(),
        experimental_run_tf_function=testing_utils.should_run_tf_function())
    x, y = np.ones((10, 10)), np.ones((10, 1))
    model.fit(x, y, batch_size=2, epochs=1)
    # Moving mean/variance keep their initial values (zeros/ones).
    self.assertAllEqual(self.evaluate(bn.moving_mean), np.zeros((10,)))
    self.assertAllEqual(self.evaluate(bn.moving_variance), np.ones((10,)))
if __name__ == '__main__':
  # Run all test cases in this module (`test` is imported earlier in the file).
  test.main()