xref: /linux/tools/testing/selftests/tc-testing/plugin-lib/scapyPlugin.py (revision 2330437da0994321020777c605a2a8cb0ecb7001)
1#!/usr/bin/env python3
2
3import os
4import signal
5from string import Template
6import subprocess
7import time
8from TdcPlugin import TdcPlugin
9
10from tdc_config import *
11
12try:
13    from scapy.all import *
14except ImportError:
15    print("Unable to import the scapy python module.")
16    print("\nIf not already installed, you may do so with:")
17    print("\t\tpip3 install scapy==2.4.2")
18    exit(1)
19
class SubPlugin(TdcPlugin):
    """tdc plugin that sends scapy-built packets from its post_execute hook.

    A test case opts in by carrying a 'scapy' entry: either one dict or a
    list of dicts, each with the keys listed in _REQUIRED_KEYS.
    """

    # Keys every scapy block of a test case has to provide.
    _REQUIRED_KEYS = ('iface', 'count', 'packet')

    def __init__(self):
        # Set before delegating to the base class so the name is already
        # available while TdcPlugin.__init__ runs.
        self.sub_class = 'scapy/SubPlugin'
        super().__init__()

    def post_execute(self):
        """Send the packets described by the current test case's 'scapy' entry.

        For each scapy block:
          * 'packet' is a Python expression that is evaluated to build the
            packet,
          * 'iface' is the interface name; '$'-style placeholders in it are
            filled in from NAMES (unknown placeholders are left untouched),
          * 'count' is how many times the packet is handed to sendp().

        A block that lacks any required key is reported and skipped instead
        of failing with a KeyError right after the diagnostic was printed.
        The test-case data itself is not modified.
        """
        if 'scapy' not in self.args.caseinfo:
            if self.args.verbose:
                print('{}.post_execute: no scapy info in test case'.format(self.sub_class))
            return

        # Accept both a single scapy block and a list of them.
        lscapyinfo = self.args.caseinfo['scapy']
        if not isinstance(lscapyinfo, list):
            lscapyinfo = [lscapyinfo]

        for scapyinfo in lscapyinfo:
            # Check for required fields
            missing_keys = [k for k in self._REQUIRED_KEYS if k not in scapyinfo]
            if missing_keys:
                print('{}: Scapy block present in the test, but is missing info:'
                    .format(self.sub_class))
                print('{}'.format(missing_keys))
                # Nothing usable can be built from an incomplete block.
                continue

            # NOTE(security): eval() executes arbitrary Python taken from the
            # test-case file; only run test cases from a trusted source.
            pkt = eval(scapyinfo['packet'])

            # Resolve placeholders into a local so the shared test-case dict
            # is left as it was loaded.
            iface = scapyinfo['iface']
            if '$' in iface:
                iface = Template(iface).safe_substitute(NAMES)

            for _ in range(scapyinfo['count']):
                sendp(pkt, iface=iface)
55