# blob: 9680dfb429c4fee1957e0ab36d05ebe517796a63 [file] [log] [blame]
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for Keras discretization preprocessing layer."""
import os
from absl.testing import parameterized
import numpy as np
from tensorflow.python import keras
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.layers.preprocessing import discretization
from tensorflow.python.keras.layers.preprocessing import preprocessing_test_utils
from tensorflow.python.ops.ragged import ragged_factory_ops
from tensorflow.python.platform import test
from tensorflow.python.saved_model import load
from tensorflow.python.saved_model import save
@keras_parameterized.run_all_keras_modes
class DiscretizationTest(keras_parameterized.TestCase,
                         preprocessing_test_utils.PreprocessingLayerTest):
  """Exercises `Discretization` configured with explicit bin boundaries.

  Each bucketize test wires the layer into a small functional model, runs
  `predict`, and compares the produced bucket ids with hand-written values.
  Dense, sparse and ragged inputs are covered for float and integer dtypes.
  """

  def test_bucketize_with_explicit_buckets_integer(self):
    """Dense float input yields the expected integer bucket ids."""
    symbolic_in = keras.Input(shape=(4,))
    bucketizer = discretization.Discretization(bin_boundaries=[0., 1., 2.])
    symbolic_out = bucketizer(symbolic_in)
    # The symbolic output is expected to keep the input's (None, 4) shape.
    self.assertAllEqual([None, 4], symbolic_out.shape.as_list())

    model = keras.Model(inputs=symbolic_in, outputs=symbolic_out)
    features = np.array([[-1.5, 1.0, 3.4, .5], [0.0, 3.0, 1.3, 0.0]])
    predicted = model.predict(features)
    # Values that sit exactly on a boundary (0.0, 1.0) are expected in the
    # bucket above that boundary.
    self.assertAllEqual([[0, 2, 3, 1], [1, 3, 2, 1]], predicted)

  def test_bucketize_with_explicit_buckets_int_input(self):
    """Dense int64 input is bucketized against float boundaries."""
    symbolic_in = keras.Input(shape=(4,), dtype=dtypes.int64)
    bucketizer = discretization.Discretization(bin_boundaries=[-.5, 0.5, 1.5])
    symbolic_out = bucketizer(symbolic_in)
    self.assertAllEqual([None, 4], symbolic_out.shape.as_list())

    model = keras.Model(inputs=symbolic_in, outputs=symbolic_out)
    features = np.array([[-1, 1, 3, 0], [0, 3, 1, 0]], dtype=np.int64)
    predicted = model.predict(features)
    self.assertAllEqual([[0, 2, 3, 1], [1, 3, 2, 1]], predicted)

  def test_bucketize_with_explicit_buckets_sparse_float_input(self):
    """Sparse float input keeps its indices; only the values are bucketized."""
    coords = [[0, 1], [0, 2], [1, 1]]
    sparse_features = sparse_tensor.SparseTensor(
        indices=coords, values=[-1.5, 1.0, 3.4], dense_shape=[2, 3])

    symbolic_in = keras.Input(shape=(3,), dtype=dtypes.float32, sparse=True)
    bucketizer = discretization.Discretization(bin_boundaries=[-.5, 0.5, 1.5])
    symbolic_out = bucketizer(symbolic_in)
    model = keras.Model(inputs=symbolic_in, outputs=symbolic_out)

    predicted = model.predict(sparse_features, steps=1)
    self.assertAllEqual(coords, predicted.indices)
    self.assertAllEqual([0, 2, 3], predicted.values)

  def test_bucketize_with_explicit_buckets_ragged_float_input(self):
    """Ragged float input produces ragged bucket ids with matching rows."""
    ragged_features = ragged_factory_ops.constant([[-1.5, 1.0, 3.4, .5],
                                                   [0.0, 3.0, 1.3]])

    symbolic_in = keras.Input(shape=(None,), ragged=True)
    bucketizer = discretization.Discretization(bin_boundaries=[0., 1., 2.])
    symbolic_out = bucketizer(symbolic_in)
    # Both dimensions are expected to be unknown for the ragged output.
    self.assertAllEqual([None, None], symbolic_out.shape.as_list())

    model = keras.Model(inputs=symbolic_in, outputs=symbolic_out)
    predicted = model.predict(ragged_features)
    self.assertAllEqual([[0, 2, 3, 1], [1, 3, 2]], predicted)

  def test_bucketize_with_explicit_buckets_ragged_int_input(self):
    """Ragged int64 input is bucketized against float boundaries."""
    ragged_features = ragged_factory_ops.constant([[-1, 1, 3, 0], [0, 3, 1]],
                                                  dtype=dtypes.int64)

    symbolic_in = keras.Input(shape=(None,), ragged=True, dtype=dtypes.int64)
    bucketizer = discretization.Discretization(bin_boundaries=[-.5, 0.5, 1.5])
    symbolic_out = bucketizer(symbolic_in)
    self.assertAllEqual([None, None], symbolic_out.shape.as_list())

    model = keras.Model(inputs=symbolic_in, outputs=symbolic_out)
    predicted = model.predict(ragged_features)
    self.assertAllEqual([[0, 2, 3, 1], [1, 3, 2]], predicted)

  def test_bucketize_with_explicit_buckets_sparse_int_input(self):
    """Sparse int32 input keeps its indices; only the values are bucketized."""
    coords = [[0, 1], [0, 2], [1, 1]]
    sparse_features = sparse_tensor.SparseTensor(
        indices=coords, values=[-1, 1, 3], dense_shape=[2, 3])

    symbolic_in = keras.Input(shape=(3,), dtype=dtypes.int32, sparse=True)
    bucketizer = discretization.Discretization(bin_boundaries=[-.5, 0.5, 1.5])
    symbolic_out = bucketizer(symbolic_in)
    model = keras.Model(inputs=symbolic_in, outputs=symbolic_out)

    predicted = model.predict(sparse_features, steps=1)
    self.assertAllEqual(coords, predicted.indices)
    self.assertAllEqual([0, 2, 3], predicted.values)

  def test_output_shape(self):
    """A fixed batch size on the input carries through to the output shape."""
    symbolic_in = keras.Input(batch_size=16, shape=(4,), dtype=dtypes.int64)
    bucketizer = discretization.Discretization(bin_boundaries=[-.5, 0.5, 1.5])
    static_shape = bucketizer(symbolic_in).shape.as_list()
    self.assertAllEqual(static_shape, [16, 4])

  def test_num_bins_negative_fails(self):
    """Constructing the layer with a negative `num_bins` raises ValueError."""
    with self.assertRaisesRegex(ValueError, "`num_bins` must be.*num_bins=-7"):
      discretization.Discretization(num_bins=-7)

  def test_num_bins_and_bins_set_fails(self):
    """Passing both a bin count and explicit bins raises ValueError."""
    message_pattern = (
        r"`num_bins` and `bin_boundaries` should not be set.*5.*\[1, 2\]")
    # NOTE(review): the keyword used here is `bins`, while the expected message
    # talks about `bin_boundaries`; presumably the layer treats `bins` as an
    # alias for it -- confirm against the layer implementation.
    with self.assertRaisesRegex(ValueError, message_pattern):
      discretization.Discretization(num_bins=5, bins=[1, 2])
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
class DiscretizationAdaptTest(keras_parameterized.TestCase,
                              preprocessing_test_utils.PreprocessingLayerTest):
  """Tests for `Discretization` when the bins are learned through `adapt`.

  Covers the bucket ids produced after adapting, merging the state of several
  adapted layers, re-adapting an already used layer, and round-tripping an
  adapted layer through SavedModel, Keras model and Keras weights formats.
  """

  @parameterized.named_parameters([
      {
          "testcase_name": "2d_single_element",
          "adapt_data": np.array([[1.], [2.], [3.], [4.], [5.]]),
          "test_data": np.array([[1.], [2.], [3.]]),
          "use_dataset": True,
          "expected": np.array([[1], [2], [3]]),
          "num_bins": 5,
          "epsilon": 0.01
      }, {
          "testcase_name": "2d_multi_element",
          "adapt_data": np.array([[1., 6.], [2., 7.], [3., 8.], [4., 9.],
                                  [5., 10.]]),
          "test_data": np.array([[1., 10.], [2., 6.], [3., 8.]]),
          "use_dataset": True,
          "expected": np.array([[0, 4], [1, 3], [1, 4]]),
          "num_bins": 5,
          "epsilon": 0.01
      }, {
          "testcase_name": "1d_single_element",
          "adapt_data": np.array([3., 2., 1., 5., 4.]),
          "test_data": np.array([1., 2., 3.]),
          "use_dataset": True,
          "expected": np.array([1, 2, 3]),
          "num_bins": 5,
          "epsilon": 0.01
      }, {
          "testcase_name": "300_batch_1d_single_element_1",
          "adapt_data": np.arange(300),
          "test_data": np.arange(300),
          "use_dataset": True,
          "expected":
              np.concatenate([np.zeros(101), np.ones(99), 2 * np.ones(100)]),
          "num_bins": 3,
          "epsilon": 0.01
      }, {
          "testcase_name": "300_batch_1d_single_element_2",
          "adapt_data": np.arange(300) ** 2,
          "test_data": np.arange(300) ** 2,
          "use_dataset": True,
          "expected":
              np.concatenate([np.zeros(101), np.ones(99), 2 * np.ones(100)]),
          "num_bins": 3,
          "epsilon": 0.01
      }, {
          "testcase_name": "300_batch_1d_single_element_large_epsilon",
          "adapt_data": np.arange(300),
          "test_data": np.arange(300),
          "use_dataset": True,
          "expected": np.concatenate([np.zeros(136), np.ones(164)]),
          "num_bins": 2,
          "epsilon": 0.1
      }])
  def test_layer_computation(self, adapt_data, test_data, use_dataset,
                             expected, num_bins=5, epsilon=0.01):
    """Adapts a layer on `adapt_data` and checks its output on `test_data`.

    Args:
      adapt_data: NumPy array the layer is adapted on (in shuffled order).
      test_data: NumPy array fed through the adapted layer.
      use_dataset: If true, both arrays are wrapped in batched datasets.
      expected: Bucket ids expected for `test_data`.
      num_bins: Number of bins requested from the layer.
      epsilon: Error tolerance passed to the layer.
    """
    # Everything after the leading (batch) dimension is the per-example shape.
    input_shape = test_data.shape[1:]

    # Shuffle a copy: the parameter arrays above are created once and shared by
    # every run of this test case, so an in-place shuffle would leak into later
    # runs. `permutation` shuffles along the first axis like `shuffle` does.
    adapt_data = np.random.permutation(adapt_data)

    if use_dataset:
      # Keras APIs expect batched datasets
      batch_size = test_data.shape[0] // 2
      adapt_data = dataset_ops.Dataset.from_tensor_slices(adapt_data).batch(
          batch_size)
      test_data = dataset_ops.Dataset.from_tensor_slices(test_data).batch(
          batch_size)

    layer = discretization.Discretization(epsilon=epsilon, num_bins=num_bins)
    layer.adapt(adapt_data)

    input_data = keras.Input(shape=input_shape)
    output = layer(input_data)
    model = keras.Model(input_data, output)
    model._run_eagerly = testing_utils.should_run_eagerly()
    output_data = model.predict(test_data)
    self.assertAllClose(expected, output_data)

  def test_merge_state(self):
    """Merging three partially adapted layers matches one fully adapted one."""
    data = np.arange(300)
    partial_ds_1 = dataset_ops.Dataset.from_tensor_slices(data[:100])
    partial_ds_2 = dataset_ops.Dataset.from_tensor_slices(data[100:200])
    partial_ds_3 = dataset_ops.Dataset.from_tensor_slices(data[200:])
    full_ds = partial_ds_1.concatenate(partial_ds_2).concatenate(partial_ds_3)

    def adapt_new_layer(dataset):
      # Use a small epsilon (0.001, tighter than the 0.01 used by the other
      # tests in this file) to avoid any discrepancies from the quantile
      # approximation.
      layer = discretization.Discretization(num_bins=3, epsilon=0.001)
      layer.adapt(dataset.batch(2))
      return layer

    full_layer = adapt_new_layer(full_ds)
    partial_layers = [
        adapt_new_layer(ds) for ds in (partial_ds_1, partial_ds_2, partial_ds_3)
    ]

    # The first partial layer receives the state of the other two and is then
    # used as the merged layer.
    merged_layer = partial_layers[0]
    merged_layer.merge_state(partial_layers[1:])

    self.assertAllClose(full_layer(data), merged_layer(data))

  def test_merge_with_stateless_layers_fails(self):
    """Merging in a layer built from fixed boundaries raises ValueError."""
    layer1 = discretization.Discretization(num_bins=2, name="layer1")
    layer1.adapt([1, 2, 3])
    # layer2 is configured with explicit boundaries and is never adapted.
    layer2 = discretization.Discretization(bin_boundaries=[0, 1], name="layer2")
    with self.assertRaisesRegex(ValueError, "Cannot merge.*layer2"):
      layer1.merge_state([layer2])

  def test_merge_with_unadapted_layers_fails(self):
    """Merging in a `num_bins` layer that was never adapted raises ValueError."""
    layer1 = discretization.Discretization(num_bins=2, name="layer1")
    layer1.adapt([1, 2, 3])
    # layer2 requests learned bins but `adapt` is never called on it.
    layer2 = discretization.Discretization(num_bins=2, name="layer2")
    with self.assertRaisesRegex(ValueError, "Cannot merge.*layer2"):
      layer1.merge_state([layer2])

  def test_multiple_adapts(self):
    """A second `adapt` call changes what an existing model predicts."""
    first_adapt = [[1], [2], [3]]
    second_adapt = [[4], [5], [6]]
    predict_input = [[2], [2]]
    expected_first_output = [[2], [2]]
    expected_second_output = [[0], [0]]

    inputs = keras.Input(shape=(1,), dtype=dtypes.int32)
    layer = discretization.Discretization(num_bins=3)
    layer.adapt(first_adapt)
    outputs = layer(inputs)
    model = keras.Model(inputs=inputs, outputs=outputs)

    actual_output = model.predict(predict_input)
    self.assertAllClose(actual_output, expected_first_output)

    # Re-adapt the layer on new inputs.
    layer.adapt(second_adapt)
    # Re-compile the model.
    model.compile()
    # `predict` should now use the new model state.
    actual_output = model.predict(predict_input)
    self.assertAllClose(actual_output, expected_second_output)

  def test_saved_model_tf(self):
    """An adapted layer survives a low-level SavedModel save/load round trip."""
    input_data = [[1], [2], [3]]
    predict_data = [[0.5], [1.5], [2.5]]
    expected_output = [[0], [1], [2]]

    inputs = keras.Input(shape=(1,), dtype=dtypes.float32)
    layer = discretization.Discretization(num_bins=3)
    layer.adapt(input_data)
    outputs = layer(inputs)
    model = keras.Model(inputs=inputs, outputs=outputs)

    output_data = model.predict(predict_data)
    self.assertAllClose(output_data, expected_output)

    # Save the model to disk.
    output_path = os.path.join(self.get_temp_dir(), "tf_saved_model")
    save.save(model, output_path)
    loaded_model = load.load(output_path)
    f = loaded_model.signatures["serving_default"]

    # Ensure that the loaded model is unique (so that the save/load is real)
    self.assertIsNot(model, loaded_model)

    # Validate correctness of the new model.
    # NOTE(review): the output key is presumably the layer's auto-generated
    # name -- confirm if this test ever names the layer explicitly.
    new_output_data = f(constant_op.constant(predict_data))["discretization"]
    self.assertAllClose(new_output_data, expected_output)

  def test_saved_model_keras(self):
    """An adapted layer survives a Keras `save`/`load_model` round trip."""
    input_data = [[1], [2], [3]]
    predict_data = [[0.5], [1.5], [2.5]]
    expected_output = [[0], [1], [2]]

    cls = discretization.Discretization
    inputs = keras.Input(shape=(1,), dtype=dtypes.float32)
    layer = cls(num_bins=3)
    layer.adapt(input_data)
    outputs = layer(inputs)
    model = keras.Model(inputs=inputs, outputs=outputs)

    output_data = model.predict(predict_data)
    self.assertAllClose(output_data, expected_output)

    # Save the model to disk.
    output_path = os.path.join(self.get_temp_dir(), "tf_keras_saved_model")
    model.save(output_path, save_format="tf")
    loaded_model = keras.models.load_model(
        output_path, custom_objects={"Discretization": cls})

    # Ensure that the loaded model is unique (so that the save/load is real)
    self.assertIsNot(model, loaded_model)

    # Validate correctness of the new model.
    new_output_data = loaded_model.predict(predict_data)
    self.assertAllClose(new_output_data, expected_output)

  def test_saved_weights_keras(self):
    """Adapted state is restored by `load_weights` into a rebuilt model."""
    input_data = [[1], [2], [3]]
    predict_data = [[0.5], [1.5], [2.5]]
    expected_output = [[0], [1], [2]]

    cls = discretization.Discretization
    inputs = keras.Input(shape=(1,), dtype=dtypes.float32)
    layer = cls(num_bins=3)
    layer.adapt(input_data)
    outputs = layer(inputs)
    model = keras.Model(inputs=inputs, outputs=outputs)

    output_data = model.predict(predict_data)
    self.assertAllClose(output_data, expected_output)

    # Save the model to disk.
    output_path = os.path.join(self.get_temp_dir(), "tf_keras_saved_weights")
    model.save_weights(output_path, save_format="tf")
    # Rebuild the architecture from config only, then restore the weights, so
    # the prediction below depends on what was written to disk.
    new_model = keras.Model.from_config(
        model.get_config(), custom_objects={"Discretization": cls})
    new_model.load_weights(output_path)

    # Validate correctness of the new model.
    new_output_data = new_model.predict(predict_data)
    self.assertAllClose(new_output_data, expected_output)
if __name__ == "__main__":
  # Run the tests in this module through the imported TensorFlow `test`
  # platform module when the file is executed as a script.
  test.main()