Merge "Add mixed-build args (kernel build info) in acloud create."
diff --git a/create/create_args.py b/create/create_args.py
index 1fad077..a2b91e4 100644
--- a/create/create_args.py
+++ b/create/create_args.py
@@ -162,6 +162,20 @@
         help="'cuttlefish only' System image build target, specify if different "
         "from --build_target",
         required=False)
+    parser.add_argument(
+        "--multi-stage-launch",
+        dest="multi_stage_launch",
+        action='store_true',
+        required=False,
+        default=None,
+        help="Enable the multi-stage cuttlefish launch.")
+    parser.add_argument(
+        "--no-multi-stage-launch",
+        dest="multi_stage_launch",
+        action='store_false',
+        required=False,
+        default=None,
+        help="Disable the multi-stage cuttlefish launch.")
 
     # TODO(b/118439885): Old arg formats to support transition, delete when
     # transistion is done.
diff --git a/internal/lib/cvd_compute_client_multi_stage.py b/internal/lib/cvd_compute_client_multi_stage.py
new file mode 100644
index 0000000..ffc97c1
--- /dev/null
+++ b/internal/lib/cvd_compute_client_multi_stage.py
@@ -0,0 +1,426 @@
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""A client that manages Cuttlefish Virtual Device on compute engine.
+
+** CvdComputeClient **
+
+CvdComputeClient derives from AndroidComputeClient. It manages a google
+compute engine project that is set up for running Cuttlefish Virtual Devices.
+It knows how to create a host instance from Cuttlefish Stable Host Image, fetch
+Android build, and start Android within the host instance.
+
+** Class hierarchy **
+
+  base_cloud_client.BaseCloudApiClient
+                ^
+                |
+       gcompute_client.ComputeClient
+                ^
+                |
+       android_compute_client.AndroidComputeClient
+                ^
+                |
+       cvd_compute_client_multi_stage.CvdComputeClient
+
+"""
+
+import getpass
+import logging
+import os
+import stat
+import subprocess
+import tempfile
+import threading
+
+from distutils.spawn import find_executable
+from acloud import errors
+from acloud.internal import constants
+from acloud.internal.lib import android_build_client
+from acloud.internal.lib import android_compute_client
+from acloud.internal.lib import gcompute_client
+from acloud.internal.lib import utils
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_BRANCH = "aosp-master"
+_GCE_USER = "vsoc-01"
+_FETCHER_BUILD_TARGET = "aosp_cf_x86_phone-userdebug"
+_FETCHER_NAME = "fetch_cvd"
+_SSH_BIN = "ssh"
+_SSH_CMD = (" -i %(rsa_key_file)s "
+            "-q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no "
+            "-l %(login_user)s %(ip_addr)s ")
+_SSH_CMD_MAX_RETRY = 4
+_SSH_CMD_RETRY_SLEEP = 3
+
+
+def _ProcessBuild(build_id=None, branch=None, build_target=None):
+    """Compose the build specifier string understood by fetch_cvd.
+
+    Args:
+        build_id: Specific build number to fetch; wins over `branch` when both are set.
+        branch: Manifest branch whose latest build is used when no build_id is given.
+        build_target: Device target to fetch; without it only the build or branch is returned.
+
+    Returns:
+        "<build_id or branch>/<build_target>" (the default branch is used when neither
+        is given), the bare build_id or branch without a target, or None if all args are None.
+    """
+    build = build_id or branch
+    if not build_target:
+        return build
+    return (build or _DEFAULT_BRANCH) + "/" + build_target
+
+
+def _SshLogOutput(cmd, timeout=None):
+    """Runs a single SSH command while logging its output and processes its return code.
+
+    Output is streamed to the log at the debug level for more interactive debugging.
+    SSH returns error code 255 for "failed to connect", so that code is treated as a failure in
+    SSH rather than a failure on the target device and is raised as a different exception type.
+
+    Args:
+        cmd: String the full SSH command to run, including the SSH binary and its arguments.
+        timeout: Optional integer, number of seconds after which the command is killed.
+
+    Raises:
+        errors.DeviceConnectionError: Failed to connect to the GCE instance.
+        subprocess.CalledProcessError: The process exited with an error on the instance.
+    """
+    logger.info("Running command \"%s\"", cmd)
+    # check_output would also work, but Popen lets the logs be streamed as they are received.
+    process = subprocess.Popen(cmd, shell=True, stdin=None,
+                               stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    timer = threading.Timer(timeout, process.kill) if timeout else None
+    try:
+        if timer:
+            timer.start()
+        while True:
+            output = process.stdout.readline()
+            # EOF reads as '' or b'' depending on the pipe mode; poll() is None while running.
+            if not output and process.poll() is not None:
+                break
+            if output:
+                # fetch_cvd and launch_cvd can be noisy, so left at debug
+                logger.debug(output.strip())
+    finally:
+        # Never leave the kill timer armed or the pipe open if streaming is interrupted.
+        if timer:
+            timer.cancel()
+        process.stdout.close()
+    if process.returncode == 255:
+        raise errors.DeviceConnectionError("Failed to send command to instance (%s)" % cmd)
+    elif process.returncode != 0:
+        raise subprocess.CalledProcessError(process.returncode, cmd)
+
+
+class CvdComputeClient(android_compute_client.AndroidComputeClient):
+    """Client that manages Android Virtual Device."""
+
+    DATA_POLICY_CREATE_IF_MISSING = "create_if_missing"
+
+    def __init__(self, acloud_config, oauth2_credentials):
+        """Set up the client and the per-instance launch bookkeeping.
+
+        Args:
+            acloud_config: An AcloudConfig object.
+            oauth2_credentials: An oauth2client.OAuth2Credentials instance.
+        """
+        super(CvdComputeClient, self).__init__(acloud_config, oauth2_credentials)
+
+        # Launch args and IP recorded by CreateInstance, consumed by WaitForBoot.
+        self._instance_to_args = {}
+        self._instance_to_ip = {}
+        self._ssh_private_key_path = acloud_config.ssh_private_key_path
+        self._fetch_cvd_version = acloud_config.fetch_cvd_version
+        self._build_api = android_build_client.AndroidBuildClient(oauth2_credentials)
+
+    def WaitForBoot(self, instance, boot_timeout_secs=None):
+        """Launch the device when this client fetched its build, then wait for boot.
+
+        Instances without launch args recorded by CreateInstance (the local-image case, where
+        the local image wrapper code launches the device) are delegated to the parent class.
+
+        Instances with recorded launch args have their files fetched but not launched, so this
+        runs "launch_cvd -daemon" over SSH, which exits when the device has successfully booted.
+
+        Args:
+            instance: String, name of instance.
+            boot_timeout_secs: Integer, the maximum time in seconds used to
+                               wait for the AVD to boot.
+        Returns:
+            True if device bootup successful.
+        """
+        if instance not in self._instance_to_args:
+            return super(CvdComputeClient, self).WaitForBoot(instance, boot_timeout_secs)
+        launch_cmd = "./bin/launch_cvd -daemon " + " ".join(self._instance_to_args[instance])
+        self._SshCommand(self._instance_to_ip[instance], launch_cmd, boot_timeout_secs)
+        return True
+
+    # pylint: disable=arguments-differ,too-many-locals
+    def CreateInstance(self, instance, image_name, image_project,
+                       build_target=None, branch=None, build_id=None,
+                       kernel_branch=None, kernel_build_id=None,
+                       kernel_build_target=None, blank_data_disk_size_gb=None,
+                       avd_spec=None, extra_scopes=None,
+                       system_build_target=None, system_branch=None,
+                       system_build_id=None):
+        """Create a single configured cuttlefish device.
+        1. Create gcp instance.
+        2. Put fetch_cvd on the instance.
+        3. Invoke fetch_cvd to fetch the build; WaitForBoot launches it later.
+
+        Args:
+            instance: instance name.
+            image_name: A string, the name of the GCE image.
+            image_project: A string, name of the project where the image lives.
+                           Assume the default project if None.
+            build_target: Target name, e.g. "aosp_cf_x86_phone-userdebug"
+            branch: Branch name, e.g. "aosp-master"
+            build_id: Build id, a string, e.g. "2263051", "P2804227"
+            kernel_branch: Kernel branch name, e.g. "kernel-common-android-4.14"
+            kernel_build_id: Kernel build id, a string, e.g. "223051", "P280427"
+            kernel_build_target: String, Kernel build target name.
+            blank_data_disk_size_gb: Size of the blank data disk in GB, None means 0.
+            avd_spec: An AVDSpec instance.
+            extra_scopes: A list of extra scopes to be passed to the instance.
+            system_build_target: Target name for the system image,
+                                e.g. "cf_x86_phone-userdebug"
+            system_branch: A String, branch name for the system image.
+            system_build_id: A string, build id for the system image.
+
+        Returns:
+            A string, representing instance name.
+        """
+        # The default is None; treat that as "no blank data disk" instead of raising TypeError.
+        blank_data_disk_size_gb = blank_data_disk_size_gb or 0
+
+        # A blank data disk would be created on the host. Make sure the size of
+        # the boot disk is large enough to hold it.
+        boot_disk_size_gb = (
+            int(self.GetImage(image_name, image_project)["diskSizeGb"]) +
+            blank_data_disk_size_gb)
+
+        ip = self._CreateGceInstance(instance, image_name, image_project,
+                                     extra_scopes, boot_disk_size_gb, avd_spec)
+        self._WaitForSsh(ip)
+
+        if avd_spec and avd_spec.image_source == constants.IMAGE_SRC_LOCAL:
+            return instance
+
+        self._UpdateFetchCvd(ip)
+
+        fetch_cvd_args = ["-credential_source=gce"]
+
+        default_build = _ProcessBuild(build_id, branch, build_target)
+        if default_build:
+            fetch_cvd_args.append("-default_build=" + default_build)
+        system_build = _ProcessBuild(system_build_id, system_branch, system_build_target)
+        if system_build:
+            fetch_cvd_args.append("-system_build=" + system_build)
+        kernel_build = _ProcessBuild(kernel_build_id, kernel_branch, kernel_build_target)
+        if kernel_build:
+            fetch_cvd_args.append("-kernel_build=" + kernel_build)
+
+        self._FetchBuild(ip, fetch_cvd_args)
+
+        launch_cvd_args = []
+
+        if blank_data_disk_size_gb > 0:
+            # Policy 'create_if_missing' would create a blank userdata disk if
+            # missing. If already exist, reuse the disk.
+            launch_cvd_args.append(
+                "-data_policy=" + self.DATA_POLICY_CREATE_IF_MISSING)
+            launch_cvd_args.append(
+                "-blank_data_image_mb=%d" % (blank_data_disk_size_gb * 1024))
+        if avd_spec:
+            launch_cvd_args.append(
+                "-x_res=" + avd_spec.hw_property[constants.HW_X_RES])
+            launch_cvd_args.append(
+                "-y_res=" + avd_spec.hw_property[constants.HW_Y_RES])
+            launch_cvd_args.append(
+                "-dpi=" + avd_spec.hw_property[constants.HW_ALIAS_DPI])
+            if constants.HW_ALIAS_DISK in avd_spec.hw_property:
+                launch_cvd_args.append(
+                    "-data_policy=" + self.DATA_POLICY_CREATE_IF_MISSING)
+                launch_cvd_args.append(
+                    "-blank_data_image_mb="
+                    + avd_spec.hw_property[constants.HW_ALIAS_DISK])
+            if constants.HW_ALIAS_CPUS in avd_spec.hw_property:
+                launch_cvd_args.append("-cpus=%s" % avd_spec.hw_property[constants.HW_ALIAS_CPUS])
+            if constants.HW_ALIAS_MEMORY in avd_spec.hw_property:
+                launch_cvd_args.append(
+                    "-memory_mb=%s" % avd_spec.hw_property[constants.HW_ALIAS_MEMORY])
+        else:
+            resolution = self._resolution.split("x")
+            launch_cvd_args.append("-x_res=" + resolution[0])
+            launch_cvd_args.append("-y_res=" + resolution[1])
+            launch_cvd_args.append("-dpi=" + resolution[3])
+
+        if self._launch_args:
+            launch_cvd_args.append(self._launch_args)
+
+        self._instance_to_args[instance] = launch_cvd_args
+        self._instance_to_ip[instance] = ip
+
+        return instance
+
+    # TODO(b/117625814): Fix this for cloudtop
+    def _SshCommand(self, ip, target_command, timeout=None):
+        """Execute a shell command on a remote instance through SSH.
+
+        SSH connection errors cause the command to be retried.
+
+        Args:
+            ip: Namedtuple of (internal, external) IP of the instance.
+            target_command: String, text of command to run on the remote instance.
+            timeout: Integer, the maximum time to wait for the command to respond.
+        """
+        ssh_options = {
+            "rsa_key_file": self._ssh_private_key_path,
+            "login_user": _GCE_USER,
+            "ip_addr": ip.external}
+        full_command = find_executable(_SSH_BIN) + _SSH_CMD % ssh_options + target_command
+        utils.RetryExceptionType(
+            exception_types=errors.DeviceConnectionError,
+            max_retries=_SSH_CMD_MAX_RETRY,
+            functor=_SshLogOutput,
+            sleep_multiplier=_SSH_CMD_RETRY_SLEEP,
+            retry_backoff_factor=utils.DEFAULT_RETRY_BACKOFF_FACTOR,
+            cmd=full_command,
+            timeout=timeout)
+
+    @utils.TimeExecute(function_description="Creating GCE instance")
+    def _CreateGceInstance(self, instance, image_name, image_project,
+                           extra_scopes, boot_disk_size_gb, avd_spec):
+        """Create the GCE host instance for a cuttlefish device and return its IP.
+
+        Override method from parent class.
+
+        Args:
+            instance: String, instance name.
+            image_name: String, the name of the GCE image.
+            image_project: String, the name of the project where the image lives.
+            extra_scopes: A list of extra scopes to be passed to the instance.
+            boot_disk_size_gb: Integer, size of the boot disk in GB.
+            avd_spec: An AVDSpec instance.
+
+        Returns:
+            Namedtuple of (internal, external) IP of the instance.
+        """
+        # Computed once here; the result is handed to ComputeClient.CreateInstance below.
+        disk_args = self._GetDiskArgs(
+            instance, image_name, image_project, boot_disk_size_gb)
+
+        metadata = self._metadata.copy()
+
+        if avd_spec:
+            metadata[constants.INS_KEY_AVD_TYPE] = avd_spec.avd_type
+            metadata[constants.INS_KEY_AVD_FLAVOR] = avd_spec.flavor
+            metadata[constants.INS_KEY_DISPLAY] = ("%sx%s (%s)" % (
+                avd_spec.hw_property[constants.HW_X_RES],
+                avd_spec.hw_property[constants.HW_Y_RES],
+                avd_spec.hw_property[constants.HW_ALIAS_DPI]))
+
+        # Add per-instance ssh key
+        if self._ssh_public_key_path:
+            rsa = self._LoadSshPublicKey(self._ssh_public_key_path)
+            logger.info("ssh_public_key_path is specified in config: %s, "
+                        "will add the key to the instance.",
+                        self._ssh_public_key_path)
+            metadata["sshKeys"] = "{0}:{2}\n{1}:{2}".format(getpass.getuser(), _GCE_USER, rsa)
+        else:
+            logger.warning(
+                "ssh_public_key_path is not specified in config, "
+                "only project-wide key will be effective.")
+
+        labels = {constants.LABEL_CREATE_BY: getpass.getuser()}
+        gcompute_client.ComputeClient.CreateInstance(
+            self,
+            instance=instance,
+            image_name=image_name,
+            image_project=image_project,
+            disk_args=disk_args,
+            metadata=metadata,
+            machine_type=self._machine_type,
+            network=self._network,
+            zone=self._zone,
+            labels=labels,
+            extra_scopes=extra_scopes)
+        ip = gcompute_client.ComputeClient.GetInstanceIP(
+            self, instance=instance, zone=self._zone)
+
+        return ip
+
+    @utils.TimeExecute(function_description="Waiting for SSH server")
+    def _WaitForSsh(self, ip):
+        """Check SSH readiness by running "uptime" on the instance via _SshCommand.
+
+        Args:
+            ip: Namedtuple of (internal, external) IP of the instance.
+        """
+        self._SshCommand(ip, "uptime")
+
+    @utils.TimeExecute(function_description="Uploading build fetcher to instance")
+    def _UpdateFetchCvd(self, ip):
+        """Download fetch_cvd from the Build API, and upload it to a remote instance.
+
+        The version of fetch_cvd to use is retrieved from the configuration file. Once fetch_cvd
+        is on the instance, future commands can use it to download relevant Cuttlefish files from
+        the Build API on the instance itself.
+
+        Args:
+            ip: Namedtuple of (internal, external) IP of the instance.
+        """
+        # TODO(schuffelen): Support fetch_cvd_version="latest" once automated testing is stronger.
+        download_dir = tempfile.mkdtemp()
+        download_target = os.path.join(download_dir, _FETCHER_NAME)
+        try:
+            self._build_api.DownloadArtifact(
+                build_target=_FETCHER_BUILD_TARGET,
+                build_id=self._fetch_cvd_version,
+                resource_id=_FETCHER_NAME,
+                local_dest=download_target,
+                attempt_id="latest")
+            fetch_cvd_stat = os.stat(download_target)
+            os.chmod(download_target, fetch_cvd_stat.st_mode | stat.S_IEXEC)
+            func = lambda rsa, ip: utils.ScpPushFile(
+                src_file=download_target,
+                dst_file=_FETCHER_NAME,
+                host_name=ip.external,
+                user_name=_GCE_USER,
+                rsa_key_file=rsa)
+            utils.RetryExceptionType(
+                exception_types=errors.DeviceConnectionError,
+                max_retries=_SSH_CMD_MAX_RETRY,
+                functor=func,
+                sleep_multiplier=_SSH_CMD_RETRY_SLEEP,
+                retry_backoff_factor=utils.DEFAULT_RETRY_BACKOFF_FACTOR,
+                rsa=self._ssh_private_key_path,
+                ip=ip)
+        finally:
+            # Remove the local temp copy even when the download or the upload fails.
+            if os.path.exists(download_target):
+                os.remove(download_target)
+            os.rmdir(download_dir)
+
+    @utils.TimeExecute(function_description="Downloading build on instance")
+    def _FetchBuild(self, ip, fetch_args):
+        """Execute fetch_cvd on the remote instance to get Cuttlefish runtime files.
+
+        Args:
+            ip: Namedtuple of (internal, external) IP of the instance.
+            fetch_args: List of strings, fetch_cvd arguments; joined with spaces.
+        """
+        self._SshCommand(ip, "./fetch_cvd " + " ".join(fetch_args))
diff --git a/internal/lib/cvd_compute_client_multi_stage_test.py b/internal/lib/cvd_compute_client_multi_stage_test.py
new file mode 100644
index 0000000..43545ad
--- /dev/null
+++ b/internal/lib/cvd_compute_client_multi_stage_test.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for acloud.internal.lib.cvd_compute_client_multi_stage."""
+
+import glob
+import os
+import subprocess
+import unittest
+import mock
+
+from acloud.create import avd_spec
+from acloud.internal import constants
+from acloud.internal.lib import android_build_client
+from acloud.internal.lib import cvd_compute_client_multi_stage
+from acloud.internal.lib import driver_test_lib
+from acloud.internal.lib import gcompute_client
+from acloud.internal.lib import utils
+
+from acloud.internal.lib.cvd_compute_client_multi_stage import _ProcessBuild
+
+
+class CvdComputeClientTest(driver_test_lib.BaseDriverTest):
+    """Test CvdComputeClient."""
+
+    SSH_PUBLIC_KEY_PATH = ""
+    INSTANCE = "fake-instance"
+    IMAGE = "fake-image"
+    IMAGE_PROJECT = "fake-iamge-project"
+    MACHINE_TYPE = "fake-machine-type"
+    NETWORK = "fake-network"
+    ZONE = "fake-zone"
+    BRANCH = "fake-branch"
+    TARGET = "aosp_cf_x86_phone-userdebug"
+    BUILD_ID = "2263051"
+    KERNEL_BRANCH = "fake-kernel-branch"
+    KERNEL_BUILD_ID = "1234567"
+    KERNEL_BUILD_TARGET = "kernel"
+    DPI = 160
+    X_RES = 720
+    Y_RES = 1280
+    METADATA = {"metadata_key": "metadata_value"}
+    EXTRA_DATA_DISK_SIZE_GB = 4
+    BOOT_DISK_SIZE_GB = 10
+    LAUNCH_ARGS = "--setupwizard_mode=REQUIRED"
+    EXTRA_SCOPES = ["scope1"]
+
+    def _GetFakeConfig(self):
+        """Build a mock standing in for the acloud configuration.
+
+        Returns:
+            A fake configuration mock object.
+        """
+        resolution = "%sx%sx32x%s" % (self.X_RES, self.Y_RES, self.DPI)
+        fake_cfg = mock.MagicMock()
+        fake_cfg.resolution = resolution
+        fake_cfg.ssh_public_key_path = self.SSH_PUBLIC_KEY_PATH
+        fake_cfg.machine_type = self.MACHINE_TYPE
+        fake_cfg.network = self.NETWORK
+        fake_cfg.zone = self.ZONE
+        fake_cfg.metadata_variable = self.METADATA
+        fake_cfg.extra_data_disk_size_gb = self.EXTRA_DATA_DISK_SIZE_GB
+        fake_cfg.launch_args = self.LAUNCH_ARGS
+        fake_cfg.extra_scopes = self.EXTRA_SCOPES
+        return fake_cfg
+
+    def setUp(self):
+        """Patch resource handles, artifact download and scp, then build the client under test."""
+        super(CvdComputeClientTest, self).setUp()
+        self.Patch(cvd_compute_client_multi_stage.CvdComputeClient, "InitResourceHandle")
+        self.Patch(android_build_client.AndroidBuildClient, "InitResourceHandle")
+        self.Patch(android_build_client.AndroidBuildClient, "DownloadArtifact")
+        self.Patch(utils, "ScpPushFile")
+        self.cvd_compute_client_multi_stage = cvd_compute_client_multi_stage.CvdComputeClient(
+            self._GetFakeConfig(), mock.MagicMock())
+
+    def testProcessBuild(self):
+        """Test "cuttlefish build" strings for combinations of build_id, branch and build_target."""
+        self.assertEqual(_ProcessBuild(build_id="123", branch="abc", build_target="def"), "123/def")
+        self.assertEqual(_ProcessBuild(build_id=None, branch="abc", build_target="def"), "abc/def")
+        self.assertEqual(_ProcessBuild(build_id="123", branch=None, build_target="def"), "123/def")
+        self.assertEqual(_ProcessBuild(build_id="123", branch="abc", build_target=None), "123")
+        self.assertEqual(_ProcessBuild(build_id=None, branch="abc", build_target=None), "abc")
+        self.assertEqual(_ProcessBuild(build_id="123", branch=None, build_target=None), "123")
+        self.assertEqual(_ProcessBuild(build_id=None, branch=None, build_target=None), None)
+
+    @mock.patch.object(utils, "GetBuildEnvironmentVariable", return_value="fake_env")
+    @mock.patch.object(glob, "glob", return_value=["fake.img"])
+    @mock.patch.object(gcompute_client.ComputeClient, "CompareMachineSize",
+                       return_value=1)
+    @mock.patch.object(gcompute_client.ComputeClient, "GetImage",
+                       return_value={"diskSizeGb": 10})
+    @mock.patch.object(gcompute_client.ComputeClient, "CreateInstance")
+    @mock.patch.object(cvd_compute_client_multi_stage.CvdComputeClient, "_GetDiskArgs",
+                       return_value=[{"fake_arg": "fake_value"}])
+    @mock.patch("getpass.getuser", return_value="fake_user")
+    @mock.patch.object(cvd_compute_client_multi_stage.CvdComputeClient, "_SshCommand")
+    def testCreateInstance(self, _mock_ssh, _get_user, _get_disk_args, mock_create,
+                           _get_image, _compare_machine_size, mock_check_img,
+                           _mock_env):
+        """Test CreateInstance forwards the expected arguments to ComputeClient.CreateInstance."""
+        expected_metadata = dict()
+        expected_metadata_local_image = dict()
+        expected_metadata.update(self.METADATA)
+        expected_metadata_local_image.update(self.METADATA)
+        remote_image_metadata = dict(expected_metadata)
+        expected_disk_args = [{"fake_arg": "fake_value"}]
+        args = mock.MagicMock()
+        args.local_image = None
+        args.config_file = ""
+        args.avd_type = constants.TYPE_CF
+        args.flavor = "phone"
+        args.adb_port = None
+        fake_avd_spec = avd_spec.AVDSpec(args)
+
+        created_subprocess = mock.MagicMock()
+        created_subprocess.stdout = mock.MagicMock()
+        created_subprocess.stdout.readline = mock.MagicMock(return_value='')
+        created_subprocess.poll = mock.MagicMock(return_value=0)
+        created_subprocess.returncode = 0
+        self.Patch(subprocess, "Popen", return_value=created_subprocess)
+        self.Patch(subprocess, "check_call")
+        self.Patch(os, "chmod")
+        self.Patch(os, "stat")
+        self.Patch(os, "remove")
+        self.Patch(os, "rmdir")
+        self.cvd_compute_client_multi_stage.CreateInstance(
+            self.INSTANCE, self.IMAGE, self.IMAGE_PROJECT, self.TARGET,
+            self.BRANCH, self.BUILD_ID, self.KERNEL_BRANCH,
+            self.KERNEL_BUILD_ID, self.KERNEL_BUILD_TARGET,
+            self.EXTRA_DATA_DISK_SIZE_GB, extra_scopes=self.EXTRA_SCOPES)
+        mock_create.assert_called_with(
+            self.cvd_compute_client_multi_stage,
+            instance=self.INSTANCE,
+            image_name=self.IMAGE,
+            image_project=self.IMAGE_PROJECT,
+            disk_args=expected_disk_args,
+            metadata=remote_image_metadata,
+            machine_type=self.MACHINE_TYPE,
+            network=self.NETWORK,
+            zone=self.ZONE,
+            labels={constants.LABEL_CREATE_BY: "fake_user"},
+            extra_scopes=self.EXTRA_SCOPES)
+
+        mock_check_img.return_value = True
+        # Second call passes an avd_spec, so avd_type/flavor/display metadata is expected.
+        local_image_metadata = dict(expected_metadata_local_image)
+        fake_avd_spec.hw_property[constants.HW_X_RES] = str(self.X_RES)
+        fake_avd_spec.hw_property[constants.HW_Y_RES] = str(self.Y_RES)
+        fake_avd_spec.hw_property[constants.HW_ALIAS_DPI] = str(self.DPI)
+        fake_avd_spec.hw_property[constants.HW_ALIAS_DISK] = str(
+            self.EXTRA_DATA_DISK_SIZE_GB * 1024)
+        local_image_metadata["avd_type"] = constants.TYPE_CF
+        local_image_metadata["flavor"] = "phone"
+        local_image_metadata[constants.INS_KEY_DISPLAY] = ("%sx%s (%s)" % (
+            fake_avd_spec.hw_property[constants.HW_X_RES],
+            fake_avd_spec.hw_property[constants.HW_Y_RES],
+            fake_avd_spec.hw_property[constants.HW_ALIAS_DPI]))
+        self.cvd_compute_client_multi_stage.CreateInstance(
+            self.INSTANCE, self.IMAGE, self.IMAGE_PROJECT, self.TARGET, self.BRANCH,
+            self.BUILD_ID, self.KERNEL_BRANCH, self.KERNEL_BUILD_ID,
+            self.KERNEL_BUILD_TARGET, self.EXTRA_DATA_DISK_SIZE_GB,
+            fake_avd_spec, extra_scopes=self.EXTRA_SCOPES)
+
+        expected_labels = {constants.LABEL_CREATE_BY: "fake_user"}
+        mock_create.assert_called_with(
+            self.cvd_compute_client_multi_stage,
+            instance=self.INSTANCE,
+            image_name=self.IMAGE,
+            image_project=self.IMAGE_PROJECT,
+            disk_args=expected_disk_args,
+            metadata=local_image_metadata,
+            machine_type=self.MACHINE_TYPE,
+            network=self.NETWORK,
+            zone=self.ZONE,
+            labels=expected_labels,
+            extra_scopes=self.EXTRA_SCOPES)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/internal/lib/utils.py b/internal/lib/utils.py
index ed9d343..345935a 100755
--- a/internal/lib/utils.py
+++ b/internal/lib/utils.py
@@ -340,6 +340,39 @@
                 src_file, host_name, " ".join(scp_cmd_list), e))
 
 
