xref: /freebsd/tests/sys/opencrypto/cryptodev.py (revision 6829dae12bb055451fa467da4589c43bd03b1e64)
1#!/usr/bin/env python
2#
3# Copyright (c) 2014 The FreeBSD Foundation
4# Copyright 2014 John-Mark Gurney
5# All rights reserved.
6#
7# This software was developed by John-Mark Gurney under
8# the sponsorship from the FreeBSD Foundation.
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12# 1.  Redistributions of source code must retain the above copyright
13#     notice, this list of conditions and the following disclaimer.
14# 2.  Redistributions in binary form must reproduce the above copyright
15#     notice, this list of conditions and the following disclaimer in the
16#     documentation and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
22# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28# SUCH DAMAGE.
29#
30# $FreeBSD$
31#
32
33from __future__ import print_function
34import array
35import dpkt
36from fcntl import ioctl
37import os
38import signal
39from struct import pack as _pack
40
41from cryptodevh import *
42
43__all__ = [ 'Crypto', 'MismatchError', ]
44
class FindOp(dpkt.Packet):
	"""Native-order record: an int ``crid`` followed by a 32-byte ``name``.

	``_findop`` packs this header and hands it to the CIOCFINDDEV ioctl.
	"""
	__byte_order__ = '@'
	__hdr__ = (
		('crid', 'i', 0),
		('name', '32s', 0),
	)
50
class SessionOp(dpkt.Packet):
	"""Native-order session request record.

	Carries the cipher and mac ids, the length and address of each key,
	and a trailing ``ses`` field.  All fields default to 0.
	"""
	__byte_order__ = '@'
	__hdr__ = (
		('cipher', 'I', 0),
		('mac', 'I', 0),
		('keylen', 'I', 0),
		('key', 'P', 0),
		('mackeylen', 'i', 0),
		('mackey', 'P', 0),
		('ses', 'I', 0),
	)
61
class SessionOp2(dpkt.Packet):
	"""Native-order session request record with a ``crid`` selector.

	Same leading fields as ``SessionOp`` followed by ``crid`` and four
	int-sized pad fields.  ``Crypto.__init__`` packs this header for the
	CIOCGSESSION2 ioctl and reads ``ses`` back from the result.
	"""
	__byte_order__ = '@'
	__hdr__ = (
		('cipher', 'I', 0),
		('mac', 'I', 0),
		('keylen', 'I', 0),
		('key', 'P', 0),
		('mackeylen', 'i', 0),
		('mackey', 'P', 0),
		('ses', 'I', 0),
		('crid', 'i', 0),
		('pad0', 'i', 0),
		('pad1', 'i', 0),
		('pad2', 'i', 0),
		('pad3', 'i', 0),
	)
77
class CryptOp(dpkt.Packet):
	"""Native-order request record used with the CIOCCRYPT ioctl.

	Holds the session id, the operation and flags, the data length, and
	pointer-sized ``src``, ``dst``, ``mac`` and ``iv`` fields.
	"""
	__byte_order__ = '@'
	__hdr__ = (
		('ses', 'I', 0),
		('op', 'H', 0),
		('flags', 'H', 0),
		('len', 'I', 0),
		('src', 'P', 0),
		('dst', 'P', 0),
		('mac', 'P', 0),
		('iv', 'P', 0),
	)
89
class CryptAEAD(dpkt.Packet):
	"""Native-order request record used with the CIOCCRYPTAEAD ioctl.

	Holds the session id, operation and flags, the data, AAD and IV
	lengths, and pointer-sized ``src``, ``dst``, ``aad``, ``tag`` and
	``iv`` fields.
	"""
	__byte_order__ = '@'
	__hdr__ = (
		('ses', 'I', 0),
		('op', 'H', 0),
		('flags', 'H', 0),
		('len', 'I', 0),
		('aadlen', 'I', 0),
		('ivlen', 'I', 0),
		('src', 'P', 0),
		('dst', 'P', 0),
		('aad', 'P', 0),
		('tag', 'P', 0),
		('iv', 'P', 0),
	)
