blob: e1b10a7adc4c714e8fe2b745477badc0aceb9562 [file] [log] [blame]
#!/usr/bin/python3
# Copyright 2017, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""NN model compiler
Contains class definitions and utility functions for compiling models and
examples into NDK-based CTS and VTS unit tests.
Used by example_generator.py and spec_visualizer.py
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import copy
from functools import reduce
import argparse
import io
import itertools
import os
import re
import sys
import traceback
import numpy as np
def GetJointStr(l, sep=", ", method=str):
    """Convert every item of `l` with `method` and join the results with `sep`."""
    pieces = map(method, l)
    return sep.join(pieces)
# Print in C float literal format
def PrettyPrintAsFloat(x):
    """Return `x` rendered as a C float literal string.

    The value is converted with str(float(x)). An "f" suffix is appended when
    the text already contains a decimal point or an exponent; otherwise ".0f"
    is appended.
    """
    text = str(float(x))
    looksLikeFloat = "." in text or "e" in text
    suffix = "f" if looksLikeFloat else ".0f"
    return text + suffix
# Transform from original type to float32
def Dequantize(v, ty):
    """Map quantized value(s) `v` of type `ty` back to real values.

    Subtracts the zero point, then multiplies by the scale (when it is
    non-zero) and by the per-channel scales (when `ty.extraParams` is a
    SymmPerChannelQuantParams). Augmented assignments are used throughout, so
    a numpy array passed as `v` is updated in place and also returned.
    """
    v -= ty.zeroPoint
    uniformScale = ty.scale
    if uniformScale != 0:
        v *= uniformScale
    channelParams = ty.extraParams
    if isinstance(channelParams, SymmPerChannelQuantParams):
        v *= channelParams.GetScalesBroadcastArray(ty.dimensions)
    return v
# Transform float32 to target data type
def Quantize(v, ty):
    """Convert real value(s) `v` into the representation of type `ty`.

    Divides by the scale (when non-zero) and by the per-channel scales (when
    `ty.extraParams` is a SymmPerChannelQuantParams), adds the zero point,
    rounds and casts to int for non-float types, and finally clamps to the
    value range of the listed integer types. Returns the converted value.
    """
    if ty.scale != 0:
        v /= ty.scale
    if isinstance(ty.extraParams, SymmPerChannelQuantParams):
        v = v / ty.extraParams.GetScalesBroadcastArray(ty.dimensions)
    v += ty.zeroPoint
    if not ty.IsFloat():
        v = np.round(v).astype(int)
    # (low, high) clamp bounds per type; a high bound of None means "no upper clamp"
    clampBounds = {
        "TENSOR_QUANT8_ASYMM": (0, 255),
        "TENSOR_QUANT16_ASYMM": (0, 65535),
        "TENSOR_QUANT8_SYMM_PER_CHANNEL": (-127, 127),
        "UINT32": (0, None),
        "TENSOR_QUANT8_ASYMM_SIGNED": (-128, 127),
    }
    bounds = clampBounds.get(ty.type)
    if bounds is not None:
        low, high = bounds
        v = np.maximum(v, low)
        if high is not None:
            v = np.minimum(v, high)
    return v
# Tracking objects inside a model with a unique name
class NamedObject:
    """Base class for objects that are identified by a unique name.

    The name is built by joining the non-empty positional arguments with
    `sep`. Unless `skipRenaming` is set, a numeric suffix is appended until
    the name is not yet present in the class-level `existingNames` set.
    """
    # Names already taken by instances of this class
    existingNames = set()

    def __init__(self, *args, sep="_", showZero=False, startsFrom=0, skipRenaming=False):
        parts = [i for i in args if i is not None and i != ""]
        baseName = GetJointStr(parts, sep=sep)
        if skipRenaming:
            self.name = baseName
            return
        # Make the name unique by renaming with a suffix number
        taken = self.__class__.existingNames
        if showZero is False:
            candidate = baseName
        else:
            candidate = baseName + sep + str(startsFrom)
        while candidate in taken:
            startsFrom += 1
            candidate = baseName + sep + str(startsFrom)
        taken.add(candidate)
        self.name = candidate

    def __str__(self):
        return self.name

    __repr__ = __str__

    # Since names are unique, objects with the same name are considered equal
    def __eq__(self, other):
        if not isinstance(other, NamedObject):
            return False
        return self.name == other.name

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.name)

    # Ordering follows the name, which keeps sorted() output stable
    def __lt__(self, other):
        return self.name < other.name
# Types, operands should all have a unique name since they share the same namespace
class NamedVariable(NamedObject):
    """A named object living in the variable namespace.

    Declares its own `existingNames` set, so uniqueness is tracked separately
    from the set declared on NamedObject.
    """
    existingNames = set()

    def __init__(self, *args, sep="_", showZero=False, startsFrom=0, skipRenaming=False):
        options = dict(sep=sep, showZero=showZero, startsFrom=startsFrom,
                       skipRenaming=skipRenaming)
        NamedObject.__init__(self, *args, **options)
# Global variables in the spec namespace such as CreateModel, is_ignored, and examples
class GlobalVariable(NamedVariable):
    """A named variable whose uniqueness suffix starts counting at 1.

    No `existingNames` is declared here, so the set inherited from
    NamedVariable is used.
    """

    def __init__(self, *args, skipRenaming=False):
        NamedObject.__init__(
            self,
            *args,
            startsFrom=1,
            skipRenaming=skipRenaming,
        )
# Each test should have a unique name, but will not conflict with variables
class NamedTest(NamedObject):
    """A uniquely named test; names are tracked in a set separate from variables."""
    existingNames = set()

    def __init__(self, *args, startsFrom=0, skipRenaming=False):
        # NOTE(review): the `startsFrom` argument is accepted but not forwarded;
        # numbering always starts from 1. Confirm this is intended.
        NamedObject.__init__(
            self,
            *args,
            startsFrom=1,
            skipRenaming=skipRenaming,
        )
class Type(NamedVariable):
    """Type of an operand: element type, dimensions, scale, zero point and extra params.

    Instances are meant to be obtained through Type.GetType(), which caches
    them in `Type.typesMap` so that requests with the same signature share a
    single Type object.
    """
    # Cache of created types, keyed by the signature string built in GetType()
    typesMap = dict()
    # Maps an operand type name to the C++ type string of one element
    typeLookup = {
        "INT32": "int32_t",
        "UINT32": "uint32_t",
        "FLOAT32": "float",
        "FLOAT16": "_Float16",
        "TENSOR_INT32": "int32_t",
        "TENSOR_FLOAT16": "_Float16",
        "TENSOR_FLOAT32": "float",
        "TENSOR_QUANT8_ASYMM": "uint8_t",
        "TENSOR_QUANT8_SYMM": "int8_t",
        "BOOL": "bool8",
        "TENSOR_QUANT16_ASYMM": "uint16_t",
        "TENSOR_QUANT16_SYMM": "int16_t",
        "TENSOR_BOOL8": "bool8",
        "TENSOR_QUANT8_SYMM_PER_CHANNEL": "int8_t",
        "TENSOR_QUANT8_ASYMM_SIGNED": "int8_t",
        # "OEM_SCALAR": this is service-defined.
        "TENSOR_OEM_BYTE": "uint8_t",
        "SUBGRAPH": "uint32_t",  # Index into TestModel::referenced.
    }

    # types are named as "type0", "type1", ...
    def __init__(self, vt, dimensions, scale, zeroPoint, name="type", skipRenaming=False,
                 extraParams=None):
        NamedVariable.__init__(self, name, sep="", showZero=True, skipRenaming=skipRenaming)
        self.type = vt
        self.dimensions = dimensions
        self.scale = float(scale)
        self.zeroPoint = int(zeroPoint)
        self.extraParams = extraParams

    # Factory for Type object, only create a new Type if requested type does
    # not have a match with all existing types
    @staticmethod
    def GetType(vt, dimensions, scale=0, zeroPoint=0, extraParams=None):
        """Return the cached Type for this signature, creating it on first request."""
        assert isinstance(dimensions, (list, tuple)), \
            'dimensions must be a list or tuple, got {}'.format(type(dimensions))
        key = ",".join([vt, str(dimensions), str(scale), str(zeroPoint), str(extraParams)])
        if key not in Type.typesMap:
            Type.typesMap[key] = Type(vt, dimensions, scale, zeroPoint, extraParams=extraParams)
        return Type.typesMap[key]

    @staticmethod
    def GetAllTypes():
        """Return every cached Type, sorted."""
        # sort to ensure a stable order when dumping the code
        return sorted(Type.typesMap.values())

    # For backward-compatibility
    @staticmethod
    def GetTypeFromString(vt, shape, extraParams=None):
        """Build (or fetch) a Type from a legacy shape string such as "{1, 2}, 0.5f, 127"."""
        dimensions, scale, zeroPoint = Type.GetParsedShape(shape)
        scale = float(scale)
        zeroPoint = int(zeroPoint)
        return Type.GetType(vt, dimensions, scale, zeroPoint, extraParams)

    # For backward-compatibility
    @staticmethod
    def GetParsedShape(shape):
        """Split a legacy shape string into (dimensions, scale, zeroPoint).

        `dimensions` is a list of ints; `scale` and `zeroPoint` are returned as
        strings ("0" when absent). The "f" suffix of a C float literal is
        removed from the scale so that it can be passed to float().
        """
        if shape == "" or shape == "{}":
            return [], "0", "0"
        _, _, rest = shape.partition('{')
        real_shape, _, rest = rest.partition('}')
        dims = [int(x) for x in real_shape.split(",")]
        # rest now looks like ", 0.5f, 127" (scale and zero point are both optional)
        scale, _, zero_point = rest.rpartition(',')
        if scale == "":
            if zero_point == "":
                return dims, "0", "0"
            # Only one trailing value was given; it is returned in the scale
            # position. Strip the float-literal suffix here as well, otherwise
            # float() in GetTypeFromString raises ValueError.
            return dims, zero_point.replace("f", ""), "0"
        _, _, scale = scale.partition(',')
        return dims, scale.replace("f", ""), zero_point

    def GetNumberOfElements(self):
        """Return the product of all dimensions (1 for an empty dimension list)."""
        return reduce(lambda x, y: x * y, self.dimensions, 1)

    def GetCppTypeString(self):
        """Return the C++ element type string; raises KeyError for an unknown type name."""
        return Type.typeLookup[self.type]

    def IsFloat(self):
        return self.GetCppTypeString() in ["float", "_Float16"]

    def IsBool(self):
        return self.GetCppTypeString() == "bool8"

    def IsScalar(self):
        return not self.type.startswith("TENSOR_")

    def GetElementByteSize(self):
        """Return the size in bytes of one element (1, 2 or 4)."""
        cppTypeString = self.GetCppTypeString()
        if cppTypeString in ["uint8_t", "int8_t", "bool8"]:
            return 1
        elif cppTypeString in ["int16_t", "uint16_t", "_Float16"]:
            return 2
        else:
            return 4

    def GetByteSize(self):
        return self.GetElementByteSize() * self.GetNumberOfElements()

    def GetDimensionsString(self):
        """Return the dimensions as a brace-enclosed, comma-separated list."""
        return "{" + GetJointStr(self.dimensions) + "}"

    def GetSignatureTuple(self):
        """Return (type, dimensions, scale, zeroPoint); extraParams is not included."""
        return (self.type, self.dimensions, self.scale, self.zeroPoint)

    def ToUnspecifiedDim(self):
        """Return the Type with the same rank but every dimension set to 0.

        Note that extraParams is not carried over to the returned type.
        """
        return Type.GetType(self.type, [0] * len(self.dimensions), self.scale, self.zeroPoint)