+def ScpPushFile(src_file, dst_file, host_name, user_name=None,
+                rsa_key_file=None):
+    """Copy a local file to a remote host with scp.
+
+    Args:
+        src_file: The local source file path to be pushed.
+        dst_file: The destination file path on the remote host.
+        host_name: The device host_name or ip to push the file to.
+        user_name: The user_name for scp session.
+        rsa_key_file: The rsa key file.
+    Raises:
+        errors.DeviceConnectionError if scp failed.
+    """
+    scp_cmd_list = SCP_CMD[:]
+    if not rsa_key_file:
+        logger.warning(
+            "Rsa key file is not specified. "
+            "Will use default rsa key set in user environment")
+    else:
+        scp_cmd_list.extend(["-i", rsa_key_file])
+    remote_path = "%s:%s" % (host_name, dst_file)
+    if user_name:
+        remote_path = "%s@%s" % (user_name, remote_path)
+    scp_cmd_list.append(src_file)
+    scp_cmd_list.append(remote_path)
+    try:
+        subprocess.check_output(scp_cmd_list, stderr=subprocess.STDOUT)
+    except subprocess.CalledProcessError as scp_error:
+        raise errors.DeviceConnectionError(
+            "Failed to push file %s to %s with '%s': %s" % (
+                src_file, host_name, " ".join(scp_cmd_list), scp_error))
+
+
 def CreateSshKeyPairIfNotExist(private_key_path, public_key_path):
     """Create the ssh key pair if they don't exist.
 
diff --git a/internal/proto/internal_config.proto b/internal/proto/internal_config.proto
index cbb1db7..0bfd80a 100755
--- a/internal/proto/internal_config.proto
+++ b/internal/proto/internal_config.proto
@@ -42,6 +42,8 @@
   // the parts in {} will be automatically replaced with the actual value if
   // you specify them in the pattern, uuid will be automatically generated.
   optional string instance_name_pattern = 11;
+  // [CVD only] Version of fetch_cvd to use.
+  optional string fetch_cvd_version = 12;
 }
 
 // Internal configuration
diff --git a/internal/proto/user_config.proto b/internal/proto/user_config.proto
index c3d618f..1f7688a 100755
--- a/internal/proto/user_config.proto
+++ b/internal/proto/user_config.proto
@@ -101,4 +101,10 @@
 
   // Provide some additional parameters to build the ssh tunnel.
   optional string extra_args_ssh_tunnel = 27;
+
+  // [CVD only] Version of fetch_cvd to use.
+  optional string fetch_cvd_version = 28;
+
+  // [CVD only] Enable the multi-stage launch client.
+  optional bool enable_multi_stage = 29;
 }
diff --git a/public/actions/create_cuttlefish_action.py b/public/actions/create_cuttlefish_action.py
index 512252d..5c217c1 100644
--- a/public/actions/create_cuttlefish_action.py
+++ b/public/actions/create_cuttlefish_action.py
@@ -28,6 +28,7 @@
 from acloud.internal.lib import android_build_client
 from acloud.internal.lib import auth
 from acloud.internal.lib import cvd_compute_client
+from acloud.internal.lib import cvd_compute_client_multi_stage
 
 
 logger = logging.getLogger(__name__)
@@ -56,8 +57,12 @@
 
         self.credentials = auth.CreateCredentials(cfg)
 
-        compute_client = cvd_compute_client.CvdComputeClient(
-            cfg, self.credentials)
+        if cfg.enable_multi_stage:
+            compute_client = cvd_compute_client_multi_stage.CvdComputeClient(
+                cfg, self.credentials)
+        else:
+            compute_client = cvd_compute_client.CvdComputeClient(
+                cfg, self.credentials)
         super(CuttlefishDeviceFactory, self).__init__(compute_client)
 
         # Private creation parameters
diff --git a/public/actions/create_cuttlefish_action_test.py b/public/actions/create_cuttlefish_action_test.py
index c91b24b..454c3b8 100644
--- a/public/actions/create_cuttlefish_action_test.py
+++ b/public/actions/create_cuttlefish_action_test.py
@@ -27,6 +27,7 @@
 from acloud.internal.lib import android_compute_client
 from acloud.internal.lib import auth
 from acloud.internal.lib import cvd_compute_client
+from acloud.internal.lib import cvd_compute_client_multi_stage
 from acloud.internal.lib import driver_test_lib
 from acloud.internal.lib import gcompute_client
 from acloud.public.actions import create_cuttlefish_action
@@ -66,6 +67,10 @@
             "CvdComputeClient",
             return_value=self.compute_client)
         self.Patch(
+            cvd_compute_client_multi_stage,
+            "CvdComputeClient",
+            return_value=self.compute_client)
+        self.Patch(
             android_compute_client,
             "AndroidComputeClient",
             return_value=self.compute_client)
@@ -86,6 +91,7 @@
         cfg.extra_data_disk_size_gb = self.EXTRA_DATA_DISK_GB
         cfg.kernel_build_target = self.KERNEL_BUILD_TARGET
         cfg.extra_scopes = self.EXTRA_SCOPES
+        cfg.enable_multi_stage = False
         return cfg
 
     def testCreateDevices(self):
diff --git a/public/actions/remote_instance_cf_device_factory.py b/public/actions/remote_instance_cf_device_factory.py
index a4ab76f..419e0bb 100644
--- a/public/actions/remote_instance_cf_device_factory.py
+++ b/public/actions/remote_instance_cf_device_factory.py
@@ -25,6 +25,7 @@
 from acloud.internal import constants
 from acloud.internal.lib import auth
 from acloud.internal.lib import cvd_compute_client
+from acloud.internal.lib import cvd_compute_client_multi_stage
 from acloud.internal.lib import utils
 from acloud.public.actions import base_device_factory
 
@@ -70,8 +71,13 @@
         self._cvd_host_package_artifact = cvd_host_package_artifact
         self._report_internal_ip = avd_spec.report_internal_ip
         self.credentials = auth.CreateCredentials(avd_spec.cfg)
-        compute_client = cvd_compute_client.CvdComputeClient(
-            avd_spec.cfg, self.credentials)
+        # Control compute_client with enable_multi_stage
+        if self._cfg.enable_multi_stage:
+            compute_client = cvd_compute_client_multi_stage.CvdComputeClient(
+                avd_spec.cfg, self.credentials)
+        else:
+            compute_client = cvd_compute_client.CvdComputeClient(
+                avd_spec.cfg, self.credentials)
         super(RemoteInstanceDeviceFactory, self).__init__(compute_client)
         # Private creation parameters
         self._ssh_cmd = None
diff --git a/public/actions/remote_instance_cf_device_factory_test.py b/public/actions/remote_instance_cf_device_factory_test.py
index 2a68a39..10c3b04 100644
--- a/public/actions/remote_instance_cf_device_factory_test.py
+++ b/public/actions/remote_instance_cf_device_factory_test.py
@@ -24,8 +24,10 @@
 
 from acloud.create import avd_spec
 from acloud.internal import constants
+from acloud.internal.lib import android_build_client
 from acloud.internal.lib import auth
 from acloud.internal.lib import cvd_compute_client
+from acloud.internal.lib import cvd_compute_client_multi_stage
 from acloud.internal.lib import driver_test_lib
 from acloud.internal.lib import utils
 from acloud.public.actions import remote_instance_cf_device_factory
@@ -39,6 +41,7 @@
         super(RemoteInstanceDeviceFactoryTest, self).setUp()
         self.Patch(auth, "CreateCredentials", return_value=mock.MagicMock())
         self.Patch(cvd_compute_client.CvdComputeClient, "InitResourceHandle")
+        self.Patch(cvd_compute_client_multi_stage.CvdComputeClient, "InitResourceHandle")
 
     # pylint: disable=protected-access
     def testSSHExecuteWithRetry(self):
@@ -100,6 +103,52 @@
             fake_host_package_name)
         self.assertEqual(factory._CreateGceInstance(), "ins-1234-userbuild-fake-target")
 
+    # pylint: disable=protected-access
+    @mock.patch.dict(os.environ, {constants.ENV_BUILD_TARGET:'fake-target'})
+    def testCreateGceInstanceNameMultiStage(self):
+        """Test instance naming when the multi-stage client is enabled."""
+        self.Patch(utils, "GetBuildEnvironmentVariable",
+                   return_value="test_environ")
+        self.Patch(glob, "glob", return_value=["fake.img"])
+        # Fake parsed args used to build a cuttlefish AVD spec.
+        args = mock.MagicMock()
+        args.config_file = ""
+        args.avd_type = constants.TYPE_CF
+        args.flavor = "phone"
+        args.local_image = None
+        args.adb_port = None
+        fake_avd_spec = avd_spec.AVDSpec(args)
+        fake_avd_spec.cfg.enable_multi_stage = True
+
+        fake_uuid = mock.MagicMock(hex="1234")
+        self.Patch(uuid, "uuid4", return_value=fake_uuid)
+        self.Patch(cvd_compute_client_multi_stage.CvdComputeClient, "CreateInstance")
+        self.Patch(android_build_client.AndroidBuildClient, "InitResourceHandle")
+        fake_host_package_name = "/fake/host_package.tar.gz"
+        fake_image_name = "/fake/aosp_cf_x86_phone-img-eng.username.zip"
+
+        factory = remote_instance_cf_device_factory.RemoteInstanceDeviceFactory(
+            fake_avd_spec,
+            fake_image_name,
+            fake_host_package_name)
+        self.assertEqual(factory._CreateGceInstance(), "ins-1234-userbuild-aosp-cf-x86-phone")
+
+        # Can't get target name from zip file name.
+        fake_image_name = "/fake/aosp_cf_x86_phone.username.zip"
+        factory = remote_instance_cf_device_factory.RemoteInstanceDeviceFactory(
+            fake_avd_spec,
+            fake_image_name,
+            fake_host_package_name)
+        self.assertEqual(factory._CreateGceInstance(), "ins-1234-userbuild-fake-target")
+
+        # No image zip path, it uses local build images.
+        fake_image_name = ""
+        factory = remote_instance_cf_device_factory.RemoteInstanceDeviceFactory(
+            fake_avd_spec,
+            fake_image_name,
+            fake_host_package_name)
+        self.assertEqual(factory._CreateGceInstance(), "ins-1234-userbuild-fake-target")
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/public/config.py b/public/config.py
index 5a0566b..b634a55 100755
--- a/public/config.py
+++ b/public/config.py
@@ -209,7 +209,15 @@
         self.instance_name_pattern = (
             usr_cfg.instance_name_pattern or
             internal_cfg.default_usr_cfg.instance_name_pattern)
-
+        self.fetch_cvd_version = (
+            usr_cfg.fetch_cvd_version or
+            internal_cfg.default_usr_cfg.fetch_cvd_version)
+        if usr_cfg.HasField("enable_multi_stage"):  # explicit user value wins
+            self.enable_multi_stage = usr_cfg.enable_multi_stage
+        elif internal_cfg.default_usr_cfg.HasField("enable_multi_stage"):
+            self.enable_multi_stage = internal_cfg.default_usr_cfg.enable_multi_stage
+        else:
+            self.enable_multi_stage = False
 
         # Verify validity of configurations.
         self.Verify()
@@ -240,6 +248,8 @@
         if parsed_args.which in [create_args.CMD_CREATE, "create_cf"]:
             if parsed_args.network:
                 self.network = parsed_args.network
+            if parsed_args.multi_stage_launch is not None:
+                self.enable_multi_stage = parsed_args.multi_stage_launch
 
     def OverrideHwPropertyWithFlavor(self, flavor):
         """Override hw configuration values with flavor name.
diff --git a/public/data/default.config b/public/data/default.config
index a5ce901..6e4915d 100644
--- a/public/data/default.config
+++ b/public/data/default.config
@@ -18,6 +18,7 @@
   network: "default"
   extra_data_disk_size_gb: 0
   instance_name_pattern: "ins-{uuid}-{build_id}-{build_target}"
+  fetch_cvd_version: "P9358116"
 
   metadata_variable {
     key: "camera_front"