blob: 2490209631075502df03c9f81c035b8da05ef2b6 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Unit tests for the contents of device_utils.py (mostly DeviceUtils).
"""
# pylint: disable=protected-access
# pylint: disable=unused-argument
import json
import logging
import os
import stat
import unittest
from devil import devil_env
from devil.android import device_errors
from devil.android import device_signal
from devil.android import device_utils
from devil.android.sdk import adb_wrapper
from devil.android.sdk import intent
from devil.android.sdk import keyevent
from devil.android.sdk import version_codes
from devil.utils import cmd_helper
from devil.utils import mock_calls
with devil_env.SysPath(devil_env.PYMOCK_PATH):
import mock # pylint: disable=import-error
class AnyStringWith(object):
  """Equality matcher that is equal to anything containing a given value.

  Used in expected-call arguments where only a fragment of the actual string
  argument is known (e.g. a command containing 'getprop').
  """

  def __init__(self, value):
    self._value = value

  def __eq__(self, other):
    # Operands that do not support `in` (None, ints, ...) cannot contain the
    # value: report a mismatch instead of letting TypeError escape from the
    # comparison.
    try:
      return self._value in other
    except TypeError:
      return False

  def __ne__(self, other):
    # Python 2 does not derive != from ==, so keep the two consistent.
    return not self.__eq__(other)

  def __repr__(self):
    return '<AnyStringWith: %s>' % self._value
class _MockApkHelper(object):
  """Minimal APK helper stand-in backed by values fixed at construction."""

  def __init__(self, path, package_name, perms=None):
    self.path, self.package_name, self.perms = path, package_name, perms

  def GetPackageName(self):
    """Returns the package name given to the constructor."""
    return self.package_name

  def GetPermissions(self):
    """Returns the permissions given to the constructor (None by default)."""
    return self.perms
class _MockMultipleDevicesError(Exception):
  """Plain Exception subclass; adds no behavior of its own."""
class DeviceUtilsInitTest(unittest.TestCase):
  """Checks the serial reported by DeviceUtils for each constructor input."""

  def testInitWithStr(self):
    # A str serial should be reported back by the wrapped adb object.
    serial_as_str = str('0123456789abcdef')
    device = device_utils.DeviceUtils('0123456789abcdef')
    reported_serial = device.adb.GetDeviceSerial()
    self.assertEqual(serial_as_str, reported_serial)

  def testInitWithUnicode(self):
    # Same check with a unicode serial.
    serial_as_unicode = unicode('fedcba9876543210')
    device = device_utils.DeviceUtils(serial_as_unicode)
    reported_serial = device.adb.GetDeviceSerial()
    self.assertEqual(serial_as_unicode, reported_serial)

  def testInitWithAdbWrapper(self):
    # An AdbWrapper instance may be passed instead of a serial.
    serial = '123456789abcdef0'
    wrapper = adb_wrapper.AdbWrapper(serial)
    device = device_utils.DeviceUtils(wrapper)
    self.assertEqual(serial, device.adb.GetDeviceSerial())

  def testInitWithMissing_fails(self):
    # Neither None nor the empty string is accepted as a device.
    for missing_serial in (None, ''):
      with self.assertRaises(ValueError):
        device_utils.DeviceUtils(missing_serial)
class DeviceUtilsGetAVDsTest(mock_calls.TestCase):
  """Tests device_utils.GetAVDs against canned `list avd` output."""

  def testGetAVDs(self):
    # LocalPath is patched to resolve names from this dict only.
    mocked_attrs = {
      'android_sdk': '/my/sdk/path'
    }
    avd_listing = (
        'Available Android Virtual Devices:\n'
        ' Name: my_android5.0\n'
        ' Path: /some/path/to/.android/avd/my_android5.0.avd\n'
        ' Target: Android 5.0 (API level 21)\n'
        ' Tag/ABI: default/x86\n'
        ' Skin: WVGA800\n')
    with mock.patch('devil.devil_env._Environment.LocalPath',
                    mock.Mock(side_effect=lambda a: mocked_attrs[a])):
      # The executable path is not pinned (mock.ANY); only its arguments are.
      with self.assertCall(
          mock.call.devil.utils.cmd_helper.GetCmdOutput(
              [mock.ANY, 'list', 'avd']),
          avd_listing):
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(['my_android5.0'], device_utils.GetAVDs())
class DeviceUtilsRestartServerTest(mock_calls.TestCase):
  """Tests device_utils.RestartServer with time.sleep mocked out."""

  @mock.patch('time.sleep', mock.Mock())
  def testRestartServer_succeeds(self):
    def pgrep_adb():
      # Builds a fresh expected `pgrep adb` call for each expectation.
      return mock.call.devil.utils.cmd_helper.GetCmdStatusAndOutput(
          ['pgrep', 'adb'])

    not_running = (1, '')
    running = (0, '123\n')
    expected_calls = [
        mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.KillServer(),
        (pgrep_adb(), not_running),
        mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.StartServer(),
        (pgrep_adb(), not_running),
        (pgrep_adb(), running),
    ]
    with self.assertCalls(*expected_calls):
      device_utils.RestartServer()
class MockTempFile(object):
  """Context manager that yields a mocked file object with a fixed name."""

  def __init__(self, name='/tmp/some/file'):
    fake_file = mock.MagicMock(spec=file)
    fake_file.name = name
    fake_file.name_quoted = cmd_helper.SingleQuote(name)
    self.file = fake_file

  def __enter__(self):
    return self.file

  def __exit__(self, exc_type, exc_val, exc_tb):
    # Nothing to clean up; returning None lets exceptions propagate.
    return None

  @property
  def name(self):
    """Name of the mocked file."""
    return self.file.name
class _PatchedFunction(object):
  """Record holding a `patched` value and its `mocked` counterpart."""

  def __init__(self, patched=None, mocked=None):
    self.patched, self.mocked = patched, mocked
def _AdbWrapperMock(test_serial, is_ready=True):
  """Builds a mock with the AdbWrapper spec that reports |test_serial|.

  Both str() of the mock and GetDeviceSerial() return |test_serial|, and
  the `is_ready` attribute is set to |is_ready|.
  """
  adb_mock = mock.Mock(spec=adb_wrapper.AdbWrapper)
  adb_mock.is_ready = is_ready
  adb_mock.GetDeviceSerial.return_value = test_serial
  adb_mock.__str__ = mock.Mock(return_value=test_serial)
  return adb_mock
class DeviceUtilsTest(mock_calls.TestCase):
  """Base fixture: a DeviceUtils over a mocked adb, plus error factories."""

  def setUp(self):
    adb_mock = _AdbWrapperMock('0123456789abcdef')
    self.adb = adb_mock
    self.device = device_utils.DeviceUtils(
        adb_mock, default_timeout=10, default_retries=0)
    # GetDeviceSerial is excluded from the watched adb methods.
    self.watchMethodCalls(self.call.adb, ignore=['GetDeviceSerial'])

  def AdbCommandError(self, args=None, output=None, status=None, msg=None):
    """Returns a mock that raises AdbCommandFailedError when called."""
    cmd_args = ['[unspecified]'] if args is None else args
    error = device_errors.AdbCommandFailedError(
        cmd_args, output, status, msg, str(self.device))
    return mock.Mock(side_effect=error)

  def CommandError(self, msg=None):
    """Returns a mock that raises CommandFailedError when called."""
    message = 'Command failed' if msg is None else msg
    error = device_errors.CommandFailedError(message, str(self.device))
    return mock.Mock(side_effect=error)

  def ShellError(self, output=None, status=1):
    """Returns a callable raising AdbShellCommandFailedError for its cmd."""
    failure_output = 'Permission denied\n' if output is None else output

    def action(cmd, *args, **kwargs):
      raise device_errors.AdbShellCommandFailedError(
          cmd, failure_output, status, str(self.device))

    return action

  def TimeoutError(self, msg=None):
    """Returns a mock that raises CommandTimeoutError when called."""
    message = 'Operation timed out' if msg is None else msg
    error = device_errors.CommandTimeoutError(message, str(self.device))
    return mock.Mock(side_effect=error)

  def EnsureCacheInitialized(self, props=None, sdcard='/sdcard'):
    """Returns the (call, result) pair for a getprop shell command.

    The result is [sdcard, 'TOKEN'] followed by any |props|.
    """
    expected_call = self.call.device.RunShellCommand(
        AnyStringWith('getprop'),
        shell=True, check_return=True, large_output=True)
    output_lines = [sdcard, 'TOKEN'] + (props or [])
    return (expected_call, output_lines)
class DeviceUtilsEqTest(DeviceUtilsTest):
  """Tests == between the fixture device and other representations."""

  def testEq_equal_deviceUtils(self):
    # Same serial, different DeviceUtils instance; checked in both directions.
    twin = device_utils.DeviceUtils(_AdbWrapperMock('0123456789abcdef'))
    self.assertTrue(self.device == twin)
    self.assertTrue(twin == self.device)

  def testEq_equal_adbWrapper(self):
    wrapper = adb_wrapper.AdbWrapper('0123456789abcdef')
    self.assertTrue(self.device == wrapper)
    self.assertTrue(wrapper == self.device)

  def testEq_equal_string(self):
    serial = '0123456789abcdef'
    self.assertTrue(self.device == serial)
    self.assertTrue(serial == self.device)

  def testEq_devicesNotEqual(self):
    # Serial differs in the last character only.
    stranger = device_utils.DeviceUtils(_AdbWrapperMock('0123456789abcdee'))
    self.assertFalse(self.device == stranger)
    self.assertFalse(stranger == self.device)

  def testEq_identity(self):
    self.assertTrue(self.device == self.device)

  def testEq_serialInList(self):
    # Membership relies on equality between the serial string and the device.
    self.assertIn('0123456789abcdef', [self.device])
class DeviceUtilsLtTest(DeviceUtilsTest):
  """Tests ordering between DeviceUtils instances with different serials."""

  def testLt_lessThan(self):
    # The fixture serial '0123456789abcdef' is expected to sort first.
    other = device_utils.DeviceUtils(_AdbWrapperMock('ffffffffffffffff'))
    self.assertTrue(self.device < other)
    self.assertTrue(other > self.device)

  def testLt_greaterThan_lhs(self):
    other = device_utils.DeviceUtils(_AdbWrapperMock('0000000000000000'))
    self.assertFalse(self.device < other)
    self.assertFalse(other > self.device)

  def testLt_equal(self):
    # Equal serials: neither side is strictly less than the other.
    other = device_utils.DeviceUtils(_AdbWrapperMock('0123456789abcdef'))
    self.assertFalse(self.device < other)
    self.assertFalse(other > self.device)

  def testLt_sorted(self):
    devices = [
        device_utils.DeviceUtils(_AdbWrapperMock('ffffffffffffffff')),
        device_utils.DeviceUtils(_AdbWrapperMock('0000000000000000')),
    ]
    sorted_devices = sorted(devices)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual('0000000000000000',
                     sorted_devices[0].adb.GetDeviceSerial())
    self.assertEqual('ffffffffffffffff',
                     sorted_devices[1].adb.GetDeviceSerial())
class DeviceUtilsStrTest(DeviceUtilsTest):
  """Tests str() on the fixture device."""

  def testStr_returnsSerial(self):
    serial = '0123456789abcdef'
    expected_call = (self.call.adb.GetDeviceSerial(), serial)
    with self.assertCalls(expected_call):
      self.assertEqual(serial, str(self.device))
class DeviceUtilsIsOnlineTest(DeviceUtilsTest):
  """Tests DeviceUtils.IsOnline for each adb GetState outcome."""

  def testIsOnline_true(self):
    get_state = self.call.adb.GetState()
    with self.assertCall(get_state, 'device'):
      self.assertTrue(self.device.IsOnline())

  def testIsOnline_false(self):
    get_state = self.call.adb.GetState()
    with self.assertCall(get_state, 'offline'):
      self.assertFalse(self.device.IsOnline())

  def testIsOnline_error(self):
    # A failing GetState is expected to be reported as "not online".
    get_state = self.call.adb.GetState()
    with self.assertCall(get_state, self.CommandError()):
      self.assertFalse(self.device.IsOnline())
class DeviceUtilsHasRootTest(DeviceUtilsTest):
  """Tests DeviceUtils.HasRoot for 'sailfish' and other product names.

  The tests expect `ls /root` for non-sailfish products and
  `getprop service.adb.root` for sailfish.
  """

  def testHasRoot_true(self):
    with self.patch_call(self.call.device.product_name,
                         return_value='notasailfish'):
      with self.assertCall(self.call.adb.Shell('ls /root'), 'foo\n'):
        self.assertTrue(self.device.HasRoot())

  def testhasRootSpecial_true(self):
    with self.patch_call(self.call.device.product_name,
                         return_value='sailfish'):
      with self.assertCall(
          self.call.adb.Shell('getprop service.adb.root'), '1\n'):
        self.assertTrue(self.device.HasRoot())

  def testHasRoot_false(self):
    with self.patch_call(self.call.device.product_name,
                         return_value='notasailfish'):
      with self.assertCall(self.call.adb.Shell('ls /root'),
                           self.ShellError()):
        self.assertFalse(self.device.HasRoot())

  def testHasRootSpecial_false(self):
    with self.patch_call(self.call.device.product_name,
                         return_value='sailfish'):
      with self.assertCall(
          self.call.adb.Shell('getprop service.adb.root'), '\n'):
        self.assertFalse(self.device.HasRoot())
class DeviceUtilsEnableRootTest(DeviceUtilsTest):
  """Tests DeviceUtils.EnableRoot on user and non-user builds."""

  def testEnableRoot_succeeds(self):
    expected_calls = [
        (self.call.device.IsUserBuild(), False),
        self.call.adb.Root(),
        self.call.device.WaitUntilFullyBooted(),
    ]
    with self.assertCalls(*expected_calls):
      self.device.EnableRoot()

  def testEnableRoot_userBuild(self):
    # On a user build no further calls are expected before the failure.
    user_build = (self.call.device.IsUserBuild(), True)
    with self.assertCalls(user_build), (
        self.assertRaises(device_errors.CommandFailedError)):
      self.device.EnableRoot()

  def testEnableRoot_rootFails(self):
    expected_calls = [
        (self.call.device.IsUserBuild(), False),
        (self.call.adb.Root(), self.CommandError()),
    ]
    with self.assertCalls(*expected_calls), (
        self.assertRaises(device_errors.CommandFailedError)):
      self.device.EnableRoot()
class DeviceUtilsIsUserBuildTest(DeviceUtilsTest):
  """Tests DeviceUtils.IsUserBuild against the ro.build.type property."""

  def testIsUserBuild_yes(self):
    build_type = self.call.device.GetProp('ro.build.type', cache=True)
    with self.assertCall(build_type, 'user'):
      self.assertTrue(self.device.IsUserBuild())

  def testIsUserBuild_no(self):
    build_type = self.call.device.GetProp('ro.build.type', cache=True)
    with self.assertCall(build_type, 'userdebug'):
      self.assertFalse(self.device.IsUserBuild())
class DeviceUtilsGetExternalStoragePathTest(DeviceUtilsTest):
  """Tests DeviceUtils.GetExternalStoragePath via the getprop cache call."""

  def testGetExternalStoragePath_succeeds(self):
    with self.assertCalls(
        self.EnsureCacheInitialized(sdcard='/fake/storage/path')):
      # assertEqual replaces the deprecated assertEquals alias.
      self.assertEqual('/fake/storage/path',
                       self.device.GetExternalStoragePath())

  def testGetExternalStoragePath_fails(self):
    # An empty storage path in the cache output is expected to be an error.
    with self.assertCalls(
        self.EnsureCacheInitialized(sdcard='')):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.GetExternalStoragePath()
class DeviceUtilsGetApplicationPathsInternalTest(DeviceUtilsTest):
  """Tests DeviceUtils._GetApplicationPathsInternal against `pm path` output.

  Every test expects the SDK level property to be read first (as '19').
  """

  def testGetApplicationPathsInternal_exists(self):
    # A 'package:'-prefixed line is expected to yield the bare path.
    with self.assertCalls(
        (self.call.device.GetProp('ro.build.version.sdk', cache=True), '19'),
        (self.call.device.RunShellCommand(
            ['pm', 'path', 'android'], check_return=True),
         ['package:/path/to/android.apk'])):
      # assertEqual replaces the deprecated assertEquals alias.
      self.assertEqual(['/path/to/android.apk'],
                       self.device._GetApplicationPathsInternal('android'))

  def testGetApplicationPathsInternal_notExists(self):
    # Empty `pm path` output is expected to yield no paths.
    with self.assertCalls(
        (self.call.device.GetProp('ro.build.version.sdk', cache=True), '19'),
        (self.call.device.RunShellCommand(
            ['pm', 'path', 'not.installed.app'], check_return=True),
         '')):
      self.assertEqual(
          [], self.device._GetApplicationPathsInternal('not.installed.app'))

  def testGetApplicationPathsInternal_garbageFirstLine(self):
    # Output without the 'package:' prefix is expected to be rejected.
    with self.assertCalls(
        (self.call.device.GetProp('ro.build.version.sdk', cache=True), '19'),
        (self.call.device.RunShellCommand(
            ['pm', 'path', 'android'], check_return=True),
         ['garbage first line'])):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device._GetApplicationPathsInternal('android')

  def testGetApplicationPathsInternal_fails(self):
    # A failure of the shell command itself should propagate to the caller.
    with self.assertCalls(
        (self.call.device.GetProp('ro.build.version.sdk', cache=True), '19'),
        (self.call.device.RunShellCommand(
            ['pm', 'path', 'android'], check_return=True),
         self.CommandError('ERROR. Is package manager running?\n'))):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device._GetApplicationPathsInternal('android')
class DeviceUtils_GetApplicationVersionTest(DeviceUtilsTest):
  """Tests DeviceUtils.GetApplicationVersion against `dumpsys package`."""

  def test_GetApplicationVersion_exists(self):
    # The value of the 'versionName=' line is expected to be returned.
    with self.assertCalls(
        (self.call.adb.Shell('dumpsys package com.android.chrome'),
         'Packages:\n'
         ' Package [com.android.chrome] (3901ecfb):\n'
         ' userId=1234 gids=[123, 456, 789]\n'
         ' pkg=Package{1fecf634 com.android.chrome}\n'
         ' versionName=45.0.1234.7\n')):
      # assertEqual replaces the deprecated assertEquals alias.
      self.assertEqual('45.0.1234.7',
                       self.device.GetApplicationVersion('com.android.chrome'))

  def test_GetApplicationVersion_notExists(self):
    # Empty dumpsys output is expected to yield None.
    with self.assertCalls(
        (self.call.adb.Shell('dumpsys package com.android.chrome'), '')):
      self.assertIsNone(
          self.device.GetApplicationVersion('com.android.chrome'))

  def test_GetApplicationVersion_fails(self):
    # Package output without a versionName line is expected to be an error.
    with self.assertCalls(
        (self.call.adb.Shell('dumpsys package com.android.chrome'),
         'Packages:\n'
         ' Package [com.android.chrome] (3901ecfb):\n'
         ' userId=1234 gids=[123, 456, 789]\n'
         ' pkg=Package{1fecf634 com.android.chrome}\n')):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.GetApplicationVersion('com.android.chrome')
class DeviceUtilsGetApplicationDataDirectoryTest(DeviceUtilsTest):
  """Tests DeviceUtils.GetApplicationDataDirectory via a piped `pm dump`."""

  def testGetApplicationDataDirectory_exists(self):
    # The value after 'dataDir=' is expected to be returned.
    with self.assertCall(
        self.call.device._RunPipedShellCommand(
            'pm dump foo.bar.baz | grep dataDir='),
        ['dataDir=/data/data/foo.bar.baz']):
      # assertEqual replaces the deprecated assertEquals alias.
      self.assertEqual(
          '/data/data/foo.bar.baz',
          self.device.GetApplicationDataDirectory('foo.bar.baz'))

  def testGetApplicationDataDirectory_notExists(self):
    # A failing piped command is expected to surface as CommandFailedError.
    with self.assertCall(
        self.call.device._RunPipedShellCommand(
            'pm dump foo.bar.baz | grep dataDir='),
        self.ShellError()):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.GetApplicationDataDirectory('foo.bar.baz')
@mock.patch('time.sleep', mock.Mock())
class DeviceUtilsWaitUntilFullyBootedTest(DeviceUtilsTest):
  """Tests DeviceUtils.WaitUntilFullyBooted with time.sleep mocked out.

  Each test lists, in order, the calls it expects. The _Expect* helpers build
  one (call, result) pair per readiness probe so that the tests only spell
  out what differs between scenarios.
  """

  def _ExpectStoragePath(self, result='/fake/storage/path'):
    """sd_card_ready: the external storage path lookup."""
    return (self.call.device.GetExternalStoragePath(), result)

  def _ExpectStorageDirTest(self, result=''):
    """sd_card_ready: the directory test on the fake storage path."""
    return (self.call.adb.Shell('test -d /fake/storage/path'), result)

  def _ExpectPmReady(self, result=None):
    """pm_ready: the uncached path lookup for the 'android' package."""
    if result is None:
      result = ['package:/some/fake/path']
    return (
        self.call.device._GetApplicationPathsInternal(
            'android', skip_cache=True),
        result)

  def _ExpectBootCompleted(self, result):
    """boot_completed: the uncached sys.boot_completed property read."""
    return (
        self.call.device.GetProp('sys.boot_completed', cache=False), result)

  def _ExpectDumpsysWifi(self, result):
    """wifi_enabled: the `dumpsys wifi` shell command."""
    return (self.call.adb.Shell('dumpsys wifi'), result)

  def testWaitUntilFullyBooted_succeedsNoWifi(self):
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(),
        self._ExpectPmReady(),
        self._ExpectBootCompleted('1')):
      self.device.WaitUntilFullyBooted(wifi=False)

  def testWaitUntilFullyBooted_succeedsWithWifi(self):
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(),
        self._ExpectPmReady(),
        self._ExpectBootCompleted('1'),
        self._ExpectDumpsysWifi('stuff\nWi-Fi is enabled\nmore stuff\n')):
      self.device.WaitUntilFullyBooted(wifi=True)

  def testWaitUntilFullyBooted_deviceNotInitiallyAvailable(self):
    # The storage path lookup fails four times before succeeding.
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(self.AdbCommandError()),
        self._ExpectStoragePath(self.AdbCommandError()),
        self._ExpectStoragePath(self.AdbCommandError()),
        self._ExpectStoragePath(self.AdbCommandError()),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(),
        self._ExpectPmReady(),
        self._ExpectBootCompleted('1')):
      self.device.WaitUntilFullyBooted(wifi=False)

  def testWaitUntilFullyBooted_deviceBrieflyOffline(self):
    # One boot_completed read fails with an adb error, then succeeds.
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(),
        self._ExpectPmReady(),
        self._ExpectBootCompleted(self.AdbCommandError()),
        self._ExpectBootCompleted('1')):
      self.device.WaitUntilFullyBooted(wifi=False)

  def testWaitUntilFullyBooted_sdCardReadyFails_noPath(self):
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(self.CommandError())), (
            self.assertRaises(device_errors.CommandFailedError)):
      self.device.WaitUntilFullyBooted(wifi=False)

  def testWaitUntilFullyBooted_sdCardReadyFails_notExists(self):
    # Two shell failures are retried; the third attempt times out.
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(self.ShellError()),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(self.ShellError()),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(self.TimeoutError())), (
            self.assertRaises(device_errors.CommandTimeoutError)):
      self.device.WaitUntilFullyBooted(wifi=False)

  def testWaitUntilFullyBooted_devicePmFails(self):
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(),
        self._ExpectPmReady(self.CommandError()),
        self._ExpectPmReady(self.CommandError()),
        self._ExpectPmReady(self.TimeoutError())), (
            self.assertRaises(device_errors.CommandTimeoutError)):
      self.device.WaitUntilFullyBooted(wifi=False)

  def testWaitUntilFullyBooted_bootFails(self):
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(),
        self._ExpectPmReady(),
        self._ExpectBootCompleted('0'),
        self._ExpectBootCompleted('0'),
        self._ExpectBootCompleted(self.TimeoutError())), (
            self.assertRaises(device_errors.CommandTimeoutError)):
      self.device.WaitUntilFullyBooted(wifi=False)

  def testWaitUntilFullyBooted_wifiFails(self):
    with self.assertCalls(
        self.call.adb.WaitForDevice(),
        self._ExpectStoragePath(),
        self._ExpectStorageDirTest(),
        self._ExpectPmReady(),
        self._ExpectBootCompleted('1'),
        self._ExpectDumpsysWifi('stuff\nmore stuff\n'),
        self._ExpectDumpsysWifi('stuff\nmore stuff\n'),
        self._ExpectDumpsysWifi(self.TimeoutError())), (
            self.assertRaises(device_errors.CommandTimeoutError)):
      self.device.WaitUntilFullyBooted(wifi=True)
@mock.patch('time.sleep', mock.Mock())
class DeviceUtilsRebootTest(DeviceUtilsTest):
  """Tests DeviceUtils.Reboot with time.sleep mocked out."""

  def _ExpectRebootUntilOffline(self):
    """Returns the calls shared by all tests: reboot, online, then offline."""
    return [
        self.call.adb.Reboot(),
        (self.call.device.IsOnline(), True),
        (self.call.device.IsOnline(), False),
    ]

  def testReboot_nonBlocking(self):
    with self.assertCalls(*self._ExpectRebootUntilOffline()):
      self.device.Reboot(block=False)

  def testReboot_blocking(self):
    expected_calls = self._ExpectRebootUntilOffline()
    expected_calls.append(self.call.device.WaitUntilFullyBooted(wifi=False))
    with self.assertCalls(*expected_calls):
      self.device.Reboot(block=True)

  def testReboot_blockUntilWifi(self):
    expected_calls = self._ExpectRebootUntilOffline()
    expected_calls.append(self.call.device.WaitUntilFullyBooted(wifi=True))
    with self.assertCalls(*expected_calls):
      self.device.Reboot(block=True, wifi=True)
class DeviceUtilsInstallTest(DeviceUtilsTest):
  """Tests DeviceUtils.Install for a single APK.

  The _Expect* helpers build the expectations that recur across tests; each
  test then lists them in the order the calls are expected.
  """

  mock_apk = _MockApkHelper('/fake/test/app.apk', 'test.package', ['p1'])

  def _ExpectApkOnHost(self, exists=True):
    """Host-side existence check for the APK file."""
    return (mock.call.os.path.exists('/fake/test/app.apk'), exists)

  def _ExpectInstalledPaths(self, paths):
    """Lookup of the paths currently installed for the package."""
    return (
        self.call.device._GetApplicationPathsInternal('test.package'), paths)

  def _ExpectStaleApks(self, stale_apks):
    """Stale-APK computation returning (stale_apks, None)."""
    return (
        self.call.device._ComputeStaleApks(
            'test.package', ['/fake/test/app.apk']),
        (stale_apks, None))

  def _ExpectAdbInstall(self, reinstall=False, allow_downgrade=False):
    """The adb Install call itself (no result attached)."""
    return self.call.adb.Install(
        '/fake/test/app.apk', reinstall=reinstall,
        allow_downgrade=allow_downgrade)

  def _ExpectGrantPermissions(self, permissions):
    """Permission grant for the package, returning an empty list."""
    return (
        self.call.device.GrantPermissions('test.package', permissions), [])

  def testInstall_noPriorInstall(self):
    apk = DeviceUtilsInstallTest.mock_apk
    with self.patch_call(self.call.device.build_version_sdk, return_value=23):
      with self.assertCalls(
          self._ExpectApkOnHost(),
          self._ExpectInstalledPaths([]),
          self._ExpectAdbInstall(),
          self._ExpectGrantPermissions(['p1'])):
        self.device.Install(apk, retries=0)

  def testInstall_permissionsPreM(self):
    # With SDK level 20 no permission grant is expected.
    apk = DeviceUtilsInstallTest.mock_apk
    with self.patch_call(self.call.device.build_version_sdk, return_value=20):
      with self.assertCalls(
          self._ExpectApkOnHost(),
          self._ExpectInstalledPaths([]),
          self._ExpectAdbInstall()):
        self.device.Install(apk, retries=0)

  def testInstall_findPermissions(self):
    # Permissions come from the APK helper when none are passed in.
    apk = DeviceUtilsInstallTest.mock_apk
    with self.patch_call(self.call.device.build_version_sdk, return_value=23):
      with self.assertCalls(
          self._ExpectApkOnHost(),
          self._ExpectInstalledPaths([]),
          self._ExpectAdbInstall(),
          self._ExpectGrantPermissions(['p1'])):
        self.device.Install(apk, retries=0)

  def testInstall_passPermissions(self):
    apk = DeviceUtilsInstallTest.mock_apk
    with self.assertCalls(
        self._ExpectApkOnHost(),
        self._ExpectInstalledPaths([]),
        self._ExpectAdbInstall(),
        self._ExpectGrantPermissions(['p1', 'p2'])):
      self.device.Install(apk, retries=0, permissions=['p1', 'p2'])

  def testInstall_differentPriorInstall(self):
    # A stale prior install is expected to be uninstalled first.
    apk = DeviceUtilsInstallTest.mock_apk
    with self.assertCalls(
        self._ExpectApkOnHost(),
        self._ExpectInstalledPaths(['/fake/data/app/test.package.apk']),
        self._ExpectStaleApks(['/fake/test/app.apk']),
        self.call.device.Uninstall('test.package'),
        self._ExpectAdbInstall()):
      self.device.Install(apk, retries=0, permissions=[])

  def testInstall_differentPriorInstall_reinstall(self):
    apk = DeviceUtilsInstallTest.mock_apk
    with self.assertCalls(
        self._ExpectApkOnHost(),
        self._ExpectInstalledPaths(['/fake/data/app/test.package.apk']),
        self._ExpectStaleApks(['/fake/test/app.apk']),
        self._ExpectAdbInstall(reinstall=True)):
      self.device.Install(apk, reinstall=True, retries=0, permissions=[])

  def testInstall_identicalPriorInstall_reinstall(self):
    # Nothing stale: the package is force-stopped instead of installed.
    apk = DeviceUtilsInstallTest.mock_apk
    with self.assertCalls(
        self._ExpectApkOnHost(),
        self._ExpectInstalledPaths(['/fake/data/app/test.package.apk']),
        self._ExpectStaleApks([]),
        self.call.device.ForceStop('test.package')):
      self.device.Install(apk, reinstall=True, retries=0, permissions=[])

  def testInstall_missingApk(self):
    apk = DeviceUtilsInstallTest.mock_apk
    with self.assertCalls(self._ExpectApkOnHost(exists=False)), (
        self.assertRaises(device_errors.CommandFailedError)):
      self.device.Install(apk, retries=0)

  def testInstall_fails(self):
    apk = DeviceUtilsInstallTest.mock_apk
    with self.assertCalls(
        self._ExpectApkOnHost(),
        self._ExpectInstalledPaths([]),
        (self._ExpectAdbInstall(), self.CommandError('Failure\r\n'))), (
            self.assertRaises(device_errors.CommandFailedError)):
      self.device.Install(apk, retries=0)

  def testInstall_downgrade(self):
    apk = DeviceUtilsInstallTest.mock_apk
    with self.assertCalls(
        self._ExpectApkOnHost(),
        self._ExpectInstalledPaths(['/fake/data/app/test.package.apk']),
        self._ExpectStaleApks(['/fake/test/app.apk']),
        self._ExpectAdbInstall(reinstall=True, allow_downgrade=True)):
      self.device.Install(
          apk, reinstall=True, retries=0, permissions=[],
          allow_downgrade=True)
class DeviceUtilsInstallSplitApkTest(DeviceUtilsTest):
  """Tests DeviceUtils.InstallSplitApk for a base APK plus three splits.

  In every test split selection picks only 'split2.apk'.
  """

  mock_apk = _MockApkHelper('base.apk', 'test.package', ['p1'])

  def _AllSplits(self):
    """Returns a fresh list of the candidate split APKs."""
    return ['split1.apk', 'split2.apk', 'split3.apk']

  def _ExpectSplitSelection(self, split2_exists=True):
    """Returns the calls shared by all tests, in order.

    These are the SDK level check, the split selection and the host-side
    existence checks for the base APK and the selected split.
    """
    return [
        self.call.device._CheckSdkLevel(21),
        (mock.call.devil.android.sdk.split_select.SelectSplits(
            self.device, 'base.apk', self._AllSplits(),
            allow_cached_props=False),
         ['split2.apk']),
        (mock.call.os.path.exists('base.apk'), True),
        (mock.call.os.path.exists('split2.apk'), split2_exists),
    ]

  def _ExpectPartialInstall(self, allow_downgrade):
    """Returns the calls expected when only split2 is stale on the device."""
    return [
        (self.call.device._GetApplicationPathsInternal('test.package'),
         ['base-on-device.apk', 'split2-on-device.apk']),
        (self.call.device._ComputeStaleApks(
            'test.package', ['base.apk', 'split2.apk']),
         (['split2.apk'], None)),
        self.call.adb.InstallMultiple(
            ['split2.apk'], partial='test.package', reinstall=True,
            allow_downgrade=allow_downgrade),
    ]

  def testInstallSplitApk_noPriorInstall(self):
    expected_calls = self._ExpectSplitSelection() + [
        (self.call.device._GetApplicationPathsInternal('test.package'), []),
        self.call.adb.InstallMultiple(
            ['base.apk', 'split2.apk'], partial=None, reinstall=False,
            allow_downgrade=False),
    ]
    with self.assertCalls(*expected_calls):
      self.device.InstallSplitApk(
          DeviceUtilsInstallSplitApkTest.mock_apk, self._AllSplits(),
          permissions=[], retries=0)

  def testInstallSplitApk_partialInstall(self):
    expected_calls = (
        self._ExpectSplitSelection() +
        self._ExpectPartialInstall(allow_downgrade=False))
    with self.assertCalls(*expected_calls):
      self.device.InstallSplitApk(
          DeviceUtilsInstallSplitApkTest.mock_apk, self._AllSplits(),
          reinstall=True, permissions=[], retries=0)

  def testInstallSplitApk_downgrade(self):
    expected_calls = (
        self._ExpectSplitSelection() +
        self._ExpectPartialInstall(allow_downgrade=True))
    with self.assertCalls(*expected_calls):
      self.device.InstallSplitApk(
          DeviceUtilsInstallSplitApkTest.mock_apk, self._AllSplits(),
          reinstall=True, permissions=[], retries=0,
          allow_downgrade=True)

  def testInstallSplitApk_missingSplit(self):
    # The selected split is missing on the host, so the install must fail.
    expected_calls = self._ExpectSplitSelection(split2_exists=False)
    with self.assertCalls(*expected_calls), (
        self.assertRaises(device_errors.CommandFailedError)):
      self.device.InstallSplitApk(
          DeviceUtilsInstallSplitApkTest.mock_apk, self._AllSplits(),
          permissions=[], retries=0)
class DeviceUtilsUninstallTest(DeviceUtilsTest):
  """Tests DeviceUtils.Uninstall for installed and absent packages."""

  def testUninstall_callsThrough(self):
    installed = (
        self.call.device._GetApplicationPathsInternal('test.package'),
        ['/path.apk'])
    adb_uninstall = self.call.adb.Uninstall('test.package', True)
    with self.assertCalls(installed, adb_uninstall):
      self.device.Uninstall('test.package', True)

  def testUninstall_noop(self):
    # With no installed paths, no adb Uninstall call is expected.
    not_installed = (
        self.call.device._GetApplicationPathsInternal('test.package'), [])
    with self.assertCalls(not_installed):
      self.device.Uninstall('test.package', True)
class DeviceUtilsSuTest(DeviceUtilsTest):
  """Tests the su command line DeviceUtils._Su builds per SDK level."""

  def testSu_preM(self):
    # At LOLLIPOP_MR1 the expected form is 'su -c <cmd>'.
    with self.patch_call(
        self.call.device.build_version_sdk,
        return_value=version_codes.LOLLIPOP_MR1):
      # assertEqual replaces the deprecated assertEquals alias.
      self.assertEqual('su -c foo', self.device._Su('foo'))

  def testSu_mAndAbove(self):
    # At MARSHMALLOW the expected form is 'su 0 <cmd>'.
    with self.patch_call(
        self.call.device.build_version_sdk,
        return_value=version_codes.MARSHMALLOW):
      self.assertEqual('su 0 foo', self.device._Su('foo'))
class DeviceUtilsRunShellCommandTest(DeviceUtilsTest):
def setUp(self):
  """Extends the base fixture so NeedsSU reports False by default."""
  super(DeviceUtilsRunShellCommandTest, self).setUp()
  needs_su = mock.Mock(return_value=False)
  self.device.NeedsSU = needs_su
def testRunShellCommand_commandAsList(self):
  """A list command is expected to reach adb as one space-joined string."""
  expected_shell = self.call.adb.Shell('pm list packages')
  with self.assertCall(expected_shell, ''):
    self.device.RunShellCommand(
        ['pm', 'list', 'packages'], check_return=True)
def testRunShellCommand_commandAsListQuoted(self):
  """List items with spaces or '$' are expected to be single-quoted."""
  expected_shell = self.call.adb.Shell("echo 'hello world' '$10'")
  with self.assertCall(expected_shell, ''):
    self.device.RunShellCommand(
        ['echo', 'hello world', '$10'], check_return=True)
def testRunShellCommand_commandAsString(self):
  """A string command with shell=True is expected to pass through as-is."""
  cmd = 'echo "$VAR"'
  with self.assertCall(self.call.adb.Shell(cmd), ''):
    self.device.RunShellCommand(cmd, shell=True, check_return=True)
def testNewRunShellImpl_withEnv(self):
  """env entries are expected as NAME=value prefixes on the command."""
  expected_shell = self.call.adb.Shell('VAR=some_string echo "$VAR"')
  with self.assertCall(expected_shell, ''):
    self.device.RunShellCommand(
        'echo "$VAR"', shell=True, check_return=True,
        env={'VAR': 'some_string'})
def testNewRunShellImpl_withEnvQuoted(self):
  """env values containing '$' are expected to be double-quoted."""
  expected_shell = self.call.adb.Shell('PATH="$PATH:/other/path" run_this')
  with self.assertCall(expected_shell, ''):
    self.device.RunShellCommand(
        ['run_this'], check_return=True, env={'PATH': '$PATH:/other/path'})
def testNewRunShellImpl_withEnv_failure(self):
  """An env name containing a space is expected to raise KeyError."""
  bad_env = {'INVALID NAME': 'value'}
  with self.assertRaises(KeyError):
    self.device.RunShellCommand(
        ['some_cmd'], check_return=True, env=bad_env)
def testNewRunShellImpl_withCwd(self):
  """cwd is expected to become a `cd <dir> &&` prefix on the command."""
  expected_shell = self.call.adb.Shell('cd /some/test/path && ls')
  with self.assertCall(expected_shell, ''):
    self.device.RunShellCommand(
        ['ls'], check_return=True, cwd='/some/test/path')
def testNewRunShellImpl_withCwdQuoted(self):
  """A cwd containing spaces is expected to be single-quoted."""
  expected_shell = self.call.adb.Shell(
      "cd '/some test/path with/spaces' && ls")
  with self.assertCall(expected_shell, ''):
    self.device.RunShellCommand(
        ['ls'], check_return=True, cwd='/some test/path with/spaces')
def testRunShellCommand_withHugeCmd(self):
  """A very long command is expected to run from a pushed script file."""
  payload = 'hi! ' * 1024
  expected_cmd = "echo '%s'" % payload
  with self.assertCalls(
      (mock.call.devil.android.device_temp_file.DeviceTempFile(
          self.adb, suffix='.sh'), MockTempFile('/sdcard/temp-123.sh')),
      self.call.device._WriteFileWithPush('/sdcard/temp-123.sh', expected_cmd),
      (self.call.adb.Shell('sh /sdcard/temp-123.sh'), payload + '\n')):
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(
        [payload],
        self.device.RunShellCommand(['echo', payload], check_return=True))
def testRunShellCommand_withHugeCmdAndSu(self):
  """Like the huge-command case, but with the command wrapped for su."""
  payload = 'hi! ' * 1024
  # The echo command is single-quoted inside `sh -c '...'`, hence the
  # '"'"' sequences around the payload.
  expected_cmd_without_su = """sh -c 'echo '"'"'%s'"'"''""" % payload
  expected_cmd = 'su -c %s' % expected_cmd_without_su
  with self.assertCalls(
      (self.call.device.NeedsSU(), True),
      (self.call.device._Su(expected_cmd_without_su), expected_cmd),
      (mock.call.devil.android.device_temp_file.DeviceTempFile(
          self.adb, suffix='.sh'), MockTempFile('/sdcard/temp-123.sh')),
      self.call.device._WriteFileWithPush('/sdcard/temp-123.sh', expected_cmd),
      (self.call.adb.Shell('sh /sdcard/temp-123.sh'), payload + '\n')):
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(
        [payload],
        self.device.RunShellCommand(
            ['echo', payload], check_return=True, as_root=True))
def testRunShellCommand_withSu(self):
  """With as_root and NeedsSU true, the command is expected to go via _Su."""
  inner_cmd = "sh -c 'setprop service.adb.root 0'"
  su_cmd = 'su -c %s' % inner_cmd
  expected_calls = [
      (self.call.device.NeedsSU(), True),
      (self.call.device._Su(inner_cmd), su_cmd),
      (self.call.adb.Shell(su_cmd), ''),
  ]
  with self.assertCalls(*expected_calls):
    self.device.RunShellCommand(
        ['setprop', 'service.adb.root', '0'],
        check_return=True, as_root=True)
def testRunShellCommand_withRunAs(self):
  """A |run_as| package prefixes the sh-wrapped command with `run-as <pkg>`."""
  sh_wrapped_cmd = "sh -c 'mkdir -p files'"
  run_as_cmd = 'run-as org.devil.test_package %s' % sh_wrapped_cmd
  expected_shell_call = self.call.adb.Shell(run_as_cmd)
  with self.assertCall(expected_shell_call, ''):
    self.device.RunShellCommand(
        ['mkdir', '-p', 'files'],
        run_as='org.devil.test_package', check_return=True)
def testRunShellCommand_withRunAsAndSu(self):
  """run_as and as_root combine: the run-as command is itself su-wrapped.

  The layers, innermost first: `sh -c '<cmd>'`, then `run-as <pkg> ...`, then
  a single-quoted `sh -c ...` that is handed to _Su().
  """
  sh_wrapped_cmd = "sh -c 'mkdir -p files'"
  run_as_cmd = 'run-as org.devil.test_package %s' % sh_wrapped_cmd
  pre_su_cmd = 'sh -c %s' % cmd_helper.SingleQuote(run_as_cmd)
  su_cmd = 'su -c %s' % pre_su_cmd
  expected_calls = [
      (self.call.device.NeedsSU(), True),
      (self.call.device._Su(pre_su_cmd), su_cmd),
      (self.call.adb.Shell(su_cmd), ''),
  ]
  with self.assertCalls(*expected_calls):
    self.device.RunShellCommand(
        ['mkdir', '-p', 'files'],
        run_as='org.devil.test_package', as_root=True, check_return=True)
def testRunShellCommand_manyLines(self):
  """Multi-line shell output is returned as a list with one entry per line."""
  cmd = 'ls /some/path'
  with self.assertCall(self.call.adb.Shell(cmd), 'file1\nfile2\nfile3\n'):
    self.assertEqual(
        ['file1', 'file2', 'file3'],
        self.device.RunShellCommand(cmd.split(), check_return=True))
def testRunShellCommand_manyLinesRawOutput(self):
  """raw_output=True returns the shell output string unmodified."""
  cmd = 'ls /some/path'
  with self.assertCall(self.call.adb.Shell(cmd), '\rfile1\nfile2\r\nfile3\n'):
    self.assertEqual(
        '\rfile1\nfile2\r\nfile3\n',
        self.device.RunShellCommand(
            cmd.split(), check_return=True, raw_output=True))
def testRunShellCommand_singleLine_success(self):
  """single_line=True returns the one output line without its newline."""
  cmd = 'echo $VALUE'
  with self.assertCall(self.call.adb.Shell(cmd), 'some value\n'):
    self.assertEqual(
        'some value',
        self.device.RunShellCommand(
            cmd, shell=True, check_return=True, single_line=True))
def testRunShellCommand_singleLine_successEmptyLine(self):
  """single_line=True maps output consisting of just '\\n' to ''."""
  cmd = 'echo $VALUE'
  with self.assertCall(self.call.adb.Shell(cmd), '\n'):
    self.assertEqual(
        '',
        self.device.RunShellCommand(
            cmd, shell=True, check_return=True, single_line=True))
def testRunShellCommand_singleLine_successWithoutEndLine(self):
  """single_line=True accepts output that has no trailing newline."""
  cmd = 'echo -n $VALUE'
  with self.assertCall(self.call.adb.Shell(cmd), 'some value'):
    self.assertEqual(
        'some value',
        self.device.RunShellCommand(
            cmd, shell=True, check_return=True, single_line=True))
def testRunShellCommand_singleLine_successNoOutput(self):
  """single_line=True maps completely empty output to ''."""
  cmd = 'echo -n $VALUE'
  with self.assertCall(self.call.adb.Shell(cmd), ''):
    self.assertEqual(
        '',
        self.device.RunShellCommand(
            cmd, shell=True, check_return=True, single_line=True))
def testRunShellCommand_singleLine_failTooManyLines(self):
  """single_line=True raises CommandFailedError when two lines come back."""
  shell_cmd = 'echo $VALUE'
  two_line_output = 'some value\nanother value\n'
  expected_shell_call = self.call.adb.Shell(shell_cmd)
  with self.assertCall(expected_shell_call, two_line_output):
    with self.assertRaises(device_errors.CommandFailedError):
      self.device.RunShellCommand(
          shell_cmd, shell=True, single_line=True, check_return=True)
def testRunShellCommand_checkReturn_success(self):
  """With check_return=True a successful command returns its output lines."""
  cmd = 'echo $ANDROID_DATA'
  output = '/data\n'
  with self.assertCall(self.call.adb.Shell(cmd), output):
    self.assertEqual(
        [output.rstrip()],
        self.device.RunShellCommand(cmd, shell=True, check_return=True))
def testRunShellCommand_checkReturn_failure(self):
  """With check_return=True a shell error raises AdbCommandFailedError."""
  argv = ['ls', '/root']
  shell_failure = self.ShellError('opendir failed, Permission denied\n')
  with self.assertCall(self.call.adb.Shell('ls /root'), shell_failure):
    with self.assertRaises(device_errors.AdbCommandFailedError):
      self.device.RunShellCommand(argv, check_return=True)
def testRunShellCommand_checkReturn_disabled(self):
  """With check_return=False a shell error's output is returned, not raised."""
  cmd = 'ls /root'
  output = 'opendir failed, Permission denied\n'
  with self.assertCall(self.call.adb.Shell(cmd), self.ShellError(output)):
    self.assertEqual(
        [output.rstrip()],
        self.device.RunShellCommand(cmd.split(), check_return=False))
