// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple threads pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#include "fail.h"

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

/*
 * Defined near the end of this file; declared here because
 * svc_rpcb_setup() and svc_rpcb_cleanup() call it first.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net);

/* Initial value of svc_pool_map.mode */
#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Mode for mapping cpus to pools.
 *
 * SVC_POOL_AUTO is replaced by one of the other three values the
 * first time the pool map is initialised (see svc_pool_map_get()).
 */
enum {
	SVC_POOL_AUTO = -1,	/* choose one of the others */
	SVC_POOL_GLOBAL,	/* no mapping, just a single global pool
				 * (legacy & UP mode) */
	SVC_POOL_PERCPU,	/* one pool per cpu */
	SVC_POOL_PERNODE	/* one pool per numa node */
};

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
56 */ 57 58 struct svc_pool_map { 59 int count; /* How many svc_servs use us */ 60 int mode; /* Note: int not enum to avoid 61 * warnings about "enumeration value 62 * not handled in switch" */ 63 unsigned int npools; 64 unsigned int *pool_to; /* maps pool id to cpu or node */ 65 unsigned int *to_pool; /* maps cpu or node to pool id */ 66 }; 67 68 static struct svc_pool_map svc_pool_map = { 69 .mode = SVC_POOL_DEFAULT 70 }; 71 72 static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */ 73 74 static int 75 __param_set_pool_mode(const char *val, struct svc_pool_map *m) 76 { 77 int err, mode; 78 79 mutex_lock(&svc_pool_map_mutex); 80 81 err = 0; 82 if (!strncmp(val, "auto", 4)) 83 mode = SVC_POOL_AUTO; 84 else if (!strncmp(val, "global", 6)) 85 mode = SVC_POOL_GLOBAL; 86 else if (!strncmp(val, "percpu", 6)) 87 mode = SVC_POOL_PERCPU; 88 else if (!strncmp(val, "pernode", 7)) 89 mode = SVC_POOL_PERNODE; 90 else 91 err = -EINVAL; 92 93 if (err) 94 goto out; 95 96 if (m->count == 0) 97 m->mode = mode; 98 else if (mode != m->mode) 99 err = -EBUSY; 100 out: 101 mutex_unlock(&svc_pool_map_mutex); 102 return err; 103 } 104 105 static int 106 param_set_pool_mode(const char *val, const struct kernel_param *kp) 107 { 108 struct svc_pool_map *m = kp->arg; 109 110 return __param_set_pool_mode(val, m); 111 } 112 113 int sunrpc_set_pool_mode(const char *val) 114 { 115 return __param_set_pool_mode(val, &svc_pool_map); 116 } 117 EXPORT_SYMBOL(sunrpc_set_pool_mode); 118 119 /** 120 * sunrpc_get_pool_mode - get the current pool_mode for the host 121 * @buf: where to write the current pool_mode 122 * @size: size of @buf 123 * 124 * Grab the current pool_mode from the svc_pool_map and write 125 * the resulting string to @buf. Returns the number of characters 126 * written to @buf (a'la snprintf()). 
127 */ 128 int 129 sunrpc_get_pool_mode(char *buf, size_t size) 130 { 131 struct svc_pool_map *m = &svc_pool_map; 132 133 switch (m->mode) 134 { 135 case SVC_POOL_AUTO: 136 return snprintf(buf, size, "auto"); 137 case SVC_POOL_GLOBAL: 138 return snprintf(buf, size, "global"); 139 case SVC_POOL_PERCPU: 140 return snprintf(buf, size, "percpu"); 141 case SVC_POOL_PERNODE: 142 return snprintf(buf, size, "pernode"); 143 default: 144 return snprintf(buf, size, "%d", m->mode); 145 } 146 } 147 EXPORT_SYMBOL(sunrpc_get_pool_mode); 148 149 static int 150 param_get_pool_mode(char *buf, const struct kernel_param *kp) 151 { 152 char str[16]; 153 int len; 154 155 len = sunrpc_get_pool_mode(str, ARRAY_SIZE(str)); 156 157 /* Ensure we have room for newline and NUL */ 158 len = min_t(int, len, ARRAY_SIZE(str) - 2); 159 160 /* tack on the newline */ 161 str[len] = '\n'; 162 str[len + 1] = '\0'; 163 164 return sysfs_emit(buf, "%s", str); 165 } 166 167 module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode, 168 &svc_pool_map, 0644); 169 170 /* 171 * Detect best pool mapping mode heuristically, 172 * according to the machine's topology. 173 */ 174 static int 175 svc_pool_map_choose_mode(void) 176 { 177 unsigned int node; 178 179 if (nr_online_nodes > 1) { 180 /* 181 * Actually have multiple NUMA nodes, 182 * so split pools on NUMA node boundaries 183 */ 184 return SVC_POOL_PERNODE; 185 } 186 187 node = first_online_node; 188 if (nr_cpus_node(node) > 2) { 189 /* 190 * Non-trivial SMP, or CONFIG_NUMA on 191 * non-NUMA hardware, e.g. with a generic 192 * x86_64 kernel on Xeons. In this case we 193 * want to divide the pools on cpu boundaries. 194 */ 195 return SVC_POOL_PERCPU; 196 } 197 198 /* default: one global pool */ 199 return SVC_POOL_GLOBAL; 200 } 201 202 /* 203 * Allocate the to_pool[] and pool_to[] arrays. 204 * Returns 0 on success or an errno. 
205 */ 206 static int 207 svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools) 208 { 209 m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL); 210 if (!m->to_pool) 211 goto fail; 212 m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL); 213 if (!m->pool_to) 214 goto fail_free; 215 216 return 0; 217 218 fail_free: 219 kfree(m->to_pool); 220 m->to_pool = NULL; 221 fail: 222 return -ENOMEM; 223 } 224 225 /* 226 * Initialise the pool map for SVC_POOL_PERCPU mode. 227 * Returns number of pools or <0 on error. 228 */ 229 static int 230 svc_pool_map_init_percpu(struct svc_pool_map *m) 231 { 232 unsigned int maxpools = nr_cpu_ids; 233 unsigned int pidx = 0; 234 unsigned int cpu; 235 int err; 236 237 err = svc_pool_map_alloc_arrays(m, maxpools); 238 if (err) 239 return err; 240 241 for_each_online_cpu(cpu) { 242 BUG_ON(pidx >= maxpools); 243 m->to_pool[cpu] = pidx; 244 m->pool_to[pidx] = cpu; 245 pidx++; 246 } 247 /* cpus brought online later all get mapped to pool0, sorry */ 248 249 return pidx; 250 }; 251 252 253 /* 254 * Initialise the pool map for SVC_POOL_PERNODE mode. 255 * Returns number of pools or <0 on error. 256 */ 257 static int 258 svc_pool_map_init_pernode(struct svc_pool_map *m) 259 { 260 unsigned int maxpools = nr_node_ids; 261 unsigned int pidx = 0; 262 unsigned int node; 263 int err; 264 265 err = svc_pool_map_alloc_arrays(m, maxpools); 266 if (err) 267 return err; 268 269 for_each_node_with_cpus(node) { 270 /* some architectures (e.g. SN2) have cpuless nodes */ 271 BUG_ON(pidx > maxpools); 272 m->to_pool[node] = pidx; 273 m->pool_to[pidx] = node; 274 pidx++; 275 } 276 /* nodes brought online later all get mapped to pool0, sorry */ 277 278 return pidx; 279 } 280 281 282 /* 283 * Add a reference to the global map of cpus to pools (and 284 * vice versa) if pools are in use. 285 * Initialise the map if we're the first user. 286 * Returns the number of pools. If this is '1', no reference 287 * was taken. 
288 */ 289 static unsigned int 290 svc_pool_map_get(void) 291 { 292 struct svc_pool_map *m = &svc_pool_map; 293 int npools = -1; 294 295 mutex_lock(&svc_pool_map_mutex); 296 if (m->count++) { 297 mutex_unlock(&svc_pool_map_mutex); 298 return m->npools; 299 } 300 301 if (m->mode == SVC_POOL_AUTO) 302 m->mode = svc_pool_map_choose_mode(); 303 304 switch (m->mode) { 305 case SVC_POOL_PERCPU: 306 npools = svc_pool_map_init_percpu(m); 307 break; 308 case SVC_POOL_PERNODE: 309 npools = svc_pool_map_init_pernode(m); 310 break; 311 } 312 313 if (npools <= 0) { 314 /* default, or memory allocation failure */ 315 npools = 1; 316 m->mode = SVC_POOL_GLOBAL; 317 } 318 m->npools = npools; 319 mutex_unlock(&svc_pool_map_mutex); 320 return npools; 321 } 322 323 /* 324 * Drop a reference to the global map of cpus to pools. 325 * When the last reference is dropped, the map data is 326 * freed; this allows the sysadmin to change the pool. 327 */ 328 static void 329 svc_pool_map_put(void) 330 { 331 struct svc_pool_map *m = &svc_pool_map; 332 333 mutex_lock(&svc_pool_map_mutex); 334 if (!--m->count) { 335 kfree(m->to_pool); 336 m->to_pool = NULL; 337 kfree(m->pool_to); 338 m->pool_to = NULL; 339 m->npools = 0; 340 } 341 mutex_unlock(&svc_pool_map_mutex); 342 } 343 344 static int svc_pool_map_get_node(unsigned int pidx) 345 { 346 const struct svc_pool_map *m = &svc_pool_map; 347 348 if (m->count) { 349 if (m->mode == SVC_POOL_PERCPU) 350 return cpu_to_node(m->pool_to[pidx]); 351 if (m->mode == SVC_POOL_PERNODE) 352 return m->pool_to[pidx]; 353 } 354 return NUMA_NO_NODE; 355 } 356 /* 357 * Set the given thread's cpus_allowed mask so that it 358 * will only run on cpus in the given pool. 359 */ 360 static inline void 361 svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx) 362 { 363 struct svc_pool_map *m = &svc_pool_map; 364 unsigned int node = m->pool_to[pidx]; 365 366 /* 367 * The caller checks for sv_nrpools > 1, which 368 * implies that we've been initialized. 
369 */ 370 WARN_ON_ONCE(m->count == 0); 371 if (m->count == 0) 372 return; 373 374 switch (m->mode) { 375 case SVC_POOL_PERCPU: 376 { 377 set_cpus_allowed_ptr(task, cpumask_of(node)); 378 break; 379 } 380 case SVC_POOL_PERNODE: 381 { 382 set_cpus_allowed_ptr(task, cpumask_of_node(node)); 383 break; 384 } 385 } 386 } 387 388 /** 389 * svc_pool_for_cpu - Select pool to run a thread on this cpu 390 * @serv: An RPC service 391 * 392 * Use the active CPU and the svc_pool_map's mode setting to 393 * select the svc thread pool to use. Once initialized, the 394 * svc_pool_map does not change. 395 * 396 * Return value: 397 * A pointer to an svc_pool 398 */ 399 struct svc_pool *svc_pool_for_cpu(struct svc_serv *serv) 400 { 401 struct svc_pool_map *m = &svc_pool_map; 402 int cpu = raw_smp_processor_id(); 403 unsigned int pidx = 0; 404 405 if (serv->sv_nrpools <= 1) 406 return serv->sv_pools; 407 408 switch (m->mode) { 409 case SVC_POOL_PERCPU: 410 pidx = m->to_pool[cpu]; 411 break; 412 case SVC_POOL_PERNODE: 413 pidx = m->to_pool[cpu_to_node(cpu)]; 414 break; 415 } 416 417 return &serv->sv_pools[pidx % serv->sv_nrpools]; 418 } 419 420 int svc_rpcb_setup(struct svc_serv *serv, struct net *net) 421 { 422 int err; 423 424 err = rpcb_create_local(net); 425 if (err) 426 return err; 427 428 /* Remove any stale portmap registrations */ 429 svc_unregister(serv, net); 430 return 0; 431 } 432 EXPORT_SYMBOL_GPL(svc_rpcb_setup); 433 434 void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net) 435 { 436 svc_unregister(serv, net); 437 rpcb_put_local(net); 438 } 439 EXPORT_SYMBOL_GPL(svc_rpcb_cleanup); 440 441 static int svc_uses_rpcbind(struct svc_serv *serv) 442 { 443 struct svc_program *progp; 444 unsigned int i; 445 446 for (progp = serv->sv_program; progp; progp = progp->pg_next) { 447 for (i = 0; i < progp->pg_nvers; i++) { 448 if (progp->pg_vers[i] == NULL) 449 continue; 450 if (!progp->pg_vers[i]->vs_hidden) 451 return 1; 452 } 453 } 454 455 return 0; 456 } 457 458 int 
svc_bind(struct svc_serv *serv, struct net *net) 459 { 460 if (!svc_uses_rpcbind(serv)) 461 return 0; 462 return svc_rpcb_setup(serv, net); 463 } 464 EXPORT_SYMBOL_GPL(svc_bind); 465 466 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 467 static void 468 __svc_init_bc(struct svc_serv *serv) 469 { 470 lwq_init(&serv->sv_cb_list); 471 } 472 #else 473 static void 474 __svc_init_bc(struct svc_serv *serv) 475 { 476 } 477 #endif 478 479 /* 480 * Create an RPC service 481 */ 482 static struct svc_serv * 483 __svc_create(struct svc_program *prog, struct svc_stat *stats, 484 unsigned int bufsize, int npools, int (*threadfn)(void *data)) 485 { 486 struct svc_serv *serv; 487 unsigned int vers; 488 unsigned int xdrsize; 489 unsigned int i; 490 491 if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL))) 492 return NULL; 493 serv->sv_name = prog->pg_name; 494 serv->sv_program = prog; 495 serv->sv_stats = stats; 496 if (bufsize > RPCSVC_MAXPAYLOAD) 497 bufsize = RPCSVC_MAXPAYLOAD; 498 serv->sv_max_payload = bufsize? 
bufsize : 4096; 499 serv->sv_max_mesg = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE); 500 serv->sv_threadfn = threadfn; 501 xdrsize = 0; 502 while (prog) { 503 prog->pg_lovers = prog->pg_nvers-1; 504 for (vers=0; vers<prog->pg_nvers ; vers++) 505 if (prog->pg_vers[vers]) { 506 prog->pg_hivers = vers; 507 if (prog->pg_lovers > vers) 508 prog->pg_lovers = vers; 509 if (prog->pg_vers[vers]->vs_xdrsize > xdrsize) 510 xdrsize = prog->pg_vers[vers]->vs_xdrsize; 511 } 512 prog = prog->pg_next; 513 } 514 serv->sv_xdrsize = xdrsize; 515 INIT_LIST_HEAD(&serv->sv_tempsocks); 516 INIT_LIST_HEAD(&serv->sv_permsocks); 517 timer_setup(&serv->sv_temptimer, NULL, 0); 518 spin_lock_init(&serv->sv_lock); 519 520 __svc_init_bc(serv); 521 522 serv->sv_nrpools = npools; 523 serv->sv_pools = 524 kcalloc(serv->sv_nrpools, sizeof(struct svc_pool), 525 GFP_KERNEL); 526 if (!serv->sv_pools) { 527 kfree(serv); 528 return NULL; 529 } 530 531 for (i = 0; i < serv->sv_nrpools; i++) { 532 struct svc_pool *pool = &serv->sv_pools[i]; 533 534 dprintk("svc: initialising pool %u for %s\n", 535 i, serv->sv_name); 536 537 pool->sp_id = i; 538 lwq_init(&pool->sp_xprts); 539 INIT_LIST_HEAD(&pool->sp_all_threads); 540 init_llist_head(&pool->sp_idle_threads); 541 542 percpu_counter_init(&pool->sp_messages_arrived, 0, GFP_KERNEL); 543 percpu_counter_init(&pool->sp_sockets_queued, 0, GFP_KERNEL); 544 percpu_counter_init(&pool->sp_threads_woken, 0, GFP_KERNEL); 545 } 546 547 return serv; 548 } 549 550 /** 551 * svc_create - Create an RPC service 552 * @prog: the RPC program the new service will handle 553 * @bufsize: maximum message size for @prog 554 * @threadfn: a function to service RPC requests for @prog 555 * 556 * Returns an instantiated struct svc_serv object or NULL. 
*/
struct svc_serv *svc_create(struct svc_program *prog, unsigned int bufsize,
			    int (*threadfn)(void *data))
{
	/* No stats, and exactly one pool */
	return __svc_create(prog, NULL, bufsize, 1, threadfn);
}
EXPORT_SYMBOL_GPL(svc_create);

