blob: c3708d92688816543344610dace4cd94e998e3d4 [file] [log] [blame]
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for LSTM layer."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl.testing import parameterized
import numpy as np
from tensorflow.python import keras
from tensorflow.python.eager import context
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.platform import test
from tensorflow.python.training import adam
from tensorflow.python.training import gradient_descent
@keras_parameterized.run_all_keras_modes
class LSTMLayerTest(keras_parameterized.TestCase):
def test_return_sequences_LSTM(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
keras.layers.LSTM,
kwargs={'units': units,
'return_sequences': True},
input_shape=(num_samples, timesteps, embedding_dim))
def test_float64_LSTM(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
keras.layers.LSTM,
kwargs={'units': units,
'return_sequences': True,
'dtype': 'float64'},
input_shape=(num_samples, timesteps, embedding_dim),
input_dtype='float64')
def test_static_shape_inference_LSTM(self):
# Github issue: 15165
timesteps = 3
embedding_dim = 4
units = 2
model = keras.models.Sequential()
inputs = keras.layers.Dense(embedding_dim,
input_shape=(timesteps, embedding_dim))
model.add(inputs)
layer = keras.layers.LSTM(units, return_sequences=True)
model.add(layer)
outputs = model.layers[-1].output
self.assertEqual(outputs.shape.as_list(), [None, timesteps, units])
def test_dynamic_behavior_LSTM(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
layer = keras.layers.LSTM(units, input_shape=(None, embedding_dim))
model = keras.models.Sequential()
model.add(layer)
model.compile(
'rmsprop',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
x = np.random.random((num_samples, timesteps, embedding_dim))
y = np.random.random((num_samples, units))
model.train_on_batch(x, y)
def test_dropout_LSTM(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
keras.layers.LSTM,
kwargs={'units': units,
'dropout': 0.1,
'recurrent_dropout': 0.1},
input_shape=(num_samples, timesteps, embedding_dim))
@parameterized.parameters([0, 1, 2])
def test_implementation_mode_LSTM(self, implementation_mode):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
keras.layers.LSTM,
kwargs={'units': units,
'implementation': implementation_mode},
input_shape=(num_samples, timesteps, embedding_dim))
def test_constraints_LSTM(self):
embedding_dim = 4
layer_class = keras.layers.LSTM
k_constraint = keras.constraints.max_norm(0.01)
r_constraint = keras.constraints.max_norm(0.01)
b_constraint = keras.constraints.max_norm(0.01)
layer = layer_class(
5,
return_sequences=False,
weights=None,
input_shape=(None, embedding_dim),
kernel_constraint=k_constraint,
recurrent_constraint=r_constraint,
bias_constraint=b_constraint)
layer.build((None, None, embedding_dim))
self.assertEqual(layer.cell.kernel.constraint, k_constraint)
self.assertEqual(layer.cell.recurrent_kernel.constraint, r_constraint)
self.assertEqual(layer.cell.bias.constraint, b_constraint)
def test_with_masking_layer_LSTM(self):
layer_class = keras.layers.LSTM
inputs = np.random.random((2, 3, 4))
targets = np.abs(np.random.random((2, 3, 5)))
targets /= targets.sum(axis=-1, keepdims=True)
model = keras.models.Sequential()
model.add(keras.layers.Masking(input_shape=(3, 4)))
model.add(layer_class(units=5, return_sequences=True, unroll=False))
model.compile(
loss='categorical_crossentropy',
optimizer='rmsprop',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
def test_masking_with_stacking_LSTM(self):
inputs = np.random.random((2, 3, 4))
targets = np.abs(np.random.random((2, 3, 5)))
targets /= targets.sum(axis=-1, keepdims=True)
model = keras.models.Sequential()
model.add(keras.layers.Masking(input_shape=(3, 4)))
lstm_cells = [keras.layers.LSTMCell(10), keras.layers.LSTMCell(5)]
model.add(keras.layers.RNN(lstm_cells, return_sequences=True, unroll=False))
model.compile(
loss='categorical_crossentropy',
optimizer='rmsprop',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
def test_from_config_LSTM(self):
layer_class = keras.layers.LSTM
for stateful in (False, True):
l1 = layer_class(units=1, stateful=stateful)
l2 = layer_class.from_config(l1.get_config())
assert l1.get_config() == l2.get_config()
def test_specify_initial_state_keras_tensor(self):
num_states = 2
timesteps = 3
embedding_dim = 4
units = 3
num_samples = 2
# Test with Keras tensor
inputs = keras.Input((timesteps, embedding_dim))
initial_state = [keras.Input((units,)) for _ in range(num_states)]
layer = keras.layers.LSTM(units)
if len(initial_state) == 1:
output = layer(inputs, initial_state=initial_state[0])
else:
output = layer(inputs, initial_state=initial_state)
assert initial_state[0] in layer._inbound_nodes[0].input_tensors
model = keras.models.Model([inputs] + initial_state, output)
model.compile(
loss='categorical_crossentropy',
optimizer=adam.AdamOptimizer(),
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
inputs = np.random.random((num_samples, timesteps, embedding_dim))
initial_state = [np.random.random((num_samples, units))
for _ in range(num_states)]
targets = np.random.random((num_samples, units))
model.train_on_batch([inputs] + initial_state, targets)
def test_specify_initial_state_non_keras_tensor(self):
num_states = 2
timesteps = 3
embedding_dim = 4
units = 3
num_samples = 2
# Test with non-Keras tensor
inputs = keras.Input((timesteps, embedding_dim))
initial_state = [keras.backend.random_normal_variable(
(num_samples, units), 0, 1)
for _ in range(num_states)]
layer = keras.layers.LSTM(units)
output = layer(inputs, initial_state=initial_state)
model = keras.models.Model(inputs, output)
model.compile(
loss='categorical_crossentropy',
optimizer=adam.AdamOptimizer(),
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
inputs = np.random.random((num_samples, timesteps, embedding_dim))
targets = np.random.random((num_samples, units))
model.train_on_batch(inputs, targets)
def test_reset_states_with_values(self):
num_states = 2
timesteps = 3
embedding_dim = 4
units = 3
num_samples = 2
layer = keras.layers.LSTM(units, stateful=True)
layer.build((num_samples, timesteps, embedding_dim))
layer.reset_states()
assert len(layer.states) == num_states
assert layer.states[0] is not None
self.assertAllClose(
keras.backend.eval(layer.states[0]),
np.zeros(keras.backend.int_shape(layer.states[0])),
atol=1e-4)
state_shapes = [keras.backend.int_shape(state) for state in layer.states]
values = [np.ones(shape) for shape in state_shapes]
if len(values) == 1:
values = values[0]
layer.reset_states(values)
self.assertAllClose(
keras.backend.eval(layer.states[0]),
np.ones(keras.backend.int_shape(layer.states[0])),
atol=1e-4)
# Test with invalid data
with self.assertRaises(ValueError):
layer.reset_states([1] * (len(layer.states) + 1))
def test_specify_state_with_masking(self):
num_states = 2
timesteps = 3
embedding_dim = 4
units = 3
num_samples = 2
inputs = keras.Input((timesteps, embedding_dim))
_ = keras.layers.Masking()(inputs)
initial_state = [keras.Input((units,)) for _ in range(num_states)]
output = keras.layers.LSTM(units)(inputs, initial_state=initial_state)
model = keras.models.Model([inputs] + initial_state, output)
model.compile(
loss='categorical_crossentropy',
optimizer='rmsprop',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
inputs = np.random.random((num_samples, timesteps, embedding_dim))
initial_state = [np.random.random((num_samples, units))
for _ in range(num_states)]
targets = np.random.random((num_samples, units))
model.train_on_batch([inputs] + initial_state, targets)
def test_return_state(self):
num_states = 2
timesteps = 3
embedding_dim = 4
units = 3
num_samples = 2
inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
layer = keras.layers.LSTM(units, return_state=True, stateful=True)
outputs = layer(inputs)
state = outputs[1:]
assert len(state) == num_states
model = keras.models.Model(inputs, state[0])
inputs = np.random.random((num_samples, timesteps, embedding_dim))
state = model.predict(inputs)
self.assertAllClose(keras.backend.eval(layer.states[0]), state, atol=1e-4)
def test_state_reuse(self):
timesteps = 3
embedding_dim = 4
units = 3
num_samples = 2
inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
layer = keras.layers.LSTM(units, return_state=True, return_sequences=True)
outputs = layer(inputs)
output, state = outputs[0], outputs[1:]
output = keras.layers.LSTM(units)(output, initial_state=state)
model = keras.models.Model(inputs, output)
inputs = np.random.random((num_samples, timesteps, embedding_dim))
outputs = model.predict(inputs)
def test_initial_states_as_other_inputs(self):
timesteps = 3
embedding_dim = 4
units = 3
num_samples = 2
num_states = 2
layer_class = keras.layers.LSTM
# Test with Keras tensor
main_inputs = keras.Input((timesteps, embedding_dim))
initial_state = [keras.Input((units,)) for _ in range(num_states)]
inputs = [main_inputs] + initial_state
layer = layer_class(units)
output = layer(inputs)
assert initial_state[0] in layer._inbound_nodes[0].input_tensors
model = keras.models.Model(inputs, output)
model.compile(
loss='categorical_crossentropy',
optimizer=adam.AdamOptimizer(),
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
main_inputs = np.random.random((num_samples, timesteps, embedding_dim))
initial_state = [np.random.random((num_samples, units))
for _ in range(num_states)]
targets = np.random.random((num_samples, units))
model.train_on_batch([main_inputs] + initial_state, targets)
def test_regularizers_LSTM(self):
embedding_dim = 4
layer_class = keras.layers.LSTM
layer = layer_class(
5,
return_sequences=False,
weights=None,
input_shape=(None, embedding_dim),
kernel_regularizer=keras.regularizers.l1(0.01),
recurrent_regularizer=keras.regularizers.l1(0.01),
bias_regularizer='l2',
activity_regularizer='l1')
layer.build((None, None, 2))
self.assertEqual(len(layer.losses), 3)
x = keras.backend.variable(np.ones((2, 3, 2)))
layer(x)
if context.executing_eagerly():
self.assertEqual(len(layer.losses), 4)
else:
self.assertEqual(len(layer.get_losses_for(x)), 1)
def test_statefulness_LSTM(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
layer_class = keras.layers.LSTM
model = keras.models.Sequential()
model.add(
keras.layers.Embedding(
4,
embedding_dim,
mask_zero=True,
input_length=timesteps,
batch_input_shape=(num_samples, timesteps)))
layer = layer_class(
units, return_sequences=False, stateful=True, weights=None)
model.add(layer)
model.compile(
optimizer=gradient_descent.GradientDescentOptimizer(0.01),
loss='mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
out1 = model.predict(np.ones((num_samples, timesteps)))
self.assertEqual(out1.shape, (num_samples, units))
# train once so that the states change
model.train_on_batch(
np.ones((num_samples, timesteps)), np.ones((num_samples, units)))
out2 = model.predict(np.ones((num_samples, timesteps)))
# if the state is not reset, output should be different
self.assertNotEqual(out1.max(), out2.max())
# check that output changes after states are reset
# (even though the model itself didn't change)
layer.reset_states()
out3 = model.predict(np.ones((num_samples, timesteps)))
self.assertNotEqual(out2.max(), out3.max())
# check that container-level reset_states() works
model.reset_states()
out4 = model.predict(np.ones((num_samples, timesteps)))
self.assertAllClose(out3, out4, atol=1e-5)
# check that the call to `predict` updated the states
out5 = model.predict(np.ones((num_samples, timesteps)))
self.assertNotEqual(out4.max(), out5.max())
# Check masking
layer.reset_states()
left_padded_input = np.ones((num_samples, timesteps))
left_padded_input[0, :1] = 0
left_padded_input[1, :2] = 0
out6 = model.predict(left_padded_input)
layer.reset_states()
right_padded_input = np.ones((num_samples, timesteps))
right_padded_input[0, -1:] = 0
right_padded_input[1, -2:] = 0
out7 = model.predict(right_padded_input)
self.assertAllClose(out7, out6, atol=1e-5)
if __name__ == '__main__':
test.main()