# To track implicitly convertible parameter types
class ImplicitParameter():
    """Marker base class for parameter types that plain Python values convert into."""

    @staticmethod
    def ImplicitConvertion(value):
        """Return `value` as an operand.

        Operand instances are returned unchanged. Otherwise the first direct
        subclass whose IsCompatible(value) is true is instantiated with the
        name "param". An assertion fails when no subclass accepts the value.
        """
        if isinstance(value, Operand):
            return value
        candidates = (
            implicitType
            for implicitType in ImplicitParameter.__subclasses__()
            if implicitType.IsCompatible(value)
        )
        chosen = next(candidates, None)
        if chosen is not None:
            return chosen("param", value)
        assert False, "%s not supported for implicit parameter"%value
# ExtraParams with per-channel quantization.
class SymmPerChannelQuantParams():
    """Extra type parameters for per-channel quantization: a channel axis and its scales."""

    def __init__(self, channelDim, scales, hide = False):
        self.channelDim = channelDim
        self.scales = scales
        self.hide = hide

    def _ScalesLiteral(self):
        # Comma-separated scales, each followed by an "f" suffix
        return ", ".join(text + "f" for text in map(str, self.scales))

    def GetScalesBroadcastArray(self, dimensions):
        """Return the scales as an array shaped to broadcast against `dimensions`.

        The shape is 1 along every axis except `channelDim`, where it equals
        the number of scales.
        """
        rank = len(dimensions)
        broadcastShape = [1] * rank
        broadcastShape[self.channelDim] = len(self.scales)
        scales = np.array(self.scales)
        return scales.reshape(broadcastShape)

    def GetConstructor(self):
        return "SymmPerChannelQuantParams({%s},%d)" % (
            self._ScalesLiteral(), self.channelDim)

    def GetVtsSetter(self):
        return "channelQuant"

    def GetVtsConstructor(self):
        return "SymmPerChannelQuantParams{.scales={%s}, .channelDim=%d}" % (
            self._ScalesLiteral(), self.channelDim)
# An operand that can be fed into operations. Also, an operand is always
# declared before operations.
class Operand(NamedVariable):
    """A typed, named value that can be fed into or produced by operations.

    `opType` is either a type-name string (legacy form, in which case `value`
    holds the shape string and `backward` the actual value) or a tuple of
    arguments for Type.GetType().
    """

    def __init__(self, name, opType, value, backward=None, skipRenaming=False, extraParams=None):
        NamedVariable.__init__(self, name, sep="", skipRenaming=skipRenaming)
        isLegacyForm = type(opType) is str
        if isLegacyForm:
            self.type = Type.GetTypeFromString(opType, value, extraParams)
            # In the legacy form the real value arrives through `backward`
            value = backward
        else:
            self.type = Type.GetType(*opType, extraParams=extraParams)
        self.SetValue(value)
        self.lifetime = "TEMPORARY_VARIABLE"
        self.model_index = None
        self.ins = []
        self.outs = []

    def SetValue(self, value):
        """Store `value`; anything other than a list, tuple or None is wrapped in a list."""
        keepAsIs = type(value) is list or type(value) is tuple or value is None
        if keepAsIs:
            self.value = value
        else:
            self.value = [value]
        return self

    def SetValueFromNumpy(self, value):
        """Store a numpy array as a flat Python list."""
        flat = value.flatten()
        self.value = flat.tolist()
        return self

    def GetValueAsNumpy(self):
        """Return the value as a numpy array reshaped to the type's dimensions."""
        return np.array(self.value).reshape(self.type.dimensions)

    # Print value as cpp-style list initialization
    def GetListInitialization(self):
        if self.value is None:
            return "{}"
        if self.type.IsFloat():
            formatter = PrettyPrintAsFloat
        elif self.type.IsBool():
            formatter = lambda v: "true" if v else "false"
        else:
            formatter = lambda x: str(int(x))
        return "{%s}"%(GetJointStr(self.value, method=formatter))

    def ToUnspecifiedDim(self):
        """Remember the current dimensions on the operand and switch to the all-zero-dimension type."""
        self.dimensions = self.type.dimensions
        self.type = self.type.ToUnspecifiedDim()

    def ConvertTo(self, DerivedClass, name=None):
        """Create a `DerivedClass` operand with this operand's type signature.

        The new operand keeps this operand's name unless `name` is given, and
        receives this operand's value unless `DerivedClass` is an Internal.
        """
        assert issubclass(DerivedClass, Operand)
        if name is None:
            name = self.name
        converted = DerivedClass(name, self.type.GetSignatureTuple(), skipRenaming=True,
                                 extraParams=self.type.extraParams)
        if not issubclass(DerivedClass, Internal):
            converted.SetValue(self.value)
        return converted
# Base class of user-defined input/output operand
class InOut(Operand):
    """Common base of user-declared input and output operands."""

    def __init__(self, name, opType, backward=None, skipRenaming=False, extraParams=None):
        # `backward` is passed in Operand's `value` position; Operand's own
        # `backward` is None.
        Operand.__init__(self, name, opType, backward, None,
                         skipRenaming=skipRenaming, extraParams=extraParams)
        self.lifetime = "SUBGRAPH_INPUT"
        self.index = 0

    def Feed(self, value):
        """Set the value, looking this operand up first when `value` is a dict."""
        if type(value) is dict:
            value = value[self]
        self.SetValue(value)
        return self
# A user-declared input operand
class Input(InOut):
    """A user-declared input operand (lifetime "SUBGRAPH_INPUT")."""

    def __init__(self, name, opType, backward=None, skipRenaming=False, extraParams=None):
        InOut.__init__(
            self, name, opType, backward,
            skipRenaming=skipRenaming,
            extraParams=extraParams,
        )
        self.lifetime = "SUBGRAPH_INPUT"
# A user-declared output operand
class Output(InOut):
    """A user-declared output operand (lifetime "SUBGRAPH_OUTPUT")."""

    def __init__(self, name, opType, backward=None, skipRenaming=False, extraParams=None):
        InOut.__init__(
            self, name, opType, backward,
            skipRenaming=skipRenaming,
            extraParams=extraParams,
        )
        self.lifetime = "SUBGRAPH_OUTPUT"
# An output that we don't want to compare the results
class IgnoredOutput(Output):
    """An output operand whose results should not be compared."""

    def __init__(self, name, opType, backward=None, skipRenaming=False, extraParams=None):
        Output.__init__(
            self, name, opType, backward,
            skipRenaming=skipRenaming,
            extraParams=extraParams,
        )
        self.lifetime = "SUBGRAPH_OUTPUT"

    def Feed(self, value):
        """Ignore `value` and fill the operand with one zero per element."""
        elementCount = reduce(lambda x, y: x * y, self.type.dimensions, 1)
        self.value = [0] * elementCount
        return self
