Implementation of lower layer ISOTP frames
diff --git a/scapy/contrib/isotp.py b/scapy/contrib/isotp.py
index 990b848..8eced92 100644
--- a/scapy/contrib/isotp.py
+++ b/scapy/contrib/isotp.py
@@ -14,7 +14,10 @@
from threading import Thread, Event, RLock
from scapy.packet import Packet
-from scapy.fields import StrField
+from scapy.fields import BitField, FlagsField, StrLenField, \
+ ThreeBytesField, XBitField, ConditionalField, \
+ BitEnumField, ByteField, XByteField, BitFieldLenField, StrField
+from scapy.compat import chb, orb
from scapy.layers.can import CAN
import scapy.modules.six as six
from scapy.error import Scapy_Exception, warning, log_loading
@@ -22,8 +25,9 @@
from scapy.config import conf
from scapy.consts import LINUX
-__all__ = ["ISOTP", "ISOTPSniffer", "ISOTPSoftSocket", "ISOTPSocket",
- "ISOTPSocketImplementation", "ISOTPMessageBuilder"]
+__all__ = ["ISOTP", "ISOTPHeader", "ISOTPHeaderEA", "ISOTP_SF", "ISOTP_FF",
+ "ISOTP_CF", "ISOTP_FC", "ISOTPSniffer", "ISOTPSoftSocket",
+ "ISOTPSocket", "ISOTPSocketImplementation", "ISOTPMessageBuilder"]
USE_CAN_ISOTP_KERNEL_MODULE = False
if six.PY3 and LINUX:
@@ -168,6 +172,107 @@
return results[0]
+class ISOTPHeader(CAN):
+ name = 'ISOTPHeader'
+ fields_desc = [
+ FlagsField('flags', 0, 3, ['error',
+ 'remote_transmission_request',
+ 'extended']),
+ XBitField('identifier', 0, 29),
+ ByteField('length', None),
+ ThreeBytesField('reserved', 0),
+ ]
+
+ def extract_padding(self, p):
+ return p, None
+
+ def post_build(self, p, pay):
+ """
+ This will set the ByteField 'length' to the correct value.
+ """
+ if self.length is None:
+ p = p[:4] + chb(len(pay)) + p[5:]
+ return p + pay
+
+ def guess_payload_class(self, payload):
+ """
+ ISOTP encodes the frame type in the first nibble of a frame.
+ """
+ t = (orb(payload[0]) & 0xf0) >> 4
+ if t == 0:
+ return ISOTP_SF
+ elif t == 1:
+ return ISOTP_FF
+ elif t == 2:
+ return ISOTP_CF
+ else:
+ return ISOTP_FC
+
+
+class ISOTPHeaderEA(ISOTPHeader):
+ name = 'ISOTPHeaderExtendedAddress'
+ fields_desc = ISOTPHeader.fields_desc + [
+ XByteField('extended_address', 0),
+ ]
+
+ def post_build(self, p, pay):
+ """
+ This will set the ByteField 'length' to the correct value.
+ 'chb(len(pay) + 1)' is required, because the field 'extended_address'
+ is counted as payload on the CAN layer
+ """
+ if self.length is None:
+ p = p[:4] + chb(len(pay) + 1) + p[5:]
+ return p + pay
+
+
+ISOTP_TYPE = {0: 'single',
+ 1: 'first',
+ 2: 'consecutive',
+ 3: 'flow_control'}
+
+
+class ISOTP_SF(Packet):
+ name = 'ISOTPSingleFrame'
+ fields_desc = [
+ BitEnumField('type', 0, 4, ISOTP_TYPE),
+ BitFieldLenField('message_size', None, 4, length_of='data'),
+ StrLenField('data', '', length_from=lambda pkt: pkt.message_size)
+ ]
+
+
+class ISOTP_FF(Packet):
+ name = 'ISOTPFirstFrame'
+ fields_desc = [
+ BitEnumField('type', 1, 4, ISOTP_TYPE),
+ BitField('message_size', 0, 12),
+ ConditionalField(BitField('extended_message_size', 0, 32),
+ lambda pkt: pkt.message_size == 0),
+ StrField('data', '', fmt="B")
+ ]
+
+
+class ISOTP_CF(Packet):
+ name = 'ISOTPConsecutiveFrame'
+ fields_desc = [
+ BitEnumField('type', 2, 4, ISOTP_TYPE),
+ BitField('index', 0, 4),
+ StrField('data', '', fmt="B")
+ ]
+
+
+class ISOTP_FC(Packet):
+ name = 'ISOTPFlowControlFrame'
+ fields_desc = [
+ BitEnumField('type', 3, 4, ISOTP_TYPE),
+ BitEnumField('fc_flag', 0, 4, {0: 'continue',
+ 1: 'wait',
+ 2: 'abort'}),
+ ByteField('block_size', 0),
+ ByteField('separation_time', 0),
+ ]
+
+
class ISOTPMessageBuilder:
"""
Utility class to build ISOTP messages out of CAN frames, used by both
diff --git a/scapy/contrib/isotp.uts b/scapy/contrib/isotp.uts
index cb2e15b..5aeba12 100644
--- a/scapy/contrib/isotp.uts
+++ b/scapy/contrib/isotp.uts
@@ -122,7 +122,7 @@
= Import isotp
conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': False}
load_contrib("isotp")
-from scapy.contrib.isotp import ISOTP
+
+ ISOTP packet check
@@ -138,6 +138,192 @@
assert(p.data == b"eee")
assert(bytes(p) == b"eee")
++ ISOTPFrame related checks
+
+= Build a packet with extended addressing
+
+pkt = CAN(identifier=0x123, data=b'\x42\x10\xff\xde\xea\xdd\xaa\xaa')
+isotpex = ISOTPHeaderEA(bytes(pkt))
+assert(isotpex.type == 1)
+assert(isotpex.message_size == 0xff)
+assert(isotpex.extended_address == 0x42)
+assert(isotpex.identifier == 0x123)
+assert(isotpex.length == 8)
+
+
+= Build a packet with normal addressing
+
+pkt = CAN(identifier=0x123, data=b'\x10\xff\xde\xea\xdd\xaa\xaa')
+isotpno = ISOTPHeader(bytes(pkt))
+assert(isotpno.type == 1)
+assert(isotpno.message_size == 0xff)
+assert(isotpno.identifier == 0x123)
+assert(isotpno.length == 7)
+
+
+= Compare both isotp payloads
+
+assert(isotpno.data == isotpex.data)
+assert(isotpno.message_size == isotpex.message_size)
+
+
+= Dissect multiple packets
+
+frames = \
+ [b'\x00\x00\x00\x00\x08\x00\x00\x00\x10(\xde\xad\xbe\xef\xde\xad',
+ b'\x00\x00\x00\x00\x08\x00\x00\x00!\xbe\xef\xde\xad\xbe\xef\xde',
+ b'\x00\x00\x00\x00\x08\x00\x00\x00"\xad\xbe\xef\xde\xad\xbe\xef',
+ b'\x00\x00\x00\x00\x08\x00\x00\x00#\xde\xad\xbe\xef\xde\xad\xbe',
+ b'\x00\x00\x00\x00\x08\x00\x00\x00$\xef\xde\xad\xbe\xef\xde\xad',
+ b'\x00\x00\x00\x00\x07\x00\x00\x00%\xbe\xef\xde\xad\xbe\xef']
+
+isotpframes = [ISOTPHeader(x) for x in frames]
+
+assert(isotpframes[0].type == 1)
+assert(isotpframes[0].message_size == 40)
+assert(isotpframes[0].length == 8)
+assert(isotpframes[1].type == 2)
+assert(isotpframes[1].index == 1)
+assert(isotpframes[1].length == 8)
+assert(isotpframes[2].type == 2)
+assert(isotpframes[2].index == 2)
+assert(isotpframes[2].length == 8)
+assert(isotpframes[3].type == 2)
+assert(isotpframes[3].index == 3)
+assert(isotpframes[3].length == 8)
+assert(isotpframes[4].type == 2)
+assert(isotpframes[4].index == 4)
+assert(isotpframes[4].length == 8)
+assert(isotpframes[5].type == 2)
+assert(isotpframes[5].index == 5)
+assert(isotpframes[5].length == 7)
+
+= Build SF frame with constructor, check for correct length assignments
+
+p = ISOTPHeader(bytes(ISOTPHeader()/ISOTP_SF(data=b'\xad\xbe\xad\xff')))
+assert(p.length == 5)
+assert(p.message_size == 4)
+assert(len(p.data) == 4)
+assert(p.data == b'\xad\xbe\xad\xff')
+assert(p.type == 0)
+assert(p.identifier == 0)
+
+= Build SF frame EA with constructor, check for correct length assignments
+
+p = ISOTPHeaderEA(bytes(ISOTPHeaderEA()/ISOTP_SF(data=b'\xad\xbe\xad\xff')))
+assert(p.extended_address == 0)
+assert(p.length == 6)
+assert(p.message_size == 4)
+assert(len(p.data) == 4)
+assert(p.data == b'\xad\xbe\xad\xff')
+assert(p.type == 0)
+assert(p.identifier == 0)
+
+= Build FF frame with constructor, check for correct length assignments
+
+p = ISOTPHeader(bytes(ISOTPHeader()/ISOTP_FF(message_size=10, data=b'\xad\xbe\xad\xff')))
+assert(p.length == 6)
+assert(p.message_size == 10)
+assert(len(p.data) == 4)
+assert(p.data == b'\xad\xbe\xad\xff')
+assert(p.type == 1)
+assert(p.identifier == 0)
+
+= Build FF frame EA with constructor, check for correct length assignments
+
+p = ISOTPHeaderEA(bytes(ISOTPHeaderEA()/ISOTP_FF(message_size=10, data=b'\xad\xbe\xad\xff')))
+assert(p.extended_address == 0)
+assert(p.length == 7)
+assert(p.message_size == 10)
+assert(len(p.data) == 4)
+assert(p.data == b'\xad\xbe\xad\xff')
+assert(p.type == 1)
+assert(p.identifier == 0)
+
+= Build FF frame EA, extended size, with constructor, check for correct length assignments
+
+p = ISOTPHeaderEA(bytes(ISOTPHeaderEA()/ISOTP_FF(message_size=0,
+ extended_message_size=2000,
+ data=b'\xad')))
+assert(p.extended_address == 0)
+assert(p.length == 8)
+assert(p.message_size == 0)
+assert(p.extended_message_size == 2000)
+assert(len(p.data) == 1)
+assert(p.data == b'\xad')
+assert(p.type == 1)
+assert(p.identifier == 0)
+
+= Build FF frame, extended size, with constructor, check for correct length assignments
+
+p = ISOTPHeader(bytes(ISOTPHeader()/ISOTP_FF(message_size=0,
+ extended_message_size=2000,
+ data=b'\xad')))
+assert(p.length == 7)
+assert(p.message_size == 0)
+assert(p.extended_message_size == 2000)
+assert(len(p.data) == 1)
+assert(p.data == b'\xad')
+assert(p.type == 1)
+assert(p.identifier == 0)
+
+= Build CF frame with constructor, check for correct length assignments
+
+p = ISOTPHeader(bytes(ISOTPHeader()/ISOTP_CF(data=b'\xad')))
+assert(p.length == 2)
+assert(p.index == 0)
+assert(len(p.data) == 1)
+assert(p.data == b'\xad')
+assert(p.type == 2)
+assert(p.identifier == 0)
+
+= Build CF frame EA with constructor, check for correct length assignments
+
+p = ISOTPHeaderEA(bytes(ISOTPHeaderEA()/ISOTP_CF(data=b'\xad')))
+assert(p.length == 3)
+assert(p.index == 0)
+assert(len(p.data) == 1)
+assert(p.data == b'\xad')
+assert(p.type == 2)
+assert(p.identifier == 0)
+
+= Build FC frame EA with constructor, check for correct length assignments
+
+p = ISOTPHeaderEA(bytes(ISOTPHeaderEA()/ISOTP_FC()))
+assert(p.length == 4)
+assert(p.block_size == 0)
+assert(p.separation_time == 0)
+assert(p.type == 3)
+assert(p.identifier == 0)
+
+= Build FC frame with constructor, check for correct length assignments
+
+p = ISOTPHeader(bytes(ISOTPHeader()/ISOTP_FC()))
+assert(p.length == 3)
+assert(p.block_size == 0)
+assert(p.separation_time == 0)
+assert(p.type == 3)
+assert(p.identifier == 0)
+
+= Construct some single frames
+
+p = ISOTPHeader(identifier=0x123, length=5)/ISOTP_SF(message_size=4, data=b'abcd')
+assert(p.length == 5)
+assert(p.identifier == 0x123)
+assert(p.type == 0)
+assert(p.message_size == 4)
+assert(p.data == b'abcd')
+
+= Construct some single frames EA
+
+p = ISOTPHeaderEA(identifier=0x123, length=6, extended_address=42)/ISOTP_SF(message_size=4, data=b'abcd')
+assert(p.length == 6)
+assert(p.extended_address == 42)
+assert(p.identifier == 0x123)
+assert(p.type == 0)
+assert(p.message_size == 4)
+assert(p.data == b'abcd')
+
+ ISOTP fragment and defragment checks