xref: /freebsd/crypto/krb5/src/tests/t_otp.py (revision f1c4c3daccbaf3820f0e2224de53df12fc952fcc)
1# Author: Nathaniel McCallum <npmccallum@redhat.com>
2#
3# Copyright (c) 2013 Red Hat, Inc.
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in
13# all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21# THE SOFTWARE.
22
23
24#
25# This script tests OTP, both UDP and Unix Sockets, with a variety of
26# configuration. It requires pyrad to run, but exits gracefully if not found.
27# It also deliberately shuts down the test daemons between tests in order to
28# test how OTP handles the case of short daemon restarts.
29#
30
31from k5test import *
32from queue import Empty
33import io
34import struct
35
36try:
37    from pyrad import packet, dictionary
38except ImportError:
39    skip_rest('OTP tests', 'Python pyrad module not found')
40try:
41    from multiprocessing import Process, Queue
42except ImportError:
43    skip_rest('OTP tests', 'Python version 2.6 required')
44
45# We could use a dictionary file, but since we need so few attributes,
46# we'll just include them here.
47radius_attributes = '''
48ATTRIBUTE    User-Name    1    string
49ATTRIBUTE    User-Password   2    octets
50ATTRIBUTE    Service-Type    6    integer
51ATTRIBUTE    NAS-Identifier  32    string
52ATTRIBUTE    Message-Authenticator 80 octets
53'''
54
55class RadiusDaemon(Process):
56    MAX_PACKET_SIZE = 4096
57    DICTIONARY = dictionary.Dictionary(io.StringIO(radius_attributes))
58
59    def listen(self, addr):
60        raise NotImplementedError()
61
62    def recvRequest(self, data):
63        raise NotImplementedError()
64
65    def run(self):
66        addr = self._args[0]
67        secrfile = self._args[1]
68        pswd = self._args[2]
69        outq = self._args[3]
70
71        if secrfile:
72            with open(secrfile, 'rb') as file:
73                secr = file.read().strip()
74        else:
75            secr = b''
76
77        data = self.listen(addr)
78        outq.put("started")
79        (buf, sock, addr) = self.recvRequest(data)
80        pkt = packet.AuthPacket(secret=secr,
81                                dict=RadiusDaemon.DICTIONARY,
82                                packet=buf)
83
84        usernm = []
85        passwd = []
86        for key in pkt.keys():
87            if key == 'User-Password':
88                passwd = list(map(pkt.PwDecrypt, pkt[key]))
89            elif key == 'User-Name':
90                usernm = pkt[key]
91
92        reply = pkt.CreateReply()
93        replyq = {'user': usernm, 'pass': passwd}
94        if passwd == [pswd]:
95            reply.code = packet.AccessAccept
96            replyq['reply'] = True
97        else:
98            reply.code = packet.AccessReject
99            replyq['reply'] = False
100
101        reply.add_message_authenticator()
102
103        outq.put(replyq)
104        if addr is None:
105            sock.send(reply.ReplyPacket())
106        else:
107            sock.sendto(reply.ReplyPacket(), addr)
108        sock.close()
109
110class UDPRadiusDaemon(RadiusDaemon):
111    def listen(self, addr):
112        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
113        sock.bind((addr.split(':')[0], int(addr.split(':')[1])))
114        return sock
115
116    def recvRequest(self, sock):
117        (buf, addr) = sock.recvfrom(RadiusDaemon.MAX_PACKET_SIZE)
118        return (buf, sock, addr)
119
120class UnixRadiusDaemon(RadiusDaemon):
121    def listen(self, addr):
122        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
123        if os.path.exists(addr):
124            os.remove(addr)
125        sock.bind(addr)
126        sock.listen(1)
127        return (sock, addr)
128
129    def recvRequest(self, sock_and_addr):
130        sock, addr = sock_and_addr
131        conn = sock.accept()[0]
132        sock.close()
133        os.remove(addr)
134
135        buf = b''
136        remain = RadiusDaemon.MAX_PACKET_SIZE
137        while True:
138            buf += conn.recv(remain)
139            remain = RadiusDaemon.MAX_PACKET_SIZE - len(buf)
140            if (len(buf) >= 4):
141                remain = struct.unpack("!BBH", buf[0:4])[2] - len(buf)
142                if (remain <= 0):
143                    return (buf, conn, None)
144
145def verify(daemon, queue, reply, usernm, passwd):
146    try:
147        data = queue.get(timeout=1)
148    except Empty:
149        sys.stderr.write("ERROR: Packet not received by daemon!\n")
150        daemon.terminate()
151        sys.exit(1)
152    assert data['reply'] is reply
153    assert data['user'] == [usernm]
154    assert data['pass'] == [passwd]
155    daemon.join()
156
157# Compose a single token configuration.
158def otpconfig_1(toktype, username=None, indicators=None):
159    val = '{"type": "%s"' % toktype
160    if username is not None:
161        val += ', "username": "%s"' % username
162    if indicators is not None:
163        qind = ['"%s"' % s for s in indicators]
164        jsonlist = '[' + ', '.join(qind) + ']'
165        val += ', "indicators":' + jsonlist
166    val += '}'
167    return val
168
169# Compose a token configuration list suitable for the "otp" string
170# attribute.
171def otpconfig(toktype, username=None, indicators=None):
172    return '[' + otpconfig_1(toktype, username, indicators) + ']'
173
174prefix = "/tmp/%d" % os.getpid()
175secret_file = prefix + ".secret"
176socket_file = prefix + ".socket"
177with open(secret_file, "w") as file:
178    file.write("otptest")
179atexit.register(lambda: os.remove(secret_file))
180
181conf = {'plugins': {'kdcpreauth': {'enable_only': 'otp'}},
182        'otp': {'udp': {'server': '127.0.0.1:$port9',
183                        'secret': secret_file,
184                        'strip_realm': 'true',
185                        'indicator': ['indotp1', 'indotp2']},
186                'unix': {'server': socket_file,
187                         'strip_realm': 'false'}}}
188
189queue = Queue()
190
191realm = K5Realm(kdc_conf=conf)
192realm.run([kadminl, 'modprinc', '+requires_preauth', realm.user_princ])
193flags = ['-T', realm.ccache]
194server_addr = '127.0.0.1:' + str(realm.portbase + 9)
195
196## Test UDP fail / custom username
197mark('UDP fail / custom username')
198daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue))
199daemon.start()
200queue.get()
201realm.run([kadminl, 'setstr', realm.user_princ, 'otp',
202           otpconfig('udp', 'custom')])
203realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
204verify(daemon, queue, False, 'custom', 'reject')
205
206## Test UDP success / standard username
207mark('UDP success / standard username')
208daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue))
209daemon.start()
210queue.get()
211realm.run([kadminl, 'setstr', realm.user_princ, 'otp', otpconfig('udp')])
212realm.kinit(realm.user_princ, 'accept', flags=flags)
213verify(daemon, queue, True, realm.user_princ.split('@')[0], 'accept')
214realm.extract_keytab(realm.krbtgt_princ, realm.keytab)
215realm.run(['./adata', realm.krbtgt_princ],
216          expected_msg='+97: [indotp1, indotp2]')
217
218# Repeat with an indicators override in the string attribute.
219mark('auth indicator override')
220daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue))
221daemon.start()
222queue.get()
223oconf = otpconfig('udp', indicators=['indtok1', 'indtok2'])
224realm.run([kadminl, 'setstr', realm.user_princ, 'otp', oconf])
225realm.kinit(realm.user_princ, 'accept', flags=flags)
226verify(daemon, queue, True, realm.user_princ.split('@')[0], 'accept')
227realm.extract_keytab(realm.krbtgt_princ, realm.keytab)
228realm.run(['./adata', realm.krbtgt_princ],
229          expected_msg='+97: [indtok1, indtok2]')
230
231# Detect upstream pyrad bug
232#   https://github.com/wichert/pyrad/pull/18
233try:
234    auth = packet.Packet.CreateAuthenticator()
235    packet.Packet(authenticator=auth, secret=b'').ReplyPacket()
236except AssertionError:
237    skip_rest('OTP UNIX domain socket tests', 'pyrad assertion bug detected')
238
239## Test Unix fail / custom username
240mark('Unix socket fail / custom username')
241daemon = UnixRadiusDaemon(args=(socket_file, None, 'accept', queue))
242daemon.start()
243queue.get()
244realm.run([kadminl, 'setstr', realm.user_princ, 'otp',
245           otpconfig('unix', 'custom')])
246realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
247verify(daemon, queue, False, 'custom', 'reject')
248
249## Test Unix success / standard username
250mark('Unix socket success / standard username')
251daemon = UnixRadiusDaemon(args=(socket_file, None, 'accept', queue))
252daemon.start()
253queue.get()
254realm.run([kadminl, 'setstr', realm.user_princ, 'otp', otpconfig('unix')])
255realm.kinit(realm.user_princ, 'accept', flags=flags)
256verify(daemon, queue, True, realm.user_princ, 'accept')
257
258## Regression test for #8708: test with the standard username and two
259## tokens configured, with the first rejecting and the second
260## accepting.  With the bug, the KDC incorrectly rejects the request
261## and then performs invalid memory accesses, most likely crashing.
262queue2 = Queue()
263daemon1 = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept1', queue))
264daemon2 = UnixRadiusDaemon(args=(socket_file, None, 'accept2', queue2))
265daemon1.start()
266queue.get()
267daemon2.start()
268queue2.get()
269oconf = '[' + otpconfig_1('udp') + ', ' + otpconfig_1('unix') + ']'
270realm.run([kadminl, 'setstr', realm.user_princ, 'otp', oconf])
271realm.kinit(realm.user_princ, 'accept2', flags=flags)
272verify(daemon1, queue, False, realm.user_princ.split('@')[0], 'accept2')
273verify(daemon2, queue2, True, realm.user_princ, 'accept2')
274
275success('OTP tests')
276