| # Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """Tests for ParameterServerStrategy.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import copy |
| import threading |
| |
| from absl.testing import parameterized |
| from tensorflow.core.protobuf import config_pb2 |
| from tensorflow.python.data.ops import dataset_ops |
| from tensorflow.python.distribute import central_storage_strategy |
| from tensorflow.python.distribute import combinations |
| from tensorflow.python.distribute import device_util |
| from tensorflow.python.distribute import distribute_lib |
| from tensorflow.python.distribute import distribute_utils |
| from tensorflow.python.distribute import distribution_strategy_context as ds_context |
| from tensorflow.python.distribute import input_lib |
| from tensorflow.python.distribute import multi_worker_test_base |
| from tensorflow.python.distribute import multi_worker_util |
| from tensorflow.python.distribute import parameter_server_strategy |
| from tensorflow.python.distribute import ps_values |
| from tensorflow.python.distribute import reduce_util |
| from tensorflow.python.distribute import strategy_test_lib |
| from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver |
| from tensorflow.python.eager import backprop |
| from tensorflow.python.eager import context |
| from tensorflow.python.estimator import run_config |
| from tensorflow.python.framework import constant_op |
| from tensorflow.python.framework import device as tf_device |
| from tensorflow.python.framework import dtypes |
| from tensorflow.python.framework import errors |
| from tensorflow.python.framework import ops |
| from tensorflow.python.framework import tensor_util |
| from tensorflow.python.ops import array_ops |
| from tensorflow.python.ops import control_flow_ops |
| from tensorflow.python.ops import gradients |
| from tensorflow.python.ops import math_ops |
| from tensorflow.python.ops import partitioned_variables |
| from tensorflow.python.ops import resource_variable_ops |
| from tensorflow.python.ops import variable_scope |
| from tensorflow.python.ops import variables |
| from tensorflow.python.platform import test |
| from tensorflow.python.training import training_util |
| |
| CHIEF = run_config.TaskType.CHIEF |
| WORKER = run_config.TaskType.WORKER |
| PS = run_config.TaskType.PS |
| |
| |
| def _get_replica_id_integer(): |
| replica_id = ds_context.get_replica_context().replica_id_in_sync_group |
| if isinstance(replica_id, ops.Tensor): |
| replica_id = tensor_util.constant_value(replica_id) |
| return replica_id |
| |
| |
| def create_test_objects(cluster_spec=None, |
| task_type=None, |
| task_id=None, |
| num_gpus=None, |
| sess_config=None): |
| sess_config = sess_config or config_pb2.ConfigProto() |
| if num_gpus is None: |
| num_gpus = context.num_gpus() |
| if cluster_spec and task_type and task_id is not None: |
| cluster_resolver = SimpleClusterResolver( |
| cluster_spec=multi_worker_util.normalize_cluster_spec(cluster_spec), |
| task_type=task_type, |
| task_id=task_id, |
| num_accelerators={'GPU': num_gpus}) |
| distribution = parameter_server_strategy.ParameterServerStrategyV1( |
| cluster_resolver) |
| target = 'grpc://' + cluster_spec[WORKER][task_id] |
| else: |
| distribution = ( |
| central_storage_strategy.CentralStorageStrategy._from_num_gpus(num_gpus) |
| ) |
| target = '' |
| |
| sess_config = copy.deepcopy(sess_config) |
| sess_config = distribution.update_config_proto(sess_config) |
| |
| return distribution, target, sess_config |
| |
| |
| class ParameterServerStrategyTestBase( |
| multi_worker_test_base.MultiWorkerTestBase): |
| |
| def setUp(self): |
| self._result = 0 |
| self._lock = threading.Lock() |
| self._init_condition = threading.Condition() |
| self._init_reached = 0 |
| self._finish_condition = threading.Condition() |
| self._finish_reached = 0 |
| self._sess_config = config_pb2.ConfigProto(allow_soft_placement=True) |
| super(ParameterServerStrategyTestBase, self).setUp() |
| |
| def _get_test_objects(self, task_type, task_id, num_gpus): |
| return create_test_objects( |
| cluster_spec=self._cluster_spec, |
| task_type=task_type, |
| task_id=task_id, |
| num_gpus=num_gpus, |
| sess_config=self._sess_config) |
| |
| def _test_device_assignment_distributed(self, task_type, task_id, num_gpus): |
| worker_device = '/job:%s/replica:0/task:%d' % (task_type, task_id) |
| d, _, sess_config = self._get_test_objects(task_type, task_id, num_gpus) |
| with ops.Graph().as_default(), \ |
| self.cached_session(target=self._default_target, |
| config=sess_config) as sess, \ |
| d.scope(): |
| |
| # Define a variable outside the call_for_each_replica scope. |
| n = variable_scope.get_variable('n', initializer=10.0) |
| self.assertEqual(n.device, '/job:ps/task:0') |
| |
| def model_fn(): |
| if num_gpus == 0: |
| last_part_device = 'device:CPU:0' |
| else: |
| replica_id = _get_replica_id_integer() |
| last_part_device = ('device:GPU:%d' % replica_id) |
| |
| a = constant_op.constant(1.0) |
| b = constant_op.constant(2.0) |
| c = a + b |
| self.assertEqual(a.device, worker_device + '/' + last_part_device) |
| self.assertEqual(b.device, worker_device + '/' + last_part_device) |
| self.assertEqual(c.device, worker_device + '/' + last_part_device) |
| |
| # The device scope is ignored for variables but not for normal ops. |
| with ops.device('/job:worker/task:0'): |
| x = variable_scope.get_variable( |
| 'x', initializer=10.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| x_add = x.assign_add(c) |
| e = a + c |
| # The variable x is on the task 1 since the device_function has been |
| # called once before the model_fn. |
| self.assertEqual(x.device, '/job:ps/task:1') |
| self.assertEqual(x_add.device, x.device) |
| self.assertEqual(e.device, |
| '/job:worker/replica:0/task:0/%s' % last_part_device) |
| |
| # The colocate_vars_with can override the distribution's device. |
| with d.extended.colocate_vars_with(x): |
| y = variable_scope.get_variable( |
| 'y', initializer=20.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| # We add an identity here to avoid complaints about summing |
| # non-distributed values. |
| y_add = y.assign_add(array_ops.identity(x_add)) |
| self.assertEqual(y.device, '/job:ps/task:1') |
| self.assertEqual(y_add.device, y.device) |
| self.assertEqual(y.device, x.device) |
| |
| z = variable_scope.get_variable( |
| 'z', initializer=10.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| self.assertEqual(z.device, '/job:ps/task:0') |
| self.assertNotEqual(z.device, x.device) |
| |
| with ops.control_dependencies([y_add]): |
| # We add an identity here to avoid complaints about summing |
| # non-distributed values. |
| z_add = z.assign_add(array_ops.identity(y)) |
| with ops.control_dependencies([z_add]): |
| f = z + c |
| self.assertEqual(f.device, worker_device + '/' + last_part_device) |
| |
| # The device scope would merge with the default worker device. |
| with ops.device('/CPU:1'): |
| g = e + 1.0 |
| self.assertEqual(g.device, worker_device + '/device:CPU:1') |
| |
| # Ths ops.colocate_with will be ignored when defining a variable but not |
| # for a normal tensor. |
| with ops.colocate_with(x): |
| u = variable_scope.get_variable('u', initializer=30.0) |
| v = variable_scope.get_variable('v', initializer=30.0) |
| h = f + 1.0 |
| self.assertIn('/job:ps/', u.device) |
| self.assertIn('/job:ps/', v.device) |
| # u and v are on different parameter servers. |
| self.assertTrue(u.device != x.device or v.device != x.device) |
| self.assertTrue(u.device == x.device or v.device == x.device) |
| # Here h is not on one worker. Note h.device is canonical while x.device |
| # is not but. |
| self.assertIn('/job:ps/', h.device) |
| return y_add, z_add, f |
| |
| y, z, f = d.extended.call_for_each_replica(model_fn) |
| self.assertNotEqual(y, None) |
| self.assertNotEqual(z, None) |
| self.assertNotEqual(f, None) |
| |
| if context.num_gpus() >= 1 and num_gpus <= 1: |
| self.evaluate(variables.global_variables_initializer()) |
| y_val, z_val, f_val = sess.run([y, z, f]) |
| self.assertEqual(y_val, 33.0) |
| self.assertEqual(z_val, 43.0) |
| self.assertEqual(f_val, 46.0) |
| |
| def _test_device_assignment_distributed_enable_partitioner( |
| self, task_type, task_id, num_gpus): |
| d, _, sess_config = self._get_test_objects(task_type, task_id, num_gpus) |
| num_shards = len(d.extended.parameter_devices) |
| partitioner = partitioned_variables.fixed_size_partitioner(num_shards) |
| with ops.Graph().as_default(), \ |
| self.cached_session(target=self._default_target, |
| config=sess_config) as sess, \ |
| d.scope(): |
| |
| n = variable_scope.get_variable( |
| 'n', |
| initializer=constant_op.constant([10.0, 20.0]), |
| aggregation=variable_scope.VariableAggregation.SUM, |
| partitioner=partitioner) |
| |
| for part_id, var in enumerate(n): |
| self.assertEqual(var.device, '/job:ps/task:%d' % part_id) |
| |
| def model_fn(): |
| a = constant_op.constant([3.0, 5.0]) |
| # The device scope is ignored for variables but not for normal ops. |
| with ops.device('/job:worker/task:0'): |
| x = variable_scope.get_variable( |
| 'x', |
| initializer=constant_op.constant([10.0, 20.0]), |
| aggregation=variable_scope.VariableAggregation.SUM, |
| partitioner=partitioner) |
| x_add = x.assign_add(a, name='x_add') |
| # The variable x is on the task 1 since the device_function has been |
| # called once before the model_fn. |
| for part_id, var in enumerate(x): |
| self.assertEqual(var.device, '/job:ps/task:%d' % part_id) |
| self.assertEqual(var.device, x_add[part_id].device) |
| |
| return x_add |
| |
| x = d.extended.call_for_each_replica(model_fn) |
| |
| if context.num_gpus() >= 1: |
| self.evaluate(variables.global_variables_initializer()) |
| x_val = sess.run(x) |
| if num_gpus < 1: |
| self.assertEqual(x_val, [13.0, 25.0]) |
| else: |
| x_expect = [10.0 + 3 * num_gpus, 20.0 + 5 * num_gpus] |
| self.assertEqual(x_val, x_expect) |
| |
| def _test_device_assignment_local(self, |
| d, |
| compute_device='CPU', |
| variable_device='CPU', |
| num_gpus=0): |
| with ops.Graph().as_default(), \ |
| self.cached_session(target=self._default_target, |
| config=self._sess_config) as sess, \ |
| d.scope(): |
| |
| def model_fn(): |
| if 'CPU' in compute_device: |
| replica_compute_device = '/device:CPU:0' |
| else: |
| replica_id = _get_replica_id_integer() |
| replica_compute_device = ('/device:GPU:%d' % replica_id) |
| replica_compute_device = device_util.canonicalize( |
| replica_compute_device) |
| |
| if 'CPU' in variable_device: |
| replica_variable_device = '/device:CPU:0' |
| else: |
| replica_id = _get_replica_id_integer() |
| replica_variable_device = ('/device:GPU:%d' % replica_id) |
| replica_variable_device = device_util.canonicalize( |
| replica_variable_device) |
| |
| a = constant_op.constant(1.0) |
| b = constant_op.constant(2.0) |
| c = a + b |
| self.assertEqual(a.device, replica_compute_device) |
| self.assertEqual(b.device, replica_compute_device) |
| self.assertEqual(c.device, replica_compute_device) |
| |
| # The device scope is ignored for variables but not for normal ops. |
| with ops.device('/device:GPU:2'): |
| x = variable_scope.get_variable( |
| 'x', initializer=10.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| x_add = x.assign_add(c) |
| e = a + c |
| self.assertEqual( |
| device_util.canonicalize(x.device), replica_variable_device) |
| self.assertEqual(x_add.device, x.device) |
| self.assertEqual(e.device, device_util.canonicalize('/device:GPU:2')) |
| |
| # The colocate_vars_with can override the distribution's device. |
| with d.extended.colocate_vars_with(x): |
| y = variable_scope.get_variable( |
| 'y', initializer=20.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| # We add an identity here to avoid complaints about summing |
| # non-distributed values. |
| y_add = y.assign_add(array_ops.identity(x_add)) |
| self.assertEqual( |
| device_util.canonicalize(y.device), replica_variable_device) |
| self.assertEqual(y_add.device, y.device) |
| self.assertEqual(y.device, x.device) |
| |
| z = variable_scope.get_variable( |
| 'z', initializer=10.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| self.assertEqual( |
| device_util.canonicalize(z.device), replica_variable_device) |
| |
| with ops.control_dependencies([y_add]): |
| # We add an identity here to avoid complaints about summing |
| # non-distributed values. |
| z_add = z.assign_add(array_ops.identity(y)) |
| with ops.control_dependencies([z_add]): |
| f = z + c |
| self.assertEqual(f.device, replica_compute_device) |
| |
| # The device scope would merge with the default worker device. |
| with ops.device('/CPU:1'): |
| g = e + 1.0 |
| self.assertEqual(g.device, device_util.canonicalize('/device:CPU:1')) |
| |
| # Ths ops.colocate_with will be ignored when defining a variable but not |
| # for a normal tensor. |
| with ops.colocate_with(x): |
| u = variable_scope.get_variable('u', initializer=30.0) |
| h = f + 1.0 |
| self.assertEqual( |
| device_util.canonicalize(u.device), replica_variable_device) |
| self.assertEqual( |
| device_util.canonicalize(x.device), |
| device_util.canonicalize(h.device)) |
| return y_add, z_add, f |
| |
| y, z, f = d.extended.call_for_each_replica(model_fn) |
| self.assertNotEqual(y, None) |
| self.assertNotEqual(z, None) |
| self.assertNotEqual(f, None) |
| |
| if context.num_gpus() >= 1 and num_gpus <= 1: |
| self.evaluate(variables.global_variables_initializer()) |
| y_val, z_val, f_val = sess.run([y, z, f]) |
| self.assertEqual(y_val, 33.0) |
| self.assertEqual(z_val, 43.0) |
| self.assertEqual(f_val, 46.0) |
| |
| def _test_simple_increment(self, task_type, task_id, num_gpus): |
| d, master_target, sess_config = self._get_test_objects( |
| task_type, task_id, num_gpus) |
| if d.extended._cluster_spec: |
| num_workers = len(d.extended._cluster_spec.as_dict().get(WORKER)) |
| if 'chief' in d.extended._cluster_spec.as_dict(): |
| num_workers += 1 |
| else: |
| num_workers = 1 |
| with ops.Graph().as_default(), \ |
| self.cached_session(target=master_target, |
| config=sess_config) as sess, \ |
| d.scope(): |
| |
| def model_fn(): |
| x = variable_scope.get_variable( |
| 'x', initializer=10.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| y = variable_scope.get_variable( |
| 'y', initializer=20.0, |
| aggregation=variable_scope.VariableAggregation.SUM) |
| z = variable_scope.get_variable( |
| 'z', initializer=30.0, |
| aggregation=variable_scope.VariableAggregation.ONLY_FIRST_REPLICA) |
| |
| # We explicitly make a constant tensor here to avoid complaints about |
| # summing non-distributed values. |
| one = constant_op.constant(1.0) |
| x_add = x.assign_add(one, use_locking=True) |
| y_add = y.assign_add(one, use_locking=True) |
| z_add = z.assign_add(one, use_locking=True) |
| |
| train_op = control_flow_ops.group(x_add, y_add, z_add) |
| return x, y, z, train_op |
| |
| x, y, z, train_op = d.extended.call_for_each_replica(model_fn) |
| train_op = d.group(train_op) |
| |
| if task_id == 0: |
| self.evaluate(variables.global_variables_initializer()) |
| |
| # Workers waiting for chief worker's initializing variables. |
| self._init_condition.acquire() |
| self._init_reached += 1 |
| while self._init_reached != num_workers: |
| self._init_condition.wait() |
| self._init_condition.notify_all() |
| self._init_condition.release() |
| |
| sess.run(train_op) |
| |
| # Wait for other workers to finish training. |
| self._finish_condition.acquire() |
| self._finish_reached += 1 |
| while self._finish_reached != num_workers: |
| self._finish_condition.wait() |
| self._finish_condition.notify_all() |
| self._finish_condition.release() |
| |
| x_val, y_val, z_val = sess.run([x, y, z]) |
| self.assertEqual(x_val, 10.0 + 1.0 * num_workers * d.num_replicas_in_sync) |
| self.assertEqual(y_val, 20.0 + 1.0 * num_workers * d.num_replicas_in_sync) |
| self.assertEqual(z_val, 30.0 + 1.0 * num_workers) |
| |
| def _test_minimize_loss_graph(self, task_type, task_id, num_gpus): |
| d, master_target, sess_config = self._get_test_objects( |
| task_type, task_id, num_gpus) |
| if task_type: |
| # Multi-worker |
| assert hasattr(d.extended, '_cluster_spec') and d.extended._cluster_spec |
| num_workers = len(d.extended._cluster_spec.as_dict().get(WORKER)) |
| if CHIEF in d.extended._cluster_spec.as_dict(): |
| num_workers += 1 |
| else: |
| # local |
| num_workers = 1 |
| |
| with ops.Graph().as_default(), \ |
| self.cached_session(target=master_target, |
| config=sess_config) as sess, \ |
| d.scope(): |
| kernel = strategy_test_lib.create_variable_like_keras_layer( |
| 'kernel', (1, 1), dtypes.float32,) |
| |
| def loss_fn(x): |
| y = array_ops.reshape( |
| math_ops.matmul(x, kernel), []) - constant_op.constant(1.) |
| return y * y |
| |
| # TODO(yuefengz, apassos): eager.backprop.implicit_grad is not safe for |
| # multiple graphs (b/111216820). |
| def grad_fn(x): |
| loss = loss_fn(x) |
| var_list = ( |
| variables.trainable_variables() + ops.get_collection( |
| ops.GraphKeys.TRAINABLE_RESOURCE_VARIABLES)) |
| grads = gradients.gradients(loss, var_list) |
| ret = list(zip(grads, var_list)) |
| return ret |
| |
| def update(v, g): |
| return v.assign_sub(0.05 * g, use_locking=True) |
| |
| one = constant_op.constant([[1.]]) |
| |
| def step(): |
| """Perform one optimization step.""" |
| # Run forward & backward to get gradients, variables list. |
| g_v = d.extended.call_for_each_replica(grad_fn, args=(one,)) |
| # Update the variables using the gradients and the update() function. |
| before_list = [] |
| after_list = [] |
| for g, v in g_v: |
| fetched = d.extended.read_var(v) |
| before_list.append(fetched) |
| with ops.control_dependencies([fetched]): |
| # TODO(yuefengz): support non-Mirrored variable as destinations. |
| g = d.extended.reduce_to( |
| reduce_util.ReduceOp.SUM, g, destinations=v) |
| with ops.control_dependencies( |
| d.extended.update(v, update, args=(g,), group=False)): |
| after_list.append(d.extended.read_var(v)) |
| return before_list, after_list |
| |
| before_out, after_out = step() |
| |
| if (not task_type or |
| multi_worker_util.is_chief( |
| d.extended._cluster_spec, task_type, task_id)): |
| self.evaluate(variables.global_variables_initializer()) |
| |
| # Workers waiting for chief worker's initializing variables. |
| self._init_condition.acquire() |
| self._init_reached += 1 |
| while self._init_reached != num_workers: |
| self._init_condition.wait() |
| self._init_condition.notify_all() |
| self._init_condition.release() |
| |
| for i in range(10): |
| b, a = sess.run((before_out, after_out)) |
| if i == 0: |
| before, = b |
| after, = a |
| |
| error_before = abs(before - 1) |
| error_after = abs(after - 1) |
| # Error should go down |
| self.assertLess(error_after, error_before) |
| |
| def _test_input_fn_iterator(self, |
| task_type, |
| task_id, |
| num_gpus, |
| input_fn, |
| expected_values, |
| test_reinitialize=True, |
| ignore_order=False): |
| distribution, master_target, config = self._get_test_objects( |
| task_type, task_id, num_gpus) |
| devices = distribution.extended.worker_devices |
| |
| with ops.Graph().as_default(), \ |
| self.cached_session(config=config, |
| target=master_target) as sess: |
| iterator = distribution.make_input_fn_iterator(input_fn) |
| sess.run(iterator.initializer) |
| |
| for expected_value in expected_values: |
| next_element = iterator.get_next() |
| computed_value = sess.run([distribute_utils.select_replica( |
| r, next_element) for r in range(len(devices))]) |
| if ignore_order: |
| self.assertCountEqual(expected_value, computed_value) |
| else: |
| self.assertEqual(expected_value, computed_value) |
| |
| with self.assertRaises(errors.OutOfRangeError): |
| next_element = iterator.get_next() |
| sess.run([distribute_utils.select_replica(r, next_element) |
| for r in range(len(devices))]) |
| |
| # After re-initializing the iterator, should be able to iterate again. |
| if test_reinitialize: |
| sess.run(iterator.initializer) |
| |
| for expected_value in expected_values: |
| next_element = iterator.get_next() |
| computed_value = sess.run([distribute_utils.select_replica( |
| r, next_element) for r in range(len(devices))]) |
| if ignore_order: |
| self.assertCountEqual(expected_value, computed_value) |
| else: |
| self.assertEqual(expected_value, computed_value) |
| |
| |
| class ParameterServerStrategyTest( |
| ParameterServerStrategyTestBase, |
| strategy_test_lib.DistributionTestBase, |
| strategy_test_lib.TwoDeviceDistributionTestBase, |
| parameterized.TestCase): |
| |
| @classmethod |
| def setUpClass(cls): |
| cls._cluster_spec = multi_worker_test_base.create_in_process_cluster( |
| num_workers=3, num_ps=2) |
| cls._default_target = 'grpc://' + cls._cluster_spec[WORKER][0] |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def test_num_replicas_in_sync(self): |
| strategy, _, _ = create_test_objects(num_gpus=2) |
| # All the devices on a given worker are in sync which in this case is the |
| # number of gpus on each worker. |
| self.assertEqual(2, strategy.num_replicas_in_sync) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testDeviceAssignmentLocalCPU(self): |
| strategy, _, _ = create_test_objects(num_gpus=0) |
| self._test_device_assignment_local( |
| strategy, compute_device='CPU', variable_device='CPU', num_gpus=0) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testDeviceAssignmentLocalOneGPU(self): |
| strategy, _, _ = create_test_objects(num_gpus=1) |
| self._test_device_assignment_local( |
| strategy, compute_device='GPU', variable_device='GPU', num_gpus=1) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testDeviceAssignmentLocalTwoGPUs(self): |
| strategy, _, _ = create_test_objects(num_gpus=2) |
| self._test_device_assignment_local( |
| strategy, compute_device='GPU', variable_device='CPU', num_gpus=2) |
| |
| @combinations.generate( |
| combinations.combine(mode=['graph'], num_gpus=[0, 1, 2])) |
| def testDeviceAssignmentDistributed(self, num_gpus): |
| self._test_device_assignment_distributed('worker', 1, num_gpus) |
| |
| @combinations.generate( |
| combinations.combine(mode=['graph'], num_gpus=[0, 1, 2])) |
| def testDeviceAssignmentDistributedEnablePartitioner(self, num_gpus): |
| self._test_device_assignment_distributed_enable_partitioner( |
| 'worker', 1, num_gpus) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testSimpleBetweenGraph(self): |
| self._run_between_graph_clients(self._test_simple_increment, |
| self._cluster_spec, context.num_gpus()) |
| |
| @combinations.generate( |
| combinations.combine(mode=['graph'], required_gpus=[0, 1, 2])) |
| def testLocalSimpleIncrement(self, required_gpus): |
| self._test_simple_increment(None, 0, required_gpus) |
| |
| @combinations.generate( |
| combinations.combine(mode=['graph'], required_gpus=[0, 1, 2])) |
| def testMinimizeLossGraphDistributed(self, required_gpus): |
| self._run_between_graph_clients(self._test_minimize_loss_graph, |
| self._cluster_spec, required_gpus) |
| |
| @combinations.generate( |
| combinations.combine(mode=['graph'], required_gpus=[0, 1, 2])) |
| def testMinimizeLossGraphLocal(self, required_gpus): |
| self._test_minimize_loss_graph(None, None, required_gpus) |
| |
| # TODO(priyag): Refactor this and other multi worker tests. |
| @combinations.generate( |
| combinations.combine( |
| mode=['graph'], required_gpus=[1, 2], use_dataset=[True, False])) |
| def testMakeInputFnIteratorDistributed(self, required_gpus, use_dataset): |
| if use_dataset: |
| fn = lambda: dataset_ops.Dataset.range(100) |
| else: |
| def fn(): |
| dataset = dataset_ops.Dataset.range(100) |
| it = dataset_ops.make_one_shot_iterator(dataset) |
| return it.get_next |
| |
| expected_values = [[i + j |
| for j in range(required_gpus)] |
| for i in range(0, 100, required_gpus)] |
| |
| input_fn = self._input_fn_to_test_input_context( |
| fn, |
| expected_num_replicas_in_sync=required_gpus, |
| expected_num_input_pipelines=3, |
| expected_input_pipeline_id=1) # because task_id = 1 |
| self._test_input_fn_iterator( |
| 'worker', |
| 1, |
| required_gpus, |
| input_fn, |
| expected_values, |
| test_reinitialize=use_dataset, |
| ignore_order=not use_dataset) |
| |
| @combinations.generate( |
| combinations.combine( |
| mode=['graph'], required_gpus=[1, 2], use_dataset=[True, False])) |
| def testMakeInputFnIteratorLocal(self, required_gpus, use_dataset): |
| if use_dataset: |
| fn = lambda: dataset_ops.Dataset.range(100) |
| else: |
| |
| def fn(): |
| dataset = dataset_ops.Dataset.range(100) |
| it = dataset_ops.make_one_shot_iterator(dataset) |
| return it.get_next |
| |
| expected_values = [[i + j |
| for j in range(required_gpus)] |
| for i in range(0, 100, required_gpus)] |
| |
| input_fn = self._input_fn_to_test_input_context( |
| fn, |
| expected_num_replicas_in_sync=required_gpus, |
| expected_num_input_pipelines=1, |
| expected_input_pipeline_id=0) # only one worker and pipeline for local. |
| self._test_input_fn_iterator( |
| None, |
| None, |
| required_gpus, |
| input_fn, |
| expected_values, |
| test_reinitialize=use_dataset, |
| ignore_order=not use_dataset) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testGlobalStepUpdate(self): |
| strategy, _, _ = create_test_objects() |
| self._test_global_step_update(strategy) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testUpdateConfigProtoMultiWorker(self): |
| strategy, _, _ = create_test_objects( |
| cluster_spec=self._cluster_spec, |
| task_type='worker', |
| task_id=1, |
| num_gpus=2) |
| |
| config_proto = config_pb2.ConfigProto(device_filters=['to_be_overridden']) |
| |
| new_config = strategy.update_config_proto(config_proto) |
| |
| # Verify device filters. |
| self.assertEqual(['/job:worker/task:1', '/job:ps'], |
| new_config.device_filters) |
| |
| # Verify isolate_session_state |
| self.assertFalse(new_config.isolate_session_state) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testUpdateConfigProtoLocal(self): |
| strategy, _, _ = create_test_objects(num_gpus=2) |
| |
| config_proto = config_pb2.ConfigProto() |
| new_config = strategy.update_config_proto(config_proto) |
| |
| # Verify isolate_session_state |
| self.assertTrue(new_config.isolate_session_state) |
| |
| @combinations.generate(combinations.combine(mode=['graph', 'eager'])) |
| def testInMultiWorkerMode(self): |
| strategy, _, _ = create_test_objects( |
| cluster_spec=self._cluster_spec, |
| task_type='worker', |
| task_id=1, |
| num_gpus=0) |
| self.assertTrue(strategy.extended._in_multi_worker_mode()) |
| |
| @combinations.generate(combinations.combine(mode=['eager'])) |
| def testEagerCustomTrainingUnimplementedError(self): |
| cluster_spec = multi_worker_test_base.create_in_process_cluster( |
| num_workers=3, num_ps=2) |
| cluster_resolver = SimpleClusterResolver( |
| cluster_spec=multi_worker_util.normalize_cluster_spec(cluster_spec), |
| task_type='worker', |
| task_id=1, |
| num_accelerators={'GPU': 0}) |
| strategy = parameter_server_strategy.ParameterServerStrategyV1( |
| cluster_resolver) |
| dataset = dataset_ops.DatasetV2.from_tensor_slices([5., 6., 7., 8.]) |
| |
| def train_step(data): |
| return math_ops.square(data) |
| |
| self.assertRaisesRegex(NotImplementedError, 'ParameterServerStrategy*', |
| strategy.experimental_distribute_dataset, |
| dataset.batch(2)) |
| |
| self.assertRaisesRegex(NotImplementedError, 'ParameterServerStrategy*', |
| strategy.distribute_datasets_from_function, |
| lambda _: dataset) |
| |
| self.assertRaisesRegex(NotImplementedError, 'ParameterServerStrategy*', |
| strategy.scope) |
| |
| self.assertRaisesRegex(NotImplementedError, 'ParameterServerStrategy*', |
| strategy.run, train_step) |
| |
| @combinations.generate(combinations.combine( |
| mode=['graph'], |
| prefetch_to_device=[None, True])) |
| def test_prefetch_to_device_dataset(self, prefetch_to_device): |
| distribution, _, _ = create_test_objects( |
| cluster_spec=self._cluster_spec, |
| task_type='worker', |
| task_id=0, |
| num_gpus=2) |
| if prefetch_to_device is None: |
| input_options = None |
| else: |
| input_options = distribute_lib.InputOptions( |
| experimental_prefetch_to_device=prefetch_to_device) |
| dataset = dataset_ops.Dataset.range(100) |
| dataset = dataset.batch(distribution.num_replicas_in_sync) |
| dataset = distribution.experimental_distribute_dataset( |
| dataset, options=input_options) |
| if isinstance(dataset, input_lib.DistributedDatasetV1): |
| item = dataset.make_initializable_iterator().get_next() |
| else: |
| self.skipTest('unsupported test combination') |
| device_types = { |
| tf_device.DeviceSpec.from_string(tensor.device).device_type for |
| tensor in item.values} |
| self.assertAllEqual(list(device_types), ['GPU']) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def test_prefetch_to_host_dataset(self): |
| distribution, _, _ = create_test_objects( |
| cluster_spec=self._cluster_spec, |
| task_type='worker', |
| task_id=0, |
| num_gpus=2) |
| input_options = distribute_lib.InputOptions( |
| experimental_prefetch_to_device=False) |
| dataset = dataset_ops.Dataset.range(100) |
| dataset = dataset.batch(distribution.num_replicas_in_sync) |
| dataset = distribution.experimental_distribute_dataset( |
| dataset, options=input_options) |
| if isinstance(dataset, input_lib.DistributedDatasetV1): |
| item = dataset.make_initializable_iterator().get_next() |
| else: |
| self.skipTest('unsupported test combination') |
| device_types = { |
| tf_device.DeviceSpec.from_string(tensor.device).device_type for |
| tensor in item.values} |
| self.assertAllEqual(list(device_types), ['CPU']) |
| |
| |
| class ParameterServerStrategyWithChiefTest(ParameterServerStrategyTestBase, |
| parameterized.TestCase): |
| |
| @classmethod |
| def setUpClass(cls): |
| cls._cluster_spec = multi_worker_test_base.create_in_process_cluster( |
| num_workers=3, num_ps=2, has_chief=True) |
| cls._default_target = 'grpc://' + cls._cluster_spec[CHIEF][0] |
| |
| @combinations.generate( |
| combinations.combine(mode=['graph'], required_gpus=[0, 1, 2])) |
| def testSimpleBetweenGraph(self, required_gpus): |
| self._run_between_graph_clients(self._test_simple_increment, |
| self._cluster_spec, required_gpus) |
| |
| @combinations.generate( |
| combinations.combine(mode=['graph'], num_gpus=[0, 1, 2])) |
| def testMinimizeLossGraph(self, num_gpus): |
| self._run_between_graph_clients(self._test_minimize_loss_graph, |
| self._cluster_spec, num_gpus) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testGlobalStepIsWrappedOnTwoGPUs(self): |
| strategy, _, _ = create_test_objects(num_gpus=2) |
| with ops.Graph().as_default(), strategy.scope(): |
| created_step = training_util.create_global_step() |
| get_step = training_util.get_global_step() |
| self.assertEqual(created_step, get_step, |
| msg=('created_step %s type %s vs. get_step %s type %s' % |
| (id(created_step), created_step.__class__.__name__, |
| id(get_step), get_step.__class__.__name__))) |
| self.assertIs(ps_values.AggregatingVariable, type(created_step)) |
| self.assertIs(ps_values.AggregatingVariable, type(get_step)) |
| self.assertIs(strategy, created_step.distribute_strategy) |
| |
| @combinations.generate(combinations.combine(mode=['graph'])) |
| def testGlobalStepIsNotWrappedOnOneGPU(self): |
| strategy, _, _ = create_test_objects(num_gpus=1) |
| with ops.Graph().as_default(), strategy.scope(): |
| created_step = training_util.create_global_step() |
| get_step = training_util.get_global_step() |
| self.assertEqual(created_step, get_step, |
| msg=('created_step %s type %s vs. get_step %s type %s' % |
| (id(created_step), created_step.__class__.__name__, |
| id(get_step), get_step.__class__.__name__))) |
| self.assertIs(resource_variable_ops.ResourceVariable, type(created_step)) |
| self.assertIs(resource_variable_ops.ResourceVariable, type(get_step)) |
| # All variables have an _distribute_strategy parameter. Only variable |
| # subclasses in distribution strategy expose it publicly. |
| self.assertFalse(hasattr(strategy, 'distribute_strategy')) |
| self.assertIs(strategy, created_step._distribute_strategy) |
| |
| @combinations.generate(combinations.combine(mode=['graph'], required_gpus=2)) |
| def testValueContainer(self): |
| strategy, _, _ = create_test_objects(num_gpus=2) |
| with ops.Graph().as_default(), strategy.scope(): |
| |
| def f(): |
| with backprop.GradientTape() as tape: |
| v = variable_scope.get_variable('v', initializer=10.0) |
| _ = v * v |
| v, = tape.watched_variables() |
| w = strategy.extended.value_container(v) |
| self.assertIs(ps_values.AggregatingVariable, type(w)) |
| |
| strategy.extended.call_for_each_replica(f) |
| |
| |
| class CentralStorageStrategyTest(strategy_test_lib.DistributionTestBase, |
| parameterized.TestCase): |
| |
| @combinations.generate(combinations.combine(mode=['graph', 'eager'], |
| required_gpus=2)) |
| def testNumpyDataset(self): |
| strategy, _, _ = create_test_objects(num_gpus=2) |
| self._test_numpy_dataset(strategy) |
| |
| @combinations.generate(combinations.combine(mode=['graph', 'eager'])) |
| def testInMultiWorkerMode(self): |
| strategy, _, _ = create_test_objects(num_gpus=0) |
| self.assertFalse(strategy.extended._in_multi_worker_mode()) |
| |
| |
| if __name__ == '__main__': |
| test.main() |