Use PriorityQueue internally in PythonCANSockets (#3061)
* This PR adds a priority queue on python-can sockets to ensure the right order of packets.
Incorrect order of CAN packets in PythonCANSockets are a long existing
bug in Scapy and probably the reason for most instability in Unit Tests.
This PR consists of the following changes:
* Use PriorityQueue in python_can Socket Multiplexers
* Introduce prio counter to not have message inversion on identical time stamps
* enable isotp.uts on non root CI systems to check stability of change
* add additional unit test
* Enable unstable isotpscan tests to see if this PR has an effect on them
* Validate if PriorityQueue is causing the stability I'm seeing in the tests
* Validate again. This time remove prio code which shouldn't had any effect
* Revert "Validate again. This time remove prio code which shouldn't had any effect"
This reverts commit bd1d868d0277b7ae43a785c23ebd6e27b3d1b753.
* Revert "Validate if PriorityQueue is causing the stability I'm seeing in the tests"
This reverts commit a25c579d0d6b6d99b53a3670ca8d0d8a07f13fbf.
* disable some long tests
* fix rebase bug
* minor addition to __lt__ operator
* Add a comment why priority is necessary
* fix unit test
diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py
index a82d526..456091c 100644
--- a/scapy/contrib/cansocket_python_can.py
+++ b/scapy/contrib/cansocket_python_can.py
@@ -13,7 +13,6 @@
import time
import struct
import threading
-import copy
from functools import reduce
from operator import add
@@ -23,26 +22,70 @@
from scapy.layers.can import CAN
from scapy.error import warning
from scapy.modules.six.moves import queue
+from scapy.compat import Any, List
from can import Message as can_Message
from can import CanError as can_CanError
from can import BusABC as can_BusABC
from can.interface import Bus as can_Bus
+class PriotizedCanMessage(object):
+ """Helper object for comparison of CAN messages. If the timestamps of two
+ messages are equal, the counter value of a priority counter, is used
+ for comparison. It's only important that this priority counter always
+ get increased for every CAN message in the receive heapq. This compensates
+ a low resolution of `time.time()` on some operating systems.
+ """
+ def __init__(self, msg, count):
+ # type: (can_Message, int) -> None
+ self.msg = msg
+ self.count = count
+
+ def __eq__(self, other):
+ # type: (Any) -> bool
+ if not isinstance(other, PriotizedCanMessage):
+ return False
+ return self.msg.timestamp == other.msg.timestamp and \
+ self.count == other.count
+
+ def __lt__(self, other):
+ # type: (Any) -> bool
+ if not isinstance(other, PriotizedCanMessage):
+ return False
+ return self.msg.timestamp < other.msg.timestamp or \
+ (self.msg.timestamp == other.msg.timestamp and
+ self.count < other.count)
+
+ def __le__(self, other):
+ # type: (Any) -> bool
+ return self == other or self < other
+
+ def __gt__(self, other):
+ # type: (Any) -> bool
+ return not self <= other
+
+ def __ge__(self, other):
+ # type: (Any) -> bool
+ return not self < other
+
+
class SocketMapper:
def __init__(self, bus, sockets):
- self.bus = bus # type: can_BusABC
- self.sockets = sockets # type: list[SocketWrapper]
+ # type: (can_BusABC, List[SocketWrapper]) -> None
+ self.bus = bus
+ self.sockets = sockets
def mux(self):
while True:
+ prio_count = 0
try:
msg = self.bus.recv(timeout=0)
if msg is None:
return
for sock in self.sockets:
if sock._matches_filters(msg):
- sock.rx_queue.put(copy.copy(msg))
+ prio_count += 1
+ sock.rx_queue.put(PriotizedCanMessage(msg, prio_count))
except Exception as e:
warning("[MUX] python-can exception caught: %s" % e)
@@ -57,7 +100,7 @@
SocketsPool.__instance.pool_mutex = threading.Lock()
return SocketsPool.__instance
- def internal_send(self, sender, msg):
+ def internal_send(self, sender, msg, prio=0):
with self.pool_mutex:
try:
mapper = self.pool[sender.name]
@@ -68,9 +111,7 @@
if not sock._matches_filters(msg):
continue
- m = copy.copy(msg)
- m.timestamp = time.time()
- sock.rx_queue.put(m)
+ sock.rx_queue.put(PriotizedCanMessage(msg, prio))
except KeyError:
warning("[SND] Socket %s not found in pool" % sender.name)
except can_CanError as e:
@@ -118,19 +159,22 @@
def __init__(self, *args, **kwargs):
super(SocketWrapper, self).__init__(*args, **kwargs)
- self.rx_queue = queue.Queue() # type: queue.Queue[can_Message]
+ self.rx_queue = queue.PriorityQueue() # type: queue.PriorityQueue[PriotizedCanMessage] # noqa: E501
self.name = None
+ self.prio_counter = 0
SocketsPool().register(self, *args, **kwargs)
def _recv_internal(self, timeout):
SocketsPool().multiplex_rx_packets()
try:
- return self.rx_queue.get(block=True, timeout=timeout), True
+ pm = self.rx_queue.get(block=True, timeout=timeout)
+ return pm.msg, True
except queue.Empty:
return None, True
def send(self, msg, timeout=None):
- SocketsPool().internal_send(self, msg)
+ self.prio_counter += 1
+ SocketsPool().internal_send(self, msg, self.prio_counter)
def shutdown(self):
SocketsPool().unregister(self)
@@ -165,6 +209,7 @@
arbitration_id=x.identifier,
dlc=x.length,
data=bytes(x)[8:])
+ msg.timestamp = time.time()
try:
x.sent_time = time.time()
except AttributeError:
diff --git a/scapy/contrib/isotp.py b/scapy/contrib/isotp.py
index 084aee6..62271a6 100644
--- a/scapy/contrib/isotp.py
+++ b/scapy/contrib/isotp.py
@@ -1234,8 +1234,10 @@
if self.tx_gap == 0:
continue
else:
+ # stop and wait for tx gap
self.tx_timeout_handle = TimeoutScheduler.schedule(
self.tx_gap, self._tx_timer_handler)
+ return
def on_recv(self, cf):
"""Function that must be called every time a CAN frame is received, to
diff --git a/test/contrib/isotp.uts b/test/contrib/isotp.uts
index ccbc281..65991a4 100644
--- a/test/contrib/isotp.uts
+++ b/test/contrib/isotp.uts
@@ -1602,6 +1602,21 @@
assert(result[0].data == isotp.data)
+= Two ISOTPSockets at the same time, sending and receiving with tx_gap
+
+with new_can_socket0() as cs1, ISOTPSocket(cs1, sid=0x641, did=0x241, rx_separation_time_min=1) as s1, \
+ new_can_socket0() as cs2, ISOTPSocket(cs2, sid=0x241, did=0x641) as s2:
+ isotp = ISOTP(data=b"\x10\x25" * 43)
+ def sender():
+ s2.send(isotp)
+ t = Thread(target=sender)
+ result = s1.sniff(count=1, timeout=5, started_callback=t.start)
+ t.join(timeout=5)
+
+assert len(result) == 1
+assert(result[0].data == isotp.data)
+
+
= Two ISOTPSockets at the same time, multiple sends/receives
with new_can_socket0() as cs1, ISOTPSocket(cs1, sid=0x641, did=0x241) as s1, \
new_can_socket0() as cs2, ISOTPSocket(cs2, sid=0x241, did=0x641) as s2:
diff --git a/test/contrib/isotpscan.uts b/test/contrib/isotpscan.uts
index ca5d01f..f4bbcc5 100644
--- a/test/contrib/isotpscan.uts
+++ b/test/contrib/isotpscan.uts
@@ -1,14 +1,12 @@
% Regression tests for ISOTPScan
-
-# Currently too unstable
-
-~ disabled
+* Some tests are disabled to lower the CI utilitzation
+ Configuration
~ conf
= Imports
import scapy.modules.six as six
+from scapy.contrib.isotp import send_multiple_ext, filter_periodic_packets, scan_extended, scan
if six.PY3:
exec(open("test/contrib/automotive/interface_mockup.py").read())
@@ -737,15 +735,15 @@
test_dynamic(test_isotpscan_code)
= Test ISOTPScan with noise (output_format=code)
-
+~ disabled
test_dynamic(test_isotpscan_code_noise)
= Test extended ISOTPScan(output_format=code)
-
+~ disabled
test_dynamic(test_extended_isotpscan_code)
= Test extended ISOTPScan(output_format=code) extended_can_id
-
+~ disabled
test_dynamic(test_extended_isotpscan_code_extended_can_id)
= Test ISOTPScan(output_format=None)
@@ -765,7 +763,7 @@
test_dynamic(test_isotpscan_none_random_ids)
= Test ISOTPScan(output_format=None) random IDs padding
-
+~ disabled
test_dynamic(test_isotpscan_none_random_ids_padding)
+ Cleanup