105
# h2py.py can't handle multiarg macros, so the ioctl request numbers are
# written out by hand.  In hex, the low byte of each value is the command
# number (0x64..0x6d, i.e. 100..109) and the byte above it is ord('c').
CRIOGET = 0xc0046364
CIOCGSESSION = 0xc0306365
CIOCGSESSION2 = 0xc040636a
CIOCFSESSION = 0x80046366
CIOCCRYPT = 0xc0306367
CIOCKEY = 0xc0906368
CIOCASYMFEAT = 0x40046369
CIOCKEY2 = 0xc090636b
CIOCFINDDEV = 0xc024636c
CIOCCRYPTAEAD = 0xc040636d
117
def _getdev():
	"""Open /dev/crypto, issue CRIOGET on it and return the value it yields.

	The ioctl is called with the mutate flag set, so it fills in the
	one-element unsigned-int buffer; that element is returned.  The
	descriptor opened here is always closed again, even when the ioctl
	raises.

	Raises OSError if /dev/crypto cannot be opened, and whatever ioctl()
	raises if the request fails.
	"""
	fd = os.open('/dev/crypto', os.O_RDWR)
	try:
		buf = array.array('I', [0])
		ioctl(fd, CRIOGET, buf, 1)
	finally:
		# don't leak the control descriptor when the ioctl fails
		os.close(fd)

	return buf[0]
125
# Obtained once at import time; every ioctl() call below passes this value
# as its descriptor argument.  Importing the module therefore requires
# /dev/crypto to be openable.
_cryptodev = _getdev()
127
def _findop(crid, name):
	"""Run CIOCFINDDEV for (*crid*, *name*) and return the resulting pair.

	A FindOp record is filled with the two arguments, passed to the ioctl
	in a mutable byte buffer, and unpacked again afterwards.  Returns
	``(crid, name)`` as found in the record after the call, with the name
	cut at its first NUL byte (or left whole when it contains none).
	"""
	query = FindOp()
	query.crid = crid
	query.name = name

	raw = array.array('B', query.pack_hdr())
	ioctl(_cryptodev, CIOCFINDDEV, raw, 1)
	query.unpack(raw)

	# partition() leaves the whole string in [0] when no NUL is present
	trimmed = query.name.partition('\x00')[0]

	return query.crid, trimmed