/**
 * svc_create_pooled - Create an RPC service with pooled threads
 * @prog: the RPC program the new service will handle
 * @stats: the stats struct if desired
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create_pooled(struct svc_program *prog,
				   struct svc_stat *stats,
				   unsigned int bufsize,
				   int (*threadfn)(void *data))
{
	struct svc_serv *serv;
	/* Takes a reference on the global pool map */
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, stats, bufsize, npools, threadfn);
	if (!serv)
		goto out_err;
	/* Tells svc_destroy() to drop the pool map reference */
	serv->sv_is_pooled = true;
	return serv;
out_err:
	svc_pool_map_put();
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_permsocks and sv_tempsocks.
 *
 * *servp is set to NULL before the service is torn down and freed.
 */
void
svc_destroy(struct svc_serv **servp)
{
	struct svc_serv *serv = *servp;
	unsigned int i;

	*servp = NULL;

	dprintk("svc: svc_destroy(%s)\n", serv->sv_program->pg_name);
	timer_shutdown_sync(&serv->sv_temptimer);

	/*
	 * Remaining transports at this point are not expected.
	 */
	WARN_ONCE(!list_empty(&serv->sv_permsocks),
		  "SVC: permsocks remain for %s\n", serv->sv_program->pg_name);
	WARN_ONCE(!list_empty(&serv->sv_tempsocks),
		  "SVC: tempsocks remain for %s\n", serv->sv_program->pg_name);

	cache_clean_deferred(serv);

	/* Balance the svc_pool_map_get() done by svc_create_pooled() */
	if (serv->sv_is_pooled)
		svc_pool_map_put();

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		percpu_counter_destroy(&pool->sp_messages_arrived);
		percpu_counter_destroy(&pool->sp_sockets_queued);
		percpu_counter_destroy(&pool->sp_threads_woken);
	}
	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Fill rqstp->rq_pages with enough pages for a message of @size bytes,
 * allocated on @node.  Returns true if every requested page was
 * obtained (or if @rqstp is a backchannel request, which needs none).
 */
