/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD$
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but it does everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If a write is smaller than PIPE_MINDIRECT, the
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the writer's buffer is fully mapped and wired
 * into the kernel, and the receiving process can copy it directly from
 * the pages in the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
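
/*
 * Illustrative sketch (not kernel code): from userland, the size of a
 * single write(2) on a blocking descriptor picks the path.  Assuming
 * the usual PIPE_MINDIRECT of 8192 from <sys/pipe.h>:
 *
 *	char small[1024], big[16384];
 *	write(fds[1], small, sizeof(small));	(copied via kernel buffer)
 *	write(fds[1], big, sizeof(big));	(pages wired and mapped)
 */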

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/selinfo.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct proc *p));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct proc *p));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
		struct proc *p));
static int pipe_kqfilter __P((struct file *fp, struct knote *kn));
static int pipe_stat __P((struct file *fp, struct stat *sb, struct proc *p));
static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data,
		struct proc *p));

static struct fileops pipeops = {
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s); these can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)
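
/*
 * For example (a sketch, assuming the usual PIPE_SIZE of 16384 from
 * <sys/pipe.h>): MINPIPESIZE works out to 5461 bytes, the threshold
 * below which pipe_read() wakes a writer blocked for space, giving
 * some hysteresis so reader and writer do not ping-pong on every byte.
 */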

/*
 * Maximum amount of kva for pipes -- this is something of a soft limit,
 * but it is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit the amount
 * of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
static int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

static vm_zone_t pipe_zone;

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	if (pipe_zone == NULL)
		pipe_zone = zinit("PIPE", sizeof (struct pipe), 0, 0, 4);

	rpipe = zalloc(pipe_zone);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = zalloc(pipe_zone);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	fhold(rf);
	p->p_retval[0] = fd;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = (caddr_t)rpipe;
	rf->f_ops = &pipeops;
	error = falloc(p, &wf, &fd);
	if (error) {
		if (fdp->fd_ofiles[p->p_retval[0]] == rf) {
			fdp->fd_ofiles[p->p_retval[0]] = NULL;
			fdrop(rf, p);
		}
		fdrop(rf, p);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = (caddr_t)wpipe;
	wf->f_ops = &pipeops;
	p->p_retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	fdrop(rf, p);

	return (0);
}
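
/*
 * Usage sketch (userland, illustrative only): the two descriptors left
 * in p_retval[0] and p_retval[1] are what libc's pipe(2) stub stores
 * into the caller's array:
 *
 *	int fds[2];
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	write(fds[1], "hi", 2);		fds[1] is the write end
 *	read(fds[0], buf, 2);		fds[0] is the read end
 */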

/*
 * Allocate kva for pipe circular buffer; the space is pageable
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = tsleep(cpipe, catch ? (PRIBIO|PCATCH) : PRIBIO,
		    "pipelk", 0);
		if (error != 0)
			return error;
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}
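
/*
 * Typical pattern (a sketch of how the callers below use the pair;
 * catch is nonzero when the sleep should be interruptible by signals):
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		... manipulate cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 */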

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
	KNOTE(&cpipe->pipe_sel.si_note, 0);
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred, flags, p)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	int flags;
	struct proc *p;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	error = pipelock(rpipe, 1);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error)
				break;
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t	va;

			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
			error = uiomove(va, size, uio);
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK)
				error = EAGAIN;
			else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe, 1);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}
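
/*
 * Wraparound sketch for the buffered branch above: with size 16384,
 * out 16000 and cnt 1000, the first pass copies the 384 contiguous
 * bytes up to the end of the buffer and resets out to 0; the next
 * pass copies the remaining 616 bytes from the front.
 */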

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i++) {

		vm_page_t m;

		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
		    (paddr = pmap_kextract(addr)) == 0) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}
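
/*
 * Page-count sketch for the wiring loop above (assuming 4K pages): an
 * iov_base of 0x20123 with size 8192 gives trunc_page(base) = 0x20000
 * and round_page(base + size) = 0x23000, so three pages are wired and
 * pipe_map.pos becomes 0x123 -- hence the extra page in the kva window
 * allocated above.
 */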

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif
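
/*
 * Handshake sketch for the direct path above: the writer wires its
 * pages, sets PIPE_DIRECTW and sleeps in "pipdwt"; pipe_read() drains
 * pipe_map.cnt, clears PIPE_DIRECTW when it reaches zero and wakes the
 * writer, which then unwires the pages -- or, if it was interrupted by
 * a signal first, clones the remaining data into the kernel buffer.
 */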

static int
pipe_write(fp, uio, cred, flags, p)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	int flags;
	struct proc *p;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side; return EPIPE if lost (the
	 * caller turns that into SIGPIPE).
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif

	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 *
		 * The direct write mechanism will detect the reader going
		 * away on us.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error)
				break;
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.  We break out if a signal occurs or the
		 * reader goes away.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe, PRIBIO|PCATCH, "pipbww", 0);
			if (wpipe->pipe_state & PIPE_EOF)
				break;
			if (error)
				break;
		}
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;
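
		/*
		 * Example: with resid 512 and only 300 bytes of space,
		 * a write of <= PIPE_BUF bytes must not be split, so we
		 * report no space at all and block below until all 512
		 * bytes fit at once.
		 */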

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			if ((error = pipelock(wpipe, 1)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
						if (wpipe->pipe_buffer.in !=
						    size - segsize +
						    wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");

				}
				pipeunlock(wpipe);
			}
			if (error)
				break;

		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If the read side wants to go away, break out
			 * with EPIPE; the caller turns that into SIGPIPE.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	u_long cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case FIOSETOWN:
		return (fsetown(*(int *)data, &mpipe->pipe_sigio));

	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		return (0);

	/* This is deprecated, FIOSETOWN should be used instead. */
	case TIOCSPGRP:
		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));

	/* This is deprecated, FIOGETOWN should be used instead. */
	case TIOCGPGRP:
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}
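
/*
 * Userland sketch (illustrative only): FIONREAD reports the bytes
 * immediately available to a reader, whichever mode the pipe is in:
 *
 *	int n;
 *	if (ioctl(fds[0], FIONREAD, &n) == 0)
 *		printf("%d bytes readable\n", n);
 */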

int
pipe_poll(fp, events, cred, p)
	struct file *fp;
	int events;
	struct ucred *cred;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	if (events & (POLLOUT | POLLWRNORM))
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
			revents |= events & (POLLOUT | POLLWRNORM);

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}

		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	return (revents);
}
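
/*
 * Userland sketch (illustrative only): a reader can wait for data or
 * writer exit with poll(2); POLLHUP is set once the write side is gone:
 *
 *	struct pollfd pfd;
 *
 *	pfd.fd = fds[0];
 *	pfd.events = POLLIN;
 *	if (poll(&pfd, 1, INFTIM) > 0 && (pfd.revents & POLLHUP))
 *		close(fds[0]);
 */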

static int
pipe_stat(fp, ub, p)
	struct file *fp;
	struct stat *ub;
	struct proc *p;
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	return 0;
}

/*
 * shut down the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		zfree(pipe_zone, cpipe);
	}
}

/*ARGSUSED*/
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		break;
	default:
		return (1);
	}

	SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
	return (0);
}
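
/*
 * Userland sketch (illustrative only): registering an EVFILT_READ
 * event on the read descriptor makes kevent(2) report the readable
 * byte count computed by filt_piperead() below in kn_data:
 *
 *	struct kevent kev;
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 */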

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;
	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
		kn->kn_data = rpipe->pipe_map.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
	if (wpipe->pipe_state & PIPE_DIRECTW)
		kn->kn_data = 0;

	return (kn->kn_data >= PIPE_BUF);
}
1258