blob: 69e7ae3b0277aaf367a6a2f8a2ffd00e78b0bc7e [file] [log] [blame]
#############################################################################
## ipsec.py --- IPsec support for Scapy ##
## ##
## Copyright (C) 2014 6WIND ##
## ##
## This program is free software; you can redistribute it and/or modify it ##
## under the terms of the GNU General Public License version 2 as ##
## published by the Free Software Foundation. ##
## ##
## This program is distributed in the hope that it will be useful, but ##
## WITHOUT ANY WARRANTY; without even the implied warranty of ##
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ##
## General Public License for more details. ##
#############################################################################
"""
IPsec layer
===========
Example of use:
>>> sa = SecurityAssociation(ESP, spi=0xdeadbeef, crypt_algo='AES-CBC',
... crypt_key='sixteenbytes key')
>>> p = IP(src='1.1.1.1', dst='2.2.2.2')
>>> p /= TCP(sport=45012, dport=80)
>>> p /= Raw('testdata')
>>> p = IP(raw(p))
>>> p
<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 options=[] |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>>
>>>
>>> e = sa.encrypt(p)
>>> e
<IP version=4L ihl=5L tos=0x0 len=76 id=1 flags= frag=0L ttl=64 proto=esp chksum=0x747a src=1.1.1.1 dst=2.2.2.2 |<ESP spi=0xdeadbeef seq=1 data=b'\xf8\xdb\x1e\x83[T\xab\\\xd2\x1b\xed\xd1\xe5\xc8Y\xc2\xa5d\x92\xc1\x05\x17\xa6\x92\x831\xe6\xc1]\x9a\xd6K}W\x8bFfd\xa5B*+\xde\xc8\x89\xbf{\xa9' |>>
>>>
>>> d = sa.decrypt(e)
>>> d
<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>>
>>>
>>> d == p
True
"""
from __future__ import absolute_import
from fractions import gcd
import os
import socket
import struct
from scapy.config import conf, crypto_validator
from scapy.compat import orb, raw
from scapy.data import IP_PROTOS
from scapy.compat import *
from scapy.error import log_loading
from scapy.fields import ByteEnumField, ByteField, IntField, PacketField, \
ShortField, StrField, XIntField, XStrField, XStrLenField
from scapy.packet import Packet, bind_layers, Raw
from scapy.layers.inet import IP, UDP
import scapy.modules.six as six
from scapy.modules.six.moves import range
from scapy.layers.inet6 import IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \
IPv6ExtHdrRouting
#------------------------------------------------------------------------------
class AH(Packet):
"""
Authentication Header
See https://tools.ietf.org/rfc/rfc4302.txt
"""
name = 'AH'
def __get_icv_len(self):
"""
Compute the size of the ICV based on the payloadlen field.
Padding size is included as it can only be known from the authentication
algorithm provided by the Security Association.
"""
# payloadlen = length of AH in 32-bit words (4-byte units), minus "2"
# payloadlen = 3 32-bit word fixed fields + ICV + padding - 2
# ICV = (payloadlen + 2 - 3 - padding) in 32-bit words
return (self.payloadlen - 1) * 4
fields_desc = [
ByteEnumField('nh', None, IP_PROTOS),
ByteField('payloadlen', None),
ShortField('reserved', None),
XIntField('spi', 0x0),
IntField('seq', 0),
XStrLenField('icv', None, length_from=__get_icv_len),
# Padding len can only be known with the SecurityAssociation.auth_algo
XStrLenField('padding', None, length_from=lambda x: 0),
]
overload_fields = {
IP: {'proto': socket.IPPROTO_AH},
IPv6: {'nh': socket.IPPROTO_AH},
IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_AH},
IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_AH},
IPv6ExtHdrRouting: {'nh': socket.IPPROTO_AH},
}
bind_layers(IP, AH, proto=socket.IPPROTO_AH)
bind_layers(IPv6, AH, nh=socket.IPPROTO_AH)
bind_layers(AH, IP, nh=socket.IPPROTO_IP)
bind_layers(AH, IPv6, nh=socket.IPPROTO_IPV6)
#------------------------------------------------------------------------------
class ESP(Packet):
"""
Encapsulated Security Payload
See https://tools.ietf.org/rfc/rfc4303.txt
"""
name = 'ESP'
fields_desc = [
XIntField('spi', 0x0),
IntField('seq', 0),
XStrField('data', None),
]
overload_fields = {
IP: {'proto': socket.IPPROTO_ESP},
IPv6: {'nh': socket.IPPROTO_ESP},
IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_ESP},
IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_ESP},
IPv6ExtHdrRouting: {'nh': socket.IPPROTO_ESP},
}
bind_layers(IP, ESP, proto=socket.IPPROTO_ESP)
bind_layers(IPv6, ESP, nh=socket.IPPROTO_ESP)
bind_layers(UDP, ESP, dport=4500) # NAT-Traversal encapsulation
bind_layers(UDP, ESP, sport=4500) # NAT-Traversal encapsulation
#------------------------------------------------------------------------------
class _ESPPlain(Packet):
"""
Internal class to represent unencrypted ESP packets.
"""
name = 'ESP'
fields_desc = [
XIntField('spi', 0x0),
IntField('seq', 0),
StrField('iv', ''),
PacketField('data', '', Raw),
StrField('padding', ''),
ByteField('padlen', 0),
ByteEnumField('nh', 0, IP_PROTOS),
StrField('icv', ''),
]
def data_for_encryption(self):
return raw(self.data) + self.padding + struct.pack("BB", self.padlen, self.nh)
#------------------------------------------------------------------------------
if conf.crypto_valid:
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import (
Cipher,
algorithms,
modes,
)
else:
log_loading.info("Can't import python-cryptography v1.7+. "
"Disabled IPsec encryption/authentication.")
InvalidTag = default_backend = None
Cipher = algorithms = modes = None
#------------------------------------------------------------------------------
def _lcm(a, b):
"""
Least Common Multiple between 2 integers.
"""
if a == 0 or b == 0:
return 0
else:
return abs(a * b) // gcd(a, b)
class CryptAlgo(object):
"""
IPsec encryption algorithm
"""
def __init__(self, name, cipher, mode, block_size=None, iv_size=None,
key_size=None, icv_size=None, salt_size=None, format_mode_iv=None):
"""
@param name: the name of this encryption algorithm
@param cipher: a Cipher module
@param mode: the mode used with the cipher module
@param block_size: the length a block for this algo. Defaults to the
`block_size` of the cipher.
@param iv_size: the length of the initialization vector of this algo.
Defaults to the `block_size` of the cipher.
@param key_size: an integer or list/tuple of integers. If specified,
force the secret keys length to one of the values.
Defaults to the `key_size` of the cipher.
@param icv_size: the length of the Integrity Check Value of this algo.
Used by Combined Mode Algorithms e.g. GCM
@param salt_size: the length of the salt to use as the IV prefix.
Usually used by Counter modes e.g. CTR
@param format_mode_iv: function to format the Initialization Vector
e.g. handle the salt value
Default is the random buffer from `generate_iv`
"""
self.name = name
self.cipher = cipher
self.mode = mode
self.icv_size = icv_size
if modes and self.mode is not None:
self.is_aead = issubclass(self.mode,
modes.ModeWithAuthenticationTag)
else:
self.is_aead = False
if block_size is not None:
self.block_size = block_size
elif cipher is not None:
self.block_size = cipher.block_size // 8
else:
self.block_size = 1
if iv_size is None:
self.iv_size = self.block_size
else:
self.iv_size = iv_size
if key_size is not None:
self.key_size = key_size
elif cipher is not None:
self.key_size = tuple(i // 8 for i in cipher.key_sizes)
else:
self.key_size = None
if salt_size is None:
self.salt_size = 0
else:
self.salt_size = salt_size
if format_mode_iv is None:
self._format_mode_iv = lambda iv, **kw: iv
else:
self._format_mode_iv = format_mode_iv
def check_key(self, key):
"""
Check that the key length is valid.
@param key: a byte string
"""
if self.key_size and not (len(key) == self.key_size or len(key) in self.key_size):
raise TypeError('invalid key size %s, must be %s' %
(len(key), self.key_size))
def generate_iv(self):
"""
Generate a random initialization vector.
"""
# XXX: Handle counter modes with real counters? RFCs allow the use of
# XXX: random bytes for counters, so it is not wrong to do it that way
return os.urandom(self.iv_size)
@crypto_validator
def new_cipher(self, key, mode_iv, digest=None):
"""
@param key: the secret key, a byte string
@param mode_iv: the initialization vector or nonce, a byte string.
Formatted by `format_mode_iv`.
@param digest: also known as tag or icv. A byte string containing the
digest of the encrypted data. Only use this during
decryption!
@return: an initialized cipher object for this algo
"""
if self.is_aead and digest is not None:
# With AEAD, the mode needs the digest during decryption.
return Cipher(
self.cipher(key),
self.mode(mode_iv, digest, len(digest)),
default_backend(),
)
else:
return Cipher(
self.cipher(key),
self.mode(mode_iv),
default_backend(),
)
def pad(self, esp):
"""
Add the correct amount of padding so that the data to encrypt is
exactly a multiple of the algorithm's block size.
Also, make sure that the total ESP packet length is a multiple of 4
bytes.
@param esp: an unencrypted _ESPPlain packet
@return: an unencrypted _ESPPlain packet with valid padding
"""
# 2 extra bytes for padlen and nh
data_len = len(esp.data) + 2
# according to the RFC4303, section 2.4. Padding (for Encryption)
# the size of the ESP payload must be a multiple of 32 bits
align = _lcm(self.block_size, 4)
# pad for block size
esp.padlen = -data_len % align
# Still according to the RFC, the default value for padding *MUST* be an
# array of bytes starting from 1 to padlen
# TODO: Handle padding function according to the encryption algo
esp.padding = struct.pack("B" * esp.padlen, *range(1, esp.padlen + 1))
# If the following test fails, it means that this algo does not comply
# with the RFC
payload_len = len(esp.iv) + len(esp.data) + len(esp.padding) + 2
if payload_len % 4 != 0:
raise ValueError('The size of the ESP data is not aligned to 32 bits after padding.')
return esp
def encrypt(self, sa, esp, key):
"""
Encrypt an ESP packet
@param sa: the SecurityAssociation associated with the ESP packet.
@param esp: an unencrypted _ESPPlain packet with valid padding
@param key: the secret key used for encryption
@return: a valid ESP packet encrypted with this algorithm
"""
data = esp.data_for_encryption()
if self.cipher:
mode_iv = self._format_mode_iv(algo=self, sa=sa, iv=esp.iv)
cipher = self.new_cipher(key, mode_iv)
encryptor = cipher.encryptor()
if self.is_aead:
aad = struct.pack('!LL', esp.spi, esp.seq)
encryptor.authenticate_additional_data(aad)
data = encryptor.update(data) + encryptor.finalize()
data += encryptor.tag[:self.icv_size]
else:
data = encryptor.update(data) + encryptor.finalize()
return ESP(spi=esp.spi, seq=esp.seq, data=esp.iv + data)
def decrypt(self, sa, esp, key, icv_size=None):
"""
Decrypt an ESP packet
@param sa: the SecurityAssociation associated with the ESP packet.
@param esp: an encrypted ESP packet
@param key: the secret key used for encryption
@param icv_size: the length of the icv used for integrity check
@return: a valid ESP packet encrypted with this algorithm
@raise IPSecIntegrityError: if the integrity check fails with an AEAD
algorithm
"""
if icv_size is None:
icv_size = self.icv_size if self.is_aead else 0
iv = esp.data[:self.iv_size]
data = esp.data[self.iv_size:len(esp.data) - icv_size]
icv = esp.data[len(esp.data) - icv_size:]
if self.cipher:
mode_iv = self._format_mode_iv(sa=sa, iv=iv)
cipher = self.new_cipher(key, mode_iv, icv)
decryptor = cipher.decryptor()
if self.is_aead:
# Tag value check is done during the finalize method
decryptor.authenticate_additional_data(
struct.pack('!LL', esp.spi, esp.seq)
)
try:
data = decryptor.update(data) + decryptor.finalize()
except InvalidTag as err:
raise IPSecIntegrityError(err)
# extract padlen and nh
padlen = orb(data[-2])
nh = orb(data[-1])
# then use padlen to determine data and padding
data = data[:len(data) - padlen - 2]
padding = data[len(data) - padlen - 2: len(data) - 2]
return _ESPPlain(spi=esp.spi,
seq=esp.seq,
iv=iv,
data=data,
padding=padding,
padlen=padlen,
nh=nh,
icv=icv)
#------------------------------------------------------------------------------
# The names of the encryption algorithms are the same than in scapy.contrib.ikev2
# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml
CRYPT_ALGOS = {
'NULL': CryptAlgo('NULL', cipher=None, mode=None, iv_size=0),
}
if algorithms:
CRYPT_ALGOS['AES-CBC'] = CryptAlgo('AES-CBC',
cipher=algorithms.AES,
mode=modes.CBC)
_aes_ctr_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv + b'\x00\x00\x00\x01'
CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR',
cipher=algorithms.AES,
mode=modes.CTR,
iv_size=8,
salt_size=4,
format_mode_iv=_aes_ctr_format_mode_iv)
_salt_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv
CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM',
cipher=algorithms.AES,
mode=modes.GCM,
salt_size=4,
iv_size=8,
icv_size=16,
format_mode_iv=_salt_format_mode_iv)
if hasattr(modes, 'CCM'):
CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM',
cipher=algorithms.AES,
mode=modes.CCM,
iv_size=8,
salt_size=3,
icv_size=16,
format_mode_iv=_salt_format_mode_iv)
# XXX: Flagged as weak by 'cryptography'. Kept for backward compatibility
CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish',
cipher=algorithms.Blowfish,
mode=modes.CBC)
# XXX: RFC7321 states that DES *MUST NOT* be implemented.
# XXX: Keep for backward compatibility?
# Using a TripleDES cipher algorithm for DES is done by using the same 64
# bits key 3 times (done by cryptography when given a 64 bits key)
CRYPT_ALGOS['DES'] = CryptAlgo('DES',
cipher=algorithms.TripleDES,
mode=modes.CBC,
key_size=(8,))
CRYPT_ALGOS['3DES'] = CryptAlgo('3DES',
cipher=algorithms.TripleDES,
mode=modes.CBC)
CRYPT_ALGOS['CAST'] = CryptAlgo('CAST',
cipher=algorithms.CAST5,
mode=modes.CBC)
#------------------------------------------------------------------------------
if conf.crypto_valid:
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.cmac import CMAC
from cryptography.hazmat.primitives import hashes
else:
# no error if cryptography is not available but authentication won't be supported
HMAC = CMAC = hashes = None
#------------------------------------------------------------------------------
class IPSecIntegrityError(Exception):
"""
Error risen when the integrity check fails.
"""
pass
class AuthAlgo(object):
"""
IPsec integrity algorithm
"""
def __init__(self, name, mac, digestmod, icv_size, key_size=None):
"""
@param name: the name of this integrity algorithm
@param mac: a Message Authentication Code module
@param digestmod: a Hash or Cipher module
@param icv_size: the length of the integrity check value of this algo
@param key_size: an integer or list/tuple of integers. If specified,
force the secret keys length to one of the values.
Defaults to the `key_size` of the cipher.
"""
self.name = name
self.mac = mac
self.digestmod = digestmod
self.icv_size = icv_size
self.key_size = key_size
def check_key(self, key):
"""
Check that the key length is valid.
@param key: a byte string
"""
if self.key_size and len(key) not in self.key_size:
raise TypeError('invalid key size %s, must be one of %s' %
(len(key), self.key_size))
@crypto_validator
def new_mac(self, key):
"""
@param key: a byte string
@return: an initialized mac object for this algo
"""
if self.mac is CMAC:
return self.mac(self.digestmod(key), default_backend())
else:
return self.mac(key, self.digestmod(), default_backend())
def sign(self, pkt, key):
"""
Sign an IPsec (ESP or AH) packet with this algo.
@param pkt: a packet that contains a valid encrypted ESP or AH layer
@param key: the authentication key, a byte string
@return: the signed packet
"""
if not self.mac:
return pkt
mac = self.new_mac(key)
if pkt.haslayer(ESP):
mac.update(raw(pkt[ESP]))
pkt[ESP].data += mac.finalize()[:self.icv_size]
elif pkt.haslayer(AH):
clone = zero_mutable_fields(pkt.copy(), sending=True)
mac.update(raw(clone))
pkt[AH].icv = mac.finalize()[:self.icv_size]
return pkt
def verify(self, pkt, key):
"""
Check that the integrity check value (icv) of a packet is valid.
@param pkt: a packet that contains a valid encrypted ESP or AH layer
@param key: the authentication key, a byte string
@raise IPSecIntegrityError: if the integrity check fails
"""
if not self.mac or self.icv_size == 0:
return
mac = self.new_mac(key)
pkt_icv = 'not found'
computed_icv = 'not computed'
if isinstance(pkt, ESP):
pkt_icv = pkt.data[len(pkt.data) - self.icv_size:]
clone = pkt.copy()
clone.data = clone.data[:len(clone.data) - self.icv_size]
elif pkt.haslayer(AH):
if len(pkt[AH].icv) != self.icv_size:
# Fill padding since we know the actual icv_size
pkt[AH].padding = pkt[AH].icv[self.icv_size:]
pkt[AH].icv = pkt[AH].icv[:self.icv_size]
pkt_icv = pkt[AH].icv
clone = zero_mutable_fields(pkt.copy(), sending=False)
mac.update(raw(clone))
computed_icv = mac.finalize()[:self.icv_size]
# XXX: Cannot use mac.verify because the ICV can be truncated
if pkt_icv != computed_icv:
raise IPSecIntegrityError('pkt_icv=%r, computed_icv=%r' %
(pkt_icv, computed_icv))
#------------------------------------------------------------------------------
# The names of the integrity algorithms are the same than in scapy.contrib.ikev2
# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml
AUTH_ALGOS = {
'NULL': AuthAlgo('NULL', mac=None, digestmod=None, icv_size=0),
}
if HMAC and hashes:
# XXX: NIST has deprecated SHA1 but is required by RFC7321
AUTH_ALGOS['HMAC-SHA1-96'] = AuthAlgo('HMAC-SHA1-96',
mac=HMAC,
digestmod=hashes.SHA1,
icv_size=12)
AUTH_ALGOS['SHA2-256-128'] = AuthAlgo('SHA2-256-128',
mac=HMAC,
digestmod=hashes.SHA256,
icv_size=16)
AUTH_ALGOS['SHA2-384-192'] = AuthAlgo('SHA2-384-192',
mac=HMAC,
digestmod=hashes.SHA384,
icv_size=24)
AUTH_ALGOS['SHA2-512-256'] = AuthAlgo('SHA2-512-256',
mac=HMAC,
digestmod=hashes.SHA512,
icv_size=32)
# XXX:Flagged as deprecated by 'cryptography'. Kept for backward compat
AUTH_ALGOS['HMAC-MD5-96'] = AuthAlgo('HMAC-MD5-96',
mac=HMAC,
digestmod=hashes.MD5,
icv_size=12)
if CMAC and algorithms:
AUTH_ALGOS['AES-CMAC-96'] = AuthAlgo('AES-CMAC-96',
mac=CMAC,
digestmod=algorithms.AES,
icv_size=12,
key_size=(16,))
#------------------------------------------------------------------------------
def split_for_transport(orig_pkt, transport_proto):
"""
Split an IP(v6) packet in the correct location to insert an ESP or AH
header.
@param orig_pkt: the packet to split. Must be an IP or IPv6 packet
@param transport_proto: the IPsec protocol number that will be inserted
at the split position.
@return: a tuple (header, nh, payload) where nh is the protocol number of
payload.
"""
# force resolution of default fields to avoid padding errors
header = orig_pkt.__class__(raw(orig_pkt))
next_hdr = header.payload
nh = None
if header.version == 4:
nh = header.proto
header.proto = transport_proto
header.remove_payload()
del header.chksum
del header.len
return header, nh, next_hdr
else:
found_rt_hdr = False
prev = header
# Since the RFC 4302 is vague about where the ESP/AH headers should be
# inserted in IPv6, I chose to follow the linux implementation.
while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)):
if isinstance(next_hdr, IPv6ExtHdrHopByHop):
pass
if isinstance(next_hdr, IPv6ExtHdrRouting):
found_rt_hdr = True
elif isinstance(next_hdr, IPv6ExtHdrDestOpt) and found_rt_hdr:
break
prev = next_hdr
next_hdr = next_hdr.payload
nh = prev.nh
prev.nh = transport_proto
prev.remove_payload()
del header.plen
return header, nh, next_hdr
#------------------------------------------------------------------------------
# see RFC 4302 - Appendix A. Mutability of IP Options/Extension Headers
IMMUTABLE_IPV4_OPTIONS = (
0, # End Of List
1, # No OPeration
2, # Security
5, # Extended Security
6, # Commercial Security
20, # Router Alert
21, # Sender Directed Multi-Destination Delivery
)
def zero_mutable_fields(pkt, sending=False):
"""
When using AH, all "mutable" fields must be "zeroed" before calculating
the ICV. See RFC 4302, Section 3.3.3.1. Handling Mutable Fields.
@param pkt: an IP(v6) packet containing an AH layer.
NOTE: The packet will be modified
@param sending: if true, ipv6 routing headers will not be reordered
"""
if pkt.haslayer(AH):
pkt[AH].icv = b"\x00" * len(pkt[AH].icv)
else:
raise TypeError('no AH layer found')
if pkt.version == 4:
# the tos field has been replaced by DSCP and ECN
# Routers may rewrite the DS field as needed to provide a
# desired local or end-to-end service
pkt.tos = 0
# an intermediate router might set the DF bit, even if the source
# did not select it.
pkt.flags = 0
# changed en route as a normal course of processing by routers
pkt.ttl = 0
# will change if any of these other fields change
pkt.chksum = 0
immutable_opts = []
for opt in pkt.options:
if opt.option in IMMUTABLE_IPV4_OPTIONS:
immutable_opts.append(opt)
else:
immutable_opts.append(Raw(b"\x00" * len(opt)))
pkt.options = immutable_opts
else:
# holds DSCP and ECN
pkt.tc = 0
# The flow label described in AHv1 was mutable, and in RFC 2460 [DH98]
# was potentially mutable. To retain compatibility with existing AH
# implementations, the flow label is not included in the ICV in AHv2.
pkt.fl = 0
# same as ttl
pkt.hlim = 0
next_hdr = pkt.payload
while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)):
if isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt)):
for opt in next_hdr.options:
if opt.otype & 0x20:
# option data can change en-route and must be zeroed
opt.optdata = b"\x00" * opt.optlen
elif isinstance(next_hdr, IPv6ExtHdrRouting) and sending:
# The sender must order the field so that it appears as it
# will at the receiver, prior to performing the ICV computation.
next_hdr.segleft = 0
if next_hdr.addresses:
final = next_hdr.addresses.pop()
next_hdr.addresses.insert(0, pkt.dst)
pkt.dst = final
else:
break
next_hdr = next_hdr.payload
return pkt
#------------------------------------------------------------------------------
class SecurityAssociation(object):
"""
This class is responsible of "encryption" and "decryption" of IPsec packets.
"""
SUPPORTED_PROTOS = (IP, IPv6)
def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None,
auth_algo=None, auth_key=None, tunnel_header=None, nat_t_header=None):
"""
@param proto: the IPsec proto to use (ESP or AH)
@param spi: the Security Parameters Index of this SA
@param seq_num: the initial value for the sequence number on encrypted
packets
@param crypt_algo: the encryption algorithm name (only used with ESP)
@param crypt_key: the encryption key (only used with ESP)
@param auth_algo: the integrity algorithm name
@param auth_key: the integrity key
@param tunnel_header: an instance of a IP(v6) header that will be used
to encapsulate the encrypted packets.
@param nat_t_header: an instance of a UDP header that will be used
for NAT-Traversal.
"""
if proto not in (ESP, AH, ESP.name, AH.name):
raise ValueError("proto must be either ESP or AH")
if isinstance(proto, six.string_types):
self.proto = eval(proto)
else:
self.proto = proto
self.spi = spi
self.seq_num = seq_num
if crypt_algo:
if crypt_algo not in CRYPT_ALGOS:
raise TypeError('unsupported encryption algo %r, try %r' %
(crypt_algo, list(CRYPT_ALGOS)))
self.crypt_algo = CRYPT_ALGOS[crypt_algo]
if crypt_key:
salt_size = self.crypt_algo.salt_size
self.crypt_key = crypt_key[:len(crypt_key) - salt_size]
self.crypt_salt = crypt_key[len(crypt_key) - salt_size:]
else:
self.crypt_key = None
self.crypt_salt = None
else:
self.crypt_algo = CRYPT_ALGOS['NULL']
self.crypt_key = None
if auth_algo:
if auth_algo not in AUTH_ALGOS:
raise TypeError('unsupported integrity algo %r, try %r' %
(auth_algo, list(AUTH_ALGOS)))
self.auth_algo = AUTH_ALGOS[auth_algo]
self.auth_key = auth_key
else:
self.auth_algo = AUTH_ALGOS['NULL']
self.auth_key = None
if tunnel_header and not isinstance(tunnel_header, (IP, IPv6)):
raise TypeError('tunnel_header must be %s or %s' % (IP.name, IPv6.name))
self.tunnel_header = tunnel_header
if nat_t_header:
if proto is not ESP:
raise TypeError('nat_t_header is only allowed with ESP')
if not isinstance(nat_t_header, UDP):
raise TypeError('nat_t_header must be %s' % UDP.name)
self.nat_t_header = nat_t_header
def check_spi(self, pkt):
if pkt.spi != self.spi:
raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' %
(pkt.spi, self.spi))
def _encrypt_esp(self, pkt, seq_num=None, iv=None):
if iv is None:
iv = self.crypt_algo.generate_iv()
else:
if len(iv) != self.crypt_algo.iv_size:
raise TypeError('iv length must be %s' % self.crypt_algo.iv_size)
esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv)
if self.tunnel_header:
tunnel = self.tunnel_header.copy()
if tunnel.version == 4:
del tunnel.proto
del tunnel.len
del tunnel.chksum
else:
del tunnel.nh
del tunnel.plen
pkt = tunnel.__class__(raw(tunnel / pkt))
ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_ESP)
esp.data = payload
esp.nh = nh
esp = self.crypt_algo.pad(esp)
esp = self.crypt_algo.encrypt(self, esp, self.crypt_key)
self.auth_algo.sign(esp, self.auth_key)
if self.nat_t_header:
nat_t_header = self.nat_t_header.copy()
nat_t_header.chksum = 0
del nat_t_header.len
if ip_header.version == 4:
del ip_header.proto
else:
del ip_header.nh
ip_header /= nat_t_header
if ip_header.version == 4:
ip_header.len = len(ip_header) + len(esp)
del ip_header.chksum
ip_header = ip_header.__class__(raw(ip_header))
else:
ip_header.plen = len(ip_header.payload) + len(esp)
# sequence number must always change, unless specified by the user
if seq_num is None:
self.seq_num += 1
return ip_header / esp
def _encrypt_ah(self, pkt, seq_num=None):
ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
icv = b"\x00" * self.auth_algo.icv_size)
if self.tunnel_header:
tunnel = self.tunnel_header.copy()
if tunnel.version == 4:
del tunnel.proto
del tunnel.len
del tunnel.chksum
else:
del tunnel.nh
del tunnel.plen
pkt = tunnel.__class__(raw(tunnel / pkt))
ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH)
ah.nh = nh
if ip_header.version == 6 and len(ah) % 8 != 0:
# For IPv6, the total length of the header must be a multiple of
# 8-octet units.
ah.padding = b"\x00" * (-len(ah) % 8)
elif len(ah) % 4 != 0:
# For IPv4, the total length of the header must be a multiple of
# 4-octet units.
ah.padding = b"\x00" * (-len(ah) % 4)
# RFC 4302 - Section 2.2. Payload Length
# This 8-bit field specifies the length of AH in 32-bit words (4-byte
# units), minus "2".
ah.payloadlen = len(ah) // 4 - 2
if ip_header.version == 4:
ip_header.len = len(ip_header) + len(ah) + len(payload)
del ip_header.chksum
ip_header = ip_header.__class__(raw(ip_header))
else:
ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)
signed_pkt = self.auth_algo.sign(ip_header / ah / payload, self.auth_key)
# sequence number must always change, unless specified by the user
if seq_num is None:
self.seq_num += 1
return signed_pkt
def encrypt(self, pkt, seq_num=None, iv=None):
"""
Encrypt (and encapsulate) an IP(v6) packet with ESP or AH according
to this SecurityAssociation.
@param pkt: the packet to encrypt
@param seq_num: if specified, use this sequence number instead of the
generated one
@param iv: if specified, use this initialization vector for
encryption instead of a random one.
@return: the encrypted/encapsulated packet
"""
if not isinstance(pkt, self.SUPPORTED_PROTOS):
raise TypeError('cannot encrypt %s, supported protos are %s'
% (pkt.__class__, self.SUPPORTED_PROTOS))
if self.proto is ESP:
return self._encrypt_esp(pkt, seq_num=seq_num, iv=iv)
else:
return self._encrypt_ah(pkt, seq_num=seq_num)
def _decrypt_esp(self, pkt, verify=True):
encrypted = pkt[ESP]
if verify:
self.check_spi(pkt)
self.auth_algo.verify(encrypted, self.auth_key)
esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key,
self.crypt_algo.icv_size or
self.auth_algo.icv_size)
if self.tunnel_header:
# drop the tunnel header and return the payload untouched
pkt.remove_payload()
if pkt.version == 4:
pkt.proto = esp.nh
else:
pkt.nh = esp.nh
cls = pkt.guess_payload_class(esp.data)
return cls(esp.data)
else:
ip_header = pkt
if ip_header.version == 4:
ip_header.proto = esp.nh
del ip_header.chksum
ip_header.remove_payload()
ip_header.len = len(ip_header) + len(esp.data)
# recompute checksum
ip_header = ip_header.__class__(raw(ip_header))
else:
encrypted.underlayer.nh = esp.nh
encrypted.underlayer.remove_payload()
ip_header.plen = len(ip_header.payload) + len(esp.data)
cls = ip_header.guess_payload_class(esp.data)
# reassemble the ip_header with the ESP payload
return ip_header / cls(esp.data)
def _decrypt_ah(self, pkt, verify=True):
if verify:
self.check_spi(pkt)
self.auth_algo.verify(pkt, self.auth_key)
ah = pkt[AH]
payload = ah.payload
payload.remove_underlayer(None) # useless argument...
if self.tunnel_header:
return payload
else:
ip_header = pkt
if ip_header.version == 4:
ip_header.proto = ah.nh
del ip_header.chksum
ip_header.remove_payload()
ip_header.len = len(ip_header) + len(payload)
# recompute checksum
ip_header = ip_header.__class__(raw(ip_header))
else:
ah.underlayer.nh = ah.nh
ah.underlayer.remove_payload()
ip_header.plen = len(ip_header.payload) + len(payload)
# reassemble the ip_header with the AH payload
return ip_header / payload
def decrypt(self, pkt, verify=True):
"""
Decrypt (and decapsulate) an IP(v6) packet containing ESP or AH.
@param pkt: the packet to decrypt
@param verify: if False, do not perform the integrity check
@return: the decrypted/decapsulated packet
@raise IPSecIntegrityError: if the integrity check fails
"""
if not isinstance(pkt, self.SUPPORTED_PROTOS):
raise TypeError('cannot decrypt %s, supported protos are %s'
% (pkt.__class__, self.SUPPORTED_PROTOS))
if self.proto is ESP and pkt.haslayer(ESP):
return self._decrypt_esp(pkt, verify=verify)
elif self.proto is AH and pkt.haslayer(AH):
return self._decrypt_ah(pkt, verify=verify)
else:
raise TypeError('%s has no %s layer' % (pkt, self.proto.name))