blob: 53d998f8440b8c3acf77893395c27194282d31fc [file] [log] [blame]
import unittest
from ota_interface import JobInfo, ProcessesManagement
from unittest.mock import patch, mock_open, Mock, MagicMock
import os
import sqlite3
import copy
class TestJobInfo(unittest.TestCase):
def setUp(self):
self.test_id = '286feab0-f16b-11eb-a72a-f7b1de0921ef'
self.test_target = 'target/build.zip'
self.test_verbose = True
self.test_status = 'Running'
self.test_extra = '--downgrade'
self.test_start_time = 1628698830
self.test_finish_time = 1628698831
def setup_job(self, incremental = '', partial = [],
output = '', stdout = '', stderr = ''):
job_info = JobInfo(id=self.test_id,
target=self.test_target,
incremental=incremental,
verbose=self.test_verbose,
partial=partial,
output=output,
status=self.test_status,
extra=self.test_extra,
start_time=self.test_start_time,
finish_time=self.test_finish_time,
stderr=stderr,
stdout=stdout
)
return job_info
def test_init(self):
# No incremental, no output, no stdout/stderr set.
job_info1 = self.setup_job()
for key, value in self.__dict__.items():
if key.startswith('test_'):
self.assertEqual(job_info1.__dict__[key[5:]], value,
'The value of ' + key + 'is not initialized correctly'
)
self.assertEqual(job_info1.output, 'output/'+self.test_id+'.zip',
'Default output cannot be setup correctly'
)
self.assertEqual(job_info1.stderr, 'output/stderr.'+self.test_id,
'Default stderr cannot be setup correctly'
)
self.assertEqual(job_info1.stdout, 'output/stdout.'+self.test_id,
'Default stdout cannot be setup correctly'
)
# Test the incremental setup
job_info2 = self.setup_job(incremental='target/source.zip')
self.assertEqual(job_info2.incremental, 'target/source.zip',
'incremental source cannot be initialized correctly'
)
self.assertTrue(job_info2.isIncremental,
'incremental status cannot be initialized correctly'
)
# Test the stdout/stderr setup
job_info3 = self.setup_job(stderr='output/stderr',
stdout='output/stdout'
)
self.assertEqual(job_info3.stderr, 'output/stderr',
'the stderr cannot be setup manually'
)
self.assertEqual(job_info3.stdout, 'output/stdout',
'the stdout cannot be setup manually'
)
# Test the output setup
job_info4 = self.setup_job(output='output/output.zip')
self.assertEqual(job_info4.output, 'output/output.zip',
'output cannot be setup manually'
)
# Test the partial setup
job_info5 = self.setup_job(partial=['system', 'vendor'])
self.assertEqual(job_info5.partial, ['system', 'vendor'],
'partial list cannot be setup correctly'
)
def test_to_sql_form_dict(self):
partial_list = ['system', 'vendor']
job_info = self.setup_job(partial=partial_list)
sql_dict = job_info.to_sql_form_dict()
test_dict = {'id': self.test_id,
'target': self.test_target,
'incremental': '',
'verbose': int(self.test_verbose),
'partial': ','.join(partial_list),
'output': 'output/' + self.test_id + '.zip',
'status': self.test_status,
'extra': self.test_extra,
'stdout': 'output/stdout.' + self.test_id,
'stderr': 'output/stderr.' + self.test_id,
'start_time': self.test_start_time,
'finish_time': self.test_finish_time,
'isPartial': True,
'isIncremental': False
}
for key, value in test_dict.items():
self.assertEqual(value, sql_dict[key],
'the ' + key + ' is not converted to sql form dict correctly'
)
def test_to_dict_basic(self):
partial_list = ['system', 'vendor']
job_info = self.setup_job(partial=partial_list)
basic_dict = job_info.to_dict_basic()
test_dict = {'id': self.test_id,
'target': self.test_target,
'target_name': self.test_target.split('/')[-1],
'incremental': '',
'verbose': int(self.test_verbose),
'partial': partial_list,
'output': 'output/' + self.test_id + '.zip',
'status': self.test_status,
'extra': self.test_extra,
'stdout': 'output/stdout.' + self.test_id,
'stderr': 'output/stderr.' + self.test_id,
'start_time': self.test_start_time,
'finish_time': self.test_finish_time,
'isPartial': True,
'isIncremental': False
}
for key, value in test_dict.items():
self.assertEqual(value, basic_dict[key],
'the ' + key + ' is not converted to basic form dict correctly'
)
def test_to_dict_detail(self):
partial_list = ['system', 'vendor']
test_incremental = 'target/source.zip'
job_info = self.setup_job(partial=partial_list, incremental=test_incremental)
mock_target_lib = Mock()
mock_target_lib.get_build_by_path = Mock(
side_effect = [
Mock(file_name='build.zip', build_version=''),
Mock(file_name='source.zip', build_version='')
]
)
test_dict = {'id': self.test_id,
'target': self.test_target,
'incremental': 'target/source.zip',
'verbose': int(self.test_verbose),
'partial': partial_list,
'output': 'output/' + self.test_id + '.zip',
'status': self.test_status,
'extra': self.test_extra,
'stdout': 'NO STD OUTPUT IS FOUND',
'stderr': 'NO STD ERROR IS FOUND',
'start_time': self.test_start_time,
'finish_time': self.test_finish_time,
'isPartial': True,
'isIncremental': True
}
# Test with no stdout and stderr
dict_detail = job_info.to_dict_detail(mock_target_lib)
mock_target_lib.get_build_by_path.assert_any_call(self.test_target)
mock_target_lib.get_build_by_path.assert_any_call(test_incremental)
for key, value in test_dict.items():
self.assertEqual(value, dict_detail[key],
'the ' + key + ' is not converted to detailed dict correctly'
)
# Test with mocked stdout and stderr
mock_target_lib.get_build_by_path = Mock(
side_effect = [
Mock(file_name='build.zip', build_version=''),
Mock(file_name='source.zip', build_version='')
]
)
mock_file = mock_open(read_data="mock output")
with patch("builtins.open", mock_file):
dict_detail = job_info.to_dict_detail(mock_target_lib)
test_dict['stderr'] = 'mock output'
test_dict['stdout'] = 'mock output'
for key, value in test_dict.items():
self.assertEqual(value, dict_detail[key],
'the ' + key + ' is not converted to detailed dict correctly'
)
class TestProcessesManagement(unittest.TestCase):
def setUp(self):
if os.path.isfile('test_process.db'):
self.tearDown()
self.processes = ProcessesManagement(db_path='test_process.db')
testcase_job_info = TestJobInfo()
testcase_job_info.setUp()
self.test_job_info = testcase_job_info.setup_job(incremental='target/source.zip')
self.processes.insert_database(self.test_job_info)
def tearDown(self):
os.remove('test_process.db')
try:
os.remove('output/stderr.'+self.test_job_info.id)
os.remove('output/stdout.'+self.test_job_info.id)
except FileNotFoundError:
pass
def test_init(self):
# Test the database is created successfully
self.assertTrue(os.path.isfile('test_process.db'))
test_columns = [
{'name': 'ID','type':'TEXT'},
{'name': 'TargetPath','type':'TEXT'},
{'name': 'IncrementalPath','type':'TEXT'},
{'name': 'Verbose','type':'INTEGER'},
{'name': 'Partial','type':'TEXT'},
{'name': 'OutputPath','type':'TEXT'},
{'name': 'Status','type':'TEXT'},
{'name': 'Downgrade','type':'INTEGER'},
{'name': 'OtherFlags','type':'TEXT'},
{'name': 'STDOUT','type':'TEXT'},
{'name': 'STDERR','type':'TEXT'},
{'name': 'StartTime','type':'INTEGER'},
{'name': 'FinishTime','type':'INTEGER'},
]
connect = sqlite3.connect('test_process.db')
cursor = connect.cursor()
cursor.execute("PRAGMA table_info(jobs)")
columns = cursor.fetchall()
for column in test_columns:
column_found = list(filter(lambda x: x[1]==column['name'], columns))
self.assertEqual(len(column_found), 1,
'The column ' + column['name'] + ' is not found in database'
)
self.assertEqual(column_found[0][2], column['type'],
'The column' + column['name'] + ' has a wrong type'
)
def test_get_status_by_ID(self):
job_info = self.processes.get_status_by_ID(self.test_job_info.id)
self.assertEqual(job_info, self.test_job_info,
'The data read from database is not the same one as inserted'
)
def test_get_status(self):
# Insert the same info again, but change the last digit of id to 0
test_job_info2 = copy.copy(self.test_job_info)
test_job_info2.id = test_job_info2.id[:-1] + '0'
self.processes.insert_database(test_job_info2)
job_infos = self.processes.get_status()
self.assertEqual(len(job_infos), 2,
'The number of data entries is not the same as created'
)
self.assertEqual(job_infos[0], self.test_job_info,
'The data list read from database is not the same one as inserted'
)
self.assertEqual(job_infos[1], test_job_info2,
'The data list read from database is not the same one as inserted'
)
def test_ota_run(self):
# Test when the job exit normally
mock_proc = Mock()
mock_proc.wait = Mock(return_value=0)
mock_Popen = Mock(return_value=mock_proc)
test_command = [
"ota_from_target_files", "-v","build/target.zip", "output/ota.zip",
]
mock_pipes_template = Mock()
mock_pipes_template.open = Mock()
mock_Template = Mock(return_value=mock_pipes_template)
# Mock the subprocess.Popen, subprocess.Popen().wait and pipes.Template
with patch("subprocess.Popen", mock_Popen), \
patch("pipes.Template", mock_Template):
self.processes.ota_run(test_command, self.test_job_info.id)
mock_Popen.assert_called_once()
mock_proc.wait.assert_called_once()
job_info = self.processes.get_status_by_ID(self.test_job_info.id)
self.assertEqual(job_info.status, 'Finished')
mock_Popen.reset_mock()
mock_proc.wait.reset_mock()
# Test when the job exit with prbolems
mock_proc.wait = Mock(return_value=1)
with patch("subprocess.Popen", mock_Popen), \
patch("pipes.Template", mock_Template):
self.processes.ota_run(test_command, self.test_job_info.id)
mock_Popen.assert_called_once()
mock_proc.wait.assert_called_once()
job_info = self.processes.get_status_by_ID(self.test_job_info.id)
self.assertEqual(job_info.status, 'Error')
def test_ota_generate(self):
test_args = dict({
'output': 'ota.zip',
'extra_keys': ['downgrade', 'wipe_user_data'],
'extra': '--disable_vabc',
'isIncremental': True,
'isPartial': True,
'partial': ['system', 'vendor'],
'incremental': 'target/source.zip',
'target': 'target/build.zip',
'verbose': True
})
# Usually the order of commands make no difference, but the following
# order has been validated, so it is best to follow this manner:
# ota_from_target_files [flags like -v, --downgrade]
# [-i incremental_source] [-p partial_list] target output
test_command = [
'ota_from_target_files', '-v', '--downgrade',
'--wipe_user_data', '--disable_vabc', '-k',
'build/make/target/product/security/testkey',
'-i', 'target/source.zip',
'--partial', 'system vendor', 'target/build.zip', 'ota.zip'
]
mock_os_path_isfile = Mock(return_value=True)
mock_threading = Mock()
mock_thread = Mock(return_value=mock_threading)
with patch("os.path.isfile", mock_os_path_isfile), \
patch("threading.Thread", mock_thread):
self.processes.ota_generate(test_args, id='test')
job_info = self.processes.get_status_by_ID('test')
self.assertEqual(job_info.status, 'Running',
'The job cannot be stored into database properly'
)
# Test if the job stored into database properly
for key, value in test_args.items():
# extra_keys is merged to extra when stored into database
if key=='extra_keys':
continue
self.assertEqual(job_info.__dict__[key], value,
'The column ' + key + ' is not stored into database properly'
)
# Test if the command is in its order
self.assertEqual(mock_thread.call_args[1]['args'][0], test_command,
'The subprocess command is not in its good shape'
)
if __name__ == '__main__':
unittest.main()