| ## This file is part of Scapy |
| ## Copyright (C) 2017 Maxence Tury |
| ## This program is published under a GPLv2 license |
| |
| """ |
| TLS 1.3 key exchange logic. |
| """ |
| |
| import math |
| |
| from scapy.config import conf, crypto_validator |
| from scapy.error import log_runtime, warning |
| from scapy.fields import * |
| from scapy.packet import Packet, Raw, Padding |
| from scapy.layers.tls.cert import PubKeyRSA, PrivKeyRSA |
| from scapy.layers.tls.session import _GenericTLSSessionInheritance |
| from scapy.layers.tls.basefields import _tls_version, _TLSClientVersionField |
| from scapy.layers.tls.extensions import TLS_Ext_Unknown, _tls_ext |
| from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp, pkcs_os2ip |
| from scapy.layers.tls.crypto.groups import (_tls_named_ffdh_groups, |
| _tls_named_curves, _ffdh_groups, |
| _tls_named_groups) |
| |
| if conf.crypto_valid: |
| from cryptography.hazmat.backends import default_backend |
| from cryptography.hazmat.primitives.asymmetric import dh, ec |
| if conf.crypto_valid_advanced: |
| from cryptography.hazmat.primitives.asymmetric import x25519 |
| |
| |
| class KeyShareEntry(Packet): |
| """ |
| When building from scratch, we create a DH private key, and when |
| dissecting, we create a DH public key. Default group is secp256r1. |
| """ |
| __slots__ = ["privkey", "pubkey"] |
| name = "Key Share Entry" |
| fields_desc = [ShortEnumField("group", None, _tls_named_groups), |
| FieldLenField("kxlen", None, length_of="key_exchange"), |
| StrLenField("key_exchange", "", |
| length_from=lambda pkt: pkt.kxlen) ] |
| |
| def __init__(self, *args, **kargs): |
| self.privkey = None |
| self.pubkey = None |
| super(KeyShareEntry, self).__init__(*args, **kargs) |
| |
| def do_build(self): |
| """ |
| We need this hack, else 'self' would be replaced by __iter__.next(). |
| """ |
| tmp = self.explicit |
| self.explicit = True |
| b = super(KeyShareEntry, self).do_build() |
| self.explicit = tmp |
| return b |
| |
| @crypto_validator |
| def create_privkey(self): |
| """ |
| This is called by post_build() for key creation. |
| """ |
| if self.group in _tls_named_ffdh_groups: |
| params = _ffdh_groups[_tls_named_ffdh_groups[self.group]][0] |
| privkey = params.generate_private_key() |
| self.privkey = privkey |
| pubkey = privkey.public_key() |
| self.key_exchange = pubkey.public_numbers().y |
| elif self.group in _tls_named_curves: |
| if _tls_named_curves[self.group] == "x25519": |
| if conf.crypto_valid_advanced: |
| privkey = x25519.X25519PrivateKey.generate() |
| self.privkey = privkey |
| pubkey = privkey.public_key() |
| self.key_exchange = pubkey.public_bytes() |
| elif _tls_named_curves[self.group] != "x448": |
| curve = ec._CURVE_TYPES[_tls_named_curves[self.group]]() |
| privkey = ec.generate_private_key(curve, default_backend()) |
| self.privkey = privkey |
| pubkey = privkey.public_key() |
| self.key_exchange = pubkey.public_numbers().encode_point() |
| |
| def post_build(self, pkt, pay): |
| if self.group is None: |
| self.group = 23 # secp256r1 |
| |
| if not self.key_exchange: |
| try: |
| self.create_privkey() |
| except ImportError: |
| pass |
| |
| if self.kxlen is None: |
| self.kxlen = len(self.key_exchange) |
| |
| group = struct.pack("!H", self.group) |
| kxlen = struct.pack("!H", self.kxlen) |
| return group + kxlen + self.key_exchange + pay |
| |
| @crypto_validator |
| def register_pubkey(self): |
| if self.group in _tls_named_ffdh_groups: |
| params = _ffdh_groups[_tls_named_ffdh_groups[self.group]][0] |
| pn = params.parameter_numbers() |
| public_numbers = dh.DHPublicNumbers(self.key_exchange, pn) |
| self.pubkey = public_numbers.public_key(default_backend()) |
| elif self.group in _tls_named_curves: |
| if _tls_named_curves[self.group] == "x25519": |
| if conf.crypto_valid_advanced: |
| import_point = x25519.X25519PublicKey.from_public_bytes |
| self.pubkey = import_point(self.key_exchange) |
| elif _tls_named_curves[self.group] != "x448": |
| curve = ec._CURVE_TYPES[_tls_named_curves[self.group]]() |
| import_point = ec.EllipticCurvePublicNumbers.from_encoded_point |
| public_numbers = import_point(curve, self.key_exchange) |
| self.pubkey = public_numbers.public_key(default_backend()) |
| |
| def post_dissection(self, r): |
| try: |
| self.register_pubkey() |
| except ImportError: |
| pass |
| |
| def extract_padding(self, s): |
| return "", s |
| |
| |
| class TLS_Ext_KeyShare_CH(TLS_Ext_Unknown): |
| name = "TLS Extension - Key Share (for ClientHello)" |
| fields_desc = [ShortEnumField("type", 0x28, _tls_ext), |
| ShortField("len", None), |
| FieldLenField("client_shares_len", None, |
| length_of="client_shares"), |
| PacketListField("client_shares", [], KeyShareEntry, |
| length_from=lambda pkt: pkt.client_shares_len) ] |
| |
| def post_build(self, pkt, pay): |
| if not self.tls_session.frozen: |
| privshares = self.tls_session.tls13_client_privshares |
| for kse in self.client_shares: |
| if kse.privkey: |
| if _tls_named_curves[kse.group] in privshares: |
| pkt_info = pkt.firstlayer().summary() |
| log_runtime.info("TLS: group %s used twice in the same ClientHello [%s]", kse.group, pkt_info) |
| break |
| privshares[_tls_named_groups[kse.group]] = kse.privkey |
| return super(TLS_Ext_KeyShare_CH, self).post_build(pkt, pay) |
| |
| def post_dissection(self, r): |
| if not self.tls_session.frozen: |
| for kse in self.client_shares: |
| if kse.pubkey: |
| pubshares = self.tls_session.tls13_client_pubshares |
| if _tls_named_curves[kse.group] in pubshares: |
| pkt_info = r.firstlayer().summary() |
| log_runtime.info("TLS: group %s used twice in the same ClientHello [%s]", kse.group, pkt_info) |
| break |
| pubshares[_tls_named_curves[kse.group]] = kse.pubkey |
| return super(TLS_Ext_KeyShare_CH, self).post_dissection(r) |
| |
| |
| class TLS_Ext_KeyShare_HRR(TLS_Ext_Unknown): |
| name = "TLS Extension - Key Share (for HelloRetryRequest)" |
| fields_desc = [ShortEnumField("type", 0x28, _tls_ext), |
| ShortField("len", None), |
| ShortEnumField("selected_group", None, _tls_named_groups) ] |
| |
| |
| class TLS_Ext_KeyShare_SH(TLS_Ext_Unknown): |
| name = "TLS Extension - Key Share (for ServerHello)" |
| fields_desc = [ShortEnumField("type", 0x28, _tls_ext), |
| ShortField("len", None), |
| PacketField("server_share", None, KeyShareEntry) ] |
| |
| def post_build(self, pkt, pay): |
| if not self.tls_session.frozen and self.server_share.privkey: |
| # if there is a privkey, we assume the crypto library is ok |
| privshare = self.tls_session.tls13_server_privshare |
| if len(privshare) > 0: |
| pkt_info = pkt.firstlayer().summary() |
| log_runtime.info("TLS: overwriting previous server key share [%s]", pkt_info) |
| group_name = _tls_named_groups[self.server_share.group] |
| privshare[group_name] = self.server_share.privkey |
| |
| if group_name in self.tls_session.tls13_client_pubshares: |
| privkey = self.server_share.privkey |
| pubkey = self.tls_session.tls13_client_pubshares[group_name] |
| if group_name in six.itervalues(_tls_named_ffdh_groups): |
| pms = privkey.exchange(pubkey) |
| elif group_name in six.itervalues(_tls_named_curves): |
| if group_name == "x25519": |
| pms = privkey.exchange(pubkey) |
| else: |
| pms = privkey.exchange(ec.ECDH(), pubkey) |
| self.tls_session.tls13_dhe_secret = pms |
| return super(TLS_Ext_KeyShare_SH, self).post_build(pkt, pay) |
| |
| def post_dissection(self, r): |
| if not self.tls_session.frozen and self.server_share.pubkey: |
| # if there is a pubkey, we assume the crypto library is ok |
| pubshare = self.tls_session.tls13_server_pubshare |
| if len(pubshare) > 0: |
| pkt_info = r.firstlayer().summary() |
| log_runtime.info("TLS: overwriting previous server key share [%s]", pkt_info) |
| group_name = _tls_named_groups[self.server_share.group] |
| pubshare[group_name] = self.server_share.pubkey |
| |
| if group_name in self.tls_session.tls13_client_privshares: |
| pubkey = self.server_share.pubkey |
| privkey = self.tls_session.tls13_client_privshares[group_name] |
| if group_name in six.itervalues(_tls_named_ffdh_groups): |
| pms = privkey.exchange(pubkey) |
| elif group_name in six.itervalues(_tls_named_curves): |
| if group_name == "x25519": |
| pms = privkey.exchange(pubkey) |
| else: |
| pms = privkey.exchange(ec.ECDH(), pubkey) |
| self.tls_session.tls13_dhe_secret = pms |
| return super(TLS_Ext_KeyShare_SH, self).post_dissection(r) |
| |
| |
| _tls_ext_keyshare_cls = { 1: TLS_Ext_KeyShare_CH, |
| 2: TLS_Ext_KeyShare_SH, |
| 6: TLS_Ext_KeyShare_HRR } |
| |
| |
| class Ticket(Packet): |
| name = "Recommended Ticket Construction (from RFC 5077)" |
| fields_desc = [ StrFixedLenField("key_name", None, 16), |
| StrFixedLenField("iv", None, 16), |
| FieldLenField("encstatelen", None, length_of="encstate"), |
| StrLenField("encstate", "", |
| length_from=lambda pkt: pkt.encstatelen), |
| StrFixedLenField("mac", None, 32) ] |
| |
| class TicketField(PacketField): |
| __slots__ = ["length_from"] |
| def __init__(self, name, default, length_from=None, **kargs): |
| self.length_from = length_from |
| PacketField.__init__(self, name, default, Ticket, **kargs) |
| |
| def m2i(self, pkt, m): |
| l = self.length_from(pkt) |
| tbd, rem = m[:l], m[l:] |
| return self.cls(tbd)/Padding(rem) |
| |
| class PSKIdentity(Packet): |
| name = "PSK Identity" |
| fields_desc = [FieldLenField("identity_len", None, |
| length_of="identity"), |
| TicketField("identity", "", |
| length_from=lambda pkt: pkt.identity_len), |
| IntField("obfuscated_ticket_age", 0) ] |
| |
| class PSKBinderEntry(Packet): |
| name = "PSK Binder Entry" |
| fields_desc = [FieldLenField("binder_len", None, fmt="B", |
| length_of="binder"), |
| StrLenField("binder", "", |
| length_from=lambda pkt: pkt.binder_len) ] |
| |
| class TLS_Ext_PreSharedKey_CH(TLS_Ext_Unknown): |
| #XXX define post_build and post_dissection methods |
| name = "TLS Extension - Pre Shared Key (for ClientHello)" |
| fields_desc = [ShortEnumField("type", 0x28, _tls_ext), |
| ShortField("len", None), |
| FieldLenField("identities_len", None, |
| length_of="identities"), |
| PacketListField("identities", [], PSKIdentity, |
| length_from=lambda pkt: pkt.identities_len), |
| FieldLenField("binders_len", None, |
| length_of="binders"), |
| PacketListField("binders", [], PSKBinderEntry, |
| length_from=lambda pkt: pkt.binders_len) ] |
| |
| |
| class TLS_Ext_PreSharedKey_SH(TLS_Ext_Unknown): |
| name = "TLS Extension - Pre Shared Key (for ServerHello)" |
| fields_desc = [ShortEnumField("type", 0x29, _tls_ext), |
| ShortField("len", None), |
| ShortField("selected_identity", None) ] |
| |
| |
| _tls_ext_presharedkey_cls = { 1: TLS_Ext_PreSharedKey_CH, |
| 2: TLS_Ext_PreSharedKey_SH } |
| |