xref: /freebsd/sys/kern/sys_pipe.c (revision 8fa113e5fc65fe6abc757f0089f477a87ee4d185)
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD$
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
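
/*
 * Illustration (a hypothetical userland sketch, not part of this file;
 * the exact PIPE_MINDIRECT value lives in sys/pipe.h and is assumed
 * here to be 8192 bytes):
 *
 *	char small[1024], big[64 * 1024];
 *	int pfd[2];
 *
 *	pipe(pfd);
 *	write(pfd[1], small, sizeof(small));	-- copied through the
 *						   kernel pipe buffer
 *	write(pfd[1], big, sizeof(big));	-- a blocking write this
 *						   large is eligible for the
 *						   direct, page-wired path
 *						   described above
 */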

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/selinfo.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct thread *td));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct thread *td));
static int pipe_close __P((struct file *fp, struct thread *td));
static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
		struct thread *td));
static int pipe_kqfilter __P((struct file *fp, struct knote *kn));
static int pipe_stat __P((struct file *fp, struct stat *sb, struct thread *td));
static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct thread *td));

static struct fileops pipeops = {
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)
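
/*
 * For illustration (assuming the usual PIPE_SIZE of 16384 from
 * sys/pipe.h, which this file does not restate): MINPIPESIZE is 5461
 * and MAXPIPESIZE is 10922.  MINPIPESIZE is the hysteresis point below
 * which pipe_read() rewakes a writer blocked on a full buffer.
 */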

/*
 * Maximum amount of kva for pipes -- this is a soft limit of sorts, but
 * it is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)
/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
static int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipe_free_kmem __P((struct pipe *cpipe));
static int pipe_create __P((struct pipe **cpipep));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static int pipespace __P((struct pipe *cpipe, int size));

static vm_zone_t pipe_zone;

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(td, uap)
	struct thread *td;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
{
	struct filedesc *fdp = td->td_proc->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	if (pipe_zone == NULL)
		pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);

	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(td, &rf, &fd);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	fhold(rf);
	td->td_retval[0] = fd;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = (caddr_t)rpipe;
	rf->f_ops = &pipeops;
	error = falloc(td, &wf, &fd);
	if (error) {
		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
			fdp->fd_ofiles[td->td_retval[0]] = NULL;
			fdrop(rf, td);
		}
		fdrop(rf, td);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = (caddr_t)wpipe;
	wf->f_ops = &pipeops;
	td->td_retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	fdrop(rf, td);

	return (0);
}


/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if it fails,
 * it retains the old buffer and returns ENOMEM.
 */
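/*
 * A note on use (descriptive only): pipe_write() below calls
 * pipespace(wpipe, BIG_PIPE_SIZE) to grow a pipe that sees large
 * writes, so on failure the caller simply keeps using the existing,
 * smaller buffer.
 */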
static int
pipespace(cpipe, size)
	struct pipe *cpipe;
	int size;
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	GIANT_REQUIRED;

	npages = round_page(size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	object = vm_object_allocate(OBJT_DEFAULT, npages);
	buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, object, 0,
		(vm_offset_t *) &buffer, size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS) {
		vm_object_deallocate(object);
		return (ENOMEM);
	}

	/* free old resources if we're resizing */
	pipe_free_kmem(cpipe);
	cpipe->pipe_buffer.object = object;
	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	amountpipekva += cpipe->pipe_buffer.size;
	return (0);
}

287 
288 /*
289  * initialize and allocate VM and memory for pipe
290  */
291 static int
292 pipe_create(cpipep)
293 	struct pipe **cpipep;
294 {
295 	struct pipe *cpipe;
296 	int error;
297 
298 	*cpipep = zalloc(pipe_zone);
299 	if (*cpipep == NULL)
300 		return (ENOMEM);
301 
302 	cpipe = *cpipep;
303 
304 	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
305 	cpipe->pipe_buffer.object = NULL;
306 #ifndef PIPE_NODIRECT
307 	cpipe->pipe_map.kva = NULL;
308 #endif
309 	/*
310 	 * protect so pipeclose() doesn't follow a junk pointer
311 	 * if pipespace() fails.
312 	 */
313 	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
314 	cpipe->pipe_state = 0;
315 	cpipe->pipe_peer = NULL;
316 	cpipe->pipe_busy = 0;
317 
318 #ifndef PIPE_NODIRECT
319 	/*
320 	 * pipe data structure initializations to support direct pipe I/O
321 	 */
322 	cpipe->pipe_map.cnt = 0;
323 	cpipe->pipe_map.kva = 0;
324 	cpipe->pipe_map.pos = 0;
325 	cpipe->pipe_map.npages = 0;
326 	/* cpipe->pipe_map.ms[] = invalid */
327 #endif
328 
329 	error = pipespace(cpipe, PIPE_SIZE);
330 	if (error)
331 		return (error);
332 
333 	vfs_timestamp(&cpipe->pipe_ctime);
334 	cpipe->pipe_atime = cpipe->pipe_ctime;
335 	cpipe->pipe_mtime = cpipe->pipe_ctime;
336 
337 	return (0);
338 }

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = tsleep(cpipe, catch ? (PRIBIO | PCATCH) : PRIBIO,
		    "pipelk", 0);
		if (error != 0)
			return (error);
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return (0);
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}
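
/*
 * Typical usage, as seen throughout this file (a descriptive sketch,
 * not new code): buffer manipulation is bracketed by the pair
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		... modify cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 *
 * With a non-zero 'catch' the sleep is interruptible by signals, so
 * callers must be prepared for an error return without the lock held.
 */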

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
	KNOTE(&cpipe->pipe_sel.si_note, 0);
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred, flags, td)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	int flags;
	struct thread *td;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	error = pipelock(rpipe, 1);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error)
				break;

			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t	va;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			va = (caddr_t) rpipe->pipe_map.kva +
			    rpipe->pipe_map.pos;
			error = uiomove(va, size, uio);
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO | PCATCH,
				    "piperd", 0)) == 0)
					error = pipelock(rpipe, 1);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
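/*
 * In outline (a descriptive note on the code below, nothing new):
 * each user page backing the iovec is faulted in with vm_fault_quick(),
 * translated with pmap_kextract(), and wired with vm_page_wire(); the
 * wired pages are then entered into the pipe's private kva window with
 * pmap_qenter() so the reader can uiomove() straight out of them.
 */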
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	GIANT_REQUIRED;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
		vm_page_t m;

		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
		    (paddr = pmap_kextract(addr)) == 0) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
			return (EFAULT);
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos =
	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return (0);
}

613 
614 /*
615  * unmap and unwire the process buffer
616  */
617 static void
618 pipe_destroy_write_buffer(wpipe)
619 	struct pipe *wpipe;
620 {
621 	int i;
622 
623 	GIANT_REQUIRED;
624 
625 	if (wpipe->pipe_map.kva) {
626 		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
627 
628 		if (amountpipekva > MAXPIPEKVA) {
629 			vm_offset_t kva = wpipe->pipe_map.kva;
630 			wpipe->pipe_map.kva = 0;
631 			kmem_free(kernel_map, kva,
632 				wpipe->pipe_buffer.size + PAGE_SIZE);
633 			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
634 		}
635 	}
636 	for (i = 0; i < wpipe->pipe_map.npages; i++)
637 		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
638 }

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer, size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
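/*
 * Rough sequence, as implemented below (a descriptive summary, not a
 * specification):
 *
 *	writer: sleeps while a previous PIPE_DIRECTW is pending or the
 *		pipe buffer still holds data ("pipdww"/"pipdwc")
 *	writer: sets PIPE_DIRECTW, wires and maps its pages via
 *		pipe_build_write_buffer()
 *	reader: copies out of pipe_map and clears PIPE_DIRECTW when the
 *		transfer drains (see pipe_read() above)
 *	writer: wakes and unwires the pages, or, if interrupted by a
 *		signal first, clones the data into the kernel buffer via
 *		pipe_clone_write_buffer()
 */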
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return (error);

error1:
	wakeup(wpipe);
	return (error);
}
#endif

static int
pipe_write(fp, uio, cred, flags, td)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	int flags;
	struct thread *td;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe, 1)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				nbigpipe++;
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occurred, unbusy and return, waking up any
	 * pending readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		return (error);
	}

	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 *
		 * The direct write mechanism will detect the reader going
		 * away on us.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error)
				break;
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.  We break out if a signal occurs or the
		 * reader goes away.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe, PRIBIO | PCATCH, "pipbww", 0);
			if (wpipe->pipe_state & PIPE_EOF)
				break;
			if (error)
				break;
		}
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;
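		/*
		 * Worked example (illustrative numbers only; PIPE_BUF is
		 * commonly 512): a blocking 300-byte write that finds only
		 * 200 bytes free forces space to 0, so the writer sleeps
		 * below until all 300 bytes can be queued at once rather
		 * than being split into two partial transfers.
		 */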

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			if ((error = pipelock(wpipe, 1)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
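				/*
				 * Worked example (illustrative numbers only,
				 * assuming the usual 16384-byte buffer):
				 * with in == 12288 and size == 8192, segsize
				 * is 4096; the first uiomove() fills offsets
				 * 12288..16383 and the second fills 0..4095,
				 * after which the wraparound check below
				 * resets in to 4096 (size - segsize).
				 */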
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");

				}
				pipeunlock(wpipe);
			}
			if (error)
				break;

		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, PRIBIO | PCATCH, "pipewr", 0);
			if (error != 0)
				break;
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, td)
	struct file *fp;
	u_long cmd;
	caddr_t data;
	struct thread *td;
{
	struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case FIOSETOWN:
		return (fsetown(*(int *)data, &mpipe->pipe_sigio));

	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		return (0);

	/* This is deprecated, FIOSETOWN should be used instead. */
	case TIOCSPGRP:
		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));

	/* This is deprecated, FIOGETOWN should be used instead. */
	case TIOCGPGRP:
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}

static int
pipe_poll(fp, events, cred, td)
	struct file *fp;
	int events;
	struct ucred *cred;
	struct thread *td;
{
	struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	if (events & (POLLOUT | POLLWRNORM))
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
			revents |= events & (POLLOUT | POLLWRNORM);

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(td, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}

		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(td, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	return (revents);
}

static int
pipe_stat(fp, ub, td)
	struct file *fp;
	struct stat *ub;
	struct thread *td;
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

/* ARGSUSED */
static int
pipe_close(fp, td)
	struct file *fp;
	struct thread *td;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	return (0);
}

static void
pipe_free_kmem(cpipe)
	struct pipe *cpipe;
{
	GIANT_REQUIRED;

	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			--nbigpipe;
		amountpipekva -= cpipe->pipe_buffer.size;
		kmem_free(kernel_map,
			(vm_offset_t)cpipe->pipe_buffer.buffer,
			cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
	}
#ifndef PIPE_NODIRECT
	if (cpipe->pipe_map.kva != 0) {
		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
		kmem_free(kernel_map,
			cpipe->pipe_map.kva,
			cpipe->pipe_buffer.size + PAGE_SIZE);
		cpipe->pipe_map.cnt = 0;
		cpipe->pipe_map.kva = 0;
		cpipe->pipe_map.pos = 0;
		cpipe->pipe_map.npages = 0;
	}
#endif
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			KNOTE(&ppipe->pipe_sel.si_note, 0);
			ppipe->pipe_peer = NULL;
		}
		/*
		 * free resources
		 */
		pipe_free_kmem(cpipe);
		zfree(pipe_zone, cpipe);
	}
}

/*ARGSUSED*/
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		cpipe = cpipe->pipe_peer;
		break;
	default:
		return (1);
	}
	kn->kn_hook = (caddr_t)cpipe;

	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_hook;

	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;
	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
		kn->kn_data = rpipe->pipe_map.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
	if (wpipe->pipe_state & PIPE_DIRECTW)
		kn->kn_data = 0;

	return (kn->kn_data >= PIPE_BUF);
}