static bool
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
	unsigned long pages, ret;

	/* bc_xprt uses fore channel allocated buffers */
	if (svc_is_backchannel(rqstp))
		return true;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
				       * We assume one is at most one page
				       */
	/* Warn about, then clamp, a request larger than the page array allows */
	WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
	if (pages > RPCSVC_MAXPAGES)
		pages = RPCSVC_MAXPAGES;

	ret = alloc_pages_bulk_array_node(GFP_KERNEL, node, pages,
					  rqstp->rq_pages);
	return ret == pages;
}

/*
 * Release an RPC server buffer
 *
 * Drops one reference on every non-NULL entry of rq_pages[].
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

/*
 * Allocate and initialise a svc_rqst for @serv / @pool with all of its
 * memory (scratch page, argument and result buffers, message pages)
 * taken from @node.  Returns NULL if any allocation fails; whatever
 * was already allocated is released through svc_rqst_free().
 */
struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
	if (!rqstp)
		return rqstp;

	folio_batch_init(&rqstp->rq_fbatch);

	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
	if (!rqstp->rq_scratch_page)
		goto out_enomem;

	/* Both buffers are sized for the largest version's xdr structures */
	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_argp)
		goto out_enomem;

	rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_resp)
		goto out_enomem;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
		goto out_enomem;

	return rqstp;
