1 /* 2 * linux/net/sunrpc/clnt.c 3 * 4 * This file contains the high-level RPC interface. 5 * It is modeled as a finite state machine to support both synchronous 6 * and asynchronous requests. 7 * 8 * - RPC header generation and argument serialization. 9 * - Credential refresh. 10 * - TCP connect handling. 11 * - Retry of operation when it is suspected the operation failed because 12 * of uid squashing on the server, or when the credentials were stale 13 * and need to be refreshed, or when a packet was damaged in transit. 14 * This may be have to be moved to the VFS layer. 15 * 16 * NB: BSD uses a more intelligent approach to guessing when a request 17 * or reply has been lost by keeping the RTO estimate for each procedure. 18 * We currently make do with a constant timeout value. 19 * 20 * Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com> 21 * Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de> 22 */ 23 24 #include <asm/system.h> 25 26 #include <linux/module.h> 27 #include <linux/types.h> 28 #include <linux/mm.h> 29 #include <linux/slab.h> 30 #include <linux/smp_lock.h> 31 #include <linux/utsname.h> 32 #include <linux/workqueue.h> 33 34 #include <linux/sunrpc/clnt.h> 35 #include <linux/sunrpc/rpc_pipe_fs.h> 36 #include <linux/sunrpc/metrics.h> 37 38 39 #define RPC_SLACK_SPACE (1024) /* total overkill */ 40 41 #ifdef RPC_DEBUG 42 # define RPCDBG_FACILITY RPCDBG_CALL 43 #endif 44 45 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait); 46 47 48 static void call_start(struct rpc_task *task); 49 static void call_reserve(struct rpc_task *task); 50 static void call_reserveresult(struct rpc_task *task); 51 static void call_allocate(struct rpc_task *task); 52 static void call_encode(struct rpc_task *task); 53 static void call_decode(struct rpc_task *task); 54 static void call_bind(struct rpc_task *task); 55 static void call_bind_status(struct rpc_task *task); 56 static void call_transmit(struct rpc_task *task); 57 static void call_status(struct rpc_task *task); 58 static 
void call_transmit_status(struct rpc_task *task); 59 static void call_refresh(struct rpc_task *task); 60 static void call_refreshresult(struct rpc_task *task); 61 static void call_timeout(struct rpc_task *task); 62 static void call_connect(struct rpc_task *task); 63 static void call_connect_status(struct rpc_task *task); 64 static __be32 * call_header(struct rpc_task *task); 65 static __be32 * call_verify(struct rpc_task *task); 66 67 68 static int 69 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name) 70 { 71 static uint32_t clntid; 72 int error; 73 74 clnt->cl_vfsmnt = ERR_PTR(-ENOENT); 75 clnt->cl_dentry = ERR_PTR(-ENOENT); 76 if (dir_name == NULL) 77 return 0; 78 79 clnt->cl_vfsmnt = rpc_get_mount(); 80 if (IS_ERR(clnt->cl_vfsmnt)) 81 return PTR_ERR(clnt->cl_vfsmnt); 82 83 for (;;) { 84 snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname), 85 "%s/clnt%x", dir_name, 86 (unsigned int)clntid++); 87 clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0'; 88 clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt); 89 if (!IS_ERR(clnt->cl_dentry)) 90 return 0; 91 error = PTR_ERR(clnt->cl_dentry); 92 if (error != -EEXIST) { 93 printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n", 94 clnt->cl_pathname, error); 95 rpc_put_mount(); 96 return error; 97 } 98 } 99 } 100 101 static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor) 102 { 103 struct rpc_version *version; 104 struct rpc_clnt *clnt = NULL; 105 struct rpc_auth *auth; 106 int err; 107 int len; 108 109 dprintk("RPC: creating %s client for %s (xprt %p)\n", 110 program->name, servname, xprt); 111 112 err = -EINVAL; 113 if (!xprt) 114 goto out_no_xprt; 115 if (vers >= program->nrvers || !(version = program->version[vers])) 116 goto out_err; 117 118 err = -ENOMEM; 119 clnt = kzalloc(sizeof(*clnt), GFP_KERNEL); 120 if (!clnt) 121 goto out_err; 122 atomic_set(&clnt->cl_users, 0); 123 atomic_set(&clnt->cl_count, 1); 
124 clnt->cl_parent = clnt; 125 126 clnt->cl_server = clnt->cl_inline_name; 127 len = strlen(servname) + 1; 128 if (len > sizeof(clnt->cl_inline_name)) { 129 char *buf = kmalloc(len, GFP_KERNEL); 130 if (buf != 0) 131 clnt->cl_server = buf; 132 else 133 len = sizeof(clnt->cl_inline_name); 134 } 135 strlcpy(clnt->cl_server, servname, len); 136 137 clnt->cl_xprt = xprt; 138 clnt->cl_procinfo = version->procs; 139 clnt->cl_maxproc = version->nrprocs; 140 clnt->cl_protname = program->name; 141 clnt->cl_prog = program->number; 142 clnt->cl_vers = version->number; 143 clnt->cl_stats = program->stats; 144 clnt->cl_metrics = rpc_alloc_iostats(clnt); 145 err = -ENOMEM; 146 if (clnt->cl_metrics == NULL) 147 goto out_no_stats; 148 clnt->cl_program = program; 149 150 if (!xprt_bound(clnt->cl_xprt)) 151 clnt->cl_autobind = 1; 152 153 clnt->cl_rtt = &clnt->cl_rtt_default; 154 rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval); 155 156 err = rpc_setup_pipedir(clnt, program->pipe_dir_name); 157 if (err < 0) 158 goto out_no_path; 159 160 auth = rpcauth_create(flavor, clnt); 161 if (IS_ERR(auth)) { 162 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n", 163 flavor); 164 err = PTR_ERR(auth); 165 goto out_no_auth; 166 } 167 168 /* save the nodename */ 169 clnt->cl_nodelen = strlen(utsname()->nodename); 170 if (clnt->cl_nodelen > UNX_MAXNODENAME) 171 clnt->cl_nodelen = UNX_MAXNODENAME; 172 memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen); 173 return clnt; 174 175 out_no_auth: 176 if (!IS_ERR(clnt->cl_dentry)) { 177 rpc_rmdir(clnt->cl_dentry); 178 rpc_put_mount(); 179 } 180 out_no_path: 181 rpc_free_iostats(clnt->cl_metrics); 182 out_no_stats: 183 if (clnt->cl_server != clnt->cl_inline_name) 184 kfree(clnt->cl_server); 185 kfree(clnt); 186 out_err: 187 xprt_put(xprt); 188 out_no_xprt: 189 return ERR_PTR(err); 190 } 191 192 /* 193 * rpc_create - create an RPC client and transport with one call 194 * @args: rpc_clnt create argument structure 195 * 
196 * Creates and initializes an RPC transport and an RPC client. 197 * 198 * It can ping the server in order to determine if it is up, and to see if 199 * it supports this program and version. RPC_CLNT_CREATE_NOPING disables 200 * this behavior so asynchronous tasks can also use rpc_create. 201 */ 202 struct rpc_clnt *rpc_create(struct rpc_create_args *args) 203 { 204 struct rpc_xprt *xprt; 205 struct rpc_clnt *clnt; 206 207 xprt = xprt_create_transport(args->protocol, args->address, 208 args->addrsize, args->timeout); 209 if (IS_ERR(xprt)) 210 return (struct rpc_clnt *)xprt; 211 212 /* 213 * By default, kernel RPC client connects from a reserved port. 214 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters, 215 * but it is always enabled for rpciod, which handles the connect 216 * operation. 217 */ 218 xprt->resvport = 1; 219 if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT) 220 xprt->resvport = 0; 221 222 dprintk("RPC: creating %s client for %s (xprt %p)\n", 223 args->program->name, args->servername, xprt); 224 225 clnt = rpc_new_client(xprt, args->servername, args->program, 226 args->version, args->authflavor); 227 if (IS_ERR(clnt)) 228 return clnt; 229 230 if (!(args->flags & RPC_CLNT_CREATE_NOPING)) { 231 int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR); 232 if (err != 0) { 233 rpc_shutdown_client(clnt); 234 return ERR_PTR(err); 235 } 236 } 237 238 clnt->cl_softrtry = 1; 239 if (args->flags & RPC_CLNT_CREATE_HARDRTRY) 240 clnt->cl_softrtry = 0; 241 242 if (args->flags & RPC_CLNT_CREATE_INTR) 243 clnt->cl_intr = 1; 244 if (args->flags & RPC_CLNT_CREATE_AUTOBIND) 245 clnt->cl_autobind = 1; 246 if (args->flags & RPC_CLNT_CREATE_ONESHOT) 247 clnt->cl_oneshot = 1; 248 249 return clnt; 250 } 251 EXPORT_SYMBOL_GPL(rpc_create); 252 253 /* 254 * This function clones the RPC client structure. It allows us to share the 255 * same transport while varying parameters such as the authentication 256 * flavour. 
257 */ 258 struct rpc_clnt * 259 rpc_clone_client(struct rpc_clnt *clnt) 260 { 261 struct rpc_clnt *new; 262 int err = -ENOMEM; 263 264 new = kmemdup(clnt, sizeof(*new), GFP_KERNEL); 265 if (!new) 266 goto out_no_clnt; 267 atomic_set(&new->cl_count, 1); 268 atomic_set(&new->cl_users, 0); 269 new->cl_metrics = rpc_alloc_iostats(clnt); 270 if (new->cl_metrics == NULL) 271 goto out_no_stats; 272 err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name); 273 if (err != 0) 274 goto out_no_path; 275 new->cl_parent = clnt; 276 atomic_inc(&clnt->cl_count); 277 new->cl_xprt = xprt_get(clnt->cl_xprt); 278 /* Turn off autobind on clones */ 279 new->cl_autobind = 0; 280 new->cl_oneshot = 0; 281 new->cl_dead = 0; 282 rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval); 283 if (new->cl_auth) 284 atomic_inc(&new->cl_auth->au_count); 285 return new; 286 out_no_path: 287 rpc_free_iostats(new->cl_metrics); 288 out_no_stats: 289 kfree(new); 290 out_no_clnt: 291 dprintk("RPC: %s returned error %d\n", __FUNCTION__, err); 292 return ERR_PTR(err); 293 } 294 295 /* 296 * Properly shut down an RPC client, terminating all outstanding 297 * requests. Note that we must be certain that cl_oneshot and 298 * cl_dead are cleared, or else the client would be destroyed 299 * when the last task releases it. 
300 */ 301 int 302 rpc_shutdown_client(struct rpc_clnt *clnt) 303 { 304 dprintk("RPC: shutting down %s client for %s, tasks=%d\n", 305 clnt->cl_protname, clnt->cl_server, 306 atomic_read(&clnt->cl_users)); 307 308 while (atomic_read(&clnt->cl_users) > 0) { 309 /* Don't let rpc_release_client destroy us */ 310 clnt->cl_oneshot = 0; 311 clnt->cl_dead = 0; 312 rpc_killall_tasks(clnt); 313 wait_event_timeout(destroy_wait, 314 !atomic_read(&clnt->cl_users), 1*HZ); 315 } 316 317 if (atomic_read(&clnt->cl_users) < 0) { 318 printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n", 319 clnt, atomic_read(&clnt->cl_users)); 320 #ifdef RPC_DEBUG 321 rpc_show_tasks(); 322 #endif 323 BUG(); 324 } 325 326 return rpc_destroy_client(clnt); 327 } 328 329 /* 330 * Delete an RPC client 331 */ 332 int 333 rpc_destroy_client(struct rpc_clnt *clnt) 334 { 335 if (!atomic_dec_and_test(&clnt->cl_count)) 336 return 1; 337 BUG_ON(atomic_read(&clnt->cl_users) != 0); 338 339 dprintk("RPC: destroying %s client for %s\n", 340 clnt->cl_protname, clnt->cl_server); 341 if (clnt->cl_auth) { 342 rpcauth_destroy(clnt->cl_auth); 343 clnt->cl_auth = NULL; 344 } 345 if (!IS_ERR(clnt->cl_dentry)) { 346 rpc_rmdir(clnt->cl_dentry); 347 rpc_put_mount(); 348 } 349 if (clnt->cl_parent != clnt) { 350 rpc_destroy_client(clnt->cl_parent); 351 goto out_free; 352 } 353 if (clnt->cl_server != clnt->cl_inline_name) 354 kfree(clnt->cl_server); 355 out_free: 356 rpc_free_iostats(clnt->cl_metrics); 357 clnt->cl_metrics = NULL; 358 xprt_put(clnt->cl_xprt); 359 kfree(clnt); 360 return 0; 361 } 362 363 /* 364 * Release an RPC client 365 */ 366 void 367 rpc_release_client(struct rpc_clnt *clnt) 368 { 369 dprintk("RPC: rpc_release_client(%p, %d)\n", 370 clnt, atomic_read(&clnt->cl_users)); 371 372 if (!atomic_dec_and_test(&clnt->cl_users)) 373 return; 374 wake_up(&destroy_wait); 375 if (clnt->cl_oneshot || clnt->cl_dead) 376 rpc_destroy_client(clnt); 377 } 378 379 /** 380 * rpc_bind_new_program - bind a new RPC program 
to an existing client 381 * @old - old rpc_client 382 * @program - rpc program to set 383 * @vers - rpc program version 384 * 385 * Clones the rpc client and sets up a new RPC program. This is mainly 386 * of use for enabling different RPC programs to share the same transport. 387 * The Sun NFSv2/v3 ACL protocol can do this. 388 */ 389 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old, 390 struct rpc_program *program, 391 int vers) 392 { 393 struct rpc_clnt *clnt; 394 struct rpc_version *version; 395 int err; 396 397 BUG_ON(vers >= program->nrvers || !program->version[vers]); 398 version = program->version[vers]; 399 clnt = rpc_clone_client(old); 400 if (IS_ERR(clnt)) 401 goto out; 402 clnt->cl_procinfo = version->procs; 403 clnt->cl_maxproc = version->nrprocs; 404 clnt->cl_protname = program->name; 405 clnt->cl_prog = program->number; 406 clnt->cl_vers = version->number; 407 clnt->cl_stats = program->stats; 408 err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR); 409 if (err != 0) { 410 rpc_shutdown_client(clnt); 411 clnt = ERR_PTR(err); 412 } 413 out: 414 return clnt; 415 } 416 417 /* 418 * Default callback for async RPC calls 419 */ 420 static void 421 rpc_default_callback(struct rpc_task *task, void *data) 422 { 423 } 424 425 static const struct rpc_call_ops rpc_default_ops = { 426 .rpc_call_done = rpc_default_callback, 427 }; 428 429 /* 430 * Export the signal mask handling for synchronous code that 431 * sleeps on RPC calls 432 */ 433 #define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM)) 434 435 static void rpc_save_sigmask(sigset_t *oldset, int intr) 436 { 437 unsigned long sigallow = sigmask(SIGKILL); 438 sigset_t sigmask; 439 440 /* Block all signals except those listed in sigallow */ 441 if (intr) 442 sigallow |= RPC_INTR_SIGNALS; 443 siginitsetinv(&sigmask, sigallow); 444 sigprocmask(SIG_BLOCK, &sigmask, oldset); 445 } 446 447 static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t 
*oldset) 448 { 449 rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task)); 450 } 451 452 static inline void rpc_restore_sigmask(sigset_t *oldset) 453 { 454 sigprocmask(SIG_SETMASK, oldset, NULL); 455 } 456 457 void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset) 458 { 459 rpc_save_sigmask(oldset, clnt->cl_intr); 460 } 461 462 void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset) 463 { 464 rpc_restore_sigmask(oldset); 465 } 466 467 /* 468 * New rpc_call implementation 469 */ 470 int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags) 471 { 472 struct rpc_task *task; 473 sigset_t oldset; 474 int status; 475 476 /* If this client is slain all further I/O fails */ 477 if (clnt->cl_dead) 478 return -EIO; 479 480 BUG_ON(flags & RPC_TASK_ASYNC); 481 482 task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL); 483 if (task == NULL) 484 return -ENOMEM; 485 486 /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */ 487 rpc_task_sigmask(task, &oldset); 488 489 rpc_call_setup(task, msg, 0); 490 491 /* Set up the call info struct and execute the task */ 492 status = task->tk_status; 493 if (status != 0) { 494 rpc_release_task(task); 495 goto out; 496 } 497 atomic_inc(&task->tk_count); 498 status = rpc_execute(task); 499 if (status == 0) 500 status = task->tk_status; 501 rpc_put_task(task); 502 out: 503 rpc_restore_sigmask(&oldset); 504 return status; 505 } 506 507 /* 508 * New rpc_call implementation 509 */ 510 int 511 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags, 512 const struct rpc_call_ops *tk_ops, void *data) 513 { 514 struct rpc_task *task; 515 sigset_t oldset; 516 int status; 517 518 /* If this client is slain all further I/O fails */ 519 status = -EIO; 520 if (clnt->cl_dead) 521 goto out_release; 522 523 flags |= RPC_TASK_ASYNC; 524 525 /* Create/initialize a new RPC task */ 526 status = -ENOMEM; 527 if (!(task = rpc_new_task(clnt, flags, tk_ops, data))) 528 goto out_release; 529 530 /* Mask 
signals on GSS_AUTH upcalls */ 531 rpc_task_sigmask(task, &oldset); 532 533 rpc_call_setup(task, msg, 0); 534 535 /* Set up the call info struct and execute the task */ 536 status = task->tk_status; 537 if (status == 0) 538 rpc_execute(task); 539 else 540 rpc_release_task(task); 541 542 rpc_restore_sigmask(&oldset); 543 return status; 544 out_release: 545 rpc_release_calldata(tk_ops, data); 546 return status; 547 } 548 549 550 void 551 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags) 552 { 553 task->tk_msg = *msg; 554 task->tk_flags |= flags; 555 /* Bind the user cred */ 556 if (task->tk_msg.rpc_cred != NULL) 557 rpcauth_holdcred(task); 558 else 559 rpcauth_bindcred(task); 560 561 if (task->tk_status == 0) 562 task->tk_action = call_start; 563 else 564 task->tk_action = rpc_exit_task; 565 } 566 567 /** 568 * rpc_peeraddr - extract remote peer address from clnt's xprt 569 * @clnt: RPC client structure 570 * @buf: target buffer 571 * @size: length of target buffer 572 * 573 * Returns the number of bytes that are actually in the stored address. 
574 */ 575 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize) 576 { 577 size_t bytes; 578 struct rpc_xprt *xprt = clnt->cl_xprt; 579 580 bytes = sizeof(xprt->addr); 581 if (bytes > bufsize) 582 bytes = bufsize; 583 memcpy(buf, &clnt->cl_xprt->addr, bytes); 584 return xprt->addrlen; 585 } 586 EXPORT_SYMBOL_GPL(rpc_peeraddr); 587 588 /** 589 * rpc_peeraddr2str - return remote peer address in printable format 590 * @clnt: RPC client structure 591 * @format: address format 592 * 593 */ 594 char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format) 595 { 596 struct rpc_xprt *xprt = clnt->cl_xprt; 597 598 if (xprt->address_strings[format] != NULL) 599 return xprt->address_strings[format]; 600 else 601 return "unprintable"; 602 } 603 EXPORT_SYMBOL_GPL(rpc_peeraddr2str); 604 605 void 606 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize) 607 { 608 struct rpc_xprt *xprt = clnt->cl_xprt; 609 if (xprt->ops->set_buffer_size) 610 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize); 611 } 612 613 /* 614 * Return size of largest payload RPC client can support, in bytes 615 * 616 * For stream transports, this is one RPC record fragment (see RFC 617 * 1831), as we don't support multi-record requests yet. For datagram 618 * transports, this is the size of an IP packet minus the IP, UDP, and 619 * RPC header sizes. 620 */ 621 size_t rpc_max_payload(struct rpc_clnt *clnt) 622 { 623 return clnt->cl_xprt->max_payload; 624 } 625 EXPORT_SYMBOL_GPL(rpc_max_payload); 626 627 /** 628 * rpc_force_rebind - force transport to check that remote port is unchanged 629 * @clnt: client to rebind 630 * 631 */ 632 void rpc_force_rebind(struct rpc_clnt *clnt) 633 { 634 if (clnt->cl_autobind) 635 xprt_clear_bound(clnt->cl_xprt); 636 } 637 EXPORT_SYMBOL_GPL(rpc_force_rebind); 638 639 /* 640 * Restart an (async) RPC call. Usually called from within the 641 * exit handler. 
642 */ 643 void 644 rpc_restart_call(struct rpc_task *task) 645 { 646 if (RPC_ASSASSINATED(task)) 647 return; 648 649 task->tk_action = call_start; 650 } 651 652 /* 653 * 0. Initial state 654 * 655 * Other FSM states can be visited zero or more times, but 656 * this state is visited exactly once for each RPC. 657 */ 658 static void 659 call_start(struct rpc_task *task) 660 { 661 struct rpc_clnt *clnt = task->tk_client; 662 663 dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid, 664 clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc, 665 (RPC_IS_ASYNC(task) ? "async" : "sync")); 666 667 /* Increment call count */ 668 task->tk_msg.rpc_proc->p_count++; 669 clnt->cl_stats->rpccnt++; 670 task->tk_action = call_reserve; 671 } 672 673 /* 674 * 1. Reserve an RPC call slot 675 */ 676 static void 677 call_reserve(struct rpc_task *task) 678 { 679 dprintk("RPC: %4d call_reserve\n", task->tk_pid); 680 681 if (!rpcauth_uptodatecred(task)) { 682 task->tk_action = call_refresh; 683 return; 684 } 685 686 task->tk_status = 0; 687 task->tk_action = call_reserveresult; 688 xprt_reserve(task); 689 } 690 691 /* 692 * 1b. Grok the result of xprt_reserve() 693 */ 694 static void 695 call_reserveresult(struct rpc_task *task) 696 { 697 int status = task->tk_status; 698 699 dprintk("RPC: %4d call_reserveresult (status %d)\n", 700 task->tk_pid, task->tk_status); 701 702 /* 703 * After a call to xprt_reserve(), we must have either 704 * a request slot or else an error status. 705 */ 706 task->tk_status = 0; 707 if (status >= 0) { 708 if (task->tk_rqstp) { 709 task->tk_action = call_allocate; 710 return; 711 } 712 713 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n", 714 __FUNCTION__, status); 715 rpc_exit(task, -EIO); 716 return; 717 } 718 719 /* 720 * Even though there was an error, we may have acquired 721 * a request slot somehow. Make sure not to leak it. 
722 */ 723 if (task->tk_rqstp) { 724 printk(KERN_ERR "%s: status=%d, request allocated anyway\n", 725 __FUNCTION__, status); 726 xprt_release(task); 727 } 728 729 switch (status) { 730 case -EAGAIN: /* woken up; retry */ 731 task->tk_action = call_reserve; 732 return; 733 case -EIO: /* probably a shutdown */ 734 break; 735 default: 736 printk(KERN_ERR "%s: unrecognized error %d, exiting\n", 737 __FUNCTION__, status); 738 break; 739 } 740 rpc_exit(task, status); 741 } 742 743 /* 744 * 2. Allocate the buffer. For details, see sched.c:rpc_malloc. 745 * (Note: buffer memory is freed in xprt_release). 746 */ 747 static void 748 call_allocate(struct rpc_task *task) 749 { 750 struct rpc_rqst *req = task->tk_rqstp; 751 struct rpc_xprt *xprt = task->tk_xprt; 752 unsigned int bufsiz; 753 754 dprintk("RPC: %4d call_allocate (status %d)\n", 755 task->tk_pid, task->tk_status); 756 task->tk_action = call_bind; 757 if (req->rq_buffer) 758 return; 759 760 /* FIXME: compute buffer requirements more exactly using 761 * auth->au_wslack */ 762 bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE; 763 764 if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL) 765 return; 766 printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); 767 768 if (RPC_IS_ASYNC(task) || !signalled()) { 769 xprt_release(task); 770 task->tk_action = call_reserve; 771 rpc_delay(task, HZ>>4); 772 return; 773 } 774 775 rpc_exit(task, -ERESTARTSYS); 776 } 777 778 static inline int 779 rpc_task_need_encode(struct rpc_task *task) 780 { 781 return task->tk_rqstp->rq_snd_buf.len == 0; 782 } 783 784 static inline void 785 rpc_task_force_reencode(struct rpc_task *task) 786 { 787 task->tk_rqstp->rq_snd_buf.len = 0; 788 } 789 790 /* 791 * 3. 
Encode arguments of an RPC call 792 */ 793 static void 794 call_encode(struct rpc_task *task) 795 { 796 struct rpc_rqst *req = task->tk_rqstp; 797 struct xdr_buf *sndbuf = &req->rq_snd_buf; 798 struct xdr_buf *rcvbuf = &req->rq_rcv_buf; 799 unsigned int bufsiz; 800 kxdrproc_t encode; 801 __be32 *p; 802 803 dprintk("RPC: %4d call_encode (status %d)\n", 804 task->tk_pid, task->tk_status); 805 806 /* Default buffer setup */ 807 bufsiz = req->rq_bufsize >> 1; 808 sndbuf->head[0].iov_base = (void *)req->rq_buffer; 809 sndbuf->head[0].iov_len = bufsiz; 810 sndbuf->tail[0].iov_len = 0; 811 sndbuf->page_len = 0; 812 sndbuf->len = 0; 813 sndbuf->buflen = bufsiz; 814 rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz); 815 rcvbuf->head[0].iov_len = bufsiz; 816 rcvbuf->tail[0].iov_len = 0; 817 rcvbuf->page_len = 0; 818 rcvbuf->len = 0; 819 rcvbuf->buflen = bufsiz; 820 821 /* Encode header and provided arguments */ 822 encode = task->tk_msg.rpc_proc->p_encode; 823 if (!(p = call_header(task))) { 824 printk(KERN_INFO "RPC: call_header failed, exit EIO\n"); 825 rpc_exit(task, -EIO); 826 return; 827 } 828 if (encode == NULL) 829 return; 830 831 lock_kernel(); 832 task->tk_status = rpcauth_wrap_req(task, encode, req, p, 833 task->tk_msg.rpc_argp); 834 unlock_kernel(); 835 if (task->tk_status == -ENOMEM) { 836 /* XXX: Is this sane? */ 837 rpc_delay(task, 3*HZ); 838 task->tk_status = -EAGAIN; 839 } 840 } 841 842 /* 843 * 4. Get the server port number if not yet set 844 */ 845 static void 846 call_bind(struct rpc_task *task) 847 { 848 struct rpc_xprt *xprt = task->tk_xprt; 849 850 dprintk("RPC: %4d call_bind (status %d)\n", 851 task->tk_pid, task->tk_status); 852 853 task->tk_action = call_connect; 854 if (!xprt_bound(xprt)) { 855 task->tk_action = call_bind_status; 856 task->tk_timeout = xprt->bind_timeout; 857 xprt->ops->rpcbind(task); 858 } 859 } 860 861 /* 862 * 4a. 
Sort out bind result 863 */ 864 static void 865 call_bind_status(struct rpc_task *task) 866 { 867 int status = -EACCES; 868 869 if (task->tk_status >= 0) { 870 dprintk("RPC: %4d call_bind_status (status %d)\n", 871 task->tk_pid, task->tk_status); 872 task->tk_status = 0; 873 task->tk_action = call_connect; 874 return; 875 } 876 877 switch (task->tk_status) { 878 case -EACCES: 879 dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n", 880 task->tk_pid); 881 rpc_delay(task, 3*HZ); 882 goto retry_timeout; 883 case -ETIMEDOUT: 884 dprintk("RPC: %4d rpcbind request timed out\n", 885 task->tk_pid); 886 goto retry_timeout; 887 case -EPFNOSUPPORT: 888 dprintk("RPC: %4d remote rpcbind service unavailable\n", 889 task->tk_pid); 890 break; 891 case -EPROTONOSUPPORT: 892 dprintk("RPC: %4d remote rpcbind version 2 unavailable\n", 893 task->tk_pid); 894 break; 895 default: 896 dprintk("RPC: %4d unrecognized rpcbind error (%d)\n", 897 task->tk_pid, -task->tk_status); 898 status = -EIO; 899 } 900 901 rpc_exit(task, status); 902 return; 903 904 retry_timeout: 905 task->tk_action = call_timeout; 906 } 907 908 /* 909 * 4b. Connect to the RPC server 910 */ 911 static void 912 call_connect(struct rpc_task *task) 913 { 914 struct rpc_xprt *xprt = task->tk_xprt; 915 916 dprintk("RPC: %4d call_connect xprt %p %s connected\n", 917 task->tk_pid, xprt, 918 (xprt_connected(xprt) ? "is" : "is not")); 919 920 task->tk_action = call_transmit; 921 if (!xprt_connected(xprt)) { 922 task->tk_action = call_connect_status; 923 if (task->tk_status < 0) 924 return; 925 xprt_connect(task); 926 } 927 } 928 929 /* 930 * 4c. 
Sort out connect result 931 */ 932 static void 933 call_connect_status(struct rpc_task *task) 934 { 935 struct rpc_clnt *clnt = task->tk_client; 936 int status = task->tk_status; 937 938 dprintk("RPC: %5u call_connect_status (status %d)\n", 939 task->tk_pid, task->tk_status); 940 941 task->tk_status = 0; 942 if (status >= 0) { 943 clnt->cl_stats->netreconn++; 944 task->tk_action = call_transmit; 945 return; 946 } 947 948 /* Something failed: remote service port may have changed */ 949 rpc_force_rebind(clnt); 950 951 switch (status) { 952 case -ENOTCONN: 953 case -EAGAIN: 954 task->tk_action = call_bind; 955 if (!RPC_IS_SOFT(task)) 956 return; 957 /* if soft mounted, test if we've timed out */ 958 case -ETIMEDOUT: 959 task->tk_action = call_timeout; 960 return; 961 } 962 rpc_exit(task, -EIO); 963 } 964 965 /* 966 * 5. Transmit the RPC request, and wait for reply 967 */ 968 static void 969 call_transmit(struct rpc_task *task) 970 { 971 dprintk("RPC: %4d call_transmit (status %d)\n", 972 task->tk_pid, task->tk_status); 973 974 task->tk_action = call_status; 975 if (task->tk_status < 0) 976 return; 977 task->tk_status = xprt_prepare_transmit(task); 978 if (task->tk_status != 0) 979 return; 980 task->tk_action = call_transmit_status; 981 /* Encode here so that rpcsec_gss can use correct sequence number. */ 982 if (rpc_task_need_encode(task)) { 983 BUG_ON(task->tk_rqstp->rq_bytes_sent != 0); 984 call_encode(task); 985 /* Did the encode result in an error condition? */ 986 if (task->tk_status != 0) 987 return; 988 } 989 xprt_transmit(task); 990 if (task->tk_status < 0) 991 return; 992 /* 993 * On success, ensure that we call xprt_end_transmit() before sleeping 994 * in order to allow access to the socket to other RPC requests. 995 */ 996 call_transmit_status(task); 997 if (task->tk_msg.rpc_proc->p_decode != NULL) 998 return; 999 task->tk_action = rpc_exit_task; 1000 rpc_wake_up_task(task); 1001 } 1002 1003 /* 1004 * 5a. 
Handle cleanup after a transmission
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;
	/*
	 * Special case: if we've been waiting on the socket's write_space()
	 * callback, then don't call xprt_end_transmit().
	 */
	if (task->tk_status == -EAGAIN)
		return;
	xprt_end_transmit(task);
	/* Empty the send buffer so a retransmit encodes the request afresh */
	rpc_task_force_reencode(task);
}

/*
 * 6.	Sort out the RPC call status
 *
 *	A non-negative status moves on to call_decode.  Errors are mapped
 *	to the next FSM step: timeout handling, rebind, retransmit, or
 *	exit with the error.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	int status;

	/* A received reply takes precedence once nothing is left unsent */
	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprintk("RPC: %4d call_status (status %d)\n",
		task->tk_pid, task->tk_status);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch (status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		/*
		 * Explicit log level, like every other printk in this file;
		 * previously this message relied on the default level.
		 */
		printk(KERN_WARNING "%s: RPC call returned error %d\n",
		       clnt->cl_protname, -status);
		rpc_exit(task, status);
		break;
	}
}