def testRunShellCommand_largeOutput_enabled(self):
  """large_output=True redirects output to a device temp file and pulls it.

  Expected sequence: a DeviceTempFile is created, the command is run as
  `( <cmd> )><temp file>`, and the result is fetched with
  ReadFile(..., force_pull=True).
  """
  cmd = 'echo $VALUE'
  temp_file = MockTempFile('/sdcard/temp-123')
  cmd_redirect = '( %s )>%s' % (cmd, temp_file.name)
  with self.assertCalls(
      (mock.call.devil.android.device_temp_file.DeviceTempFile(self.adb),
       temp_file),
      (self.call.adb.Shell(cmd_redirect)),
      (self.call.device.ReadFile(temp_file.name, force_pull=True),
       'something')):
    self.assertEqual(
        ['something'],
        self.device.RunShellCommand(
            cmd, shell=True, large_output=True, check_return=True))
def testRunShellCommand_largeOutput_disabledNoTrigger(self):
  """A ShellError('') failure is raised with no temp-file retry attempted.

  Only the single adb.Shell call is expected; the error surfaces as
  AdbCommandFailedError.
  """
  argv = ['something']
  shell_failure = self.ShellError('')
  with self.assertCall(self.call.adb.Shell('something'), shell_failure):
    with self.assertRaises(device_errors.AdbCommandFailedError):
      self.device.RunShellCommand(argv, check_return=True)
