blob: f24a01ee1ce9123556151c48e441c9006f59bff4 [file] [log] [blame]
## @package seq2seq_util
# Module caffe2.python.examples.seq2seq_util
""" A bunch of util functions to build Seq2Seq models with Caffe2."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import collections
from future.utils import viewitems
import caffe2.proto.caffe2_pb2 as caffe2_pb2
from caffe2.python import core, rnn_cell, brew
# Special-token ids are fixed by insertion order into the vocab below:
# padding, sequence start, sequence end, out-of-vocabulary.
PAD_ID = 0
PAD = '<PAD>'
GO_ID = 1
GO = '<GO>'
EOS_ID = 2
EOS = '<EOS>'
UNK_ID = 3
UNK = '<UNK>'


def gen_vocab(corpus, unk_threshold):
    """Build a token -> id vocabulary from a whitespace-tokenized corpus.

    Args:
        corpus: path to a text file with one sentence per line.
        unk_threshold: tokens whose corpus frequency is <= this value are
            left out of the vocabulary (callers map them to UNK).

    Returns:
        A defaultdict mapping token -> int id. Looking up an unseen key
        assigns it the next free id, so treat the result as read-only.
    """
    vocab = collections.defaultdict(lambda: len(vocab))
    # Reserve the special tokens first so their ids match PAD_ID/GO_ID/
    # EOS_ID/UNK_ID above.
    vocab[PAD]
    vocab[GO]
    vocab[EOS]
    vocab[UNK]
    # Count token frequencies in a single pass over the corpus.
    freqs = collections.Counter()
    with open(corpus) as f:
        for sentence in f:
            freqs.update(sentence.strip().split())
    # Only tokens strictly above the threshold get their own id.
    for token, freq in freqs.items():
        if freq > unk_threshold:
            vocab[token]
    return vocab
def get_numberized_sentence(sentence, vocab):
    """Map each whitespace-separated token of *sentence* to its vocab id.

    Tokens absent from *vocab* are mapped to the id of the UNK symbol
    (looked up lazily, only when an unknown token is actually seen).
    """
    return [
        vocab[token] if token in vocab else vocab[UNK]
        for token in sentence.strip().split()
    ]
def build_embeddings(
    model,
    vocab_size,
    embedding_size,
    name,
    freeze_embeddings,
):
    """Create a [vocab_size, embedding_size] embedding matrix blob.

    The matrix is initialized from a Gaussian with std 0.1. Unless
    *freeze_embeddings* is set, it is appended to model.params so it is
    updated during training.
    """
    embedding_blob = model.param_init_net.GaussianFill(
        [],
        name,
        shape=[vocab_size, embedding_size],
        std=0.1,
    )
    if freeze_embeddings:
        # Frozen: never registered as a trainable parameter.
        return embedding_blob
    model.params.append(embedding_blob)
    return embedding_blob
def rnn_unidirectional_encoder(
    model,
    embedded_inputs,
    input_lengths,
    initial_hidden_state,
    initial_cell_state,
    embedding_size,
    encoder_num_units,
    use_attention,
    scope=None,
):
    """ Unidirectional (forward pass) LSTM encoder."""
    lstm_scope = (scope + '/' if scope else '') + 'encoder'
    # With attention, gradients flow through the per-step outputs (index 0);
    # otherwise through the final hidden/cell states (indices 1 and 3).
    grad_indices = [0] if use_attention else [1, 3]
    outputs, final_hidden_state, _, final_cell_state = rnn_cell.LSTM(
        model=model,
        input_blob=embedded_inputs,
        seq_lengths=input_lengths,
        initial_states=(initial_hidden_state, initial_cell_state),
        dim_in=embedding_size,
        dim_out=encoder_num_units,
        scope=lstm_scope,
        outputs_with_grads=grad_indices,
    )
    return outputs, final_hidden_state, final_cell_state
def rnn_bidirectional_encoder(
    model,
    embedded_inputs,
    input_lengths,
    initial_hidden_state,
    initial_cell_state,
    embedding_size,
    encoder_num_units,
    use_attention,
    scope=None,
):
    """ Bidirectional (forward pass and backward pass) LSTM encoder."""
    scope_prefix = scope + '/' if scope else ''
    # With attention, gradients flow through the per-step outputs (index 0);
    # otherwise through the final hidden/cell states (indices 1 and 3).
    grad_indices = [0] if use_attention else [1, 3]

    def _run_lstm(input_blob, lstm_scope):
        # One directional LSTM pass; returns (outputs, hidden, cell).
        out, hidden, _, cell = rnn_cell.LSTM(
            model=model,
            input_blob=input_blob,
            seq_lengths=input_lengths,
            initial_states=(initial_hidden_state, initial_cell_state),
            dim_in=embedding_size,
            dim_out=encoder_num_units,
            scope=lstm_scope,
            outputs_with_grads=grad_indices,
        )
        return out, hidden, cell

    # Forward pass over the original time ordering.
    outputs_fw, final_hidden_state_fw, final_cell_state_fw = _run_lstm(
        embedded_inputs,
        scope_prefix + 'forward_encoder',
    )

    # Backward pass: reverse each (length-aware) sequence, run the LSTM,
    # then reverse its outputs back into the original time order.
    reversed_embedded_inputs = model.net.ReversePackedSegs(
        [embedded_inputs, input_lengths],
        ['reversed_embedded_inputs'],
    )
    outputs_bw, final_hidden_state_bw, final_cell_state_bw = _run_lstm(
        reversed_embedded_inputs,
        scope_prefix + 'backward_encoder',
    )
    outputs_bw = model.net.ReversePackedSegs(
        [outputs_bw, input_lengths],
        ['outputs_bw'],
    )

    # Concatenate the two directions along the feature axis (axis 2).
    outputs, _ = model.net.Concat(
        [outputs_fw, outputs_bw],
        ['outputs', 'outputs_dim'],
        axis=2,
    )
    final_hidden_state, _ = model.net.Concat(
        [final_hidden_state_fw, final_hidden_state_bw],
        ['final_hidden_state', 'final_hidden_state_dim'],
        axis=2,
    )
    final_cell_state, _ = model.net.Concat(
        [final_cell_state_fw, final_cell_state_bw],
        ['final_cell_state', 'final_cell_state_dim'],
        axis=2,
    )
    return outputs, final_hidden_state, final_cell_state
def build_embedding_encoder(
    model,
    encoder_params,
    inputs,
    input_lengths,
    vocab_size,
    embeddings,
    embedding_size,
    use_attention,
    num_gpus=0,
    scope=None,
):
    """Embed input token ids and run them through the RNN encoder.

    Args:
        model: model helper the ops are added to.
        encoder_params: dict with 'encoder_layer_configs' (exactly one
            layer supported) and 'use_bidirectional_encoder'.
        inputs: blob of token ids to embed.
        input_lengths: blob of per-sequence lengths.
        vocab_size: unused here; kept for interface compatibility.
        embeddings: embedding matrix blob to gather from.
        embedding_size: width of the embedding vectors.
        use_attention: forwarded to the encoder (affects which LSTM
            outputs receive gradients).
        num_gpus: if nonzero, the embedding Gather runs on CPU and the
            result is copied to GPU.
        scope: optional name scope prefix.

    Returns:
        (encoder_outputs, weighted_encoder_outputs (always None here),
         final_encoder_hidden_state, final_encoder_cell_state,
         encoder_output_dim)
    """
    with core.NameScope(scope or ''):
        if num_gpus == 0:
            embedded_encoder_inputs = model.net.Gather(
                [embeddings, inputs],
                ['embedded_encoder_inputs'],
            )
        else:
            # Embedding lookup stays on CPU; only the gathered result is
            # copied over to GPU.
            with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
                embedded_encoder_inputs_cpu = model.net.Gather(
                    [embeddings, inputs],
                    ['embedded_encoder_inputs_cpu'],
                )
            embedded_encoder_inputs = model.CopyCPUToGPU(
                embedded_encoder_inputs_cpu,
                'embedded_encoder_inputs',
            )

    # Only single-layer encoder configurations are supported.
    assert len(encoder_params['encoder_layer_configs']) == 1
    encoder_num_units = (
        encoder_params['encoder_layer_configs'][0]['num_units']
    )

    with core.NameScope(scope or ''):
        # Zero initial states, created at init time. Output names are
        # given as plain strings for both fills, for consistency.
        encoder_initial_cell_state = model.param_init_net.ConstantFill(
            [],
            'encoder_initial_cell_state',
            shape=[encoder_num_units],
            value=0.0,
        )
        encoder_initial_hidden_state = model.param_init_net.ConstantFill(
            [],
            'encoder_initial_hidden_state',
            shape=[encoder_num_units],
            value=0.0,
        )

    # Choose corresponding rnn encoder function
    if encoder_params['use_bidirectional_encoder']:
        rnn_encoder_func = rnn_bidirectional_encoder
        # Bidirectional: forward and backward outputs are concatenated.
        encoder_output_dim = 2 * encoder_num_units
    else:
        rnn_encoder_func = rnn_unidirectional_encoder
        encoder_output_dim = encoder_num_units

    (
        encoder_outputs,
        final_encoder_hidden_state,
        final_encoder_cell_state,
    ) = rnn_encoder_func(
        model,
        embedded_encoder_inputs,
        input_lengths,
        encoder_initial_hidden_state,
        encoder_initial_cell_state,
        embedding_size,
        encoder_num_units,
        use_attention,
        scope=scope,
    )
    weighted_encoder_outputs = None

    return (
        encoder_outputs,
        weighted_encoder_outputs,
        final_encoder_hidden_state,
        final_encoder_cell_state,
        encoder_output_dim,
    )
def build_initial_rnn_decoder_states(
    model,
    encoder_num_units,
    decoder_num_units,
    final_encoder_hidden_state,
    final_encoder_cell_state,
    use_attention,
):
    """Create the decoder's initial recurrent states.

    With attention: zero-filled hidden/cell states plus a zero initial
    attention context — a 3-tuple. Without attention: FC projections of
    the final encoder states into the decoder dimension — a 2-tuple.
    """
    if not use_attention:
        # Project the encoder's final states to the decoder width
        # (axis=2: the feature axis of the [1, batch, units] states).
        decoder_initial_hidden_state = brew.fc(
            model,
            final_encoder_hidden_state,
            'decoder_initial_hidden_state',
            encoder_num_units,
            decoder_num_units,
            axis=2,
        )
        decoder_initial_cell_state = brew.fc(
            model,
            final_encoder_cell_state,
            'decoder_initial_cell_state',
            encoder_num_units,
            decoder_num_units,
            axis=2,
        )
        return (
            decoder_initial_hidden_state,
            decoder_initial_cell_state,
        )

    def _zero_state(blob_name, size):
        # Zero-filled 1-D state blob created by the init net.
        return model.param_init_net.ConstantFill(
            [],
            blob_name,
            shape=[size],
            value=0.0,
        )

    decoder_initial_hidden_state = _zero_state(
        'decoder_initial_hidden_state',
        decoder_num_units,
    )
    decoder_initial_cell_state = _zero_state(
        'decoder_initial_cell_state',
        decoder_num_units,
    )
    initial_attention_weighted_encoder_context = _zero_state(
        'initial_attention_weighted_encoder_context',
        encoder_num_units,
    )
    return (
        decoder_initial_hidden_state,
        decoder_initial_cell_state,
        initial_attention_weighted_encoder_context,
    )
def output_projection(
    model,
    decoder_outputs,
    decoder_output_size,
    target_vocab_size,
    decoder_softmax_size,
):
    """Project decoder outputs to per-token logits over the target vocab.

    If *decoder_softmax_size* is not None, the decoder outputs are first
    reduced to that size through an intermediate FC layer before the
    (potentially large) vocabulary projection.
    """
    projection_input = decoder_outputs
    projection_input_size = decoder_output_size
    if decoder_softmax_size is not None:
        projection_input = brew.fc(
            model,
            decoder_outputs,
            'decoder_outputs_scaled',
            dim_in=decoder_output_size,
            dim_out=decoder_softmax_size,
        )
        projection_input_size = decoder_softmax_size

    output_projection_w = model.param_init_net.XavierFill(
        [],
        'output_projection_w',
        shape=[target_vocab_size, projection_input_size],
    )
    output_projection_b = model.param_init_net.XavierFill(
        [],
        'output_projection_b',
        shape=[target_vocab_size],
    )
    # Both the weight and the bias are trainable parameters.
    model.params.extend([
        output_projection_w,
        output_projection_b,
    ])
    return model.net.FC(
        [
            projection_input,
            output_projection_w,
            output_projection_b,
        ],
        ['output_logits'],
    )