1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* In-kernel rxperf server for testing purposes. 3 * 4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) "rxperf: " fmt 9 #include <linux/module.h> 10 #include <linux/slab.h> 11 #include <net/sock.h> 12 #include <net/af_rxrpc.h> 13 #define RXRPC_TRACE_ONLY_DEFINE_ENUMS 14 #include <trace/events/rxrpc.h> 15 16 MODULE_DESCRIPTION("rxperf test server (afs)"); 17 MODULE_AUTHOR("Red Hat, Inc."); 18 MODULE_LICENSE("GPL"); 19 20 #define RXPERF_PORT 7009 21 #define RX_PERF_SERVICE 147 22 #define RX_PERF_VERSION 3 23 #define RX_PERF_SEND 0 24 #define RX_PERF_RECV 1 25 #define RX_PERF_RPC 3 26 #define RX_PERF_FILE 4 27 #define RX_PERF_MAGIC_COOKIE 0x4711 28 29 struct rxperf_proto_params { 30 __be32 version; 31 __be32 type; 32 __be32 rsize; 33 __be32 wsize; 34 } __packed; 35 36 static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 }; 37 static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 }; 38 39 enum rxperf_call_state { 40 RXPERF_CALL_SV_AWAIT_PARAMS, /* Server: Awaiting parameter block */ 41 RXPERF_CALL_SV_AWAIT_REQUEST, /* Server: Awaiting request data */ 42 RXPERF_CALL_SV_REPLYING, /* Server: Replying */ 43 RXPERF_CALL_SV_AWAIT_ACK, /* Server: Awaiting final ACK */ 44 RXPERF_CALL_COMPLETE, /* Completed or failed */ 45 }; 46 47 struct rxperf_call { 48 struct rxrpc_call *rxcall; 49 struct iov_iter iter; 50 struct kvec kvec[1]; 51 struct work_struct work; 52 const char *type; 53 size_t iov_len; 54 size_t req_len; /* Size of request blob */ 55 size_t reply_len; /* Size of reply blob */ 56 unsigned int debug_id; 57 unsigned int operation_id; 58 struct rxperf_proto_params params; 59 __be32 tmp[2]; 60 s32 abort_code; 61 enum rxperf_call_state state; 62 short error; 63 unsigned short unmarshal; 64 u16 service_id; 65 int (*deliver)(struct rxperf_call *call); 66 void (*processor)(struct work_struct *work); 67 }; 68 69 static struct socket *rxperf_socket; 70 static struct key *rxperf_sec_keyring; /* Ring of security/crypto keys */ 71 static struct workqueue_struct *rxperf_workqueue; 72 73 static void rxperf_deliver_to_call(struct work_struct *work); 74 static int rxperf_deliver_param_block(struct rxperf_call *call); 75 static int rxperf_deliver_request(struct rxperf_call *call); 76 static int rxperf_process_call(struct rxperf_call *call); 77 static void rxperf_charge_preallocation(struct work_struct *work); 78 79 static DECLARE_WORK(rxperf_charge_preallocation_work, 80 rxperf_charge_preallocation); 81 82 static inline void rxperf_set_call_state(struct rxperf_call *call, 83 enum rxperf_call_state to) 84 { 85 call->state = to; 86 } 87 88 static inline void rxperf_set_call_complete(struct rxperf_call *call, 89 int error, s32 remote_abort) 90 { 91 if (call->state != RXPERF_CALL_COMPLETE) { 92 call->abort_code = remote_abort; 93 call->error = error; 94 call->state = RXPERF_CALL_COMPLETE; 95 } 96 } 97 98 static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall, 99 unsigned long user_call_ID) 100 { 101 kfree((struct rxperf_call *)user_call_ID); 102 } 103 104 static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall, 105 unsigned long user_call_ID) 106 { 107 queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work); 108 } 109 110 static void rxperf_queue_call_work(struct rxperf_call *call) 111 { 112 queue_work(rxperf_workqueue, &call->work); 113 } 114 115 static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall, 116 unsigned long call_user_ID) 117 { 118 struct rxperf_call *call = (struct rxperf_call *)call_user_ID; 119 120 if (call->state != RXPERF_CALL_COMPLETE) 121 rxperf_queue_call_work(call); 122 } 123 124 static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID) 125 { 126 struct rxperf_call *call = (struct rxperf_call *)user_call_ID; 127 128 call->rxcall = rxcall; 129 } 130 131 static void rxperf_notify_end_reply_tx(struct sock *sock, 132 struct rxrpc_call *rxcall, 133 unsigned long call_user_ID) 134 { 135 rxperf_set_call_state((struct rxperf_call *)call_user_ID, 136 RXPERF_CALL_SV_AWAIT_ACK); 137 } 138 139 /* 140 * Charge the incoming call preallocation. 141 */ 142 static void rxperf_charge_preallocation(struct work_struct *work) 143 { 144 struct rxperf_call *call; 145 146 for (;;) { 147 call = kzalloc(sizeof(*call), GFP_KERNEL); 148 if (!call) 149 break; 150 151 call->type = "unset"; 152 call->debug_id = atomic_inc_return(&rxrpc_debug_id); 153 call->deliver = rxperf_deliver_param_block; 154 call->state = RXPERF_CALL_SV_AWAIT_PARAMS; 155 call->service_id = RX_PERF_SERVICE; 156 call->iov_len = sizeof(call->params); 157 call->kvec[0].iov_len = sizeof(call->params); 158 call->kvec[0].iov_base = &call->params; 159 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len); 160 INIT_WORK(&call->work, rxperf_deliver_to_call); 161 162 if (rxrpc_kernel_charge_accept(rxperf_socket, 163 rxperf_notify_rx, 164 rxperf_rx_attach, 165 (unsigned long)call, 166 GFP_KERNEL, 167 call->debug_id) < 0) 168 break; 169 call = NULL; 170 } 171 172 kfree(call); 173 } 174 175 /* 176 * Open an rxrpc socket and bind it to be a server for callback notifications 177 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT 178 */ 179 static int rxperf_open_socket(void) 180 { 181 struct sockaddr_rxrpc srx; 182 struct socket *socket; 183 int ret; 184 185 ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6, 186 &socket); 187 if (ret < 0) 188 goto error_1; 189 190 socket->sk->sk_allocation = GFP_NOFS; 191 192 /* bind the callback manager's address to make this a server socket */ 193 memset(&srx, 0, sizeof(srx)); 194 srx.srx_family = AF_RXRPC; 195 srx.srx_service = RX_PERF_SERVICE; 196 srx.transport_type = SOCK_DGRAM; 197 srx.transport_len = sizeof(srx.transport.sin6); 198 srx.transport.sin6.sin6_family = AF_INET6; 199 srx.transport.sin6.sin6_port = htons(RXPERF_PORT); 200 201 ret = rxrpc_sock_set_min_security_level(socket->sk, 202 RXRPC_SECURITY_ENCRYPT); 203 if (ret < 0) 204 goto error_2; 205 206 ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring); 207 208 ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx)); 209 if (ret < 0) 210 goto error_2; 211 212 rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call, 213 rxperf_rx_discard_new_call); 214 215 ret = kernel_listen(socket, INT_MAX); 216 if (ret < 0) 217 goto error_2; 218 219 rxperf_socket = socket; 220 rxperf_charge_preallocation(&rxperf_charge_preallocation_work); 221 return 0; 222 223 error_2: 224 sock_release(socket); 225 error_1: 226 pr_err("Can't set up rxperf socket: %d\n", ret); 227 return ret; 228 } 229 230 /* 231 * close the rxrpc socket rxperf was using 232 */ 233 static void rxperf_close_socket(void) 234 { 235 kernel_listen(rxperf_socket, 0); 236 kernel_sock_shutdown(rxperf_socket, SHUT_RDWR); 237 flush_workqueue(rxperf_workqueue); 238 sock_release(rxperf_socket); 239 } 240 241 /* 242 * Log remote abort codes that indicate that we have a protocol disagreement 243 * with the server. 244 */ 245 static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort) 246 { 247 static int max = 0; 248 const char *msg; 249 int m; 250 251 switch (remote_abort) { 252 case RX_EOF: msg = "unexpected EOF"; break; 253 case RXGEN_CC_MARSHAL: msg = "client marshalling"; break; 254 case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling"; break; 255 case RXGEN_SS_MARSHAL: msg = "server marshalling"; break; 256 case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling"; break; 257 case RXGEN_DECODE: msg = "opcode decode"; break; 258 case RXGEN_SS_XDRFREE: msg = "server XDR cleanup"; break; 259 case RXGEN_CC_XDRFREE: msg = "client XDR cleanup"; break; 260 case -32: msg = "insufficient data"; break; 261 default: 262 return; 263 } 264 265 m = max; 266 if (m < 3) { 267 max = m + 1; 268 pr_info("Peer reported %s failure on %s\n", msg, call->type); 269 } 270 } 271 272 /* 273 * deliver messages to a call 274 */ 275 static void rxperf_deliver_to_call(struct work_struct *work) 276 { 277 struct rxperf_call *call = container_of(work, struct rxperf_call, work); 278 enum rxperf_call_state state; 279 u32 abort_code, remote_abort = 0; 280 int ret = 0; 281 282 if (call->state == RXPERF_CALL_COMPLETE) 283 return; 284 285 while (state = call->state, 286 state == RXPERF_CALL_SV_AWAIT_PARAMS || 287 state == RXPERF_CALL_SV_AWAIT_REQUEST || 288 state == RXPERF_CALL_SV_AWAIT_ACK 289 ) { 290 if (state == RXPERF_CALL_SV_AWAIT_ACK) { 291 if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall)) 292 goto call_complete; 293 return; 294 } 295 296 ret = call->deliver(call); 297 if (ret == 0) 298 ret = rxperf_process_call(call); 299 300 switch (ret) { 301 case 0: 302 continue; 303 case -EINPROGRESS: 304 case -EAGAIN: 305 return; 306 case -ECONNABORTED: 307 rxperf_log_error(call, call->abort_code); 308 goto call_complete; 309 case -EOPNOTSUPP: 310 abort_code = RXGEN_OPCODE; 311 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 312 abort_code, ret, 313 rxperf_abort_op_not_supported); 314 goto call_complete; 315 case -ENOTSUPP: 316 abort_code = RX_USER_ABORT; 317 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 318 abort_code, ret, 319 rxperf_abort_op_not_supported); 320 goto call_complete; 321 case -EIO: 322 pr_err("Call %u in bad state %u\n", 323 call->debug_id, call->state); 324 fallthrough; 325 case -ENODATA: 326 case -EBADMSG: 327 case -EMSGSIZE: 328 case -ENOMEM: 329 case -EFAULT: 330 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 331 RXGEN_SS_UNMARSHAL, ret, 332 rxperf_abort_unmarshal_error); 333 goto call_complete; 334 default: 335 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 336 RX_CALL_DEAD, ret, 337 rxperf_abort_general_error); 338 goto call_complete; 339 } 340 } 341 342 call_complete: 343 rxperf_set_call_complete(call, ret, remote_abort); 344 /* The call may have been requeued */ 345 rxrpc_kernel_end_call(rxperf_socket, call->rxcall); 346 cancel_work(&call->work); 347 kfree(call); 348 } 349 350 /* 351 * Extract a piece of data from the received data socket buffers. 352 */ 353 static int rxperf_extract_data(struct rxperf_call *call, bool want_more) 354 { 355 u32 remote_abort = 0; 356 int ret; 357 358 ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter, 359 &call->iov_len, want_more, &remote_abort, 360 &call->service_id); 361 pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n", 362 iov_iter_count(&call->iter), call->iov_len, want_more, ret); 363 if (ret == 0 || ret == -EAGAIN) 364 return ret; 365 366 if (ret == 1) { 367 switch (call->state) { 368 case RXPERF_CALL_SV_AWAIT_REQUEST: 369 rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING); 370 break; 371 case RXPERF_CALL_COMPLETE: 372 pr_debug("premature completion %d", call->error); 373 return call->error; 374 default: 375 break; 376 } 377 return 0; 378 } 379 380 rxperf_set_call_complete(call, ret, remote_abort); 381 return ret; 382 } 383 384 /* 385 * Grab the operation ID from an incoming manager call. 386 */ 387 static int rxperf_deliver_param_block(struct rxperf_call *call) 388 { 389 u32 version; 390 int ret; 391 392 /* Extract the parameter block */ 393 ret = rxperf_extract_data(call, true); 394 if (ret < 0) 395 return ret; 396 397 version = ntohl(call->params.version); 398 call->operation_id = ntohl(call->params.type); 399 call->deliver = rxperf_deliver_request; 400 401 if (version != RX_PERF_VERSION) { 402 pr_info("Version mismatch %x\n", version); 403 return -ENOTSUPP; 404 } 405 406 switch (call->operation_id) { 407 case RX_PERF_SEND: 408 call->type = "send"; 409 call->reply_len = 0; 410 call->iov_len = 4; /* Expect req size */ 411 break; 412 case RX_PERF_RECV: 413 call->type = "recv"; 414 call->req_len = 0; 415 call->iov_len = 4; /* Expect reply size */ 416 break; 417 case RX_PERF_RPC: 418 call->type = "rpc"; 419 call->iov_len = 8; /* Expect req size and reply size */ 420 break; 421 case RX_PERF_FILE: 422 call->type = "file"; 423 fallthrough; 424 default: 425 return -EOPNOTSUPP; 426 } 427 428 rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST); 429 return call->deliver(call); 430 } 431 432 /* 433 * Deliver the request data. 434 */ 435 static int rxperf_deliver_request(struct rxperf_call *call) 436 { 437 int ret; 438 439 switch (call->unmarshal) { 440 case 0: 441 call->kvec[0].iov_len = call->iov_len; 442 call->kvec[0].iov_base = call->tmp; 443 iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len); 444 call->unmarshal++; 445 fallthrough; 446 case 1: 447 ret = rxperf_extract_data(call, true); 448 if (ret < 0) 449 return ret; 450 451 switch (call->operation_id) { 452 case RX_PERF_SEND: 453 call->type = "send"; 454 call->req_len = ntohl(call->tmp[0]); 455 call->reply_len = 0; 456 break; 457 case RX_PERF_RECV: 458 call->type = "recv"; 459 call->req_len = 0; 460 call->reply_len = ntohl(call->tmp[0]); 461 break; 462 case RX_PERF_RPC: 463 call->type = "rpc"; 464 call->req_len = ntohl(call->tmp[0]); 465 call->reply_len = ntohl(call->tmp[1]); 466 break; 467 default: 468 pr_info("Can't parse extra params\n"); 469 return -EIO; 470 } 471 472 pr_debug("CALL op=%s rq=%zx rp=%zx\n", 473 call->type, call->req_len, call->reply_len); 474 475 call->iov_len = call->req_len; 476 iov_iter_discard(&call->iter, READ, call->req_len); 477 call->unmarshal++; 478 fallthrough; 479 case 2: 480 ret = rxperf_extract_data(call, false); 481 if (ret < 0) 482 return ret; 483 call->unmarshal++; 484 fallthrough; 485 default: 486 return 0; 487 } 488 } 489 490 /* 491 * Process a call for which we've received the request. 492 */ 493 static int rxperf_process_call(struct rxperf_call *call) 494 { 495 struct msghdr msg = {}; 496 struct bio_vec bv[1]; 497 struct kvec iov[1]; 498 ssize_t n; 499 size_t reply_len = call->reply_len, len; 500 501 rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall, 502 reply_len + sizeof(rxperf_magic_cookie)); 503 504 while (reply_len > 0) { 505 len = min_t(size_t, reply_len, PAGE_SIZE); 506 bv[0].bv_page = ZERO_PAGE(0); 507 bv[0].bv_offset = 0; 508 bv[0].bv_len = len; 509 iov_iter_bvec(&msg.msg_iter, WRITE, bv, 1, len); 510 msg.msg_flags = MSG_MORE; 511 n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, 512 len, rxperf_notify_end_reply_tx); 513 if (n < 0) 514 return n; 515 if (n == 0) 516 return -EIO; 517 reply_len -= n; 518 } 519 520 len = sizeof(rxperf_magic_cookie); 521 iov[0].iov_base = (void *)rxperf_magic_cookie; 522 iov[0].iov_len = len; 523 iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len); 524 msg.msg_flags = 0; 525 n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len, 526 rxperf_notify_end_reply_tx); 527 if (n >= 0) 528 return 0; /* Success */ 529 530 if (n == -ENOMEM) 531 rxrpc_kernel_abort_call(rxperf_socket, call->rxcall, 532 RXGEN_SS_MARSHAL, -ENOMEM, 533 rxperf_abort_oom); 534 return n; 535 } 536 537 /* 538 * Add a key to the security keyring. 539 */ 540 static int rxperf_add_key(struct key *keyring) 541 { 542 key_ref_t kref; 543 int ret; 544 545 kref = key_create_or_update(make_key_ref(keyring, true), 546 "rxrpc_s", 547 __stringify(RX_PERF_SERVICE) ":2", 548 secret, 549 sizeof(secret), 550 KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH 551 | KEY_USR_VIEW, 552 KEY_ALLOC_NOT_IN_QUOTA); 553 554 if (IS_ERR(kref)) { 555 pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref)); 556 return PTR_ERR(kref); 557 } 558 559 ret = key_link(keyring, key_ref_to_ptr(kref)); 560 if (ret < 0) 561 pr_err("Can't link rxperf server key: %d\n", ret); 562 key_ref_put(kref); 563 return ret; 564 } 565 566 /* 567 * Initialise the rxperf server. 568 */ 569 static int __init rxperf_init(void) 570 { 571 struct key *keyring; 572 int ret = -ENOMEM; 573 574 pr_info("Server registering\n"); 575 576 rxperf_workqueue = alloc_workqueue("rxperf", 0, 0); 577 if (!rxperf_workqueue) 578 goto error_workqueue; 579 580 keyring = keyring_alloc("rxperf_server", 581 GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(), 582 KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH | 583 KEY_POS_WRITE | 584 KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH | 585 KEY_USR_WRITE | 586 KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH, 587 KEY_ALLOC_NOT_IN_QUOTA, 588 NULL, NULL); 589 if (IS_ERR(keyring)) { 590 pr_err("Can't allocate rxperf server keyring: %ld\n", 591 PTR_ERR(keyring)); 592 goto error_keyring; 593 } 594 rxperf_sec_keyring = keyring; 595 ret = rxperf_add_key(keyring); 596 if (ret < 0) 597 goto error_key; 598 599 ret = rxperf_open_socket(); 600 if (ret < 0) 601 goto error_socket; 602 return 0; 603 604 error_socket: 605 error_key: 606 key_put(rxperf_sec_keyring); 607 error_keyring: 608 destroy_workqueue(rxperf_workqueue); 609 rcu_barrier(); 610 error_workqueue: 611 pr_err("Failed to register: %d\n", ret); 612 return ret; 613 } 614 late_initcall(rxperf_init); /* Must be called after net/ to create socket */ 615 616 static void __exit rxperf_exit(void) 617 { 618 pr_info("Server unregistering.\n"); 619 620 rxperf_close_socket(); 621 key_put(rxperf_sec_keyring); 622 destroy_workqueue(rxperf_workqueue); 623 rcu_barrier(); 624 } 625 module_exit(rxperf_exit); 626 627