# blob: 94e7f354cd56ac684a232f211ae39f88ff55bfd1 [file] [log] [blame]
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for V2 LSTM layer."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import shutil
import time
from absl.testing import parameterized
import numpy as np
from tensorflow.core.protobuf import config_pb2
from tensorflow.core.protobuf import rewriter_config_pb2
from tensorflow.python import keras
from tensorflow.python.client import session as session_lib
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import random_seed
from tensorflow.python.framework import test_util
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.layers import recurrent as rnn_v1
from tensorflow.python.keras.layers import recurrent_v2 as rnn
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import gen_math_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import gradient_descent
from tensorflow.python.util import nest
# Grappler settings shared by the graph-mode tests and benchmarks in this
# file: the implementation selector is switched on and `min_graph_nodes` is
# set to -1.
_rewrites = rewriter_config_pb2.RewriterConfig(
    implementation_selector=rewriter_config_pb2.RewriterConfig.ON,
    min_graph_nodes=-1)
_graph_options = config_pb2.GraphOptions(rewrite_options=_rewrites)
_config = config_pb2.ConfigProto(graph_options=_graph_options)
# Functional tests for the V2 LSTM layer (`rnn.LSTM`). The class is decorated
# with `run_all_keras_modes`, which receives the module-level `_config`.
@keras_parameterized.run_all_keras_modes(config=_config)
class LSTMV2Test(keras_parameterized.TestCase):
@parameterized.named_parameters(
    # test_name, activation, recurrent_activation, recurrent_dropout, unroll,
    # use_bias
    ('non_tan_activation', 'relu', 'sigmoid', 0, False, True),
    ('non_sigmoid_recur_activation', 'tanh', 'relu', 0, False, True),
    ('use_recurrent_dropout', 'tanh', 'sigmoid', 0.1, False, True),
    ('unroll', 'tanh', 'sigmoid', 0, True, True),
    ('not_use_bias', 'tanh', 'sigmoid', 0, False, False),
)
def test_could_use_defun_backend(self, activation, recurrent_activation,
                                 recurrent_dropout, unroll, use_bias):
  """Asserts `could_use_cudnn` is False for each parameterized configuration.

  Every case differs in exactly one constructor argument from the
  ('tanh', 'sigmoid', 0, False, True) combination.
  """
  lstm_kwargs = dict(
      activation=activation,
      recurrent_activation=recurrent_activation,
      recurrent_dropout=recurrent_dropout,
      unroll=unroll,
      use_bias=use_bias)
  single_unit_layer = rnn.LSTM(1, **lstm_kwargs)
  self.assertFalse(single_unit_layer.could_use_cudnn)
def test_static_shape_inference_LSTM(self):
  """Checks the static output shape of a sequence-returning LSTM.

  Related to Github issue 15165.
  """
  timesteps, embedding_dim, units = 3, 4, 2

  model = keras.models.Sequential()
  model.add(
      keras.layers.Dense(
          embedding_dim, input_shape=(timesteps, embedding_dim)))
  model.add(rnn.LSTM(units, return_sequences=True))

  # Only the leading dimension is expected to be unknown (None).
  lstm_output = model.layers[-1].output
  self.assertEqual(lstm_output.shape.as_list(), [None, timesteps, units])
def test_dynamic_behavior_LSTM(self):
  """Trains one batch through an LSTM built with `input_shape=(None, dim)`."""
  num_samples, timesteps, embedding_dim, units = 2, 3, 4, 2

  lstm_layer = rnn.LSTM(units, input_shape=(None, embedding_dim))
  model = keras.models.Sequential()
  model.add(lstm_layer)
  optimizer = gradient_descent.GradientDescentOptimizer(0.001)
  model.compile(optimizer, 'mse')

  # The data fixes the time dimension that the layer left unspecified.
  features = np.random.random((num_samples, timesteps, embedding_dim))
  labels = np.random.random((num_samples, units))
  model.train_on_batch(features, labels)
def test_stacking_LSTM(self):
  """Fits two stacked, sequence-returning LSTM layers for one epoch."""
  inputs = np.random.random((2, 3, 4))
  # Scale the non-negative targets so the last axis sums to one.
  raw_targets = np.abs(np.random.random((2, 3, 5)))
  targets = raw_targets / raw_targets.sum(axis=-1, keepdims=True)

  model = keras.models.Sequential()
  for layer_units in (10, 5):
    model.add(rnn.LSTM(layer_units, return_sequences=True, unroll=False))
  sgd = gradient_descent.GradientDescentOptimizer(0.01)
  model.compile(loss='categorical_crossentropy', optimizer=sgd)
  model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
def test_from_config_LSTM(self):
  """Checks that `from_config(get_config())` reproduces the layer config.

  The round trip is checked for both a stateless and a stateful layer.
  """
  layer_class = rnn.LSTM
  for stateful in (False, True):
    l1 = layer_class(units=1, stateful=stateful)
    l2 = layer_class.from_config(l1.get_config())
    # A unittest assertion is used instead of a bare `assert`: it is not
    # stripped under `python -O` and reports both configs on failure.
    self.assertEqual(l1.get_config(), l2.get_config())
def test_specify_initial_state_keras_tensor(self):
  """Trains an LSTM whose initial states are supplied as Keras inputs.

  Also checks that the first state tensor is listed among the input tensors
  of the layer's first inbound node.
  """
  num_states = 2
  timesteps = 3
  embedding_dim = 4
  units = 3
  num_samples = 2

  # Test with Keras tensor
  inputs = keras.Input((timesteps, embedding_dim))
  initial_state = [keras.Input((units,)) for _ in range(num_states)]
  layer = rnn.LSTM(units)
  # `num_states` is fixed at 2, so the states are always passed as a list;
  # the former single-state branch could never run and was dropped.
  output = layer(inputs, initial_state=initial_state)
  # `assertIn` replaces a bare `assert`, which `python -O` would strip.
  self.assertIn(initial_state[0], layer._inbound_nodes[0].input_tensors)

  model = keras.models.Model([inputs] + initial_state, output)
  model.compile(
      loss='categorical_crossentropy',
      optimizer=gradient_descent.GradientDescentOptimizer(0.01))

  # Feed numpy data for the main input and for every state input.
  inputs = np.random.random((num_samples, timesteps, embedding_dim))
  initial_state = [
      np.random.random((num_samples, units)) for _ in range(num_states)
  ]
  targets = np.random.random((num_samples, units))
  model.train_on_batch([inputs] + initial_state, targets)
