# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for pfor and for_loop."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
from absl import flags
import numpy as np
from tensorflow.core.example import example_pb2
from tensorflow.core.example import feature_pb2
from tensorflow.python.client import session
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import bitwise_ops
from tensorflow.python.ops import clip_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import gradients as gradient_ops
from tensorflow.python.ops import logging_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn
from tensorflow.python.ops import parsing_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import rnn
from tensorflow.python.ops import rnn_cell
from tensorflow.python.ops import tensor_array_grad # pylint: disable=unused-import
from tensorflow.python.ops import tensor_array_ops
from tensorflow.python.ops import variables
from tensorflow.python.ops.parallel_for import control_flow_ops as pfor_control_flow_ops
from tensorflow.python.platform import test
from tensorflow.python.util import nest


class PForTest(test.TestCase):
def _run_targets(self, targets1, targets2=None, run_init=True):
targets1 = nest.flatten(targets1)
targets2 = ([] if targets2 is None else nest.flatten(targets2))
assert len(targets1) == len(targets2) or not targets2
if run_init:
init = variables.global_variables_initializer()
self.evaluate(init)
return self.evaluate(targets1 + targets2)
def run_and_assert_equal(self, targets1, targets2):
outputs = self._run_targets(targets1, targets2)
outputs = nest.flatten(outputs) # flatten SparseTensorValues
n = len(outputs) // 2
for i in range(n):
if outputs[i + n].dtype != np.object:
self.assertAllClose(outputs[i + n], outputs[i], rtol=1e-4, atol=1e-5)
else:
self.assertAllEqual(outputs[i + n], outputs[i])
def _test_loop_fn(self, loop_fn, iters, loop_fn_dtypes=dtypes.float32):
t1 = pfor_control_flow_ops.pfor(loop_fn, iters=iters)
t2 = pfor_control_flow_ops.for_loop(loop_fn, loop_fn_dtypes, iters=iters)
self.run_and_assert_equal(t1, t2)
def test_op_conversion_fallback_to_while_loop(self):
    # Note that this test uses the top_k op, which currently has no pfor
    # converter. If a converter gets defined for it, the test will need to
    # switch to another op that still lacks one.
x = random_ops.random_uniform([3, 2, 4])
def loop_fn(i):
x_i = array_ops.gather(x, i)
return nn.top_k(x_i)
with self.assertRaisesRegexp(ValueError, "No converter defined"):
self._test_loop_fn(
loop_fn, 3, loop_fn_dtypes=[dtypes.float32, dtypes.int32])
flags.FLAGS.op_conversion_fallback_to_while_loop = True
self._test_loop_fn(
loop_fn, 3, loop_fn_dtypes=[dtypes.float32, dtypes.int32])
flags.FLAGS.op_conversion_fallback_to_while_loop = False
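

# For reference, a minimal standalone sketch of the pattern _test_loop_fn
# exercises above: the same loop_fn is vectorized with pfor and also run as a
# plain for_loop (which needs explicit output dtypes), and the two results are
# expected to match. This helper is illustrative only (its name and shapes are
# arbitrary) and is not invoked by the tests.
def _example_pfor_vs_for_loop():
  x = random_ops.random_uniform([4, 2])

  def loop_fn(i):
    # Each iteration reduces one row of x to a scalar.
    return math_ops.reduce_sum(array_ops.gather(x, i))

  vectorized = pfor_control_flow_ops.pfor(loop_fn, iters=4)
  sequential = pfor_control_flow_ops.for_loop(loop_fn, dtypes.float32, iters=4)
  return vectorized, sequential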


class ArrayTest(PForTest):
def test_gather(self):
x = random_ops.random_uniform([3, 3, 3])
def loop_fn(i):
outputs = []
x_i = array_ops.gather(x, i)
for y in [x, x_i]:
        axes = [0, 2, -1] if y is x else [0]
for axis in axes:
outputs.append(array_ops.gather(y, 2, axis=axis))
outputs.append(array_ops.gather(y, i, axis=axis))
outputs.append(array_ops.gather(y, [i], axis=axis))
outputs.append(array_ops.gather(y, [i, 2], axis=axis))
outputs.append(array_ops.gather(y, [[2, i], [i, 1]], axis=axis))
return outputs
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 20)
def test_shape(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x_i = array_ops.gather(x, i)
return array_ops.shape(x_i), array_ops.shape(x_i, out_type=dtypes.int64)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32, dtypes.int64])
def test_size(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x_i = array_ops.gather(x, i)
return array_ops.size(x_i), array_ops.size(x_i, out_type=dtypes.int64)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32, dtypes.int64])
def test_rank(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x_i = array_ops.gather(x, i)
return array_ops.rank(x_i)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_shape_n(self):
x = random_ops.random_uniform([3, 2, 3])
y = random_ops.random_uniform([3])
def loop_fn(i):
x_i = array_ops.gather(x, i)
y_i = array_ops.gather(y, i)
return array_ops.shape_n([x_i, x, y, y_i]), array_ops.shape_n(
[x_i, x, y, y_i], out_type=dtypes.int64)
self._test_loop_fn(
loop_fn, 3, loop_fn_dtypes=[dtypes.int32] * 4 + [dtypes.int64] * 4)
def test_reshape(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.reshape(x1, [-1]), array_ops.reshape(x1, [1, 3, 1, -1])
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)
def test_expand_dims(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
      return (array_ops.expand_dims(x1, axis=-1),
              array_ops.expand_dims(x1, axis=1))
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)
def test_slice(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.slice(x1, begin=(0, 1), size=(2, 1))
self._test_loop_fn(loop_fn, 3)
def test_tile(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.tile(x1, [2, 1])
self._test_loop_fn(loop_fn, 3)
def test_tile_loop_dependent(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.tile(x1, [i, 1])
with self.assertRaisesRegexp(ValueError, "expected to be loop invariant"):
pfor_control_flow_ops.pfor(loop_fn, 2)
def test_pack(self):
x = random_ops.random_uniform([3, 2, 3])
y = random_ops.random_uniform([2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.stack([x1, y], axis=-1)
self._test_loop_fn(loop_fn, 1)
def test_unpack(self):
x = random_ops.random_uniform([3, 2, 3, 4])
def loop_fn(i):
x_i = array_ops.gather(x, i)
      return (array_ops.unstack(x_i, 4, axis=-1),
              array_ops.unstack(x_i, 3, axis=1))
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 7)
def test_pad(self):
x = random_ops.random_uniform([3, 2, 3])
padding = constant_op.constant([[1, 2], [3, 4]])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.pad(x1, padding, mode="CONSTANT")
self._test_loop_fn(loop_fn, 3)
def test_split(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.split(x1, 2, axis=0), array_ops.split(x1, 3, axis=-1)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 5)
def test_transpose(self):
x = random_ops.random_uniform([3, 2, 3, 4])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return array_ops.transpose(x1, [2, 1, 0])
self._test_loop_fn(loop_fn, 3)
def test_zeros_like(self):
x = random_ops.random_uniform([3, 2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
      z = array_ops.zeros_like(x1)
      return z, z + x1
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)
def test_concat_v2(self):
x = random_ops.random_uniform([3, 2, 3])
y = random_ops.random_uniform([2, 3])
def loop_fn(i):
x1 = array_ops.gather(x, i)
      return (array_ops.concat([x1, x1, y], axis=0),
              array_ops.concat([x1, x1, y], axis=-1))
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)
def test_unary_cwise_ops(self):
for op in [array_ops.identity, array_ops.stop_gradient]:
x = random_ops.random_uniform([3, 5])
# pylint: disable=cell-var-from-loop
def loop_fn(i):
x1 = array_ops.gather(x, i)
y = op(x1) + x1
loss = nn.l2_loss(y)
return op(x), y, gradient_ops.gradients(loss, x1)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 3)
def test_strided_slice(self):
x = random_ops.random_uniform([3, 3, 4, 4, 2, 2, 2])
def loop_fn(i):
x_i = array_ops.gather(x, i)
y = x_i[:2, ::2, 1::3, ..., array_ops.newaxis, 1]
loss = nn.l2_loss(y)
return y, gradient_ops.gradients(loss, x_i)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)


class BitwiseTest(PForTest):
def test_unary_cwise(self):
for op in [bitwise_ops.invert]:
x = random_ops.random_uniform([7, 3, 5], maxval=10, dtype=dtypes.int32)
# pylint: disable=cell-var-from-loop
def loop_fn(i):
x1 = array_ops.gather(x, i)
return op(x1)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_binary_cwise(self):
binary_ops = [
bitwise_ops.bitwise_and,
bitwise_ops.bitwise_or,
bitwise_ops.bitwise_xor,
bitwise_ops.left_shift,
bitwise_ops.right_shift,
]
for op in binary_ops:
x = random_ops.random_uniform([7, 3, 5], maxval=10, dtype=dtypes.int32)
y = random_ops.random_uniform([3, 5], maxval=10, dtype=dtypes.int32)
output_dtypes = []
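      # loop_fn below records the dtypes of its outputs in output_dtypes,
      # captured from this scope: _test_loop_fn runs pfor first, which traces
      # loop_fn and fills the list before for_loop consumes it as
      # loop_fn_dtypes.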
# pylint: disable=cell-var-from-loop
def loop_fn(i):
x1 = array_ops.gather(x, i)
y1 = array_ops.gather(y, i)
outputs = [op(x, y), op(x1, y), op(x, y1), op(x1, y1), op(x1, x1)]
del output_dtypes[:]
output_dtypes.extend([t.dtype for t in outputs])
return outputs
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=output_dtypes)


class MathTest(PForTest):
def test_unary_cwise_ops(self):
complex_ops = [
math_ops.angle,
math_ops.imag,
math_ops.complex_abs,
math_ops.real,
math_ops.conj,
]
real_ops = [
lambda x: math_ops.acosh(1 + math_ops.square(x)),
math_ops.abs,
math_ops.acos,
math_ops.asin,
math_ops.asinh,
math_ops.atan,
math_ops.atanh,
math_ops.bessel_i0e,
math_ops.bessel_i1e,
math_ops.cos,
math_ops.cosh,
math_ops.digamma,
math_ops.erf,
math_ops.erfc,
math_ops.exp,
math_ops.expm1,
math_ops.inv,
math_ops.is_finite,
math_ops.is_inf,
math_ops.lgamma,
math_ops.log,
math_ops.log1p,
math_ops.neg,
math_ops.negative,
math_ops.reciprocal,
math_ops.rint,
math_ops.round,
math_ops.rsqrt,
math_ops.sigmoid,
math_ops.sign,
math_ops.sin,
math_ops.sinh,
math_ops.sqrt,
math_ops.square,
math_ops.tan,
        math_ops.tanh,
nn.elu,
nn.relu,
nn.relu6,
nn.selu,
nn.softplus,
nn.softsign,
]
for op in complex_ops + real_ops:
x = random_ops.random_uniform([3, 5])
if op in complex_ops:
y = random_ops.random_uniform([3, 5])
x = math_ops.complex(x, y)
# pylint: disable=cell-var-from-loop
output_dtypes = []
def loop_fn(i):
x1 = array_ops.gather(x, i)
y1 = op(x1)
outputs = [op(x), y1]
if y1.dtype == dtypes.float32:
loss = math_ops.reduce_sum(y1 * y1)
grad = gradient_ops.gradients(loss, x1)
if grad and grad[0] is not None:
outputs.extend(grad)
del output_dtypes[:]
output_dtypes.extend([t.dtype for t in outputs])
return outputs
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=output_dtypes)
def test_unary_cwise_no_grad(self):
for op in [math_ops.ceil,
math_ops.floor,
math_ops.logical_not]:
x = random_ops.random_uniform([3, 5])
if op == math_ops.logical_not:
x = x > 0
# pylint: disable=cell-var-from-loop
def loop_fn(i):
return op(array_ops.gather(x, i))
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=x.dtype)
def test_binary_cwise_ops(self):
logical_ops = [
math_ops.logical_and,
math_ops.logical_or,
math_ops.logical_xor
]
# Wrapper functions restricting the range of inputs of zeta and polygamma.
def safe_polygamma(x, y):
return math_ops.polygamma(
math_ops.round(clip_ops.clip_by_value(y, 1, 10)),
x * x + 1)
def safe_zeta(x, y):
return math_ops.zeta(x * x + 1, y * y)
float_ops = [
math_ops.add,
math_ops.add_v2,
math_ops.atan2,
math_ops.complex,
math_ops.div,
math_ops.divide,
math_ops.div_no_nan,
math_ops.equal,
math_ops.floor_div,
math_ops.floor_mod,
math_ops.greater,
math_ops.greater_equal,
math_ops.igamma,
math_ops.igammac,
math_ops.igamma_grad_a,
math_ops.less,
math_ops.less_equal,
math_ops.maximum,
math_ops.minimum,
math_ops.mod,
math_ops.multiply,
math_ops.not_equal,
math_ops.pow,
math_ops.squared_difference,
math_ops.subtract,
math_ops.truncate_mod,
safe_polygamma,
safe_zeta,
]
for op in logical_ops + float_ops:
x = random_ops.random_uniform([7, 3, 5])
y = random_ops.random_uniform([3, 5])
if op in logical_ops:
x = x > 0
y = y > 0
output_dtypes = []
# pylint: disable=cell-var-from-loop
def loop_fn(i):
x1 = array_ops.gather(x, i)
y1 = array_ops.gather(y, i)
outputs = [op(x, y), op(x1, y), op(x, y1), op(x1, y1), op(x1, x1)]
del output_dtypes[:]
output_dtypes.extend([t.dtype for t in outputs])
return outputs
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=output_dtypes)
def test_approximate_equal(self):
x = random_ops.random_uniform([3, 5])
y = random_ops.random_uniform([3, 5])
def loop_fn(i):
x1 = array_ops.gather(x, i)
y1 = array_ops.gather(y, i)
return math_ops.approximate_equal(x1, y1)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.bool])
def test_addn(self):
x = random_ops.random_uniform([2, 3, 5])
y = random_ops.random_uniform([3, 5])
z = random_ops.random_uniform([3, 5])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return math_ops.add_n([x1, y, z])
self._test_loop_fn(loop_fn, 2)
def test_matmul(self):
for tr_a in (True, False):
for tr_b in (True, False):
for stack_a in (True, False):
for stack_b in (True, False):
shape_a = (5, 3) if tr_a else (3, 5)
if stack_a:
shape_a = (2,) + shape_a
shape_b = (7, 5) if tr_b else (5, 7)
if stack_b:
shape_b = (2,) + shape_b
x = random_ops.random_uniform(shape_a)
y = random_ops.random_uniform(shape_b)
# pylint: disable=cell-var-from-loop
def loop_fn(i):
a = array_ops.gather(x, i) if stack_a else x
b = array_ops.gather(y, i) if stack_b else y
return math_ops.matmul(a, b, transpose_a=tr_a, transpose_b=tr_b)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 2)
def test_batch_matmul(self):
for tr_a in (True, False):
for tr_b in (True, False):
for stack_a in (True, False):
for stack_b in (True, False):
shape_a = (4, 5, 3) if tr_a else (4, 3, 5)
if stack_a:
shape_a = (2,) + shape_a
shape_b = (4, 7, 5) if tr_b else (4, 5, 7)
if stack_b:
shape_b = (2,) + shape_b
x = random_ops.random_uniform(shape_a)
y = random_ops.random_uniform(shape_b)
# pylint: disable=cell-var-from-loop
def loop_fn(i):
a = array_ops.gather(x, i) if stack_a else x
b = array_ops.gather(y, i) if stack_b else y
return math_ops.matmul(a, b, transpose_a=tr_a, transpose_b=tr_b)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 2)
def test_reduction(self):
x = random_ops.random_uniform([2, 3, 4, 5])
for op in [
math_ops.reduce_sum, math_ops.reduce_prod, math_ops.reduce_max,
math_ops.reduce_min
]:
for axis in ([1], None, [0, 2]):
for keepdims in (True, False):
# pylint: disable=cell-var-from-loop
def loop_fn(i):
a = array_ops.gather(x, i)
return op(a, axis=axis, keepdims=keepdims)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 2)
def test_cum_sum(self):
x = random_ops.random_uniform([2, 3, 4, 5])
for axis in (1, -2):
for exclusive in (True, False):
for reverse in (True, False):
# pylint: disable=cell-var-from-loop
def loop_fn(i):
a = array_ops.gather(x, i)
return math_ops.cumsum(
a, axis=axis, exclusive=exclusive, reverse=reverse)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 2)
def test_cum_prod(self):
x = random_ops.random_uniform([2, 3, 4, 5])
for axis in (1, -2):
for exclusive in (True, False):
for reverse in (True, False):
# pylint: disable=cell-var-from-loop
def loop_fn(i):
a = array_ops.gather(x, i)
return math_ops.cumprod(
a, axis=axis, exclusive=exclusive, reverse=reverse)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 2)
def test_bias_add(self):
x_shape = [2, 3, 4, 5, 6]
x = random_ops.random_uniform(x_shape)
for data_format in ("NCHW", "NHWC"):
bias_dim = 2 if data_format == "NCHW" else -1
bias_shape = x_shape[bias_dim]
bias = random_ops.random_uniform([bias_shape])
# pylint: disable=cell-var-from-loop
def loop_fn(i):
a = array_ops.gather(x, i)
y = nn.bias_add(a, bias, data_format=data_format)
loss = math_ops.reduce_sum(y * y)
return y, gradient_ops.gradients(loss, bias)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(
loop_fn, 2, loop_fn_dtypes=[dtypes.float32, dtypes.float32])
def test_unsorted_segment_sum(self):
t = random_ops.random_uniform([3, 3, 2])
segment_ids = constant_op.constant([[0, 0, 2], [0, 1, 2], [2, 2, 2]])
num_segments = 3
def loop_fn(i):
data = array_ops.gather(t, i)
data_0 = array_ops.gather(t, 0)
seg_ids = array_ops.gather(segment_ids, i)
return (math_ops.unsorted_segment_sum(data, seg_ids, num_segments),
math_ops.unsorted_segment_sum(data_0, seg_ids, num_segments))
self._test_loop_fn(loop_fn, 3, [dtypes.float32] * 2)
def test_cast(self):
x = constant_op.constant([[1], [2]])
y = constant_op.constant([[1.0], [2.0]])
def loop_fn(i):
return (math_ops.cast(array_ops.gather(x, i), dtypes.float32),
math_ops.cast(array_ops.gather(y, i), dtypes.int32))
self._test_loop_fn(
loop_fn, 2, loop_fn_dtypes=[dtypes.float32, dtypes.int32])
def test_tanh_axpy(self):
a = constant_op.constant(3.)
x = random_ops.random_uniform([4, 5])
y = random_ops.random_uniform([6, 5])
n = x.shape[0]
def loop_fn(i):
return math_ops.tanh(a * array_ops.gather(x, i) + array_ops.gather(y, i))
self._test_loop_fn(loop_fn, n)
def test_select(self):
    a = random_ops.random_uniform([2, 3, 5])
    b = random_ops.random_uniform([2, 3, 5])
    for cond_shape in ([2], [2, 3], [2, 3, 5]):
      cond = random_ops.random_uniform(cond_shape) > 0.5
# pylint: disable=cell-var-from-loop
def loop_fn(i):
a_i = array_ops.gather(a, i)
b_i = array_ops.gather(b, i)
cond_i = array_ops.gather(cond, i)
return array_ops.where(cond_i, a_i, b_i)
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 2)


class NNTest(PForTest):
def test_conv2d(self):
x = random_ops.random_uniform([3, 2, 12, 12, 3])
filt = random_ops.random_uniform([3, 3, 3, 7])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return nn.conv2d(
x1, filt, strides=[1, 2, 2, 1], padding="VALID", data_format="NHWC")
self._test_loop_fn(loop_fn, 3)
def test_conv2d_backprop_input(self):
x_shape = [2, 12, 12, 3]
filt = random_ops.random_uniform([3, 3, 3, 7])
grad = random_ops.random_uniform([3, 2, 5, 5, 7])
def loop_fn(i):
grad1 = array_ops.gather(grad, i)
return nn.conv2d_backprop_input(
x_shape,
filt,
grad1,
strides=[1, 2, 2, 1],
padding="VALID",
data_format="NHWC")
self._test_loop_fn(loop_fn, 3)
def test_conv2d_backprop_filter(self):
x = random_ops.random_uniform([3, 2, 12, 12, 3])
x_0 = array_ops.gather(x, 0)
filter_sizes = [3, 3, 3, 7]
grad = random_ops.random_uniform([3, 2, 5, 5, 7])
def loop_fn(i):
x_i = array_ops.gather(x, i)
grad_i = array_ops.gather(grad, i)
return [
nn.conv2d_backprop_filter(
inp,
filter_sizes,
grad_i,
strides=[1, 2, 2, 1],
padding="VALID",
data_format="NHWC") for inp in [x_i, x_0]
]
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)
def test_avg_pool(self):
x = random_ops.random_uniform([3, 2, 12, 12, 3])
ksize = [1, 3, 3, 1]
def loop_fn(i):
x1 = array_ops.gather(x, i)
output = nn.avg_pool(
x1, ksize, strides=[1, 2, 2, 1], padding="VALID", data_format="NHWC")
loss = nn.l2_loss(output)
return output, gradient_ops.gradients(loss, x1)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)
def test_max_pool(self):
x = random_ops.random_uniform([3, 2, 12, 12, 3])
ksize = [1, 3, 3, 1]
def loop_fn(i):
x1 = array_ops.gather(x, i)
output = nn.max_pool(
x1, ksize, strides=[1, 2, 2, 1], padding="VALID", data_format="NHWC")
loss = nn.l2_loss(output)
return output, gradient_ops.gradients(loss, x1)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)
def test_fused_batch_norm(self):
data_formats = ["NHWC"]
if test.is_gpu_available():
data_formats.append("NCHW")
for is_training in (True, False):
for data_format in data_formats:
if data_format == "NCHW":
x = random_ops.random_uniform([3, 1, 2, 5, 5])
else:
x = random_ops.random_uniform([3, 1, 5, 5, 2])
scale = random_ops.random_uniform([2])
offset = random_ops.random_uniform([2])
mean = None if is_training else random_ops.random_uniform([2])
variance = None if is_training else random_ops.random_uniform([2])
# pylint: disable=cell-var-from-loop
def loop_fn(i):
x1 = array_ops.gather(x, i)
outputs = nn.fused_batch_norm(
x1,
scale,
offset,
mean=mean,
variance=variance,
epsilon=0.01,
data_format=data_format,
is_training=is_training)
outputs = list(outputs)
# We only test the first value of outputs when is_training is False.
# It looks like CPU and GPU have different outputs for batch_mean and
# batch_variance for this case.
if not is_training:
outputs[1] = constant_op.constant(0.)
outputs[2] = constant_op.constant(0.)
loss = nn.l2_loss(outputs[0])
gradients = gradient_ops.gradients(loss, [x1, scale, offset])
return outputs + gradients
# pylint: enable=cell-var-from-loop
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 6)
def test_softmax_cross_entropy_with_logits(self):
logits = random_ops.random_uniform([3, 2, 4])
labels = random_ops.random_uniform([3, 2, 4])
labels /= math_ops.reduce_sum(labels, axis=[2], keepdims=True)
def loop_fn(i):
logits_i = array_ops.gather(logits, i)
labels_i = array_ops.gather(labels, i)
loss = nn.softmax_cross_entropy_with_logits(
labels=labels_i, logits=logits_i)
return loss, gradient_ops.gradients(math_ops.reduce_sum(loss), logits_i)
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32] * 2)


class RandomTest(PForTest):
# The random values generated in the two implementations are not guaranteed to
# match. So we only check the returned shapes.
def run_and_assert_equal(self, targets1, targets2):
outputs = self._run_targets(targets1, targets2)
n = len(outputs) // 2
for i in range(n):
self.assertAllEqual(outputs[i].shape, outputs[i + n].shape)
def test_random_uniform(self):
def loop_fn(_):
return random_ops.random_uniform([3])
self._test_loop_fn(loop_fn, 5)
def test_random_uniform_int(self):
def loop_fn(_):
return random_ops.random_uniform([3], maxval=1, dtype=dtypes.int32)
self._test_loop_fn(loop_fn, 5, loop_fn_dtypes=dtypes.int32)
def test_random_standard_normal(self):
def loop_fn(_):
return random_ops.random_normal([3])
self._test_loop_fn(loop_fn, 5)
def test_truncated_normal(self):
def loop_fn(_):
return random_ops.truncated_normal([3])
self._test_loop_fn(loop_fn, 5)
def test_random_gamma(self):
def loop_fn(_):
return random_ops.random_gamma([3], alpha=[0.5])
self._test_loop_fn(loop_fn, 5)
def test_random_poisson_v2(self):
def loop_fn(_):
return random_ops.random_poisson(lam=[1.3], shape=[3])
self._test_loop_fn(loop_fn, 5)


class LoggingTest(PForTest):
def test_print(self):
x = random_ops.random_uniform([3, 5])
def loop_fn(i):
x1 = array_ops.gather(x, i)
return logging_ops.Print(
x1, [x1, "x1", array_ops.shape(x1)], summarize=10)
self._test_loop_fn(loop_fn, 3)
def test_assert(self):
def loop_fn(i):
return control_flow_ops.Assert(i < 10, [i, [10], [i + 1]])
# TODO(agarwal): make this work with for_loop.
with session.Session() as sess:
sess.run(pfor_control_flow_ops.pfor(loop_fn, 3))


class TensorArrayTest(PForTest):
def test_create_outside_and_read(self):
ta = tensor_array_ops.TensorArray(
dtypes.int32, 2, clear_after_read=False).write(0, 0).write(1, 1)
def loop_fn(i):
return ta.read(i), ta.read(0)
self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2)
def test_create_outside_and_gather(self):
ta = tensor_array_ops.TensorArray(
dtypes.int32, 2, clear_after_read=False).write(0, 0).write(1, 1)
def loop_fn(i):
return ta.gather([i]), ta.gather([0, 1])
self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2)
def test_create_outside_and_write_and_scatter(self):
t = tensor_array_ops.TensorArray(dtypes.int32, 10, clear_after_read=False)
handle = t.handle
def loop_fn(i):
ta = t.write(i + 2, 2 * i).write(i, 5)
ta = ta.scatter([4 + i], [4]).scatter([6 + i, 8 + i], [6 + i, 8 + i])
return ta.flow
t1 = pfor_control_flow_ops.pfor(loop_fn, iters=2)
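    # loop_fn returns the TensorArray flow, so pfor returns the stacked
    # per-iteration flows. Pairing the last flow with the original handle below
    # reads the array state after all iterations have written.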
out1 = tensor_array_ops.TensorArray(
dtypes.int32, handle=handle, flow=t1[-1]).stack()
output1 = self._run_targets(out1)
t2 = pfor_control_flow_ops.for_loop(loop_fn, dtypes.float32, iters=2)
out2 = tensor_array_ops.TensorArray(
dtypes.int32, handle=handle, flow=t2[-1]).stack()
output2 = self._run_targets(out2)
self.assertAllClose(output2, output1)
def test_create_inside_and_write(self):
def loop_fn(i):
# TODO(agarwal): switching the order of writes to ta1 does not work.
ta1 = tensor_array_ops.TensorArray(dtypes.int32, 2).write(0, i).write(
1, 1)
ta2 = tensor_array_ops.TensorArray(dtypes.int32, 1).write(0, 1)
return ta1.stack(), ta2.stack()
self._test_loop_fn(loop_fn, 3, [dtypes.int32] * 2)
def test_create_inside_and_scatter(self):
def loop_fn(i):
# TODO(agarwal): switching the order of scatter to ta1 does not work.
ta1 = tensor_array_ops.TensorArray(dtypes.int32, 2).scatter(
[0], [[i, 2]]).scatter([1], [[1, 2]])
ta2 = tensor_array_ops.TensorArray(dtypes.int32,
2).scatter([0], [3]).scatter([1], [4])
return ta1.stack(), ta2.stack()
self._test_loop_fn(loop_fn, 3, [dtypes.int32] * 2)
def test_create_inside_and_read(self):
def loop_fn(i):
ta1 = tensor_array_ops.TensorArray(
dtypes.int32, 2, clear_after_read=False).write(0, i).write(1, 1)
ta2 = tensor_array_ops.TensorArray(
dtypes.int32, 2, clear_after_read=False).write(0, 1).write(1, 2)
# TODO(agarwal): ta1.read(i) currently is not supported.
return ta1.read(0), ta2.read(0), ta2.read(i)
self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 3)
def test_create_inside_and_gather(self):
def loop_fn(i):
ta1 = tensor_array_ops.TensorArray(
dtypes.int32, 2, clear_after_read=False).write(0, i).write(1, 1)
ta2 = tensor_array_ops.TensorArray(
dtypes.int32, 2, clear_after_read=False).write(0, 1).write(1, 2)
# TODO(agarwal): ta1.read(i) currently is not supported.
return ta1.gather([0, 1]), ta2.gather([0, 1]), ta2.gather([i])
self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 3)
def test_grad(self):
x = random_ops.random_uniform([3, 2])
ta = tensor_array_ops.TensorArray(
dtypes.float32, 3, clear_after_read=False).unstack(x)
y = math_ops.square(ta.stack())
def loop_fn(i):
y_i = array_ops.gather(y, i)
grad = gradient_ops.gradients(y_i, x)[0]
return array_ops.gather(grad, i)
t1 = pfor_control_flow_ops.pfor(loop_fn, iters=3)
# y = x * x. Hence dy/dx = 2 * x.
actual_grad = 2.0 * x
with session.Session() as sess:
      computed_grad, actual_grad = sess.run([t1, actual_grad])
self.assertAllClose(actual_grad, computed_grad)


class StackTest(PForTest):
def test_stack_inside_loop_invariant(self):
def loop_fn(_):
s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32)
op1 = data_flow_ops.stack_push_v2(s, 1)
with ops.control_dependencies([op1]):
op2 = data_flow_ops.stack_push_v2(s, 2)
with ops.control_dependencies([op2]):
e2 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32)
with ops.control_dependencies([e2]):
e1 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32)
return e1, e2
self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2)
def test_stack_inside_push_loop_dependent(self):
def loop_fn(i):
s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32)
op1 = data_flow_ops.stack_push_v2(s, i)
with ops.control_dependencies([op1]):
op2 = data_flow_ops.stack_push_v2(s, 2)
with ops.control_dependencies([op2]):
e2 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32)
with ops.control_dependencies([e2]):
e1 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32)
return e1, e2
self._test_loop_fn(loop_fn, 2, [dtypes.int32] * 2)
def test_stack_outside_pop(self):
s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32)
op = data_flow_ops.stack_push_v2(s, 5)
with ops.control_dependencies([op]):
op = data_flow_ops.stack_push_v2(s, 6)
with ops.control_dependencies([op]):
op = data_flow_ops.stack_push_v2(s, 7)
def loop_fn(_):
e1 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32)
with ops.control_dependencies([e1]):
e2 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32)
return e1, e2
with ops.control_dependencies([op]):
e1, e2 = pfor_control_flow_ops.pfor(loop_fn, iters=2)
with ops.control_dependencies([e1, e2]):
e3 = data_flow_ops.stack_pop_v2(s, elem_type=dtypes.int32)
v1, v2, v3 = self._run_targets([e1, e2, e3], run_init=False)
self.assertAllEqual([7, 7], v1)
self.assertAllEqual([6, 6], v2)
self.assertAllEqual(5, v3)
def test_stack_outside_push(self):
s = data_flow_ops.stack_v2(max_size=4, elem_type=dtypes.int32)
def loop_fn(_):
return data_flow_ops.stack_push_v2(s, 7)
with self.assertRaisesRegexp(ValueError, "StackPushV2 not allowed.*"):
pfor_control_flow_ops.pfor(loop_fn, iters=2)


# TODO(agarwal): test nested while_loops. This currently requires converting a
# tf.cond.
class ControlFlowTest(PForTest):
def test_while_outside_loop(self):
x = control_flow_ops.while_loop(lambda j: j < 4, lambda j: j + 1, [0])
def loop_fn(i):
return x + i
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_invariant_while(self):
def loop_fn(_):
return control_flow_ops.while_loop(lambda j: j < 4, lambda j: j + 1, [0])
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_invariant_while_with_control_dependency(self):
def loop_fn(i):
with ops.control_dependencies([i]):
return control_flow_ops.while_loop(lambda j: j < 4, lambda j: j + 1,
[0])
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_while_with_stateful_ops(self):
def loop_fn(_):
return control_flow_ops.while_loop(
lambda j, x: j < 4,
lambda j, x: (j + 1, x + random_ops.random_uniform([])), [0, 0.])[0]
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_while_unstacked_condition(self):
def loop_fn(i):
return control_flow_ops.while_loop(lambda j, x: j < 4,
lambda j, x: (j + 1, x + i), [0, 0])
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32, dtypes.int32])
def test_while(self):
x = random_ops.random_uniform([3, 5])
lengths = constant_op.constant([4, 0, 2])
def loop_fn(i):
x_i = array_ops.gather(x, i)
lengths_i = array_ops.gather(lengths, i)
_, total = control_flow_ops.while_loop(
lambda j, _: j < lengths_i,
lambda j, t: (j + 1, t + array_ops.gather(x_i, j)), [0, 0.])
return total
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.float32])
def test_while_jacobian(self):
x = random_ops.random_uniform([1, 3])
y = random_ops.random_uniform([3, 3])
# out = x @ y @ y @ y @ y, where @ is matmul operator.
_, out = control_flow_ops.while_loop(
lambda i, _: i < 4, lambda i, out: (i + 1, math_ops.matmul(out, y)),
[0, x])
def loop_fn(i):
out_i = array_ops.gather(out, i, axis=1)
return array_ops.reshape(gradient_ops.gradients(out_i, x)[0], [-1])
out = pfor_control_flow_ops.pfor(loop_fn, iters=3)
    # The above code does not work with tf.while_loop instead of pfor. So we
    # manually compute the expected output here.
    # Note that the gradient of the output w.r.t. x is (y @ y @ y @ y)^T.
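    # Derivation, for reference: out = x @ y^4 is a 1 x 3 row vector, with
    # out[0, j] = sum_k x[0, k] * (y^4)[k, j], so d out[0, j] / d x[0, k] =
    # (y^4)[k, j]. Stacking the per-column gradients as rows gives
    # transpose(y^4), which the loop below constructs.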
expected_output = y
for _ in range(3):
expected_output = math_ops.matmul(expected_output, y)
expected_output = array_ops.transpose(expected_output, [1, 0])
with session.Session() as sess:
out, expected = sess.run([out, expected_output])
self.assertAllClose(expected, out)
def test_tensor_array_as_loop_variable(self):
def loop_fn(i):
def body(j, ta):
ta = ta.write(j, i + j * j)
return j + 1, ta
_, ta = control_flow_ops.while_loop(
lambda j, _: j < 4, body,
(0, tensor_array_ops.TensorArray(dtypes.int32, size=4)))
return ta.stack()
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_read_tensor_array_partitioned_indices(self):
# Note that tensor array values are pfor loop dependent, and the while loop
# termination condition is also dependent on pfor iteration.
def loop_fn(i):
ta = tensor_array_ops.TensorArray(dtypes.int32, size=6)
ta = ta.unstack(i + list(range(5)))
def body(j, s):
return j + 1, s + ta.read(j)
_, s = control_flow_ops.while_loop(lambda j, _: j < i,
body,
(0, 0))
return s
self._test_loop_fn(loop_fn, 3, loop_fn_dtypes=[dtypes.int32])
def test_external_while_loop_grad(self):
# Here we test that external while_loops that are extended from inside pfor
# (due to gradient calls) are not actually converted. If the below was
# converted all pfor iterations would write to the same tensor array
# indices.
x = constant_op.constant(1.)
def body(j, ta):
ta = ta.write(j, x)
return j + 1, ta
_, ta = control_flow_ops.while_loop(
lambda j, _: j < 4, body,
(0, tensor_array_ops.TensorArray(dtypes.float32, size=4)))
out = ta.stack()
def loop_fn(i):
out_i = array_ops.gather(out, i)
return gradient_ops.gradients(out_i, x)[0]
with session.Session() as sess:
# out is [x, x, x]. Hence the gradients should be [1, 1, 1].
self.assertAllEqual([1, 1, 1],
sess.run(pfor_control_flow_ops.pfor(loop_fn, 3)))
def test_tensor_array_grad(self):
inp = constant_op.constant(np.random.rand(3, 4, 2), dtype=dtypes.float32)
ta = tensor_array_ops.TensorArray(dtypes.float32, size=3)
ta = ta.unstack(inp)
def loop_fn(i):
def body(j, x):
value = ta.gather([j])
value = array_ops.gather(array_ops.reshape(value, [4, 2]), i)
return j + 1, x + value
_, out = control_flow_ops.while_loop(lambda j, _: j < 3, body,
(0, array_ops.zeros([2])))
out = math_ops.reduce_prod(out)
return out, gradient_ops.gradients(out, inp)[0]
pfor_out, pfor_out_grad = pfor_control_flow_ops.pfor(loop_fn, 4)
# Note that tf.while_loop does not work in the setup above. So we manually
# construct the equivalent computation of the above loops here.
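    # Concretely, iteration i of loop_fn computes prod_c(sum_t(inp[t, i, c])),
    # which is what reduce_sum over axis 0 followed by reduce_prod over the
    # last axis computes for all i at once.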
    real_out = math_ops.reduce_sum(inp, axis=[0])
    real_out = math_ops.reduce_prod(real_out, axis=[1])
# Note that gradients of real_out will accumulate the gradients across the
# output value. Hence we do the same aggregation on pfor_out_grad.
real_out_grad = gradient_ops.gradients(real_out, inp)[0]
    sum_pfor_out_grad = math_ops.reduce_sum(pfor_out_grad, axis=[0])
with session.Session() as sess:
v1, v2, v1_grad, v2_grad = sess.run(
[pfor_out, real_out, sum_pfor_out_grad, real_out_grad])
self.assertAllClose(v1, v2)
self.assertAllClose(v1_grad, v2_grad)


def dynamic_lstm_input_fn(batch_size, state_size, max_steps):
# We make inputs and sequence_length constant so that multiple session.run
# calls produce the same result.
inputs = constant_op.constant(
np.random.rand(batch_size, max_steps, state_size), dtype=dtypes.float32)
  sequence_length = np.random.randint(0, max_steps + 1, size=[batch_size])
sequence_length = constant_op.constant(sequence_length, dtype=dtypes.int32)
return inputs, sequence_length


def create_dynamic_lstm(cell_fn, batch_size, state_size, max_steps):
cell = cell_fn(state_size)
inputs, sequence_length = dynamic_lstm_input_fn(batch_size,
state_size,
max_steps)
inputs_ta = tensor_array_ops.TensorArray(
dtypes.float32, size=max_steps, element_shape=[batch_size, state_size])
inputs_time_major = array_ops.transpose(inputs, [1, 0, 2])
inputs_ta = inputs_ta.unstack(inputs_time_major)
zeros = array_ops.zeros([state_size])
def loop_fn(i):
sequence_length_i = array_ops.gather(sequence_length, i)
def body_fn(t, state, ta):
inputs_t = array_ops.expand_dims(
array_ops.gather(inputs_ta.read(t), i), 0)
output, new_state = cell(inputs_t, state)
output = array_ops.reshape(output, [-1])
# TODO(agarwal): one optimization that dynamic_rnn uses is to avoid the
# array_ops.where when t < min(sequence_length). Doing that requires
# supporting tf.cond pfor conversion.
done = t >= sequence_length_i
output = array_ops.where(done, zeros, output)
ta = ta.write(t, output)
new_state = [array_ops.where(done, s, ns) for s, ns in
zip(nest.flatten(state), nest.flatten(new_state))]
new_state = nest.pack_sequence_as(state, new_state)
return t + 1, new_state, ta
def condition_fn(t, _, unused):
del unused
return t < max_steps
initial_state = cell.zero_state(1, dtypes.float32)
_, state, ta = control_flow_ops.while_loop(condition_fn, body_fn, [
0, initial_state,
tensor_array_ops.TensorArray(dtypes.float32, max_steps)
])
new_state = [array_ops.reshape(x, [-1]) for x in nest.flatten(state)]
new_state = nest.pack_sequence_as(initial_state, new_state)
return ta.stack(), new_state
pfor_output = pfor_control_flow_ops.pfor(loop_fn, batch_size)
tf_output = rnn.dynamic_rnn(
cell,
inputs,
sequence_length=sequence_length,
initial_state=cell.zero_state(batch_size, dtypes.float32))
return pfor_output, tf_output


class RNNTest(PForTest):
def test_dynamic_rnn(self):
pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicRNNCell,
3, 5, 7)
self.run_and_assert_equal(pfor_outputs, tf_outputs)
def test_dynamic_lstm(self):
pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicLSTMCell,
3, 5, 7)
self.run_and_assert_equal(pfor_outputs, tf_outputs)


# TODO(agarwal): benchmark numbers on GPU for graphs based on while_loop
# conversion don't look good. Much of the overhead seems to come from copies
# between host and device. Optimize that.
class Benchmarks(test.Benchmark):
def _run(self, targets, iters, name=None):
def _done(t):
# Note that we don't use tf.control_dependencies since that will not make
# sure that the computation on GPU has actually finished. So we fetch the
# first element of the output, and assume that this will not be called on
# empty tensors.
return array_ops.gather(array_ops.reshape(t, [-1]), 0)
targets = [_done(x) for x in nest.flatten(targets)]
sess = session.Session()
with sess:
init = variables.global_variables_initializer()
sess.run(init)
sess.run(targets)
begin = time.time()
for _ in range(iters):
sess.run(targets)
end = time.time()
avg_time_ms = 1000 * (end - begin) / iters
self.report_benchmark(iters=iters, wall_time=avg_time_ms, name=name)
return avg_time_ms
def benchmark_basic_while(self):
with ops.Graph().as_default():
def loop_fn(i):
_, s = control_flow_ops.while_loop(
lambda t, x: t < i,
lambda t, x: (t + 1, x + i),
[0, 0])
return s
iters = 50
pfor_output = pfor_control_flow_ops.pfor(loop_fn, iters)
for_loop_output = pfor_control_flow_ops.for_loop(loop_fn, dtypes.int32,
iters)
self._run(pfor_output, 100, name="pfor_basic")
self._run(for_loop_output, 100, name="for_loop_basic")
def benchmark_dynamic_rnn(self):
with ops.Graph().as_default():
pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicRNNCell,
128, 512, 16)
self._run(pfor_outputs, 100, name="pfor_rnn")
self._run(tf_outputs, 100, name="tf_rnn")
def benchmark_dynamic_lstm(self):
with ops.Graph().as_default():
pfor_outputs, tf_outputs = create_dynamic_lstm(rnn_cell.BasicLSTMCell,
128, 512, 16)
self._run(pfor_outputs, 100, name="pfor_lstm")
self._run(tf_outputs, 100, name="tf_lstm")


class SparseTest(PForTest):
def test_var_loop_len(self):
num_iters = array_ops.placeholder(dtypes.int32)
def loop_fn(_):
      return sparse_tensor.SparseTensor([[0], [1], [2]], [4, 5, 6],
                                        [3])  # [4, 5, 6]
pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters)
with self.cached_session() as sess:
sess.run(pfor, feed_dict={num_iters: 3})
def test_sparse_result_none_stacked(self):
num_iters = 10
def loop_fn(_):
      return sparse_tensor.SparseTensor([[0], [1], [2]], [4, 5, 6],
                                        [3])  # [4, 5, 6]
pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters)
indices = [[i, j] for i in range(num_iters) for j in range(3)]
values = [4, 5, 6] * num_iters
dense_shapes = [num_iters, 3]
# Expected result: [[4, 5, 6], [4, 5, 6], [4, 5, 6], ...]
manual = sparse_tensor.SparseTensor(indices, values, dense_shapes)
self.run_and_assert_equal(pfor, manual)
def test_sparse_result_all_stacked(self):
num_iters = 10
def loop_fn(i):
i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0)
indices = array_ops.expand_dims(i, 0)
return sparse_tensor.SparseTensor(indices, i, i + 1) # [0, ..., 0, i]
# Expected result: [[0], [0, 1], [0, 0, 2], [0, 0, 0, 3], ...]
pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters)
manual = sparse_tensor.SparseTensor([[i, i] for i in range(num_iters)],
list(range(num_iters)),
(num_iters, num_iters))
self.run_and_assert_equal(pfor, manual)
def test_sparse_result_indices_stacked(self):
num_iters = 10
def loop_fn(i):
i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0)
indices = array_ops.expand_dims(i, 0)
return sparse_tensor.SparseTensor(indices, [1], [num_iters])
    # Expected result: identity matrix of size num_iters x num_iters.
pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters)
manual = sparse_tensor.SparseTensor([[i, i] for i in range(num_iters)],
[1] * num_iters, (num_iters, num_iters))
self.run_and_assert_equal(pfor, manual)
def test_sparse_result_values_stacked(self):
num_iters = 10
def loop_fn(i):
i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0)
return sparse_tensor.SparseTensor([[0]], i, [num_iters]) # [i, 0, ..., 0]
    # Expected result: [[0, 0, ...], [1, 0, ...], [2, 0, ...], ...]
pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters)
manual = sparse_tensor.SparseTensor([[i, 0] for i in range(num_iters)],
list(range(num_iters)),
(num_iters, num_iters))
self.run_and_assert_equal(pfor, manual)
def test_sparse_result_shapes_stacked(self):
num_iters = 10
def loop_fn(i):
i = array_ops.expand_dims(math_ops.cast(i, dtypes.int64), 0)
return sparse_tensor.SparseTensor([[0]], [1], i + 1) # [1, 0, ..., 0]
# Expected result: [[1, 0, 0, ...], [1, 0, 0, ...], ...]
pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters)
manual = sparse_tensor.SparseTensor([[i, 0] for i in range(num_iters)],
[1] * num_iters, (num_iters, num_iters))
self.run_and_assert_equal(pfor, manual)
def test_sparse_result_shapes_stacked_2D(self):
num_iters = 10
def loop_fn(i):
i = array_ops.expand_dims(math_ops.cast(i + 1, dtypes.int64), 0)
shape = array_ops.concat([i, i], 0)
return sparse_tensor.SparseTensor([[0, 0]], [1], shape) # [1, 0, ..., 0]
# Expected result: [[[1, 0, ...], [0, ..., 0], [0, ..., 0], ...], ...]
pfor = pfor_control_flow_ops.pfor(loop_fn, num_iters)
manual = sparse_tensor.SparseTensor([[i, 0, 0] for i in range(num_iters)],
[1] * num_iters,
(num_iters, num_iters, num_iters))
self.run_and_assert_equal(pfor, manual)


class ParsingTest(PForTest):
def test_decode_csv(self):
csv_tensor = constant_op.constant([["1:2:3"], ["::"], ["7:8:9"]])
kwargs = {"record_defaults": [[10], [20], [30]], "field_delim": ":"}
def loop_fn(i):
line = array_ops.gather(csv_tensor, i)
return parsing_ops.decode_csv(line, **kwargs)
self._test_loop_fn(loop_fn, iters=3, loop_fn_dtypes=[dtypes.int32] * 3)
def test_parse_single_example(self):
def _int64_feature(*values):
return feature_pb2.Feature(int64_list=feature_pb2.Int64List(value=values))
def _bytes_feature(*values):
return feature_pb2.Feature(
bytes_list=feature_pb2.BytesList(
value=[v.encode("utf-8") for v in values]))
examples = constant_op.constant([
example_pb2.Example(
features=feature_pb2.Features(
feature={
"dense_int": _int64_feature(i),
"dense_str": _bytes_feature(str(i)),
"sparse_int": _int64_feature(i, i * 2, i * 4, i * 8),
"sparse_str": _bytes_feature(*["abc"] * i)
})).SerializeToString() for i in range(10)
])
features = {
"dense_int": parsing_ops.FixedLenFeature((), dtypes.int64, 0),
"dense_str": parsing_ops.FixedLenFeature((), dtypes.string, ""),
"sparse_int": parsing_ops.VarLenFeature(dtypes.int64),
"sparse_str": parsing_ops.VarLenFeature(dtypes.string),
}
def loop_fn(i):
example_proto = array_ops.gather(examples, i)
f = parsing_ops.parse_single_example(example_proto, features)
return f
pfor = pfor_control_flow_ops.pfor(loop_fn, iters=10)
manual = parsing_ops.parse_example(examples, features)
self.run_and_assert_equal(pfor, manual)


if __name__ == "__main__":
test.main()