# pylint: disable=g-bad-file-header
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for tensorflow.python.client.graph_util."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

from tensorflow.core.framework import attr_value_pb2
from tensorflow.core.framework import graph_pb2
from tensorflow.core.framework import node_def_pb2
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import importer
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_util
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gen_nn_ops
from tensorflow.python.ops import image_ops
from tensorflow.python.ops import math_ops # pylint: disable=unused-import
from tensorflow.python.ops import nn_ops
from tensorflow.python.platform import test
from tensorflow.python.tools import optimize_for_inference_lib


class OptimizeForInferenceTest(test.TestCase):
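  """Tests for the graph rewrites in optimize_for_inference_lib."""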

  def create_node_def(self, op, name, inputs):
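    """Builds a NodeDef proto with the given op type, name, and inputs."""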
new_node = node_def_pb2.NodeDef()
new_node.op = op
new_node.name = name
for input_name in inputs:
new_node.input.extend([input_name])
return new_node

  def create_constant_node_def(self, name, value, dtype, shape=None):
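    """Builds a Const NodeDef carrying the given value, dtype, and shape."""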
node = self.create_node_def("Const", name, [])
self.set_attr_dtype(node, "dtype", dtype)
self.set_attr_tensor(node, "value", value, dtype, shape)
return node

  def set_attr_dtype(self, node, key, value):
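    """Sets a DataType attr (e.g. "dtype" or "T") on the given NodeDef."""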
node.attr[key].CopyFrom(
attr_value_pb2.AttrValue(type=value.as_datatype_enum))

  def set_attr_tensor(self, node, key, value, dtype, shape=None):
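    """Sets a TensorProto attr built from value, dtype, and shape."""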
node.attr[key].CopyFrom(
attr_value_pb2.AttrValue(tensor=tensor_util.make_tensor_proto(
value, dtype=dtype, shape=shape)))

  def testOptimizeForInference(self):
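    # Builds a graph containing dead nodes (an unused constant and an
    # unconnected Add), CheckNumerics debug ops attached through Identity
    # nodes, and one Add whose output is requested. optimize_for_inference
    # should strip everything except the two constants feeding that Add.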
self.maxDiff = 1000
unused_constant_name = "unused_constant"
unconnected_add_name = "unconnected_add"
a_constant_name = "a_constant"
b_constant_name = "b_constant"
a_check_name = "a_check"
b_check_name = "b_check"
a_identity_name = "a_identity"
b_identity_name = "b_identity"
add_name = "add"
unused_output_add_name = "unused_output_add"
graph_def = graph_pb2.GraphDef()
unused_constant = self.create_constant_node_def(
unused_constant_name, value=0, dtype=dtypes.float32, shape=[])
graph_def.node.extend([unused_constant])
unconnected_add_node = self.create_node_def(
"Add", unconnected_add_name,
[unused_constant_name, unused_constant_name])
self.set_attr_dtype(unconnected_add_node, "T", dtypes.float32)
graph_def.node.extend([unconnected_add_node])
a_constant = self.create_constant_node_def(
a_constant_name, value=1, dtype=dtypes.float32, shape=[])
graph_def.node.extend([a_constant])
a_check_node = self.create_node_def("CheckNumerics", a_check_name,
[a_constant_name])
graph_def.node.extend([a_check_node])
a_identity_node = self.create_node_def(
"Identity", a_identity_name, [a_constant_name, "^" + a_check_name])
graph_def.node.extend([a_identity_node])
b_constant = self.create_constant_node_def(
b_constant_name, value=1, dtype=dtypes.float32, shape=[])
graph_def.node.extend([b_constant])
b_check_node = self.create_node_def("CheckNumerics", b_check_name,
[b_constant_name])
graph_def.node.extend([b_check_node])
b_identity_node = self.create_node_def(
"Identity", b_identity_name, [b_constant_name, "^" + b_check_name])
graph_def.node.extend([b_identity_node])
add_node = self.create_node_def("Add", add_name,
[a_identity_name, b_identity_name])
self.set_attr_dtype(add_node, "T", dtypes.float32)
graph_def.node.extend([add_node])
unused_output_add_node = self.create_node_def("Add", unused_output_add_name,
[add_name, b_constant_name])
self.set_attr_dtype(unused_output_add_node, "T", dtypes.float32)
graph_def.node.extend([unused_output_add_node])
expected_output = graph_pb2.GraphDef()
a_constant = self.create_constant_node_def(
a_constant_name, value=1, dtype=dtypes.float32, shape=[])
expected_output.node.extend([a_constant])
b_constant = self.create_constant_node_def(
b_constant_name, value=1, dtype=dtypes.float32, shape=[])
expected_output.node.extend([b_constant])
add_node = self.create_node_def("Add", add_name,
[a_constant_name, b_constant_name])
self.set_attr_dtype(add_node, "T", dtypes.float32)
expected_output.node.extend([add_node])
output = optimize_for_inference_lib.optimize_for_inference(
graph_def, [], [add_name], dtypes.float32.as_datatype_enum)
self.assertProtoEquals(expected_output, output)

  def testFoldBatchNorms(self):
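    # Feeds a Conv2D into the deprecated BatchNormWithGlobalNormalization op
    # and checks that fold_batch_norms removes the batch-norm node while
    # producing numerically equivalent results, by folding the normalization
    # into the convolution weights.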
with self.test_session() as sess:
inputs = [1, 4, 2, 5, 3, 6, -1, -4, -2, -5, -3, -6]
input_op = constant_op.constant(
np.array(inputs), shape=[1, 1, 6, 2], dtype=dtypes.float32)
weights = [1, 2, 3, 4, 0.1, 0.2, 0.3, 0.4]
weights_op = constant_op.constant(
np.array(weights), shape=[1, 2, 2, 2], dtype=dtypes.float32)
conv_op = nn_ops.conv2d(
input_op, weights_op, [1, 1, 1, 1], padding="SAME", name="conv_op")
mean_op = constant_op.constant(
np.array([10, 20]), shape=[2], dtype=dtypes.float32)
variance_op = constant_op.constant(
np.array([0.25, 0.5]), shape=[2], dtype=dtypes.float32)
beta_op = constant_op.constant(
np.array([0.1, 0.6]), shape=[2], dtype=dtypes.float32)
gamma_op = constant_op.constant(
np.array([1.0, 2.0]), shape=[2], dtype=dtypes.float32)
test_util.set_producer_version(ops.get_default_graph(), 8)
gen_nn_ops._batch_norm_with_global_normalization(
conv_op,
mean_op,
variance_op,
beta_op,
gamma_op,
0.00001,
False,
name="output")
original_graph_def = sess.graph_def
original_result = sess.run(["output:0"])
optimized_graph_def = optimize_for_inference_lib.fold_batch_norms(
original_graph_def)
with self.test_session() as sess:
_ = importer.import_graph_def(
optimized_graph_def, input_map={}, name="optimized")
optimized_result = sess.run(["optimized/output:0"])
self.assertAllClose(original_result, optimized_result)
for node in optimized_graph_def.node:
self.assertNotEqual("BatchNormWithGlobalNormalization", node.op)

  def testFoldFusedBatchNorms(self):
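    # Same check as testFoldBatchNorms, but for the FusedBatchNorm op,
    # exercised in both NHWC (CPU) and NCHW (GPU) data formats.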
for data_format, use_gpu in [("NHWC", False), ("NCHW", True)]:
with self.test_session(use_gpu=use_gpu) as sess:
inputs = [1, 4, 2, 5, 3, 6, -1, -4, -2, -5, -3, -6]
input_op = constant_op.constant(
np.array(inputs),
shape=[1, 1, 6, 2] if data_format == "NHWC" else [1, 2, 1, 6],
dtype=dtypes.float32)
weights = [1, 2, 3, 4, 0.1, 0.2, 0.3, 0.4]
weights_op = constant_op.constant(
np.array(weights), shape=[1, 2, 2, 2], dtype=dtypes.float32)
conv_op = nn_ops.conv2d(
input_op,
weights_op, [1, 1, 1, 1],
padding="SAME",
data_format=data_format,
name="conv_op")
mean_op = constant_op.constant(
np.array([10, 20]), shape=[2], dtype=dtypes.float32)
variance_op = constant_op.constant(
np.array([0.25, 0.5]), shape=[2], dtype=dtypes.float32)
beta_op = constant_op.constant(
np.array([0.1, 0.6]), shape=[2], dtype=dtypes.float32)
gamma_op = constant_op.constant(
np.array([1.0, 2.0]), shape=[2], dtype=dtypes.float32)
ops.get_default_graph().graph_def_versions.producer = 9
gen_nn_ops._fused_batch_norm(
conv_op,
gamma_op,
beta_op,
mean_op,
variance_op,
0.00001,
is_training=False,
data_format=data_format,
name="output")
original_graph_def = sess.graph_def
original_result = sess.run(["output:0"])
optimized_graph_def = optimize_for_inference_lib.fold_batch_norms(
original_graph_def)
with self.test_session(use_gpu=use_gpu) as sess:
_ = importer.import_graph_def(
optimized_graph_def, input_map={}, name="optimized")
optimized_result = sess.run(["optimized/output:0"])
self.assertAllClose(
original_result, optimized_result, rtol=1e-04, atol=1e-06)
for node in optimized_graph_def.node:
self.assertNotEqual("FusedBatchNorm", node.op)

  def testFuseResizePadAndConv(self):
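    # Verifies that a ResizeBilinear -> MirrorPad -> Conv2D chain is replaced
    # by a single fused op while leaving the computed output unchanged.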
with self.test_session() as sess:
inputs = [1, 4, 2, 5, 3, 6, -1, -4, -2, -5, -3, -6]
input_op = constant_op.constant(
np.array(inputs), shape=[1, 2, 3, 2], dtype=dtypes.float32)
resize_op = image_ops.resize_bilinear(
input_op, [12, 4], align_corners=False)
pad_op = array_ops.pad(resize_op, [[0, 0], [1, 1], [2, 2], [0, 0]],
mode="REFLECT")
weights = [1, 2, 3, 4, 0.1, 0.2, 0.3, 0.4]
weights_op = constant_op.constant(
np.array(weights), shape=[1, 2, 2, 2], dtype=dtypes.float32)
nn_ops.conv2d(
pad_op, weights_op, [1, 1, 1, 1], padding="VALID", name="output")
original_graph_def = sess.graph_def
original_result = sess.run(["output:0"])
optimized_graph_def = optimize_for_inference_lib.fuse_resize_and_conv(
original_graph_def, ["output"])
with self.test_session() as sess:
_ = importer.import_graph_def(
optimized_graph_def, input_map={}, name="optimized")
optimized_result = sess.run(["optimized/output:0"])
self.assertAllClose(original_result, optimized_result)
for node in optimized_graph_def.node:
self.assertNotEqual("Conv2D", node.op)
self.assertNotEqual("MirrorPad", node.op)
self.assertNotEqual("ResizeBilinear", node.op)

  def testFuseResizeAndConv(self):
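    # Verifies the fusion also applies when there is a resize but no padding
    # op between the input and the convolution.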
with self.test_session() as sess:
inputs = [1, 4, 2, 5, 3, 6, -1, -4, -2, -5, -3, -6]
input_op = constant_op.constant(
np.array(inputs), shape=[1, 2, 3, 2], dtype=dtypes.float32)
resize_op = image_ops.resize_bilinear(
input_op, [12, 4], align_corners=False)
weights = [1, 2, 3, 4, 0.1, 0.2, 0.3, 0.4]
weights_op = constant_op.constant(
np.array(weights), shape=[1, 2, 2, 2], dtype=dtypes.float32)
nn_ops.conv2d(
resize_op, weights_op, [1, 1, 1, 1], padding="VALID", name="output")
original_graph_def = sess.graph_def
original_result = sess.run(["output:0"])
optimized_graph_def = optimize_for_inference_lib.fuse_resize_and_conv(
original_graph_def, ["output"])
with self.test_session() as sess:
_ = importer.import_graph_def(
optimized_graph_def, input_map={}, name="optimized")
optimized_result = sess.run(["optimized/output:0"])
self.assertAllClose(original_result, optimized_result)
for node in optimized_graph_def.node:
self.assertNotEqual("Conv2D", node.op)
self.assertNotEqual("MirrorPad", node.op)

  def testFusePadAndConv(self):
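    # Verifies the fusion also applies when there is a MirrorPad but no
    # resize op between the input and the convolution.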
with self.test_session() as sess:
inputs = [1, 4, 2, 5, 3, 6, -1, -4, -2, -5, -3, -6]
input_op = constant_op.constant(
np.array(inputs), shape=[1, 2, 3, 2], dtype=dtypes.float32)
pad_op = array_ops.pad(input_op, [[0, 0], [1, 1], [2, 2], [0, 0]],
mode="REFLECT")
weights = [1, 2, 3, 4, 0.1, 0.2, 0.3, 0.4]
weights_op = constant_op.constant(
np.array(weights), shape=[1, 2, 2, 2], dtype=dtypes.float32)
nn_ops.conv2d(
pad_op, weights_op, [1, 1, 1, 1], padding="VALID", name="output")
original_graph_def = sess.graph_def
original_result = sess.run(["output:0"])
optimized_graph_def = optimize_for_inference_lib.fuse_resize_and_conv(
original_graph_def, ["output"])
with self.test_session() as sess:
_ = importer.import_graph_def(
optimized_graph_def, input_map={}, name="optimized")
optimized_result = sess.run(["optimized/output:0"])
self.assertAllClose(original_result, optimized_result)
for node in optimized_graph_def.node:
self.assertNotEqual("Conv2D", node.op)
self.assertNotEqual("ResizeBilinear", node.op)


if __name__ == "__main__":
test.main()