def test_specify_initial_state_non_keras_tensor(self):
  """Trains an LSTM whose initial states are backend variables."""
  num_states, timesteps, embedding_dim = 2, 3, 4
  units, num_samples = 3, 2

  # Test with non-Keras tensor
  symbolic_inputs = keras.Input((timesteps, embedding_dim))
  state_variables = []
  for _ in range(num_states):
    state_variables.append(
        keras.backend.random_normal_variable((num_samples, units), 0, 1))
  lstm_output = rnn.LSTM(units)(
      symbolic_inputs, initial_state=state_variables)

  # The states are not Keras inputs, so the model takes only the main input.
  model = keras.models.Model(symbolic_inputs, lstm_output)
  sgd = gradient_descent.GradientDescentOptimizer(0.01)
  model.compile(loss='categorical_crossentropy', optimizer=sgd)

  features = np.random.random((num_samples, timesteps, embedding_dim))
  labels = np.random.random((num_samples, units))
  model.train_on_batch(features, labels)
def test_reset_states_with_values(self):
  """Checks `reset_states` with no values, explicit values and bad values.

  The test also verifies that the number of entries in `layer.weights` is
  unchanged by the state handling.
  """
  num_states = 2
  timesteps = 3
  embedding_dim = 4
  units = 3
  num_samples = 2

  layer = rnn.LSTM(units, stateful=True)
  layer.build((num_samples, timesteps, embedding_dim))
  initial_weight_count = len(layer.weights)

  # Without arguments the states are expected to be all zeros.
  layer.reset_states()
  # unittest assertions replace bare `assert` statements, which are stripped
  # under `python -O` and give no detail on failure.
  self.assertEqual(len(layer.states), num_states)
  self.assertIsNotNone(layer.states[0])
  self.assertAllClose(
      keras.backend.eval(layer.states[0]),
      np.zeros(keras.backend.int_shape(layer.states[0])),
      atol=1e-4)

  # With explicit values the states are expected to take those values. There
  # are always `num_states` (2) values, so the former single-value unwrapping
  # branch was unreachable and has been removed.
  state_shapes = [keras.backend.int_shape(state) for state in layer.states]
  values = [np.ones(shape) for shape in state_shapes]
  layer.reset_states(values)
  self.assertAllClose(
      keras.backend.eval(layer.states[0]),
      np.ones(keras.backend.int_shape(layer.states[0])),
      atol=1e-4)

  # Test with invalid data
  with self.assertRaises(ValueError):
    layer.reset_states([1] * (len(layer.states) + 1))

  self.assertEqual(initial_weight_count, len(layer.weights))
  # Variables in "states" shouldn't show up in .weights
  layer.states = nest.map_structure(variables.Variable, values)
  layer.reset_states()
  self.assertEqual(initial_weight_count, len(layer.weights))
def test_specify_state_with_masking(self):
  """Trains an LSTM that receives a masked input and Keras initial states.

  The output of the Masking layer is fed to the LSTM. Previously that output
  was discarded and the LSTM consumed the unmasked input, so the masking
  named by this test was never applied.
  """
  num_states = 2
  timesteps = 3
  embedding_dim = 4
  units = 3
  num_samples = 2

  inputs = keras.Input((timesteps, embedding_dim))
  masked = keras.layers.Masking()(inputs)
  initial_state = [keras.Input((units,)) for _ in range(num_states)]
  # Pass the masked tensor so the layer sees a mask alongside the states.
  output = rnn.LSTM(units)(
      masked, initial_state=initial_state)

  model = keras.models.Model([inputs] + initial_state, output)
  model.compile(
      loss='categorical_crossentropy',
      optimizer=gradient_descent.GradientDescentOptimizer(0.01))

  inputs = np.random.random((num_samples, timesteps, embedding_dim))
  initial_state = [
      np.random.random((num_samples, units)) for _ in range(num_states)
  ]
  targets = np.random.random((num_samples, units))
  model.train_on_batch([inputs] + initial_state, targets)
def test_return_state(self):
  """Checks the state returned by a stateful LSTM against `layer.states`.

  A model that outputs the first returned state is run once; its prediction
  must match the value held in the layer's first state afterwards.
  """
  num_states = 2
  timesteps = 3
  embedding_dim = 4
  units = 3
  num_samples = 2

  inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
  masked = keras.layers.Masking()(inputs)
  layer = rnn.LSTM(units, return_state=True, stateful=True)
  outputs = layer(masked)
  # With `return_state=True` everything after the first output is state.
  state = outputs[1:]
  # `assertEqual` replaces a bare `assert`, which `python -O` would strip.
  self.assertEqual(len(state), num_states)
  model = keras.models.Model(inputs, state[0])

  inputs = np.random.random((num_samples, timesteps, embedding_dim))
  state = model.predict(inputs)
  self.assertAllClose(keras.backend.eval(layer.states[0]), state, atol=1e-4)
def test_state_reuse(self):
  """Feeds the states returned by one LSTM into a second LSTM."""
  timesteps, embedding_dim, units, num_samples = 3, 4, 3, 2

  symbolic_inputs = keras.Input(
      batch_shape=(num_samples, timesteps, embedding_dim))
  first_lstm = rnn.LSTM(units, return_state=True, return_sequences=True)
  first_results = first_lstm(symbolic_inputs)
  # The first entry is the output sequence; the remaining entries are states.
  sequence = first_results[0]
  returned_states = first_results[1:]
  final_output = rnn.LSTM(units)(sequence, initial_state=returned_states)

  model = keras.models.Model(symbolic_inputs, final_output)
  model.predict(np.random.random((num_samples, timesteps, embedding_dim)))