/*
 * 6a.	Handle RPC timeout
 * 	We do not release the request slot, so we keep using the
 *	same XID for all retransmits.
1078 */ 1079 static void 1080 call_timeout(struct rpc_task *task) 1081 { 1082 struct rpc_clnt *clnt = task->tk_client; 1083 1084 if (xprt_adjust_timeout(task->tk_rqstp) == 0) { 1085 dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid); 1086 goto retry; 1087 } 1088 1089 dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid); 1090 task->tk_timeouts++; 1091 1092 if (RPC_IS_SOFT(task)) { 1093 printk(KERN_NOTICE "%s: server %s not responding, timed out\n", 1094 clnt->cl_protname, clnt->cl_server); 1095 rpc_exit(task, -EIO); 1096 return; 1097 } 1098 1099 if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) { 1100 task->tk_flags |= RPC_CALL_MAJORSEEN; 1101 printk(KERN_NOTICE "%s: server %s not responding, still trying\n", 1102 clnt->cl_protname, clnt->cl_server); 1103 } 1104 rpc_force_rebind(clnt); 1105 1106 retry: 1107 clnt->cl_stats->rpcretrans++; 1108 task->tk_action = call_bind; 1109 task->tk_status = 0; 1110 } 1111 1112 /* 1113 * 7. Decode the RPC reply 1114 */ 1115 static void 1116 call_decode(struct rpc_task *task) 1117 { 1118 struct rpc_clnt *clnt = task->tk_client; 1119 struct rpc_rqst *req = task->tk_rqstp; 1120 kxdrproc_t decode = task->tk_msg.rpc_proc->p_decode; 1121 __be32 *p; 1122 1123 dprintk("RPC: %4d call_decode (status %d)\n", 1124 task->tk_pid, task->tk_status); 1125 1126 if (task->tk_flags & RPC_CALL_MAJORSEEN) { 1127 printk(KERN_NOTICE "%s: server %s OK\n", 1128 clnt->cl_protname, clnt->cl_server); 1129 task->tk_flags &= ~RPC_CALL_MAJORSEEN; 1130 } 1131 1132 if (task->tk_status < 12) { 1133 if (!RPC_IS_SOFT(task)) { 1134 task->tk_action = call_bind; 1135 clnt->cl_stats->rpcretrans++; 1136 goto out_retry; 1137 } 1138 dprintk("%s: too small RPC reply size (%d bytes)\n", 1139 clnt->cl_protname, task->tk_status); 1140 task->tk_action = call_timeout; 1141 goto out_retry; 1142 } 1143 1144 /* 1145 * Ensure that we see all writes made by xprt_complete_rqst() 1146 * before it changed req->rq_received. 
1147 */ 1148 smp_rmb(); 1149 req->rq_rcv_buf.len = req->rq_private_buf.len; 1150 1151 /* Check that the softirq receive buffer is valid */ 1152 WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf, 1153 sizeof(req->rq_rcv_buf)) != 0); 1154 1155 /* Verify the RPC header */ 1156 p = call_verify(task); 1157 if (IS_ERR(p)) { 1158 if (p == ERR_PTR(-EAGAIN)) 1159 goto out_retry; 1160 return; 1161 } 1162 1163 task->tk_action = rpc_exit_task; 1164 1165 if (decode) { 1166 lock_kernel(); 1167 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p, 1168 task->tk_msg.rpc_resp); 1169 unlock_kernel(); 1170 } 1171 dprintk("RPC: %4d call_decode result %d\n", task->tk_pid, 1172 task->tk_status); 1173 return; 1174 out_retry: 1175 req->rq_received = req->rq_private_buf.len = 0; 1176 task->tk_status = 0; 1177 } 1178 1179 /* 1180 * 8. Refresh the credentials if rejected by the server 1181 */ 1182 static void 1183 call_refresh(struct rpc_task *task) 1184 { 1185 dprintk("RPC: %4d call_refresh\n", task->tk_pid); 1186 1187 xprt_release(task); /* Must do to obtain new XID */ 1188 task->tk_action = call_refreshresult; 1189 task->tk_status = 0; 1190 task->tk_client->cl_stats->rpcauthrefresh++; 1191 rpcauth_refreshcred(task); 1192 } 1193 1194 /* 1195 * 8a. 
Process the results of a credential refresh 1196 */ 1197 static void 1198 call_refreshresult(struct rpc_task *task) 1199 { 1200 int status = task->tk_status; 1201 dprintk("RPC: %4d call_refreshresult (status %d)\n", 1202 task->tk_pid, task->tk_status); 1203 1204 task->tk_status = 0; 1205 task->tk_action = call_reserve; 1206 if (status >= 0 && rpcauth_uptodatecred(task)) 1207 return; 1208 if (status == -EACCES) { 1209 rpc_exit(task, -EACCES); 1210 return; 1211 } 1212 task->tk_action = call_refresh; 1213 if (status != -ETIMEDOUT) 1214 rpc_delay(task, 3*HZ); 1215 return; 1216 } 1217 1218 /* 1219 * Call header serialization 1220 */ 1221 static __be32 * 1222 call_header(struct rpc_task *task) 1223 { 1224 struct rpc_clnt *clnt = task->tk_client; 1225 struct rpc_rqst *req = task->tk_rqstp; 1226 __be32 *p = req->rq_svec[0].iov_base; 1227 1228 /* FIXME: check buffer size? */ 1229 1230 p = xprt_skip_transport_header(task->tk_xprt, p); 1231 *p++ = req->rq_xid; /* XID */ 1232 *p++ = htonl(RPC_CALL); /* CALL */ 1233 *p++ = htonl(RPC_VERSION); /* RPC version */ 1234 *p++ = htonl(clnt->cl_prog); /* program number */ 1235 *p++ = htonl(clnt->cl_vers); /* program version */ 1236 *p++ = htonl(task->tk_msg.rpc_proc->p_proc); /* procedure */ 1237 p = rpcauth_marshcred(task, p); 1238 req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p); 1239 return p; 1240 } 1241 1242 /* 1243 * Reply header verification 1244 */ 1245 static __be32 * 1246 call_verify(struct rpc_task *task) 1247 { 1248 struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0]; 1249 int len = task->tk_rqstp->rq_rcv_buf.len >> 2; 1250 __be32 *p = iov->iov_base; 1251 u32 n; 1252 int error = -EACCES; 1253 1254 if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) { 1255 /* RFC-1014 says that the representation of XDR data must be a 1256 * multiple of four bytes 1257 * - if it isn't pointer subtraction in the NFS client may give 1258 * undefined results 1259 */ 1260 printk(KERN_WARNING 1261 "call_verify: XDR representation not a multiple 
of" 1262 " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len); 1263 goto out_eio; 1264 } 1265 if ((len -= 3) < 0) 1266 goto out_overflow; 1267 p += 1; /* skip XID */ 1268 1269 if ((n = ntohl(*p++)) != RPC_REPLY) { 1270 printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n); 1271 goto out_garbage; 1272 } 1273 if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) { 1274 if (--len < 0) 1275 goto out_overflow; 1276 switch ((n = ntohl(*p++))) { 1277 case RPC_AUTH_ERROR: 1278 break; 1279 case RPC_MISMATCH: 1280 dprintk("%s: RPC call version mismatch!\n", __FUNCTION__); 1281 error = -EPROTONOSUPPORT; 1282 goto out_err; 1283 default: 1284 dprintk("%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n); 1285 goto out_eio; 1286 } 1287 if (--len < 0) 1288 goto out_overflow; 1289 switch ((n = ntohl(*p++))) { 1290 case RPC_AUTH_REJECTEDCRED: 1291 case RPC_AUTH_REJECTEDVERF: 1292 case RPCSEC_GSS_CREDPROBLEM: 1293 case RPCSEC_GSS_CTXPROBLEM: 1294 if (!task->tk_cred_retry) 1295 break; 1296 task->tk_cred_retry--; 1297 dprintk("RPC: %4d call_verify: retry stale creds\n", 1298 task->tk_pid); 1299 rpcauth_invalcred(task); 1300 task->tk_action = call_refresh; 1301 goto out_retry; 1302 case RPC_AUTH_BADCRED: 1303 case RPC_AUTH_BADVERF: 1304 /* possibly garbled cred/verf? 
*/ 1305 if (!task->tk_garb_retry) 1306 break; 1307 task->tk_garb_retry--; 1308 dprintk("RPC: %4d call_verify: retry garbled creds\n", 1309 task->tk_pid); 1310 task->tk_action = call_bind; 1311 goto out_retry; 1312 case RPC_AUTH_TOOWEAK: 1313 printk(KERN_NOTICE "call_verify: server %s requires stronger " 1314 "authentication.\n", task->tk_client->cl_server); 1315 break; 1316 default: 1317 printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n); 1318 error = -EIO; 1319 } 1320 dprintk("RPC: %4d call_verify: call rejected %d\n", 1321 task->tk_pid, n); 1322 goto out_err; 1323 } 1324 if (!(p = rpcauth_checkverf(task, p))) { 1325 printk(KERN_WARNING "call_verify: auth check failed\n"); 1326 goto out_garbage; /* bad verifier, retry */ 1327 } 1328 len = p - (__be32 *)iov->iov_base - 1; 1329 if (len < 0) 1330 goto out_overflow; 1331 switch ((n = ntohl(*p++))) { 1332 case RPC_SUCCESS: 1333 return p; 1334 case RPC_PROG_UNAVAIL: 1335 dprintk("RPC: call_verify: program %u is unsupported by server %s\n", 1336 (unsigned int)task->tk_client->cl_prog, 1337 task->tk_client->cl_server); 1338 error = -EPFNOSUPPORT; 1339 goto out_err; 1340 case RPC_PROG_MISMATCH: 1341 dprintk("RPC: call_verify: program %u, version %u unsupported by server %s\n", 1342 (unsigned int)task->tk_client->cl_prog, 1343 (unsigned int)task->tk_client->cl_vers, 1344 task->tk_client->cl_server); 1345 error = -EPROTONOSUPPORT; 1346 goto out_err; 1347 case RPC_PROC_UNAVAIL: 1348 dprintk("RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n", 1349 task->tk_msg.rpc_proc, 1350 task->tk_client->cl_prog, 1351 task->tk_client->cl_vers, 1352 task->tk_client->cl_server); 1353 error = -EOPNOTSUPP; 1354 goto out_err; 1355 case RPC_GARBAGE_ARGS: 1356 dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__); 1357 break; /* retry */ 1358 default: 1359 printk(KERN_WARNING "call_verify: server accept status: %x\n", n); 1360 /* Also retry */ 1361 } 1362 1363 out_garbage: 1364 
task->tk_client->cl_stats->rpcgarbage++; 1365 if (task->tk_garb_retry) { 1366 task->tk_garb_retry--; 1367 dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid); 1368 task->tk_action = call_bind; 1369 out_retry: 1370 return ERR_PTR(-EAGAIN); 1371 } 1372 printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__); 1373 out_eio: 1374 error = -EIO; 1375 out_err: 1376 rpc_exit(task, error); 1377 return ERR_PTR(error); 1378 out_overflow: 1379 printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__); 1380 goto out_garbage; 1381 } 1382 1383 static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj) 1384 { 1385 return 0; 1386 } 1387 1388 static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj) 1389 { 1390 return 0; 1391 } 1392 1393 static struct rpc_procinfo rpcproc_null = { 1394 .p_encode = rpcproc_encode_null, 1395 .p_decode = rpcproc_decode_null, 1396 }; 1397 1398 int rpc_ping(struct rpc_clnt *clnt, int flags) 1399 { 1400 struct rpc_message msg = { 1401 .rpc_proc = &rpcproc_null, 1402 }; 1403 int err; 1404 msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0); 1405 err = rpc_call_sync(clnt, &msg, flags); 1406 put_rpccred(msg.rpc_cred); 1407 return err; 1408 } 1409