out_enomem:
	svc_rqst_free(rqstp);
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

/*
 * Allocate a svc_rqst and account for it as a new thread of @serv and
 * @pool.  Returns the svc_rqst or ERR_PTR(-ENOMEM).
 */
static struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = svc_rqst_alloc(serv, pool, node);
	if (!rqstp)
		return ERR_PTR(-ENOMEM);

	spin_lock_bh(&serv->sv_lock);
	serv->sv_nrthreads += 1;
	spin_unlock_bh(&serv->sv_lock);

	atomic_inc(&pool->sp_nrthreads);

	/* Protected by whatever lock the service uses when calling
	 * svc_set_num_threads()
	 */
	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);

	return rqstp;
}

/**
 * svc_pool_wake_idle_thread - Awaken an idle thread in @pool
 * @pool: service thread pool
 *
 * Can be called from soft IRQ or process context. Finding an idle
 * service thread and marking it BUSY is atomic with respect to
 * other calls to svc_pool_wake_idle_thread().
 *
 */
void svc_pool_wake_idle_thread(struct svc_pool *pool)
{
	struct svc_rqst	*rqstp;
	struct llist_node *ln;

	rcu_read_lock();
	/* Peek at the head of the idle list; the entry is not removed here */
	ln = READ_ONCE(pool->sp_idle_threads.first);
	if (ln) {
		rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
		WRITE_ONCE(rqstp->rq_qtime, ktime_get());
		/* Only wake (and count) a thread that is not already running */
		if (!task_is_running(rqstp->rq_task)) {
			wake_up_process(rqstp->rq_task);
			trace_svc_wake_up(rqstp->rq_task->pid);
			percpu_counter_inc(&pool->sp_threads_woken);
		}
		rcu_read_unlock();
		return;
	}
	rcu_read_unlock();

}
EXPORT_SYMBOL_GPL(svc_pool_wake_idle_thread);

/*
 * Pick the pool for the next new thread: @pool itself when one was
 * given, otherwise round-robin over serv's pools, advancing *state.
 */
static struct svc_pool *
svc_pool_next(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	return pool ? pool : &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Pick a pool that still has threads and flag it so that one of its
 * threads exits.  With @target_pool set only that pool is considered;
 * otherwise the pools are walked round-robin, stepping *state
 * backwards.  Returns the flagged pool, or NULL when no candidate
 * pool has any threads.
 */
static struct svc_pool *
svc_pool_victim(struct svc_serv *serv, struct svc_pool *target_pool,
		unsigned int *state)
{
	struct svc_pool *pool;
	unsigned int i;

retry:
	pool = target_pool;

	/*
	 * atomic_inc_not_zero() only succeeds for a pool whose thread
	 * count is non-zero; the extra count it takes is dropped again
	 * at found_pool below.
	 */
	if (pool != NULL) {
		if (atomic_inc_not_zero(&pool->sp_nrthreads))
			goto found_pool;
		return NULL;
	} else {
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			if (atomic_inc_not_zero(&pool->sp_nrthreads))
				goto found_pool;
		}
		return NULL;
	}

found_pool:
	set_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
	set_bit(SP_NEED_VICTIM, &pool->sp_flags);
	/* Drop the temporary count; non-zero means threads remain to exit */
	if (!atomic_dec_and_test(&pool->sp_nrthreads))
		return pool;
	/* Nothing left in this pool any more */
	clear_bit(SP_NEED_VICTIM, &pool->sp_flags);
	clear_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
	goto retry;
}

/*
 * Create and start @nrservs new service threads (at least one: the
 * loop body runs before the count is tested).  Returns 0, or a
 * negative errno from the first failure; threads started before the
 * failure are left running.
 */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	unsigned int state = serv->sv_nrthreads-1;
	int node;

	do {
		nrservs--;
		chosen_pool = svc_pool_next(serv, pool, &state);
		node = svc_pool_map_get_node(chosen_pool->sp_id);

		rqstp = svc_prepare_thread(serv, chosen_pool, node);
		if (IS_ERR(rqstp))
			return PTR_ERR(rqstp);
		task = kthread_create_on_node(serv->sv_threadfn, rqstp,
					      node, "%s", serv->sv_name);
		if (IS_ERR(task)) {
			/* Undo the accounting done by svc_prepare_thread() */
			svc_exit_thread(rqstp);
			return PTR_ERR(task);
		}

		rqstp->rq_task = task;
		/* Pin the thread to its pool's cpus before it first runs */
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	} while (nrservs > 0);

	return 0;
}

/*
 * Stop -@nrservs service threads (@nrservs is negative), one at a
 * time, ending early once no pool has a thread left.  Always
 * returns 0.
 */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	unsigned int state = serv->sv_nrthreads-1;
	struct svc_pool *victim;

	do {
		victim = svc_pool_victim(serv, pool, &state);
		if (!victim)
			break;
		svc_pool_wake_idle_thread(victim);
		/* SP_VICTIM_REMAINS is cleared by svc_exit_thread() */
		wait_on_bit(&victim->sp_flags, SP_VICTIM_REMAINS,
			    TASK_IDLE);
		nrservs++;
	} while (nrservs < 0);
	return 0;
}

/**
 * svc_set_num_threads - adjust number of threads per RPC service
 * @serv: RPC service to adjust
 * @pool: Specific pool from which to choose threads, or NULL
 * @nrservs: New number of threads for @serv (0 or less means kill all threads)
 *
 * Create or destroy threads to make the number of threads for @serv the
 * given number. If @pool is non-NULL, change only threads in that pool;
 * otherwise, round-robin between all pools for @serv. @serv's
 * sv_nrthreads is adjusted for each thread created or destroyed.
 *
 * Caller must ensure mutual exclusion between this and server startup or
 * shutdown.
 *
 * Returns zero on success or a negative errno if an error occurred while
 * starting a thread.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	/* Turn the requested total into a delta from the current count */
	if (!pool)
		nrservs -= serv->sv_nrthreads;
	else
		nrservs -= atomic_read(&pool->sp_nrthreads);

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_stop_kthreads(serv, pool, nrservs);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);

