[tests] add traffic analysis for Cert_5_3_08 as TestPlan (#2484)
This commit adds the method set_lowpan_context to set the lowpan context of sniffer.
diff --git a/tests/scripts/thread-cert/Cert_5_3_08_ChildAddressSet.py b/tests/scripts/thread-cert/Cert_5_3_08_ChildAddressSet.py
index 8011f1a..1b36db5 100755
--- a/tests/scripts/thread-cert/Cert_5_3_08_ChildAddressSet.py
+++ b/tests/scripts/thread-cert/Cert_5_3_08_ChildAddressSet.py
@@ -30,81 +30,158 @@
import time
import unittest
+import config
+import command
+import ipv6
+import mle
import node
-LEADER = 1
-ED1 = 2
-ED2 = 3
-ED3 = 4
-ED4 = 5
+DUT_LEADER = 1
+BR = 2
+MED1 = 3
+MED2 = 4
-MTDS = [ED1, ED2, ED3, ED4]
+MTDS = [MED1, MED2]
class Cert_5_3_8_ChildAddressSet(unittest.TestCase):
def setUp(self):
+
self.nodes = {}
- for i in range(1,6):
+ for i in range(1, 5):
self.nodes[i] = node.Node(i, (i in MTDS))
- self.nodes[LEADER].set_panid(0xface)
- self.nodes[LEADER].set_mode('rsdn')
- self.nodes[LEADER].add_whitelist(self.nodes[ED1].get_addr64())
- self.nodes[LEADER].add_whitelist(self.nodes[ED2].get_addr64())
- self.nodes[LEADER].add_whitelist(self.nodes[ED3].get_addr64())
- self.nodes[LEADER].add_whitelist(self.nodes[ED4].get_addr64())
- self.nodes[LEADER].enable_whitelist()
+ self.nodes[DUT_LEADER].set_panid(0xface)
+ self.nodes[DUT_LEADER].set_mode('rsdn')
+ self.nodes[DUT_LEADER].add_whitelist(self.nodes[BR].get_addr64())
+ self.nodes[DUT_LEADER].add_whitelist(self.nodes[MED1].get_addr64())
+ self.nodes[DUT_LEADER].add_whitelist(self.nodes[MED2].get_addr64())
+ self.nodes[DUT_LEADER].enable_whitelist()
- self.nodes[ED1].set_panid(0xface)
- self.nodes[ED1].set_mode('rsn')
- self.nodes[ED1].add_whitelist(self.nodes[LEADER].get_addr64())
- self.nodes[ED1].enable_whitelist()
+ self.nodes[BR].set_panid(0xface)
+ self.nodes[BR].set_mode('rsdn')
+ self.nodes[BR].add_whitelist(self.nodes[DUT_LEADER].get_addr64())
+ self.nodes[BR].enable_whitelist()
+ self.nodes[BR].set_router_selection_jitter(1)
- self.nodes[ED2].set_panid(0xface)
- self.nodes[ED2].set_mode('rsn')
- self.nodes[ED2].add_whitelist(self.nodes[LEADER].get_addr64())
- self.nodes[ED2].enable_whitelist()
+ for i in MTDS:
+ self.nodes[i].set_panid(0xface)
+ self.nodes[i].set_mode('rsn')
+ self.nodes[i].add_whitelist(self.nodes[DUT_LEADER].get_addr64())
+ self.nodes[i].enable_whitelist()
- self.nodes[ED3].set_panid(0xface)
- self.nodes[ED3].set_mode('rsn')
- self.nodes[ED3].add_whitelist(self.nodes[LEADER].get_addr64())
- self.nodes[ED3].enable_whitelist()
-
- self.nodes[ED4].set_panid(0xface)
- self.nodes[ED4].set_mode('rsn')
- self.nodes[ED4].add_whitelist(self.nodes[LEADER].get_addr64())
- self.nodes[ED4].enable_whitelist()
+ self.sniffer = config.create_default_thread_sniffer()
+ self.sniffer.start()
def tearDown(self):
+ self.sniffer.stop()
+ del self.sniffer
+
for node in list(self.nodes.values()):
node.stop()
del self.nodes
def test(self):
- self.nodes[LEADER].start()
- self.nodes[LEADER].set_state('leader')
- self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
+ self.nodes[DUT_LEADER].start()
+ self.nodes[DUT_LEADER].set_state('leader')
+ self.assertEqual(self.nodes[DUT_LEADER].get_state(), 'leader')
- self.nodes[ED1].start()
+ self.nodes[BR].start()
time.sleep(5)
- self.assertEqual(self.nodes[ED1].get_state(), 'child')
+ self.assertEqual(self.nodes[BR].get_state(), 'router')
- self.nodes[ED2].start()
- time.sleep(5)
- self.assertEqual(self.nodes[ED2].get_state(), 'child')
+ # 1 BR: Configure BR to be a DHCPv6 server
+ self.nodes[BR].add_prefix('2001::/64', 'pdros')
+ self.nodes[BR].add_prefix('2002::/64', 'pdros')
+ self.nodes[BR].add_prefix('2003::/64', 'pdros')
+ self.nodes[BR].register_netdata()
- self.nodes[ED3].start()
- time.sleep(5)
- self.assertEqual(self.nodes[ED3].get_state(), 'child')
+ # Set lowpan context of sniffer
+ self.sniffer.set_lowpan_context(1, '2001::/64')
+ self.sniffer.set_lowpan_context(2, '2002::/64')
+ self.sniffer.set_lowpan_context(3, '2003::/64')
- self.nodes[ED4].start()
- time.sleep(5)
- self.assertEqual(self.nodes[ED4].get_state(), 'child')
+ # 2 DUT_LEADER: Verify LEADER sent an Advertisement
+ leader_messages = self.sniffer.get_messages_sent_by(DUT_LEADER)
+ msg = leader_messages.next_mle_message(mle.CommandType.ADVERTISEMENT)
+ command.check_mle_advertisement(msg)
- for i in range(2,6):
- addrs = self.nodes[i].get_addrs()
- for addr in addrs:
- if addr[0:4] != 'fe80':
- self.assertTrue(self.nodes[LEADER].ping(addr))
+ # 3 MED1, MED2: MED1 and MED2 attach to DUT_LEADER
+ for i in MTDS:
+ self.nodes[i].start()
+ time.sleep(5)
+ self.assertEqual(self.nodes[i].get_state(), 'child')
+
+ # 4 MED1: MED1 send an ICMPv6 Echo Request to the MED2 ML-EID
+ med2_ml_eid = self.nodes[MED2].get_ip6_address(config.ADDRESS_TYPE.ML_EID)
+ self.assertTrue(med2_ml_eid != None)
+ self.assertTrue(self.nodes[MED1].ping(med2_ml_eid))
+
+ # Verify DUT_LEADER didn't generate an Address Query Request
+ leader_messages = self.sniffer.get_messages_sent_by(DUT_LEADER)
+ msg = leader_messages.next_coap_message('0.02', '/a/aq', False)
+ assert msg is None, "Error: The DUT_LEADER sent an unexpected Address Query Request"
+
+ # Wait for sniffer got packets
+ time.sleep(1)
+
+ # Verify MED2 sent an ICMPv6 Echo Reply
+ med2_messages = self.sniffer.get_messages_sent_by(MED2)
+ msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
+ assert msg is not None, "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
+
+ # 5 MED1: MED1 send an ICMPv6 Echo Request to the MED2 2001::GUA
+ addr = self.nodes[MED2].get_addr("2001::/64")
+ self.assertTrue(addr is not None)
+ self.assertTrue(self.nodes[MED1].ping(addr))
+
+ # Verify DUT_LEADER didn't generate an Address Query Request
+ leader_messages = self.sniffer.get_messages_sent_by(DUT_LEADER)
+ msg = leader_messages.next_coap_message('0.02', '/a/aq', False)
+ assert msg is None, "Error: The DUT_LEADER sent an unexpected Address Query Request"
+
+ # Wait for sniffer got packets
+ time.sleep(1)
+
+ # Verify MED2 sent an ICMPv6 Echo Reply
+ med2_messages = self.sniffer.get_messages_sent_by(MED2)
+ msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
+ assert msg is not None, "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
+
+ # 6 MED1: MED1 send an ICMPv6 Echo Request to the MED2 2002::GUA
+ addr = self.nodes[MED2].get_addr("2002::/64")
+ self.assertTrue(addr is not None)
+ self.assertTrue(self.nodes[MED1].ping(addr))
+
+ # Verify DUT_LEADER didn't generate an Address Query Request
+ leader_messages = self.sniffer.get_messages_sent_by(DUT_LEADER)
+ msg = leader_messages.next_coap_message('0.02', '/a/aq', False)
+ assert msg is None, "Error: The DUT_LEADER sent an unexpected Address Query Request"
+
+ # Wait for sniffer got packets
+ time.sleep(1)
+
+ # Verify MED2 sent an ICMPv6 Echo Reply
+ med2_messages = self.sniffer.get_messages_sent_by(MED2)
+ msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
+ assert msg is not None, "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
+
+ # 7 MED1: MED1 send an ICMPv6 Echo Request to the MED2 2003::GUA
+ addr = self.nodes[MED2].get_addr("2003::/64")
+ self.assertTrue(addr is not None)
+ self.assertTrue(self.nodes[MED1].ping(addr))
+
+ # Verify DUT_LEADER didn't generate an Address Query Request
+ leader_messages = self.sniffer.get_messages_sent_by(DUT_LEADER)
+ msg = leader_messages.next_coap_message('0.02', '/a/aq', False)
+ assert msg is None, "Error: The DUT_LEADER sent an unexpected Address Query Request"
+
+ # Wait for sniffer got packets
+ time.sleep(1)
+
+ # Verify MED2 sent an ICMPv6 Echo Reply
+ med2_messages = self.sniffer.get_messages_sent_by(MED2)
+ msg = med2_messages.get_icmp_message(ipv6.ICMP_ECHO_RESPONSE)
+ assert msg is not None, "Error: The MED2 didn't send ICMPv6 Echo Reply to MED1"
if __name__ == '__main__':
unittest.main()
diff --git a/tests/scripts/thread-cert/Makefile.am b/tests/scripts/thread-cert/Makefile.am
index c2c3f0b..6cbe91f 100644
--- a/tests/scripts/thread-cert/Makefile.am
+++ b/tests/scripts/thread-cert/Makefile.am
@@ -265,6 +265,7 @@
XFAIL_NCP_TESTS = \
test_service.py \
Cert_5_2_05_AddressQuery.py \
+ Cert_5_3_08_ChildAddressSet.py \
Cert_5_3_09_AddressQuery.py \
Cert_8_1_01_Commissioning.py \
Cert_8_1_02_Commissioning.py \
diff --git a/tests/scripts/thread-cert/config.py b/tests/scripts/thread-cert/config.py
index 356c201..eadf5c1 100644
--- a/tests/scripts/thread-cert/config.py
+++ b/tests/scripts/thread-cert/config.py
@@ -195,7 +195,7 @@
network_layer.TlvType.ROUTER_MASK: network_layer.RouterMaskFactory(),
network_layer.TlvType.ND_OPTION: network_layer.NdOptionFactory(),
network_layer.TlvType.ND_DATA: network_layer.NdDataFactory(),
- network_layer.TlvType.THREAD_NETWORK_DATA: network_layer.ThreadNetworkDataFactory(create_default_network_data_tlvs_factory),
+ network_layer.TlvType.THREAD_NETWORK_DATA: network_layer.ThreadNetworkDataFactory(create_default_network_data_tlvs_factory()),
# Routing information are distributed in a Thread network by MLE Routing TLV
# which is in fact MLE Route64 TLV. Thread specificaton v1.1. - Chapter 5.20
@@ -216,7 +216,8 @@
"/a/aq": network_layer_tlvs_factory,
"/a/ar": network_layer_tlvs_factory,
"/a/ae": network_layer_tlvs_factory,
- "/a/an": network_layer_tlvs_factory
+ "/a/an": network_layer_tlvs_factory,
+ "/a/sd": network_layer_tlvs_factory
}
diff --git a/tests/scripts/thread-cert/lowpan.py b/tests/scripts/thread-cert/lowpan.py
index 7a88307..945b44c 100644
--- a/tests/scripts/thread-cert/lowpan.py
+++ b/tests/scripts/thread-cert/lowpan.py
@@ -731,6 +731,9 @@
elif iphc.m == self.IPHC_M_YES:
return self._decompress_multicast_dst_addr(iphc, dci, data)
+ def set_lowpan_context(self, cid, prefix):
+ self._context_manager[cid] = Context(prefix)
+
def parse(self, data, message_info):
iphc = LowpanIPHC.from_bytes(bytearray(data.read(2)))
@@ -776,6 +779,9 @@
def _is_next_header_compressed(self, header):
return (header.next_header is None)
+ def set_lowpan_context(self, cid, prefix):
+ self._lowpan_ip_header_factory.set_lowpan_context(cid, prefix)
+
def decompress(self, data, message_info):
ipv6_header = self._lowpan_ip_header_factory.parse(data, message_info)
@@ -1076,6 +1082,9 @@
return self._ipv6_packet_factory.parse(io.BytesIO(decompressed_data), message_info)
+ def set_lowpan_context(self, cid, prefix):
+ self._lowpan_decompressor.set_lowpan_context(cid, prefix)
+
def parse(self, data, message_info):
while data.tell() < len(data.getvalue()):
diff --git a/tests/scripts/thread-cert/message.py b/tests/scripts/thread-cert/message.py
index 5e80e67..c1604e6 100644
--- a/tests/scripts/thread-cert/message.py
+++ b/tests/scripts/thread-cert/message.py
@@ -443,6 +443,9 @@
mac_frame.parse(data)
return mac_frame
+ def set_lowpan_context(self, cid, prefix):
+ self._lowpan_parser.set_lowpan_context(cid, prefix)
+
def create(self, data):
message = Message()
message.channel = struct.unpack(">B", data.read(1))
diff --git a/tests/scripts/thread-cert/node.py b/tests/scripts/thread-cert/node.py
index 4e1be86..cd5caa9 100755
--- a/tests/scripts/thread-cert/node.py
+++ b/tests/scripts/thread-cert/node.py
@@ -194,6 +194,9 @@
def get_addrs(self):
return self.interface.get_addrs()
+ def get_addr(self, prefix):
+ return self.interface.get_addr(prefix)
+
def add_service(self, enterpriseNumber, serviceData, serverData):
self.interface.add_service(enterpriseNumber, serviceData, serverData)
diff --git a/tests/scripts/thread-cert/node_cli.py b/tests/scripts/thread-cert/node_cli.py
index da135fb..dfa61ed 100644
--- a/tests/scripts/thread-cert/node_cli.py
+++ b/tests/scripts/thread-cert/node_cli.py
@@ -32,6 +32,7 @@
import time
import pexpect
import re
+import ipaddress
import config
@@ -388,6 +389,17 @@
return addrs
+ def get_addr(self, prefix):
+ network = ipaddress.ip_network(unicode(prefix))
+ addrs = self.get_addrs()
+
+ for addr in addrs:
+ ipv6_address = ipaddress.ip_address(addr.decode("utf-8"))
+ if ipv6_address in network:
+ return ipv6_address.exploded.encode('utf-8')
+
+ return None
+
def add_service(self, enterpriseNumber, serviceData, serverData):
cmd = 'service add ' + enterpriseNumber + ' ' + serviceData+ ' ' + serverData
self.send_command(cmd)
diff --git a/tests/scripts/thread-cert/sniffer.py b/tests/scripts/thread-cert/sniffer.py
index 1e695a8..6249166 100644
--- a/tests/scripts/thread-cert/sniffer.py
+++ b/tests/scripts/thread-cert/sniffer.py
@@ -116,6 +116,9 @@
self._thread.join()
self._thread = None
+ def set_lowpan_context(self, cid, prefix):
+ self._message_factory.set_lowpan_context(cid, prefix)
+
def get_messages_sent_by(self, nodeid):
""" Get sniffed messages.