# An explicitly declared parameter
class Parameter(Operand):
    """An explicitly declared parameter operand.

    The lifetime is "NO_VALUE" when `value` is None; otherwise it is
    "CONSTANT_REFERENCE" or "CONSTANT_COPY" depending on Configuration.useSHM().
    """

    def __init__(self, name, opType, value, backward=None, skipRenaming=False, extraParams=None):
        Operand.__init__(self, name, opType, value, backward,
                         skipRenaming=skipRenaming, extraParams=extraParams)
        # Unique name derived from the operand name with an "_init" suffix
        self.initializer = NamedVariable(str(self) + "_init")
        if value is None:
            lifetime = "NO_VALUE"
        elif Configuration.useSHM():
            lifetime = "CONSTANT_REFERENCE"
        else:
            lifetime = "CONSTANT_COPY"
        self.lifetime = lifetime
# A shortcut for parameters of INT32
class Int32Scalar(Parameter, ImplicitParameter):
    """Shortcut for an INT32 scalar parameter; plain `int` values convert to it implicitly."""

    def __init__(self, name, value):
        scalarType = ("INT32", [])
        Parameter.__init__(self, name, scalarType, int(value))

    @staticmethod
    def IsCompatible(value):
        # Exact type check, so bool (a subclass of int) is not accepted here
        return type(value) is int
# A shortcut for parameters of FLOAT16
class Float16Scalar(Parameter, ImplicitParameter):
    """Shortcut for a FLOAT16 scalar parameter; never chosen by implicit conversion."""

    def __init__(self, name, value):
        scalarType = ("FLOAT16", [])
        Parameter.__init__(self, name, scalarType, float(value))

    @staticmethod
    def IsCompatible(value):
        # No Python value maps to FLOAT16 implicitly
        return False
# A shortcut for parameters of FLOAT32
class Float32Scalar(Parameter, ImplicitParameter):
    """Shortcut for a FLOAT32 scalar parameter; plain `float` values convert to it implicitly."""

    def __init__(self, name, value):
        scalarType = ("FLOAT32", [])
        Parameter.__init__(self, name, scalarType, float(value))

    @staticmethod
    def IsCompatible(value):
        return type(value) is float
# A shortcut for parameters of BOOL
class BoolScalar(Parameter, ImplicitParameter):
    """Shortcut for a BOOL scalar parameter; plain `bool` values convert to it implicitly."""

    def __init__(self, name, value):
        scalarType = ("BOOL", [])
        Parameter.__init__(self, name, scalarType, bool(value))

    @staticmethod
    def IsCompatible(value):
        return type(value) is bool
# A shortcut for parameter of 1-D TENSOR_INT32
class Int32Vector(Parameter, ImplicitParameter):
    """Shortcut for a 1-D TENSOR_INT32 parameter built from a list or tuple of ints."""

    def __init__(self, name, value):
        vectorType = ("TENSOR_INT32", [len(value)])
        elements = [int(v) for v in value]
        Parameter.__init__(self, name, vectorType, elements)

    @staticmethod
    def IsCompatible(value):
        isSequence = type(value) is list or type(value) is tuple
        if not isSequence:
            return False
        return all(type(i) is int for i in value)
# A shortcut for parameter of 1-D TENSOR_FLOAT32
class Float32Vector(Parameter, ImplicitParameter):
    """Shortcut for a 1-D TENSOR_FLOAT32 parameter built from a list or tuple of floats."""

    def __init__(self, name, value):
        vectorType = ("TENSOR_FLOAT32", [len(value)])
        elements = [float(v) for v in value]
        Parameter.__init__(self, name, vectorType, elements)

    @staticmethod
    def IsCompatible(value):
        isSequence = type(value) is list or type(value) is tuple
        if not isSequence:
            return False
        return all(type(i) is float for i in value)
# A shortcut for a SUBGRAPH parameter
class SubgraphReference(Parameter, ImplicitParameter):
    """Shortcut for a SUBGRAPH parameter whose value is a Model.

    A model that has no name yet is given the name passed to this parameter.
    """

    def __init__(self, name, model):
        subgraphType = ("SUBGRAPH", [])
        Parameter.__init__(self, name, subgraphType, model)
        self.lifetime = "SUBGRAPH"
        if model.name is None:
            model.name = name

    @staticmethod
    def IsCompatible(value):
        return type(value) is Model
# An explicitly declared intermediate result
class Internal(Operand):
    """An explicitly declared intermediate result (lifetime "TEMPORARY_VARIABLE")."""

    def __init__(self, name, opType, backward=None, skipRenaming=False, extraParams=None):
        # `backward` is passed in Operand's `value` position; Operand's own
        # `backward` is None.
        Operand.__init__(
            self, name, opType, backward, None,
            skipRenaming=skipRenaming,
            extraParams=extraParams,
        )
        self.lifetime = "TEMPORARY_VARIABLE"
# An operation in a model, does not need a name
class Operation:
    """An operation in a model: an operation type plus input and output operands."""

    def __init__(self, optype, ins, outs):
        self.optype = optype
        self.SetInputs(ins)
        self.SetOutputs(outs)

    # for the ease of debugging
    def __str__(self):
        description = (self.optype, GetJointStr(self.ins), GetJointStr(self.outs))
        return "Operation %s: [%s] -> [%s]"%description

    __repr__ = __str__

    def SetInputs(self, ins):
        """Store the inputs, converting plain Python values to implicit parameters."""
        converted = []
        for item in ins:
            converted.append(ImplicitParameter.ImplicitConvertion(item))
        self.ins = converted
        return self

    def SetOutputs(self, outs):
        """Store a list copy of the outputs."""
        self.outs = [o for o in outs]
        return self