def test_initial_states_as_other_inputs(self):
  """Trains an LSTM called with `[main_input] + states` as a single list.

  Also checks that the first state tensor is listed among the input tensors
  of the layer's first inbound node.
  """
  timesteps = 3
  embedding_dim = 4
  units = 3
  num_samples = 2
  num_states = 2
  layer_class = rnn.LSTM

  # Test with Keras tensor
  main_inputs = keras.Input((timesteps, embedding_dim))
  initial_state = [keras.Input((units,)) for _ in range(num_states)]
  inputs = [main_inputs] + initial_state

  layer = layer_class(units)
  output = layer(inputs)
  # `assertIn` replaces a bare `assert`, which `python -O` would strip.
  self.assertIn(initial_state[0], layer._inbound_nodes[0].input_tensors)

  model = keras.models.Model(inputs, output)
  model.compile(
      loss='categorical_crossentropy',
      optimizer=gradient_descent.GradientDescentOptimizer(0.01))

  main_inputs = np.random.random((num_samples, timesteps, embedding_dim))
  initial_state = [
      np.random.random((num_samples, units)) for _ in range(num_states)
  ]
  targets = np.random.random((num_samples, units))
  model.train_on_batch([main_inputs] + initial_state, targets)
@test_util.run_v2_only
def test_lstm_v2_feature_parity_with_canonical_lstm(self):
  """Compares V2 and canonical LSTM predictions before and after one fit."""
  feature_dim = 10
  state_size = 8
  num_steps = 4
  num_samples = 20

  (x_train, y_train), _ = testing_utils.get_test_data(
      train_samples=num_samples,
      test_samples=0,
      input_shape=(num_steps, feature_dim),
      num_classes=state_size,
      random_seed=random_seed.DEFAULT_GRAPH_SEED)
  y_train = keras.utils.to_categorical(y_train, state_size)
  # Zero the final timestep of the last two samples (and their targets) to
  # simulate variable length sequences for the Masking layer below.
  x_train[-2:, -1, :] = 0.0
  y_train[-2:] = 0

  inputs = keras.layers.Input(
      shape=[num_steps, feature_dim], dtype=dtypes.float32)
  masked_input = keras.layers.Masking()(inputs)

  # Reference: canonical LSTM configured with a sigmoid recurrent activation.
  canonical_layer = rnn_v1.LSTM(state_size, recurrent_activation='sigmoid')
  canonical_model = keras.models.Model(inputs, canonical_layer(masked_input))
  weights = canonical_model.get_weights()
  y_1 = canonical_model.predict(x_train)
  canonical_model.compile('rmsprop', 'mse')
  canonical_model.fit(x_train, y_train)
  y_2 = canonical_model.predict(x_train)

  # NOTE(review): the extent of this device scope was reconstructed from an
  # unindented source; confirm it only covers building the V2 model.
  with test_util.device(use_gpu=True):
    v2_layer = rnn.LSTM(state_size)
    v2_model = keras.models.Model(inputs, v2_layer(masked_input))
  # Start the V2 model from the weights the reference model had before fit.
  v2_model.set_weights(weights)
  y_3 = v2_model.predict(x_train)
  v2_model.compile('rmsprop', 'mse')
  v2_model.fit(x_train, y_train)
  y_4 = v2_model.predict(x_train)

  self.assertAllClose(y_1, y_3, rtol=1e-5, atol=2e-5)
  self.assertAllClose(y_2, y_4, rtol=1e-5, atol=2e-5)
@parameterized.named_parameters(('v0', 0), ('v1', 1), ('v2', 2))
def test_implementation_mode_LSTM(self, implementation_mode):
  """Runs `layer_test` per implementation mode, then two further checks.

  After the parameterized `layer_test`, the method checks that constraints
  passed to the layer are set on the cell's variables, and fits a model that
  places a Masking layer in front of the LSTM.
  """
  num_samples, timesteps, embedding_dim, units = 2, 3, 4, 2
  layer_kwargs = {'units': units, 'implementation': implementation_mode}
  testing_utils.layer_test(
      rnn.LSTM,
      kwargs=layer_kwargs,
      input_shape=(num_samples, timesteps, embedding_dim))

  # Constraints given to the constructor must appear on the cell's weights.
  k_constraint = keras.constraints.max_norm(0.01)
  r_constraint = keras.constraints.max_norm(0.01)
  b_constraint = keras.constraints.max_norm(0.01)
  constrained_layer = rnn.LSTM(
      5,
      return_sequences=False,
      weights=None,
      input_shape=(None, embedding_dim),
      kernel_constraint=k_constraint,
      recurrent_constraint=r_constraint,
      bias_constraint=b_constraint)
  constrained_layer.build((None, None, embedding_dim))
  cell = constrained_layer.cell
  self.assertEqual(cell.kernel.constraint, k_constraint)
  self.assertEqual(cell.recurrent_kernel.constraint, r_constraint)
  self.assertEqual(cell.bias.constraint, b_constraint)

  # Fit a Masking + LSTM model for one epoch on random data.
  inputs = np.random.random((2, 3, 4))
  raw_targets = np.abs(np.random.random((2, 3, 5)))
  targets = raw_targets / raw_targets.sum(axis=-1, keepdims=True)
  model = keras.models.Sequential()
  model.add(keras.layers.Masking(input_shape=(3, 4)))
  model.add(rnn.LSTM(units=5, return_sequences=True, unroll=False))
  sgd = gradient_descent.GradientDescentOptimizer(0.01)
  model.compile(loss='categorical_crossentropy', optimizer=sgd)
  model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
def test_masking_with_stacking_LSTM(self):
  """Fits a Masking layer followed by two stacked LSTM layers."""
  inputs = np.random.random((2, 3, 4))
  # Scale the non-negative targets so the last axis sums to one.
  raw_targets = np.abs(np.random.random((2, 3, 5)))
  targets = raw_targets / raw_targets.sum(axis=-1, keepdims=True)

  model = keras.models.Sequential()
  model.add(keras.layers.Masking(input_shape=(3, 4)))
  for layer_units in (10, 5):
    model.add(rnn.LSTM(layer_units, return_sequences=True, unroll=False))
  sgd = gradient_descent.GradientDescentOptimizer(0.01)
  model.compile(loss='categorical_crossentropy', optimizer=sgd)
  model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
