1#!/usr/local/bin/python2 2# 3# Copyright (c) 2014 The FreeBSD Foundation 4# All rights reserved. 5# Copyright 2019 Enji Cooper 6# 7# This software was developed by John-Mark Gurney under 8# the sponsorship from the FreeBSD Foundation. 9# Redistribution and use in source and binary forms, with or without 10# modification, are permitted provided that the following conditions 11# are met: 12# 1. Redistributions of source code must retain the above copyright 13# notice, this list of conditions and the following disclaimer. 14# 2. Redistributions in binary form must reproduce the above copyright 15# notice, this list of conditions and the following disclaimer in the 16# documentation and/or other materials provided with the distribution. 17# 18# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 19# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28# SUCH DAMAGE. 29# 30# $FreeBSD$ 31# 32 33from __future__ import print_function 34 35import binascii 36import errno 37import cryptodev 38import itertools 39import os 40import struct 41import unittest 42from cryptodev import * 43from glob import iglob 44 45katdir = '/usr/local/share/nist-kat' 46 47def katg(base, glob): 48 assert os.path.exists(katdir), "Please 'pkg install nist-kat'" 49 if not os.path.exists(os.path.join(katdir, base)): 50 raise unittest.SkipTest("Missing %s test vectors" % (base)) 51 return iglob(os.path.join(katdir, base, glob)) 52 53aesmodules = [ 'cryptosoft0', 'aesni0', 'armv8crypto0', 'ccr0', 'ccp0' ] 54desmodules = [ 'cryptosoft0', ] 55shamodules = [ 'cryptosoft0', 'aesni0', 'armv8crypto0', 'ccr0', 'ccp0' ] 56 57def GenTestCase(cname): 58 try: 59 crid = cryptodev.Crypto.findcrid(cname) 60 except IOError: 61 return None 62 63 class GendCryptoTestCase(unittest.TestCase): 64 ############### 65 ##### AES ##### 66 ############### 67 @unittest.skipIf(cname not in aesmodules, 'skipping AES-XTS on %s' % (cname)) 68 def test_xts(self): 69 for i in katg('XTSTestVectors/format tweak value input - data unit seq no', '*.rsp'): 70 self.runXTS(i, cryptodev.CRYPTO_AES_XTS) 71 72 @unittest.skipIf(cname not in aesmodules, 'skipping AES-CBC on %s' % (cname)) 73 def test_cbc(self): 74 for i in katg('KAT_AES', 'CBC[GKV]*.rsp'): 75 self.runCBC(i) 76 77 @unittest.skipIf(cname not in aesmodules, 'skipping AES-CCM on %s' % (cname)) 78 def test_ccm(self): 79 for i in katg('ccmtestvectors', 'V*.rsp'): 80 self.runCCMEncrypt(i) 81 82 for i in katg('ccmtestvectors', 'D*.rsp'): 83 self.runCCMDecrypt(i) 84 85 @unittest.skipIf(cname not in aesmodules, 'skipping AES-GCM on %s' % (cname)) 86 def test_gcm(self): 87 for i in katg('gcmtestvectors', 'gcmEncrypt*'): 88 self.runGCM(i, 'ENCRYPT') 89 90 for i in katg('gcmtestvectors', 'gcmDecrypt*'): 91 self.runGCM(i, 'DECRYPT') 92 93 def runGCM(self, fname, mode): 94 curfun = None 95 if mode == 'ENCRYPT': 96 swapptct = False 97 curfun = Crypto.encrypt 98 elif mode == 'DECRYPT': 99 swapptct = True 100 curfun = Crypto.decrypt 101 else: 102 raise RuntimeError('unknown mode: %r' % repr(mode)) 103 104 columns = [ 'Count', 'Key', 'IV', 'CT', 'AAD', 'Tag', 'PT', ] 105 with cryptodev.KATParser(fname, columns) as parser: 106 self.runGCMWithParser(parser, mode) 107 108 def runGCMWithParser(self, parser, mode): 109 for _, lines in next(parser): 110 for data in lines: 111 curcnt = int(data['Count']) 112 cipherkey = binascii.unhexlify(data['Key']) 113 iv = binascii.unhexlify(data['IV']) 114 aad = binascii.unhexlify(data['AAD']) 115 tag = binascii.unhexlify(data['Tag']) 116 if 'FAIL' not in data: 117 pt = binascii.unhexlify(data['PT']) 118 ct = binascii.unhexlify(data['CT']) 119 120 if len(iv) != 12: 121 # XXX - isn't supported 122 continue 123 124 try: 125 c = Crypto(cryptodev.CRYPTO_AES_NIST_GCM_16, 126 cipherkey, crid=crid, 127 maclen=16) 128 except EnvironmentError as e: 129 # Can't test algorithms the driver does not support. 130 if e.errno != errno.EOPNOTSUPP: 131 raise 132 continue 133 134 if mode == 'ENCRYPT': 135 try: 136 rct, rtag = c.encrypt(pt, iv, aad) 137 except EnvironmentError as e: 138 # Can't test inputs the driver does not support. 139 if e.errno != errno.EINVAL: 140 raise 141 continue 142 rtag = rtag[:len(tag)] 143 data['rct'] = binascii.hexlify(rct) 144 data['rtag'] = binascii.hexlify(rtag) 145 self.assertEqual(rct, ct, repr(data)) 146 self.assertEqual(rtag, tag, repr(data)) 147 else: 148 if len(tag) != 16: 149 continue 150 args = (ct, iv, aad, tag) 151 if 'FAIL' in data: 152 self.assertRaises(IOError, 153 c.decrypt, *args) 154 else: 155 try: 156 rpt, rtag = c.decrypt(*args) 157 except EnvironmentError as e: 158 # Can't test inputs the driver does not support. 159 if e.errno != errno.EINVAL: 160 raise 161 continue 162 data['rpt'] = binascii.hexlify(rpt) 163 data['rtag'] = binascii.hexlify(rtag) 164 self.assertEqual(rpt, pt, 165 repr(data)) 166 167 def runCBC(self, fname): 168 columns = [ 'COUNT', 'KEY', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ] 169 with cryptodev.KATParser(fname, columns) as parser: 170 self.runCBCWithParser(parser) 171 172 def runCBCWithParser(self, parser): 173 curfun = None 174 for mode, lines in next(parser): 175 if mode == 'ENCRYPT': 176 swapptct = False 177 curfun = Crypto.encrypt 178 elif mode == 'DECRYPT': 179 swapptct = True 180 curfun = Crypto.decrypt 181 else: 182 raise RuntimeError('unknown mode: %r' % repr(mode)) 183 184 for data in lines: 185 curcnt = int(data['COUNT']) 186 cipherkey = binascii.unhexlify(data['KEY']) 187 iv = binascii.unhexlify(data['IV']) 188 pt = binascii.unhexlify(data['PLAINTEXT']) 189 ct = binascii.unhexlify(data['CIPHERTEXT']) 190 191 if swapptct: 192 pt, ct = ct, pt 193 # run the fun 194 c = Crypto(cryptodev.CRYPTO_AES_CBC, cipherkey, crid=crid) 195 r = curfun(c, pt, iv) 196 self.assertEqual(r, ct) 197 198 def runXTS(self, fname, meth): 199 columns = [ 'COUNT', 'DataUnitLen', 'Key', 'DataUnitSeqNumber', 'PT', 200 'CT'] 201 with cryptodev.KATParser(fname, columns) as parser: 202 self.runXTSWithParser(parser, meth) 203 204 def runXTSWithParser(self, parser, meth): 205 curfun = None 206 for mode, lines in next(parser): 207 if mode == 'ENCRYPT': 208 swapptct = False 209 curfun = Crypto.encrypt 210 elif mode == 'DECRYPT': 211 swapptct = True 212 curfun = Crypto.decrypt 213 else: 214 raise RuntimeError('unknown mode: %r' % repr(mode)) 215 216 for data in lines: 217 curcnt = int(data['COUNT']) 218 nbits = int(data['DataUnitLen']) 219 cipherkey = binascii.unhexlify(data['Key']) 220 iv = struct.pack('QQ', int(data['DataUnitSeqNumber']), 0) 221 pt = binascii.unhexlify(data['PT']) 222 ct = binascii.unhexlify(data['CT']) 223 224 if nbits % 128 != 0: 225 # XXX - mark as skipped 226 continue 227 if swapptct: 228 pt, ct = ct, pt 229 # run the fun 230 try: 231 c = Crypto(meth, cipherkey, crid=crid) 232 r = curfun(c, pt, iv) 233 except EnvironmentError as e: 234 # Can't test hashes the driver does not support. 235 if e.errno != errno.EOPNOTSUPP: 236 raise 237 continue 238 self.assertEqual(r, ct) 239 240 def runCCMEncrypt(self, fname): 241 with cryptodev.KATCCMParser(fname) as parser: 242 self.runCCMEncryptWithParser(parser) 243 244 def runCCMEncryptWithParser(self, parser): 245 for data in next(parser): 246 Nlen = int(data['Nlen']) 247 if Nlen != 12: 248 # OCF only supports 12 byte IVs 249 continue 250 key = binascii.unhexlify(data['Key']) 251 nonce = binascii.unhexlify(data['Nonce']) 252 Alen = int(data['Alen']) 253 if Alen != 0: 254 aad = binascii.unhexlify(data['Adata']) 255 else: 256 aad = None 257 payload = binascii.unhexlify(data['Payload']) 258 ct = binascii.unhexlify(data['CT']) 259 260 try: 261 c = Crypto(crid=crid, 262 cipher=cryptodev.CRYPTO_AES_CCM_16, 263 key=key, 264 mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC, 265 mackey=key, maclen=16) 266 r, tag = Crypto.encrypt(c, payload, 267 nonce, aad) 268 except EnvironmentError as e: 269 if e.errno != errno.EOPNOTSUPP: 270 raise 271 continue 272 273 out = r + tag 274 self.assertEqual(out, ct, 275 "Count " + data['Count'] + " Actual: " + \ 276 repr(binascii.hexlify(out)) + " Expected: " + \ 277 repr(data) + " on " + cname) 278 279 def runCCMDecrypt(self, fname): 280 with cryptodev.KATCCMParser(fname) as parser: 281 self.runCCMDecryptWithParser(parser) 282 283 def runCCMDecryptWithParser(self, parser): 284 # XXX: Note that all of the current CCM 285 # decryption test vectors use IV and tag sizes 286 # that aren't supported by OCF none of the 287 # tests are actually ran. 288 for data in next(parser): 289 Nlen = int(data['Nlen']) 290 if Nlen != 12: 291 # OCF only supports 12 byte IVs 292 continue 293 Tlen = int(data['Tlen']) 294 if Tlen != 16: 295 # OCF only supports 16 byte tags 296 continue 297 key = binascii.unhexlify(data['Key']) 298 nonce = binascii.unhexlify(data['Nonce']) 299 Alen = int(data['Alen']) 300 if Alen != 0: 301 aad = binascii.unhexlify(data['Adata']) 302 else: 303 aad = None 304 ct = binascii.unhexlify(data['CT']) 305 tag = ct[-16:] 306 ct = ct[:-16] 307 308 try: 309 c = Crypto(crid=crid, 310 cipher=cryptodev.CRYPTO_AES_CCM_16, 311 key=key, 312 mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC, 313 mackey=key, maclen=16) 314 except EnvironmentError as e: 315 if e.errno != errno.EOPNOTSUPP: 316 raise 317 continue 318 319 if data['Result'] == 'Fail': 320 self.assertRaises(IOError, 321 c.decrypt, payload, nonce, aad, tag) 322 else: 323 r = Crypto.decrypt(c, payload, nonce, 324 aad, tag) 325 326 payload = binascii.unhexlify(data['Payload']) 327 plen = int(data('Plen')) 328 payload = payload[:plen] 329 self.assertEqual(r, payload, 330 "Count " + data['Count'] + \ 331 " Actual: " + repr(binascii.hexlify(r)) + \ 332 " Expected: " + repr(data) + \ 333 " on " + cname) 334 335 ############### 336 ##### DES ##### 337 ############### 338 @unittest.skipIf(cname not in desmodules, 'skipping DES on %s' % (cname)) 339 def test_tdes(self): 340 for i in katg('KAT_TDES', 'TCBC[a-z]*.rsp'): 341 self.runTDES(i) 342 343 def runTDES(self, fname): 344 columns = [ 'COUNT', 'KEYs', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ] 345 with cryptodev.KATParser(fname, columns) as parser: 346 self.runTDESWithParser(parser) 347 348 def runTDESWithParser(self, parser): 349 curfun = None 350 for mode, lines in next(parser): 351 if mode == 'ENCRYPT': 352 swapptct = False 353 curfun = Crypto.encrypt 354 elif mode == 'DECRYPT': 355 swapptct = True 356 curfun = Crypto.decrypt 357 else: 358 raise RuntimeError('unknown mode: %r' % repr(mode)) 359 360 for data in lines: 361 curcnt = int(data['COUNT']) 362 key = data['KEYs'] * 3 363 cipherkey = binascii.unhexlify(key) 364 iv = binascii.unhexlify(data['IV']) 365 pt = binascii.unhexlify(data['PLAINTEXT']) 366 ct = binascii.unhexlify(data['CIPHERTEXT']) 367 368 if swapptct: 369 pt, ct = ct, pt 370 # run the fun 371 c = Crypto(cryptodev.CRYPTO_3DES_CBC, cipherkey, crid=crid) 372 r = curfun(c, pt, iv) 373 self.assertEqual(r, ct) 374 375 ############### 376 ##### SHA ##### 377 ############### 378 @unittest.skipIf(cname not in shamodules, 'skipping SHA on %s' % str(cname)) 379 def test_sha(self): 380 for i in katg('shabytetestvectors', 'SHA*Msg.rsp'): 381 self.runSHA(i) 382 383 def runSHA(self, fname): 384 # Skip SHA512_(224|256) tests 385 if fname.find('SHA512_') != -1: 386 return 387 columns = [ 'Len', 'Msg', 'MD' ] 388 with cryptodev.KATParser(fname, columns) as parser: 389 self.runSHAWithParser(parser) 390 391 def runSHAWithParser(self, parser): 392 for hashlength, lines in next(parser): 393 # E.g., hashlength will be "L=20" (bytes) 394 hashlen = int(hashlength.split("=")[1]) 395 396 if hashlen == 20: 397 alg = cryptodev.CRYPTO_SHA1 398 elif hashlen == 28: 399 alg = cryptodev.CRYPTO_SHA2_224 400 elif hashlen == 32: 401 alg = cryptodev.CRYPTO_SHA2_256 402 elif hashlen == 48: 403 alg = cryptodev.CRYPTO_SHA2_384 404 elif hashlen == 64: 405 alg = cryptodev.CRYPTO_SHA2_512 406 else: 407 # Skip unsupported hashes 408 # Slurp remaining input in section 409 for data in lines: 410 continue 411 continue 412 413 for data in lines: 414 msg = binascii.unhexlify(data['Msg']) 415 msg = msg[:int(data['Len'])] 416 md = binascii.unhexlify(data['MD']) 417 418 try: 419 c = Crypto(mac=alg, crid=crid, 420 maclen=hashlen) 421 except EnvironmentError as e: 422 # Can't test hashes the driver does not support. 423 if e.errno != errno.EOPNOTSUPP: 424 raise 425 continue 426 427 _, r = c.encrypt(msg, iv="") 428 429 self.assertEqual(r, md, "Actual: " + \ 430 repr(binascii.hexlify(r)) + " Expected: " + repr(data) + " on " + cname) 431 432 @unittest.skipIf(cname not in shamodules, 'skipping SHA-HMAC on %s' % str(cname)) 433 def test_sha1hmac(self): 434 for i in katg('hmactestvectors', 'HMAC.rsp'): 435 self.runSHA1HMAC(i) 436 437 def runSHA1HMAC(self, fname): 438 columns = [ 'Count', 'Klen', 'Tlen', 'Key', 'Msg', 'Mac' ] 439 with cryptodev.KATParser(fname, columns) as parser: 440 self.runSHA1HMACWithParser(parser) 441 442 def runSHA1HMACWithParser(self, parser): 443 for hashlength, lines in next(parser): 444 # E.g., hashlength will be "L=20" (bytes) 445 hashlen = int(hashlength.split("=")[1]) 446 447 blocksize = None 448 if hashlen == 20: 449 alg = cryptodev.CRYPTO_SHA1_HMAC 450 blocksize = 64 451 elif hashlen == 28: 452 alg = cryptodev.CRYPTO_SHA2_224_HMAC 453 blocksize = 64 454 elif hashlen == 32: 455 alg = cryptodev.CRYPTO_SHA2_256_HMAC 456 blocksize = 64 457 elif hashlen == 48: 458 alg = cryptodev.CRYPTO_SHA2_384_HMAC 459 blocksize = 128 460 elif hashlen == 64: 461 alg = cryptodev.CRYPTO_SHA2_512_HMAC 462 blocksize = 128 463 else: 464 # Skip unsupported hashes 465 # Slurp remaining input in section 466 for data in lines: 467 continue 468 continue 469 470 for data in lines: 471 key = binascii.unhexlify(data['Key']) 472 msg = binascii.unhexlify(data['Msg']) 473 mac = binascii.unhexlify(data['Mac']) 474 tlen = int(data['Tlen']) 475 476 if len(key) > blocksize: 477 continue 478 479 try: 480 c = Crypto(mac=alg, mackey=key, 481 crid=crid, maclen=hashlen) 482 except EnvironmentError as e: 483 # Can't test hashes the driver does not support. 484 if e.errno != errno.EOPNOTSUPP: 485 raise 486 continue 487 488 _, r = c.encrypt(msg, iv="") 489 490 self.assertEqual(r[:tlen], mac, "Actual: " + \ 491 repr(binascii.hexlify(r)) + " Expected: " + repr(data)) 492 493 return GendCryptoTestCase 494 495cryptosoft = GenTestCase('cryptosoft0') 496aesni = GenTestCase('aesni0') 497armv8crypto = GenTestCase('armv8crypto0') 498ccr = GenTestCase('ccr0') 499ccp = GenTestCase('ccp0') 500 501if __name__ == '__main__': 502 unittest.main() 503