# Main interface
class Model:
    """Main interface for describing a model: operations connected through operands.

    Every instance is appended to the class-level `Model.models` list when it
    is constructed. Most mutators return `self` so that calls can be chained.
    """
    # All models created so far, in creation order
    models = list()

    def __init__(self, name=None):
        self.name = name
        self.operations = []
        self.operands = []
        self.isRelaxed = False
        self.compiled = False
        self.dumped = False
        self.version = FileNames.version
        self.referenced_models = None
        Model.models.append(self)

    def WithSuffix(self, *args):
        """Create the unique function names of this model from its name and `args`."""
        self.createFunctionName = GlobalVariable("CreateModel", self.name, *args)
        self.createTestFunctionName = GlobalVariable("createTestModel", self.name, *args)
        self.isIgnoredFunctionName = GlobalVariable("is_ignored", self.name, *args)
        return self

    def AddOperand(self, operand):
        """Append `operand` unless an equal operand is already present."""
        if operand in self.operands:
            return self
        self.operands.append(operand)
        return self

    # Makes sure the model contains all (and only) the given inputs in the
    # specified order.
    def IdentifyInputs(self, *args):
        for operand in args:
            self.AddOperand(operand)
        inputs = tuple(self.GetInputs())
        assert inputs == args, '{} vs {}'.format(inputs, args)
        return self

    # Makes sure the model contains all (and only) the given outputs in the
    # specified order.
    def IdentifyOutputs(self, *args):
        for operand in args:
            self.AddOperand(operand)
        outputs = tuple(self.GetOutputs())
        assert outputs == args, '{} vs {}'.format(outputs, args)
        return self

    def AddOperation(self, operation):
        """Append `operation` and register all of its input and output operands."""
        self.operations.append(operation)
        for operand in operation.ins:
            self.AddOperand(operand)
        for operand in operation.outs:
            self.AddOperand(operand)
        return self

    def Operation(self, op_name, *args):
        """Add a new operation of type `op_name` with inputs `args` and no outputs yet."""
        newOperation = Operation(op_name, args, [])
        return self.AddOperation(newOperation)

    def To(self, *args):
        """Set the outputs of the most recently added operation.

        The outputs may be given as separate arguments or as one list/tuple.
        """
        assert len(self.operations) > 0
        first = args[0]
        if type(first) is tuple or type(first) is list:
            outs = first
        else:
            outs = args
        self.operations[-1].SetOutputs(outs)
        for operand in outs:
            self.AddOperand(operand)
        return self

    def RelaxedExecution(self, isRelaxed):
        self.isRelaxed = isRelaxed
        return self

    # Sets the version of the model in compliance tests. Set to None to disable the test.
    def IntroducedIn(self, ver):
        self.version = ver
        return self

    def GetTypes(self):
        """Return the distinct types of all operands, sorted."""
        distinctTypes = {op.type for op in self.operands}
        return sorted(distinctTypes)

    def GetInputs(self):
        return [op for op in self.operands if isinstance(op, Input)]

    def GetOutputs(self):
        return [op for op in self.operands if isinstance(op, Output)]

    def GetInputsIndex(self):
        return [pos for pos, op in enumerate(self.operands) if isinstance(op, Input)]

    def GetOutputsIndex(self):
        return [pos for pos, op in enumerate(self.operands) if isinstance(op, Output)]

    def GetIndexOfOperands(self, operands):
        """Return the position in `self.operands` of each given operand."""
        return [self.operands.index(op) for op in operands]

    def GetIgnoredOutputs(self):
        return [op for op in self.operands if isinstance(op, IgnoredOutput)]

    def GetParameters(self):
        return [op for op in self.operands if isinstance(op, Parameter)]

    def GetReferencedModels(self):
        assert self.compiled
        return self.referenced_models

    def GetEquivalentOperands(self, targets):
        """Return the model's own operand objects that compare equal to `targets`."""
        return [self.operands[self.operands.index(t)] for t in targets]

    def UpdateEquivalentOperands(self, targets):
        """Replace each stored operand with the equal operand from `targets`."""
        for replacement in targets:
            position = self.operands.index(replacement)
            self.operands[position] = replacement
        return self

    def SetOperandIndex(self):
        """Number the inputs, the outputs and all operands by their position."""
        for position, operand in enumerate(self.GetInputs()):
            operand.index = position
        for position, operand in enumerate(self.GetOutputs()):
            operand.index = position
        for position, operand in enumerate(self.operands):
            operand.model_index = position
        return self

    def SetOperandInsAndOuts(self):
        """Rebuild the operand/operation links.

        Afterwards every operand lists in `ins` the operations producing it
        and in `outs` the operations consuming it.
        """
        for operand in self.operands:
            operand.ins = list()
            operand.outs = list()
        for operation in self.operations:
            operation.ins = self.GetEquivalentOperands(operation.ins)
            operation.outs = self.GetEquivalentOperands(operation.outs)
            for consumed in operation.ins:
                consumed.outs.append(operation)
            for produced in operation.outs:
                produced.ins.append(operation)
        return self

    def TopologicalSortHelper(self, op, deps, visited):
        """Depth-first visit of `op`: emit its dependencies first, then `op` itself."""
        if op in visited:
            # A visited operation that still has a deps entry is on the current path
            assert op not in deps, "Cycle detected in the graph"
            return
        visited.add(op)
        for dependency in deps[op]:
            self.TopologicalSortHelper(dependency, deps, visited)
        self.operations.append(op)
        deps.pop(op)

    # Topological sort of the operations, and detect if there is a cycle in the graph
    def TopologicalSort(self):
        deps = {op: list() for op in self.operations}
        # An operation consuming an operand depends on every operation producing it
        for operand in self.operands:
            for consumer in operand.outs:
                for producer in operand.ins:
                    deps[consumer].append(producer)
        pending = list(self.operations)
        self.operations = []
        visited = set()
        for op in pending:
            self.TopologicalSortHelper(op, deps, visited)

    def CompileReferencedModels(self, referenced_models, referenced_model_to_index):
        """Compile the models referenced by SUBGRAPH operands.

        Each referenced model is compiled once, and the operand value is
        replaced with the model's index in `referenced_models`.
        """
        for operand in self.operands:
            if operand.lifetime != "SUBGRAPH":
                continue
            model = operand.value[0]
            key = id(model)
            if key not in referenced_model_to_index:
                referenced_model_to_index[key] = len(referenced_model_to_index)
                referenced_models.append(model)
                model.Compile(referenced_models, referenced_model_to_index)
            operand.value = [referenced_model_to_index[key]]

    def Compile(self, referenced_models=None, referenced_model_to_index=None):
        """Index the operands, link and sort the operations, and compile referenced models.

        Does nothing when the model is already compiled.
        """
        if self.compiled:
            return self
        if referenced_models is None:
            # This is the main model.
            referenced_models = []
            referenced_model_to_index = {}
            self.referenced_models = referenced_models
        self.SetOperandIndex()
        self.SetOperandInsAndOuts()
        self.TopologicalSort()
        self.CompileReferencedModels(referenced_models, referenced_model_to_index)
        # Do not check compliance for relaxed mode tests.
        if self.isRelaxed:
            self.IntroducedIn(None)
        self.compiled = True
        return self

    def Feed(self, feedDict):
        """Feed inputs from `feedDict[0]` and outputs from `feedDict[1]`."""
        inputValues = feedDict[0]
        for operand in self.GetInputs():
            operand.Feed(inputValues)
        outputValues = feedDict[1]
        for operand in self.GetOutputs():
            operand.Feed(outputValues)
        return self
# To track implicitly convertible variation types
class ImplicitVariation:
    """Marker base class for variation types that plain Python values convert into."""

    @staticmethod
    def ImplicitConvertion(value):
        """Return `value` as a ModelVariation.

        ModelVariation instances are returned unchanged. Otherwise `value` is
        treated as (spec, *identifyArgs); the first direct subclass compatible
        with the spec is instantiated, and Identify() is called on it when
        extra arguments are present. An assertion fails when nothing matches.
        """
        if isinstance(value, ModelVariation):
            return value
        for implicitType in ImplicitVariation.__subclasses__():
            if not (type(value) is tuple or type(value) is list):
                value = [value]
            spec = value[0]
            if not implicitType.IsCompatible(spec):
                continue
            variation = implicitType(spec)
            if len(value) > 1:
                variation.Identify(*value[1:])
            return variation
        assert False, "%s not supported for implicit variation"%value[0]
# An exception indicating that the current variation list should be skipped.
class SkipVariation(Exception):
    """Raised to indicate that the current variation list should be skipped."""
# The base class for model variations
class ModelVariation:
    """Base class of model variations.

    A variation transforms selected operands of a model (TransformOperand) and
    then the model itself (TransformModel). The base implementations leave
    both unchanged.
    """
    # Whether the variation may be applied to models containing SUBGRAPH operands
    supportsSubgraphs = False

    def __init__(self, name=None):
        self.targetOperands = {}
        self.name = name

    # Apply the model variation.
    def ApplyTo(self, model):
        assert not model.compiled
        assert not model.dumped
        if not self.supportsSubgraphs:
            containsSubgraphs = any(
                operand.lifetime == "SUBGRAPH" for operand in model.operands)
            assert not containsSubgraphs, "Variation {} does not support subgraphs".format(
                self.__class__.__name__)
        if not self.targetOperands:
            self.AutoIdentify(model)
        # Transform operands and model.
        orderedKeys = sorted(self.targetOperands)
        targets = model.GetEquivalentOperands(orderedKeys)
        transformed = []
        for op in targets:
            transformed.append(self.TransformOperand(op, self.targetOperands[op]))
        model.UpdateEquivalentOperands(transformed)
        return self.TransformModel(model)

    def IdentifyOperands(self, args=None):
        """Select the target operands.

        A dict is stored as given (operand -> argument); any other iterable
        becomes a dict mapping each item to None. None leaves the selection
        untouched.
        """
        if args is None:
            return self
        if type(args) is dict:
            self.targetOperands = args
        else:
            self.targetOperands = {operand: None for operand in args}
        return self

    def Identify(self, operandArgs=None, paramArgs=None):
        # `paramArgs` is not used by this base implementation
        self.IdentifyOperands(operandArgs)
        return self

    # Set variation to its default name
    def SetToDefaultName(self):
        self.name = ""
        return self

    # Automatically select the target operand list
    def AutoIdentify(self, model):
        return self

    # Transform operands that are marked by IdentifyOperands()
    def TransformOperand(self, op, arg=None):
        return op

    # Transform the model
    def TransformModel(self, model):
        return model
# Default variation that does nothing
class DefaultVariation(ModelVariation):
    """Variation that leaves the model unchanged; allowed on models with subgraphs."""
    supportsSubgraphs = True

    def __init__(self, name=None):
        ModelVariation.__init__(
            self,
            name=name,
        )
