blob: deadf770f5ef2ae13bdb4ea7aca4b47c97a38109 [file] [log] [blame]
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Philippe Biondi <phil@secdev.org>
## Modified by Maxence Tury <maxence.tury@ssi.gouv.fr>
## Acknowledgment: Ralph Broenink
## This program is published under a GPLv2 license
"""
Basic Encoding Rules (BER) for ASN.1
"""
from __future__ import absolute_import
from scapy.error import warning
from scapy.compat import *
from scapy.utils import binrepr,inet_aton,inet_ntoa
from scapy.asn1.asn1 import ASN1_Decoding_Error,ASN1_Encoding_Error,ASN1_BadTag_Decoding_Error,ASN1_Codecs,ASN1_Class_UNIVERSAL,ASN1_Error,ASN1_DECODING_ERROR,ASN1_BADTAG
import scapy.modules.six as six
##################
## BER encoding ##
##################
#####[ BER tools ]#####
class BER_Exception(Exception):
pass
class BER_Encoding_Error(ASN1_Encoding_Error):
def __init__(self, msg, encoded=None, remaining=None):
Exception.__init__(self, msg)
self.remaining = remaining
self.encoded = encoded
def __str__(self):
s = Exception.__str__(self)
if isinstance(self.encoded, BERcodec_Object):
s+="\n### Already encoded ###\n%s" % self.encoded.strshow()
else:
s+="\n### Already encoded ###\n%r" % self.encoded
s+="\n### Remaining ###\n%r" % self.remaining
return s
class BER_Decoding_Error(ASN1_Decoding_Error):
def __init__(self, msg, decoded=None, remaining=None):
Exception.__init__(self, msg)
self.remaining = remaining
self.decoded = decoded
def __str__(self):
s = Exception.__str__(self)
if isinstance(self.decoded, BERcodec_Object):
s+="\n### Already decoded ###\n%s" % self.decoded.strshow()
else:
s+="\n### Already decoded ###\n%r" % self.decoded
s+="\n### Remaining ###\n%r" % self.remaining
return s
class BER_BadTag_Decoding_Error(BER_Decoding_Error, ASN1_BadTag_Decoding_Error):
pass
def BER_len_enc(l, size=0):
if l <= 127 and size==0:
return chb(l)
s = b""
while l or size>0:
s = chb(l&0xff)+s
l >>= 8
size -= 1
if len(s) > 127:
raise BER_Exception("BER_len_enc: Length too long (%i) to be encoded [%r]" % (len(s),s))
return chb(len(s)|0x80)+s
def BER_len_dec(s):
l = orb(s[0])
if not l & 0x80:
return l,s[1:]
l &= 0x7f
if len(s) <= l:
raise BER_Decoding_Error("BER_len_dec: Got %i bytes while expecting %i" % (len(s)-1, l),remaining=s)
ll = 0
for c in s[1:l+1]:
ll <<= 8
ll |= orb(c)
return ll,s[l+1:]
def BER_num_enc(l, size=1):
x=[]
while l or size>0:
x.insert(0, l & 0x7f)
if len(x) > 1:
x[0] |= 0x80
l >>= 7
size -= 1
return b"".join(chb(k) for k in x)
def BER_num_dec(s, cls_id=0):
if len(s) == 0:
raise BER_Decoding_Error("BER_num_dec: got empty string", remaining=s)
x = cls_id
for i, c in enumerate(s):
c = orb(c)
x <<= 7
x |= c&0x7f
if not c&0x80:
break
if c&0x80:
raise BER_Decoding_Error("BER_num_dec: unfinished number description", remaining=s)
return x, s[i+1:]
def BER_id_dec(s):
# This returns the tag ALONG WITH THE PADDED CLASS+CONSTRUCTIVE INFO.
# Let's recall that bits 8-7 from the first byte of the tag encode
# the class information, while bit 6 means primitive or constructive.
#
# For instance, with low-tag-number b'\x81', class would be 0b10
# ('context-specific') and tag 0x01, but we return 0x81 as a whole.
# For b'\xff\x22', class would be 0b11 ('private'), constructed, then
# padding, then tag 0x22, but we return (0xff>>5)*128^1 + 0x22*128^0.
# Why the 5-bit-shifting? Because it provides an unequivocal encoding
# on base 128 (note that 0xff would equal 1*128^1 + 127*128^0...),
# as we know that bits 5 to 1 are fixed to 1 anyway.
#
# As long as there is no class differentiation, we have to keep this info
# encoded in scapy's tag in order to reuse it for packet building.
# Note that tags thus may have to be hard-coded with their extended
# information, e.g. a SEQUENCE from asn1.py has a direct tag 0x20|16.
x = orb(s[0])
if x & 0x1f != 0x1f:
# low-tag-number
return x,s[1:]
else:
# high-tag-number
return BER_num_dec(s[1:], cls_id=x>>5)
def BER_id_enc(n):
if n < 256:
# low-tag-number
return chb(n)
else:
# high-tag-number
s = BER_num_enc(n)
tag = orb(s[0]) # first byte, as an int
tag &= 0x07 # reset every bit from 8 to 4
tag <<= 5 # move back the info bits on top
tag |= 0x1f # pad with 1s every bit from 5 to 1
return chb(tag) + s[1:]
# The functions below provide implicit and explicit tagging support.
def BER_tagging_dec(s, hidden_tag=None, implicit_tag=None,
explicit_tag=None, safe=False):
# We output the 'real_tag' if it is different from the (im|ex)plicit_tag.
real_tag = None
if len(s) > 0:
err_msg = "BER_tagging_dec: observed tag does not match expected tag"
if implicit_tag is not None:
ber_id,s = BER_id_dec(s)
if ber_id != implicit_tag:
if not safe:
raise BER_Decoding_Error(err_msg, remaining=s)
else:
real_tag = ber_id
s = chb(hash(hidden_tag)) + s
elif explicit_tag is not None:
ber_id,s = BER_id_dec(s)
if ber_id != explicit_tag:
if not safe:
raise BER_Decoding_Error(err_msg, remaining=s)
else:
real_tag = ber_id
l,s = BER_len_dec(s)
return real_tag, s
def BER_tagging_enc(s, implicit_tag=None, explicit_tag=None):
if len(s) > 0:
if implicit_tag is not None:
s = BER_id_enc(implicit_tag) + s[1:]
elif explicit_tag is not None:
s = BER_id_enc(explicit_tag) + BER_len_enc(len(s)) + s
return s
#####[ BER classes ]#####
class BERcodec_metaclass(type):
def __new__(cls, name, bases, dct):
c = super(BERcodec_metaclass, cls).__new__(cls, name, bases, dct)
try:
c.tag.register(c.codec, c)
except:
warning("Error registering %r for %r" % (c.tag, c.codec))
return c
class BERcodec_Object(six.with_metaclass(BERcodec_metaclass)):
codec = ASN1_Codecs.BER
tag = ASN1_Class_UNIVERSAL.ANY
@classmethod
def asn1_object(cls, val):
return cls.tag.asn1_object(val)
@classmethod
def check_string(cls, s):
if not s:
raise BER_Decoding_Error("%s: Got empty object while expecting tag %r" %
(cls.__name__,cls.tag), remaining=s)
@classmethod
def check_type(cls, s):
cls.check_string(s)
tag, remainder = BER_id_dec(s)
if cls.tag != tag:
raise BER_BadTag_Decoding_Error("%s: Got tag [%i/%#x] while expecting %r" %
(cls.__name__, tag, tag, cls.tag), remaining=s)
return remainder
@classmethod
def check_type_get_len(cls, s):
s2 = cls.check_type(s)
if not s2:
raise BER_Decoding_Error("%s: No bytes while expecting a length" %
cls.__name__, remaining=s)
return BER_len_dec(s2)
@classmethod
def check_type_check_len(cls, s):
l,s3 = cls.check_type_get_len(s)
if len(s3) < l:
raise BER_Decoding_Error("%s: Got %i bytes while expecting %i" %
(cls.__name__, len(s3), l), remaining=s)
return l,s3[:l],s3[l:]
@classmethod
def do_dec(cls, s, context=None, safe=False):
if context is None:
context = cls.tag.context
cls.check_string(s)
p,_ = BER_id_dec(s)
if p not in context:
t = s
if len(t) > 18:
t = t[:15]+b"..."
raise BER_Decoding_Error("Unknown prefix [%02x] for [%r]" % (p,t), remaining=s)
codec = context[p].get_codec(ASN1_Codecs.BER)
return codec.dec(s,context,safe)
@classmethod
def dec(cls, s, context=None, safe=False):
if not safe:
return cls.do_dec(s, context, safe)
try:
return cls.do_dec(s, context, safe)
except BER_BadTag_Decoding_Error as e:
o,remain = BERcodec_Object.dec(e.remaining, context, safe)
return ASN1_BADTAG(o),remain
except BER_Decoding_Error as e:
return ASN1_DECODING_ERROR(s, exc=e),""
except ASN1_Error as e:
return ASN1_DECODING_ERROR(s, exc=e),""
@classmethod
def safedec(cls, s, context=None):
return cls.dec(s, context, safe=True)
@classmethod
def enc(cls, s):
if isinstance(s, six.string_types):
return BERcodec_STRING.enc(s)
else:
return BERcodec_INTEGER.enc(int(s))
ASN1_Codecs.BER.register_stem(BERcodec_Object)
##########################
#### BERcodec objects ####
##########################
class BERcodec_INTEGER(BERcodec_Object):
tag = ASN1_Class_UNIVERSAL.INTEGER
@classmethod
def enc(cls, i):
s = []
while True:
s.append(i&0xff)
if -127 <= i < 0:
break
if 128 <= i <= 255:
s.append(0)
i >>= 8
if not i:
break
s = [chb(hash(c)) for c in s]
s.append(BER_len_enc(len(s)))
s.append(chb(hash(cls.tag)))
s.reverse()
return b"".join(s)
@classmethod
def do_dec(cls, s, context=None, safe=False):
l,s,t = cls.check_type_check_len(s)
x = 0
if s:
if orb(s[0])&0x80: # negative int
x = -1
for c in s:
x <<= 8
x |= orb(c)
return cls.asn1_object(x),t
class BERcodec_BOOLEAN(BERcodec_INTEGER):
tag = ASN1_Class_UNIVERSAL.BOOLEAN
class BERcodec_BIT_STRING(BERcodec_Object):
tag = ASN1_Class_UNIVERSAL.BIT_STRING
@classmethod
def do_dec(cls, s, context=None, safe=False):
# /!\ the unused_bits information is lost after this decoding
l,s,t = cls.check_type_check_len(s)
if len(s) > 0:
unused_bits = orb(s[0])
if safe and unused_bits > 7:
raise BER_Decoding_Error("BERcodec_BIT_STRING: too many unused_bits advertised", remaining=s)
s = "".join(binrepr(orb(x)).zfill(8) for x in s[1:])
if unused_bits > 0:
s = s[:-unused_bits]
return cls.tag.asn1_object(s),t
else:
raise BER_Decoding_Error("BERcodec_BIT_STRING found no content (not even unused_bits byte)", remaining=s)
@classmethod
def enc(cls,s):
# /!\ this is DER encoding (bit strings are only zero-bit padded)
s = raw(s)
if len(s) % 8 == 0:
unused_bits = 0
else:
unused_bits = 8 - len(s)%8
s += b"0"*unused_bits
s = b"".join(chb(int(b"".join(chb(y) for y in x),2)) for x in zip(*[iter(s)]*8))
s = chb(unused_bits) + s
return chb(hash(cls.tag))+BER_len_enc(len(s))+s
class BERcodec_STRING(BERcodec_Object):
tag = ASN1_Class_UNIVERSAL.STRING
@classmethod
def enc(cls,s):
s = raw(s)
return chb(hash(cls.tag))+BER_len_enc(len(s))+s # Be sure we are encoding bytes
@classmethod
def do_dec(cls, s, context=None, safe=False):
l,s,t = cls.check_type_check_len(s)
return cls.tag.asn1_object(s),t
class BERcodec_NULL(BERcodec_INTEGER):
tag = ASN1_Class_UNIVERSAL.NULL
@classmethod
def enc(cls, i):
if i == 0:
return chb(hash(cls.tag))+b"\0"
else:
return super(cls,cls).enc(i)
class BERcodec_OID(BERcodec_Object):
tag = ASN1_Class_UNIVERSAL.OID
@classmethod
def enc(cls, oid):
oid = raw(oid)
lst = [int(x) for x in oid.strip(b".").split(b".")]
if len(lst) >= 2:
lst[1] += 40*lst[0]
del(lst[0])
s = b"".join(BER_num_enc(k) for k in lst)
return chb(hash(cls.tag))+BER_len_enc(len(s))+s
@classmethod
def do_dec(cls, s, context=None, safe=False):
l,s,t = cls.check_type_check_len(s)
lst = []
while s:
l,s = BER_num_dec(s)
lst.append(l)
if (len(lst) > 0):
lst.insert(0,lst[0]//40)
lst[1] %= 40
return cls.asn1_object(b".".join(str(k).encode('ascii') for k in lst)), t
class BERcodec_ENUMERATED(BERcodec_INTEGER):
tag = ASN1_Class_UNIVERSAL.ENUMERATED
class BERcodec_UTF8_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.UTF8_STRING
class BERcodec_NUMERIC_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING
class BERcodec_PRINTABLE_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
class BERcodec_T61_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.T61_STRING
class BERcodec_VIDEOTEX_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING
class BERcodec_IA5_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.IA5_STRING
class BERcodec_UTC_TIME(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.UTC_TIME
class BERcodec_GENERALIZED_TIME(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME
class BERcodec_ISO646_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.ISO646_STRING
class BERcodec_UNIVERSAL_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING
class BERcodec_BMP_STRING(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.BMP_STRING
class BERcodec_SEQUENCE(BERcodec_Object):
tag = ASN1_Class_UNIVERSAL.SEQUENCE
@classmethod
def enc(cls, l):
if not isinstance(l, bytes):
l = b"".join(x.enc(cls.codec) for x in l)
return chb(hash(cls.tag))+BER_len_enc(len(l))+l
@classmethod
def do_dec(cls, s, context=None, safe=False):
if context is None:
context = cls.tag.context
l,st = cls.check_type_get_len(s) # we may have len(s) < l
s,t = st[:l],st[l:]
obj = []
while s:
try:
o,s = BERcodec_Object.dec(s, context, safe)
except BER_Decoding_Error as err:
err.remaining += t
if err.decoded is not None:
obj.append(err.decoded)
err.decoded = obj
raise
obj.append(o)
if len(st) < l:
raise BER_Decoding_Error("Not enough bytes to decode sequence", decoded=obj)
return cls.asn1_object(obj),t
class BERcodec_SET(BERcodec_SEQUENCE):
tag = ASN1_Class_UNIVERSAL.SET
class BERcodec_IPADDRESS(BERcodec_STRING):
tag = ASN1_Class_UNIVERSAL.IPADDRESS
@classmethod
def enc(cls, ipaddr_ascii):
try:
s = inet_aton(ipaddr_ascii)
except Exception:
raise BER_Encoding_Error("IPv4 address could not be encoded")
return chb(hash(cls.tag))+BER_len_enc(len(s))+s
@classmethod
def do_dec(cls, s, context=None, safe=False):
l,s,t = cls.check_type_check_len(s)
try:
ipaddr_ascii = inet_ntoa(s)
except Exception:
raise BER_Decoding_Error("IP address could not be decoded", remaining=s)
return cls.asn1_object(ipaddr_ascii), t
class BERcodec_COUNTER32(BERcodec_INTEGER):
tag = ASN1_Class_UNIVERSAL.COUNTER32
class BERcodec_GAUGE32(BERcodec_INTEGER):
tag = ASN1_Class_UNIVERSAL.GAUGE32
class BERcodec_TIME_TICKS(BERcodec_INTEGER):
tag = ASN1_Class_UNIVERSAL.TIME_TICKS