@parameterized.named_parameters(
    # test_name, time_major, go_backwards
    ('normal', False, False),
    ('time_major', True, False),
    ('go_backwards', False, True),
    ('both', True, True),
)
def test_time_major_and_go_backward(self, time_major, go_backwards):
  """Compares V1 and V2 LSTM outputs, then trains a plain V2 LSTM model.

  The first part builds the same model around `rnn_v1.LSTM` and `rnn.LSTM`,
  copies the weights across and expects matching predictions. The second
  part fits, evaluates and predicts with a separate V2 LSTM model.
  """
  input_shape = 10
  rnn_state_size = 8
  timestep = 4
  batch = 100
  x_train = np.random.random((batch, timestep, input_shape))

  def swap_leading_axes(t):
    return array_ops.transpose(t, [1, 0, 2])

  def build_model(layer_cls):
    inputs = keras.layers.Input(
        shape=[timestep, input_shape], dtype=dtypes.float32)
    layer = layer_cls(
        rnn_state_size,
        recurrent_activation='sigmoid',
        time_major=time_major,
        return_sequences=True,
        go_backwards=go_backwards)
    if not time_major:
      return keras.models.Model(inputs, layer(inputs))
    # The model input and output keep their original axis order, so the first
    # two axes are swapped before and after the time-major layer.
    converted_input = keras.layers.Lambda(swap_leading_axes)(inputs)
    layer_output = layer(converted_input)
    outputs = keras.layers.Lambda(swap_leading_axes)(layer_output)
    return keras.models.Model(inputs, outputs)

  reference_model = build_model(rnn_v1.LSTM)
  y_ref = reference_model.predict(x_train)
  reference_weights = reference_model.get_weights()

  v2_model = build_model(rnn.LSTM)
  v2_model.set_weights(reference_weights)
  self.assertAllClose(v2_model.predict(x_train), y_ref)

  # Second part: fit/evaluate/predict smoke test of a V2 LSTM model.
  feature_dim = 10
  state_size = 8
  num_classes = 8
  num_steps = 4
  num_samples = 100
  num_epochs = 10

  (x_train, y_train), _ = testing_utils.get_test_data(
      train_samples=num_samples,
      test_samples=0,
      input_shape=(num_steps, feature_dim),
      num_classes=num_classes)
  y_train = keras.utils.to_categorical(y_train, num_classes)

  lstm_layer = rnn.LSTM(state_size)
  model_inputs = keras.layers.Input(
      shape=[num_steps, feature_dim], dtype=dtypes.float32)
  model = keras.models.Model(model_inputs, lstm_layer(model_inputs))
  model.compile('rmsprop', loss='mse')
  model.fit(x_train, y_train, epochs=num_epochs)
  model.evaluate(x_train, y_train)
  model.predict(x_train)
@parameterized.named_parameters(
    # test_name, use_bias, bias_initializer
    ('normal', True, 'zeros'),
    ('no_bias', False, 'zeros'),
    ('random_bias', True, 'random_uniform'),
)
def test_lstm_model_save_load(self, use_bias, bias_initializer):
  """Saves model weights to `test.h5` and loads them into a rebuilt model.

  The reloaded model must reproduce the original predictions and the LSTM
  layer weights.
  """
  temp_dir = self.get_temp_dir()
  self.addCleanup(shutil.rmtree, temp_dir)
  weights_path = os.path.join(temp_dir, 'test.h5')

  batch, timestep, input_dim, units = 10, 3, 5, 2
  x = np.random.random((batch, timestep, input_dim))

  def build_model():
    """Returns a fresh `(model, lstm_layer)` pair for the current params."""
    model_inputs = keras.layers.Input(
        shape=[timestep, input_dim], dtype=dtypes.float32)
    lstm_layer = rnn.LSTM(
        units, use_bias=use_bias, bias_initializer=bias_initializer)
    model_outputs = lstm_layer(model_inputs)
    return keras.models.Model(model_inputs, model_outputs), lstm_layer

  original_model, original_layer = build_model()
  y_ref = original_model.predict(x)
  original_model.save_weights(weights_path)

  restored_model, restored_layer = build_model()
  restored_model.load_weights(weights_path)
  self.assertAllClose(restored_model.predict(x), y_ref)
  self.assertAllClose(
      original_layer.get_weights(), restored_layer.get_weights())
def test_lstm_output_on_multiple_kernel(self):
  """Compares predictions of V2 LSTM (CPU and GPU scopes) and canonical LSTM.

  All three models share the weights of the first one.

  NOTE(review): the extent of each device scope was reconstructed from an
  unindented source; confirm against the upstream file.
  """
  feature_dim = 10
  state_size = 8
  num_steps = 4
  num_samples = 100

  x_train = np.random.random((num_samples, num_steps, feature_dim))
  inputs = keras.layers.Input(
      shape=[num_steps, feature_dim], dtype=dtypes.float32)

  with test_util.device(use_gpu=False):
    cpu_layer = rnn.LSTM(state_size)
    cpu_model = keras.models.Model(inputs, cpu_layer(inputs))
    weights = cpu_model.get_weights()
  y_1 = cpu_model.predict(x_train)

  with test_util.device(use_gpu=True):
    gpu_layer = rnn.LSTM(state_size)
    gpu_model = keras.models.Model(inputs, gpu_layer(inputs))
    gpu_model.set_weights(weights)
  y_2 = gpu_model.predict(x_train)

  # Note that CuDNN uses 'sigmoid' as activation, so the LSTM V2 uses
  # 'sigmoid' as default. Construct the canonical LSTM with sigmoid to achieve
  # the same output.
  with test_util.device(use_gpu=True):
    canonical_layer = rnn_v1.LSTM(state_size, recurrent_activation='sigmoid')
    canonical_model = keras.models.Model(inputs, canonical_layer(inputs))
    # Remove the extra cudnn bias since canonical lstm will not use it.
    canonical_model.set_weights(weights[:3])
  y_3 = canonical_model.predict(x_train)

  self.assertAllClose(y_1, y_2)
  self.assertAllClose(y_2, y_3)
