#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
"""Selftests for link notifications and link-netns / peer-netns handling
of links created across network namespaces."""

import time

from lib.py import ksft_run, ksft_exit, ksft_eq, ksft_true
from lib.py import ip
from lib.py import NetNS, NetNSEnter
from lib.py import RtnlFamily


# nsids handed to "ip netns set" by the tests below.
LINK_NETNSID = 100
LINK_NETNSID2 = 200


def test_event() -> None:
    """Check that no link notification reaches the rtnl object created
    while inside ns2.

    dummy1 and dummy2 are both created in ns1 (one referring to ns2 via
    link-netnsid, one requested from within ns2) and deleted again; the
    notification queue must still be empty afterwards.
    """
    with NetNS() as ns1, NetNS() as ns2:
        # Only the construction happens inside ns2; the subscription and
        # everything else runs back in the caller's namespace.
        with NetNSEnter(str(ns2)):
            rtnl = RtnlFamily()

        rtnl.ntf_subscribe("rtnlgrp-link")

        ip(f"netns set {ns2} {LINK_NETNSID}", ns=str(ns1))
        ip(f"link add netns {ns1} link-netnsid {LINK_NETNSID} dummy1 type dummy")
        ip(f"link add netns {ns1} dummy2 type dummy", ns=str(ns2))

        ip("link del dummy1", ns=str(ns1))
        ip("link del dummy2", ns=str(ns1))

        # Give pending notifications a chance to arrive before checking.
        time.sleep(1)
        rtnl.check_ntf()
        ksft_true(rtnl.async_msg_queue.empty(),
                  "Received unexpected link notification")


def test_event_all_nsid() -> None:
    """NETLINK_LISTEN_ALL_NSID notifications: local events must not
    carry nsid even with a self-referential mapping. Remote events
    must carry the correct nsid."""

    expected = ('dummy-lo', 'dummy-sr', 'dummy-re')

    with NetNS() as ns1, NetNS() as ns2:
        net1, net2 = str(ns1), str(ns2)

        with NetNSEnter(net1):
            rtnl = RtnlFamily()
            rtnl.ntf_listen_all_nsid()
            rtnl.ntf_subscribe("rtnlgrp-link")

        # Case 1: no nsid assigned, local event, no nsid expected.
        ip("link add dummy-lo type dummy", ns=net1)

        # Case 2: self-referential nsid, local event, still no nsid.
        ip(f"netns set {net1} {LINK_NETNSID}", ns=net1)
        ip("link add dummy-sr type dummy", ns=net1)

        # Case 3: remote event, nsid present.
        ip(f"netns set {net2} {LINK_NETNSID2}", ns=net1)
        ip("link add dummy-re type dummy", ns=net2)

        # Collect one event per expected link, ignoring unrelated noise.
        events = {}
        for msg in rtnl.poll_ntf(duration=1):
            if msg['name'] != 'getlink':
                continue
            ifname = msg['msg'].get('ifname')
            if ifname in expected:
                events[ifname] = msg
                if len(events) == len(expected):
                    break

        # Look events up with a default: a missing notification is
        # reported by the "missing ..." check instead of raising KeyError,
        # so the remaining checks and the cleanup below still run.
        ksft_true('dummy-lo' in events, "missing local event")
        ksft_true(events.get('dummy-lo', {}).get('nsid') is None,
                  "local event without nsid should not carry nsid")

        ksft_true('dummy-sr' in events, "missing self-ref event")
        ksft_true(events.get('dummy-sr', {}).get('nsid') is None,
                  "local event with self-ref nsid should not carry nsid")

        ksft_true('dummy-re' in events, "missing remote event")
        ksft_eq(events.get('dummy-re', {}).get('nsid'), LINK_NETNSID2,
                "remote event should carry nsid")

        ip("link del dummy-lo", ns=net1)
        ip("link del dummy-sr", ns=net1)
        ip("link del dummy-re", ns=net2)


def validate_link_netns(netns, ifname, link_netnsid) -> bool:
    """Return True if "ip -d link show" for ifname inside netns reports
    a "link_netnsid" equal to link_netnsid.

    Pass None as link_netnsid to require that the attribute is absent.
    Returns False when the command yields no link information.
    """
    link_info = ip(f"-d link show dev {ifname}", ns=netns, json=True)
    if not link_info:
        return False
    return link_info[0].get("link_netnsid") == link_netnsid