# Convert operand data type
class DataTypeConverter(ModelVariation, ImplicitVariation):
    """Variation that converts operand data types.

    With a `targetType`, AutoIdentify() selects every TENSOR_FLOAT32 (and,
    for non-quantized targets, FLOAT32) operand for conversion. Without one,
    the targets have to be supplied through Identify().
    """
    supportsSubgraphs = True

    def __init__(self, targetType=None, name=None, scale=None, zeroPoint=None):
        ModelVariation.__init__(self, name=name)
        if targetType is not None:
            assert DataTypeConverter.IsCompatible(targetType)
        self.targetType = targetType
        self.scale = scale
        self.zeroPoint = zeroPoint

    @staticmethod
    def IsCompatible(value):
        supported = ["float16", "int32", "quant8", "quant8_signed"]
        return value.lower() in supported

    def SetToDefaultName(self):
        """Name the variation after the target type, or after the identified operand types."""
        if self.targetType is not None:
            self.name = self.targetType.lower()
            return self
        typeArgs = (arg for arg in self.targetOperands.values()
                    if type(arg) is not DataTypeConverter)
        # First element of every argument, i.e. the requested type names
        targetTypes = list(zip(*typeArgs))[0]
        # Checked in order; the first type present decides the name
        namesByPriority = (
            ("TENSOR_QUANT8_SYMM_PER_CHANNEL", "channelQuant8"),
            ("TENSOR_QUANT8_ASYMM", "quant8"),
            ("TENSOR_QUANT8_ASYMM_SIGNED", "quant8_signed"),
            ("TENSOR_INT32", "int32"),
            ("TENSOR_FLOAT16", "float16"),
        )
        for typeName, label in namesByPriority:
            if typeName in targetTypes:
                self.name = label
                return self
        self.name = "float32"
        return self

    def AutoIdentify(self, model):
        """Select conversion targets from `model` when a target type was given."""
        if self.targetType is None:
            return self
        quantTensorTypes = {
            "quant8": "TENSOR_QUANT8_ASYMM",
            "quant8_signed": "TENSOR_QUANT8_ASYMM_SIGNED",
        }
        if self.targetType in quantTensorTypes:
            assert self.scale is not None
            assert self.zeroPoint is not None
            tensorType = [quantTensorTypes[self.targetType], self.scale, self.zeroPoint]
            scalarType = None  # Not supported.
        else:
            upperName = self.targetType.upper()
            tensorType = ["TENSOR_" + upperName]
            scalarType = [upperName]
        # By default, select all the float32 tensors/scalars
        targets = dict()
        for op in model.operands:
            if op.type.type == "SUBGRAPH":
                # Referenced models get their own converter with the same settings
                targets[op] = DataTypeConverter(self.targetType, self.name,
                                                self.scale, self.zeroPoint)
        for op in model.operands:
            if op.type.type == "TENSOR_FLOAT32":
                targets[op] = tensorType
        if scalarType is not None:
            for op in model.operands:
                if op.type.type == "FLOAT32":
                    targets[op] = scalarType
        self.Identify(targets)
        return self

    def TransformOperand(self, op, arg=None):
        """Convert `op` to the type described by `arg`.

        `arg` is either a DataTypeConverter (applied to the model referenced
        by a SUBGRAPH operand) or a sequence of Type.GetType() arguments
        without the dimensions: (typeName[, scale, zeroPoint, ...]).
        """
        if type(arg) is DataTypeConverter:
            # Handle nested SUBGRAPHs
            assert len(op.value) == 1
            assert type(op.value[0]) is Model
            op.value[0] = arg.ApplyTo(op.value[0])
            return op
        if len(arg) == 1:
            typeTuple = (arg[0], op.type.dimensions)
        else:
            typeTuple = (arg[0], op.type.dimensions) + tuple(arg[1:])
        # To handle Internal operands
        hasNoData = op.value is None or op.type.GetNumberOfElements() == 0
        if hasNoData:
            op.type = Type.GetType(*typeTuple)
            return op
        realValues = Dequantize(op.GetValueAsNumpy().astype(np.float32), op.type)
        op.type = Type.GetType(*typeTuple)
        op.SetValueFromNumpy(Quantize(realValues, op.type))
        return op
# Convert model to turn on/off relaxed computation
class RelaxedModeConverter(ModelVariation, ImplicitVariation):
    """Variation that turns relaxed computation on or off for a model."""
    supportsSubgraphs = True

    def __init__(self, isRelaxed=True, name=None):
        ModelVariation.__init__(self, name=name)
        if not isinstance(isRelaxed, bool):
            # A non-bool argument must be the string "relaxed" (any letter case)
            assert RelaxedModeConverter.IsCompatible(isRelaxed.lower())
            isRelaxed = True
        self.isRelaxed = isRelaxed

    @staticmethod
    def IsCompatible(value):
        return value.lower() in ["relaxed"]

    def SetToDefaultName(self):
        if self.isRelaxed:
            self.name = "relaxed"
        else:
            self.name = "float"
        return self

    def TransformModel(self, model):
        model.RelaxedExecution(self.isRelaxed)
        return model
# Convert data layout between "NHWC" and "NCHW"
class DataLayoutConverter(ModelVariation, ImplicitVariation):
    """Variation that converts operands to the target data layout, "nhwc" or "nchw"."""

    def __init__(self, targetLayout="nchw", name=None):
        ModelVariation.__init__(self, name=name)
        self.targetLayout = targetLayout.lower()
        assert DataLayoutConverter.IsCompatible(self.targetLayout)
        toNchw = self.targetLayout == "nchw"
        # Axis permutation applied to 4-D operands
        self.perm = (0, 3, 1, 2) if toNchw else (0, 2, 3, 1)
        # Value written to BOOL operands
        self.param = toNchw

    @staticmethod
    def IsCompatible(value):
        return value.lower() in ["nhwc", "nchw"]

    def SetToDefaultName(self):
        self.name = self.targetLayout
        return self

    def TransformOperand(self, op, arg=None):
        """Convert one operand to the target layout.

        4-D operands are transposed with `perm`, 1-D operands holding four
        values are reordered with `perm`, and BOOL operands are set to
        `param`. Anything else fails an assertion.
        """
        dims = op.type.dimensions
        if len(dims) == 4:
            # To handle Internal operands
            hasData = op.value is not None and op.type.GetNumberOfElements() != 0
            if hasData:
                transposed = op.GetValueAsNumpy().transpose(self.perm)
                op.SetValueFromNumpy(transposed)
            newDim = [dims[i] for i in self.perm]
            op.type = Type.GetType(op.type.type, newDim, op.type.scale, op.type.zeroPoint)
        elif len(dims) == 1 and len(op.value) == 4:
            reordered = op.GetValueAsNumpy()[list(self.perm)]
            op.SetValueFromNumpy(reordered)
        elif op.type.type == "BOOL":
            op.SetValue(self.param)
        else:
            assert False, "%s not supported by DataLayoutConverter"%op
        return op
# Convert data by transposing and removing axis
class AxisConverter(ModelVariation):
    """Variation that moves an axis to a new position and optionally drops axes.

    Args:
        origin: index of the axis to move.
        target: index the axis is moved to; may be negative.
        dim: rank of the operands this converter handles.
        drop: an int or an iterable of ints naming the axes to remove after
            the transposition; negative indices are normalized.
        name: optional variation name.
    """

    # The default for `drop` is an immutable empty tuple rather than a shared
    # mutable list; it is copied with list() below either way.
    def __init__(self, origin, target, dim, drop=(), name=None):
        ModelVariation.__init__(self, name=name)
        self.origin = origin
        self.target = target
        assert all(i >= -dim and i < dim for i in [self.origin, self.target])
        self.dim = dim
        # Permutation that moves axis `origin` to position `target`
        self.perm = list(range(dim))
        self.perm.insert(target if target >= 0 else target + dim, self.perm.pop(origin))
        self.drop = [drop] if type(drop) is int else list(drop)
        assert all(i >= -dim and i < dim for i in self.drop)
        self.drop = [i if i >= 0 else i + dim for i in self.drop]
        # The target axis itself must not be dropped
        assert target not in self.drop and target + dim not in self.drop

    def SetToDefaultName(self):
        """Name the variation "dim<rank>_axis<axis>[_neg]" after the resulting rank and axis."""
        axis = self.target if self.target >= 0 else self.target + self.dim
        # Dropped axes in front of the target shift it towards zero
        axis -= sum(i < axis for i in self.drop)
        neg = "" if self.target >= 0 else "_neg"
        self.name = "dim%d_axis%d%s"%(self.dim - len(self.drop), axis, neg)
        return self

    def TransposeAxis(self, op):
        """Apply the axis permutation to `op`.

        INT32 operands are set to `target`; operands of rank `dim` are
        transposed. Anything else fails an assertion.
        """
        if op.type.type == "INT32":
            op.SetValue(self.target)
        elif len(op.type.dimensions) == self.dim:
            # To handle Internal operands
            if op.value is not None:
                op.SetValueFromNumpy(op.GetValueAsNumpy().transpose(self.perm))
            newDim = [op.type.dimensions[i] for i in self.perm]
            op.type = Type.GetType(op.type.type, newDim, op.type.scale, op.type.zeroPoint)
        else:
            assert False, "%s not supported by AxisConverter"%op
        return op

    def RemoveAxis(self, op):
        """Remove the dropped axes from `op`.

        INT32 operands are adjusted for the removed axes; operands of rank
        `dim` keep index 0 along every dropped axis. Anything else fails an
        assertion.
        """
        if op.type.type == "INT32":
            if op.value[0] >= 0:
                op.SetValue(op.value[0] - sum(i < op.value[0] for i in self.drop))
            else:
                op.SetValue(op.value[0] + sum(i > (op.value[0] + self.dim) for i in self.drop))
        elif len(op.type.dimensions) == self.dim:
            if op.value is not None:
                val = op.GetValueAsNumpy()
                # Highest axis first so that the remaining indices stay valid
                for i in sorted(self.drop, reverse=True):
                    val = np.take(val, 0, axis=i)
                op.SetValueFromNumpy(val)
            newDim = [op.type.dimensions[i] for i in range(self.dim) if i not in self.drop]
            op.type = Type.GetType(op.type.type, newDim, op.type.scale, op.type.zeroPoint)
        else:
            assert False, "%s not supported by AxisConverter"%op
        return op

    def TransformOperand(self, op, arg=None):
        """Transpose `op` and then remove the dropped axes."""
        op = self.TransposeAxis(op)
        op = self.RemoveAxis(op)
        return op