def testRunShellCommand_largeOutput_disabledTrigger(self):
  """A ShellError('', None) failure triggers a retry through a temp file.

  Expected sequence: the plain adb.Shell call fails with ShellError('', None),
  then a DeviceTempFile is created, the command is re-run redirected into it,
  and the output is fetched with ReadFile(..., force_pull=True).
  """
  cmd = 'echo $VALUE'
  temp_file = MockTempFile('/sdcard/temp-123')
  cmd_redirect = '( %s )>%s' % (cmd, temp_file.name)
  with self.assertCalls(
      (self.call.adb.Shell(cmd), self.ShellError('', None)),
      (mock.call.devil.android.device_temp_file.DeviceTempFile(self.adb),
       temp_file),
      (self.call.adb.Shell(cmd_redirect)),
      (self.call.device.ReadFile(mock.ANY, force_pull=True),
       'something')):
    self.assertEqual(
        ['something'],
        self.device.RunShellCommand(cmd, shell=True, check_return=True))
class DeviceUtilsRunPipedShellCommandTest(DeviceUtilsTest):
  """Tests for DeviceUtils._RunPipedShellCommand."""

  # Script every test expects to reach RunShellCommand for 'ps | grep foo':
  # the pipeline followed by an echo of the shell's PIPESTATUS array.
  _EXPECTED_SCRIPT = 'ps | grep foo; echo "PIPESTATUS: ${PIPESTATUS[@]}"'

  def _ExpectedRunShellCall(self):
    """Returns the RunShellCommand call that every test in this class expects."""
    return self.call.device.RunShellCommand(
        self._EXPECTED_SCRIPT, shell=True, check_return=True)

  def testRunPipedShellCommand_success(self):
    """The trailing PIPESTATUS line is removed from the returned output."""
    with self.assertCall(
        self._ExpectedRunShellCall(),
        ['This line contains foo', 'PIPESTATUS: 0 0']):
      self.assertEqual(['This line contains foo'],
                       self.device._RunPipedShellCommand('ps | grep foo'))

  def testRunPipedShellCommand_firstCommandFails(self):
    """A non-zero status for the first stage raises with status [1, 0]."""
    with self.assertCall(self._ExpectedRunShellCall(), ['PIPESTATUS: 1 0']):
      with self.assertRaises(device_errors.AdbShellCommandFailedError) as ec:
        self.device._RunPipedShellCommand('ps | grep foo')
      self.assertEqual([1, 0], ec.exception.status)

  def testRunPipedShellCommand_secondCommandFails(self):
    """A non-zero status for the second stage raises with status [0, 1]."""
    with self.assertCall(self._ExpectedRunShellCall(), ['PIPESTATUS: 0 1']):
      with self.assertRaises(device_errors.AdbShellCommandFailedError) as ec:
        self.device._RunPipedShellCommand('ps | grep foo')
      self.assertEqual([0, 1], ec.exception.status)

  def testRunPipedShellCommand_outputCutOff(self):
    """Output with no PIPESTATUS line raises with a status of None."""
    with self.assertCall(
        self._ExpectedRunShellCall(),
        ['foo.bar'] * 256 + ['foo.ba']):
      with self.assertRaises(device_errors.AdbShellCommandFailedError) as ec:
        self.device._RunPipedShellCommand('ps | grep foo')
      self.assertIsNone(ec.exception.status)
