blob: e4cc415211450143b3c109bda89d3bc64bd0908d [file] [log] [blame]
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Core Keras layers.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import copy
import sys
import textwrap
import types as python_types
import warnings
import numpy as np
from tensorflow.python.eager import backprop
from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.keras import activations
from tensorflow.python.keras import backend as K
from tensorflow.python.keras import constraints
from tensorflow.python.keras import initializers
from tensorflow.python.keras import regularizers
from tensorflow.python.keras.engine.base_layer import Layer
from tensorflow.python.keras.engine.input_spec import InputSpec
from tensorflow.python.keras.utils import conv_utils
from tensorflow.python.keras.utils import generic_utils
from tensorflow.python.keras.utils import tf_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gen_math_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import standard_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.platform import tf_logging
from tensorflow.python.training.tracking import base as trackable
from tensorflow.python.util import nest
from tensorflow.python.util import tf_inspect
from tensorflow.python.util.tf_export import keras_export
@keras_export('keras.layers.Masking')
class Masking(Layer):
"""Masks a sequence by using a mask value to skip timesteps.
For each timestep in the input tensor (dimension #1 in the tensor),
if all values in the input tensor at that timestep
are equal to `mask_value`, then the timestep will be masked (skipped)
in all downstream layers (as long as they support masking).
If any downstream layer does not support masking yet receives such
an input mask, an exception will be raised.
Example:
Consider a Numpy data array `x` of shape `(samples, timesteps, features)`,
to be fed to an LSTM layer. You want to mask timestep #3 and #5 because you
lack data for these timesteps. You can:
- Set `x[:, 3, :] = 0.` and `x[:, 5, :] = 0.`
- Insert a `Masking` layer with `mask_value=0.` before the LSTM layer:
```python
samples, timesteps, features = 32, 10, 8
inputs = np.random.random([samples, timesteps, features]).astype(np.float32)
inputs[:, 3, :] = 0.
inputs[:, 5, :] = 0.
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Masking(mask_value=0.,
input_shape=(timesteps, features)))
model.add(tf.keras.layers.LSTM(32))
output = model(inputs)
# The time step 3 and 5 will be skipped from LSTM calculation.
```
See [the masking and padding
guide](https://www.tensorflow.org/guide/keras/masking_and_padding)
for more details.
"""
def __init__(self, mask_value=0., **kwargs):
super(Masking, self).__init__(**kwargs)
self.supports_masking = True
self.mask_value = mask_value
self._compute_output_and_mask_jointly = True
def compute_mask(self, inputs, mask=None):
return K.any(math_ops.not_equal(inputs, self.mask_value), axis=-1)
def call(self, inputs):
boolean_mask = K.any(
math_ops.not_equal(inputs, self.mask_value), axis=-1, keepdims=True)
outputs = inputs * math_ops.cast(boolean_mask, inputs.dtype)
# Compute the mask and outputs simultaneously.
outputs._keras_mask = array_ops.squeeze(boolean_mask, axis=-1) # pylint: disable=protected-access
return outputs
def compute_output_shape(self, input_shape):
return input_shape
def get_config(self):
config = {'mask_value': self.mask_value}
base_config = super(Masking, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.Dropout')
class Dropout(Layer):
"""Applies Dropout to the input.
The Dropout layer randomly sets input units to 0 with a frequency of `rate`
at each step during training time, which helps prevent overfitting.
Inputs not set to 0 are scaled up by 1/(1 - rate) such that the sum over
all inputs is unchanged.
Note that the Dropout layer only applies when `training` is set to True
such that no values are dropped during inference. When using `model.fit`,
`training` will be appropriately set to True automatically, and in other
contexts, you can set the kwarg explicitly to True when calling the layer.
(This is in contrast to setting `trainable=False` for a Dropout layer.
`trainable` does not affect the layer's behavior, as Dropout does
not have any variables/weights that can be frozen during training.)
>>> tf.random.set_seed(0)
>>> layer = tf.keras.layers.Dropout(.2, input_shape=(2,))
>>> data = np.arange(10).reshape(5, 2).astype(np.float32)
>>> print(data)
[[0. 1.]
[2. 3.]
[4. 5.]
[6. 7.]
[8. 9.]]
>>> outputs = layer(data, training=True)
>>> print(outputs)
tf.Tensor(
[[ 0. 1.25]
[ 2.5 3.75]
[ 5. 6.25]
[ 7.5 8.75]
[10. 0. ]], shape=(5, 2), dtype=float32)
Arguments:
rate: Float between 0 and 1. Fraction of the input units to drop.
noise_shape: 1D integer tensor representing the shape of the
binary dropout mask that will be multiplied with the input.
For instance, if your inputs have shape
`(batch_size, timesteps, features)` and
you want the dropout mask to be the same for all timesteps,
you can use `noise_shape=(batch_size, 1, features)`.
seed: A Python integer to use as random seed.
Call arguments:
inputs: Input tensor (of any rank).
training: Python boolean indicating whether the layer should behave in
training mode (adding dropout) or in inference mode (doing nothing).
"""
def __init__(self, rate, noise_shape=None, seed=None, **kwargs):
super(Dropout, self).__init__(**kwargs)
self.rate = rate
self.noise_shape = noise_shape
self.seed = seed
self.supports_masking = True
def _get_noise_shape(self, inputs):
# Subclasses of `Dropout` may implement `_get_noise_shape(self, inputs)`,
# which will override `self.noise_shape`, and allows for custom noise
# shapes with dynamically sized inputs.
if self.noise_shape is None:
return None
concrete_inputs_shape = array_ops.shape(inputs)
noise_shape = []
for i, value in enumerate(self.noise_shape):
noise_shape.append(concrete_inputs_shape[i] if value is None else value)
return ops.convert_to_tensor_v2(noise_shape)
def call(self, inputs, training=None):
if training is None:
training = K.learning_phase()
def dropped_inputs():
return nn.dropout(
inputs,
noise_shape=self._get_noise_shape(inputs),
seed=self.seed,
rate=self.rate)
output = tf_utils.smart_cond(training,
dropped_inputs,
lambda: array_ops.identity(inputs))
return output
def compute_output_shape(self, input_shape):
return input_shape
def get_config(self):
config = {
'rate': self.rate,
'noise_shape': self.noise_shape,
'seed': self.seed
}
base_config = super(Dropout, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.SpatialDropout1D')
class SpatialDropout1D(Dropout):
"""Spatial 1D version of Dropout.
This version performs the same function as Dropout, however it drops
entire 1D feature maps instead of individual elements. If adjacent frames
within feature maps are strongly correlated (as is normally the case in
early convolution layers) then regular dropout will not regularize the
activations and will otherwise just result in an effective learning rate
decrease. In this case, SpatialDropout1D will help promote independence
between feature maps and should be used instead.
Arguments:
rate: Float between 0 and 1. Fraction of the input units to drop.
Call arguments:
inputs: A 3D tensor.
training: Python boolean indicating whether the layer should behave in
training mode (adding dropout) or in inference mode (doing nothing).
Input shape:
3D tensor with shape:
`(samples, timesteps, channels)`
Output shape:
Same as input.
References:
- [Efficient Object Localization Using Convolutional
Networks](https://arxiv.org/abs/1411.4280)
"""
def __init__(self, rate, **kwargs):
super(SpatialDropout1D, self).__init__(rate, **kwargs)
self.input_spec = InputSpec(ndim=3)
def _get_noise_shape(self, inputs):
input_shape = array_ops.shape(inputs)
noise_shape = (input_shape[0], 1, input_shape[2])
return noise_shape
@keras_export('keras.layers.SpatialDropout2D')
class SpatialDropout2D(Dropout):
"""Spatial 2D version of Dropout.
This version performs the same function as Dropout, however it drops
entire 2D feature maps instead of individual elements. If adjacent pixels
within feature maps are strongly correlated (as is normally the case in
early convolution layers) then regular dropout will not regularize the
activations and will otherwise just result in an effective learning rate
decrease. In this case, SpatialDropout2D will help promote independence
between feature maps and should be used instead.
Arguments:
rate: Float between 0 and 1. Fraction of the input units to drop.
data_format: 'channels_first' or 'channels_last'.
In 'channels_first' mode, the channels dimension
(the depth) is at index 1,
in 'channels_last' mode is it at index 3.
It defaults to the `image_data_format` value found in your
Keras config file at `~/.keras/keras.json`.
If you never set it, then it will be "channels_last".
Call arguments:
inputs: A 4D tensor.
training: Python boolean indicating whether the layer should behave in
training mode (adding dropout) or in inference mode (doing nothing).
Input shape:
4D tensor with shape:
`(samples, channels, rows, cols)` if data_format='channels_first'
or 4D tensor with shape:
`(samples, rows, cols, channels)` if data_format='channels_last'.
Output shape:
Same as input.
References:
- [Efficient Object Localization Using Convolutional
Networks](https://arxiv.org/abs/1411.4280)
"""
def __init__(self, rate, data_format=None, **kwargs):
super(SpatialDropout2D, self).__init__(rate, **kwargs)
if data_format is None:
data_format = K.image_data_format()
if data_format not in {'channels_last', 'channels_first'}:
raise ValueError('data_format must be in '
'{"channels_last", "channels_first"}')
self.data_format = data_format
self.input_spec = InputSpec(ndim=4)
def _get_noise_shape(self, inputs):
input_shape = array_ops.shape(inputs)
if self.data_format == 'channels_first':
return (input_shape[0], input_shape[1], 1, 1)
elif self.data_format == 'channels_last':
return (input_shape[0], 1, 1, input_shape[3])
@keras_export('keras.layers.SpatialDropout3D')
class SpatialDropout3D(Dropout):
"""Spatial 3D version of Dropout.
This version performs the same function as Dropout, however it drops
entire 3D feature maps instead of individual elements. If adjacent voxels
within feature maps are strongly correlated (as is normally the case in
early convolution layers) then regular dropout will not regularize the
activations and will otherwise just result in an effective learning rate
decrease. In this case, SpatialDropout3D will help promote independence
between feature maps and should be used instead.
Arguments:
rate: Float between 0 and 1. Fraction of the input units to drop.
data_format: 'channels_first' or 'channels_last'.
In 'channels_first' mode, the channels dimension (the depth)
is at index 1, in 'channels_last' mode is it at index 4.
It defaults to the `image_data_format` value found in your
Keras config file at `~/.keras/keras.json`.
If you never set it, then it will be "channels_last".
Call arguments:
inputs: A 5D tensor.
training: Python boolean indicating whether the layer should behave in
training mode (adding dropout) or in inference mode (doing nothing).
Input shape:
5D tensor with shape:
`(samples, channels, dim1, dim2, dim3)` if data_format='channels_first'
or 5D tensor with shape:
`(samples, dim1, dim2, dim3, channels)` if data_format='channels_last'.
Output shape:
Same as input.
References:
- [Efficient Object Localization Using Convolutional
Networks](https://arxiv.org/abs/1411.4280)
"""
def __init__(self, rate, data_format=None, **kwargs):
super(SpatialDropout3D, self).__init__(rate, **kwargs)
if data_format is None:
data_format = K.image_data_format()
if data_format not in {'channels_last', 'channels_first'}:
raise ValueError('data_format must be in '
'{"channels_last", "channels_first"}')
self.data_format = data_format
self.input_spec = InputSpec(ndim=5)
def _get_noise_shape(self, inputs):
input_shape = array_ops.shape(inputs)
if self.data_format == 'channels_first':
return (input_shape[0], input_shape[1], 1, 1, 1)
elif self.data_format == 'channels_last':
return (input_shape[0], 1, 1, 1, input_shape[4])
@keras_export('keras.layers.Activation')
class Activation(Layer):
"""Applies an activation function to an output.
Arguments:
activation: Activation function, such as `tf.nn.relu`, or string name of
built-in activation function, such as "relu".
Usage:
>>> layer = tf.keras.layers.Activation('relu')
>>> output = layer([-3.0, -1.0, 0.0, 2.0])
>>> list(output.numpy())
[0.0, 0.0, 0.0, 2.0]
>>> layer = tf.keras.layers.Activation(tf.nn.relu)
>>> output = layer([-3.0, -1.0, 0.0, 2.0])
>>> list(output.numpy())
[0.0, 0.0, 0.0, 2.0]
Input shape:
Arbitrary. Use the keyword argument `input_shape`
(tuple of integers, does not include the batch axis)
when using this layer as the first layer in a model.
Output shape:
Same shape as input.
"""
def __init__(self, activation, **kwargs):
super(Activation, self).__init__(**kwargs)
self.supports_masking = True
self.activation = activations.get(activation)
def call(self, inputs):
return self.activation(inputs)
def compute_output_shape(self, input_shape):
return input_shape
def get_config(self):
config = {'activation': activations.serialize(self.activation)}
base_config = super(Activation, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.Reshape')
class Reshape(Layer):
"""Layer that reshapes inputs into the given shape.
Input shape:
Arbitrary, although all dimensions in the input shape must be known/fixed.
Use the keyword argument `input_shape` (tuple of integers, does not include
the samples/batch size axis) when using this layer as the first layer
in a model.
Output shape:
`(batch_size,) + target_shape`
Example:
>>> # as first layer in a Sequential model
>>> model = tf.keras.Sequential()
>>> model.add(tf.keras.layers.Reshape((3, 4), input_shape=(12,)))
>>> # model.output_shape == (None, 3, 4), `None` is the batch size.
>>> model.output_shape
(None, 3, 4)
>>> # as intermediate layer in a Sequential model
>>> model.add(tf.keras.layers.Reshape((6, 2)))
>>> model.output_shape
(None, 6, 2)
>>> # also supports shape inference using `-1` as dimension
>>> model.add(tf.keras.layers.Reshape((-1, 2, 2)))
>>> model.output_shape
(None, None, 2, 2)
"""
def __init__(self, target_shape, **kwargs):
"""Creates a `tf.keras.layers.Reshape` layer instance.
Args:
target_shape: Target shape. Tuple of integers, does not include the
samples dimension (batch size).
**kwargs: Any additional layer keyword arguments.
"""
super(Reshape, self).__init__(**kwargs)
self.target_shape = tuple(target_shape)
def _fix_unknown_dimension(self, input_shape, output_shape):
"""Find and replace a missing dimension in an output shape.
This is a near direct port of the internal Numpy function
`_fix_unknown_dimension` in `numpy/core/src/multiarray/shape.c`
Arguments:
input_shape: Shape of array being reshaped
output_shape: Desired shape of the array with at most
a single -1 which indicates a dimension that should be
derived from the input shape.
Returns:
The new output shape with a -1 replaced with its computed value.
Raises:
ValueError: If the total array size of the output_shape is
different than the input_shape, or more than one unknown dimension
is specified.
"""
output_shape = list(output_shape)
msg = 'total size of new array must be unchanged'
known, unknown = 1, None
for index, dim in enumerate(output_shape):
if dim < 0:
if unknown is None:
unknown = index
else:
raise ValueError('Can only specify one unknown dimension.')
else:
known *= dim
original = np.prod(input_shape, dtype=int)
if unknown is not None:
if known == 0 or original % known != 0:
raise ValueError(msg)
output_shape[unknown] = original // known
elif original != known:
raise ValueError(msg)
return output_shape
def compute_output_shape(self, input_shape):
input_shape = tensor_shape.TensorShape(input_shape).as_list()
if None in input_shape[1:]:
output_shape = [input_shape[0]]
# input shape (partially) unknown? replace -1's with None's
output_shape += tuple(s if s != -1 else None for s in self.target_shape)
else:
output_shape = [input_shape[0]]
output_shape += self._fix_unknown_dimension(input_shape[1:],
self.target_shape)
return tensor_shape.TensorShape(output_shape)
def call(self, inputs):
return array_ops.reshape(inputs,
(array_ops.shape(inputs)[0],) + self.target_shape)
def get_config(self):
config = {'target_shape': self.target_shape}
base_config = super(Reshape, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.Permute')
class Permute(Layer):
"""Permutes the dimensions of the input according to a given pattern.
Useful for e.g. connecting RNNs and convnets together.
Example:
```python
model = Sequential()
model.add(Permute((2, 1), input_shape=(10, 64)))
# now: model.output_shape == (None, 64, 10)
# note: `None` is the batch dimension
```
Arguments:
dims: Tuple of integers. Permutation pattern, does not include the
samples dimension. Indexing starts at 1.
For instance, `(2, 1)` permutes the first and second dimensions
of the input.
Input shape:
Arbitrary. Use the keyword argument `input_shape`
(tuple of integers, does not include the samples axis)
when using this layer as the first layer in a model.
Output shape:
Same as the input shape, but with the dimensions re-ordered according
to the specified pattern.
"""
def __init__(self, dims, **kwargs):
super(Permute, self).__init__(**kwargs)
self.dims = tuple(dims)
if sorted(dims) != list(range(1, len(dims) + 1)):
raise ValueError(
'Invalid permutation `dims` for Permute Layer: %s. '
'The set of indices in `dims` must be consecutive and start from 1.' %
(dims,))
self.input_spec = InputSpec(ndim=len(self.dims) + 1)
def compute_output_shape(self, input_shape):
input_shape = tensor_shape.TensorShape(input_shape).as_list()
output_shape = copy.copy(input_shape)
for i, dim in enumerate(self.dims):
target_dim = input_shape[dim]
output_shape[i + 1] = target_dim
return tensor_shape.TensorShape(output_shape)
def call(self, inputs):
return array_ops.transpose(inputs, perm=(0,) + self.dims)
def get_config(self):
config = {'dims': self.dims}
base_config = super(Permute, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.Flatten')
class Flatten(Layer):
"""Flattens the input. Does not affect the batch size.
If inputs are shaped `(batch,)` without a channel dimension, then flattening
adds an extra channel dimension and output shapes are `(batch, 1)`.
Arguments:
data_format: A string,
one of `channels_last` (default) or `channels_first`.
The ordering of the dimensions in the inputs.
`channels_last` corresponds to inputs with shape
`(batch, ..., channels)` while `channels_first` corresponds to
inputs with shape `(batch, channels, ...)`.
It defaults to the `image_data_format` value found in your
Keras config file at `~/.keras/keras.json`.
If you never set it, then it will be "channels_last".
Example:
```python
model = Sequential()
model.add(Convolution2D(64, 3, 3,
border_mode='same',
input_shape=(3, 32, 32)))
# now: model.output_shape == (None, 64, 32, 32)
model.add(Flatten())
# now: model.output_shape == (None, 65536)
```
"""
def __init__(self, data_format=None, **kwargs):
super(Flatten, self).__init__(**kwargs)
self.data_format = conv_utils.normalize_data_format(data_format)
self.input_spec = InputSpec(min_ndim=1)
def call(self, inputs):
if (self.data_format == 'channels_first'
and K.ndim(inputs) is not None and K.ndim(inputs) > 1):
permutation = [0]
permutation.extend(range(2, K.ndim(inputs)))
permutation.append(1)
inputs = array_ops.transpose(inputs, perm=permutation)
input_shape = inputs.shape
if input_shape[1:].is_fully_defined():
flattened_dim = tensor_shape.dimension_value(
np.prod(input_shape[1:], dtype=int))
# Temporary fix for integer overflow issue.
if flattened_dim > np.iinfo(np.int32).max:
shape_dtype = dtypes.int64
else:
shape_dtype = dtypes.int32
outputs = array_ops.reshape(
inputs, constant_op.constant((-1, flattened_dim), dtype=shape_dtype))
else:
batch_size = tensor_shape.dimension_value(inputs.shape[0])
if batch_size:
# Temporary fix for integer overflow issue.
if batch_size > np.iinfo(np.int32).max:
shape_dtype = dtypes.int64
else:
shape_dtype = dtypes.int32
outputs = array_ops.reshape(
inputs, constant_op.constant((batch_size, -1), dtype=shape_dtype))
else:
outputs = array_ops.reshape(inputs, (array_ops.shape(inputs)[0], -1))
if not context.executing_eagerly():
outputs.set_shape(self.compute_output_shape(inputs.shape))
return outputs
def compute_output_shape(self, input_shape):
input_shape = tensor_shape.as_shape(input_shape).as_list()
if not input_shape:
output_shape = tensor_shape.TensorShape([1])
else:
output_shape = [input_shape[0]]
if np.all(input_shape[1:]):
output_shape += [np.prod(input_shape[1:], dtype=int)]
else:
output_shape += [None]
return tensor_shape.TensorShape(output_shape)
def get_config(self):
config = {'data_format': self.data_format}
base_config = super(Flatten, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.RepeatVector')
class RepeatVector(Layer):
"""Repeats the input n times.
Example:
```python
model = Sequential()
model.add(Dense(32, input_dim=32))
# now: model.output_shape == (None, 32)
# note: `None` is the batch dimension
model.add(RepeatVector(3))
# now: model.output_shape == (None, 3, 32)
```
Arguments:
n: Integer, repetition factor.
Input shape:
2D tensor of shape `(num_samples, features)`.
Output shape:
3D tensor of shape `(num_samples, n, features)`.
"""
def __init__(self, n, **kwargs):
super(RepeatVector, self).__init__(**kwargs)
self.n = n
self.input_spec = InputSpec(ndim=2)
def compute_output_shape(self, input_shape):
input_shape = tensor_shape.TensorShape(input_shape).as_list()
return tensor_shape.TensorShape([input_shape[0], self.n, input_shape[1]])
def call(self, inputs):
return K.repeat(inputs, self.n)
def get_config(self):
config = {'n': self.n}
base_config = super(RepeatVector, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.Lambda')
class Lambda(Layer):
"""Wraps arbitrary expressions as a `Layer` object.
The `Lambda` layer exists so that arbitrary TensorFlow functions
can be used when constructing `Sequential` and Functional API
models. `Lambda` layers are best suited for simple operations or
quick experimentation. For more advanced usecases, follow
[this guide](https://www.tensorflow.org/guide/keras/custom_layers_and_models)
for subclassing `tf.keras.layers.Layer`.
The main reason to subclass `tf.keras.layers.Layer` instead of using a
`Lambda` layer is saving and inspecting a Model. `Lambda` layers
are saved by serializing the Python bytecode, whereas subclassed
Layers can be saved via overriding their `get_config` method. Overriding
`get_config` improves the portability of Models. Models that rely on
subclassed Layers are also often easier to visualize and reason about.
Examples:
```python
# add a x -> x^2 layer
model.add(Lambda(lambda x: x ** 2))
```
```python
# add a layer that returns the concatenation
# of the positive part of the input and
# the opposite of the negative part
def antirectifier(x):
x -= K.mean(x, axis=1, keepdims=True)
x = K.l2_normalize(x, axis=1)
pos = K.relu(x)
neg = K.relu(-x)
return K.concatenate([pos, neg], axis=1)
model.add(Lambda(antirectifier))
```
Variables:
While it is possible to use Variables with Lambda layers, this practice is
discouraged as it can easily lead to bugs. For instance, consider the
following layer:
```python
scale = tf.Variable(1.)
scale_layer = tf.keras.layers.Lambda(lambda x: x * scale)
```
Because scale_layer does not directly track the `scale` variable, it will
not appear in `scale_layer.trainable_weights` and will therefore not be
trained if `scale_layer` is used in a Model.
A better pattern is to write a subclassed Layer:
```python
class ScaleLayer(tf.keras.layers.Layer):
def __init__(self):
super(ScaleLayer, self).__init__()
self.scale = tf.Variable(1.)
def call(self, inputs):
return inputs * self.scale
```
In general, Lambda layers can be convenient for simple stateless
computation, but anything more complex should use a subclass Layer instead.
Arguments:
function: The function to be evaluated. Takes input tensor as first
argument.
output_shape: Expected output shape from function. This argument can be
inferred if not explicitly provided. Can be a tuple or function. If a
tuple, it only specifies the first dimension onward;
sample dimension is assumed either the same as the input: `output_shape =
(input_shape[0], ) + output_shape` or, the input is `None` and
the sample dimension is also `None`: `output_shape = (None, ) +
output_shape` If a function, it specifies the entire shape as a function
of the
input shape: `output_shape = f(input_shape)`
mask: Either None (indicating no masking) or a callable with the same
signature as the `compute_mask` layer method, or a tensor that will be
returned as output mask regardless what the input is.
arguments: Optional dictionary of keyword arguments to be passed to the
function.
Input shape: Arbitrary. Use the keyword argument input_shape (tuple of
integers, does not include the samples axis) when using this layer as the
first layer in a model.
Output shape: Specified by `output_shape` argument
"""
@trackable.no_automatic_dependency_tracking
def __init__(self, function, output_shape=None, mask=None, arguments=None,
**kwargs):
super(Lambda, self).__init__(**kwargs)
self.arguments = arguments or {}
self.function = function
if mask is not None:
self.supports_masking = True
self.mask = mask
self._supports_ragged_inputs = True
self._output_shape = output_shape
# Warning on every invocation will be quite irksome in Eager mode.
self._already_warned = False
function_args = tf_inspect.getfullargspec(function).args
self._fn_expects_training_arg = 'training' in function_args
self._fn_expects_mask_arg = 'mask' in function_args
@tf_utils.shape_type_conversion
def compute_output_shape(self, input_shape):
if self._output_shape is None:
# Make use of existing autocomputation but provide Lambda-specific
# error message. This is always safe to run even when the outer context
# is Graph mode because Lambda layers don't have side effects such as
# `add_loss`.
with context.eager_mode():
try:
return super(Lambda, self).compute_output_shape(input_shape)
except NotImplementedError:
raise NotImplementedError(
'We could not automatically infer the shape of the Lambda\'s '
'output. Please specify `output_shape` for this Lambda.')
if callable(self._output_shape):
output_shapes = self._output_shape(input_shape)
return tf_utils.convert_shapes(output_shapes, to_tuples=False)
# Output shapes are passed directly and don't include batch dimension.
input_tensor_shape = tf_utils.convert_shapes(input_shape, to_tuples=False)
batch_size = nest.flatten(input_tensor_shape)[0][0] if input_shape else None
def _add_batch(shape):
return tensor_shape.TensorShape([batch_size] + shape.as_list())
output_shapes = tf_utils.convert_shapes(self._output_shape, to_tuples=False)
return nest.map_structure(_add_batch, output_shapes)
def call(self, inputs, mask=None, training=None):
# We must copy for thread safety, but it only needs to be a shallow copy.
kwargs = {k: v for k, v in self.arguments.items()}
if self._fn_expects_mask_arg:
kwargs['mask'] = mask
if self._fn_expects_training_arg:
kwargs['training'] = training
created_variables = []
def _variable_creator(next_creator, **kwargs):
var = next_creator(**kwargs)
created_variables.append(var)
return var
with backprop.GradientTape(watch_accessed_variables=True) as tape,\
variable_scope.variable_creator_scope(_variable_creator):
result = self.function(inputs, **kwargs)
self._check_variables(created_variables, tape.watched_variables())
return result
def _check_variables(self, created_variables, accessed_variables):
if not created_variables and not accessed_variables:
# In the common case that a Lambda layer does not touch a Variable, we
# don't want to incur the runtime cost of assembling any state used for
# checking only to immediately discard it.
return
tracked_weights = set(v.ref() for v in self.weights)
untracked_new_vars = [
v for v in created_variables if v.ref() not in tracked_weights
]
if untracked_new_vars:
variable_str = '\n'.join(' {}'.format(i) for i in untracked_new_vars)
error_str = textwrap.dedent(
'''
The following Variables were created within a Lambda layer ({name})
but are not tracked by said layer:
{variable_str}
The layer cannot safely ensure proper Variable reuse across multiple
calls, and consquently this behavior is disallowed for safety. Lambda
layers are not well suited to stateful computation; instead, writing a
subclassed Layer is the recommend way to define layers with
Variables.'''
).format(name=self.name, variable_str=variable_str)
raise ValueError(error_str)
untracked_used_vars = [
v for v in accessed_variables if v.ref() not in tracked_weights
]
if untracked_used_vars and not self._already_warned:
variable_str = '\n'.join(' {}'.format(i) for i in untracked_used_vars)
self._warn(textwrap.dedent(
'''
The following Variables were used a Lambda layer's call ({name}), but
are not present in its tracked objects:
{variable_str}
It is possible that this is intended behavior, but it is more likely
an omission. This is a strong indication that this layer should be
formulated as a subclassed Layer rather than a Lambda layer.'''
).format(name=self.name, variable_str=variable_str))
self._already_warned = True
def _warn(self, msg):
# This method will be overridden in a unit test to raise an error, because
# self.assertWarns is not universally implemented.
return tf_logging.warn(msg)
def compute_mask(self, inputs, mask=None):
if callable(self.mask):
return self.mask(inputs, mask)
return self.mask
def get_config(self):
function_config = self._serialize_function_to_config(self.function)
output_shape_config = self._serialize_function_to_config(self._output_shape,
allow_raw=True)
config = {
'function': function_config[0],
'function_type': function_config[1],
'module': function_config[2],
'output_shape': output_shape_config[0],
'output_shape_type': output_shape_config[1],
'output_shape_module': output_shape_config[2],
}
if self.mask is not None:
mask_config = self._serialize_function_to_config(self.mask)
config.update({
'mask': mask_config[0],
'mask_type': mask_config[1],
'mask_module': mask_config[2]
})
config['arguments'] = self.arguments
base_config = super(Lambda, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def _serialize_function_to_config(self, inputs, allow_raw=False):
if isinstance(inputs, python_types.LambdaType):
output = generic_utils.func_dump(inputs)
output_type = 'lambda'
module = inputs.__module__
elif callable(inputs):
output = inputs.__name__
output_type = 'function'
module = inputs.__module__
elif allow_raw:
output = inputs
output_type = 'raw'
module = None
else:
raise ValueError(
'Invalid input for serialization, type: %s ' % type(inputs))
return output, output_type, module
@classmethod
def from_config(cls, config, custom_objects=None):
config = config.copy()
function = cls._parse_function_from_config(
config, custom_objects, 'function', 'module', 'function_type')
output_shape = cls._parse_function_from_config(
config, custom_objects, 'output_shape', 'output_shape_module',
'output_shape_type')
if 'mask' in config:
mask = cls._parse_function_from_config(
config, custom_objects, 'mask', 'mask_module', 'mask_type')
else:
mask = None
config['function'] = function
config['output_shape'] = output_shape
config['mask'] = mask
# If arguments were numpy array, they have been saved as
# list. We need to recover the ndarray
if 'arguments' in config:
for key in config['arguments']:
if isinstance(config['arguments'][key], dict):
arg_dict = config['arguments'][key]
if 'type' in arg_dict and arg_dict['type'] == 'ndarray':
# Overwrite the argument with its numpy translation
config['arguments'][key] = np.array(arg_dict['value'])
return cls(**config)
@classmethod
def _parse_function_from_config(
cls, config, custom_objects, func_attr_name, module_attr_name,
func_type_attr_name):
globs = globals()
module = config.pop(module_attr_name, None)
if module in sys.modules:
globs.update(sys.modules[module].__dict__)
elif module is not None:
# Note: we don't know the name of the function if it's a lambda.
warnings.warn('{} is not loaded, but a Lambda layer uses it. '
'It may cause errors.'.format(module)
, UserWarning)
if custom_objects:
globs.update(custom_objects)
function_type = config.pop(func_type_attr_name)
if function_type == 'function':
# Simple lookup in custom objects
function = generic_utils.deserialize_keras_object(
config[func_attr_name],
custom_objects=custom_objects,
printable_module_name='function in Lambda layer')
elif function_type == 'lambda':
# Unsafe deserialization from bytecode
function = generic_utils.func_load(
config[func_attr_name], globs=globs)
elif function_type == 'raw':
function = config[func_attr_name]
else:
raise TypeError('Unknown function type:', function_type)
return function
@keras_export('keras.layers.Dense')
class Dense(Layer):
"""Just your regular densely-connected NN layer.
`Dense` implements the operation:
`output = activation(dot(input, kernel) + bias)`
where `activation` is the element-wise activation function
passed as the `activation` argument, `kernel` is a weights matrix
created by the layer, and `bias` is a bias vector created by the layer
(only applicable if `use_bias` is `True`).
Note: If the input to the layer has a rank greater than 2, then `Dense`
computes the dot product between the `inputs` and the `kernel` along the
last axis of the `inputs` and axis 1 of the `kernel` (using `tf.tensordot`).
For example, if input has dimensions `(batch_size, d0, d1)`,
then we create a `kernel` with shape `(d1, units)`, and the `kernel` operates
along axis 2 of the `input`, on every sub-tensor of shape `(1, 1, d1)`
(there are `batch_size * d0` such sub-tensors).
The output in this case will have shape `(batch_size, d0, units)`.
Besides, layer attributes cannot be modified after the layer has been called
once (except the `trainable` attribute).
Example:
```python
# as first layer in a sequential model:
model = Sequential()
model.add(Dense(32, input_shape=(16,)))
# now the model will take as input arrays of shape (*, 16)
# and output arrays of shape (*, 32)
# after the first layer, you don't need to specify
# the size of the input anymore:
model.add(Dense(32))
```
Arguments:
units: Positive integer, dimensionality of the output space.
activation: Activation function to use.
If you don't specify anything, no activation is applied
(ie. "linear" activation: `a(x) = x`).
use_bias: Boolean, whether the layer uses a bias vector.
kernel_initializer: Initializer for the `kernel` weights matrix.
bias_initializer: Initializer for the bias vector.
kernel_regularizer: Regularizer function applied to
the `kernel` weights matrix.
bias_regularizer: Regularizer function applied to the bias vector.
activity_regularizer: Regularizer function applied to
the output of the layer (its "activation")..
kernel_constraint: Constraint function applied to
the `kernel` weights matrix.
bias_constraint: Constraint function applied to the bias vector.
Input shape:
N-D tensor with shape: `(batch_size, ..., input_dim)`.
The most common situation would be
a 2D input with shape `(batch_size, input_dim)`.
Output shape:
N-D tensor with shape: `(batch_size, ..., units)`.
For instance, for a 2D input with shape `(batch_size, input_dim)`,
the output would have shape `(batch_size, units)`.
"""
def __init__(self,
units,
activation=None,
use_bias=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
kernel_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
bias_constraint=None,
**kwargs):
if 'input_shape' not in kwargs and 'input_dim' in kwargs:
kwargs['input_shape'] = (kwargs.pop('input_dim'),)
super(Dense, self).__init__(
activity_regularizer=regularizers.get(activity_regularizer), **kwargs)
self.units = int(units) if not isinstance(units, int) else units
self.activation = activations.get(activation)
self.use_bias = use_bias
self.kernel_initializer = initializers.get(kernel_initializer)
self.bias_initializer = initializers.get(bias_initializer)
self.kernel_regularizer = regularizers.get(kernel_regularizer)
self.bias_regularizer = regularizers.get(bias_regularizer)
self.kernel_constraint = constraints.get(kernel_constraint)
self.bias_constraint = constraints.get(bias_constraint)
self.supports_masking = True
self.input_spec = InputSpec(min_ndim=2)
def build(self, input_shape):
dtype = dtypes.as_dtype(self.dtype or K.floatx())
if not (dtype.is_floating or dtype.is_complex):
raise TypeError('Unable to build `Dense` layer with non-floating point '
'dtype %s' % (dtype,))
input_shape = tensor_shape.TensorShape(input_shape)
if tensor_shape.dimension_value(input_shape[-1]) is None:
raise ValueError('The last dimension of the inputs to `Dense` '
'should be defined. Found `None`.')
last_dim = tensor_shape.dimension_value(input_shape[-1])
self.input_spec = InputSpec(min_ndim=2, axes={-1: last_dim})
self.kernel = self.add_weight(
'kernel',
shape=[last_dim, self.units],
initializer=self.kernel_initializer,
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint,
dtype=self.dtype,
trainable=True)
if self.use_bias:
self.bias = self.add_weight(
'bias',
shape=[self.units,],
initializer=self.bias_initializer,
regularizer=self.bias_regularizer,
constraint=self.bias_constraint,
dtype=self.dtype,
trainable=True)
else:
self.bias = None
self.built = True
def call(self, inputs):
rank = inputs.shape.rank
if rank is not None and rank > 2:
# Broadcasting is required for the inputs.
outputs = standard_ops.tensordot(inputs, self.kernel, [[rank - 1], [0]])
# Reshape the output back to the original ndim of the input.
if not context.executing_eagerly():
shape = inputs.shape.as_list()
output_shape = shape[:-1] + [self.units]
outputs.set_shape(output_shape)
else:
inputs = math_ops.cast(inputs, self._compute_dtype)
if K.is_sparse(inputs):
outputs = sparse_ops.sparse_tensor_dense_matmul(inputs, self.kernel)
else:
outputs = gen_math_ops.mat_mul(inputs, self.kernel)
if self.use_bias:
outputs = nn.bias_add(outputs, self.bias)
if self.activation is not None:
return self.activation(outputs) # pylint: disable=not-callable
return outputs
def compute_output_shape(self, input_shape):
input_shape = tensor_shape.TensorShape(input_shape)
input_shape = input_shape.with_rank_at_least(2)
if tensor_shape.dimension_value(input_shape[-1]) is None:
raise ValueError(
'The innermost dimension of input_shape must be defined, but saw: %s'
% input_shape)
return input_shape[:-1].concatenate(self.units)
def get_config(self):
config = {
'units': self.units,
'activation': activations.serialize(self.activation),
'use_bias': self.use_bias,
'kernel_initializer': initializers.serialize(self.kernel_initializer),
'bias_initializer': initializers.serialize(self.bias_initializer),
'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
'bias_regularizer': regularizers.serialize(self.bias_regularizer),
'activity_regularizer':
regularizers.serialize(self.activity_regularizer),
'kernel_constraint': constraints.serialize(self.kernel_constraint),
'bias_constraint': constraints.serialize(self.bias_constraint)
}
base_config = super(Dense, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
@keras_export('keras.layers.ActivityRegularization')
class ActivityRegularization(Layer):
"""Layer that applies an update to the cost function based input activity.
Arguments:
l1: L1 regularization factor (positive float).
l2: L2 regularization factor (positive float).
Input shape:
Arbitrary. Use the keyword argument `input_shape`
(tuple of integers, does not include the samples axis)
when using this layer as the first layer in a model.
Output shape:
Same shape as input.
"""
def __init__(self, l1=0., l2=0., **kwargs):
super(ActivityRegularization, self).__init__(
activity_regularizer=regularizers.L1L2(l1=l1, l2=l2), **kwargs)
self.supports_masking = True
self.l1 = l1
self.l2 = l2
def compute_output_shape(self, input_shape):
return input_shape
def get_config(self):
config = {'l1': self.l1, 'l2': self.l2}
base_config = super(ActivityRegularization, self).get_config()
return dict(list(base_config.items()) + list(config.items()))