blob: b8f38e8a28f93c30f05526c1eb5af04f1627c14a [file] [log] [blame]
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for the ParallelInterleaveDataset serialization."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
from tensorflow.python.data.experimental.kernel_tests.serialization import dataset_serialization_test_base
from tensorflow.python.data.experimental.ops import interleave_ops
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.ops import sparse_ops
from tensorflow.python.platform import test
class ParallelInterleaveDatasetSerializationTest(
dataset_serialization_test_base.DatasetSerializationTestBase):
def setUp(self):
self.input_values = np.array([4, 5, 6], dtype=np.int64)
self.num_repeats = 2
self.num_outputs = np.sum(self.input_values) * 2
def _build_ds(self, cycle_length, block_length, sloppy=False):
return (dataset_ops.Dataset.from_tensor_slices(
self.input_values).repeat(self.num_repeats).apply(
interleave_ops.parallel_interleave(
lambda x: dataset_ops.Dataset.range(10 * x, 11 * x),
cycle_length, block_length, sloppy)))
def testSerializationCore(self):
# cycle_length > 1, block_length > 1
cycle_length = 2
block_length = 3
self.run_core_tests(
lambda: self._build_ds(cycle_length, block_length),
lambda: self._build_ds(cycle_length * 2, block_length * 1),
self.num_outputs)
# cycle_length = 1
cycle_length = 1
block_length = 3
self.run_core_tests(lambda: self._build_ds(cycle_length, block_length),
None, self.num_outputs)
# block_length = 1
cycle_length = 2
block_length = 1
self.run_core_tests(lambda: self._build_ds(cycle_length, block_length),
None, self.num_outputs)
def testSerializationWithSloppy(self):
break_points = self.gen_break_points(self.num_outputs, 10)
expected_outputs = np.repeat(
np.concatenate([np.arange(10 * x, 11 * x) for x in self.input_values]),
self.num_repeats).tolist()
def run_test(cycle_length, block_length):
actual = self.gen_outputs(
lambda: self._build_ds(cycle_length, block_length, True),
break_points, self.num_outputs)
self.assertSequenceEqual(sorted(actual), expected_outputs)
# cycle_length > 1, block_length > 1
run_test(2, 3)
# cycle_length = 1
run_test(1, 3)
# block_length = 1
run_test(2, 1)
def testSparseCore(self):
def _map_fn(i):
return sparse_tensor.SparseTensorValue(
indices=[[0, 0], [1, 1]], values=(i * [1, -1]), dense_shape=[2, 2])
def _interleave_fn(x):
return dataset_ops.Dataset.from_tensor_slices(
sparse_ops.sparse_to_dense(x.indices, x.dense_shape, x.values))
def _build_dataset():
return dataset_ops.Dataset.range(10).map(_map_fn).apply(
interleave_ops.parallel_interleave(_interleave_fn, 1))
self.run_core_tests(_build_dataset, None, 20)
if __name__ == '__main__':
test.main()