# Convert Output based on activation
class ActivationConverter(ModelVariation, ImplicitVariation):
    """Variation that applies an activation to the identified operands.

    INT32 operands receive the activation enum; Output operands have their
    values clamped to the activation's range.
    """
    # (Enum, low, high)
    actMap = {
        "none": (0, None, None),
        "relu": (1, 0.0, None),
        "relu1": (2, -1.0, 1.0),
        "relu6": (3, 0.0, 6.0),
    }

    def __init__(self, act="relu", name=None):
        ModelVariation.__init__(self, name=name)
        self.act = act.lower()
        assert ActivationConverter.IsCompatible(self.act)
        self.enum, self.low, self.high = ActivationConverter.actMap[self.act]

    @staticmethod
    def IsCompatible(value):
        return value.lower() in ActivationConverter.actMap

    def SetToDefaultName(self):
        self.name = self.act
        return self

    def TransformOperand(self, op, arg=None):
        if op.type.type == "INT32":  # activation enum
            return op.SetValue(self.enum)
        assert isinstance(op, Output)
        values = op.GetValueAsNumpy()
        # Bounds are converted to the operand's type before clamping
        if self.low is not None:
            lowerBound = Quantize(self.low, op.type)
            values = np.maximum(values, lowerBound)
        if self.high is not None:
            upperBound = Quantize(self.high, op.type)
            values = np.minimum(values, upperBound)
        return op.SetValueFromNumpy(values)
# Convert all constant tensors as model inputs.
class AllTensorsAsInputsConverter(ModelVariation):
    """Variation that turns every constant tensor of a single-operation model into an input."""
    supportsSubgraphs = True

    def __init__(self, name=None):
        ModelVariation.__init__(self, name=name)

    def SetToDefaultName(self):
        self.name = "all_tensors_as_inputs"
        return self

    def TransformModel(self, model):
        """Replace constant tensor parameters with inputs.

        Raises SkipVariation when the model does not have exactly one
        operation or has no constant tensor to convert.
        """
        if len(model.operations) != 1:
            raise SkipVariation
        # Find all constant tensors.
        tensorParams = []
        for operand in model.operands:
            isConstantTensor = (type(operand) is Parameter
                                and not operand.type.IsScalar()
                                and operand.value is not None)
            if isConstantTensor:
                tensorParams.append(operand)
        if not tensorParams:
            raise SkipVariation
        # Convert to model inputs.
        asInputs = [operand.ConvertTo(Input) for operand in tensorParams]
        model.UpdateEquivalentOperands(asInputs)
        return model
def CompatibleWithADD(op):
    """Return True when `op` can take part in a dummy ADD operation.

    The operand must have at most four dimensions, a non-empty value, and one
    of the listed tensor types.
    """
    if len(op.type.dimensions) > 4:
        return False
    if not len(op.value) > 0:
        return False
    supportedTypes = ["TENSOR_FLOAT32", "TENSOR_QUANT8_ASYMM",
                      "TENSOR_FLOAT16", "TENSOR_QUANT8_ASYMM_SIGNED"]
    return op.type.type in supportedTypes
# Add a dummy ADD operation before each model input to make it as an internal operand.
class AllInputsAsInternalCoverter(ModelVariation):
    """Variation that makes each model input internal by feeding it from a dummy ADD."""
    supportsSubgraphs = True

    def __init__(self, name=None):
        ModelVariation.__init__(self, name=name)

    def SetToDefaultName(self):
        self.name = "all_inputs_as_internal"
        return self

    def TransformModel(self, model):
        """Insert `input_new ADD dummy = input` for every ADD-compatible input.

        Raises SkipVariation when the model does not have exactly one
        operation or has no compatible input.
        """
        if len(model.operations) != 1:
            raise SkipVariation
        # Find all input tensors that can be an output of the ADD operation.
        modelInputs = [op for op in model.GetInputs() if CompatibleWithADD(op)]
        if not modelInputs:
            raise SkipVariation
        # Make every input an output of a dummy operation: input_new ADD dummy = input.
        for original in modelInputs:
            opType = original.type
            newInput = original.ConvertTo(Input, name=original.name + "_new")
            # The dummy holds the zero point of the operand's type
            dummyType = (opType.type, [1], opType.scale, opType.zeroPoint)
            dummyParam = Parameter("dummy", dummyType, [opType.zeroPoint])
            model.Operation("ADD", newInput, dummyParam, 0).To(original)
        # Convert to internal operands.
        asInternal = [original.ConvertTo(Internal) for original in modelInputs]
        model.UpdateEquivalentOperands(asInternal)
        return model
# Add a dummy ADD operation after each model output to make it as an internal operand.
class AllOutputsAsInternalCoverter(ModelVariation):
    """Model variation that routes each model output through a dummy ADD.

    Every ADD-compatible output becomes an input of
    `output ADD dummy = <name>_new`, and is then converted to an internal
    operand.
    """

    # Read by code outside this block; presumably marks the variation as
    # usable on models that contain subgraphs -- TODO confirm.
    supportsSubgraphs = True

    def __init__(self, name=None):
        """Create the variation, forwarding `name` to ModelVariation."""
        ModelVariation.__init__(self, name=name)

    def SetToDefaultName(self):
        """Set the name to "all_outputs_as_internal" and return self."""
        self.name = "all_outputs_as_internal"
        return self

    def TransformModel(self, model):
        """Append a dummy ADD after each compatible output of `model`.

        Args:
            model: the model to transform; it is updated in place.

        Returns:
            The same `model` object.

        Raises:
            SkipVariation: the model does not have exactly one operation, or
                none of its outputs passes CompatibleWithADD.
        """
        if len(model.operations) != 1:
            raise SkipVariation

        # Only outputs that may legally be consumed by an ADD are rewritten.
        eligible = [operand for operand in model.GetOutputs()
                    if CompatibleWithADD(operand)]
        if not eligible:
            raise SkipVariation

        # Build "output ADD dummy = output_new" for every eligible output.
        for original in eligible:
            replacement = original.ConvertTo(Output, name=original.name + "_new")
            operandType = original.type
            # One-element operand of the same type holding the zero point;
            # presumably this makes the ADD an identity -- TODO confirm.
            dummyType = (operandType.type, [1], operandType.scale,
                         operandType.zeroPoint)
            dummy = Parameter("dummy", dummyType, [operandType.zeroPoint])
            model.Operation("ADD", original, dummy, 0).To(replacement)

        # The former outputs are now consumed inside the model.
        asInternal = [original.ConvertTo(Internal) for original in eligible]
        model.UpdateEquivalentOperands(asInternal)
        return model
