# Source blob: e65bccb591549ddc002d2cb2a341a37a2c98c2c2
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for categorical preprocessing layers."""
import numpy as np
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import tensor_spec
from tensorflow.python.keras import keras_parameterized
from tensorflow.python.keras import testing_utils
from tensorflow.python.keras.engine import input_layer
from tensorflow.python.keras.engine import training
from tensorflow.python.keras.layers.preprocessing import category_crossing
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops.ragged import ragged_factory_ops
from tensorflow.python.ops.ragged import ragged_tensor
from tensorflow.python.platform import test
@keras_parameterized.run_all_keras_modes(always_skip_v1=True)
class CategoryCrossingTest(keras_parameterized.TestCase):
  """Tests `CategoryCrossing` on sparse, ragged, dense and list inputs.

  Each test builds a small set of categorical inputs, runs them through the
  layer (either called directly or wrapped in a Keras `Model`) and compares the
  crossed string features against hand-written expectations.
  """

  def test_crossing_sparse_inputs(self):
    """Crosses two sparse string inputs with the default separator."""
    layer = category_crossing.CategoryCrossing()
    inputs_0 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [1, 1]],
        values=['a', 'b', 'c'],
        dense_shape=[2, 2])
    inputs_1 = sparse_tensor.SparseTensor(
        indices=[[0, 1], [1, 2]], values=['d', 'e'], dense_shape=[2, 3])
    output = layer([inputs_0, inputs_1])
    # Indices are integers, so compare them exactly rather than with a
    # floating-point tolerance.
    self.assertAllEqual(np.asarray([[0, 0], [1, 0], [1, 1]]), output.indices)
    self.assertAllEqual([b'a_X_d', b'b_X_e', b'c_X_e'], output.values)

  def test_crossing_sparse_inputs_custom_sep(self):
    """Crosses two sparse string inputs joined by a custom `_Y_` separator."""
    layer = category_crossing.CategoryCrossing(separator='_Y_')
    inputs_0 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [1, 1]],
        values=['a', 'b', 'c'],
        dense_shape=[2, 2])
    inputs_1 = sparse_tensor.SparseTensor(
        indices=[[0, 1], [1, 2]], values=['d', 'e'], dense_shape=[2, 3])
    output = layer([inputs_0, inputs_1])
    # Indices are integers, so compare them exactly.
    self.assertAllEqual(np.asarray([[0, 0], [1, 0], [1, 1]]), output.indices)
    self.assertAllEqual([b'a_Y_d', b'b_Y_e', b'c_Y_e'], output.values)

  def test_crossing_sparse_inputs_empty_sep(self):
    """Crosses two sparse string inputs with an empty separator."""
    layer = category_crossing.CategoryCrossing(separator='')
    inputs_0 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [1, 1]],
        values=['a', 'b', 'c'],
        dense_shape=[2, 2])
    inputs_1 = sparse_tensor.SparseTensor(
        indices=[[0, 1], [1, 2]], values=['d', 'e'], dense_shape=[2, 3])
    output = layer([inputs_0, inputs_1])
    # Indices are integers, so compare them exactly.
    self.assertAllEqual(np.asarray([[0, 0], [1, 0], [1, 1]]), output.indices)
    self.assertAllEqual([b'ad', b'be', b'ce'], output.values)

  def test_crossing_sparse_inputs_depth_int(self):
    """With `depth=1`, sparse inputs come out side by side, uncrossed."""
    layer = category_crossing.CategoryCrossing(depth=1)
    inputs_0 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [2, 0]],
        values=['a', 'b', 'c'],
        dense_shape=[3, 1])
    inputs_1 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [2, 0]],
        values=['d', 'e', 'f'],
        dense_shape=[3, 1])
    output = layer([inputs_0, inputs_1])
    self.assertIsInstance(output, sparse_tensor.SparseTensor)
    # Densify so the result can be compared against a plain nested list.
    output = sparse_ops.sparse_tensor_to_dense(output)
    expected_out = [[b'a', b'd'], [b'b', b'e'], [b'c', b'f']]
    self.assertAllEqual(expected_out, output)

  def test_crossing_sparse_inputs_depth_tuple(self):
    """With `depth=(2, 3)`, sparse inputs yield pairwise and triple crosses."""
    layer = category_crossing.CategoryCrossing(depth=(2, 3))
    inputs_0 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [2, 0]],
        values=['a', 'b', 'c'],
        dense_shape=[3, 1])
    inputs_1 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [2, 0]],
        values=['d', 'e', 'f'],
        dense_shape=[3, 1])
    inputs_2 = sparse_tensor.SparseTensor(
        indices=[[0, 0], [1, 0], [2, 0]],
        values=['g', 'h', 'i'],
        dense_shape=[3, 1])
    inp_0_t = input_layer.Input(shape=(1,), sparse=True, dtype=dtypes.string)
    inp_1_t = input_layer.Input(shape=(1,), sparse=True, dtype=dtypes.string)
    inp_2_t = input_layer.Input(shape=(1,), sparse=True, dtype=dtypes.string)
    out_t = layer([inp_0_t, inp_1_t, inp_2_t])
    model = training.Model([inp_0_t, inp_1_t, inp_2_t], out_t)
    output = model.predict([inputs_0, inputs_1, inputs_2])
    self.assertIsInstance(output, sparse_tensor.SparseTensor)
    # Densify so the result can be compared row by row.
    output = sparse_ops.sparse_tensor_to_dense(output)
    # One expected row per example: the three pairwise crosses, then the
    # three-way cross.
    expected_outputs_0 = [[b'a_X_d', b'a_X_g', b'd_X_g', b'a_X_d_X_g']]
    expected_outputs_1 = [[b'b_X_e', b'b_X_h', b'e_X_h', b'b_X_e_X_h']]
    expected_outputs_2 = [[b'c_X_f', b'c_X_i', b'f_X_i', b'c_X_f_X_i']]
    expected_out = array_ops.concat(
        [expected_outputs_0, expected_outputs_1, expected_outputs_2], axis=0)
    self.assertAllEqual(expected_out, output)

  def test_crossing_ragged_inputs(self):
    """Crosses two ragged string inputs through a Keras model."""
    inputs_0 = ragged_factory_ops.constant(
        [['omar', 'skywalker'], ['marlo']],
        dtype=dtypes.string)
    inputs_1 = ragged_factory_ops.constant(
        [['a'], ['b']],
        dtype=dtypes.string)
    inp_0_t = input_layer.Input(shape=(None,), ragged=True, dtype=dtypes.string)
    inp_1_t = input_layer.Input(shape=(None,), ragged=True, dtype=dtypes.string)
    non_hashed_layer = category_crossing.CategoryCrossing()
    out_t = non_hashed_layer([inp_0_t, inp_1_t])
    model = training.Model(inputs=[inp_0_t, inp_1_t], outputs=out_t)
    expected_output = [[b'omar_X_a', b'skywalker_X_a'], [b'marlo_X_b']]
    self.assertAllEqual(expected_output, model.predict([inputs_0, inputs_1]))

  def test_crossing_ragged_inputs_depth_int(self):
    """Checks integer `depth` values 1 and 2 on ragged inputs."""
    # depth=1: called directly, the inputs come out uncrossed.
    layer = category_crossing.CategoryCrossing(depth=1)
    inputs_0 = ragged_factory_ops.constant([['a'], ['b'], ['c']])
    inputs_1 = ragged_factory_ops.constant([['d'], ['e'], ['f']])
    output = layer([inputs_0, inputs_1])
    expected_output = [[b'a', b'd'], [b'b', b'e'], [b'c', b'f']]
    self.assertIsInstance(output, ragged_tensor.RaggedTensor)
    self.assertAllEqual(expected_output, output)

    # depth=2: through a model, the pairwise cross is appended to each row.
    layer = category_crossing.CategoryCrossing(depth=2)
    inp_0_t = input_layer.Input(shape=(None,), ragged=True, dtype=dtypes.string)
    inp_1_t = input_layer.Input(shape=(None,), ragged=True, dtype=dtypes.string)
    out_t = layer([inp_0_t, inp_1_t])
    model = training.Model([inp_0_t, inp_1_t], out_t)
    expected_output = [[b'a', b'd', b'a_X_d'], [b'b', b'e', b'b_X_e'],
                       [b'c', b'f', b'c_X_f']]
    self.assertAllEqual(expected_output, model.predict([inputs_0, inputs_1]))

  def test_crossing_ragged_inputs_depth_tuple(self):
    """With `depth=[2, 3]`, ragged inputs yield pairwise and triple crosses."""
    layer = category_crossing.CategoryCrossing(depth=[2, 3])
    inputs_0 = ragged_factory_ops.constant([['a'], ['b'], ['c']])
    inputs_1 = ragged_factory_ops.constant([['d'], ['e'], ['f']])
    inputs_2 = ragged_factory_ops.constant([['g'], ['h'], ['i']])
    inp_0_t = input_layer.Input(shape=(None,), ragged=True, dtype=dtypes.string)
    inp_1_t = input_layer.Input(shape=(None,), ragged=True, dtype=dtypes.string)
    inp_2_t = input_layer.Input(shape=(None,), ragged=True, dtype=dtypes.string)
    out_t = layer([inp_0_t, inp_1_t, inp_2_t])
    model = training.Model([inp_0_t, inp_1_t, inp_2_t], out_t)
    expected_output = [[b'a_X_d', b'a_X_g', b'd_X_g', b'a_X_d_X_g'],
                       [b'b_X_e', b'b_X_h', b'e_X_h', b'b_X_e_X_h'],
                       [b'c_X_f', b'c_X_i', b'f_X_i', b'c_X_f_X_i']]
    output = model.predict([inputs_0, inputs_1, inputs_2])
    self.assertIsInstance(output, ragged_tensor.RaggedTensor)
    self.assertAllEqual(expected_output, output)

  def test_crossing_with_dense_inputs(self):
    """Crosses two dense integer numpy arrays; values appear as strings."""
    layer = category_crossing.CategoryCrossing()
    inputs_0 = np.asarray([[1, 2]])
    inputs_1 = np.asarray([[1, 3]])
    output = layer([inputs_0, inputs_1])
    self.assertAllEqual([[b'1_X_1', b'1_X_3', b'2_X_1', b'2_X_3']], output)

  def test_crossing_with_list_inputs(self):
    """Accepts nested lists, flat lists and 1-D numpy arrays as inputs."""
    layer = category_crossing.CategoryCrossing()

    # Nested lists: a single example holding two values per feature.
    inputs_0 = [[1, 2]]
    inputs_1 = [[1, 3]]
    output = layer([inputs_0, inputs_1])
    self.assertAllEqual([[b'1_X_1', b'1_X_3', b'2_X_1', b'2_X_3']], output)

    # Flat lists: the expected output has one crossed value per element.
    inputs_0 = [1, 2]
    inputs_1 = [1, 3]
    output = layer([inputs_0, inputs_1])
    self.assertAllEqual([[b'1_X_1'], [b'2_X_3']], output)

    # 1-D numpy arrays are expected to match the flat-list result.
    inputs_0 = np.asarray([1, 2])
    inputs_1 = np.asarray([1, 3])
    output = layer([inputs_0, inputs_1])
    self.assertAllEqual([[b'1_X_1'], [b'2_X_3']], output)

  def test_crossing_dense_inputs_depth_int(self):
    """Checks integer `depth` values 1 and 2 on dense string inputs."""
    # depth=1: called directly, the inputs come out uncrossed.
    layer = category_crossing.CategoryCrossing(depth=1)
    inputs_0 = constant_op.constant([['a'], ['b'], ['c']])
    inputs_1 = constant_op.constant([['d'], ['e'], ['f']])
    output = layer([inputs_0, inputs_1])
    expected_output = [[b'a', b'd'], [b'b', b'e'], [b'c', b'f']]
    self.assertAllEqual(expected_output, output)

    # depth=2: through a model, the pairwise cross is appended as a column.
    layer = category_crossing.CategoryCrossing(depth=2)
    inp_0_t = input_layer.Input(shape=(1,), dtype=dtypes.string)
    inp_1_t = input_layer.Input(shape=(1,), dtype=dtypes.string)
    out_t = layer([inp_0_t, inp_1_t])
    model = training.Model([inp_0_t, inp_1_t], out_t)
    crossed_output = [[b'a_X_d'], [b'b_X_e'], [b'c_X_f']]
    expected_output = array_ops.concat([expected_output, crossed_output],
                                       axis=1)
    self.assertAllEqual(expected_output, model.predict([inputs_0, inputs_1]))

  def test_crossing_dense_inputs_depth_tuple(self):
    """With `depth=[2, 3]`, dense inputs yield pairwise and triple crosses."""
    layer = category_crossing.CategoryCrossing(depth=[2, 3])
    inputs_0 = constant_op.constant([['a'], ['b'], ['c']])
    inputs_1 = constant_op.constant([['d'], ['e'], ['f']])
    inputs_2 = constant_op.constant([['g'], ['h'], ['i']])
    inp_0_t = input_layer.Input(shape=(1,), dtype=dtypes.string)
    inp_1_t = input_layer.Input(shape=(1,), dtype=dtypes.string)
    inp_2_t = input_layer.Input(shape=(1,), dtype=dtypes.string)
    out_t = layer([inp_0_t, inp_1_t, inp_2_t])
    model = training.Model([inp_0_t, inp_1_t, inp_2_t], out_t)
    # One expected row per example: the three pairwise crosses, then the
    # three-way cross.
    expected_outputs_0 = [[b'a_X_d', b'a_X_g', b'd_X_g', b'a_X_d_X_g']]
    expected_outputs_1 = [[b'b_X_e', b'b_X_h', b'e_X_h', b'b_X_e_X_h']]
    expected_outputs_2 = [[b'c_X_f', b'c_X_i', b'f_X_i', b'c_X_f_X_i']]
    expected_output = array_ops.concat(
        [expected_outputs_0, expected_outputs_1, expected_outputs_2], axis=0)
    self.assertAllEqual(expected_output,
                        model.predict([inputs_0, inputs_1, inputs_2]))

  def test_crossing_compute_output_signature(self):
    """Output signature keeps the leading dimension and has string dtype."""
    input_shapes = [
        tensor_shape.TensorShape([2, 2]),
        tensor_shape.TensorShape([2, 3])
    ]
    input_specs = [
        tensor_spec.TensorSpec(input_shape, dtypes.string)
        for input_shape in input_shapes
    ]
    layer = category_crossing.CategoryCrossing()
    output_spec = layer.compute_output_signature(input_specs)
    self.assertEqual(output_spec.shape.dims[0], input_shapes[0].dims[0])
    self.assertEqual(output_spec.dtype, dtypes.string)

  @testing_utils.run_v2_only
  def test_config_with_custom_name(self):
    """A custom layer name survives a `get_config`/`from_config` round trip."""
    # Round trip with an explicit depth.
    layer = category_crossing.CategoryCrossing(depth=2, name='hashing')
    config = layer.get_config()
    layer_1 = category_crossing.CategoryCrossing.from_config(config)
    self.assertEqual(layer_1.name, layer.name)

    # Round trip with the default depth.
    layer = category_crossing.CategoryCrossing(name='hashing')
    config = layer.get_config()
    layer_1 = category_crossing.CategoryCrossing.from_config(config)
    self.assertEqual(layer_1.name, layer.name)
if __name__ == '__main__':
  # When run as a script, hand control to `test.main()` from
  # `tensorflow.python.platform`.
  test.main()