143
class Crypto:
	"""A /dev/crypto session.

	The constructor creates a session with the CIOCGSESSION2 ioctl.
	encrypt() and decrypt() run CIOCCRYPT on it, or CIOCCRYPTAEAD when
	additional authenticated data is supplied.  The session is released
	with CIOCFSESSION when the object is deleted.
	"""

	@staticmethod
	def findcrid(name):
		"""Return the crid that CIOCFINDDEV reports for driver *name*."""
		return _findop(-1, name)[0]

	@staticmethod
	def getcridname(crid):
		"""Return the driver name that CIOCFINDDEV reports for *crid*."""
		return _findop(crid, '')[1]

	def __init__(self, cipher=0, key=None, mac=0, mackey=None,
	    crid=CRYPTOCAP_F_SOFTWARE | CRYPTOCAP_F_HARDWARE):
		"""Create the session.

		cipher, mac -- algorithm ids; at least one must be non-zero.
		key, mackey -- key bytes for the cipher / mac, or None.
		crid -- stored in the crid field of the session request.

		Raises ValueError when neither cipher nor mac is given, and
		whatever ioctl() raises when the session cannot be created.
		"""
		# Must be set before anything below can raise: __del__ reads it.
		self._ses = None

		# Fail fast, before any buffers are built.
		if not cipher and not mac:
			raise ValueError('one of cipher or mac MUST be specified.')

		ses = SessionOp2()
		ses.cipher = cipher
		ses.mac = mac

		# k and mk must stay referenced until the ioctl below returns,
		# since only their addresses are stored in the request.
		if key is not None:
			ses.keylen = len(key)
			k = array.array('B', key)
			ses.key = k.buffer_info()[0]
		else:
			# NOTE(review): this sets an instance attribute that nothing
			# in this file reads; kept so existing users see no change.
			self.key = None

		if mackey is not None:
			ses.mackeylen = len(mackey)
			mk = array.array('B', mackey)
			ses.mackey = mk.buffer_info()[0]
			self._maclen = 16	# parameterize?
		else:
			self._maclen = None

		ses.crid = crid
		#print(ses)
		s = array.array('B', ses.pack_hdr())
		#print(s)
		ioctl(_cryptodev, CIOCGSESSION2, s, 1)
		ses.unpack(s)

		self._ses = ses.ses

	def __del__(self):
		"""Release the session with CIOCFSESSION, if one was created."""
		if self._ses is None:
			return

		try:
			ioctl(_cryptodev, CIOCFSESSION, _pack('I', self._ses))
		except TypeError:
			# NOTE(review): presumably covers module globals already
			# torn down at interpreter exit -- confirm.
			pass
		self._ses = None

	def _doop(self, op, src, iv):
		"""Run CIOCCRYPT with *op* over *src* using *iv*.

		The data is processed in place (src and dst point at the same
		buffer).  Returns the resulting bytes, or a ``(data, mac)`` pair
		when the session was created with a mac key.
		"""
		cop = CryptOp()
		cop.ses = self._ses
		cop.op = op
		cop.flags = 0
		cop.len = len(src)
		s = array.array('B', src)
		cop.src = cop.dst = s.buffer_info()[0]
		if self._maclen is not None:
			m = array.array('B', [0] * self._maclen)
			cop.mac = m.buffer_info()[0]
		ivbuf = array.array('B', iv)
		cop.iv = ivbuf.buffer_info()[0]

		#print('cop:', cop)
		ioctl(_cryptodev, CIOCCRYPT, str(cop))

		s = s.tostring()
		if self._maclen is not None:
			return s, m.tostring()

		return s

	def _doaead(self, op, src, aad, iv, tag=None):
		"""Run CIOCCRYPTAEAD with *op* over *src*, *aad* and *iv*.

		*tag*, when given, must be exactly the session's tag length;
		otherwise a zero-filled tag buffer of that length is used.
		Returns ``(data, tag)`` as left in the buffers by the ioctl.

		Raises ValueError when the session has no tag length (it was
		created without a mac key).
		"""
		caead = CryptAEAD()
		caead.ses = self._ses
		caead.op = op
		# The original assigned CRD_F_IV_EXPLICIT here and immediately
		# overwrote it with 0; only the effective value is kept.
		caead.flags = 0
		caead.len = len(src)
		s = array.array('B', src)
		caead.src = caead.dst = s.buffer_info()[0]
		caead.aadlen = len(aad)
		saad = array.array('B', aad)
		caead.aad = saad.buffer_info()[0]

		if self._maclen is None:
			raise ValueError('must have a tag length')

		if tag is None:
			tag = array.array('B', [0] * self._maclen)
		else:
			# NOTE(review): assert is stripped under -O, which would let
			# a wrong-sized tag buffer through; kept as an assert so the
			# exception type seen by callers does not change.
			assert len(tag) == self._maclen, \
				'%d != %d' % (len(tag), self._maclen)
			tag = array.array('B', tag)

		caead.tag = tag.buffer_info()[0]

		ivbuf = array.array('B', iv)
		caead.ivlen = len(iv)
		caead.iv = ivbuf.buffer_info()[0]

		ioctl(_cryptodev, CIOCCRYPTAEAD, str(caead))

		s = s.tostring()

		return s, tag.tostring()

	def perftest(self, op, size, timeo=3):
		"""Repeat *op* on *size* random bytes until an alarm fires.

		A SIGALRM is scheduled *timeo* seconds ahead; CIOCCRYPT is issued
		in a loop until the handler flags completion, then the elapsed
		time and throughput are printed.  The previous SIGALRM handler is
		restored and any pending alarm cancelled on every exit path.
		"""
		import random
		import time

		inp = array.array('B', (random.randint(0, 255) for x in xrange(size)))
		out = array.array('B', inp)

		# prep ioctl
		cop = CryptOp()
		cop.ses = self._ses
		cop.op = op
		cop.flags = 0
		cop.len = len(inp)
		s = array.array('B', inp)
		cop.src = s.buffer_info()[0]
		cop.dst = out.buffer_info()[0]
		if self._maclen is not None:
			m = array.array('B', [0] * self._maclen)
			cop.mac = m.buffer_info()[0]
		ivbuf = array.array('B', (random.randint(0, 255) for x in xrange(16)))
		cop.iv = ivbuf.buffer_info()[0]

		# one-element list so the handler can flip the flag in place
		done = [ False ]
		def alarmhandle(signum, frame, done=done):
			done[0] = True

		oldalarm = signal.signal(signal.SIGALRM, alarmhandle)
		try:
			signal.alarm(timeo)

			start = time.time()
			reps = 0
			while not done[0]:
				ioctl(_cryptodev, CIOCCRYPT, str(cop))
				reps += 1

			end = time.time()
		finally:
			# An ioctl failure must not leave our handler or a pending
			# alarm behind for the rest of the process.
			signal.alarm(0)
			signal.signal(signal.SIGALRM, oldalarm)

		print('time:', end - start)
		print('perf MB/sec:', (reps * size) / (end - start) / 1024 / 1024)

	def encrypt(self, data, iv, aad=None):
		"""Encrypt *data*; uses the AEAD path when *aad* is not None.

		Returns what _doop() or _doaead() returns.
		"""
		if aad is None:
			return self._doop(COP_ENCRYPT, data, iv)
		else:
			return self._doaead(COP_ENCRYPT, data, aad,
			    iv)

	def decrypt(self, data, iv, aad=None, tag=None):
		"""Decrypt *data*; uses the AEAD path when *aad* is not None.

		*tag* is only passed on in the AEAD case.  Returns what _doop()
		or _doaead() returns.
		"""
		if aad is None:
			return self._doop(COP_DECRYPT, data, iv)
		else:
			return self._doaead(COP_DECRYPT, data, aad,
			    iv, tag=tag)
