/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.12 1996/02/17 14:47:16 peter Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
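
/*
 * Illustration (userland view; a sketch, not part of the kernel code):
 * with pfd[] obtained from pipe(2), the choice of path depends only on
 * the size of the write, and is invisible to user code:
 *
 *	write(pfd[1], buf, 64);			-- buffered path
 *	write(pfd[1], buf, PIPE_MINDIRECT);	-- direct (page-wiring)
 *						   path, kva permitting
 */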

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approximately 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)
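
/*
 * For example, assuming the PIPE_SIZE of 16384 bytes from <sys/pipe.h>,
 * MINPIPESIZE evaluates to 5461 bytes and MAXPIPESIZE to 10922 bytes.
 */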

/*
 * Maximum amount of kva for pipes -- this is a soft limit, but it exists
 * so that we do not exhaust kernel virtual memory on large systems.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; note that we cannot, of course, limit
 * the amount of kva used for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
#endif
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */
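
/*
 * Userland usage, for illustration only -- retval[0] becomes the read
 * descriptor and retval[1] the write descriptor:
 *
 *	int pfd[2];
 *	char c;
 *
 *	if (pipe(pfd) == 0) {
 *		write(pfd[1], "x", 1);
 *		read(pfd[0], &c, 1);
 *	}
 */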

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
free1:
	pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
#endif
}

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
			catch ? (PRIBIO|PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}
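
/*
 * Sketch of the calling convention used throughout this file: the
 * I/O lock brackets each uiomove() into or out of the buffer, e.g.
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		error = uiomove(..., size, uio);
 *		pipeunlock(cpipe);
 *	}
 */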

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

#ifndef PIPE_NODIRECT
#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for (off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
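/*
 * Outline of the steps below: each user page backing the iovec is
 * faulted in via vm_fault_quick(), translated via pmap_kextract(),
 * wired via vm_page_wire(), and finally entered into this pipe's
 * private kva window with pmap_qenter().  If a translation fails,
 * the pages wired so far are unwired and EFAULT is returned.
 */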
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
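/*
 * Handshake sketch: the writer sets PIPE_DIRECTW and sleeps on
 * "pipdwt" until the reader has drained pipe_map; the reader clears
 * PIPE_DIRECTW and wakes the writer once pipe_map.cnt reaches zero.
 * If the writer is interrupted by a signal while PIPE_DIRECTW is
 * still set, pipe_clone_write_buffer() substitutes a kernel copy so
 * that the wired user pages can be released.
 */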
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF))
			goto error1;
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF)) {
			if (error == 0)
				error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			wakeup(wpipe);
			return EPIPE;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;
	int orig_resid;

	/*
	 * detect loss of pipe read side and return EPIPE; the caller
	 * turns that into SIGPIPE as appropriate.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Buffered pipe writes cannot occur concurrently with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Writes of size PIPE_BUF or less must be atomic, so if
		 * there is not enough room for the whole transfer the
		 * space check above forces us to write nothing at all.
		 */
		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;

			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If the read side wants to go away, we just issue a
			 * signal to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();

		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO) ? 1 : 0);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
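/*
 * E.g., a user process may poll the number of readable bytes with:
 *
 *	int nread;
 *
 *	ioctl(pfd[0], FIONREAD, &nread);
 */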
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOSYS);
}

static int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
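/*
 * Teardown order: wake any selectors, advertise PIPE_EOF and wait for
 * in-progress readers/writers (pipe_busy) to drain, mark the peer EOF
 * and detach it, then release the buffer kva and any direct-write kva.
 */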
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif