# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for cudnn recurrent layers."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import tempfile
from absl.testing import parameterized
import numpy as np
from tensorflow.python import keras
from tensorflow.python.framework import test_util
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.optimizer_v2.rmsprop import RMSprop
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import test
from tensorflow.python.training import gradient_descent
@keras_parameterized.run_all_keras_modes
class CuDNNTest(keras_parameterized.TestCase):
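
  # Smoke test via testing_utils.layer_test: builds the layer, checks output
  # shapes, and round-trips the layer config, with and without
  # return_sequences.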
@parameterized.named_parameters(
*test_util.generate_combinations_with_testcase_name(
layer_class=[keras.layers.CuDNNGRU, keras.layers.CuDNNLSTM],
return_sequences=[True, False]))
@test_util.run_gpu_only
def test_cudnn_rnn_return_sequence(self, layer_class, return_sequences):
input_size = 10
timesteps = 6
units = 2
num_samples = 32
testing_utils.layer_test(
layer_class,
kwargs={'units': units,
'return_sequences': return_sequences},
input_shape=(num_samples, timesteps, input_size))
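
  # Same layer_test smoke test, toggling go_backwards instead.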
@parameterized.named_parameters(
*test_util.generate_combinations_with_testcase_name(
layer_class=[keras.layers.CuDNNGRU, keras.layers.CuDNNLSTM],
go_backwards=[True, False]))
@test_util.run_gpu_only
def test_cudnn_rnn_go_backward(self, layer_class, go_backwards):
input_size = 10
timesteps = 6
units = 2
num_samples = 32
testing_utils.layer_test(
layer_class,
kwargs={'units': units,
'go_backwards': go_backwards},
input_shape=(num_samples, timesteps, input_size))
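
  # return_state=True yields two state tensors for CuDNNLSTM (h and c) and
  # one for CuDNNGRU; after predict(), the stateful layer's tracked states
  # must match the returned state.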
@parameterized.named_parameters(
('cudnngru', keras.layers.CuDNNGRU),
('cudnnlstm', keras.layers.CuDNNLSTM),
)
@test_util.run_gpu_only
def test_return_state(self, layer_class):
input_size = 10
timesteps = 6
units = 2
num_samples = 32
num_states = 2 if layer_class is keras.layers.CuDNNLSTM else 1
inputs = keras.Input(batch_shape=(num_samples, timesteps, input_size))
layer = layer_class(units, return_state=True, stateful=True)
outputs = layer(inputs)
_, state = outputs[0], outputs[1:]
self.assertEqual(len(state), num_states)
model = keras.models.Model(inputs, state[0])
model.run_eagerly = testing_utils.should_run_eagerly()
model._experimental_run_tf_function = testing_utils.should_run_tf_function()
inputs = np.random.random((num_samples, timesteps, input_size))
state = model.predict(inputs)
np.testing.assert_allclose(
keras.backend.eval(layer.states[0]), state, atol=1e-4)
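
  # With time_major=True the layer expects (timesteps, batch, features)
  # inputs; the Lambda transposes on either side keep the model batch-major.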
@parameterized.named_parameters(
('cudnngru', keras.layers.CuDNNGRU),
('cudnnlstm', keras.layers.CuDNNLSTM),
)
@test_util.run_gpu_only
def test_time_major_input(self, layer_class):
input_size = 10
timesteps = 6
units = 2
num_samples = 32
model = keras.models.Sequential()
model.add(
keras.layers.Lambda(lambda t: array_ops.transpose(t, [1, 0, 2])))
layer = layer_class(units, time_major=True, return_sequences=True)
model.add(layer)
model.add(
keras.layers.Lambda(lambda t: array_ops.transpose(t, [1, 0, 2])))
model.compile(loss='categorical_crossentropy',
optimizer=RMSprop(learning_rate=0.001))
model.fit(
np.ones((num_samples, timesteps, input_size)),
np.ones((num_samples, timesteps, units)))
out = model.predict(np.ones((num_samples, timesteps, input_size)))
self.assertEqual(out.shape, (num_samples, timesteps, units))
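
  # Initial states passed as Keras tensors should be tracked as inbound
  # tensors and act as extra model inputs that can be fed to fit().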
@parameterized.named_parameters(
('cudnngru', keras.layers.CuDNNGRU),
('cudnnlstm', keras.layers.CuDNNLSTM),
)
@test_util.run_gpu_only
def test_specify_initial_state_keras_tensor(self, layer_class):
input_size = 10
timesteps = 6
units = 2
num_samples = 32
num_states = 2 if layer_class is keras.layers.CuDNNLSTM else 1
inputs = keras.Input((timesteps, input_size))
initial_state = [keras.Input((units,)) for _ in range(num_states)]
layer = layer_class(units)
if len(initial_state) == 1:
output = layer(inputs, initial_state=initial_state[0])
else:
output = layer(inputs, initial_state=initial_state)
self.assertIn(initial_state[0], layer._inbound_nodes[0].input_tensors)
model = keras.models.Model([inputs] + initial_state, output)
model.compile(
loss='categorical_crossentropy',
optimizer=RMSprop(learning_rate=0.001),
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
inputs = np.random.random((num_samples, timesteps, input_size))
initial_state = [
np.random.random((num_samples, units)) for _ in range(num_states)
]
targets = np.random.random((num_samples, units))
model.fit([inputs] + initial_state, targets)


class CuDNNGraphOnlyTest(keras_parameterized.TestCase):
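
  # kernel/recurrent/bias regularizers each register one loss at build time;
  # an activity regularizer registers one loss per call.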
@parameterized.named_parameters(
('cudnngru', keras.layers.CuDNNGRU),
('cudnnlstm', keras.layers.CuDNNLSTM),
)
@test_util.run_deprecated_v1
@test_util.run_gpu_only
def test_regularizer(self, layer_class):
input_size = 10
timesteps = 6
units = 2
num_samples = 32
layer = layer_class(
units,
return_sequences=False,
input_shape=(timesteps, input_size),
kernel_regularizer=keras.regularizers.l1(0.01),
recurrent_regularizer=keras.regularizers.l1(0.01),
bias_regularizer='l2')
layer.build((None, None, input_size))
self.assertEqual(len(layer.losses), 3)
layer = layer_class(
units,
return_sequences=False,
input_shape=(timesteps, input_size),
activity_regularizer='l2')
self.assertTrue(layer.activity_regularizer)
x = keras.backend.variable(
np.ones((num_samples, timesteps, input_size)))
layer(x)
self.assertEqual(len(layer.get_losses_for(x)), 1)
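
  # With stateful=True, predict() updates the recurrent states, so
  # consecutive predictions differ; layer.reset_states() and
  # model.reset_states() both restore the initial state and reproduce the
  # same output.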
@parameterized.named_parameters(
('cudnngru', keras.layers.CuDNNGRU),
('cudnnlstm', keras.layers.CuDNNLSTM),
)
@test_util.run_gpu_only
@test_util.run_v1_only('b/120941292')
def test_statefulness(self, layer_class):
input_size = 10
timesteps = 6
units = 2
num_samples = 32
with self.cached_session(use_gpu=True):
model = keras.models.Sequential()
model.add(
keras.layers.Embedding(
10,
input_size,
input_length=timesteps,
batch_input_shape=(num_samples, timesteps)))
layer = layer_class(
units, return_sequences=False, stateful=True, weights=None)
model.add(layer)
model.compile(optimizer=gradient_descent.GradientDescentOptimizer(0.01),
loss='mse')
out1 = model.predict(np.ones((num_samples, timesteps)))
self.assertEqual(out1.shape, (num_samples, units))
# train once so that the states change
model.train_on_batch(
np.ones((num_samples, timesteps)), np.ones((num_samples, units)))
out2 = model.predict(np.ones((num_samples, timesteps)))
# if the state is not reset, output should be different
self.assertNotEqual(out1.max(), out2.max())
# check that output changes after states are reset
# (even though the model itself didn't change)
layer.reset_states()
out3 = model.predict(np.ones((num_samples, timesteps)))
self.assertNotEqual(out2.max(), out3.max())
# check that container-level reset_states() works
model.reset_states()
out4 = model.predict(np.ones((num_samples, timesteps)))
self.assertAllClose(out3, out4, atol=1e-5)
# check that the call to `predict` updated the states
out5 = model.predict(np.ones((num_samples, timesteps)))
self.assertNotEqual(out4.max(), out5.max())


@test_util.run_all_in_graph_and_eager_modes
class CuDNNV1OnlyTest(keras_parameterized.TestCase):
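
  # Toggling layer.trainable must move all three CuDNN weight variables
  # between the trainable and non-trainable collections.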
@test_util.run_gpu_only
def test_trainability(self):
input_size = 10
units = 2
for layer_class in [keras.layers.CuDNNGRU, keras.layers.CuDNNLSTM]:
layer = layer_class(units)
layer.build((None, None, input_size))
self.assertEqual(len(layer.weights), 3)
self.assertEqual(len(layer.trainable_weights), 3)
self.assertEqual(len(layer.non_trainable_weights), 0)
layer.trainable = False
self.assertEqual(len(layer.weights), 3)
self.assertEqual(len(layer.non_trainable_weights), 3)
self.assertEqual(len(layer.trainable_weights), 0)
layer.trainable = True
self.assertEqual(len(layer.weights), 3)
self.assertEqual(len(layer.trainable_weights), 3)
self.assertEqual(len(layer.non_trainable_weights), 0)
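
  # Round-trips weights between plain LSTM/GRU and their CuDNN counterparts
  # via save_weights/load_weights, across bidirectional wrapping,
  # implementation modes, nesting levels, and Sequential vs. functional
  # models, and checks that predictions agree.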
@parameterized.named_parameters(
*test_util.generate_combinations_with_testcase_name(
rnn_type=['LSTM', 'GRU'], to_cudnn=[True, False],
bidirectional=[True, False], implementation=[1, 2],
model_nest_level=[1, 2], model_type=['seq', 'func']))
@test_util.run_v1_only('b/120911602, b/112083752')
@test_util.run_gpu_only
def test_load_weights_between_noncudnn_rnn(self, rnn_type, to_cudnn,
bidirectional, implementation,
model_nest_level, model_type):
input_size = 10
timesteps = 6
input_shape = (timesteps, input_size)
units = 2
num_samples = 32
inputs = np.random.random((num_samples, timesteps, input_size))
rnn_layer_kwargs = {
'recurrent_activation': 'sigmoid',
# ensure biases are non-zero and properly converted
'bias_initializer': 'random_uniform',
'implementation': implementation
}
if rnn_type == 'LSTM':
rnn_layer_class = keras.layers.LSTM
cudnn_rnn_layer_class = keras.layers.CuDNNLSTM
else:
rnn_layer_class = keras.layers.GRU
cudnn_rnn_layer_class = keras.layers.CuDNNGRU
rnn_layer_kwargs['reset_after'] = True
layer = rnn_layer_class(units, **rnn_layer_kwargs)
if bidirectional:
layer = keras.layers.Bidirectional(layer)
cudnn_layer = cudnn_rnn_layer_class(units)
if bidirectional:
cudnn_layer = keras.layers.Bidirectional(cudnn_layer)
model = self._make_nested_model(input_shape, layer, model_nest_level,
model_type)
cudnn_model = self._make_nested_model(input_shape, cudnn_layer,
model_nest_level, model_type)
if to_cudnn:
self._convert_model_weights(model, cudnn_model)
else:
self._convert_model_weights(cudnn_model, model)
self.assertAllClose(model.predict(inputs), cudnn_model.predict(inputs),
atol=1e-4)
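
  # Helper: wrap `layer` in `level` nested Sequential or functional models.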
def _make_nested_model(self, input_shape, layer, level=1, model_type='func'):
# example: make_nested_seq_model((1,), Dense(10), level=2).summary()
def make_nested_seq_model(input_shape, layer, level=1):
model = layer
for i in range(1, level + 1):
layers = [keras.layers.InputLayer(input_shape),
model] if (i == 1) else [model]
model = keras.models.Sequential(layers)
if i > 1:
model.build((None,) + input_shape)
return model

    # example: make_nested_func_model((1,), Dense(10), level=2).summary()
def make_nested_func_model(input_shape, layer, level=1):
model_input = keras.layers.Input(input_shape)
model = layer
for _ in range(level):
model = keras.models.Model(model_input, model(model_input))
return model

    if model_type == 'func':
return make_nested_func_model(input_shape, layer, level)
elif model_type == 'seq':
return make_nested_seq_model(input_shape, layer, level)
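
  # Helper: copy weights from source to target through a temporary HDF5 file.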
def _convert_model_weights(self, source_model, target_model):
_, fname = tempfile.mkstemp('.h5')
source_model.save_weights(fname)
target_model.load_weights(fname)
os.remove(fname)

  @parameterized.named_parameters(
*test_util.generate_combinations_with_testcase_name(
rnn_type=['LSTM', 'GRU'], to_cudnn=[True, False]))
@test_util.run_v1_only('b/120911602')
@test_util.run_gpu_only
def test_load_weights_between_noncudnn_rnn_time_distributed(self, rnn_type,
to_cudnn):
    # Similar to test_load_weights_between_noncudnn_rnn(), but with a
    # different input rank due to the use of TimeDistributed. Issue: #10356.
input_size = 10
steps = 6
timesteps = 6
input_shape = (timesteps, steps, input_size)
units = 2
num_samples = 32
inputs = np.random.random((num_samples, timesteps, steps, input_size))
rnn_layer_kwargs = {
'recurrent_activation': 'sigmoid',
# ensure biases are non-zero and properly converted
'bias_initializer': 'random_uniform',
}
if rnn_type == 'LSTM':
rnn_layer_class = keras.layers.LSTM
cudnn_rnn_layer_class = keras.layers.CuDNNLSTM
else:
rnn_layer_class = keras.layers.GRU
cudnn_rnn_layer_class = keras.layers.CuDNNGRU
rnn_layer_kwargs['reset_after'] = True
layer = rnn_layer_class(units, **rnn_layer_kwargs)
layer = keras.layers.TimeDistributed(layer)
cudnn_layer = cudnn_rnn_layer_class(units)
cudnn_layer = keras.layers.TimeDistributed(cudnn_layer)
model = self._make_nested_model(input_shape, layer)
cudnn_model = self._make_nested_model(input_shape, cudnn_layer)
if to_cudnn:
self._convert_model_weights(model, cudnn_model)
else:
self._convert_model_weights(cudnn_model, model)
self.assertAllClose(model.predict(inputs), cudnn_model.predict(inputs),
atol=1e-4)
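
  # Bidirectional-wrapped CuDNNGRU should train in Sequential, stacked,
  # functional, and stateful configurations, and survive a to_json/from_json
  # round trip.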
@test_util.run_gpu_only
def test_cudnnrnn_bidirectional(self):
rnn = keras.layers.CuDNNGRU
samples = 2
dim = 2
timesteps = 2
output_dim = 2
mode = 'concat'
x = np.random.random((samples, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((samples, target_dim))
# test with Sequential model
model = keras.Sequential()
model.add(
keras.layers.Bidirectional(
rnn(output_dim), merge_mode=mode, input_shape=(None, dim)))
model.compile(loss='mse', optimizer='rmsprop')
model.fit(x, y, epochs=1, batch_size=1)
# test config
model.get_config()
model = keras.models.model_from_json(model.to_json())
model.summary()
# test stacked bidirectional layers
model = keras.Sequential()
model.add(
keras.layers.Bidirectional(
rnn(output_dim, return_sequences=True),
merge_mode=mode,
input_shape=(None, dim)))
model.add(keras.layers.Bidirectional(rnn(output_dim), merge_mode=mode))
    model.compile(loss='mse', optimizer='rmsprop')
model.fit(x, y, epochs=1, batch_size=1)
# test with functional API
inputs = keras.Input((timesteps, dim))
outputs = keras.layers.Bidirectional(
rnn(output_dim), merge_mode=mode)(
inputs)
model = keras.Model(inputs, outputs)
    model.compile(loss='mse', optimizer='rmsprop')
model.fit(x, y, epochs=1, batch_size=1)
# Bidirectional and stateful
inputs = keras.Input(batch_shape=(1, timesteps, dim))
outputs = keras.layers.Bidirectional(
rnn(output_dim, stateful=True), merge_mode=mode)(
inputs)
model = keras.Model(inputs, outputs)
model.compile(loss='mse', optimizer='rmsprop')
model.fit(x, y, epochs=1, batch_size=1)

  @test_util.run_gpu_only
def test_preprocess_weights_for_loading_gru_incompatible(self):
"""Test loading weights between incompatible layers.
Should fail fast with an exception.
"""
input_shape = (3, 5)
def gru(cudnn=False, **kwargs):
layer_class = keras.layers.CuDNNGRU if cudnn else keras.layers.GRU
return layer_class(2, input_shape=input_shape, **kwargs)
def get_layer_weights(layer):
layer.build(input_shape=input_shape)
return layer.get_weights()
def assert_not_compatible(src, dest, message):
with self.assertRaises(ValueError) as ex:
keras.saving.preprocess_weights_for_loading(
dest,
get_layer_weights(src))
self.assertIn(message, str(ex.exception))
assert_not_compatible(
gru(),
gru(cudnn=True),
'GRU(reset_after=False) is not compatible with CuDNNGRU')
assert_not_compatible(
gru(cudnn=True),
gru(),
'CuDNNGRU is not compatible with GRU(reset_after=False)')
assert_not_compatible(
gru(),
gru(reset_after=True),
'GRU(reset_after=False) is not compatible with '
'GRU(reset_after=True)')
assert_not_compatible(
gru(reset_after=True),
gru(),
'GRU(reset_after=True) is not compatible with '
'GRU(reset_after=False)')


if __name__ == '__main__':
test.main()