# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for the MapAndBatchDataset serialization."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
from tensorflow.python.data.experimental.kernel_tests.serialization import dataset_serialization_test_base
from tensorflow.python.data.experimental.ops import batching
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.ops import math_ops
from tensorflow.python.platform import test


class MapAndBatchDatasetSerializationTest(
    dataset_serialization_test_base.DatasetSerializationTestBase):

  def testNumParallelBatches(self):
    range_size = 11
    num_repeats = 2
    batch_size = 5
    total_outputs = range_size * num_repeats
    num_outputs_drop_remainder = total_outputs // batch_size
    num_outputs_keep_remainder = int(math.ceil(total_outputs / batch_size))
    num_parallel_batches = 2

    def build_ds(range_start, drop_remainder=False):

      def _map_fn(x):
        return math_ops.square(x)

      return dataset_ops.Dataset.range(
          range_start, range_start + range_size).repeat(num_repeats).apply(
              batching.map_and_batch(
                  map_func=_map_fn,
                  batch_size=batch_size,
                  num_parallel_batches=num_parallel_batches,
                  drop_remainder=drop_remainder))

    # Save/restore the iterator at various points and verify that the
    # expected number of batches is produced, both with and without
    # drop_remainder.
    self.run_core_tests(lambda: build_ds(10), num_outputs_keep_remainder)
    self.run_core_tests(lambda: build_ds(10, True), num_outputs_drop_remainder)

  def testNumParallelCalls(self):
    range_size = 11
    num_repeats = 2
    batch_size = 5
    total_outputs = range_size * num_repeats
    num_outputs_drop_remainder = total_outputs // batch_size
    num_outputs_keep_remainder = int(math.ceil(total_outputs / batch_size))
    num_parallel_calls = 7

    def build_ds(range_start, drop_remainder=False):

      def _map_fn(x):
        return math_ops.square(x)

      return dataset_ops.Dataset.range(
          range_start, range_start + range_size).repeat(num_repeats).apply(
              batching.map_and_batch(
                  map_func=_map_fn,
                  batch_size=batch_size,
                  num_parallel_calls=num_parallel_calls,
                  drop_remainder=drop_remainder))

    # Same as above, but parallelism is expressed via num_parallel_calls
    # instead of num_parallel_batches.
    self.run_core_tests(lambda: build_ds(10), num_outputs_keep_remainder)
    self.run_core_tests(lambda: build_ds(10, True), num_outputs_drop_remainder)

  def testSparse(self):

    def build_dataset():

      def map_fn(i):
        return sparse_tensor.SparseTensorValue(
            indices=[[0]], values=(i * [1]), dense_shape=[1])

      return dataset_ops.Dataset.range(10).apply(
          batching.map_and_batch(map_fn, 5))

    # 10 elements batched 5 at a time yield 2 sparse batches.
    self.run_core_tests(build_dataset, 2)


if __name__ == "__main__":
  test.main()