/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.19 1996/07/12 08:14:58 bde Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process, to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers, we cannot, of course limit
 * the amount of kva for pipes in general though.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	(void)pipeclose(wpipe);
	(void)pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for pipe circular buffer, the space is pageable
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
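	/*
	 * Note: pipe_buffer.size must already be set by the caller
	 * (pipeinit() or the big-pipe resize path in pipe_write());
	 * the kva itself is allocated lazily from pipe_write() the
	 * first time the buffer is actually needed.
	 */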
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
	cpipe->pipe_pgid = NO_PID;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if (error = tsleep( cpipe,
			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove(
				    &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if (rpipe->pipe_buffer.cnt == 0) {
			if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
				pipeunlock(rpipe);
			}
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending processes' buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i+=1) {

		vm_page_t m;

		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;
			for(j=0;j<i;j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;
	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i=0;i<wpipe->pipe_map.npages;i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
	    (caddr_t) wpipe->pipe_buffer.buffer,
	    size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;
retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if ( wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if ( wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;

	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif

	}


	if( wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe,1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
			(wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA)) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write( wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincidental with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			/*
			 * This set the maximum transfer as a segment of
			 * the buffer.
			 */
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			/*
			 * space is the size left in the buffer
			 */
			if (size > space)
				size = space;
			/*
			 * now limit it to the size of the uio transfer
			 */
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe,1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();
		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}

int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ( (rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;
	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if (ppipe = cpipe->pipe_peer) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif