[thread-cert] refactor case 5.3.2 using pktverify (#5803)
diff --git a/tests/scripts/thread-cert/Cert_5_3_02_RealmLocal.py b/tests/scripts/thread-cert/Cert_5_3_02_RealmLocal.py
index 737927e..3b12627 100755
--- a/tests/scripts/thread-cert/Cert_5_3_02_RealmLocal.py
+++ b/tests/scripts/thread-cert/Cert_5_3_02_RealmLocal.py
@@ -31,33 +31,59 @@
import config
import thread_cert
+from pktverify.consts import MLE_CHILD_ID_REQUEST, MLE_CHILD_ID_RESPONSE, REALM_LOCAL_ALL_NODES_ADDRESS, REALM_LOCAL_ALL_ROUTERS_ADDRESS, REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS
+from pktverify.packet_verifier import PacketVerifier
LEADER = 1
ROUTER1 = 2
DUT_ROUTER2 = 3
SED1 = 4
+FRAGMENTED_DATA_LEN = 256
+
+# Test Purpose and Description:
+# -----------------------------
+# The purpose of this test case is to validate the Realm-Local addresses
+# that the DUT auto-configures.
+#
+# Test Topology:
+# -------------
+# Leader
+# |
+# Router_1 - Router_2(DUT)
+# |
+# SED
+#
+# DUT Types:
+# ----------
+# Router
class Cert_5_3_2_RealmLocal(thread_cert.TestCase):
+ USE_MESSAGE_FACTORY = False
+
TOPOLOGY = {
LEADER: {
+ 'name': 'LEADER',
'mode': 'rdn',
'panid': 0xface,
'allowlist': [ROUTER1]
},
ROUTER1: {
+ 'name': 'ROUTER_1',
'mode': 'rdn',
'panid': 0xface,
'router_selection_jitter': 1,
'allowlist': [LEADER, DUT_ROUTER2]
},
DUT_ROUTER2: {
+ 'name': 'ROUTER_2',
'mode': 'rdn',
'panid': 0xface,
'router_selection_jitter': 1,
'allowlist': [ROUTER1, SED1]
},
SED1: {
+ 'name': 'SED',
'is_mtd': True,
'mode': 'n',
'panid': 0xface,
@@ -84,46 +110,191 @@
self.simulator.go(5)
self.assertEqual(self.nodes[SED1].get_state(), 'child')
+ self.collect_ipaddrs()
+ self.collect_rloc16s()
+
# 2 & 3
mleid = self.nodes[DUT_ROUTER2].get_ip6_address(config.ADDRESS_TYPE.ML_EID)
- self.assertTrue(self.nodes[LEADER].ping(mleid, size=256))
+ self.assertTrue(self.nodes[LEADER].ping(mleid, size=FRAGMENTED_DATA_LEN))
+ self.simulator.go(2)
self.assertTrue(self.nodes[LEADER].ping(mleid))
+ self.simulator.go(2)
# 4 & 5
- self.assertTrue(self.nodes[LEADER].ping('ff03::1', num_responses=2, size=256))
- sed_messages = self.simulator.get_messages_sent_by(SED1)
- self.assertFalse(sed_messages.contains_icmp_message())
-
+ self.assertTrue(self.nodes[LEADER].ping('ff03::1', num_responses=2, size=FRAGMENTED_DATA_LEN))
+ self.simulator.go(5)
self.assertTrue(self.nodes[LEADER].ping('ff03::1', num_responses=2))
- sed_messages = self.simulator.get_messages_sent_by(SED1)
- self.assertFalse(sed_messages.contains_icmp_message())
+ self.simulator.go(5)
# 6 & 7
- self.assertTrue(self.nodes[LEADER].ping('ff03::2', num_responses=2, size=256))
- sed_messages = self.simulator.get_messages_sent_by(SED1)
- self.assertFalse(sed_messages.contains_icmp_message())
-
+ self.assertTrue(self.nodes[LEADER].ping('ff03::2', num_responses=2, size=FRAGMENTED_DATA_LEN))
+ self.simulator.go(5)
self.assertTrue(self.nodes[LEADER].ping('ff03::2', num_responses=2))
- sed_messages = self.simulator.get_messages_sent_by(SED1)
- self.assertFalse(sed_messages.contains_icmp_message())
+ self.simulator.go(5)
# 8
self.assertTrue(self.nodes[LEADER].ping(
config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
num_responses=3,
- size=256,
+ size=FRAGMENTED_DATA_LEN,
))
- self.simulator.go(2)
- sed_messages = self.simulator.get_messages_sent_by(SED1)
- self.assertTrue(sed_messages.contains_icmp_message())
+ self.simulator.go(5)
- self.assertTrue(self.nodes[LEADER].ping(
- config.REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS,
- num_responses=3,
- ))
- self.simulator.go(2)
- sed_messages = self.simulator.get_messages_sent_by(SED1)
- self.assertTrue(sed_messages.contains_icmp_message())
+ def verify(self, pv):
+ pkts = pv.pkts
+ pv.summary.show()
+
+ LEADER = pv.vars['LEADER']
+ LEADER_MLEID = pv.vars['LEADER_MLEID']
+ ROUTER_1 = pv.vars['ROUTER_1']
+ ROUTER_2 = pv.vars['ROUTER_2']
+ ROUTER_2_RLOC16 = pv.vars['ROUTER_2_RLOC16']
+ ROUTER_2_MLEID = pv.vars['ROUTER_2_MLEID']
+ SED = pv.vars['SED']
+ SED_RLOC16 = pv.vars['SED_RLOC16']
+
+ # Step 1: Build the topology as described
+ pv.verify_attached('ROUTER_1', 'LEADER')
+ pv.verify_attached('ROUTER_2', 'ROUTER_1')
+ pv.verify_attached('SED', 'ROUTER_2', 'MTD')
+
+ # Step 2: Leader sends a Fragmented ICMPv6 Echo Request to
+ # DUT's ML-EID
+ # The DUT MUST respond with an ICMPv6 Echo Reply
+
+ _pkt = pkts.filter_ping_request().\
+ filter_ipv6_src_dst(LEADER_MLEID, ROUTER_2_MLEID).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
+ filter_ipv6_src_dst(ROUTER_2_MLEID, LEADER_MLEID).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+
+ # Step 3: Leader sends a Unfragmented ICMPv6 Echo Request to
+ # DUT’s ML-EID
+ # The DUT MUST respond with an ICMPv6 Echo Reply
+
+ _pkt = pkts.filter_ping_request().\
+ filter_ipv6_src_dst(LEADER_MLEID, ROUTER_2_MLEID).\
+ must_next()
+ pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
+ filter_ipv6_src_dst(ROUTER_2_MLEID, LEADER_MLEID).\
+ must_next()
+
+ # Step 4: Leader sends a Fragmented ICMPv6 Echo Request to the
+ # Realm-Local All Nodes multicast address (FF03::1)
+ # The DUT MUST respond with an ICMPv6 Echo Reply
+ # The DUT MUST NOT forward the ICMPv6 Echo Request to SED
+
+ _pkt1 = pkts.filter_ping_request().\
+ filter_wpan_src64(LEADER).\
+ filter_ipv6_dst(REALM_LOCAL_ALL_NODES_ADDRESS).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ with pkts.save_index():
+ pkts.filter_ping_reply(identifier=_pkt1.icmpv6.echo.identifier).\
+ filter_ipv6_src_dst(ROUTER_2_MLEID, LEADER_MLEID).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ pkts.filter_ping_request(identifier=_pkt1.icmpv6.echo.identifier).\
+ filter_wpan_src16_dst16(ROUTER_2_RLOC16, SED_RLOC16).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_not_next()
+
+ # Step 5: Leader sends an Unfragmented ICMPv6 Echo Request to the
+ # Realm-Local All Nodes multicast address (FF03::1)
+ # The DUT MUST respond with an ICMPv6 Echo Reply
+ # The DUT MUST NOT forward the ICMPv6 Echo Request to SED
+
+ _pkt2 = pkts.filter_ping_request().\
+ filter_wpan_src64(LEADER).\
+ filter_ipv6_dst(REALM_LOCAL_ALL_NODES_ADDRESS).\
+ filter(lambda p: p.icmpv6.echo.sequence_number !=
+ _pkt1.icmpv6.echo.sequence_number
+ ).\
+ must_next()
+ with pkts.save_index():
+ pkts.filter_ping_reply(identifier=_pkt2.icmpv6.echo.identifier).\
+ filter_ipv6_src_dst(ROUTER_2_MLEID, LEADER_MLEID).\
+ must_next()
+ pkts.filter_ping_request(identifier = _pkt2.icmpv6.echo.identifier).\
+ filter_wpan_src16_dst16(ROUTER_2_RLOC16, SED_RLOC16).\
+ must_not_next()
+
+ # Step 6: Leader sends a Fragmented ICMPv6 Echo Request to the
+ # Realm-Local All Routers multicast address (FF03::2)
+ # The DUT MUST respond with an ICMPv6 Echo Reply
+ # The DUT MUST NOT forward the ICMPv6 Echo Request to SED
+
+ _pkt1 = pkts.filter_ping_request().\
+ filter_wpan_src64(LEADER).\
+ filter_ipv6_dst(REALM_LOCAL_ALL_ROUTERS_ADDRESS).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ with pkts.save_index():
+ pkts.filter_ping_reply(identifier=_pkt1.icmpv6.echo.identifier).\
+ filter_ipv6_src_dst(ROUTER_2_MLEID, LEADER_MLEID).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ pkts.filter_ping_request(identifier=_pkt1.icmpv6.echo.identifier).\
+ filter_wpan_src16_dst16(ROUTER_2_RLOC16, SED_RLOC16).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_not_next()
+
+ # Step 7: Leader sends an Unfragmented ICMPv6 Echo Request to the
+ # Realm-Local All Routers multicast address (FF03::2)
+ # The DUT MUST respond with an ICMPv6 Echo Reply
+ # The DUT MUST NOT forward the ICMPv6 Echo Request to SED
+
+ _pkt2 = pkts.filter_ping_request().\
+ filter_wpan_src64(LEADER).\
+ filter_ipv6_dst(REALM_LOCAL_ALL_ROUTERS_ADDRESS).\
+ filter(lambda p: p.icmpv6.echo.sequence_number !=
+ _pkt1.icmpv6.echo.sequence_number
+ ).\
+ must_next()
+ with pkts.save_index():
+ pkts.filter_ping_reply(identifier=_pkt2.icmpv6.echo.identifier).\
+ filter_ipv6_src_dst(ROUTER_2_MLEID, LEADER_MLEID).\
+ must_next()
+ pkts.filter_ping_request(identifier=_pkt2.icmpv6.echo.identifier).\
+ filter_wpan_src16_dst16(ROUTER_2_RLOC16, SED_RLOC16).\
+ must_not_next()
+
+ # Step 8: Leader sends a Fragmented ICMPv6 Echo Request to the
+ # Realm-Local All Thread Nodes multicast address
+ # The DUT MUST respond with an ICMPv6 Echo Reply
+ # The Realm-Local All Thread Nodes multicast address
+ # MUST be a realm-local Unicast Prefix-Based Multicast
+ # Address [RFC 3306], with:
+ # - flgs set to 3 (P = 1 and T = 1)
+ # - scop set to 3
+ # - plen set to the Mesh Local Prefix length
+ # - network prefix set to the Mesh Local Prefix
+ # - group ID set to 1
+ # The DUT MUST use IEEE 802.15.4 indirect transmissions
+ # to forward packet to SED
+
+ _pkt = pkts.filter_ping_request().\
+ filter_wpan_src64(LEADER).\
+ filter_ipv6_dst(REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ with pkts.save_index():
+ pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
+ filter_ipv6_src_dst(ROUTER_2_MLEID, LEADER_MLEID).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ pkts.filter_ping_request(identifier = _pkt.icmpv6.echo.identifier).\
+ filter_wpan_src16_dst16(ROUTER_2_RLOC16, SED_RLOC16).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
+ pkts.filter_ping_reply(identifier=_pkt.icmpv6.echo.identifier).\
+ filter_wpan_src64(SED).\
+ filter_ipv6_dst(LEADER_MLEID).\
+ filter(lambda p: p.icmpv6.data.len == FRAGMENTED_DATA_LEN).\
+ must_next()
if __name__ == '__main__':
diff --git a/tests/scripts/thread-cert/pktverify/consts.py b/tests/scripts/thread-cert/pktverify/consts.py
index 35a91c6..6752316 100644
--- a/tests/scripts/thread-cert/pktverify/consts.py
+++ b/tests/scripts/thread-cert/pktverify/consts.py
@@ -35,6 +35,7 @@
LINK_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS = Ipv6Addr('ff32:40:fd00:db8::1')
REALM_LOCAL_All_THREAD_NODES_MULTICAST_ADDRESS = Ipv6Addr('ff33:40:fd00:db8::1')
+REALM_LOCAL_ALL_NODES_ADDRESS = Ipv6Addr('ff03::1')
REALM_LOCAL_ALL_ROUTERS_ADDRESS = Ipv6Addr('ff03::2')
LINK_LOCAL_ALL_NODES_MULTICAST_ADDRESS = Ipv6Addr('ff02::1')
LINK_LOCAL_ALL_ROUTERS_MULTICAST_ADDRESS = Ipv6Addr('ff02::2')
diff --git a/tests/scripts/thread-cert/pktverify/packet_verifier.py b/tests/scripts/thread-cert/pktverify/packet_verifier.py
index 5dd8fcd..1066c4b 100644
--- a/tests/scripts/thread-cert/pktverify/packet_verifier.py
+++ b/tests/scripts/thread-cert/pktverify/packet_verifier.py
@@ -161,29 +161,41 @@
logging.info("add extra var: %s = %s", k, v)
self._vars[k] = v
- def verify_attached(self, name: str, pkts=None) -> VerifyResult:
+ def verify_attached(self, child: str, parent: str = None, child_type: str = 'FTD', pkts=None) -> VerifyResult:
"""
Verify that the device attaches to the Thread network.
- :param name: The device name.
+ :param child: The child device name.
+ :param parent: The parent device name.
+ :param child_type: The child device type (FTD, MTD).
"""
result = VerifyResult()
- assert self.is_thread_device(name), name
+ assert self.is_thread_device(child), child
+ assert child_type in ('FTD', 'MTD'), child_type
pkts = pkts or self.pkts
- extaddr = self.vars[name]
+ child_extaddr = self.vars[child]
- src_pkts = pkts.filter_wpan_src64(extaddr)
+ src_pkts = pkts.filter_wpan_src64(child_extaddr)
+ if parent:
+ assert self.is_thread_device(parent), parent
+ src_pkts = pkts.filter_wpan_src64(child_extaddr).\
+ filter_wpan_dst64(self.vars[parent])
src_pkts.filter_mle_cmd(MLE_CHILD_ID_REQUEST).must_next() # Child Id Request
result.record_last('child_id_request', pkts)
- dst_pkts = pkts.filter_wpan_dst64(extaddr)
+ dst_pkts = pkts.filter_wpan_dst64(child_extaddr)
+ if parent:
+ dst_pkts = pkts.filter_wpan_src64(self.vars[parent]).\
+ filter_wpan_dst64(child_extaddr)
dst_pkts.filter_mle_cmd(MLE_CHILD_ID_RESPONSE).must_next() # Child Id Response
result.record_last('child_id_response', pkts)
with pkts.save_index():
- src_pkts.filter_mle_cmd(MLE_ADVERTISEMENT).must_next() # MLE Advertisement
- result.record_last('mle_advertisement', pkts)
- logging.info(f"verify attached: d={name}, result={result}")
+ if child_type == 'FTD':
+ src_pkts = pkts.filter_wpan_src64(child_extaddr)
+ src_pkts.filter_mle_cmd(MLE_ADVERTISEMENT).must_next() # MLE Advertisement
+ result.record_last('mle_advertisement', pkts)
+ logging.info(f"verify attached: d={child}, result={result}")
return result