blob: 2feb147949fb829e162aa11d21cf7f063af9b3da [file] [log] [blame]
# This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
## Copyright (C) Sabrina Dubroca <sd@queasysnail.net>
## This program is published under a GPLv2 license
"""
Classes and functions for MACsec.
"""
from __future__ import absolute_import
from __future__ import print_function
import struct
from scapy.config import conf
from scapy.fields import *
from scapy.packet import Packet, Raw, bind_layers
from scapy.layers.l2 import Ether, Dot1AD, Dot1Q
from scapy.layers.eap import MACsecSCI
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6
import scapy.modules.six as six
# The AES-GCM primitives are imported only when conf.crypto_valid is set.
# Otherwise the names below stay undefined and a notice is logged at load
# time, so MACsecSA.encrypt()/decrypt() cannot be used.
if conf.crypto_valid:
    from cryptography.exceptions import InvalidTag
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher
    from cryptography.hazmat.primitives.ciphers import algorithms
    from cryptography.hazmat.primitives.ciphers import modes
else:
    log_loading.info("Can't import python-cryptography v1.7+. "
                     "Disabled MACsec encryption/authentication.")
# Number of bytes preceding the secure data when the SecTAG carries no SCI:
# 14 bytes of Ethernet header (two MAC addresses and the EtherType) plus
# 6 bytes of SecTAG (flags/AN byte, short-length byte, 32-bit packet number).
NOSCI_LEN = 14 + 6
# Size in bytes of the optional SCI; same '!Q' format MACsecSA uses to pack
# an integer SCI.
SCI_LEN = struct.calcsize('!Q')
# ICV length, in bytes, that MACsecSA.c_bit() treats as the default.
DEFAULT_ICV_LEN = 16
class MACsecSA(object):
    """Representation of a MACsec Secure Association

    Provides encapsulation, decapsulation, encryption, and decryption
    of MACsec frames.

    :param sci: Secure Channel Identifier, either an int (packed to 8
        network-order bytes) or bytes (stored unchanged)
    :param an: association number written into the SecTAG by encap()
    :param pn: packet number written into the SecTAG by encap()
    :param key: AES key handed to the GCM cipher
    :param icvlen: length in bytes of the ICV that trails a protected frame
    :param encrypt: True to encrypt the payload, False to only authenticate
    :param send_sci: True if the SecTAG carries the SCI
    :raises TypeError: if sci is neither an int nor bytes
    """

    def __init__(self, sci, an, pn, key, icvlen, encrypt, send_sci):
        if isinstance(sci, six.integer_types):
            self.sci = struct.pack('!Q', sci)
        elif isinstance(sci, bytes):
            self.sci = sci
        else:
            raise TypeError("SCI must be either bytes or int")
        self.an = an
        self.pn = pn
        self.key = key
        self.icvlen = icvlen
        self.do_encrypt = encrypt
        self.send_sci = send_sci

    def make_iv(self, pkt):
        """generate an IV for the packet

        The IV is this SA's SCI followed by the packet number of the
        MACsec layer of pkt, packed as a 32-bit network-order integer.
        """
        return self.sci + struct.pack('!I', pkt[MACsec].pn)

    @staticmethod
    def split_pkt(pkt, assoclen, icvlen=0):
        """
        split the packet into associated data, plaintext or ciphertext, and
        optional ICV

        :param pkt: packet (or anything raw() accepts) to split
        :param assoclen: number of leading bytes that are associated data
        :param icvlen: number of trailing bytes that are the ICV (0: none)
        :returns: tuple (assoc, enc, icv) of bytes
        """
        data = raw(pkt)
        assoc = data[:assoclen]
        if icvlen:
            icv = data[-icvlen:]
            enc = data[assoclen:-icvlen]
        else:
            # data[assoclen:-0] would be empty, hence the separate branch
            icv = b''
            enc = data[assoclen:]
        return assoc, enc, icv

    def e_bit(self):
        """returns the value of the E bit for packets sent through this SA"""
        return self.do_encrypt

    def c_bit(self):
        """returns the value of the C bit for packets sent through this SA"""
        return self.do_encrypt or self.icvlen != DEFAULT_ICV_LEN

    @staticmethod
    def shortlen(pkt):
        """determine shortlen for a raw packet (not encapsulated yet)

        Returns the packet length minus the two 6-byte MAC addresses when
        that value is below 48, and 0 otherwise.
        """
        datalen = len(pkt) - 2 * 6
        if datalen < 48:
            return datalen
        return 0

    def _hdrlen(self):
        """length of the Ethernet header plus SecTAG for this SA"""
        if self.send_sci:
            return NOSCI_LEN + SCI_LEN
        return NOSCI_LEN

    def _resolve_assoclen(self, assoclen, hdrlen, authlen):
        """pick the associated-data length used by encrypt()/decrypt()

        Without encryption everything up to authlen is only authenticated;
        with encryption the caller's assoclen is honoured, defaulting to
        the header length.
        """
        if not self.do_encrypt:
            return authlen
        if assoclen is None:
            return hdrlen
        return assoclen

    def encap(self, pkt):
        """encapsulate a frame using this Secure Association

        Works on a deep copy of pkt: a MACsec tag built from this SA's
        parameters is inserted between the Ethernet header and its
        payload, and the original EtherType moves into the tag. self.pn
        is used as-is; this method does not advance it.

        :raises TypeError: if pkt is not an Ethernet frame
        """
        if pkt.name != Ether().name:
            raise TypeError('cannot encapsulate packet in MACsec, must be Ethernet')
        hdr = copy.deepcopy(pkt)
        payload = hdr.payload
        del hdr.payload
        tag = MACsec(sci=self.sci, an=self.an,
                     SC=self.send_sci,
                     E=self.e_bit(), C=self.c_bit(),
                     shortlen=MACsecSA.shortlen(pkt),
                     pn=self.pn, type=pkt.type)
        hdr.type = ETH_P_MACSEC
        return hdr/tag/payload

    # this doesn't really need to be a method, but for symmetry with
    # encap(), it is
    def decap(self, orig_pkt):
        """decapsulate a MACsec frame

        Works on a deep copy of orig_pkt: the MACsec tag is removed and
        its type field is restored as the EtherType of the layer below.

        :raises TypeError: if orig_pkt is not Ethernet directly followed
            by MACsec
        """
        if orig_pkt.name != Ether().name or orig_pkt.payload.name != MACsec().name:
            raise TypeError('cannot decapsulate MACsec packet, must be Ethernet/MACsec')
        packet = copy.deepcopy(orig_pkt)
        prev_layer = packet[MACsec].underlayer
        prev_layer.type = packet[MACsec].type
        next_layer = packet[MACsec].payload
        del prev_layer.payload
        if prev_layer.name == Ether().name:
            # rebuild from bytes so the payload is dissected again
            return Ether(raw(prev_layer/next_layer))
        return prev_layer/next_layer

    def encrypt(self, orig_pkt, assoclen=None):
        """encrypt a MACsec frame for this Secure Association

        :param orig_pkt: Ethernet/MACsec frame, e.g. as returned by encap()
        :param assoclen: number of leading bytes that are authenticated
            but left in clear; only honoured when this SA encrypts
            (default: the Ethernet header plus SecTAG)
        :returns: a copy of the frame whose MACsec payload is a Raw layer
            holding the clear-text part beyond the header, the
            ciphertext, and the ICV truncated to self.icvlen bytes
        """
        hdr = copy.deepcopy(orig_pkt)
        del hdr[MACsec].payload
        # the EtherType becomes part of the protected data
        del hdr[MACsec].type
        hdrlen = self._hdrlen()
        assoclen = self._resolve_assoclen(assoclen, hdrlen, len(orig_pkt))
        iv = self.make_iv(orig_pkt)
        assoc, pt, _ = MACsecSA.split_pkt(orig_pkt, assoclen)
        encryptor = Cipher(
            algorithms.AES(self.key),
            modes.GCM(iv),
            backend=default_backend()
        ).encryptor()
        encryptor.authenticate_additional_data(assoc)
        ct = encryptor.update(pt) + encryptor.finalize()
        # GCM always yields a 16-byte tag; decrypt() and split_pkt() peel
        # off exactly self.icvlen bytes, so emit an ICV of that length.
        icv = encryptor.tag[:self.icvlen]
        hdr[MACsec].payload = Raw(assoc[hdrlen:assoclen] + ct + icv)
        return hdr

    def decrypt(self, orig_pkt, assoclen=None):
        """decrypt a MACsec frame for this Secure Association

        :param orig_pkt: protected Ethernet/MACsec frame
        :param assoclen: number of leading bytes that are authenticated
            but not encrypted; only honoured when this SA encrypts
            (default: the Ethernet header plus SecTAG)
        :returns: a copy of the frame with the MACsec type field restored
            from the first two recovered bytes and the rest as Raw payload

        Verification of the ICV is left to the cipher's finalize() call.
        """
        hdr = copy.deepcopy(orig_pkt)
        del hdr[MACsec].payload
        hdrlen = self._hdrlen()
        assoclen = self._resolve_assoclen(assoclen, hdrlen,
                                          len(orig_pkt) - self.icvlen)
        iv = self.make_iv(hdr)
        assoc, ct, icv = MACsecSA.split_pkt(orig_pkt, assoclen, self.icvlen)
        try:
            # allow the truncated ICV of an SA whose icvlen is below 16
            mode = modes.GCM(iv, icv, min_tag_length=len(icv))
        except TypeError:
            # older cryptography releases have no min_tag_length parameter
            mode = modes.GCM(iv, icv)
        decryptor = Cipher(
            algorithms.AES(self.key),
            mode,
            backend=default_backend()
        ).decryptor()
        decryptor.authenticate_additional_data(assoc)
        pt = assoc[hdrlen:assoclen]
        pt += decryptor.update(ct)
        pt += decryptor.finalize()
        hdr[MACsec].type = struct.unpack('!H', pt[0:2])[0]
        hdr[MACsec].payload = Raw(pt[2:])
        return hdr
