# Copyright (c) 2016-present, Facebook, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.utils import viewkeys
from multiprocessing import Process, Queue
import numpy as np
import os
import shutil
import tempfile
import unittest
from caffe2.proto import caffe2_pb2
from caffe2.python import core, cnn, data_parallel_model, dyndep, optimizer, \
rnn_cell, workspace, model_helper, brew
from caffe2.python.test_util import TestCase
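# file_store_handler_ops provides the FileStoreHandlerCreate op used for the
# Gloo rendezvous in test_synchronization_barrier below.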
dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")
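# Minimal temporary-directory context manager; tempfile.TemporaryDirectory is
# not available on Python 2, which this file still targets.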
class TemporaryDirectory:
def __enter__(self):
self.tmpdir = tempfile.mkdtemp()
return self.tmpdir
def __exit__(self, type, value, traceback):
shutil.rmtree(self.tmpdir)
# Note(jiayq): we have yet to find out why Travis gives an error in gloo
# like:
# RuntimeError: [enforce fail at /home/travis/build/caffe2/caffe2/third_party/gloo/gloo/transport/tcp/device.cc:113] ifa != nullptr. Unable to find interface for: [127.0.1.1]
# See for example https://travis-ci.org/caffe2/caffe2/jobs/262433866
# As a result, we check whether we are running on Travis and, if so, disable the test.
@unittest.skipIf(os.environ.get("TRAVIS"), "DPMTest has a known issue with Travis.")
class DataParallelModelTest(TestCase):
def run_model(self, devices, gpu):
'''
Helper function for test_equiv
'''
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
fc = model.FC("data", "fc", 16, 1,
("ConstantFill", {}), ("ConstantFill", {}))
fc_fl = model.FlattenToVec(fc, "fc_fl")
sigm = model.Sigmoid(fc_fl, "sigm")
sq = model.SquaredL2Distance([sigm, "label"], "sq")
loss = model.AveragedLoss(sq, "loss")
loss = model.Scale(loss, scale=loss_scale)
# For testing explicit sync
model.param_init_net.UniformFill([], ["sync_num"], shape=[1])
return [loss]
def add_optimizer(model):
return optimizer.build_sgd(
model,
0.1,
policy="fixed",
max_gradient_norm=5.0,
allow_lr_injection=True,
)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="test{}".format(devices),
)
data_parallel_model.Parallelize(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
optimizer_builder_fun=add_optimizer,
devices=devices,
cpu_device=not gpu,
shared_model=not gpu,
)
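        # "sync_num" is fed only on device 0 below; AddBlobSync should
        # propagate it to every device, which is verified after each RunNet.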
data_parallel_model.AddBlobSync(model, ["sync_num"])
np.random.seed(2603)
        # Each run has the same input, independent of the number of GPUs
batch_size = 64
for i in range(0, 10):
full_data = np.random.rand(batch_size, 16)
full_labels = np.round(full_data[:, 0])
batch_per_device = batch_size // len(devices)
for (j, g) in enumerate(devices):
st = j * batch_per_device
en = st + batch_per_device
data = full_data[st:en, :].astype(np.float32)
labels = full_labels[st:en].astype(np.float32)
with core.DeviceScope(core.DeviceOption(model._device_type, g)):
workspace.FeedBlob(
"{}_{}/data".format(model._device_prefix, g), data
)
workspace.FeedBlob(
"{}_{}/label".format(model._device_prefix, g), labels
)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
workspace.FeedBlob(
model._device_prefix + "_0/sync_num",
np.array([i * 2]).astype(np.float32),
device_option=core.DeviceOption(model._device_type, 0))
workspace.RunNet(model.net.Proto().name)
# Test AddBlobSync
for j in model._devices:
sync = workspace.FetchBlob(
model._device_prefix + "_{}/sync_num".format(j))[0]
self.assertTrue(abs(sync - i * 2) < 0.01)
return workspace.FetchBlob("{}_0/fc_w".format(model._device_prefix))
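    # Run 'fn' in 'comm_size' subprocesses (one per rank) and re-raise in the
    # parent any exception raised by a child process.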
def run_test_locally(self, fn, device_option=None, **kwargs):
# Queue for assertion errors on subprocesses
queue = Queue()
# Capture any exception thrown by the subprocess
def run_fn(*args, **kwargs):
try:
if device_option is None:
fn(*args, **kwargs)
workspace.ResetWorkspace()
else:
with core.DeviceScope(device_option):
fn(*args, **kwargs)
workspace.ResetWorkspace()
except Exception as ex:
queue.put(ex)
# Start N processes in the background
procs = []
for i in range(kwargs['comm_size']):
kwargs['comm_rank'] = i
proc = Process(
target=run_fn,
kwargs=kwargs)
proc.start()
procs.append(proc)
# Test complete, join background processes
while len(procs) > 0:
proc = procs.pop(0)
while proc.is_alive():
proc.join(1)
# Raise exception if we find any.
# Note that the following is executed ALSO after
# the last process was joined, so if ANY exception
# was raised, it will be re-raised here.
if not queue.empty():
raise queue.get()
def test_equiv(self):
'''
        Test that the model produces exactly the same results for a given
        total batch size, independent of the number of GPUs.
'''
for gpu in [True, False]:
if gpu and (not workspace.has_gpu_support or
workspace.NumCudaDevices() < 2):
continue
result_2gpus = self.run_model([0, 1], gpu=gpu)
result_1gpus = self.run_model([0], gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_2gpus))
if not gpu or workspace.NumCudaDevices() >= 4:
result_4gpus = self.run_model(list(range(4)), gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_4gpus))
if not gpu or workspace.NumCudaDevices() >= 8:
result_8gpus = self.run_model(list(range(8)), gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_8gpus))
if not gpu or workspace.NumCudaDevices() >= 16:
result_16gpus = self.run_model(list(range(16)), gpu=gpu)
self.assertTrue(np.allclose(result_1gpus, result_16gpus))
def test_checkpoint_params(self):
def add_input_ops(model):
pass
def add_model_ops(model, loss_scale):
model.NHWC2NCHW("data", "data_nchw")
model.Conv("data_nchw", 'conv1', 3, 64,
weight_init=("MSRAFill", {}), kernel=7,
stride=2, pad=3, no_bias=0)
model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=100)
model.Sigmoid('fc', 'fc_sigm')
model.Softmax('fc_sigm', 'softmax')
model.LabelCrossEntropy(['softmax', 'label'], 'xent')
loss = model.AveragedLoss('xent', 'loss')
# Add a duplicate param init to ensure it does not cause issues
model.param_init_net.ConstantFill(
[], ["fc_w"], shape=((64 * 56 * 56), 1000)
)
return [loss]
def add_optimizer(model):
optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)
model = cnn.CNNModelHelper(
order="NHWC",
name="test",
)
data_parallel_model.Parallelize_CPU(
model,
input_builder_fun=add_input_ops,
forward_pass_builder_fun=add_model_ops,
optimizer_builder_fun=add_optimizer,
devices=[1, 2, 3],
)
        # Only cpu_1 params should be returned (cpu_1 is the first device)
checkpoint_params = data_parallel_model.GetCheckpointParams(model)
for p in model.GetParams("cpu_1/"):
self.assertTrue(p in checkpoint_params)
self.assertTrue(p + "_momentum" in checkpoint_params)
for p in model.GetParams("cpu_2/"):
self.assertFalse(p in checkpoint_params)
self.assertTrue(
core.BlobReference("cpu_1/fc_w_momentum") in checkpoint_params)
for c in model.GetComputedParams("cpu_1/"):
self.assertTrue(c in checkpoint_params)
for c in model.GetComputedParams("cpu_2/"):
self.assertFalse(c in checkpoint_params)
self.assertFalse(core.BlobReference("cpu_1/data") in checkpoint_params)
self.assertTrue(core.BlobReference("optimizer_iteration") in checkpoint_params)
def test_net_conversion_and_append_net(self):
other = model_helper.ModelHelper()
fc1 = brew.fc(other, "data", "other_fc1", dim_in=3*227*227, dim_out=10)
fc2 = brew.fc(other, fc1, "other_fc2", dim_in=10, dim_out=10)
brew.fc(other, fc2, "other_fc3", dim_in=10, dim_out=10)
def add_input_ops(model):
model.net.UniformFill([], ["data"], shape=[4, 227, 227, 3])
model.net.UniformFill([], ["label"], shape=[4])
def add_model_ops(model, loss_scale):
model.NHWC2NCHW("data", "data_nchw")
model.Conv("data_nchw", 'conv1', 3, 64,
weight_init=("MSRAFill", {}), kernel=7,
stride=2, pad=3, no_bias=0)
model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=10)
# Append the net and param_init_net of the other model
appendnet = data_parallel_model.ConvertNetForDevice(other.net)
model.net.AppendNet(appendnet)
model.param_init_net.AppendNet(
data_parallel_model.ConvertNetForDevice(other.param_init_net))
model.Sigmoid('fc', 'fc_sigm')
model.Softmax('fc_sigm', 'softmax')
loss = model.AveragedLoss('softmax', 'loss')
return [loss]
def add_optimizer(model):
optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)
model = cnn.CNNModelHelper(
order="NCHW",
name="test",
)
data_parallel_model.Parallelize_CPU(
model,
input_builder_fun=add_input_ops,
forward_pass_builder_fun=add_model_ops,
optimizer_builder_fun=add_optimizer,
devices=range(4)
)
# Just create and run net and confirm no exception is thrown
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
workspace.RunNet(model.net)
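    # Spawn two processes that rendezvous through a FileStoreHandler (Gloo)
    # and call data_parallel_model.Synchronize twice each.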
def test_synchronization_barrier(self):
def run(comm_rank, comm_size, tmpdir):
def add_input_ops(model):
pass
def add_model_ops(model, loss_scale):
return []
def add_optimizer(model):
pass
store_handler = "store_handler"
workspace.RunOperatorOnce(
core.CreateOperator(
"FileStoreHandlerCreate",
[],
[store_handler],
path=tmpdir))
rendezvous = dict(
kv_handler=store_handler,
shard_id=comm_rank,
num_shards=comm_size,
engine='GLOO',
)
model = cnn.CNNModelHelper(
order="NHWC",
name="test",
)
data_parallel_model.Parallelize_CPU(
model,
input_builder_fun=add_input_ops,
forward_pass_builder_fun=add_model_ops,
optimizer_builder_fun=add_optimizer,
devices=[1, 2, 3],
rendezvous=rendezvous
)
data_parallel_model.RunInitNet(model)
for _ in range(2):
data_parallel_model.Synchronize(model)
with TemporaryDirectory() as tmpdir:
self.run_test_locally(
run,
comm_size=2,
device_option=None,
tmpdir=tmpdir)
def test_device_scope_check(self):
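        # Parallelize_GPU must be called outside of any device scope; calling
        # it under a CUDA device scope should raise an assertion.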
with self.assertRaises(AssertionError):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
data_parallel_model.Parallelize_GPU(None, None, None)
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class RecurrentNetworkParallelTest(TestCase):
def run_model(self, devices, gpu):
'''
        Helper function for test_equiv_recurrent
'''
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
workspace.FeedBlob(
core.ScopedBlobReference("seq_lengths"),
np.array([self.T] * self.batch_per_device, dtype=np.int32)
)
model.param_init_net.ConstantFill(
[],
"hidden_init",
value=0.0,
shape=[1, self.batch_per_device, self.hidden_dim]
)
model.param_init_net.ConstantFill(
[],
"cell_init",
value=0.0,
shape=[1, self.batch_per_device, self.hidden_dim]
)
output, _last_hidden, _, _last_state, = rnn_cell.LSTM(
model=model,
input_blob="data",
seq_lengths="seq_lengths",
initial_states=("hidden_init", "cell_init"),
dim_in=self.input_dim,
dim_out=self.hidden_dim,
scope="partest",
)
# A silly loss function
loss = model.AveragedLoss(
model.Sub([output, "target"], "dist"),
"loss",
)
loss = model.Scale(loss, "loss_scaled", scale=loss_scale)
return [loss]
def param_update_fun(model):
ITER = model.Iter("ITER")
LR = model.net.LearningRate(
[ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
for param in model.GetParams():
param_grad = model.param_to_grad[param]
model.WeightedSum([param, ONE, param_grad, LR], param)
assert len(model.GetParams()) == len(model.params) // len(model._devices)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
name="recurrent_test{}".format(devices),
)
self.T = 8
self.batch_size = 64
self.input_dim = 8
self.hidden_dim = 31
self.batch_per_device = self.batch_size // len(devices)
data_parallel_model.Parallelize(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
param_update_builder_fun=param_update_fun,
devices=devices,
optimize_gradient_memory=True,
cpu_device=not gpu,
)
        # Change all initialization to ConstantFills so that
        # everything is deterministic
for op in model.param_init_net.Proto().op:
if op.type.endswith('Fill'):
op.type = 'ConstantFill'
        # Each run has the same input, independent of the number of GPUs
np.random.seed(20150210)
for i in range(0, 10):
full_data = np.random.rand(self.T, self.batch_size, self.input_dim)
full_target = np.random.rand(
self.T, self.batch_size, self.hidden_dim
)
for (j, g) in enumerate(devices):
st = j * self.batch_per_device
en = st + self.batch_per_device
data = full_data[:, st:en, :].astype(np.float32)
targets = full_target[:, st:en, :].astype(np.float32)
with core.DeviceScope(core.DeviceOption(model._device_type, g)):
workspace.FeedBlob(
"{}_{}/data".format(model._device_prefix, g), data
)
workspace.FeedBlob(
"{}_{}/target".format(model._device_prefix, g), targets
)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
workspace.RunNet(model.net.Proto().name)
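        # Return the LSTM input-to-hidden weights from device 0 so that runs
        # with different device counts can be compared.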
return workspace.FetchBlob("{}_0/partest/i2h_w".format(model._device_prefix))
def test_equiv_recurrent(self):
'''
        Test that the model produces exactly the same results for a given
        total batch size, independent of the number of GPUs/CPUs.
'''
for gpu in [True, False]:
if gpu and not workspace.has_gpu_support:
continue
result_2gpus = self.run_model([0, 1], gpu)
result_1gpus = self.run_model([0], gpu)
self.assertTrue(np.allclose(result_1gpus, result_2gpus))
if not gpu or workspace.NumCudaDevices() >= 4:
result_4gpus = self.run_model(list(range(4)), gpu)
self.assertTrue(np.allclose(result_1gpus, result_4gpus))
if not gpu or workspace.NumCudaDevices() >= 8:
result_8gpus = self.run_model(list(range(8)), gpu)
self.assertTrue(np.allclose(result_1gpus, result_8gpus))
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class SparseDataParallelModelTest(TestCase):
'''
    Create and run the model. We try storing the indices for Gather
    both on CPU and on GPU.
'''
def run_model(self, V, gpu_devices, cpu_indices):
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
if cpu_indices:
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
gathered_cpu = model.net.Gather(
[self.vecs, 'indices'], 'gathered_cpu')
gathered = model.CopyCPUToGPU(gathered_cpu, "gathered")
else:
gpu_vecs = model.param_init_net.CopyCPUToGPU(
self.vecs, "gpuvecs",
)
model.params.append(gpu_vecs)
gathered = model.net.Gather([gpu_vecs, 'indices'], 'gathered')
flattened = model.Flatten(gathered, "flattened")
fc = model.FC(flattened, "fc", 16 * 16, 1,
("ConstantFill", {}), ("ConstantFill", {}))
fc_fl = model.FlattenToVec(fc, "fc_fl")
sigm = model.Sigmoid(fc_fl, "sigm")
sq = model.SquaredL2Distance([sigm, "label"], "sq")
loss = model.AveragedLoss(sq, "loss")
loss = model.Scale(loss, scale=loss_scale)
return [loss]
def param_update_fun(model):
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
LR = model.CopyCPUToGPU(self.LR, "LR")
for param in model.GetParams():
param_grad = model.param_to_grad[param]
if not isinstance(param_grad, core.GradientSlice):
model.WeightedSum([param, ONE, param_grad, LR], param)
else:
param_momentum = model.param_init_net.ConstantFill(
[param],
param + '_momentum',
value=0.0,
)
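                    # Sparse momentum SGD: only the rows referenced by the
                    # gradient slice's indices are updated.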
model.net.SparseMomentumSGDUpdate(
[
param_grad.values,
param_momentum,
LR,
param,
param_grad.indices,
],
[
param_grad.values, param_momentum, param
],
momentum=0.1,
nesterov=0,
)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="sparse_test{}".format(gpu_devices),
)
with core.NameScope("cpu"):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
self.ITER = model.Iter("ITER")
self.LR = model.net.LearningRate(
[self.ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
self.vecs = model.param_init_net.UniformFill(
[], "vecs", shape=[V, 16])
if cpu_indices:
model.params.append(self.vecs)
self.ONE_CPU = model.param_init_net.ConstantFill(
[], "ONE_CPU", shape=[1], value=1.0,
)
data_parallel_model.Parallelize_GPU(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
param_update_builder_fun=param_update_fun,
devices=gpu_devices,
)
# Update the vecs
if cpu_indices:
with core.NameScope("cpu"):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
for param in model.GetParams():
param_grad = model.param_to_grad[param]
model.ScatterWeightedSum([param, self.ONE_CPU,
param_grad.indices,
param_grad.values,
self.LR],
self.vecs)
else:
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
model.CopyGPUToCPU("gpu_0/gpuvecs", self.vecs)
np.random.seed(2603)
        # Each run has the same input, independent of the number of GPUs
batch_size = 64
for i in range(0, 10):
full_indices = np.random.permutation(V)[:batch_size * 16].reshape(
batch_size, 16
)
full_labels = full_indices[:, 0] % 2
batch_per_device = batch_size // len(gpu_devices)
for (j, g) in enumerate(gpu_devices):
st = j * batch_per_device
en = st + batch_per_device
indices = full_indices[st:en, :].astype(np.int32)
labels = full_labels[st:en].astype(np.float32)
device_for_indices = core.DeviceOption(caffe2_pb2.CPU)
if not cpu_indices:
device_for_indices = core.DeviceOption(caffe2_pb2.CUDA, g)
with core.DeviceScope(device_for_indices):
workspace.FeedBlob("gpu_{}/indices".format(g), indices)
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
workspace.FeedBlob("gpu_{}/label".format(g), labels)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
# Force vecs to be same on all runs
orig_vecs = np.random.rand(V, 16).astype(np.float32)
workspace.FeedBlob(
self.vecs,
orig_vecs
)
if not cpu_indices:
for g in gpu_devices:
workspace.FeedBlob(
"gpu_{}/gpuvecs".format(g),
orig_vecs,
device_option=core.DeviceOption(caffe2_pb2.CUDA, g),
)
workspace.CreateNet(model.net)
workspace.RunNet(model.net.Proto().name)
if len(gpu_devices) == 2:
if not cpu_indices:
idx = workspace.FetchBlob("gpu_0/indices")
idx = list(idx.flatten())
n = len(idx)
nu = len(set(idx))
assert n == nu, "We cannot have duplicate indices"
# Sanity check to see the vecs were updated
self.assertFalse(
np.allclose(workspace.FetchBlob(self.vecs), orig_vecs))
return [workspace.FetchBlob(self.vecs if cpu_indices else "gpu_0/gpuvecs"),
workspace.FetchBlob("gpu_0/fc_w")]
def _test_equiv_sparse(self, cpu_indices):
'''
        Test that the model produces exactly the same results for a given
        total batch size, independent of the number of GPUs.
'''
V = 10000
result_2gpus = self.run_model(V, [0, 1], cpu_indices)
result_1gpus = self.run_model(V, [0], cpu_indices)
self.assertTrue(np.allclose(result_1gpus[0], result_2gpus[0]))
self.assertTrue(np.allclose(result_1gpus[1], result_2gpus[1]))
if workspace.NumCudaDevices() >= 4:
result_4gpus = self.run_model(V, list(range(4)), cpu_indices)
self.assertTrue(np.allclose(result_1gpus[0], result_4gpus[0]))
self.assertTrue(np.allclose(result_1gpus[1], result_4gpus[1]))
if workspace.NumCudaDevices() >= 8:
result_8gpus = self.run_model(V, list(range(8)), cpu_indices)
self.assertTrue(np.allclose(result_1gpus[0], result_8gpus[0]))
self.assertTrue(np.allclose(result_1gpus[1], result_8gpus[1]))
def test_equiv_sparse(self):
self._test_equiv_sparse(True)
self._test_equiv_sparse(False)
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class ParallelizeGPUBMUFTest(TestCase):
def _run_model(self, gpu_devices):
'''
        Helper function for building and running the model
'''
def input_builder_fun(model):
return None
def _model_build_fun(self, model, loss_scale):
fc = model.FC(
"data", "fc", 16, 1, ("ConstantFill", {}), ("ConstantFill", {})
)
fc_fl = model.FlattenToVec(fc, "fc_fl")
sigm = model.Sigmoid(fc_fl, "sigm")
sq = model.SquaredL2Distance([sigm, "label"], "sq")
loss = model.AveragedLoss(sq, "loss")
loss = model.Scale(loss, scale=loss_scale)
return [loss]
def _param_update_fun(self, model):
ITER = model.Iter("ITER")
LR = model.net.LearningRate(
[ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
for param in model.GetParams():
grad = model.param_to_grad[param]
model.WeightedSum([param, ONE, grad, LR], param)
def _generate_data(self, gpu_devices):
np.random.seed(26)
        # Each run has the same input, independent of the number of GPUs
batch_size = 64
for _ in range(0, 10):
full_data = np.random.rand(batch_size, 16)
full_labels = np.round(full_data[:, 0])
batch_per_device = batch_size // len(gpu_devices)
for (j, g) in enumerate(gpu_devices):
st = j * batch_per_device
en = st + batch_per_device
data = full_data[st:en, :].astype(np.float32)
labels = full_labels[st:en].astype(np.float32)
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
workspace.FeedBlob("gpu_{}/data".format(g), data)
workspace.FeedBlob("gpu_{}/label".format(g), labels)
def test_parallelize_gpu_bmuf(self):
model = cnn.CNNModelHelper(
order="NHWC",
name="test"
)
gpu_ids = [0, 1]
def input_builder_fun(model):
return None
self._generate_data(gpu_ids)
data_parallel_model.Parallelize_GPU_BMUF(
model,
input_builder_fun,
self._model_build_fun,
self._param_update_fun,
devices=gpu_ids,
)
data_parallel_model.RunInitNet(model)
# Check initial momentum params are zeros
self.assertEqual(
list(viewkeys(model._device_grouped_blobs)), ['fc_w', 'fc_b']
)
self.assertEqual(workspace.FetchBlob('gpu_0/fc_b_v'), 0)
np.testing.assert_equal(
workspace.FetchBlob('gpu_0/fc_w_v'),
np.zeros(16).astype(np.float32).reshape(1, 16)
)
# Run the algorithm for one iteration to have non-zero params.
data_parallel_model.RunNet(model, 1)
# Save iteration momentum and post local update params
v_b_ = workspace.FetchBlob('gpu_0/fc_b_v')
v_w_ = workspace.FetchBlob('gpu_0/fc_w_v')
workspace.RunNetOnce(model.net)
b_0_ = workspace.FetchBlob('gpu_0/fc_b')
w_0_ = workspace.FetchBlob('gpu_0/fc_w')
b_1_ = workspace.FetchBlob('gpu_1/fc_b')
w_1_ = workspace.FetchBlob('gpu_1/fc_w')
# Compute block gradients.
b_g_ = workspace.FetchBlob('gpu_0/fc_b_g')
w_g_ = workspace.FetchBlob('gpu_0/fc_w_g')
workspace.RunNetOnce(model._global_model_param_updates_net)
g_b = (b_0_ + b_1_) / 2 - b_g_
g_w = (w_0_ + w_1_) / 2 - w_g_
v_b = workspace.FetchBlob('gpu_0/fc_b_v')
v_w = workspace.FetchBlob('gpu_0/fc_w_v')
w_g = workspace.FetchBlob('gpu_0/fc_w_g')
b_g = workspace.FetchBlob('gpu_0/fc_b_g')
w_0 = workspace.FetchBlob('gpu_0/fc_w')
b_0 = workspace.FetchBlob('gpu_0/fc_b')
w_1 = workspace.FetchBlob('gpu_1/fc_w')
b_1 = workspace.FetchBlob('gpu_1/fc_b')
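        # Expected BMUF update with block momentum 0.5:
        #   v <- 0.5 * v_prev + (average of local params - global params)
        #   global params <- global params + v
        # and all local params equal the updated global params afterwards.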
# Check momentum update step
np.testing.assert_equal(v_b, 0.5 * v_b_ + g_b)
np.testing.assert_equal(v_w, 0.5 * v_w_ + g_w)
np.testing.assert_equal(w_g, w_0)
np.testing.assert_equal(w_g, w_1)
np.testing.assert_equal(b_g, b_0)
np.testing.assert_equal(b_g, b_1)
# Check params update step
np.testing.assert_equal(w_0, w_g_ + v_w)
np.testing.assert_equal(b_0, b_g_ + v_b)
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class SparseDataParallelModelTestWithSharedIndices(TestCase):
'''
    Create and run the model. Here multiple Gather ops on the GPU share
    the same 'indices' blob.
'''
def run_model(self, V, gpu_devices):
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
gpu_vecs_gathered = []
gpu_vecs = []
for num, vec in enumerate(self.vecs):
gpu_vec = model.param_init_net.CopyCPUToGPU(
vec, 'gpuvec_{}'.format(num),
)
if num != 2:
model.params.append(gpu_vec)
gpu_vecs.append(gpu_vec)
for num, gpu_vec in enumerate(gpu_vecs):
gpu_vec_gathered = model.net.Gather(
[gpu_vec, 'indices'],
['gpu_vec_gathered_{}'.format(num)]
)
gpu_vecs_gathered.append(gpu_vec_gathered)
assert len(gpu_vecs_gathered) == 3
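            # FC takes (input, weight, bias): vec_2 is the FC input batch,
            # vec_0 the weight matrix, and vec_1 the bias (see the note on
            # self.vecs below).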
fc = model.net.FC(
[
gpu_vecs_gathered[2],
gpu_vecs_gathered[0],
gpu_vecs_gathered[1],
],
['fc'],
)
_, loss = model.net.SoftmaxWithLoss(
[fc, 'label'],
['ce_loss', 'avg_loss'],
only_loss=True,
)
loss = model.Scale(loss, scale=loss_scale)
model.net.Print(loss, [], limit=10)
return [loss]
def param_update_fun(model):
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
LR = model.CopyCPUToGPU(self.LR, "LR")
for param in model.GetParams():
param_grad = model.param_to_grad[param]
if not isinstance(param_grad, core.GradientSlice):
model.WeightedSum([param, ONE, param_grad, LR], param)
else:
model.net.ScatterWeightedSum(
[
param,
ONE,
param_grad.indices,
param_grad.values,
ONE,
],
param,
)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="sparse_test{}".format(gpu_devices),
)
batch_size = 32
batch_per_device = batch_size // len(gpu_devices)
with core.NameScope("cpu"):
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
self.ITER = model.Iter("ITER")
self.LR = model.net.LearningRate(
[self.ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
'''
self.vecs consists of 3 big blobs on which we call Gather:
1) FC weights, shape=(V, 16)
2) FC bias, shape=(V)
3) FC input, shape=(batch_per_device, 16)
'''
self.vecs = [
model.param_init_net.UniformFill(
[], "vec_{}".format(num), shape=[V, 16])
for num in range(2)
]
self.vecs.append(
model.param_init_net.UniformFill(
[],
"vec_2", shape=[batch_per_device, 16]
)
)
self.ONE_CPU = model.param_init_net.ConstantFill(
[], "ONE_CPU", shape=[1], value=1.0,
)
data_parallel_model.Parallelize_GPU(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
param_update_builder_fun=param_update_fun,
devices=gpu_devices,
)
# Update the vecs
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
for num, vec in enumerate(self.vecs[:-1]):
model.CopyGPUToCPU("gpu_0/gpuvec_{}".format(num), vec)
        # Each run has the same input, independent of the number of GPUs
for i in range(0, 10):
np.random.seed(2603)
full_indices = np.random.permutation(V)[:batch_size].reshape(
batch_size
)
full_labels = full_indices[:] % batch_per_device
for (j, g) in enumerate(gpu_devices):
st = j * batch_per_device
en = st + batch_per_device
indices = full_indices[st:en].astype(np.int32)
labels = full_labels[st:en].astype(np.int32)
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
workspace.FeedBlob("gpu_{}/indices".format(g), indices)
workspace.FeedBlob("gpu_{}/label".format(g), labels)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
# Force vecs to be same on all runs
orig_vecs = [
np.random.rand(V, 16).astype(np.float32),
np.random.rand(V).astype(np.float32),
np.random.rand(V, 16).astype(np.float32),
]
for vec, orig_vec in zip(self.vecs, orig_vecs):
workspace.FeedBlob(
vec,
orig_vec
)
for g in gpu_devices:
for num, orig_vec in enumerate(orig_vecs):
workspace.FeedBlob(
"gpu_{}/gpuvec_{}".format(g, num),
orig_vec,
device_option=core.DeviceOption(
caffe2_pb2.CUDA, g),
)
workspace.CreateNet(model.net)
workspace.RunNet(model.net.Proto().name)
idx = workspace.FetchBlob('gpu_0/indices')
grad_slices = [
workspace.FetchBlob(
'gpu_{}/gpu_vec_gathered_{}_grad'.format(g, num))
for g in gpu_devices for num in range(2)
]
for grad_slice in grad_slices:
# print (len(idx), len(grad_slice))
assert len(idx) == len(grad_slice), (
'Number of indices {} is not same as number of gradient '
'slices {}. This might lead to illegal memory access'.format(
len(idx), len(grad_slice)
)
)
def test_sparse_shared_indices_gpu(self):
'''
        Test that the model has the same number of indices and gradient rows
        for a given total batch size, independent of the number of GPUs.
'''
V = 10000
self.run_model(V, [0, 1])
self.run_model(V, [0])
if workspace.NumCudaDevices() >= 4:
self.run_model(V, list(range(4)))
if workspace.NumCudaDevices() >= 8:
self.run_model(V, list(range(8)))
if __name__ == "__main__":
    unittest.main()