blob: cfff5d089644056a99b7d7caa0424ae0d4bd7240 [file] [log] [blame]
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Functional preprocessing stage tests."""
# pylint: disable=g-classes-have-attributes
import time
import numpy as np
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras.engine import base_preprocessing_layer
from tensorflow.python.keras.engine.input_layer import Input
from tensorflow.python.keras.layers import convolutional
from tensorflow.python.keras.layers import core
from tensorflow.python.keras.layers import merge
from tensorflow.python.keras.layers.preprocessing import image_preprocessing
from tensorflow.python.keras.layers.preprocessing import normalization
from tensorflow.python.keras.layers.preprocessing import preprocessing_stage
from tensorflow.python.keras.layers.preprocessing import preprocessing_test_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test
class PL(base_preprocessing_layer.PreprocessingLayer):
def __init__(self, **kwargs):
self.adapt_time = None
self.adapt_count = 0
super(PL, self).__init__(**kwargs)
def adapt(self, data, reset_state=True):
self.adapt_time = time.time()
self.adapt_count += 1
def call(self, inputs):
return inputs + 1
class PLMerge(PL):
def call(self, inputs):
return inputs[0] + inputs[1]
class PLSplit(PL):
def call(self, inputs):
return inputs + 1, inputs - 1
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
class PreprocessingStageTest(keras_parameterized.TestCase,
preprocessing_test_utils.PreprocessingLayerTest):
def test_adapt_preprocessing_stage_with_single_input_output(self):
x = Input(shape=(3,))
l0 = PL()
y = l0(x)
l1 = PL()
z = l1(y)
stage = preprocessing_stage.FunctionalPreprocessingStage(x, z)
stage.compile()
# Test with NumPy array
one_array = np.ones((4, 3), dtype='float32')
stage.adapt(one_array)
self.assertEqual(l0.adapt_count, 1)
self.assertEqual(l1.adapt_count, 1)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
# Check call
z = stage(array_ops.ones((4, 3), dtype='float32'))
self.assertAllClose(z, np.ones((4, 3), dtype='float32') + 2.)
# Test with dataset
adapt_data = dataset_ops.Dataset.from_tensor_slices(one_array)
adapt_data = adapt_data.batch(2) # 5 batches of 2 samples
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 2)
self.assertEqual(l1.adapt_count, 2)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
# Test error with bad data
with self.assertRaisesRegex(ValueError, 'requires a '):
stage.adapt(None)
# Disallow calling fit
with self.assertRaisesRegex(ValueError, 'Preprocessing stage'):
stage.fit(None)
def test_adapt_preprocessing_stage_with_list_input(self):
x0 = Input(shape=(3,))
x1 = Input(shape=(3,))
x2 = Input(shape=(3,))
l0 = PLMerge()
y = l0([x0, x1])
l1 = PLMerge()
y = l1([y, x2])
l2 = PLSplit()
z, y = l2(y)
stage = preprocessing_stage.FunctionalPreprocessingStage([x0, x1, x2],
[y, z])
stage.compile()
# Test with NumPy array
one_array = np.ones((4, 3), dtype='float32')
stage.adapt([one_array, one_array, one_array])
self.assertEqual(l0.adapt_count, 1)
self.assertEqual(l1.adapt_count, 1)
self.assertEqual(l2.adapt_count, 1)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Check call
y, z = stage([
array_ops.ones((4, 3), dtype='float32'),
array_ops.ones((4, 3), dtype='float32'),
array_ops.ones((4, 3), dtype='float32')
])
self.assertAllClose(y, np.ones((4, 3), dtype='float32') + 1.)
self.assertAllClose(z, np.ones((4, 3), dtype='float32') + 3.)
# Test with dataset
adapt_data = dataset_ops.Dataset.from_tensor_slices(
(one_array, one_array, one_array))
adapt_data = adapt_data.batch(2) # 5 batches of 2 samples
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 2)
self.assertEqual(l1.adapt_count, 2)
self.assertEqual(l2.adapt_count, 2)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Test error with bad data
with self.assertRaisesRegex(ValueError, 'requires a '):
stage.adapt(None)
def test_adapt_preprocessing_stage_with_dict_input(self):
x0 = Input(shape=(3,), name='x0')
x1 = Input(shape=(4,), name='x1')
x2 = Input(shape=(3, 5), name='x2')
# dimension will mismatch if x1 incorrectly placed.
x1_sum = core.Lambda(
lambda x: math_ops.reduce_sum(x, axis=-1, keepdims=True))(
x1)
x2_sum = core.Lambda(lambda x: math_ops.reduce_sum(x, axis=-1))(x2)
l0 = PLMerge()
y = l0([x0, x1_sum])
l1 = PLMerge()
y = l1([y, x2_sum])
l2 = PLSplit()
z, y = l2(y)
stage = preprocessing_stage.FunctionalPreprocessingStage(
{
'x2': x2,
'x0': x0,
'x1': x1
}, [y, z])
stage.compile()
# Test with dict of NumPy array
one_array0 = np.ones((4, 3), dtype='float32')
one_array1 = np.ones((4, 4), dtype='float32')
one_array2 = np.ones((4, 3, 5), dtype='float32')
adapt_data = {'x1': one_array1, 'x0': one_array0, 'x2': one_array2}
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 1)
self.assertEqual(l1.adapt_count, 1)
self.assertEqual(l2.adapt_count, 1)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Check call
y, z = stage({
'x1': array_ops.constant(one_array1),
'x2': array_ops.constant(one_array2),
'x0': array_ops.constant(one_array0)
})
self.assertAllClose(y, np.zeros((4, 3), dtype='float32') + 9.)
self.assertAllClose(z, np.zeros((4, 3), dtype='float32') + 11.)
# Test with list of NumPy array
adapt_data = [one_array0, one_array1, one_array2]
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 2)
self.assertEqual(l1.adapt_count, 2)
self.assertEqual(l2.adapt_count, 2)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Test with flattened dataset
adapt_data = dataset_ops.Dataset.from_tensor_slices(
(one_array0, one_array1, one_array2))
adapt_data = adapt_data.batch(2) # 5 batches of 2 samples
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 3)
self.assertEqual(l1.adapt_count, 3)
self.assertEqual(l2.adapt_count, 3)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Test with dataset in dict shape
adapt_data = dataset_ops.Dataset.from_tensor_slices({
'x0': one_array0,
'x2': one_array2,
'x1': one_array1
})
adapt_data = adapt_data.batch(2) # 5 batches of 2 samples
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 4)
self.assertEqual(l1.adapt_count, 4)
self.assertEqual(l2.adapt_count, 4)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Test error with bad data
with self.assertRaisesRegex(ValueError, 'requires a '):
stage.adapt(None)
def test_adapt_preprocessing_stage_with_dict_output(self):
x = Input(shape=(3,), name='x')
l0 = PLSplit()
y0, y1 = l0(x)
l1 = PLSplit()
z0, z1 = l1(y0)
stage = preprocessing_stage.FunctionalPreprocessingStage({'x': x}, {
'y1': y1,
'z1': z1,
'y0': y0,
'z0': z0
})
stage.compile()
# Test with NumPy array
one_array = np.ones((4, 3), dtype='float32')
adapt_data = {'x': one_array}
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 1)
self.assertEqual(l1.adapt_count, 1)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
# Check call
outputs = stage({'x': array_ops.constant(one_array)})
self.assertEqual(set(outputs.keys()), {'y0', 'y1', 'z0', 'z1'})
self.assertAllClose(outputs['y0'], np.ones((4, 3), dtype='float32') + 1.)
self.assertAllClose(outputs['y1'], np.ones((4, 3), dtype='float32') - 1.)
self.assertAllClose(outputs['z0'], np.ones((4, 3), dtype='float32') + 2.)
self.assertAllClose(outputs['z1'], np.ones((4, 3), dtype='float32'))
def test_preprocessing_stage_with_nested_input(self):
# Test with NumPy array
x0 = Input(shape=(3,))
x1 = Input(shape=(3,))
x2 = Input(shape=(3,))
l0 = PLMerge()
y = l0([x0, x1])
l1 = PLMerge()
y = l1([y, x2])
l2 = PLSplit()
z, y = l2(y)
stage = preprocessing_stage.FunctionalPreprocessingStage([x0, [x1, x2]],
[y, z])
stage.compile()
one_array = np.ones((4, 3), dtype='float32')
stage.adapt([one_array, [one_array, one_array]])
self.assertEqual(l0.adapt_count, 1)
self.assertEqual(l1.adapt_count, 1)
self.assertEqual(l2.adapt_count, 1)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Check call
y, z = stage([
array_ops.ones((4, 3), dtype='float32'),
[
array_ops.ones((4, 3), dtype='float32'),
array_ops.ones((4, 3), dtype='float32')
]
])
self.assertAllClose(y, np.ones((4, 3), dtype='float32') + 1.)
self.assertAllClose(z, np.ones((4, 3), dtype='float32') + 3.)
# Test with dataset
adapt_data = dataset_ops.Dataset.from_tensor_slices(
(one_array, (one_array, one_array)))
adapt_data = adapt_data.batch(2) # 5 batches of 2 samples
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 2)
self.assertEqual(l1.adapt_count, 2)
self.assertEqual(l2.adapt_count, 2)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
self.assertLessEqual(l1.adapt_time, l2.adapt_time)
# Test error with bad data
with self.assertRaisesRegex(ValueError, 'requires a '):
stage.adapt(None)
def test_include_layers_with_dict_input(self):
class PLMergeDict(PLMerge):
def call(self, inputs):
return inputs['a'] + inputs['b']
x0 = Input(shape=(3,))
x1 = Input(shape=(3,))
l0 = PLMergeDict()
y = l0({'a': x0, 'b': x1})
l1 = PLSplit()
z, y = l1(y)
stage = preprocessing_stage.FunctionalPreprocessingStage([x0, x1], [y, z])
stage.compile()
one_array = np.ones((4, 3), dtype='float32')
adapt_data = dataset_ops.Dataset.from_tensor_slices((one_array, one_array))
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 1)
self.assertEqual(l1.adapt_count, 1)
self.assertLessEqual(l0.adapt_time, l1.adapt_time)
# Check call
y, z = stage([
array_ops.ones((4, 3), dtype='float32'),
array_ops.ones((4, 3), dtype='float32')
])
self.assertAllClose(y, np.ones((4, 3), dtype='float32'))
self.assertAllClose(z, np.ones((4, 3), dtype='float32') + 2.)
def test_include_layers_with_nested_input(self):
class PLMergeNest(PLMerge):
def call(self, inputs):
a = inputs[0]
b = inputs[1][0]
c = inputs[1][1]
return a + b + c
x0 = Input(shape=(3,))
x1 = Input(shape=(3,))
x2 = Input(shape=(3,))
l0 = PLMergeNest()
y = l0([x0, [x1, x2]])
stage = preprocessing_stage.FunctionalPreprocessingStage([x0, x1, x2], y)
stage.compile()
one_array = np.ones((4, 3), dtype='float32')
adapt_data = dataset_ops.Dataset.from_tensor_slices((one_array,) * 3)
stage.adapt(adapt_data)
self.assertEqual(l0.adapt_count, 1)
# Check call
y = stage([
array_ops.ones((4, 3), dtype='float32'),
array_ops.ones((4, 3), dtype='float32'),
array_ops.ones((4, 3), dtype='float32')
])
self.assertAllClose(y, np.ones((4, 3), dtype='float32') + 2.)
def test_mixing_preprocessing_and_regular_layers(self):
x0 = Input(shape=(10, 10, 3))
x1 = Input(shape=(10, 10, 3))
x2 = Input(shape=(10, 10, 3))
y0 = merge.Add()([x0, x1])
y1 = image_preprocessing.CenterCrop(8, 8)(x2)
y1 = convolutional.ZeroPadding2D(padding=1)(y1)
z = merge.Add()([y0, y1])
z = normalization.Normalization()(z)
z = convolutional.Conv2D(4, 3)(z)
stage = preprocessing_stage.FunctionalPreprocessingStage([x0, x1, x2], z)
data = [
np.ones((12, 10, 10, 3), dtype='float32'),
np.ones((12, 10, 10, 3), dtype='float32'),
np.ones((12, 10, 10, 3), dtype='float32')
]
stage.adapt(data)
_ = stage(data)
stage.compile('rmsprop', 'mse')
with self.assertRaisesRegex(ValueError, 'Preprocessing stage'):
stage.fit(data, np.ones((12, 8, 8, 4)))
ds_x0 = dataset_ops.Dataset.from_tensor_slices(np.ones((12, 10, 10, 3)))
ds_x1 = dataset_ops.Dataset.from_tensor_slices(np.ones((12, 10, 10, 3)))
ds_x2 = dataset_ops.Dataset.from_tensor_slices(np.ones((12, 10, 10, 3)))
ds_x = dataset_ops.Dataset.zip((ds_x0, ds_x1, ds_x2))
ds_y = dataset_ops.Dataset.from_tensor_slices(np.ones((12, 8, 8, 4)))
dataset = dataset_ops.Dataset.zip((ds_x, ds_y)).batch(4)
with self.assertRaisesRegex(ValueError, 'Preprocessing stage'):
stage.fit(dataset)
_ = stage.evaluate(data, np.ones((12, 8, 8, 4)))
_ = stage.predict(data)
if __name__ == '__main__':
test.main()