/**
 * svc_rqst_replace_page - Replace one page in rq_pages[]
 * @rqstp: svc_rqst with pages to replace
 * @page: replacement page
 *
 * When replacing a page in rq_pages, batch the release of the
 * replaced pages to avoid hammering the page allocator.
891 * 892 * Return values: 893 * %true: page replaced 894 * %false: array bounds checking failed 895 */ 896 bool svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page) 897 { 898 struct page **begin = rqstp->rq_pages; 899 struct page **end = &rqstp->rq_pages[RPCSVC_MAXPAGES]; 900 901 if (unlikely(rqstp->rq_next_page < begin || rqstp->rq_next_page > end)) { 902 trace_svc_replace_page_err(rqstp); 903 return false; 904 } 905 906 if (*rqstp->rq_next_page) { 907 if (!folio_batch_add(&rqstp->rq_fbatch, 908 page_folio(*rqstp->rq_next_page))) 909 __folio_batch_release(&rqstp->rq_fbatch); 910 } 911 912 get_page(page); 913 *(rqstp->rq_next_page++) = page; 914 return true; 915 } 916 EXPORT_SYMBOL_GPL(svc_rqst_replace_page); 917 918 /** 919 * svc_rqst_release_pages - Release Reply buffer pages 920 * @rqstp: RPC transaction context 921 * 922 * Release response pages that might still be in flight after 923 * svc_send, and any spliced filesystem-owned pages. 924 */ 925 void svc_rqst_release_pages(struct svc_rqst *rqstp) 926 { 927 int i, count = rqstp->rq_next_page - rqstp->rq_respages; 928 929 if (count) { 930 release_pages(rqstp->rq_respages, count); 931 for (i = 0; i < count; i++) 932 rqstp->rq_respages[i] = NULL; 933 } 934 } 935 936 /* 937 * Called from a server thread as it's exiting. Caller must hold the "service 938 * mutex" for the service. 
*/
void
svc_rqst_free(struct svc_rqst *rqstp)
{
	/* Flush pages still queued by svc_rqst_replace_page() */
	folio_batch_release(&rqstp->rq_fbatch);
	svc_release_buffer(rqstp);
	if (rqstp->rq_scratch_page)
		put_page(rqstp->rq_scratch_page);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);
	/* The svc_rqst itself is freed only after an RCU grace period */
	kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

/*
 * Undo svc_prepare_thread() for @rqstp: unlink it from its pool, drop
 * the pool and service thread counts, free it, and finally clear
 * SP_VICTIM_REMAINS on the pool, waking anyone waiting on that bit
 * (svc_stop_kthreads() does).
 */
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	/* Cache both pointers: rqstp is handed to svc_rqst_free() below */
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	list_del_rcu(&rqstp->rq_all);

	atomic_dec(&pool->sp_nrthreads);

	spin_lock_bh(&serv->sv_lock);
	serv->sv_nrthreads -= 1;
	spin_unlock_bh(&serv->sv_lock);
	svc_sock_update_bufs(serv);

	svc_rqst_free(rqstp);

	clear_and_wake_up_bit(SP_VICTIM_REMAINS, &pool->sp_flags);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
984 */ 985 static int __svc_rpcb_register4(struct net *net, const u32 program, 986 const u32 version, 987 const unsigned short protocol, 988 const unsigned short port) 989 { 990 const struct sockaddr_in sin = { 991 .sin_family = AF_INET, 992 .sin_addr.s_addr = htonl(INADDR_ANY), 993 .sin_port = htons(port), 994 }; 995 const char *netid; 996 int error; 997 998 switch (protocol) { 999 case IPPROTO_UDP: 1000 netid = RPCBIND_NETID_UDP; 1001 break; 1002 case IPPROTO_TCP: 1003 netid = RPCBIND_NETID_TCP; 1004 break; 1005 default: 1006 return -ENOPROTOOPT; 1007 } 1008 1009 error = rpcb_v4_register(net, program, version, 1010 (const struct sockaddr *)&sin, netid); 1011 1012 /* 1013 * User space didn't support rpcbind v4, so retry this 1014 * registration request with the legacy rpcbind v2 protocol. 1015 */ 1016 if (error == -EPROTONOSUPPORT) 1017 error = rpcb_register(net, program, version, protocol, port); 1018 1019 return error; 1020 } 1021 1022 #if IS_ENABLED(CONFIG_IPV6) 1023 /* 1024 * Register an "inet6" protocol family netid with the local 1025 * rpcbind daemon via an rpcbind v4 SET request. 1026 * 1027 * No netconfig infrastructure is available in the kernel, so 1028 * we map IP_ protocol numbers to netids by hand. 1029 * 1030 * Returns zero on success; a negative errno value is returned 1031 * if any error occurs. 
1032 */ 1033 static int __svc_rpcb_register6(struct net *net, const u32 program, 1034 const u32 version, 1035 const unsigned short protocol, 1036 const unsigned short port) 1037 { 1038 const struct sockaddr_in6 sin6 = { 1039 .sin6_family = AF_INET6, 1040 .sin6_addr = IN6ADDR_ANY_INIT, 1041 .sin6_port = htons(port), 1042 }; 1043 const char *netid; 1044 int error; 1045 1046 switch (protocol) { 1047 case IPPROTO_UDP: 1048 netid = RPCBIND_NETID_UDP6; 1049 break; 1050 case IPPROTO_TCP: 1051 netid = RPCBIND_NETID_TCP6; 1052 break; 1053 default: 1054 return -ENOPROTOOPT; 1055 } 1056 1057 error = rpcb_v4_register(net, program, version, 1058 (const struct sockaddr *)&sin6, netid); 1059 1060 /* 1061 * User space didn't support rpcbind version 4, so we won't 1062 * use a PF_INET6 listener. 1063 */ 1064 if (error == -EPROTONOSUPPORT) 1065 error = -EAFNOSUPPORT; 1066 1067 return error; 1068 } 1069 #endif /* IS_ENABLED(CONFIG_IPV6) */ 1070 1071 /* 1072 * Register a kernel RPC service via rpcbind version 4. 1073 * 1074 * Returns zero on success; a negative errno value is returned 1075 * if any error occurs. 
*/
static int __svc_register(struct net *net, const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	/* Reported for any family not handled by the switch below */
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(net, program, version,
						protocol, port);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case PF_INET6:
		error = __svc_rpcb_register6(net, program, version,
						protocol, port);
#endif
	}

	trace_svc_register(progname, version, family, protocol, port, error);
	return error;
}

/*
 * Register one @version of @progp for the given family, protocol and
 * port.  Returns the result of __svc_register().
 */