def test_return_sequences_LSTM(self):
  """Runs the generic layer test with `return_sequences=True`."""
  num_samples, timesteps, embedding_dim, units = 2, 3, 4, 2
  layer_kwargs = {'units': units, 'return_sequences': True}
  testing_utils.layer_test(
      rnn.LSTM,
      kwargs=layer_kwargs,
      input_shape=(num_samples, timesteps, embedding_dim))
def test_float64_LSTM(self):
  """Runs the generic layer test with a float64 layer and float64 input."""
  num_samples, timesteps, embedding_dim, units = 2, 3, 4, 2
  layer_kwargs = {
      'units': units,
      'return_sequences': True,
      'dtype': 'float64'
  }
  testing_utils.layer_test(
      rnn.LSTM,
      kwargs=layer_kwargs,
      input_shape=(num_samples, timesteps, embedding_dim),
      input_dtype='float64')
def test_regularizers_LSTM(self):
  """Counts the losses produced by weight and activity regularizers."""
  embedding_dim = 4
  regularizer_kwargs = dict(
      kernel_regularizer=keras.regularizers.l1(0.01),
      recurrent_regularizer=keras.regularizers.l1(0.01),
      bias_regularizer='l2',
      activity_regularizer='l1')
  layer = rnn.LSTM(
      5,
      return_sequences=False,
      weights=None,
      input_shape=(None, embedding_dim),
      **regularizer_kwargs)
  layer.build((None, None, 2))
  # Three weight regularizers are configured; three losses are expected
  # once the layer is built.
  self.assertEqual(len(layer.losses), 3)

  x = keras.backend.variable(np.ones((2, 3, 2)))
  layer(x)
  # After a call, graph mode checks the losses tied to `x`, while eager mode
  # expects one additional entry in `layer.losses`.
  if not context.executing_eagerly():
    self.assertEqual(len(layer.get_losses_for(x)), 1)
  else:
    self.assertEqual(len(layer.losses), 4)
def test_statefulness_LSTM(self):
  """Exercises state carry-over, state resets and masking of a stateful LSTM."""
  num_samples, timesteps, embedding_dim, units = 2, 3, 4, 2
  batch_shape = (num_samples, timesteps)

  model = keras.models.Sequential()
  model.add(
      keras.layers.Embedding(
          4,
          embedding_dim,
          mask_zero=True,
          input_length=timesteps,
          batch_input_shape=batch_shape))
  layer = rnn.LSTM(units, return_sequences=False, stateful=True, weights=None)
  model.add(layer)
  model.compile(
      optimizer=gradient_descent.GradientDescentOptimizer(0.01),
      loss='mse',
      run_eagerly=testing_utils.should_run_eagerly(),
      experimental_run_tf_function=testing_utils.should_run_tf_function())

  def predict_on_ones():
    # A new all-ones batch is built for every prediction.
    return model.predict(np.ones(batch_shape))

  out1 = predict_on_ones()
  self.assertEqual(out1.shape, (num_samples, units))

  # train once so that the states change
  model.train_on_batch(np.ones(batch_shape), np.ones((num_samples, units)))
  out2 = predict_on_ones()
  # if the state is not reset, output should be different
  self.assertNotEqual(out1.max(), out2.max())

  # check that output changes after states are reset
  # (even though the model itself didn't change)
  layer.reset_states()
  out3 = predict_on_ones()
  self.assertNotEqual(out2.max(), out3.max())

  # check that container-level reset_states() works
  model.reset_states()
  out4 = predict_on_ones()
  self.assertAllClose(out3, out4, atol=1e-5)

  # check that the call to `predict` updated the states
  out5 = predict_on_ones()
  self.assertNotEqual(out4.max(), out5.max())

  # Check masking: the Embedding uses `mask_zero=True`, and the three inputs
  # below place the same number of zeros per row at different positions.
  layer.reset_states()
  zeros_on_left = np.ones(batch_shape)
  zeros_on_left[0, :1] = 0
  zeros_on_left[1, :2] = 0
  out6 = model.predict(zeros_on_left)

  layer.reset_states()
  zeros_on_right = np.ones(batch_shape)
  zeros_on_right[0, -1:] = 0
  zeros_on_right[1, -2:] = 0
  out7 = model.predict(zeros_on_right)

  layer.reset_states()
  zeros_mixed = np.ones(batch_shape)
  zeros_mixed[0, 1] = 0
  zeros_mixed[1, 0] = 0
  zeros_mixed[1, 2] = 0
  out8 = model.predict(zeros_mixed)

  self.assertAllClose(out7, out6, atol=1e-5)
  self.assertAllClose(out8, out7, atol=1e-5)
def test_stateful_LSTM_training(self):
  """Fits an Embedding -> stateful LSTM -> Dense model for one epoch.

  See b/123587692 for more context.
  """
  vocab_size, embedding_dim = 20, 10
  batch_size, timestep, units = 8, 12, 5

  token_shape = (batch_size, timestep)
  x = np.random.randint(0, vocab_size, size=token_shape)
  y = np.random.randint(0, vocab_size, size=token_shape)

  embedding = keras.layers.Embedding(
      vocab_size, embedding_dim, batch_input_shape=[batch_size, timestep])
  stateful_lstm = rnn.LSTM(units, return_sequences=True, stateful=True)
  projection = keras.layers.Dense(vocab_size)
  model = keras.Sequential([embedding, stateful_lstm, projection])
  model.compile(
      optimizer='adam',
      loss='sparse_categorical_crossentropy',
      run_eagerly=testing_utils.should_run_eagerly(),
      experimental_run_tf_function=testing_utils.should_run_tf_function())
  model.fit(x, y, epochs=1, shuffle=False)
def test_dropout_LSTM(self):
  """Runs the generic layer test with input and recurrent dropout of 0.1."""
  num_samples, timesteps, embedding_dim, units = 2, 3, 4, 2
  layer_kwargs = {
      'units': units,
      'dropout': 0.1,
      'recurrent_dropout': 0.1
  }
  testing_utils.layer_test(
      rnn.LSTM,
      kwargs=layer_kwargs,
      input_shape=(num_samples, timesteps, embedding_dim))
