| % Regression tests for nativecansocket |
| ~ python3_only |
| |
| # More informations at http://www.secdev.org/projects/UTscapy/ |
| |
| |
| ############ |
| ############ |
| + Configuration of CAN virtual sockets |
| |
| = Load module |
| ~ conf command needs_root linux |
| |
| load_layer("can") |
| conf.contribs['CANSocket'] = {'use-python-can': False} |
| from scapy.contrib.cansocket_native import * |
| |
| = Setup string for vcan |
| ~ conf command |
| bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" |
| |
| = Load os |
| ~ conf command needs_root linux |
| |
| import os |
| import threading |
| from time import sleep |
| |
| = Setup vcan0 |
| ~ conf command needs_root linux |
| |
| 0 == os.system(bashCommand) |
| |
| + Basic Packet Tests() |
| = CAN Packet init |
| ~ needs_root linux |
| |
| canframe = CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' |
| |
| + Basic Socket Tests() |
| = CAN Socket Init |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface="vcan0") |
| |
| = CAN Socket send recv small packet |
| ~ needs_root linux |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| rx = sock1.recv() |
| rx == CAN(identifier=0x7ff,length=1,data=b'\x01') |
| |
| = CAN Socket send recv |
| ~ needs_root linux |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| rx = sock1.recv() |
| rx == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| |
| + Advanced Socket Tests() |
| = CAN Socket sr1 |
| ~ needs_root linux |
| |
| tx = CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| |
| = CAN Socket sr1 init time |
| ~ needs_root linux |
| |
| tx.sent_time == None |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(tx) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| rx = None |
| rx = sock1.sr1(tx) |
| rx == tx |
| |
| = CAN Socket sr1 time check |
| ~ needs_root linux |
| |
| tx.sent_time < rx.time and rx.time > 0 |
| |
| = srcan |
| ~ needs_root linux |
| |
| tx = CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| |
| = srcan check init time |
| ~ needs_root linux |
| |
| tx.sent_time == None |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(tx) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| rx = None |
| rx = srcan(tx, "vcan0", timeout=1) |
| rx = rx[0][0][1] |
| tx == rx |
| |
| = srcan check rx and tx |
| ~ needs_root linux |
| |
| tx.sent_time > 0 and rx.time > 0 and tx.sent_time < rx.time |
| |
| = sniff with filtermask 0x7ff |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| packets = sock1.sniff(timeout=0.3) |
| len(packets) == 3 |
| |
| = sniff with filtermask 0x700 |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x700}]) |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(identifier=0x212, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x2ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x1ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x2aa, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| packets = sock1.sniff(timeout=0.3) |
| len(packets) == 4 |
| |
| = sniff with filtermask 0x0ff |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x0ff}]) |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x301, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x1ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| packets = sock1.sniff(timeout=0.3) |
| len(packets) == 4 |
| |
| = sniff with multiple filters |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}, {'can_id': 0x400, 'can_mask': 0x7ff}, {'can_id': 0x600, 'can_mask': 0x7ff}, {'can_id': 0x7ff, 'can_mask': 0x7ff}]) |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x400, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x500, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x600, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| packets = sock1.sniff(timeout=0.3) |
| len(packets) == 4 |
| |
| = sniff with filtermask 0x7ff and inverse filter |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface='vcan0', can_filters=[{'can_id': 0x200 | CAN_INV_FILTER, 'can_mask': 0x7ff}]) |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| packets = sock1.sniff(timeout=0.3) |
| len(packets) == 2 |
| |
| = sniff with filtermask 0x1FFFFFFF |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface='vcan0', can_filters=[{'can_id': 0x10000000, 'can_mask': 0x1fffffff}]) |
| |
| def sender(): |
| sleep(0.1) |
| sock2 = CANSocket(iface="vcan0") |
| sock2.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock2.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| thread = threading.Thread(target=sender) |
| thread.start() |
| packets = sock1.sniff(timeout=0.3) |
| len(packets) == 2 |
| |
| = sniff with filtermask 0x1FFFFFFF and inverse filter |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface='vcan0', can_filters=[{'can_id': 0x10000000 | CAN_INV_FILTER, 'can_mask': 0x1fffffff}]) |
| |
| if six.PY3: |
| thread = threading.Thread(target=sender) |
| thread.start() |
| packets = sock1.sniff(timeout=0.3) |
| len(packets) == 4 |
| |
| = CAN Socket sr1 with receive own messages |
| ~ needs_root linux |
| |
| sock1 = CANSocket(iface="vcan0", receive_own_messages=True) |
| tx = CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| rx = None |
| rx = sock1.sr1(tx) |
| tx == rx |
| tx.sent_time < rx.time and tx == rx and rx.time > 0 |
| |
| = srcan |
| ~ needs_root linux |
| |
| tx = CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| rx = None |
| rx = srcan(tx, iface="vcan0", receive_own_messages=True, timeout=1) |
| tx == rx[0][0][1] |