@mock.patch('time.sleep', mock.Mock())
class DeviceUtilsKillAllTest(DeviceUtilsTest):
  """Tests for DeviceUtils.KillAll.

  time.sleep is patched out for the whole class so the tests never actually
  sleep.
  """

  def testKillAll_noMatchingProcessesFailure(self):
    """With no matching pids, KillAll raises CommandFailedError."""
    with self.assertCall(self.call.device.GetPids('test_process'), {}):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.KillAll('test_process')

  def testKillAll_noMatchingProcessesQuiet(self):
    """With no matching pids and quiet=True, KillAll returns 0."""
    with self.assertCall(self.call.device.GetPids('test_process'), {}):
      self.assertEqual(0, self.device.KillAll('test_process', quiet=True))

  def testKillAll_nonblocking(self):
    """All pids reported by GetPids are killed with signal 9 in one command."""
    with self.assertCalls(
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1234'], 'some.processing.thing': ['5678']}),
        (self.call.adb.Shell('kill -9 1234 5678'), '')):
      self.assertEqual(
          2, self.device.KillAll('some.process', blocking=False))

  def testKillAll_blocking(self):
    """blocking=True re-queries GetPids until the killed pids are gone."""
    with self.assertCalls(
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1234'], 'some.processing.thing': ['5678']}),
        (self.call.adb.Shell('kill -9 1234 5678'), ''),
        (self.call.device.GetPids('some.process'),
         {'some.processing.thing': ['5678']}),
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1111']})):  # Other instance with different pid.
      self.assertEqual(
          2, self.device.KillAll('some.process', blocking=True))

  def testKillAll_exactNonblocking(self):
    """exact=True kills only the process whose name matches exactly."""
    with self.assertCalls(
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1234'], 'some.processing.thing': ['5678']}),
        (self.call.adb.Shell('kill -9 1234'), '')):
      self.assertEqual(
          1, self.device.KillAll('some.process', exact=True, blocking=False))

  def testKillAll_exactBlocking(self):
    """exact=True with blocking=True waits only for the exact match to exit."""
    with self.assertCalls(
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1234'], 'some.processing.thing': ['5678']}),
        (self.call.adb.Shell('kill -9 1234'), ''),
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1234'], 'some.processing.thing': ['5678']}),
        (self.call.device.GetPids('some.process'),
         {'some.processing.thing': ['5678']})):
      self.assertEqual(
          1, self.device.KillAll('some.process', exact=True, blocking=True))

  def testKillAll_root(self):
    """as_root=True routes the kill command through _Su when NeedsSU()."""
    with self.assertCalls(
        (self.call.device.GetPids('some.process'), {'some.process': ['1234']}),
        (self.call.device.NeedsSU(), True),
        (self.call.device._Su("sh -c 'kill -9 1234'"),
         "su -c sh -c 'kill -9 1234'"),
        (self.call.adb.Shell("su -c sh -c 'kill -9 1234'"), '')):
      self.assertEqual(
          1, self.device.KillAll('some.process', as_root=True))

  def testKillAll_sigterm(self):
    """signum=device_signal.SIGTERM produces `kill -15`."""
    with self.assertCalls(
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1234']}),
        (self.call.adb.Shell('kill -15 1234'), '')):
      self.assertEqual(
          1, self.device.KillAll('some.process', signum=device_signal.SIGTERM))

  def testKillAll_multipleInstances(self):
    """Several pids under one process name are all killed and counted."""
    with self.assertCalls(
        (self.call.device.GetPids('some.process'),
         {'some.process': ['1234', '4567']}),
        (self.call.adb.Shell('kill -15 1234 4567'), '')):
      self.assertEqual(
          2, self.device.KillAll('some.process', signum=device_signal.SIGTERM))
class DeviceUtilsStartActivityTest(DeviceUtilsTest):
  """Tests for DeviceUtils.StartActivity.

  Each test builds an intent.Intent, states the exact `am start` command line
  expected to reach adb shell, and feeds back a canned `am` response.
  """

  # Canned `am start` response used by every test except the failure case.
  _AM_OK = 'Starting: Intent { act=android.intent.action.VIEW }'

  # Arguments produced by a VIEW intent aimed at test.package/.Main.
  _VIEW_MAIN = '-a android.intent.action.VIEW -n test.package/.Main'

  def _MainIntent(self, **fields):
    """Returns a VIEW intent for test.package/.Main plus any extra |fields|."""
    return intent.Intent(action='android.intent.action.VIEW',
                         package='test.package',
                         activity='.Main',
                         **fields)

  def _ExpectAmStart(self, am_args, response=None):
    """Returns an assertCall context expecting `am start <am_args>` on adb.

    Args:
      am_args: Everything that follows 'am start ' in the expected command.
      response: Canned shell output; defaults to the successful response.
    """
    if response is None:
      response = self._AM_OK
    return self.assertCall(self.call.adb.Shell('am start ' + am_args), response)

  def testStartActivity_actionOnly(self):
    """An intent with only an action yields just `-a <action>`."""
    view_intent = intent.Intent(action='android.intent.action.VIEW')
    with self._ExpectAmStart('-a android.intent.action.VIEW'):
      self.device.StartActivity(view_intent)

  def testStartActivity_success(self):
    """Package and activity are passed as `-n <package>/<activity>`."""
    main_intent = self._MainIntent()
    with self._ExpectAmStart(self._VIEW_MAIN):
      self.device.StartActivity(main_intent)

  def testStartActivity_failure(self):
    """An 'Error: ...' response from am raises CommandFailedError."""
    main_intent = self._MainIntent()
    with self._ExpectAmStart(
        self._VIEW_MAIN, response='Error: Failed to start test activity'):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.StartActivity(main_intent)

  def testStartActivity_blocking(self):
    """blocking=True adds `-W`."""
    main_intent = self._MainIntent()
    with self._ExpectAmStart('-W ' + self._VIEW_MAIN):
      self.device.StartActivity(main_intent, blocking=True)

  def testStartActivity_withCategory(self):
    """A single category is passed as `-c <category>`."""
    main_intent = self._MainIntent(category='android.intent.category.HOME')
    expected_args = ' '.join([
        '-a android.intent.action.VIEW',
        '-c android.intent.category.HOME',
        '-n test.package/.Main',
    ])
    with self._ExpectAmStart(expected_args):
      self.device.StartActivity(main_intent)

  def testStartActivity_withMultipleCategories(self):
    """A list of categories yields one `-c` per category, in order."""
    main_intent = self._MainIntent(
        category=['android.intent.category.HOME',
                  'android.intent.category.BROWSABLE'])
    expected_args = ' '.join([
        '-a android.intent.action.VIEW',
        '-c android.intent.category.HOME',
        '-c android.intent.category.BROWSABLE',
        '-n test.package/.Main',
    ])
    with self._ExpectAmStart(expected_args):
      self.device.StartActivity(main_intent)

  def testStartActivity_withData(self):
    """Intent data is passed as `-d <data>`."""
    main_intent = self._MainIntent(data='http://www.google.com/')
    expected_args = ' '.join([
        '-a android.intent.action.VIEW',
        '-d http://www.google.com/',
        '-n test.package/.Main',
    ])
    with self._ExpectAmStart(expected_args):
      self.device.StartActivity(main_intent)

  def testStartActivity_withStringExtra(self):
    """A string extra is passed with `--es`."""
    main_intent = self._MainIntent(extras={'foo': 'test'})
    with self._ExpectAmStart(self._VIEW_MAIN + ' --es foo test'):
      self.device.StartActivity(main_intent)

  def testStartActivity_withBoolExtra(self):
    """A boolean extra is passed with `--ez`."""
    main_intent = self._MainIntent(extras={'foo': True})
    with self._ExpectAmStart(self._VIEW_MAIN + ' --ez foo True'):
      self.device.StartActivity(main_intent)

  def testStartActivity_withIntExtra(self):
    """An integer extra is passed with `--ei`."""
    main_intent = self._MainIntent(extras={'foo': 123})
    with self._ExpectAmStart(self._VIEW_MAIN + ' --ei foo 123'):
      self.device.StartActivity(main_intent)

  def testStartActivity_withTraceFile(self):
    """trace_file_name adds `--start-profiler <file>`."""
    main_intent = self._MainIntent()
    with self._ExpectAmStart(
        '--start-profiler test_trace_file.out ' + self._VIEW_MAIN):
      self.device.StartActivity(main_intent,
                                trace_file_name='test_trace_file.out')

  def testStartActivity_withForceStop(self):
    """force_stop=True adds `-S`."""
    main_intent = self._MainIntent()
    with self._ExpectAmStart('-S ' + self._VIEW_MAIN):
      self.device.StartActivity(main_intent, force_stop=True)

  def testStartActivity_withFlags(self):
    """Intent flags are combined into a single hex `-f` argument."""
    intent_flags = [
        intent.FLAG_ACTIVITY_NEW_TASK,
        intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED,
    ]
    main_intent = self._MainIntent(flags=intent_flags)
    with self._ExpectAmStart(self._VIEW_MAIN + ' -f 0x10200000'):
      self.device.StartActivity(main_intent)
class DeviceUtilsStartInstrumentationTest(DeviceUtilsTest):
  """Tests for DeviceUtils.StartInstrumentation.

  Every expected command starts with `p=test.package;` and refers to the
  package through "$p" afterwards.
  """

  def testStartInstrumentation_nothing(self):
    """With no options, only the component is passed to `am instrument`."""
    with self.assertCalls(
        self.call.device.RunShellCommand(
            'p=test.package;am instrument "$p"/.TestInstrumentation',
            shell=True, check_return=True, large_output=True)):
      self.device.StartInstrumentation(
          'test.package/.TestInstrumentation',
          finish=False, raw=False, extras=None)

  def testStartInstrumentation_finish(self):
    """finish=True adds `-w` and the command output is returned."""
    with self.assertCalls(
        (self.call.device.RunShellCommand(
            'p=test.package;am instrument -w "$p"/.TestInstrumentation',
            shell=True, check_return=True, large_output=True),
         ['OK (1 test)'])):
      output = self.device.StartInstrumentation(
          'test.package/.TestInstrumentation',
          finish=True, raw=False, extras=None)
      self.assertEqual(['OK (1 test)'], output)

  def testStartInstrumentation_raw(self):
    """raw=True adds `-r`."""
    with self.assertCalls(
        self.call.device.RunShellCommand(
            'p=test.package;am instrument -r "$p"/.TestInstrumentation',
            shell=True, check_return=True, large_output=True)):
      self.device.StartInstrumentation(
          'test.package/.TestInstrumentation',
          finish=False, raw=True, extras=None)

  def testStartInstrumentation_extras(self):
    """Extras become `-e key value`, with the package name replaced by "$p"."""
    with self.assertCalls(
        self.call.device.RunShellCommand(
            'p=test.package;am instrument -e "$p".foo Foo -e bar \'Val \'"$p" '
            '"$p"/.TestInstrumentation',
            shell=True, check_return=True, large_output=True)):
      self.device.StartInstrumentation(
          'test.package/.TestInstrumentation',
          finish=False, raw=False, extras={'test.package.foo': 'Foo',
                                           'bar': 'Val test.package'})
class DeviceUtilsBroadcastIntentTest(DeviceUtilsTest):
  """Tests for DeviceUtils.BroadcastIntent."""

  # Canned `am broadcast` response fed back by every test.
  _AM_RESPONSE = 'Broadcasting: Intent { act=test.package.with.an.INTENT } '

  def _ExpectShell(self, shell_cmd):
    """Returns an assertCall context expecting |shell_cmd| on adb shell."""
    return self.assertCall(self.call.adb.Shell(shell_cmd), self._AM_RESPONSE)

  def testBroadcastIntent_noExtras(self):
    """An intent with only an action yields `am broadcast -a <action>`."""
    broadcast = intent.Intent(action='test.package.with.an.INTENT')
    with self._ExpectShell('am broadcast -a test.package.with.an.INTENT'):
      self.device.BroadcastIntent(broadcast)

  def testBroadcastIntent_withExtra(self):
    """A string extra containing a space is single-quoted after `--es`."""
    broadcast = intent.Intent(action='test.package.with.an.INTENT',
                              extras={'foo': 'bar value'})
    with self._ExpectShell(
        "am broadcast -a test.package.with.an.INTENT --es foo 'bar value'"):
      self.device.BroadcastIntent(broadcast)

  def testBroadcastIntent_withExtra_noValue(self):
    """An extra whose value is None is passed with `--esn <key>`."""
    broadcast = intent.Intent(action='test.package.with.an.INTENT',
                              extras={'foo': None})
    with self._ExpectShell(
        'am broadcast -a test.package.with.an.INTENT --esn foo'):
      self.device.BroadcastIntent(broadcast)
class DeviceUtilsGoHomeTest(DeviceUtilsTest):
  """Tests for DeviceUtils.GoHome.

  The tests script a sequence of RunShellCommand calls: `dumpsys window
  windows` queries, the `am start` of the HOME intent, and `input keyevent`
  commands.
  """

  # Canned response for the `am start` of the HOME intent.
  _START_HOME_OUTPUT = 'Starting: Intent { act=android.intent.action.MAIN }\r\n'

  def _DumpsysCall(self):
    """Returns the expected `dumpsys window windows` call."""
    return self.call.device.RunShellCommand(
        ['dumpsys', 'window', 'windows'], check_return=True,
        large_output=True)

  def _StartHomeCall(self):
    """Returns the expected `am start` call for the MAIN/HOME intent."""
    return self.call.device.RunShellCommand(
        ['am', 'start', '-W', '-a', 'android.intent.action.MAIN',
         '-c', 'android.intent.category.HOME'], check_return=True)

  def _KeyEventCall(self, keycode):
    """Returns the expected `input keyevent <keycode>` call."""
    return self.call.device.RunShellCommand(
        ['input', 'keyevent', keycode], check_return=True)

  def testGoHome_popupsExist(self):
    """Key events 66 and 4 are sent when focus is missing after going home."""
    expected_calls = [
        (self._DumpsysCall(), []),
        (self._StartHomeCall(), self._START_HOME_OUTPUT),
        (self._DumpsysCall(), []),
        self._KeyEventCall('66'),
        self._KeyEventCall('4'),
        (self._DumpsysCall(), ['mCurrentFocus Launcher']),
    ]
    with self.assertCalls(*expected_calls):
      self.device.GoHome()

  def testGoHome_willRetry(self):
    """The key-event round repeats; a timeout on dumpsys propagates."""
    expected_calls = [
        (self._DumpsysCall(), []),
        (self._StartHomeCall(), self._START_HOME_OUTPUT),
        (self._DumpsysCall(), []),
        self._KeyEventCall('66'),
        self._KeyEventCall('4'),
        (self._DumpsysCall(), []),
        self._KeyEventCall('66'),
        self._KeyEventCall('4'),
        (self._DumpsysCall(), self.TimeoutError()),
    ]
    with self.assertCalls(*expected_calls):
      with self.assertRaises(device_errors.CommandTimeoutError):
        self.device.GoHome()

  def testGoHome_alreadyFocused(self):
    """Nothing beyond the first dumpsys runs when the launcher has focus."""
    with self.assertCall(self._DumpsysCall(), ['mCurrentFocus Launcher']):
      self.device.GoHome()

  def testGoHome_alreadyFocusedAlternateCase(self):
    """A lowercase '.launcher/.' focus line also counts as focused."""
    with self.assertCall(self._DumpsysCall(), [' mCurrentFocus .launcher/.']):
      self.device.GoHome()

  def testGoHome_obtainsFocusAfterGoingHome(self):
    """No key events are sent when focus appears right after `am start`."""
    expected_calls = [
        (self._DumpsysCall(), []),
        (self._StartHomeCall(), self._START_HOME_OUTPUT),
        (self._DumpsysCall(), ['mCurrentFocus Launcher']),
    ]
    with self.assertCalls(*expected_calls):
      self.device.GoHome()
class DeviceUtilsForceStopTest(DeviceUtilsTest):
  """Tests for DeviceUtils.ForceStop."""

  def testForceStop(self):
    """ForceStop issues an `am force-stop` guarded by a check of `ps` output."""
    expected_script = ('p=test.package;if [[ "$(ps)" = *$p* ]]; then '
                       'am force-stop $p; fi')
    with self.assertCall(self.call.adb.Shell(expected_script), ''):
      self.device.ForceStop('test.package')
class DeviceUtilsClearApplicationStateTest(DeviceUtilsTest):
  """Tests for DeviceUtils.ClearApplicationState.

  The scripted calls differ by reported SDK level: with '11' or '17' the
  application paths are looked up before `pm clear`; with '18' `pm clear` is
  run directly.
  """

  def _SdkVersionCall(self):
    """Returns the expected cached lookup of ro.build.version.sdk."""
    return self.call.device.GetProp('ro.build.version.sdk', cache=True)

  def _PmClearCall(self, package):
    """Returns the expected `pm clear <package>` RunShellCommand call."""
    return self.call.device.RunShellCommand(
        ['pm', 'clear', package], check_return=True)

  def testClearApplicationState_setPermissions(self):
    """Permissions passed in are granted after the package is cleared."""
    package = 'this.package.exists'
    expected_calls = [
        (self._SdkVersionCall(), '17'),
        (self.call.device._GetApplicationPathsInternal(package),
         ['/data/app/this.package.exists.apk']),
        (self._PmClearCall(package), ['Success']),
        (self.call.device.GrantPermissions(package, ['p1']), []),
    ]
    with self.assertCalls(*expected_calls):
      self.device.ClearApplicationState(package, permissions=['p1'])

  def testClearApplicationState_packageDoesntExist(self):
    """On SDK 11, no `pm clear` runs when the package has no paths."""
    package = 'does.not.exist'
    expected_calls = [
        (self._SdkVersionCall(), '11'),
        (self.call.device._GetApplicationPathsInternal(package), []),
    ]
    with self.assertCalls(*expected_calls):
      self.device.ClearApplicationState(package)

  def testClearApplicationState_packageDoesntExistOnAndroidJBMR2OrAbove(self):
    """On SDK 18, a 'Failed' result from `pm clear` does not raise."""
    package = 'this.package.does.not.exist'
    expected_calls = [
        (self._SdkVersionCall(), '18'),
        (self._PmClearCall(package), ['Failed']),
    ]
    with self.assertCalls(*expected_calls):
      self.device.ClearApplicationState(package)

  def testClearApplicationState_packageExists(self):
    """On SDK 17, an installed package is looked up and then cleared."""
    package = 'this.package.exists'
    expected_calls = [
        (self._SdkVersionCall(), '17'),
        (self.call.device._GetApplicationPathsInternal(package),
         ['/data/app/this.package.exists.apk']),
        (self._PmClearCall(package), ['Success']),
    ]
    with self.assertCalls(*expected_calls):
      self.device.ClearApplicationState(package)

  def testClearApplicationState_packageExistsOnAndroidJBMR2OrAbove(self):
    """On SDK 18, `pm clear` runs without a prior path lookup."""
    package = 'this.package.exists'
    expected_calls = [
        (self._SdkVersionCall(), '18'),
        (self._PmClearCall(package), ['Success']),
    ]
    with self.assertCalls(*expected_calls):
      self.device.ClearApplicationState(package)
class DeviceUtilsSendKeyEventTest(DeviceUtilsTest):
  """Tests for DeviceUtils.SendKeyEvent."""

  def testSendKeyEvent(self):
    """SendKeyEvent(66) runs `input keyevent 66` on the device."""
    expected_shell_call = self.call.adb.Shell('input keyevent 66')
    with self.assertCall(expected_shell_call, ''):
      self.device.SendKeyEvent(66)
class DeviceUtilsPushChangedFilesIndividuallyTest(DeviceUtilsTest):
  """Tests for DeviceUtils._PushChangedFilesIndividually."""

  def testPushChangedFilesIndividually_empty(self):
    """An empty file list results in no adb calls."""
    with self.assertCalls():
      self.device._PushChangedFilesIndividually([])

  def testPushChangedFilesIndividually_single(self):
    """One (host, device) pair results in exactly one adb push."""
    host_path, device_path = '/test/host/path', '/test/device/path'
    file_pairs = [(host_path, device_path)]
    with self.assertCalls(self.call.adb.Push(host_path, device_path)):
      self.device._PushChangedFilesIndividually(file_pairs)

  def testPushChangedFilesIndividually_multiple(self):
    """Several pairs are pushed one by one, in list order."""
    file_pairs = [
        ('/test/host/path/file1', '/test/device/path/file1'),
        ('/test/host/path/file2', '/test/device/path/file2'),
    ]
    expected_pushes = [
        self.call.adb.Push(host_path, device_path)
        for host_path, device_path in file_pairs
    ]
    with self.assertCalls(*expected_pushes):
      self.device._PushChangedFilesIndividually(file_pairs)
class DeviceUtilsPushChangedFilesZippedTest(DeviceUtilsTest):
  """Tests for DeviceUtils._PushChangedFilesZipped."""

  def _HostZipExpectations(self, test_files):
    """Returns the expected calls that build the zip on the host.

    These are the NamedTemporaryFile creation (answered with a mock whose
    name is '/test/temp/file/tmp.zip') and the multiprocessing.Process that
    targets DeviceUtils._CreateDeviceZip.
    """
    zip_temp = mock.mock_open()
    zip_temp.return_value.name = '/test/temp/file/tmp.zip'
    return [
        (mock.call.tempfile.NamedTemporaryFile(suffix='.zip'), zip_temp),
        (mock.call.multiprocessing.Process(
            target=device_utils.DeviceUtils._CreateDeviceZip,
            args=('/test/temp/file/tmp.zip', test_files)), mock.Mock()),
    ]

  def testPushChangedFilesZipped_noUnzipCommand(self):
    """Returns a false value when _MaybeInstallCommands() reports False."""
    test_files = [('/test/host/path/file1', '/test/device/path/file1')]
    expected_calls = self._HostZipExpectations(test_files)
    expected_calls.append((self.call.device._MaybeInstallCommands(), False))
    with self.assertCalls(*expected_calls):
      pushed = self.device._PushChangedFilesZipped(test_files, ['/test/dir'])
      self.assertFalse(pushed)

  def _testPushChangedFilesZipped_spec(self, test_files):
    """Checks the full zip, push, unzip and chmod sequence for |test_files|."""
    expected_calls = self._HostZipExpectations(test_files)
    expected_calls.extend([
        (self.call.device._MaybeInstallCommands(), True),
        (self.call.device.NeedsSU(), True),
        (mock.call.devil.android.device_temp_file.DeviceTempFile(
            self.adb, suffix='.zip'),
         MockTempFile('/test/sdcard/foo123.zip')),
        self.call.adb.Push(
            '/test/temp/file/tmp.zip', '/test/sdcard/foo123.zip'),
        self.call.device.RunShellCommand(
            'unzip /test/sdcard/foo123.zip&&chmod -R 777 /test/dir',
            shell=True, as_root=True,
            env={'PATH': '/data/local/tmp/bin:$PATH'},
            check_return=True),
    ])
    with self.assertCalls(*expected_calls):
      pushed = self.device._PushChangedFilesZipped(test_files, ['/test/dir'])
      self.assertTrue(pushed)

  def testPushChangedFilesZipped_single(self):
    """The zipped push sequence holds for a single file pair."""
    single_pair = [('/test/host/path/file1', '/test/device/path/file1')]
    self._testPushChangedFilesZipped_spec(single_pair)

  def testPushChangedFilesZipped_multiple(self):
    """The zipped push sequence holds for two file pairs."""
    two_pairs = [
        ('/test/host/path/file1', '/test/device/path/file1'),
        ('/test/host/path/file2', '/test/device/path/file2'),
    ]
    self._testPushChangedFilesZipped_spec(two_pairs)
class DeviceUtilsPathExistsTest(DeviceUtilsTest):
  """Tests for DeviceUtils.PathExists and DeviceUtils.FileExists."""

  def _TestCommandCall(self, test_args, as_root=False):
    """Returns the expected RunShellCommand call for `test <test_args>`."""
    return self.call.device.RunShellCommand(
        ['test'] + test_args,
        as_root=as_root, check_return=True, timeout=10, retries=0)

  def testPathExists_pathExists(self):
    """A successful `test -e` means the path exists."""
    expected_call = self._TestCommandCall(['-e', '/path/file exists'])
    with self.assertCall(expected_call, []):
      self.assertTrue(self.device.PathExists('/path/file exists'))

  def testPathExists_multiplePathExists(self):
    """Several paths are checked in one `test` invocation joined by `-a`."""
    expected_call = self._TestCommandCall(
        ['-e', '/path 1', '-a', '-e', '/path2'])
    with self.assertCall(expected_call, []):
      self.assertTrue(self.device.PathExists(('/path 1', '/path2')))

  def testPathExists_pathDoesntExist(self):
    """A shell error from `test -e` means the path does not exist."""
    expected_call = self._TestCommandCall(['-e', '/path/file.not.exists'])
    with self.assertCall(expected_call, self.ShellError()):
      self.assertFalse(self.device.PathExists('/path/file.not.exists'))

  def testPathExists_asRoot(self):
    """as_root=True is forwarded to RunShellCommand."""
    expected_call = self._TestCommandCall(
        ['-e', '/root/path/exists'], as_root=True)
    with self.assertCall(expected_call, self.ShellError()):
      self.assertFalse(
          self.device.PathExists('/root/path/exists', as_root=True))

  def testFileExists_pathDoesntExist(self):
    """FileExists issues the same `test -e` command as PathExists."""
    expected_call = self._TestCommandCall(['-e', '/path/file.not.exists'])
    with self.assertCall(expected_call, self.ShellError()):
      self.assertFalse(self.device.FileExists('/path/file.not.exists'))
class DeviceUtilsRemovePathTest(DeviceUtilsTest):
  """Tests for DeviceUtils.RemovePath."""

  def _RmCall(self, rm_args, as_root=False):
    """Returns the expected RunShellCommand call for `rm <rm_args>`."""
    return self.call.device.RunShellCommand(
        ['rm'] + rm_args, as_root=as_root, check_return=True)

  def testRemovePath_regular(self):
    """A single path is passed to `rm` as one argument, spaces included."""
    with self.assertCall(self._RmCall(['some file']), []):
      self.device.RemovePath('some file')

  def testRemovePath_withForce(self):
    """force=True adds `-f`."""
    with self.assertCall(self._RmCall(['-f', 'some file']), []):
      self.device.RemovePath('some file', force=True)

  def testRemovePath_recursively(self):
    """recursive=True adds `-r`."""
    with self.assertCall(self._RmCall(['-r', '/remove/this/dir']), []):
      self.device.RemovePath('/remove/this/dir', recursive=True)

  def testRemovePath_withRoot(self):
    """as_root=True is forwarded to RunShellCommand."""
    with self.assertCall(self._RmCall(['some file'], as_root=True), []):
      self.device.RemovePath('some file', as_root=True)

  def testRemovePath_manyPaths(self):
    """A list of paths is removed with a single `rm` invocation."""
    doomed_paths = ['eeny', 'meeny', 'miny', 'moe']
    with self.assertCall(self._RmCall(['eeny', 'meeny', 'miny', 'moe']), []):
      self.device.RemovePath(doomed_paths)
class DeviceUtilsPullFileTest(DeviceUtilsTest):
  """Tests for DeviceUtils.PullFile.

  os.path.exists is patched to return True in both tests.
  """

  def testPullFile_existsOnDevice(self):
    """PullFile forwards the device and host paths to adb pull."""
    device_path = '/data/app/test.file.exists'
    host_path = '/test/file/host/path'
    with mock.patch('os.path.exists', return_value=True):
      with self.assertCall(self.call.adb.Pull(device_path, host_path)):
        self.device.PullFile(device_path, host_path)

  def testPullFile_doesntExistOnDevice(self):
    """A command error from adb pull surfaces as CommandFailedError."""
    device_path = '/data/app/test.file.does.not.exist'
    host_path = '/test/file/host/path'
    with mock.patch('os.path.exists', return_value=True):
      pull_failure = self.CommandError('remote object does not exist')
      with self.assertCall(
          self.call.adb.Pull(device_path, host_path), pull_failure):
        with self.assertRaises(device_errors.CommandFailedError):
          self.device.PullFile(device_path, host_path)
class DeviceUtilsReadFileTest(DeviceUtilsTest):
def testReadFileWithPull_success(self):
tmp_host_dir = '/tmp/dir/on.host/'
tmp_host = MockTempFile('/tmp/dir/on.host/tmp_ReadFileWithPull')
tmp_host.file.read.return_value = 'some interesting contents'
with self.assertCalls(
(mock.call.tempfile.mkdtemp(), tmp_host_dir),
(self.call.adb.Pull('/path/to/device/file', mock.ANY)),
(mock.call.__builtin__.open(mock.ANY, 'r'), tmp_host),
(mock.call.os.path.exists(tmp_host_dir), True),
(mock.call.shutil.rmtree(tmp_host_dir), None)):
self.assertEquals('some interesting contents',
self.device._ReadFileWithPull('/path/to/device/file'))
tmp_host.file.read.assert_called_once_with()
def testReadFileWithPull_rejected(self):
tmp_host_dir = '/tmp/dir/on.host/'
with self.assertCalls(
(mock.call.tempfile.mkdtemp(), tmp_host_dir),
(self.call.adb.Pull('/path/to/device/file', mock.ANY),
self.CommandError()),
(mock.call.os.path.exists(tmp_host_dir), True),
(mock.call.shutil.rmtree(tmp_host_dir), None)):
with self.assertRaises(device_errors.CommandFailedError):
self.device._ReadFileWithPull('/path/to/device/file')
def testReadFile_exists(self):
with self.assertCalls(
(self.call.device.FileSize('/read/this/test/file', as_root=False), 256),
(self.call.device.RunShellCommand(
['cat', '/read/this/test/file'],
as_root=False, check_return=True),
['this is a test file'])):
self.assertEqual('this is a test file\n',
self.device.ReadFile('/read/this/test/file'))
def testReadFile_exists2(self):
# Same as testReadFile_exists, but uses Android N ls output.
with self.assertCalls(
(self.call.device.FileSize('/read/this/test/file', as_root=False), 256),
(self.call.device.RunShellCommand(
['cat', '/read/this/test/file'],
as_root=False, check_return=True),
['this is a test file'])):
self.assertEqual('this is a test file\n',
self.device.ReadFile('/read/this/test/file'))
def testReadFile_doesNotExist(self):
with self.assertCall(
self.call.device.FileSize('/this/file/does.not.exist', as_root=False),
self.CommandError('File does not exist')):
with self.assertRaises(device_errors.CommandFailedError):
self.device.ReadFile('/this/file/does.not.exist')
def testReadFile_zeroSize(self):
with self.assertCalls(
(self.call.device.FileSize('/this/file/has/zero/size', as_root=False),
0),
(self.call.device._ReadFileWithPull('/this/file/has/zero/size'),
'but it has contents\n')):
self.assertEqual('but it has contents\n',
self.device.ReadFile('/this/file/has/zero/size'))
def testReadFile_withSU(self):
with self.assertCalls(
(self.call.device.FileSize(
'/this/file/can.be.read.with.su', as_root=True), 256),
(self.call.device.RunShellCommand(
['cat', '/this/file/can.be.read.with.su'],
as_root=True, check_return=True),
['this is a test file', 'read with su'])):
self.assertEqual(
'this is a test file\nread with su\n',
self.device.ReadFile('/this/file/can.be.read.with.su',
as_root=True))
def testReadFile_withPull(self):
contents = 'a' * 123456
with self.assertCalls(
(self.call.device.FileSize('/read/this/big/test/file', as_root=False),
123456),
(self.call.device._ReadFileWithPull('/read/this/big/test/file'),
contents)):
self.assertEqual(
contents, self.device.ReadFile('/read/this/big/test/file'))
def testReadFile_withPullAndSU(self):
  # Large file read as root: the expected sequence is a size probe, a
  # NeedsSU check, creation of a device temp file, a root `cp` + `chmod 666`
  # into that temp file, and finally a pull of the temp copy.
  src = '/this/big/file/can.be.read.with.su'
  staged = '/sdcard/tmp/on.device'
  payload = 'b' * 123456
  expected = [
      (self.call.device.FileSize(src, as_root=True), 123456),
      (self.call.device.NeedsSU(), True),
      (mock.call.devil.android.device_temp_file.DeviceTempFile(self.adb),
       MockTempFile(staged)),
      self.call.device.RunShellCommand(
          'SRC=/this/big/file/can.be.read.with.su DEST=/sdcard/tmp/on.device;'
          'cp "$SRC" "$DEST" && chmod 666 "$DEST"',
          shell=True, as_root=True, check_return=True),
      (self.call.device._ReadFileWithPull(staged), payload),
  ]
  with self.assertCalls(*expected):
    self.assertEqual(payload, self.device.ReadFile(src, as_root=True))
def testReadFile_forcePull(self):
  # With force_pull=True the only expected call is _ReadFileWithPull; no
  # size probe is listed.
  big_path = '/read/this/big/test/file'
  payload = 'a' * 123456
  pull = self.call.device._ReadFileWithPull(big_path)
  with self.assertCall(pull, payload):
    self.assertEqual(
        payload, self.device.ReadFile(big_path, force_pull=True))
class DeviceUtilsWriteFileTest(DeviceUtilsTest):
  """Tests for DeviceUtils.WriteFile and the _WriteFileWithPush helper."""

  def testWriteFileWithPush_success(self):
    # The contents are written to a host temp file which is then pushed.
    host_file = MockTempFile('/tmp/file/on.host')
    payload = 'some interesting contents'
    make_temp = mock.call.tempfile.NamedTemporaryFile()
    push = self.call.adb.Push('/tmp/file/on.host', '/path/to/device/file')
    with self.assertCalls((make_temp, host_file), push):
      self.device._WriteFileWithPush('/path/to/device/file', payload)
    host_file.file.write.assert_called_once_with(payload)

  def testWriteFileWithPush_rejected(self):
    # A failing adb push is expected to surface as CommandFailedError.
    host_file = MockTempFile('/tmp/file/on.host')
    payload = 'some interesting contents'
    make_temp = mock.call.tempfile.NamedTemporaryFile()
    push = self.call.adb.Push('/tmp/file/on.host', '/path/to/device/file')
    with self.assertCalls(
        (make_temp, host_file),
        (push, self.CommandError())):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device._WriteFileWithPush('/path/to/device/file', payload)

  def testWriteFile_withPush(self):
    payload = 'some large contents ' * 26  # 20 * 26 = 520 chars
    push_write = self.call.device._WriteFileWithPush(
        '/path/to/device/file', payload)
    with self.assertCalls(push_write):
      self.device.WriteFile('/path/to/device/file', payload)

  def testWriteFile_withPushForced(self):
    # force_push=True selects the push path even for short contents.
    payload = 'tiny contents'
    push_write = self.call.device._WriteFileWithPush(
        '/path/to/device/file', payload)
    with self.assertCalls(push_write):
      self.device.WriteFile('/path/to/device/file', payload, force_push=True)

  def testWriteFile_withPushAndSU(self):
    # Root write of large contents: push to a device temp file, then copy it
    # to the destination with a root `cp`.
    payload = 'some large contents ' * 26  # 20 * 26 = 520 chars
    staged = '/sdcard/tmp/on.device'
    expected = [
        (self.call.device.NeedsSU(), True),
        (mock.call.devil.android.device_temp_file.DeviceTempFile(self.adb),
         MockTempFile(staged)),
        self.call.device._WriteFileWithPush(staged, payload),
        self.call.device.RunShellCommand(
            ['cp', staged, '/path/to/device/file'],
            as_root=True, check_return=True),
    ]
    with self.assertCalls(*expected):
      self.device.WriteFile('/path/to/device/file', payload, as_root=True)

  def testWriteFile_withEcho(self):
    echo = self.call.adb.Shell("echo -n the.contents > /test/file/to.write")
    with self.assertCall(echo, ''):
      self.device.WriteFile('/test/file/to.write', 'the.contents')

  def testWriteFile_withEchoAndQuotes(self):
    # Both the contents and the path contain spaces and must be quoted.
    echo = self.call.adb.Shell(
        "echo -n 'the contents' > '/test/file/to write'")
    with self.assertCall(echo, ''):
      self.device.WriteFile('/test/file/to write', 'the contents')

  def testWriteFile_withEchoAndSU(self):
    plain_cmd = "sh -c 'echo -n contents > /test/file'"
    su_cmd = 'su -c %s' % plain_cmd
    expected = [
        (self.call.device.NeedsSU(), True),
        (self.call.device._Su(plain_cmd), su_cmd),
        (self.call.adb.Shell(su_cmd), ''),
    ]
    with self.assertCalls(*expected):
      self.device.WriteFile('/test/file', 'contents', as_root=True)
class DeviceUtilsStatDirectoryTest(DeviceUtilsTest):
  """Tests for DeviceUtils.StatDirectory parsing of `ls -a -l` output.

  Note: Also tests ListDirectory in testStatDirectory_fileList.
  """

  # Canned `ls -a -l` output returned by the mocked RunShellCommand.
  EXAMPLE_LS_OUTPUT = [
    'total 12345',
    'drwxr-xr-x 19 root root 0 1970-04-06 18:03 .',
    'drwxr-xr-x 19 root root 0 1970-04-06 18:03 ..',
    'drwxr-xr-x 6 root root 1970-01-01 00:00 some_dir',
    '-rw-r--r-- 1 root root 723 1971-01-01 07:04 some_file',
    '-rw-r----- 1 root root 327 2009-02-13 23:30 My Music File',
    # Older Android versions do not print st_nlink
    'lrwxrwxrwx root root 1970-01-01 00:00 lnk -> /some/path',
    'srwxrwx--- system system 2016-05-31 17:25 a_socket1',
    'drwxrwxrwt system misc 1970-11-23 02:25 tmp',
    'drwxr-s--- system shell 1970-11-23 02:24 my_cmd',
    'cr--r----- root system 10, 183 1971-01-01 07:04 random',
    'brw------- root root 7, 0 1971-01-01 07:04 block_dev',
    '-rwS------ root shell 157404 2015-04-13 15:44 silly',
  ]

  # Names expected back from the listing ('.' and '..' are not included).
  FILENAMES = [
    'some_dir', 'some_file', 'My Music File', 'lnk', 'a_socket1',
    'tmp', 'my_cmd', 'random', 'block_dev', 'silly']

  def _expectedLsCall(self, path_listed):
    # Single definition of the shell call both helpers below expect.
    return self.call.device.RunShellCommand(
        ['ls', '-a', '-l', path_listed],
        check_return=True, as_root=False, env={'TZ': 'utc'})

  def getStatEntries(self, path_given='/', path_listed='/'):
    # Runs StatDirectory(path_given) against the canned output, expecting
    # `ls` to be invoked on path_listed. Returns {filename: entry}.
    with self.assertCall(
        self._expectedLsCall(path_listed), self.EXAMPLE_LS_OUTPUT):
      entries = self.device.StatDirectory(path_given)
    return {f['filename']: f for f in entries}

  def getListEntries(self):
    # Runs ListDirectory('/') against the canned output.
    with self.assertCall(
        self._expectedLsCall('/'), self.EXAMPLE_LS_OUTPUT):
      return self.device.ListDirectory('/')

  def testStatDirectory_forceTrailingSlash(self):
    self.getStatEntries(path_given='/foo/bar/', path_listed='/foo/bar/')
    self.getStatEntries(path_given='/foo/bar', path_listed='/foo/bar/')

  def testStatDirectory_fileList(self):
    # NOTE: assertItemsEqual is the Python 2 name of this assertion.
    self.assertItemsEqual(self.getStatEntries().keys(), self.FILENAMES)
    self.assertItemsEqual(self.getListEntries(), self.FILENAMES)

  def testStatDirectory_fileModes(self):
    expected_modes = (
      ('some_dir', stat.S_ISDIR),
      ('some_file', stat.S_ISREG),
      ('lnk', stat.S_ISLNK),
      ('a_socket1', stat.S_ISSOCK),
      ('block_dev', stat.S_ISBLK),
      ('random', stat.S_ISCHR),
    )
    entries = self.getStatEntries()
    for filename, check in expected_modes:
      # The message identifies which row failed; a bare assertTrue would not.
      self.assertTrue(
          check(entries[filename]['st_mode']),
          msg='%r does not satisfy stat.%s' % (filename, check.__name__))

  def testStatDirectory_filePermissions(self):
    should_have = (
      ('some_file', stat.S_IWUSR),  # Owner can write.
      ('tmp', stat.S_IXOTH),  # Others can execute.
      ('tmp', stat.S_ISVTX),  # Has sticky bit.
      ('my_cmd', stat.S_ISGID),  # Has set-group-ID bit.
      ('silly', stat.S_ISUID),  # Has set UID bit.
    )
    should_not_have = (
      ('some_file', stat.S_IWOTH),  # Others can't write.
      ('block_dev', stat.S_IRGRP),  # Group can't read.
      ('silly', stat.S_IXUSR),  # Owner can't execute.
    )
    entries = self.getStatEntries()
    for filename, bit in should_have:
      self.assertTrue(
          entries[filename]['st_mode'] & bit,
          msg='%r is missing mode bit %o' % (filename, bit))
    for filename, bit in should_not_have:
      self.assertFalse(
          entries[filename]['st_mode'] & bit,
          msg='%r unexpectedly has mode bit %o' % (filename, bit))

  def testStatDirectory_numHardLinks(self):
    entries = self.getStatEntries()
    self.assertEqual(entries['some_dir']['st_nlink'], 6)
    self.assertEqual(entries['some_file']['st_nlink'], 1)
    # The 'tmp' row has no link-count column, so the key must be absent.
    self.assertNotIn('st_nlink', entries['tmp'])

  def testStatDirectory_fileOwners(self):
    entries = self.getStatEntries()
    self.assertEqual(entries['some_dir']['st_owner'], 'root')
    self.assertEqual(entries['my_cmd']['st_owner'], 'system')
    self.assertEqual(entries['my_cmd']['st_group'], 'shell')
    self.assertEqual(entries['tmp']['st_group'], 'misc')

  def testStatDirectory_fileSize(self):
    entries = self.getStatEntries()
    self.assertEqual(entries['some_file']['st_size'], 723)
    self.assertEqual(entries['My Music File']['st_size'], 327)
    # Sizes are sometimes not reported for non-regular files, don't try to
    # guess the size in those cases.
    self.assertNotIn('st_size', entries['some_dir'])

  def testStatDirectory_fileDateTime(self):
    entries = self.getStatEntries()
    self.assertEqual(entries['some_dir']['st_mtime'], 0)  # Epoch!
    self.assertEqual(entries['My Music File']['st_mtime'], 1234567800)

  def testStatDirectory_deviceType(self):
    entries = self.getStatEntries()
    self.assertEqual(entries['random']['st_rdev_pair'], (10, 183))
    self.assertEqual(entries['block_dev']['st_rdev_pair'], (7, 0))

  def testStatDirectory_symbolicLinks(self):
    entries = self.getStatEntries()
    self.assertEqual(entries['lnk']['symbolic_link_to'], '/some/path')
    # values() rather than the Python-2-only itervalues().
    for d in entries.values():
      # Only symlink entries may carry a 'symbolic_link_to' key.
      self.assertEqual('symbolic_link_to' in d, stat.S_ISLNK(d['st_mode']))
class DeviceUtilsStatPathTest(DeviceUtilsTest):
  """Tests for DeviceUtils.StatPath, with StatDirectory mocked out."""

  # Entries returned by the mocked StatDirectory('/data/local/tmp'). The
  # timestamp key is 'st_mtime', the same key the StatDirectory tests assert
  # on (previously misspelled 'st_time' here).
  EXAMPLE_DIRECTORY = [
    {'filename': 'foo.txt', 'st_size': 123, 'st_mtime': 456},
    {'filename': 'some_dir', 'st_mtime': 0}
  ]
  # The same entries keyed by filename, for building expected values.
  INDEX = {e['filename']: e for e in EXAMPLE_DIRECTORY}

  def _expectedStatDirectoryCall(self):
    # Every test expects exactly one listing of the parent directory.
    return self.call.device.StatDirectory('/data/local/tmp', as_root=False)

  def testStatPath_file(self):
    with self.assertCall(
        self._expectedStatDirectoryCall(), self.EXAMPLE_DIRECTORY):
      self.assertEqual(self.INDEX['foo.txt'],
                       self.device.StatPath('/data/local/tmp/foo.txt'))

  def testStatPath_directory(self):
    with self.assertCall(
        self._expectedStatDirectoryCall(), self.EXAMPLE_DIRECTORY):
      self.assertEqual(self.INDEX['some_dir'],
                       self.device.StatPath('/data/local/tmp/some_dir'))

  def testStatPath_directoryWithTrailingSlash(self):
    # A trailing slash must not change which parent directory is listed or
    # which entry is returned.
    with self.assertCall(
        self._expectedStatDirectoryCall(), self.EXAMPLE_DIRECTORY):
      self.assertEqual(self.INDEX['some_dir'],
                       self.device.StatPath('/data/local/tmp/some_dir/'))

  def testStatPath_doesNotExist(self):
    with self.assertCall(
        self._expectedStatDirectoryCall(), self.EXAMPLE_DIRECTORY):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.StatPath('/data/local/tmp/does.not.exist.txt')
class DeviceUtilsFileSizeTest(DeviceUtilsTest):
  """Tests for DeviceUtils.FileSize, with StatDirectory mocked out."""

  # Entries returned by the mocked StatDirectory('/data/local/tmp'). Note
  # that 'some_dir' deliberately has no 'st_size'.
  EXAMPLE_DIRECTORY = [
    {'filename': 'foo.txt', 'st_size': 123, 'st_mtime': 456},
    {'filename': 'some_dir', 'st_mtime': 0}
  ]

  def _expectedStatDirectoryCall(self):
    # Every test expects exactly one listing of the parent directory.
    return self.call.device.StatDirectory('/data/local/tmp', as_root=False)

  def testFileSize_file(self):
    with self.assertCall(
        self._expectedStatDirectoryCall(), self.EXAMPLE_DIRECTORY):
      # assertEqual rather than the deprecated assertEquals alias.
      self.assertEqual(123,
                       self.device.FileSize('/data/local/tmp/foo.txt'))

  def testFileSize_doesNotExist(self):
    with self.assertCall(
        self._expectedStatDirectoryCall(), self.EXAMPLE_DIRECTORY):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.FileSize('/data/local/tmp/does.not.exist.txt')

  def testFileSize_directoryWithNoSize(self):
    # An entry without 'st_size' is expected to raise CommandFailedError.
    with self.assertCall(
        self._expectedStatDirectoryCall(), self.EXAMPLE_DIRECTORY):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.FileSize('/data/local/tmp/some_dir')
class DeviceUtilsSetJavaAssertsTest(DeviceUtilsTest):
  """Tests for DeviceUtils.SetJavaAsserts.

  Each test mocks the contents of LOCAL_PROPERTIES_PATH and the current value
  of the dalvik.vm.enableassertions property.
  """

  def testSetJavaAsserts_enable(self):
    # Property absent from the file and unset on the device: the file is
    # rewritten with the property appended and the property is set to 'all'.
    props_path = self.device.LOCAL_PROPERTIES_PATH
    expected = [
        (self.call.device.ReadFile(props_path),
         'some.example.prop=with an example value\n'
         'some.other.prop=value_ok\n'),
        self.call.device.WriteFile(
            props_path,
            'some.example.prop=with an example value\n'
            'some.other.prop=value_ok\n'
            'dalvik.vm.enableassertions=all\n'),
        (self.call.device.GetProp('dalvik.vm.enableassertions'), ''),
        self.call.device.SetProp('dalvik.vm.enableassertions', 'all'),
    ]
    with self.assertCalls(*expected):
      self.assertTrue(self.device.SetJavaAsserts(True))

  def testSetJavaAsserts_disable(self):
    # Property present in the file and set on the device: the line is
    # dropped from the file and the property is cleared.
    props_path = self.device.LOCAL_PROPERTIES_PATH
    expected = [
        (self.call.device.ReadFile(props_path),
         'some.example.prop=with an example value\n'
         'dalvik.vm.enableassertions=all\n'
         'some.other.prop=value_ok\n'),
        self.call.device.WriteFile(
            props_path,
            'some.example.prop=with an example value\n'
            'some.other.prop=value_ok\n'),
        (self.call.device.GetProp('dalvik.vm.enableassertions'), 'all'),
        self.call.device.SetProp('dalvik.vm.enableassertions', ''),
    ]
    with self.assertCalls(*expected):
      self.assertTrue(self.device.SetJavaAsserts(False))

  def testSetJavaAsserts_alreadyEnabled(self):
    # Nothing to change: no write and no SetProp are expected, and the
    # method reports False.
    props_path = self.device.LOCAL_PROPERTIES_PATH
    expected = [
        (self.call.device.ReadFile(props_path),
         'some.example.prop=with an example value\n'
         'dalvik.vm.enableassertions=all\n'
         'some.other.prop=value_ok\n'),
        (self.call.device.GetProp('dalvik.vm.enableassertions'), 'all'),
    ]
    with self.assertCalls(*expected):
      self.assertFalse(self.device.SetJavaAsserts(True))

  def testSetJavaAsserts_malformedLocalProp(self):
    # Same as the already-enabled case, but the file contains a line without
    # '='; the call is still expected to succeed and report False.
    props_path = self.device.LOCAL_PROPERTIES_PATH
    expected = [
        (self.call.device.ReadFile(props_path),
         'some.example.prop=with an example value\n'
         'malformed_property\n'
         'dalvik.vm.enableassertions=all\n'
         'some.other.prop=value_ok\n'),
        (self.call.device.GetProp('dalvik.vm.enableassertions'), 'all'),
    ]
    with self.assertCalls(*expected):
      self.assertFalse(self.device.SetJavaAsserts(True))
class DeviceUtilsEnsureCacheInitializedTest(DeviceUtilsTest):
  """Tests for DeviceUtils._EnsureCacheInitialized and the 'token' entry."""

  def _expectedGetpropCall(self):
    # The initialisation command is matched loosely: any shell string that
    # contains 'getprop'.
    return self.call.device.RunShellCommand(
        AnyStringWith('getprop'),
        shell=True, check_return=True, large_output=True)

  def testEnsureCacheInitialized_noCache_success(self):
    self.assertIsNone(self.device._cache['token'])
    shell_output = ['/sdcard', 'TOKEN']
    with self.assertCall(self._expectedGetpropCall(), shell_output):
      self.device._EnsureCacheInitialized()
    self.assertIsNotNone(self.device._cache['token'])

  def testEnsureCacheInitialized_noCache_failure(self):
    # A timeout must propagate and leave the token unset.
    self.assertIsNone(self.device._cache['token'])
    with self.assertCall(self._expectedGetpropCall(), self.TimeoutError()):
      with self.assertRaises(device_errors.CommandTimeoutError):
        self.device._EnsureCacheInitialized()
    self.assertIsNone(self.device._cache['token'])

  def testEnsureCacheInitialized_cache(self):
    # With a token already cached, no device calls are expected.
    self.device._cache['token'] = 'TOKEN'
    with self.assertCalls():
      self.device._EnsureCacheInitialized()
    self.assertIsNotNone(self.device._cache['token'])
class DeviceUtilsGetPropTest(DeviceUtilsTest):
  """Tests for DeviceUtils.GetProp."""

  def _expectedGetpropCall(self, prop_name):
    # Uncached reads are expected to run `getprop <name>` using the device's
    # default timeout and retry settings.
    return self.call.device.RunShellCommand(
        ['getprop', prop_name], check_return=True, single_line=True,
        timeout=self.device._default_timeout,
        retries=self.device._default_retries)

  def testGetProp_exists(self):
    getprop = self._expectedGetpropCall('test.property')
    with self.assertCall(getprop, 'property_value'):
      self.assertEqual('property_value',
                       self.device.GetProp('test.property'))

  def testGetProp_doesNotExist(self):
    # Empty command output is returned as an empty string.
    getprop = self._expectedGetpropCall('property.does.not.exist')
    with self.assertCall(getprop, ''):
      self.assertEqual('', self.device.GetProp('property.does.not.exist'))

  def testGetProp_cachedRoProp(self):
    # Two cached reads are served after a single cache initialisation.
    init_cache = self.EnsureCacheInitialized(
        props=['[ro.build.type]: [userdebug]'])
    with self.assertCalls(init_cache):
      first = self.device.GetProp('ro.build.type', cache=True)
      self.assertEqual('userdebug', first)
      second = self.device.GetProp('ro.build.type', cache=True)
      self.assertEqual('userdebug', second)
class DeviceUtilsSetPropTest(DeviceUtilsTest):
  """Tests for DeviceUtils.SetProp, with and without read-back checking."""

  def _expectedSetpropCall(self, prop_name, prop_value):
    return self.call.device.RunShellCommand(
        ['setprop', prop_name, prop_value], check_return=True)

  def testSetProp(self):
    setprop = self._expectedSetpropCall('test.property', 'test value')
    with self.assertCall(setprop):
      self.device.SetProp('test.property', 'test value')

  def testSetProp_check_succeeds(self):
    # With check=True the value is read back uncached after being set.
    setprop = self._expectedSetpropCall('test.property', 'new_value')
    read_back = self.call.device.GetProp('test.property', cache=False)
    with self.assertCalls(setprop, (read_back, 'new_value')):
      self.device.SetProp('test.property', 'new_value', check=True)

  def testSetProp_check_fails(self):
    # A read-back that differs from the requested value raises.
    setprop = self._expectedSetpropCall('test.property', 'new_value')
    read_back = self.call.device.GetProp('test.property', cache=False)
    with self.assertCalls(setprop, (read_back, 'old_value')):
      with self.assertRaises(device_errors.CommandFailedError):
        self.device.SetProp('test.property', 'new_value', check=True)
class DeviceUtilsGetPidsTest(DeviceUtilsTest):
  """Tests for DeviceUtils.GetPids and DeviceUtils.GetApplicationPids."""

  def setUp(self):
    super(DeviceUtilsGetPidsTest, self).setUp()
    # Canned `ps` output; the tests filter it to emulate `grep -F`.
    self.sample_output = [
      'USER PID PPID VSIZE RSS WCHAN PC NAME',
      'user 1001 100 1024 1024 ffffffff 00000000 one.match',
      'user 1002 100 1024 1024 ffffffff 00000000 two.match',
      'user 1003 100 1024 1024 ffffffff 00000000 three.match',
      'user 1234 100 1024 1024 ffffffff 00000000 my$process',
      'user 1000 100 1024 1024 ffffffff 00000000 foo',
      'user 1236 100 1024 1024 ffffffff 00000000 foo',
    ]

  def _grepOutput(self, substring):
    # Lines of the sample output that contain |substring|.
    return [line for line in self.sample_output if substring in line]

  def _patchSdk(self, sdk_level):
    # Patcher that makes the device report |sdk_level| as its SDK version.
    return self.patch_call(self.call.device.build_version_sdk,
                           return_value=sdk_level)

  def _expectedPsGrep(self, piped_cmd, substring):
    # assertCall context expecting |piped_cmd| and answering with the sample
    # lines that contain |substring|.
    return self.assertCall(
        self.call.device._RunPipedShellCommand(piped_cmd),
        self._grepOutput(substring))

  def testGetPids_sdkGreaterThanNougatMR1(self):
    # Above Nougat MR1 the expected command is `ps -e` rather than `ps`.
    newer_sdk = self._patchSdk(version_codes.NOUGAT_MR1 + 1)
    build_id = self.patch_call(self.call.device.build_id,
                               return_value='ZZZ99Z')
    ps_grep = self.call.device._RunPipedShellCommand(
        'ps -e | grep -F example.process')
    with newer_sdk, build_id, self.assertCall(ps_grep, []):
      self.device.GetPids('example.process')

  def testGetPids_noMatches(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep(
        'ps | grep -F does.not.match', 'does.not.match'):
      self.assertEqual({}, self.device.GetPids('does.not.match'))

  def testGetPids_oneMatch(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep(
        'ps | grep -F one.match', 'one.match'):
      self.assertEqual(
          {'one.match': ['1001']},
          self.device.GetPids('one.match'))

  def testGetPids_multipleMatches(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    wanted = {
        'one.match': ['1001'],
        'two.match': ['1002'],
        'three.match': ['1003'],
    }
    with lollipop, self._expectedPsGrep('ps | grep -F match', 'match'):
      self.assertEqual(wanted, self.device.GetPids('match'))

  def testGetPids_quotable(self):
    # The process name contains '$' and must be quoted in the shell command.
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep(
        "ps | grep -F 'my$process'", 'my$process'):
      self.assertEqual(
          {'my$process': ['1234']}, self.device.GetPids('my$process'))

  def testGetPids_multipleInstances(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep('ps | grep -F foo', 'foo'):
      self.assertEqual(
          {'foo': ['1000', '1236']},
          self.device.GetPids('foo'))

  def testGetPids_allProcesses(self):
    # Without a process name a plain `ps` is expected and nothing is grepped.
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    plain_ps = self.call.device.RunShellCommand(
        ['ps'], check_return=True, large_output=True)
    wanted = {
        'one.match': ['1001'],
        'two.match': ['1002'],
        'three.match': ['1003'],
        'my$process': ['1234'],
        'foo': ['1000', '1236'],
    }
    with lollipop, self.assertCall(plain_ps, self.sample_output):
      self.assertEqual(wanted, self.device.GetPids())

  def testGetApplicationPids_notFound(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep('ps | grep -F match', 'match'):
      # No PIDs found, process name should be exact match.
      self.assertEqual([], self.device.GetApplicationPids('match'))

  def testGetApplicationPids_foundOne(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep(
        'ps | grep -F one.match', 'one.match'):
      self.assertEqual(['1001'], self.device.GetApplicationPids('one.match'))

  def testGetApplicationPids_foundMany(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep('ps | grep -F foo', 'foo'):
      self.assertEqual(
          ['1000', '1236'],
          self.device.GetApplicationPids('foo'))

  def testGetApplicationPids_atMostOneNotFound(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep('ps | grep -F match', 'match'):
      # No PIDs found, process name should be exact match.
      self.assertEqual(
          None,
          self.device.GetApplicationPids('match', at_most_one=True))

  def testGetApplicationPids_atMostOneFound(self):
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    with lollipop, self._expectedPsGrep(
        'ps | grep -F one.match', 'one.match'):
      self.assertEqual(
          '1001',
          self.device.GetApplicationPids('one.match', at_most_one=True))

  def testGetApplicationPids_atMostOneFoundTooMany(self):
    # Two 'foo' processes with at_most_one=True must raise.
    lollipop = self._patchSdk(version_codes.LOLLIPOP)
    raises = self.assertRaises(device_errors.CommandFailedError)
    with lollipop, raises, self._expectedPsGrep('ps | grep -F foo', 'foo'):
      self.device.GetApplicationPids('foo', at_most_one=True)
class DeviceUtilsGetSetEnforce(DeviceUtilsTest):
  """Tests for DeviceUtils.GetEnforce and DeviceUtils.SetEnforce."""

  def _checkGetEnforce(self, shell_output, expected):
    # `getenforce` answering |shell_output| must map to |expected|.
    with self.assertCall(self.call.adb.Shell('getenforce'), shell_output):
      self.assertEqual(expected, self.device.GetEnforce())

  def _checkSetEnforce(self, enabled, shell_cmd):
    # SetEnforce(enabled=...) must run exactly |shell_cmd| (no su needed).
    with self.assertCalls(
        (self.call.device.NeedsSU(), False),
        (self.call.adb.Shell(shell_cmd), '')):
      self.device.SetEnforce(enabled=enabled)

  def testGetEnforce_Enforcing(self):
    self._checkGetEnforce('Enforcing', True)

  def testGetEnforce_Permissive(self):
    self._checkGetEnforce('Permissive', False)

  def testGetEnforce_Disabled(self):
    self._checkGetEnforce('Disabled', None)

  def testSetEnforce_Enforcing(self):
    self._checkSetEnforce(True, 'setenforce 1')

  def testSetEnforce_Permissive(self):
    self._checkSetEnforce(False, 'setenforce 0')

  def testSetEnforce_EnforcingWithInt(self):
    self._checkSetEnforce(1, 'setenforce 1')

  def testSetEnforce_PermissiveWithInt(self):
    self._checkSetEnforce(0, 'setenforce 0')

  def testSetEnforce_EnforcingWithStr(self):
    self._checkSetEnforce('1', 'setenforce 1')

  def testSetEnforce_PermissiveWithStr(self):
    self._checkSetEnforce('0', 'setenforce 0')  # Not recommended but it works!
class DeviceUtilsTakeScreenshotTest(DeviceUtilsTest):
  """Tests for DeviceUtils.TakeScreenshot."""

  def testTakeScreenshot_fileNameProvided(self):
    # Expected flow: create a '.png' device temp file, screencap into it,
    # then pull it to the requested host path.
    device_png = '/tmp/path/temp-123.png'
    host_png = '/test/host/screenshot.png'
    expected = [
        (mock.call.devil.android.device_temp_file.DeviceTempFile(
            self.adb, suffix='.png'),
         MockTempFile(device_png)),
        (self.call.adb.Shell(
            '/system/bin/screencap -p /tmp/path/temp-123.png'),
         ''),
        self.call.device.PullFile(device_png, host_png),
    ]
    with self.assertCalls(*expected):
      self.device.TakeScreenshot(host_png)
class DeviceUtilsGetMemoryUsageForPidTest(DeviceUtilsTest):
  """Tests for DeviceUtils.GetMemoryUsageForPid.

  The result combines the TOTAL line of `showmap <pid>` with the VmHWM line
  of /proc/<pid>/status; each test removes one of the two sources.

  The previous setUp override only forwarded to the base class and has been
  dropped; the inherited setUp runs unchanged.
  """

  def testGetMemoryUsageForPid_validPid(self):
    with self.assertCalls(
        (self.call.device._RunPipedShellCommand(
            'showmap 1234 | grep TOTAL', as_root=True),
         ['100 101 102 103 104 105 106 107 TOTAL']),
        (self.call.device.ReadFile('/proc/1234/status', as_root=True),
         'VmHWM: 1024 kB\n')):
      self.assertEqual(
          {
            'Size': 100,
            'Rss': 101,
            'Pss': 102,
            'Shared_Clean': 103,
            'Shared_Dirty': 104,
            'Private_Clean': 105,
            'Private_Dirty': 106,
            'VmHWM': 1024
          },
          self.device.GetMemoryUsageForPid(1234))

  def testGetMemoryUsageForPid_noSmaps(self):
    # showmap reports an error instead of a TOTAL line: only VmHWM is
    # expected in the result.
    with self.assertCalls(
        (self.call.device._RunPipedShellCommand(
            'showmap 4321 | grep TOTAL', as_root=True),
         ['cannot open /proc/4321/smaps: No such file or directory']),
        (self.call.device.ReadFile('/proc/4321/status', as_root=True),
         'VmHWM: 1024 kb\n')):
      # assertEqual rather than the deprecated assertEquals alias.
      self.assertEqual({'VmHWM': 1024}, self.device.GetMemoryUsageForPid(4321))

  def testGetMemoryUsageForPid_noStatus(self):
    # Reading the status file fails: only the showmap fields are expected.
    with self.assertCalls(
        (self.call.device._RunPipedShellCommand(
            'showmap 4321 | grep TOTAL', as_root=True),
         ['100 101 102 103 104 105 106 107 TOTAL']),
        (self.call.device.ReadFile('/proc/4321/status', as_root=True),
         self.CommandError())):
      self.assertEqual(
          {
            'Size': 100,
            'Rss': 101,
            'Pss': 102,
            'Shared_Clean': 103,
            'Shared_Dirty': 104,
            'Private_Clean': 105,
            'Private_Dirty': 106,
          },
          self.device.GetMemoryUsageForPid(4321))
class DeviceUtilsDismissCrashDialogIfNeededTest(DeviceUtilsTest):
  """Tests for DeviceUtils.DismissCrashDialogIfNeeded."""

  def _expectedDumpsysCall(self):
    return self.call.device.RunShellCommand(
        ['dumpsys', 'window', 'windows'], check_return=True,
        large_output=True)

  def _expectedKeyEventCall(self, key_code):
    return self.call.device.RunShellCommand(
        ['input', 'keyevent', key_code], check_return=True)

  def testDismissCrashDialogIfNeeded_crashedPageckageNotFound(self):
    # The dump has no "Application Error" focus line, so None is returned
    # and no key events are expected.
    sample_dumpsys_output = '''
WINDOW MANAGER WINDOWS (dumpsys window windows)
Window #11 Window{f8b647a u0 SearchPanel}:
mDisplayId=0 mSession=Session{8 94:122} mClient=android.os.BinderProxy@1ba5
mOwnerUid=100 mShowToOwnerOnly=false package=com.android.systemui appop=NONE
mAttrs=WM.LayoutParams{(0,0)(fillxfill) gr=#53 sim=#31 ty=2024 fl=100
Requested w=1080 h=1920 mLayoutSeq=426
mBaseLayer=211000 mSubLayer=0 mAnimLayer=211000+0=211000 mLastLayer=211000
'''
    dump_lines = sample_dumpsys_output.split('\n')
    with self.assertCalls((self._expectedDumpsysCall(), dump_lines)):
      package_name = self.device.DismissCrashDialogIfNeeded()
    self.assertIsNone(package_name)

  def testDismissCrashDialogIfNeeded_crashedPageckageFound(self):
    # The focused window is an "Application Error" dialog: key events 22, 22
    # and 66 are expected, followed by a second dump (empty here).
    sample_dumpsys_output = '''
WINDOW MANAGER WINDOWS (dumpsys window windows)
Window #11 Window{f8b647a u0 SearchPanel}:
mDisplayId=0 mSession=Session{8 94:122} mClient=android.os.BinderProxy@1ba5
mOwnerUid=102 mShowToOwnerOnly=false package=com.android.systemui appop=NONE
mAttrs=WM.LayoutParams{(0,0)(fillxfill) gr=#53 sim=#31 ty=2024 fl=100
Requested w=1080 h=1920 mLayoutSeq=426
mBaseLayer=211000 mSubLayer=0 mAnimLayer=211000+0=211000 mLastLayer=211000
mHasPermanentDpad=false
mCurrentFocus=Window{3a27740f u0 Application Error: com.android.chrome}
mFocusedApp=AppWindowToken{470af6f token=Token{272ec24e ActivityRecord{t894}}}
'''
    expected = [
        (self._expectedDumpsysCall(), sample_dumpsys_output.split('\n')),
        self._expectedKeyEventCall('22'),
        self._expectedKeyEventCall('22'),
        self._expectedKeyEventCall('66'),
        (self._expectedDumpsysCall(), []),
    ]
    with self.assertCalls(*expected):
      package_name = self.device.DismissCrashDialogIfNeeded()
    self.assertEqual(package_name, 'com.android.chrome')
class DeviceUtilsClientCache(DeviceUtilsTest):
  """Tests for DeviceUtils.GetClientCache and its clearing by _ClearCache."""

  def testClientCache_twoCaches(self):
    # Differently named client caches are independent of each other and of
    # the device's own cache; _ClearCache empties all of them.
    self.device._cache['test'] = 0
    first = self.device.GetClientCache('ClientOne')
    first['test'] = 1
    second = self.device.GetClientCache('ClientTwo')
    second['test'] = 2

    self.assertEqual(self.device._cache['test'], 0)
    self.assertEqual(first, {'test': 1})
    self.assertEqual(second, {'test': 2})

    self.device._ClearCache()

    self.assertTrue('test' not in self.device._cache)
    self.assertEqual(first, {})
    self.assertEqual(second, {})

  def testClientCache_multipleInstances(self):
    # Asking twice for the same client name yields the same contents.
    first = self.device.GetClientCache('ClientOne')
    first['test'] = 1
    again = self.device.GetClientCache('ClientOne')

    self.assertEqual(first, {'test': 1})
    self.assertEqual(again, {'test': 1})

    self.device._ClearCache()

    self.assertEqual(first, {})
    self.assertEqual(again, {})
class DeviceUtilsHealthyDevicesTest(mock_calls.TestCase):
  """Tests for DeviceUtils.HealthyDevices device selection.

  Several tests depend on the ANDROID_SERIAL environment variable. The
  environment is snapshotted in setUp and restored on cleanup, so a value
  present in the developer's shell neither influences the tests nor gets
  deleted by them.
  """

  def setUp(self):
    super(DeviceUtilsHealthyDevicesTest, self).setUp()
    # patch.dict restores os.environ to its original contents when stopped,
    # including keys removed while the patch was active.
    env_patcher = mock.patch.dict(os.environ)
    env_patcher.start()
    self.addCleanup(env_patcher.stop)
    # Start every test without an ambient ANDROID_SERIAL.
    os.environ.pop('ANDROID_SERIAL', None)

  def testHealthyDevices_emptyBlacklist_defaultDeviceArg(self):
    test_serials = ['0123456789abcdef', 'fedcba9876543210']
    with self.assertCalls(
        (mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.Devices(),
         [_AdbWrapperMock(s) for s in test_serials])):
      blacklist = mock.NonCallableMock(**{'Read.return_value': []})
      devices = device_utils.DeviceUtils.HealthyDevices(blacklist)
    # zip() stops at the shorter sequence, so check the length explicitly;
    # otherwise a short or empty result would pass vacuously.
    self.assertEqual(len(test_serials), len(devices))
    for serial, device in zip(test_serials, devices):
      self.assertIsInstance(device, device_utils.DeviceUtils)
      self.assertEqual(serial, device.adb.GetDeviceSerial())

  def testHealthyDevices_blacklist_defaultDeviceArg(self):
    test_serials = ['0123456789abcdef', 'fedcba9876543210']
    with self.assertCalls(
        (mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.Devices(),
         [_AdbWrapperMock(s) for s in test_serials])):
      blacklist = mock.NonCallableMock(
          **{'Read.return_value': ['fedcba9876543210']})
      devices = device_utils.DeviceUtils.HealthyDevices(blacklist)
    self.assertEqual(1, len(devices))
    self.assertIsInstance(devices[0], device_utils.DeviceUtils)
    self.assertEqual('0123456789abcdef', devices[0].adb.GetDeviceSerial())

  def testHealthyDevices_noneDeviceArg_multiple_attached(self):
    test_serials = ['0123456789abcdef', 'fedcba9876543210']
    with self.assertCalls(
        (mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.Devices(),
         [_AdbWrapperMock(s) for s in test_serials]),
        (mock.call.devil.android.device_errors.MultipleDevicesError(mock.ANY),
         _MockMultipleDevicesError())):
      with self.assertRaises(_MockMultipleDevicesError):
        device_utils.DeviceUtils.HealthyDevices(device_arg=None)

  def testHealthyDevices_noneDeviceArg_one_attached(self):
    test_serials = ['0123456789abcdef']
    with self.assertCalls(
        (mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.Devices(),
         [_AdbWrapperMock(s) for s in test_serials])):
      devices = device_utils.DeviceUtils.HealthyDevices(device_arg=None)
    self.assertEqual(1, len(devices))

  def testHealthyDevices_noneDeviceArg_no_attached(self):
    test_serials = []
    with self.assertCalls(
        (mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.Devices(),
         [_AdbWrapperMock(s) for s in test_serials])):
      with self.assertRaises(device_errors.NoDevicesError):
        device_utils.DeviceUtils.HealthyDevices(device_arg=None)

  def testHealthyDevices_noneDeviceArg_multiple_attached_ANDROID_SERIAL(self):
    with mock.patch.dict(os.environ, {'ANDROID_SERIAL': '0123456789abcdef'}):
      with self.assertCalls():  # Should skip adb devices when device is known.
        device_utils.DeviceUtils.HealthyDevices(device_arg=None)

  def testHealthyDevices_stringDeviceArg(self):
    with self.assertCalls():  # Should skip adb devices when device is known.
      devices = device_utils.DeviceUtils.HealthyDevices(
          device_arg='0123456789abcdef')
    self.assertEqual(1, len(devices))

  def testHealthyDevices_EmptyListDeviceArg_multiple_attached(self):
    test_serials = ['0123456789abcdef', 'fedcba9876543210']
    with self.assertCalls(
        (mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.Devices(),
         [_AdbWrapperMock(s) for s in test_serials])):
      devices = device_utils.DeviceUtils.HealthyDevices(device_arg=())
    self.assertEqual(2, len(devices))

  def testHealthyDevices_EmptyListDeviceArg_ANDROID_SERIAL(self):
    with mock.patch.dict(os.environ, {'ANDROID_SERIAL': '0123456789abcdef'}):
      with self.assertCalls():  # Should skip adb devices when device is known.
        devices = device_utils.DeviceUtils.HealthyDevices(device_arg=())
    self.assertEqual(1, len(devices))

  def testHealthyDevices_EmptyListDeviceArg_no_attached(self):
    test_serials = []
    with self.assertCalls(
        (mock.call.devil.android.sdk.adb_wrapper.AdbWrapper.Devices(),
         [_AdbWrapperMock(s) for s in test_serials])):
      with self.assertRaises(device_errors.NoDevicesError):
        device_utils.DeviceUtils.HealthyDevices(device_arg=[])

  def testHealthyDevices_ListDeviceArg(self):
    # An explicit serial list takes precedence over ANDROID_SERIAL.
    device_arg = ['0123456789abcdef', 'fedcba9876543210']
    with mock.patch.dict(os.environ, {'ANDROID_SERIAL': 'should-not-apply'}):
      with self.assertCalls():  # Should skip adb devices when device is known.
        devices = device_utils.DeviceUtils.HealthyDevices(device_arg=device_arg)
    self.assertEqual(2, len(devices))
class DeviceUtilsRestartAdbdTest(DeviceUtilsTest):
  """Tests for DeviceUtils.RestartAdbd."""

  def testAdbdRestart(self):
    # Expected flow: create a '.sh' device temp file, write something to it
    # (arguments not checked), source it as root, then wait for the device.
    script_path = '/sdcard/temp-123.sh'
    expected = [
        (mock.call.devil.android.device_temp_file.DeviceTempFile(
            self.adb, suffix='.sh'),
         MockTempFile(script_path)),
        self.call.device.WriteFile(mock.ANY, mock.ANY),
        self.call.device.RunShellCommand(
            ['source', script_path], check_return=True, as_root=True),
        self.call.adb.WaitForDevice(),
    ]
    with self.assertCalls(*expected):
      self.device.RestartAdbd()
class DeviceUtilsGrantPermissionsTest(DeviceUtilsTest):
  """Tests for DeviceUtils.GrantPermissions."""

  def _patchSdk(self, sdk_level):
    # Patcher that makes the device report |sdk_level| as its SDK version.
    return self.patch_call(self.call.device.build_version_sdk,
                           return_value=sdk_level)

  def _expectedGrant(self, shell_cmd):
    # Expectation for the single `pm grant ...` shell invocation.
    grant = self.call.device.RunShellCommand(
        shell_cmd, shell=True, check_return=True)
    return self.assertCalls((grant, []))

  def testGrantPermissions_none(self):
    # An empty permission list: no expected calls are registered.
    self.device.GrantPermissions('package', [])

  def testGrantPermissions_underM(self):
    # On Lollipop no expected calls are registered.
    with self._patchSdk(version_codes.LOLLIPOP):
      self.device.GrantPermissions('package', ['p1'])

  def testGrantPermissions_one(self):
    marshmallow = self._patchSdk(version_codes.MARSHMALLOW)
    with marshmallow, self._expectedGrant('pm grant package p1'):
      self.device.GrantPermissions('package', ['p1'])

  def testGrantPermissions_multiple(self):
    # Several permissions are granted in one '&&'-joined command.
    marshmallow = self._patchSdk(version_codes.MARSHMALLOW)
    joined_cmd = 'pm grant package p1&&pm grant package p2'
    with marshmallow, self._expectedGrant(joined_cmd):
      self.device.GrantPermissions('package', ['p1', 'p2'])

  def testGrantPermissions_WriteExtrnalStorage(self):
    # Requesting WRITE_EXTERNAL_STORAGE also grants READ_EXTERNAL_STORAGE.
    marshmallow = self._patchSdk(version_codes.MARSHMALLOW)
    joined_cmd = (
        'pm grant package android.permission.WRITE_EXTERNAL_STORAGE&&'
        'pm grant package android.permission.READ_EXTERNAL_STORAGE')
    with marshmallow, self._expectedGrant(joined_cmd):
      self.device.GrantPermissions(
          'package', ['android.permission.WRITE_EXTERNAL_STORAGE'])

  def testGrantPermissions_BlackList(self):
    # For ACCESS_MOCK_LOCATION no expected calls are registered.
    with self._patchSdk(version_codes.MARSHMALLOW):
      self.device.GrantPermissions(
          'package', ['android.permission.ACCESS_MOCK_LOCATION'])
class DeviecUtilsIsScreenOn(DeviceUtilsTest):
  """Tests for DeviceUtils.IsScreenOn at KITKAT and LOLLIPOP SDK levels."""
  # NOTE(review): 'Deviec' looks like a typo for 'Device'; the name is kept
  # so existing test IDs stay stable.

  # Canned shell output. The _K_* values are used with KITKAT (mScreenOn)
  # and the _L_* values with LOLLIPOP (mInteractive).
  _L_SCREEN_ON = ['test=test mInteractive=true']
  _K_SCREEN_ON = ['test=test mScreenOn=true']
  _L_SCREEN_OFF = ['mInteractive=false']
  _K_SCREEN_OFF = ['mScreenOn=false']

  def _CheckScreenState(self, sdk_level, piped_cmd, cmd_output, verify):
    # Reports |sdk_level| as the SDK version, pairs the expected piped shell
    # call |piped_cmd| with |cmd_output|, and hands the IsScreenOn() result
    # to |verify| (self.assertTrue or self.assertFalse).
    with self.patch_call(self.call.device.build_version_sdk,
                         return_value=sdk_level):
      with self.assertCalls(
          (self.call.device._RunPipedShellCommand(piped_cmd), cmd_output)):
        verify(self.device.IsScreenOn())

  def testIsScreenOn_onPreL(self):
    self._CheckScreenState(
        version_codes.KITKAT,
        'dumpsys input_method | grep mScreenOn',
        self._K_SCREEN_ON,
        self.assertTrue)

  def testIsScreenOn_onL(self):
    self._CheckScreenState(
        version_codes.LOLLIPOP,
        'dumpsys input_method | grep mInteractive',
        self._L_SCREEN_ON,
        self.assertTrue)

  def testIsScreenOn_offPreL(self):
    self._CheckScreenState(
        version_codes.KITKAT,
        'dumpsys input_method | grep mScreenOn',
        self._K_SCREEN_OFF,
        self.assertFalse)

  def testIsScreenOn_offL(self):
    self._CheckScreenState(
        version_codes.LOLLIPOP,
        'dumpsys input_method | grep mInteractive',
        self._L_SCREEN_OFF,
        self.assertFalse)

  def testIsScreenOn_noOutput(self):
    # Empty shell output must surface as CommandFailedError.
    with self.patch_call(self.call.device.build_version_sdk,
                         return_value=version_codes.LOLLIPOP):
      empty_output_call = (
          self.call.device._RunPipedShellCommand(
              'dumpsys input_method | grep mInteractive'),
          [])
      with self.assertCalls(empty_output_call):
        with self.assertRaises(device_errors.CommandFailedError):
          self.device.IsScreenOn()
class DeviecUtilsSetScreen(DeviceUtilsTest):
  """Tests for DeviceUtils.SetScreen; time.sleep is mocked in every test."""
  # NOTE(review): 'Deviec' and 'testSetScren' look like typos; the names are
  # kept so existing test IDs stay stable.

  @mock.patch('time.sleep', mock.Mock())
  def testSetScren_alreadySet(self):
    # Screen already reports off: only the state query is expected.
    expected_calls = [
        (self.call.device.IsScreenOn(), False),
    ]
    with self.assertCalls(*expected_calls):
      self.device.SetScreen(False)

  @mock.patch('time.sleep', mock.Mock())
  def testSetScreen_on(self):
    # Off -> power key event -> on.
    expected_calls = [
        (self.call.device.IsScreenOn(), False),
        (self.call.device.SendKeyEvent(keyevent.KEYCODE_POWER), None),
        (self.call.device.IsScreenOn(), True),
    ]
    with self.assertCalls(*expected_calls):
      self.device.SetScreen(True)

  @mock.patch('time.sleep', mock.Mock())
  def testSetScreen_off(self):
    # On -> power key event -> off.
    expected_calls = [
        (self.call.device.IsScreenOn(), True),
        (self.call.device.SendKeyEvent(keyevent.KEYCODE_POWER), None),
        (self.call.device.IsScreenOn(), False),
    ]
    with self.assertCalls(*expected_calls):
      self.device.SetScreen(False)

  @mock.patch('time.sleep', mock.Mock())
  def testSetScreen_slow(self):
    # After the power key event the screen still reports on twice before it
    # finally reports off.
    expected_calls = [
        (self.call.device.IsScreenOn(), True),
        (self.call.device.SendKeyEvent(keyevent.KEYCODE_POWER), None),
        (self.call.device.IsScreenOn(), True),
        (self.call.device.IsScreenOn(), True),
        (self.call.device.IsScreenOn(), False),
    ]
    with self.assertCalls(*expected_calls):
      self.device.SetScreen(False)
class DeviecUtilsLoadCacheData(DeviceUtilsTest):
  """Tests for DeviceUtils.LoadCacheData and its 'token' check."""
  # NOTE(review): 'Deviec' looks like a typo for 'Device'; the name is kept
  # so existing test IDs stay stable.

  def testTokenMissing(self):
    # A JSON object without a 'token' key is rejected.
    cache_init = self.EnsureCacheInitialized()
    with self.assertCalls(cache_init):
      loaded = self.device.LoadCacheData('{}')
      self.assertFalse(loaded)

  def testTokenStale(self):
    # The token 'foo' is rejected.
    cache_init = self.EnsureCacheInitialized()
    with self.assertCalls(cache_init):
      loaded = self.device.LoadCacheData('{"token":"foo"}')
      self.assertFalse(loaded)

  def testTokenMatches(self):
    # The token 'TOKEN' is accepted.
    cache_init = self.EnsureCacheInitialized()
    with self.assertCalls(cache_init):
      loaded = self.device.LoadCacheData('{"token":"TOKEN"}')
      self.assertTrue(loaded)

  def testDumpThenLoad(self):
    # Dumped cache data, with its token set to 'TOKEN', loads successfully.
    cache_init = self.EnsureCacheInitialized()
    with self.assertCalls(cache_init):
      dumped = self.device.DumpCacheData()
      data = json.loads(dumped)
      data['token'] = 'TOKEN'
      reserialized = json.dumps(data)
      self.assertTrue(self.device.LoadCacheData(reserialized))
if __name__ == '__main__':
  # Run the tests verbosely with the root logger at DEBUG level.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.DEBUG)
  unittest.main(verbosity=2)