def test_bidirectional(self):
  """Fits, evaluates and predicts with a stacked Bidirectional LSTM model.

  The model is a binary classifier over random token sequences.
  """
  batch = 128
  timestep = 20
  vocab_size = 1000
  model = keras.Sequential([
      keras.layers.Embedding(vocab_size, 64),
      keras.layers.Bidirectional(rnn.LSTM(
          64, return_sequences=True)),
      keras.layers.Bidirectional(rnn.LSTM(32)),
      keras.layers.Dense(64, activation='relu'),
      keras.layers.Dense(1, activation='sigmoid')
  ])

  model.compile(loss='binary_crossentropy',
                optimizer='adam',
                metrics=['accuracy'])

  x = np.random.randint(0, vocab_size, size=(batch, timestep))
  # The upper bound of `randint` is exclusive: `randint(0, 1)` only ever
  # produced 0, so every label was the same class. Use 2 to draw from {0, 1}.
  y = np.random.randint(0, 2, size=(batch,))
  model.fit(x, y, epochs=1, shuffle=False)
  model.evaluate(x, y)
  model.predict(x)
@test_util.run_v2_only
def test_explicit_device_with_go_backward_and_mask(self):
  """Compares masked and trimmed outputs of backwards LSTMs (V1 and V2).

  For each layer version, running the full input with a mask that hides the
  trailing timesteps must give, on the last `masksteps` output positions,
  the same values as running the input trimmed to `masksteps` timesteps.

  NOTE(review): the extent of each device scope was reconstructed from an
  unindented source; confirm against the upstream file.
  """
  batch_size = 8
  timestep = 7
  masksteps = 5
  units = 4

  inputs = np.random.randn(batch_size, timestep, units).astype(np.float32)
  # The builtin `bool` replaces the `np.bool` alias, which is deprecated
  # since NumPy 1.20 and removed in later releases.
  mask = np.ones((batch_size, timestep)).astype(bool)
  mask[:, masksteps:] = 0

  # Test for V1 behavior.
  lstm_v1 = rnn_v1.LSTM(units, return_sequences=True, go_backwards=True)
  with test_util.device(use_gpu=True):
    outputs_masked_v1 = lstm_v1(inputs, mask=constant_op.constant(mask))
    outputs_trimmed_v1 = lstm_v1(inputs[:, :masksteps])
  self.assertAllClose(outputs_masked_v1[:, -masksteps:], outputs_trimmed_v1)

  # Test for V2 behavior.
  lstm = rnn.LSTM(units, return_sequences=True, go_backwards=True)
  with test_util.device(use_gpu=True):
    outputs_masked = lstm(inputs, mask=constant_op.constant(mask))
    outputs_trimmed = lstm(inputs[:, :masksteps])
  self.assertAllClose(outputs_masked[:, -masksteps:], outputs_trimmed)
@test_util.run_deprecated_v1
def test_v1_session_behavior(self):
  """Fits an LSTM -> Dense model on a shuffled, batched dataset.

  See b/139132348 for more details.
  """
  x = np.random.uniform(size=(100, 4, 8))
  y = np.random.uniform(size=(100, 1))
  dataset = dataset_ops.Dataset.from_tensor_slices((x, y))
  dataset = dataset.shuffle(100).batch(32)

  model_input = keras.layers.Input(shape=(4, 8))
  lstm_output = rnn.LSTM(1)(model_input)
  prediction = keras.layers.Dense(1)(lstm_output)
  model = keras.models.Model(model_input, prediction)
  model.compile(loss='mse', optimizer='sgd')
  model.fit(dataset)
# Tests of the runtime value reported by the V2 LSTM when it is created with
# `return_runtime=True`. The class is decorated with `run_all_keras_modes`,
# which receives the module-level `_config`.
@keras_parameterized.run_all_keras_modes(config=_config)
class LSTMGraphRewriteTest(keras_parameterized.TestCase):
# Sizes shared by the data and models built in this class.
input_shape = 10  # Last dimension of each input sample.
output_shape = 8  # Number of classes for the generated labels.
rnn_state_size = 8  # Units passed to `rnn.LSTM`.
timestep = 4  # Timesteps per input sample.
batch = 100  # Number of training samples generated.
epoch = 1  # Number of `fit` calls made by `_test_runtime_with_model`.
def _test_runtime_with_model(self, model):
  """Trains `model`, then checks the runtime value it predicts.

  Args:
    model: a model with two outputs; the second one is read as the runtime
      value. It is compiled here with a loss on the first output only.
  """
  (x_train, y_train), _ = testing_utils.get_test_data(
      train_samples=self.batch,
      test_samples=0,
      input_shape=(self.timestep, self.input_shape),
      num_classes=self.output_shape)
  y_train = keras.utils.to_categorical(y_train, self.output_shape)

  model.compile(
      optimizer='sgd',
      loss=['categorical_crossentropy', None],
      run_eagerly=testing_utils.should_run_eagerly(),
      experimental_run_tf_function=testing_utils.should_run_tf_function())

  # Every fit call must report a loss different from the previous one
  # (starting from 0).
  previous_loss = 0
  for _ in range(self.epoch):
    current_loss = model.fit(x_train, y_train).history['loss'][0]
    self.assertNotEqual(previous_loss, current_loss)
    previous_loss = current_loss

  _, runtime_value = model.predict(x_train)
  expected_runtime = (
      rnn._RUNTIME_GPU if test.is_gpu_available() else rnn._RUNTIME_CPU)
  self.assertEqual(runtime_value[0], expected_runtime)
