# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Parameterized unit tests for quantizing a Tensorflow graph."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.contrib.layers.python.layers import layers
from tensorflow.contrib.quantize.python import fold_batch_norms
from tensorflow.contrib.quantize.python import quantize
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.platform import googletest
batch_norm = layers.batch_norm
conv2d = layers.conv2d
fully_connected = layers.fully_connected
separable_conv2d = layers.separable_conv2d
class QuantizeTest(test_util.TensorFlowTestCase):
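  """Tests that the quantize graph rewrite inserts the expected FakeQuant ops."""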
def _RunWithoutBatchNormTestOverParameters(self, test_fn):
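    """Runs test_fn over the full set of test parameter combinations."""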
# TODO(suharshs): Use parameterized test once OSS TF supports it.
parameters_list = [
# (activation, activation_op_name, with_bypass, delay)
(nn_ops.relu6, 'Relu6', False, None),
(nn_ops.relu, 'Relu', False, None),
(array_ops.identity, 'Identity', False, None),
(nn_ops.relu6, 'Relu6', False, 5000),
(nn_ops.relu, 'Relu', False, 5000),
(array_ops.identity, 'Identity', False, 5000),
(nn_ops.relu6, 'Relu6', True, None),
(nn_ops.relu, 'Relu', True, None),
(array_ops.identity, 'Identity', True, None),
(nn_ops.relu6, 'Relu6', True, 5000),
(nn_ops.relu, 'Relu', True, 5000),
(array_ops.identity, 'Identity', True, 5000),
]
for params in parameters_list:
# Test everything with resource variables and normal variables.
test_fn(params[0], params[1], params[2], params[3], False, None)
test_fn(params[0], params[1], params[2], params[3], True, None)
# Test with both empty scope and an example scope
test_fn(params[0], params[1], params[2], params[3], False, 'test')
test_fn(params[0], params[1], params[2], params[3], True, 'test')
def _AssertCorrectQuantizedGraphWithoutBatchNorm(
self, graph, scope, layer, activation_op_name, with_bypass, delay,
use_resource):
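    """Checks the quantized graph of a layer without batch norm.

    Verifies the weights_quant op, the conv_quant op (only when a bypass is
    present) and the act_quant op, along with their inputs and consumers.
    """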
quantization_node_name = 'FakeQuantWithMinMaxVars'
conv_scope = self._GetConvScope(scope, with_bypass)
delim = '/' if conv_scope else ''
if scope:
scope = scope + '/'
weights_quant = graph.get_operation_by_name(
conv_scope + delim + 'weights_quant/' + quantization_node_name)
self.assertEqual(weights_quant.type, quantization_node_name)
# Assemble the expected inputs.
if use_resource:
expected_inputs = [
conv_scope + delim +
'weights_quant/FakeQuantWithMinMaxVars/ReadVariableOp',
conv_scope + delim +
'weights_quant/FakeQuantWithMinMaxVars/ReadVariableOp_1',
]
if layer == 'DepthwiseConv2dNative':
expected_inputs.append(conv_scope + delim + 'depthwise/ReadVariableOp')
else:
expected_inputs.append(conv_scope + delim + layer + '/ReadVariableOp')
else:
expected_inputs = [
conv_scope + delim + 'weights_quant/AssignMinLast',
conv_scope + delim + 'weights_quant/AssignMaxLast',
]
if layer == 'DepthwiseConv2dNative':
expected_inputs.append(conv_scope + delim + 'depthwise_weights/read')
else:
expected_inputs.append(conv_scope + delim + 'weights/read')
self._AssertInputOpsAre(weights_quant, expected_inputs)
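    # When a quantization delay is used, the quantized weights are routed
    # through the delayed_quant Switch before reaching the conv/depthwise op.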
if delay and delay > 0:
output_op_name = (
conv_scope + delim + 'weights_quant/delayed_quant/Switch_1')
else:
if layer == 'DepthwiseConv2dNative':
output_op_name = conv_scope + delim + 'depthwise'
else:
output_op_name = conv_scope + delim + layer
self._AssertOutputGoesToOps(weights_quant, graph, [output_op_name])
if with_bypass:
conv_quant = graph.get_operation_by_name(
conv_scope + delim + 'conv_quant/' + quantization_node_name)
self.assertEqual(conv_quant.type, quantization_node_name)
if use_resource:
expected_inputs = [
conv_scope + delim +
'conv_quant/FakeQuantWithMinMaxVars/ReadVariableOp',
conv_scope + delim +
'conv_quant/FakeQuantWithMinMaxVars/ReadVariableOp_1',
conv_scope + delim + 'BiasAdd',
]
else:
expected_inputs = [
conv_scope + delim + 'conv_quant/AssignMinEma',
conv_scope + delim + 'conv_quant/AssignMaxEma',
conv_scope + delim + 'BiasAdd'
]
self._AssertInputOpsAre(conv_quant, expected_inputs)
output_op_name = (
conv_scope + delim + 'conv_quant/delayed_quant/Switch_1'
if delay else scope + 'Add')
self._AssertOutputGoesToOps(conv_quant, graph, [output_op_name])
act_quant = graph.get_operation_by_name(scope + 'act_quant/' +
quantization_node_name)
self.assertEqual(act_quant.type, quantization_node_name)
if use_resource:
expected_inputs = [
scope + 'act_quant/FakeQuantWithMinMaxVars/ReadVariableOp',
scope + 'act_quant/FakeQuantWithMinMaxVars/ReadVariableOp_1',
scope + activation_op_name,
]
else:
expected_inputs = [
scope + 'act_quant/AssignMinEma', scope + 'act_quant/AssignMaxEma',
scope + activation_op_name
]
self._AssertInputOpsAre(act_quant, expected_inputs)
output_op_name = (
scope + 'act_quant/delayed_quant/Switch_1'
if delay else 'control_dependency')
self._AssertOutputGoesToOps(act_quant, graph, [output_op_name])
self._AssertIdempotent(graph)
def testQuantize_Conv2dWithoutBatchNorm(self):
self._RunWithoutBatchNormTestOverParameters(
self._TestQuantize_Conv2dWithoutBatchNorm)
def _TestQuantize_Conv2dWithoutBatchNorm(self, activation, activation_op_name,
with_bypass, delay, use_resource,
scope):
"""Tests quantization: inputs -> Conv2d no batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, height, width, depth = 5, 128, 128, 3
inputs = array_ops.zeros((batch_size, height, width, depth))
stride = 1 if with_bypass else 2
out_depth = 3 if with_bypass else 32
activation_fn = None if with_bypass else activation
conv_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = conv2d(
inputs,
out_depth, [5, 5],
stride=stride,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
activation_fn=activation_fn,
scope=conv_scope)
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
quantize.Quantize(graph, True, quant_delay=delay)
if conv_scope is None:
conv_scope = ''
self._AssertCorrectQuantizedGraphWithoutBatchNorm(
graph, scope, 'Conv2D', activation_op_name, with_bypass, delay,
use_resource)
def testQuantize_FCWithoutBatchNorm(self):
self._RunWithoutBatchNormTestOverParameters(
self._TestQuantize_FCWithoutBatchNorm)
def _TestQuantize_FCWithoutBatchNorm(self, activation, activation_op_name,
with_bypass, delay, use_resource, scope):
"""Tests quantization: inputs -> FC no batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, depth = 5, 256
inputs = array_ops.zeros((batch_size, depth))
out_depth = 256 if with_bypass else 128
activation_fn = None if with_bypass else activation
fc_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = fully_connected(
inputs,
out_depth,
weights_initializer=self._WeightInit(0.03),
activation_fn=activation_fn,
scope=fc_scope)
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
quantize.Quantize(graph, True, quant_delay=delay)
self._AssertCorrectQuantizedGraphWithoutBatchNorm(
graph, scope, 'MatMul', activation_op_name, with_bypass, delay,
use_resource)
def testQuantize_DepthwiseConv2dWithoutBatchNorm(self):
self._RunWithoutBatchNormTestOverParameters(
self._TestQuantize_DepthwiseConv2dWithoutBatchNorm)
def _TestQuantize_DepthwiseConv2dWithoutBatchNorm(
self, activation, activation_op_name, with_bypass, delay, use_resource,
scope):
"""Tests quantization: inputs -> DWConv2d no batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, height, width, depth = 5, 128, 128, 3
inputs = array_ops.zeros((batch_size, height, width, depth))
stride = 1 if with_bypass else 2
activation_fn = None if with_bypass else activation
conv_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = separable_conv2d(
inputs,
None, [5, 5],
stride=stride,
depth_multiplier=1.0,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
activation_fn=activation_fn,
scope=conv_scope)
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
quantize.Quantize(graph, True, quant_delay=delay)
self._AssertCorrectQuantizedGraphWithoutBatchNorm(
graph, scope, 'DepthwiseConv2dNative', activation_op_name, with_bypass,
delay, use_resource)
def testQuantize_AtrousConvWithoutBatchNorm(self):
self._RunWithoutBatchNormTestOverParameters(
self._TestQuantize_AtrousConvWithoutBatchNorm)
def _TestQuantize_AtrousConvWithoutBatchNorm(self, activation,
activation_op_name, with_bypass,
delay, use_resource, scope):
"""Tests quantization: inputs -> atrous conv no batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, height, width, depth = 5, 128, 128, 3
inputs = array_ops.zeros((batch_size, height, width, depth))
dilation_rate = 2
activation_fn = None if with_bypass else activation
conv_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = separable_conv2d(
inputs,
None, [3, 3],
rate=dilation_rate,
depth_multiplier=1.0,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
activation_fn=activation_fn,
scope=conv_scope)
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
quantize.Quantize(graph, True, quant_delay=delay)
self._AssertCorrectQuantizedGraphWithoutBatchNorm(
graph, scope, 'DepthwiseConv2dNative', activation_op_name, with_bypass,
delay, use_resource)
def _RunBatchNormTestOverParameters(self, test_fn):
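    """Runs test_fn over the batch norm test parameter combinations."""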
# TODO(suharshs): Use parameterized test once OSS TF supports it.
parameters_list = [
# (activation, activation_op_name, with_bypass, delay, fused_batch_norm)
(nn_ops.relu6, 'Relu6', False, None, False),
(nn_ops.relu, 'Relu', False, None, False),
(array_ops.identity, 'Identity', False, None, False),
(nn_ops.relu6, 'Relu6', False, 5000, False),
(nn_ops.relu, 'Relu', False, 5000, False),
(array_ops.identity, 'Identity', False, 5000, False),
(nn_ops.relu6, 'Relu6', True, None, False),
(nn_ops.relu, 'Relu', True, None, False),
(array_ops.identity, 'Identity', True, None, False),
(nn_ops.relu6, 'Relu6', True, 5000, False),
(nn_ops.relu, 'Relu', True, 5000, False),
(array_ops.identity, 'Identity', True, 5000, False),
(nn_ops.relu6, 'Relu6', False, None, True),
(nn_ops.relu, 'Relu', False, None, True),
(array_ops.identity, 'Identity', False, None, True),
(nn_ops.relu6, 'Relu6', False, 5000, True),
(nn_ops.relu, 'Relu', False, 5000, True),
(array_ops.identity, 'Identity', False, 5000, True),
(nn_ops.relu6, 'Relu6', True, None, True),
(nn_ops.relu, 'Relu', True, None, True),
(array_ops.identity, 'Identity', True, None, True),
(nn_ops.relu6, 'Relu6', True, 5000, True),
(nn_ops.relu, 'Relu', True, 5000, True),
(array_ops.identity, 'Identity', True, 5000, True)
]
for params in parameters_list:
# Test everything with resource variables and normal variables.
test_fn(params[0], params[1], params[2], params[3], params[4], False,
None)
test_fn(params[0], params[1], params[2], params[3], params[4], True, None)
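      # Test with both empty scope and an example scope.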
test_fn(params[0], params[1], params[2], params[3], params[4], False,
'test')
test_fn(params[0], params[1], params[2], params[3], params[4], True,
'test')
def _AssertCorrectQuantizedGraphWithBatchNorm(self, graph, scope, layer,
activation_op_name, with_bypass,
delay, use_resource):
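    """Checks the quantized graph of a layer with a folded batch norm.

    Verifies the weights_quant op (fed by mul_fold), the conv_quant op (only
    when a bypass is present) and the act_quant op, along with their inputs
    and consumers.
    """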
quantization_node_name = 'FakeQuantWithMinMaxVars'
conv_scope = self._GetConvScope(scope, with_bypass)
delim = '/' if conv_scope else ''
if scope:
scope = scope + '/'
weights_quant = graph.get_operation_by_name(
conv_scope + delim + 'weights_quant/' + quantization_node_name)
self.assertEqual(weights_quant.type, quantization_node_name)
if use_resource:
expected_inputs = [
conv_scope + delim +
'weights_quant/FakeQuantWithMinMaxVars/ReadVariableOp',
conv_scope + delim +
'weights_quant/FakeQuantWithMinMaxVars/ReadVariableOp_1',
]
else:
expected_inputs = [
conv_scope + delim + 'weights_quant/' + 'AssignMinLast',
conv_scope + delim + 'weights_quant/' + 'AssignMaxLast'
]
expected_inputs.append(conv_scope + delim + 'mul_fold')
self._AssertInputOpsAre(weights_quant, expected_inputs)
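    # After folding, the quantized weights feed the folded conv/depthwise op
    # ('*_Fold'), or the delayed_quant Switch when a quantization delay is used.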
if layer == 'DepthwiseConv2dNative':
output_op_name = conv_scope + delim + (
'weights_quant/delayed_quant/Switch_1' if delay else 'depthwise_Fold')
else:
output_op_name = conv_scope + delim + (
'weights_quant/delayed_quant/Switch_1' if delay else layer + '_Fold')
self._AssertOutputGoesToOps(weights_quant, graph, [output_op_name])
if with_bypass:
conv_quant = graph.get_operation_by_name(
conv_scope + delim + 'conv_quant/' + quantization_node_name)
self.assertEqual(conv_quant.type, quantization_node_name)
if use_resource:
expected_inputs = [
conv_scope + delim +
'conv_quant/FakeQuantWithMinMaxVars/ReadVariableOp',
conv_scope + delim +
'conv_quant/FakeQuantWithMinMaxVars/ReadVariableOp_1',
]
else:
expected_inputs = [
conv_scope + delim + 'conv_quant/AssignMinEma',
conv_scope + delim + 'conv_quant/AssignMaxEma',
]
expected_inputs.append(conv_scope + delim + 'add_fold')
self._AssertInputOpsAre(conv_quant, expected_inputs)
output_op_name = (
conv_scope + delim + 'conv_quant/delayed_quant/Switch_1'
if delay else scope + 'Add')
self._AssertOutputGoesToOps(conv_quant, graph, [output_op_name])
act_quant = graph.get_operation_by_name(scope + 'act_quant/' +
quantization_node_name)
self.assertEqual(act_quant.type, quantization_node_name)
if use_resource:
expected_inputs = [
scope + 'act_quant/FakeQuantWithMinMaxVars/ReadVariableOp',
scope + 'act_quant/FakeQuantWithMinMaxVars/ReadVariableOp_1',
]
else:
expected_inputs = [
scope + 'act_quant/AssignMinEma',
scope + 'act_quant/AssignMaxEma',
]
expected_inputs.append(scope + activation_op_name)
self._AssertInputOpsAre(act_quant, expected_inputs)
output_op_name = (
scope + 'act_quant/delayed_quant/Switch_1'
if delay else 'control_dependency')
self._AssertOutputGoesToOps(act_quant, graph, [output_op_name])
self._AssertIdempotent(graph)
def testQuantize_Conv2dWithBatchNorm(self):
self._RunBatchNormTestOverParameters(self._TestQuantize_Conv2dWithBatchNorm)
def _TestQuantize_Conv2dWithBatchNorm(self, activation, activation_op_name,
with_bypass, delay, fused_batch_norm,
use_resource, scope):
"""Tests quantization: inputs -> Conv2d with batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
fused_batch_norm: Bool, when true use FusedBatchNorm.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, height, width, depth = 5, 128, 128, 3
inputs = array_ops.zeros((batch_size, height, width, depth))
stride = 1 if with_bypass else 2
out_depth = 3 if with_bypass else 32
conv_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = conv2d(
inputs,
out_depth, [5, 5],
stride=stride,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
activation_fn=None,
normalizer_fn=batch_norm,
normalizer_params=self._BatchNormParams(fused_batch_norm),
scope=conv_scope)
# Manually add a bypass (optional) and an activation.
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
fold_batch_norms.FoldBatchNorms(graph, is_training=True)
quantize.Quantize(graph, True, quant_delay=delay)
self._AssertCorrectQuantizedGraphWithBatchNorm(
graph, scope, 'Conv2D', activation_op_name, with_bypass, delay,
use_resource)
def testQuantize_FCWithBatchNorm(self):
self._RunBatchNormTestOverParameters(self._TestQuantize_FCWithBatchNorm)
def _TestQuantize_FCWithBatchNorm(self, activation, activation_op_name,
with_bypass, delay, fused_batch_norm,
use_resource, scope):
"""Tests quantization: inputs -> FC with batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
fused_batch_norm: Bool, when true use FusedBatchNorm.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, depth = 5, 256
inputs = array_ops.zeros((batch_size, depth))
out_depth = 256 if with_bypass else 128
conv_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = fully_connected(
inputs,
out_depth,
weights_initializer=self._WeightInit(0.03),
activation_fn=None,
normalizer_fn=batch_norm,
normalizer_params=self._BatchNormParams(fused_batch_norm),
scope=conv_scope)
# Manually add a bypass (optional) and an activation.
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
fold_batch_norms.FoldBatchNorms(graph, is_training=True)
quantize.Quantize(graph, True, quant_delay=delay)
self._AssertCorrectQuantizedGraphWithBatchNorm(
graph, scope, 'MatMul', activation_op_name, with_bypass, delay,
use_resource)
def testQuantize_DepthwiseConv2dWithBatchNorm(self):
self._RunBatchNormTestOverParameters(
self._TestQuantize_DepthwiseConv2dWithBatchNorm)
def _TestQuantize_DepthwiseConv2dWithBatchNorm(
self, activation, activation_op_name, with_bypass, delay,
fused_batch_norm, use_resource, scope):
"""Tests quantization: inputs -> DWConv2d with batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
fused_batch_norm: Bool, when true use FusedBatchNorm.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, height, width, depth = 5, 128, 128, 3
inputs = array_ops.zeros((batch_size, height, width, depth))
stride = 1 if with_bypass else 2
conv_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = separable_conv2d(
inputs,
None, [5, 5],
stride=stride,
depth_multiplier=1.0,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
activation_fn=None,
normalizer_fn=batch_norm,
normalizer_params=self._BatchNormParams(fused_batch_norm),
scope=conv_scope)
# Manually add a bypass (optional) and an activation.
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
fold_batch_norms.FoldBatchNorms(graph, is_training=True)
quantize.Quantize(graph, True, quant_delay=delay)
self._AssertCorrectQuantizedGraphWithBatchNorm(
graph, scope, 'DepthwiseConv2dNative', activation_op_name,
with_bypass, delay, use_resource)
def testQuantize_AtrousConvWithBatchNorm(self):
self._RunBatchNormTestOverParameters(
self._TestQuantize_AtrousConvWithBatchNorm)
def _TestQuantize_AtrousConvWithBatchNorm(
self, activation, activation_op_name, with_bypass, delay,
fused_batch_norm, use_resource, scope):
"""Tests quantization: inputs -> atrous conv with batch norm -> Activation.
Args:
activation: Callable that returns an Operation, a factory method for the
Activation.
activation_op_name: String, name of the Activation operation.
with_bypass: Bool, when true there is an extra connection added from
inputs to just before Activation.
delay: Int (optional), delay in number of steps until quantization starts.
fused_batch_norm: Bool, when true use FusedBatchNorm.
use_resource: Bool, when true uses resource variables.
      scope: String, specifies the top level scope for the graph.
"""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, height, width, depth = 5, 128, 128, 3
inputs = array_ops.zeros((batch_size, height, width, depth))
dilation_rate = 2
conv_scope = self._GetConvScope(scope, with_bypass)
scope = '' if scope is None else scope
delim = '/' if scope else ''
node = separable_conv2d(
inputs,
None, [3, 3],
rate=dilation_rate,
depth_multiplier=1.0,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
activation_fn=None,
normalizer_fn=batch_norm,
normalizer_params=self._BatchNormParams(fused_batch_norm),
scope=conv_scope)
# Manually add a bypass (optional) and an activation.
if with_bypass:
node = math_ops.add(inputs, node, name=scope + delim + 'Add')
node = activation(node, name=scope + delim + activation_op_name)
update_barrier = control_flow_ops.no_op(name='update_barrier')
with ops.control_dependencies([update_barrier]):
array_ops.identity(node, name='control_dependency')
fold_batch_norms.FoldBatchNorms(graph, is_training=True)
quantize.Quantize(graph, True, quant_delay=delay)
self._AssertCorrectQuantizedGraphWithBatchNorm(
graph, scope, 'DepthwiseConv2dNative', activation_op_name,
with_bypass, delay, use_resource)
def _AssertIdempotent(self, graph):
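    """Asserts that applying the rewrites again leaves the graph unchanged."""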
# Ensure that calling the rewrite again doesn't change the graph.
graph_def_before = str(graph.as_graph_def())
with graph.as_default():
# Ensuring that calling the rewrite again doesn't add more nodes.
fold_batch_norms.FoldBatchNorms(graph, is_training=True)
quantize.Quantize(graph, True)
graph_def_after = str(graph.as_graph_def())
self.assertEqual(graph_def_before, graph_def_after)
def testBatchNormForcedUpdates(self):
parameter_list = [
# (activation, activation_op_name, fused_batch_norm)
(nn_ops.relu6, 'Relu6', False),
(nn_ops.relu, 'Relu', False),
(array_ops.identity, 'Identity', False),
(nn_ops.relu6, 'Relu6', True),
(nn_ops.relu, 'Relu', True),
(array_ops.identity, 'Identity', True),
]
for params in parameter_list:
self._TestBatchNormForcedUpdates(params[0], params[1], params[2], False)
self._TestBatchNormForcedUpdates(params[0], params[1], params[2], True)
def _TestBatchNormForcedUpdates(self, activation, activation_op_name,
fused_batch_norm, use_resource):
"""post_activation bypass quantization should happen with forced updates."""
graph = ops.Graph()
with graph.as_default():
variable_scope.get_variable_scope().set_use_resource(use_resource)
batch_size, height, width, depth = 5, 128, 128, 3
input1 = array_ops.zeros((batch_size, height, width, depth))
      input2 = array_ops.zeros((batch_size, height // 2, width // 2, 32))
# Setting updates_collections to None forces updates adding an extra
# identity operation following batch norms.
bn_params = self._BatchNormParams(
fused=fused_batch_norm, force_updates=True)
conv = conv2d(
input1,
32, [5, 5],
stride=2,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
activation_fn=activation,
normalizer_fn=batch_norm,
normalizer_params=bn_params,
scope='test/test')
bypass_tensor = math_ops.add(conv, input2, name='test/add')
# The output of the post_activation bypass will be another layer.
_ = conv2d(
bypass_tensor,
32, [5, 5],
stride=2,
padding='SAME',
weights_initializer=self._WeightInit(0.09),
normalizer_fn=batch_norm,
normalizer_params=bn_params,
activation_fn=activation,
scope='test/unused')
fold_batch_norms.FoldBatchNorms(graph, is_training=True)
quantize.Quantize(graph, is_training=True)
# Ensure that the bypass node is preceded by and followed by a
# FakeQuantWithMinMaxVar operation, since the output of the Add isn't an
# activation.
self.assertTrue('FakeQuantWithMinMaxVars' in
[c.type for c in bypass_tensor.consumers()])
self.assertTrue('FakeQuantWithMinMaxVars' in
[i.op.type for i in bypass_tensor.op.inputs])
def _GetConvScope(self, scope, with_bypass):
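    """Returns the conv scope; a 'test2' sub-scope is used with a bypass."""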
if scope is None:
scope = ''
delim = '/' if scope else ''
if with_bypass:
conv_scope = scope + delim + 'test2'
else:
conv_scope = scope
return conv_scope
def _BatchNormParams(self, fused=False, force_updates=False):
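    """Returns batch norm params; force_updates forces in-place BN updates."""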
params = {
'center': True,
'scale': True,
'decay': 1.0 - 0.003,
'fused': fused
}
if force_updates:
params['updates_collections'] = None
return params
def _WeightInit(self, stddev):
"""Returns truncated normal variable initializer.
Function is defined purely to shorten the name so that it stops wrapping.
Args:
stddev: Standard deviation of normal variable.
Returns:
      An initializer that initializes with a truncated normal variable.
"""
return init_ops.truncated_normal_initializer(stddev=stddev)
def _AssertInputOpsAre(self, op, in_op_names):
"""Asserts that all inputs to op come from in_op_names (disregarding order).
Args:
op: Operation to check inputs for.
in_op_names: List of strings, operations where all op's inputs should
come from.
"""
expected_inputs = [in_op_name + ':0' for in_op_name in in_op_names]
self.assertItemsEqual([t.name for t in op.inputs], expected_inputs)
def _AssertOutputGoesToOps(self, op, graph, out_op_names):
"""Asserts that outputs from op go to out_op_names (and perhaps others).
Args:
op: Operation to check outputs for.
graph: Graph where output operations are located.
out_op_names: List of strings, operations where op's outputs should go.
"""
for out_op_name in out_op_names:
out_op = graph.get_operation_by_name(out_op_name)
self.assertIn(op.outputs[0].name, [str(t.name) for t in out_op.inputs])
if __name__ == '__main__':
googletest.main()