Use PriorityQueue internally in PythonCANSockets (#3061)

* This PR adds a priority queue on python-can sockets to ensure the right order of packets.
Incorrect order of CAN packets in PythonCANSockets are a long existing
bug in Scapy and probably the reason for most instability in Unit Tests.

This PR consists of the following changes:
	* Use PriorityQueue in python_can Socket Multiplexers
	* Introduce prio counter to not have message inversion on identical time stamps
	* enable isotp.uts on non root CI systems to check stability of change
	* add additional unit test

* Enable unstable isotpscan tests to see if this PR has an effect on them

* Validate if PriorityQueue is causing the stability I'm seeing in the tests

* Validate again. This time remove prio code which shouldn't had any effect

* Revert "Validate again. This time remove prio code which shouldn't had any effect"

This reverts commit bd1d868d0277b7ae43a785c23ebd6e27b3d1b753.

* Revert "Validate if PriorityQueue is causing the stability I'm seeing in the tests"

This reverts commit a25c579d0d6b6d99b53a3670ca8d0d8a07f13fbf.

* disable some long tests

* fix rebase bug

* minor addition to __lt__ operator

* Add a comment why priority is necessary

* fix unit test
diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py
index a82d526..456091c 100644
--- a/scapy/contrib/cansocket_python_can.py
+++ b/scapy/contrib/cansocket_python_can.py
@@ -13,7 +13,6 @@
 import time
 import struct
 import threading
-import copy
 
 from functools import reduce
 from operator import add
@@ -23,26 +22,70 @@
 from scapy.layers.can import CAN
 from scapy.error import warning
 from scapy.modules.six.moves import queue
+from scapy.compat import Any, List
 from can import Message as can_Message
 from can import CanError as can_CanError
 from can import BusABC as can_BusABC
 from can.interface import Bus as can_Bus
 
 
+class PriotizedCanMessage(object):
+    """Helper object for comparison of CAN messages. If the timestamps of two
+    messages are equal, the counter value of a priority counter, is used
+    for comparison. It's only important that this priority counter always
+    get increased for every CAN message in the receive heapq. This compensates
+    a low resolution of `time.time()` on some operating systems.
+    """
+    def __init__(self, msg, count):
+        # type: (can_Message, int) -> None
+        self.msg = msg
+        self.count = count
+
+    def __eq__(self, other):
+        # type: (Any) -> bool
+        if not isinstance(other, PriotizedCanMessage):
+            return False
+        return self.msg.timestamp == other.msg.timestamp and \
+            self.count == other.count
+
+    def __lt__(self, other):
+        # type: (Any) -> bool
+        if not isinstance(other, PriotizedCanMessage):
+            return False
+        return self.msg.timestamp < other.msg.timestamp or \
+            (self.msg.timestamp == other.msg.timestamp and
+             self.count < other.count)
+
+    def __le__(self, other):
+        # type: (Any) -> bool
+        return self == other or self < other
+
+    def __gt__(self, other):
+        # type: (Any) -> bool
+        return not self <= other
+
+    def __ge__(self, other):
+        # type: (Any) -> bool
+        return not self < other
+
+
 class SocketMapper:
     def __init__(self, bus, sockets):
-        self.bus = bus           # type: can_BusABC
-        self.sockets = sockets   # type: list[SocketWrapper]
+        # type: (can_BusABC, List[SocketWrapper]) -> None
+        self.bus = bus
+        self.sockets = sockets
 
     def mux(self):
         while True:
+            prio_count = 0
             try:
                 msg = self.bus.recv(timeout=0)
                 if msg is None:
                     return
                 for sock in self.sockets:
                     if sock._matches_filters(msg):
-                        sock.rx_queue.put(copy.copy(msg))
+                        prio_count += 1
+                        sock.rx_queue.put(PriotizedCanMessage(msg, prio_count))
             except Exception as e:
                 warning("[MUX] python-can exception caught: %s" % e)
 
@@ -57,7 +100,7 @@
             SocketsPool.__instance.pool_mutex = threading.Lock()
         return SocketsPool.__instance
 
-    def internal_send(self, sender, msg):
+    def internal_send(self, sender, msg, prio=0):
         with self.pool_mutex:
             try:
                 mapper = self.pool[sender.name]
@@ -68,9 +111,7 @@
                     if not sock._matches_filters(msg):
                         continue
 
-                    m = copy.copy(msg)
-                    m.timestamp = time.time()
-                    sock.rx_queue.put(m)
+                    sock.rx_queue.put(PriotizedCanMessage(msg, prio))
             except KeyError:
                 warning("[SND] Socket %s not found in pool" % sender.name)
             except can_CanError as e:
@@ -118,19 +159,22 @@
 
     def __init__(self, *args, **kwargs):
         super(SocketWrapper, self).__init__(*args, **kwargs)
-        self.rx_queue = queue.Queue()  # type: queue.Queue[can_Message]
+        self.rx_queue = queue.PriorityQueue()  # type: queue.PriorityQueue[PriotizedCanMessage]  # noqa: E501
         self.name = None
+        self.prio_counter = 0
         SocketsPool().register(self, *args, **kwargs)
 
     def _recv_internal(self, timeout):
         SocketsPool().multiplex_rx_packets()
         try:
-            return self.rx_queue.get(block=True, timeout=timeout), True
+            pm = self.rx_queue.get(block=True, timeout=timeout)
+            return pm.msg, True
         except queue.Empty:
             return None, True
 
     def send(self, msg, timeout=None):
-        SocketsPool().internal_send(self, msg)
+        self.prio_counter += 1
+        SocketsPool().internal_send(self, msg, self.prio_counter)
 
     def shutdown(self):
         SocketsPool().unregister(self)
@@ -165,6 +209,7 @@
                           arbitration_id=x.identifier,
                           dlc=x.length,
                           data=bytes(x)[8:])
