from k5test import *
from base64 import b64encode
import shutil

# Realm for the first group of tests, plus the two ccache names that
# the t_s4u2proxy_krb5 invocations below receive.
realm = K5Realm(create_host=False, get_creds=False)
usercache = 'FILE:%s' % os.path.join(realm.testdir, 'usercache')
storagecache = 'FILE:%s' % os.path.join(realm.testdir, 'save')

# Create two service principals with keys in the default keytab.
service1 = 'service/1@%s' % realm.realm
service2 = 'service/2@%s' % realm.realm
for svc in (service1, service2):
    realm.addprinc(svc)
    realm.extract_keytab(svc, realm.keytab)

# "p:"-prefixed forms of the names, as passed to the test programs.
puser, pservice1, pservice2 = [
    'p:' + princ for princ in (realm.user_princ, service1, service2)
]

# Get forwardable creds for service1 in the default cache.
realm.kinit(service1, None, ['-f', '-k'])

# Try S4U2Self for user with a restricted password: first with
# +needchange set, then with an already-expired password, and finally
# restore the principal to a never-expiring password.
s4u2self_cmd = ('./t_s4u', 'e:user', '-')
realm.run([kadminl, 'modprinc', '+needchange', realm.user_princ])
realm.run(list(s4u2self_cmd))
realm.run([kadminl, 'modprinc', '-needchange', '-pwexpire', '1/1/2000',
           realm.user_princ])
realm.run(list(s4u2self_cmd))
realm.run([kadminl, 'modprinc', '-pwexpire', 'never', realm.user_princ])

# Output fragments that both failing S4U2Proxy attempts must contain.
auth1_line = 'auth1: ' + realm.user_princ
kdc_refusal = "KDC can't fulfill requested option"
proxy_args = [usercache, storagecache, '-', pservice1, pservice2]

# Try krb5 -> S4U2Proxy with forwardable user creds. This should fail
# at the S4U2Proxy step since the DB2 back end currently has no
# support for allowing it.
realm.kinit(realm.user_princ, password('user'), ['-f', '-c', usercache])
output = realm.run(['./t_s4u2proxy_krb5'] + proxy_args, expected_code=1)
if not (auth1_line in output and kdc_refusal in output):
    fail('krb5 -> s4u2proxy')

# Again with SPNEGO.
output = realm.run(['./t_s4u2proxy_krb5', '--spnego'] + proxy_args,
                   expected_code=1)
if not (auth1_line in output and kdc_refusal in output):
    fail('krb5 -> s4u2proxy (SPNEGO)')

# Try krb5 -> S4U2Proxy without forwardable user creds.
# Error text expected from every failing request in this section.
kdc_refusal = "KDC can't fulfill requested option"

# User creds are acquired without -f this time, so they are not
# forwardable.
realm.kinit(realm.user_princ, password('user'), ['-c', usercache])
output = realm.run(
    ['./t_s4u2proxy_krb5', usercache, storagecache, pservice1, pservice1,
     pservice2],
    expected_code=1)
missing_auth1 = ('auth1: ' + realm.user_princ) not in output
if missing_auth1 or kdc_refusal not in output:
    fail('krb5 -> s4u2proxy not-forwardable')

# Try S4U2Self. Ask for an S4U2Proxy step; this won't succeed because
# service/1 isn't allowed to get a forwardable S4U2Self ticket. Run
# once with the krb5 mech and once with SPNEGO.
for mech_flags in ([], ['--spnego']):
    realm.run(['./t_s4u'] + mech_flags + [puser, pservice2],
              expected_code=1, expected_msg=kdc_refusal)

# Correct that problem and try again. As above, the S4U2Proxy step
# won't actually succeed since we don't support that in DB2.
realm.run([kadminl, 'modprinc', '+ok_to_auth_as_delegate', service1])
realm.run(['./t_s4u', puser, pservice2],
          expected_code=1, expected_msg=kdc_refusal)

# Again with SPNEGO. This uses SPNEGO for the initial authentication,
# but still uses krb5 for S4U2Proxy--the delegated cred is returned as
# a krb5 cred, not a SPNEGO cred, and t_s4u uses the delegated cred
# directly rather than saving and reacquiring it.
realm.run(['./t_s4u', '--spnego', puser, pservice2],
          expected_code=1, expected_msg=kdc_refusal)

realm.stop()

# Set up a realm using the test KDB module so that we can do
# successful S4U2Proxy delegations.
# Principal database handed to the test KDB module. Only service/1
# carries the ok-to-auth-as-delegate flag.
testprincs = {'krbtgt/KRBTEST.COM': {'keys': 'aes128-cts'},
              'user': {'keys': 'aes128-cts'},
              'service/1': {'flags': '+ok-to-auth-as-delegate',
                            'keys': 'aes128-cts'},
              'service/2': {'keys': 'aes128-cts'}}
# The 'delegation' entry presumably lets service/1 delegate to
# service/2; its exact semantics live in the test KDB module.
conf = {'realms': {'$realm': {'database_module': 'test'}},
        'dbmodules': {'test': {'db_library': 'test',
                               'princs': testprincs,
                               'delegation': {'service/1': 'service/2'}}}}
realm = K5Realm(create_kdb=False, kdc_conf=conf)
userkeytab = 'FILE:' + os.path.join(realm.testdir, 'userkeytab')
realm.extract_keytab(realm.user_princ, userkeytab)
realm.extract_keytab(service1, realm.keytab)
realm.extract_keytab(service2, realm.keytab)
realm.start_kdc()

# Get forwardable creds for service1 in the default cache.
realm.kinit(service1, None, ['-f', '-k'])

# Successful krb5 -> S4U2Proxy, with krb5 and SPNEGO mechs. Each check
# uses its own failure label so that a failing run identifies the case.
realm.kinit(realm.user_princ, None, ['-f', '-k', '-c', usercache,
                                     '-t', userkeytab])
out = realm.run(['./t_s4u2proxy_krb5', usercache, storagecache, '-',
                 pservice1, pservice2])
if 'auth1: user@' not in out or 'auth2: user@' not in out:
    fail('krb5 -> s4u2proxy')
out = realm.run(['./t_s4u2proxy_krb5', '--spnego', usercache, storagecache,
                 '-', pservice1, pservice2])
if 'auth1: user@' not in out or 'auth2: user@' not in out:
    fail('krb5 -> s4u2proxy (SPNEGO)')

# Successful S4U2Self -> S4U2Proxy. The output is not inspected; the
# run only has to exit successfully.
realm.run(['./t_s4u', puser, pservice2])

# Regression test for #8139: get a user ticket directly for service1 and
# try krb5 -> S4U2Proxy.
realm.kinit(realm.user_princ, None, ['-f', '-k', '-c', usercache,
                                     '-t', userkeytab, '-S', service1])
out = realm.run(['./t_s4u2proxy_krb5', usercache, storagecache, '-',
                 pservice1, pservice2])
if 'auth1: user@' not in out or 'auth2: user@' not in out:
    fail('krb5 -> s4u2proxy (direct service1 ticket, #8139)')

# Simulate a krbtgt rollover and verify that the user ticket can still
# be validated.
# Restart the KDC with a krbtgt entry that lists a "2 aes128-cts" key
# ahead of the "1 aes128-cts" key, then reuse the user ticket that is
# already in usercache.
realm.stop_kdc()
newtgt_keys = ['2 aes128-cts', '1 aes128-cts']
newtgt_princs = {'krbtgt/KRBTEST.COM': {'keys': newtgt_keys}}
newtgt_conf = {'dbmodules': {'test': {'princs': newtgt_princs}}}
newtgt_env = realm.special_env('newtgt', True, kdc_conf=newtgt_conf)
realm.start_kdc(env=newtgt_env)
out = realm.run(['./t_s4u2proxy_krb5', usercache, storagecache, '-',
                 pservice1, pservice2])
if 'auth1: user@' not in out or 'auth2: user@' not in out:
    fail('krb5 -> s4u2proxy (after krbtgt rollover)')

# Get a user ticket after the krbtgt rollover and verify that
# S4U2Proxy delegation works (also a #8139 regression test).
realm.kinit(realm.user_princ, None, ['-f', '-k', '-c', usercache,
                                     '-t', userkeytab])
out = realm.run(['./t_s4u2proxy_krb5', usercache, storagecache, '-',
                 pservice1, pservice2])
if 'auth1: user@' not in out or 'auth2: user@' not in out:
    fail('krb5 -> s4u2proxy (new ticket after krbtgt rollover)')

realm.stop()

# Repeat a basic S4U2Self request in every realm yielded by
# multipass_realms(). Note that this rebinds realm and service1.
mark('S4U2Self with various enctypes')
for realm in multipass_realms(create_host=False, get_creds=False):
    service1 = 'service/1@%s' % realm.realm
    realm.addprinc(service1)
    realm.extract_keytab(service1, realm.keytab)
    realm.kinit(service1, None, ['-k'])
    realm.run(['./t_s4u', 'e:user', '-'])

# Test cross realm S4U2Self using server referrals.
mark('cross-realm S4U2Self')
# Both KDCs share one principal list; they differ only in their alias
# tables (kdcconf1 is used for SREALM, kdcconf2 for UREALM).
testprincs = {'krbtgt/SREALM': {'keys': 'aes128-cts'},
              'krbtgt/UREALM': {'keys': 'aes128-cts'},
              'user': {'keys': 'aes128-cts', 'flags': '+preauth'},
              'other': {'keys': 'aes128-cts'}}
kdcconf1 = {'realms': {'$realm': {'database_module': 'test'}},
            'dbmodules': {'test': {'db_library': 'test',
                                   'princs': testprincs,
                                   'alias': {'enterprise@abc': '@UREALM',
                                             'user@UREALM': '@UREALM'}}}}
kdcconf2 = {'realms': {'$realm': {'database_module': 'test'}},
            'dbmodules': {'test': {'db_library': 'test',
                                   'princs': testprincs,
                                   'alias': {'user@SREALM': '@SREALM',
                                             'user@UREALM': 'user',
                                             'enterprise@abc': 'user'}}}}
r1, r2 = cross_realms(2, xtgts=(),
                      args=({'realm': 'SREALM', 'kdc_conf': kdcconf1},
                            {'realm': 'UREALM', 'kdc_conf': kdcconf2}),
                      create_kdb=False)

r1.start_kdc()
r2.start_kdc()
r1.extract_keytab(r1.user_princ, r1.keytab)
r1.kinit(r1.user_princ, None, ['-k', '-t', r1.keytab])
# Keep a copy of the freshly initialized ccache; the kvno tests below
# restore it before each run.
savefile = r1.ccache + '.save'
shutil.copyfile(r1.ccache, savefile)

# Include a regression test for #8741 by unsetting the default realm.
remove_default = {'libdefaults': {'default_realm': None}}
no_default = r1.special_env('no_default', False, krb5_conf=remove_default)
msgs = ('Getting credentials user@UREALM -> user@SREALM',
        '/Matching credential not found',
        'Getting credentials user@SREALM -> krbtgt/UREALM@SREALM',
        'Received creds for desired service krbtgt/UREALM@SREALM',
        'via TGT krbtgt/UREALM@SREALM after requesting user\\@SREALM@UREALM',
        'krbtgt/SREALM@UREALM differs from requested user\\@SREALM@UREALM',
        'via TGT krbtgt/SREALM@UREALM after requesting user@SREALM',
        'TGS reply is for user@UREALM -> user@SREALM')
r1.run(['./t_s4u', 'p:' + r2.user_princ, '-', r1.keytab], env=no_default,
       expected_trace=msgs)

# Test realm identification of enterprise principal names ([MS-SFU]
# 3.1.5.1.1.1). Attach a bogus realm to the enterprise name to verify
# that we start at the server realm.
mark('cross-realm S4U2Self with enterprise name')
msgs = ('Getting initial credentials for enterprise\\@abc@SREALM',
        'Sending unauthenticated request',
        '/Realm not local to KDC',
        'Following referral to realm UREALM',
        'Sending unauthenticated request',
        '/Additional pre-authentication required',
        'Identified realm of client principal as UREALM',
        'Getting credentials enterprise\\@abc@UREALM -> user@SREALM',
        'TGS reply is for enterprise\\@abc@UREALM -> user@SREALM')
r1.run(['./t_s4u', 'e:enterprise@abc@NOREALM', '-', r1.keytab],
       expected_trace=msgs)

mark('S4U2Self using X509 certificate')

# Encode name as a PEM certificate file (sort of) for use by kvno.
def princ_cert(name):
    """Return PEM-style certificate text whose body encodes *name*.

    The payload is the base64 encoding of the ASCII bytes of *name*,
    not a real DER certificate. The result has a BEGIN line, the base64
    body and an END line, each terminated by a newline.

    Raises UnicodeEncodeError if *name* contains non-ASCII characters.
    """
    enc = b64encode(name.encode('ascii')).decode('ascii')
    # The END boundary must carry the same label as the BEGIN boundary
    # to be well-formed PEM (RFC 7468).
    return ('-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----\n'
            % enc)

cert_path = os.path.join(r1.testdir, 'fake_cert')
with open(cert_path, "w") as cert_file:
    cert_file.write(princ_cert('other'))

