/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.14 1996/03/17 04:52:10 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process, to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers, we cannot, of course limit
 * the amount of kva for pipes in general though.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
#endif
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	(void)pipeclose(wpipe);
free1:
	(void)pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for pipe circular buffer, the space is pageable
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if (error = tsleep( cpipe,
			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
	return;
}

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

#ifndef PIPE_NODIRECT
#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for(off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;
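
	/*
	 * The busy count keeps pipeclose() (which sets PIPE_WANT and then
	 * sleeps) from tearing the pipe down while this read is in progress.
	 */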
	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i+=1) {

		vm_page_t m;

		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;
			for(j=0;j<i;j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;
	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i=0;i<wpipe->pipe_map.npages;i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
	    (caddr_t) wpipe->pipe_buffer.buffer,
	    size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
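 * While PIPE_DIRECTW is set, the reader copies straight out of the wired
 * user pages; the writer sleeps until the reader has consumed the mapping.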
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;
retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if ( wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if ( wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;
	int orig_resid;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if( wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe,1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write( wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
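		/*
		 * If such a write does not fit in the space currently
		 * available, pretend the pipe is full so the writer blocks
		 * until the whole request can be copied in one piece.
		 */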
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe,1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();
		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;
	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}

int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ( (rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	int error = 0;
	struct pipe *cpipe = (struct pipe *)fp->f_data;
	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;
	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if (ppipe = cpipe->pipe_peer) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif