| ## This file is part of Scapy |
| ## Copyright (C) 2007, 2008, 2009 Arnaud Ebalard |
| ## 2015, 2016, 2017 Maxence Tury |
| ## This program is published under a GPLv2 license |
| |
| """ |
| TLS handshake fields & logic. |
| |
| This module covers the handshake TLS subprotocol, except for the key exchange |
| mechanisms which are addressed with keyexchange.py. |
| """ |
| |
| from __future__ import absolute_import |
| import math |
| |
| from scapy.error import log_runtime, warning |
| from scapy.fields import * |
| from scapy.compat import * |
| from scapy.packet import Packet, Raw, Padding |
| from scapy.utils import repr_hex |
| from scapy.layers.x509 import OCSP_Response |
| from scapy.layers.tls.cert import Cert, PrivKey, PubKey |
| from scapy.layers.tls.basefields import (_tls_version, _TLSVersionField, |
| _TLSClientVersionField) |
| from scapy.layers.tls.extensions import (_ExtensionsLenField, _ExtensionsField, |
| _cert_status_type, TLS_Ext_SupportedVersions) |
| from scapy.layers.tls.keyexchange import (_TLSSignature, _TLSServerParamsField, |
| _TLSSignatureField, ServerRSAParams, |
| SigAndHashAlgsField, _tls_hash_sig, |
| SigAndHashAlgsLenField) |
| from scapy.layers.tls.keyexchange_tls13 import TicketField |
| from scapy.layers.tls.session import (_GenericTLSSessionInheritance, |
| readConnState, writeConnState) |
| from scapy.layers.tls.crypto.compression import (_tls_compression_algs, |
| _tls_compression_algs_cls, |
| Comp_NULL, _GenericComp, |
| _GenericCompMetaclass) |
| from scapy.layers.tls.crypto.suites import (_tls_cipher_suites, |
| _tls_cipher_suites_cls, |
| _GenericCipherSuite, |
| _GenericCipherSuiteMetaclass) |
| |
| |
| ############################################################################### |
| ### Generic TLS Handshake message ### |
| ############################################################################### |
| |
| _tls_handshake_type = { 0: "hello_request", 1: "client_hello", |
| 2: "server_hello", 3: "hello_verify_request", |
| 4: "session_ticket", 6: "hello_retry_request", |
| 8: "encrypted_extensions", 11: "certificate", |
| 12: "server_key_exchange", 13: "certificate_request", |
| 14: "server_hello_done", 15: "certificate_verify", |
| 16: "client_key_exchange", 20: "finished", |
| 21: "certificate_url", 22: "certificate_status", |
| 23: "supplemental_data", 24: "key_update" } |
| |
| |
class _TLSHandshake(_GenericTLSSessionInheritance):
    """
    Generic TLS handshake message.

    The other Handshake classes inherit from it to get post_build(); it is
    also the fallback used for unknown TLS Handshake packets.
    """
    name = "TLS Handshake Generic message"
    fields_desc = [
        ByteEnumField("msgtype", None, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        StrLenField("msg", "", length_from=lambda pkt: pkt.msglen),
    ]

    def post_build(self, p, pay):
        """
        Fill in msglen when the user left it to None, then append the payload.
        """
        if self.msglen is None:
            # msglen counts what follows the 4-byte header (msgtype + msglen);
            # the type byte and the 3-byte length are repacked as one "!I"
            body_len = len(p) - 4
            header = struct.pack("!I", (orb(p[0]) << 24) | body_len)
            p = header + p[4:]
        return p + pay

    def guess_payload_class(self, p):
        """Whatever follows a handshake message is treated as padding."""
        return conf.padding_layer

    def tls_session_update(self, msg_str):
        """
        Covers both post_build- and post_dissection- context updates:
        record the raw message and the parsed packet in the TLS session.
        """
        session = self.tls_session
        session.handshake_messages.append(msg_str)
        session.handshake_messages_parsed.append(self)
| |
| |
| ############################################################################### |
| ### HelloRequest ### |
| ############################################################################### |
| |
class TLSHelloRequest(_TLSHandshake):
    """HelloRequest: a handshake header (msgtype 0) with no body field."""
    name = "TLS Handshake - Hello Request"
    fields_desc = [
        ByteEnumField("msgtype", 0, _tls_handshake_type),
        ThreeBytesField("msglen", None),
    ]

    def tls_session_update(self, msg_str):
        """
        Deliberately a no-op: this message should not be added to the list
        of handshake messages that will be hashed in the finished and
        certificate verify messages.
        """
        return None
| |
| |
| ############################################################################### |
| ### ClientHello fields ### |
| ############################################################################### |
| |
class _GMTUnixTimeField(UTCTimeField):
    """
    "The current time and date in standard UNIX 32-bit format (seconds since
    the midnight starting Jan 1, 1970, GMT, ignoring leap seconds)."
    """
    def i2h(self, pkt, x):
        """Return the internal value unchanged, or 0 when it is None."""
        return 0 if x is None else x
| |
class _TLSRandomBytesField(StrFixedLenField):
    """Fixed-length random bytes, displayed in hexadecimal."""

    def i2repr(self, pkt, x):
        """Show None as-is; otherwise show the human value through repr_hex."""
        return repr(x) if x is None else repr_hex(self.i2h(pkt, x))
| |
| |
class _SessionIDField(StrLenField):
    """
    Session identifier field; behaves exactly like a StrLenField.

    opaque SessionID<0..32>; section 7.4.1.2 of RFC 4346
    """
| |
| |
class _CipherSuitesField(StrLenField):
    """
    List of fixed-size integer codes, cipher suites by default.

    'dico' maps every known code to its name. The mapping is kept both ways,
    as i2s (code -> name) and s2i (name -> code). 'itemfmt' is the struct
    format of a single item and 'itemsize' its size in bytes.
    """
    __slots__ = ["itemfmt", "itemsize", "i2s", "s2i"]
    islist = 1

    def __init__(self, name, default, dico, length_from=None, itemfmt="!H"):
        StrLenField.__init__(self, name, default, length_from=length_from)
        self.itemfmt = itemfmt
        self.itemsize = struct.calcsize(itemfmt)
        self.i2s = {}
        self.s2i = {}
        for code in six.iterkeys(dico):
            label = dico[code]
            self.i2s[code] = label
            self.s2i[label] = code

    def any2i_one(self, pkt, x):
        """
        Turn one user-provided item into its internal code: a cipher suite
        (class or instance) gives its 'val', a bytes name is looked up in
        s2i, and anything else is returned untouched.
        """
        suite_types = (_GenericCipherSuite, _GenericCipherSuiteMetaclass)
        if isinstance(x, suite_types):
            x = x.val
        if isinstance(x, bytes):
            x = self.s2i[x]
        return x

    def i2repr_one(self, pkt, x):
        """Return the name of a known code, or the code in hexadecimal."""
        hex_fmt = "0x%%0%dx" % self.itemsize
        return self.i2s.get(x, hex_fmt % x)

    def any2i(self, pkt, x):
        """Convert a single item or a list of items; None stays None."""
        if x is None:
            return None
        items = x if isinstance(x, list) else [x]
        return [self.any2i_one(pkt, item) for item in items]

    def i2repr(self, pkt, x):
        """A lone item is shown bare, several items as a bracketed list."""
        if x is None:
            return "None"
        names = [self.i2repr_one(pkt, item) for item in x]
        if len(names) == 1:
            return names[0]
        return "[%s]" % ", ".join(names)

    def i2m(self, pkt, val):
        """Pack every item with itemfmt; None is built as an empty list."""
        items = [] if val is None else val
        return b"".join(struct.pack(self.itemfmt, item) for item in items)

    def m2i(self, pkt, m):
        """Unpack 'm' into a list of codes, one itemfmt-sized chunk each."""
        step = struct.calcsize(self.itemfmt)
        codes = []
        for start in range(0, len(m), step):
            chunk = m[start:start + step]
            codes.append(struct.unpack(self.itemfmt, chunk)[0])
        return codes

    def i2len(self, pkt, i):
        """Length in bytes of the built list (0 for None)."""
        return 0 if i is None else len(i) * self.itemsize
| |
| |
class _CompressionMethodsField(_CipherSuitesField):
    """Same list field as _CipherSuitesField, for compression methods."""

    def any2i_one(self, pkt, x):
        """
        Turn one user-provided item into its internal code: a compression
        algorithm (class or instance) gives its 'val', a str name is looked
        up in s2i, and anything else is returned untouched.
        """
        comp_types = (_GenericComp, _GenericCompMetaclass)
        if isinstance(x, comp_types):
            x = x.val
        if isinstance(x, str):
            x = self.s2i[x]
        return x
| |
| |
| ############################################################################### |
| ### ClientHello ### |
| ############################################################################### |
| |
class TLSClientHello(_TLSHandshake):
    """
    TLS ClientHello, with abilities to handle extensions.

    The Random structure follows the RFC 5246: while it is 32-byte long,
    many implementations use the first 4 bytes as a gmt_unix_time, and then
    the remaining 28 bytes should be completely random. This was designed in
    order to (sort of) mitigate broken RNGs. If you prefer to show the full
    32 random bytes without any GMT time, just comment in/out the lines below.
    """
    name = "TLS Handshake - Client Hello"
    fields_desc = [
        ByteEnumField("msgtype", 1, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        _TLSClientVersionField("version", None, _tls_version),

        # _TLSRandomBytesField("random_bytes", None, 32),
        _GMTUnixTimeField("gmt_unix_time", None),
        _TLSRandomBytesField("random_bytes", None, 28),

        FieldLenField("sidlen", None, fmt="B", length_of="sid"),
        _SessionIDField("sid", "", length_from=lambda pkt: pkt.sidlen),

        FieldLenField("cipherslen", None, fmt="!H", length_of="ciphers"),
        _CipherSuitesField("ciphers", None, _tls_cipher_suites,
                           itemfmt="!H",
                           length_from=lambda pkt: pkt.cipherslen),

        FieldLenField("complen", None, fmt="B", length_of="comp"),
        _CompressionMethodsField("comp", [0], _tls_compression_algs,
                                 itemfmt="B",
                                 length_from=lambda pkt: pkt.complen),

        _ExtensionsLenField("extlen", None, length_of="ext"),
        _ExtensionsField("ext", None,
                         length_from=lambda pkt: (pkt.msglen -
                                                  (pkt.sidlen or 0) -
                                                  (pkt.cipherslen or 0) -
                                                  (pkt.complen or 0) -
                                                  40)),
    ]

    def post_build(self, p, pay):
        """
        Complete the built message with defaults the fields cannot express:
        random bytes, and a default cipher suite list (plus default
        extensions) when no cipher suites were provided.
        """
        if self.random_bytes is None:
            # random_bytes sit at offsets 10 to 38 of the built message
            p = p[:10] + randstring(28) + p[10+28:]

        # if no ciphersuites were provided, we add a few usual, supported
        # ciphersuites along with the appropriate extensions
        if self.ciphers is None:
            cipherstart = 39 + (self.sidlen or 0)
            # hex text standing for the cipherslen bytes and the suite list;
            # it replaces the 2 bytes found at cipherstart
            s = b"001ac02bc023c02fc027009e0067009c003cc009c0130033002f000a"
            p = p[:cipherstart] + bytes_hex(s) + p[cipherstart+2:]
            if self.ext is None:
                default_ext = (
                    b'\x00\x2c',                                    # ext_len
                    b'\xff\x01\x00\x01\x00',                        # ext_reneg
                    b'\x00\x00\x00\x0f\x00\r\x00\x00\nsecdev.org',  # ext_sn
                    b'\x00\r\x00\x08\x00\x06\x04\x03\x04\x01\x02\x01',
                    b'\x00\n\x00\x04\x00\x02\x00\x17',          # ext_supgroups
                )
                p = p + b"".join(default_ext)

        return super(TLSClientHello, self).post_build(p, pay)

    def tls_session_update(self, msg_str):
        """
        Either for parsing or building, we store the client_random
        along with the raw string representing this handshake message.
        """
        super(TLSClientHello, self).tls_session_update(msg_str)

        self.tls_session.advertised_tls_version = self.version
        self.random_bytes = msg_str[10:38]
        timestamp = struct.pack('!I', self.gmt_unix_time)
        self.tls_session.client_random = timestamp + self.random_bytes

        offers_versions = any(isinstance(e, TLS_Ext_SupportedVersions)
                              for e in (self.ext or []))
        if offers_versions and self.tls_session.tls13_early_secret is None:
            # this is not recomputed if there was a TLS 1.3 HRR
            self.tls_session.compute_tls13_early_secrets()
| |
| ############################################################################### |
| ### ServerHello ### |
| ############################################################################### |
| |
class TLSServerHello(TLSClientHello):
    """
    TLS ServerHello, with abilities to handle extensions.

    The Random structure follows the RFC 5246: while it is 32-byte long,
    many implementations use the first 4 bytes as a gmt_unix_time, and then
    the remaining 28 bytes should be completely random. This was designed in
    order to (sort of) mitigate broken RNGs. If you prefer to show the full
    32 random bytes without any GMT time, just comment in/out the lines below.
    """
    name = "TLS Handshake - Server Hello"
    fields_desc = [
        ByteEnumField("msgtype", 2, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        _TLSVersionField("version", None, _tls_version),

        # _TLSRandomBytesField("random_bytes", None, 32),
        _GMTUnixTimeField("gmt_unix_time", None),
        _TLSRandomBytesField("random_bytes", None, 28),

        FieldLenField("sidlen", None, length_of="sid", fmt="B"),
        _SessionIDField("sid", "", length_from=lambda pkt: pkt.sidlen),

        EnumField("cipher", None, _tls_cipher_suites),
        _CompressionMethodsField("comp", [0], _tls_compression_algs,
                                 itemfmt="B",
                                 length_from=lambda pkt: 1),

        _ExtensionsLenField("extlen", None, length_of="ext"),
        _ExtensionsField("ext", None,
                         length_from=lambda pkt: (pkt.msglen -
                                                  (pkt.sidlen or 0) -
                                                  38)),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select TLS13ServerHello when the version read at offsets 4-5 of the
        message is 0x0304 or above 0x7f00, and TLSServerHello otherwise.
        """
        if not _pkt or len(_pkt) < 6:
            return TLSServerHello
        version = struct.unpack("!H", _pkt[4:6])[0]
        if version == 0x0304 or version > 0x7f00:
            return TLS13ServerHello
        return TLSServerHello

    def post_build(self, p, pay):
        """Draw the random bytes if needed, then let _TLSHandshake finish."""
        if self.random_bytes is None:
            p = p[:10] + randstring(28) + p[10+28:]
        # super(TLSClientHello, ...) skips the ClientHello-specific post_build
        return super(TLSClientHello, self).post_build(p, pay)

    def tls_session_update(self, msg_str):
        """
        Either for parsing or building, we store the server_random
        along with the raw string representing this handshake message.
        We also store the session_id, the cipher suite (if recognized),
        the compression method, and finally we instantiate the pending write
        and read connection states. Usually they get updated later on in the
        negotiation when we learn the session keys, and eventually they
        are committed once a ChangeCipherSpec has been sent/received.
        """
        # super(TLSClientHello, ...) skips the ClientHello-specific update
        super(TLSClientHello, self).tls_session_update(msg_str)

        self.tls_session.tls_version = self.version
        self.random_bytes = msg_str[10:38]
        timestamp = struct.pack('!I', self.gmt_unix_time)
        self.tls_session.server_random = timestamp + self.random_bytes
        self.tls_session.sid = self.sid

        cs_cls = None
        cs_val = self.cipher
        if cs_val:
            if cs_val in _tls_cipher_suites_cls:
                cs_cls = _tls_cipher_suites_cls[cs_val]
            else:
                # we do not try to set a default nor stop the execution
                warning("Unknown cipher suite %d from ServerHello" % cs_val)

        comp_cls = Comp_NULL
        if self.comp:
            comp_val = self.comp[0]
            if comp_val not in _tls_compression_algs_cls:
                warning("Unknown compression alg %d from ServerHello" % comp_val)
                # fall back on entry 0 of the compression table
                comp_val = 0
            comp_cls = _tls_compression_algs_cls[comp_val]

        state_args = {"ciphersuite": cs_cls,
                      "compression_alg": comp_cls,
                      "connection_end": self.tls_session.connection_end,
                      "tls_version": self.version}
        self.tls_session.pwcs = writeConnState(**state_args)
        self.tls_session.prcs = readConnState(**state_args)
| |
| |
class TLS13ServerHello(TLSClientHello):
    """ TLS 1.3 ServerHello """
    name = "TLS 1.3 Handshake - Server Hello"
    fields_desc = [
        ByteEnumField("msgtype", 2, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        _TLSVersionField("version", None, _tls_version),
        _TLSRandomBytesField("random_bytes", None, 32),
        EnumField("cipher", None, _tls_cipher_suites),
        _ExtensionsLenField("extlen", None, length_of="ext"),
        _ExtensionsField("ext", None,
                         length_from=lambda pkt: pkt.msglen - 38),
    ]

    def tls_session_update(self, msg_str):
        """
        Either for parsing or building, we store the server_random along with
        the raw string representing this handshake message. We also store the
        cipher suite (if recognized), and finally we instantiate the write and
        read connection states.
        """
        # super(TLSClientHello, ...) skips the ClientHello-specific update
        super(TLSClientHello, self).tls_session_update(msg_str)

        s = self.tls_session
        s.tls_version = self.version
        s.server_random = self.random_bytes

        cs_cls = None
        cs_val = self.cipher
        if cs_val:
            if cs_val in _tls_cipher_suites_cls:
                cs_cls = _tls_cipher_suites_cls[cs_val]
            else:
                # we do not try to set a default nor stop the execution
                warning("Unknown cipher suite %d from ServerHello" % cs_val)

        state_args = {"ciphersuite": cs_cls,
                      "connection_end": s.connection_end,
                      "tls_version": self.version}
        s.pwcs = writeConnState(**state_args)
        s.triggered_pwcs_commit = True
        s.prcs = readConnState(**state_args)
        s.triggered_prcs_commit = True

        if self.tls_session.tls13_early_secret is None:
            # In case the connState was not pre-initialized, we could not
            # compute the early secrets at the ClientHello, so we do it here.
            self.tls_session.compute_tls13_early_secrets()
        s.compute_tls13_handshake_secrets()
| |
| |
| ############################################################################### |
| ### HelloRetryRequest ### |
| ############################################################################### |
| |
class TLSHelloRetryRequest(_TLSHandshake):
    """TLS 1.3 HelloRetryRequest: a version followed by extensions."""
    name = "TLS 1.3 Handshake - Hello Retry Request"
    fields_desc = [
        ByteEnumField("msgtype", 6, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        _TLSVersionField("version", None, _tls_version),
        _ExtensionsLenField("extlen", None, length_of="ext"),
        _ExtensionsField("ext", None,
                         length_from=lambda pkt: pkt.msglen - 4),
    ]
| |
| |
| ############################################################################### |
| ### EncryptedExtensions ### |
| ############################################################################### |
| |
class TLSEncryptedExtensions(_TLSHandshake):
    """TLS 1.3 EncryptedExtensions: a handshake header and extensions."""
    name = "TLS 1.3 Handshake - Encrypted Extensions"
    fields_desc = [
        ByteEnumField("msgtype", 8, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        _ExtensionsLenField("extlen", None, length_of="ext"),
        _ExtensionsField("ext", None,
                         length_from=lambda pkt: pkt.msglen - 2),
    ]
| |
| |
| ############################################################################### |
| ### Certificate ### |
| ############################################################################### |
| |
| #XXX It might be appropriate to rewrite this mess with basic 3-byte FieldLenField. |
| |
class _ASN1CertLenField(FieldLenField):
    """
    This is mostly a 3-byte FieldLenField.

    The value is handled with the 4-byte "!I" format, of which only the
    three low-order bytes are written to and read from the wire.
    """
    def __init__(self, name, default, length_of=None, adjust=lambda pkt, x: x):
        self.length_of = length_of
        self.adjust = adjust
        Field.__init__(self, name, default, fmt="!I")

    def i2m(self, pkt, x):
        """
        Return x, or, when x is None and length_of is set, the adjusted
        length of the field named by length_of.
        """
        if x is not None or self.length_of is None:
            return x
        fld, fval = pkt.getfield_and_val(self.length_of)
        return self.adjust(pkt, fld.i2len(pkt, fval))

    def addfield(self, pkt, s, val):
        """Append the length, dropping the first byte of its "!I" packing."""
        packed = struct.pack(self.fmt, self.i2m(pkt, val))
        return s + packed[1:4]

    def getfield(self, pkt, s):
        """Read three bytes, zero-extended to four before unpacking."""
        value = struct.unpack(self.fmt, b"\x00" + s[:3])[0]
        return s[3:], self.m2i(pkt, value)
| |
| |
class _ASN1CertListField(StrLenField):
    """
    List of certificates, each one preceded on the wire by its 3-byte length.

    Once dissected, the value is a list of (length, Cert) tuples.
    """
    islist = 1

    def i2len(self, pkt, i):
        """Length in bytes of the built list (0 for None)."""
        return 0 if i is None else len(self.i2m(pkt, i))

    def getfield(self, pkt, s):
        """
        Extract Certs in a loop.
        XXX We should provide safeguards when trying to parse a Cert.
        """
        data, remain = s, b""
        if self.length_from is not None:
            size = self.length_from(pkt)
            if size is not None:
                data, remain = s[:size], s[size:]

        certs = []
        while data:
            # 3-byte length, zero-extended so that it unpacks as "!I"
            clen = struct.unpack("!I", b'\x00' + data[:3])[0]
            certs.append((clen, Cert(data[3:3 + clen])))
            data = data[3 + clen:]
        return data + remain, certs

    def i2m(self, pkt, i):
        """
        Build the list. A str is taken as already built, a Cert is
        length-prefixed, and each list item is a str, a Cert or a
        (length, cert) tuple.
        """
        def build_one(item):
            if isinstance(item, str):
                return item
            if isinstance(item, Cert):
                der = item.der
                return struct.pack("!I", len(der))[1:4] + der

            # (length, cert) tuple: the stated length is used as is
            (length, cert) = item
            if isinstance(cert, Cert):
                cert = cert.der
            return struct.pack("!I", length)[1:4] + cert

        if i is None:
            return b""
        if isinstance(i, str):
            return i
        items = [i] if isinstance(i, Cert) else i
        return b"".join(build_one(item) for item in items)

    def any2i(self, pkt, x):
        """User-provided values are stored untouched."""
        return x
| |
class _ASN1CertField(StrLenField):
    """
    A single certificate preceded on the wire by its 3-byte length.

    Once dissected, the value is a (length, Cert) tuple.
    """
    def i2len(self, pkt, i):
        """Length in bytes of the built certificate (0 for None)."""
        return 0 if i is None else len(self.i2m(pkt, i))

    def getfield(self, pkt, s):
        """Dissect one length-prefixed Cert and return what is left of s."""
        data, remain = s, b""
        if self.length_from is not None:
            size = self.length_from(pkt)
            if size is not None:
                data, remain = s[:size], s[size:]

        # 3-byte length, zero-extended so that it unpacks as "!I"
        clen = struct.unpack("!I", b'\x00' + data[:3])[0]
        len_cert = (clen, Cert(data[3:3 + clen]))
        return data[3 + clen:] + remain, len_cert

    def i2m(self, pkt, i):
        """
        Build the certificate from None (empty), a str (taken as already
        built), a Cert, or a (length, cert) tuple.
        """
        if i is None:
            return b""
        if isinstance(i, str):
            return i
        if isinstance(i, Cert):
            der = i.der
            return struct.pack("!I", len(der))[1:4] + der

        # (length, cert) tuple: the stated length is used as is
        (length, cert) = i
        if isinstance(cert, Cert):
            cert = cert.der
        return struct.pack("!I", length)[1:4] + cert

    def any2i(self, pkt, x):
        """User-provided values are stored untouched."""
        return x
| |
| |
class TLSCertificate(_TLSHandshake):
    """
    TLS Certificate message: a list of length-prefixed certificates.

    XXX We do not support RFC 5081, i.e. OpenPGP certificates.
    """
    name = "TLS Handshake - Certificate"
    fields_desc = [ ByteEnumField("msgtype", 11, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _ASN1CertLenField("certslen", None, length_of="certs"),
                    _ASN1CertListField("certs", [],
                                      length_from = lambda pkt: pkt.certslen) ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select TLS13Certificate when dissecting within a session whose
        tls_version is at least 0x0304, and TLSCertificate otherwise.
        """
        if _pkt:
            tls_session = kargs.get("tls_session", None)
            # A version which is still None counts as 0: comparing None
            # with an int raises TypeError on Python 3.
            if tls_session and (tls_session.tls_version or 0) >= 0x0304:
                return TLS13Certificate
        return TLSCertificate

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the message, then store the dissected certificates as the
        server's when we are the client, and as the client's otherwise.
        """
        self.tls_session_update(msg_str)
        connection_end = self.tls_session.connection_end
        # each item of self.certs is a (length, Cert) tuple
        if connection_end == "client":
            self.tls_session.server_certs = [x[1] for x in self.certs]
        else:
            self.tls_session.client_certs = [x[1] for x in self.certs]
| |
| |
class _ASN1CertAndExt(_GenericTLSSessionInheritance):
    """One certificate followed by its own list of extensions."""
    name = "Certificate and Extensions"
    fields_desc = [
        _ASN1CertField("cert", ""),
        FieldLenField("extlen", None, length_of="ext"),
        _ExtensionsField("ext", [], length_from=lambda pkt: pkt.extlen),
    ]

    def extract_padding(self, s):
        """Keep no payload: all remaining bytes are handed back as padding."""
        return b"", s
| |
class _ASN1CertAndExtListField(PacketListField):
    """PacketListField whose items are dissected within the TLS session."""

    def m2i(self, pkt, m):
        """Instantiate the item class on m with the parent's tls_session."""
        session = pkt.tls_session
        return self.cls(m, tls_session=session)
| |
class TLS13Certificate(_TLSHandshake):
    """
    TLS 1.3 Certificate message: a certificate request context followed by
    a list of certificates, each one carrying its own extensions.
    """
    name = "TLS 1.3 Handshake - Certificate"
    fields_desc = [
        ByteEnumField("msgtype", 11, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        FieldLenField("cert_req_ctxt_len", None, fmt="B",
                      length_of="cert_req_ctxt"),
        StrLenField("cert_req_ctxt", "",
                    length_from=lambda pkt: pkt.cert_req_ctxt_len),
        _ASN1CertLenField("certslen", None, length_of="certs"),
        _ASN1CertAndExtListField("certs", [], _ASN1CertAndExt,
                                 length_from=lambda pkt: pkt.certslen),
    ]

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the message, then store the dissected certificates (if any)
        as the server's when we are the client, as the client's otherwise.
        """
        self.tls_session_update(msg_str)
        we_are_client = self.tls_session.connection_end == "client"
        if not self.certs:
            return
        # each entry's 'cert' value is a (length, Cert) tuple
        peer_certs = [entry.cert[1] for entry in self.certs]
        if we_are_client:
            self.tls_session.server_certs = peer_certs
        else:
            self.tls_session.client_certs = peer_certs
| |
| |
| ############################################################################### |
| ### ServerKeyExchange ### |
| ############################################################################### |
| |
class TLSServerKeyExchange(_TLSHandshake):
    """
    TLS ServerKeyExchange: key exchange parameters followed by a signature.
    """
    name = "TLS Handshake - Server Key Exchange"
    fields_desc = [ ByteEnumField("msgtype", 12, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSServerParamsField("params", None,
                        length_from=lambda pkt: pkt.msglen),
                    _TLSSignatureField("sig", None,
                        length_from=lambda pkt: pkt.msglen - len(pkt.params)) ]

    def build(self, *args, **kargs):
        """
        We overload build() method in order to provide a valid default value
        for params based on TLS session if not provided. This cannot be done by
        overriding i2m() because the method is called on a copy of the packet.

        The 'params' field is built according to key_exchange.server_kx_msg_cls
        which should have been set after receiving a cipher suite in a
        previous ServerHello. Usual cases are:
        - None: for RSA encryption or fixed FF/ECDH. This should never happen,
          as no ServerKeyExchange should be generated in the first place.
        - ServerDHParams: for ephemeral FFDH. In that case, the parameter to
          server_kx_msg_cls does not matter.
        - ServerECDH*Params: for ephemeral ECDH. There are actually three
          classes, which are dispatched by _tls_server_ecdh_cls_guess on
          the first byte retrieved. The default here is b"\\x03", which
          corresponds to ServerECDHNamedCurveParams (implicit curves).

        When the Server*DHParams are built via .fill_missing(), the session
        server_kx_privkey will be updated accordingly.

        A missing 'sig' is computed over client_random + server_random +
        params with the session server_key, unless the key exchange is
        anonymous. Without a pending write state, both fields default to an
        empty Raw().
        """
        fval = self.getfieldval("params")
        if fval is None:
            s = self.tls_session
            if s.pwcs:
                if s.pwcs.key_exchange.export:
                    cls = ServerRSAParams(tls_session=s)
                else:
                    cls = s.pwcs.key_exchange.server_kx_msg_cls(b"\x03")
                    cls = cls(tls_session=s)
                try:
                    cls.fill_missing()
                except Exception:
                    # Best effort: the params are kept as they are. Only
                    # Exception is caught, so that KeyboardInterrupt and
                    # SystemExit are not swallowed.
                    pass
            else:
                cls = Raw()
            self.params = cls

        fval = self.getfieldval("sig")
        if fval is None:
            s = self.tls_session
            if s.pwcs:
                if not s.pwcs.key_exchange.anonymous:
                    p = self.params
                    if p is None:
                        p = b""
                    m = s.client_random + s.server_random + raw(p)
                    cls = _TLSSignature(tls_session=s)
                    cls._update_sig(m, s.server_key)
                else:
                    cls = Raw()
            else:
                cls = Raw()
            self.sig = cls

        return _TLSHandshake.build(self, *args, **kargs)

    def post_dissection(self, pkt):
        """
        While previously dissecting Server*DHParams, the session
        server_kx_pubkey should have been updated.

        Here we log when the message was not expected for the key exchange
        (no_ske), and when its signature does not verify against the first
        server certificate. Nothing is raised in either case.

        XXX Add a 'fixed_dh' OR condition to the 'anonymous' test.
        """
        s = self.tls_session
        if s.prcs and s.prcs.key_exchange.no_ske:
            pkt_info = pkt.firstlayer().summary()
            log_runtime.info("TLS: useless ServerKeyExchange [%s]", pkt_info)
        # the signature can only be checked once both randoms and a server
        # certificate are known
        if (s.prcs and
            not s.prcs.key_exchange.anonymous and
            s.client_random and s.server_random and
            s.server_certs and len(s.server_certs) > 0):
            m = s.client_random + s.server_random + raw(self.params)
            sig_test = self.sig._verify_sig(m, s.server_certs[0])
            if not sig_test:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid ServerKeyExchange signature [%s]", pkt_info)
| |
| |
| ############################################################################### |
| ### CertificateRequest ### |
| ############################################################################### |
| |
| _tls_client_certificate_types = { 1: "rsa_sign", |
| 2: "dss_sign", |
| 3: "rsa_fixed_dh", |
| 4: "dss_fixed_dh", |
| 5: "rsa_ephemeral_dh_RESERVED", |
| 6: "dss_ephemeral_dh_RESERVED", |
| 20: "fortezza_dms_RESERVED", |
| 64: "ecdsa_sign", |
| 65: "rsa_fixed_ecdh", |
| 66: "ecdsa_fixed_ecdh" } |
| |
| |
class _CertTypesField(_CipherSuitesField):
    """List of certificate type codes; behaves like _CipherSuitesField."""
| |
class _CertAuthoritiesField(StrLenField):
    """
    List of (length, name) tuples, where each name is preceded on the wire
    by its 2-byte length.

    XXX Rework this with proper ASN.1 parsing.
    """
    islist = 1

    def getfield(self, pkt, s):
        """Dissect the first length_from(pkt) bytes of s."""
        size = self.length_from(pkt)
        return s[size:], self.m2i(pkt, s[:size])

    def m2i(self, pkt, m):
        """
        Split m into (length, name) tuples. An entry announcing more bytes
        than are left takes whatever remains and ends the parsing.
        """
        entries = []
        while len(m) > 1:
            dn_len = struct.unpack("!H", m[:2])[0]
            if len(m) < dn_len + 2:
                entries.append((dn_len, m[2:]))
                break
            entries.append((dn_len, m[2:2 + dn_len]))
            m = m[2 + dn_len:]
        return entries

    def i2m(self, pkt, i):
        """Build every tuple as its stated 2-byte length, then its name."""
        return b"".join(struct.pack("!H", entry[0]) + entry[1]
                        for entry in i)

    def addfield(self, pkt, s, val):
        """Append the built list to s."""
        return s + self.i2m(pkt, val)

    def i2len(self, pkt, val):
        """Length in bytes of the built list (0 for None)."""
        return 0 if val is None else len(self.i2m(pkt, val))
| |
| |
class TLSCertificateRequest(_TLSHandshake):
    """
    TLS CertificateRequest: certificate types, signature and hash
    algorithms, and certificate authorities, each with its length field.
    """
    name = "TLS Handshake - Certificate Request"
    fields_desc = [
        ByteEnumField("msgtype", 13, _tls_handshake_type),
        ThreeBytesField("msglen", None),

        FieldLenField("ctypeslen", None, fmt="B", length_of="ctypes"),
        _CertTypesField("ctypes", [1, 64], _tls_client_certificate_types,
                        itemfmt="!B",
                        length_from=lambda pkt: pkt.ctypeslen),

        SigAndHashAlgsLenField("sig_algs_len", None, length_of="sig_algs"),
        SigAndHashAlgsField("sig_algs", [0x0403, 0x0401, 0x0201],
                            EnumField("hash_sig", None, _tls_hash_sig),
                            length_from=lambda pkt: pkt.sig_algs_len),

        FieldLenField("certauthlen", None, fmt="!H", length_of="certauth"),
        _CertAuthoritiesField("certauth", [],
                              length_from=lambda pkt: pkt.certauthlen),
    ]
| |
| |
| ############################################################################### |
| ### ServerHelloDone ### |
| ############################################################################### |
| |
class TLSServerHelloDone(_TLSHandshake):
    """ServerHelloDone: a handshake header (msgtype 14) with no body field."""
    name = "TLS Handshake - Server Hello Done"
    fields_desc = [
        ByteEnumField("msgtype", 14, _tls_handshake_type),
        ThreeBytesField("msglen", None),
    ]
| |
| |
| ############################################################################### |
| ### CertificateVerify ### |
| ############################################################################### |
| |
class TLSCertificateVerify(_TLSHandshake):
    """
    TLS CertificateVerify: a signature over the handshake messages
    recorded so far in the session.
    """
    name = "TLS Handshake - Certificate Verify"
    fields_desc = [ ByteEnumField("msgtype", 15, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSSignatureField("sig", None,
                                 length_from=lambda pkt: pkt.msglen) ]

    def build(self, *args, **kargs):
        """
        Compute the signature when 'sig' was left to None, then build.

        The signed content is the concatenation of the session handshake
        messages. For a tls_version of 0x0304 or more, it is instead 64
        bytes 0x20, the context string of our connection end, a zero byte
        and the write-state hash of the handshake messages. The signing key
        is client_key or server_key, depending on connection_end.
        """
        sig = self.getfieldval("sig")
        if sig is None:
            s = self.tls_session
            m = b"".join(s.handshake_messages)
            # A version which is still None counts as 0: comparing None
            # with an int raises TypeError on Python 3.
            if (s.tls_version or 0) >= 0x0304:
                # The context strings must be bytes, as they are
                # concatenated with bytes right below.
                if s.connection_end == "client":
                    context_string = b"TLS 1.3, client CertificateVerify"
                elif s.connection_end == "server":
                    context_string = b"TLS 1.3, server CertificateVerify"
                m = b"\x20"*64 + context_string + b"\x00" + s.wcs.hash.digest(m)
            self.sig = _TLSSignature(tls_session=s)
            if s.connection_end == "client":
                self.sig._update_sig(m, s.client_key)
            elif s.connection_end == "server":
                # should be TLS 1.3 only
                self.sig._update_sig(m, s.server_key)
        return _TLSHandshake.build(self, *args, **kargs)

    def post_dissection(self, pkt):
        """
        Verify the received signature against the first certificate of the
        peer, when there is one. A failed verification is only logged.
        """
        s = self.tls_session
        m = b"".join(s.handshake_messages)
        if (s.tls_version or 0) >= 0x0304:
            # we check what the peer signed, hence the peer's context string
            if s.connection_end == "client":
                context_string = b"TLS 1.3, server CertificateVerify"
            elif s.connection_end == "server":
                context_string = b"TLS 1.3, client CertificateVerify"
            m = b"\x20"*64 + context_string + b"\x00" + s.rcs.hash.digest(m)

        if s.connection_end == "server":
            peer_certs = s.client_certs
        elif s.connection_end == "client":
            # should be TLS 1.3 only
            peer_certs = s.server_certs
        else:
            peer_certs = None

        if peer_certs and len(peer_certs) > 0:
            sig_test = self.sig._verify_sig(m, peer_certs[0])
            if not sig_test:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid CertificateVerify signature [%s]", pkt_info)
| |
| |
| ############################################################################### |
| ### ClientKeyExchange ### |
| ############################################################################### |
| |
class _TLSCKExchKeysField(PacketField):
    """
    Packet field holding the client key exchange message, whose class
    depends on the key exchange of the session pending read state.
    """
    __slots__ = ["length_from"]
    holds_packet = 1

    def __init__(self, name, length_from=None, remain=0):
        self.length_from = length_from
        PacketField.__init__(self, name, None, None, remain=remain)

    def m2i(self, pkt, m):
        """
        The client_kx_msg may be either None, EncryptedPreMasterSecret
        (for RSA encryption key exchange), ClientDiffieHellmanPublic,
        or ClientECDiffieHellmanPublic. When either one of them gets
        dissected, the session context is updated accordingly.

        Without a usable class, the bytes are kept as Raw. What lies beyond
        length_from(pkt) always ends up in a Padding layer.
        """
        size = self.length_from(pkt)
        to_dissect, leftover = m[:size], m[size:]

        s = pkt.tls_session
        kx_cls = None
        if s.prcs and s.prcs.key_exchange:
            kx_cls = s.prcs.key_exchange.client_kx_msg_cls

        if kx_cls is not None:
            return kx_cls(to_dissect, tls_session=s)/Padding(leftover)
        return Raw(to_dissect)/Padding(leftover)
| |
| |
class TLSClientKeyExchange(_TLSHandshake):
    """
    This class mostly works like TLSServerKeyExchange and its 'params' field.
    """
    name = "TLS Handshake - Client Key Exchange"
    fields_desc = [
        ByteEnumField("msgtype", 16, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        _TLSCKExchKeysField("exchkeys", length_from=lambda pkt: pkt.msglen),
    ]

    def build(self, *args, **kargs):
        """
        Provide a default 'exchkeys' when it was left to None: an instance
        of the client_kx_msg_cls of the pending read state key exchange, or
        an empty Raw() when there is no such state.
        """
        if self.getfieldval("exchkeys") is None:
            s = self.tls_session
            if s.prcs:
                kx_cls = s.prcs.key_exchange.client_kx_msg_cls
                default_keys = kx_cls(tls_session=s)
            else:
                default_keys = Raw()
            self.exchkeys = default_keys
        return _TLSHandshake.build(self, *args, **kargs)
| |
| |
| ############################################################################### |
| ### Finished ### |
| ############################################################################### |
| |
class _VerifyDataField(StrLenField):
    """
    Finished verify_data, whose length is derived from the session's TLS
    version rather than from a length field.
    """
    def getfield(self, pkt, s):
        """
        Return (remaining bytes, verify_data bytes).

        The verify_data size is 36 for version 0x0300, the read state's
        hash length for versions >= 0x0304, and 12 otherwise.
        """
        version = pkt.tls_session.tls_version
        if version == 0x0300:
            size = 36
        elif version >= 0x0304:
            size = pkt.tls_session.rcs.hash.hash_len
        else:
            size = 12
        return s[size:], s[:size]
| |
class TLSFinished(_TLSHandshake):
    """
    Finished handshake message. 'vdata' is computed from the session when
    building with no explicit value, and checked against the session when
    dissecting.
    """
    name = "TLS Handshake - Finished"
    fields_desc = [
        ByteEnumField("msgtype", 20, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        _VerifyDataField("vdata", None),
    ]

    def build(self, *args, **kargs):
        """
        Build the message; when 'vdata' is unset, compute it for the
        "write" direction (PRF-based below version 0x0304, TLS 1.3
        computation otherwise).
        """
        if self.getfieldval("vdata") is None:
            s = self.tls_session
            handshake_msg = b"".join(s.handshake_messages)
            con_end = s.connection_end
            if s.tls_version < 0x0304:
                ms = s.master_secret
                vdata = s.wcs.prf.compute_verify_data(con_end, "write",
                                                      handshake_msg, ms)
            else:
                vdata = s.compute_tls13_verify_data(con_end, "write")
            self.vdata = vdata
        return _TLSHandshake.build(self, *args, **kargs)

    def post_dissection(self, pkt):
        """
        Recompute the expected verify_data for the "read" direction and log
        an informational message when it differs from the received one.

        Nothing is checked when the session is frozen, nor when the version
        is below 0x0304 and no master secret is known.
        """
        s = self.tls_session
        if s.frozen:
            return

        handshake_msg = b"".join(s.handshake_messages)
        if s.tls_version < 0x0304 and s.master_secret is not None:
            ms = s.master_secret
            con_end = s.connection_end
            expected = s.rcs.prf.compute_verify_data(con_end, "read",
                                                     handshake_msg, ms)
        elif s.tls_version >= 0x0304:
            con_end = s.connection_end
            expected = s.compute_tls13_verify_data(con_end, "read")
        else:
            # Pre-1.3 session without a master secret: nothing to compare.
            return

        if self.vdata != expected:
            pkt_info = pkt.firstlayer().summary()
            log_runtime.info("TLS: invalid Finished received [%s]", pkt_info)

    def post_build_tls_session_update(self, msg_str):
        """
        Record the built message in the session; for versions >= 0x0304,
        also prepare a new pending write state and derive further secrets
        according to the connection end.
        """
        self.tls_session_update(msg_str)
        s = self.tls_session
        if s.tls_version >= 0x0304:
            suite_cls = type(s.wcs.ciphersuite)
            s.pwcs = writeConnState(ciphersuite=suite_cls,
                                    connection_end=s.connection_end,
                                    tls_version=s.tls_version)
            s.triggered_pwcs_commit = True
            role = s.connection_end
            if role == "server":
                s.compute_tls13_traffic_secrets()
            elif role == "client":
                s.compute_tls13_traffic_secrets_end()
                s.compute_tls13_resumption_secret()

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the dissected message in the session; for versions >= 0x0304,
        also prepare a new pending read state and derive further secrets
        according to the connection end (roles mirrored with respect to the
        build-side hook).
        """
        self.tls_session_update(msg_str)
        s = self.tls_session
        if s.tls_version >= 0x0304:
            suite_cls = type(s.rcs.ciphersuite)
            s.prcs = readConnState(ciphersuite=suite_cls,
                                   connection_end=s.connection_end,
                                   tls_version=s.tls_version)
            s.triggered_prcs_commit = True
            role = s.connection_end
            if role == "client":
                s.compute_tls13_traffic_secrets()
            elif role == "server":
                s.compute_tls13_traffic_secrets_end()
                s.compute_tls13_resumption_secret()
| |
| |
| ## Additional handshake messages |
| |
| ############################################################################### |
| ### HelloVerifyRequest ### |
| ############################################################################### |
| |
class TLSHelloVerifyRequest(_TLSHandshake):
    """
    Defined for DTLS, see RFC 6347.

    The default msgtype is 3, the value named "hello_verify_request" in
    _tls_handshake_type and the key under which this class is registered
    in _tls_handshake_cls, so that a default-built message is dissected
    back as a TLSHelloVerifyRequest.
    """
    name = "TLS Handshake - Hello Verify Request"
    fields_desc = [
        # 3 == hello_verify_request (21 is certificate_url).
        ByteEnumField("msgtype", 3, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        # One-byte length prefix, computed from 'cookie' when left to None.
        FieldLenField("cookielen", None,
                      fmt="B", length_of="cookie"),
        StrLenField("cookie", "",
                    length_from=lambda pkt: pkt.cookielen),
    ]
| |
| |
| ############################################################################### |
| ### CertificateURL ### |
| ############################################################################### |
| |
| _tls_cert_chain_types = { 0: "individual_certs", |
| 1: "pkipath" } |
| |
class URLAndOptionalHash(Packet):
    """
    One entry of the 'uah' list of TLSCertificateURL: a length-prefixed URL
    followed by a presence byte and an optional hash.
    """
    name = "URLAndOptionHash structure for TLSCertificateURL"
    fields_desc = [
        FieldLenField("urllen", None, length_of="url"),
        StrLenField("url", "",
                    length_from=lambda pkt: pkt.urllen),
        # When auto-computed, the byte is ceil(len(hash)/20); when
        # dissecting, 20*hash_present bytes of hash are read.
        FieldLenField("hash_present", None,
                      fmt="B", length_of="hash",
                      adjust=lambda pkt, x: int(math.ceil(x/20.))),
        StrLenField("hash", "",
                    length_from=lambda pkt: 20*pkt.hash_present),
    ]

    def guess_payload_class(self, p):
        # Whatever follows an entry is not a payload of that entry.
        return Padding
| |
class TLSCertificateURL(_TLSHandshake):
    """
    Defined in RFC 4366. PkiPath structure of section 8 is not implemented yet.

    The message holds a chain type and a length-prefixed list of
    URLAndOptionalHash entries.
    """
    name = "TLS Handshake - Certificate URL"
    fields_desc = [
        ByteEnumField("msgtype", 21, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        ByteEnumField("certchaintype", None, _tls_cert_chain_types),
        FieldLenField("uahlen", None, length_of="uah"),
        PacketListField("uah", [], URLAndOptionalHash,
                        length_from=lambda pkt: pkt.uahlen),
    ]
| |
| |
| ############################################################################### |
| ### CertificateStatus ### |
| ############################################################################### |
| |
class ThreeBytesLenField(FieldLenField):
    """
    Length field stored on three bytes.

    The value is handled with the four-byte '!I' format; only the three
    low-order bytes are written to, or read from, the wire.
    """
    def __init__(self, name, default, length_of=None, adjust=lambda pkt, x:x):
        FieldLenField.__init__(self, name, default, length_of=length_of,
                               fmt='!I', adjust=adjust)

    def i2repr(self, pkt, x):
        # An unset value is displayed as 0.
        if x is not None:
            return repr(self.i2h(pkt, x))
        return 0

    def addfield(self, pkt, s, val):
        # Pack on four bytes, then drop the leading (most significant) one.
        packed = struct.pack(self.fmt, self.i2m(pkt, val))
        return s + packed[1:4]

    def getfield(self, pkt, s):
        # Left-pad the three wire bytes with a zero byte before unpacking.
        remaining = s[3:]
        number = struct.unpack(self.fmt, b"\x00" + s[:3])[0]
        return remaining, self.m2i(pkt, number)
| |
# Packet class used to dissect a certificate status response, keyed by the
# 'status_type' value of the enclosing message.
_cert_status_cls = {
    1: OCSP_Response,
}
| |
class _StatusField(PacketField):
    """
    Packet field whose dissecting class depends on the 'status_type' of the
    enclosing packet.
    """
    def m2i(self, pkt, m):
        # Use the class registered in _cert_status_cls for this status
        # type, or the field's own default class when there is none.
        status_cls = _cert_status_cls.get(pkt.status_type, self.cls)
        return status_cls(m)
| |
class TLSCertificateStatus(_TLSHandshake):
    """
    Certificate Status handshake message: a status type followed by a
    response prefixed with a three-byte length. The response is dissected
    by _StatusField, with Raw as its default class.
    """
    name = "TLS Handshake - Certificate Status"
    fields_desc = [
        ByteEnumField("msgtype", 22, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        ByteEnumField("status_type", 1, _cert_status_type),
        ThreeBytesLenField("responselen", None,
                           length_of="response"),
        _StatusField("response", None, Raw),
    ]
| |
| |
| ############################################################################### |
| ### SupplementalData ### |
| ############################################################################### |
| |
class SupDataEntry(Packet):
    """
    Generic supplemental data entry: a type followed by length-prefixed
    opaque data.
    """
    name = "Supplemental Data Entry - Generic"
    fields_desc = [
        ShortField("sdtype", None),
        FieldLenField("len", None, length_of="data"),
        StrLenField("data", "",
                    length_from=lambda pkt: pkt.len),
    ]

    def guess_payload_class(self, p):
        # Bytes following an entry are not a payload of that entry.
        return Padding
| |
class UserMappingData(Packet):
    """
    User mapping data element: a version byte followed by length-prefixed
    opaque data.
    """
    name = "User Mapping Data"
    fields_desc = [
        ByteField("version", None),
        FieldLenField("len", None, length_of="data"),
        StrLenField("data", "",
                    length_from=lambda pkt: pkt.len),
    ]

    def guess_payload_class(self, p):
        # Bytes following an element are not a payload of that element.
        return Padding
| |
class SupDataEntryUM(Packet):
    """
    Supplemental data entry holding a list of UserMappingData elements.

    Both 'len' and 'dlen' are derived from 'data'; 'len' is the data length
    plus 2 (presumably to account for the 'dlen' field itself).
    """
    name = "Supplemental Data Entry - User Mapping"
    fields_desc = [
        ShortField("sdtype", None),
        FieldLenField("len", None, length_of="data",
                      adjust=lambda pkt, x: x+2),
        FieldLenField("dlen", None, length_of="data"),
        PacketListField("data", [], UserMappingData,
                        length_from=lambda pkt: pkt.dlen),
    ]

    def guess_payload_class(self, p):
        # Bytes following an entry are not a payload of that entry.
        return Padding
| |
class TLSSupplementalData(_TLSHandshake):
    """
    Supplemental Data handshake message: a list of SupDataEntry prefixed
    with a three-byte length.
    """
    name = "TLS Handshake - Supplemental Data"
    fields_desc = [
        ByteEnumField("msgtype", 23, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        ThreeBytesLenField("sdatalen", None, length_of="sdata"),
        PacketListField("sdata", [], SupDataEntry,
                        length_from=lambda pkt: pkt.sdatalen),
    ]
| |
| |
| ############################################################################### |
| ### NewSessionTicket ### |
| ############################################################################### |
| |
class TLSNewSessionTicket(_TLSHandshake):
    """
    XXX When knowing the right secret, we should be able to read the ticket.

    The ticket is kept as an opaque, length-prefixed string. Instantiation
    is redirected to TLS13NewSessionTicket when the provided session has a
    version >= 0x0304.
    """
    name = "TLS Handshake - New Session Ticket"
    fields_desc = [
        ByteEnumField("msgtype", 4, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        IntField("lifetime", 0xffffffff),
        FieldLenField("ticketlen", None, length_of="ticket"),
        StrLenField("ticket", "",
                    length_from=lambda pkt: pkt.ticketlen),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select the class to instantiate from the 'tls_session' keyword
        argument, if any.
        """
        session = kargs.get("tls_session", None)
        if session and session.tls_version >= 0x0304:
            return TLS13NewSessionTicket
        return TLSNewSessionTicket

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the message in the session and, on the client side, store
        the received ticket in the session.
        """
        self.tls_session_update(msg_str)
        session = self.tls_session
        if session.connection_end == "client":
            session.client_session_ticket = self.ticket
| |
| |
class TLS13NewSessionTicket(_TLSHandshake):
    """
    Uncomment the TicketField line for parsing a RFC 5077 ticket.

    Compared with TLSNewSessionTicket, this layout adds 'ticket_age_add'
    and a trailing, length-prefixed extensions block.
    """
    name = "TLS Handshake - New Session Ticket"
    fields_desc = [
        ByteEnumField("msgtype", 4, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        IntField("ticket_lifetime", 0xffffffff),
        IntField("ticket_age_add", 0),
        FieldLenField("ticketlen", None, length_of="ticket"),
        #TicketField("ticket", "",
        StrLenField("ticket", "",
                    length_from=lambda pkt: pkt.ticketlen),
        _ExtensionsLenField("extlen", None, length_of="ext"),
        # Extensions take what is left of the message: msglen minus the
        # ticket and 12 more bytes (presumably the fixed-size fields:
        # two 4-byte integers and two 2-byte lengths).
        _ExtensionsField("ext", None,
                         length_from=lambda pkt: (pkt.msglen -
                                                  (pkt.ticketlen or 0) -
                                                  12)),
    ]

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the message in the session and, on the client side, store
        the received ticket in the session.
        """
        self.tls_session_update(msg_str)
        session = self.tls_session
        if session.connection_end == "client":
            session.client_session_ticket = self.ticket
| |
| |
| ############################################################################### |
| ### All handshake messages defined in this module ### |
| ############################################################################### |
| |
# Handshake message class for each handshake type value handled by this
# module.
_tls_handshake_cls = {
    0: TLSHelloRequest,
    1: TLSClientHello,
    2: TLSServerHello,
    3: TLSHelloVerifyRequest,
    4: TLSNewSessionTicket,
    6: TLSHelloRetryRequest,
    8: TLSEncryptedExtensions,
    11: TLSCertificate,
    12: TLSServerKeyExchange,
    13: TLSCertificateRequest,
    14: TLSServerHelloDone,
    15: TLSCertificateVerify,
    16: TLSClientKeyExchange,
    20: TLSFinished,
    21: TLSCertificateURL,
    22: TLSCertificateStatus,
    23: TLSSupplementalData,
}
| |