# An example is always attached to a model, and could have multiple variations
class Example:
    """Input/output feeds attached to one model, plus the variations to test.

    Creating an Example registers it in `Example.examples`;
    `DumpAllExamples` later merges and dumps every registered example.
    """

    # Every Example created so far.
    examples = []
    # Maps a generated test name to the version given to SetVersion.
    versionOverrides = {}

    def __init__(self, *args, model=None, name=None):
        """Create an example and register it in `Example.examples`.

        Args:
            *args: the feeds. A tuple or list is stored unchanged; a dict is
                split into an (inputs, outputs) pair of dicts, keyed by the
                operands returned by the model's GetInputs()/GetOutputs().
            model: the model the example belongs to; defaults to the most
                recently created model, `Model.models[-1]`.
            name: optional name of the example.

        Raises:
            TypeError: a feed is neither a tuple, a list nor a dict.
        """
        self.model = Model.models[-1] if model is None else model
        self.name = name
        self.expectedMultinomialDistributionTolerance = 0
        self.expectFailure = False
        self.testDynamicOutputShape = True
        self.testLifeTimeVariation = True
        self.feedDicts = []
        for feedDict in args:
            if isinstance(feedDict, (tuple, list)):
                self.feedDicts.append(feedDict)
            elif isinstance(feedDict, dict):
                self.feedDicts.append((
                    {i: feedDict[i] for i in self.model.GetInputs()},
                    {o: feedDict[o] for o in self.model.GetOutputs()}
                ))
            else:
                # Raised explicitly so the check survives `python -O`.
                raise TypeError(
                    "Example feed must be a tuple, list or dict, got %s"
                    % type(feedDict).__name__)
        self.variations = []
        Example.examples.append(self)

    @staticmethod
    def SetVersion(ver, *args):
        """Record `ver` as the version override for each test name in `args`."""
        for name in args:
            Example.versionOverrides[name] = ver

    @staticmethod
    def DumpAllExamples(DumpModel=None, model_fd=None,
                        DumpExample=None, example_fd=None,
                        DumpTest=None, test_fd=None):
        """Main entrance of the test generator.

        Combines the registered examples, then calls Dump on each of them
        with the given callbacks and file objects.
        """
        Example.CombineAllExamples()
        for example in Example.examples:
            example.Dump(DumpModel, model_fd, DumpExample, example_fd, DumpTest, test_fd)

    @staticmethod
    def CombineAllExamples():
        """Merge examples sharing a model, a name and a set of variations.

        The first example seen for each key is kept and absorbs the feeds of
        the later ones; `Example.examples` is replaced by the kept examples.
        """
        modelMap = {}
        newExamples = []
        for example in Example.examples:
            key = (example.model, example.name, tuple(tuple(e) for e in example.variations))
            if key in modelMap:
                modelMap[key].Combine(example)
            else:
                modelMap[key] = example
                newExamples.append(example)
        Example.examples = newExamples

    def AddVariations(self, *args, includeDefault=True, defaultName=None):
        """Append one variation group and return self.

        The group starts with `DefaultVariation(defaultName)` when
        `includeDefault` is true, followed by each argument passed through
        `ImplicitVariation.ImplicitConvertion`.
        """
        self.variations.append([DefaultVariation(defaultName)] if includeDefault else [])
        self.variations[-1].extend(ImplicitVariation.ImplicitConvertion(i) for i in args)
        return self

    def AddNchw(self, *args, includeDefault=True, defaultName="nhwc"):
        """Add a `DataLayoutConverter("nchw")` variation identified by `args`."""
        var = DataLayoutConverter("nchw").Identify(args)
        self.AddVariations(var, includeDefault=includeDefault, defaultName=defaultName)
        return self

    def AddRelaxed(self, isRelaxed=True, includeDefault=True, defaultName=None):
        """Add a `RelaxedModeConverter(isRelaxed)` variation."""
        var = RelaxedModeConverter(isRelaxed)
        self.AddVariations(var, includeDefault=includeDefault, defaultName=defaultName)
        return self

    def AddRelu(self, *args, includeDefault=True, defaultName=None):
        """Add an `ActivationConverter("relu")` variation identified by `args`."""
        var = ActivationConverter("relu").Identify(args)
        self.AddVariations(var, includeDefault=includeDefault, defaultName=defaultName)
        return self

    def AddAllActivations(self, *args):
        """Add one variation per key of `ActivationConverter.actMap`, no default."""
        var = [ActivationConverter(i).Identify(args)
               for i in sorted(ActivationConverter.actMap.keys())]
        self.AddVariations(*var, includeDefault=False)
        return self

    def GuessOriginalAxisAndDim(self, *args):
        """Derive the original axis and the rank from the given operands.

        An operand whose type is "INT32" supplies the axis (first element of
        its value); every other operand supplies the rank, and all of them
        must agree. Without an axis operand the last axis is assumed; a
        negative axis is normalized by adding the rank.

        Returns:
            A tuple `(origin, dim)`.
        """
        origin = None
        dim = None
        for arg in args:
            if arg.type.type == "INT32":
                origin = arg.value[0]
            else:
                if dim is None:
                    dim = len(arg.type.dimensions)
                else:
                    assert dim == len(arg.type.dimensions)
        assert dim is not None
        origin = dim - 1 if origin is None else origin
        origin = origin + dim if origin < 0 else origin
        return origin, dim

    def AddAxis(self, axis, *args, includeDefault=True, defaultName=None):
        """Add an AxisConverter variation for `axis` (an int or an iterable)."""
        origin, dim = self.GuessOriginalAxisAndDim(*args)
        axis = [axis] if isinstance(axis, int) else list(axis)
        var = [AxisConverter(origin, a, dim).Identify(args) for a in axis]
        self.AddVariations(*var, includeDefault=includeDefault, defaultName=defaultName)
        return self

    def AddAllPositiveAxis(self, *args):
        """Add an AxisConverter variation for every axis in `range(dim)`."""
        origin, dim = self.GuessOriginalAxisAndDim(*args)
        var = [AxisConverter(origin, a, dim).Identify(args) for a in range(dim)]
        self.AddVariations(*var, includeDefault=False)
        return self

    def AddAllAxis(self, *args):
        """Add an AxisConverter variation for every axis in `range(-dim, dim)`."""
        origin, dim = self.GuessOriginalAxisAndDim(*args)
        var = [AxisConverter(origin, a, dim).Identify(args) for a in range(-dim, dim)]
        self.AddVariations(*var, includeDefault=False)
        return self

    def AddDims(self, dims, *args, includeDefault=True, defaultName=None):
        """Add AxisConverter variations for `dims` (an int or an iterable).

        For each requested value `i`, the first `dim - i` axes other than the
        original axis are passed to AxisConverter as its fourth argument
        (presumably the axes to drop -- TODO confirm).
        """
        origin, dim = self.GuessOriginalAxisAndDim(*args)
        dims = [dims] if isinstance(dims, int) else list(dims)
        drop = list(range(dim))
        drop.pop(origin)
        var = [AxisConverter(origin, origin, dim, drop[0:(dim-i)]).Identify(args) for i in dims]
        self.AddVariations(*var, includeDefault=includeDefault, defaultName=defaultName)
        return self

    def AddAllDims(self, *args):
        """Add AxisConverter variations with 0 to `dim - 1` non-original axes."""
        origin, dim = self.GuessOriginalAxisAndDim(*args)
        drop = list(range(dim))
        drop.pop(origin)
        var = [AxisConverter(origin, origin, dim, drop[0:i]).Identify(args) for i in range(dim)]
        self.AddVariations(*var, includeDefault=False)
        return self

    def AddAllDimsAndPositiveAxis(self, *args):
        """Add variations for every `range(i)` / non-negative axis `j >= i` pair."""
        origin, dim = self.GuessOriginalAxisAndDim(*args)
        var = [AxisConverter(origin, j, dim, range(i)).Identify(args)
               for i in range(dim) for j in range(i, dim)]
        self.AddVariations(*var, includeDefault=False)
        return self

    def AddAllDimsAndAxis(self, *args):
        """Like AddAllDimsAndPositiveAxis, also using each axis as `j - dim`."""
        origin, dim = self.GuessOriginalAxisAndDim(*args)
        var = [AxisConverter(origin, k, dim, range(i)).Identify(args)
               for i in range(dim) for j in range(i, dim) for k in [j, j - dim]]
        self.AddVariations(*var, includeDefault=False)
        return self

    def Combine(self, other):
        """Append the feeds of `other` to this example and return self.

        Both examples must share the model, the name and the variations.
        """
        assert self.model is other.model, "Only examples targetting the same model can be combined"
        assert tuple(self.variations) == tuple(other.variations), \
            "Only examples with the same set of variations can be combined"
        assert self.name == other.name, "Only examples with the same name can be combined"
        self.feedDicts.extend(other.feedDicts)
        return self

    def Dump(self, DumpModel, model_fd, DumpExample, example_fd, DumpTest, test_fd):
        """Expand the example over its feeds and variations and dump each one.

        For every feed and every combination of one variation per group, the
        model is deep-copied, transformed, named, compiled and handed to the
        dump callbacks. Combinations raising SkipVariation are skipped.

        Args:
            DumpModel, model_fd: accepted but not used by this method.
            DumpExample: callback invoked as `DumpExample(self, example_fd)`
                when both it and `example_fd` are not None.
            example_fd: file object for DumpExample.
            DumpTest: callback invoked as `DumpTest(self, test_fd)` when both
                it and `test_fd` are not None.
            test_fd: file object for DumpTest.

        Returns:
            self.
        """
        if self.testLifeTimeVariation and len(self.model.operations) == 1 and \
                self.expectedMultinomialDistributionTolerance == 0:
            self.AddVariations(AllTensorsAsInputsConverter())
            self.AddVariations(AllInputsAsInternalCoverter())

        # Give every still-unnamed variation its default name.
        for variationGroup in self.variations:
            for variation in variationGroup:
                if variation.name is None:
                    variation.SetToDefaultName()

        for feedDict in self.feedDicts:
            self.model.Feed(feedDict)
            for variationList in itertools.product(*self.variations):
                modelOrigin = self.model
                self.model = copy.deepcopy(modelOrigin)
                try:
                    # Apply variations
                    try:
                        for variation in variationList:
                            self.model = variation.ApplyTo(self.model)
                    except SkipVariation:
                        continue

                    # Concat names for test and examples
                    varNames = [v.name for v in variationList]
                    self.testName = NamedTest(FileNames.specName, self.model.name,
                                              self.name, *varNames)
                    self.examplesName = GlobalVariable("test_model", self.model.name,
                                                       self.name, *varNames)
                    if str(self.testName) in Example.versionOverrides:
                        self.model.IntroducedIn(Example.versionOverrides[str(self.testName)])
                    self.model.WithSuffix(*varNames).Compile()

                    # Dump files
                    if DumpExample is not None and example_fd is not None:
                        DumpExample(self, example_fd)
                    if DumpTest is not None and test_fd is not None:
                        DumpTest(self, test_fd)
                finally:
                    # Restore the model before variation, even when the
                    # combination was skipped or a later step raised.
                    self.model = modelOrigin
        return self

    # Specifies the RANDOM_MULTINOMIAL distribution tolerance.
    # If set to greater than zero, the input is compared as log-probabilities
    # to the output and must be within this tolerance to pass.
    def WithMultinomialDistributionTolerance(self, expectedTolerance):
        """Set the multinomial tolerance; not allowed after ExpectFailure."""
        assert self.expectFailure is False
        self.expectedMultinomialDistributionTolerance = expectedTolerance
        return self

    # Specifies that this example is expected to fail during compilation or execution.
    def ExpectFailure(self):
        """Mark the example as expected to fail; needs a zero tolerance."""
        assert self.expectedMultinomialDistributionTolerance == 0
        self.expectFailure = True
        return self

    def DisableDynamicOutputShapeVariation(self):
        """Clear the `testDynamicOutputShape` flag and return self."""
        self.testDynamicOutputShape = False
        return self

    def DisableLifeTimeVariation(self):
        """Clear the `testLifeTimeVariation` flag and return self."""
        self.testLifeTimeVariation = False
        return self
