Add reconfiguration while probing case
Add test case to ensure that reconfiguration behavior in all kernel
versions meets the following requirements:
1. Set IP neigh ucast_solicit as 10.
2. Trigger probe starts and tries 4 times
3. Change ucast_solicat as 3.
Expected reuslt:
Probe stop immediatly and the neighbour state changes to NUD_FAILED.
Bug: 130871097
Test: kernel_net_tests
Change-Id: I9c48b59f1289554d4fd2f436b92c4607323f47cb
diff --git a/net/test/neighbour_test.py b/net/test/neighbour_test.py
index caf2e6e..2cb5c23 100755
--- a/net/test/neighbour_test.py
+++ b/net/test/neighbour_test.py
@@ -55,15 +55,24 @@
BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS
+ # Kernel default unicast solicit is 3, but it need be changed larger
+ # when test recofiguration during probing
+ UCAST_SOLICIT_DEFAULT = 3
+ UCAST_SOLICIT_LARGE = 10
+
@classmethod
def setUpClass(cls):
super(NeighbourTest, cls).setUpClass()
for netid in cls.tuns:
iface = cls.GetInterfaceName(netid)
# This can't be set in an RA.
- cls.SetSysctl(
- "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
- cls.DELAY_TIME_MS / 1000)
+ for proto in ["ipv4", "ipv6"]:
+ cls.SetSysctl(
+ "/proc/sys/net/%s/neigh/%s/delay_first_probe_time" % (proto, iface),
+ cls.DELAY_TIME_MS / 1000)
+ cls.SetSysctl(
+ "/proc/sys/net/%s/neigh/%s/retrans_time_ms" % (proto, iface),
+ cls.RETRANS_TIME_MS)
def setUp(self):
super(NeighbourTest, self).setUp()
@@ -87,6 +96,41 @@
self.netid = random.choice(self.tuns.keys())
self.ifindex = self.ifindices[self.netid]
+ # MultinetworkBaseTest always uses NUD_PERMANENT for router ARP entries.
+ # Temporarily change those entries to NUD_STALE so we can test them.
+ if net_test.LINUX_VERSION < (4, 9, 0):
+ # Cannot change state from NUD_PERMANENT to NUD_STALE directly,
+ # so delete it to make it NUD_FAILED then change it to NUD_STALE.
+ router = self._RouterAddress(self.netid, 4)
+ macaddr = self.RouterMacAddress(self.netid)
+ self.iproute.DelNeighbour(4, router, macaddr, self.ifindex)
+ self.ExpectNeighbourNotification(router, NUD_FAILED)
+ self.assertNeighbourState(NUD_FAILED, router)
+ self.ChangeRouterNudState(4, NUD_STALE)
+
+ def SetUnicastSolicit(self, proto, iface, value):
+ self.SetSysctl(
+ "/proc/sys/net/%s/neigh/%s/ucast_solicit" % (proto, iface), value)
+
+ def tearDown(self):
+ super(NeighbourTest, self).tearDown()
+ # It is already reset to default by TearDownClass,
+ # but here we need to set it to default after each testcase.
+ iface = self.GetInterfaceName(self.netid)
+ for proto in ["ipv4", "ipv6"]:
+ self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)
+
+ # Change router ARP entries back to NUD_PERMANENT,
+ # so as not to affect other tests.
+ self.ChangeRouterNudState(4, NUD_PERMANENT)
+
+ def ChangeRouterNudState(self, version, state):
+ router = self._RouterAddress(self.netid, version)
+ macaddr = self.RouterMacAddress(self.netid)
+ self.iproute.UpdateNeighbour(version, router, macaddr, self.ifindex, state)
+ self.ExpectNeighbourNotification(router, state)
+ self.assertNeighbourState(state, router)
+
def GetNeighbour(self, addr, ifindex):
version = csocket.AddressVersion(addr)
for msg, args in self.iproute.DumpNeighbours(version, ifindex):
@@ -116,8 +160,8 @@
def ExpectProbe(self, is_unicast, addr):
version = csocket.AddressVersion(addr)
+ llsrc = self.MyMacAddress(self.netid)
if version == 6:
- llsrc = self.MyMacAddress(self.netid)
if is_unicast:
src = self.MyLinkLocalAddress(self.netid)
dst = addr
@@ -133,8 +177,15 @@
)
msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
self.ExpectPacketOn(self.netid, msg, expected)
- else:
- raise NotImplementedError
+ else: # version == 4
+ if is_unicast:
+ src = self._MyIPv4Address(self.netid)
+ dst = addr
+ else:
+ raise NotImplementedError("This test does not support broadcast ARP")
+ expected = scapy.ARP(psrc=src, pdst=dst, hwsrc=llsrc, op=1)
+ msg = "Unicast ARP probe"
+ self.ExpectPacketOn(self.netid, msg, expected)
def ExpectUnicastProbe(self, addr):
self.ExpectProbe(True, addr)
@@ -160,6 +211,14 @@
else:
raise NotImplementedError
+ def SendDnsRequest(self, addr):
+ version = csocket.AddressVersion(addr)
+ routing_mode = random.choice(["mark", "oif", "uid"])
+ s = self.BuildSocket(version, net_test.UDPSocket, self.netid, routing_mode)
+ s.connect((addr, 53))
+ s.send(net_test.UDP_PAYLOAD)
+ return s
+
def MonitorSleepMs(self, interval, addr):
slept = 0
while slept < interval:
@@ -191,14 +250,11 @@
"""
router4 = self._RouterAddress(self.netid, 4)
router6 = self._RouterAddress(self.netid, 6)
- self.assertNeighbourState(NUD_PERMANENT, router4)
+ self.assertNeighbourState(NUD_STALE, router4)
self.assertNeighbourState(NUD_STALE, router6)
# Send a packet and check that we go into DELAY.
- routing_mode = random.choice(["mark", "oif", "uid"])
- s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
- s.connect((net_test.IPV6_ADDR, 53))
- s.send(net_test.UDP_PAYLOAD)
+ s = self.SendDnsRequest(net_test.IPV6_ADDR)
self.assertNeighbourState(NUD_DELAY, router6)
# Wait for the probe interval, then check that we're in PROBE, and that the
@@ -249,17 +305,19 @@
router4 = self._RouterAddress(self.netid, 4)
router6 = self._RouterAddress(self.netid, 6)
routermac = self.RouterMacAddress(self.netid)
- self.assertNeighbourState(NUD_PERMANENT, router4)
+ self.assertNeighbourState(NUD_STALE, router4)
self.assertNeighbourState(NUD_STALE, router6)
def ForceProbe(addr, mac):
self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
self.assertNeighbourState(NUD_PROBE, addr)
+ self.ExpectNeighbourNotification(addr, NUD_PROBE)
self.SleepMs(1) # TODO: Why is this necessary?
self.assertNeighbourState(NUD_PROBE, addr)
self.ExpectUnicastProbe(addr)
self.ReceiveUnicastAdvertisement(addr, mac)
self.assertNeighbourState(NUD_REACHABLE, addr)
+ self.ExpectNeighbourNotification(addr, NUD_REACHABLE)
for _ in xrange(5):
ForceProbe(router6, routermac)
@@ -277,10 +335,7 @@
time.sleep(1)
# Send another packet and expect a multicast NS.
- routing_mode = random.choice(["mark", "oif", "uid"])
- s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
- s.connect((net_test.IPV6_ADDR, 53))
- s.send(net_test.UDP_PAYLOAD)
+ self.SendDnsRequest(net_test.IPV6_ADDR)
self.ExpectMulticastNS(router6)
# Receive a unicast NA with the R flag set to 0.
@@ -293,6 +348,44 @@
self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
self.assertNeighbourState(NUD_REACHABLE, router6)
+ def DoReconfigureDuringProbing(self, version):
+ if version == 6:
+ proto = "ipv6"
+ ip_addr = net_test.IPV6_ADDR
+ else:
+ proto = "ipv4"
+ ip_addr = net_test.IPV4_ADDR
+ router = self._RouterAddress(self.netid, version)
+ self.assertNeighbourState(NUD_STALE, router)
+
+ iface = self.GetInterfaceName(self.netid)
+ # set unicast solicit larger.
+ self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_LARGE)
+
+ # Send a packet and check that we go into DELAY.
+ self.SendDnsRequest(ip_addr)
+ self.assertNeighbourState(NUD_DELAY, router)
+
+ # Probing 4 times but no reponse
+ self.SleepMs(self.DELAY_TIME_MS * 1.1)
+ self.ExpectNeighbourNotification(router, NUD_PROBE)
+ self.assertNeighbourState(NUD_PROBE, router)
+ self.ExpectUnicastProbe(router)
+
+ for i in range(0, 3):
+ self.SleepMs(self.RETRANS_TIME_MS)
+ self.ExpectUnicastProbe(router)
+
+ # reconfiguration to 3 while probing and the state change to NUD_FAILED
+ self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)
+ self.SleepMs(self.RETRANS_TIME_MS)
+ self.ExpectNeighbourNotification(router, NUD_FAILED)
+ self.assertNeighbourState(NUD_FAILED, router)
+
+ # Check neighbor state after re-config ARP probe times.
+ def testReconfigureDuringProbing(self):
+ self.DoReconfigureDuringProbing(4)
+ self.DoReconfigureDuringProbing(6)
if __name__ == "__main__":
unittest.main()