Add a test for repeated neighbour probes.
Bug: 21909622
Change-Id: Ie0581da732f330f1b7b6a1cab15fcc8fdeb4203d
diff --git a/net/test/iproute.py b/net/test/iproute.py
index 7d953d7..9cc8257 100644
--- a/net/test/iproute.py
+++ b/net/test/iproute.py
@@ -33,6 +33,7 @@
# Request constants.
NLM_F_REQUEST = 1
NLM_F_ACK = 4
+NLM_F_REPLACE = 0x100
NLM_F_EXCL = 0x200
NLM_F_CREATE = 0x400
NLM_F_DUMP = 0x300
@@ -407,13 +408,14 @@
def _AddressFamily(self, version):
return {4: socket.AF_INET, 6: socket.AF_INET6}[version]
- def _SendNlRequest(self, command, data):
+ def _SendNlRequest(self, command, data, flags=0):
"""Sends a netlink request and expects an ack."""
- flags = NLM_F_REQUEST
+ flags |= NLM_F_REQUEST
if CommandVerb(command) != "GET":
flags |= NLM_F_ACK
if CommandVerb(command) == "NEW":
- flags |= (NLM_F_EXCL | NLM_F_CREATE)
+ if not flags & NLM_F_REPLACE:
+ flags |= (NLM_F_EXCL | NLM_F_CREATE)
length = len(NLMsgHdr) + len(data)
nlmsg = NLMsgHdr((length, command, flags, self.seq, self.pid)).Pack()
@@ -657,12 +659,12 @@
routes = self._GetMsgList(RTMsg, data, False)
return routes
- def _Neighbour(self, version, is_add, addr, lladdr, dev, state):
+ def _Neighbour(self, version, is_add, addr, lladdr, dev, state, flags=0):
"""Adds or deletes a neighbour cache entry."""
family = self._AddressFamily(version)
# Convert the link-layer address to a raw byte string.
- if is_add:
+ if is_add and lladdr:
lladdr = lladdr.split(":")
if len(lladdr) != 6:
raise ValueError("Invalid lladdr %s" % ":".join(lladdr))
@@ -670,10 +672,10 @@
ndmsg = NdMsg((family, dev, state, 0, RTN_UNICAST)).Pack()
ndmsg += self._NlAttrIPAddress(NDA_DST, family, addr)
- if is_add:
+ if is_add and lladdr:
ndmsg += self._NlAttr(NDA_LLADDR, lladdr)
command = RTM_NEWNEIGH if is_add else RTM_DELNEIGH
- self._SendNlRequest(command, ndmsg)
+ self._SendNlRequest(command, ndmsg, flags)
def AddNeighbour(self, version, addr, lladdr, dev):
self._Neighbour(version, True, addr, lladdr, dev, NUD_PERMANENT)
@@ -681,6 +683,10 @@
def DelNeighbour(self, version, addr, lladdr, dev):
self._Neighbour(version, False, addr, lladdr, dev, 0)
+ def UpdateNeighbour(self, version, addr, lladdr, dev, state):
+ self._Neighbour(version, True, addr, lladdr, dev, state,
+ flags=NLM_F_REPLACE)
+
def DumpNeighbours(self, version):
ndmsg = NdMsg((self._AddressFamily(version), 0, 0, 0, 0))
return self._Dump(RTM_GETNEIGH, ndmsg, NdMsg)
diff --git a/net/test/neighbour_test.py b/net/test/neighbour_test.py
index b51da3d..828a86b 100755
--- a/net/test/neighbour_test.py
+++ b/net/test/neighbour_test.py
@@ -76,6 +76,7 @@
reachabletime=self.REACHABLE_TIME_MS)
self.netid = random.choice(self.tuns.keys())
+ self.ifindex = self.ifindices[self.netid]
def GetNeighbour(self, addr):
version = 6 if ":" in addr else 4
@@ -191,6 +192,25 @@
self.assertNeighbourState(NUD_FAILED, router6)
self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})
+ def testRepeatedProbes(self):
+ router4 = self._RouterAddress(self.netid, 4)
+ router6 = self._RouterAddress(self.netid, 6)
+ routermac = self.RouterMacAddress(self.netid)
+ self.assertNeighbourState(NUD_PERMANENT, router4)
+ self.assertNeighbourState(NUD_STALE, router6)
+
+ def ForceProbe(addr, mac):
+ self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
+ self.assertNeighbourState(NUD_PROBE, addr)
+ self.SleepMs(1) # TODO: Why is this necessary?
+ self.assertNeighbourState(NUD_PROBE, addr)
+ self.ExpectUnicastProbe(addr)
+ self.ReceiveUnicastAdvertisement(addr, mac)
+ self.assertNeighbourState(NUD_REACHABLE, addr)
+
+ for i in xrange(5):
+ ForceProbe(router6, routermac)
+
if __name__ == "__main__":
unittest.main()