xref: /linux/net/rxrpc/rxperf.c (revision 497e6b37b0099dc415578488287fd84fb74433eb)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* In-kernel rxperf server for testing purposes.
3  *
4  * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) "rxperf: " fmt
9 #include <linux/module.h>
10 #include <linux/slab.h>
11 #include <net/sock.h>
12 #include <net/af_rxrpc.h>
13 
14 MODULE_DESCRIPTION("rxperf test server (afs)");
15 MODULE_AUTHOR("Red Hat, Inc.");
16 MODULE_LICENSE("GPL");
17 
18 #define RXPERF_PORT		7009
19 #define RX_PERF_SERVICE		147
20 #define RX_PERF_VERSION		3
21 #define RX_PERF_SEND		0
22 #define RX_PERF_RECV		1
23 #define RX_PERF_RPC		3
24 #define RX_PERF_FILE		4
25 #define RX_PERF_MAGIC_COOKIE	0x4711
26 
27 struct rxperf_proto_params {
28 	__be32		version;
29 	__be32		type;
30 	__be32		rsize;
31 	__be32		wsize;
32 } __packed;
33 
34 static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
35 static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };
36 
37 enum rxperf_call_state {
38 	RXPERF_CALL_SV_AWAIT_PARAMS,	/* Server: Awaiting parameter block */
39 	RXPERF_CALL_SV_AWAIT_REQUEST,	/* Server: Awaiting request data */
40 	RXPERF_CALL_SV_REPLYING,	/* Server: Replying */
41 	RXPERF_CALL_SV_AWAIT_ACK,	/* Server: Awaiting final ACK */
42 	RXPERF_CALL_COMPLETE,		/* Completed or failed */
43 };
44 
45 struct rxperf_call {
46 	struct rxrpc_call	*rxcall;
47 	struct iov_iter		iter;
48 	struct kvec		kvec[1];
49 	struct work_struct	work;
50 	const char		*type;
51 	size_t			iov_len;
52 	size_t			req_len;	/* Size of request blob */
53 	size_t			reply_len;	/* Size of reply blob */
54 	unsigned int		debug_id;
55 	unsigned int		operation_id;
56 	struct rxperf_proto_params params;
57 	__be32			tmp[2];
58 	s32			abort_code;
59 	enum rxperf_call_state	state;
60 	short			error;
61 	unsigned short		unmarshal;
62 	u16			service_id;
63 	int (*deliver)(struct rxperf_call *call);
64 	void (*processor)(struct work_struct *work);
65 };
66 
67 static struct socket *rxperf_socket;
68 static struct key *rxperf_sec_keyring;	/* Ring of security/crypto keys */
69 static struct workqueue_struct *rxperf_workqueue;
70 
71 static void rxperf_deliver_to_call(struct work_struct *work);
72 static int rxperf_deliver_param_block(struct rxperf_call *call);
73 static int rxperf_deliver_request(struct rxperf_call *call);
74 static int rxperf_process_call(struct rxperf_call *call);
75 static void rxperf_charge_preallocation(struct work_struct *work);
76 
77 static DECLARE_WORK(rxperf_charge_preallocation_work,
78 		    rxperf_charge_preallocation);
79 
80 static inline void rxperf_set_call_state(struct rxperf_call *call,
81 					 enum rxperf_call_state to)
82 {
83 	call->state = to;
84 }
85 
86 static inline void rxperf_set_call_complete(struct rxperf_call *call,
87 					    int error, s32 remote_abort)
88 {
89 	if (call->state != RXPERF_CALL_COMPLETE) {
90 		call->abort_code = remote_abort;
91 		call->error = error;
92 		call->state = RXPERF_CALL_COMPLETE;
93 	}
94 }
95 
96 static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
97 				       unsigned long user_call_ID)
98 {
99 	kfree((struct rxperf_call *)user_call_ID);
100 }
101 
102 static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
103 			       unsigned long user_call_ID)
104 {
105 	queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
106 }
107 
108 static void rxperf_queue_call_work(struct rxperf_call *call)
109 {
110 	queue_work(rxperf_workqueue, &call->work);
111 }
112 
113 static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
114 			     unsigned long call_user_ID)
115 {
116 	struct rxperf_call *call = (struct rxperf_call *)call_user_ID;
117 
118 	if (call->state != RXPERF_CALL_COMPLETE)
119 		rxperf_queue_call_work(call);
120 }
121 
122 static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
123 {
124 	struct rxperf_call *call = (struct rxperf_call *)user_call_ID;
125 
126 	call->rxcall = rxcall;
127 }
128 
129 static void rxperf_notify_end_reply_tx(struct sock *sock,
130 				       struct rxrpc_call *rxcall,
131 				       unsigned long call_user_ID)
132 {
133 	rxperf_set_call_state((struct rxperf_call *)call_user_ID,
134 			      RXPERF_CALL_SV_AWAIT_ACK);
135 }
136 
137 /*
138  * Charge the incoming call preallocation.
139  */
140 static void rxperf_charge_preallocation(struct work_struct *work)
141 {
142 	struct rxperf_call *call;
143 
144 	for (;;) {
145 		call = kzalloc(sizeof(*call), GFP_KERNEL);
146 		if (!call)
147 			break;
148 
149 		call->type		= "unset";
150 		call->debug_id		= atomic_inc_return(&rxrpc_debug_id);
151 		call->deliver		= rxperf_deliver_param_block;
152 		call->state		= RXPERF_CALL_SV_AWAIT_PARAMS;
153 		call->service_id	= RX_PERF_SERVICE;
154 		call->iov_len		= sizeof(call->params);
155 		call->kvec[0].iov_len	= sizeof(call->params);
156 		call->kvec[0].iov_base	= &call->params;
157 		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
158 		INIT_WORK(&call->work, rxperf_deliver_to_call);
159 
160 		if (rxrpc_kernel_charge_accept(rxperf_socket,
161 					       rxperf_notify_rx,
162 					       rxperf_rx_attach,
163 					       (unsigned long)call,
164 					       GFP_KERNEL,
165 					       call->debug_id) < 0)
166 			break;
167 		call = NULL;
168 	}
169 
170 	kfree(call);
171 }
172 
173 /*
174  * Open an rxrpc socket and bind it to be a server for callback notifications
175  * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
176  */
177 static int rxperf_open_socket(void)
178 {
179 	struct sockaddr_rxrpc srx;
180 	struct socket *socket;
181 	int ret;
182 
183 	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
184 			       &socket);
185 	if (ret < 0)
186 		goto error_1;
187 
188 	socket->sk->sk_allocation = GFP_NOFS;
189 
190 	/* bind the callback manager's address to make this a server socket */
191 	memset(&srx, 0, sizeof(srx));
192 	srx.srx_family			= AF_RXRPC;
193 	srx.srx_service			= RX_PERF_SERVICE;
194 	srx.transport_type		= SOCK_DGRAM;
195 	srx.transport_len		= sizeof(srx.transport.sin6);
196 	srx.transport.sin6.sin6_family	= AF_INET6;
197 	srx.transport.sin6.sin6_port	= htons(RXPERF_PORT);
198 
199 	ret = rxrpc_sock_set_min_security_level(socket->sk,
200 						RXRPC_SECURITY_ENCRYPT);
201 	if (ret < 0)
202 		goto error_2;
203 
204 	ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
205 
206 	ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx));
207 	if (ret < 0)
208 		goto error_2;
209 
210 	rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call,
211 					   rxperf_rx_discard_new_call);
212 
213 	ret = kernel_listen(socket, INT_MAX);
214 	if (ret < 0)
215 		goto error_2;
216 
217 	rxperf_socket = socket;
218 	rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
219 	return 0;
220 
221 error_2:
222 	sock_release(socket);
223 error_1:
224 	pr_err("Can't set up rxperf socket: %d\n", ret);
225 	return ret;
226 }
227 
228 /*
229  * close the rxrpc socket rxperf was using
230  */
231 static void rxperf_close_socket(void)
232 {
233 	kernel_listen(rxperf_socket, 0);
234 	kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
235 	flush_workqueue(rxperf_workqueue);
236 	sock_release(rxperf_socket);
237 }
238 
239 /*
240  * Log remote abort codes that indicate that we have a protocol disagreement
241  * with the server.
242  */
243 static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
244 {
245 	static int max = 0;
246 	const char *msg;
247 	int m;
248 
249 	switch (remote_abort) {
250 	case RX_EOF:		 msg = "unexpected EOF";	break;
251 	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
252 	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
253 	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
254 	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
255 	case RXGEN_DECODE:	 msg = "opcode decode";		break;
256 	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
257 	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
258 	case -32:		 msg = "insufficient data";	break;
259 	default:
260 		return;
261 	}
262 
263 	m = max;
264 	if (m < 3) {
265 		max = m + 1;
266 		pr_info("Peer reported %s failure on %s\n", msg, call->type);
267 	}
268 }
269 
270 /*
271  * deliver messages to a call
272  */
273 static void rxperf_deliver_to_call(struct work_struct *work)
274 {
275 	struct rxperf_call *call = container_of(work, struct rxperf_call, work);
276 	enum rxperf_call_state state;
277 	u32 abort_code, remote_abort = 0;
278 	int ret = 0;
279 
280 	if (call->state == RXPERF_CALL_COMPLETE)
281 		return;
282 
283 	while (state = call->state,
284 	       state == RXPERF_CALL_SV_AWAIT_PARAMS ||
285 	       state == RXPERF_CALL_SV_AWAIT_REQUEST ||
286 	       state == RXPERF_CALL_SV_AWAIT_ACK
287 	       ) {
288 		if (state == RXPERF_CALL_SV_AWAIT_ACK) {
289 			if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
290 				goto call_complete;
291 			return;
292 		}
293 
294 		ret = call->deliver(call);
295 		if (ret == 0)
296 			ret = rxperf_process_call(call);
297 
298 		switch (ret) {
299 		case 0:
300 			continue;
301 		case -EINPROGRESS:
302 		case -EAGAIN:
303 			return;
304 		case -ECONNABORTED:
305 			rxperf_log_error(call, call->abort_code);
306 			goto call_complete;
307 		case -EOPNOTSUPP:
308 			abort_code = RXGEN_OPCODE;
309 			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
310 						abort_code, ret, "GOP");
311 			goto call_complete;
312 		case -ENOTSUPP:
313 			abort_code = RX_USER_ABORT;
314 			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
315 						abort_code, ret, "GUA");
316 			goto call_complete;
317 		case -EIO:
318 			pr_err("Call %u in bad state %u\n",
319 			       call->debug_id, call->state);
320 			fallthrough;
321 		case -ENODATA:
322 		case -EBADMSG:
323 		case -EMSGSIZE:
324 		case -ENOMEM:
325 		case -EFAULT:
326 			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
327 						RXGEN_SS_UNMARSHAL, ret, "GUM");
328 			goto call_complete;
329 		default:
330 			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
331 						RX_CALL_DEAD, ret, "GER");
332 			goto call_complete;
333 		}
334 	}
335 
336 call_complete:
337 	rxperf_set_call_complete(call, ret, remote_abort);
338 	/* The call may have been requeued */
339 	rxrpc_kernel_end_call(rxperf_socket, call->rxcall);
340 	cancel_work(&call->work);
341 	kfree(call);
342 }
343 
344 /*
345  * Extract a piece of data from the received data socket buffers.
346  */
347 static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
348 {
349 	u32 remote_abort = 0;
350 	int ret;
351 
352 	ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
353 				     &call->iov_len, want_more, &remote_abort,
354 				     &call->service_id);
355 	pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
356 		 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
357 	if (ret == 0 || ret == -EAGAIN)
358 		return ret;
359 
360 	if (ret == 1) {
361 		switch (call->state) {
362 		case RXPERF_CALL_SV_AWAIT_REQUEST:
363 			rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
364 			break;
365 		case RXPERF_CALL_COMPLETE:
366 			pr_debug("premature completion %d", call->error);
367 			return call->error;
368 		default:
369 			break;
370 		}
371 		return 0;
372 	}
373 
374 	rxperf_set_call_complete(call, ret, remote_abort);
375 	return ret;
376 }
377 
378 /*
379  * Grab the operation ID from an incoming manager call.
380  */
381 static int rxperf_deliver_param_block(struct rxperf_call *call)
382 {
383 	u32 version;
384 	int ret;
385 
386 	/* Extract the parameter block */
387 	ret = rxperf_extract_data(call, true);
388 	if (ret < 0)
389 		return ret;
390 
391 	version			= ntohl(call->params.version);
392 	call->operation_id	= ntohl(call->params.type);
393 	call->deliver		= rxperf_deliver_request;
394 
395 	if (version != RX_PERF_VERSION) {
396 		pr_info("Version mismatch %x\n", version);
397 		return -ENOTSUPP;
398 	}
399 
400 	switch (call->operation_id) {
401 	case RX_PERF_SEND:
402 		call->type = "send";
403 		call->reply_len = 0;
404 		call->iov_len = 4;	/* Expect req size */
405 		break;
406 	case RX_PERF_RECV:
407 		call->type = "recv";
408 		call->req_len = 0;
409 		call->iov_len = 4;	/* Expect reply size */
410 		break;
411 	case RX_PERF_RPC:
412 		call->type = "rpc";
413 		call->iov_len = 8;	/* Expect req size and reply size */
414 		break;
415 	case RX_PERF_FILE:
416 		call->type = "file";
417 		fallthrough;
418 	default:
419 		return -EOPNOTSUPP;
420 	}
421 
422 	rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
423 	return call->deliver(call);
424 }
425 
426 /*
427  * Deliver the request data.
428  */
429 static int rxperf_deliver_request(struct rxperf_call *call)
430 {
431 	int ret;
432 
433 	switch (call->unmarshal) {
434 	case 0:
435 		call->kvec[0].iov_len	= call->iov_len;
436 		call->kvec[0].iov_base	= call->tmp;
437 		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
438 		call->unmarshal++;
439 		fallthrough;
440 	case 1:
441 		ret = rxperf_extract_data(call, true);
442 		if (ret < 0)
443 			return ret;
444 
445 		switch (call->operation_id) {
446 		case RX_PERF_SEND:
447 			call->type = "send";
448 			call->req_len	= ntohl(call->tmp[0]);
449 			call->reply_len	= 0;
450 			break;
451 		case RX_PERF_RECV:
452 			call->type = "recv";
453 			call->req_len = 0;
454 			call->reply_len	= ntohl(call->tmp[0]);
455 			break;
456 		case RX_PERF_RPC:
457 			call->type = "rpc";
458 			call->req_len	= ntohl(call->tmp[0]);
459 			call->reply_len	= ntohl(call->tmp[1]);
460 			break;
461 		default:
462 			pr_info("Can't parse extra params\n");
463 			return -EIO;
464 		}
465 
466 		pr_debug("CALL op=%s rq=%zx rp=%zx\n",
467 			 call->type, call->req_len, call->reply_len);
468 
469 		call->iov_len = call->req_len;
470 		iov_iter_discard(&call->iter, READ, call->req_len);
471 		call->unmarshal++;
472 		fallthrough;
473 	case 2:
474 		ret = rxperf_extract_data(call, false);
475 		if (ret < 0)
476 			return ret;
477 		call->unmarshal++;
478 		fallthrough;
479 	default:
480 		return 0;
481 	}
482 }
483 
484 /*
485  * Process a call for which we've received the request.
486  */
487 static int rxperf_process_call(struct rxperf_call *call)
488 {
489 	struct msghdr msg = {};
490 	struct bio_vec bv[1];
491 	struct kvec iov[1];
492 	ssize_t n;
493 	size_t reply_len = call->reply_len, len;
494 
495 	rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
496 				   reply_len + sizeof(rxperf_magic_cookie));
497 
498 	while (reply_len > 0) {
499 		len = min_t(size_t, reply_len, PAGE_SIZE);
500 		bv[0].bv_page	= ZERO_PAGE(0);
501 		bv[0].bv_offset	= 0;
502 		bv[0].bv_len	= len;
503 		iov_iter_bvec(&msg.msg_iter, WRITE, bv, 1, len);
504 		msg.msg_flags = MSG_MORE;
505 		n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
506 					   len, rxperf_notify_end_reply_tx);
507 		if (n < 0)
508 			return n;
509 		if (n == 0)
510 			return -EIO;
511 		reply_len -= n;
512 	}
513 
514 	len = sizeof(rxperf_magic_cookie);
515 	iov[0].iov_base	= (void *)rxperf_magic_cookie;
516 	iov[0].iov_len	= len;
517 	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
518 	msg.msg_flags = 0;
519 	n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
520 				   rxperf_notify_end_reply_tx);
521 	if (n >= 0)
522 		return 0; /* Success */
523 
524 	if (n == -ENOMEM)
525 		rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
526 					RXGEN_SS_MARSHAL, -ENOMEM, "GOM");
527 	return n;
528 }
529 
530 /*
531  * Add a key to the security keyring.
532  */
533 static int rxperf_add_key(struct key *keyring)
534 {
535 	key_ref_t kref;
536 	int ret;
537 
538 	kref = key_create_or_update(make_key_ref(keyring, true),
539 				    "rxrpc_s",
540 				    __stringify(RX_PERF_SERVICE) ":2",
541 				    secret,
542 				    sizeof(secret),
543 				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
544 				    | KEY_USR_VIEW,
545 				    KEY_ALLOC_NOT_IN_QUOTA);
546 
547 	if (IS_ERR(kref)) {
548 		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
549 		return PTR_ERR(kref);
550 	}
551 
552 	ret = key_link(keyring, key_ref_to_ptr(kref));
553 	if (ret < 0)
554 		pr_err("Can't link rxperf server key: %d\n", ret);
555 	key_ref_put(kref);
556 	return ret;
557 }
558 
559 /*
560  * Initialise the rxperf server.
561  */
562 static int __init rxperf_init(void)
563 {
564 	struct key *keyring;
565 	int ret = -ENOMEM;
566 
567 	pr_info("Server registering\n");
568 
569 	rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
570 	if (!rxperf_workqueue)
571 		goto error_workqueue;
572 
573 	keyring = keyring_alloc("rxperf_server",
574 				GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
575 				KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
576 				KEY_POS_WRITE |
577 				KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
578 				KEY_USR_WRITE |
579 				KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
580 				KEY_ALLOC_NOT_IN_QUOTA,
581 				NULL, NULL);
582 	if (IS_ERR(keyring)) {
583 		pr_err("Can't allocate rxperf server keyring: %ld\n",
584 		       PTR_ERR(keyring));
585 		goto error_keyring;
586 	}
587 	rxperf_sec_keyring = keyring;
588 	ret = rxperf_add_key(keyring);
589 	if (ret < 0)
590 		goto error_key;
591 
592 	ret = rxperf_open_socket();
593 	if (ret < 0)
594 		goto error_socket;
595 	return 0;
596 
597 error_socket:
598 error_key:
599 	key_put(rxperf_sec_keyring);
600 error_keyring:
601 	destroy_workqueue(rxperf_workqueue);
602 	rcu_barrier();
603 error_workqueue:
604 	pr_err("Failed to register: %d\n", ret);
605 	return ret;
606 }
607 late_initcall(rxperf_init); /* Must be called after net/ to create socket */
608 
609 static void __exit rxperf_exit(void)
610 {
611 	pr_info("Server unregistering.\n");
612 
613 	rxperf_close_socket();
614 	key_put(rxperf_sec_keyring);
615 	destroy_workqueue(rxperf_workqueue);
616 	rcu_barrier();
617 }
618 module_exit(rxperf_exit);
619 
620