class MACsec(Packet):
    """representation of one MACsec frame

    The first byte holds six one-bit flags and the 2-bit an field, the
    second byte holds 2 reserved bits and the 6-bit shortlen, followed
    by the 32-bit pn. The sci field is present only when the SC flag is
    set, and type only when it has a value.
    """
    name = '802.1AE'
    fields_desc = [
        # first byte: flags and association number
        BitField('Ver', 0, 1),
        BitField('ES', 0, 1),
        BitField('SC', 0, 1),
        BitField('SCB', 0, 1),
        BitField('E', 0, 1),
        BitField('C', 0, 1),
        BitField('an', 0, 2),
        # second byte: reserved bits and short length
        BitField('reserved', 0, 2),
        BitField('shortlen', 0, 6),
        # packet number
        IntField("pn", 1),
        # optional fields
        ConditionalField(PacketField("sci", None, MACsecSCI),
                         lambda pkt: pkt.SC),
        ConditionalField(XShortEnumField("type", None, ETHER_TYPES),
                         lambda pkt: pkt.type is not None),
    ]

    def mysummary(self):
        """one-line summary: an and pn, plus sci and type when present"""
        pieces = [self.sprintf("an=%MACsec.an%, pn=%MACsec.pn%")]
        if self.SC:
            pieces.append(self.sprintf(", sci=%MACsec.sci%"))
        if self.type is not None:
            pieces.append(self.sprintf(", %MACsec.type%"))
        return "".join(pieces)
# Layer bindings, keyed on the "type" field of the lower layer.
# Payloads carried directly above a MACsec tag:
bind_layers(MACsec, IP, type=ETH_P_IP)
bind_layers(MACsec, IPv6, type=ETH_P_IPV6)
# Layers that may carry a MACsec tag (type ETH_P_MACSEC):
bind_layers( Dot1AD, MACsec, type=ETH_P_MACSEC)
bind_layers( Dot1Q, MACsec, type=ETH_P_MACSEC)
bind_layers( Ether, MACsec, type=ETH_P_MACSEC)