/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.26 1997/03/23 03:36:24 bde Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the buffer is fully mapped and wired into the
 * kernel, and the receiving process can copy it directly from the pages
 * in the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */
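
/*
 * Illustration (not part of the kernel build): from userland, the mode
 * is selected purely by the size of each write(2).  Assuming, for the
 * sake of the example, that PIPE_MINDIRECT is 8K (the real value lives
 * in sys/pipe.h):
 *
 *	char small[512], big[8192];
 *
 *	write(fd, small, sizeof small);	(buffered through the kernel)
 *	write(fd, big, sizeof big);	(eligible for the direct path)
 *
 * The writer needs no special setup; the direct path is an internal
 * optimization and is transparent to both ends of the pipe.
 */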

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}
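
/*
 * Usage from userland (illustrative only, not part of this file):
 *
 *	int fds[2];
 *	char buf[2];
 *
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	write(fds[1], "hi", 2);
 *	read(fds[0], buf, 2);
 *
 * Note that both file entries are marked FREAD | FWRITE above, so these
 * pipes are actually usable in both directions; portable code should
 * nevertheless treat fds[0] as the read end and fds[1] as the write end,
 * which is all that pipe(2) promises.
 */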

/*
 * Allocate kva for pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	gettime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
	cpipe->pipe_pgid = NO_PID;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
		    catch ? (PRIBIO | PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	struct proc *p;

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if (cpipe->pipe_state & PIPE_ASYNC) {
		if (cpipe->pipe_pgid < 0)
			gsignal(-cpipe->pipe_pgid, SIGIO);
		else if ((p = pfind(cpipe->pipe_pgid)) != NULL)
			psignal(p, SIGIO);
	}
}
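
/*
 * Locking discipline (descriptive note): PIPE_LOCK is a sleep lock that
 * serializes only the data-copy phases.  Code that may sleep inside
 * uiomove() while pipe_buffer or pipe_map is in flux brackets the copy
 * like this:
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		error = uiomove(..., size, uio);
 *		pipeunlock(cpipe);
 *	}
 *
 * The state flags themselves (PIPE_WANTR, PIPE_WANTW, ...) are toggled
 * without spl protection, relying on the traditional non-preemptive BSD
 * kernel model; nothing here is touched from interrupt context.
 */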

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;

			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO | PCATCH, "piperd", 0)) != 0)
				break;
		}
	}

	if (error == 0)
		gettime(&rpipe->pipe_atime);

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if (rpipe->pipe_buffer.cnt == 0) {
			if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
				pipeunlock(rpipe);
			}
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}
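
/*
 * A note on the non-blocking path above (illustrative, userland view):
 * with O_NONBLOCK set on the descriptor, an empty pipe yields EAGAIN
 * rather than sleeping in "piperd":
 *
 *	fcntl(fd, F_SETFL, O_NONBLOCK);
 *	n = read(fd, buf, sizeof buf);
 *	if (n == -1 && errno == EAGAIN)
 *		... nothing buffered yet, try again later ...
 *
 * A return of 0 from an empty pipe means PIPE_EOF was set, i.e. the
 * write side has been closed.
 */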

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
	     addr < endaddr;
	     addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer,
	    size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
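/*
 * Handshake summary (descriptive only; the states are those already
 * defined in sys/pipe.h):
 *
 *	writer				reader
 *	------				------
 *	wait while PIPE_DIRECTW set
 *	wait until pipe_buffer.cnt == 0
 *	set PIPE_DIRECTW, wire pages
 *	sleep in "pipdwt"		copy from pipe_map.kva
 *					when pipe_map.cnt reaches 0:
 *					clear PIPE_DIRECTW, wakeup(wpipe)
 *	unwire pages, return
 *
 * If the writer is interrupted by a signal while PIPE_DIRECTW is still
 * set, pipe_clone_write_buffer() snapshots the remaining bytes into the
 * ordinary circular buffer before the source pages are unwired.
 */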
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
			PRIBIO | PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
			PRIBIO | PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif /* PIPE_NODIRECT */

static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < LIMITBIGPIPES) &&
	    (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
				PRIBIO | PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			/*
			 * This sets the maximum transfer to a contiguous
			 * segment of the buffer.
			 */
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			/*
			 * space is the size left in the buffer
			 */
			if (size > space)
				size = space;
			/*
			 * now limit it to the size of the uio transfer
			 */
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO + 1) | PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
	    (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE))
		error = 0;

	if (error == 0)
		gettime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}
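
/*
 * A note on the PIPE_BUF check above (illustrative, userland view): as
 * long as each record written is no larger than PIPE_BUF, concurrent
 * writers never see their records interleaved, because the write sleeps
 * until the whole record fits:
 *
 *	struct rec r;			(sizeof r <= PIPE_BUF)
 *
 *	write(logfd, &r, sizeof r);	(arrives contiguously)
 *
 * Larger writes may be split around sleeps and interleave with other
 * writers, which is the POSIX-sanctioned behavior for pipes.
 */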

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case TIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case TIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}

int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
		    (wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}
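
/*
 * Userland view of the two routines above (illustrative only; pfd is a
 * pipe descriptor obtained from pipe(2)):
 *
 *	int n;
 *	fd_set rfds;
 *
 *	FD_ZERO(&rfds);
 *	FD_SET(pfd, &rfds);
 *	select(pfd + 1, &rfds, NULL, NULL, NULL);	(readable?)
 *	ioctl(pfd, FIONREAD, &n);			(n = bytes pending)
 *
 * Note that during a direct write FIONREAD reports pipe_map.cnt, the
 * number of bytes still mapped from the writer, so the answer is
 * consistent whichever write mechanism is in use.
 */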

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif /* !OLD_PIPE */