# Certificate only.
shutil.copyfile(savefile, r1.ccache)
msgs = ('Getting initial credentials for @SREALM',
        'Identified realm of client principal as SREALM',
        'TGS reply is for other@SREALM',
        'Getting credentials other@SREALM',
        'Storing other@SREALM')
r1.run([kvno, '-F', cert_path, r1.user_princ], expected_trace=msgs)

# Certificate plus the kvno -I option.
shutil.copyfile(savefile, r1.ccache)
msgs = ('Getting credentials other@SREALM',
        'TGS reply is for other@SREALM',
        'Storing other@SREALM')
r1.run([kvno, '-I', 'other', '-F', cert_path, r1.user_princ],
       expected_trace=msgs)

# Certificate plus the kvno -U option.
shutil.copyfile(savefile, r1.ccache)
msgs = ('Getting initial credentials for other@SREALM',
        'Identified realm of client principal as SREALM',
        'Getting credentials other@SREALM',
        'TGS reply is for other@SREALM',
        'Storing other@SREALM')
r1.run([kvno, '-U', 'other', '-F', cert_path, r1.user_princ],
       expected_trace=msgs)

mark('cross-realm S4U2Self using X509 certificate')

# Rewrite the fake certificate so that it names a UREALM principal.
with open(cert_path, "w") as cert_file:
    cert_file.write(princ_cert('user@UREALM'))

shutil.copyfile(savefile, r1.ccache)
msgs = ('Getting initial credentials for @SREALM',
        'Identified realm of client principal as UREALM',
        'TGS reply is for user@UREALM',
        'Getting credentials user@UREALM',
        'Storing user@UREALM')
r1.run([kvno, '-F', cert_path, r1.user_princ], expected_trace=msgs)

shutil.copyfile(savefile, r1.ccache)
msgs = ('Getting credentials user@UREALM',
        'TGS reply is for user@UREALM',
        'Storing user@UREALM')
r1.run([kvno, '-I', 'user@UREALM', '-F', cert_path, r1.user_princ],
       expected_trace=msgs)

shutil.copyfile(savefile, r1.ccache)
msgs = ('Getting initial credentials for enterprise\\@abc@SREALM',
        'Identified realm of client principal as UREALM',
        'Getting credentials enterprise\\@abc@UREALM',
        'TGS reply is for enterprise\\@abc@UREALM',
        'Storing enterprise\\@abc@UREALM')
r1.run([kvno, '-U', 'enterprise@abc', '-F', cert_path, r1.user_princ],
       expected_trace=msgs)

shutil.copyfile(savefile, r1.ccache)

mark('S4U2Self using X509 certificate (GSSAPI)')

r1.run(['./t_s4u', 'c:other', '-', r1.keytab])
r1.run(['./t_s4u', 'c:user@UREALM', '-', r1.keytab])

r1.run(['./t_s4u', '--spnego', 'c:other', '-', r1.keytab])
r1.run(['./t_s4u', '--spnego', 'c:user@UREALM', '-', r1.keytab])

r1.stop()
r2.stop()

mark('Resource-based constrained delegation')

# Three realms A, B and C, each backed by the test KDB module with its
# own principals, 'rbcd' table and alias table.
a_princs = {'krbtgt/A': {'keys': 'aes128-cts'},
            'krbtgt/B': {'keys': 'aes128-cts'},
            'user': {'keys': 'aes128-cts', 'flags': '+preauth'},
            'sensitive': {'keys': 'aes128-cts',
                          'flags': '+disallow_forwardable'},
            'impersonator': {'keys': 'aes128-cts'},
            'service1': {'keys': 'aes128-cts'},
            'rb2': {'keys': 'aes128-cts'},
            'rb': {'keys': 'aes128-cts'}}
a_kconf = {'realms': {'$realm': {'database_module': 'test'}},
           'dbmodules': {'test': {'db_library': 'test',
                                  'princs': a_princs,
                                  'rbcd': {'rb@A': 'impersonator@A',
                                           'rb2@A': 'service1@A'},
                                  'delegation': {'service1': 'rb2'},
                                  'alias': {'rb@A': 'rb',
                                            'rb@B': '@B',
                                            'rb@C': '@B',
                                            'service/rb.a': 'rb',
                                            'service/rb.b': '@B',
                                            'service/rb.c': '@B' }}}}

b_princs = {'krbtgt/B': {'keys': 'aes128-cts'},
            'krbtgt/A': {'keys': 'aes128-cts'},
            'krbtgt/C': {'keys': 'aes128-cts'},
            'user': {'keys': 'aes128-cts', 'flags': '+preauth'},
            'rb': {'keys': 'aes128-cts'}}
