blob: 59b76da0b0795842855ed0fe0ecf56fc314f6554 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Scripts to provision attestation key to Android Things device.
"""
import argparse
import os
import shutil
import struct
import subprocess
import sys
import tempfile
from aesgcm import AESGCM
import cryptography.exceptions
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import curve25519
import ec_helper
class _AtapSessionParameters(object):
"""Information stored for this AThings Attestation Protocol session.
Attributes:
algorithm: The key exchange algorithm.
operation: The operation in this session.
private_key: The host generated private key.
public_key: The host generated public key.
device_pub_key: The device side public key.
shared_key: The computed shared key.
auth_value: The challenge value.
message_version: The ATAP message version.
"""
def __init__(self):
self.algorithm = 0
self.operation = 0
self.private_key = bytes()
self.public_key = bytes()
self.device_pub_key = bytes()
self.shared_key = bytes()
self.auth_value = bytes()
self.message_version = None
class EncryptionAlgorithm(object):
  """Numeric identifiers of the supported key exchange algorithms."""
  ALGORITHM_P256, ALGORITHM_CURVE25519 = 1, 2
# Version 2 adds SoM key support but is otherwise compatible with version 1.
_MESSAGE_VERSION_1 = 1
_MESSAGE_VERSION_2 = 2
# Key exchange algorithm names mapped to the numeric identifier that is packed
# into the Operation Start message.
_ALGORITHMS = {'p256': 1, 'x25519': 2}
# Size in bytes of an ephemeral public key field. X25519 keys are padded with
# one extra byte so both algorithms use the same field size.
_ECDH_KEY_LEN = 33
# Size in bytes of the little endian unsigned length prefix of variable fields.
_VAR_LEN = 4
# Size in bytes of a message header (four single bytes plus a 4 byte length).
_HEADER_LEN = 8
# Sizes in bytes of the AES-GCM IV and authentication tag fields.
_GCM_IV_LEN = 12
_GCM_TAG_LEN = 16
# Size in bytes of a SHA-256 hash field.
_HASH_LEN = 32
# Number of bytes requested from HKDF for the shared key and the auth value.
_HKDF_HASH_LEN = 16
def _get_message_version():
  """Returns the ATAP message version this script speaks by default."""
  default_version = _MESSAGE_VERSION_1
  return default_version
def _write_operation_start(
    algorithm, operation, message_version, output_folder):
  """Writes a fresh Operation Start message to output folder.

  Generates an ECDHE key specified by <algorithm> and writes an Operation
  Start message for executing <operation> on the device to
  <output_folder>/operation_start.bin.

  Args:
    algorithm: Integer specifying the curve to use for the session key.
      1: P256, 2: X25519
    operation: Specifies the operation. 1: Certify, 2: Issue, 3: Issue
      Encrypted. Values from 1 to 5 are accepted.
    message_version: The ATAP version. If message_version is None, then the
      default version from _get_message_version() is used.
    output_folder: The folder to write operation_start message to.

  Raises:
    ValueError: algorithm or operation is invalid.

  Returns:
    session_params: The session information.
  """
  # Membership (rather than a range test) also rejects values such as 1.5
  # that would otherwise match neither key generation branch below.
  if algorithm not in _ALGORITHMS.values():
    raise ValueError('Invalid algorithm value.')
  if not operation or operation > 5 or operation < 1:
    raise ValueError('Invalid operation value.')

  # Honour the caller supplied version; only fall back to the default when
  # none was given.
  if message_version is None:
    message_version = _get_message_version()

  # Generate new key for each provisioning session
  if algorithm == _ALGORITHMS['x25519']:
    private_key = curve25519.genkey()
    # Make 33 bytes to match P256
    public_key = curve25519.public(private_key) + b'\0'
  else:
    [private_key, public_key] = ec_helper.generate_p256_key()

  session_params = _AtapSessionParameters()
  session_params.operation = operation
  session_params.algorithm = algorithm
  session_params.private_key = private_key
  session_params.public_key = public_key
  session_params.message_version = message_version

  # "Operation Start" Header
  # +2 for algo and operation bytes
  header = (message_version, 0, 0, 0, _ECDH_KEY_LEN + 2)
  operation_start = bytearray(struct.pack('<4B I', *header))

  # "Operation Start" Message
  op_start = (algorithm, operation, public_key)
  operation_start.extend(struct.pack('<2B 33s', *op_start))

  with open(os.path.join(output_folder, 'operation_start.bin'), 'wb') as f:
    f.write(operation_start)
  return session_params
def _get_ca_response(ca_request, session_params, key_file):
  """Builds the CA Response message for a device CA Request.

  Parses the CA Request message in ca_request, derives the session key from
  the device ephemeral public key it carries, decrypts the inner request and
  validates its framing. The contents of key_file are then encrypted with the
  session key and wrapped in a CA Response message, which is returned to the
  caller (this function does not write any file itself).

  Args:
    ca_request: The CA Request message from the device.
    session_params: Session information. Its device_pub_key, shared_key and
      auth_value fields are updated as a side effect.
    key_file: The key file.

  Returns:
    ca response message, as a bytearray.

  Raises:
    ValueError: ca_request is malformed.

  CA Request message format for reference, sizes in bytes
  cleartext header                                 8
  cleartext device ephemeral public key            33
  cleartext GCM IV                                 12
  cleartext inner ca request length                4
  encrypted header                                 8
  encrypted SOM key certificate chain              variable
  encrypted SOM key authentication signature       variable
  encrypted product ID SHA256 hash                 32
  encrypted RSA public key                         variable
  encrypted ECDSA public key                       variable
  encrypted edDSA public key                       variable
  cleartext GCM tag                                16

  For SoM:
  cleartext header                                 8
  cleartext device ephemeral public key            33
  cleartext GCM IV                                 12
  cleartext inner ca request length                4
  encrypted header                                 8
  encrypted SOM ID SHA256 hash                     32
  cleartext GCM tag                                16
  """
  pub_key_len = _ECDH_KEY_LEN
  # Smallest possible product key request: every variable field empty.
  min_message_length = (
      _HEADER_LEN + pub_key_len + _GCM_IV_LEN + _VAR_LEN + _HEADER_LEN +
      _VAR_LEN + _VAR_LEN + _HASH_LEN + _VAR_LEN + _VAR_LEN +
      _VAR_LEN + _GCM_TAG_LEN)
  if len(ca_request) < min_message_length:
    raise ValueError('Malformed message: Length invalid')

  # Unpack Request header
  end = _HEADER_LEN
  ca_req_start = ca_request[:end]
  (device_message_version, res1, res2, res3,
   device_message_len) = struct.unpack('<4B I', ca_req_start)
  if device_message_version > _MESSAGE_VERSION_2:
    raise ValueError('Malformed message: Unsupported protocol version')
  if res1 or res2 or res3:
    raise ValueError('Malformed message: Reserved values set')
  if device_message_len > len(ca_request) - _HEADER_LEN:
    raise ValueError('Malformed message: Incorrect device message length')

  # Extract AT device ephemeral public key
  start = _HEADER_LEN
  end = start + pub_key_len
  session_params.device_pub_key = bytes(ca_request[start:end])
  _derive_from_shared_secret(session_params)

  # Decrypt AES-128-GCM message using the shared_key
  # Extract the GCM IV
  start = _HEADER_LEN + pub_key_len
  end = start + _GCM_IV_LEN
  gcm_iv = bytes(ca_request[start:end])

  # Extract the encrypted message
  start = _HEADER_LEN + pub_key_len + _GCM_IV_LEN
  enc_message_len = _get_var_len(ca_request, start)
  if enc_message_len > len(ca_request) - _GCM_TAG_LEN - start - _VAR_LEN:
    raise ValueError('Encrypted message size %d too large' % enc_message_len)
  start += _VAR_LEN
  end = start + enc_message_len
  enc_message = bytes(ca_request[start:end])

  # Extract the GCM Tag
  gcm_tag = bytes(ca_request[-_GCM_TAG_LEN:])

  # Decrypt message
  try:
    data = AESGCM.decrypt(
        enc_message, session_params.shared_key, gcm_iv, gcm_tag)
  except cryptography.exceptions.InvalidTag:
    raise ValueError('Malformed message: GCM decrypt failed')

  # A payload shorter than a header would make struct.unpack fail with
  # struct.error; report it as a malformed message instead.
  if len(data) < _HEADER_LEN:
    raise ValueError('Malformed message: Inner message too short')

  # Unpack Inner header
  end = _HEADER_LEN
  ca_req_inner_header = data[:end]
  (inner_message_version, res1, res2, res3, inner_message_len) = struct.unpack(
      '<4B I', ca_req_inner_header)
  if device_message_version != inner_message_version:
    raise ValueError('Malformed message: Incorrect inner message version')
  if res1 or res2 or res3:
    raise ValueError('Malformed message: Reserved values set')
  remaining_bytes = len(ca_request) - _HEADER_LEN - pub_key_len
  remaining_bytes = remaining_bytes - _GCM_IV_LEN - _GCM_TAG_LEN
  if inner_message_len > remaining_bytes:
    raise ValueError('Malformed message: Incorrect device inner message length')

  inner_ca_response = _parse_inner_ca_request(data, key_file)
  (gcm_iv, encrypted_keyset, gcm_tag) = AESGCM.encrypt(
      inner_ca_response, session_params.shared_key)

  # "CA Response" Header
  encrypted_len = len(encrypted_keyset)
  header = (
      session_params.message_version,
      0, 0, 0, _GCM_IV_LEN + _VAR_LEN + encrypted_len + _GCM_TAG_LEN)
  ca_response = bytearray(struct.pack('<4B I', *header))

  # "CA Response" Message. '<' selects little endian with no padding so the
  # body matches the header encoding and the length declared above, and the
  # variable field is sized from the ciphertext that is actually packed.
  struct_fmt = '<%ds I %ds %ds' % (_GCM_IV_LEN, encrypted_len, _GCM_TAG_LEN)
  message = (gcm_iv, encrypted_len, encrypted_keyset, gcm_tag)
  ca_response.extend(struct.pack(struct_fmt, *message))
  return ca_response
def _parse_inner_ca_request(data, key_file):
  """Parse decrypted inner ca request and generate ca response.

  The inner ca request is for issuing product key. The variable length fields
  are walked in order to check that no key is submitted for certification;
  the response is the unmodified content of key_file.

  Args:
    data: Decrypted inner ca request.
    key_file: The key file.

  Returns:
    The inner ca response byte object.

  Raises:
    ValueError: inner ca request is malformed.
  """

  def read_len(offset):
    # A length field that runs past the end of data means the request is
    # truncated; surface that as the documented ValueError.
    try:
      return _get_var_len(data, offset)
    except struct.error:
      raise ValueError('Malformed message: Inner CA request truncated')

  # SOM key certificate chain
  som_chain_start = _HEADER_LEN
  som_chain_len = read_len(som_chain_start)

  # SOM key authentication signature
  som_sig_start = som_chain_start + _VAR_LEN + som_chain_len
  som_sig_len = read_len(som_sig_start)

  # Product ID SHA-256 hash
  prod_id_start = som_sig_start + _VAR_LEN + som_sig_len

  # RSA public key to certify
  rsa_start = prod_id_start + _HASH_LEN
  rsa_len = read_len(rsa_start)
  if rsa_len > 0:
    raise ValueError(
        'Certify operation not supported, set RSA public key length to zero')

  # ECDSA public key to certify
  ecdsa_start = rsa_start + _VAR_LEN + rsa_len
  ecdsa_len = read_len(ecdsa_start)
  if ecdsa_len > 0:
    raise ValueError(
        'Certify operation not supported, set ECDSA public key length to zero')

  # edDSA public key to certify. It follows the ECDSA field; computing the
  # offset from prod_id_start would land on the ECDSA length again.
  eddsa_start = ecdsa_start + _VAR_LEN + ecdsa_len
  eddsa_len = read_len(eddsa_start)
  if eddsa_len > 0:
    raise ValueError(
        'Certify operation not supported, set edDSA public key length to zero')

  with open(key_file, 'rb') as infile:
    inner_ca_response = bytes(infile.read())
  return inner_ca_response
def _derive_from_shared_secret(session_params):
  """Generates the shared key based on ECDH and HKDF.

  Uses a particular ECDH algorithm and HKDF-SHA256 to create shared key and a
  auth value. The auth value would be sent to the device as a challenge to get
  som authentication if available. The generated shared key and auth value
  would be stored in session_params.

  Args:
    session_params: Session information. algorithm, private_key, public_key
      and device_pub_key are read; shared_key and auth_value are written.

  Raises:
    ValueError: session_params.algorithm is not a supported algorithm.
    RuntimeError: Computing the shared secret fails (presumably raised by the
      underlying ECDH helpers; not verifiable from this file).
  """
  hkdf_salt = session_params.public_key + session_params.device_pub_key

  if session_params.algorithm == _ALGORITHMS['p256']:
    ecdhe_shared_secret = ec_helper.compute_p256_shared_secret(
        session_params.private_key, session_params.device_pub_key)
  elif session_params.algorithm == _ALGORITHMS['x25519']:
    # The key field is 33 bytes to match P256; drop the last byte to get the
    # 32 byte X25519 key.
    device_pub_key = session_params.device_pub_key[:-1]
    ecdhe_shared_secret = curve25519.shared(session_params.private_key,
                                            device_pub_key)
  else:
    # Without this branch an unknown algorithm would fail later with an
    # unbound local variable.
    raise ValueError('Invalid algorithm value.')

  def derive(info):
    # An HKDF instance can only derive once, so build a new one per output.
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=_HKDF_HASH_LEN,
        salt=hkdf_salt,
        info=info,
        backend=default_backend())
    return hkdf.derive(ecdhe_shared_secret)

  session_params.shared_key = derive(b'KEY')
  session_params.auth_value = derive(b'SIGN')
def _get_algorithm_list(serial):
  """Get the preferred supported key exchange algorithm.

  Get the available algorithm list using getvar at-attest-dh
  at_attest_dh should be in format 1:p256,2:curve25519
  or 1:p256
  or 2:curve25519.

  Entries whose identifier is not an integer are ignored. The process exits
  with status -1 if the fastboot query itself fails.

  Args:
    serial: The device serial number.

  Returns:
    The supported algorithm, preferring Curve25519 over P256, or None if the
    device reports neither.
  """
  try:
    at_attest_dh = _get_var('at-attest-dh', serial)
  except subprocess.CalledProcessError as e:
    print('Failed to get available exchange algorithm. Error: \n%s' % e.output)
    sys.exit(-1)
  if not at_attest_dh:
    return None

  supported = set()
  for entry in at_attest_dh.split(','):
    try:
      supported.add(int(entry.split(':')[0]))
    except ValueError:
      # Skip malformed entries instead of aborting with a traceback; an empty
      # result is reported to the caller as None.
      continue

  for preferred in (EncryptionAlgorithm.ALGORITHM_CURVE25519,
                    EncryptionAlgorithm.ALGORITHM_P256):
    if preferred in supported:
      return preferred
  return None
def _get_var(var, serial):
  """Get a variable from the device.

  Note that the return value is in stderr instead of stdout, so stderr is
  merged into the captured output.

  Args:
    var: The name of the variable.
    serial: The device serial number. If empty, no -s option is passed to
      fastboot.

  Returns:
    The value for the variable, or None if the output has no line for it.

  Raises:
    subprocess.CalledProcessError: fastboot exits with a non-zero status.
  """
  command = ['fastboot']
  if serial:
    command += ['-s', serial]
  command += ['getvar', var]
  out = subprocess.check_output(command, stderr=subprocess.STDOUT)

  prefix = var + ': '
  for line in out.splitlines():
    if line.startswith(prefix):
      # Strip only the leading "<var>: " marker; a replace() would also
      # remove any later occurrence of it inside the value.
      return line[len(prefix):].replace('\r', '')
  return None
def _get_var_len(data, index):
"""Reads the 4 byte little endian unsigned integer at data[index].
Args:
data: Start of bytearray
index: Offset that indicates where the integer begins
Returns:
Little endian unsigned integer at data[index]
"""
return struct.unpack('<I', data[index:index + 4])[0]
def _run_fastboot_command(commands, serial):
  """Runs fastboot with the given arguments and waits for it to finish.

  Args:
    commands: List of fastboot arguments to execute.
    serial: The serial number of the device. If empty, no -s option is
      passed to fastboot.
  """
  argv = ['fastboot']
  if serial:
    argv = argv + ['-s', serial]
  subprocess.check_call(argv + commands)
def main():
  """Provisions the attestation key in the given key file to a device.

  Stages an Operation Start message on the device through fastboot, fetches
  the CA Request it produces, answers with a CA Response built from the key
  file and finally prints the attestation UUID reported by the device. All
  intermediate files live in a temporary folder that is always removed.

  Exits with status 1 if a fastboot command fails or a message is malformed.
  """
  parser = argparse.ArgumentParser(
      description='Test for Android Things key provisioning.')
  parser.add_argument(
      '-s',
      '--serial',
      type=str,
      required=False,
      dest='serial',
      help='Fastboot serial device',
      metavar='FASTBOOT_SERIAL_NUMBER')
  parser.add_argument(
      '-k',
      '--key',
      type=str,
      required=True,
      dest='key',
      help='Key file name',
      metavar='KEY_FILE_NAME')
  results = parser.parse_args()

  algorithm = _get_algorithm_list(results.serial)
  key_file = results.key
  # Operation is 'ISSUE'.
  operation = 2
  message_version = _get_message_version()

  print('Start giving attestation key to the Android Things device')
  print('Please keep device connected until operation finished.')
  temp_folder = tempfile.mkdtemp()
  try:
    session_params = _write_operation_start(
        algorithm, operation, message_version, temp_folder)
    _run_fastboot_command(
        ['stage', os.path.join(temp_folder, 'operation_start.bin')],
        results.serial)
    _run_fastboot_command(['oem', 'at-get-ca-request'], results.serial)
    _run_fastboot_command(
        ['get_staged', os.path.join(temp_folder, 'ca_request.bin')],
        results.serial)
    with open(os.path.join(temp_folder, 'ca_request.bin'), 'rb') as f:
      ca_request = bytearray(f.read())
    ca_response = _get_ca_response(ca_request, session_params, key_file)
    with open(os.path.join(temp_folder, 'ca_response.bin'), 'wb') as f:
      f.write(ca_response)
    _run_fastboot_command(
        ['stage', os.path.join(temp_folder, 'ca_response.bin')],
        results.serial)
    _run_fastboot_command(['oem', 'at-set-ca-response'], results.serial)
    at_attest_uuid = _get_var('at-attest-uuid', results.serial)
    print('Giving attestation key succeed! Attestation UUID: %s' %
          at_attest_uuid)
  except subprocess.CalledProcessError as e:
    # check_call does not capture output, so e.output is None for most
    # failures here; fall back to the exception text in that case.
    print('Giving attestation key failed! Error: \n%s' % (e.output or e))
    print('Please try again.')
    # Signal the failure to the calling shell; the finally clause still runs.
    sys.exit(1)
  except ValueError as e:
    print('Giving attestation key failed! Error: \n%s' % str(e))
    print('Please check your key file format.')
    sys.exit(1)
  finally:
    shutil.rmtree(temp_folder)
# Run the provisioning flow only when executed as a script, not on import.
if __name__ == '__main__':
  main()