# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for GRU V2 layer."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import shutil
from absl.testing import parameterized
import numpy as np
from tensorflow.core.protobuf import config_pb2
from tensorflow.core.protobuf import rewriter_config_pb2
from tensorflow.python import keras
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.eager import backprop
from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import random_seed
from tensorflow.python.framework import test_util
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.layers import recurrent as rnn_v1
from tensorflow.python.keras.layers import recurrent_v2 as rnn
from tensorflow.python.keras.utils import np_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import gen_math_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.platform import test
from tensorflow.python.training import gradient_descent

# Global config for the grappler settings used in graph-mode tests.
_rewrites = rewriter_config_pb2.RewriterConfig()
_rewrites.implementation_selector = rewriter_config_pb2.RewriterConfig.ON
_rewrites.min_graph_nodes = -1
_graph_options = config_pb2.GraphOptions(rewrite_options=_rewrites)
_config = config_pb2.ConfigProto(graph_options=_graph_options)
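# The implementation selector pass lets grappler swap the generic GRU defun
# for a device-specific (e.g. cuDNN-backed) implementation when the graph is
# optimized, which is what the graph-mode tests below exercise.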


@keras_parameterized.run_all_keras_modes(config=_config)
class GRUV2Test(keras_parameterized.TestCase):
@parameterized.named_parameters(
      ('non_tanh_activation', 'relu', 'sigmoid', 0, False, True, True),
('non_sigmoid_recur_activation', 'tanh', 'relu', 0, False, True, True),
('use_recurrent_dropout', 'tanh', 'sigmoid', 0.1, False, True, True),
('unroll', 'tanh', 'sigmoid', 0, True, True, True),
('not_use_bias', 'tanh', 'sigmoid', 0, False, False, True),
('not_reset_after', 'tanh', 'sigmoid', 0, False, True, False)
)
def test_could_use_defun_backend(self, activation, recurrent_activation,
recurrent_dropout, unroll, use_bias,
reset_after):
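    # Each of these non-default configurations is incompatible with the cuDNN
    # kernel, so the layer should report that the defun/cuDNN backend cannot
    # be used.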
layer = rnn.GRU(1,
activation=activation,
recurrent_activation=recurrent_activation,
recurrent_dropout=recurrent_dropout,
unroll=unroll,
use_bias=use_bias,
reset_after=reset_after)
self.assertFalse(layer.could_use_cudnn)
def test_keras_model_with_gru(self):
input_shape = 10
rnn_state_size = 8
output_shape = 8
timestep = 4
batch = 100
epoch = 10
(x_train, y_train), _ = testing_utils.get_test_data(
train_samples=batch,
test_samples=0,
input_shape=(timestep, input_shape),
num_classes=output_shape)
y_train = np_utils.to_categorical(y_train, output_shape)
layer = rnn.GRU(rnn_state_size)
inputs = keras.layers.Input(
shape=[timestep, input_shape], dtype=dtypes.float32)
outputs = layer(inputs)
model = keras.models.Model(inputs, outputs)
model.compile('rmsprop', loss='mse')
model.fit(x_train, y_train, epochs=epoch)
model.evaluate(x_train, y_train)
model.predict(x_train)
def test_dynamic_behavior_GRU(self):
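    # The layer is configured with an unknown time dimension, so it has to
    # accept batches whose sequence length is only known at call time.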
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
layer = rnn.GRU(units, input_shape=(None, embedding_dim))
model = keras.models.Sequential()
model.add(layer)
model.compile(gradient_descent.GradientDescentOptimizer(0.001), 'mse')
x = np.random.random((num_samples, timesteps, embedding_dim))
y = np.random.random((num_samples, units))
model.train_on_batch(x, y)
def test_stacking_GRU(self):
inputs = np.random.random((2, 3, 4))
targets = np.abs(np.random.random((2, 3, 5)))
targets /= targets.sum(axis=-1, keepdims=True)
model = keras.models.Sequential()
model.add(rnn.GRU(10, return_sequences=True, unroll=False))
model.add(rnn.GRU(5, return_sequences=True, unroll=False))
model.compile(
loss='categorical_crossentropy',
optimizer=gradient_descent.GradientDescentOptimizer(0.01))
model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
def test_from_config_GRU(self):
layer_class = rnn.GRU
for stateful in (False, True):
l1 = layer_class(units=1, stateful=stateful)
l2 = layer_class.from_config(l1.get_config())
      self.assertEqual(l1.get_config(), l2.get_config())
@test_util.run_v2_only
def test_gru_v2_feature_parity_with_canonical_gru(self):
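    # Train and predict with the canonical GRU (v1) and the v2 layer on the
    # same masked data and weights; their outputs should match closely both
    # before and after fitting.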
input_shape = 10
rnn_state_size = 8
timestep = 4
batch = 20
(x_train, y_train), _ = testing_utils.get_test_data(
train_samples=batch,
test_samples=0,
input_shape=(timestep, input_shape),
num_classes=rnn_state_size,
random_seed=random_seed.DEFAULT_GRAPH_SEED)
y_train = np_utils.to_categorical(y_train, rnn_state_size)
    # For the last two batch items of the test data, zero out the last
    # timestep to simulate variable-length sequences for the masking test.
x_train[-2:, -1, :] = 0.0
y_train[-2:] = 0
inputs = keras.layers.Input(
shape=[timestep, input_shape], dtype=dtypes.float32)
masked_input = keras.layers.Masking()(inputs)
gru_layer = rnn_v1.GRU(rnn_state_size,
recurrent_activation='sigmoid',
reset_after=True)
output = gru_layer(masked_input)
gru_model = keras.models.Model(inputs, output)
weights = gru_model.get_weights()
y_1 = gru_model.predict(x_train)
gru_model.compile('rmsprop', 'mse')
gru_model.fit(x_train, y_train)
y_2 = gru_model.predict(x_train)
with test_util.device(use_gpu=True):
cudnn_layer = rnn.GRU(rnn_state_size,
recurrent_activation='sigmoid',
reset_after=True)
cudnn_model = keras.models.Model(inputs, cudnn_layer(masked_input))
cudnn_model.set_weights(weights)
y_3 = cudnn_model.predict(x_train)
cudnn_model.compile('rmsprop', 'mse')
cudnn_model.fit(x_train, y_train)
y_4 = cudnn_model.predict(x_train)
self.assertAllClose(y_1, y_3, rtol=2e-5, atol=2e-5)
self.assertAllClose(y_2, y_4, rtol=2e-5, atol=2e-5)
@parameterized.named_parameters(
      # test_name, use_bias, bias_initializer
('normal', True, 'zeros'),
('no_bias', False, 'zeros'),
('random_bias', True, 'random_uniform'),
)
def test_gru_v2_model_save_load(self, use_bias, bias_initializer):
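    # Weights saved from one model should load into a freshly built model and
    # reproduce the original predictions exactly.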
temp_dir = self.get_temp_dir()
self.addCleanup(shutil.rmtree, temp_dir)
h5_path = os.path.join(temp_dir, 'test.h5')
batch = 10
timestep = 3
input_dim = 5
units = 2
x = np.random.random((batch, timestep, input_dim))
def build_model():
inputs = keras.layers.Input(
shape=[timestep, input_dim], dtype=dtypes.float32)
layer = rnn.GRU(
units,
use_bias=use_bias,
bias_initializer=bias_initializer)
output = layer(inputs)
return keras.models.Model(inputs, output), layer
model, layer = build_model()
y_ref = model.predict(x)
model.save_weights(h5_path)
cloned_model, new_layer = build_model()
cloned_model.load_weights(h5_path)
y = cloned_model.predict(x)
self.assertAllClose(y, y_ref)
self.assertAllClose(layer.get_weights(), new_layer.get_weights())
def test_gru_v2_output_on_multiple_kernel(self):
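    # The same weights should yield numerically close outputs from the CPU
    # kernel, the GPU/cuDNN kernel, and the canonical v1 GRU.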
input_shape = 10
rnn_state_size = 8
timestep = 4
batch = 100
x_train = np.random.random((batch, timestep, input_shape))
inputs = keras.layers.Input(
shape=[timestep, input_shape], dtype=dtypes.float32)
with test_util.device(use_gpu=False):
layer = rnn.GRU(rnn_state_size)
output = layer(inputs)
cpu_model = keras.models.Model(inputs, output)
weights = cpu_model.get_weights()
y_1 = cpu_model.predict(x_train)
with test_util.device(use_gpu=True):
layer = rnn.GRU(rnn_state_size)
output = layer(inputs)
gpu_model = keras.models.Model(inputs, output)
gpu_model.set_weights(weights)
y_2 = gpu_model.predict(x_train)
    # Note that cuDNN uses 'sigmoid' as the recurrent activation, so GRU V2
    # defaults to 'sigmoid' as well. Construct the canonical GRU with sigmoid
    # so that it produces the same output.
with test_util.device(use_gpu=True):
layer = rnn_v1.GRU(rnn_state_size,
recurrent_activation='sigmoid',
reset_after=True)
output = layer(inputs)
canonical_model = keras.models.Model(inputs, output)
canonical_model.set_weights(weights)
y_3 = canonical_model.predict(x_train)
self.assertAllClose(y_1, y_2, rtol=1e-5, atol=1e-5)
self.assertAllClose(y_2, y_3, rtol=1e-5, atol=1e-5)
@parameterized.named_parameters(
# test_name, time_major, go_backwards
('normal', False, False),
('time_major', True, False),
('go_backwards', False, True),
('both', True, True),
)
def test_time_major_and_go_backward(self, time_major, go_backwards):
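    # The v1 and v2 layers should agree for every combination of time_major
    # and go_backwards; time-major input is produced by transposing the batch
    # and time axes around the layer.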
input_shape = 10
rnn_state_size = 8
timestep = 4
batch = 100
x_train = np.random.random((batch, timestep, input_shape))
def build_model(layer_cls):
inputs = keras.layers.Input(
shape=[timestep, input_shape], dtype=dtypes.float32)
layer = layer_cls(rnn_state_size,
recurrent_activation='sigmoid',
time_major=time_major,
return_sequences=True,
go_backwards=go_backwards,
reset_after=True)
if time_major:
converted_input = keras.layers.Lambda(
lambda t: array_ops.transpose(t, [1, 0, 2]))(inputs)
outputs = layer(converted_input)
outputs = keras.layers.Lambda(
lambda t: array_ops.transpose(t, [1, 0, 2]))(outputs)
else:
outputs = layer(inputs)
return keras.models.Model(inputs, outputs)
gru_model = build_model(rnn_v1.GRU)
y_ref = gru_model.predict(x_train)
weights = gru_model.get_weights()
gru_v2_model = build_model(rnn.GRU)
gru_v2_model.set_weights(weights)
y = gru_v2_model.predict(x_train)
self.assertAllClose(y, y_ref)
def test_with_masking_layer_GRU(self):
layer_class = rnn.GRU
inputs = np.random.random((2, 3, 4))
targets = np.abs(np.random.random((2, 3, 5)))
targets /= targets.sum(axis=-1, keepdims=True)
model = keras.models.Sequential()
model.add(keras.layers.Masking(input_shape=(3, 4)))
model.add(layer_class(units=5, return_sequences=True, unroll=False))
model.compile(loss='categorical_crossentropy',
optimizer=gradient_descent.GradientDescentOptimizer(0.001))
model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
def test_masking_with_stacking_GRU(self):
inputs = np.random.random((2, 3, 4))
targets = np.abs(np.random.random((2, 3, 5)))
targets /= targets.sum(axis=-1, keepdims=True)
model = keras.models.Sequential()
model.add(keras.layers.Masking(input_shape=(3, 4)))
model.add(rnn.GRU(10, return_sequences=True, unroll=False))
model.add(rnn.GRU(5, return_sequences=True, unroll=False))
model.compile(
loss='categorical_crossentropy',
optimizer=gradient_descent.GradientDescentOptimizer(0.01))
model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
def test_return_sequences_GRU(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
rnn.GRU,
kwargs={'units': units,
'return_sequences': True},
input_shape=(num_samples, timesteps, embedding_dim))
def test_float64_GRU(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
rnn.GRU,
kwargs={'units': units,
'return_sequences': True,
'dtype': 'float64'},
input_shape=(num_samples, timesteps, embedding_dim),
input_dtype='float64')
def test_return_states_GRU(self):
layer_class = rnn.GRU
x = np.random.random((2, 3, 4))
y = np.abs(np.random.random((2, 5)))
s = np.abs(np.random.random((2, 5)))
inputs = keras.layers.Input(
shape=[3, 4], dtype=dtypes.float32)
masked = keras.layers.Masking()(inputs)
outputs, states = layer_class(units=5, return_state=True)(masked)
model = keras.models.Model(inputs, [outputs, states])
model.compile(loss='categorical_crossentropy',
optimizer=gradient_descent.GradientDescentOptimizer(0.001))
model.fit(x, [y, s], epochs=1, batch_size=2, verbose=1)
def test_dropout_GRU(self):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
rnn.GRU,
kwargs={'units': units,
'dropout': 0.1,
'recurrent_dropout': 0.1},
input_shape=(num_samples, timesteps, embedding_dim))
def test_constraints_GRU(self):
embedding_dim = 4
layer_class = rnn.GRU
k_constraint = keras.constraints.max_norm(0.01)
r_constraint = keras.constraints.max_norm(0.01)
b_constraint = keras.constraints.max_norm(0.01)
layer = layer_class(
5,
return_sequences=False,
weights=None,
input_shape=(None, embedding_dim),
kernel_constraint=k_constraint,
recurrent_constraint=r_constraint,
bias_constraint=b_constraint)
layer.build((None, None, embedding_dim))
self.assertEqual(layer.cell.kernel.constraint, k_constraint)
self.assertEqual(layer.cell.recurrent_kernel.constraint, r_constraint)
self.assertEqual(layer.cell.bias.constraint, b_constraint)
@parameterized.parameters([0, 1, 2])
def test_implementation_mode_GRU(self, implementation_mode):
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
testing_utils.layer_test(
rnn.GRU,
kwargs={'units': units,
'implementation': implementation_mode},
input_shape=(num_samples, timesteps, embedding_dim))
def test_regularizers_GRU(self):
embedding_dim = 4
layer_class = rnn.GRU
layer = layer_class(
5,
return_sequences=False,
weights=None,
input_shape=(None, embedding_dim),
kernel_regularizer=keras.regularizers.l1(0.01),
recurrent_regularizer=keras.regularizers.l1(0.01),
bias_regularizer='l2',
activity_regularizer='l1')
layer.build((None, None, 2))
self.assertEqual(len(layer.losses), 3)
x = keras.backend.variable(np.ones((2, 3, 2)))
layer(x)
if context.executing_eagerly():
self.assertEqual(len(layer.losses), 4)
else:
self.assertEqual(len(layer.get_losses_for(x)), 1)
def test_statefulness_GRU(self):
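    # A stateful layer keeps its state between predict calls, so repeated
    # calls on the same input should differ until reset_states() is called.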
num_samples = 2
timesteps = 3
embedding_dim = 4
units = 2
layer_class = rnn.GRU
model = keras.models.Sequential()
model.add(
keras.layers.Embedding(
4,
embedding_dim,
mask_zero=True,
input_length=timesteps,
batch_input_shape=(num_samples, timesteps)))
layer = layer_class(
units, return_sequences=False, stateful=True, weights=None)
model.add(layer)
model.compile(
optimizer=gradient_descent.GradientDescentOptimizer(0.01),
loss='mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
out1 = model.predict(np.ones((num_samples, timesteps)))
self.assertEqual(out1.shape, (num_samples, units))
# train once so that the states change
model.train_on_batch(
np.ones((num_samples, timesteps)), np.ones((num_samples, units)))
out2 = model.predict(np.ones((num_samples, timesteps)))
# if the state is not reset, output should be different
self.assertNotEqual(out1.max(), out2.max())
# check that output changes after states are reset
# (even though the model itself didn't change)
layer.reset_states()
out3 = model.predict(np.ones((num_samples, timesteps)))
self.assertNotEqual(out2.max(), out3.max())
# check that container-level reset_states() works
model.reset_states()
out4 = model.predict(np.ones((num_samples, timesteps)))
np.testing.assert_allclose(out3, out4, atol=1e-5)
# check that the call to `predict` updated the states
out5 = model.predict(np.ones((num_samples, timesteps)))
self.assertNotEqual(out4.max(), out5.max())
# Check masking
layer.reset_states()
left_padded_input = np.ones((num_samples, timesteps))
left_padded_input[0, :1] = 0
left_padded_input[1, :2] = 0
out6 = model.predict(left_padded_input)
layer.reset_states()
right_padded_input = np.ones((num_samples, timesteps))
right_padded_input[0, -1:] = 0
right_padded_input[1, -2:] = 0
out7 = model.predict(right_padded_input)
layer.reset_states()
mix_padded_input = np.ones((num_samples, timesteps))
mix_padded_input[0, 1] = 0
mix_padded_input[1, 0] = 0
mix_padded_input[1, 2] = 0
out8 = model.predict(mix_padded_input)
self.assertAllClose(out7, out6, atol=1e-5)
self.assertAllClose(out8, out7, atol=1e-5)
def test_stateful_GRU_training(self):
# See b/123587692 for more context.
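    # A stateful GRU with a fixed batch size should train end to end without
    # raising.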
vocab_size = 20
embedding_dim = 10
batch_size = 8
timestep = 12
units = 5
x = np.random.randint(0, vocab_size, size=(batch_size, timestep))
y = np.random.randint(0, vocab_size, size=(batch_size, timestep))
model = keras.Sequential([
keras.layers.Embedding(vocab_size, embedding_dim,
batch_input_shape=[batch_size, timestep]),
rnn.GRU(units, return_sequences=True, stateful=True),
keras.layers.Dense(vocab_size)
])
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.fit(x, y, epochs=1, shuffle=False)
@test_util.run_v2_only
def test_explicit_device_with_go_backward_and_mask(self):
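    # With a right-padded mask and go_backwards=True, the outputs over the
    # valid timesteps should match the outputs produced by running on the
    # explicitly trimmed sequence, for both the v1 and v2 layers.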
batch_size = 8
timestep = 7
masksteps = 5
units = 4
inputs = np.random.randn(batch_size, timestep, units).astype(np.float32)
    mask = np.ones((batch_size, timestep)).astype(bool)
mask[:, masksteps:] = 0
# Test for V1 behavior.
    gru_v1 = rnn_v1.GRU(units, return_sequences=True, go_backwards=True)
    with test_util.device(use_gpu=True):
      outputs_masked_v1 = gru_v1(inputs, mask=constant_op.constant(mask))
      outputs_trimmed_v1 = gru_v1(inputs[:, :masksteps])
    self.assertAllClose(outputs_masked_v1[:, -masksteps:], outputs_trimmed_v1)
    # Test for V2 behavior.
    gru_v2 = rnn.GRU(units, return_sequences=True, go_backwards=True)
    with test_util.device(use_gpu=True):
      outputs_masked = gru_v2(inputs, mask=constant_op.constant(mask))
      outputs_trimmed = gru_v2(inputs[:, :masksteps])
    self.assertAllClose(outputs_masked[:, -masksteps:], outputs_trimmed)
@test_util.run_deprecated_v1
def test_v1_session_behavior(self):
# See b/139132348 for more details.
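    # The v2 GRU layer should be trainable from a tf.data dataset inside a
    # v1 graph/session.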
x = np.random.uniform(size=(100, 4, 8))
y = np.random.uniform(size=(100, 1))
dataset = dataset_ops.Dataset.from_tensor_slices(
(x, y)).shuffle(100).batch(32)
inp = keras.layers.Input(shape=(4, 8))
layer = rnn.GRU(1)(inp)
layer = keras.layers.Dense(1)(layer)
model = keras.models.Model(inp, layer)
model.compile(loss='mse', optimizer='sgd')
model.fit(dataset)


class GRULayerGradientTapeTest(test.TestCase):
@test_util.run_in_graph_and_eager_modes(config=_config)
def test_in_tape(self):
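    # Gradients of the GRU variables should be available when the forward
    # pass runs under a GradientTape.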
if not context.executing_eagerly():
      self.skipTest('Only applicable when executing eagerly.')
time_steps = 10
embedding_size = 11
gru_unit_size = 12
gru = rnn.GRU(gru_unit_size,
return_sequences=True,
return_state=True,
recurrent_activation='sigmoid',
recurrent_initializer='glorot_uniform')
x = random_ops.random_uniform([1, time_steps, embedding_size])
y = random_ops.random_uniform([1, gru_unit_size])
with backprop.GradientTape() as tape:
hidden_state = array_ops.zeros([1, gru_unit_size], dtype=dtypes.float32)
_, state = gru(x, initial_state=hidden_state)
loss = math_ops.reduce_mean(math_ops.square(state - y))
tape.gradient(loss, gru.variables)


@keras_parameterized.run_all_keras_modes(config=_config)
class GRUGraphRewriteTest(keras_parameterized.TestCase):
input_shape = 10
output_shape = 8
rnn_state_size = 8
timestep = 4
batch = 100
epoch = 1
def _test_runtime_with_model(self, model):
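    # Fit the model briefly, then check the runtime tensor returned alongside
    # the predictions to verify which kernel (cuDNN on GPU, standard on CPU)
    # was actually selected.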
(x_train, y_train), _ = testing_utils.get_test_data(
train_samples=self.batch,
test_samples=0,
input_shape=(self.timestep, self.input_shape),
num_classes=self.output_shape)
y_train = np_utils.to_categorical(y_train, self.output_shape)
model.compile(
optimizer='sgd',
loss=['categorical_crossentropy', None],
experimental_run_tf_function=testing_utils.should_run_tf_function())
existing_loss = 0
for _ in range(self.epoch):
history = model.fit(x_train, y_train)
loss_value = history.history['loss'][0]
self.assertNotEqual(existing_loss, loss_value)
existing_loss = loss_value
_, runtime_value = model.predict(x_train)
if test.is_gpu_available():
self.assertEqual(runtime_value[0], rnn._RUNTIME_GPU)
else:
self.assertEqual(runtime_value[0], rnn._RUNTIME_CPU)
@test_util.run_v2_only
def test_GRU_runtime(self):
layer = rnn.GRU(self.rnn_state_size, return_runtime=True)
inputs = keras.layers.Input(
shape=[self.timestep, self.input_shape], dtype=dtypes.float32)
outputs, runtime = layer(inputs)
    # Expand the runtime so that it is a 1D tensor instead of a scalar.
    # TF models do not work with scalar model outputs, especially during
    # aggregation.
runtime = keras.layers.Lambda(
lambda x: array_ops.expand_dims(x, axis=-1))(runtime)
model = keras.models.Model(inputs=inputs, outputs=[outputs, runtime])
self._test_runtime_with_model(model)
@test_util.run_v2_only
def test_GRU_runtime_with_mask(self):
# Masking will affect which backend is selected based on whether the mask
# is strictly right padded.
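    # A strictly right-padded mask can still run on the cuDNN kernel; any
    # other mask pattern should fall back to the standard kernel.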
layer = rnn.GRU(self.rnn_state_size, return_runtime=True)
inputs = keras.layers.Input(
shape=[self.timestep, self.input_shape], dtype=dtypes.float32)
masked_inputs = keras.layers.Masking()(inputs)
outputs, runtime = layer(masked_inputs)
    # Expand the runtime so that it is a 1D tensor instead of a scalar.
    # TF models do not work with scalar model outputs, especially during
    # aggregation.
runtime = keras.layers.Lambda(
lambda x: array_ops.expand_dims(x, axis=-1))(runtime)
model = keras.models.Model(inputs=inputs, outputs=[outputs, runtime])
(x_train, y_train), _ = testing_utils.get_test_data(
train_samples=self.batch,
test_samples=0,
input_shape=(self.timestep, self.input_shape),
num_classes=self.output_shape)
y_train = np_utils.to_categorical(y_train, self.output_shape)
model.compile(
optimizer='sgd',
loss=['categorical_crossentropy', None],
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.fit(x_train, y_train)
# Verify unpadded data.
_, runtime_value = model.predict(x_train)
if test.is_gpu_available():
self.assertEqual(runtime_value[0], rnn._RUNTIME_GPU)
else:
self.assertEqual(runtime_value[0], rnn._RUNTIME_CPU)
# Update x/y to be right padded by setting the last timestep to 0
x_train[:, -1, :] = 0
y_train[:, -1] = 0
_, runtime_value = model.predict(x_train)
if test.is_gpu_available():
self.assertEqual(runtime_value[0], rnn._RUNTIME_GPU)
else:
self.assertEqual(runtime_value[0], rnn._RUNTIME_CPU)
    # Further update x/y to be mix padded (masks in the middle), and verify
    # that only the CPU kernel can be selected.
x_train[:, -3, :] = 0
y_train[:, -3] = 0
_, runtime_value = model.predict(x_train)
self.assertEqual(runtime_value[0], rnn._RUNTIME_CPU)
@test_util.run_v2_only
def test_GRU_runtime_with_cond(self):
    # This test demonstrates the grappler plugin's graph rewrite under the
    # condition that the function returns a different number of internal
    # states.
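    # The true branch calls the GRU layer while the false branch returns a
    # zero tensor and a dummy runtime value, so both branches of the cond
    # produce matching output structures.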
layer = rnn.GRU(self.rnn_state_size, return_runtime=True)
inputs = keras.layers.Input(
shape=[self.timestep, self.input_shape], dtype=dtypes.float32)
zeros = array_ops.zeros([self.batch, self.output_shape])
dummy_runtime = rnn._runtime(rnn._RUNTIME_UNKNOWN)
a = constant_op.constant(0)
b = constant_op.constant(1)
# Will always run the GRU layer.
outputs, runtime = control_flow_ops.cond(
gen_math_ops.less(a, b),
lambda: layer(inputs),
lambda: (zeros, dummy_runtime))
    # Expand the runtime so that it is a 1D tensor instead of a scalar.
    # TF models do not work with scalar model outputs, especially during
    # aggregation.
runtime = keras.layers.Lambda(
lambda x: array_ops.expand_dims(x, axis=-1))(runtime)
model = keras.models.Model(inputs=inputs, outputs=[outputs, runtime])
self._test_runtime_with_model(model)


if __name__ == '__main__':
test.main()