/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. This work was done expressly for inclusion into FreeBSD.  Other use
 *    is allowed if this notation is included.
 * 5. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.2 1996/01/29 02:57:33 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
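
/*
 * Illustrative sketch (not part of the original code): pipewrite() below
 * selects between the two modes roughly as follows, where PIPE_MINDIRECT
 * comes from <sys/pipe.h>:
 *
 *	if (uio->uio_iov->iov_len >= PIPE_MINDIRECT &&
 *	    amountpipekva < LIMITPIPEKVA)
 *		use the direct (page-wiring) write path;
 *	else
 *		copy through the pageable kernel ring buffer;
 */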

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data,
		struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general, though.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;
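
/*
 * Worked example (assumption: PIPE_SIZE is 16K, its usual value in
 * <sys/pipe.h>): MINPIPESIZE is then about 5.3K, the low-water mark
 * below which pipe_read() tries to reset the ring pointers and wake a
 * blocked writer.
 */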

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
free1:
	pipeclose(rpipe);
	return (error);
}
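
/*
 * Illustrative userland usage (not kernel code; assumes the standard
 * pipe(2) interface built on this syscall).  retval[0] and retval[1]
 * become fd[0] (read end) and fd[1] (write end):
 *
 *	int fd[2];
 *	char buf[4];
 *	if (pipe(fd) == 0) {
 *		(void) write(fd[1], "data", 4);
 *		(void) read(fd[0], buf, 4);
 *	}
 */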

static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(&cpipe->pipe_state,
		    catch ? (PRIBIO|PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(&cpipe->pipe_state);
	}
	return;
}

#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for (off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
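
/*
 * Usage pattern for the lock above (illustrative, matching the callers
 * below): every access to the pipe buffer is bracketed as
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		... manipulate cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 */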

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
			rpipe->pipe_atime = time;
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_atime = time;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}
			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}
	if (rpipe->pipe_state & PIPE_SEL) {
		rpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&rpipe->pipe_sel);
	}
	return error;
}
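
/*
 * Summary of the direct-write handshake implemented by the helpers
 * below and by pipe_direct_write() (descriptive only):
 *
 *	writer:	waits for any previous direct write to finish and for the
 *		pipe buffer to drain, sets PIPE_DIRECTW, wires its user
 *		pages and maps them at pipe_map.kva, then sleeps in
 *		"pipdwt";
 *	reader:	sees PIPE_DIRECTW, copies straight out of pipe_map.kva +
 *		pipe_map.pos, and clears PIPE_DIRECTW with a wakeup() when
 *		pipe_map.cnt reaches zero.
 */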

/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick(addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size);
		amountpipekva += wpipe->pipe_buffer.size;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size);
			amountpipekva -= wpipe->pipe_buffer.size;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer,
	    size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}
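
/*
 * Lifecycle of the three helpers above, as used by pipe_direct_write()
 * below (descriptive summary only):
 *
 *	pipe_build_write_buffer()	wire user pages, map at pipe_map.kva
 *	pipe_clone_write_buffer()	signal fallback: copy the wired data
 *					into the regular pipe buffer
 *	pipe_destroy_write_buffer()	unmap, optionally release the kva,
 *					and unwire the pages
 */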

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

	while (wpipe->pipe_state & PIPE_DIRECTW) {
		error = tsleep(wpipe,
			PRIBIO|PCATCH, "pipdww", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF))
			goto error1;
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	wpipe->pipe_state |= PIPE_DIRECTW;
	while (wpipe->pipe_buffer.cnt > 0) {
		error = tsleep(wpipe,
			PRIBIO|PCATCH, "pipdwc", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF)) {
			wpipe->pipe_state &= ~PIPE_DIRECTW;
			if (error == 0)
				error = EPIPE;
			goto error1;
		}
	}

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	if (wpipe->pipe_state & PIPE_WANTR) {
		wpipe->pipe_state &= ~PIPE_WANTR;
		wakeup(wpipe);
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			wakeup(wpipe);
			return EPIPE;
		}
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
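
/*
 * Note on the PIPE_BUF check in pipewrite() below (assumption: the
 * usual POSIX atomicity rule for pipes): writes of PIPE_BUF bytes or
 * less must not be interleaved with other writes, so the buffer is
 * only filled when the whole request fits, i.e.
 *
 *	space > 0 && (uio_resid > PIPE_BUF || uio_resid <= space)
 */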

static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	while (uio->uio_resid) {
		int space;

		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
		    (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/*
		 * Writes of size <= PIPE_BUF must be contiguous (atomic),
		 * so only copy into the buffer when the whole request fits.
		 */
		if ((space > 0) &&
		    ((uio->uio_resid > PIPE_BUF) || (uio->uio_resid <= space))) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;

			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
			wpipe->pipe_mtime = time;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	/*
	 * The decrement must precede the PIPE_WANT check, otherwise a
	 * closing reader waiting in pipeclose() can miss its wakeup.
	 */
	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
	    (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE))
		error = 0;

	if (wpipe->pipe_state & PIPE_SEL) {
		wpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&wpipe->pipe_sel);
	}

	return error;
}

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO) ? 1 : 0);
}
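
/*
 * Illustrative userland use of the ioctls handled below (not kernel
 * code; FIONBIO and FIONREAD are the standard <sys/ioctl.h> requests):
 *
 *	int on = 1, nbytes;
 *	ioctl(fd[0], FIONBIO, &on);		-- sets PIPE_NBIO
 *	ioctl(fd[0], FIONREAD, &nbytes);	-- nbytes = pipe_buffer.cnt
 */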

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}

static int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	register int s = splnet();

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if (rpipe->pipe_buffer.cnt > 0 ||
		    (rpipe->pipe_state & PIPE_EOF)) {
			splx(s);
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF) ||
		    ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			splx(s);
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
		    (wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF)) {
			splx(s);
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	splx(s);
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}
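
/*
 * Teardown note (descriptive): pipe_close() runs once per file table
 * entry, so each end of the pipe is torn down independently.  pipeclose()
 * below only marks the surviving peer with PIPE_EOF and wakes it; the
 * peer's own close frees its half.
 */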

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		if (cpipe->pipe_state & PIPE_SEL) {
			cpipe->pipe_state &= ~PIPE_SEL;
			selwakeup(&cpipe->pipe_sel);
		}

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			if (ppipe->pipe_state & PIPE_SEL) {
				ppipe->pipe_state &= ~PIPE_SEL;
				selwakeup(&ppipe->pipe_sel);
			}

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size);
		}
		free(cpipe, M_TEMP);
	}
}
#endif /* !OLD_PIPE */