309
class MismatchError(Exception):
	"""Exception type exported by this module (listed in ``__all__``)."""
312
class KATParser:
	"""Iterate over the sections and records of a known-answer-test file.

	Iterating the parser yields ``(section, records)`` pairs, one for each
	``[section]`` header line.  ``records`` is a generator of dicts that
	map a field name to its stripped value text.  Lines starting with
	``#`` and blank lines are skipped between sections.

	Every ``records`` generator reads from the same file object as the
	parser itself, so it has to be consumed before moving on to the next
	section.

	The parser can be used as a context manager, or close() can be called,
	to release the underlying file.
	"""

	def __init__(self, fname, fields):
		"""Open *fname*; *fields* names the keys each record must have."""
		self.fp = open(fname)
		self.fields = set(fields)
		# line read ahead by fielditer() and handed back to __iter__()
		self._pending = None

	def close(self):
		"""Close the underlying file."""
		self.fp.close()

	def __enter__(self):
		return self

	def __exit__(self, exc_type, exc_value, traceback):
		self.close()

	def __iter__(self):
		"""Yield ``(section name, fielditer())`` for each header line.

		Raises ValueError for a line that is neither blank, a comment,
		nor a ``[section]`` header.
		"""
		while True:
			if self._pending is not None:
				i = self._pending
				self._pending = None
			else:
				i = self.fp.readline()
				if not i:
					# end of file
					return

			# A pending line may be '' (fielditer() hit end of file);
			# that is skipped here and the next readline() ends the loop.
			if not i.strip() or i[0] == '#':
				continue
			if i[0] == '[':
				yield i[1:].split(']', 1)[0], self.fielditer()
			else:
				raise ValueError('unknown line: %r' % repr(i))

	def eatblanks(self):
		"""Return the next non-blank line, stripped, or '' at end of file."""
		for line in iter(self.fp.readline, ''):
			line = line.strip()
			if line:
				return line

		return ''

	def fielditer(self):
		"""Yield one dict per record until a header or end of file.

		A record is a run of ``name = value`` lines ended by a blank line
		or end of file; a bare ``FAIL`` line is stored as ``'FAIL': ''``.
		The header line (or '') that ends the section is left in
		``self._pending``.

		Raises ValueError for a malformed line, a repeated field, or a
		record missing one of the required fields.
		"""
		while True:
			line = self.eatblanks()
			if not line or line[0] == '[':
				self._pending = line
				return

			values = {}
			while line:
				try:
					f, v = line.split(' =')
				except ValueError:
					if line != 'FAIL':
						raise ValueError('malformed line: %r' % line)
					f, v = 'FAIL', ''
				v = v.strip()

				if f in values:
					raise ValueError('already present: %r' % repr(f))
				values[f] = v
				line = self.fp.readline().strip()

			# we should have everything
			remain = self.fields - set(values)
			# XXX - special case GCM decrypt
			if remain and not ('FAIL' in values and 'PT' in remain):
				raise ValueError('not all fields found: %r' % repr(remain))

			yield values
