# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for training routines."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import io
import sys
from absl.testing import parameterized
import numpy as np
import six
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.eager import function
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import test_util as tf_test_util
from tensorflow.python.keras import backend
from tensorflow.python.keras import combinations
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import layers as layers_module
from tensorflow.python.keras import losses
from tensorflow.python.keras import metrics as metrics_module
from tensorflow.python.keras import optimizer_v2
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.callbacks import Callback
from tensorflow.python.keras.engine import input_layer
from tensorflow.python.keras.engine import sequential
from tensorflow.python.keras.engine import training as training_module
from tensorflow.python.keras.engine import training_utils
from tensorflow.python.keras.utils import data_utils
from tensorflow.python.keras.utils import np_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import template
from tensorflow.python.ops import variable_scope
from tensorflow.python.ops import variables as variables_lib
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training.rmsprop import RMSPropOptimizer
try:
import scipy.sparse as scipy_sparse # pylint: disable=g-import-not-at-top
except ImportError:
scipy_sparse = None
class TrainingTest(keras_parameterized.TestCase):
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_fit_training_arg(self):
class ReturnTraining(layers_module.Layer):
def call(self, inputs, training):
if training:
return inputs + array_ops.constant([100], 'float32')
else:
return inputs + array_ops.constant([0], 'float32')
model = sequential.Sequential([ReturnTraining()])
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
hist = model.fit(x=np.array([0.]), y=np.array([0.]))
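    # With training=True the layer outputs 0 + 100, so the MSE against the
    # zero target is 100**2 = 10000.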
self.assertAllClose(hist.history['loss'][0], 10000)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_fit_on_empty(self):
model = sequential.Sequential([layers_module.Dense(1)])
model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
with self.assertRaisesRegex(ValueError,
'Expect x to be a non-empty array or dataset.'):
model.fit(x=np.array([]), y=np.array([]))
@keras_parameterized.run_all_keras_modes
def test_run_eagerly_setting(self):
model = sequential.Sequential([layers_module.Dense(1)])
run_eagerly = testing_utils.should_run_eagerly()
model.compile('sgd', 'mse', run_eagerly=run_eagerly)
self.assertEqual(model.run_eagerly, run_eagerly)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
@parameterized.named_parameters(
('train_on_batch', 'train_on_batch'),
('test_on_batch', 'test_on_batch'),
('predict_on_batch', 'predict_on_batch'),
('fit', 'fit'),
('evaluate', 'evaluate'),
('predict', 'predict'),
)
def test_disallow_methods_inside_tf_function(self, method_name):
model = sequential.Sequential([layers_module.Dense(1)])
run_eagerly = testing_utils.should_run_eagerly()
model.compile('sgd', 'mse', run_eagerly=run_eagerly)
@def_function.function
def my_fn():
getattr(model, method_name)(1)
error_msg = 'inside a `tf.function`'
with self.assertRaisesRegex(RuntimeError, error_msg):
my_fn()
@keras_parameterized.run_all_keras_modes
def test_fit_and_validate_learning_phase(self):
class ReturnTraining(layers_module.Layer):
def call(self, inputs):
return backend.in_train_phase(lambda: array_ops.ones_like(inputs),
lambda: array_ops.zeros_like(inputs))
model = sequential.Sequential([ReturnTraining(input_shape=(2,))])
model.compile(
'sgd',
loss='mae',
run_eagerly=testing_utils.should_run_eagerly())
inputs = np.ones((40, 2), dtype=np.float32)
targets = np.ones((40, 1), dtype=np.float32)
    # Test correctness when training and validating from batched datasets.
train_dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(10)
val_dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(10)
history = model.fit(
train_dataset, epochs=2, verbose=1, validation_data=val_dataset)
    # The training loss should be 0.0.
self.assertAllClose(history.history['loss'][0], 0.0)
# The validation loss should be 1.0.
self.assertAllClose(history.history['val_loss'][0], 1.0)
@keras_parameterized.run_all_keras_modes
def test_fit_and_validate_training_arg(self):
class ReturnTraining(layers_module.Layer):
def call(self, inputs, training=None):
return backend.in_train_phase(
lambda: array_ops.ones_like(inputs),
lambda: array_ops.zeros_like(inputs),
training=training)
model = sequential.Sequential([ReturnTraining(input_shape=(2,))])
model.compile(
'sgd',
loss='mae',
run_eagerly=testing_utils.should_run_eagerly())
inputs = np.ones((40, 2), dtype=np.float32)
targets = np.ones((40, 1), dtype=np.float32)
    # Test correctness when training and validating from batched datasets.
train_dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(10)
val_dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(10)
history = model.fit(
train_dataset, epochs=2, verbose=1, validation_data=val_dataset)
    # The training loss should be 0.0.
self.assertAllClose(history.history['loss'][0], 0.0)
# The validation loss should be 1.0.
self.assertAllClose(history.history['val_loss'][0], 1.0)
@keras_parameterized.run_all_keras_modes
@keras_parameterized.run_with_all_model_types
def test_target_dtype_matches_output(self):
def loss_fn(labels, preds):
self.assertEqual(labels.dtype, preds.dtype)
return labels - preds
layers = [
layers_module.Dense(10, dtype=np.float64),
layers_module.Dense(10, dtype=np.float64)
]
model = testing_utils.get_model_from_layers(layers, input_shape=(1,))
inputs = np.ones(10, dtype=np.float64)
targets = np.ones(10, dtype=np.float64)
model.compile(
'sgd',
loss=loss_fn,
run_eagerly=testing_utils.should_run_eagerly())
model.train_on_batch(inputs, targets)
model.test_on_batch(inputs, targets)
self.assertEqual(model.predict(inputs).dtype, np.float64)
@keras_parameterized.run_all_keras_modes
def test_fit_and_validate_nested_training_arg(self):
class NestedReturnTraining(layers_module.Layer):
def call(self, inputs, training=None):
return backend.in_train_phase(
lambda: array_ops.ones_like(inputs),
lambda: array_ops.zeros_like(inputs),
training=training)
class ReturnTraining(layers_module.Layer):
def __init__(self, input_shape=None, **kwargs):
super(ReturnTraining, self).__init__(input_shape=input_shape, **kwargs)
self._nested_layer = None
def build(self, input_shape):
self._nested_layer = NestedReturnTraining()
self.built = True
def call(self, inputs):
return self._nested_layer(inputs)
model = sequential.Sequential([ReturnTraining(input_shape=(2,))])
model.compile(
'sgd',
loss='mae',
run_eagerly=testing_utils.should_run_eagerly())
inputs = np.ones((40, 2), dtype=np.float32)
targets = np.ones((40, 1), dtype=np.float32)
    # Test correctness when training and validating from batched datasets.
train_dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(10)
val_dataset = dataset_ops.Dataset.from_tensor_slices(
(inputs, targets)).batch(10)
history = model.fit(
train_dataset, epochs=2, verbose=1, validation_data=val_dataset)
    # The training loss should be 0.0.
self.assertAllClose(history.history['loss'][0], 0.0)
# The validation loss should be 1.0.
self.assertAllClose(history.history['val_loss'][0], 1.0)
@keras_parameterized.run_with_all_model_types(exclude_models='sequential')
@keras_parameterized.run_all_keras_modes
def test_fit_on_arrays(self):
input_a = layers_module.Input(shape=(3,), name='input_a')
input_b = layers_module.Input(shape=(3,), name='input_b')
dense = layers_module.Dense(4, name='dense')
dropout = layers_module.Dropout(0.5, name='dropout')
branch_a = [input_a, dense]
branch_b = [input_b, dense, dropout]
model = testing_utils.get_multi_io_model(branch_a, branch_b)
optimizer = RMSPropOptimizer(learning_rate=0.001)
loss = 'mse'
loss_weights = [1., 0.5]
model.compile(
optimizer,
loss,
metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
loss_weights=loss_weights,
run_eagerly=testing_utils.should_run_eagerly())
input_a_np = np.random.random((10, 3))
input_b_np = np.random.random((10, 3))
output_d_np = np.random.random((10, 4))
output_e_np = np.random.random((10, 4))
    # Test fit at different verbosity levels.
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=1,
batch_size=5,
verbose=0)
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=1,
batch_size=5,
verbose=1)
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=2,
batch_size=5,
verbose=2)
model.train_on_batch([input_a_np, input_b_np], [output_d_np, output_e_np])
# Test with validation data
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
validation_data=([input_a_np, input_b_np], [output_d_np,
output_e_np]),
epochs=1,
batch_size=5,
verbose=0)
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
validation_data=([input_a_np, input_b_np], [output_d_np,
output_e_np]),
epochs=2,
batch_size=5,
verbose=1)
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
validation_data=([input_a_np, input_b_np], [output_d_np,
output_e_np]),
epochs=2,
batch_size=5,
verbose=2)
# Test with validation split
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=2,
batch_size=5,
verbose=0,
validation_split=0.2)
if testing_utils.get_model_type() == 'functional':
# Test with dictionary inputs
model.fit(
{
'input_a': input_a_np,
'input_b': input_b_np
}, {
'dense': output_d_np,
'dropout': output_e_np
},
epochs=1,
batch_size=5,
verbose=0)
model.fit(
{
'input_a': input_a_np,
'input_b': input_b_np
}, {
'dense': output_d_np,
'dropout': output_e_np
},
epochs=1,
batch_size=5,
verbose=1)
model.fit(
{
'input_a': input_a_np,
'input_b': input_b_np
}, {
'dense': output_d_np,
'dropout': output_e_np
},
validation_data=({
'input_a': input_a_np,
'input_b': input_b_np
}, {
'dense': output_d_np,
'dropout': output_e_np
}),
epochs=1,
batch_size=5,
verbose=0)
model.train_on_batch({
'input_a': input_a_np,
'input_b': input_b_np
}, {
'dense': output_d_np,
'dropout': output_e_np
})
# Test with lists for loss, metrics
loss = ['mae', 'mse']
model.compile(
optimizer,
loss,
metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
run_eagerly=testing_utils.should_run_eagerly())
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=1,
batch_size=5,
verbose=0)
# Test with dictionaries for loss, metrics, loss weights
if testing_utils.get_model_type() == 'functional':
loss = {'dense': 'mse', 'dropout': 'mae'}
loss_weights = {'dense': 1., 'dropout': 0.5}
metrics = {
'dense': 'mse',
'dropout': metrics_module.CategoricalAccuracy()
}
model.compile(
optimizer,
loss,
metrics=metrics,
loss_weights=loss_weights,
run_eagerly=testing_utils.should_run_eagerly())
model.fit(
[input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=1,
batch_size=5,
verbose=0)
# Build single-input model
x = layers_module.Input(shape=(3,), name='input_a')
y = layers_module.Dense(4)(x)
model = training_module.Model(x, y)
model.compile(
optimizer,
loss='mse',
run_eagerly=testing_utils.should_run_eagerly())
    # A list containing a single array is accepted for a single-input model.
model.fit([input_a_np], output_d_np, epochs=1)
# Test model on a list of floats
input_a_np = np.random.random((10, 3))
input_b_np = np.random.random((10, 4))
# Test execution on inputs that are lists of scalars.
# TF2 and TF1 have slightly different semantics:
if context.executing_eagerly():
      # In TF2, to avoid ambiguity when lists are nested, the entire input is
      # converted to a single numpy array (and this only works for
      # single-io models).
model.fit(np.ndarray.tolist(input_a_np),
np.ndarray.tolist(input_b_np),
epochs=2,
batch_size=5,
verbose=2)
else:
# In TF1 there was logic to try disambiguating between the individual
# inputs when lists are nested. This allowed multi-io functional models
# to support lists of scalars as input, but it caused ambiguity issues
# for subclass models & made it trickier to pass multi-dimensional inputs
# as lists of scalars to single io models. This was an excessive amount
# of complexity for what boiled down to a convenience method we were
# mainly just using for writing tests.
model.fit([np.ndarray.tolist(input_a_np)],
[np.ndarray.tolist(input_b_np)],
epochs=2,
batch_size=5,
verbose=2)
@keras_parameterized.run_all_keras_modes
def test_evaluate_predict_on_arrays(self):
a = layers_module.Input(shape=(3,), name='input_a')
b = layers_module.Input(shape=(3,), name='input_b')
dense = layers_module.Dense(4, name='dense')
c = dense(a)
d = dense(b)
e = layers_module.Dropout(0.5, name='dropout')(c)
model = training_module.Model([a, b], [d, e])
optimizer = RMSPropOptimizer(learning_rate=0.001)
loss = 'mse'
loss_weights = [1., 0.5]
model.compile(
optimizer,
loss,
metrics=['mae', metrics_module.CategoricalAccuracy()],
loss_weights=loss_weights,
sample_weight_mode=None,
run_eagerly=testing_utils.should_run_eagerly())
input_a_np = np.random.random((10, 3))
input_b_np = np.random.random((10, 3))
output_d_np = np.random.random((10, 4))
output_e_np = np.random.random((10, 4))
    # Test evaluate at different verbosity levels.
out = model.evaluate(
[input_a_np, input_b_np], [output_d_np, output_e_np],
batch_size=5,
verbose=0)
self.assertEqual(len(out), 7)
out = model.evaluate(
[input_a_np, input_b_np], [output_d_np, output_e_np],
batch_size=5,
verbose=1)
self.assertEqual(len(out), 7)
out = model.evaluate(
[input_a_np, input_b_np], [output_d_np, output_e_np],
batch_size=5,
verbose=2)
self.assertEqual(len(out), 7)
out = model.test_on_batch([input_a_np, input_b_np],
[output_d_np, output_e_np])
self.assertEqual(len(out), 7)
# Test evaluate with dictionary inputs
model.evaluate(
{
'input_a': input_a_np,
'input_b': input_b_np
}, {
'dense': output_d_np,
'dropout': output_e_np
},
batch_size=5,
verbose=0)
model.evaluate(
{
'input_a': input_a_np,
'input_b': input_b_np
}, {
'dense': output_d_np,
'dropout': output_e_np
},
batch_size=5,
verbose=1)
# Test predict
out = model.predict([input_a_np, input_b_np], batch_size=5)
self.assertEqual(len(out), 2)
out = model.predict({'input_a': input_a_np, 'input_b': input_b_np})
self.assertEqual(len(out), 2)
out = model.predict_on_batch({
'input_a': input_a_np,
'input_b': input_b_np
})
self.assertEqual(len(out), 2)
def _make_sequence_input_functions(self, input_type):
# train and test
xy_namedtuple = collections.namedtuple('xy_namedtuple', ['x', 'y'])
# predict
x_namedtuple = collections.namedtuple('x_namedtuple', ['x'])
if input_type == 'dataset':
dataset = dataset_ops.Dataset.range(16).map(
lambda _: array_ops.ones(shape=(1,)))
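      # Zip the dataset with itself so every element serves as both x and y.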
xy_dataset = dataset_ops.Dataset.zip((dataset, dataset)).batch(4)
x_dataset = dataset.batch(4)
def xy_function(use_namedtuple):
return xy_dataset.map(xy_namedtuple) if use_namedtuple else xy_dataset
def x_function(use_namedtuple):
return x_dataset.map(x_namedtuple) if use_namedtuple else x_dataset
return xy_function, x_function
elif input_type == 'generator':
def xy_generator(use_namedtuple):
x, y = np.ones((4, 1)), np.ones((4, 1))
for _ in range(4):
if use_namedtuple:
yield xy_namedtuple(x, y)
else:
yield x, y
def x_generator(use_namedtuple):
x = np.ones((4, 1))
for _ in range(4):
if use_namedtuple:
yield x_namedtuple(x)
else:
yield x
return xy_generator, x_generator
elif input_type == 'sequence':
class XYSequence(data_utils.Sequence):
def __init__(self, use_namedtuple):
self._use_namedtuple = use_namedtuple
super(XYSequence, self).__init__()
def __getitem__(self, idx):
x, y = np.ones((4, 1)), np.ones((4, 1))
if self._use_namedtuple:
return xy_namedtuple(x, y)
return x, y
def __len__(self):
return 4
class XSequence(data_utils.Sequence):
def __init__(self, use_namedtuple):
self._use_namedtuple = use_namedtuple
super(XSequence, self).__init__()
def __getitem__(self, idx):
x = np.ones((4, 1))
if self._use_namedtuple:
return x_namedtuple(x)
return x
def __len__(self):
return 4
return XYSequence, XSequence
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
@keras_parameterized.run_with_all_model_types
@parameterized.named_parameters(
('dataset', 'dataset'),
('generator', 'generator'),
('sequence', 'sequence'),
)
def test_sequence_input_types(self, input_type):
"""Ensure that namedtuples and tuples are plumbed identically."""
if not context.executing_eagerly():
self.skipTest('Improved checking is only present in data_adapter.')
xy_function, x_function = self._make_sequence_input_functions(input_type)
fit_kwargs, evaluate_kwargs, predict_kwargs = {}, {}, {}
if input_type == 'generator':
fit_kwargs['steps_per_epoch'] = 4
evaluate_kwargs['steps'] = 4
predict_kwargs['steps'] = 4
model = testing_utils.get_small_mlp(1, 1, 1)
model.compile(
loss='mse',
optimizer='sgd',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(xy_function(use_namedtuple=False), **fit_kwargs)
model.evaluate(xy_function(use_namedtuple=False), **evaluate_kwargs)
model.predict(x_function(use_namedtuple=False), **predict_kwargs)
@keras_parameterized.run_all_keras_modes
def test_custom_mapping_in_config(self):
class MyModel(training_module.Model):
def call(self, inputs):
return inputs
def get_config(self):
self.a = {}
return {'a': self.a}
model = MyModel()
self.assertIn('{"a": {}}', model.to_json())
def test_training_on_sparse_data_with_dense_placeholders_v1(self):
with ops.Graph().as_default():
      if scipy_sparse is None:
        self.skipTest('scipy.sparse is not available.')
test_inputs = [
scipy_sparse.random(6, 3, density=0.25).tocsr() for _ in range(2)
]
test_outputs = [
scipy_sparse.random(6, i, density=0.25).tocsr() for i in range(3, 5)
]
in1 = layers_module.Input(shape=(3,))
in2 = layers_module.Input(shape=(3,))
out1 = layers_module.Dropout(0.5, name='dropout')(in1)
out2 = layers_module.Dense(4, name='dense_1')(in2)
model = training_module.Model([in1, in2], [out1, out2])
model.predict(test_inputs, batch_size=2)
optimizer = 'rmsprop'
model.compile(
optimizer,
'mse',
metrics=['mae', metrics_module.CategoricalAccuracy()])
model.fit(test_inputs, test_outputs,
epochs=1, batch_size=2, validation_split=0.5)
model.evaluate(test_inputs, test_outputs, batch_size=2)
@keras_parameterized.run_all_keras_modes
def test_compile_with_sparse_placeholders(self):
inputs = layers_module.Input(shape=(10,), sparse=True)
weights = variables_lib.Variable(
np.ones((10, 1)).astype(np.float32), name='weights')
weights_mult = lambda x: sparse_ops.sparse_tensor_dense_matmul(x, weights)
output_layer = layers_module.Lambda(weights_mult)(inputs)
model = training_module.Model([inputs], output_layer)
model.compile(
loss='binary_crossentropy',
optimizer='adam',
metrics=['accuracy'],
run_eagerly=testing_utils.should_run_eagerly())
@keras_parameterized.run_all_keras_modes
def test_that_trainable_disables_updates(self):
val_a = np.random.random((10, 4))
val_out = np.random.random((10, 4))
a = layers_module.Input(shape=(4,))
layer = layers_module.BatchNormalization(input_shape=(4,))
b = layer(a)
model = training_module.Model(a, b)
model.trainable = False
if not ops.executing_eagerly_outside_functions():
self.assertEmpty(model.updates)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
if not ops.executing_eagerly_outside_functions():
self.assertEmpty(model.updates)
x1 = model.predict(val_a)
model.train_on_batch(val_a, val_out)
x2 = model.predict(val_a)
self.assertAllClose(x1, x2, atol=1e-7)
model.trainable = True
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
if not ops.executing_eagerly_outside_functions():
self.assertAllGreater(len(model.updates), 0)
model.train_on_batch(val_a, val_out)
x2 = model.predict(val_a)
assert np.abs(np.sum(x1 - x2)) > 1e-5
layer.trainable = False
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
if not ops.executing_eagerly_outside_functions():
self.assertEmpty(model.updates)
x1 = model.predict(val_a)
model.train_on_batch(val_a, val_out)
x2 = model.predict(val_a)
self.assertAllClose(x1, x2, atol=1e-7)
def test_weight_deduplication_in_methods(self):
inp = layers_module.Input(shape=(1,))
bn = layers_module.BatchNormalization()
d = layers_module.Dense(1)
m0 = training_module.Model(inp, d(bn(inp)))
m1 = training_module.Model(inp, d(bn(inp)))
x0 = m0(inp)
x1 = m1(inp)
x = layers_module.Add()([x0, x1])
model = training_module.Model(inp, x)
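    # The Dense and BatchNormalization layers are shared between m0 and m1, so
    # each weight must be counted once: 4 trainable (Dense kernel/bias and BN
    # gamma/beta) plus the 2 non-trainable BN moving statistics.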
self.assertLen(model.trainable_weights, 4)
self.assertLen(model.non_trainable_weights, 2)
self.assertLen(model.weights, 6)
@keras_parameterized.run_all_keras_modes
def test_weight_deduplication(self):
class WatchingLayer(layers_module.Layer):
def __init__(self, dense_to_track):
# This will cause the kernel and bias to be double counted, effectively
# doubling the learning rate if weights are not deduped.
self._kernel = dense_to_track.kernel
self._bias = dense_to_track.bias
super(WatchingLayer, self).__init__()
inp = layers_module.Input(shape=(1,))
dense_layer = layers_module.Dense(1)
dense_output = dense_layer(inp) # This will build the dense kernel
# Deterministically set weights to make the test repeatable.
dense_layer.set_weights([np.ones((1, 1)), np.zeros((1,))])
output = WatchingLayer(dense_layer)(dense_output)
model = training_module.Model(inp, output)
# 0.25 is the edge of the radius of convergence for the double apply case.
# At lr=0.24, the double apply case will very slowly descend while the
# correct case will drop very quickly.
model.compile(
loss='mse',
optimizer=optimizer_v2.gradient_descent.SGD(0.24),
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones((64 * 2,))
y = 4.5 * x - 3.
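    # With x all ones the targets are the constant 1.5, so the loss can be
    # driven to ~0.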
history = model.fit(x, y, batch_size=64, epochs=2, verbose=2)
# If the gradient apply is duplicated then the loss after 2 epochs will
# be ~0.15, compared to the correct answer of O(1e-7).
self.assertLess(history.history['loss'][-1], 1e-6)
@keras_parameterized.run_all_keras_modes
def test_weight_shared_across_layers(self):
class AddWeightLayer(layers_module.Layer):
def __init__(self, trainable_var, non_trainable_var):
self.trainable_var = trainable_var
self.non_trainable_var = non_trainable_var
super(AddWeightLayer, self).__init__()
def call(self, inputs):
return inputs + self.trainable_var
class LayerWithWeightSharedLayers(layers_module.Layer):
def __init__(self):
super(LayerWithWeightSharedLayers, self).__init__()
shared_trainable_var = variables_lib.Variable(1.)
shared_non_trainable_var = variables_lib.Variable(
1., trainable=False)
self.layer1 = AddWeightLayer(shared_trainable_var,
shared_non_trainable_var)
self.layer2 = AddWeightLayer(shared_trainable_var,
shared_non_trainable_var)
def call(self, inputs):
return self.layer2(self.layer1(inputs))
l = LayerWithWeightSharedLayers()
self.assertEqual(l._layers, [l.layer1, l.layer2])
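    # Even though both sublayers hold references to the shared variables, each
    # variable must appear exactly once in the aggregated lists below.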
self.assertEqual(l.variables,
[l.layer1.trainable_var, l.layer1.non_trainable_var])
self.assertEqual(l.trainable_variables, [l.layer1.trainable_var])
self.assertEqual(l.non_trainable_variables, [l.layer1.non_trainable_var])
self.assertLen(l.get_weights(), 2)
@keras_parameterized.run_all_keras_modes
def test_weight_tracking_for_template(self):
def variable_scoped_function(trainable=True):
return variable_scope.get_variable(
'dummy', shape=[1], trainable=trainable,
initializer=init_ops.zeros_initializer())
def nested_template():
nested1 = template.make_template('nested', variable_scoped_function)
nested2 = template.make_template('nested', variable_scoped_function)
v1 = nested1()
v2 = nested2()
# nested1 and nested2 should not share variables
self.assertIsNot(v1, v2)
# Variables created by nested1 should be isolated from variables
# created by nested2.
self.assertEqual(1, len(nested1.variables))
self.assertEqual(1, len(nested2.variables))
self.assertIs(nested1.variables[0], v1)
self.assertIs(nested2.variables[0], v2)
self.assertEqual(1, len(nested1.trainable_variables))
self.assertEqual(1, len(nested2.trainable_variables))
self.assertIs(nested1.trainable_variables[0], v1)
self.assertIs(nested2.trainable_variables[0], v2)
self.assertEqual(len(nested1.non_trainable_variables), 0)
self.assertEqual(len(nested2.non_trainable_variables), 0)
return v1, v2
tmpl1 = template.make_template('s1', nested_template)
tmpl2 = template.make_template('s1', nested_template)
v1, v2 = tmpl1()
v5, v6 = tmpl2()
model = training_module.Model()
model.template = tmpl1
self.assertEqual(2, len(model.variables))
self.assertIs(model.variables[0], v1)
self.assertIs(model.variables[1], v2)
self.assertEqual(2, len(model.variables))
self.assertIs(model.trainable_variables[0], v1)
self.assertIs(model.trainable_variables[1], v2)
self.assertEqual(len(model.non_trainable_variables), 0)
model.templates = [tmpl2]
for v, w in zip(model.variables, [v1, v2, v5, v6]):
self.assertIs(v, w)
for v, w in zip(model.trainable_variables, [v1, v2, v5, v6]):
self.assertIs(v, w)
self.assertEqual(len(model.non_trainable_variables), 0)
# Make sure losses, layers, and updates aren't broken by having a Template
# in the mix, which does not expose any updates or losses.
self.assertEqual([], model.layers)
self.assertEqual([], model.updates)
self.assertEqual([], model.losses)
self.assertEqual([], model.templates.layers)
self.assertEqual([], model.templates.updates)
self.assertEqual([], model.templates.losses)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_logs_passed_to_callbacks(self):
input_dim = 5
num_classes = 1
class TestCallback(Callback):
def __init__(self):
super(TestCallback, self).__init__()
self.epoch_end_logs = None
self.batch_end_logs = None
self.epoch_end_call_count = 0
self.batch_end_call_count = 0
def on_epoch_end(self, epoch, logs=None):
self.epoch_end_logs = logs
self.epoch_end_call_count += 1
def on_batch_end(self, batch, logs=None):
self.batch_end_logs = logs
self.batch_end_call_count += 1
model = testing_utils.get_small_sequential_mlp(
num_hidden=10, num_classes=num_classes, input_dim=input_dim)
model.compile(
loss='binary_crossentropy',
metrics=['acc'],
weighted_metrics=['mae'],
optimizer=RMSPropOptimizer(learning_rate=0.01),
run_eagerly=testing_utils.should_run_eagerly())
np.random.seed(1337)
(x_train, y_train), (_, _) = testing_utils.get_test_data(
train_samples=10,
test_samples=10,
input_shape=(input_dim,),
num_classes=num_classes)
test_callback = TestCallback()
model.fit(
x_train,
y_train,
batch_size=2,
epochs=2,
verbose=0,
callbacks=[test_callback],
validation_data=(x_train, y_train))
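    # 10 samples with batch_size=2 give 5 batches per epoch; over 2 epochs the
    # batch-end callback fires 10 times.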
self.assertEqual(test_callback.batch_end_call_count, 10)
self.assertEqual(test_callback.epoch_end_call_count, 2)
self.assertSetEqual(
set(test_callback.batch_end_logs.keys()), set(['acc', 'loss', 'mae']))
self.assertSetEqual(
set(test_callback.epoch_end_logs.keys()),
set(['acc', 'loss', 'mae', 'val_acc', 'val_loss', 'val_mae']))
@keras_parameterized.run_all_keras_modes
def test_mismatched_output_shape_and_target_shape(self):
model = sequential.Sequential([
layers_module.Dense(2, input_shape=(3, 4)),
layers_module.Dense(5),
])
model.compile(
RMSPropOptimizer(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
run_eagerly=testing_utils.should_run_eagerly())
# Test with Numpy data
x_train = np.random.random((10, 3, 4)).astype(np.float32)
y_train = np.random.randint(0, 5, size=(10, 3)).astype(np.float32)
model.fit(x_train, y_train, batch_size=5, epochs=1)
# Test with iterator
dataset = dataset_ops.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.repeat(10)
dataset = dataset.batch(10)
model.fit(dataset, epochs=1, steps_per_epoch=2)
if context.executing_eagerly():
# Test with eager execution
model.compile(RMSPropOptimizer(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
run_eagerly=True)
model.fit(x_train, y_train, batch_size=5, epochs=1)
# Test with eager execution and iterator
model.fit(dataset, epochs=1, steps_per_epoch=2)
def test_losses_in_defun(self):
with context.eager_mode():
layer = layers_module.Dense(1, kernel_regularizer='l1')
layer(array_ops.ones([1, 10]))
@function.defun
def get_losses():
return layer.losses
self.assertAllEqual(
self.evaluate(layer.losses), self.evaluate(get_losses()))
@keras_parameterized.run_all_keras_modes
def test_logging(self):
mock_stdout = io.BytesIO() if six.PY2 else io.StringIO()
model = sequential.Sequential()
model.add(layers_module.Dense(10, activation='relu'))
model.add(layers_module.Dense(1, activation='sigmoid'))
model.compile(
RMSPropOptimizer(learning_rate=0.001),
loss='binary_crossentropy',
run_eagerly=testing_utils.should_run_eagerly())
with test.mock.patch.object(sys, 'stdout', mock_stdout):
model.fit(
np.ones((10, 10), 'float32'), np.ones((10, 1), 'float32'), epochs=10)
self.assertTrue('Epoch 5/10' in mock_stdout.getvalue())
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_training_with_loss_instance(self):
a = layers_module.Input(shape=(3,), name='input_a')
b = layers_module.Input(shape=(3,), name='input_b')
dense = layers_module.Dense(4, name='dense')
c = dense(a)
d = dense(b)
e = layers_module.Dropout(0.5, name='dropout')(c)
model = training_module.Model([a, b], [d, e])
loss_weights = [1., 0.5]
model.compile(
RMSPropOptimizer(learning_rate=0.001),
loss=losses.MeanSquaredError(),
metrics=[metrics_module.CategoricalAccuracy(), 'mae'],
loss_weights=loss_weights)
input_a_np = np.random.random((10, 3))
input_b_np = np.random.random((10, 3))
output_d_np = np.random.random((10, 4))
output_e_np = np.random.random((10, 4))
model.fit([input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=1,
batch_size=5)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_static_batch_in_input_layer(self):
if context.executing_eagerly():
self.skipTest('Not inferred in eager.')
class Counter(Callback):
def __init__(self):
self.batches = 0
def on_batch_end(self, batch, logs=None):
self.batches += 1
x, y = np.ones((64, 10), 'float32'), np.ones((64, 1), 'float32')
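    # With 64 samples, the default batch size of 32 yields 2 batches, while a
    # static batch size of 4 declared on the Input layer yields 16.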
for batch_size, expected_batches in [(None, 2), (4, 16)]:
inputs = input_layer.Input(batch_size=batch_size, shape=(10,))
outputs = layers_module.Dense(1, activation='sigmoid')(inputs)
model = training_module.Model(inputs, outputs)
model.compile(optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
counter = Counter()
model.fit(x, y, callbacks=[counter])
self.assertEqual(counter.batches, expected_batches)
model = sequential.Sequential(
[layers_module.Dense(1, batch_input_shape=(batch_size, 10))])
model.compile(optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
counter = Counter()
model.fit(x, y, callbacks=[counter])
self.assertEqual(counter.batches, expected_batches)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_static_batch_in_input_layer_consistency_checks(self):
if context.executing_eagerly():
self.skipTest('Not inferred in eager.')
x, y = np.ones((64, 10), 'float32'), np.ones((64, 1), 'float32')
inputs = input_layer.Input(batch_size=2, shape=(10,))
outputs = layers_module.Dense(1, activation='sigmoid')(inputs)
model = training_module.Model(inputs, outputs)
model.compile(optimizer_v2.adam.Adam(0.001), 'binary_crossentropy')
with self.assertRaisesRegex(ValueError,
'incompatible with the specified batch size'):
model.fit(x, y, batch_size=4)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_compatible_batch_size_functional_model(self):
class MyLayer(layers_module.Layer):
def call(self, inputs):
return array_ops.concat(inputs, axis=0)
input1 = input_layer.Input(batch_size=2, shape=(10,))
input2 = input_layer.Input(batch_size=3, shape=(10,))
outputs = MyLayer()([input1, input2])
with self.assertRaisesRegex(ValueError,
'specified batch sizes of the Input Layers'):
training_module.Model([input1, input2], outputs)
@combinations.generate(combinations.combine(mode=['graph', 'eager']))
def test_calling_subclass_model_on_different_datasets(self):
class SubclassedModel(training_module.Model):
def call(self, inputs):
return inputs * 2
model = SubclassedModel()
dataset_one = dataset_ops.Dataset.range(2).batch(2)
dataset_two = dataset_ops.Dataset.range(3, 10).batch(2)
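    # The model doubles its inputs: one batch of range(2) gives [[0], [2]],
    # and two batches of range(3, 10) give [[6], [8], [10], [12]].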
self.assertAllEqual([[0], [2]], model.predict(dataset_one, steps=1))
self.assertAllEqual([[6], [8], [10], [12]],
model.predict(dataset_two, steps=2))
def test_training_on_sparse_categorical_crossentropy_loss_with_softmax(self):
with context.eager_mode():
np.random.seed(1337)
train_x = np.ones((100, 4))
train_y = np.random.randint(0, 1, size=(100, 1))
reference_model = testing_utils.get_small_sequential_mlp(16, 2,
input_dim=4)
reference_model.compile(loss='sparse_categorical_crossentropy',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=True)
fixed_weights = reference_model.get_weights()
reference_model_loss = reference_model.train_on_batch(train_x, train_y)
test_model = testing_utils.get_small_sequential_mlp(16, 2, input_dim=4)
test_model.compile(loss='sparse_categorical_crossentropy',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=False)
test_model.set_weights(fixed_weights)
test_model_loss = test_model.train_on_batch(train_x, train_y)
self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)
def test_training_on_categorical_crossentropy_loss_with_softmax(self):
with context.eager_mode():
np.random.seed(1337)
train_x = np.ones((100, 4))
train_y = np_utils.to_categorical(
np.random.randint(0, 1, size=(100, 1)), 2)
reference_model = testing_utils.get_small_sequential_mlp(16, 2,
input_dim=4)
reference_model.compile(loss='categorical_crossentropy',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=True)
fixed_weights = reference_model.get_weights()
reference_model_loss = reference_model.train_on_batch(train_x, train_y)
test_model = testing_utils.get_small_sequential_mlp(16, 2, input_dim=4)
test_model.compile(loss='categorical_crossentropy',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=False)
test_model.set_weights(fixed_weights)
test_model_loss = test_model.train_on_batch(train_x, train_y)
self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)
def test_training_on_binary_crossentropy_loss(self):
with context.eager_mode():
train_x = np.ones((100, 4), dtype=np.float32)
train_y = np.ones((100, 1), dtype=np.float32)
reference_model = testing_utils.get_small_sequential_mlp(16, 1,
input_dim=4)
reference_model.compile(loss='binary_crossentropy',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=True)
fixed_weights = reference_model.get_weights()
reference_model_loss = reference_model.train_on_batch(train_x, train_y)
test_model = testing_utils.get_small_sequential_mlp(16, 1, input_dim=4)
test_model.compile(loss='binary_crossentropy',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=False)
test_model.set_weights(fixed_weights)
test_model_loss = test_model.train_on_batch(train_x, train_y)
self.assertAlmostEqual(test_model_loss, reference_model_loss, places=4)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
@parameterized.named_parameters(
('default', 1, 4), ('integer_two', 2, 2), ('integer_four', 4, 1),
('simple_list', [1, 3, 4], 3), ('duplicated_list', [4, 2, 2], 2))
def test_validation_freq(self, validation_freq, expected_runs):
x, y = np.ones((10, 10)), np.ones((10, 1))
model = testing_utils.get_small_mlp(2, 1, 10)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
class ValCounter(Callback):
def __init__(self):
self.val_runs = 0
def on_test_begin(self, logs=None):
self.val_runs += 1
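    # Over 4 epochs: validation_freq=1 validates every epoch (4 runs), 2 runs
    # on epochs {2, 4}, and 4 only on epoch 4. A list validates on the listed
    # epochs, so [1, 3, 4] gives 3 runs and [4, 2, 2] gives 2.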
val_counter = ValCounter()
model.fit(
x,
y,
epochs=4,
validation_data=(x, y),
validation_freq=validation_freq,
callbacks=[val_counter])
self.assertEqual(val_counter.val_runs, expected_runs)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_validation_steps_without_data(self):
if context.executing_eagerly():
self.skipTest('Check removed in new `fit`')
x, y = np.ones((10, 10)), np.ones((10, 1))
model = testing_utils.get_small_mlp(2, 1, 10)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
with self.assertRaisesRegex(
ValueError, '`validation_steps` should not be specified if '
'`validation_data` is None.'):
model.fit(x, y, epochs=4, validation_data=None, validation_steps=3)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_layer_with_variable_output(self):
class VariableOutputLayer(layers_module.Layer):
def build(self, input_shape):
self.v = self.add_weight('output_var', shape=(2, 5), initializer='ones')
def call(self, inputs):
return self.v
model = testing_utils.get_model_from_layers(
[VariableOutputLayer(), layers_module.Dense(1)], input_shape=(10,))
# TODO(omalleyt): Make this work with `run_eagerly=True`.
model.compile('sgd', 'mse', run_eagerly=False)
model.fit(np.ones((10, 10)), np.ones((10, 1)), batch_size=2, epochs=5)
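    # Three trainable variables: the layer's (2, 5) output variable plus the
    # Dense kernel and bias.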
self.assertLen(model.trainable_variables, 3)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
@testing_utils.enable_v2_dtype_behavior
def test_model_dtype(self):
class AssertTypeLayer(layers_module.Layer):
def call(self, inputs):
        assert inputs.dtype.name == self.dtype, (
            'Input tensor has dtype %s which does not match the layer '
            'dtype %s' % (inputs.dtype.name, self.dtype))
return inputs + 1.
for dtype in ('float16', 'float32', 'float64'):
model = testing_utils.get_model_from_layers(
[AssertTypeLayer(dtype=dtype)], input_shape=(10,))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones((10, 10))
y = np.ones((10, 10))
model.fit(x, y)
model.test_on_batch(x, y)
model(x)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
@testing_utils.enable_v2_dtype_behavior
def test_model_input_dtype(self):
model = testing_utils.get_small_mlp(1, 10, 10)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones((10, 10)).astype(np.float64)
y = np.ones((10, 10)).astype(np.float64)
dataset = dataset_ops.Dataset.from_tensor_slices((x, y)).batch(2)
model.fit(dataset)
self.assertEqual(model._compute_dtype, 'float32')
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_subclassed_model_with_training_arg(self):
class LayerWithTrainingArg(layers_module.Layer):
def call(self, inputs, training=None):
self.training = training
return inputs
class ModelWithTrainingArg(training_module.Model):
def __init__(self):
super(ModelWithTrainingArg, self).__init__()
self.l1 = LayerWithTrainingArg()
def call(self, inputs, training=None):
self.training = training
inputs = self.l1(inputs, training=training)
return inputs
x = np.zeros((1, 2))
model = ModelWithTrainingArg()
model.compile(
loss='mse',
optimizer='sgd',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, x, epochs=1)
if context.executing_eagerly():
expected_training_arg = True
else:
expected_training_arg = backend.symbolic_learning_phase()
self.assertIs(model.training, expected_training_arg)
self.assertIs(model.l1.training, expected_training_arg)
@keras_parameterized.run_all_keras_modes
def test_error_when_model_is_not_compiled(self):
inputs = input_layer.Input(shape=(1,))
outputs = layers_module.Dense(1)(inputs)
model = training_module.Model(inputs, outputs)
with self.assertRaisesRegex(RuntimeError, 'must compile your model'):
model.fit(np.ones((1, 1)), np.ones((1, 1)))
class MyModel(training_module.Model):
def call(self, x):
self.add_loss(math_ops.reduce_sum(x))
return x
model = MyModel()
with self.assertRaisesRegex(RuntimeError, 'must compile your model'):
model.fit(np.random.random((32, 1)), epochs=2)
@keras_parameterized.run_all_keras_modes
@testing_utils.enable_v2_dtype_behavior
def test_losses_of_different_dtypes(self):
inp = input_layer.Input(shape=(2,))
out_1 = layers_module.Dense(
2, dtype='float32', kernel_regularizer='l2')(
inp)
out_2 = layers_module.Dense(
2, dtype='float16', kernel_regularizer='l2')(
inp)
model = training_module.Model(inp, [out_1, out_2])
extra_loss = math_ops.reduce_sum(math_ops.cast(out_2, 'float64'))
model.add_loss(extra_loss)
model.compile('sgd', ['mse', 'mse'],
run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 2)), np.ones((10, 2))
model.fit(x, [y, y])
@keras_parameterized.run_all_keras_modes
@testing_utils.enable_v2_dtype_behavior
def test_losses_of_different_dtypes_with_subclassed_model(self):
class MyModel(training_module.Model):
def build(self, _):
self.dense = layers_module.Dense(2)
def call(self, inputs):
self.add_loss(math_ops.cast(nn_ops.l2_loss(inputs), 'float64'))
return self.dense(inputs)
model = MyModel(dtype='float32')
model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 2)), np.ones((10, 2))
model.fit(x, y)
@keras_parameterized.run_all_keras_modes
@testing_utils.enable_v2_dtype_behavior
def test_regularizer_of_different_dtype(self):
inp = input_layer.Input(shape=(2,))
def regularizer(weight):
return math_ops.cast(nn_ops.l2_loss(weight), 'float64')
out = layers_module.Dense(
2, dtype='float32', kernel_regularizer=regularizer)(
inp)
model = training_module.Model(inp, out)
model.compile('sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 2)), np.ones((10, 2))
model.fit(x, y)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_outputs_are_floats(self):
x, y = np.ones((10, 1)), np.ones((10, 1))
model = sequential.Sequential([layers_module.Dense(1)])
model.compile('sgd', 'mse', metrics=['accuracy'],
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(x, y, epochs=2)
self.assertIsInstance(history.history['loss'][0], float)
self.assertIsInstance(history.history['accuracy'][0], float)
loss, accuracy = model.train_on_batch(x, y)
self.assertIsInstance(loss, float)
self.assertIsInstance(accuracy, float)
loss, accuracy = model.evaluate(x, y)
self.assertIsInstance(loss, float)
self.assertIsInstance(accuracy, float)
loss, accuracy = model.test_on_batch(x, y)
self.assertIsInstance(loss, float)
self.assertIsInstance(accuracy, float)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_int_output(self):
x, y = np.ones((10, 1)), np.ones((10, 1))
model = sequential.Sequential([layers_module.Dense(1)])
class MyMetric(metrics_module.Metric):
def update_state(self, y_true, y_pred, sample_weight=None):
del y_true, y_pred, sample_weight
def result(self):
return array_ops.constant(1, dtype='int64')
model.compile('sgd', 'mse', metrics=[MyMetric()],
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(x, y, epochs=2)
self.assertIsInstance(history.history['my_metric'][0], int)
@keras_parameterized.run_all_keras_modes
def test_calling_aggregate_gradient(self):
class _Optimizer(optimizer_v2.gradient_descent.SGD):
"""Mock optimizer to check if _aggregate_gradient is called."""
_HAS_AGGREGATE_GRAD = True
def __init__(self):
self.aggregate_gradients_called = False
super(_Optimizer, self).__init__(name='MyOptimizer')
def _aggregate_gradients(self, grads):
self.aggregate_gradients_called = True
return super(_Optimizer, self)._aggregate_gradients(grads)
mock_optimizer = _Optimizer()
model = sequential.Sequential()
model.add(layers_module.Dense(10, activation='relu'))
model.compile(mock_optimizer, 'mse',
run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 10)), np.ones((10, 10))
model.fit(x, y)
self.assertEqual(model.optimizer.aggregate_gradients_called, True)
class _OptimizerOverrideApplyGradients(_Optimizer):
"""Override apply_gradients.
To test the case where the optimizer does not define the
experimental_aggregate_gradients parameter.
"""
_HAS_AGGREGATE_GRAD = False
def apply_gradients(self, grads_and_vars, name=None): # pylint: disable=useless-super-delegation
return super(_OptimizerOverrideApplyGradients,
self).apply_gradients(grads_and_vars, name)
mock_optimizer = _OptimizerOverrideApplyGradients()
model.compile(mock_optimizer, 'mse',
run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 10)), np.ones((10, 10))
model.fit(x, y)
self.assertEqual(model.optimizer.aggregate_gradients_called, True)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_gradients_are_none(self):
class DenseWithExtraWeight(layers_module.Dense):
def build(self, input_shape):
# Gradients w.r.t. extra_weights are None
self.extra_weight_1 = self.add_weight('extra_weight_1', shape=(),
initializer='ones')
super(DenseWithExtraWeight, self).build(input_shape)
self.extra_weight_2 = self.add_weight('extra_weight_2', shape=(),
initializer='ones')
model = sequential.Sequential([DenseWithExtraWeight(4, input_shape=(4,))])
# Test clipping can handle None gradients
opt = optimizer_v2.adam.Adam(clipnorm=1.0, clipvalue=1.0)
model.compile(opt, 'mse', run_eagerly=testing_utils.should_run_eagerly())
inputs = np.random.normal(size=(64, 4))
targets = np.random.normal(size=(64, 4))
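    # Weights are tracked in creation order ([extra_weight_1, kernel, bias,
    # extra_weight_2]), so index 1 is the Dense kernel.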
old_kernel = model.get_weights()[1]
model.fit(inputs, targets)
new_kernel = model.get_weights()[1]
self.assertNotAllEqual(old_kernel, new_kernel)
@keras_parameterized.run_all_keras_modes
def test_layer_ordering(self):
class MyLayer(layers_module.Layer):
pass
class MyModel(training_module.Model):
def __init__(self, name):
super(MyModel, self).__init__(name=name)
self.weight = variables_lib.Variable(0, name=name)
self.direct_sublayer = MyLayer(name='direct')
self.direct_sublayer.d = {'d': MyLayer(name='direct/dict')}
self.dict_sublayer = {'d': MyLayer(name='dict')}
self.dict_sublayer['d'].direct = MyLayer(name='dict/direct')
model = MyModel('model')
# All sublayers, including self and recursive sublayers.
self.assertEqual(['model', 'direct', 'direct/dict', 'dict', 'dict/direct'],
[l.name for l in model._flatten_layers()])
# Only direct sublayers, including those in data structures.
self.assertEqual(['direct', 'dict'], [l.name for l in model.layers])
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_trainable_state_setting(self):
class UpdateLayer(layers_module.Layer):
def __init__(self):
super(UpdateLayer, self).__init__()
self.v = variables_lib.Variable(0., trainable=False)
def call(self, x):
self.add_update(lambda: self.v.assign_add(1.))
return x * self.v
layer = UpdateLayer()
model_with_updates = sequential.Sequential([layer])
model_with_updates.compile(
'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
layer.trainable = False
model_without_updates = sequential.Sequential([layer])
model_without_updates.compile(
'sgd', 'mse', run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 1)), np.ones((10, 1))
self.assertEqual(self.evaluate(layer.v), 0.)
model_with_updates.fit(x, y, batch_size=10)
# assign_add called.
self.assertEqual(self.evaluate(layer.v), 1.)
model_without_updates.fit(x, y, batch_size=10)
# assign_add not called.
self.assertEqual(self.evaluate(layer.v), 1.)
@keras_parameterized.run_all_keras_modes(
always_skip_v1=True)
@parameterized.named_parameters(
('numpy_array', 'numpy_array'),
('dataset_array', 'dataset_array'),
('dataset_dict', 'dataset_dict'))
def test_single_input_no_tuple_wrapping(self, input_type):
x = np.ones((10, 1))
if input_type == 'numpy_array':
batch_size = 3
expected_data_type = ops.Tensor
elif input_type == 'dataset_array':
x = dataset_ops.Dataset.from_tensor_slices(x).batch(3)
batch_size = None
expected_data_type = ops.Tensor
else:
x = {'my_input': x}
x = dataset_ops.Dataset.from_tensor_slices(x).batch(3)
batch_size = None
expected_data_type = dict
test_case = self
class MyModel(training_module.Model):
def train_step(self, data):
# No tuple wrapping for single x input and no targets.
test_case.assertIsInstance(data, expected_data_type)
return super(MyModel, self).train_step(data)
def test_step(self, data):
test_case.assertIsInstance(data, expected_data_type)
return super(MyModel, self).test_step(data)
def predict_step(self, data):
test_case.assertIsInstance(data, expected_data_type)
return super(MyModel, self).predict_step(data)
inputs = layers_module.Input(shape=(1,), name='my_input')
outputs = layers_module.Dense(1)(inputs)
model = MyModel(inputs, outputs)
model.add_loss(math_ops.reduce_sum(outputs))
model.compile('sgd', 'mse')
model.fit(x, batch_size=batch_size)
model.evaluate(x, batch_size=batch_size)
model.predict(x, batch_size=batch_size)
class TestExceptionsAndWarnings(keras_parameterized.TestCase):
@keras_parameterized.run_all_keras_modes
def test_compile_warning_for_loss_missing_output(self):
with self.cached_session():
inp = layers_module.Input(shape=(16,), name='input_a')
out_1 = layers_module.Dense(8, name='dense_1')(inp)
out_2 = layers_module.Dense(
3, activation='softmax', name='dense_2')(
out_1)
model = training_module.Model(inputs=[inp], outputs=[out_1, out_2])
optimizer = RMSPropOptimizer(learning_rate=0.001)
model.compile(
optimizer,
loss={
'dense_2': 'categorical_crossentropy',
},
metrics={
'dense_2': 'categorical_accuracy',
'dense_1': metrics_module.CategoricalAccuracy(),
},
run_eagerly=testing_utils.should_run_eagerly())
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_sparse_op_with_op_layer(self):
with testing_utils.use_keras_tensors_scope(False):
      # The meaningful error is only raised without KerasTensors; it's tricky
      # to raise the exact same error with KerasTensors enabled. We may want
      # to add dispatching to the sparse_ops and have dispatch trigger on
      # AttributeError so that these ops fully work with KerasTensors. This
      # may need to wait until dispatch v2.
inputs = layers_module.Input(
shape=(2,), sparse=True, name='sparse_tensor')
output = sparse_ops.sparse_minimum(inputs, inputs)
with self.assertRaisesRegex(
ValueError, 'not supported by Keras automatic '
'op wrapping'):
training_module.Model([inputs], output)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_predict_error_with_empty_x(self):
inputs = layers_module.Input(shape=(2,))
outputs = layers_module.Dense(4)(inputs)
model = training_module.Model(inputs=inputs, outputs=outputs)
model.compile(loss='mse')
with self.assertRaisesRegex(ValueError,
'Expect x to be a non-empty array or dataset.'):
model.predict(np.array([]))
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_on_batch_error_inconsistent_batch_size(self):
input_node1 = layers_module.Input(shape=(5,))
input_node2 = layers_module.Input(shape=(5,))
output_node = layers_module.Concatenate()([input_node1, input_node2])
output_node = layers_module.Dense(4)(output_node)
model = training_module.Model([input_node1, input_node2], output_node)
model.compile(loss='mse')
with self.assertRaisesRegex(ValueError, 'Data cardinality is ambiguous'):
model.train_on_batch([np.ones((10, 5)), np.ones((10, 5))],
np.ones((11, 4)))
with self.assertRaisesRegex(ValueError, 'Data cardinality is ambiguous'):
model.test_on_batch([np.ones((10, 5)), np.ones((10, 5))],
np.ones((11, 4)))
with self.assertRaisesRegex(ValueError, 'Data cardinality is ambiguous'):
model.predict_on_batch([np.ones((10, 5)), np.ones((11, 5))])
class LossWeightingTest(keras_parameterized.TestCase):
@keras_parameterized.run_all_keras_modes
def test_class_weights(self):
num_classes = 5
batch_size = 5
epochs = 10
weighted_class = 3
weight = .5
train_samples = 1000
test_samples = 1000
input_dim = 5
learning_rate = 0.001
model = testing_utils.get_small_sequential_mlp(
num_hidden=10, num_classes=num_classes, input_dim=input_dim)
model.compile(
loss='categorical_crossentropy',
metrics=['acc', metrics_module.CategoricalAccuracy()],
weighted_metrics=['mae', metrics_module.CategoricalAccuracy()],
optimizer=RMSPropOptimizer(learning_rate=learning_rate),
run_eagerly=testing_utils.should_run_eagerly())
np.random.seed(1337)
(x_train, y_train), (x_test, y_test) = testing_utils.get_test_data(
train_samples=train_samples,
test_samples=test_samples,
input_shape=(input_dim,),
num_classes=num_classes)
int_y_test = y_test.copy()
int_y_train = y_train.copy()
# convert class vectors to binary class matrices
y_train = np_utils.to_categorical(y_train, num_classes)
y_test = np_utils.to_categorical(y_test, num_classes)
test_ids = np.where(int_y_test == np.array(weighted_class))[0]
class_weight = dict([(i, 1.) for i in range(num_classes)])
class_weight[weighted_class] = weight
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=epochs // 3,
verbose=0,
class_weight=class_weight,
validation_data=(x_train, y_train))
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=epochs // 2,
verbose=0,
class_weight=class_weight)
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=epochs // 2,
verbose=0,
class_weight=class_weight,
validation_split=0.1)
model.train_on_batch(
x_train[:batch_size], y_train[:batch_size], class_weight=class_weight)
ref_score = model.evaluate(x_test, y_test, verbose=0) # pylint: disable=unused-variable
score = model.evaluate( # pylint: disable=unused-variable
x_test[test_ids, :], y_test[test_ids, :], verbose=0)
# TODO(b/152990697): Fix the class weights test here.
# self.assertLess(score[0], ref_score[0])
@keras_parameterized.run_all_keras_modes
def test_temporal_sample_weights(self):
num_classes = 5
batch_size = 5
epochs = 10
weighted_class = 3
weight = 10.
train_samples = 1000
test_samples = 1000
input_dim = 5
timesteps = 3
learning_rate = 0.001
with self.cached_session():
model = sequential.Sequential()
model.add(
layers_module.TimeDistributed(
layers_module.Dense(num_classes),
input_shape=(timesteps, input_dim)))
model.add(layers_module.Activation('softmax'))
np.random.seed(1337)
(x_train, y_train), (x_test, y_test) = testing_utils.get_test_data(
train_samples=train_samples,
test_samples=test_samples,
input_shape=(input_dim,),
num_classes=num_classes)
int_y_test = y_test.copy()
int_y_train = y_train.copy()
# convert class vectors to binary class matrices
y_train = np_utils.to_categorical(y_train, num_classes)
y_test = np_utils.to_categorical(y_test, num_classes)
test_ids = np.where(int_y_test == np.array(weighted_class))[0]
sample_weight = np.ones((y_train.shape[0]))
sample_weight[int_y_train == weighted_class] = weight
temporal_x_train = np.reshape(x_train, (len(x_train), 1,
x_train.shape[1]))
temporal_x_train = np.repeat(temporal_x_train, timesteps, axis=1)
temporal_x_test = np.reshape(x_test, (len(x_test), 1, x_test.shape[1]))
temporal_x_test = np.repeat(temporal_x_test, timesteps, axis=1)
temporal_y_train = np.reshape(y_train, (len(y_train), 1,
y_train.shape[1]))
temporal_y_train = np.repeat(temporal_y_train, timesteps, axis=1)
temporal_y_test = np.reshape(y_test, (len(y_test), 1, y_test.shape[1]))
temporal_y_test = np.repeat(temporal_y_test, timesteps, axis=1)
temporal_sample_weight = np.reshape(sample_weight, (len(sample_weight),
1))
temporal_sample_weight = np.repeat(
temporal_sample_weight, timesteps, axis=1)
model.compile(
RMSPropOptimizer(learning_rate=learning_rate),
loss='categorical_crossentropy',
metrics=['acc', metrics_module.CategoricalAccuracy()],
weighted_metrics=['mae', metrics_module.CategoricalAccuracy()],
sample_weight_mode='temporal',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(
temporal_x_train,
temporal_y_train,
batch_size=batch_size,
epochs=epochs // 3,
verbose=0,
sample_weight=temporal_sample_weight)
model.fit(
temporal_x_train,
temporal_y_train,
batch_size=batch_size,
epochs=epochs // 3,
verbose=0,
sample_weight=temporal_sample_weight,
validation_split=0.1)
model.train_on_batch(
temporal_x_train[:batch_size],
temporal_y_train[:batch_size],
sample_weight=temporal_sample_weight[:batch_size])
model.test_on_batch(
temporal_x_train[:batch_size],
temporal_y_train[:batch_size],
sample_weight=temporal_sample_weight[:batch_size])
ref_score = model.evaluate(temporal_x_test, temporal_y_test, verbose=0)
if not context.executing_eagerly():
score = model.evaluate(
temporal_x_test[test_ids], temporal_y_test[test_ids], verbose=0)
self.assertLess(score[0], ref_score[0])
@keras_parameterized.run_all_keras_modes
@keras_parameterized.run_with_all_model_types(exclude_models='sequential')
def test_fit_with_incorrect_weights(self):
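    """`fit` should reject sample/class weights keyed by unknown output names."""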
input_a = layers_module.Input(shape=(3,), name='input_a')
input_b = layers_module.Input(shape=(3,), name='input_b')
dense = layers_module.Dense(2, name='output_1')
dropout = layers_module.Dropout(0.5, name='output_2')
branch_a = [input_a, dense]
branch_b = [input_b, dense, dropout]
model = testing_utils.get_multi_io_model(branch_a, branch_b)
model.compile(
optimizer='adam',
loss='mse',
run_eagerly=testing_utils.should_run_eagerly())
x = np.random.random((10, 3))
y = np.random.random((10, 2))
with self.assertRaises(ValueError):
model.fit([x, x], [y, y], epochs=1, sample_weight={'unknown': x})
with self.assertRaises(ValueError):
model.fit([x, x], [y, y], epochs=1, class_weight={'unknown': 1})
@keras_parameterized.run_all_keras_modes
def test_default_sample_weight(self):
"""Verifies that fit works without having to set sample_weight."""
num_classes = 5
input_dim = 5
timesteps = 3
learning_rate = 0.001
with self.cached_session():
model = sequential.Sequential()
model.add(
layers_module.TimeDistributed(
layers_module.Dense(num_classes),
input_shape=(timesteps, input_dim)))
x = np.random.random((10, timesteps, input_dim))
y = np.random.random((10, timesteps, num_classes))
optimizer = RMSPropOptimizer(learning_rate=learning_rate)
# sample_weight_mode is a list and mode value is None
model.compile(
optimizer,
loss='mse',
sample_weight_mode=[None],
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, epochs=1, batch_size=10)
# sample_weight_mode is a list and mode value is `temporal`
model.compile(
optimizer,
loss='mse',
sample_weight_mode=['temporal'],
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, epochs=1, batch_size=10)
# sample_weight_mode is a dict and mode value is None
model.compile(
optimizer,
loss='mse',
sample_weight_mode={'time_distributed': None},
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, epochs=1, batch_size=10)
# sample_weight_mode is a dict and mode value is `temporal`
model.compile(
optimizer,
loss='mse',
sample_weight_mode={'time_distributed': 'temporal'},
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, epochs=1, batch_size=10)
      # sample_weight_mode is neither a list nor a dict and the mode value
      # is None
model.compile(
optimizer,
loss='mse',
sample_weight_mode=None,
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, epochs=1, batch_size=10)
      # sample_weight_mode is neither a list nor a dict and the mode value
      # is `temporal`
model.compile(
optimizer,
loss='mse',
sample_weight_mode='temporal',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, epochs=1, batch_size=10)
def test_sample_weight_tensor(self):
"""Tests that sample weight may be defined as a tensor in the graph."""
with ops.get_default_graph().as_default():
# Create a simple pass-through model
inputs = layers_module.Input(shape=1, name='input_layer')
model = training_module.Model(inputs=inputs, outputs=inputs)
model.compile(
loss='mean_absolute_error',
optimizer='adam')
      # Prepare a sample-weight tensor fed through a dataset iterator.
sample_weights = array_ops.constant(
[[0, .4, 1, 1], [2, .4, .3, 1]])
dataset = dataset_ops.Dataset.from_tensor_slices(sample_weights)
sample_weights = dataset_ops.make_one_shot_iterator(dataset).get_next()
sample_weights = training_utils.standardize_sample_weights(
sample_weights, model.output_names)
# Update model loss with sample weight tensor.
model._compile_weights_loss_and_weighted_metrics(sample_weights)
feeds = {'input_layer:0': [[0], [0], [0], [0]],
'input_layer_target:0': [[1], [1], [1], [1]]}
with self.cached_session() as sess:
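        # Each `sess.run` advances the one-shot iterator, so the first run
        # uses the first sample-weight row and the second run uses the next.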
        self.assertAllClose(
            (0 + .4 + 1 + 1) / 4, sess.run(model.total_loss, feed_dict=feeds))
        self.assertAllClose(
            (2 + .4 + .3 + 1) / 4, sess.run(model.total_loss, feed_dict=feeds))
@keras_parameterized.run_all_keras_modes
class MaskingTest(keras_parameterized.TestCase):
def _get_model(self, input_shape=None):
layers = [
layers_module.Masking(mask_value=0),
layers_module.TimeDistributed(
layers_module.Dense(1, kernel_initializer='one'))
]
model = testing_utils.get_model_from_layers(layers, input_shape)
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=testing_utils.should_run_eagerly())
return model
@keras_parameterized.run_with_all_model_types
def test_masking(self):
model = self._get_model(input_shape=(2, 1))
x = np.array([[[1], [1]], [[0], [0]]])
y = np.array([[[1], [1]], [[1], [1]]])
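    # With a ones-kernel Dense the model predicts its inputs, so the unmasked
    # sample has zero error and the all-zero sample is excluded by the mask.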
loss = model.train_on_batch(x, y)
self.assertEqual(loss, 0)
@keras_parameterized.run_with_all_model_types(exclude_models='functional')
def test_masking_deferred(self):
model = self._get_model()
x = np.array([[[1], [1]], [[0], [0]]])
y = np.array([[[1], [1]], [[1], [1]]])
loss = model.train_on_batch(x, y)
self.assertEqual(loss, 0)
def test_mask_argument_in_layer(self):
# Test that the mask argument gets correctly passed to a layer in the
# functional API.
class CustomMaskedLayer(layers_module.Layer):
def __init__(self):
super(CustomMaskedLayer, self).__init__()
self.supports_masking = True
def call(self, inputs, mask=None):
assert mask is not None
return inputs
def compute_output_shape(self, input_shape):
return input_shape
x = np.random.random((5, 3))
inputs = layers_module.Input((3,))
masked = layers_module.Masking(mask_value=0)(inputs)
outputs = CustomMaskedLayer()(masked)
model = training_module.Model(inputs, outputs)
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=testing_utils.should_run_eagerly())
y = np.random.random((5, 3))
model.train_on_batch(x, y)
@keras_parameterized.run_all_keras_modes
class TestDynamicTrainability(keras_parameterized.TestCase):
def test_trainable_warning(self):
x = np.random.random((5, 3))
y = np.random.random((5, 2))
model = sequential.Sequential()
model.add(layers_module.Dense(2, input_dim=3))
model.trainable = False
model.compile(
'rmsprop',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
model.trainable = True
    # Training after toggling `trainable` post-compile should log a warning.
    with self.assertLogs(level='WARNING'):
      model.train_on_batch(x, y)
def test_trainable_argument(self):
with self.cached_session():
x = np.random.random((5, 3))
y = np.random.random((5, 2))
model = sequential.Sequential()
model.add(layers_module.Dense(2, input_dim=3, trainable=False))
model.compile(
'rmsprop',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
out = model.predict(x)
model.train_on_batch(x, y)
out_2 = model.predict(x)
self.assertAllClose(out, out_2)
# test with nesting
inputs = layers_module.Input(shape=(3,))
output = model(inputs)
model = training_module.Model(inputs, output)
model.compile(
'rmsprop',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
out = model.predict(x)
model.train_on_batch(x, y)
out_2 = model.predict(x)
self.assertAllClose(out, out_2)
def test_layer_trainability_switch(self):
# with constructor argument, in Sequential
model = sequential.Sequential()
model.add(layers_module.Dense(2, trainable=False, input_dim=1))
self.assertListEqual(model.trainable_weights, [])
    # by setting the `trainable` attribute, in Sequential
model = sequential.Sequential()
layer = layers_module.Dense(2, input_dim=1)
model.add(layer)
self.assertListEqual(model.trainable_weights, layer.trainable_weights)
layer.trainable = False
self.assertListEqual(model.trainable_weights, [])
# with constructor argument, in Model
x = layers_module.Input(shape=(1,))
y = layers_module.Dense(2, trainable=False)(x)
model = training_module.Model(x, y)
self.assertListEqual(model.trainable_weights, [])
    # by setting the `trainable` attribute, in Model
x = layers_module.Input(shape=(1,))
layer = layers_module.Dense(2)
y = layer(x)
model = training_module.Model(x, y)
self.assertListEqual(model.trainable_weights, layer.trainable_weights)
layer.trainable = False
self.assertListEqual(model.trainable_weights, [])
def test_model_trainability_switch(self):
# a non-trainable model has no trainable weights
x = layers_module.Input(shape=(1,))
y = layers_module.Dense(2)(x)
model = training_module.Model(x, y)
model.trainable = False
self.assertListEqual(model.trainable_weights, [])
# same for Sequential
model = sequential.Sequential()
model.add(layers_module.Dense(2, input_dim=1))
model.trainable = False
self.assertListEqual(model.trainable_weights, [])
def test_nested_model_trainability(self):
# a Sequential inside a Model
inner_model = sequential.Sequential()
inner_model.add(layers_module.Dense(2, input_dim=1))
x = layers_module.Input(shape=(1,))
y = inner_model(x)
outer_model = training_module.Model(x, y)
self.assertListEqual(outer_model.trainable_weights,
inner_model.trainable_weights)
inner_model.trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
inner_model.trainable = True
inner_model.layers[-1].trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
# a Sequential inside a Sequential
inner_model = sequential.Sequential()
inner_model.add(layers_module.Dense(2, input_dim=1))
outer_model = sequential.Sequential()
outer_model.add(inner_model)
self.assertListEqual(outer_model.trainable_weights,
inner_model.trainable_weights)
inner_model.trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
inner_model.trainable = True
inner_model.layers[-1].trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
# a Model inside a Model
x = layers_module.Input(shape=(1,))
y = layers_module.Dense(2)(x)
inner_model = training_module.Model(x, y)
x = layers_module.Input(shape=(1,))
y = inner_model(x)
outer_model = training_module.Model(x, y)
self.assertListEqual(outer_model.trainable_weights,
inner_model.trainable_weights)
inner_model.trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
inner_model.trainable = True
inner_model.layers[-1].trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
# a Model inside a Sequential
x = layers_module.Input(shape=(1,))
y = layers_module.Dense(2)(x)
inner_model = training_module.Model(x, y)
outer_model = sequential.Sequential()
outer_model.add(inner_model)
self.assertListEqual(outer_model.trainable_weights,
inner_model.trainable_weights)
inner_model.trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
inner_model.trainable = True
inner_model.layers[-1].trainable = False
self.assertListEqual(outer_model.trainable_weights, [])
def test_gan_workflow(self):
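    """A shared layer's `trainable` value is captured at each model's compile."""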
shared_layer = layers_module.BatchNormalization()
inputs1 = input_layer.Input(10)
outputs1 = shared_layer(inputs1)
model1 = training_module.Model(inputs1, outputs1)
shared_layer.trainable = False
model1.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
inputs2 = input_layer.Input(10)
outputs2 = shared_layer(inputs2)
model2 = training_module.Model(inputs2, outputs2)
shared_layer.trainable = True
model2.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 10)), np.ones((10, 10))
out1_0 = model1.predict_on_batch(x)
model1.train_on_batch(x, y)
out1_1 = model1.predict_on_batch(x)
self.assertAllClose(out1_0, out1_1)
out2_0 = model2.predict_on_batch(x)
model2.train_on_batch(x, y)
out2_1 = model2.predict_on_batch(x)
self.assertNotAllClose(out2_0, out2_1)
def test_toggle_value(self):
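    """Toggling `trainable` after compile has no effect until recompiling."""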
input_0 = layers_module.Input(shape=(1,))
dense_0 = layers_module.Dense(
1, kernel_initializer='ones', bias_initializer='ones')
dense_1 = layers_module.Dense(
1, kernel_initializer='ones', bias_initializer='ones')
result = layers_module.Add()([dense_0(input_0), dense_1(input_0)])
model = training_module.Model(input_0, result)
dense_0.trainable = False
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones((10, 1))
y = 5 * x + 2
model.train_on_batch(x, y)
dense_0.trainable = True
model.train_on_batch(x, y)
kernel, bias = dense_0.get_weights()
self.assertAllEqual([kernel[0, 0], bias[0]], [1., 1.])
kernel, bias = dense_1.get_weights()
self.assertAllClose([kernel[0, 0], bias[0]], [1.1176, 1.1176])
class TestTrainingWithDataTensors(keras_parameterized.TestCase):
def test_training_and_eval_methods_on_symbolic_tensors_single_io(self):
with ops.Graph().as_default():
x = layers_module.Input(shape=(3,), name='input')
y = layers_module.Dense(4, name='dense')(x)
model = training_module.Model(x, y)
optimizer = RMSPropOptimizer(learning_rate=0.001)
loss = 'mse'
model.compile(
optimizer,
loss,
metrics=['mae', metrics_module.CategoricalAccuracy()])
inputs = backend.zeros(shape=(10, 3))
targets = backend.zeros(shape=(10, 4))
model.fit(inputs, targets, epochs=1, steps_per_epoch=2, verbose=0)
model.evaluate(inputs, targets, steps=2, verbose=0)
model.predict(inputs, steps=2)
model.train_on_batch(inputs, targets)
model.test_on_batch(inputs, targets)
model.fit(inputs, targets,
epochs=1, steps_per_epoch=2, verbose=0,
validation_data=(inputs, targets), validation_steps=2)
# Test with dynamic shape
inputs = array_ops.placeholder_with_default(
np.zeros((2, 3)), shape=tensor_shape.TensorShape([None, 3]))
targets = array_ops.placeholder_with_default(
np.zeros((2, 4)), shape=tensor_shape.TensorShape([None, 4]))
self.assertEqual(inputs.shape.dims[0].value, None)
model.fit(inputs, targets, epochs=1, steps_per_epoch=2, verbose=0)
model.evaluate(inputs, targets, steps=2, verbose=0)
model.predict(inputs, steps=2)
model.train_on_batch(inputs, targets)
model.test_on_batch(inputs, targets)
model.fit(inputs, targets,
epochs=1, steps_per_epoch=2, verbose=0,
validation_data=(inputs, targets), validation_steps=2)
def test_training_and_eval_methods_on_symbolic_tensors_multi_io(self):
a = layers_module.Input(shape=(3,), name='input_a')
b = layers_module.Input(shape=(3,), name='input_b')
dense = layers_module.Dense(4, name='dense')
c = dense(a)
d = dense(b)
e = layers_module.Dropout(0.5, name='dropout')(c)
model = training_module.Model([a, b], [d, e])
optimizer = 'rmsprop'
loss = 'mse'
loss_weights = [1., 0.5]
model.compile(
optimizer,
loss,
metrics=['mae', metrics_module.CategoricalAccuracy()],
loss_weights=loss_weights)
input_a_tf = array_ops.zeros(shape=(10, 3))
input_b_tf = array_ops.zeros(shape=(10, 3))
output_d_tf = array_ops.zeros(shape=(10, 4))
output_e_tf = array_ops.zeros(shape=(10, 4))
model.fit([input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
epochs=1,
steps_per_epoch=2,
verbose=0)
model.train_on_batch([input_a_tf, input_b_tf], [output_d_tf, output_e_tf])
# Test with dictionary inputs
model.fit({
'input_a': input_a_tf,
'input_b': input_b_tf
}, {
'dense': output_d_tf,
'dropout': output_e_tf
},
epochs=1,
steps_per_epoch=2,
verbose=0)
model.fit({
'input_a': input_a_tf,
'input_b': input_b_tf
}, {
'dense': output_d_tf,
'dropout': output_e_tf
},
validation_data=({
'input_a': input_a_tf,
'input_b': input_b_tf
}, {
'dense': output_d_tf,
'dropout': output_e_tf
}),
epochs=1,
steps_per_epoch=2,
validation_steps=2,
verbose=0)
model.train_on_batch({
'input_a': input_a_tf,
'input_b': input_b_tf
}, {
'dense': output_d_tf,
'dropout': output_e_tf
})
# Test with validation data
model.fit([input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
validation_data=([input_a_tf,
input_b_tf], [output_d_tf, output_e_tf]),
epochs=1,
steps_per_epoch=2,
validation_steps=2,
verbose=0)
# Test evaluation / prediction methods
model.evaluate([input_a_tf, input_b_tf], [output_d_tf, output_e_tf],
steps=2,
verbose=0)
model.predict([input_a_tf, input_b_tf], steps=2)
model.test_on_batch([input_a_tf, input_b_tf], [output_d_tf, output_e_tf])
@tf_test_util.run_deprecated_v1
def test_model_with_input_feed_tensor(self):
"""We test building a model with a TF variable as input.
We should be able to call fit, evaluate, predict,
by only passing them data for the placeholder inputs
in the model.
"""
with ops.Graph().as_default(), self.cached_session():
input_a_np = np.random.random((10, 3))
input_b_np = np.random.random((10, 3))
output_a_np = np.random.random((10, 4))
output_b_np = np.random.random((10, 3))
input_v = backend.variables_module.Variable(input_a_np, dtype='float32')
self.evaluate(variables_lib.variables_initializer([input_v]))
a = input_layer.Input(tensor=input_v)
b = input_layer.Input(shape=(3,), name='input_b')
a_2 = layers_module.Dense(4, name='dense_1')(a)
dp = layers_module.Dropout(0.5, name='dropout')
b_2 = dp(b)
model = training_module.Model([a, b], [a_2, b_2])
model.summary()
optimizer = 'rmsprop'
loss = 'mse'
loss_weights = [1., 0.5]
model.compile(optimizer, loss, metrics=['mean_squared_error'],
loss_weights=loss_weights,
sample_weight_mode=None)
# test train_on_batch
out = model.train_on_batch(input_b_np,
[output_a_np, output_b_np])
out = model.train_on_batch({'input_b': input_b_np},
[output_a_np, output_b_np])
out = model.test_on_batch({'input_b': input_b_np},
[output_a_np, output_b_np])
out = model.predict_on_batch({'input_b': input_b_np})
# test fit
out = model.fit({'input_b': input_b_np},
[output_a_np, output_b_np], epochs=1, batch_size=10)
out = model.fit(input_b_np,
[output_a_np, output_b_np], epochs=1, batch_size=10)
# test evaluate
out = model.evaluate({'input_b': input_b_np},
[output_a_np, output_b_np], batch_size=10)
out = model.evaluate(input_b_np,
[output_a_np, output_b_np], batch_size=10)
# test predict
out = model.predict({'input_b': input_b_np}, batch_size=10)
out = model.predict(input_b_np, batch_size=10)
self.assertEqual(len(out), 2)
      # Now test a model whose only input is the variable tensor,
      # i.e. we don't need to pass any data to fit the model.
self.evaluate(variables_lib.variables_initializer([input_v]))
a = input_layer.Input(tensor=input_v)
a_2 = layers_module.Dense(4, name='dense_1')(a)
a_2 = layers_module.Dropout(0.5, name='dropout')(a_2)
model = training_module.Model(a, a_2)
model.summary()
optimizer = 'rmsprop'
loss = 'mse'
model.compile(optimizer, loss, metrics=['mean_squared_error'])
# test train_on_batch
out = model.train_on_batch(None,
output_a_np)
out = model.train_on_batch(None,
output_a_np)
out = model.test_on_batch(None,
output_a_np)
out = model.predict_on_batch(None)
out = model.train_on_batch([],
output_a_np)
out = model.train_on_batch({},
output_a_np)
# test fit
_ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=3)
_ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=3)
# test evaluate
_ = model.evaluate(None, output_a_np, steps=3)
_ = model.evaluate(None, output_a_np, steps=3)
# test predict
out = model.predict(None, steps=3)
out = model.predict(None, steps=3)
self.assertEqual(out.shape, (10 * 3, 4))
# Same, without learning phase
# i.e. we don't pass any data to fit the model.
self.evaluate(variables_lib.variables_initializer([input_v]))
a = input_layer.Input(tensor=input_v)
a_2 = layers_module.Dense(4, name='dense_1')(a)
model = training_module.Model(a, a_2)
model.summary()
optimizer = 'rmsprop'
loss = 'mse'
model.compile(optimizer, loss, metrics=['mean_squared_error'])
# test train_on_batch
out = model.train_on_batch(None,
output_a_np)
out = model.train_on_batch(None,
output_a_np)
out = model.test_on_batch(None,
output_a_np)
out = model.predict_on_batch(None)
out = model.train_on_batch([],
output_a_np)
out = model.train_on_batch({},
output_a_np)
# test fit
_ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=10)
_ = model.fit(None, output_a_np, epochs=1, steps_per_epoch=10)
# test evaluate
_ = model.evaluate(None, output_a_np, steps=10)
_ = model.evaluate(None, output_a_np, steps=10)
# test predict
out = model.predict(None, steps=3)
out = model.predict(None, steps=3)
self.assertEqual(out.shape, (10 * 3, 4))
@keras_parameterized.run_all_keras_modes
def test_model_with_partial_loss(self):
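    """Compiling with a loss for only a subset of outputs should work."""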
with self.cached_session():
a = input_layer.Input(shape=(3,), name='input_a')
a_2 = layers_module.Dense(4, name='dense_1')(a)
dp = layers_module.Dropout(0.5, name='dropout')
a_3 = dp(a_2)
model = training_module.Model(a, [a_2, a_3])
optimizer = 'rmsprop'
loss = {'dropout': 'mse'}
model.compile(optimizer, loss, metrics=['mae'])
input_a_np = np.random.random((10, 3))
output_a_np = np.random.random((10, 4))
# test train_on_batch
_ = model.train_on_batch(input_a_np, output_a_np)
_ = model.test_on_batch(input_a_np, output_a_np)
# fit
_ = model.fit(input_a_np, output_a_np)
# evaluate
_ = model.evaluate(input_a_np, output_a_np)
# Same without dropout.
a = input_layer.Input(shape=(3,), name='input_a')
a_2 = layers_module.Dense(4, name='dense_1')(a)
a_3 = layers_module.Dense(4, name='dense_2')(a_2)
model = training_module.Model(a, [a_2, a_3])
optimizer = 'rmsprop'
loss = {'dense_2': 'mse'}
model.compile(optimizer, loss, metrics={'dense_1': 'mae'})
# test train_on_batch
_ = model.train_on_batch(input_a_np, output_a_np)
_ = model.test_on_batch(input_a_np, output_a_np)
# fit
_ = model.fit(input_a_np, output_a_np)
# evaluate
_ = model.evaluate(input_a_np, output_a_np)
def test_model_with_external_loss(self):
with ops.Graph().as_default(), self.cached_session():
# None loss, only regularization loss.
a = input_layer.Input(shape=(3,), name='input_a')
a_2 = layers_module.Dense(
4, name='dense_1', kernel_regularizer='l1', bias_regularizer='l2')(
a)
dp = layers_module.Dropout(0.5, name='dropout')
a_3 = dp(a_2)
model = training_module.Model(a, [a_2, a_3])
optimizer = 'rmsprop'
loss = None
model.compile(optimizer, loss, metrics=['mae'])
input_a_np = np.random.random((10, 3))
# test train_on_batch
out = model.train_on_batch(input_a_np, None)
out = model.test_on_batch(input_a_np, None)
# fit
out = model.fit(input_a_np, None)
# evaluate
out = model.evaluate(input_a_np, None)
# No dropout, external loss.
a = input_layer.Input(shape=(3,), name='input_a')
a_2 = layers_module.Dense(4, name='dense_1')(a)
a_3 = layers_module.Dense(4, name='dense_2')(a)
model = training_module.Model(a, [a_2, a_3])
model.add_loss(backend.mean(a_3 + a_2))
optimizer = 'rmsprop'
loss = None
model.compile(optimizer, loss, metrics=['mae'])
# test train_on_batch
out = model.train_on_batch(input_a_np, None)
out = model.test_on_batch(input_a_np, None)
# fit
out = model.fit(input_a_np, None)
# evaluate
out = model.evaluate(input_a_np, None)
# Test model with no external data at all.
input_v = backend.variables_module.Variable(input_a_np, dtype='float32')
self.evaluate(variables_lib.variables_initializer([input_v]))
a = input_layer.Input(tensor=input_v)
a_2 = layers_module.Dense(4, name='dense_1')(a)
a_2 = layers_module.Dropout(0.5, name='dropout')(a_2)
model = training_module.Model(a, a_2)
model.add_loss(backend.mean(a_2))
model.compile(optimizer='rmsprop',
loss=None,
metrics=['mean_squared_error'])
# test train_on_batch
out = model.train_on_batch(None, None)
out = model.test_on_batch(None, None)
out = model.predict_on_batch(None)
# Test multi-output model with no external data at all.
self.evaluate(variables_lib.variables_initializer([input_v]))
a = input_layer.Input(tensor=input_v)
a_1 = layers_module.Dense(4, name='dense_1')(a)
a_2 = layers_module.Dropout(0.5, name='dropout')(a_1)
model = training_module.Model(a, [a_1, a_2])
model.add_loss(backend.mean(a_2))
model.compile(optimizer='rmsprop',
loss=None,
metrics=['mean_squared_error'])
# test train_on_batch
out = model.train_on_batch(None, None)
out = model.test_on_batch(None, None)
out = model.predict_on_batch(None)
out = model.predict(None, steps=3)
self.assertEqual(len(out), 2)
self.assertEqual(out[0].shape, (10 * 3, 4))
self.assertEqual(out[1].shape, (10 * 3, 4))
def test_target_tensors(self):
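    """`compile(target_tensors=...)` feeds targets from tensors, not arrays."""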
with ops.Graph().as_default(), self.cached_session():
# single-output, as list
model = sequential.Sequential()
model.add(layers_module.Dense(4, input_shape=(4,), name='dense'))
input_val = np.random.random((10, 4))
target_val = np.random.random((10, 4))
target = backend.variable(target_val)
model.compile(optimizer='rmsprop', loss='mse', target_tensors=[target])
model.train_on_batch(input_val, None)
# single-output, as single tensor
model.compile(optimizer='rmsprop', loss='mse', target_tensors=target)
model.train_on_batch(input_val, None)
# single-output, as dict
model.compile(optimizer='rmsprop', loss='mse',
target_tensors={'dense': target})
model.train_on_batch(input_val, None)
# test invalid arguments
with self.assertRaises(TypeError):
model.compile(optimizer='rmsprop', loss='mse',
target_tensors=set())
with self.assertRaises(ValueError):
model.compile(optimizer='rmsprop', loss='mse',
target_tensors=[target, target])
with self.assertRaises(ValueError):
model.compile(optimizer='rmsprop', loss='mse',
target_tensors={'dense2': None})
with self.assertRaises(ValueError):
model.compile(optimizer='rmsprop', loss='mse',
target_tensors=[target])
model.train_on_batch(input_val, target_val)
# multi-output, as list
input_val = np.random.random((10, 4))
target_val_a = np.random.random((10, 4))
target_val_b = np.random.random((10, 4))
target_a = backend.variable(target_val_a)
target_b = backend.variable(target_val_b)
inputs = layers_module.Input(shape=(4,))
output_a = layers_module.Dense(4, name='dense_a')(inputs)
output_b = layers_module.Dense(4, name='dense_b')(inputs)
model = training_module.Model(inputs, [output_a, output_b])
model.compile(optimizer='rmsprop', loss='mse',
target_tensors=[target_a, target_b])
model.train_on_batch(input_val, None)
# multi-output, as dict
model.compile(optimizer='rmsprop', loss='mse',
target_tensors={'dense_a': target_a,
'dense_b': target_b})
model.train_on_batch(input_val, None)
# test with sample weights
model.compile(
optimizer='rmsprop',
loss='mse',
metrics=['mae', metrics_module.CategoricalAccuracy()],
target_tensors=[target_a, target_b])
model.train_on_batch(input_val, None,
sample_weight={'dense_a': np.random.random((10,))})
def test_model_custom_target_tensors(self):
with ops.Graph().as_default(), self.cached_session():
a = input_layer.Input(shape=(3,), name='input_a')
b = input_layer.Input(shape=(3,), name='input_b')
a_2 = layers_module.Dense(4, name='dense_1')(a)
dp = layers_module.Dropout(0.5, name='dropout')
b_2 = dp(b)
y = backend.placeholder([10, 4], name='y')
y1 = backend.placeholder([10, 3], name='y1')
y2 = backend.placeholder([7, 5], name='y2')
model = training_module.Model([a, b], [a_2, b_2])
optimizer = 'rmsprop'
loss = 'mse'
loss_weights = [1., 0.5]
# test list of target tensors
with self.assertRaises(ValueError):
model.compile(optimizer, loss, metrics=[], loss_weights=loss_weights,
sample_weight_mode=None, target_tensors=[y, y1, y2])
model.compile(optimizer, loss, metrics=[], loss_weights=loss_weights,
sample_weight_mode=None, target_tensors=[y, y1])
input_a_np = np.random.random((10, 3))
input_b_np = np.random.random((10, 3))
output_a_np = np.random.random((10, 4))
output_b_np = np.random.random((10, 3))
_ = model.train_on_batch([input_a_np, input_b_np],
[output_a_np, output_b_np], {
'dense_1': np.random.random((10,)),
'dropout': np.random.random((10,))
})
      # test a dictionary of target_tensors with an invalid output name
with self.assertRaises(ValueError):
model.compile(optimizer, loss,
metrics=[],
loss_weights=loss_weights,
sample_weight_mode=None,
target_tensors={'does_not_exist': y2})
      # test a valid dictionary of target_tensors
model.compile(optimizer, loss,
metrics=[],
loss_weights=loss_weights,
sample_weight_mode=None,
target_tensors={'dense_1': y, 'dropout': y1})
_ = model.train_on_batch([input_a_np, input_b_np],
[output_a_np, output_b_np], {
'dense_1': np.random.random((10,)),
'dropout': np.random.random((10,))
})
# test with custom TF placeholder as target
pl_target_a = backend.array_ops.placeholder('float32', shape=(None, 4))
model.compile(optimizer='rmsprop', loss='mse',
target_tensors={'dense_1': pl_target_a})
model.train_on_batch([input_a_np, input_b_np],
[output_a_np, output_b_np])
class TestTrainingWithMetrics(keras_parameterized.TestCase):
"""Training tests related to metrics."""
@keras_parameterized.run_all_keras_modes
def test_metrics_names(self):
a = layers_module.Input(shape=(3,), name='input_a')
b = layers_module.Input(shape=(3,), name='input_b')
dense = layers_module.Dense(4, name='dense')
c = dense(a)
d = dense(b)
e = layers_module.Dropout(0.5, name='dropout')(c)
model = training_module.Model([a, b], [d, e])
optimizer = RMSPropOptimizer(learning_rate=0.001)
metrics = ['mse', metrics_module.BinaryAccuracy()]
model.compile(
optimizer,
loss='mae',
metrics=metrics,
run_eagerly=testing_utils.should_run_eagerly())
mse_metric = 'mse' if context.executing_eagerly() else 'mean_squared_error'
reference_metric_names = [
'loss', 'dense_loss', 'dropout_loss', 'dense_' + mse_metric,
'dense_binary_accuracy', 'dropout_' + mse_metric,
'dropout_binary_accuracy'
]
input_a_np = np.random.random((10, 3))
input_b_np = np.random.random((10, 3))
output_d_np = np.random.random((10, 4))
output_e_np = np.random.random((10, 4))
model.fit([input_a_np, input_b_np], [output_d_np, output_e_np],
epochs=1,
batch_size=5)
self.assertEqual(reference_metric_names, model.metrics_names)
@keras_parameterized.run_all_keras_modes
def test_metric_state_reset_between_fit_and_evaluate(self):
model = sequential.Sequential()
model.add(layers_module.Dense(3, activation='relu', input_dim=4))
model.add(layers_module.Dense(1, activation='sigmoid'))
acc_obj = metrics_module.BinaryAccuracy()
model.compile(
loss='mae',
metrics=[acc_obj],
optimizer=RMSPropOptimizer(learning_rate=0.001),
run_eagerly=testing_utils.should_run_eagerly())
x_train = np.random.random((100, 4))
y_train = np.random.random((100, 1))
model.fit(x_train, y_train, batch_size=5, epochs=2)
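    # Metric state is reset at the start of each epoch, so `count` reflects
    # a single pass over the 100 training samples.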
self.assertEqual(self.evaluate(acc_obj.count), 100)
x_test = np.random.random((10, 4))
y_test = np.random.random((10, 1))
model.evaluate(x_test, y_test, batch_size=5)
self.assertEqual(self.evaluate(acc_obj.count), 10)
@keras_parameterized.run_with_all_model_types(exclude_models=['sequential'])
@keras_parameterized.run_all_keras_modes
def test_metrics_valid_compile_input_formats(self):
inp_1 = layers_module.Input(shape=(1,), name='input_1')
inp_2 = layers_module.Input(shape=(1,), name='input_2')
x = layers_module.Dense(3, kernel_initializer='ones', trainable=False)
out_1 = layers_module.Dense(
1, kernel_initializer='ones', name='output_1', trainable=False)
out_2 = layers_module.Dense(
1, kernel_initializer='ones', name='output_2', trainable=False)
branch_a = [inp_1, x, out_1]
branch_b = [inp_2, x, out_2]
model = testing_utils.get_multi_io_model(branch_a, branch_b)
# list of metrics.
model.compile(
optimizer='rmsprop',
loss='mse',
metrics=[metrics_module.MeanSquaredError()],
weighted_metrics=[metrics_module.MeanSquaredError()],
run_eagerly=testing_utils.should_run_eagerly())
# list of list of metrics.
model.compile(
optimizer='rmsprop',
loss='mse',
metrics=[
metrics_module.MeanSquaredError(),
[metrics_module.MeanSquaredError(),
metrics_module.Accuracy()]
],
weighted_metrics=[
metrics_module.MeanSquaredError(),
[metrics_module.MeanSquaredError(),
metrics_module.Accuracy()]
],
run_eagerly=testing_utils.should_run_eagerly())
# dict of metrics.
model.compile(
optimizer='rmsprop',
loss='mse',
metrics={
'output_1':
metrics_module.MeanSquaredError(),
'output_2': [
metrics_module.MeanSquaredError(),
metrics_module.Accuracy()
],
},
weighted_metrics={
'output_1':
metrics_module.MeanSquaredError(),
'output_2': [
metrics_module.MeanSquaredError(),
metrics_module.Accuracy()
],
},
run_eagerly=testing_utils.should_run_eagerly())
@keras_parameterized.run_all_keras_modes
def test_metrics_masking(self):
np.random.seed(1337)
model = sequential.Sequential()
model.add(layers_module.Masking(mask_value=0, input_shape=(2, 1)))
model.add(
layers_module.TimeDistributed(
layers_module.Dense(1, kernel_initializer='ones')))
model.compile(
RMSPropOptimizer(learning_rate=0.001),
loss='mse',
weighted_metrics=['accuracy'],
run_eagerly=testing_utils.should_run_eagerly())
# verify that masking is applied.
x = np.array([[[1], [1]], [[1], [1]], [[0], [0]]])
y = np.array([[[1], [1]], [[0], [1]], [[1], [1]]])
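    # With a ones-kernel Dense, predictions equal the inputs. The third
    # sample is fully masked, leaving four valid timesteps, one of which is
    # wrong: mse ~0.25 and weighted accuracy ~0.75.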
scores = model.train_on_batch(x, y)
self.assertArrayNear(scores, [0.25, 0.75], 0.1)
# verify that masking is combined with sample weights.
w = np.array([3, 2, 4])
scores = model.train_on_batch(x, y, sample_weight=w)
self.assertArrayNear(scores, [0.3328, 0.8], 0.001)
@keras_parameterized.run_all_keras_modes
def test_add_metric_with_tensor_on_model(self):
x = layers_module.Input(shape=(1,))
y = layers_module.Dense(1, kernel_initializer='ones')(x)
model = training_module.Model(x, y)
model.add_metric(
math_ops.reduce_sum(y), name='metric_1', aggregation='mean')
if context.executing_eagerly():
# This is not a use case in v1 graph mode.
mean_result = metrics_module.Mean()(y)
with self.assertRaisesRegex(
ValueError, 'Expected a symbolic Tensor for the metric value'):
model.add_metric(mean_result, name='metric_2')
else:
with self.assertRaisesRegex(
ValueError, 'Using the result of calling a `Metric` object '):
with backend.get_graph().as_default():
model.add_metric(metrics_module.Mean(name='metric_2')(y))
model.compile(
'sgd',
loss='mse',
run_eagerly=testing_utils.should_run_eagerly())
inputs = np.ones(shape=(10, 1))
targets = np.ones(shape=(10, 1))
history = model.fit(
inputs,
targets,
epochs=2,
batch_size=5,
validation_data=(inputs, targets))
self.assertEqual(history.history['metric_1'][-1], 5)
self.assertEqual(history.history['val_metric_1'][-1], 5)
eval_results = model.evaluate(inputs, targets, batch_size=5)
self.assertEqual(eval_results[-1], 5)
model.predict(inputs, batch_size=5)
model.train_on_batch(inputs, targets)
model.test_on_batch(inputs, targets)
@keras_parameterized.run_all_keras_modes
def test_add_metric_in_model_call(self):
class TestModel(training_module.Model):
def __init__(self):
super(TestModel, self).__init__(name='test_model')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
self.mean = metrics_module.Mean(name='metric_1')
def call(self, x):
self.add_metric(
math_ops.reduce_sum(x), name='metric_2', aggregation='mean')
        # Provide the same name as the `Mean` instance created in `__init__`
        # so the metric is matched correctly in eager mode.
self.add_metric(self.mean(x), name='metric_1')
return self.dense1(x)
model = TestModel()
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(0.01),
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones(shape=(10, 1))
y = np.ones(shape=(10, 2))
history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
self.assertAlmostEqual(history.history['metric_1'][-1], 1, 0)
self.assertAlmostEqual(history.history['val_metric_1'][-1], 1, 0)
self.assertAlmostEqual(history.history['metric_2'][-1], 5, 0)
self.assertAlmostEqual(history.history['val_metric_2'][-1], 5, 0)
eval_results = model.evaluate(x, y, batch_size=5)
self.assertAlmostEqual(eval_results[1], 1, 0)
self.assertAlmostEqual(eval_results[2], 5, 0)
model.predict(x, batch_size=5)
model.train_on_batch(x, y)
model.test_on_batch(x, y)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_add_metric_in_layer_call(self):
class TestLayer(layers_module.Layer):
def build(self, input_shape):
self.a = self.add_variable(
'a', (1, 1), initializer='ones', trainable=False)
self.built = True
def call(self, inputs):
self.add_metric(
math_ops.reduce_sum(inputs), name='metric_1', aggregation='mean')
return inputs + 1
layers = [
TestLayer(input_shape=(1,)),
layers_module.Dense(2, kernel_initializer='ones')
]
model = testing_utils.get_model_from_layers(layers, input_shape=(1,))
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(0.01),
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones(shape=(10, 1))
y = np.ones(shape=(10, 2))
history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
self.assertEqual(history.history['metric_1'][-1], 5)
self.assertAlmostEqual(history.history['val_metric_1'][-1], 5, 0)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_model_metrics_list(self):
class LayerWithAddMetric(layers_module.Layer):
def __init__(self):
super(LayerWithAddMetric, self).__init__()
self.dense = layers_module.Dense(1, kernel_initializer='ones')
def __call__(self, inputs):
outputs = self.dense(inputs)
self.add_metric(
math_ops.reduce_sum(outputs), name='metric_1', aggregation='mean')
return outputs
class LayerWithNestedAddMetricLayer(layers_module.Layer):
def __init__(self):
super(LayerWithNestedAddMetricLayer, self).__init__()
self.layer = LayerWithAddMetric()
def call(self, inputs):
outputs = self.layer(inputs)
self.add_metric(
math_ops.reduce_sum(outputs), name='metric_2', aggregation='mean')
return outputs
x = layers_module.Input(shape=(1,))
y = LayerWithNestedAddMetricLayer()(x)
model = training_module.Model(x, y)
model.add_metric(
math_ops.reduce_sum(y), name='metric_3', aggregation='mean')
if context.executing_eagerly():
# This is not a use case in v1 graph mode.
mean_result = metrics_module.Mean()(y)
with self.assertRaisesRegex(
ValueError, 'Expected a symbolic Tensor for the metric value'):
model.add_metric(mean_result, name='metric_4')
else:
with self.assertRaisesRegex(
ValueError, 'Using the result of calling a `Metric` object '):
with backend.get_graph().as_default():
model.add_metric(metrics_module.Mean(name='metric_4')(y))
model.compile(
'sgd',
loss='mse',
metrics=[metrics_module.Accuracy('metric_4')],
run_eagerly=testing_utils.should_run_eagerly())
model.fit(np.ones((10, 1)), np.ones((10, 1)), batch_size=10)
    # Verify that the metrics added via the `compile` and `add_metric` APIs
    # are all included.
self.assertEqual([m.name for m in model.metrics],
['loss', 'metric_4', 'metric_2', 'metric_1', 'metric_3'])
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_model_metrics_list_in_call(self):
class TestModel(training_module.Model):
def __init__(self):
super(TestModel, self).__init__(name='test_model')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
def call(self, x):
self.add_metric(
math_ops.reduce_sum(x), name='metric_1', aggregation='mean')
return self.dense1(x)
model = TestModel()
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(0.01),
metrics=[metrics_module.Accuracy('acc')],
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones(shape=(10, 1))
y = np.ones(shape=(10, 2))
model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
self.assertEqual([m.name for m in model.metrics],
['loss', 'acc', 'metric_1'])
@keras_parameterized.run_all_keras_modes
def test_multiple_add_metric_calls(self):
class TestModel(training_module.Model):
def __init__(self):
super(TestModel, self).__init__(name='test_model')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
self.mean1 = metrics_module.Mean(name='metric_1')
self.mean2 = metrics_module.Mean(name='metric_2')
def call(self, x):
self.add_metric(self.mean2(x), name='metric_2')
self.add_metric(self.mean1(x), name='metric_1')
self.add_metric(
math_ops.reduce_sum(x), name='metric_3', aggregation='mean')
return self.dense1(x)
model = TestModel()
self.assertListEqual([m.name for m in model.metrics],
['metric_1', 'metric_2'])
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(0.01),
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones(shape=(10, 1))
y = np.ones(shape=(10, 2))
history = model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
self.assertAlmostEqual(history.history['metric_1'][-1], 1, 0)
self.assertAlmostEqual(history.history['metric_2'][-1], 1, 0)
self.assertAlmostEqual(history.history['metric_3'][-1], 5, 0)
eval_results = model.evaluate(x, y, batch_size=5)
self.assertArrayNear(eval_results[1:4], [1, 1, 5], 0.1)
model.predict(x, batch_size=5)
model.train_on_batch(x, y)
model.test_on_batch(x, y)
@keras_parameterized.run_all_keras_modes
def test_multiple_add_metric_calls_layer(self):
class TestLayer(layers_module.Layer):
def __init__(self):
super(TestLayer, self).__init__(name='test_layer')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
self.m1 = metrics_module.Mean(name='m_1')
self.m2 = [
metrics_module.Mean(name='m_2'),
metrics_module.Mean(name='m_3')
]
self.m3 = {
'mean4': metrics_module.Mean(name='m_4'),
'mean5': metrics_module.Mean(name='m_5')
}
def call(self, x):
self.add_metric(self.m2[0](x))
self.add_metric(self.m2[1](x))
self.add_metric(self.m1(x))
self.add_metric(self.m3['mean4'](x))
self.add_metric(self.m3['mean5'](x))
self.add_metric(math_ops.reduce_sum(x), name='m_6', aggregation='mean')
return self.dense1(x)
layer = TestLayer()
self.assertListEqual([m.name for m in layer.metrics],
['m_1', 'm_2', 'm_3', 'm_4', 'm_5'])
layer(np.ones((10, 10)))
self.assertListEqual([m.name for m in layer.metrics],
['m_1', 'm_2', 'm_3', 'm_4', 'm_5', 'm_6'])
@keras_parameterized.run_all_keras_modes
def test_duplicate_metric_name_in_add_metric(self):
class TestModel(training_module.Model):
def __init__(self):
super(TestModel, self).__init__(name='test_model')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
self.mean = metrics_module.Mean(name='metric_1')
self.mean2 = metrics_module.Mean(name='metric_1')
def call(self, x):
self.add_metric(self.mean(x), name='metric_1')
return self.dense1(x)
model = TestModel()
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(0.01),
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones(shape=(10, 1))
y = np.ones(shape=(10, 2))
with self.assertRaisesRegex(
ValueError,
'Please provide different names for the metrics you have added. '
'We found 2 metrics with the name: "metric_1"'):
model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
@keras_parameterized.run_all_keras_modes
def test_add_metric_without_name(self):
class TestModel(training_module.Model):
def __init__(self):
super(TestModel, self).__init__(name='test_model')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
def call(self, x):
self.add_metric(math_ops.reduce_sum(x), aggregation='mean')
return self.dense1(x)
model = TestModel()
model.compile(
loss='mse',
optimizer=RMSPropOptimizer(0.01),
run_eagerly=testing_utils.should_run_eagerly())
x = np.ones(shape=(10, 1))
y = np.ones(shape=(10, 2))
with self.assertRaisesRegex(ValueError,
'Please provide a name for your metric like'):
model.fit(x, y, epochs=2, batch_size=5, validation_data=(x, y))
@keras_parameterized.run_all_keras_modes
def test_add_metric_correctness(self):
inputs = input_layer.Input(shape=(1,))
targets = input_layer.Input(shape=(1,))
class Bias(layers_module.Layer):
def build(self, input_shape):
self.bias = self.add_variable('bias', (1,), initializer='zeros')
self.mae = metrics_module.MeanAbsoluteError(name='mae_1')
def call(self, inputs):
inputs, targets = inputs
outputs = inputs + self.bias
self.add_metric(self.mae(targets, outputs), name='mae_1')
return outputs
outputs = Bias()([inputs, targets])
model = training_module.Model([inputs, targets], outputs)
model.add_metric(
metrics_module.mean_absolute_error(targets, outputs),
name='mae_2',
aggregation='mean')
model.compile(
loss='mae',
optimizer=optimizer_v2.gradient_descent.SGD(0.1),
metrics=[metrics_module.MeanAbsoluteError(name='mae_3')],
run_eagerly=testing_utils.should_run_eagerly())
x = np.array([[0.], [1.], [2.]])
y = np.array([[0.5], [2.], [3.5]])
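    # The bias starts at 0, so the initial MAE is 1.0 and its gradient
    # w.r.t. the bias is -1; SGD(0.1) therefore shrinks the error by 0.1
    # per epoch, which all three MAE metrics and the loss should track.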
history = model.fit([x, y], y, batch_size=3, epochs=5)
expected_val = [1., 0.9, 0.8, 0.7, 0.6]
for key in ['loss', 'mae_1', 'mae_2', 'mae_3']:
self.assertAllClose(history.history[key], expected_val, 1e-3)
@keras_parameterized.run_all_keras_modes
def test_add_metric_order(self):
class MyLayer(layers_module.Layer):
def call(self, inputs, training=None, mask=None):
self.add_metric(
array_ops.ones([32]) * 2.0, name='two', aggregation='mean')
return inputs
class MyModel(training_module.Model):
def __init__(self, **kwargs):
super(MyModel, self).__init__(**kwargs)
self._sampler = MyLayer(name='sampler')
def call(self, inputs, training=None, mask=None):
z = self._sampler(inputs)
self.add_metric(
array_ops.ones([32]) * 1.0, name='one', aggregation='mean')
self.add_metric(
array_ops.ones([32]) * 3.0, name='three', aggregation='mean')
return z
xdata = np.random.uniform(size=[32, 16]).astype(np.float32)
dataset_train = dataset_ops.Dataset.from_tensor_slices((xdata, xdata))
dataset_train = dataset_train.batch(32, drop_remainder=True)
model = MyModel()
model.compile(
optimizer='sgd',
loss='mse',
run_eagerly=testing_utils.should_run_eagerly())
history = model.fit(dataset_train, epochs=3)
self.assertDictEqual(
history.history, {
'loss': [0.0, 0.0, 0.0],
'three': [3.0, 3.0, 3.0],
'two': [2.0, 2.0, 2.0],
'one': [1.0, 1.0, 1.0]
})
@keras_parameterized.run_all_keras_modes
def test_add_metric_aggregation_mean(self):
class TestModel(training_module.Model):
def __init__(self):
super(TestModel, self).__init__(name='test_model')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
def call(self, x):
self.add_metric(
math_ops.reduce_sum(x), name='metric_1', aggregation='mean')
return self.dense1(x)
model = TestModel()
model.compile(
'rmsprop', 'mse', run_eagerly=testing_utils.should_run_eagerly())
model.fit(np.ones(shape=(10, 1)), np.ones(shape=(10, 2)), batch_size=5)
@keras_parameterized.run_all_keras_modes
def test_add_metric_aggregation_none(self):
class TestModel(training_module.Model):
def __init__(self):
super(TestModel, self).__init__(name='test_model')
self.dense1 = layers_module.Dense(2, kernel_initializer='ones')
self.mean = metrics_module.Mean(name='metric_1')
def call(self, x):
self.add_metric(self.mean(x), name='metric_1', aggregation=None)
return self.dense1(x)
model = TestModel()
model.compile(
'rmsprop', 'mse', run_eagerly=testing_utils.should_run_eagerly())
model.fit(np.ones(shape=(10, 1)), np.ones(shape=(10, 2)), batch_size=5)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def DISABLED_test_add_metric_invalid_aggregation(self):
# TODO(psv): Reenable test once it is fixed.
x = layers_module.Input(shape=(1,))
y = layers_module.Dense(1, kernel_initializer='ones')(x)
model = training_module.Model(x, y)
with self.assertRaisesRegex(ValueError,
'only `mean` sample-wise metric aggregation'):
model.add_metric(
math_ops.reduce_sum(y), name='metric_1', aggregation='sum')
with self.assertRaisesRegex(ValueError,
'only `mean` sample-wise metric aggregation'):
model.add_metric(
math_ops.reduce_sum(y), name='metric_1', aggregation=None)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_model_with_nested_compiled_model(self):
class LayerWithAddMetric(layers_module.Layer):
def __init__(self):
super(LayerWithAddMetric, self).__init__()
self.dense = layers_module.Dense(1, kernel_initializer='ones')
def call(self, inputs):
outputs = self.dense(inputs)
self.add_metric(
math_ops.reduce_sum(outputs), name='mean', aggregation='mean')
return outputs
x = layers_module.Input(shape=(1,))
y = LayerWithAddMetric()(x)
inner_model = training_module.Model(x, y)
inner_model.add_metric(
math_ops.reduce_sum(y), name='mean1', aggregation='mean')
inner_model.compile(
'sgd',
loss='mse',
metrics=[metrics_module.Accuracy('acc')],
run_eagerly=testing_utils.should_run_eagerly())
inner_model.fit(np.ones((10, 1)), np.ones((10, 1)), batch_size=10)
self.assertEqual([m.name for m in inner_model.metrics],
['loss', 'acc', 'mean', 'mean1'])
x = layers_module.Input(shape=[1])
y = inner_model(x)
outer_model = training_module.Model(x, y)
outer_model.add_metric(
math_ops.reduce_sum(y), name='mean2', aggregation='mean')
outer_model.compile(
'sgd',
loss='mse',
metrics=[metrics_module.Accuracy('acc2')],
run_eagerly=testing_utils.should_run_eagerly())
outer_model.fit(np.ones((10, 1)), np.ones((10, 1)), batch_size=10)
self.assertEqual([m.name for m in outer_model.metrics],
['loss', 'acc2', 'mean', 'mean1', 'mean2'])
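# The layers below register a counter update in different ways: directly in
# `call`, via `add_update` with a lambda, through a nested layer, and
# conditionally on the training flag. `TestAutoUpdates` checks that each
# update runs exactly once per batch.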
class BareUpdateLayer(layers_module.Layer):
def build(self, input_shape):
self.counter = self.add_weight(
'counter',
dtype='int32',
shape=(),
initializer='zeros',
trainable=False)
def call(self, inputs):
state_ops.assign_add(self.counter, 1)
return math_ops.cast(self.counter, inputs.dtype) * inputs
class LambdaUpdateLayer(layers_module.Layer):
def build(self, input_shape):
self.counter = self.add_weight(
'counter',
dtype='int32',
shape=(),
initializer='zeros',
trainable=False)
def call(self, inputs):
# Make sure update isn't run twice.
self.add_update(lambda: state_ops.assign_add(self.counter, 1))
return math_ops.cast(self.counter, inputs.dtype) * inputs
class NestedUpdateLayer(layers_module.Layer):
def build(self, input_shape):
self.layer = BareUpdateLayer()
self.layer.build(input_shape)
@property
def counter(self):
return self.layer.counter
def call(self, inputs):
return self.layer(inputs)
class SubgraphUpdateLayer(layers_module.Layer):
def build(self, input_shape):
self.counter = self.add_weight(
'counter',
dtype='int32',
shape=(),
initializer='zeros',
trainable=False)
def call(self, inputs, training=None):
if training is None:
training = backend.learning_phase()
if training:
self.counter.assign(self.counter + 1)
return inputs
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
class TestAutoUpdates(keras_parameterized.TestCase):
@keras_parameterized.run_with_all_model_types
@parameterized.named_parameters(
('bare_update', BareUpdateLayer),
('lambda_update', LambdaUpdateLayer),
('nested_update', NestedUpdateLayer))
def test_updates_in_model(self, layer_builder):
layer = layer_builder()
x, y = np.ones((10, 10)), np.ones((10, 1))
model = testing_utils.get_model_from_layers(
[layer, layers_module.Dense(1)], input_shape=(10,))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, batch_size=2, epochs=1)
self.assertEqual(self.evaluate(layer.counter), 5)
@keras_parameterized.run_with_all_model_types
def test_lambda_updates_trainable_false(self):
x, y = np.ones((10, 10)), np.ones((10, 1))
layer = LambdaUpdateLayer()
model = testing_utils.get_model_from_layers(
[layer, layers_module.Dense(1)], input_shape=(10,))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, batch_size=2, epochs=1)
self.assertEqual(self.evaluate(layer.counter), 5)
layer.trainable = False
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, batch_size=2, epochs=1)
self.assertEqual(self.evaluate(layer.counter), 5)
@keras_parameterized.run_with_all_model_types
def test_subgraph_updates_in_model(self):
layer = SubgraphUpdateLayer()
x, y = np.ones((10, 10)), np.ones((10, 1))
model = testing_utils.get_model_from_layers(
[layer, layers_module.Dense(1)], input_shape=(10,))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
model.fit(x, y, batch_size=2, epochs=1)
self.assertEqual(self.evaluate(layer.counter), 5)
@parameterized.named_parameters(
('bare_update', BareUpdateLayer),
('lambda_update', LambdaUpdateLayer),
('nested_update', NestedUpdateLayer))
def test_updates_standalone_layer(self, layer_builder):
layer = layer_builder()
y = layer(np.ones((10, 10)))
self.evaluate(layer.counter.initializer)
self.evaluate(y)
self.assertEqual(self.evaluate(layer.counter), 1)
def test_trainable_false_standalone_layer(self):
layer = LambdaUpdateLayer()
y = layer(np.ones((10, 10)))
self.evaluate(layer.counter.initializer)
self.evaluate(y)
self.assertEqual(self.evaluate(layer.counter), 1)
layer.trainable = False
y = layer(np.ones((10, 10)))
self.evaluate(y)
self.assertEqual(self.evaluate(layer.counter), 1)
@keras_parameterized.run_with_all_model_types
def test_batchnorm_trainable_false(self):
bn = layers_module.BatchNormalization()
model = testing_utils.get_model_from_layers([bn, layers_module.Dense(1)],
input_shape=(10,))
bn.trainable = False
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly())
x, y = np.ones((10, 10)), np.ones((10, 1))
model.fit(x, y, batch_size=2, epochs=1)
self.assertAllEqual(self.evaluate(bn.moving_mean), np.zeros((10,)))
self.assertAllEqual(self.evaluate(bn.moving_variance), np.ones((10,)))
class TestFunctionTracing(keras_parameterized.TestCase):
@keras_parameterized.run_all_keras_modes(
always_skip_v1=True, always_skip_eager=True)
def test_no_tracing_between_epoch(self):
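    """`fit` should not retrace its train/eval functions between epochs."""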
if sys.version_info[0] < 3:
self.skipTest('self.assertLogs() call is not available in Python 2.')
model = sequential.Sequential([layers_module.Dense(4, activation='relu')])
model.compile(loss='mse', optimizer='rmsprop')
x = np.random.random((10, 6))
y = np.random.random((10, 4))
logging.set_verbosity(1)
with self.assertLogs(level=1) as logs:
model.fit(x, y, epochs=10, batch_size=5, validation_data=(x, y))
new_func_graph = 'INFO:absl:Creating new FuncGraph for Python function'
self.assertEqual(sum(new_func_graph in log for log in logs.output), 9)
class TestBuildCustomModel(keras_parameterized.TestCase):
@keras_parameterized.run_all_keras_modes
def test_build_list_of_inputs(self):
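    """`build` accepts multi-input shapes as a list of tuples or of lists."""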
class MyModel(training_module.Model):
def __init__(self):
super(MyModel, self).__init__()
self.l1 = layers_module.Dense(1)
self.l2 = layers_module.Dense(2)
def call(self, x):
a, b = x
return self.l1(a) + self.l2(b)
# List of tuples
model = MyModel()
model.build([(None, 1), (None, 2)])
self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])
self.assertEqual(model.l2.kernel.shape.as_list(), [2, 2])
# List of lists
model = MyModel()
model.build([[None, 1], [None, 2]])
self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])
self.assertEqual(model.l2.kernel.shape.as_list(), [2, 2])
@keras_parameterized.run_all_keras_modes
def test_build_single_inputs(self):
class MyModel(training_module.Model):
def __init__(self):
super(MyModel, self).__init__()
self.l1 = layers_module.Dense(1)
def call(self, x):
return self.l1(x)
model = MyModel()
model.build((None, 1))
self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])
model = MyModel()
model.build([None, 1])
self.assertEqual(model.l1.kernel.shape.as_list(), [1, 1])
if __name__ == '__main__':
test.main()