int svc_rpcbind_set_version(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	return __svc_register(net, progp->pg_name, progp->pg_prog,
				version, family, proto, port);

}
EXPORT_SYMBOL_GPL(svc_rpcbind_set_version);

/*
 * Register @version of @progp unless the version slot is empty, the
 * version is hidden, or it needs congestion control and @proto is UDP;
 * all three cases return 0 without registering.  A registration error
 * is returned unless the version sets vs_rpcb_optnl, in which case 0
 * is returned.
 *
 * @version indexes pg_vers[] without a range check here; the caller
 * in this file (svc_register()) only passes values below pg_nvers.
 */
int svc_generic_rpcbind_set(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	const struct svc_version *vers = progp->pg_vers[version];
	int error;

	if (vers == NULL)
		return 0;

	if (vers->vs_hidden) {
		/*
		 * NOTE(review): arguments are passed as (proto, port,
		 * family) here but as (family, protocol, port) to
		 * trace_svc_register() above -- confirm against the
		 * tracepoint prototype.
		 */
		trace_svc_noregister(progp->pg_name, version, proto,
				     port, family, 0);
		return 0;
	}

	/*
	 * Don't register a UDP port if we need congestion
	 * control.
	 */
	if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
		return 0;

	error = svc_rpcbind_set_version(net, progp, version,
					family, proto, port);

	return (vers->vs_rpcb_optnl) ? 0 : error;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 *
 * Returns -EINVAL if both @proto and @port are zero; otherwise the
 * result of the last pg_rpcbind_set() call made.
 */
int svc_register(const struct svc_serv *serv, struct net *net,
		 const int family, const unsigned short proto,
		 const unsigned short port)
{
	struct svc_program	*progp;
	unsigned int		i;
	int			error = 0;

	WARN_ON_ONCE(proto == 0 && port == 0);
	if (proto == 0 && port == 0)
		return -EINVAL;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {

			error = progp->pg_rpcbind_set(net, progp, i,
					family, proto, port);
			if (error < 0) {
				printk(KERN_WARNING "svc: failed to register "
					"%sv%u RPC service (errno %d).\n",
					progp->pg_name, i, -error);
				/*
				 * NOTE(review): this leaves only the
				 * version loop.  Later programs are still
				 * processed and overwrite 'error', so a
				 * failure here is lost if a later program
				 * registers cleanly -- confirm intended.
				 */
				break;
			}
		}
	}

	return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	/* NULL address and empty netid form the UNSET request */
	error = rpcb_v4_register(net, program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, 0, 0);

	trace_svc_unregister(progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
	struct sighand_struct *sighand;
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

	/*
	 * Presumably so that a signal already pending on this task does
	 * not interrupt the rpcbind calls below -- TODO confirm.
	 */
	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;
			__svc_unregister(net, progp->pg_prog, i, progp->pg_name);
		}
	}

	/* Recompute the pending-signal state cleared above */
	rcu_read_lock();
	sighand = rcu_dereference(current->sighand);
	spin_lock_irqsave(&sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&sighand->siglock, flags);
	rcu_read_unlock();
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;
	char 	buf[RPC_MAX_ADDRBUFLEN];

	va_start(args, fmt);

	vaf.fmt = fmt;
	vaf.va = &args;

	dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

	va_end(args);
}
#else
/* Without CONFIG_SUNRPC_DEBUG this is an empty stub */
static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

/*
 * Validate the version and procedure numbers of @rqstp against @progp
 * and prepare the request for dispatch.
 *
 * On success, rq_procinfo is set, the leading pc_argzero bytes of
 * rq_argp and pc_ressize bytes of rq_resp are zeroed, the
 * per-procedure counter is bumped, ret->dispatch is filled in and
 * rpc_success is returned.
 *
 * Returns rpc_prog_mismatch (with ret->mismatch filled in from
 * pg_lovers/pg_hivers) for an unsupported version, or rpc_proc_unavail
 * for an out-of-range procedure number.
 */
__be32
svc_generic_init_request(struct svc_rqst *rqstp,
		const struct svc_program *progp,
		struct svc_process_info *ret)
{
	const struct svc_version *versp = NULL;	/* compiler food */
	const struct svc_procedure *procp = NULL;

	if (rqstp->rq_vers >= progp->pg_nvers )
		goto err_bad_vers;
	versp = progp->pg_vers[rqstp->rq_vers];
	if (!versp)
		goto err_bad_vers;

	/*
	 * Some protocol versions (namely NFSv4) require some form of
	 * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
	 * In other words, UDP is not allowed. We mark those when setting
	 * up the svc_xprt, and verify that here.
	 *
	 * The spec is not very clear about what error should be returned
	 * when someone tries to access a server that is listening on UDP
	 * for lower versions. RPC_PROG_MISMATCH seems to be the closest
	 * fit.
	 */
	if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
	    !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
		goto err_bad_vers;

	if (rqstp->rq_proc >= versp->vs_nproc)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argzero);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* Bump per-procedure stats counter */
	this_cpu_inc(versp->vs_count[rqstp->rq_proc]);

	ret->dispatch = versp->vs_dispatch;
	return rpc_success;
err_bad_vers:
	ret->mismatch.lovers = progp->pg_lovers;
	ret->mismatch.hivers = progp->pg_hivers;
	return rpc_prog_mismatch;
err_bad_proc:
	return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

/*
 * Common routine for processing the RPC request.
 */
/*
 * svc_process_common() expects the XID and call direction to have been
 * consumed from rq_arg_stream already. It decodes the rest of the call
 * header, authenticates, locates program/version/procedure, dispatches,
 * and builds the reply (or an RPC-level error reply) in rq_res_stream.
 *
 * Return values:
 *   1 - a reply has been built and the caller can send it
 *   0 - nothing is to be sent: the request was dropped, or the close
 *       path was taken (which closes the transport if it is XPT_TEMP)
 */