b_kconf = {'realms': {'$realm': {'database_module': 'test'}},
           'dbmodules': {'test': {'db_library': 'test',
                                  'princs': b_princs,
                                  'rbcd': {'rb@B': 'impersonator@A'},
                                  'alias': {'rb@B': 'rb',
                                            'service/rb.b': 'rb',
                                            'rb@C': '@C',
                                            'impersonator@A': '@A',
                                            'service/rb.c': '@C'}}}}

c_princs = {'krbtgt/C': {'keys': 'aes128-cts'},
            'krbtgt/B': {'keys': 'aes128-cts'},
            'rb': {'keys': 'aes128-cts'}}
c_kconf = {'realms': {'$realm': {'database_module': 'test'}},
           'capaths': { 'A' : { 'C' : 'B' }},
           'dbmodules': {'test': {'db_library': 'test',
                                  'princs': c_princs,
                                  'rbcd': {'rb@C': ['impersonator@A',
                                                    'service1@A']},
                                  'alias': {'rb@C': 'rb',
                                            'service/rb.c': 'rb' }}}}

ra, rb, rc = cross_realms(3, xtgts=(),
                          args=({'realm': 'A', 'kdc_conf': a_kconf},
                                {'realm': 'B', 'kdc_conf': b_kconf},
                                {'realm': 'C', 'kdc_conf': c_kconf}),
                          create_kdb=False)

ra.start_kdc()
rb.start_kdc()
rc.start_kdc()

# Alternate client environment that adds a [domain_realm] mapping from
# the .a/.b/.c host suffixes to realms A/B/C.
domain_realm = {'domain_realm': {'.a':'A', '.b':'B', '.c':'C'}}
domain_conf = ra.special_env('domain_conf', False, krb5_conf=domain_realm)

ra.extract_keytab('impersonator@A', ra.keytab)
ra.kinit('impersonator@A', None, ['-f', '-k', '-t', ra.keytab])

mark('Local-realm RBCD')
ra.run(['./t_s4u', 'p:' + ra.user_princ, 'p:rb'])
ra.run(['./t_s4u', 'p:' + ra.user_princ, 'e:rb'])
# Client names used for every remaining RBCD run.
a_client = 'p:' + ra.user_princ
b_client = 'p:' + rb.user_princ
sensitive_client = 'p:' + 'sensitive@A'

# One row per section: the mark() title (None while still inside the
# section opened earlier), the principal/enterprise-name targets, and
# the host-based target.
rbcd_sections = (
    (None,
     ('p:rb@A', 'e:rb@A', 'e:rb@A@', 'e:rb@A@A'),
     'h:service@rb.a'),
    ('Cross-realm RBCD',
     ('e:rb@B', 'e:rb@B@', 'e:rb@B@A'),
     'h:service@rb.b'),
    ('RBCD transitive trust',
     ('e:rb@C', 'e:rb@C@', 'e:rb@C@A'),
     'h:service@rb.c'),
)

for section_title, name_targets, host_target in rbcd_sections:
    if section_title is not None:
        mark(section_title)
    for name_target in name_targets:
        ra.run(['./t_s4u', a_client, name_target])
    # Host-based target: default environment, then the domain_conf
    # environment, then the sensitive@A client (expected to fail), then
    # a client from realm rb.
    ra.run(['./t_s4u', a_client, host_target])
    ra.run(['./t_s4u', a_client, host_target], env=domain_conf)
    ra.run(['./t_s4u', sensitive_client, host_target], expected_code=1)
    ra.run(['./t_s4u', b_client, host_target])

# Although service1 has RBCD delegation privileges to rb2@A, it does
# not have ok-to-auth-as-delegate and does have traditional delegation
# privileges, so it cannot get a forwardable S4U2Self ticket.
mark('RBCD forwardable blocked by forward delegation privileges')
# Switch the default credentials over to service1@A and expect the KDC
# to refuse the request for rb2@A.
blocked_service = 'service1@A'
ra.extract_keytab(blocked_service, ra.keytab)
ra.kinit(blocked_service, None, ['-f', '-k', '-t', ra.keytab])
ra.run(
    ['./t_s4u', 'p:' + ra.user_princ, 'e:rb2@A'],
    expected_code=1,
    expected_msg="KDC can't fulfill requested option",
)

# Shut down the three RBCD realms in the order A, B, C.
for rbcd_realm in (ra, rb, rc):
    rbcd_realm.stop()

success('S4U test cases')