| # Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """This module customizes `test_combinations` for `tf.distribute.Strategy`. |
| |
| Additionally, it provides `generate()`, `combine()` and `times()` with |
| `tf.distribute.Strategy` customizations as a default. |
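| |
| For example (a sketch only; `my_strategy` stands for a hypothetical |
| `NamedDistribution` instance, which this module does not define): |
| |
|   @generate(combine(distribution=[my_strategy], mode=["graph", "eager"])) |
|   def testSomething(self, distribution): |
|     ... |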
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import re |
| import sys |
| import types |
| import unittest |
| |
| import six |
| |
| from tensorflow.python.distribute import multi_process_runner |
| from tensorflow.python.distribute import multi_worker_test_base |
| from tensorflow.python.eager import context |
| from tensorflow.python.framework import combinations as framework_combinations |
| from tensorflow.python.framework import test_combinations as combinations_lib |
| from tensorflow.python.platform import flags |
| from tensorflow.python.util import tf_decorator |
| from tensorflow.python.util import tf_inspect |
| |
| FLAGS = flags.FLAGS |
| |
| |
| # TODO(rchao): Rename `distribution` parameter to `strategy` or |
| # `distribute_strategy` in all tests. |
| class DistributionParameter(combinations_lib.ParameterModifier): |
| """Transforms arguments of type `NamedDistribution`. |
| |
| Converts all arguments of type `NamedDistribution` to the value of their |
| `strategy` property. |
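| |
| For example, a test parameterized with |
| `distribution=NamedDistribution("OneDevice", lambda: some_strategy)` receives |
| `some_strategy` (the factory's return value) as `distribution`, not the |
| `NamedDistribution` wrapper itself. |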
| """ |
| |
| def modified_arguments(self, kwargs, requested_parameters): |
| del requested_parameters |
| distribution_arguments = {} |
| for k, v in kwargs.items(): |
| if isinstance(v, NamedDistribution): |
| distribution_arguments[k] = v.strategy |
| return distribution_arguments |
| |
| |
| class ClusterParameters(combinations_lib.ParameterModifier): |
| """Adds cluster parameters if a `NamedDistribution` has it. |
| |
| It needs to be before DistributionParameter. |
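| |
| For example, when a test is parameterized with a multi-worker |
| `NamedDistribution`, the requested `has_chief` and `num_workers` parameters |
| are filled in from that distribution; without one they default to `False` |
| and `1`. |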
| """ |
| |
| def modified_arguments(self, kwargs, requested_parameters): |
| strategy = None |
| for _, v in kwargs.items(): |
| if isinstance(v, NamedDistribution): |
| if strategy is not None and _num_total_workers(v.has_chief, |
| v.num_workers) > 1: |
| raise ValueError("Only support one NamedDistribution for multi worker" |
| "tests.") |
| strategy = v |
| # Always set cluster parameters if they're requested, so that generate() |
| # works when there's no strategy in the combinations. |
| update = {} |
| if "has_chief" in requested_parameters: |
| update["has_chief"] = strategy.has_chief if strategy else False |
| if "num_workers" in requested_parameters: |
| update["num_workers"] = strategy.num_workers if strategy else 1 |
| return update |
| |
| |
| class NamedGPUCombination(combinations_lib.TestCombination): |
| """Enable tests to request GPU hardware and skip non-GPU combinations. |
| |
| This class expects test_combinations to be generated with `NamedDistribution` |
| wrapping instances of `tf.distribute.Strategy`. |
| |
| Optionally, the `required_gpus` argument is supported. GPU hardware is |
| required if its value is `True` or > 0. |
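| |
| For example, a combination such as `combine(mode=["graph"], required_gpus=2)` |
| is skipped when fewer than two GPUs are available. |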
| |
| Attributes: |
| GPU_TEST: The environment is considered to have GPU hardware available if |
| the name of the program contains "test_gpu" or "test_xla_gpu". |
| """ |
| |
| GPU_TEST = re.search(r"(test_gpu|test_xla_gpu)$", sys.argv[0]) |
| |
| def should_execute_combination(self, kwargs): |
| distributions = [ |
| v for v in kwargs.values() if isinstance(v, NamedDistribution) |
| ] |
| required_gpus = kwargs.get("required_gpus", None) |
| |
| if distributions and required_gpus: |
| raise ValueError("Do not use `required_gpus` and arguments of type " |
| "NamedDistribution together.") |
| |
| number_of_required_gpus = max([required_gpus or 0] + |
| [d.required_gpus or 0 for d in distributions]) |
| |
| if not number_of_required_gpus and GPUCombination.GPU_TEST: |
| return (False, "Test that doesn't require GPUs.") |
| elif (number_of_required_gpus > 0 |
| and context.num_gpus() < number_of_required_gpus): |
| return (False, ("Only {} of {} required GPUs are available.".format( |
| context.num_gpus(), number_of_required_gpus))) |
| else: |
| return (True, None) |
| |
| def parameter_modifiers(self): |
| return [combinations_lib.OptionalParameter("required_gpus")] |
| |
| |
| class GPUCombination(NamedGPUCombination): |
| """NamedGPUCombination that passes `tf.distribute.Strategy` to the tests.""" |
| |
| def parameter_modifiers(self): |
| return [ |
| ClusterParameters(), |
| DistributionParameter(), |
| ] + NamedGPUCombination.parameter_modifiers(self) |
| |
| |
| class NamedTPUCombination(combinations_lib.TestCombination): |
| """Allow to request TPU hardware and skip non-TPU combinations. |
| |
| This class expects test_combinations to be generated with `NamedDistribution` |
| wrapping instances of `tf.distribute.Strategy`. |
| |
| Optionally, the `required_tpus` parameter is supported. TPU hardware is |
| required if its value is `True` or > 0. |
| |
| Optionally, the `use_cloud_tpu` parameter is supported. If TPU hardware is |
| required by `required_tpus`, it specifically must be a Cloud TPU (specified |
| with `--tpu`) if `use_cloud_tpu` is `True`. |
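| |
| For example, `combine(required_tpus=1, use_cloud_tpu=True)` runs only when |
| TPU hardware is available and a Cloud TPU is specified via `--tpu`. |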
| |
| Attributes: |
| TPU_TEST: The environment is considered to have TPU hardware available if |
| the name of the program contains "test_tpu". |
| """ |
| |
| TPU_TEST = "test_tpu" in sys.argv[0] |
| |
| def should_execute_combination(self, kwargs): |
| distributions = [ |
| v for v in kwargs.values() if isinstance(v, NamedDistribution) |
| ] |
| # TODO(isaprykin): Migrate all tests away from using 'required_tpu' in favor |
| # of 'required_tpus'. |
| if "required_tpus" in kwargs and "required_tpu" in kwargs: |
| raise ValueError("Do not use `required_tpu`. Both `required_tpus` and " |
| "`required_tpu` were specified.") |
| required_tpus = kwargs.get("required_tpus", None) or kwargs.get( |
| "required_tpu", None) |
| |
| if distributions and required_tpus: |
| raise ValueError("Do not use `required_tpus` and arguments of type " |
| "NamedDistribution together.") |
| |
| # TODO(isaprykin): Add support for a particular number of TPUs. Right now |
| # it's binary. |
| number_of_required_tpus = max([required_tpus or 0] + |
| [d.required_tpu or 0 for d in distributions]) |
| use_cloud_tpu = any([kwargs.get("use_cloud_tpu")] + |
| [d.use_cloud_tpu for d in distributions]) |
| tpu = hasattr(FLAGS, "tpu") and FLAGS.tpu or "" |
| |
| if not number_of_required_tpus and TPUCombination.TPU_TEST: |
| return (False, "Test that doesn't require TPUs.") |
| if number_of_required_tpus and not TPUCombination.TPU_TEST: |
| return (False, "Test requires a TPU, but it's not available.") |
| if use_cloud_tpu and not tpu: |
| return (False, "Test requires a Cloud TPU, but none specified.") |
| if not use_cloud_tpu and tpu: |
| return (False, "Test requires local TPU, but Cloud TPU specified.") |
| return (True, None) |
| |
| def parameter_modifiers(self): |
| return [ |
| combinations_lib.OptionalParameter("required_tpus"), |
| combinations_lib.OptionalParameter("required_tpu"), |
| combinations_lib.OptionalParameter("use_cloud_tpu"), |
| ] |
| |
| |
| class TPUCombination(NamedTPUCombination): |
| """NamedTPUCombination that passes `tf.distribute.Strategy` to the tests.""" |
| |
| def parameter_modifiers(self): |
| return [ |
| ClusterParameters(), |
| DistributionParameter(), |
| ] + NamedTPUCombination.parameter_modifiers(self) |
| |
| |
| class NamedDistribution(object): |
| """Wraps a `tf.distribute.Strategy` and adds a name for test titles.""" |
| |
| def __init__(self, |
| name, |
| distribution_fn, |
| required_gpus=None, |
| required_tpu=False, |
| use_cloud_tpu=False, |
| has_chief=False, |
| num_workers=1): |
| """Initialize NamedDistribution. |
| |
| Args: |
| name: Name that will be a part of the name of the test case. |
| distribution_fn: A callable that creates a `tf.distribute.Strategy`. |
| required_gpus: The number of GPUs that the strategy requires. |
| required_tpu: Whether the strategy requires TPU. |
| use_cloud_tpu: Whether the strategy requires cloud TPU. |
| has_chief: Whether the strategy requires a chief worker. |
| num_workers: The number of workers that the strategy requires. |
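| |
| For example (a sketch; the factory lambda below is only an illustration): |
| |
|   NamedDistribution( |
|       "Mirrored2GPUs", |
|       lambda: tf.distribute.MirroredStrategy(["/gpu:0", "/gpu:1"]), |
|       required_gpus=2) |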
| """ |
| object.__init__(self) |
| self._name = name |
| self._distribution_fn = distribution_fn |
| self.required_gpus = required_gpus |
| self.required_tpu = required_tpu |
| self.use_cloud_tpu = use_cloud_tpu |
| self.has_chief = has_chief |
| self.num_workers = num_workers |
| |
| @property |
| def strategy(self): |
| return self._distribution_fn() |
| |
| def __repr__(self): |
| return self._name |
| |
| |
| def concat(*combined): |
| """Concats combinations.""" |
| result = [] |
| for one in combined: |
| result += one |
| return result |
| |
| |
| def generate(combinations, test_combinations=()): |
| # pylint: disable=g-doc-args,g-doc-return-or-yield |
| """Distributed adapter of `framework.combinations_lib.generate`. |
| |
| All tests with distributed strategy should use this one instead of |
| `framework.test_combinations.generate`. This function has support of strategy |
| combinations, GPU/TPU and multi worker support. |
| |
| See `framework.test_combinations_lib.generate` for usage. |
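| |
| A multi-worker sketch (assuming `my_multi_worker_strategy` is a hypothetical |
| `NamedDistribution` created with `has_chief=True` and `num_workers=2`; the |
| decorated test then runs in each simulated worker process): |
| |
|   @generate(combine(distribution=[my_multi_worker_strategy])) |
|   def testAllReduce(self, distribution): |
|     ... |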
| """ |
| # pylint: enable=g-doc-args,g-doc-return-or-yield |
| default_combinations = ( |
| framework_combinations.EagerGraphCombination(), |
| framework_combinations.TFVersionCombination(), |
| GPUCombination(), |
| TPUCombination(), |
| ) |
| # We apply our own decoration to handle multi-worker tests before applying |
| # framework.test_combinations.generate. The order is important since we need |
| # framework.test_combinations.generate to apply all parameter modifiers first. |
| combination_decorator = combinations_lib.generate( |
| combinations, test_combinations=default_combinations + test_combinations) |
| |
| def decorator(test_method_or_class): |
| if isinstance(test_method_or_class, type): |
| # If it's a test class. |
| class_object = test_method_or_class |
| # Decorate each test method with _multi_worker_test. |
| for name, test_method in six.iteritems(class_object.__dict__.copy()): |
| if (name.startswith(unittest.TestLoader.testMethodPrefix) and |
| isinstance(test_method, types.FunctionType)): |
| setattr(class_object, name, _multi_worker_test(test_method)) |
| return combination_decorator(class_object) |
| else: |
| return combination_decorator(_multi_worker_test(test_method_or_class)) |
| |
| return decorator |
| |
| |
| combine = combinations_lib.combine |
| times = combinations_lib.times |
| NamedObject = combinations_lib.NamedObject |
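| |
| # A quick reference for the re-exported helpers (semantics defined in |
| # `framework.test_combinations`): |
| #   combine(a=[1, 2], b=["x"]) -> [{"a": 1, "b": "x"}, {"a": 2, "b": "x"}] |
| #   times(combine(a=[1, 2]), combine(b=[3])) == combine(a=[1, 2], b=[3]) |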
| |
| |
| def main(): |
| """Tests must call this main().""" |
| return multi_process_runner.test_main() |
| |
| |
| # Identifies whether we're in the main process or worker processes. |
| # `_multi_worker_test` decoration behaves differently in the main process and |
| # the worker processes. See the documentation of _multi_worker_test for |
| # details. |
| _running_in_worker = False |
| |
| |
| def _test_runner(test_id): |
| """Executes the test with the given test_id. |
| |
| This is a simple wrapper around `unittest.TextTestRunner` to be used with |
| multi_process_runner. Similar to test.main(), but it executes only one test |
| specified by test_id and returns whether the test succeeds. If the test |
| fails, the function prints failures and errors to stdout. |
| |
| Args: |
| test_id: TestCase.id() |
| |
| Returns: |
| A boolean indicating whether the test succeeds. |
| """ |
| global _running_in_worker |
| # No need to restore the value of _running_in_worker since it should always be |
| # True in worker processes. |
| _running_in_worker = True |
| test = unittest.defaultTestLoader.loadTestsFromName(test_id) |
| runner = unittest.TextTestRunner() |
| result = runner.run(test) |
| # Print failures and errors to stdout and multi_process_runner will collect |
| # them and stream back to the main process. |
| for _, msg in result.failures + result.errors: |
| print(msg) |
| # Return expected failures as failures, so that the main process can get |
| # them and fail as expected. |
| if result.expectedFailures: |
| return False |
| return result.wasSuccessful() |
| |
| |
| def _multi_worker_test(test_method): |
| """Decorate test_method so that it runs in each worker. |
| |
| We use `multi_process_runner` to simulate multiple workers. Since this |
| function runs in both the main process and all worker processes, the |
| decoration behaves differently in the main process and worker processes. In |
| the main process, it spawns subprocesses and runs the test on each of them; |
| in a worker process, it executes the test in the same way as a normal test, |
| e.g. setUp()/tearDown() are called before/after the test. |
| |
| Args: |
| test_method: a function which must be a test method. |
| |
| Returns: |
| Decorated `test_method`. Note that the decorated function has additional |
| arguments. |
| """ |
| |
| def decorator(self, has_chief, num_workers, **kwargs): |
| if _num_total_workers(has_chief, num_workers) == 1 or _running_in_worker: |
| # We're in a worker process, or the test is for a single worker. In either |
| # case we execute the test method directly instead of spawning subprocesses. |
| test_method(self, **kwargs) |
| return |
| |
| # We're in the main process. We spawn subprocesses and run the *test* on |
| # each of them. Note that we're not directly executing the test_method passed |
| # to _multi_worker_test, because we need setUp()/tearDown() to be called, as |
| # well as all the decorations on the test method. The conceptual call stack |
| # is: |
| # [main process]test.main() |
| # [main process]test_runner.run(test) |
| # [main process]wrapper by combinations.generate() |
| # [main process]_multi_worker_test.decorator() |
| # # A sub process goes through the same code path as the main |
| # # process. |
| # [sub process]_test_runner() |
| # [sub process]test_runner.run(test) |
| # [sub process]wrapper by combinations.generate() |
| # [sub process]_multi_worker_test.decorator() |
| # # _running_in_worker is True |
| # [sub process]test_method() |
| test_id = self.id() |
| cluster_spec = multi_worker_test_base.create_cluster_spec( |
| has_chief=has_chief, num_workers=num_workers, num_ps=0, has_eval=False) |
| result = multi_process_runner.run( |
| _test_runner, cluster_spec, args=(test_id,)) |
| for was_successful in result.return_value: |
| if not was_successful: |
| raise AssertionError("some worker failed, see logs for details") |
| |
| argspec = tf_inspect.getfullargspec(test_method) |
| decorator_args = (argspec.args or []) + ["has_chief", "num_workers"] |
| decorator_argspec = argspec._replace(args=decorator_args) |
| return tf_decorator.make_decorator( |
| test_method, decorator, decorator_argspec=decorator_argspec) |
| |
| |
| def _num_total_workers(has_chief, num_workers): |
| """Returns the number of workers including the chief.""" |
| if has_chief: |
| return num_workers + 1 |
| return num_workers |