static int
svc_process_common(struct svc_rqst *rqstp)
{
	struct xdr_stream *xdr = &rqstp->rq_res_stream;
	struct svc_program *progp;
	const struct svc_procedure *procp = NULL;
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_process_info process;
	enum svc_auth_status auth_res;
	unsigned int aoffset;
	int rc;
	__be32 *p;

	/* Will be turned off only when NFSv4 Sessions are used */
	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
	clear_bit(RQ_DROPME, &rqstp->rq_flags);

	/* Construct the first words of the reply: */
	svcxdr_init_encode(rqstp);
	xdr_stream_encode_be32(xdr, rqstp->rq_xid);
	xdr_stream_encode_be32(xdr, rpc_reply);

	/* Next four call words: RPC version, program, version, procedure */
	p = xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 4);
	if (unlikely(!p))
		goto err_short_len;
	if (*p++ != cpu_to_be32(RPC_VERSION))
		goto err_bad_rpc;

	xdr_stream_encode_be32(xdr, rpc_msg_accepted);

	rqstp->rq_prog = be32_to_cpup(p++);
	rqstp->rq_vers = be32_to_cpup(p++);
	rqstp->rq_proc = be32_to_cpup(p);

	/* Find the matching program; progp is NULL if there is none */
	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (rqstp->rq_prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp)
		auth_res = progp->pg_authenticate(rqstp);
	trace_svc_authenticate(rqstp, auth_res);
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage_args;
	case SVC_SYSERR:
		goto err_system_err;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_CLOSE:
		goto close;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	default:
		pr_warn_once("Unexpected svc_auth_status (%d)\n", auth_res);
		goto err_system_err;
	}

	/* An unknown program is only reported after authentication */
	if (progp == NULL)
		goto err_bad_prog;

	/*
	 * Let the program resolve version and procedure. On success this
	 * is expected to set rq_procinfo and process.dispatch; on a
	 * version mismatch, process.mismatch carries the supported range.
	 */
	switch (progp->pg_init_request(rqstp, progp, &process)) {
	case rpc_success:
		break;
	case rpc_prog_unavail:
		goto err_bad_prog;
	case rpc_prog_mismatch:
		goto err_bad_vers;
	case rpc_proc_unavail:
		goto err_bad_proc;
	}

	procp = rqstp->rq_procinfo;
	/* Should this check go into the dispatcher? */
	if (!procp || !procp->pc_func)
		goto err_bad_proc;

	/* Syntactic check complete */
	if (serv->sv_stats)
		serv->sv_stats->rpccnt++;
	trace_svc_process(rqstp, progp->pg_name);

	/* Remember where the results start, in case they must be discarded */
	aoffset = xdr_stream_pos(xdr);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	rc = process.dispatch(rqstp);
	if (procp->pc_release)
		procp->pc_release(rqstp);
	xdr_finish_decode(xdr);

	/* A zero return from the dispatcher means: send no reply */
	if (!rc)
		goto dropit;
	if (rqstp->rq_auth_stat != rpc_auth_ok)
		goto err_bad_auth;

	/* Non-success accept status: discard any encoded results */
	if (*rqstp->rq_accept_statp != rpc_success)
		xdr_truncate_encode(xdr, aoffset);

	if (procp->pc_encode == NULL)
		goto dropit;

sendit:
	if (svc_authorise(rqstp))
		goto close_xprt;
	return 1;		/* Caller can now send it */

dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	return 0;

close:
	svc_authorise(rqstp);
close_xprt:
	/* Only temporary transports are closed here */
	if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
		svc_xprt_close(rqstp->rq_xprt);
	dprintk("svc: svc_process close\n");
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %u, dropping request\n",
		   rqstp->rq_arg.len);
	goto close_xprt;

err_bad_rpc:
	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
	xdr_stream_encode_u32(xdr, RPC_MISMATCH);
	/* Only RPCv2 supported */
	xdr_stream_encode_u32(xdr, RPC_VERSION);
	xdr_stream_encode_u32(xdr, RPC_VERSION);
	return 1;	/* don't wrap */

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n",
		be32_to_cpu(rqstp->rq_auth_stat));
	if (serv->sv_stats)
		serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of reply status: */
	xdr_truncate_encode(xdr, XDR_UNIT * 2);
	xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
	xdr_stream_encode_u32(xdr, RPC_AUTH_ERROR);
	xdr_stream_encode_be32(xdr, rqstp->rq_auth_stat);
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", rqstp->rq_prog);
	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_prog_unavail;
	goto sendit;

err_bad_vers:
	/* Reached only from the pg_init_request switch, so progp is set */
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		   rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_prog_mismatch;

	/*
	 * svc_authenticate() has already added the verifier and
	 * advanced the stream just past rq_accept_statp.
	 */
	xdr_stream_encode_u32(xdr, process.mismatch.lovers);
	xdr_stream_encode_u32(xdr, process.mismatch.hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_proc_unavail;
	goto sendit;

err_garbage_args:
	svc_printk(rqstp, "failed to decode RPC header\n");

	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_garbage_args;
	goto sendit;

err_system_err:
	if (serv->sv_stats)
		serv->sv_stats->rpcbadfmt++;
	*rqstp->rq_accept_statp = rpc_system_err;
	goto sendit;
}

/**
 * svc_process - Execute one RPC transaction
 * @rqstp: RPC transaction context
 *
 * Resets rq_res to an empty buffer backed by the first response page,
 * decodes the XID and call direction from the request, and hands the
 * rest to svc_process_common(). The reply is passed to svc_send() when
 * svc_process_common() returns non-zero; otherwise, or if the header is
 * too short or is not an RPC call, the request goes to svc_drop().
 */
void svc_process(struct svc_rqst *rqstp)
{
	struct kvec *resv = &rqstp->rq_res.head[0];
	__be32 *p;

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
	/* Fault injection: optionally schedule a transport close */
	if (!fail_sunrpc.ignore_server_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		svc_xprt_deferred_close(rqstp->rq_xprt);
#endif

	/*
	 * Setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_next_page = &rqstp->rq_respages[1];
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_next_page;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	/* First two words of the message: XID, then direction */
	svcxdr_init_decode(rqstp);
	p = xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 2);
	if (unlikely(!p))
		goto out_drop;
	rqstp->rq_xid = *p++;
	if (unlikely(*p != rpc_call))
		goto out_baddir;

	if (!svc_process_common(rqstp))
		goto out_drop;
	svc_send(rqstp);
	return;

out_baddir:
	svc_printk(rqstp, "bad direction 0x%08x, dropping request\n",
		   be32_to_cpu(*p));
	if (rqstp->rq_server->sv_stats)
		rqstp->rq_server->sv_stats->rpcbadfmt++;
out_drop:
	svc_drop(rqstp);
}

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/**
 * svc_process_bc - process a reverse-direction RPC request
 * @req: RPC request to be used for client-side processing
 * @rqstp: server-side execution context
 *
 * Populates @rqstp from @req (XID, transport details, peer address,
 * receive and send buffers), runs svc_process_common() on it, and, if
 * a reply was built, copies the reply buffer back into @req and sends
 * it with rpc_run_bc_task().
 */
