"cells": [
"cell_type": "markdown",
"metadata": {
"id": "KQALG9h23b0R"
"source": [
"##### Copyright 2020 The TensorFlow Authors."
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "both",
"id": "U34SJW0W3dg_"
"outputs": [],
"source": [
"#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
"cell_type": "markdown",
"metadata": {
"id": "VIX1XZHJ3gFo"
"source": [
"# TensorFlow NumPy: Distributed Image Classification Tutorial"
"cell_type": "markdown",
"metadata": {
"id": "f7NApJ7R3ndN"
"source": [
"## Overview\n",
"TensorFlow implements a subset of the [NumPy API](, available as `tf.experimental.numpy`. This allows running NumPy code, accelerated by TensorFlow together with access to all of TensorFlow's APIs. Please see [TensorFlow NumPy Guide]( to get started.\n",
"Here you will learn how to build a deep model for an image classification task by using TensorFlow Numpy APIs. For using higher level `tf.keras` APIs, see the following [tutorial]("
"cell_type": "markdown",
"metadata": {
"id": "IYDdfih63rSG"
"source": [
"## Setup\n",
"tf.experimental.numpy will be available in the stable branch starting from TensorFlow 2.4. For now, it is available in nightly."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "3IlLM-YlTMv5"
"outputs": [],
"source": [
"!pip install --quiet --upgrade tf-nightly\n",
"!pip install --quiet --upgrade tensorflow-datasets"
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "U13hRXHKTcsE"
"outputs": [],
"source": [
"import collections\n",
"import functools\n",
"import matplotlib.pyplot as plt\n",
"import os\n",
"import tempfile\n",
"import tensorflow as tf\n",
"import tensorflow.experimental.numpy as tnp\n",
"import tensorflow_datasets as tfds\n",
"gpus = tf.config.list_physical_devices('GPU')\n",
"if gpus:\n",
" tf.config.set_logical_device_configuration(gpus[0], [\n",
" tf.config.LogicalDeviceConfiguration(memory_limit=128),\n",
" tf.config.LogicalDeviceConfiguration(memory_limit=128)])\n",
" devices = tf.config.list_logical_devices('GPU')\n",
" cpus = tf.config.list_physical_devices('CPU')\n",
" tf.config.set_logical_device_configuration(cpus[0], [\n",
" tf.config.LogicalDeviceConfiguration(),\n",
" tf.config.LogicalDeviceConfiguration()])\n",
" devices = tf.config.list_logical_devices('CPU')\n",
"print(\"Using following virtual devices\", devices)"
"cell_type": "markdown",
"metadata": {
"id": "AxNuZSqZKcdM"
"source": [
"## Mnist dataset\n",
"Mnist contains 28 * 28 images of digits from 0 to 9. The task is to classify the images as these 10 possible classes.\n",
"Below, load the dataset and examine a few samples."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "yKf9Tm5OjwGK"
"outputs": [],
"source": [
"NUM_CLASSES = 10\n",
"BATCH_SIZE = 64\n",
"INPUT_SIZE = 28 * 28\n",
"def process_data(data_dict):\n",
" images = tnp.asarray(data_dict['image']) / 255.0\n",
" images = images.reshape(-1, INPUT_SIZE).astype(tnp.float32)\n",
" labels = tnp.asarray(data_dict['label'])\n",
" labels = tnp.eye(NUM_CLASSES, dtype=tnp.float32)[labels]\n",
" return images, labels\n",
"with tf.device(\"CPU:0\"):\n",
" train_dataset = tfds.load('mnist', split='train', shuffle_files=True, \n",
" batch_size=BATCH_SIZE).map(process_data)\n",
" test_dataset = tfds.load('mnist', split='test', shuffle_files=True, \n",
" batch_size=-1)\n",
" x_test, y_test = process_data(test_dataset)\n",
" # Plots some examples.\n",
" images, labels = next(iter(train_dataset.take(1)))\n",
" _, axes = plt.subplots(1, 8, figsize=(12, 96))\n",
" for i, ax in enumerate(axes):\n",
" ax.imshow(images[i].reshape(28, 28), cmap='gray')\n",
" ax.axis(\"off\")\n",
" ax.set_title(\"Label: %d\" % int(tnp.argmax(labels[i])))"
"cell_type": "markdown",
"metadata": {
"id": "ZDJQp4i00qaJ"
"source": [
"## Define layers and model\n",
"Here, you will implement a multi-layer perceptron model that trains on the MNIST data. First, define a `Dense` class which applies a linear transform followed by a \"relu\" non-linearity."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "44yzAmBFreyg"
"outputs": [],
"source": [
"class Dense(tf.Module):\n",
" def __init__(self, units, use_relu=True):\n",
" self.wt = None\n",
" self.bias = None\n",
" self._use_relu = use_relu\n",
" self._built = False\n",
" self._units = units\n",
" def __call__(self, inputs):\n",
" if not self._built:\n",
" self._build(inputs.shape)\n",
" x = tnp.add(tnp.matmul(inputs, self.wt), self.bias)\n",
" if self._use_relu:\n",
" return tnp.maximum(x, 0.)\n",
" else:\n",
" return x\n",
" @property\n",
" def params(self):\n",
" assert self._built\n",
" return [self.wt, self.bias]\n",
" def _build(self, input_shape):\n",
" size = input_shape[1]\n",
" stddev = 1 / tnp.sqrt(size)\n",
" # Note that model parameters are `tf.Variable` since they requires\n",
" # mutation, which is currently unsupported by TensorFlow NumPy.\n",
" # Also note interoperation with TensorFlow APIs below.\n",
" self.wt = tf.Variable(\n",
" tf.random.truncated_normal(\n",
" [size, self._units], stddev=stddev, dtype=tf.float32))\n",
" self.bias = tf.Variable(tf.zeros([self._units], dtype=tf.float32))\n",
" self._built = True"
"cell_type": "markdown",
"metadata": {
"id": "wfKpg3adUCy9"
"source": [
"Next, create a `Model` object that applies two non-linear `Dense` transforms,\n",
"followed by a linear transform."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "NdrdxKB7SenC"
"outputs": [],
"source": [
"class Model(tf.Module):\n",
" \"\"\"A three layer neural network.\"\"\"\n",
" def __init__(self):\n",
" self.layer1 = Dense(128)\n",
" self.layer2 = Dense(32)\n",
" self.layer3 = Dense(NUM_CLASSES, use_relu=False)\n",
" def __call__(self, inputs):\n",
" x = self.layer1(inputs)\n",
" x = self.layer2(x)\n",
" return self.layer3(x)\n",
" @property\n",
" def params(self):\n",
" return self.layer1.params + self.layer2.params + self.layer3.params"
"cell_type": "markdown",
"metadata": {
"id": "Hoxh5Z7E_9Pv"
"source": [
"## Training and evaluation\n",
"Checkout the following methods for performing training and evaluation."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "hOxqjE7rZPdr"
"outputs": [],
"source": [
"def forward(model, inputs, labels):\n",
" \"\"\"Computes prediction and loss.\"\"\"\n",
" logits = model(inputs)\n",
" # TensorFlow's loss function has numerically stable implementation of forward\n",
" # pass and gradients. So we prefer that here.\n",
" loss = tf.nn.softmax_cross_entropy_with_logits(labels, logits)\n",
" mean_loss = tnp.mean(loss)\n",
" return logits, mean_loss\n",
"def compute_gradients(model, inputs, labels):\n",
" \"\"\"Computes gradients of loss based on `labels` and prediction on `inputs`.\"\"\"\n",
" with tf.GradientTape() as tape:\n",
" _, loss = forward(model, inputs, labels)\n",
" gradients = tape.gradient(loss, model.params)\n",
" return gradients\n",
"def compute_sgd_updates(gradients, learning_rate):\n",
" \"\"\"Computes parameter updates based on SGD update rule.\"\"\"\n",
" return [-learning_rate * grad for grad in gradients]\n",
"def apply_updates(model, updates):\n",
" \"\"\"Applies `update` to `model.params`.\"\"\"\n",
" for param, update in zip(model.params, updates):\n",
" param.assign_add(update)\n",
"def evaluate(model, images, labels):\n",
" \"\"\"Evaluates accuracy for `model`'s predictions.\"\"\"\n",
" prediction = model(images)\n",
" predicted_class = tnp.argmax(prediction, axis=-1)\n",
" actual_class = tnp.argmax(labels, axis=-1)\n",
" return float(tnp.mean(predicted_class == actual_class))"
"cell_type": "markdown",
"metadata": {
"id": "8t70b5d6XCs7"
"source": [
"### Single GPU training"
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "HrhS_M6kALeP"
"outputs": [],
"source": [
"NUM_EPOCHS = 10\n",
"def train_step(model, input, labels, learning_rate):\n",
" gradients = compute_gradients(model, input, labels)\n",
" updates = compute_sgd_updates(gradients, learning_rate)\n",
" apply_updates(model, updates)\n",
"# Creates and build a model.\n",
"model = Model()\n",
"accuracies = []\n",
"for _ in range(NUM_EPOCHS):\n",
" for inputs, labels in train_dataset:\n",
" train_step(model, inputs, labels, learning_rate=0.1)\n",
" accuracies.append(evaluate(model, x_test, y_test))\n",
"def plot_accuracies(accuracies):\n",
" plt.plot(accuracies)\n",
" plt.xlabel(\"epoch\")\n",
" plt.ylabel(\"accuracy\")\n",
" plt.title(\"Eval accuracy vs epoch\")\n",
"cell_type": "markdown",
"metadata": {
"id": "Dw7RwQmKcYK9"
"source": [
"#### Saving the models to disk"
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "rmk2xLQLcXkl"
"outputs": [],
"source": [
"# A temporary directory to save our models into.\n",
"dir = tempfile.TemporaryDirectory()\n",
"# We take our model, and create a wrapper for it.\n",
"class SaveableModel(Model):\n",
" @tf.function\n",
" def __call__(self, inputs):\n",
" return super().__call__(inputs)\n",
"saveable_model = SaveableModel()\n",
"# This saves a concrete function that we care about.\n",
"outputs = saveable_model(x_test)\n",
"# This saves the model to disk.\n",
"loaded = tf.saved_model.load(\n",
"outputs_loaded = loaded(x_test)\n",
"# Ensure that the loaded model preserves the weights\n",
"# of the saved model.\n",
"assert tnp.allclose(outputs, outputs_loaded)"
"cell_type": "markdown",
"metadata": {
"id": "ak_hCOkGXXfl"
"source": [
"### Multi GPU runs\n",
"Next, run mirrored training on multiple GPUs. Note that the GPUs used here are virtual and map to the same physical GPU.\n",
"First, define a few utilities to run replicated computation and reductions."
"cell_type": "markdown",
"metadata": {
"id": "ujbeT5p6Xm7k"
"source": [
"#### Distribution primitives\n",
"Checkout primitives below for function replication and distributed reduction."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "MZ6hivj-ZIRo"
"outputs": [],
"source": [
"import threading\n",
"import queue\n",
"# Note that this code currently relies on dispatching operations from python\n",
"# threads.\n",
"class ReplicatedFunction(object):\n",
" \"\"\"Creates a callable that will run `fn` on each device in `devices`.\"\"\"\n",
" def __init__(self, fn, devices, **kw_args):\n",
" self._shutdown = False\n",
" def _replica_fn(device, input_queue, output_queue):\n",
" while not self._shutdown:\n",
" inputs = input_queue.get()\n",
" with tf.device(device):\n",
" output_queue.put(fn(*inputs, **kw_args))\n",
" self.threads = []\n",
" self.input_queues = [queue.Queue() for _ in devices]\n",
" self.output_queues = [queue.Queue() for _ in devices]\n",
" for i, device in enumerate(devices):\n",
" thread = threading.Thread(\n",
" target=_replica_fn,\n",
" args=(device, self.input_queues[i], self.output_queues[i]))\n",
" thread.start()\n",
" self.threads.append(thread)\n",
" def __call__(self, *inputs):\n",
" all_inputs = zip(*inputs)\n",
" for input_queue, replica_input, in zip(self.input_queues, all_inputs):\n",
" input_queue.put(replica_input)\n",
" return [q.get() for q in self.output_queues]\n",
" def __del__(self):\n",
" self._shutdown = True\n",
" for t in self.threads:\n",
" t.join(3)\n",
" self.threads = None\n",
"def collective_mean(inputs, num_devices):\n",
" \"\"\"Performs collective mean reduction on inputs.\"\"\"\n",
" outputs = []\n",
" for instance_key, inp in enumerate(inputs):\n",
" outputs.append(tnp.asarray(\n",
" tf.raw_ops.CollectiveReduce(\n",
" input=inp, group_size=num_devices, group_key=0,\n",
" instance_key=instance_key, merge_op='Add', final_op='Div',\n",
" subdiv_offsets=[])))\n",
" return outputs"
"cell_type": "markdown",
"metadata": {
"id": "1ZiN1rpJYHLu"
"source": [
"#### Distributed training "
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "A6ZHYmLapunm"
"outputs": [],
"source": [
"# This is similar to `train_step` except for an extra collective reduction of\n",
"# gradients\n",
"def replica_step(model, inputs, labels,\n",
" learning_rate=None, num_devices=None):\n",
" gradients = compute_gradients(model, inputs, labels)\n",
" # Note that each replica performs a reduction to compute mean of gradients.\n",
" reduced_gradients = collective_mean(gradients, num_devices)\n",
" updates = compute_sgd_updates(reduced_gradients, learning_rate)\n",
" apply_updates(model, updates)\n",
"models = [Model() for _ in devices]\n",
"# The code below builds all the model objects and copies model parameters from\n",
"# the first model to all the replicas.\n",
"def init_model(model):\n",
" model(tnp.zeros((1, INPUT_SIZE), dtype=tnp.float32))\n",
" if model != models[0]:\n",
" # Copy the first models weights into the other models.\n",
" for p1, p2 in zip(model.params, models[0].params):\n",
" p1.assign(p2)\n",
"with tf.device(devices[0]):\n",
" init_model(models[0])\n",
"# Replicate and run the parameter initialization.\n",
"ReplicatedFunction(init_model, devices[1:])(models[1:])\n",
"# Replicate the training step\n",
"replicated_step = ReplicatedFunction(\n",
" replica_step, devices, learning_rate=0.1, num_devices=len(devices))\n",
"accuracies = []\n",
"print(\"Running distributed training on devices: %s\" % devices)\n",
"for _ in range(NUM_EPOCHS):\n",
" for inputs, labels in train_dataset:\n",
" replicated_step(models,\n",
" tnp.split(inputs, len(devices)),\n",
" tnp.split(labels, len(devices)))\n",
" accuracies.append(evaluate(models[0], x_test, y_test))\n",
"metadata": {
"accelerator": "GPU",
"colab": {
"collapsed_sections": [
"name": "TensorFlow Numpy: Distributed Image Classification",
"private_outputs": true,
"provenance": [],
"toc_visible": true
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
"nbformat": 4,
"nbformat_minor": 0