# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for layer wrappers."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import copy
from absl.testing import parameterized
import numpy as np
from tensorflow.python import keras
from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import test_util as tf_test_util
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import base_layer_utils
from tensorflow.python.keras.layers.rnn_cell_wrapper_v2 import ResidualWrapper
from tensorflow.python.keras.utils import generic_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops.ragged import ragged_concat_ops
from tensorflow.python.ops.ragged import ragged_factory_ops
from tensorflow.python.ops.ragged import ragged_tensor
from tensorflow.python.platform import test
from tensorflow.python.training.tracking import util as trackable_util
from tensorflow.python.util import object_identity
class _RNNCellWithConstants(keras.layers.Layer):
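  """Minimal RNN cell taking `constants`, used to test constants support."""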
def __init__(self, units, constant_size, **kwargs):
self.units = units
self.state_size = units
self.constant_size = constant_size
super(_RNNCellWithConstants, self).__init__(**kwargs)
def build(self, input_shape):
self.input_kernel = self.add_weight(
shape=(input_shape[-1], self.units),
initializer='uniform',
name='kernel')
self.recurrent_kernel = self.add_weight(
shape=(self.units, self.units),
initializer='uniform',
name='recurrent_kernel')
self.constant_kernel = self.add_weight(
shape=(self.constant_size, self.units),
initializer='uniform',
name='constant_kernel')
self.built = True
def call(self, inputs, states, constants):
[prev_output] = states
[constant] = constants
h_input = keras.backend.dot(inputs, self.input_kernel)
h_state = keras.backend.dot(prev_output, self.recurrent_kernel)
h_const = keras.backend.dot(constant, self.constant_kernel)
output = h_input + h_state + h_const
return output, [output]
def get_config(self):
config = {'units': self.units, 'constant_size': self.constant_size}
base_config = super(_RNNCellWithConstants, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
class TimeDistributedTest(keras_parameterized.TestCase):
@tf_test_util.run_in_graph_and_eager_modes
def test_timedistributed_dense(self):
model = keras.models.Sequential()
model.add(
keras.layers.TimeDistributed(
keras.layers.Dense(2), input_shape=(3, 4)))
model.compile(optimizer='rmsprop', loss='mse')
model.fit(
np.random.random((10, 3, 4)),
np.random.random((10, 3, 2)),
epochs=1,
batch_size=10)
# test config
model.get_config()
# check whether the model variables are present in the
# trackable list of objects
checkpointed_objects = object_identity.ObjectIdentitySet(
trackable_util.list_objects(model))
for v in model.variables:
self.assertIn(v, checkpointed_objects)
def test_timedistributed_static_batch_size(self):
model = keras.models.Sequential()
model.add(
keras.layers.TimeDistributed(
keras.layers.Dense(2), input_shape=(3, 4), batch_size=10))
model.compile(optimizer='rmsprop', loss='mse')
model.fit(
np.random.random((10, 3, 4)),
np.random.random((10, 3, 2)),
epochs=1,
batch_size=10)
def test_timedistributed_invalid_init(self):
x = constant_op.constant(np.zeros((1, 1)).astype('float32'))
with self.assertRaisesRegexp(
ValueError,
'Please initialize `TimeDistributed` layer with a `Layer` instance.'):
keras.layers.TimeDistributed(x)
def test_timedistributed_conv2d(self):
with self.cached_session():
model = keras.models.Sequential()
model.add(
keras.layers.TimeDistributed(
keras.layers.Conv2D(5, (2, 2), padding='same'),
input_shape=(2, 4, 4, 3)))
model.add(keras.layers.Activation('relu'))
model.compile(optimizer='rmsprop', loss='mse')
model.train_on_batch(
np.random.random((1, 2, 4, 4, 3)), np.random.random((1, 2, 4, 4, 5)))
model = keras.models.model_from_json(model.to_json())
model.summary()
def test_timedistributed_stacked(self):
with self.cached_session():
model = keras.models.Sequential()
model.add(
keras.layers.TimeDistributed(
keras.layers.Dense(2), input_shape=(3, 4)))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
model.add(keras.layers.Activation('relu'))
model.compile(optimizer='rmsprop', loss='mse')
model.fit(
np.random.random((10, 3, 4)),
np.random.random((10, 3, 3)),
epochs=1,
batch_size=10)
def test_regularizers(self):
with self.cached_session():
model = keras.models.Sequential()
model.add(
keras.layers.TimeDistributed(
keras.layers.Dense(2, kernel_regularizer='l1',
activity_regularizer='l1'),
input_shape=(3, 4)))
model.add(keras.layers.Activation('relu'))
model.compile(optimizer='rmsprop', loss='mse')
self.assertEqual(len(model.losses), 2)
def test_TimeDistributed_batchnorm(self):
with self.cached_session():
# test that wrapped BN updates still work.
model = keras.models.Sequential()
model.add(keras.layers.TimeDistributed(
keras.layers.BatchNormalization(center=True, scale=True),
name='bn',
input_shape=(10, 2)))
model.compile(optimizer='rmsprop', loss='mse')
# Assert that mean and variance are 0 and 1.
td = model.layers[0]
self.assertAllClose(td.get_weights()[2], np.array([0, 0]))
assert np.array_equal(td.get_weights()[3], np.array([1, 1]))
# Train
model.train_on_batch(np.random.normal(loc=2, scale=2, size=(1, 10, 2)),
np.broadcast_to(np.array([0, 1]), (1, 10, 2)))
# Assert that mean and variance changed.
assert not np.array_equal(td.get_weights()[2], np.array([0, 0]))
assert not np.array_equal(td.get_weights()[3], np.array([1, 1]))
# Verify input_map has one mapping from inputs to reshaped inputs.
self.assertEqual(len(td._input_map.keys()), 1)
def test_TimeDistributed_trainable(self):
# test layers that need learning_phase to be set
x = keras.layers.Input(shape=(3, 2))
layer = keras.layers.TimeDistributed(keras.layers.BatchNormalization())
_ = layer(x)
self.assertEqual(len(layer.updates), 2)
self.assertEqual(len(layer.trainable_weights), 2)
layer.trainable = False
assert not layer.updates
assert not layer.trainable_weights
layer.trainable = True
assert len(layer.updates) == 2
assert len(layer.trainable_weights) == 2
def test_TimeDistributed_with_masked_embedding_and_unspecified_shape(self):
with self.cached_session():
# test with unspecified shape and Embeddings with mask_zero
model = keras.models.Sequential()
model.add(keras.layers.TimeDistributed(
keras.layers.Embedding(5, 6, mask_zero=True),
input_shape=(None, None))) # N by t_1 by t_2 by 6
model.add(keras.layers.TimeDistributed(
keras.layers.SimpleRNN(7, return_sequences=True)))
model.add(keras.layers.TimeDistributed(
keras.layers.SimpleRNN(8, return_sequences=False)))
model.add(keras.layers.SimpleRNN(1, return_sequences=False))
model.compile(optimizer='rmsprop', loss='mse')
model_input = np.random.randint(low=1, high=5, size=(10, 3, 4),
dtype='int32')
for i in range(4):
model_input[i, i:, i:] = 0
model.fit(model_input,
np.random.random((10, 1)), epochs=1, batch_size=10)
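      # Compare each layer's computed mask against a reference derived from
      # the zero entries of the input.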
mask_outputs = [model.layers[0].compute_mask(model.input)]
for layer in model.layers[1:]:
mask_outputs.append(layer.compute_mask(layer.input, mask_outputs[-1]))
func = keras.backend.function([model.input], mask_outputs[:-1])
mask_outputs_val = func([model_input])
ref_mask_val_0 = model_input > 0 # embedding layer
ref_mask_val_1 = ref_mask_val_0 # first RNN layer
ref_mask_val_2 = np.any(ref_mask_val_1, axis=-1) # second RNN layer
ref_mask_val = [ref_mask_val_0, ref_mask_val_1, ref_mask_val_2]
for i in range(3):
self.assertAllEqual(mask_outputs_val[i], ref_mask_val[i])
self.assertIs(mask_outputs[-1], None) # final layer
def test_TimeDistributed_with_masking_layer(self):
with self.cached_session():
# test with Masking layer
model = keras.models.Sequential()
model.add(keras.layers.TimeDistributed(keras.layers.Masking(
mask_value=0.,), input_shape=(None, 4)))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(5)))
model.compile(optimizer='rmsprop', loss='mse')
model_input = np.random.randint(low=1, high=5, size=(10, 3, 4))
for i in range(4):
model_input[i, i:, :] = 0.
model.compile(optimizer='rmsprop', loss='mse')
model.fit(model_input,
np.random.random((10, 3, 5)), epochs=1, batch_size=6)
mask_outputs = [model.layers[0].compute_mask(model.input)]
mask_outputs += [model.layers[1].compute_mask(model.layers[1].input,
mask_outputs[-1])]
func = keras.backend.function([model.input], mask_outputs)
mask_outputs_val = func([model_input])
self.assertEqual((mask_outputs_val[0]).all(),
model_input.all())
self.assertEqual((mask_outputs_val[1]).all(),
model_input.all())
def test_TimeDistributed_with_different_time_shapes(self):
time_dist = keras.layers.TimeDistributed(keras.layers.Dense(5))
ph_1 = keras.backend.placeholder(shape=(None, 10, 13))
out_1 = time_dist(ph_1)
self.assertEqual(out_1.shape.as_list(), [None, 10, 5])
ph_2 = keras.backend.placeholder(shape=(None, 1, 13))
out_2 = time_dist(ph_2)
self.assertEqual(out_2.shape.as_list(), [None, 1, 5])
ph_3 = keras.backend.placeholder(shape=(None, 1, 18))
with self.assertRaisesRegexp(ValueError, 'is incompatible with layer'):
time_dist(ph_3)
def test_TimeDistributed_with_invalid_dimensions(self):
time_dist = keras.layers.TimeDistributed(keras.layers.Dense(5))
ph = keras.backend.placeholder(shape=(None, 10))
with self.assertRaisesRegexp(
ValueError,
'`TimeDistributed` Layer should be passed an `input_shape `'):
time_dist(ph)
@tf_test_util.run_in_graph_and_eager_modes
def test_TimeDistributed_reshape(self):
class NoReshapeLayer(keras.layers.Layer):
def call(self, inputs):
return inputs
# Built-in layers that aren't stateful use the reshape implementation.
td1 = keras.layers.TimeDistributed(keras.layers.Dense(5))
self.assertTrue(td1._always_use_reshape)
# Built-in layers that are stateful don't use the reshape implementation.
td2 = keras.layers.TimeDistributed(
keras.layers.RNN(keras.layers.SimpleRNNCell(10), stateful=True))
self.assertFalse(td2._always_use_reshape)
# Custom layers are not whitelisted for the fast reshape implementation.
td3 = keras.layers.TimeDistributed(NoReshapeLayer())
self.assertFalse(td3._always_use_reshape)
@tf_test_util.run_in_graph_and_eager_modes
def test_TimeDistributed_output_shape_return_types(self):
class TestLayer(keras.layers.Layer):
def call(self, inputs):
return array_ops.concat([inputs, inputs], axis=-1)
def compute_output_shape(self, input_shape):
output_shape = tensor_shape.TensorShape(input_shape).as_list()
output_shape[-1] = output_shape[-1] * 2
output_shape = tensor_shape.TensorShape(output_shape)
return output_shape
class TestListLayer(TestLayer):
def compute_output_shape(self, input_shape):
shape = super(TestListLayer, self).compute_output_shape(input_shape)
return shape.as_list()
class TestTupleLayer(TestLayer):
def compute_output_shape(self, input_shape):
shape = super(TestTupleLayer, self).compute_output_shape(input_shape)
return tuple(shape.as_list())
# Layers can specify output shape as list/tuple/TensorShape
test_layers = [TestLayer, TestListLayer, TestTupleLayer]
for layer in test_layers:
input_layer = keras.layers.TimeDistributed(layer())
inputs = keras.backend.placeholder(shape=(None, 2, 4))
output = input_layer(inputs)
self.assertEqual(output.shape.as_list(), [None, 2, 8])
self.assertEqual(
input_layer.compute_output_shape([None, 2, 4]).as_list(),
[None, 2, 8])
@keras_parameterized.run_all_keras_modes
def test_TimeDistributed_with_mask_first_implementation(self):
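    # A stateful wrapped RNN forces the per-timestep (non-reshape)
    # implementation; the masked and unmasked models should then produce
    # different outputs.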
np.random.seed(100)
rnn_layer = keras.layers.LSTM(4, return_sequences=True, stateful=True)
data = np.array([[[[1.0], [1.0]], [[0.0], [1.0]]],
[[[1.0], [0.0]], [[1.0], [1.0]]],
[[[1.0], [0.0]], [[1.0], [1.0]]]])
x = keras.layers.Input(shape=(2, 2, 1), batch_size=3)
x_masking = keras.layers.Masking()(x)
y = keras.layers.TimeDistributed(rnn_layer)(x_masking)
model_1 = keras.models.Model(x, y)
model_1.compile(
'rmsprop',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
output_with_mask = model_1.predict(data, steps=1)
y = keras.layers.TimeDistributed(rnn_layer)(x)
model_2 = keras.models.Model(x, y)
model_2.compile(
'rmsprop',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
output = model_2.predict(data, steps=1)
self.assertNotAllClose(output_with_mask, output, atol=1e-7)
@keras_parameterized.run_all_keras_modes
@parameterized.named_parameters(
*tf_test_util.generate_combinations_with_testcase_name(
layer=[keras.layers.LSTM,
keras.layers.Dense]))
def test_TimeDistributed_with_ragged_input(self, layer):
np.random.seed(100)
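    # The output on ragged input should match the output of the same layer
    # applied to the equivalent dense (padded and masked) input.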
layer = layer(4)
ragged_data = ragged_factory_ops.constant(
[[[[1.0], [1.0]], [[2.0], [2.0]]],
[[[4.0], [4.0]], [[5.0], [5.0]], [[6.0], [6.0]]],
[[[7.0], [7.0]], [[8.0], [8.0]], [[9.0], [9.0]]]],
ragged_rank=1)
x_ragged = keras.Input(shape=(None, 2, 1), dtype='float32', ragged=True)
y_ragged = keras.layers.TimeDistributed(layer)(x_ragged)
model_1 = keras.models.Model(x_ragged, y_ragged)
output_ragged = model_1.predict(ragged_data, steps=1)
x_dense = keras.Input(shape=(None, 2, 1), dtype='float32')
masking = keras.layers.Masking()(x_dense)
y_dense = keras.layers.TimeDistributed(layer)(masking)
model_2 = keras.models.Model(x_dense, y_dense)
dense_data = ragged_data.to_tensor()
output_dense = model_2.predict(dense_data, steps=1)
output_ragged = ragged_tensor.convert_to_tensor_or_ragged_tensor(
output_ragged, name='tensor')
self.assertAllEqual(output_ragged.to_tensor(), output_dense)
@keras_parameterized.run_all_keras_modes
def test_TimeDistributed_with_ragged_input_with_batch_size(self):
np.random.seed(100)
layer = keras.layers.Dense(16)
ragged_data = ragged_factory_ops.constant(
[[[[1.0], [1.0]], [[2.0], [2.0]]],
[[[4.0], [4.0]], [[5.0], [5.0]], [[6.0], [6.0]]],
[[[7.0], [7.0]], [[8.0], [8.0]], [[9.0], [9.0]]]],
ragged_rank=1)
# Use the first implementation by specifying batch_size
x_ragged = keras.Input(shape=(None, 2, 1), batch_size=3, dtype='float32',
ragged=True)
y_ragged = keras.layers.TimeDistributed(layer)(x_ragged)
model_1 = keras.models.Model(x_ragged, y_ragged)
output_ragged = model_1.predict(ragged_data, steps=1)
x_dense = keras.Input(shape=(None, 2, 1), batch_size=3, dtype='float32')
masking = keras.layers.Masking()(x_dense)
y_dense = keras.layers.TimeDistributed(layer)(masking)
model_2 = keras.models.Model(x_dense, y_dense)
dense_data = ragged_data.to_tensor()
output_dense = model_2.predict(dense_data, steps=1)
output_ragged = ragged_tensor.convert_to_tensor_or_ragged_tensor(
output_ragged, name='tensor')
self.assertAllEqual(output_ragged.to_tensor(), output_dense)
@tf_test_util.run_all_in_graph_and_eager_modes
class BidirectionalTest(test.TestCase, parameterized.TestCase):
def test_bidirectional(self):
rnn = keras.layers.SimpleRNN
samples = 2
dim = 2
timesteps = 2
output_dim = 2
with self.cached_session():
for mode in ['sum', 'concat', 'ave', 'mul']:
x = np.random.random((samples, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((samples, target_dim))
# test with Sequential model
model = keras.models.Sequential()
model.add(
keras.layers.Bidirectional(
rnn(output_dim), merge_mode=mode, input_shape=(timesteps, dim)))
model.compile(optimizer='rmsprop', loss='mse')
model.fit(x, y, epochs=1, batch_size=1)
# check whether the model variables are present in the
# trackable list of objects
checkpointed_objects = object_identity.ObjectIdentitySet(
trackable_util.list_objects(model))
for v in model.variables:
self.assertIn(v, checkpointed_objects)
# test compute output shape
ref_shape = model.layers[-1].output.shape
shape = model.layers[-1].compute_output_shape(
(None, timesteps, dim))
self.assertListEqual(shape.as_list(), ref_shape.as_list())
# test config
model.get_config()
model = keras.models.model_from_json(model.to_json())
model.summary()
def test_bidirectional_invalid_init(self):
x = constant_op.constant(np.zeros((1, 1)).astype('float32'))
with self.assertRaisesRegexp(
ValueError,
'Please initialize `Bidirectional` layer with a `Layer` instance.'):
keras.layers.Bidirectional(x)
def test_bidirectional_weight_loading(self):
rnn = keras.layers.SimpleRNN
samples = 2
dim = 2
timesteps = 2
output_dim = 2
with self.cached_session():
x = np.random.random((samples, timesteps, dim))
model = keras.models.Sequential()
model.add(
keras.layers.Bidirectional(
rnn(output_dim), input_shape=(timesteps, dim)))
y_ref = model.predict(x)
weights = model.layers[-1].get_weights()
model.layers[-1].set_weights(weights)
y = model.predict(x)
self.assertAllClose(y, y_ref)
def test_bidirectional_stacked(self):
# test stacked bidirectional layers
rnn = keras.layers.SimpleRNN
samples = 2
dim = 2
timesteps = 2
output_dim = 2
mode = 'sum'
with self.cached_session():
x = np.random.random((samples, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((samples, target_dim))
model = keras.models.Sequential()
model.add(
keras.layers.Bidirectional(
rnn(output_dim, return_sequences=True),
merge_mode=mode,
input_shape=(timesteps, dim)))
model.add(keras.layers.Bidirectional(rnn(output_dim), merge_mode=mode))
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, epochs=1, batch_size=1)
# test with functional API
inputs = keras.layers.Input((timesteps, dim))
output = keras.layers.Bidirectional(
rnn(output_dim), merge_mode=mode)(inputs)
model = keras.models.Model(inputs, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, epochs=1, batch_size=1)
def test_bidirectional_statefulness(self):
# Bidirectional and stateful
rnn = keras.layers.SimpleRNN
samples = 2
dim = 2
timesteps = 2
output_dim = 2
mode = 'sum'
with self.cached_session():
x = np.random.random((samples, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((samples, target_dim))
inputs = keras.layers.Input(batch_shape=(1, timesteps, dim))
output = keras.layers.Bidirectional(
rnn(output_dim, stateful=True), merge_mode=mode)(inputs)
model = keras.models.Model(inputs, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, epochs=1, batch_size=1)
def test_Bidirectional_merged_value(self):
rnn = keras.layers.LSTM
samples = 2
dim = 5
timesteps = 3
units = 3
x = [np.random.rand(samples, timesteps, dim)]
with self.cached_session():
for merge_mode in ['sum', 'mul', 'ave', 'concat', None]:
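        # Reference implementation of each merge_mode, used to build the
        # expected outputs from the forward and reversed backward results.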
if merge_mode == 'sum':
merge_func = lambda y, y_rev: y + y_rev
elif merge_mode == 'mul':
merge_func = lambda y, y_rev: y * y_rev
elif merge_mode == 'ave':
merge_func = lambda y, y_rev: (y + y_rev) / 2
elif merge_mode == 'concat':
merge_func = lambda y, y_rev: np.concatenate((y, y_rev), axis=-1)
else:
merge_func = lambda y, y_rev: [y, y_rev]
# basic case
inputs = keras.Input((timesteps, dim))
layer = keras.layers.Bidirectional(
rnn(units, return_sequences=True), merge_mode=merge_mode)
f_merged = keras.backend.function([inputs], _to_list(layer(inputs)))
f_forward = keras.backend.function([inputs],
[layer.forward_layer(inputs)])
f_backward = keras.backend.function(
[inputs],
[keras.backend.reverse(layer.backward_layer(inputs), 1)])
y_merged = f_merged(x)
y_expected = _to_list(merge_func(f_forward(x)[0], f_backward(x)[0]))
assert len(y_merged) == len(y_expected)
for x1, x2 in zip(y_merged, y_expected):
self.assertAllClose(x1, x2, atol=1e-5)
# test return_state
inputs = keras.Input((timesteps, dim))
layer = keras.layers.Bidirectional(
rnn(units, return_state=True), merge_mode=merge_mode)
f_merged = keras.backend.function([inputs], layer(inputs))
f_forward = keras.backend.function([inputs],
layer.forward_layer(inputs))
f_backward = keras.backend.function([inputs],
layer.backward_layer(inputs))
n_states = len(layer.layer.states)
y_merged = f_merged(x)
y_forward = f_forward(x)
y_backward = f_backward(x)
y_expected = _to_list(merge_func(y_forward[0], y_backward[0]))
assert len(y_merged) == len(y_expected) + n_states * 2
for x1, x2 in zip(y_merged, y_expected):
self.assertAllClose(x1, x2, atol=1e-5)
y_merged = y_merged[-n_states * 2:]
y_forward = y_forward[-n_states:]
y_backward = y_backward[-n_states:]
for state_birnn, state_inner in zip(y_merged, y_forward + y_backward):
self.assertAllClose(state_birnn, state_inner, atol=1e-5)
def test_Bidirectional_dropout(self):
rnn = keras.layers.LSTM
samples = 2
dim = 5
timesteps = 3
units = 3
merge_mode = 'sum'
x = [np.random.rand(samples, timesteps, dim)]
with self.cached_session():
inputs = keras.Input((timesteps, dim))
wrapped = keras.layers.Bidirectional(
rnn(units, dropout=0.2, recurrent_dropout=0.2), merge_mode=merge_mode)
outputs = _to_list(wrapped(inputs, training=True))
inputs = keras.Input((timesteps, dim))
wrapped = keras.layers.Bidirectional(
rnn(units, dropout=0.2, return_state=True), merge_mode=merge_mode)
outputs = _to_list(wrapped(inputs))
model = keras.Model(inputs, outputs)
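      # Dropout is only applied in training mode, so two predict calls on the
      # same input should return identical results.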
y1 = _to_list(model.predict(x))
y2 = _to_list(model.predict(x))
for x1, x2 in zip(y1, y2):
self.assertAllClose(x1, x2, atol=1e-5)
def test_Bidirectional_state_reuse(self):
rnn = keras.layers.LSTM
samples = 2
dim = 5
timesteps = 3
units = 3
with self.cached_session():
input1 = keras.layers.Input((timesteps, dim))
layer = keras.layers.Bidirectional(
rnn(units, return_state=True, return_sequences=True))
state = layer(input1)[1:]
# test passing invalid initial_state: passing a tensor
input2 = keras.layers.Input((timesteps, dim))
with self.assertRaises(ValueError):
keras.layers.Bidirectional(rnn(units))(input2, initial_state=state[0])
# test valid usage: passing a list
output = keras.layers.Bidirectional(rnn(units))(input2,
initial_state=state)
model = keras.models.Model([input1, input2], output)
assert len(model.layers) == 4
assert isinstance(model.layers[-1].input, list)
inputs = [np.random.rand(samples, timesteps, dim),
np.random.rand(samples, timesteps, dim)]
model.predict(inputs)
def test_Bidirectional_state_reuse_with_np_input(self):
# See https://github.com/tensorflow/tensorflow/issues/28761 for more detail.
rnn = keras.layers.LSTM
samples = 2
dim = 5
timesteps = 3
units = 3
with self.cached_session():
input1 = np.random.rand(samples, timesteps, dim).astype(np.float32)
layer = keras.layers.Bidirectional(
rnn(units, return_state=True, return_sequences=True))
state = layer(input1)[1:]
input2 = np.random.rand(samples, timesteps, dim).astype(np.float32)
keras.layers.Bidirectional(rnn(units))(input2, initial_state=state)
def test_Bidirectional_trainable(self):
    # Test that toggling `trainable` hides and restores the wrapped weights.
with self.cached_session():
x = keras.layers.Input(shape=(3, 2))
layer = keras.layers.Bidirectional(keras.layers.SimpleRNN(3))
_ = layer(x)
assert len(layer.trainable_weights) == 6
layer.trainable = False
assert not layer.trainable_weights
layer.trainable = True
assert len(layer.trainable_weights) == 6
def test_Bidirectional_updates(self):
if context.executing_eagerly():
self.skipTest('layer.updates is only available in graph mode.')
with self.cached_session():
x = keras.layers.Input(shape=(3, 2))
x_reachable_update = x * x
layer = keras.layers.Bidirectional(keras.layers.SimpleRNN(3))
_ = layer(x)
assert not layer.updates
assert not layer.get_updates_for(None)
assert not layer.get_updates_for(x)
# TODO(b/128684069): Remove when Wrapper sublayers are __call__'d.
with base_layer_utils.call_context().enter(layer, x, True, None):
layer.forward_layer.add_update(x_reachable_update, inputs=x)
layer.forward_layer.add_update(1, inputs=None)
layer.backward_layer.add_update(x_reachable_update, inputs=x)
layer.backward_layer.add_update(1, inputs=None)
assert len(layer.updates) == 4
assert len(layer.get_updates_for(None)) == 2
assert len(layer.get_updates_for(x)) == 2
def test_Bidirectional_losses(self):
with self.cached_session():
x = keras.layers.Input(shape=(3, 2))
x_reachable_loss = x * x
layer = keras.layers.Bidirectional(
keras.layers.SimpleRNN(
3, kernel_regularizer='l1', bias_regularizer='l1',
activity_regularizer='l1'))
_ = layer(x)
assert len(layer.losses) == 6
assert len(layer.get_losses_for(None)) == 4
assert len(layer.get_losses_for(x)) == 2
      # Create a constant tensor that is not conditional on the inputs.
with keras.backend.get_graph().as_default():
const_tensor = constant_op.constant(1)
layer.forward_layer.add_loss(x_reachable_loss, inputs=x)
layer.forward_layer.add_loss(const_tensor, inputs=None)
layer.backward_layer.add_loss(x_reachable_loss, inputs=x)
layer.backward_layer.add_loss(const_tensor, inputs=None)
assert len(layer.losses) == 10
assert len(layer.get_losses_for(None)) == 6
assert len(layer.get_losses_for(x)) == 4
def test_Bidirectional_with_constants(self):
with self.cached_session():
# Test basic case.
x = keras.Input((5, 5))
c = keras.Input((3,))
cell = _RNNCellWithConstants(32, 3)
custom_objects = {'_RNNCellWithConstants': _RNNCellWithConstants}
with generic_utils.CustomObjectScope(custom_objects):
layer = keras.layers.Bidirectional(keras.layers.RNN(cell))
y = layer(x, constants=c)
model = keras.Model([x, c], y)
model.compile(optimizer='rmsprop', loss='mse')
model.train_on_batch(
[np.zeros((6, 5, 5)), np.zeros((6, 3))],
np.zeros((6, 64))
)
# Test basic case serialization.
x_np = np.random.random((6, 5, 5))
c_np = np.random.random((6, 3))
y_np = model.predict([x_np, c_np])
weights = model.get_weights()
config = layer.get_config()
with generic_utils.CustomObjectScope(custom_objects):
layer = keras.layers.Bidirectional.from_config(copy.deepcopy(config))
y = layer(x, constants=c)
model = keras.Model([x, c], y)
model.set_weights(weights)
y_np_2 = model.predict([x_np, c_np])
self.assertAllClose(y_np, y_np_2, atol=1e-4)
# Test flat list inputs
with generic_utils.CustomObjectScope(custom_objects):
layer = keras.layers.Bidirectional.from_config(copy.deepcopy(config))
y = layer([x, c])
model = keras.Model([x, c], y)
model.set_weights(weights)
y_np_3 = model.predict([x_np, c_np])
self.assertAllClose(y_np, y_np_3, atol=1e-4)
def test_Bidirectional_with_constants_layer_passing_initial_state(self):
with self.cached_session():
# Test basic case.
x = keras.Input((5, 5))
c = keras.Input((3,))
s_for = keras.Input((32,))
s_bac = keras.Input((32,))
cell = _RNNCellWithConstants(32, 3)
custom_objects = {'_RNNCellWithConstants': _RNNCellWithConstants}
with generic_utils.CustomObjectScope(custom_objects):
layer = keras.layers.Bidirectional(keras.layers.RNN(cell))
y = layer(x, initial_state=[s_for, s_bac], constants=c)
model = keras.Model([x, s_for, s_bac, c], y)
model.compile(optimizer='rmsprop', loss='mse')
model.train_on_batch(
[np.zeros((6, 5, 5)),
np.zeros((6, 32)),
np.zeros((6, 32)),
np.zeros((6, 3))],
np.zeros((6, 64))
)
# Test basic case serialization.
x_np = np.random.random((6, 5, 5))
s_fw_np = np.random.random((6, 32))
s_bk_np = np.random.random((6, 32))
c_np = np.random.random((6, 3))
y_np = model.predict([x_np, s_fw_np, s_bk_np, c_np])
weights = model.get_weights()
config = layer.get_config()
with generic_utils.CustomObjectScope(custom_objects):
layer = keras.layers.Bidirectional.from_config(copy.deepcopy(config))
y = layer(x, initial_state=[s_for, s_bac], constants=c)
model = keras.Model([x, s_for, s_bac, c], y)
model.set_weights(weights)
y_np_2 = model.predict([x_np, s_fw_np, s_bk_np, c_np])
self.assertAllClose(y_np, y_np_2, atol=1e-4)
# Verify that state is used
y_np_2_different_s = model.predict(
[x_np, s_fw_np + 10., s_bk_np + 10., c_np])
assert np.mean(y_np - y_np_2_different_s) != 0
# Test flat list inputs
with generic_utils.CustomObjectScope(custom_objects):
layer = keras.layers.Bidirectional.from_config(copy.deepcopy(config))
y = layer([x, s_for, s_bac, c])
model = keras.Model([x, s_for, s_bac, c], y)
model.set_weights(weights)
y_np_3 = model.predict([x_np, s_fw_np, s_bk_np, c_np])
self.assertAllClose(y_np, y_np_3, atol=1e-4)
@tf_test_util.run_in_graph_and_eager_modes
def test_Bidirectional_output_shape_return_types(self):
class TestLayer(keras.layers.SimpleRNN):
def call(self, inputs):
return array_ops.concat([inputs, inputs], axis=-1)
def compute_output_shape(self, input_shape):
output_shape = tensor_shape.TensorShape(input_shape).as_list()
output_shape[-1] = output_shape[-1] * 2
return tensor_shape.TensorShape(output_shape)
class TestListLayer(TestLayer):
def compute_output_shape(self, input_shape):
shape = super(TestListLayer, self).compute_output_shape(input_shape)
return shape.as_list()
class TestTupleLayer(TestLayer):
def compute_output_shape(self, input_shape):
shape = super(TestTupleLayer, self).compute_output_shape(input_shape)
return tuple(shape.as_list())
# Layers can specify output shape as list/tuple/TensorShape
test_layers = [TestLayer, TestListLayer, TestTupleLayer]
for layer in test_layers:
input_layer = keras.layers.Bidirectional(layer(1))
inputs = keras.backend.placeholder(shape=(None, 2, 4))
output = input_layer(inputs)
self.assertEqual(output.shape.as_list(), [None, 2, 16])
self.assertEqual(
input_layer.compute_output_shape([None, 2, 4]).as_list(),
[None, 2, 16])
def test_Bidirectional_last_output_with_masking(self):
rnn = keras.layers.LSTM
samples = 2
dim = 5
timesteps = 3
units = 3
merge_mode = 'concat'
x = np.random.rand(samples, timesteps, dim)
    # Zero out the first record's timestep 2. The last output should match the
    # final state rather than be zeroed.
x[0, 2] = 0
with self.cached_session():
inputs = keras.Input((timesteps, dim))
masked_inputs = keras.layers.Masking()(inputs)
wrapped = keras.layers.Bidirectional(
rnn(units, return_state=True), merge_mode=merge_mode)
outputs = _to_list(wrapped(masked_inputs, training=True))
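      # With return_state=True and an LSTM, outputs are
      # [merged_output, forward_h, forward_c, backward_h, backward_c].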
self.assertLen(outputs, 5)
self.assertEqual(outputs[0].shape.as_list(), [None, units * 2])
model = keras.Model(inputs, outputs)
y = _to_list(model.predict(x))
self.assertLen(y, 5)
self.assertAllClose(y[0], np.concatenate([y[1], y[3]], axis=1))
def test_Bidirectional_sequence_output_with_masking(self):
rnn = keras.layers.LSTM
samples = 2
dim = 5
timesteps = 3
units = 3
merge_mode = 'concat'
x = np.random.rand(samples, timesteps, dim)
    # Zero out the first record's timestep 2 and expect the output at that
    # timestep to be all zeros as well.
x[0, 2] = 0
with self.cached_session():
inputs = keras.Input((timesteps, dim))
masked_inputs = keras.layers.Masking()(inputs)
wrapped = keras.layers.Bidirectional(
rnn(units, return_sequences=True),
merge_mode=merge_mode)
outputs = _to_list(wrapped(masked_inputs, training=True))
self.assertLen(outputs, 1)
self.assertEqual(outputs[0].shape.as_list(), [None, timesteps, units * 2])
model = keras.Model(inputs, outputs)
y = _to_list(model.predict(x))
self.assertLen(y, 1)
self.assertAllClose(y[0][0, 2], np.zeros(units * 2))
@parameterized.parameters(['sum', 'concat'])
@tf_test_util.run_in_graph_and_eager_modes
def test_custom_backward_layer(self, mode):
rnn = keras.layers.SimpleRNN
samples = 2
dim = 2
timesteps = 2
output_dim = 2
x = np.random.random((samples, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((samples, target_dim))
forward_layer = rnn(output_dim)
backward_layer = rnn(output_dim, go_backwards=True)
# test with Sequential model
model = keras.models.Sequential()
model.add(
keras.layers.Bidirectional(
forward_layer,
merge_mode=mode,
backward_layer=backward_layer,
input_shape=(timesteps, dim)))
model.compile(optimizer='rmsprop', loss='mse')
model.fit(x, y, epochs=1, batch_size=1)
# check whether the model variables are present in the
# trackable list of objects
checkpointed_objects = object_identity.ObjectIdentitySet(
trackable_util.list_objects(model))
for v in model.variables:
self.assertIn(v, checkpointed_objects)
# test compute output shape
ref_shape = model.layers[-1].output.shape
shape = model.layers[-1].compute_output_shape((None, timesteps, dim))
self.assertListEqual(shape.as_list(), ref_shape.as_list())
# test config
model.get_config()
model = keras.models.model_from_json(model.to_json())
model.summary()
@tf_test_util.run_in_graph_and_eager_modes
def test_custom_backward_layer_error_check(self):
rnn = keras.layers.LSTM
units = 2
forward_layer = rnn(units)
backward_layer = rnn(units)
with self.assertRaisesRegexp(ValueError,
'should have different `go_backwards` value.'):
keras.layers.Bidirectional(
forward_layer, merge_mode='concat', backward_layer=backward_layer)
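    # The forward and backward layers must also agree on these attributes.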
for attr in ('stateful', 'return_sequences', 'return_state'):
kwargs = {attr: True}
backward_layer = rnn(units, go_backwards=True, **kwargs)
with self.assertRaisesRegexp(
ValueError, 'expected to have the same value for attribute ' + attr):
keras.layers.Bidirectional(
forward_layer, merge_mode='concat', backward_layer=backward_layer)
def test_custom_backward_layer_serialization(self):
rnn = keras.layers.LSTM
units = 2
forward_layer = rnn(units)
backward_layer = rnn(units, go_backwards=True)
layer = keras.layers.Bidirectional(
forward_layer, merge_mode='concat', backward_layer=backward_layer)
config = layer.get_config()
layer_from_config = keras.layers.Bidirectional.from_config(config)
new_config = layer_from_config.get_config()
self.assertDictEqual(config, new_config)
def test_rnn_layer_name(self):
rnn = keras.layers.LSTM
units = 2
layer = keras.layers.Bidirectional(rnn(units, name='rnn'))
config = layer.get_config()
self.assertEqual(config['layer']['config']['name'], 'rnn')
layer_from_config = keras.layers.Bidirectional.from_config(config)
self.assertEqual(layer_from_config.forward_layer.name, 'forward_rnn')
self.assertEqual(layer_from_config.backward_layer.name, 'backward_rnn')
def test_custom_backward_rnn_layer_name(self):
rnn = keras.layers.LSTM
units = 2
forward_layer = rnn(units)
backward_layer = rnn(units, go_backwards=True)
layer = keras.layers.Bidirectional(
forward_layer, merge_mode='concat', backward_layer=backward_layer)
config = layer.get_config()
self.assertEqual(config['layer']['config']['name'], 'lstm')
self.assertEqual(config['backward_layer']['config']['name'], 'lstm_1')
layer_from_config = keras.layers.Bidirectional.from_config(config)
self.assertEqual(layer_from_config.forward_layer.name, 'forward_lstm')
self.assertEqual(layer_from_config.backward_layer.name, 'backward_lstm_1')
def test_rnn_with_customized_cell(self):
batch = 20
dim = 5
timesteps = 3
units = 5
merge_mode = 'sum'
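    # An LSTM cell with a residual connection from inputs to outputs; valid
    # here because units == dim.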
class ResidualLSTMCell(keras.layers.LSTMCell):
def call(self, inputs, states, training=None):
output, states = super(ResidualLSTMCell, self).call(inputs, states)
return output + inputs, states
cell = ResidualLSTMCell(units)
forward_layer = keras.layers.RNN(cell)
inputs = keras.Input((timesteps, dim))
bidirectional_rnn = keras.layers.Bidirectional(
forward_layer, merge_mode=merge_mode)
outputs = _to_list(bidirectional_rnn(inputs))
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='mse')
model.fit(
np.random.random((batch, timesteps, dim)),
np.random.random((batch, units)),
epochs=1,
batch_size=10)
# Test stacking
cell = [ResidualLSTMCell(units), ResidualLSTMCell(units)]
forward_layer = keras.layers.RNN(cell)
inputs = keras.Input((timesteps, dim))
bidirectional_rnn = keras.layers.Bidirectional(
forward_layer, merge_mode=merge_mode)
outputs = _to_list(bidirectional_rnn(inputs))
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='mse')
model.fit(
np.random.random((batch, timesteps, dim)),
np.random.random((batch, units)),
epochs=1,
batch_size=10)
@tf_test_util.run_v2_only
def test_wrapped_rnn_cell(self):
# See https://github.com/tensorflow/tensorflow/issues/26581.
batch = 20
dim = 5
timesteps = 3
units = 5
merge_mode = 'sum'
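    # Wrap the cell in ResidualWrapper (units == dim, so the residual add is
    # shape-compatible) and check that Bidirectional accepts the wrapped RNN.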
cell = keras.layers.LSTMCell(units)
cell = ResidualWrapper(cell)
rnn = keras.layers.RNN(cell)
inputs = keras.Input((timesteps, dim))
wrapped = keras.layers.Bidirectional(rnn, merge_mode=merge_mode)
outputs = _to_list(wrapped(inputs))
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='mse')
model.fit(
np.random.random((batch, timesteps, dim)),
np.random.random((batch, units)),
epochs=1,
batch_size=10)
@tf_test_util.run_in_graph_and_eager_modes
def test_Bidirectional_ragged_input(self):
np.random.seed(100)
rnn = keras.layers.LSTM
units = 3
x = ragged_factory_ops.constant(
[[[1, 1, 1], [1, 1, 1]], [[1, 1, 1]],
[[1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1, 1]],
[[1, 1, 1], [1, 1, 1], [1, 1, 1]]],
ragged_rank=1)
x = math_ops.cast(x, 'float32')
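    # For each merge mode, the Bidirectional output on ragged input should
    # equal merging the forward output with the reversed backward output.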
# pylint: disable=g-long-lambda
with self.cached_session():
for merge_mode in ['ave', 'concat', 'mul']:
if merge_mode == 'ave':
merge_func = lambda y, y_rev: (y + y_rev) / 2
elif merge_mode == 'concat':
merge_func = lambda y, y_rev: ragged_concat_ops.concat(
(y, y_rev), axis=-1)
elif merge_mode == 'mul':
merge_func = lambda y, y_rev: (y * y_rev)
inputs = keras.Input(
shape=(None, 3), batch_size=4, dtype='float32', ragged=True)
layer = keras.layers.Bidirectional(
rnn(units, return_sequences=True), merge_mode=merge_mode)
f_merged = keras.backend.function([inputs], layer(inputs))
f_forward = keras.backend.function([inputs],
layer.forward_layer(inputs))
f_backward = keras.backend.function(
[inputs],
array_ops.reverse(layer.backward_layer(inputs), axis=[1]))
y_merged = f_merged(x)
y_expected = merge_func(
ragged_tensor.convert_to_tensor_or_ragged_tensor(f_forward(x)),
ragged_tensor.convert_to_tensor_or_ragged_tensor(f_backward(x)))
y_merged = ragged_tensor.convert_to_tensor_or_ragged_tensor(y_merged)
self.assertAllClose(y_merged.flat_values, y_expected.flat_values)
# pylint: enable=g-long-lambda
def _to_list(ls):
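  """Returns `ls` unchanged if it is a list, otherwise wraps it in a list."""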
if isinstance(ls, list):
return ls
else:
return [ls]
if __name__ == '__main__':
test.main()