void svc_process_bc(struct rpc_rqst *req, struct svc_rqst *rqstp)
{
	struct rpc_timeout timeout = {
		.to_increment = 0,
	};
	struct rpc_task *task;
	int proc_error;

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_bc_net = req->rq_xprt->xprt_net;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/*
	 * Adjust the argument buffer length: take the received length
	 * from rq_private_buf, then trim head/page lengths to it, or
	 * clamp the total to what head plus pages can hold.
	 */
	rqstp->rq_arg.len = req->rq_private_buf.len;
	if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
		rqstp->rq_arg.page_len = 0;
	} else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len)
		rqstp->rq_arg.page_len = rqstp->rq_arg.len -
			rqstp->rq_arg.head[0].iov_len;
	else
		rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len;

	/* Reset the response buffer */
	rqstp->rq_res.head[0].iov_len = 0;

	/*
	 * Skip the XID and calldir fields because they've already
	 * been processed by the caller.
	 *
	 * NOTE(review): this early return leaves bc_slot_count and @req
	 * untouched, unlike the error path below -- confirm that is
	 * intended.
	 */
	svcxdr_init_decode(rqstp);
	if (!xdr_inline_decode(&rqstp->rq_arg_stream, XDR_UNIT * 2))
		return;

	/* Parse and execute the bc call */
	proc_error = svc_process_common(rqstp);

	atomic_dec(&req->rq_xprt->bc_slot_count);
	if (!proc_error) {
		/* Processing error: drop the request */
		xprt_free_bc_request(req);
		return;
	}
	/*
	 * Finally, send the reply synchronously. Timeout values set on
	 * @rqstp take precedence over the transport's defaults.
	 */
	if (rqstp->bc_to_initval > 0) {
		timeout.to_initval = rqstp->bc_to_initval;
		timeout.to_retries = rqstp->bc_to_retries;
	} else {
		timeout.to_initval = req->rq_xprt->timeout->to_initval;
		timeout.to_retries = req->rq_xprt->timeout->to_retries;
	}
	timeout.to_maxval = timeout.to_initval;
	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
	task = rpc_run_bc_task(req, &timeout);

	if (IS_ERR(task))
		return;

	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
	rpc_put_task(task);
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/**
 * svc_max_payload - Return transport-specific limit on the RPC payload
 * @rqstp: RPC transaction context
 *
 * Returns the maximum number of payload bytes the current transport
 * allows.
1666 */ 1667 u32 svc_max_payload(const struct svc_rqst *rqstp) 1668 { 1669 u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload; 1670 1671 if (rqstp->rq_server->sv_max_payload < max) 1672 max = rqstp->rq_server->sv_max_payload; 1673 return max; 1674 } 1675 EXPORT_SYMBOL_GPL(svc_max_payload); 1676 1677 /** 1678 * svc_proc_name - Return RPC procedure name in string form 1679 * @rqstp: svc_rqst to operate on 1680 * 1681 * Return value: 1682 * Pointer to a NUL-terminated string 1683 */ 1684 const char *svc_proc_name(const struct svc_rqst *rqstp) 1685 { 1686 if (rqstp && rqstp->rq_procinfo) 1687 return rqstp->rq_procinfo->pc_name; 1688 return "unknown"; 1689 } 1690 1691 1692 /** 1693 * svc_encode_result_payload - mark a range of bytes as a result payload 1694 * @rqstp: svc_rqst to operate on 1695 * @offset: payload's byte offset in rqstp->rq_res 1696 * @length: size of payload, in bytes 1697 * 1698 * Returns zero on success, or a negative errno if a permanent 1699 * error occurred. 1700 */ 1701 int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset, 1702 unsigned int length) 1703 { 1704 return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset, 1705 length); 1706 } 1707 EXPORT_SYMBOL_GPL(svc_encode_result_payload); 1708 1709 /** 1710 * svc_fill_write_vector - Construct data argument for VFS write call 1711 * @rqstp: svc_rqst to operate on 1712 * @payload: xdr_buf containing only the write data payload 1713 * 1714 * Fills in rqstp::rq_vec, and returns the number of elements. 1715 */ 1716 unsigned int svc_fill_write_vector(struct svc_rqst *rqstp, 1717 struct xdr_buf *payload) 1718 { 1719 struct page **pages = payload->pages; 1720 struct kvec *first = payload->head; 1721 struct kvec *vec = rqstp->rq_vec; 1722 size_t total = payload->len; 1723 unsigned int i; 1724 1725 /* Some types of transport can present the write payload 1726 * entirely in rq_arg.pages. In this case, @first is empty. 
1727 */ 1728 i = 0; 1729 if (first->iov_len) { 1730 vec[i].iov_base = first->iov_base; 1731 vec[i].iov_len = min_t(size_t, total, first->iov_len); 1732 total -= vec[i].iov_len; 1733 ++i; 1734 } 1735 1736 while (total) { 1737 vec[i].iov_base = page_address(*pages); 1738 vec[i].iov_len = min_t(size_t, total, PAGE_SIZE); 1739 total -= vec[i].iov_len; 1740 ++i; 1741 ++pages; 1742 } 1743 1744 WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec)); 1745 return i; 1746 } 1747 EXPORT_SYMBOL_GPL(svc_fill_write_vector); 1748 1749 /** 1750 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call 1751 * @rqstp: svc_rqst to operate on 1752 * @first: buffer containing first section of pathname 1753 * @p: buffer containing remaining section of pathname 1754 * @total: total length of the pathname argument 1755 * 1756 * The VFS symlink API demands a NUL-terminated pathname in mapped memory. 1757 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free 1758 * the returned string. 1759 */ 1760 char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first, 1761 void *p, size_t total) 1762 { 1763 size_t len, remaining; 1764 char *result, *dst; 1765 1766 result = kmalloc(total + 1, GFP_KERNEL); 1767 if (!result) 1768 return ERR_PTR(-ESERVERFAULT); 1769 1770 dst = result; 1771 remaining = total; 1772 1773 len = min_t(size_t, total, first->iov_len); 1774 if (len) { 1775 memcpy(dst, first->iov_base, len); 1776 dst += len; 1777 remaining -= len; 1778 } 1779 1780 if (remaining) { 1781 len = min_t(size_t, remaining, PAGE_SIZE); 1782 memcpy(dst, p, len); 1783 dst += len; 1784 } 1785 1786 *dst = '\0'; 1787 1788 /* Sanity check: Linux doesn't allow the pathname argument to 1789 * contain a NUL byte. 1790 */ 1791 if (strlen(result) != total) { 1792 kfree(result); 1793 return ERR_PTR(-EINVAL); 1794 } 1795 return result; 1796 } 1797 EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname); 1798