# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for TensorFlow 2.0 layer behavior."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import copy
import os
import sys
import traceback
import numpy as np
from tensorflow.python import keras
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import test_util
from tensorflow.python.keras import backend
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import base_layer
from tensorflow.python.keras.mixed_precision.experimental import policy
from tensorflow.python.keras.optimizer_v2 import rmsprop
from tensorflow.python.keras.utils import tf_utils
from tensorflow.python.layers import core as legacy_core
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import summary_ops_v2
from tensorflow.python.ops import tensor_array_ops
from tensorflow.python.ops import variables
from tensorflow.python.ops.ragged import ragged_tensor
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
from tensorflow.python.platform import tf_logging
from tensorflow.python.summary import summary_iterator
from tensorflow.python.util import nest
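# A test fixture whose `call` uses Python control flow (a loop over the batch
# dimension via a TensorArray), so it cannot be traced into a static graph.
# Passing `dynamic=True` tells Keras to always run the layer eagerly.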
class DynamicLayer(base_layer.Layer):
def __init__(self, dynamic=False, **kwargs):
super(DynamicLayer, self).__init__(dynamic=dynamic, **kwargs)
def call(self, inputs):
samples = tensor_array_ops.TensorArray(
dtype=dtypes.float32, size=array_ops.shape(inputs)[0])
for idx, sample in enumerate(inputs):
samples = samples.write(idx, math_ops.square(sample))
return samples.stack()
def compute_output_shape(self, input_shape):
return input_shape
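# A fixture whose `call` always raises, used to check that errors raised
# inside a layer's forward pass surface through `__call__`.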
class InvalidLayer(base_layer.Layer):
def call(self, inputs):
raise ValueError('You did something wrong!')
class BaseLayerTest(keras_parameterized.TestCase):
@keras_parameterized.run_with_all_model_types
def test_dynamic_layer(self):
model = testing_utils.get_model_from_layers([DynamicLayer(dynamic=True)],
input_shape=(3,))
self.assertEqual(model.dynamic, True)
model.compile(rmsprop.RMSprop(0.001), loss='mse')
self.assertEqual(model.run_eagerly, True)
model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
@keras_parameterized.run_with_all_model_types
def test_dynamic_layer_error(self):
with self.assertRaisesRegexp(TypeError,
'attempting to use Python control flow'):
model = testing_utils.get_model_from_layers([DynamicLayer()],
input_shape=(3,))
model.compile(rmsprop.RMSprop(0.001), loss='mse')
model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
@keras_parameterized.run_with_all_model_types
def test_dynamic_layer_error_running_in_graph_mode(self):
with context.graph_mode():
model = testing_utils.get_model_from_layers([DynamicLayer(dynamic=True)],
input_shape=(3,))
self.assertEqual(model.dynamic, True)
# But then you cannot run the model since you're in a graph scope.
with self.assertRaisesRegexp(
ValueError, 'You must enable eager execution'):
model.compile(rmsprop.RMSprop(0.001), loss='mse')
def test_manual_compute_output_shape(self):
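# `compute_output_shape` should build the layer exactly once; subsequent
# shape/signature queries and a real call must reuse the already-built state.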
class BuildCounter(keras.layers.Layer):
def __init__(self, *args, **kwargs): # pylint: disable=redefined-outer-name
super(BuildCounter, self).__init__(*args, **kwargs)
self.build_counter = 0
def build(self, input_shape):
self.build_counter += 1
def call(self, inputs):
return inputs
with context.eager_mode():
layer = BuildCounter(dtype=dtypes.float64)
output_shape = layer.compute_output_shape((None, 10))
self.assertEqual(layer.build_counter, 1)
self.assertEqual(output_shape.as_list(), [None, 10])
output_signature = layer.compute_output_signature(
tensor_spec.TensorSpec(dtype=dtypes.float64, shape=[None, 10]))
self.assertEqual(layer.build_counter, 1)
self.assertEqual(output_signature.dtype, dtypes.float64)
self.assertEqual(output_signature.shape.as_list(), [None, 10])
layer(np.ones((5, 10)))
self.assertEqual(layer.build_counter, 1)
def test_eager_switch_case_input(self):
with context.eager_mode():
task = keras.Input(shape=(), dtype=dtypes.int32)
control_flow_ops.switch_case(
task[0], [lambda: constant_op.constant(1.0) for _ in range(10)])
def test_dynamic_layer_with_deferred_sequential_model(self):
model = keras.Sequential(
[DynamicLayer(dynamic=True),
keras.layers.Dense(3)])
self.assertEqual(model.dynamic, True)
model.compile(rmsprop.RMSprop(0.001), loss='mse')
self.assertEqual(model.run_eagerly, True)
model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
def test_nested_dynamic_layers_in_eager_mode(self):
inputs = keras.Input((3,))
outputs = DynamicLayer(dynamic=True)(inputs)
inner_model = keras.Model(inputs, outputs)
self.assertEqual(inner_model.dynamic, True)
inputs = keras.Input((3,))
x = DynamicLayer(dynamic=True)(inputs)
outputs = inner_model(x)
model = keras.Model(inputs, outputs)
self.assertEqual(model.dynamic, True)
model.compile(rmsprop.RMSprop(0.001), loss='mse')
self.assertEqual(model.run_eagerly, True)
model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
def test_dynamic_subclassed_model_no_shape_inference(self):
class MyModel(keras.Model):
def __init__(self):
super(MyModel, self).__init__(dynamic=True)
self.layer1 = keras.layers.Dense(3)
self.layer2 = keras.layers.Dense(3)
def call(self, inputs):
if math_ops.reduce_sum(inputs) > 0:
return self.layer1(inputs)
else:
return self.layer2(inputs)
model = MyModel()
self.assertEqual(model.dynamic, True)
model.compile(rmsprop.RMSprop(0.001), loss='mse')
self.assertEqual(model.run_eagerly, True)
model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
self.assertEqual(model.outputs, [None])
def test_dynamic_subclassed_model_with_shape_inference(self):
class MyModel(keras.Model):
def __init__(self):
super(MyModel, self).__init__(dynamic=True)
self.layer1 = keras.layers.Dense(3)
self.layer2 = keras.layers.Dense(3)
def call(self, inputs):
if math_ops.reduce_sum(inputs) > 0:
return self.layer1(inputs)
else:
return self.layer2(inputs)
def compute_output_shape(self, input_shape):
return tensor_shape.TensorShape(
tuple(input_shape[:-1].as_list()) + (3,))
model = MyModel()
self.assertEqual(model.dynamic, True)
model.compile(rmsprop.RMSprop(0.001), loss='mse')
model.train_on_batch(np.random.random((2, 3)), np.random.random((2, 3)))
self.assertEqual(model.outputs[0].shape.as_list(), [None, 3])
def test_deepcopy(self):
with context.eager_mode():
bias_reg = lambda x: 1e-3 * math_ops.reduce_sum(x)
layer = keras.layers.Conv2D(32, (3, 3), bias_regularizer=bias_reg)
# Call the layer on data to generate regularizer losses.
layer(array_ops.ones((1, 10, 10, 3)))
self.assertLen(layer.losses, 1)
new_layer = copy.deepcopy(layer)
self.assertEqual(new_layer.bias_regularizer, bias_reg)
self.assertEqual(layer.get_config(), new_layer.get_config())
@test_util.run_in_graph_and_eager_modes
def test_invalid_forward_pass(self):
inputs = keras.Input((3,))
with self.assertRaisesRegexp(ValueError, 'You did something wrong!'):
_ = InvalidLayer()(inputs)
def test_no_legacy_model(self):
inputs = keras.Input((1,))
legacy_dense_0 = legacy_core.Dense(1, name='legacy_dense_0')
legacy_dense_1 = legacy_core.Dense(1, name='legacy_dense_1')
layer = legacy_dense_0(inputs)
layer = keras.layers.Dense(1)(layer)
layer = legacy_dense_1(layer)
expected_regex = (r'The following are legacy tf\.layers\.Layers:\n '
'{}\n {}'.format(legacy_dense_0, legacy_dense_1))
with self.assertRaisesRegexp(TypeError, expected_regex):
_ = keras.models.Model(inputs=[inputs], outputs=[layer])
model = keras.models.Model(inputs=[inputs], outputs=[inputs])
with self.assertRaisesRegexp(TypeError, expected_regex):
model._insert_layers([legacy_dense_0, legacy_dense_1])
def test_no_legacy_sequential(self):
layers = [
keras.layers.Dense(1),
legacy_core.Dense(1, name='legacy_dense_0')
]
expected_regex = r'legacy tf\.layers\.Layers:\n {}'.format(layers[1])
with self.assertRaisesRegexp(TypeError, expected_regex):
_ = keras.models.Sequential(layers)
with self.assertRaisesRegexp(TypeError, expected_regex):
_ = keras.models.Sequential([keras.layers.Input(shape=(4,))] + layers)
model = keras.models.Sequential()
with self.assertRaisesRegexp(TypeError, expected_regex):
for l in layers:
model.add(l)
@keras_parameterized.run_with_all_model_types
@test_util.run_in_graph_and_eager_modes
def test_build_with_numpy_data(self):
model_layers = [
keras.layers.Dense(3, activation='relu', kernel_initializer='ones'),
keras.layers.Dense(1, activation='sigmoid', kernel_initializer='ones')
]
model = testing_utils.get_model_from_layers(model_layers, input_shape=(4,))
model(np.zeros((2, 4), dtype='float32'))
self.assertTrue(model.built)
@test_util.run_in_graph_and_eager_modes
def test_default_add_weight(self):
class TestLayer(keras.layers.Layer):
def __init__(self):
super(TestLayer, self).__init__()
self.default_weight = self.add_weight()
self.weight_without_name = self.add_weight(shape=(3, 4))
self.regularized_weight_without_name = self.add_weight(
shape=(3, 4), regularizer='l2')
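# With no arguments, `add_weight` is expected to default to a scalar shape,
# the floatx dtype (float32), and trainable=True.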
layer = TestLayer()
self.assertEqual(layer.default_weight.shape.as_list(), [])
self.assertEqual(layer.weight_without_name.shape.as_list(), [3, 4])
self.assertEqual(layer.default_weight.dtype.name, 'float32')
self.assertEqual(layer.weight_without_name.dtype.name, 'float32')
self.assertEqual(len(layer.losses), 1)
if not context.executing_eagerly():
# Cannot access tensor.name in eager execution.
self.assertTrue('Variable_2/Regularizer' in layer.losses[0].name)
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
def test_learning_phase_freezing_for_layers(self):
class LearningPhaseLayer(keras.layers.Layer):
def call(self, inputs):
return keras.backend.in_train_phase(
lambda: array_ops.ones_like(inputs),
lambda: array_ops.zeros_like(inputs))
def get_learning_phase_value():
model = keras.models.Sequential([LearningPhaseLayer(input_shape=(1,))])
model._run_eagerly = testing_utils.should_run_eagerly()
model._experimental_run_tf_function = (
testing_utils.should_run_tf_function())
return np.sum(model(np.ones((1, 1))))
self.assertEqual(get_learning_phase_value(), 0)
# Test scope.
with keras.backend.learning_phase_scope(1):
self.assertEqual(get_learning_phase_value(), 1)
# The effects of the scope end after exiting it.
self.assertEqual(get_learning_phase_value(), 0)
# Test setting.
keras.backend.set_learning_phase(1)
self.assertEqual(get_learning_phase_value(), 1)
keras.backend.set_learning_phase(0)
self.assertEqual(get_learning_phase_value(), 0)
@keras_parameterized.run_all_keras_modes
def test_learning_phase_freezing_for_layers_in_predict(self):
if not (testing_utils.should_run_eagerly() or
testing_utils.should_run_tf_function()):
self.skipTest('Predict fails to override the outer learning phase in '
'the FuncGraph path.')
class LearningPhaseLayer(keras.layers.Layer):
def call(self, inputs):
return keras.backend.in_train_phase(
lambda: array_ops.ones_like(inputs),
lambda: array_ops.zeros_like(inputs))
def get_learning_phase_value():
model = keras.models.Sequential([LearningPhaseLayer(input_shape=(1,))])
model._run_eagerly = testing_utils.should_run_eagerly()
model._experimental_run_tf_function = (
testing_utils.should_run_tf_function())
return np.sum(model.predict(np.ones((1, 1))))
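# `predict` should always run in inference mode (learning phase 0), even when
# a scope or the global learning phase says otherwise.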
self.assertEqual(get_learning_phase_value(), 0)
# Test scope.
with keras.backend.learning_phase_scope(1):
self.assertEqual(get_learning_phase_value(), 0)
# The effects of the scope end after exiting it.
self.assertEqual(get_learning_phase_value(), 0)
# Test setting.
keras.backend.set_learning_phase(1)
self.assertEqual(get_learning_phase_value(), 0)
keras.backend.set_learning_phase(0)
self.assertEqual(get_learning_phase_value(), 0)
# Cannot be enabled with `run_eagerly=True`, see b/123904578
@test_util.run_all_in_graph_and_eager_modes
def test_layer_can_return_variable(self):
class ComputeSum(keras.layers.Layer):
def __init__(self):
super(ComputeSum, self).__init__()
self.total = variables.Variable(
initial_value=array_ops.zeros((1, 1)), trainable=False)
if not context.executing_eagerly():
keras.backend.get_session().run(self.total.initializer)
def call(self, inputs):
self.total.assign_add(inputs)
return self.total
inputs = keras.Input(shape=(1,))
model = keras.Model(inputs, ComputeSum()(inputs))
model.predict(np.ones((1, 1)))
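# Helper: a layer whose `tf.function`-wrapped `call` falls back to the Keras
# learning phase when `training` is not passed explicitly.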
def _get_layer_with_training_arg(self):
class TrainingLayer(keras.layers.Layer):
"""A layer with a `training` argument in a defuned `call`."""
@def_function.function
def call(self, inputs, training=None):
if training is None:
training = keras.backend.learning_phase()
return tf_utils.smart_cond(training,
lambda: array_ops.ones_like(inputs),
lambda: array_ops.zeros_like(inputs))
return TrainingLayer()
@keras_parameterized.run_with_all_model_types
# b/124459427: can't test with `run_eagerly=True` for now.
@test_util.run_in_graph_and_eager_modes
def test_training_arg_in_defun(self):
layer = self._get_layer_with_training_arg()
model = testing_utils.get_model_from_layers([layer], input_shape=(1,))
model.compile(rmsprop.RMSprop(0.),
loss='mae')
history = model.fit(np.zeros((1, 1)), np.zeros((1, 1)))
self.assertEqual(history.history['loss'][0], 1.)
loss = model.evaluate(np.zeros((1, 1)), np.zeros((1, 1)))
self.assertEqual(loss, 0.)
# Test that the argument injection performed in `call` is not active
# when the argument is passed explicitly.
layer = self._get_layer_with_training_arg()
inputs = keras.Input(shape=(1,))
# Pass `training` by name
outputs = layer(inputs, training=False)
model = keras.Model(inputs, outputs)
model.compile(rmsprop.RMSprop(0.),
loss='mae')
history = model.fit(np.zeros((1, 1)), np.zeros((1, 1)))
self.assertEqual(history.history['loss'][0], 0.)
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_raw_variable_assignment(self):
class RawVariableLayer(keras.layers.Layer):
def __init__(self, **kwargs):
super(RawVariableLayer, self).__init__(**kwargs)
# Test variables in nested structure.
self.var_list = [variables.Variable(1.), {'a': variables.Variable(2.)}]
def call(self, inputs):
return inputs * self.var_list[0] * self.var_list[1]['a']
model = testing_utils.get_model_from_layers([RawVariableLayer()],
input_shape=(10,))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
x, y = np.ones((10, 10)), np.ones((10, 10))
# Checks that variables get initialized.
model.fit(x, y, batch_size=2, epochs=2)
@test_util.run_in_graph_and_eager_modes
def test_layer_names(self):
inputs = keras.layers.Input(shape=[2])
add1 = inputs + inputs
add2 = keras.layers.Add()([inputs, inputs])
add3 = inputs + inputs
add4 = keras.layers.Add()([inputs, inputs])
model = keras.models.Model(
inputs=[inputs], outputs=[add1, add2, add3, add4])
actual_names = [l.name for l in model.layers]
graph_names = [
'input_1', 'tf_op_layer_AddV2', 'add', 'tf_op_layer_AddV2_1', 'add_1'
]
eager_names = [
'input_1', 'tf_op_layer_add', 'add', 'tf_op_layer_add_2', 'add_1'
]
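# The auto-generated op-layer names differ between graph and eager tracing,
# so accept either spelling for each position.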
for actual, graph, eager in zip(actual_names, graph_names, eager_names):
self.assertIn(actual, {eager, graph})
def test_add_trainable_weight_on_frozen_layer(self):
class TestLayer(keras.layers.Layer):
def build(self, input_shape):
self.w = self.add_weight(shape=(), trainable=True)
def call(self, inputs):
return self.w * inputs
layer = TestLayer()
layer.trainable = False
layer.build(None)
layer.trainable = True
self.assertListEqual(layer.trainable_weights, [layer.w])
@keras_parameterized.run_with_all_model_types
@keras_parameterized.run_all_keras_modes
def test_passing_initial_weights_values(self):
kernel_value = np.random.random((10, 2))
layer_with_weights = keras.layers.Dense(
2, use_bias=False, weights=[kernel_value])
model = testing_utils.get_model_from_layers([layer_with_weights],
input_shape=(10,))
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
inputs = np.random.random((3, 10))
out = model.predict(inputs)
self.assertAllClose(model.layers[-1].get_weights()[0], kernel_value)
self.assertAllClose(out, np.dot(inputs, kernel_value))
@test_util.run_in_graph_and_eager_modes
def test_set_weights_and_get_weights(self):
layer = keras.layers.Dense(2)
layer.build((None, 10))
kernel = np.random.random((10, 2))
bias = np.random.random((2,))
layer.set_weights([kernel, bias])
weights = layer.get_weights()
self.assertEqual(len(weights), 2)
self.assertAllClose(weights[0], kernel)
self.assertAllClose(weights[1], bias)
with self.assertRaisesRegexp(
ValueError, 'but the layer was expecting 2 weights'):
layer.set_weights([1, 2, 3])
with self.assertRaisesRegexp(
ValueError, 'not compatible with provided weight shape'):
layer.set_weights([kernel.T, bias])
def test_get_config_error(self):
class MyLayer(keras.layers.Layer):
def __init__(self, my_kwarg='default', **kwargs):
super(MyLayer, self).__init__(**kwargs)
self.my_kwarg = my_kwarg
# `__init__` includes kwargs but `get_config` is not overridden, so
# an error should be thrown:
with self.assertRaises(NotImplementedError):
MyLayer('custom').get_config()
class MyLayerNew(keras.layers.Layer):
def __init__(self, my_kwarg='default', **kwargs):
super(MyLayerNew, self).__init__(**kwargs)
self.my_kwarg = my_kwarg
def get_config(self):
config = super(MyLayerNew, self).get_config()
config['my_kwarg'] = self.my_kwarg
return config
# Test to make sure that error is not raised if the method call is
# from an overridden `get_config`:
self.assertEqual(MyLayerNew('custom').get_config()['my_kwarg'], 'custom')
class MyLayerNew2(keras.layers.Layer):
def __init__(self, name='MyLayerName', dtype=None, **kwargs): # pylint:disable=redefined-outer-name
super(MyLayerNew2, self).__init__(name=name, dtype=dtype, **kwargs)
# Check that if the kwargs in `__init__` are base layer constructor
# arguments, no error is thrown:
self.assertEqual(MyLayerNew2(name='New').get_config()['name'], 'New')
@test_util.run_in_graph_and_eager_modes
def test_count_params(self):
dense = keras.layers.Dense(16)
dense.build((None, 4))
self.assertEqual(dense.count_params(), 16 * 4 + 16)
dense = keras.layers.Dense(16)
with self.assertRaisesRegexp(ValueError, 'call `count_params`'):
dense.count_params()
model = keras.Sequential(keras.layers.Dense(16))
with self.assertRaisesRegexp(ValueError, 'call `count_params`'):
model.count_params()
dense = keras.layers.Dense(16, input_dim=4)
model = keras.Sequential(dense)
self.assertEqual(model.count_params(), 16 * 4 + 16)
def test_super_not_called(self):
class CustomLayerNotCallingSuper(keras.layers.Layer):
def __init__(self):
pass
layer = CustomLayerNotCallingSuper()
with self.assertRaisesRegexp(RuntimeError, 'You must call `super()'):
layer(np.random.random((10, 2)))
class SymbolicSupportTest(test.TestCase):
def test_using_symbolic_tensors_with_tf_ops(self):
# Single-input.
x = keras.Input((3,))
y = math_ops.square(x)
self.assertEqual(y.graph, keras.backend.get_graph())
# Multi-inputs.
x1, x2 = keras.Input((3,)), keras.Input((3,))
y = array_ops.concat([x1, x2], axis=1)
self.assertEqual(y.graph, keras.backend.get_graph())
# Mixing Keras symbolic tensors and graph tensors from the same graph works.
with keras.backend.get_graph().as_default():
x1 = keras.Input((3,))
x2 = keras.Input((3,))
y = math_ops.matmul(x1, x2)
self.assertEqual(y.graph, keras.backend.get_graph())
# Creating same op type (matmul) multiple times in the Keras graph works.
x1 = keras.Input((3,))
x2 = keras.Input((3,))
y = math_ops.matmul(x1, x2)
self.assertEqual(y.graph, keras.backend.get_graph())
def test_mixing_eager_and_graph_tensors(self):
with ops.Graph().as_default():
x1 = array_ops.ones((3, 3))
x2 = array_ops.ones((3, 3))
self.assertIsInstance(x2, ops.EagerTensor)
with self.assertRaisesRegexp(TypeError, 'Graph tensors'):
math_ops.matmul(x1, x2)
def test_mixing_numpy_arrays_and_graph_tensors(self):
with ops.Graph().as_default():
x1 = array_ops.ones((3, 3))
x2 = np.ones((3, 3), dtype='float32')
with self.assertRaisesRegexp(TypeError, 'Graph tensors'):
math_ops.matmul(x1, x2)
@test_util.run_in_graph_and_eager_modes
def test_mixing_keras_symbolic_tensors_and_eager_tensors(self):
x1 = keras.Input((3,))
x2 = array_ops.ones((3, 3))
y = math_ops.matmul(x1, x2)
self.assertEqual(y.graph, keras.backend.get_graph())
fn = keras.backend.function(inputs=[x1], outputs=[y])
x_val = np.random.random((3, 3))
y_val = np.ones((3, 3))
self.assertAllClose(fn([x_val])[0],
np.matmul(x_val, y_val),
atol=1e-5)
@test_util.run_in_graph_and_eager_modes
def test_mixing_keras_symbolic_tensors_and_numpy_arrays(self):
x1 = keras.Input((3,))
x2 = np.ones((3, 3), dtype='float32')
y = math_ops.matmul(x1, x2)
self.assertEqual(y.graph, keras.backend.get_graph())
fn = keras.backend.function(inputs=[x1], outputs=[y])
x_val = np.random.random((3, 3))
y_val = np.ones((3, 3))
self.assertAllClose(fn([x_val])[0],
np.matmul(x_val, y_val),
atol=1e-5)
@test_util.run_in_graph_and_eager_modes
def test_reraising_exception(self):
# When layer is not dynamic, we have some pattern matching during exception
# handling to detect when the user is trying to use python control flow.
# When an exception is thrown but the pattern doesn't match, we want to
# preserve the originating stack trace. An early implementation of this
# logic lost the stack trace. We test the correct behavior here.
class TypeErrorLayer(base_layer.Layer):
def call(self, inputs):
def easily_identifiable_name():
raise TypeError('Non-matching TypeError message.')
easily_identifiable_name()
inputs = keras.Input((3,))
try:
_ = TypeErrorLayer()(inputs)
except TypeError as e:
if hasattr(e, 'ag_error_metadata'):
self.assertIn('easily_identifiable_name', str(e))
# See ErrorMetadataBase in autograph/pyct/errors.py
function_name = e.ag_error_metadata.translated_stack[-1].function_name
else:
tb = traceback.extract_tb(sys.exc_info()[2])
last_entry = tb[-1]
function_name = last_entry[2]
self.assertEqual(function_name, 'easily_identifiable_name')
@test_util.run_in_graph_and_eager_modes
def test_summaries_in_tf_function(self):
if not context.executing_eagerly():
return
class MyLayer(keras.layers.Layer):
def call(self, inputs):
summary_ops_v2.scalar('mean', math_ops.reduce_mean(inputs))
return inputs
tmp_dir = self.get_temp_dir()
writer = summary_ops_v2.create_file_writer_v2(tmp_dir)
with writer.as_default(), summary_ops_v2.always_record_summaries():
my_layer = MyLayer()
x = array_ops.ones((10, 10))
def my_fn(x):
return my_layer(x)
_ = my_fn(x)
event_file = gfile.Glob(os.path.join(tmp_dir, 'events*'))
self.assertLen(event_file, 1)
event_file = event_file[0]
tags = set()
for e in summary_iterator.summary_iterator(event_file):
for val in e.summary.value:
tags.add(val.tag)
self.assertEqual(set(['my_layer/mean']), tags)
@test_util.run_all_in_graph_and_eager_modes
class NestedTrackingTest(test.TestCase):
def test_nested_layer_variable_tracking(self):
# Test that variables from nested sublayers are
# being tracked by subclassed layers.
class MyLayer(keras.layers.Layer):
def __init__(self):
super(MyLayer, self).__init__()
self.dense1 = keras.layers.Dense(1)
self.dense2 = keras.layers.BatchNormalization()
def build(self, input_shape):
self.v1 = self.add_weight('v1', shape=input_shape[1:].as_list())
self.v2 = variables.Variable(
name='v2',
initial_value=np.zeros(input_shape[1:].as_list(), dtype='float32'),
trainable=False)
def call(self, inputs):
x = self.dense1(inputs) + self.dense2(inputs)
return x + self.v1 + self.v2
layer = MyLayer()
inputs = keras.Input((1,))
_ = layer(inputs)
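# Expected counts: Dense has kernel + bias (2 trainable), BatchNormalization
# has gamma + beta (trainable) and 2 moving statistics (non-trainable), plus
# v1 (trainable) and v2 (non-trainable): 8 weights, 5 trainable, 3 not.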
self.assertEqual(len(layer.weights), 8)
self.assertEqual(len(layer.trainable_weights), 5)
self.assertEqual(len(layer.non_trainable_weights), 3)
layer.dense1.trainable = False
self.assertEqual(len(layer.weights), 8)
self.assertEqual(len(layer.trainable_weights), 3)
self.assertEqual(len(layer.non_trainable_weights), 5)
layer.trainable = False
self.assertEqual(len(layer.weights), 8)
self.assertEqual(len(layer.trainable_weights), 0)
self.assertEqual(len(layer.non_trainable_weights), 8)
self.assertEqual(
{id(v) for v in [layer.dense1, layer.dense2, layer.v1, layer.v2]},
{id(v) for _, v in layer._checkpoint_dependencies})
def test_nested_layer_updates_losses_tracking(self):
# Test that updates and losses from nested sublayers are
# being tracked by subclassed layers.
class UpdateAndLossLayer(keras.layers.Layer):
def build(self, _):
self.v1 = self.add_weight('v1', shape=())
def call(self, inputs):
self.add_loss(math_ops.reduce_sum(inputs))
self.add_update(state_ops.assign_add(self.v1, 1))
return inputs + 1
class MyLayer(keras.layers.Layer):
def build(self, _):
self.v1 = self.add_weight('v1', shape=())
def __init__(self):
super(MyLayer, self).__init__()
self.ul1 = UpdateAndLossLayer()
self.ul2 = UpdateAndLossLayer()
def call(self, inputs):
self.add_loss(math_ops.reduce_sum(inputs))
self.add_update(state_ops.assign_add(self.v1, 1))
x = self.ul1(inputs)
return self.ul2(x)
layer = MyLayer()
if context.executing_eagerly():
inputs = array_ops.ones((3, 1))
_ = layer(inputs)
self.assertEqual(len(layer.losses), 3)
self.assertLen(layer.get_losses_for(None), 3)
else:
inputs = keras.Input((1,))
_ = layer(inputs)
self.assertEqual(len(layer.losses), 3)
self.assertEqual(len(layer.updates), 3)
self.assertLen(layer.get_losses_for(None), 3)
def test_attribute_reassignment(self):
l = keras.layers.Layer()
l.a = keras.layers.Layer()
l.a = []
l.a = variables.Variable(1.)
l.a = keras.layers.Layer()
last_assignment = keras.layers.Layer()
l.a = last_assignment
l.b = variables.Variable(1.)
del l.b
l.c = keras.layers.Layer()
del l.c
l.d = last_assignment
del l.d
self.assertEqual([last_assignment], l._layers)
self.assertEqual([], l.trainable_weights)
self.assertEqual([], l.non_trainable_weights)
self.assertEqual([], l.weights)
del l.a
self.assertEqual([], l._layers)
def test_assign_op_not_tracked_as_variable(self):
class LayerWithAssignAttr(keras.layers.Layer):
def build(self, input_shape):
self.v = variables.Variable(1.)
self.v_assign = self.v.assign_add(2.)
layer = LayerWithAssignAttr()
layer.build((10, 10))
self.assertEqual([layer.v], layer.variables)
def test_layer_class_not_tracked_as_sublayer(self):
# See https://github.com/tensorflow/tensorflow/issues/27431 for details.
class LayerWithClassAttribute(keras.layers.Layer):
def __init__(self):
super(LayerWithClassAttribute, self).__init__()
self.layer_fn = keras.layers.Dense
layer = LayerWithClassAttribute()
self.assertEmpty(layer.variables)
self.assertEmpty(layer.submodules)
def test_layer_call_fn_args(self):
class NonDefunLayer(keras.layers.Layer):
def call(self, inputs, a, mask, b=None, training=None):
return inputs
class DefunLayer(keras.layers.Layer):
@def_function.function
def call(self, x, mask, a, training=None, b=None):
return x
nondefun_layer = NonDefunLayer()
self.assertEqual(nondefun_layer._call_fn_args,
['inputs', 'a', 'mask', 'b', 'training'])
defun_layer = DefunLayer()
self.assertEqual(defun_layer._call_fn_args,
['x', 'mask', 'a', 'training', 'b'])
@test_util.run_all_in_graph_and_eager_modes
class NameScopingTest(keras_parameterized.TestCase):
def test_name_scope_layer(self):
x = keras.backend.placeholder(shape=(10, 10))
layer = keras.layers.Dense(10, name='MyName')
layer(x)
self.assertEqual(layer.bias.name, 'MyName/bias:0')
self.assertEqual(layer.kernel.name, 'MyName/kernel:0')
def test_name_scope_sublayer(self):
class NameScopeTracker(keras.layers.Layer):
def call(self, inputs):
self.active_name_scope = ops.get_name_scope()
return inputs
x = keras.backend.placeholder(shape=(10, 10))
sublayer = NameScopeTracker(name='Sublayer')
layer = keras.layers.Dense(10, activation=sublayer, name='MyName2')
layer(x)
self.assertEqual(layer.bias.name, 'MyName2/bias:0')
self.assertEqual(layer.kernel.name, 'MyName2/kernel:0')
self.assertEqual(sublayer.active_name_scope, 'MyName2/Sublayer')
def test_name_scope_tf_tensor(self):
x = ops.convert_to_tensor(np.ones((10, 10)))
layer = keras.layers.Dense(
10, activation=keras.layers.ReLU(name='MyAct'), name='MyName3')
layer(x)
self.assertEqual(layer.bias.name, 'MyName3/bias:0')
self.assertEqual(layer.kernel.name, 'MyName3/kernel:0')
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
class AutographControlFlowTest(keras_parameterized.TestCase):
def test_disabling_in_context_is_matched(self):
test_obj = self
class MyLayer(keras.layers.Layer):
def call(self, inputs, training=None):
with test_obj.assertRaisesRegex(TypeError, 'Tensor.*as.*bool'):
if constant_op.constant(False):
return inputs * 1.
return inputs * 0.
@def_function.function(autograph=False)
def test_fn():
return MyLayer()(constant_op.constant([[1., 2., 3.]]))
test_fn()
def test_if_training_pattern_output(self):
class MyLayer(keras.layers.Layer):
def call(self, inputs, training=None):
if training:
return inputs * 1.
return inputs * 0.
inputs = keras.Input((3,))
outputs = MyLayer()(inputs)
model = keras.Model(inputs, outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
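# In the train phase the layer returns its inputs, so MSE(ones, ones) == 0;
# at test time it returns zeros, so MSE(zeros, ones) == 1.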
train_loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(train_loss, 0.)
test_loss = model.test_on_batch(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(test_loss, 1.)
def test_if_training_pattern_loss(self):
class MyLayer(keras.layers.Layer):
def call(self, inputs, training=None):
if training:
loss = math_ops.reduce_sum(inputs)
else:
loss = 0.
self.add_loss(loss)
return inputs
inputs = keras.Input((3,))
outputs = MyLayer()(inputs)
model = keras.Model(inputs, outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
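# MSE is 0 (outputs equal targets), so the reported loss is just the added
# reduce_sum(ones((2, 3))) == 6 when training, and 0 at test time.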
train_loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(train_loss, 2 * 3)
test_loss = model.test_on_batch(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(test_loss, 0)
def test_if_training_pattern_metric(self):
class MyLayer(keras.layers.Layer):
def call(self, inputs, training=None):
if training:
metric = math_ops.reduce_sum(inputs)
else:
metric = 0.
self.add_metric(metric, name='my_metric', aggregation='mean')
return inputs
inputs = keras.Input((3,))
outputs = MyLayer()(inputs)
model = keras.Model(inputs, outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
for _ in range(3):
_, train_metric = model.train_on_batch(np.ones((2, 3)),
np.ones((2, 3)))
self.assertEqual(train_metric, 2 * 3)
_, test_metric = model.test_on_batch(np.ones((2, 3)),
np.ones((2, 3)))
self.assertEqual(test_metric, 0)
def test_if_training_pattern_update(self):
class MyLayer(keras.layers.Layer):
def build(self, input_shape):
self.counter = self.add_weight(
shape=(), trainable=False, initializer='zeros')
def call(self, inputs, training=None):
if training:
increment = 1.
else:
increment = 0.
self.counter.assign_add(increment)
return inputs
inputs = keras.Input((3,))
layer = MyLayer()
outputs = layer(inputs)
model = keras.Model(inputs, outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(keras.backend.get_value(layer.counter), 1.)
def test_conditional_updates_in_call(self):
class MyLayer(keras.layers.Layer):
def __init__(self):
super(MyLayer,
self).__init__(dynamic=testing_utils.should_run_eagerly())
def build(self, input_shape):
self.counter = self.add_weight(
shape=(), trainable=False, initializer='zeros')
def call(self, inputs, training=None):
if training:
z = math_ops.reduce_sum(inputs)
self.add_update(lambda: self.counter.assign_add(z))
return inputs
def compute_output_shape(self, input_shape):
return input_shape
if testing_utils.should_run_eagerly():
inputs = keras.Input((3,))
layer = MyLayer()
outputs = layer(inputs)
model = keras.Model(inputs, outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(keras.backend.get_value(layer.counter), 6.)
else:
# TODO(fchollet): support the same workflow in graph mode.
with self.assertRaisesRegexp(RuntimeError,
'`add_update` in a control flow branch'):
layer = MyLayer()
layer(keras.Input((3,)))
_ = layer.updates
def test_conditional_losses_in_call(self):
class MyLayer(keras.layers.Layer):
def __init__(self):
super(MyLayer,
self).__init__(dynamic=testing_utils.should_run_eagerly())
def call(self, inputs, training=None):
if training:
self.add_loss(math_ops.reduce_sum(inputs))
return inputs
def compute_output_shape(self, input_shape):
return input_shape
if testing_utils.should_run_eagerly():
inputs = keras.Input((3,))
layer = MyLayer()
outputs = layer(inputs)
model = keras.Model(inputs, outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
loss = model.train_on_batch(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(loss, 2 * 3)
else:
with self.assertRaisesRegexp(RuntimeError,
'`add_loss` in a control flow branch'):
layer = MyLayer()(keras.Input((3,)))
def test_conditional_callable_losses(self):
model = keras.Sequential([
keras.layers.Dense(
1, kernel_regularizer=keras.regularizers.l2(1e-4), input_shape=(1,))
])
model._run_eagerly = testing_utils.should_run_eagerly()
model._experimental_run_tf_function = testing_utils.should_run_tf_function()
def assert_graph(t):
if not context.executing_eagerly():
self.assertEqual(t.graph, ops.get_default_graph())
@def_function.function
def get_losses(t):
if t < 0:
return math_ops.reduce_sum(model.losses) * t
else:
return math_ops.reduce_sum(model.losses)
assert_graph(get_losses(constant_op.constant(2.)))
assert_graph(get_losses(constant_op.constant(0.5)))
def test_conditional_metrics_in_call(self):
class MyLayer(keras.layers.Layer):
def __init__(self):
super(MyLayer,
self).__init__(dynamic=testing_utils.should_run_eagerly())
def call(self, inputs, training=None):
if training:
self.add_metric(math_ops.reduce_sum(inputs),
name='sum',
aggregation='mean')
return inputs
def compute_output_shape(self, input_shape):
return input_shape
if testing_utils.should_run_eagerly():
inputs = keras.Input((3,))
layer = MyLayer()
outputs = layer(inputs)
model = keras.Model(inputs, outputs)
model.compile(
'sgd',
'mse',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
history = model.fit(np.ones((2, 3)), np.ones((2, 3)))
self.assertEqual(history.history['sum'][-1], 2 * 3)
else:
# TODO(fchollet): support the same workflow in graph mode.
with self.assertRaisesRegexp(RuntimeError,
'`add_metric` in a control flow branch'):
layer = MyLayer()(keras.Input((3,)))
def test_conditional_activity_regularizer_in_call(self):
class TestModel(keras.Model):
def __init__(self):
super(TestModel, self).__init__(
name='test_model', dynamic=testing_utils.should_run_eagerly())
self.layer = keras.layers.Dense(2, activity_regularizer='l2')
def call(self, x, training=None):
if math_ops.greater(math_ops.reduce_sum(x), 0.0):
return self.layer(x)
else:
return self.layer(x)
model = TestModel()
model.compile(
loss='mse',
optimizer='sgd',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
x = np.ones(shape=(10, 1))
y = np.ones(shape=(10, 2))
if testing_utils.should_run_eagerly():
model.fit(x, y, epochs=2, batch_size=5)
else:
with self.assertRaisesRegexp(
RuntimeError, '`activity_regularizer` in a control flow branch'):
model.fit(x, y, epochs=2, batch_size=5)
def test_conditional_activity_regularizer_with_wrappers_in_call(self):
class TestModel(keras.Model):
def __init__(self):
super(TestModel, self).__init__(
name='test_model', dynamic=testing_utils.should_run_eagerly())
self.layer = keras.layers.TimeDistributed(
keras.layers.Dense(2, activity_regularizer='l2'),
input_shape=(3, 4))
def call(self, x, training=None):
if math_ops.greater(math_ops.reduce_sum(x), 0.0):
return self.layer(x)
else:
return self.layer(x)
model = TestModel()
model.compile(
loss='mse',
optimizer='sgd',
run_eagerly=testing_utils.should_run_eagerly(),
experimental_run_tf_function=testing_utils.should_run_tf_function())
x = np.ones(shape=(10, 3, 4))
y = np.ones(shape=(10, 3, 2))
if testing_utils.should_run_eagerly():
model.fit(x, y, epochs=2, batch_size=5)
else:
with self.assertRaisesRegexp(
RuntimeError, '`activity_regularizer` in a control flow branch'):
model.fit(x, y, epochs=2, batch_size=5)
class AddLayer(keras.layers.Layer):
"""A layer which adds it's input to a variable.
Useful for testing a layer with a variable
"""
def build(self, _):
self.v = self.add_weight('v', (), initializer='ones')
self.built = True
def call(self, inputs):
return inputs + self.v
class IdentityLayer(keras.layers.Layer):
"""A layer that returns it's input.
Useful for testing a layer without a variable.
"""
def call(self, inputs):
return inputs
@test_util.run_all_in_graph_and_eager_modes
class DTypeTest(keras_parameterized.TestCase):
# This class only has tests relating to layer.dtype. Tests for dtype policies
# are in mixed_precision/experimental/keras_test.py.
# TODO(reedwm): Maybe have a separate test file for input casting tests.
def _const(self, dtype):
return array_ops.constant(1, dtype=dtype)
@testing_utils.enable_v2_dtype_behavior
def test_dtype_defaults_to_floatx(self):
layer = AddLayer()
self.assertEqual(layer.dtype, 'float32')
layer(self._const('float64'))
self.assertEqual(layer.dtype, 'float32') # dtype should not change
try:
backend.set_floatx('float64')
layer = AddLayer()
self.assertEqual(layer.dtype, 'float64')
finally:
backend.set_floatx('float32')
@testing_utils.enable_v2_dtype_behavior
def test_passing_dtype_to_constructor(self):
layer = IdentityLayer(dtype='float64')
layer(self._const('float32'))
self.assertEqual(layer.dtype, 'float64')
layer = IdentityLayer(dtype='int32')
layer(self._const('float32'))
self.assertEqual(layer.dtype, 'int32')
layer = IdentityLayer(dtype=dtypes.float64)
layer(self._const('float32'))
self.assertEqual(layer.dtype, 'float64')
@testing_utils.enable_v2_dtype_behavior
def test_input_cast_to_dtype(self):  # `test_` prefix so the runner picks it up.
layer = AddLayer()
# Input should be cast to layer.dtype, so output should also be layer.dtype
self.assertEqual(layer(self._const('float64')).dtype, 'float32')
layer = AddLayer(dtype='float64')
self.assertEqual(layer(self._const('float32')).dtype, 'float64')
# Test inputs are not cast if layer.dtype is not floating-point
layer = IdentityLayer(dtype='int32')
self.assertEqual(layer(self._const('float64')).dtype, 'float64')
# Test inputs are not cast if the inputs are not floating-point
layer = IdentityLayer(dtype='float32')
self.assertEqual(layer(self._const('int32')).dtype, 'int32')
# Test NumPy arrays are cast
layer = IdentityLayer(dtype='float64')
self.assertEqual(layer(np.array(1, dtype='float32')).dtype, 'float64')
# Test Python floats are cast
layer = IdentityLayer(dtype='float64')
self.assertEqual(layer(1.).dtype, 'float64')
@testing_utils.enable_v2_dtype_behavior
def test_multiple_inputs_cast_to_dtype(self):  # `test_` prefix so the runner picks it up.
class MultiIdentityLayer(keras.layers.Layer):
def call(self, inputs):
return [array_ops.identity(x) for x in inputs]
# Testing layer with default dtype of float32
layer = MultiIdentityLayer()
x, y = layer([self._const('float16'), self._const('float32')])
self.assertEqual(x.dtype, 'float32')
self.assertEqual(y.dtype, 'float32')
# Test passing dtype to the constructor
layer = MultiIdentityLayer(dtype='float64')
x, y = layer([self._const('float16'), self._const('float32')])
self.assertEqual(x.dtype, 'float64')
self.assertEqual(y.dtype, 'float64')
# Test several non-floating point types
layer = MultiIdentityLayer(dtype='float64')
x, y, z, w = layer([self._const('float16'), self._const('bool'),
self._const('float64'), self._const('complex64')])
self.assertEqual(x.dtype, 'float64')
self.assertEqual(y.dtype, 'bool')
self.assertEqual(z.dtype, 'float64')
self.assertEqual(w.dtype, 'complex64')
@testing_utils.enable_v2_dtype_behavior
def test_extra_args_and_kwargs_not_casted(self):
class IdentityLayerWithArgs(keras.layers.Layer):
def call(self, inputs, *args, **kwargs):
return nest.flatten([inputs, args, kwargs])
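# Autocasting should only apply to the first `inputs` argument; extra
# positional and keyword arguments are expected to pass through uncast.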
layer = IdentityLayerWithArgs(dtype='float64')
x, y, z = layer(self._const('float16'), self._const('float16'),
kwarg=self._const('float16'))
self.assertEqual(x.dtype, 'float64')
self.assertEqual(y.dtype, 'float16')
self.assertEqual(z.dtype, 'float16')
@testing_utils.enable_v2_dtype_behavior
def test_layer_without_autocast(self):
class IdentityLayerWithoutAutocast(IdentityLayer):
def __init__(self, *args, **kwargs):
kwargs['autocast'] = False
super(IdentityLayerWithoutAutocast, self).__init__(*args, **kwargs)
layer = IdentityLayerWithoutAutocast(dtype='float64')
self.assertEqual(layer(self._const('float32')).dtype, 'float32')
@testing_utils.enable_v2_dtype_behavior
def test_dtype_warnings(self):
# Test a layer warns when it casts inputs.
layer = IdentityLayer()
with test.mock.patch.object(tf_logging, 'warn') as mock_warn:
layer(self._const('float64'))
self.assertRegexpMatches(
str(mock_warn.call_args),
".*from dtype float64 to the layer's dtype of float32.*"
"The layer has dtype float32 because.*")
# Test a layer does not warn a second time
with test.mock.patch.object(tf_logging, 'warn') as mock_warn:
layer(self._const('float64'))
mock_warn.assert_not_called()
# Test a new layer can warn even if a different layer already warned
layer = IdentityLayer()
with test.mock.patch.object(tf_logging, 'warn') as mock_warn:
layer(self._const('float64'))
self.assertRegexpMatches(
str(mock_warn.call_args),
".*from dtype float64 to the layer's dtype of float32.*"
"The layer has dtype float32 because.*")
# Test a layer does not warn if a dtype is passed
layer = IdentityLayer(dtype='float32')
with test.mock.patch.object(tf_logging, 'warn') as mock_warn:
layer(self._const('float64'))
mock_warn.assert_not_called()
# Test a layer does not warn if a Policy is set:
with policy.policy_scope('float32'):
layer = IdentityLayer()
with test.mock.patch.object(tf_logging, 'warn') as mock_warn:
layer(self._const('float64'))
mock_warn.assert_not_called()
@testing_utils.enable_v2_dtype_behavior
def test_compute_output_signature(self):
class IdentityLayerWithOutputShape(IdentityLayer):
def compute_output_shape(self, input_shape):
return input_shape
layer = IdentityLayerWithOutputShape(dtype='float64')
output_signature = layer.compute_output_signature(
tensor_spec.TensorSpec(shape=(), dtype='float32'))
self.assertEqual(output_signature.shape, ())
self.assertEqual(output_signature.dtype, 'float64')
@testing_utils.enable_v2_dtype_behavior
def test_composite_tensors_input_casting(self):
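# Autocasting should also cover composite tensors: the SparseTensor and
# RaggedTensor below keep their type while their values are cast to float16.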
sparse = sparse_tensor.SparseTensor(
indices=array_ops.constant([[0, 1], [2, 3]], dtype='int64'),
values=array_ops.constant([0., 1.], dtype='float32'),
dense_shape=array_ops.constant([4, 4], dtype='int64'))
ragged = ragged_tensor.RaggedTensor.from_row_splits(
values=array_ops.constant([1., 2., 3.], dtype='float32'),
row_splits=array_ops.constant([0, 2, 2, 3], dtype='int64'))
layer = IdentityLayer(dtype='float16')
layer._supports_ragged_inputs = True
for x in sparse, ragged:
self.assertEqual(x.dtype, 'float32')
y = layer(x)
self.assertEqual(y.dtype, 'float16')
self.assertEqual(type(x), type(y))
def test_supports_ragged_inputs_attribute_error(self):
with self.assertRaisesRegexp(ValueError,
'does not support RaggedTensors'):
ragged = ragged_tensor.RaggedTensor.from_row_splits(
values=array_ops.constant([1., 2., 3.], dtype='float32'),
row_splits=array_ops.constant([0, 2, 2, 3], dtype='int64'))
model = keras.Sequential([
keras.layers.InputLayer(input_shape=(None,), ragged=True),
IdentityLayer()])
model.compile(rmsprop.RMSprop(0.001), loss='mse')
model.train_on_batch(ragged)
@testing_utils.enable_v2_dtype_behavior
def test_passing_non_tensor(self):
layer = IdentityLayer()
x = object()
y = layer(x) # Layer should not cast 'x', as it's not a tensor
self.assertIs(x, y)
@testing_utils.disable_v2_dtype_behavior
def test_v1_behavior(self):
# Test dtype defaults to None and inferred from input
layer = IdentityLayer()
self.assertIsNone(layer.dtype)
layer(self._const('float64'))
self.assertEqual(layer.dtype, 'float64')
# Test layer does not cast to dtype
self.assertEqual(layer(self._const('float32')).dtype, 'float32')
if __name__ == '__main__':
ops.enable_eager_execution()
test.main()