xref: /linux/tools/testing/selftests/drivers/net/xdp.py (revision a7ddedc84c59a645ef970b992f7cda5bffc70cc0)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5This file contains tests to verify native XDP support in network drivers.
6The tests utilize the BPF program `xdp_native.bpf.o` from the `selftests.net.lib`
7directory, with each test focusing on a specific aspect of XDP functionality.
8"""
9import random
10import string
11from dataclasses import dataclass
12from enum import Enum
13
14from lib.py import ksft_run, ksft_exit, ksft_eq, ksft_ne, ksft_pr
15from lib.py import KsftFailEx, NetDrvEpEnv, EthtoolFamily, NlError
16from lib.py import bkg, cmd, rand_port, wait_port_listen
17from lib.py import ip, bpftool, defer
18
19
20class TestConfig(Enum):
21    """Enum for XDP configuration options."""
22    MODE = 0  # Configures the BPF program for a specific test
23    PORT = 1  # Port configuration to communicate with the remote host
24    ADJST_OFFSET = 2  # Tail/Head adjustment offset for extension/shrinking
25    ADJST_TAG = 3  # Adjustment tag to annotate the start and end of extension
26
27
28class XDPAction(Enum):
29    """Enum for XDP actions."""
30    PASS = 0  # Pass the packet up to the stack
31    DROP = 1  # Drop the packet
32    TX = 2    # Route the packet to the remote host
33    TAIL_ADJST = 3  # Adjust the tail of the packet
34    HEAD_ADJST = 4  # Adjust the head of the packet
35
36
37class XDPStats(Enum):
38    """Enum for XDP statistics."""
39    RX = 0    # Count of valid packets received for testing
40    PASS = 1  # Count of packets passed up to the stack
41    DROP = 2  # Count of packets dropped
42    TX = 3    # Count of incoming packets routed to the remote host
43    ABORT = 4 # Count of packets that were aborted
44
45
46@dataclass
47class BPFProgInfo:
48    """Data class to store information about a BPF program."""
49    name: str               # Name of the BPF program
50    file: str               # BPF program object file
51    xdp_sec: str = "xdp"    # XDP section name (e.g., "xdp" or "xdp.frags")
52    mtu: int = 1500         # Maximum Transmission Unit, default is 1500
53
54
55def _exchg_udp(cfg, port, test_string):
56    """
57    Exchanges UDP packets between a local and remote host using the socat tool.
58
59    Args:
60        cfg: Configuration object containing network settings.
61        port: Port number to use for the UDP communication.
62        test_string: String that the remote host will send.
63
64    Returns:
65        The string received by the test host.
66    """
67    cfg.require_cmd("socat", remote=True)
68
69    rx_udp_cmd = f"socat -{cfg.addr_ipver} -T 2 -u UDP-RECV:{port},reuseport STDOUT"
70    tx_udp_cmd = f"echo -n {test_string} | socat -t 2 -u STDIN UDP:{cfg.baddr}:{port}"
71
72    with bkg(rx_udp_cmd, exit_wait=True) as nc:
73        wait_port_listen(port, proto="udp")
74        cmd(tx_udp_cmd, host=cfg.remote, shell=True)
75
76    return nc.stdout.strip()
77
78
79def _test_udp(cfg, port, size=256):
80    """
81    Tests UDP packet exchange between a local and remote host.
82
83    Args:
84        cfg: Configuration object containing network settings.
85        port: Port number to use for the UDP communication.
86        size: The length of the test string to be exchanged, default is 256 characters.
87
88    Returns:
89        bool: True if the received string matches the sent string, False otherwise.
90    """
91    test_str = "".join(random.choice(string.ascii_lowercase) for _ in range(size))
92    recvd_str = _exchg_udp(cfg, port, test_str)
93
94    return recvd_str == test_str
95
96
97def _load_xdp_prog(cfg, bpf_info):
98    """
99    Loads an XDP program onto a network interface.
100
101    Args:
102        cfg: Configuration object containing network settings.
103        bpf_info: BPFProgInfo object containing information about the BPF program.
104
105    Returns:
106        dict: A dictionary containing the XDP program ID, name, and associated map IDs.
107    """
108    abs_path = cfg.net_lib_dir / bpf_info.file
109    prog_info = {}
110
111    cmd(f"ip link set dev {cfg.remote_ifname} mtu {bpf_info.mtu}", shell=True, host=cfg.remote)
112    defer(ip, f"link set dev {cfg.remote_ifname} mtu 1500", host=cfg.remote)
113
114    cmd(
115    f"ip link set dev {cfg.ifname} mtu {bpf_info.mtu} xdpdrv obj {abs_path} sec {bpf_info.xdp_sec}",
116    shell=True
117    )
118    defer(ip, f"link set dev {cfg.ifname} mtu 1500 xdpdrv off")
119
120    xdp_info = ip(f"-d link show dev {cfg.ifname}", json=True)[0]
121    prog_info["id"] = xdp_info["xdp"]["prog"]["id"]
122    prog_info["name"] = xdp_info["xdp"]["prog"]["name"]
123    prog_id = prog_info["id"]
124
125    map_ids = bpftool(f"prog show id {prog_id}", json=True)["map_ids"]
126    prog_info["maps"] = {}
127    for map_id in map_ids:
128        name = bpftool(f"map show id {map_id}", json=True)["name"]
129        prog_info["maps"][name] = map_id
130
131    return prog_info
132
133
134def format_hex_bytes(value):
135    """
136    Helper function that converts an integer into a formatted hexadecimal byte string.
137
138    Args:
139        value: An integer representing the number to be converted.
140
141    Returns:
142        A string representing hexadecimal equivalent of value, with bytes separated by spaces.
143    """
144    hex_str = value.to_bytes(4, byteorder='little', signed=True)
145    return ' '.join(f'{byte:02x}' for byte in hex_str)
146
147
148def _set_xdp_map(map_name, key, value):
149    """
150    Updates an XDP map with a given key-value pair using bpftool.
151
152    Args:
153        map_name: The name of the XDP map to update.
154        key: The key to update in the map, formatted as a hexadecimal string.
155        value: The value to associate with the key, formatted as a hexadecimal string.
156    """
157    key_formatted = format_hex_bytes(key)
158    value_formatted = format_hex_bytes(value)
159    bpftool(
160        f"map update name {map_name} key hex {key_formatted} value hex {value_formatted}"
161    )
162
163
164def _get_stats(xdp_map_id):
165    """
166    Retrieves and formats statistics from an XDP map.
167
168    Args:
169        xdp_map_id: The ID of the XDP map from which to retrieve statistics.
170
171    Returns:
172        A dictionary containing formatted packet statistics for various XDP actions.
173        The keys are based on the XDPStats Enum values.
174
175    Raises:
176        KsftFailEx: If the stats retrieval fails.
177    """
178    stats_dump = bpftool(f"map dump id {xdp_map_id}", json=True)
179    if not stats_dump:
180        raise KsftFailEx(f"Failed to get stats for map {xdp_map_id}")
181
182    stats_formatted = {}
183    for key in range(0, 5):
184        val = stats_dump[key]["formatted"]["value"]
185        if stats_dump[key]["formatted"]["key"] == XDPStats.RX.value:
186            stats_formatted[XDPStats.RX.value] = val
187        elif stats_dump[key]["formatted"]["key"] == XDPStats.PASS.value:
188            stats_formatted[XDPStats.PASS.value] = val
189        elif stats_dump[key]["formatted"]["key"] == XDPStats.DROP.value:
190            stats_formatted[XDPStats.DROP.value] = val
191        elif stats_dump[key]["formatted"]["key"] == XDPStats.TX.value:
192            stats_formatted[XDPStats.TX.value] = val
193        elif stats_dump[key]["formatted"]["key"] == XDPStats.ABORT.value:
194            stats_formatted[XDPStats.ABORT.value] = val
195
196    return stats_formatted
197
198
199def _test_pass(cfg, bpf_info, msg_sz):
200    """
201    Tests the XDP_PASS action by exchanging UDP packets.
202
203    Args:
204        cfg: Configuration object containing network settings.
205        bpf_info: BPFProgInfo object containing information about the BPF program.
206        msg_sz: Size of the test message to send.
207    """
208
209    prog_info = _load_xdp_prog(cfg, bpf_info)
210    port = rand_port()
211
212    _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.PASS.value)
213    _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
214
215    ksft_eq(_test_udp(cfg, port, msg_sz), True, "UDP packet exchange failed")
216    stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
217
218    ksft_ne(stats[XDPStats.RX.value], 0, "RX stats should not be zero")
219    ksft_eq(stats[XDPStats.RX.value], stats[XDPStats.PASS.value], "RX and PASS stats mismatch")
220
221
222def test_xdp_native_pass_sb(cfg):
223    """
224    Tests the XDP_PASS action for single buffer case.
225
226    Args:
227        cfg: Configuration object containing network settings.
228    """
229    bpf_info = BPFProgInfo("xdp_prog", "xdp_native.bpf.o", "xdp", 1500)
230
231    _test_pass(cfg, bpf_info, 256)
232
233
234def test_xdp_native_pass_mb(cfg):
235    """
236    Tests the XDP_PASS action for a multi-buff size.
237
238    Args:
239        cfg: Configuration object containing network settings.
240    """
241    bpf_info = BPFProgInfo("xdp_prog_frags", "xdp_native.bpf.o", "xdp.frags", 9000)
242
243    _test_pass(cfg, bpf_info, 8000)
244
245
246def _test_drop(cfg, bpf_info, msg_sz):
247    """
248    Tests the XDP_DROP action by exchanging UDP packets.
249
250    Args:
251        cfg: Configuration object containing network settings.
252        bpf_info: BPFProgInfo object containing information about the BPF program.
253        msg_sz: Size of the test message to send.
254    """
255
256    prog_info = _load_xdp_prog(cfg, bpf_info)
257    port = rand_port()
258
259    _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.DROP.value)
260    _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
261
262    ksft_eq(_test_udp(cfg, port, msg_sz), False, "UDP packet exchange should fail")
263    stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
264
265    ksft_ne(stats[XDPStats.RX.value], 0, "RX stats should be zero")
266    ksft_eq(stats[XDPStats.RX.value], stats[XDPStats.DROP.value], "RX and DROP stats mismatch")
267
268
269def test_xdp_native_drop_sb(cfg):
270    """
271    Tests the XDP_DROP action for a signle-buff case.
272
273    Args:
274        cfg: Configuration object containing network settings.
275    """
276    bpf_info = BPFProgInfo("xdp_prog", "xdp_native.bpf.o", "xdp", 1500)
277
278    _test_drop(cfg, bpf_info, 256)
279
280
281def test_xdp_native_drop_mb(cfg):
282    """
283    Tests the XDP_DROP action for a multi-buff case.
284
285    Args:
286        cfg: Configuration object containing network settings.
287    """
288    bpf_info = BPFProgInfo("xdp_prog_frags", "xdp_native.bpf.o", "xdp.frags", 9000)
289
290    _test_drop(cfg, bpf_info, 8000)
291
292
293def _test_xdp_native_tx(cfg, bpf_info, payload_lens):
294    """
295    Tests the XDP_TX action.
296
297    Args:
298        cfg: Configuration object containing network settings.
299        bpf_info: BPFProgInfo object containing the BPF program metadata.
300        payload_lens: Array of packet lengths to send.
301    """
302    cfg.require_cmd("socat", remote=True)
303    prog_info = _load_xdp_prog(cfg, bpf_info)
304    port = rand_port()
305
306    _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.TX.value)
307    _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
308
309    expected_pkts = 0
310    for payload_len in payload_lens:
311        test_string = "".join(
312            random.choice(string.ascii_lowercase) for _ in range(payload_len)
313        )
314
315        rx_udp = f"socat -{cfg.addr_ipver} -T 2 " + \
316                 f"-u UDP-RECV:{port},reuseport STDOUT"
317
318        # Writing zero bytes to stdin gets ignored by socat,
319        # but with the shut-null flag socat generates a zero sized packet
320        # when the socket is closed.
321        tx_cmd_suffix = ",shut-null" if payload_len == 0 else ""
322        tx_udp = f"echo -n {test_string} | socat -t 2 " + \
323                 f"-u STDIN UDP:{cfg.baddr}:{port}{tx_cmd_suffix}"
324
325        with bkg(rx_udp, host=cfg.remote, exit_wait=True) as rnc:
326            wait_port_listen(port, proto="udp", host=cfg.remote)
327            cmd(tx_udp, host=cfg.remote, shell=True)
328
329        ksft_eq(rnc.stdout.strip(), test_string, "UDP packet exchange failed")
330
331        expected_pkts += 1
332        stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
333        ksft_eq(stats[XDPStats.RX.value], expected_pkts, "RX stats mismatch")
334        ksft_eq(stats[XDPStats.TX.value], expected_pkts, "TX stats mismatch")
335
336
337def test_xdp_native_tx_sb(cfg):
338    """
339    Tests the XDP_TX action for a single-buff case.
340
341    Args:
342        cfg: Configuration object containing network settings.
343    """
344    bpf_info = BPFProgInfo("xdp_prog", "xdp_native.bpf.o", "xdp", 1500)
345
346    # Ensure there's enough room for an ETH / IP / UDP header
347    pkt_hdr_len = 42 if cfg.addr_ipver == "4" else 62
348
349    _test_xdp_native_tx(cfg, bpf_info, [0, 1500 // 2, 1500 - pkt_hdr_len])
350
351
352def test_xdp_native_tx_mb(cfg):
353    """
354    Tests the XDP_TX action for a multi-buff case.
355
356    Args:
357        cfg: Configuration object containing network settings.
358    """
359    bpf_info = BPFProgInfo("xdp_prog_frags", "xdp_native.bpf.o",
360                           "xdp.frags", 9000)
361    # The first packet ensures we exercise the fragmented code path.
362    # And the subsequent 0-sized packet ensures the driver
363    # reinitializes xdp_buff correctly.
364    _test_xdp_native_tx(cfg, bpf_info, [8000, 0])
365
366
367def _validate_res(res, offset_lst, pkt_sz_lst):
368    """
369    Validates the result of a test.
370
371    Args:
372        res: The result of the test, which should be a dictionary with a "status" key.
373
374    Raises:
375        KsftFailEx: If the test fails to pass any combination of offset and packet size.
376    """
377    if "status" not in res:
378        raise KsftFailEx("Missing 'status' key in result dictionary")
379
380    # Validate that not a single case was successful
381    if res["status"] == "fail":
382        if res["offset"] == offset_lst[0] and res["pkt_sz"] == pkt_sz_lst[0]:
383            raise KsftFailEx(f"{res['reason']}")
384
385        # Get the previous offset and packet size to report the successful run
386        tmp_idx = offset_lst.index(res["offset"])
387        prev_offset = offset_lst[tmp_idx - 1]
388        if tmp_idx == 0:
389            tmp_idx = pkt_sz_lst.index(res["pkt_sz"])
390            prev_pkt_sz = pkt_sz_lst[tmp_idx - 1]
391        else:
392            prev_pkt_sz = res["pkt_sz"]
393
394        # Use these values for error reporting
395        ksft_pr(
396        f"Failed run: pkt_sz {res['pkt_sz']}, offset {res['offset']}. "
397        f"Last successful run: pkt_sz {prev_pkt_sz}, offset {prev_offset}. "
398        f"Reason: {res['reason']}"
399        )
400
401
402def _check_for_failures(recvd_str, stats):
403    """
404    Checks for common failures while adjusting headroom or tailroom.
405
406    Args:
407        recvd_str: The string received from the remote host after sending a test string.
408        stats: A dictionary containing formatted packet statistics for various XDP actions.
409
410    Returns:
411        str: A string describing the failure reason if a failure is detected, otherwise None.
412    """
413
414    # Any adjustment failure result in an abort hence, we track this counter
415    if stats[XDPStats.ABORT.value] != 0:
416        return "Adjustment failed"
417
418    # Since we are using aggregate stats for a single test across all offsets and packet sizes
419    # we can't use RX stats only to track data exchange failure without taking a previous
420    # snapshot. An easier way is to simply check for non-zero length of received string.
421    if len(recvd_str) == 0:
422        return "Data exchange failed"
423
424    # Check for RX and PASS stats mismatch. Ideally, they should be equal for a successful run
425    if stats[XDPStats.RX.value] != stats[XDPStats.PASS.value]:
426        return "RX stats mismatch"
427
428    return None
429
430
431def _test_xdp_native_tail_adjst(cfg, pkt_sz_lst, offset_lst):
432    """
433    Tests the XDP tail adjustment functionality.
434
435    This function loads the appropriate XDP program based on the provided
436    program name and configures the XDP map for tail adjustment. It then
437    validates the tail adjustment by sending and receiving UDP packets
438    with specified packet sizes and offsets.
439
440    Args:
441        cfg: Configuration object containing network settings.
442        prog: Name of the XDP program to load.
443        pkt_sz_lst: List of packet sizes to test.
444        offset_lst: List of offsets to validate support for tail adjustment.
445
446    Returns:
447        dict: A dictionary with test status and failure details if applicable.
448    """
449    port = rand_port()
450    bpf_info = BPFProgInfo("xdp_prog_frags", "xdp_native.bpf.o", "xdp.frags", 9000)
451
452    prog_info = _load_xdp_prog(cfg, bpf_info)
453
454    # Configure the XDP map for tail adjustment
455    _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.TAIL_ADJST.value)
456    _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
457
458    for offset in offset_lst:
459        tag = format(random.randint(65, 90), "02x")
460
461        _set_xdp_map("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
462        if offset > 0:
463            _set_xdp_map("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
464
465        for pkt_sz in pkt_sz_lst:
466            test_str = "".join(random.choice(string.ascii_lowercase) for _ in range(pkt_sz))
467            recvd_str = _exchg_udp(cfg, port, test_str)
468            stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
469
470            failure = _check_for_failures(recvd_str, stats)
471            if failure is not None:
472                return {
473                    "status": "fail",
474                    "reason": failure,
475                    "offset": offset,
476                    "pkt_sz": pkt_sz,
477                }
478
479            # Validate data content based on offset direction
480            expected_data = None
481            if offset > 0:
482                expected_data = test_str + (offset * chr(int(tag, 16)))
483            else:
484                expected_data = test_str[0:pkt_sz + offset]
485
486            if recvd_str != expected_data:
487                return {
488                    "status": "fail",
489                    "reason": "Data mismatch",
490                    "offset": offset,
491                    "pkt_sz": pkt_sz,
492                }
493
494    return {"status": "pass"}
495
496
497def test_xdp_native_adjst_tail_grow_data(cfg):
498    """
499    Tests the XDP tail adjustment by growing packet data.
500
501    Args:
502        cfg: Configuration object containing network settings.
503    """
504    pkt_sz_lst = [512, 1024, 2048]
505    offset_lst = [1, 16, 32, 64, 128, 256]
506    res = _test_xdp_native_tail_adjst(
507        cfg,
508        pkt_sz_lst,
509        offset_lst,
510    )
511
512    _validate_res(res, offset_lst, pkt_sz_lst)
513
514
515def test_xdp_native_adjst_tail_shrnk_data(cfg):
516    """
517    Tests the XDP tail adjustment by shrinking packet data.
518
519    Args:
520        cfg: Configuration object containing network settings.
521    """
522    pkt_sz_lst = [512, 1024, 2048]
523    offset_lst = [-16, -32, -64, -128, -256]
524    res = _test_xdp_native_tail_adjst(
525        cfg,
526        pkt_sz_lst,
527        offset_lst,
528    )
529
530    _validate_res(res, offset_lst, pkt_sz_lst)
531
532
533def get_hds_thresh(cfg):
534    """
535    Retrieves the header data split (HDS) threshold for a network interface.
536
537    Args:
538        cfg: Configuration object containing network settings.
539
540    Returns:
541        The HDS threshold value. If the threshold is not supported or an error occurs,
542        a default value of 1500 is returned.
543    """
544    netnl = cfg.netnl
545    hds_thresh = 1500
546
547    try:
548        rings = netnl.rings_get({'header': {'dev-index': cfg.ifindex}})
549        if 'hds-thresh' not in rings:
550            ksft_pr(f'hds-thresh not supported. Using default: {hds_thresh}')
551            return hds_thresh
552        hds_thresh = rings['hds-thresh']
553    except NlError as e:
554        ksft_pr(f"Failed to get rings: {e}. Using default: {hds_thresh}")
555
556    return hds_thresh
557
558
559def _test_xdp_native_head_adjst(cfg, prog, pkt_sz_lst, offset_lst):
560    """
561    Tests the XDP head adjustment action for a multi-buffer case.
562
563    Args:
564        cfg: Configuration object containing network settings.
565        netnl: Network namespace or link object (not used in this function).
566
567    This function sets up the packet size and offset lists, then performs
568    the head adjustment test by sending and receiving UDP packets.
569    """
570    cfg.require_cmd("socat", remote=True)
571
572    prog_info = _load_xdp_prog(cfg, BPFProgInfo(prog, "xdp_native.bpf.o", "xdp.frags", 9000))
573    port = rand_port()
574
575    _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.HEAD_ADJST.value)
576    _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
577
578    hds_thresh = get_hds_thresh(cfg)
579    for offset in offset_lst:
580        for pkt_sz in pkt_sz_lst:
581            # The "head" buffer must contain at least the Ethernet header
582            # after we eat into it. We send large-enough packets, but if HDS
583            # is enabled head will only contain headers. Don't try to eat
584            # more than 28 bytes (UDPv4 + eth hdr left: (14 + 20 + 8) - 14)
585            l2_cut_off = 28 if cfg.addr_ipver == 4 else 48
586            if pkt_sz > hds_thresh and offset > l2_cut_off:
587                ksft_pr(
588                f"Failed run: pkt_sz ({pkt_sz}) > HDS threshold ({hds_thresh}) and "
589                f"offset {offset} > {l2_cut_off}"
590                )
591                return {"status": "pass"}
592
593            test_str = ''.join(random.choice(string.ascii_lowercase) for _ in range(pkt_sz))
594            tag = format(random.randint(65, 90), '02x')
595
596            _set_xdp_map("map_xdp_setup",
597                     TestConfig.ADJST_OFFSET.value,
598                     offset)
599            _set_xdp_map("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
600            _set_xdp_map("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
601
602            recvd_str = _exchg_udp(cfg, port, test_str)
603
604            # Check for failures around adjustment and data exchange
605            failure = _check_for_failures(recvd_str, _get_stats(prog_info['maps']['map_xdp_stats']))
606            if failure is not None:
607                return {
608                    "status": "fail",
609                    "reason": failure,
610                    "offset": offset,
611                    "pkt_sz": pkt_sz
612                }
613
614            # Validate data content based on offset direction
615            expected_data = None
616            if offset < 0:
617                expected_data = chr(int(tag, 16)) * (0 - offset) + test_str
618            else:
619                expected_data = test_str[offset:]
620
621            if recvd_str != expected_data:
622                return {
623                    "status": "fail",
624                    "reason": "Data mismatch",
625                    "offset": offset,
626                    "pkt_sz": pkt_sz
627                }
628
629    return {"status": "pass"}
630
631
632def test_xdp_native_adjst_head_grow_data(cfg):
633    """
634    Tests the XDP headroom growth support.
635
636    Args:
637        cfg: Configuration object containing network settings.
638
639    This function sets up the packet size and offset lists, then calls the
640    _test_xdp_native_head_adjst_mb function to perform the actual test. The
641    test is passed if the headroom is successfully extended for given packet
642    sizes and offsets.
643    """
644    pkt_sz_lst = [512, 1024, 2048]
645
646    # Negative values result in headroom shrinking, resulting in growing of payload
647    offset_lst = [-16, -32, -64, -128, -256]
648    res = _test_xdp_native_head_adjst(cfg, "xdp_prog_frags", pkt_sz_lst, offset_lst)
649
650    _validate_res(res, offset_lst, pkt_sz_lst)
651
652
653def test_xdp_native_adjst_head_shrnk_data(cfg):
654    """
655    Tests the XDP headroom shrinking support.
656
657    Args:
658        cfg: Configuration object containing network settings.
659
660    This function sets up the packet size and offset lists, then calls the
661    _test_xdp_native_head_adjst_mb function to perform the actual test. The
662    test is passed if the headroom is successfully shrunk for given packet
663    sizes and offsets.
664    """
665    pkt_sz_lst = [512, 1024, 2048]
666
667    # Positive values result in headroom growing, resulting in shrinking of payload
668    offset_lst = [16, 32, 64, 128, 256]
669    res = _test_xdp_native_head_adjst(cfg, "xdp_prog_frags", pkt_sz_lst, offset_lst)
670
671    _validate_res(res, offset_lst, pkt_sz_lst)
672
673
674def main():
675    """
676    Main function to execute the XDP tests.
677
678    This function runs a series of tests to validate the XDP support for
679    both the single and multi-buffer. It uses the NetDrvEpEnv context
680    manager to manage the network driver environment and the ksft_run
681    function to execute the tests.
682    """
683    with NetDrvEpEnv(__file__) as cfg:
684        cfg.netnl = EthtoolFamily()
685        ksft_run(
686            [
687                test_xdp_native_pass_sb,
688                test_xdp_native_pass_mb,
689                test_xdp_native_drop_sb,
690                test_xdp_native_drop_mb,
691                test_xdp_native_tx_sb,
692                test_xdp_native_tx_mb,
693                test_xdp_native_adjst_tail_grow_data,
694                test_xdp_native_adjst_tail_shrnk_data,
695                test_xdp_native_adjst_head_grow_data,
696                test_xdp_native_adjst_head_shrnk_data,
697            ],
698            args=(cfg,))
699    ksft_exit()
700
701
702if __name__ == "__main__":
703    main()
704