xref: /freebsd/sys/kern/sys_pipe.c (revision 2834ceec7cb6584490e4b230601ef15103e00ba8)
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. This work was done expressly for inclusion into FreeBSD.  Other use
 *    is allowed if this notation is included.
 * 5. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.2 1996/01/29 02:57:33 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

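/*
 * For illustration only: from userland the two modes above are invisible;
 * which path is taken depends on the size of each write.  A hypothetical
 * sketch (buffer size chosen arbitrarily):
 *
 *	int fds[2];
 *	char big[64 * 1024];
 *
 *	pipe(fds);
 *	write(fds[1], big, sizeof(big));   <- large; may use the direct mode
 *	write(fds[1], "x", 1);             <- small; uses the kernel buffer
 *
 * Whether a large write actually goes direct also depends on how much
 * pipe kva is already in use (see LIMITPIPEKVA below).
 */
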
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

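/*
 * Note: within this file MINPIPESIZE is consulted in pipe_read() when
 * deciding whether to reset the buffer pointers and wake a blocked writer
 * after the buffer has drained; MAXPIPESIZE does not appear to be
 * referenced elsewhere in this revision.
 */
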
/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general, though.
 */
#define LIMITPIPEKVA (16*1024*1024)
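
/*
 * amountpipekva tracks how much kernel virtual address space all pipes are
 * currently using.  It is checked against LIMITPIPEKVA before choosing the
 * direct-write path, checked against MAXPIPEKVA when deciding whether to
 * cache or free a direct-write mapping, and updated in pipespace(),
 * pipe_build_write_buffer(), pipe_destroy_write_buffer() and pipeclose().
 */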
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	(void)pipeclose(wpipe);
free1:
	(void)pipeclose(rpipe);
	return (error);
}

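/*
 * Allocate pageable kernel virtual address space for a pipe's circular
 * buffer, backed by its own VM object rather than kernel_object.
 */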
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if (error = tsleep( &cpipe->pipe_state,
			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(&cpipe->pipe_state);
	}
	return;
}

#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for(off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif

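/*
 * Read from a pipe.  Each pass through the loop below handles one of three
 * cases: data waiting in the circular kernel buffer, data exposed by a
 * pending direct write (PIPE_DIRECTW), or an empty pipe, in which case we
 * check for EOF, wake a blocked writer, and either return or sleep.
 */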
/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
			rpipe->pipe_atime = time;
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_atime = time;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}
			rpipe->pipe_state |= PIPE_WANTR;
			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
				break;
			}
		}
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}
	if (rpipe->pipe_state & PIPE_SEL) {
		rpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&rpipe->pipe_sel);
	}
	return error;
}

/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

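	/*
	 * Fault in and wire each page backing the user's iovec, recording
	 * the vm_page pointers in pipe_map.ms[].  If a page cannot be
	 * resolved, unwire whatever has been wired so far and fail with
	 * EFAULT.
	 */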
	endaddr = round_page(uio->uio_iov->iov_base + size);
	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i+=1) {

		vm_page_t m;

		vm_fault_quick( addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;
			for(j=0;j<i;j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size);
		amountpipekva += wpipe->pipe_buffer.size;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
struct pipe *wpipe;
{
	int i;
	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

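	/*
	 * Keep the kva mapping cached for the next direct write unless the
	 * total amount of pipe kva has grown past MAXPIPEKVA, in which case
	 * release it now.
	 */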
	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size);
			amountpipekva -= wpipe->pipe_buffer.size;
		}
	}
	for (i=0;i<wpipe->pipe_map.npages;i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF))
			goto error1;
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	wpipe->pipe_state |= PIPE_DIRECTW;
	while (wpipe->pipe_buffer.cnt > 0) {
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF)) {
			wpipe->pipe_state &= ~PIPE_DIRECTW;
			if (error == 0)
				error = EPIPE;
			goto error1;
		}
	}

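	/*
	 * The pipe buffer has drained and PIPE_DIRECTW is set, so no
	 * buffered writer can slip in ahead of us; wire the user's pages
	 * and expose them to the reader.
	 */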
	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	if (wpipe->pipe_state & PIPE_WANTR) {
		wpipe->pipe_state &= ~PIPE_WANTR;
		wakeup(wpipe);
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			wakeup(wpipe);
			return EPIPE;
		}
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}

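/*
 * Common write path shared by the buffered and direct mechanisms.  The
 * direct path is attempted for iovecs of at least PIPE_MINDIRECT bytes
 * while total pipe kva remains under LIMITPIPEKVA; everything else goes
 * through the circular kernel buffer, which is allocated lazily on the
 * first write.
 */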
static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if( wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe,1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	while (uio->uio_resid) {
		int space;
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write( wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}


		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/*
		 * Writes of size PIPE_BUF or less must not be split: only
		 * copy into the buffer if the entire remaining request fits,
		 * or if the request is larger than PIPE_BUF.
		 */
		if ((space > 0) &&
			((uio->uio_resid > PIPE_BUF) || (uio->uio_resid <= space))) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe,1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
			wpipe->pipe_mtime = time;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			wpipe->pipe_state |= PIPE_WANTW;
			if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (wpipe->pipe_state & PIPE_SEL) {
		wpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&wpipe->pipe_sel);
	}

	return error;
}

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;
	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}

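/*
 * Select/poll support: the read side is ready when data or EOF is pending,
 * the write side is ready when at least PIPE_BUF bytes of space are free
 * or the peer has gone away, and the exceptional-condition check
 * (which == 0) reports ready only when either side has seen EOF.
 */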
int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	register int s = splnet();

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if (rpipe->pipe_buffer.cnt > 0 ||
			(rpipe->pipe_state & PIPE_EOF)) {
			splx(s);
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			splx(s);
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			splx(s);
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	splx(s);
	return (0);
}

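/*
 * Fill in a stat structure for a pipe.  The S_IFSOCK mode is presumably
 * kept for compatibility with the old socket-based pipe implementation.
 */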
int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;
	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;
	if (cpipe) {

		if (cpipe->pipe_state & PIPE_SEL) {
			cpipe->pipe_state &= ~PIPE_SEL;
			selwakeup(&cpipe->pipe_sel);
		}

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if (ppipe = cpipe->pipe_peer) {
			if (ppipe->pipe_state & PIPE_SEL) {
				ppipe->pipe_state &= ~PIPE_SEL;
				selwakeup(&ppipe->pipe_sel);
			}

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size);
		}
		free(cpipe, M_TEMP);
	}
}
#endif