def test_link_net() -> None:
    """Create each link type with every netns / link-netns combination
    and verify the link_netnsid reported for the resulting device."""
    configs = [
        # type, common args, type args, fallback to dev_net
        ("ipvlan", "link dummy1", "", False),
        ("macsec", "link dummy1", "", False),
        ("macvlan", "link dummy1", "", False),
        ("macvtap", "link dummy1", "", False),
        ("vlan", "link dummy1", "id 100", False),
        ("gre", "", "local 192.0.2.1", True),
        ("vti", "", "local 192.0.2.1", True),
        ("ipip", "", "local 192.0.2.1", True),
        ("ip6gre", "", "local 2001:db8::1", True),
        ("ip6tnl", "", "local 2001:db8::1", True),
        ("vti6", "", "local 2001:db8::1", True),
        ("sit", "", "local 192.0.2.1", True),
        ("xfrm", "", "if_id 1", True),
    ]

    with NetNS() as ns1, NetNS() as ns2, NetNS() as ns3:
        net1, net2, net3 = str(ns1), str(ns2), str(ns3)

        # prepare link netnsid and a dummy link needed by certain drivers
        ip(f"netns set {net3} {LINK_NETNSID}", ns=net2)
        ip("link add dummy1 type dummy", ns=net3)

        cases = [
            # source, "netns", "link-netns",
            # expected link-netnsid for types that fall back to dev_net,
            # expected link-netnsid for all other types
            (net3, None, None, None, None),
            (net3, net2, None, None, LINK_NETNSID),
            (net2, None, net3, LINK_NETNSID, LINK_NETNSID),
            (net1, net2, net3, LINK_NETNSID, LINK_NETNSID),
        ]

        for src_net, netns, link_netns, exp_fallback, exp_other in cases:
            # Without an explicit "netns" the device stays in the source ns.
            tgt_net = netns or src_net
            for typ, cargs, targs, fb_dev_net in configs:
                cmd = "link add"
                if netns:
                    cmd += f" netns {netns}"
                if link_netns:
                    cmd += f" link-netns {link_netns}"
                cmd += f" {cargs} foo type {typ} {targs}"
                ip(cmd, ns=src_net)

                expected = exp_fallback if fb_dev_net else exp_other
                ksft_true(validate_link_netns(tgt_net, "foo", expected),
                          f"{typ} link_netns validation failed")
                ip("link del foo", ns=tgt_net)


def test_peer_net() -> None:
    """Create paired link types with every netns / link-netns / peer
    netns combination and verify the link_netnsid reported for "foo"."""
    types = [
        "vxcan",
        "netkit",
        "veth",
    ]

    with NetNS() as ns1, NetNS() as ns2, NetNS() as ns3, NetNS() as ns4:
        net1, net2, net3, net4 = str(ns1), str(ns2), str(ns3), str(ns4)

        ip(f"netns set {net3} {LINK_NETNSID}", ns=net2)

        cases = [
            # source, "netns", "link-netns", "peer netns", expected
            (net1, None, None, None, None),
            (net1, net2, None, None, None),
            (net2, None, net3, None, LINK_NETNSID),
            (net1, net2, net3, None, None),
            (net2, None, None, net3, LINK_NETNSID),
            (net1, net2, None, net3, LINK_NETNSID),
            (net2, None, net2, net3, LINK_NETNSID),
            (net1, net2, net4, net3, LINK_NETNSID),
        ]

        for src_net, netns, link_netns, peer_netns, exp in cases:
            # Without an explicit "netns" the device stays in the source ns.
            tgt_net = netns or src_net
            for typ in types:
                cmd = "link add"
                if netns:
                    cmd += f" netns {netns}"
                if link_netns:
                    cmd += f" link-netns {link_netns}"
                cmd += f" foo type {typ}"
                if peer_netns:
                    cmd += f" peer netns {peer_netns}"
                ip(cmd, ns=src_net)
                ksft_true(validate_link_netns(tgt_net, "foo", exp),
                          f"{typ} peer_netns validation failed")
                ip("link del foo", ns=tgt_net)


def main() -> None:
    """Run all test cases through the ksft runner and exit."""
    ksft_run([
        test_event,
        test_event_all_nsid,
        test_link_net,
        test_peer_net,
    ])
    ksft_exit()


if __name__ == "__main__":
    main()