[scripts] add live capture for sniffer.py on simulation traffic (#5327)
This commit enhances sniffer.py so that we can use it as a standalone
sniffer for non Virtual Time simulations.
diff --git a/tests/scripts/thread-cert/pcap.py b/tests/scripts/thread-cert/pcap.py
index d478811..911f96e 100644
--- a/tests/scripts/thread-cert/pcap.py
+++ b/tests/scripts/thread-cert/pcap.py
@@ -80,6 +80,7 @@
pkt = self.encode_frame(frame, *timestamp)
self._pcap_file.write(pkt)
self._pcap_file.flush()
+ return pkt
def __del__(self):
self._pcap_file.close()
diff --git a/tests/scripts/thread-cert/sniffer.py b/tests/scripts/thread-cert/sniffer.py
old mode 100644
new mode 100755
index d6ad885..a189f5d
--- a/tests/scripts/thread-cert/sniffer.py
+++ b/tests/scripts/thread-cert/sniffer.py
@@ -31,6 +31,9 @@
import io
import logging
import os
+import sys
+import time
+
import pcap
import threading
import traceback
@@ -62,6 +65,9 @@
self._message_factory = message_factory
self._pcap = pcap.PcapCodec(os.getenv('TEST_NAME', 'current'))
+ if __name__ == '__main__':
+ sys.stdout.buffer.write(self._pcap.encode_header())
+ sys.stdout.buffer.flush()
# Create transport
transport_factory = sniffer_transport.SnifferTransportFactory()
@@ -81,19 +87,27 @@
while self._thread_alive.is_set():
data, nodeid = self._transport.recv(self.RECV_BUFFER_SIZE)
- self._pcap.append(data)
+ pkt = self._pcap.append(data)
+ if __name__ == '__main__':
+ try:
+ sys.stdout.buffer.write(pkt)
+ sys.stdout.flush()
+ except BrokenPipeError:
+ self._thread_alive.clear()
+ break
# Ignore any exceptions
- try:
- messages = self._message_factory.create(io.BytesIO(data))
- self.logger.debug("Received messages: {}".format(messages))
- for msg in messages:
- self._buckets[nodeid].put(msg)
+ if self._message_factory is not None:
+ try:
+ messages = self._message_factory.create(io.BytesIO(data))
+ self.logger.debug("Received messages: {}".format(messages))
+ for msg in messages:
+ self._buckets[nodeid].put(msg)
- except Exception as e:
- # Just print the exception to the console
- print("EXCEPTION: %s" % e)
- traceback.print_exc()
+ except Exception as e:
+ # Just print the exception to the console
+ self.logger.error("EXCEPTION: %s" % e)
+ traceback.print_exc()
self.logger.debug("Sniffer stopped.")
@@ -115,7 +129,7 @@
self._transport.close()
- self._thread.join()
+ self._thread.join(timeout=1)
self._thread = None
def set_lowpan_context(self, cid, prefix):
@@ -140,3 +154,19 @@
messages.append(bucket.get_nowait())
return message.MessagesSet(messages)
+
+
+def run_sniffer():
+ sniffer = Sniffer(None)
+ sniffer.start()
+ while sniffer._thread_alive.is_set():
+ try:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ break
+
+ sniffer.stop()
+
+
+if __name__ == '__main__':
+ run_sniffer()