xref: /freebsd/sys/kern/sys_pipe.c (revision dca5129987a1ec4da4a2627b0a4e74a4eaebc5c9)
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. This work was done expressly for inclusion into FreeBSD.  Other use
17  *    is allowed if this notation is included.
18  * 5. Modifications may be freely made to this file if the above conditions
19  *    are met.
20  *
21  * $Id: sys_pipe.c,v 1.2 1996/01/29 02:57:33 dyson Exp $
22  */
23 
24 #ifndef OLD_PIPE
25 
26 /*
27  * This file contains a high-performance replacement for the socket-based
28  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
29  * all features of sockets, but does do everything that pipes normally
30  * do.
31  */
32 
33 /*
34  * This code has two modes of operation: a small write mode and a large
35  * write mode.  The small write mode acts like conventional pipes with
36  * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then the
37  * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
38  * and PIPE_SIZE in size, the sender's buffer is fully mapped and wired into
39  * the kernel, and the receiving process can copy the data directly from the
40  * sending process's pages.
41  *
42  * If the sending process receives a signal, it is possible that it will
43  * go away, and its address space can certainly change, because control
44  * is returned to the user-mode side.  In that case, the pipe code
45  * arranges to copy the buffer supplied by the user process to a pageable
46  * kernel buffer, and the receiving process will grab the data from the
47  * pageable kernel buffer.  Since signals don't happen all that often,
48  * the copy operation is normally eliminated.
49  *
50  * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
51  * happen for small transfers so that the system will not spend all of
52  * its time context switching.  PIPE_SIZE is constrained by the
53  * amount of kernel virtual memory.
54  */
55 
56 #include <sys/param.h>
57 #include <sys/systm.h>
58 #include <sys/proc.h>
59 #include <sys/file.h>
60 #include <sys/protosw.h>
61 #include <sys/stat.h>
62 #include <sys/filedesc.h>
63 #include <sys/malloc.h>
64 #include <sys/ioctl.h>
65 #include <sys/stat.h>
66 #include <sys/select.h>
67 #include <sys/signalvar.h>
68 #include <sys/errno.h>
69 #include <sys/queue.h>
70 #include <sys/vmmeter.h>
71 #include <sys/kernel.h>
72 #include <sys/sysproto.h>
73 #include <sys/pipe.h>
74 
75 #include <vm/vm.h>
76 #include <vm/vm_prot.h>
77 #include <vm/vm_param.h>
78 #include <vm/lock.h>
79 #include <vm/vm_object.h>
80 #include <vm/vm_kern.h>
81 #include <vm/vm_extern.h>
82 #include <vm/pmap.h>
83 #include <vm/vm_map.h>
84 #include <vm/vm_page.h>
85 
86 static int pipe_read __P((struct file *fp, struct uio *uio,
87 		struct ucred *cred));
88 static int pipe_write __P((struct file *fp, struct uio *uio,
89 		struct ucred *cred));
90 static int pipe_close __P((struct file *fp, struct proc *p));
91 static int pipe_select __P((struct file *fp, int which, struct proc *p));
92 static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));
93 
94 static struct fileops pipeops =
95     { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };
96 
97 /*
98  * Default pipe buffer size(s); this can be fairly large now because pipe
99  * space is pageable.  The pipe code will try to maintain locality of
100  * reference for performance reasons, so small amounts of outstanding I/O
101  * will not wipe the cache.
102  */
103 #define MINPIPESIZE (PIPE_SIZE/3)
104 #define MAXPIPESIZE (2*PIPE_SIZE/3)
105 
106 /*
107  * Maximum amount of kva for pipes -- this is a soft limit, but it is
108  * there so that we don't exhaust kva on large systems.
109  */
110 #define MAXPIPEKVA (8*1024*1024)
111 
112 /*
113  * Limit of kva used for direct transfers; we cannot, of course, limit
114  * the amount of kva for pipes in general, though.
115  */
116 #define LIMITPIPEKVA (16*1024*1024)
117 int amountpipekva;
118 
119 static void pipeclose __P((struct pipe *cpipe));
120 static void pipebufferinit __P((struct pipe *cpipe));
121 static void pipeinit __P((struct pipe *cpipe));
122 static __inline int pipelock __P((struct pipe *cpipe, int catch));
123 static __inline void pipeunlock __P((struct pipe *cpipe));
124 static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
125 static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
126 static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
127 static void pipe_clone_write_buffer __P((struct pipe *wpipe));
128 static void pipe_mark_pages_clean __P((struct pipe *cpipe));
129 static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
130 static void pipespace __P((struct pipe *cpipe));
131 
132 /*
133  * The pipe system call for the DTYPE_PIPE type of pipes
134  */
135 
136 /* ARGSUSED */
137 int
138 pipe(p, uap, retval)
139 	struct proc *p;
140 	struct pipe_args /* {
141 		int	dummy;
142 	} */ *uap;
143 	int retval[];
144 {
145 	register struct filedesc *fdp = p->p_fd;
146 	struct file *rf, *wf;
147 	struct pipe *rpipe, *wpipe;
148 	int fd, error;
149 
150 	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
151 	pipeinit(rpipe);
152 	rpipe->pipe_state |= PIPE_DIRECTOK;
153 	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
154 	pipeinit(wpipe);
155 	wpipe->pipe_state |= PIPE_DIRECTOK;
156 
157 	error = falloc(p, &rf, &fd);
158 	if (error)
159 		goto free2;
160 	retval[0] = fd;
161 	rf->f_flag = FREAD | FWRITE;
162 	rf->f_type = DTYPE_PIPE;
163 	rf->f_ops = &pipeops;
164 	rf->f_data = (caddr_t)rpipe;
165 	error = falloc(p, &wf, &fd);
166 	if (error)
167 		goto free3;
168 	wf->f_flag = FREAD | FWRITE;
169 	wf->f_type = DTYPE_PIPE;
170 	wf->f_ops = &pipeops;
171 	wf->f_data = (caddr_t)wpipe;
172 	retval[1] = fd;
173 
174 	rpipe->pipe_peer = wpipe;
175 	wpipe->pipe_peer = rpipe;
176 
177 	return (0);
178 free3:
179 	ffree(rf);
180 	fdp->fd_ofiles[retval[0]] = 0;
181 free2:
182 	(void)pipeclose(wpipe);
183 free1:
184 	(void)pipeclose(rpipe);
185 	return (error);
186 }
187 
188 /*
189  * Allocate kva for the pipe circular buffer; the space is pageable.
190  */
191 static void
192 pipespace(cpipe)
193 	struct pipe *cpipe;
194 {
195 	int npages, error;
196 
197 	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
198 	/*
199 	 * Create an object; I don't like the idea of paging to/from
200 	 * kernel_object.
201 	 */
202 	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
203 	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);
204 
205 	/*
206 	 * Insert the object into the kernel map, and allocate kva for it.
207 	 * The map entry is, by default, pageable.
208 	 */
209 	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
210 		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
211 		cpipe->pipe_buffer.size, 1,
212 		VM_PROT_ALL, VM_PROT_ALL, 0);
213 
214 	if (error != KERN_SUCCESS)
215 		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
216 	amountpipekva += cpipe->pipe_buffer.size;
217 }
218 
219 /*
220  * initialize and allocate VM and memory for pipe
221  */
222 static void
223 pipeinit(cpipe)
224 	struct pipe *cpipe;
225 {
226 
227 	cpipe->pipe_buffer.in = 0;
228 	cpipe->pipe_buffer.out = 0;
229 	cpipe->pipe_buffer.cnt = 0;
230 	cpipe->pipe_buffer.size = PIPE_SIZE;
231 	/* Buffer kva gets dynamically allocated */
232 	cpipe->pipe_buffer.buffer = NULL;
233 
234 	cpipe->pipe_state = 0;
235 	cpipe->pipe_peer = NULL;
236 	cpipe->pipe_busy = 0;
237 	cpipe->pipe_ctime = time;
238 	cpipe->pipe_atime = time;
239 	cpipe->pipe_mtime = time;
240 	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
241 
242 	/*
243 	 * pipe data structure initializations to support direct pipe I/O
244 	 */
245 	cpipe->pipe_map.cnt = 0;
246 	cpipe->pipe_map.kva = 0;
247 	cpipe->pipe_map.pos = 0;
248 	cpipe->pipe_map.npages = 0;
249 }
250 
251 
252 /*
253  * lock a pipe for I/O, blocking other access
254  */
255 static __inline int
256 pipelock(cpipe, catch)
257 	struct pipe *cpipe;
258 	int catch;
259 {
260 	int error;
261 	while (cpipe->pipe_state & PIPE_LOCK) {
262 		cpipe->pipe_state |= PIPE_LWANT;
263 		if (error = tsleep( &cpipe->pipe_state,
264 			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
265 			return error;
266 		}
267 	}
268 	cpipe->pipe_state |= PIPE_LOCK;
269 	return 0;
270 }
271 
272 /*
273  * unlock a pipe I/O lock
274  */
275 static __inline void
276 pipeunlock(cpipe)
277 	struct pipe *cpipe;
278 {
279 	cpipe->pipe_state &= ~PIPE_LOCK;
280 	if (cpipe->pipe_state & PIPE_LWANT) {
281 		cpipe->pipe_state &= ~PIPE_LWANT;
282 		wakeup(&cpipe->pipe_state);
283 	}
284 	return;
285 }
286 
287 #if 0
288 static void
289 pipe_mark_pages_clean(cpipe)
290 	struct pipe *cpipe;
291 {
292 	vm_size_t off;
293 	vm_page_t m;
294 
295 	for(off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
296 		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
297 		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
298 			m->dirty = 0;
299 			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
300 		}
301 	}
302 }
303 #endif
304 
305 /* ARGSUSED */
306 static int
307 pipe_read(fp, uio, cred)
308 	struct file *fp;
309 	struct uio *uio;
310 	struct ucred *cred;
311 {
312 
313 	struct pipe *rpipe = (struct pipe *) fp->f_data;
314 	int error = 0;
315 	int nread = 0;
316 	int size;
317 
318 	++rpipe->pipe_busy;
319 	while (uio->uio_resid) {
320 		/*
321 		 * normal pipe buffer receive
322 		 */
323 		if (rpipe->pipe_buffer.cnt > 0) {
324 			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
325 			if (size > rpipe->pipe_buffer.cnt)
326 				size = rpipe->pipe_buffer.cnt;
327 			if (size > uio->uio_resid)
328 				size = uio->uio_resid;
329 			if ((error = pipelock(rpipe,1)) == 0) {
330 				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
331 					size, uio);
332 				pipeunlock(rpipe);
333 			}
334 			if (error) {
335 				break;
336 			}
337 			rpipe->pipe_buffer.out += size;
338 			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
339 				rpipe->pipe_buffer.out = 0;
340 
341 			rpipe->pipe_buffer.cnt -= size;
342 			nread += size;
343 			rpipe->pipe_atime = time;
344 		/*
345 		 * Direct copy, bypassing a kernel buffer.
346 		 */
347 		} else if ((size = rpipe->pipe_map.cnt) &&
348 			(rpipe->pipe_state & PIPE_DIRECTW)) {
349 			caddr_t va;
350 			if (size > uio->uio_resid)
351 				size = uio->uio_resid;
352 			if ((error = pipelock(rpipe,1)) == 0) {
353 				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
354 				error = uiomove(va, size, uio);
355 				pipeunlock(rpipe);
356 			}
357 			if (error)
358 				break;
359 			nread += size;
360 			rpipe->pipe_atime = time;
361 			rpipe->pipe_map.pos += size;
362 			rpipe->pipe_map.cnt -= size;
363 			if (rpipe->pipe_map.cnt == 0) {
364 				rpipe->pipe_state &= ~PIPE_DIRECTW;
365 				wakeup(rpipe);
366 			}
367 		} else {
368 			/*
369 			 * detect EOF condition
370 			 */
371 			if (rpipe->pipe_state & PIPE_EOF) {
372 				break;
373 			}
374 			/*
375 			 * If the "write-side" has been blocked, wake it up now.
376 			 */
377 			if (rpipe->pipe_state & PIPE_WANTW) {
378 				rpipe->pipe_state &= ~PIPE_WANTW;
379 				wakeup(rpipe);
380 			}
381 			if (nread > 0)
382 				break;
383 			if (rpipe->pipe_state & PIPE_NBIO) {
384 				error = EAGAIN;
385 				break;
386 			}
387 
388 			/*
389 			 * If there is no more to read in the pipe, reset
390 			 * its pointers to the beginning.  This improves
391 			 * cache hit stats.
392 			 */
393 
394 			if ((error = pipelock(rpipe,1)) == 0) {
395 				if (rpipe->pipe_buffer.cnt == 0) {
396 					rpipe->pipe_buffer.in = 0;
397 					rpipe->pipe_buffer.out = 0;
398 				}
399 				pipeunlock(rpipe);
400 			} else {
401 				break;
402 			}
403 			rpipe->pipe_state |= PIPE_WANTR;
404 			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
405 				break;
406 			}
407 		}
408 	}
409 
410 	--rpipe->pipe_busy;
411 	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
412 		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
413 		wakeup(rpipe);
414 	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
415 		/*
416 		 * If there is no more to read in the pipe, reset
417 		 * its pointers to the beginning.  This improves
418 		 * cache hit stats.
419 		 */
420 		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
421 			if (rpipe->pipe_buffer.cnt == 0) {
422 #if 0
423 				pipe_mark_pages_clean(rpipe);
424 #endif
425 				rpipe->pipe_buffer.in = 0;
426 				rpipe->pipe_buffer.out = 0;
427 			}
428 			pipeunlock(rpipe);
429 		}
430 
431 		/*
432 		 * If the "write-side" has been blocked, wake it up now.
433 		 */
434 		if (rpipe->pipe_state & PIPE_WANTW) {
435 			rpipe->pipe_state &= ~PIPE_WANTW;
436 			wakeup(rpipe);
437 		}
438 	}
439 	if (rpipe->pipe_state & PIPE_SEL) {
440 		rpipe->pipe_state &= ~PIPE_SEL;
441 		selwakeup(&rpipe->pipe_sel);
442 	}
443 	return error;
444 }
445 
446 /*
447  * Map the sending process's buffer into kernel space and wire it.
448  * This is similar to a physical write operation.
449  */
450 static int
451 pipe_build_write_buffer(wpipe, uio)
452 	struct pipe *wpipe;
453 	struct uio *uio;
454 {
455 	int size;
456 	int i;
457 	vm_offset_t addr, endaddr, paddr;
458 
459 	size = uio->uio_iov->iov_len;
460 	if (size > wpipe->pipe_buffer.size)
461 		size = wpipe->pipe_buffer.size;
462 
463 	endaddr = round_page(uio->uio_iov->iov_base + size);
464 	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
465 		addr < endaddr;
466 		addr += PAGE_SIZE, i+=1) {
467 
468 		vm_page_t m;
469 
470 		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
471 		paddr = pmap_kextract(addr);
472 		if (!paddr) {
473 			int j;
474 			for(j=0;j<i;j++)
475 				vm_page_unwire(wpipe->pipe_map.ms[j]);
476 			return EFAULT;
477 		}
478 
479 		m = PHYS_TO_VM_PAGE(paddr);
480 		vm_page_wire(m);
481 		wpipe->pipe_map.ms[i] = m;
482 	}
483 
484 /*
485  * set up the control block
486  */
487 	wpipe->pipe_map.npages = i;
488 	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
489 	wpipe->pipe_map.cnt = size;
490 
491 /*
492  * and map the buffer
493  */
494 	if (wpipe->pipe_map.kva == 0) {
495 		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
496 			wpipe->pipe_buffer.size);
497 		amountpipekva += wpipe->pipe_buffer.size;
498 	}
499 	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
500 		wpipe->pipe_map.npages);
501 
502 /*
503  * and update the uio data
504  */
505 
506 	uio->uio_iov->iov_len -= size;
507 	uio->uio_iov->iov_base += size;
508 	if (uio->uio_iov->iov_len == 0)
509 		uio->uio_iov++;
510 	uio->uio_resid -= size;
511 	uio->uio_offset += size;
512 	return 0;
513 }
514 
515 /*
516  * unmap and unwire the process buffer
517  */
518 static void
519 pipe_destroy_write_buffer(wpipe)
520 struct pipe *wpipe;
521 {
522 	int i;
523 	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
524 
525 	if (wpipe->pipe_map.kva) {
526 		if (amountpipekva > MAXPIPEKVA) {
527 			vm_offset_t kva = wpipe->pipe_map.kva;
528 			wpipe->pipe_map.kva = 0;
529 			kmem_free(kernel_map, kva,
530 				wpipe->pipe_buffer.size);
531 			amountpipekva -= wpipe->pipe_buffer.size;
532 		}
533 	}
534 	for (i=0;i<wpipe->pipe_map.npages;i++)
535 		vm_page_unwire(wpipe->pipe_map.ms[i]);
536 }
537 
538 /*
539  * In the case of a signal, the writing process might go away.  This
540  * code copies the data into the circular buffer so that the source
541  * pages can be freed without loss of data.
542  */
543 static void
544 pipe_clone_write_buffer(wpipe)
545 struct pipe *wpipe;
546 {
547 	int size;
548 	int pos;
549 
550 	size = wpipe->pipe_map.cnt;
551 	pos = wpipe->pipe_map.pos;
552 	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
553 			(caddr_t) wpipe->pipe_buffer.buffer,
554 			size);
555 
556 	wpipe->pipe_buffer.in = size;
557 	wpipe->pipe_buffer.out = 0;
558 	wpipe->pipe_buffer.cnt = size;
559 	wpipe->pipe_state &= ~PIPE_DIRECTW;
560 
561 	pipe_destroy_write_buffer(wpipe);
562 }
563 
564 /*
565  * This implements the pipe buffer write mechanism.  Note that only
566  * a direct write OR a normal pipe write can be pending at any given time.
567  * If there are any characters in the pipe buffer, the direct write will
568  * be deferred until the receiving process grabs all of the bytes from
569  * the pipe buffer.  Then the direct mapping write is set up.
570  */
571 static int
572 pipe_direct_write(wpipe, uio)
573 	struct pipe *wpipe;
574 	struct uio *uio;
575 {
576 	int error;
577 	while (wpipe->pipe_state & PIPE_DIRECTW) {
578 		error = tsleep(wpipe,
579 				PRIBIO|PCATCH, "pipdww", 0);
580 		if (error || (wpipe->pipe_state & PIPE_EOF))
581 			goto error1;
582 	}
583 	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
584 	wpipe->pipe_state |= PIPE_DIRECTW;
585 	while (wpipe->pipe_buffer.cnt > 0) {
586 		error = tsleep(wpipe,
587 				PRIBIO|PCATCH, "pipdwc", 0);
588 		if (error || (wpipe->pipe_state & PIPE_EOF)) {
589 			wpipe->pipe_state &= ~PIPE_DIRECTW;
590 			if (error == 0)
591 				error = EPIPE;
592 			goto error1;
593 		}
594 	}
595 
596 	error = pipe_build_write_buffer(wpipe, uio);
597 	if (error) {
598 		wpipe->pipe_state &= ~PIPE_DIRECTW;
599 		goto error1;
600 	}
601 
602 	if (wpipe->pipe_state & PIPE_WANTR) {
603 		wpipe->pipe_state &= ~PIPE_WANTR;
604 		wakeup(wpipe);
605 	}
606 
607 	error = 0;
608 	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
609 		if (wpipe->pipe_state & PIPE_EOF) {
610 			pipelock(wpipe, 0);
611 			pipe_destroy_write_buffer(wpipe);
612 			pipeunlock(wpipe);
613 			wakeup(wpipe);
614 			return EPIPE;
615 		}
616 		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
617 	}
618 
619 	pipelock(wpipe,0);
620 	if (wpipe->pipe_state & PIPE_DIRECTW) {
621 		/*
622 		 * this bit of trickery substitutes a kernel buffer for
623 		 * the process that might be going away.
624 		 */
625 		pipe_clone_write_buffer(wpipe);
626 	} else {
627 		pipe_destroy_write_buffer(wpipe);
628 	}
629 	pipeunlock(wpipe);
630 	return error;
631 
632 error1:
633 	wakeup(wpipe);
634 	return error;
635 }
636 
637 static __inline int
638 pipewrite(wpipe, uio, nbio)
639 	struct pipe *wpipe;
640 	struct uio *uio;
641 	int nbio;
642 {
643 	int error = 0;
644 
645 	/*
646 	 * detect loss of the pipe read side; issue SIGPIPE if lost.
647 	 */
648 	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
649 		return EPIPE;
650 	}
651 
652 	if( wpipe->pipe_buffer.buffer == NULL) {
653 		if ((error = pipelock(wpipe,1)) == 0) {
654 			pipespace(wpipe);
655 			pipeunlock(wpipe);
656 		} else {
657 			return error;
658 		}
659 	}
660 
661 	++wpipe->pipe_busy;
662 	while (uio->uio_resid) {
663 		int space;
664 		/*
665 		 * If the transfer is large, we can gain performance if
666 		 * we do process-to-process copies directly.
667 		 */
668 		if ((amountpipekva < LIMITPIPEKVA) &&
669 			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
670 			error = pipe_direct_write( wpipe, uio);
671 			if (error) {
672 				break;
673 			}
674 			continue;
675 		}
676 
677 		/*
678 		 * Pipe buffered writes cannot be concurrent with
679 		 * direct writes.  We wait until the currently executing
680 		 * direct write is completed before we start filling the
681 		 * pipe buffer.
682 		 */
683 	retrywrite:
684 		while (wpipe->pipe_state & PIPE_DIRECTW) {
685 			error = tsleep(wpipe,
686 					PRIBIO|PCATCH, "pipbww", 0);
687 			if (error)
688 				break;
689 		}
690 
691 		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
692 
693 		/*
694 		 * Writes of PIPE_BUF bytes or less must go into the buffer
695 		 * contiguously (atomically); larger writes may be split.
696 		 */
697 		if ((space > 0) &&
698 			((uio->uio_resid > PIPE_BUF) || (uio->uio_resid <= space))) {
699 			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
700 			if (size > space)
701 				size = space;
702 			if (size > uio->uio_resid)
703 				size = uio->uio_resid;
704 			if ((error = pipelock(wpipe,1)) == 0) {
705 				/*
706 				 * It is possible for a direct write to
707 				 * slip in on us... handle it here...
708 				 */
709 				if (wpipe->pipe_state & PIPE_DIRECTW) {
710 					pipeunlock(wpipe);
711 					goto retrywrite;
712 				}
713 				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
714 					size, uio);
715 				pipeunlock(wpipe);
716 			}
717 			if (error)
718 				break;
719 
720 			wpipe->pipe_buffer.in += size;
721 			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
722 				wpipe->pipe_buffer.in = 0;
723 
724 			wpipe->pipe_buffer.cnt += size;
725 			wpipe->pipe_mtime = time;
726 		} else {
727 			/*
728 			 * If the "read-side" has been blocked, wake it up now.
729 			 */
730 			if (wpipe->pipe_state & PIPE_WANTR) {
731 				wpipe->pipe_state &= ~PIPE_WANTR;
732 				wakeup(wpipe);
733 			}
734 			/*
735 			 * don't block on non-blocking I/O
736 			 */
737 			if (nbio) {
738 				error = EAGAIN;
739 				break;
740 			}
741 
742 			wpipe->pipe_state |= PIPE_WANTW;
743 			if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
744 				break;
745 			}
746 			/*
747 			 * If the read side wants to go away, we just issue a signal
748 			 * to ourselves.
749 			 */
750 			if (wpipe->pipe_state & PIPE_EOF) {
751 				error = EPIPE;
752 				break;
753 			}
754 		}
755 	}
756 
757 	if ((wpipe->pipe_busy == 0) &&
758 		(wpipe->pipe_state & PIPE_WANT)) {
759 		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
760 		wakeup(wpipe);
761 	} else if (wpipe->pipe_buffer.cnt > 0) {
762 		/*
763 		 * If we have put any characters in the buffer, we wake up
764 		 * the reader.
765 		 */
766 		if (wpipe->pipe_state & PIPE_WANTR) {
767 			wpipe->pipe_state &= ~PIPE_WANTR;
768 			wakeup(wpipe);
769 		}
770 	}
771 
772 	/*
773 	 * Don't return EPIPE if I/O was successful
774 	 */
775 	if ((wpipe->pipe_buffer.cnt == 0) &&
776 		(uio->uio_resid == 0) &&
777 		(error == EPIPE))
778 		error = 0;
779 
780 	if (wpipe->pipe_state & PIPE_SEL) {
781 		wpipe->pipe_state &= ~PIPE_SEL;
782 		selwakeup(&wpipe->pipe_sel);
783 	}
784 
785 	--wpipe->pipe_busy;
786 	return error;
787 }
788 
789 /* ARGSUSED */
790 static int
791 pipe_write(fp, uio, cred)
792 	struct file *fp;
793 	struct uio *uio;
794 	struct ucred *cred;
795 {
796 	struct pipe *rpipe = (struct pipe *) fp->f_data;
797 	struct pipe *wpipe = rpipe->pipe_peer;
798 	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0);
799 }
800 
801 /*
802  * we implement a very minimal set of ioctls for compatibility with sockets.
803  */
804 int
805 pipe_ioctl(fp, cmd, data, p)
806 	struct file *fp;
807 	int cmd;
808 	register caddr_t data;
809 	struct proc *p;
810 {
811 	register struct pipe *mpipe = (struct pipe *)fp->f_data;
812 
813 	switch (cmd) {
814 
815 	case FIONBIO:
816 		if (*(int *)data)
817 			mpipe->pipe_state |= PIPE_NBIO;
818 		else
819 			mpipe->pipe_state &= ~PIPE_NBIO;
820 		return (0);
821 
822 	case FIOASYNC:
823 		if (*(int *)data) {
824 			mpipe->pipe_state |= PIPE_ASYNC;
825 		} else {
826 			mpipe->pipe_state &= ~PIPE_ASYNC;
827 		}
828 		return (0);
829 
830 	case FIONREAD:
831 		*(int *)data = mpipe->pipe_buffer.cnt;
832 		return (0);
833 
834 	case SIOCSPGRP:
835 		mpipe->pipe_pgid = *(int *)data;
836 		return (0);
837 
838 	case SIOCGPGRP:
839 		*(int *)data = mpipe->pipe_pgid;
840 		return (0);
841 
842 	}
843 	return ENOSYS;
844 }
845 
846 int
847 pipe_select(fp, which, p)
848 	struct file *fp;
849 	int which;
850 	struct proc *p;
851 {
852 	register struct pipe *rpipe = (struct pipe *)fp->f_data;
853 	struct pipe *wpipe;
854 	register int s = splnet();
855 
856 	wpipe = rpipe->pipe_peer;
857 	switch (which) {
858 
859 	case FREAD:
860 		if (rpipe->pipe_buffer.cnt > 0 ||
861 			(rpipe->pipe_state & PIPE_EOF)) {
862 			splx(s);
863 			return (1);
864 		}
865 		selrecord(p, &rpipe->pipe_sel);
866 		rpipe->pipe_state |= PIPE_SEL;
867 		break;
868 
869 	case FWRITE:
870 		if ((wpipe == NULL) ||
871 			(wpipe->pipe_state & PIPE_EOF) ||
872 			((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
873 			splx(s);
874 			return (1);
875 		}
876 		selrecord(p, &wpipe->pipe_sel);
877 		wpipe->pipe_state |= PIPE_SEL;
878 		break;
879 
880 	case 0:
881 		if ((rpipe->pipe_state & PIPE_EOF) ||
882 			(wpipe == NULL) ||
883 			(wpipe->pipe_state & PIPE_EOF)) {
884 			splx(s);
885 			return (1);
886 		}
887 
888 		selrecord(p, &rpipe->pipe_sel);
889 		rpipe->pipe_state |= PIPE_SEL;
890 		break;
891 	}
892 	splx(s);
893 	return (0);
894 }
895 
896 int
897 pipe_stat(pipe, ub)
898 	register struct pipe *pipe;
899 	register struct stat *ub;
900 {
901 	bzero((caddr_t)ub, sizeof (*ub));
902 	ub->st_mode = S_IFSOCK;
903 	ub->st_blksize = pipe->pipe_buffer.size;
904 	ub->st_size = pipe->pipe_buffer.cnt;
905 	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
906 	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
907 	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
908 	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
909 	return 0;
910 }
911 
912 /* ARGSUSED */
913 static int
914 pipe_close(fp, p)
915 	struct file *fp;
916 	struct proc *p;
917 {
918 	int error = 0;
919 	struct pipe *cpipe = (struct pipe *)fp->f_data;
920 	pipeclose(cpipe);
921 	fp->f_data = NULL;
922 	return 0;
923 }
924 
925 /*
926  * shut down the pipe
927  */
928 static void
929 pipeclose(cpipe)
930 	struct pipe *cpipe;
931 {
932 	struct pipe *ppipe;
933 	if (cpipe) {
934 
935 		if (cpipe->pipe_state & PIPE_SEL) {
936 			cpipe->pipe_state &= ~PIPE_SEL;
937 			selwakeup(&cpipe->pipe_sel);
938 		}
939 
940 		/*
941 		 * If anyone is blocked on this pipe, wake them up and wait
942 		 * for them to notice that we want to close it down.
943 		 */
944 		while (cpipe->pipe_busy) {
945 			wakeup(cpipe);
946 			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
947 			tsleep(cpipe, PRIBIO, "pipecl", 0);
948 		}
949 
950 		/*
951 		 * Disconnect from peer
952 		 */
953 		if (ppipe = cpipe->pipe_peer) {
954 			if (ppipe->pipe_state & PIPE_SEL) {
955 				ppipe->pipe_state &= ~PIPE_SEL;
956 				selwakeup(&ppipe->pipe_sel);
957 			}
958 
959 			ppipe->pipe_state |= PIPE_EOF;
960 			wakeup(ppipe);
961 			ppipe->pipe_peer = NULL;
962 		}
963 
964 		/*
965 		 * free resources
966 		 */
967 		if (cpipe->pipe_buffer.buffer) {
968 			amountpipekva -= cpipe->pipe_buffer.size;
969 			kmem_free(kernel_map,
970 				(vm_offset_t)cpipe->pipe_buffer.buffer,
971 				cpipe->pipe_buffer.size);
972 		}
973 		if (cpipe->pipe_map.kva) {
974 			amountpipekva -= cpipe->pipe_buffer.size;
975 			kmem_free(kernel_map,
976 				cpipe->pipe_map.kva,
977 				cpipe->pipe_buffer.size);
978 		}
979 		free(cpipe, M_TEMP);
980 	}
981 }
982 #endif
983