# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests that show Distribute Coordinator works with Estimator."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import glob
import json
import os
import sys
import tempfile
import threading
from absl.testing import parameterized
import numpy as np
import six
_portpicker_import_error = None
try:
import portpicker # pylint: disable=g-import-not-at-top
except ImportError as _error: # pylint: disable=invalid-name
_portpicker_import_error = _error
portpicker = None
# pylint: disable=g-import-not-at-top
from tensorflow.contrib.distribute.python import combinations
from tensorflow.contrib.distribute.python import mirrored_strategy
from tensorflow.contrib.distribute.python import parameter_server_strategy
from tensorflow.contrib.optimizer_v2 import adagrad
from tensorflow.core.protobuf import config_pb2
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import distribute_coordinator as dc
from tensorflow.python.distribute import estimator_training as dc_training
from tensorflow.python.distribute.distribute_config import DistributeConfig
from tensorflow.python.eager import context
from tensorflow.python.estimator import exporter as exporter_lib
from tensorflow.python.estimator import run_config as run_config_lib
from tensorflow.python.estimator import training as estimator_training
from tensorflow.python.estimator.canned import dnn_linear_combined
from tensorflow.python.estimator.canned import prediction_keys
from tensorflow.python.estimator.export import export as export_lib
from tensorflow.python.feature_column import feature_column
from tensorflow.python.platform import gfile
from tensorflow.python.platform import test
from tensorflow.python.summary import summary_iterator
from tensorflow.python.summary.writer import writer_cache
from tensorflow.python.training import server_lib
BATCH_SIZE = 10
LABEL_DIMENSION = 2
DATA = np.linspace(
0., 2., BATCH_SIZE * LABEL_DIMENSION, dtype=np.float32).reshape(
BATCH_SIZE, LABEL_DIMENSION)
EVAL_NAME = "foo"
EXPORTER_NAME = "saved_model_exporter"
MAX_STEPS = 10
CHIEF = dc._TaskType.CHIEF
EVALUATOR = dc._TaskType.EVALUATOR
WORKER = dc._TaskType.WORKER
PS = dc._TaskType.PS
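# Keep a reference to the real coordinator entry point so the mocked version
# used by the independent-worker tests below can delegate to it.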
original_run_distribute_coordinator = dc.run_distribute_coordinator
# TODO(yuefengz): merge this method back to test_util.
def _create_local_cluster(num_workers,
num_ps,
has_eval=False,
protocol="grpc",
worker_config=None,
ps_config=None):
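  """Creates and starts in-process TF servers for the requested cluster.

  Servers are created for `num_workers` workers, `num_ps` parameter servers
  and, if `has_eval` is True, a single evaluator, all listening on unused
  local ports.
  """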
if _portpicker_import_error:
raise _portpicker_import_error # pylint: disable=raising-bad-type
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {
"worker": ["localhost:%s" % port for port in worker_ports],
"ps": ["localhost:%s" % port for port in ps_ports]
}
if has_eval:
cluster_dict["evaluator"] = ["localhost:%s" % portpicker.pick_unused_port()]
cs = server_lib.ClusterSpec(cluster_dict)
workers = [
server_lib.Server(
cs,
job_name="worker",
protocol=protocol,
task_index=ix,
config=worker_config,
start=True) for ix in range(num_workers)
]
ps_servers = [
server_lib.Server(
cs,
job_name="ps",
protocol=protocol,
task_index=ix,
config=ps_config,
start=True) for ix in range(num_ps)
]
if has_eval:
evals = [
server_lib.Server(
cs,
job_name="evaluator",
protocol=protocol,
task_index=0,
config=worker_config,
start=True)
]
else:
evals = []
return workers, ps_servers, evals
def _create_in_process_cluster(num_workers, num_ps, has_eval=False):
"""Create an in-process cluster that consists of only standard server."""
# Leave some memory for cuda runtime.
if has_eval:
gpu_mem_frac = 0.7 / (num_workers + 1)
else:
gpu_mem_frac = 0.7 / num_workers
worker_config = config_pb2.ConfigProto()
worker_config.gpu_options.per_process_gpu_memory_fraction = gpu_mem_frac
  # Enable collective ops; this has no impact on non-collective ops.
  # TODO(yuefengz, tucker): remove this after we move the initialization of
  # collective mgr to the session level.
worker_config.experimental.collective_group_leader = (
"/job:worker/replica:0/task:0")
ps_config = config_pb2.ConfigProto()
ps_config.device_count["GPU"] = 0
return _create_local_cluster(
num_workers,
num_ps=num_ps,
has_eval=has_eval,
worker_config=worker_config,
ps_config=ps_config,
protocol="grpc")
def _create_cluster_spec(has_chief=False,
num_workers=1,
num_ps=0,
has_eval=False):
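  """Builds a TF_CONFIG-style cluster spec dict keyed by task type.

  A minimal sketch of the returned value for `has_chief=True, num_workers=2`
  (the ports are picked at random by portpicker):
    {"chief": ["localhost:12345"],
     "worker": ["localhost:23456", "localhost:34567"]}
  """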
if _portpicker_import_error:
raise _portpicker_import_error # pylint: disable=raising-bad-type
cluster_spec = {}
if has_chief:
cluster_spec[CHIEF] = ["localhost:%s" % portpicker.pick_unused_port()]
if num_workers:
cluster_spec[WORKER] = [
"localhost:%s" % portpicker.pick_unused_port()
for _ in range(num_workers)
]
if num_ps:
cluster_spec[PS] = [
"localhost:%s" % portpicker.pick_unused_port() for _ in range(num_ps)
]
if has_eval:
cluster_spec[EVALUATOR] = ["localhost:%s" % portpicker.pick_unused_port()]
return cluster_spec
def _bytes_to_str(maybe_bytes):
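  """Converts `bytes` targets to `str`; passes `str` through unchanged."""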
if isinstance(maybe_bytes, six.string_types):
return maybe_bytes
else:
return str(maybe_bytes, "utf-8")
def _strip_protocol(target):
# cluster_spec expects "host:port" strings.
if "//" in target:
return target.split("//")[1]
else:
return target
class DistributeCoordinatorIntegrationTest(test.TestCase,
parameterized.TestCase):
@classmethod
def setUpClass(cls):
"""Create a local cluster with 2 workers."""
cls._workers, cls._ps, cls._evals = _create_in_process_cluster(
num_workers=3, num_ps=2, has_eval=True)
cls._cluster_spec = {
"worker": [
_strip_protocol(_bytes_to_str(w.target)) for w in cls._workers
],
"ps": [_strip_protocol(_bytes_to_str(ps.target)) for ps in cls._ps],
"evaluator": [
_strip_protocol(_bytes_to_str(e.target)) for e in cls._evals
]
}
def setUp(self):
self._model_dir = tempfile.mkdtemp()
self._event = threading.Event()
super(DistributeCoordinatorIntegrationTest, self).setUp()
def dataset_input_fn(self, x, y, batch_size, shuffle):
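    """Returns an input_fn yielding a repeated, batched dataset of (x, y)."""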
def input_fn():
dataset = dataset_ops.Dataset.from_tensor_slices((x, y))
if shuffle:
dataset = dataset.shuffle(batch_size)
dataset = dataset.repeat(100).batch(batch_size)
return dataset
return input_fn
def _get_exporter(self, name, fc):
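    """Returns a LatestExporter that parses serialized tf.Example protos."""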
feature_spec = feature_column.make_parse_example_spec(fc)
serving_input_receiver_fn = (
export_lib.build_parsing_serving_input_receiver_fn(feature_spec))
return exporter_lib.LatestExporter(
name, serving_input_receiver_fn=serving_input_receiver_fn)
def _extract_loss_and_global_step(self, event_folder):
"""Returns the loss and global step in last event."""
event_paths = glob.glob(os.path.join(event_folder, "events*"))
loss = None
global_step_count = None
for e in summary_iterator.summary_iterator(event_paths[-1]):
current_loss = None
for v in e.summary.value:
if v.tag == "loss":
current_loss = v.simple_value
# If loss is not found, global step is meaningless.
if current_loss is None:
continue
current_global_step = e.step
if global_step_count is None or current_global_step > global_step_count:
global_step_count = current_global_step
loss = current_loss
return (loss, global_step_count)
def _get_estimator(self,
train_distribute,
eval_distribute,
remote_cluster=None):
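    """Builds a DNNLinearCombinedRegressor for the test.

    The distribution strategies and the optional `remote_cluster` are passed
    through `RunConfig(experimental_distribute=...)`.
    """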
input_dimension = LABEL_DIMENSION
linear_feature_columns = [
feature_column.numeric_column("x", shape=(input_dimension,))
]
dnn_feature_columns = [
feature_column.numeric_column("x", shape=(input_dimension,))
]
return dnn_linear_combined.DNNLinearCombinedRegressor(
linear_feature_columns=linear_feature_columns,
dnn_hidden_units=(2, 2),
dnn_feature_columns=dnn_feature_columns,
label_dimension=LABEL_DIMENSION,
model_dir=self._model_dir,
dnn_optimizer=adagrad.AdagradOptimizer(0.001),
linear_optimizer=adagrad.AdagradOptimizer(0.001),
config=run_config_lib.RunConfig(
experimental_distribute=DistributeConfig(
train_distribute=train_distribute,
eval_distribute=eval_distribute,
remote_cluster=remote_cluster)))
def _complete_flow(self,
train_distribute,
eval_distribute,
remote_cluster=None):
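    """Runs train_and_evaluate end to end with the given strategies."""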
estimator = self._get_estimator(train_distribute, eval_distribute,
remote_cluster)
input_dimension = LABEL_DIMENSION
train_input_fn = self.dataset_input_fn(
x={"x": DATA},
y=DATA,
batch_size=BATCH_SIZE // len(train_distribute.worker_devices),
shuffle=True)
if eval_distribute:
eval_batch_size = BATCH_SIZE // len(eval_distribute.worker_devices)
else:
eval_batch_size = BATCH_SIZE
eval_input_fn = self.dataset_input_fn(
x={"x": DATA}, y=DATA, batch_size=eval_batch_size, shuffle=False)
linear_feature_columns = [
feature_column.numeric_column("x", shape=(input_dimension,))
]
dnn_feature_columns = [
feature_column.numeric_column("x", shape=(input_dimension,))
]
feature_columns = linear_feature_columns + dnn_feature_columns
estimator_training.train_and_evaluate(
estimator,
estimator_training.TrainSpec(train_input_fn, max_steps=MAX_STEPS),
estimator_training.EvalSpec(
name=EVAL_NAME,
input_fn=eval_input_fn,
steps=None,
exporters=self._get_exporter(EXPORTER_NAME, feature_columns),
start_delay_secs=0,
throttle_secs=1))
return estimator
def _inspect_train_and_eval_events(self, estimator):
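    """Asserts that training, evaluation, export and predict all ran."""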
# Make sure nothing is stuck in limbo.
writer_cache.FileWriterCache.clear()
    # Examine the training events. The training global step is not checked
    # here to avoid flakiness due to a race on the global step between workers.
training_loss, _ = self._extract_loss_and_global_step(self._model_dir)
self.assertIsNotNone(training_loss)
# Examine the eval events. The global step should be accurate.
eval_dir = os.path.join(self._model_dir, "eval_" + EVAL_NAME)
eval_loss, eval_global_step = self._extract_loss_and_global_step(
event_folder=eval_dir)
self.assertIsNotNone(eval_loss)
self.assertGreaterEqual(eval_global_step, MAX_STEPS)
# Examine the export folder.
    export_dir = os.path.join(self._model_dir, "export", EXPORTER_NAME)
self.assertTrue(gfile.Exists(export_dir))
    # Examine the checkpoint by running predict against it.
def predict_input_fn():
return dataset_ops.Dataset.from_tensor_slices({
"x": DATA
}).batch(BATCH_SIZE)
predicted_proba = np.array([
x[prediction_keys.PredictionKeys.PREDICTIONS]
for x in estimator.predict(predict_input_fn)
])
self.assertAllEqual((BATCH_SIZE, LABEL_DIMENSION), predicted_proba.shape)
@combinations.generate(
combinations.combine(
mode=["graph"],
train_distribute_cls=[
mirrored_strategy.MirroredStrategy,
parameter_server_strategy.ParameterServerStrategy
],
eval_distribute_cls=[
None, mirrored_strategy.MirroredStrategy,
parameter_server_strategy.ParameterServerStrategy
],
required_gpus=1))
def test_complete_flow_standalone_client(self, train_distribute_cls,
eval_distribute_cls):
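    # Not every strategy constructor accepts `num_gpus`; fall back to
    # `num_gpus_per_worker` for the ones that raise TypeError.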
try:
train_distribute = train_distribute_cls(num_gpus=context.num_gpus())
except TypeError:
train_distribute = train_distribute_cls(num_gpus_per_worker=2)
if eval_distribute_cls:
eval_distribute = eval_distribute_cls()
else:
eval_distribute = None
estimator = self._complete_flow(
train_distribute, eval_distribute, remote_cluster=self._cluster_spec)
self._inspect_train_and_eval_events(estimator)
def _mock_run_distribute_coordinator(
self,
worker_fn,
strategy,
eval_fn,
eval_strategy,
mode=dc.CoordinatorMode.STANDALONE_CLIENT,
cluster_spec=None,
session_config=None):
    # Calls the original `run_distribute_coordinator` method, but first reads
    # the task config from the TF_CONFIG environment variable and signals the
    # caller that the coordinator has been entered.
task_type = None
task_id = None
    if not cluster_spec:
      tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
      cluster_spec = tf_config.get("cluster", {})
      task_env = tf_config.get("task", {})
      if task_env:
        task_type = task_env.get("type", task_type)
        task_id = int(task_env.get("index", task_id))
self._event.set()
original_run_distribute_coordinator(
worker_fn,
strategy,
eval_fn,
eval_strategy,
mode=mode,
cluster_spec=cluster_spec,
task_type=task_type,
task_id=task_id,
session_config=session_config)
def _task_thread(self, train_distribute, eval_distribute):
with test.mock.patch.object(dc, "run_distribute_coordinator",
self._mock_run_distribute_coordinator):
self._complete_flow(train_distribute, eval_distribute)
def _run_task_in_thread(self, cluster_spec, task_type, task_id,
train_distribute, eval_distribute):
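    """Starts `task_type`/`task_id` in a thread.

    Blocks until the mocked `run_distribute_coordinator` signals that it has
    picked up the task config from TF_CONFIG.
    """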
    # The task config is handed to the thread through the TF_CONFIG
    # environment variable.
    tf_config = {
        "cluster": cluster_spec,
        "task": {
            "type": task_type,
            "index": task_id
        }
    }
self._event.clear()
t = threading.Thread(
target=self._task_thread, args=(train_distribute, eval_distribute))
with test.mock.patch.dict("os.environ",
{"TF_CONFIG": json.dumps(tf_config)}):
t.start()
self._event.wait()
return t
def _run_multiple_tasks_in_threads(self, cluster_spec, train_distribute,
eval_distribute):
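    """Starts one thread per task in `cluster_spec`, keyed by task type."""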
threads = {}
for task_type in cluster_spec.keys():
threads[task_type] = []
for task_id in range(len(cluster_spec[task_type])):
t = self._run_task_in_thread(cluster_spec, task_type, task_id,
train_distribute, eval_distribute)
threads[task_type].append(t)
return threads
@combinations.generate(
combinations.combine(
mode=["graph"],
train_distribute_cls=[
parameter_server_strategy.ParameterServerStrategy,
],
eval_distribute_cls=[
None, mirrored_strategy.MirroredStrategy,
parameter_server_strategy.ParameterServerStrategy
],
required_gpus=1))
  def test_complete_flow_independent_worker_between_graph(
      self, train_distribute_cls, eval_distribute_cls):
train_distribute = train_distribute_cls(
num_gpus_per_worker=context.num_gpus())
if eval_distribute_cls:
eval_distribute = eval_distribute_cls()
else:
eval_distribute = None
cluster_spec = _create_cluster_spec(num_workers=3, num_ps=2, has_eval=True)
threads = self._run_multiple_tasks_in_threads(
cluster_spec, train_distribute, eval_distribute)
for task_type, ts in threads.items():
if task_type == PS:
continue
for t in ts:
t.join()
estimator = self._get_estimator(train_distribute, eval_distribute)
self._inspect_train_and_eval_events(estimator)
@combinations.generate(
combinations.combine(
mode=["graph"],
train_distribute_cls=[mirrored_strategy.MirroredStrategy],
eval_distribute_cls=[None, mirrored_strategy.MirroredStrategy],
required_gpus=1))
  def test_complete_flow_independent_worker_in_graph(
      self, train_distribute_cls, eval_distribute_cls):
train_distribute = train_distribute_cls(num_gpus=context.num_gpus())
if eval_distribute_cls:
eval_distribute = eval_distribute_cls()
else:
eval_distribute = None
cluster_spec = _create_cluster_spec(num_workers=3, num_ps=2, has_eval=True)
threads = self._run_multiple_tasks_in_threads(
cluster_spec, train_distribute, eval_distribute)
threads[WORKER][0].join()
threads[EVALUATOR][0].join()
estimator = self._get_estimator(train_distribute, eval_distribute)
self._inspect_train_and_eval_events(estimator)
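# Canned TF_CONFIG environments exercised by RunConfigTest below.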
TF_CONFIG_WITH_CHIEF = {
"cluster": {
"chief": ["fake_chief"],
},
"task": {
"type": "chief",
"index": 0
}
}
TF_CONFIG_WITH_MASTER = {
"cluster": {
"master": ["fake_master"],
},
"task": {
"type": "master",
"index": 0
}
}
TF_CONFIG_WITHOUT_TASK = {"cluster": {"chief": ["fake_worker"]}}
class RunConfigTest(test.TestCase):
def test_previously_unexpected_cluster_spec(self):
with test.mock.patch.dict(
"os.environ", {"TF_CONFIG": json.dumps(TF_CONFIG_WITHOUT_TASK)}):
run_config_lib.RunConfig(
experimental_distribute=DistributeConfig(
train_distribute=mirrored_strategy.MirroredStrategy(num_gpus=2)))
def test_should_run_distribute_coordinator(self):
"""Tests that should_run_distribute_coordinator return a correct value."""
# We don't use distribute coordinator for local training.
self.assertFalse(
dc_training.should_run_distribute_coordinator(
run_config_lib.RunConfig()))
# When `train_distribute` is not specified, don't use distribute
# coordinator.
with test.mock.patch.dict("os.environ",
{"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
self.assertFalse(
dc_training.should_run_distribute_coordinator(
run_config_lib.RunConfig()))
# When `train_distribute` is specified and TF_CONFIG is detected, use
# distribute coordinator.
with test.mock.patch.dict("os.environ",
{"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
config_with_train_distribute = run_config_lib.RunConfig(
experimental_distribute=DistributeConfig(
train_distribute=mirrored_strategy.MirroredStrategy(num_gpus=2)))
config_with_eval_distribute = run_config_lib.RunConfig(
experimental_distribute=DistributeConfig(
eval_distribute=mirrored_strategy.MirroredStrategy(num_gpus=2)))
self.assertTrue(
dc_training.should_run_distribute_coordinator(
config_with_train_distribute))
self.assertFalse(
dc_training.should_run_distribute_coordinator(
config_with_eval_distribute))
# With a master in the cluster, don't run distribute coordinator.
with test.mock.patch.dict("os.environ",
{"TF_CONFIG": json.dumps(TF_CONFIG_WITH_MASTER)}):
config = run_config_lib.RunConfig(
experimental_distribute=DistributeConfig(
train_distribute=mirrored_strategy.MirroredStrategy(num_gpus=2)))
self.assertFalse(dc_training.should_run_distribute_coordinator(config))
def test_init_run_config_duplicate_distribute(self):
with self.assertRaises(ValueError):
run_config_lib.RunConfig(
train_distribute=mirrored_strategy.MirroredStrategy(),
experimental_distribute=DistributeConfig(
train_distribute=mirrored_strategy.MirroredStrategy()))
with self.assertRaises(ValueError):
run_config_lib.RunConfig(
eval_distribute=mirrored_strategy.MirroredStrategy(),
experimental_distribute=DistributeConfig(
eval_distribute=mirrored_strategy.MirroredStrategy()))
def test_init_run_config_none_distribute_coordinator_mode(self):
# We don't use distribute coordinator for local training.
config = run_config_lib.RunConfig(
train_distribute=mirrored_strategy.MirroredStrategy())
dc_training.init_run_config(config, {})
self.assertIsNone(config._distribute_coordinator_mode)
# With a master in the cluster, don't run distribute coordinator.
with test.mock.patch.dict("os.environ",
{"TF_CONFIG": json.dumps(TF_CONFIG_WITH_MASTER)}):
config = run_config_lib.RunConfig(
train_distribute=mirrored_strategy.MirroredStrategy())
self.assertIsNone(config._distribute_coordinator_mode)
# When `train_distribute` is not specified, don't use distribute
# coordinator.
with test.mock.patch.dict("os.environ",
{"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
config = run_config_lib.RunConfig()
self.assertFalse(hasattr(config, "_distribute_coordinator_mode"))
def test_init_run_config_independent_worker(self):
# When `train_distribute` is specified and TF_CONFIG is detected, use
# distribute coordinator with INDEPENDENT_WORKER mode.
with test.mock.patch.dict("os.environ",
{"TF_CONFIG": json.dumps(TF_CONFIG_WITH_CHIEF)}):
config = run_config_lib.RunConfig(
train_distribute=mirrored_strategy.MirroredStrategy())
self.assertEqual(config._distribute_coordinator_mode,
dc.CoordinatorMode.INDEPENDENT_WORKER)
def test_init_run_config_standalone_client(self):
    # When `train_distribute` is specified and a `remote_cluster` is set in
    # `experimental_distribute`, use distribute coordinator with
    # STANDALONE_CLIENT mode.
config = run_config_lib.RunConfig(
train_distribute=mirrored_strategy.MirroredStrategy(),
experimental_distribute=DistributeConfig(
remote_cluster={"chief": ["fake_worker"]}))
self.assertEqual(config._distribute_coordinator_mode,
dc.CoordinatorMode.STANDALONE_CLIENT)
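# `sys.exit` is patched with `os._exit` so the test process terminates
# immediately instead of waiting on the in-process servers and worker threads,
# which are never shut down.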
if __name__ == "__main__":
with test.mock.patch.object(sys, "exit", os._exit):
test.main()