385
386def _spdechex(s):
387	return ''.join(s.split()).decode('hex')
388
# Ad-hoc manual tests.  Only the first branch whose literal is True runs;
# the others are switched on by hand by editing the True/False literals.
if __name__ == '__main__':
	if True:
		# look up one driver by name, then list the names of crids 0..9
		try:
			crid = Crypto.findcrid('aesni0')
			print('aesni:', crid)
		except IOError:
			print('aesni0 not found')

		for i in xrange(10):
			try:
				name = Crypto.getcridname(i)
				print('%2d: %r' % (i, repr(name)))
			except IOError:
				pass
	elif False:
		# dump every section and record of a KAT response file
		kp = KATParser('/usr/home/jmg/aesni.testing/format tweak value input - data unit seq no/XTSGenAES128.rsp', [ 'COUNT', 'DataUnitLen', 'Key', 'DataUnitSeqNumber', 'PT', 'CT' ])
		for mode, ni in kp:
			# was print(i, ni): i is not the loop variable here
			print(mode, ni)
			for j in ni:
				print(j)
	elif False:
		# AES-ICM round trip
		key = _spdechex('c939cc13397c1d37de6ae0e1cb7c423c')
		iv = _spdechex('00000000000000000000000000000001')
		pt = _spdechex('ab3cabed693a32946055524052afe3c9cb49664f09fc8b7da824d924006b7496353b8c1657c5dec564d8f38d7432e1de35aae9d95590e66278d4acce883e51abaf94977fcd3679660109a92bf7b2973ccd547f065ec6cee4cb4a72a5e9f45e615d920d76cb34cba482467b3e21422a7242e7d931330c0fbf465c3a3a46fae943029fd899626dda542750a1eee253df323c6ef1573f1c8c156613e2ea0a6cdbf2ae9701020be2d6a83ecb7f3f9d8e')
		#pt = _spdechex('00000000000000000000000000000000')
		ct = _spdechex('f42c33853ecc5ce2949865fdb83de3bff1089e9360c94f830baebfaff72836ab5236f77212f1e7396c8c54ac73d81986375a6e9e299cfeca5ba051ed25e8d1affa5beaf6c1d2b45e90802408f2ced21663497e906de5f29341e5e52ddfea5363d628b3eb7806835e17bae051b3a6da3f8e2941fe44384eac17a9d298d2c331ca8320c775b5d53263a5e905059d891b21dede2d8110fd427c7bd5a9a274ddb47b1945ee79522203b6e297d0e399ef')

		c = Crypto(CRYPTO_AES_ICM, key)
		enc = c.encrypt(pt, iv)

		print('enc:', enc.encode('hex'))
		print(' ct:', ct.encode('hex'))

		assert ct == enc

		dec = c.decrypt(ct, iv)

		print('dec:', dec.encode('hex'))
		print(' pt:', pt.encode('hex'))

		assert pt == dec
	elif False:
		# AES-ICM round trip, input two bytes longer than the one above
		key = _spdechex('c939cc13397c1d37de6ae0e1cb7c423c')
		iv = _spdechex('00000000000000000000000000000001')
		pt = _spdechex('ab3cabed693a32946055524052afe3c9cb49664f09fc8b7da824d924006b7496353b8c1657c5dec564d8f38d7432e1de35aae9d95590e66278d4acce883e51abaf94977fcd3679660109a92bf7b2973ccd547f065ec6cee4cb4a72a5e9f45e615d920d76cb34cba482467b3e21422a7242e7d931330c0fbf465c3a3a46fae943029fd899626dda542750a1eee253df323c6ef1573f1c8c156613e2ea0a6cdbf2ae9701020be2d6a83ecb7f3f9d8e0a3f')
		#pt = _spdechex('00000000000000000000000000000000')
		ct = _spdechex('f42c33853ecc5ce2949865fdb83de3bff1089e9360c94f830baebfaff72836ab5236f77212f1e7396c8c54ac73d81986375a6e9e299cfeca5ba051ed25e8d1affa5beaf6c1d2b45e90802408f2ced21663497e906de5f29341e5e52ddfea5363d628b3eb7806835e17bae051b3a6da3f8e2941fe44384eac17a9d298d2c331ca8320c775b5d53263a5e905059d891b21dede2d8110fd427c7bd5a9a274ddb47b1945ee79522203b6e297d0e399ef3768')

		c = Crypto(CRYPTO_AES_ICM, key)
		enc = c.encrypt(pt, iv)

		print('enc:', enc.encode('hex'))
		print(' ct:', ct.encode('hex'))

		assert ct == enc

		dec = c.decrypt(ct, iv)

		print('dec:', dec.encode('hex'))
		print(' pt:', pt.encode('hex'))

		assert pt == dec
	elif False:
		# AES-CBC round trip
		key = _spdechex('c939cc13397c1d37de6ae0e1cb7c423c')
		iv = _spdechex('6eba2716ec0bd6fa5cdef5e6d3a795bc')
		pt = _spdechex('ab3cabed693a32946055524052afe3c9cb49664f09fc8b7da824d924006b7496353b8c1657c5dec564d8f38d7432e1de35aae9d95590e66278d4acce883e51abaf94977fcd3679660109a92bf7b2973ccd547f065ec6cee4cb4a72a5e9f45e615d920d76cb34cba482467b3e21422a7242e7d931330c0fbf465c3a3a46fae943029fd899626dda542750a1eee253df323c6ef1573f1c8c156613e2ea0a6cdbf2ae9701020be2d6a83ecb7f3f9d8e0a3f')
		ct = _spdechex('f1f81f12e72e992dbdc304032705dc75dc3e4180eff8ee4819906af6aee876d5b00b7c36d282a445ce3620327be481e8e53a8e5a8e5ca9abfeb2281be88d12ffa8f46d958d8224738c1f7eea48bda03edbf9adeb900985f4fa25648b406d13a886c25e70cfdecdde0ad0f2991420eb48a61c64fd797237cf2798c2675b9bb744360b0a3f329ac53bbceb4e3e7456e6514f1a9d2f06c236c31d0f080b79c15dce1096357416602520daa098b17d1af427')
		c = Crypto(CRYPTO_AES_CBC, key)

		enc = c.encrypt(pt, iv)

		print('enc:', enc.encode('hex'))
		print(' ct:', ct.encode('hex'))

		assert ct == enc

		dec = c.decrypt(ct, iv)

		print('dec:', dec.encode('hex'))
		print(' pt:', pt.encode('hex'))

		assert pt == dec
	elif False:
		# AES-GCM encrypt then decrypt with AAD
		key = _spdechex('c939cc13397c1d37de6ae0e1cb7c423c')
		iv = _spdechex('b3d8cc017cbb89b39e0f67e2')
		pt = _spdechex('c3b3c41f113a31b73d9a5cd4321030')
		aad = _spdechex('24825602bd12a984e0092d3e448eda5f')
		# the second ct / tag assignments are the ones that take effect
		ct = _spdechex('93fe7d9e9bfd10348a5606e5cafa7354')
		ct = _spdechex('93fe7d9e9bfd10348a5606e5cafa73')
		tag = _spdechex('0032a1dc85f1c9786925a2e71d8272dd')
		tag = _spdechex('8d11a0929cb3fbe1fef01a4a38d5f8ea')

		c = Crypto(CRYPTO_AES_NIST_GCM_16, key,
		    mac=CRYPTO_AES_128_NIST_GMAC, mackey=key)

		enc, enctag = c.encrypt(pt, iv, aad=aad)

		print('enc:', enc.encode('hex'))
		print(' ct:', ct.encode('hex'))

		assert enc == ct

		print('etg:', enctag.encode('hex'))
		print('tag:', tag.encode('hex'))
		assert enctag == tag

		# Make sure we get EBADMSG
		#enctag = enctag[:-1] + 'a'
		dec, dectag = c.decrypt(ct, iv, aad=aad, tag=enctag)

		print('dec:', dec.encode('hex'))
		print(' pt:', pt.encode('hex'))

		assert dec == pt

		print('dtg:', dectag.encode('hex'))
		print('tag:', tag.encode('hex'))

		assert dectag == tag
	elif False:
		# GCM variant: first four IV bytes are appended to the key
		key = _spdechex('c939cc13397c1d37de6ae0e1cb7c423c')
		iv = _spdechex('b3d8cc017cbb89b39e0f67e2')
		key = key + iv[:4]
		iv = iv[4:]
		pt = _spdechex('c3b3c41f113a31b73d9a5cd432103069')
		aad = _spdechex('24825602bd12a984e0092d3e448eda5f')
		ct = _spdechex('93fe7d9e9bfd10348a5606e5cafa7354')
		tag = _spdechex('0032a1dc85f1c9786925a2e71d8272dd')

		c = Crypto(CRYPTO_AES_GCM_16, key, mac=CRYPTO_AES_128_GMAC, mackey=key)

		enc, enctag = c.encrypt(pt, iv, aad=aad)

		print('enc:', enc.encode('hex'))
		print(' ct:', ct.encode('hex'))

		assert enc == ct

		print('etg:', enctag.encode('hex'))
		print('tag:', tag.encode('hex'))
		assert enctag == tag
	elif False:
		# create a fresh AES-XTS session 100000 times and encrypt once each
		for i in xrange(100000):
			c = Crypto(CRYPTO_AES_XTS, '1bbfeadf539daedcae33ced497343f3ca1f2474ad932b903997d44707db41382'.decode('hex'))
			data = '52a42bca4e9425a25bbc8c8bf6129dec'.decode('hex')
			ct = '517e602becd066b65fa4f4f56ddfe240'.decode('hex')
			iv = _pack('QQ', 71, 0)

			enc = c.encrypt(data, iv)
			assert enc == ct
	elif True:
		# AES-XTS round trip (not reached while the first branch is True)
		c = Crypto(CRYPTO_AES_XTS, '1bbfeadf539daedcae33ced497343f3ca1f2474ad932b903997d44707db41382'.decode('hex'))
		data = '52a42bca4e9425a25bbc8c8bf6129dec'.decode('hex')
		ct = '517e602becd066b65fa4f4f56ddfe240'.decode('hex')
		iv = _pack('QQ', 71, 0)

		enc = c.encrypt(data, iv)
		assert enc == ct

		dec = c.decrypt(enc, iv)
		assert dec == data

		#c.perftest(COP_ENCRYPT, 192*1024, reps=30000)

	else:
		# AES-XTS throughput at two buffer sizes
		key = '1bbfeadf539daedcae33ced497343f3ca1f2474ad932b903997d44707db41382'.decode('hex')
		print('XTS %d testing:' % (len(key) * 8))
		c = Crypto(CRYPTO_AES_XTS, key)
		for i in [ 8192, 192*1024]:
			print('block size: %d' % i)
			c.perftest(COP_ENCRYPT, i)
			c.perftest(COP_DECRYPT, i)
561