@test_util.run_v2_only
def test_LSTM_runtime(self):
  """Builds a model that outputs the LSTM runtime and checks its value."""
  lstm_layer = rnn.LSTM(self.rnn_state_size, return_runtime=True)
  model_inputs = keras.layers.Input(
      shape=[self.timestep, self.input_shape], dtype=dtypes.float32)
  lstm_output, runtime = lstm_layer(model_inputs)

  # Expand the runtime so that it is a 1D tensor instead of scalar.
  # TF model does not work with scalar model output, specially during
  # aggregation.
  add_trailing_axis = keras.layers.Lambda(
      lambda x: array_ops.expand_dims(x, axis=-1))
  runtime = add_trailing_axis(runtime)

  model = keras.models.Model(
      inputs=model_inputs, outputs=[lstm_output, runtime])
  self._test_runtime_with_model(model)
@test_util.run_v2_only
def test_LSTM_runtime_with_mask(self):
  """Checks the reported runtime for unpadded, right and mix padded input.

  Masking will affect which backend is selected based on whether the mask is
  strictly right padded.
  """
  lstm_layer = rnn.LSTM(self.rnn_state_size, return_runtime=True)
  model_inputs = keras.layers.Input(
      shape=[self.timestep, self.input_shape], dtype=dtypes.float32)
  masked_inputs = keras.layers.Masking()(model_inputs)
  lstm_output, runtime = lstm_layer(masked_inputs)

  # Expand the runtime so that it is a 1D tensor instead of scalar.
  # TF model does not work with scalar model output, specially during
  # aggregation.
  add_trailing_axis = keras.layers.Lambda(
      lambda x: array_ops.expand_dims(x, axis=-1))
  runtime = add_trailing_axis(runtime)
  model = keras.models.Model(
      inputs=model_inputs, outputs=[lstm_output, runtime])

  (x_train, y_train), _ = testing_utils.get_test_data(
      train_samples=self.batch,
      test_samples=0,
      input_shape=(self.timestep, self.input_shape),
      num_classes=self.output_shape)
  y_train = keras.utils.to_categorical(y_train, self.output_shape)

  model.compile(
      optimizer='sgd',
      loss=['categorical_crossentropy', None],
      run_eagerly=testing_utils.should_run_eagerly(),
      experimental_run_tf_function=testing_utils.should_run_tf_function())
  model.fit(x_train, y_train)

  def runtime_for_this_host():
    # GPU runtime when a GPU is available, CPU runtime otherwise.
    if test.is_gpu_available():
      return rnn._RUNTIME_GPU
    return rnn._RUNTIME_CPU

  # Verify unpadded data.
  _, runtime_value = model.predict(x_train)
  self.assertEqual(runtime_value[0], runtime_for_this_host())

  # Update x/y to be right padded by setting the last timestep to 0
  x_train[:, -1, :] = 0
  y_train[:, -1] = 0
  _, runtime_value = model.predict(x_train)
  self.assertEqual(runtime_value[0], runtime_for_this_host())

  # Further update x/y to be mix padded (masks in the middle), and verify
  # only cpu kernel can be selected.
  x_train[:, -3, :] = 0
  y_train[:, -3] = 0
  _, runtime_value = model.predict(x_train)
  self.assertEqual(runtime_value[0], rnn._RUNTIME_CPU)
@test_util.run_v2_only
def test_LSTM_runtime_with_cond(self):
  """Checks the reported runtime when the LSTM is called inside a `cond`.

  This test is to demonstrate the graph rewrite of grappler plugin under the
  condition that the function returns different number of internal states.
  """
  lstm_layer = rnn.LSTM(self.rnn_state_size, return_runtime=True)
  model_inputs = keras.layers.Input(
      shape=[self.timestep, self.input_shape], dtype=dtypes.float32)

  # Values returned by the branch that is never taken.
  zeros = array_ops.zeros([self.batch, self.output_shape])
  dummy_runtime = rnn._runtime(rnn._RUNTIME_UNKNOWN)

  def run_lstm():
    return lstm_layer(model_inputs)

  def skip_lstm():
    return (zeros, dummy_runtime)

  a = constant_op.constant(0)
  b = constant_op.constant(1)
  # Will always run the lstm layer.
  lstm_output, runtime = control_flow_ops.cond(
      gen_math_ops.less(a, b), run_lstm, skip_lstm)

  # Expand the runtime so that it is a 1D tensor instead of scalar.
  # TF model does not work with scalar model output, specially during
  # aggregation.
  add_trailing_axis = keras.layers.Lambda(
      lambda x: array_ops.expand_dims(x, axis=-1))
  runtime = add_trailing_axis(runtime)

  model = keras.models.Model(
      inputs=model_inputs, outputs=[lstm_output, runtime])
  self._test_runtime_with_model(model)
# Benchmarks that time the CuDNN LSTM, the V2 LSTM and the canonical LSTM and
# report the seconds per epoch for each.
class LSTMPerformanceTest(test.Benchmark):
def _measure_performance(self, test_config, model, x_train, y_train):
  """Returns the average wall-clock seconds per timed epoch.

  Args:
    test_config: dict read for its 'batch', 'epoch' and 'warmup_epoch' keys.
    model: object whose `fit` method is called twice.
    x_train: training inputs forwarded to `fit`.
    y_train: training targets forwarded to `fit`.

  Returns:
    The elapsed time of the second `fit` call divided by
    `epoch - warmup_epoch`.
  """
  batch_size = test_config['batch']
  total_epochs = test_config['epoch']
  warmup_epochs = test_config['warmup_epoch']
  timed_epochs = total_epochs - warmup_epochs

  # warm up the model; this fit is not timed.
  model.fit(x_train, y_train, batch_size=batch_size, epochs=warmup_epochs)

  started_at = time.time()
  model.fit(x_train, y_train, batch_size=batch_size, epochs=timed_epochs)
  elapsed = time.time() - started_at
  return elapsed / timed_epochs
def _time_performance_run_cudnn_lstm(self, test_config, x_train, y_train):
  """Times a model built around `keras.layers.CuDNNLSTM`.

  Args:
    test_config: dict providing 'input_shape', 'rnn_state_size' and
      'timestep', plus the keys read by `_measure_performance`.
    x_train: training inputs.
    y_train: training targets.

  Returns:
    The seconds per epoch returned by `_measure_performance`.
  """
  feature_dim = test_config['input_shape']
  state_size = test_config['rnn_state_size']
  num_steps = test_config['timestep']

  cudnn_layer = keras.layers.CuDNNLSTM(state_size)
  model_inputs = keras.layers.Input(
      shape=[num_steps, feature_dim], dtype=dtypes.float32)
  model = keras.models.Model(model_inputs, cudnn_layer(model_inputs))
  model.compile('sgd', 'mse')

  seconds = self._measure_performance(test_config, model, x_train, y_train)
  logging.info('Average performance for %s per epoch is: %s',
               'CuDNN LSTM', seconds)
  return seconds
