blob: 155fea22d4808cdc698f821984f9f877be47cf3e [file] [log] [blame]
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import unittest
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, data_parallel_model, cnn
from caffe2.python.test_util import TestCase
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
@unittest.skipIf(workspace.NumCudaDevices() < 2, "Need at least 2 GPUs.")
class GPUDataParallelModelTest(TestCase):
def run_model(self, gpu_devices):
'''
Helper function for test_equiv
'''
def input_builder_fun(model):
return None
def model_build_fun(model, loss_scale):
fc = model.FC("data", "fc", 16, 1,
("ConstantFill", {}), ("ConstantFill", {}))
fc_fl = model.FlattenToVec(fc, "fc_fl")
sigm = model.Sigmoid(fc_fl, "sigm")
sq = model.SquaredL2Distance([sigm, "label"], "sq")
loss = model.AveragedLoss(sq, "loss")
loss = model.Scale(loss, scale=loss_scale)
return [loss]
def param_update_fun(model):
ITER = model.Iter("ITER")
LR = model.net.LearningRate(
[ITER],
"LR",
base_lr=(-0.1),
policy="fixed",
)
ONE = model.param_init_net.ConstantFill(
[], "ONE", shape=[1], value=1.0,
)
for param in model.GetParams():
grad = model.param_to_grad[param]
model.WeightedSum([param, ONE, grad, LR], param)
workspace.ResetWorkspace()
model = cnn.CNNModelHelper(
order="NHWC",
name="test{}".format(gpu_devices),
)
data_parallel_model.Parallelize_GPU(
model,
input_builder_fun=input_builder_fun,
forward_pass_builder_fun=model_build_fun,
param_update_builder_fun=param_update_fun,
devices=gpu_devices,
)
np.random.seed(2603)
# Each run has same input, independent of number of gpus
batch_size = 64
for i in range(0, 10):
full_data = np.random.rand(batch_size, 16)
full_labels = np.round(full_data[:, 0])
batch_per_device = batch_size // len(gpu_devices)
for (j, g) in enumerate(gpu_devices):
st = j * batch_per_device
en = st + batch_per_device
data = full_data[st:en, :].astype(np.float32)
labels = full_labels[st:en].astype(np.float32)
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, g)):
workspace.FeedBlob("gpu_{}/data".format(g), data)
workspace.FeedBlob("gpu_{}/label".format(g), labels)
if i == 0:
workspace.RunNetOnce(model.param_init_net)
workspace.CreateNet(model.net)
workspace.RunNet(model.net.Proto().name)
return workspace.FetchBlob("gpu_0/fc_w")
def test_equiv(self):
'''
Test that the model produces exactly same results given
total batchsize, independent of number of GPUs.
'''
result_2gpus = self.run_model([0, 1])
result_1gpus = self.run_model([0])
self.assertTrue(np.allclose(result_1gpus, result_2gpus))
if workspace.NumCudaDevices() >= 4:
result_4gpus = self.run_model(range(4))
self.assertTrue(np.allclose(result_1gpus, result_4gpus))
if workspace.NumCudaDevices() >= 8:
result_8gpus = self.run_model(range(8))
self.assertTrue(np.allclose(result_1gpus, result_8gpus))