| ## This file is part of Scapy |
| ## See http://www.secdev.org/projects/scapy for more informations |
| ## Copyright (C) Philippe Biondi <phil@secdev.org> |
| ## Modified by Maxence Tury <maxence.tury@ssi.gouv.fr> |
| ## Acknowledgment: Ralph Broenink |
| ## This program is published under a GPLv2 license |
| |
| """ |
| Basic Encoding Rules (BER) for ASN.1 |
| """ |
| |
| from __future__ import absolute_import |
| from scapy.error import warning |
| from scapy.compat import * |
| from scapy.utils import binrepr,inet_aton,inet_ntoa |
| from scapy.asn1.asn1 import ASN1_Decoding_Error,ASN1_Encoding_Error,ASN1_BadTag_Decoding_Error,ASN1_Codecs,ASN1_Class_UNIVERSAL,ASN1_Error,ASN1_DECODING_ERROR,ASN1_BADTAG |
| import scapy.modules.six as six |
| |
| ################## |
| ## BER encoding ## |
| ################## |
| |
| |
| |
| #####[ BER tools ]##### |
| |
| |
| class BER_Exception(Exception): |
| pass |
| |
| class BER_Encoding_Error(ASN1_Encoding_Error): |
| def __init__(self, msg, encoded=None, remaining=None): |
| Exception.__init__(self, msg) |
| self.remaining = remaining |
| self.encoded = encoded |
| def __str__(self): |
| s = Exception.__str__(self) |
| if isinstance(self.encoded, BERcodec_Object): |
| s+="\n### Already encoded ###\n%s" % self.encoded.strshow() |
| else: |
| s+="\n### Already encoded ###\n%r" % self.encoded |
| s+="\n### Remaining ###\n%r" % self.remaining |
| return s |
| |
| class BER_Decoding_Error(ASN1_Decoding_Error): |
| def __init__(self, msg, decoded=None, remaining=None): |
| Exception.__init__(self, msg) |
| self.remaining = remaining |
| self.decoded = decoded |
| def __str__(self): |
| s = Exception.__str__(self) |
| if isinstance(self.decoded, BERcodec_Object): |
| s+="\n### Already decoded ###\n%s" % self.decoded.strshow() |
| else: |
| s+="\n### Already decoded ###\n%r" % self.decoded |
| s+="\n### Remaining ###\n%r" % self.remaining |
| return s |
| |
| class BER_BadTag_Decoding_Error(BER_Decoding_Error, ASN1_BadTag_Decoding_Error): |
| pass |
| |
| def BER_len_enc(l, size=0): |
| if l <= 127 and size==0: |
| return chb(l) |
| s = b"" |
| while l or size>0: |
| s = chb(l&0xff)+s |
| l >>= 8 |
| size -= 1 |
| if len(s) > 127: |
| raise BER_Exception("BER_len_enc: Length too long (%i) to be encoded [%r]" % (len(s),s)) |
| return chb(len(s)|0x80)+s |
| def BER_len_dec(s): |
| l = orb(s[0]) |
| if not l & 0x80: |
| return l,s[1:] |
| l &= 0x7f |
| if len(s) <= l: |
| raise BER_Decoding_Error("BER_len_dec: Got %i bytes while expecting %i" % (len(s)-1, l),remaining=s) |
| ll = 0 |
| for c in s[1:l+1]: |
| ll <<= 8 |
| ll |= orb(c) |
| return ll,s[l+1:] |
| |
| def BER_num_enc(l, size=1): |
| x=[] |
| while l or size>0: |
| x.insert(0, l & 0x7f) |
| if len(x) > 1: |
| x[0] |= 0x80 |
| l >>= 7 |
| size -= 1 |
| return b"".join(chb(k) for k in x) |
| def BER_num_dec(s, cls_id=0): |
| if len(s) == 0: |
| raise BER_Decoding_Error("BER_num_dec: got empty string", remaining=s) |
| x = cls_id |
| for i, c in enumerate(s): |
| c = orb(c) |
| x <<= 7 |
| x |= c&0x7f |
| if not c&0x80: |
| break |
| if c&0x80: |
| raise BER_Decoding_Error("BER_num_dec: unfinished number description", remaining=s) |
| return x, s[i+1:] |
| |
| def BER_id_dec(s): |
| # This returns the tag ALONG WITH THE PADDED CLASS+CONSTRUCTIVE INFO. |
| # Let's recall that bits 8-7 from the first byte of the tag encode |
| # the class information, while bit 6 means primitive or constructive. |
| # |
| # For instance, with low-tag-number b'\x81', class would be 0b10 |
| # ('context-specific') and tag 0x01, but we return 0x81 as a whole. |
| # For b'\xff\x22', class would be 0b11 ('private'), constructed, then |
| # padding, then tag 0x22, but we return (0xff>>5)*128^1 + 0x22*128^0. |
| # Why the 5-bit-shifting? Because it provides an unequivocal encoding |
| # on base 128 (note that 0xff would equal 1*128^1 + 127*128^0...), |
| # as we know that bits 5 to 1 are fixed to 1 anyway. |
| # |
| # As long as there is no class differentiation, we have to keep this info |
| # encoded in scapy's tag in order to reuse it for packet building. |
| # Note that tags thus may have to be hard-coded with their extended |
| # information, e.g. a SEQUENCE from asn1.py has a direct tag 0x20|16. |
| x = orb(s[0]) |
| if x & 0x1f != 0x1f: |
| # low-tag-number |
| return x,s[1:] |
| else: |
| # high-tag-number |
| return BER_num_dec(s[1:], cls_id=x>>5) |
| def BER_id_enc(n): |
| if n < 256: |
| # low-tag-number |
| return chb(n) |
| else: |
| # high-tag-number |
| s = BER_num_enc(n) |
| tag = orb(s[0]) # first byte, as an int |
| tag &= 0x07 # reset every bit from 8 to 4 |
| tag <<= 5 # move back the info bits on top |
| tag |= 0x1f # pad with 1s every bit from 5 to 1 |
| return chb(tag) + s[1:] |
| |
| # The functions below provide implicit and explicit tagging support. |
| def BER_tagging_dec(s, hidden_tag=None, implicit_tag=None, |
| explicit_tag=None, safe=False): |
| # We output the 'real_tag' if it is different from the (im|ex)plicit_tag. |
| real_tag = None |
| if len(s) > 0: |
| err_msg = "BER_tagging_dec: observed tag does not match expected tag" |
| if implicit_tag is not None: |
| ber_id,s = BER_id_dec(s) |
| if ber_id != implicit_tag: |
| if not safe: |
| raise BER_Decoding_Error(err_msg, remaining=s) |
| else: |
| real_tag = ber_id |
| s = chb(hash(hidden_tag)) + s |
| elif explicit_tag is not None: |
| ber_id,s = BER_id_dec(s) |
| if ber_id != explicit_tag: |
| if not safe: |
| raise BER_Decoding_Error(err_msg, remaining=s) |
| else: |
| real_tag = ber_id |
| l,s = BER_len_dec(s) |
| return real_tag, s |
| def BER_tagging_enc(s, implicit_tag=None, explicit_tag=None): |
| if len(s) > 0: |
| if implicit_tag is not None: |
| s = BER_id_enc(implicit_tag) + s[1:] |
| elif explicit_tag is not None: |
| s = BER_id_enc(explicit_tag) + BER_len_enc(len(s)) + s |
| return s |
| |
| #####[ BER classes ]##### |
| |
| class BERcodec_metaclass(type): |
| def __new__(cls, name, bases, dct): |
| c = super(BERcodec_metaclass, cls).__new__(cls, name, bases, dct) |
| try: |
| c.tag.register(c.codec, c) |
| except: |
| warning("Error registering %r for %r" % (c.tag, c.codec)) |
| return c |
| |
| |
| class BERcodec_Object(six.with_metaclass(BERcodec_metaclass)): |
| codec = ASN1_Codecs.BER |
| tag = ASN1_Class_UNIVERSAL.ANY |
| |
| @classmethod |
| def asn1_object(cls, val): |
| return cls.tag.asn1_object(val) |
| |
| @classmethod |
| def check_string(cls, s): |
| if not s: |
| raise BER_Decoding_Error("%s: Got empty object while expecting tag %r" % |
| (cls.__name__,cls.tag), remaining=s) |
| @classmethod |
| def check_type(cls, s): |
| cls.check_string(s) |
| tag, remainder = BER_id_dec(s) |
| if cls.tag != tag: |
| raise BER_BadTag_Decoding_Error("%s: Got tag [%i/%#x] while expecting %r" % |
| (cls.__name__, tag, tag, cls.tag), remaining=s) |
| return remainder |
| @classmethod |
| def check_type_get_len(cls, s): |
| s2 = cls.check_type(s) |
| if not s2: |
| raise BER_Decoding_Error("%s: No bytes while expecting a length" % |
| cls.__name__, remaining=s) |
| return BER_len_dec(s2) |
| @classmethod |
| def check_type_check_len(cls, s): |
| l,s3 = cls.check_type_get_len(s) |
| if len(s3) < l: |
| raise BER_Decoding_Error("%s: Got %i bytes while expecting %i" % |
| (cls.__name__, len(s3), l), remaining=s) |
| return l,s3[:l],s3[l:] |
| |
| @classmethod |
| def do_dec(cls, s, context=None, safe=False): |
| if context is None: |
| context = cls.tag.context |
| cls.check_string(s) |
| p,_ = BER_id_dec(s) |
| if p not in context: |
| t = s |
| if len(t) > 18: |
| t = t[:15]+b"..." |
| raise BER_Decoding_Error("Unknown prefix [%02x] for [%r]" % (p,t), remaining=s) |
| codec = context[p].get_codec(ASN1_Codecs.BER) |
| return codec.dec(s,context,safe) |
| |
| @classmethod |
| def dec(cls, s, context=None, safe=False): |
| if not safe: |
| return cls.do_dec(s, context, safe) |
| try: |
| return cls.do_dec(s, context, safe) |
| except BER_BadTag_Decoding_Error as e: |
| o,remain = BERcodec_Object.dec(e.remaining, context, safe) |
| return ASN1_BADTAG(o),remain |
| except BER_Decoding_Error as e: |
| return ASN1_DECODING_ERROR(s, exc=e),"" |
| except ASN1_Error as e: |
| return ASN1_DECODING_ERROR(s, exc=e),"" |
| |
| @classmethod |
| def safedec(cls, s, context=None): |
| return cls.dec(s, context, safe=True) |
| |
| |
| @classmethod |
| def enc(cls, s): |
| if isinstance(s, six.string_types): |
| return BERcodec_STRING.enc(s) |
| else: |
| return BERcodec_INTEGER.enc(int(s)) |
| |
| ASN1_Codecs.BER.register_stem(BERcodec_Object) |
| |
| |
| ########################## |
| #### BERcodec objects #### |
| ########################## |
| |
| class BERcodec_INTEGER(BERcodec_Object): |
| tag = ASN1_Class_UNIVERSAL.INTEGER |
| @classmethod |
| def enc(cls, i): |
| s = [] |
| while True: |
| s.append(i&0xff) |
| if -127 <= i < 0: |
| break |
| if 128 <= i <= 255: |
| s.append(0) |
| i >>= 8 |
| if not i: |
| break |
| s = [chb(hash(c)) for c in s] |
| s.append(BER_len_enc(len(s))) |
| s.append(chb(hash(cls.tag))) |
| s.reverse() |
| return b"".join(s) |
| @classmethod |
| def do_dec(cls, s, context=None, safe=False): |
| l,s,t = cls.check_type_check_len(s) |
| x = 0 |
| if s: |
| if orb(s[0])&0x80: # negative int |
| x = -1 |
| for c in s: |
| x <<= 8 |
| x |= orb(c) |
| return cls.asn1_object(x),t |
| |
| class BERcodec_BOOLEAN(BERcodec_INTEGER): |
| tag = ASN1_Class_UNIVERSAL.BOOLEAN |
| |
| class BERcodec_BIT_STRING(BERcodec_Object): |
| tag = ASN1_Class_UNIVERSAL.BIT_STRING |
| @classmethod |
| def do_dec(cls, s, context=None, safe=False): |
| # /!\ the unused_bits information is lost after this decoding |
| l,s,t = cls.check_type_check_len(s) |
| if len(s) > 0: |
| unused_bits = orb(s[0]) |
| if safe and unused_bits > 7: |
| raise BER_Decoding_Error("BERcodec_BIT_STRING: too many unused_bits advertised", remaining=s) |
| s = "".join(binrepr(orb(x)).zfill(8) for x in s[1:]) |
| if unused_bits > 0: |
| s = s[:-unused_bits] |
| return cls.tag.asn1_object(s),t |
| else: |
| raise BER_Decoding_Error("BERcodec_BIT_STRING found no content (not even unused_bits byte)", remaining=s) |
| @classmethod |
| def enc(cls,s): |
| # /!\ this is DER encoding (bit strings are only zero-bit padded) |
| s = raw(s) |
| if len(s) % 8 == 0: |
| unused_bits = 0 |
| else: |
| unused_bits = 8 - len(s)%8 |
| s += b"0"*unused_bits |
| s = b"".join(chb(int(b"".join(chb(y) for y in x),2)) for x in zip(*[iter(s)]*8)) |
| s = chb(unused_bits) + s |
| return chb(hash(cls.tag))+BER_len_enc(len(s))+s |
| |
| class BERcodec_STRING(BERcodec_Object): |
| tag = ASN1_Class_UNIVERSAL.STRING |
| @classmethod |
| def enc(cls,s): |
| s = raw(s) |
| return chb(hash(cls.tag))+BER_len_enc(len(s))+s # Be sure we are encoding bytes |
| @classmethod |
| def do_dec(cls, s, context=None, safe=False): |
| l,s,t = cls.check_type_check_len(s) |
| return cls.tag.asn1_object(s),t |
| |
| class BERcodec_NULL(BERcodec_INTEGER): |
| tag = ASN1_Class_UNIVERSAL.NULL |
| @classmethod |
| def enc(cls, i): |
| if i == 0: |
| return chb(hash(cls.tag))+b"\0" |
| else: |
| return super(cls,cls).enc(i) |
| |
| class BERcodec_OID(BERcodec_Object): |
| tag = ASN1_Class_UNIVERSAL.OID |
| @classmethod |
| def enc(cls, oid): |
| oid = raw(oid) |
| lst = [int(x) for x in oid.strip(b".").split(b".")] |
| if len(lst) >= 2: |
| lst[1] += 40*lst[0] |
| del(lst[0]) |
| s = b"".join(BER_num_enc(k) for k in lst) |
| return chb(hash(cls.tag))+BER_len_enc(len(s))+s |
| @classmethod |
| def do_dec(cls, s, context=None, safe=False): |
| l,s,t = cls.check_type_check_len(s) |
| lst = [] |
| while s: |
| l,s = BER_num_dec(s) |
| lst.append(l) |
| if (len(lst) > 0): |
| lst.insert(0,lst[0]//40) |
| lst[1] %= 40 |
| return cls.asn1_object(b".".join(str(k).encode('ascii') for k in lst)), t |
| |
| class BERcodec_ENUMERATED(BERcodec_INTEGER): |
| tag = ASN1_Class_UNIVERSAL.ENUMERATED |
| |
| class BERcodec_UTF8_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.UTF8_STRING |
| |
| class BERcodec_NUMERIC_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING |
| |
| class BERcodec_PRINTABLE_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING |
| |
| class BERcodec_T61_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.T61_STRING |
| |
| class BERcodec_VIDEOTEX_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING |
| |
| class BERcodec_IA5_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.IA5_STRING |
| |
| class BERcodec_UTC_TIME(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.UTC_TIME |
| |
| class BERcodec_GENERALIZED_TIME(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME |
| |
| class BERcodec_ISO646_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.ISO646_STRING |
| |
| class BERcodec_UNIVERSAL_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING |
| |
| class BERcodec_BMP_STRING(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.BMP_STRING |
| |
| class BERcodec_SEQUENCE(BERcodec_Object): |
| tag = ASN1_Class_UNIVERSAL.SEQUENCE |
| @classmethod |
| def enc(cls, l): |
| if not isinstance(l, bytes): |
| l = b"".join(x.enc(cls.codec) for x in l) |
| return chb(hash(cls.tag))+BER_len_enc(len(l))+l |
| @classmethod |
| def do_dec(cls, s, context=None, safe=False): |
| if context is None: |
| context = cls.tag.context |
| l,st = cls.check_type_get_len(s) # we may have len(s) < l |
| s,t = st[:l],st[l:] |
| obj = [] |
| while s: |
| try: |
| o,s = BERcodec_Object.dec(s, context, safe) |
| except BER_Decoding_Error as err: |
| err.remaining += t |
| if err.decoded is not None: |
| obj.append(err.decoded) |
| err.decoded = obj |
| raise |
| obj.append(o) |
| if len(st) < l: |
| raise BER_Decoding_Error("Not enough bytes to decode sequence", decoded=obj) |
| return cls.asn1_object(obj),t |
| |
| class BERcodec_SET(BERcodec_SEQUENCE): |
| tag = ASN1_Class_UNIVERSAL.SET |
| |
| class BERcodec_IPADDRESS(BERcodec_STRING): |
| tag = ASN1_Class_UNIVERSAL.IPADDRESS |
| @classmethod |
| def enc(cls, ipaddr_ascii): |
| try: |
| s = inet_aton(ipaddr_ascii) |
| except Exception: |
| raise BER_Encoding_Error("IPv4 address could not be encoded") |
| return chb(hash(cls.tag))+BER_len_enc(len(s))+s |
| @classmethod |
| def do_dec(cls, s, context=None, safe=False): |
| l,s,t = cls.check_type_check_len(s) |
| try: |
| ipaddr_ascii = inet_ntoa(s) |
| except Exception: |
| raise BER_Decoding_Error("IP address could not be decoded", remaining=s) |
| return cls.asn1_object(ipaddr_ascii), t |
| |
| class BERcodec_COUNTER32(BERcodec_INTEGER): |
| tag = ASN1_Class_UNIVERSAL.COUNTER32 |
| |
| class BERcodec_GAUGE32(BERcodec_INTEGER): |
| tag = ASN1_Class_UNIVERSAL.GAUGE32 |
| |
| class BERcodec_TIME_TICKS(BERcodec_INTEGER): |
| tag = ASN1_Class_UNIVERSAL.TIME_TICKS |