/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.12 1996/02/17 14:47:16 peter Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does everything that pipes normally do.
 */

/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the buffer is fully mapped and wired into the
 * kernel, and the receiving process can copy it directly from the pages
 * of the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process into a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */
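/*
 * Illustrative sketch only (not part of this file's logic): the choice
 * between the two write modes described above reduces to a size/KVA test
 * like the one below.  The helper name "pipe_would_go_direct" and its
 * free-standing form are hypothetical; the real test lives in pipewrite().
 */
#if 0
static int
pipe_would_go_direct(int len)
{
	/* Large writes go direct, provided pipe KVA is not exhausted. */
	return ((len >= PIPE_MINDIRECT) && (amountpipekva < LIMITPIPEKVA));
}
#endif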
/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data,
		struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s).  This can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general though.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
#endif
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}
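/*
 * For reference, a minimal user-level sketch (not kernel code) of how the
 * system call above is consumed; retval[0]/retval[1] become fds[0] (the
 * read end) and fds[1] (the write end).  Error handling is omitted.
 */
#if 0
#include <unistd.h>

static void
pipe_usage_sketch(void)
{
	int fds[2];
	char buf[4];

	(void) pipe(fds);
	(void) write(fds[1], "data", 4);	/* small write: buffered path */
	(void) read(fds[0], buf, 4);
	(void) close(fds[0]);
	(void) close(fds[1]);
}
#endif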
/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
#endif
}

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
		    catch ? (PRIBIO|PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

#ifndef PIPE_NODIRECT
#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for (off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif
/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;

			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && ((error = pipelock(rpipe, 1)) == 0)) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}
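/*
 * A sketch of the circular-buffer arithmetic used in the read loop above
 * (illustrative only; the "pipebuf" field names follow sys/pipe.h, but the
 * helper itself is hypothetical).  E.g. with size 8192, out 8000, and
 * cnt 400, the first pass copies the 192 contiguous bytes up to the end
 * of the buffer, out wraps to 0, and the next iteration copies the
 * remaining 208 bytes from the front.
 */
#if 0
static int
read_chunk_sketch(struct pipebuf *pb, int resid)
{
	int size = pb->size - pb->out;	/* contiguous bytes to end of ring */

	if (size > pb->cnt)
		size = pb->cnt;		/* no more than is buffered */
	if (size > resid)
		size = resid;		/* no more than was asked for */
	return (size);
}
#endif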
#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
	     addr < endaddr;
	     addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer,
	    size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}
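/*
 * Illustrative arithmetic for the "extra page" above: an unaligned user
 * buffer can span one more page than its length alone suggests.  The
 * helper and the numbers are hypothetical; pipe_build_write_buffer() does
 * the equivalent trunc_page()/round_page() computation inline.
 */
#if 0
static int
pages_spanned_sketch(vm_offset_t base, int len)
{
	/* e.g. base 0x1ff0, len 8192: trunc 0x1000, round 0x4000 -> 3 pages */
	return ((round_page(base + len) - trunc_page(base)) / PAGE_SIZE);
}
#endif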
/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
		    PRIBIO|PCATCH, "pipdww", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF))
			goto error1;
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
		    PRIBIO|PCATCH, "pipdwc", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF)) {
			if (error == 0)
				error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			wakeup(wpipe);
			return EPIPE;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif
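/*
 * Outline of the direct-write rendezvous implemented above (sketch only;
 * the helper name is hypothetical): the writer wires its pages, sets
 * PIPE_DIRECTW, and sleeps on "pipdwt"; the reader drains pipe_map and
 * clears PIPE_DIRECTW when pipe_map.cnt reaches zero, which is the
 * condition the writer's loop re-tests after each wakeup.
 */
#if 0
static int
direct_write_done_sketch(struct pipe *wpipe)
{
	return ((wpipe->pipe_state & PIPE_DIRECTW) == 0);
}
#endif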
static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;
	int orig_resid;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
		    (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot run concurrently with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
			    PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Writes of size PIPE_BUF or less must go into the
		 * buffer contiguously (atomically).
		 */
		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
	    (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();
		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO) ? 1 : 0);
}
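/*
 * Worked example of the PIPE_BUF rule in pipewrite() above (the numbers
 * assume PIPE_SIZE 16384 and PIPE_BUF 512): with 16000 bytes buffered,
 * space is 384.  A 512-byte write has orig_resid <= PIPE_BUF and
 * space < resid, so space is treated as 0 and the writer sleeps until
 * the whole write fits; writes of PIPE_BUF or less are thus never
 * interleaved with other writers' data.
 */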
/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}
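/*
 * For reference, a minimal user-level sketch (not kernel code) of the
 * ioctls handled above; error handling is omitted.
 */
#if 0
#include <sys/ioctl.h>

static void
pipe_ioctl_sketch(int fd)
{
	int on = 1, nbytes;

	(void) ioctl(fd, FIONBIO, &on);		/* sets PIPE_NBIO */
	(void) ioctl(fd, FIONREAD, &nbytes);	/* bytes available to read */
}
#endif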
int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
		    (wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif