pushimage: push keyset loading down to allow for input flexibility
Sometimes we do want sub-artifacts to sign with a different keyset, like
when a firmware builder also produces accessory firmware. The current
code will only check the keysets from the main file and then ignore any
setting in the sub-file. Instead, delay the keyset loading to after we
know the image type so we can pull it from the specific input file.
BUG=chrome-os-partner:46635
TEST=`./cbuildbot/run_tests` passes
TEST=`./pushimage -n gs://foo/ --board smaug --debug` with custom input insns shows correct output insns
Change-Id: I8ed170e43b6e2c115d39dfdc772fbc93054cc9d7
Reviewed-on: https://chromium-review.googlesource.com/311280
Commit-Ready: Mike Frysinger <vapier@chromium.org>
Tested-by: Mike Frysinger <vapier@chromium.org>
Reviewed-by: Vincent Palatin <vpalatin@chromium.org>
diff --git a/scripts/pushimage.py b/scripts/pushimage.py
index 7c56e34..e770d12 100644
--- a/scripts/pushimage.py
+++ b/scripts/pushimage.py
@@ -62,6 +62,10 @@
class MissingBoardInstructions(Exception):
"""Raised when a board lacks any signer instructions."""
+ def __init__(self, board, image_type, input_insns):
+ Exception.__init__(self, 'Board %s lacks insns for %s image: %s not found' %
+ (board, image_type, input_insns))
+
class InputInsns(object):
"""Object to hold settings for a signable board.
@@ -70,18 +74,38 @@
reads) is not exactly the same as the instruction file pushimage reads.
"""
- def __init__(self, board):
+ def __init__(self, board, image_type=None):
+ """Initialization.
+
+ Args:
+ board: The board to look up details.
+ image_type: The type of image we will be signing (see --sign-types).
+ """
self.board = board
config = ConfigParser.ConfigParser()
config.readfp(open(self.GetInsnFile('DEFAULT')))
+
# What pushimage internally refers to as 'recovery', are the basic signing
# instructions in practice, and other types are stacked on top.
+ if image_type is None:
+ image_type = constants.IMAGE_TYPE_RECOVERY
+ self.image_type = image_type
input_insns = self.GetInsnFile(constants.IMAGE_TYPE_RECOVERY)
if not os.path.exists(input_insns):
# This board doesn't have any signing instructions.
- raise MissingBoardInstructions(self.board)
+ raise MissingBoardInstructions(self.board, image_type, input_insns)
config.readfp(open(input_insns))
+
+ if image_type is not None:
+ input_insns = self.GetInsnFile(image_type)
+ if not os.path.exists(input_insns):
+ # This type doesn't have any signing instructions.
+ raise MissingBoardInstructions(self.board, image_type, input_insns)
+
+ self.image_type = image_type
+ config.readfp(open(input_insns))
+
self.cfg = config
def GetInsnFile(self, image_type):
@@ -129,22 +153,42 @@
def GetKeysets(self):
"""Return the list of keysets to sign for this board."""
+ # We do not perturb the order (e.g. using sorted() or making a set())
+ # because we want the behavior stable, and we want the input insns to
+ # explicitly control the order (since it has an impact on naming).
return self.SplitCfgField(self.cfg.get('insns', 'keyset'))
- def OutputInsns(self, image_type, output_file, sect_insns, sect_general):
+ @staticmethod
+ def CopyConfigParser(config):
+ """Return a copy of a ConfigParser object.
+
+ The python guys broke the ability to use something like deepcopy:
+ https://bugs.python.org/issue16058
+ """
+ # Write the current config to a string io object.
+ data = cStringIO.StringIO()
+ config.write(data)
+ data.seek(0)
+
+ # Create a new ConfigParser from the serialized data.
+ ret = ConfigParser.ConfigParser()
+ ret.readfp(data)
+
+ return ret
+
+ def OutputInsns(self, output_file, sect_insns, sect_general):
"""Generate the output instruction file for sending to the signer.
Note: The format of the instruction file pushimage outputs (and the signer
reads) is not exactly the same as the instruction file pushimage reads.
Args:
- image_type: The type of image we will be signing (see --sign-types).
output_file: The file to write the new instruction file to.
sect_insns: Items to set/override in the [insns] section.
sect_general: Items to set/override in the [general] section.
"""
- config = ConfigParser.ConfigParser()
- config.readfp(open(self.GetInsnFile(image_type)))
+ # Create a copy so we can clobber certain fields.
+ config = self.CopyConfigParser(self.cfg)
# Clear channel entry in instructions file, ensuring we only get
# one channel for the signer to look at. Then provide all the
@@ -160,7 +204,7 @@
config.write(output)
data = output.getvalue()
osutils.WriteFile(output_file, data)
- logging.debug('generated insns file for %s:\n%s', image_type, data)
+ logging.debug('generated insns file for %s:\n%s', self.image_type, data)
def MarkImageToBeSigned(ctx, tbs_base, insns_path, priority):
@@ -246,14 +290,13 @@
try:
input_insns = InputInsns(board)
except MissingBoardInstructions as e:
- logging.warning('board "%s" is missing base instruction file: %s', board, e)
+ logging.warning('Missing base instruction file: %s', e)
logging.warning('not uploading anything for signing')
return
channels = input_insns.GetChannels()
- # We want force_keysets as a set, and keysets as a list.
+ # We want force_keysets as a set.
force_keysets = set(force_keysets)
- keysets = list(force_keysets) if force_keysets else input_insns.GetKeysets()
if mock:
logging.info('Upload mode: mock; signers will not process anything')
@@ -281,7 +324,6 @@
if dry_run:
logging.info('DRY RUN MODE ACTIVE: NOTHING WILL BE UPLOADED')
logging.info('Signing for channels: %s', ' '.join(channels))
- logging.info('Signing for keysets : %s', ' '.join(keysets))
instruction_urls = {}
@@ -397,43 +439,53 @@
logging.debug('Files to sign: %s', files_to_sign)
# Now go through the subset for signing.
- for keyset in keysets:
- logging.debug('\n\n#### KEYSET: %s ####\n', keyset)
- sect_insns['keyset'] = keyset
- for image_type, dst_name, suffix in files_to_sign:
- dst_archive = '%s.%s' % (dst_name, suffix)
- sect_general['archive'] = dst_archive
- sect_general['type'] = image_type
+ for image_type, dst_name, suffix in files_to_sign:
+ try:
+ input_insns = InputInsns(board, image_type=image_type)
+ except MissingBoardInstructions as e:
+ logging.info('Nothing to sign: %s', e)
+ continue
- # In the default/automatic mode, only flag files for signing if the
- # archives were actually uploaded in a previous stage. This additional
- # check can be removed in future once |sign_types| becomes a required
- # argument.
- # TODO: Make |sign_types| a required argument.
- gs_artifact_path = os.path.join(dst_path, dst_archive)
- exists = False
- try:
- exists = ctx.Exists(gs_artifact_path)
- except gs.GSContextException:
- unknown_error[0] = True
- logging.error('Unknown error while checking %s', gs_artifact_path,
- exc_info=True)
- if not exists:
- logging.info('%s does not exist. Nothing to sign.',
- gs_artifact_path)
- continue
+ dst_archive = '%s.%s' % (dst_name, suffix)
+ sect_general['archive'] = dst_archive
+ sect_general['type'] = image_type
- input_insn_path = input_insns.GetInsnFile(image_type)
- if not os.path.exists(input_insn_path):
- logging.info('%s does not exist. Nothing to sign.', input_insn_path)
- continue
+ # In the default/automatic mode, only flag files for signing if the
+ # archives were actually uploaded in a previous stage. This additional
+ # check can be removed in future once |sign_types| becomes a required
+ # argument.
+ # TODO: Make |sign_types| a required argument.
+ gs_artifact_path = os.path.join(dst_path, dst_archive)
+ exists = False
+ try:
+ exists = ctx.Exists(gs_artifact_path)
+ except gs.GSContextException:
+ unknown_error[0] = True
+ logging.error('Unknown error while checking %s', gs_artifact_path,
+ exc_info=True)
+ if not exists:
+ logging.info('%s does not exist. Nothing to sign.',
+ gs_artifact_path)
+ continue
+
+ # Figure out which keysets have been requested for this type. We sort the
+ # forced set so tests/runtime behavior is stable, and because we need/want
+ # list since we'll be indexing it below w/multiple keysets.
+ keysets = sorted(force_keysets)
+ if not keysets:
+ keysets = input_insns.GetKeysets()
+ if not keysets:
+ logging.warning('Skipping %s image signing due to no keysets',
+ image_type)
+
+ for keyset in keysets:
+ sect_insns['keyset'] = keyset
# Generate the insn file for this artifact that the signer will use,
# and flag it for signing.
with tempfile.NamedTemporaryFile(
bufsize=0, prefix='pushimage.insns.') as insns_path:
- input_insns.OutputInsns(image_type, insns_path.name, sect_insns,
- sect_general)
+ input_insns.OutputInsns(insns_path.name, sect_insns, sect_general)
gs_insns_path = '%s/%s' % (dst_path, dst_name)
if keyset != keysets[0]:
@@ -455,7 +507,8 @@
logging.error('Unknown error while marking for signing %s',
gs_insns_path, exc_info=True)
continue
- logging.info('Signing %s image %s', image_type, gs_insns_path)
+ logging.info('Signing %s image with keyset %s at %s', image_type,
+ keyset, gs_insns_path)
instruction_urls.setdefault(channel, []).append(gs_insns_path)
if unknown_error[0]:
diff --git a/scripts/pushimage_unittest.py b/scripts/pushimage_unittest.py
index 9f5dcf1..ac9b492 100644
--- a/scripts/pushimage_unittest.py
+++ b/scripts/pushimage_unittest.py
@@ -62,8 +62,8 @@
def testOutputInsnsBasic(self):
"""Verify output instructions are sane"""
exp_content = """[insns]
-keyset = stumpy-mp-v3
channel = dev canary
+keyset = stumpy-mp-v3
chromeos_shell = false
ensure_no_password = true
firmware_update = true
@@ -75,7 +75,7 @@
insns = pushimage.InputInsns('test.board')
m = self.PatchObject(osutils, 'WriteFile')
- insns.OutputInsns('recovery', '/bogus', {}, {})
+ insns.OutputInsns('/bogus', {}, {})
self.assertTrue(m.called)
content = m.call_args_list[0][0][1]
self.assertEqual(content.rstrip(), exp_content.rstrip())
@@ -83,8 +83,8 @@
def testOutputInsnsReplacements(self):
"""Verify output instructions can be updated"""
exp_content = """[insns]
-keyset = batman
channel = dev
+keyset = batman
chromeos_shell = false
ensure_no_password = true
firmware_update = true
@@ -106,7 +106,7 @@
insns = pushimage.InputInsns('test.board')
m = self.PatchObject(osutils, 'WriteFile')
- insns.OutputInsns('recovery', '/a/file', sect_insns, sect_general)
+ insns.OutputInsns('/a/file', sect_insns, sect_general)
self.assertTrue(m.called)
content = m.call_args_list[0][0][1]
self.assertEqual(content.rstrip(), exp_content.rstrip())
@@ -271,6 +271,31 @@
self.assertRaises(pushimage.PushError, pushimage.PushImage, '/src',
'test.board', 'R34-5126.0.0')
+ def testMultipleKeysets(self):
+ """Verify behavior when processing an insn w/multiple keysets"""
+ EXPECTED = {
+ 'canary': [
+ ('gs://chromeos-releases/canary-channel/test.board/5126.0.0/'
+ 'ChromeOS-recovery-R34-5126.0.0-test.board.instructions'),
+ ('gs://chromeos-releases/canary-channel/test.board/5126.0.0/'
+ 'ChromeOS-recovery-R34-5126.0.0-test.board-key2.instructions'),
+ ('gs://chromeos-releases/canary-channel/test.board/5126.0.0/'
+ 'ChromeOS-recovery-R34-5126.0.0-test.board-key3.instructions'),
+ ],
+ 'dev': [
+ ('gs://chromeos-releases/dev-channel/test.board/5126.0.0/'
+ 'ChromeOS-recovery-R34-5126.0.0-test.board.instructions'),
+ ('gs://chromeos-releases/dev-channel/test.board/5126.0.0/'
+ 'ChromeOS-recovery-R34-5126.0.0-test.board-key2.instructions'),
+ ('gs://chromeos-releases/dev-channel/test.board/5126.0.0/'
+ 'ChromeOS-recovery-R34-5126.0.0-test.board-key3.instructions'),
+ ],
+ }
+ with mock.patch.object(gs.GSContext, 'Exists', return_value=True):
+ urls = pushimage.PushImage('/src', 'test.board', 'R34-5126.0.0',
+ force_keysets=('key1', 'key2', 'key3'))
+ self.assertEqual(urls, EXPECTED)
+
class MainTests(cros_test_lib.MockTestCase):
"""Tests for main()"""
@@ -289,4 +314,4 @@
signing.INPUT_INSN_DIR = signing.TEST_INPUT_INSN_DIR
# Run the tests.
- cros_test_lib.main(level='info', module=__name__)
+ cros_test_lib.main(level='notice', module=__name__)