Convert docstring code snippets to testable doctests
diff --git a/tensorflow/python/data/experimental/ops/data_service_ops.py b/tensorflow/python/data/experimental/ops/data_service_ops.py
index d7379f3..35554f0 100644
--- a/tensorflow/python/data/experimental/ops/data_service_ops.py
+++ b/tensorflow/python/data/experimental/ops/data_service_ops.py
@@ -321,47 +321,42 @@
To see the distributed operations in action, the `DispatchServer` should be
- started first so that tf.data workers can register to it.
+ started first so that tf.data workers can register with it.
- ```
- dispatcher = tf.data.experimental.service.DispatchServer(port=5000)
- print(dispatcher.target) # prints grpc://localhost:5000
+ >>> dispatcher = tf.data.experimental.service.DispatchServer(port=5000)
+ >>> print(dispatcher.target)
+ grpc://localhost:5000
- dispatcher_address = dispatcher.target.split("://")[1]
- worker = tf.data.experimental.service.WorkerServer(
- port=0, dispatcher_address=dispatcher_address)
- ```
+ >>> dispatcher_address = dispatcher.target.split("://")[1]
+ >>> worker1 = tf.data.experimental.service.WorkerServer(
+ ... port=5001, dispatcher_address=dispatcher_address)
+ >>> worker2 = tf.data.experimental.service.WorkerServer(
+ ... port=5002, dispatcher_address=dispatcher_address)
+ >>> dataset = tf.data.Dataset.range(5)
+ >>> dataset = dataset.map(lambda x: x*x)
+ >>> dataset = dataset.apply(
+ ... tf.data.experimental.service.distribute("parallel_epochs",
+ ... dispatcher.target))
+ >>> dataset = dataset.map(lambda x: x+1)
- Now, the operations on a `tf.data.Dataset` can be distributed to the worker.
+ >>> print(sorted(list(dataset.as_numpy_iterator())))
+ [1, 1, 2, 2, 5, 5, 10, 10, 17, 17]
- ```
- dataset = tf.data.Dataset.range(5)
- dataset = dataset.map(lambda x: x*x)
- dataset = dataset.apply(
- tf.data.experimental.service.distribute("parallel_epochs",
- dispatcher.target))
- dataset = dataset.map(lambda x: x+1)
-
- for element in dataset:
- print(element) # prints { 1, 2, 5, 10, 17 }
- ```
-
- In the above example, the first two lines (before the call to `distribute`)
- will be executed on the tf.data worker, and the elements are provided over
- RPC. The remaining transformations (after the call to `distribute`) will be
- executed locally.
+ In the above example, the transformations before the call to `distribute`
+ (the `range` and the first `map`) will be executed on the tf.data workers,
+ and the elements are provided over RPC. The remaining transformations
+ (the `map` after the call to `distribute`) will be executed locally.
The `job_name` argument allows jobs to be shared across multiple
datasets. Instead of each dataset creating its own job, all
datasets with the same `job_name` will consume from the same job. A new job
will be created for each iteration of the dataset (with each repetition of
`Dataset.repeat` counting as a new iteration). Suppose the `DispatchServer`
- is serving on `dataservice:5000` and two training workers (in either a single
+ is serving on `localhost:5000` and two training workers (in either a single
client or multi-client setup) iterate over the below dataset, and there is a
single tf.data worker:
```
range5_dataset = tf.data.Dataset.range(5)
dataset = range5_dataset.apply(tf.data.experimental.service.distribute(
- "parallel_epochs", "grpc://dataservice:5000", job_name="my_job_name"))
+ "parallel_epochs", "grpc://localhost:5000", job_name="my_job_name"))
for iteration in range(3):
print(list(dataset))
```
@@ -384,6 +379,12 @@
[0, 2, 3, 4]
```
+ NOTE: The dispatcher server in the examples above is hosted at
+ `grpc://localhost:5000` for demonstration purposes. The hostname and port
+ can be changed to match the configuration and available resources of the
+ deployment. To use a protocol other than "grpc", it must first be
+ registered by dynamically linking it into the TensorFlow binary.
+
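+ For example, a dispatcher could be hosted on a different port (6000 here
+ is an arbitrary choice for illustration), with a worker started on
+ `port=0` to bind any free port, as in the earlier example:
+
+ ```
+ dispatcher = tf.data.experimental.service.DispatchServer(port=6000)
+ # dispatcher.target is now "grpc://localhost:6000"
+ worker = tf.data.experimental.service.WorkerServer(
+     port=0, dispatcher_address=dispatcher.target.split("://")[1])
+ ```
+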
Job names must not be re-used across different training jobs within the
lifetime of the tf.data service. In general, the tf.data service is expected
to live for the duration of a single training job.
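+
+ For example, one way to avoid re-using a job name (an illustrative sketch;
+ `run_id` is a hypothetical identifier that is unique to each training job
+ and shared by all of its training workers) is to derive the name from it:
+
+ ```
+ job_name = "my_job_%s" % run_id  # unique per training job
+ dataset = dataset.apply(tf.data.experimental.service.distribute(
+     "parallel_epochs", "grpc://localhost:5000", job_name=job_name))
+ ```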