def _time_performance_run_unifed_lstm_gpu(
    self, test_config, x_train, y_train):
  """Times a model built around the V2 LSTM (`rnn.LSTM`).

  Args:
    test_config: dict providing 'input_shape', 'rnn_state_size' and
      'timestep', plus the keys read by `_measure_performance`.
    x_train: training inputs.
    y_train: training targets.

  Returns:
    The seconds per epoch returned by `_measure_performance`.
  """
  feature_dim = test_config['input_shape']
  state_size = test_config['rnn_state_size']
  num_steps = test_config['timestep']

  v2_layer = rnn.LSTM(state_size)
  model_inputs = keras.layers.Input(
      shape=[num_steps, feature_dim], dtype=dtypes.float32)
  model = keras.models.Model(model_inputs, v2_layer(model_inputs))
  model.compile('sgd', 'mse')

  seconds = self._measure_performance(test_config, model, x_train, y_train)
  logging.info('Average performance for %s per epoch is: %s',
               'LSTM V2', seconds)
  return seconds
def _time_performance_run_normal_lstm(
    self, test_config, x_train, y_train):
  """Times a model built around the canonical LSTM (`rnn_v1.LSTM`).

  Args:
    test_config: dict providing 'input_shape', 'rnn_state_size' and
      'timestep', plus the keys read by `_measure_performance`.
    x_train: training inputs.
    y_train: training targets.

  Returns:
    The seconds per epoch returned by `_measure_performance`.
  """
  feature_dim = test_config['input_shape']
  state_size = test_config['rnn_state_size']
  num_steps = test_config['timestep']

  canonical_layer = rnn_v1.LSTM(state_size)
  model_inputs = keras.layers.Input(
      shape=[num_steps, feature_dim], dtype=dtypes.float32)
  model = keras.models.Model(model_inputs, canonical_layer(model_inputs))
  model.compile('sgd', 'mse')

  seconds = self._measure_performance(test_config, model, x_train, y_train)
  logging.info('Average performance for %s per epoch is: %s',
               'Normal LSTM', seconds)
  return seconds
def _benchmark_performance_with_standard_cudnn_impl(self):
  """Times the three LSTM implementations and reports one benchmark each.

  The CuDNN LSTM, the V2 LSTM and the canonical LSTM are trained on the same
  generated data. The seconds per epoch of each are passed to
  `report_benchmark`, and the speed ratios are logged. Nothing is measured
  when no GPU is available.
  """
  if not test.is_gpu_available():
    # `skipTest` is a `unittest.TestCase` method, while this class derives
    # from `test.Benchmark`; log the reason and return instead.
    # NOTE(review): assumes `test.Benchmark` provides no `skipTest` -- confirm.
    logging.warning('performance test will only run on GPU')
    return

  mode = 'eager' if context.executing_eagerly() else 'graph'
  batch = 64
  num_batch = 10
  test_config = {
      'input_shape': 128,
      'rnn_state_size': 64,
      'output_shape': 64,
      'timestep': 50,
      'batch': batch,
      'epoch': 20,
      # The performance for warmup epoch is ignored.
      'warmup_epoch': 1,
  }
  (x_train, y_train), _ = testing_utils.get_test_data(
      train_samples=(batch * num_batch),
      test_samples=0,
      input_shape=(test_config['timestep'], test_config['input_shape']),
      num_classes=test_config['output_shape'])
  y_train = keras.utils.to_categorical(y_train, test_config['output_shape'])

  cudnn_sec_per_epoch = self._time_performance_run_cudnn_lstm(
      test_config, x_train, y_train)
  lstm_v2_sec_per_epoch = self._time_performance_run_unifed_lstm_gpu(
      test_config, x_train, y_train)
  normal_lstm_sec_per_epoch = self._time_performance_run_normal_lstm(
      test_config, x_train, y_train)

  # Both ratios divide by the V2 time, so larger values mean V2 is faster
  # relative to the other implementation.
  cudnn_vs_v2 = cudnn_sec_per_epoch / lstm_v2_sec_per_epoch
  v2_vs_normal = normal_lstm_sec_per_epoch / lstm_v2_sec_per_epoch

  self.report_benchmark(name='keras_cudnn_lstm_' + mode,
                        wall_time=cudnn_sec_per_epoch,
                        iters=test_config['epoch'],
                        extras=test_config)
  self.report_benchmark(name='keras_lstm_v2_' + mode,
                        wall_time=lstm_v2_sec_per_epoch,
                        iters=test_config['epoch'],
                        extras=test_config)
  self.report_benchmark(name='keras_canonical_lstm_' + mode,
                        wall_time=normal_lstm_sec_per_epoch,
                        iters=test_config['epoch'],
                        extras=test_config)

  logging.info('Expect the performance of LSTM V2 is within 80% of '
               'CuDNN LSTM, got {0:.2f}%'.format(cudnn_vs_v2 * 100))
  logging.info('Expect the performance of LSTM V2 is more than 5 times'
               ' of normal LSTM, got {0:.2f}'.format(v2_vs_normal))
def benchmark_performance_graph(self):
  """Runs the LSTM benchmark in graph mode inside a `_config` session."""
  with context.graph_mode():
    with session_lib.Session(config=_config):
      self._benchmark_performance_with_standard_cudnn_impl()
def benchmark_performance_eager(self):
  """Runs the LSTM benchmark with eager mode enabled."""
  eager_scope = context.eager_mode()
  with eager_scope:
    self._benchmark_performance_with_standard_cudnn_impl()
# Hand control to the TensorFlow test runner when executed as a script.
if '__main__' == __name__:
  test.main()