class FileNames:
    """Class-level bookkeeping of the spec files and their target files.

    The list attributes are filled once by InitializeFileLists; NextFile
    then advances the "current file" attributes one spec at a time.
    """

    # Parallel lists, one entry per spec file.
    specFiles = []
    specNames = []
    exampleFiles = []
    # Values for the spec file currently being processed.
    specFile = ""
    specName = ""
    exampleFile = ""
    version = ""
    # Index of the next spec file that NextFile will select.
    fileIndex = 0

    @staticmethod
    def InitializeFileLists(spec, example):
        """Fill specFiles, specNames and exampleFiles.

        Args:
            spec: a spec file, or a directory whose "*.mod.py" entries are
                used in sorted order.
            example: target argument forwarded to ParseTargetFiles with the
                ".example.cpp" extension.
        """
        # get all spec files and target files
        if os.path.isfile(spec):
            FileNames.specFiles = [os.path.abspath(spec)]
        elif os.path.isdir(spec):
            FileNames.specFiles = sorted(
                os.path.abspath(os.path.join(spec, entry))
                for entry in os.listdir(spec)
                if entry.endswith(".mod.py"))
        else:
            assert False, "%s is neither a file or a directory"%spec
        # The spec name is the base name up to its first dot.
        FileNames.specNames = [
            re.sub(r"\..*", "", os.path.basename(path))
            for path in FileNames.specFiles
        ]
        FileNames.exampleFiles = FileNames.ParseTargetFiles(example, ".example.cpp")

    @staticmethod
    def ParseTargetFiles(arg, ext):
        """Return one target per spec file for the target argument `arg`.

        Args:
            arg: None (no target), an existing directory (one file per spec
                name inside it), "-" (kept verbatim), or a single path
                shared by all spec files.
            ext: extension appended to the spec name in the directory case.

        Returns:
            A list with as many entries as there are spec files.
        """
        count = len(FileNames.specFiles)
        if arg is None:
            return [None] * count
        resolved = os.path.abspath(arg)
        if os.path.isdir(arg):
            return [os.path.join(resolved, specName + ext)
                    for specName in FileNames.specNames]
        if arg == "-":
            return ["-"] * count
        return [resolved] * count

    @staticmethod
    def NextFile():
        """Select the next spec file and reset the per-file global state.

        Returns:
            False when every spec file has already been selected, True
            otherwise.
        """
        index = FileNames.fileIndex
        if index >= len(FileNames.specFiles):
            return False
        FileNames.specFile = FileNames.specFiles[index]
        FileNames.specName = FileNames.specNames[index]
        FileNames.exampleFile = FileNames.exampleFiles[index]
        FileNames.fileIndex = index + 1

        # Start the new spec file with clean registries.
        NamedObject.existingNames = set()
        NamedVariable.existingNames = set()
        NamedTest.existingNames = set()
        Type.typesMap = dict()
        Model.models = list()
        Example.examples = list()
        Configuration.use_shm_for_weights = False

        # Extract version from absolute file path; it is only used when
        # exactly one "/V<digit>_<digit>/" component is present.
        matches = re.findall(r"/V\d_\d/", FileNames.specFile)
        FileNames.version = matches[0].strip('/') if len(matches) == 1 else None
        return True
class Configuration:
    """Class-level switches shared by the whole generator run."""

    # Value returned by useSHM(); reset to False for each spec file.
    use_shm_for_weights = False
    # Set from the "--hook" command line flag.
    hook_mode = False

    @staticmethod
    def useSHM():
        """Return the current value of `use_shm_for_weights`."""
        return Configuration.use_shm_for_weights
def GetTestGeneratorMTime():
    """Return the newest modification time of the generator scripts.

    The scripts are 'test_generator.py' and 'example_generator.py', looked
    up in the directory of this file.
    """
    generatorDir = os.path.dirname(__file__)
    mtimes = []
    for scriptName in ('test_generator.py', 'example_generator.py'):
        mtimes.append(os.path.getmtime(os.path.join(generatorDir, scriptName)))
    return max(mtimes)
def MightNeedRegeneration():
    """Tell whether the current spec file has to be processed again.

    Returns:
        True when there is no example target, when the example file does not
        exist, or when it is not newer than both the spec file and the
        generator scripts; False otherwise.
    """
    exampleFile = FileNames.exampleFile
    # Without a target file there is nothing to compare timestamps with;
    # os.path.exists(None) would raise TypeError instead of returning False.
    if exampleFile is None:
        return True
    specTime = os.path.getmtime(FileNames.specFile)
    tgTime = GetTestGeneratorMTime()
    return not os.path.exists(exampleFile) or \
        os.path.getmtime(exampleFile) <= max(specTime, tgTime)
def Read(filename):
    """Return the whole content of the text file `filename`."""
    with open(filename) as source:
        content = source.read()
    return content
def AtomicWrite(filename, data):
    """Write `data` to `filename` through a temporary sibling file.

    The text is first written to `filename + '.tmp'`, which then replaces
    `filename`. If anything fails before the replacement, a leftover
    temporary file is removed.
    """
    # os.replace(src, dest) may fail if src and dest are on different
    # filesystems, so the temporary file lives next to the destination.
    staging = filename + '.tmp'
    replaced = False
    try:
        with open(staging, 'w') as writer:
            writer.write(data)
        os.replace(staging, filename)
        replaced = True
    finally:
        if not replaced and os.path.exists(staging):
            os.remove(staging)
def GetExecScope():
    """Build the globals dictionary under which spec files are executed.

    Returns:
        A new dict mapping each exported name to the object of the same
        name defined in this module.
    """
    return {
        "ActivationConverter": ActivationConverter,
        "AllInputsAsInternalCoverter": AllInputsAsInternalCoverter,
        "AllOutputsAsInternalCoverter": AllOutputsAsInternalCoverter,
        "AllTensorsAsInputsConverter": AllTensorsAsInputsConverter,
        "BoolScalar": BoolScalar,
        "Configuration": Configuration,
        "DataLayoutConverter": DataLayoutConverter,
        "DataTypeConverter": DataTypeConverter,
        "Example": Example,
        "Float16Scalar": Float16Scalar,
        "Float32Scalar": Float32Scalar,
        "Float32Vector": Float32Vector,
        "IgnoredOutput": IgnoredOutput,
        "Input": Input,
        "Int32Scalar": Int32Scalar,
        "Int32Vector": Int32Vector,
        "Internal": Internal,
        "Model": Model,
        "Operand": Operand,
        "Output": Output,
        "Parameter": Parameter,
        "RelaxedModeConverter": RelaxedModeConverter,
        "SubgraphReference": SubgraphReference,
        "SymmPerChannelQuantParams": SymmPerChannelQuantParams,
    }
def ArgumentParser():
    """Create the command line parser shared by the generator scripts.

    Returns:
        An argparse.ArgumentParser with a positional "spec" argument and a
        boolean "--hook" flag.
    """
    cliParser = argparse.ArgumentParser()
    cliParser.add_argument("spec", help="the spec file or directory")
    cliParser.add_argument("--hook", action='store_true', help="hook mode")
    return cliParser
def ParseArgs(parser):
    """Parse the command line and record the hook mode.

    Args:
        parser: a parser whose result has a `hook` attribute.

    Returns:
        The namespace returned by `parser.parse_args()`.
    """
    parsed = parser.parse_args()
    Configuration.hook_mode = parsed.hook
    return parsed
def Run(InitializeFiles=None, DumpExample=None):
    """Process every spec file selected through FileNames.

    For each spec file that might need regeneration, the spec is executed,
    its examples are dumped into an in-memory buffer, and the buffer is
    written to the example file. In hook mode an out-of-date example file is
    reported and the process exits with status 1 instead of writing it.

    Args:
        InitializeFiles: optional callback invoked as
            `InitializeFiles(example_fd=buffer)` before the examples are
            dumped; skipped when None.
        DumpExample: optional callback forwarded to Example.DumpAllExamples.

    Raises:
        SystemExit: in hook mode when an example file is out of date, or
            when any exception is raised while processing a spec file.
    """
    exec_scope = GetExecScope()
    while FileNames.NextFile():
        try:
            if not MightNeedRegeneration():
                continue
            # NOTE(review): spec files are executed as Python code; only run
            # this on trusted spec files.
            exec(Read(FileNames.specFile), exec_scope)
            example_buf = io.StringIO() if FileNames.exampleFile else None
            # The callback is optional, as its default value says.
            if InitializeFiles is not None:
                InitializeFiles(example_fd=example_buf)
            Example.DumpAllExamples(DumpExample=DumpExample, example_fd=example_buf)
            if FileNames.exampleFile is None:
                continue
            if Configuration.hook_mode and (not os.path.exists(FileNames.exampleFile) or
                                            Read(FileNames.exampleFile) != example_buf.getvalue()):
                print(('\n{filename} is out of date. '
                       'Please run {generate_all_tests_sh} before uploading.\n').format(
                           filename=FileNames.exampleFile,
                           generate_all_tests_sh=os.path.abspath(os.path.join(
                               os.path.dirname(__file__), '..', '..', 'runtime', 'test',
                               'specs', 'generate_all_tests.sh'))))
                sys.exit(1)
            AtomicWrite(FileNames.exampleFile, example_buf.getvalue())
        except Exception:
            # sys.exit(1) above raises SystemExit, which is not an Exception
            # and therefore passes through this handler untouched.
            traceback.print_exc()
            sys.exit("Exception raised when processing {}".format(FileNames.specFile))