| ############################################################################# |
| ## ipsec.py --- IPsec support for Scapy ## |
| ## ## |
| ## Copyright (C) 2014 6WIND ## |
| ## ## |
| ## This program is free software; you can redistribute it and/or modify it ## |
| ## under the terms of the GNU General Public License version 2 as ## |
| ## published by the Free Software Foundation. ## |
| ## ## |
| ## This program is distributed in the hope that it will be useful, but ## |
| ## WITHOUT ANY WARRANTY; without even the implied warranty of ## |
| ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## |
| ## General Public License for more details. ## |
| ############################################################################# |
| """ |
| IPsec layer |
| =========== |
| |
| Example of use: |
| |
| >>> sa = SecurityAssociation(ESP, spi=0xdeadbeef, crypt_algo='AES-CBC', |
| ... crypt_key='sixteenbytes key') |
| >>> p = IP(src='1.1.1.1', dst='2.2.2.2') |
| >>> p /= TCP(sport=45012, dport=80) |
| >>> p /= Raw('testdata') |
| >>> p = IP(raw(p)) |
| >>> p |
| <IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 options=[] |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> |
| >>> |
| >>> e = sa.encrypt(p) |
| >>> e |
| <IP version=4L ihl=5L tos=0x0 len=76 id=1 flags= frag=0L ttl=64 proto=esp chksum=0x747a src=1.1.1.1 dst=2.2.2.2 |<ESP spi=0xdeadbeef seq=1 data=b'\xf8\xdb\x1e\x83[T\xab\\\xd2\x1b\xed\xd1\xe5\xc8Y\xc2\xa5d\x92\xc1\x05\x17\xa6\x92\x831\xe6\xc1]\x9a\xd6K}W\x8bFfd\xa5B*+\xde\xc8\x89\xbf{\xa9' |>> |
| >>> |
| >>> d = sa.decrypt(e) |
| >>> d |
| <IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> |
| >>> |
| >>> d == p |
| True |
| """ |
| |
| from __future__ import absolute_import |
| from fractions import gcd |
| import os |
| import socket |
| import struct |
| |
| from scapy.config import conf, crypto_validator |
| from scapy.compat import orb, raw |
| from scapy.data import IP_PROTOS |
| from scapy.compat import * |
| from scapy.error import log_loading |
| from scapy.fields import ByteEnumField, ByteField, IntField, PacketField, \ |
| ShortField, StrField, XIntField, XStrField, XStrLenField |
| from scapy.packet import Packet, bind_layers, Raw |
| from scapy.layers.inet import IP, UDP |
| import scapy.modules.six as six |
| from scapy.modules.six.moves import range |
| from scapy.layers.inet6 import IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \ |
| IPv6ExtHdrRouting |
| |
| |
| #------------------------------------------------------------------------------ |
| class AH(Packet): |
| """ |
| Authentication Header |
| |
| See https://tools.ietf.org/rfc/rfc4302.txt |
| """ |
| |
| name = 'AH' |
| |
| def __get_icv_len(self): |
| """ |
| Compute the size of the ICV based on the payloadlen field. |
| Padding size is included as it can only be known from the authentication |
| algorithm provided by the Security Association. |
| """ |
| # payloadlen = length of AH in 32-bit words (4-byte units), minus "2" |
| # payloadlen = 3 32-bit word fixed fields + ICV + padding - 2 |
| # ICV = (payloadlen + 2 - 3 - padding) in 32-bit words |
| return (self.payloadlen - 1) * 4 |
| |
| fields_desc = [ |
| ByteEnumField('nh', None, IP_PROTOS), |
| ByteField('payloadlen', None), |
| ShortField('reserved', None), |
| XIntField('spi', 0x0), |
| IntField('seq', 0), |
| XStrLenField('icv', None, length_from=__get_icv_len), |
| # Padding len can only be known with the SecurityAssociation.auth_algo |
| XStrLenField('padding', None, length_from=lambda x: 0), |
| ] |
| |
| overload_fields = { |
| IP: {'proto': socket.IPPROTO_AH}, |
| IPv6: {'nh': socket.IPPROTO_AH}, |
| IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_AH}, |
| IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_AH}, |
| IPv6ExtHdrRouting: {'nh': socket.IPPROTO_AH}, |
| } |
| |
| bind_layers(IP, AH, proto=socket.IPPROTO_AH) |
| bind_layers(IPv6, AH, nh=socket.IPPROTO_AH) |
| bind_layers(AH, IP, nh=socket.IPPROTO_IP) |
| bind_layers(AH, IPv6, nh=socket.IPPROTO_IPV6) |
| |
| #------------------------------------------------------------------------------ |
| class ESP(Packet): |
| """ |
| Encapsulated Security Payload |
| |
| See https://tools.ietf.org/rfc/rfc4303.txt |
| """ |
| name = 'ESP' |
| |
| fields_desc = [ |
| XIntField('spi', 0x0), |
| IntField('seq', 0), |
| XStrField('data', None), |
| ] |
| |
| overload_fields = { |
| IP: {'proto': socket.IPPROTO_ESP}, |
| IPv6: {'nh': socket.IPPROTO_ESP}, |
| IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_ESP}, |
| IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_ESP}, |
| IPv6ExtHdrRouting: {'nh': socket.IPPROTO_ESP}, |
| } |
| |
| bind_layers(IP, ESP, proto=socket.IPPROTO_ESP) |
| bind_layers(IPv6, ESP, nh=socket.IPPROTO_ESP) |
| bind_layers(UDP, ESP, dport=4500) # NAT-Traversal encapsulation |
| bind_layers(UDP, ESP, sport=4500) # NAT-Traversal encapsulation |
| |
| #------------------------------------------------------------------------------ |
| class _ESPPlain(Packet): |
| """ |
| Internal class to represent unencrypted ESP packets. |
| """ |
| name = 'ESP' |
| |
| fields_desc = [ |
| XIntField('spi', 0x0), |
| IntField('seq', 0), |
| |
| StrField('iv', ''), |
| PacketField('data', '', Raw), |
| StrField('padding', ''), |
| |
| ByteField('padlen', 0), |
| ByteEnumField('nh', 0, IP_PROTOS), |
| StrField('icv', ''), |
| ] |
| |
| def data_for_encryption(self): |
| return raw(self.data) + self.padding + struct.pack("BB", self.padlen, self.nh) |
| |
| #------------------------------------------------------------------------------ |
| if conf.crypto_valid: |
| from cryptography.exceptions import InvalidTag |
| from cryptography.hazmat.backends import default_backend |
| from cryptography.hazmat.primitives.ciphers import ( |
| Cipher, |
| algorithms, |
| modes, |
| ) |
| else: |
| log_loading.info("Can't import python-cryptography v1.7+. " |
| "Disabled IPsec encryption/authentication.") |
| InvalidTag = default_backend = None |
| Cipher = algorithms = modes = None |
| |
| #------------------------------------------------------------------------------ |
| def _lcm(a, b): |
| """ |
| Least Common Multiple between 2 integers. |
| """ |
| if a == 0 or b == 0: |
| return 0 |
| else: |
| return abs(a * b) // gcd(a, b) |
| |
| class CryptAlgo(object): |
| """ |
| IPsec encryption algorithm |
| """ |
| |
| def __init__(self, name, cipher, mode, block_size=None, iv_size=None, |
| key_size=None, icv_size=None, salt_size=None, format_mode_iv=None): |
| """ |
| @param name: the name of this encryption algorithm |
| @param cipher: a Cipher module |
| @param mode: the mode used with the cipher module |
| @param block_size: the length a block for this algo. Defaults to the |
| `block_size` of the cipher. |
| @param iv_size: the length of the initialization vector of this algo. |
| Defaults to the `block_size` of the cipher. |
| @param key_size: an integer or list/tuple of integers. If specified, |
| force the secret keys length to one of the values. |
| Defaults to the `key_size` of the cipher. |
| @param icv_size: the length of the Integrity Check Value of this algo. |
| Used by Combined Mode Algorithms e.g. GCM |
| @param salt_size: the length of the salt to use as the IV prefix. |
| Usually used by Counter modes e.g. CTR |
| @param format_mode_iv: function to format the Initialization Vector |
| e.g. handle the salt value |
| Default is the random buffer from `generate_iv` |
| """ |
| self.name = name |
| self.cipher = cipher |
| self.mode = mode |
| self.icv_size = icv_size |
| |
| if modes and self.mode is not None: |
| self.is_aead = issubclass(self.mode, |
| modes.ModeWithAuthenticationTag) |
| else: |
| self.is_aead = False |
| |
| if block_size is not None: |
| self.block_size = block_size |
| elif cipher is not None: |
| self.block_size = cipher.block_size // 8 |
| else: |
| self.block_size = 1 |
| |
| if iv_size is None: |
| self.iv_size = self.block_size |
| else: |
| self.iv_size = iv_size |
| |
| if key_size is not None: |
| self.key_size = key_size |
| elif cipher is not None: |
| self.key_size = tuple(i // 8 for i in cipher.key_sizes) |
| else: |
| self.key_size = None |
| |
| if salt_size is None: |
| self.salt_size = 0 |
| else: |
| self.salt_size = salt_size |
| |
| if format_mode_iv is None: |
| self._format_mode_iv = lambda iv, **kw: iv |
| else: |
| self._format_mode_iv = format_mode_iv |
| |
| def check_key(self, key): |
| """ |
| Check that the key length is valid. |
| |
| @param key: a byte string |
| """ |
| if self.key_size and not (len(key) == self.key_size or len(key) in self.key_size): |
| raise TypeError('invalid key size %s, must be %s' % |
| (len(key), self.key_size)) |
| |
| def generate_iv(self): |
| """ |
| Generate a random initialization vector. |
| """ |
| # XXX: Handle counter modes with real counters? RFCs allow the use of |
| # XXX: random bytes for counters, so it is not wrong to do it that way |
| return os.urandom(self.iv_size) |
| |
| @crypto_validator |
| def new_cipher(self, key, mode_iv, digest=None): |
| """ |
| @param key: the secret key, a byte string |
| @param mode_iv: the initialization vector or nonce, a byte string. |
| Formatted by `format_mode_iv`. |
| @param digest: also known as tag or icv. A byte string containing the |
| digest of the encrypted data. Only use this during |
| decryption! |
| |
| @return: an initialized cipher object for this algo |
| """ |
| if self.is_aead and digest is not None: |
| # With AEAD, the mode needs the digest during decryption. |
| return Cipher( |
| self.cipher(key), |
| self.mode(mode_iv, digest, len(digest)), |
| default_backend(), |
| ) |
| else: |
| return Cipher( |
| self.cipher(key), |
| self.mode(mode_iv), |
| default_backend(), |
| ) |
| |
| def pad(self, esp): |
| """ |
| Add the correct amount of padding so that the data to encrypt is |
| exactly a multiple of the algorithm's block size. |
| |
| Also, make sure that the total ESP packet length is a multiple of 4 |
| bytes. |
| |
| @param esp: an unencrypted _ESPPlain packet |
| |
| @return: an unencrypted _ESPPlain packet with valid padding |
| """ |
| # 2 extra bytes for padlen and nh |
| data_len = len(esp.data) + 2 |
| |
| # according to the RFC4303, section 2.4. Padding (for Encryption) |
| # the size of the ESP payload must be a multiple of 32 bits |
| align = _lcm(self.block_size, 4) |
| |
| # pad for block size |
| esp.padlen = -data_len % align |
| |
| # Still according to the RFC, the default value for padding *MUST* be an |
| # array of bytes starting from 1 to padlen |
| # TODO: Handle padding function according to the encryption algo |
| esp.padding = struct.pack("B" * esp.padlen, *range(1, esp.padlen + 1)) |
| |
| # If the following test fails, it means that this algo does not comply |
| # with the RFC |
| payload_len = len(esp.iv) + len(esp.data) + len(esp.padding) + 2 |
| if payload_len % 4 != 0: |
| raise ValueError('The size of the ESP data is not aligned to 32 bits after padding.') |
| |
| return esp |
| |
| def encrypt(self, sa, esp, key): |
| """ |
| Encrypt an ESP packet |
| |
| @param sa: the SecurityAssociation associated with the ESP packet. |
| @param esp: an unencrypted _ESPPlain packet with valid padding |
| @param key: the secret key used for encryption |
| |
| @return: a valid ESP packet encrypted with this algorithm |
| """ |
| data = esp.data_for_encryption() |
| |
| if self.cipher: |
| mode_iv = self._format_mode_iv(algo=self, sa=sa, iv=esp.iv) |
| cipher = self.new_cipher(key, mode_iv) |
| encryptor = cipher.encryptor() |
| |
| if self.is_aead: |
| aad = struct.pack('!LL', esp.spi, esp.seq) |
| encryptor.authenticate_additional_data(aad) |
| data = encryptor.update(data) + encryptor.finalize() |
| data += encryptor.tag[:self.icv_size] |
| else: |
| data = encryptor.update(data) + encryptor.finalize() |
| |
| return ESP(spi=esp.spi, seq=esp.seq, data=esp.iv + data) |
| |
| def decrypt(self, sa, esp, key, icv_size=None): |
| """ |
| Decrypt an ESP packet |
| |
| @param sa: the SecurityAssociation associated with the ESP packet. |
| @param esp: an encrypted ESP packet |
| @param key: the secret key used for encryption |
| @param icv_size: the length of the icv used for integrity check |
| |
| @return: a valid ESP packet encrypted with this algorithm |
| @raise IPSecIntegrityError: if the integrity check fails with an AEAD |
| algorithm |
| """ |
| if icv_size is None: |
| icv_size = self.icv_size if self.is_aead else 0 |
| |
| iv = esp.data[:self.iv_size] |
| data = esp.data[self.iv_size:len(esp.data) - icv_size] |
| icv = esp.data[len(esp.data) - icv_size:] |
| |
| if self.cipher: |
| mode_iv = self._format_mode_iv(sa=sa, iv=iv) |
| cipher = self.new_cipher(key, mode_iv, icv) |
| decryptor = cipher.decryptor() |
| |
| if self.is_aead: |
| # Tag value check is done during the finalize method |
| decryptor.authenticate_additional_data( |
| struct.pack('!LL', esp.spi, esp.seq) |
| ) |
| |
| try: |
| data = decryptor.update(data) + decryptor.finalize() |
| except InvalidTag as err: |
| raise IPSecIntegrityError(err) |
| |
| # extract padlen and nh |
| padlen = orb(data[-2]) |
| nh = orb(data[-1]) |
| |
| # then use padlen to determine data and padding |
| data = data[:len(data) - padlen - 2] |
| padding = data[len(data) - padlen - 2: len(data) - 2] |
| |
| return _ESPPlain(spi=esp.spi, |
| seq=esp.seq, |
| iv=iv, |
| data=data, |
| padding=padding, |
| padlen=padlen, |
| nh=nh, |
| icv=icv) |
| |
| #------------------------------------------------------------------------------ |
| # The names of the encryption algorithms are the same than in scapy.contrib.ikev2 |
| # see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml |
| |
| CRYPT_ALGOS = { |
| 'NULL': CryptAlgo('NULL', cipher=None, mode=None, iv_size=0), |
| } |
| |
| if algorithms: |
| CRYPT_ALGOS['AES-CBC'] = CryptAlgo('AES-CBC', |
| cipher=algorithms.AES, |
| mode=modes.CBC) |
| _aes_ctr_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv + b'\x00\x00\x00\x01' |
| CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR', |
| cipher=algorithms.AES, |
| mode=modes.CTR, |
| iv_size=8, |
| salt_size=4, |
| format_mode_iv=_aes_ctr_format_mode_iv) |
| _salt_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv |
| CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM', |
| cipher=algorithms.AES, |
| mode=modes.GCM, |
| salt_size=4, |
| iv_size=8, |
| icv_size=16, |
| format_mode_iv=_salt_format_mode_iv) |
| if hasattr(modes, 'CCM'): |
| CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM', |
| cipher=algorithms.AES, |
| mode=modes.CCM, |
| iv_size=8, |
| salt_size=3, |
| icv_size=16, |
| format_mode_iv=_salt_format_mode_iv) |
| # XXX: Flagged as weak by 'cryptography'. Kept for backward compatibility |
| CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish', |
| cipher=algorithms.Blowfish, |
| mode=modes.CBC) |
| # XXX: RFC7321 states that DES *MUST NOT* be implemented. |
| # XXX: Keep for backward compatibility? |
| # Using a TripleDES cipher algorithm for DES is done by using the same 64 |
| # bits key 3 times (done by cryptography when given a 64 bits key) |
| CRYPT_ALGOS['DES'] = CryptAlgo('DES', |
| cipher=algorithms.TripleDES, |
| mode=modes.CBC, |
| key_size=(8,)) |
| CRYPT_ALGOS['3DES'] = CryptAlgo('3DES', |
| cipher=algorithms.TripleDES, |
| mode=modes.CBC) |
| CRYPT_ALGOS['CAST'] = CryptAlgo('CAST', |
| cipher=algorithms.CAST5, |
| mode=modes.CBC) |
| |
| #------------------------------------------------------------------------------ |
| if conf.crypto_valid: |
| from cryptography.hazmat.primitives.hmac import HMAC |
| from cryptography.hazmat.primitives.cmac import CMAC |
| from cryptography.hazmat.primitives import hashes |
| else: |
| # no error if cryptography is not available but authentication won't be supported |
| HMAC = CMAC = hashes = None |
| |
| #------------------------------------------------------------------------------ |
| class IPSecIntegrityError(Exception): |
| """ |
| Error risen when the integrity check fails. |
| """ |
| pass |
| |
| class AuthAlgo(object): |
| """ |
| IPsec integrity algorithm |
| """ |
| |
| def __init__(self, name, mac, digestmod, icv_size, key_size=None): |
| """ |
| @param name: the name of this integrity algorithm |
| @param mac: a Message Authentication Code module |
| @param digestmod: a Hash or Cipher module |
| @param icv_size: the length of the integrity check value of this algo |
| @param key_size: an integer or list/tuple of integers. If specified, |
| force the secret keys length to one of the values. |
| Defaults to the `key_size` of the cipher. |
| """ |
| self.name = name |
| self.mac = mac |
| self.digestmod = digestmod |
| self.icv_size = icv_size |
| self.key_size = key_size |
| |
| def check_key(self, key): |
| """ |
| Check that the key length is valid. |
| |
| @param key: a byte string |
| """ |
| if self.key_size and len(key) not in self.key_size: |
| raise TypeError('invalid key size %s, must be one of %s' % |
| (len(key), self.key_size)) |
| |
| @crypto_validator |
| def new_mac(self, key): |
| """ |
| @param key: a byte string |
| @return: an initialized mac object for this algo |
| """ |
| if self.mac is CMAC: |
| return self.mac(self.digestmod(key), default_backend()) |
| else: |
| return self.mac(key, self.digestmod(), default_backend()) |
| |
| def sign(self, pkt, key): |
| """ |
| Sign an IPsec (ESP or AH) packet with this algo. |
| |
| @param pkt: a packet that contains a valid encrypted ESP or AH layer |
| @param key: the authentication key, a byte string |
| |
| @return: the signed packet |
| """ |
| if not self.mac: |
| return pkt |
| |
| mac = self.new_mac(key) |
| |
| if pkt.haslayer(ESP): |
| mac.update(raw(pkt[ESP])) |
| pkt[ESP].data += mac.finalize()[:self.icv_size] |
| |
| elif pkt.haslayer(AH): |
| clone = zero_mutable_fields(pkt.copy(), sending=True) |
| mac.update(raw(clone)) |
| pkt[AH].icv = mac.finalize()[:self.icv_size] |
| |
| return pkt |
| |
| def verify(self, pkt, key): |
| """ |
| Check that the integrity check value (icv) of a packet is valid. |
| |
| @param pkt: a packet that contains a valid encrypted ESP or AH layer |
| @param key: the authentication key, a byte string |
| |
| @raise IPSecIntegrityError: if the integrity check fails |
| """ |
| if not self.mac or self.icv_size == 0: |
| return |
| |
| mac = self.new_mac(key) |
| |
| pkt_icv = 'not found' |
| computed_icv = 'not computed' |
| |
| if isinstance(pkt, ESP): |
| pkt_icv = pkt.data[len(pkt.data) - self.icv_size:] |
| clone = pkt.copy() |
| clone.data = clone.data[:len(clone.data) - self.icv_size] |
| |
| elif pkt.haslayer(AH): |
| if len(pkt[AH].icv) != self.icv_size: |
| # Fill padding since we know the actual icv_size |
| pkt[AH].padding = pkt[AH].icv[self.icv_size:] |
| pkt[AH].icv = pkt[AH].icv[:self.icv_size] |
| pkt_icv = pkt[AH].icv |
| clone = zero_mutable_fields(pkt.copy(), sending=False) |
| |
| mac.update(raw(clone)) |
| computed_icv = mac.finalize()[:self.icv_size] |
| |
| # XXX: Cannot use mac.verify because the ICV can be truncated |
| if pkt_icv != computed_icv: |
| raise IPSecIntegrityError('pkt_icv=%r, computed_icv=%r' % |
| (pkt_icv, computed_icv)) |
| |
| #------------------------------------------------------------------------------ |
| # The names of the integrity algorithms are the same than in scapy.contrib.ikev2 |
| # see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml |
| |
| AUTH_ALGOS = { |
| 'NULL': AuthAlgo('NULL', mac=None, digestmod=None, icv_size=0), |
| } |
| |
| if HMAC and hashes: |
| # XXX: NIST has deprecated SHA1 but is required by RFC7321 |
| AUTH_ALGOS['HMAC-SHA1-96'] = AuthAlgo('HMAC-SHA1-96', |
| mac=HMAC, |
| digestmod=hashes.SHA1, |
| icv_size=12) |
| AUTH_ALGOS['SHA2-256-128'] = AuthAlgo('SHA2-256-128', |
| mac=HMAC, |
| digestmod=hashes.SHA256, |
| icv_size=16) |
| AUTH_ALGOS['SHA2-384-192'] = AuthAlgo('SHA2-384-192', |
| mac=HMAC, |
| digestmod=hashes.SHA384, |
| icv_size=24) |
| AUTH_ALGOS['SHA2-512-256'] = AuthAlgo('SHA2-512-256', |
| mac=HMAC, |
| digestmod=hashes.SHA512, |
| icv_size=32) |
| # XXX:Flagged as deprecated by 'cryptography'. Kept for backward compat |
| AUTH_ALGOS['HMAC-MD5-96'] = AuthAlgo('HMAC-MD5-96', |
| mac=HMAC, |
| digestmod=hashes.MD5, |
| icv_size=12) |
| if CMAC and algorithms: |
| AUTH_ALGOS['AES-CMAC-96'] = AuthAlgo('AES-CMAC-96', |
| mac=CMAC, |
| digestmod=algorithms.AES, |
| icv_size=12, |
| key_size=(16,)) |
| |
| #------------------------------------------------------------------------------ |
| def split_for_transport(orig_pkt, transport_proto): |
| """ |
| Split an IP(v6) packet in the correct location to insert an ESP or AH |
| header. |
| |
| @param orig_pkt: the packet to split. Must be an IP or IPv6 packet |
| @param transport_proto: the IPsec protocol number that will be inserted |
| at the split position. |
| @return: a tuple (header, nh, payload) where nh is the protocol number of |
| payload. |
| """ |
| # force resolution of default fields to avoid padding errors |
| header = orig_pkt.__class__(raw(orig_pkt)) |
| next_hdr = header.payload |
| nh = None |
| |
| if header.version == 4: |
| nh = header.proto |
| header.proto = transport_proto |
| header.remove_payload() |
| del header.chksum |
| del header.len |
| |
| return header, nh, next_hdr |
| else: |
| found_rt_hdr = False |
| prev = header |
| |
| # Since the RFC 4302 is vague about where the ESP/AH headers should be |
| # inserted in IPv6, I chose to follow the linux implementation. |
| while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): |
| if isinstance(next_hdr, IPv6ExtHdrHopByHop): |
| pass |
| if isinstance(next_hdr, IPv6ExtHdrRouting): |
| found_rt_hdr = True |
| elif isinstance(next_hdr, IPv6ExtHdrDestOpt) and found_rt_hdr: |
| break |
| |
| prev = next_hdr |
| next_hdr = next_hdr.payload |
| |
| nh = prev.nh |
| prev.nh = transport_proto |
| prev.remove_payload() |
| del header.plen |
| |
| return header, nh, next_hdr |
| |
| #------------------------------------------------------------------------------ |
| # see RFC 4302 - Appendix A. Mutability of IP Options/Extension Headers |
| IMMUTABLE_IPV4_OPTIONS = ( |
| 0, # End Of List |
| 1, # No OPeration |
| 2, # Security |
| 5, # Extended Security |
| 6, # Commercial Security |
| 20, # Router Alert |
| 21, # Sender Directed Multi-Destination Delivery |
| ) |
| def zero_mutable_fields(pkt, sending=False): |
| """ |
| When using AH, all "mutable" fields must be "zeroed" before calculating |
| the ICV. See RFC 4302, Section 3.3.3.1. Handling Mutable Fields. |
| |
| @param pkt: an IP(v6) packet containing an AH layer. |
| NOTE: The packet will be modified |
| @param sending: if true, ipv6 routing headers will not be reordered |
| """ |
| |
| if pkt.haslayer(AH): |
| pkt[AH].icv = b"\x00" * len(pkt[AH].icv) |
| else: |
| raise TypeError('no AH layer found') |
| |
| if pkt.version == 4: |
| # the tos field has been replaced by DSCP and ECN |
| # Routers may rewrite the DS field as needed to provide a |
| # desired local or end-to-end service |
| pkt.tos = 0 |
| # an intermediate router might set the DF bit, even if the source |
| # did not select it. |
| pkt.flags = 0 |
| # changed en route as a normal course of processing by routers |
| pkt.ttl = 0 |
| # will change if any of these other fields change |
| pkt.chksum = 0 |
| |
| immutable_opts = [] |
| for opt in pkt.options: |
| if opt.option in IMMUTABLE_IPV4_OPTIONS: |
| immutable_opts.append(opt) |
| else: |
| immutable_opts.append(Raw(b"\x00" * len(opt))) |
| pkt.options = immutable_opts |
| |
| else: |
| # holds DSCP and ECN |
| pkt.tc = 0 |
| # The flow label described in AHv1 was mutable, and in RFC 2460 [DH98] |
| # was potentially mutable. To retain compatibility with existing AH |
| # implementations, the flow label is not included in the ICV in AHv2. |
| pkt.fl = 0 |
| # same as ttl |
| pkt.hlim = 0 |
| |
| next_hdr = pkt.payload |
| |
| while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): |
| if isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt)): |
| for opt in next_hdr.options: |
| if opt.otype & 0x20: |
| # option data can change en-route and must be zeroed |
| opt.optdata = b"\x00" * opt.optlen |
| elif isinstance(next_hdr, IPv6ExtHdrRouting) and sending: |
| # The sender must order the field so that it appears as it |
| # will at the receiver, prior to performing the ICV computation. |
| next_hdr.segleft = 0 |
| if next_hdr.addresses: |
| final = next_hdr.addresses.pop() |
| next_hdr.addresses.insert(0, pkt.dst) |
| pkt.dst = final |
| else: |
| break |
| |
| next_hdr = next_hdr.payload |
| |
| return pkt |
| |
| #------------------------------------------------------------------------------ |
| class SecurityAssociation(object): |
| """ |
| This class is responsible of "encryption" and "decryption" of IPsec packets. |
| """ |
| |
| SUPPORTED_PROTOS = (IP, IPv6) |
| |
| def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None, |
| auth_algo=None, auth_key=None, tunnel_header=None, nat_t_header=None): |
| """ |
| @param proto: the IPsec proto to use (ESP or AH) |
| @param spi: the Security Parameters Index of this SA |
| @param seq_num: the initial value for the sequence number on encrypted |
| packets |
| @param crypt_algo: the encryption algorithm name (only used with ESP) |
| @param crypt_key: the encryption key (only used with ESP) |
| @param auth_algo: the integrity algorithm name |
| @param auth_key: the integrity key |
| @param tunnel_header: an instance of a IP(v6) header that will be used |
| to encapsulate the encrypted packets. |
| @param nat_t_header: an instance of a UDP header that will be used |
| for NAT-Traversal. |
| """ |
| |
| if proto not in (ESP, AH, ESP.name, AH.name): |
| raise ValueError("proto must be either ESP or AH") |
| if isinstance(proto, six.string_types): |
| self.proto = eval(proto) |
| else: |
| self.proto = proto |
| |
| self.spi = spi |
| self.seq_num = seq_num |
| |
| if crypt_algo: |
| if crypt_algo not in CRYPT_ALGOS: |
| raise TypeError('unsupported encryption algo %r, try %r' % |
| (crypt_algo, list(CRYPT_ALGOS))) |
| self.crypt_algo = CRYPT_ALGOS[crypt_algo] |
| |
| if crypt_key: |
| salt_size = self.crypt_algo.salt_size |
| self.crypt_key = crypt_key[:len(crypt_key) - salt_size] |
| self.crypt_salt = crypt_key[len(crypt_key) - salt_size:] |
| else: |
| self.crypt_key = None |
| self.crypt_salt = None |
| |
| else: |
| self.crypt_algo = CRYPT_ALGOS['NULL'] |
| self.crypt_key = None |
| |
| if auth_algo: |
| if auth_algo not in AUTH_ALGOS: |
| raise TypeError('unsupported integrity algo %r, try %r' % |
| (auth_algo, list(AUTH_ALGOS))) |
| self.auth_algo = AUTH_ALGOS[auth_algo] |
| self.auth_key = auth_key |
| else: |
| self.auth_algo = AUTH_ALGOS['NULL'] |
| self.auth_key = None |
| |
| if tunnel_header and not isinstance(tunnel_header, (IP, IPv6)): |
| raise TypeError('tunnel_header must be %s or %s' % (IP.name, IPv6.name)) |
| self.tunnel_header = tunnel_header |
| |
| if nat_t_header: |
| if proto is not ESP: |
| raise TypeError('nat_t_header is only allowed with ESP') |
| if not isinstance(nat_t_header, UDP): |
| raise TypeError('nat_t_header must be %s' % UDP.name) |
| self.nat_t_header = nat_t_header |
| |
| def check_spi(self, pkt): |
| if pkt.spi != self.spi: |
| raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' % |
| (pkt.spi, self.spi)) |
| |
| def _encrypt_esp(self, pkt, seq_num=None, iv=None): |
| |
| if iv is None: |
| iv = self.crypt_algo.generate_iv() |
| else: |
| if len(iv) != self.crypt_algo.iv_size: |
| raise TypeError('iv length must be %s' % self.crypt_algo.iv_size) |
| |
| esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv) |
| |
| if self.tunnel_header: |
| tunnel = self.tunnel_header.copy() |
| |
| if tunnel.version == 4: |
| del tunnel.proto |
| del tunnel.len |
| del tunnel.chksum |
| else: |
| del tunnel.nh |
| del tunnel.plen |
| |
| pkt = tunnel.__class__(raw(tunnel / pkt)) |
| |
| ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_ESP) |
| esp.data = payload |
| esp.nh = nh |
| |
| esp = self.crypt_algo.pad(esp) |
| esp = self.crypt_algo.encrypt(self, esp, self.crypt_key) |
| |
| self.auth_algo.sign(esp, self.auth_key) |
| |
| if self.nat_t_header: |
| nat_t_header = self.nat_t_header.copy() |
| nat_t_header.chksum = 0 |
| del nat_t_header.len |
| if ip_header.version == 4: |
| del ip_header.proto |
| else: |
| del ip_header.nh |
| ip_header /= nat_t_header |
| |
| if ip_header.version == 4: |
| ip_header.len = len(ip_header) + len(esp) |
| del ip_header.chksum |
| ip_header = ip_header.__class__(raw(ip_header)) |
| else: |
| ip_header.plen = len(ip_header.payload) + len(esp) |
| |
| # sequence number must always change, unless specified by the user |
| if seq_num is None: |
| self.seq_num += 1 |
| |
| return ip_header / esp |
| |
| def _encrypt_ah(self, pkt, seq_num=None): |
| |
| ah = AH(spi=self.spi, seq=seq_num or self.seq_num, |
| icv = b"\x00" * self.auth_algo.icv_size) |
| |
| if self.tunnel_header: |
| tunnel = self.tunnel_header.copy() |
| |
| if tunnel.version == 4: |
| del tunnel.proto |
| del tunnel.len |
| del tunnel.chksum |
| else: |
| del tunnel.nh |
| del tunnel.plen |
| |
| pkt = tunnel.__class__(raw(tunnel / pkt)) |
| |
| ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH) |
| ah.nh = nh |
| |
| if ip_header.version == 6 and len(ah) % 8 != 0: |
| # For IPv6, the total length of the header must be a multiple of |
| # 8-octet units. |
| ah.padding = b"\x00" * (-len(ah) % 8) |
| elif len(ah) % 4 != 0: |
| # For IPv4, the total length of the header must be a multiple of |
| # 4-octet units. |
| ah.padding = b"\x00" * (-len(ah) % 4) |
| |
| # RFC 4302 - Section 2.2. Payload Length |
| # This 8-bit field specifies the length of AH in 32-bit words (4-byte |
| # units), minus "2". |
| ah.payloadlen = len(ah) // 4 - 2 |
| |
| if ip_header.version == 4: |
| ip_header.len = len(ip_header) + len(ah) + len(payload) |
| del ip_header.chksum |
| ip_header = ip_header.__class__(raw(ip_header)) |
| else: |
| ip_header.plen = len(ip_header.payload) + len(ah) + len(payload) |
| |
| signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key) |
| |
| # sequence number must always change, unless specified by the user |
| if seq_num is None: |
| self.seq_num += 1 |
| |
| return signed_pkt |
| |
| def encrypt(self, pkt, seq_num=None, iv=None): |
| """ |
| Encrypt (and encapsulate) an IP(v6) packet with ESP or AH according |
| to this SecurityAssociation. |
| |
| @param pkt: the packet to encrypt |
| @param seq_num: if specified, use this sequence number instead of the |
| generated one |
| @param iv: if specified, use this initialization vector for |
| encryption instead of a random one. |
| |
| @return: the encrypted/encapsulated packet |
| """ |
| if not isinstance(pkt, self.SUPPORTED_PROTOS): |
| raise TypeError('cannot encrypt %s, supported protos are %s' |
| % (pkt.__class__, self.SUPPORTED_PROTOS)) |
| if self.proto is ESP: |
| return self._encrypt_esp(pkt, seq_num=seq_num, iv=iv) |
| else: |
| return self._encrypt_ah(pkt, seq_num=seq_num) |
| |
| def _decrypt_esp(self, pkt, verify=True): |
| |
| encrypted = pkt[ESP] |
| |
| if verify: |
| self.check_spi(pkt) |
| self.auth_algo.verify(encrypted, self.auth_key) |
| |
| esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key, |
| self.crypt_algo.icv_size or |
| self.auth_algo.icv_size) |
| |
| if self.tunnel_header: |
| # drop the tunnel header and return the payload untouched |
| |
| pkt.remove_payload() |
| if pkt.version == 4: |
| pkt.proto = esp.nh |
| else: |
| pkt.nh = esp.nh |
| cls = pkt.guess_payload_class(esp.data) |
| |
| return cls(esp.data) |
| else: |
| ip_header = pkt |
| |
| if ip_header.version == 4: |
| ip_header.proto = esp.nh |
| del ip_header.chksum |
| ip_header.remove_payload() |
| ip_header.len = len(ip_header) + len(esp.data) |
| # recompute checksum |
| ip_header = ip_header.__class__(raw(ip_header)) |
| else: |
| encrypted.underlayer.nh = esp.nh |
| encrypted.underlayer.remove_payload() |
| ip_header.plen = len(ip_header.payload) + len(esp.data) |
| |
| cls = ip_header.guess_payload_class(esp.data) |
| |
| # reassemble the ip_header with the ESP payload |
| return ip_header / cls(esp.data) |
| |
| def _decrypt_ah(self, pkt, verify=True): |
| |
| if verify: |
| self.check_spi(pkt) |
| self.auth_algo.verify(pkt, self.auth_key) |
| |
| ah = pkt[AH] |
| payload = ah.payload |
| payload.remove_underlayer(None) # useless argument... |
| |
| if self.tunnel_header: |
| return payload |
| else: |
| ip_header = pkt |
| |
| if ip_header.version == 4: |
| ip_header.proto = ah.nh |
| del ip_header.chksum |
| ip_header.remove_payload() |
| ip_header.len = len(ip_header) + len(payload) |
| # recompute checksum |
| ip_header = ip_header.__class__(raw(ip_header)) |
| else: |
| ah.underlayer.nh = ah.nh |
| ah.underlayer.remove_payload() |
| ip_header.plen = len(ip_header.payload) + len(payload) |
| |
| # reassemble the ip_header with the AH payload |
| return ip_header / payload |
| |
| def decrypt(self, pkt, verify=True): |
| """ |
| Decrypt (and decapsulate) an IP(v6) packet containing ESP or AH. |
| |
| @param pkt: the packet to decrypt |
| @param verify: if False, do not perform the integrity check |
| |
| @return: the decrypted/decapsulated packet |
| @raise IPSecIntegrityError: if the integrity check fails |
| """ |
| if not isinstance(pkt, self.SUPPORTED_PROTOS): |
| raise TypeError('cannot decrypt %s, supported protos are %s' |
| % (pkt.__class__, self.SUPPORTED_PROTOS)) |
| |
| if self.proto is ESP and pkt.haslayer(ESP): |
| return self._decrypt_esp(pkt, verify=verify) |
| elif self.proto is AH and pkt.haslayer(AH): |
| return self._decrypt_ah(pkt, verify=verify) |
| else: |
| raise TypeError('%s has no %s layer' % (pkt, self.proto.name)) |