+        msg.timestamp = time.time()
         try:
             x.sent_time = time.time()
         except AttributeError:
diff --git a/scapy/contrib/isotp.py b/scapy/contrib/isotp.py
index 084aee6..62271a6 100644
--- a/scapy/contrib/isotp.py
+++ b/scapy/contrib/isotp.py
@@ -1234,8 +1234,10 @@
                     if self.tx_gap == 0:
                         continue
                     else:
+                        # stop and wait for tx gap
                         self.tx_timeout_handle = TimeoutScheduler.schedule(
                             self.tx_gap, self._tx_timer_handler)
+                        return
 
     def on_recv(self, cf):
         """Function that must be called every time a CAN frame is received, to
diff --git a/test/contrib/isotp.uts b/test/contrib/isotp.uts
index ccbc281..65991a4 100644
--- a/test/contrib/isotp.uts
+++ b/test/contrib/isotp.uts
@@ -1602,6 +1602,21 @@
 assert(result[0].data == isotp.data)
 
 
+= Two ISOTPSockets at the same time, sending and receiving with tx_gap
+
+with new_can_socket0() as cs1, ISOTPSocket(cs1, sid=0x641, did=0x241, rx_separation_time_min=1) as s1, \
+        new_can_socket0() as cs2, ISOTPSocket(cs2, sid=0x241, did=0x641) as s2:
+    isotp = ISOTP(data=b"\x10\x25" * 43)
+    def sender():
+        s2.send(isotp)
+    t = Thread(target=sender)
+    result = s1.sniff(count=1, timeout=5, started_callback=t.start)
+    t.join(timeout=5)
+
+assert len(result) == 1
+assert(result[0].data == isotp.data)
+
+
 = Two ISOTPSockets at the same time, multiple sends/receives
 with new_can_socket0() as cs1, ISOTPSocket(cs1, sid=0x641, did=0x241) as s1, \
         new_can_socket0() as cs2, ISOTPSocket(cs2, sid=0x241, did=0x641) as s2:
diff --git a/test/contrib/isotpscan.uts b/test/contrib/isotpscan.uts
index ca5d01f..f4bbcc5 100644
--- a/test/contrib/isotpscan.uts
+++ b/test/contrib/isotpscan.uts
@@ -1,14 +1,12 @@
 % Regression tests for ISOTPScan
-
-# Currently too unstable
-
-~ disabled
+* Some tests are disabled to lower the CI utilitzation
 
 + Configuration
 ~ conf
 
 = Imports
 import scapy.modules.six as six
+from scapy.contrib.isotp import send_multiple_ext, filter_periodic_packets, scan_extended, scan
 
 if six.PY3:
     exec(open("test/contrib/automotive/interface_mockup.py").read())
@@ -737,15 +735,15 @@
 test_dynamic(test_isotpscan_code)
 
 = Test ISOTPScan with noise (output_format=code)
-
+~ disabled
 test_dynamic(test_isotpscan_code_noise)
 
 = Test extended ISOTPScan(output_format=code)
-
+~ disabled
 test_dynamic(test_extended_isotpscan_code)
 
 = Test extended ISOTPScan(output_format=code) extended_can_id
-
+~ disabled
 test_dynamic(test_extended_isotpscan_code_extended_can_id)
 
 = Test ISOTPScan(output_format=None)
@@ -765,7 +763,7 @@
 test_dynamic(test_isotpscan_none_random_ids)
 
 = Test ISOTPScan(output_format=None) random IDs padding
-
+~ disabled
 test_dynamic(test_isotpscan_none_random_ids_padding)
 
 + Cleanup