xref: /titanic_44/usr/src/cmd/sort/common/streams.c (revision bc37da3aa8455efcf567c456746e3fb9d7f0a189)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include "streams.h"
29 
30 static const stream_ops_t invalid_ops = {
31 	NULL,
32 	NULL,
33 	NULL,
34 	NULL,
35 	NULL,
36 	NULL,
37 	NULL,
38 	NULL,
39 	NULL,
40 	NULL
41 };
42 
43 stream_t *
stream_new(int src)44 stream_new(int src)
45 {
46 	stream_t *str = safe_realloc(NULL, sizeof (stream_t));
47 
48 	stream_clear(str);
49 	stream_set(str, src);
50 
51 	return (str);
52 }
53 
54 void
stream_set(stream_t * str,flag_t flags)55 stream_set(stream_t *str, flag_t flags)
56 {
57 	if (flags & STREAM_SOURCE_MASK) {
58 		ASSERT((flags & STREAM_SOURCE_MASK) == STREAM_ARRAY ||
59 		    (flags & STREAM_SOURCE_MASK) == STREAM_SINGLE ||
60 		    (flags & STREAM_SOURCE_MASK) == STREAM_MMAP ||
61 		    (flags & STREAM_SOURCE_MASK) == STREAM_WIDE);
62 
63 		str->s_status &= ~STREAM_SOURCE_MASK;
64 		str->s_status |= flags & STREAM_SOURCE_MASK;
65 
66 		switch (flags & STREAM_SOURCE_MASK) {
67 			case STREAM_NO_SOURCE:
68 				str->s_element_size = 0;
69 				str->s_ops = invalid_ops;
70 				return;
71 			case STREAM_ARRAY:
72 				/*
73 				 * Array streams inherit element size.
74 				 */
75 				str->s_ops = stream_array_ops;
76 				break;
77 			case STREAM_MMAP:
78 				str->s_element_size = sizeof (char);
79 				str->s_ops = stream_mmap_ops;
80 				break;
81 			case STREAM_SINGLE:
82 				str->s_element_size = sizeof (char);
83 				str->s_ops = stream_stdio_ops;
84 				break;
85 			case STREAM_WIDE:
86 				str->s_element_size = sizeof (wchar_t);
87 				str->s_ops = stream_wide_ops;
88 				break;
89 			default:
90 				die(EMSG_UNKN_STREAM, str->s_status);
91 		}
92 	}
93 
94 	str->s_status |= (flags & ~STREAM_SOURCE_MASK);
95 
96 	if (str->s_status & STREAM_UNIQUE)
97 		switch (str->s_status & STREAM_SOURCE_MASK) {
98 			case STREAM_SINGLE :
99 				str->s_ops.sop_put_line =
100 				    stream_stdio_put_line_unique;
101 				break;
102 			case STREAM_WIDE :
103 				str->s_ops.sop_put_line =
104 				    stream_wide_put_line_unique;
105 				break;
106 			default :
107 				break;
108 		}
109 
110 	if (str->s_status & STREAM_INSTANT)
111 		switch (str->s_status & STREAM_SOURCE_MASK) {
112 			case STREAM_SINGLE :
113 				str->s_ops.sop_fetch =
114 				    stream_stdio_fetch_overwrite;
115 				break;
116 			case STREAM_WIDE :
117 				str->s_ops.sop_fetch =
118 				    stream_wide_fetch_overwrite;
119 				break;
120 			default :
121 				break;
122 		}
123 }
124 
125 void
stream_unset(stream_t * streamp,flag_t flags)126 stream_unset(stream_t *streamp, flag_t flags)
127 {
128 	ASSERT(!(flags & STREAM_SOURCE_MASK));
129 
130 	streamp->s_status &= ~(flags & ~STREAM_SOURCE_MASK);
131 }
132 
133 int
stream_is_primed(stream_t * streamp)134 stream_is_primed(stream_t *streamp)
135 {
136 	return (streamp->s_status & STREAM_PRIMED);
137 }
138 
139 void
stream_clear(stream_t * str)140 stream_clear(stream_t *str)
141 {
142 	(void) memset(str, 0, sizeof (stream_t));
143 }
144 
145 static void
stream_copy(stream_t * dest,stream_t * src)146 stream_copy(stream_t *dest, stream_t *src)
147 {
148 	(void) memcpy(dest, src, sizeof (stream_t));
149 }
150 
151 void
stream_stat_chain(stream_t * strp)152 stream_stat_chain(stream_t *strp)
153 {
154 	struct stat buf;
155 	stream_t *cur_strp = strp;
156 
157 	while (cur_strp != NULL) {
158 		if (cur_strp->s_status & STREAM_NOTFILE ||
159 		    cur_strp->s_status & STREAM_ARRAY) {
160 			cur_strp = cur_strp->s_next;
161 			continue;
162 		}
163 
164 		if (stat(cur_strp->s_filename, &buf) < 0)
165 			die(EMSG_STAT, cur_strp->s_filename);
166 
167 		cur_strp->s_dev = buf.st_dev;
168 		cur_strp->s_ino = buf.st_ino;
169 		cur_strp->s_filesize = buf.st_size;
170 
171 		cur_strp = cur_strp->s_next;
172 	}
173 }
174 
175 uint_t
stream_count_chain(stream_t * str)176 stream_count_chain(stream_t *str)
177 {
178 	uint_t n = 0;
179 
180 	while (str != NULL) {
181 		n++;
182 		str = str->s_next;
183 	}
184 
185 	return (n);
186 }
187 
188 int
stream_open_for_read(sort_t * S,stream_t * str)189 stream_open_for_read(sort_t *S, stream_t *str)
190 {
191 	int fd;
192 
193 	ASSERT(!(str->s_status & STREAM_OUTPUT));
194 
195 	/*
196 	 * STREAM_ARRAY streams are open by definition.
197 	 */
198 	if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY) {
199 		stream_set(str, STREAM_ARRAY | STREAM_OPEN);
200 		return (1);
201 	}
202 
203 	/*
204 	 * Set data type according to locale for input from stdin.
205 	 */
206 	if (str->s_status & STREAM_NOTFILE) {
207 		str->s_type.BF.s_fp = stdin;
208 		stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ?
209 		    STREAM_SINGLE : STREAM_WIDE));
210 		return (1);
211 	}
212 
213 	ASSERT(str->s_filename);
214 
215 #ifndef DEBUG_DISALLOW_MMAP
216 	if (S->m_single_byte_locale &&
217 	    str->s_filesize > 0 &&
218 	    str->s_filesize < SSIZE_MAX) {
219 		/*
220 		 * make mmap() attempt; set s_status and return if successful
221 		 */
222 		fd = open(str->s_filename, O_RDONLY);
223 		if (fd < 0) {
224 			if (errno == EMFILE || errno == ENFILE)
225 				return (-1);
226 			else
227 				die(EMSG_OPEN, str->s_filename);
228 		}
229 		str->s_buffer = mmap(0, str->s_filesize, PROT_READ,
230 		    MAP_SHARED, fd, 0);
231 
232 		if (str->s_buffer != MAP_FAILED) {
233 			str->s_buffer_size = str->s_filesize;
234 			str->s_type.SF.s_fd = fd;
235 
236 			stream_set(str, STREAM_MMAP | STREAM_OPEN);
237 			stream_unset(str, STREAM_PRIMED);
238 			return (1);
239 		}
240 
241 		/*
242 		 * Otherwise the mmap() failed due to address space exhaustion;
243 		 * since we have already opened the file, we close it and drop
244 		 * into the normal (STDIO) case.
245 		 */
246 		(void) close(fd);
247 		str->s_buffer = NULL;
248 	}
249 #endif /* DEBUG_DISALLOW_MMAP */
250 
251 	if ((str->s_type.BF.s_fp = fopen(str->s_filename, "r")) == NULL) {
252 		if (errno == EMFILE || errno == ENFILE)
253 			return (-1);
254 		else
255 			die(EMSG_OPEN, str->s_filename);
256 	}
257 
258 	str->s_type.BF.s_vbuf = safe_realloc(NULL, STDIO_VBUF_SIZE);
259 	if (setvbuf(str->s_type.BF.s_fp, str->s_type.BF.s_vbuf, _IOFBF,
260 	    STDIO_VBUF_SIZE) != 0) {
261 		safe_free(str->s_type.BF.s_vbuf);
262 		str->s_type.BF.s_vbuf = NULL;
263 	}
264 
265 	stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? STREAM_SINGLE :
266 	    STREAM_WIDE));
267 	stream_unset(str, STREAM_PRIMED);
268 
269 	return (1);
270 }
271 
272 void
stream_set_size(stream_t * str,size_t new_size)273 stream_set_size(stream_t *str, size_t new_size)
274 {
275 	/*
276 	 * p_new_size is new_size rounded upwards to nearest multiple of
277 	 * PAGESIZE, since mmap() is going to reserve it in any case.  This
278 	 * ensures that the far end of the buffer is also aligned, such that we
279 	 * obtain aligned pointers if we choose to subtract from it.
280 	 */
281 	size_t p_new_size = (new_size + PAGESIZE) & ~(PAGESIZE - 1);
282 
283 	if (str->s_buffer_size == p_new_size)
284 		return;
285 
286 	if (str->s_buffer != NULL)
287 		(void) munmap(str->s_buffer, str->s_buffer_size);
288 
289 	if (new_size == 0) {
290 		str->s_buffer = NULL;
291 		str->s_buffer_size = 0;
292 		return;
293 	}
294 
295 	str->s_buffer = xzmap(0, p_new_size, PROT_READ | PROT_WRITE,
296 	    MAP_PRIVATE, 0);
297 
298 	if (str->s_buffer == MAP_FAILED)
299 		die(EMSG_MMAP);
300 
301 	str->s_buffer_size = p_new_size;
302 }
303 
304 void
stream_add_file_to_chain(stream_t ** str_chain,char * filename)305 stream_add_file_to_chain(stream_t **str_chain, char *filename)
306 {
307 	stream_t *str;
308 
309 	str = stream_new(STREAM_NO_SOURCE);
310 
311 	str->s_filename = filename;
312 	str->s_type.SF.s_fd = -1;
313 
314 	stream_push_to_chain(str_chain, str);
315 }
316 
317 void
stream_push_to_chain(stream_t ** str_chain,stream_t * streamp)318 stream_push_to_chain(stream_t **str_chain, stream_t *streamp)
319 {
320 	stream_t *cur_streamp = *str_chain;
321 
322 	if (cur_streamp == NULL) {
323 		*str_chain = streamp;
324 		streamp->s_next = NULL;
325 		return;
326 	}
327 
328 	while (cur_streamp->s_next != NULL)
329 		cur_streamp = cur_streamp->s_next;
330 
331 	cur_streamp->s_next = streamp;
332 	streamp->s_previous = cur_streamp;
333 	streamp->s_next = NULL;
334 }
335 
336 static void
stream_dump(stream_t * str_in,stream_t * str_out)337 stream_dump(stream_t *str_in, stream_t *str_out)
338 {
339 	ASSERT(!(str_in->s_status & STREAM_OUTPUT));
340 	ASSERT(str_out->s_status & STREAM_OUTPUT);
341 
342 	SOP_PUT_LINE(str_out, &str_in->s_current);
343 
344 	while (!SOP_EOS(str_in)) {
345 		SOP_FETCH(str_in);
346 		SOP_PUT_LINE(str_out, &str_in->s_current);
347 	}
348 }
349 
350 /*
351  * stream_push_to_temporary() with flags set to ST_CACHE merely copies the
352  * stream_t pointer onto the chain.  With flags set to ST_NOCACHE, the stream is
353  * written out to a file.  Stream pointers passed to stream_push_to_temporary()
354  * must refer to allocated objects, and not to objects created on function
355  * stacks.  Finally, if strp == NULL, stream_push_to_temporary() creates and
356  * pushes the new stream; the output stream is left open if ST_OPEN is set.
357  */
358 stream_t *
stream_push_to_temporary(stream_t ** str_chain,stream_t * streamp,int flags)359 stream_push_to_temporary(stream_t **str_chain, stream_t *streamp, int flags)
360 {
361 	stream_t *out_streamp;
362 
363 	if (flags & ST_CACHE) {
364 		ASSERT(streamp->s_status & STREAM_ARRAY);
365 		stream_set(streamp, STREAM_NOT_FREEABLE | STREAM_TEMPORARY);
366 		stream_push_to_chain(str_chain, streamp);
367 		return (streamp);
368 	}
369 
370 	out_streamp = safe_realloc(NULL, sizeof (stream_t));
371 
372 	if (streamp != NULL) {
373 		stream_copy(out_streamp, streamp);
374 		stream_unset(out_streamp, STREAM_OPEN);
375 		ASSERT(streamp->s_element_size == sizeof (char) ||
376 		    streamp->s_element_size == sizeof (wchar_t));
377 		stream_set(out_streamp,
378 		    streamp->s_element_size == 1 ? STREAM_SINGLE : STREAM_WIDE);
379 		out_streamp->s_buffer = NULL;
380 		out_streamp->s_buffer_size = 0;
381 	} else {
382 		stream_clear(out_streamp);
383 		stream_set(out_streamp, flags & ST_WIDE ? STREAM_WIDE :
384 		    STREAM_SINGLE);
385 	}
386 
387 	(void) bump_file_template();
388 	out_streamp->s_filename = strdup(get_file_template());
389 
390 	if (SOP_OPEN_FOR_WRITE(out_streamp) == -1)
391 		return (NULL);
392 
393 	stream_set(out_streamp, STREAM_TEMPORARY);
394 	stream_push_to_chain(str_chain, out_streamp);
395 
396 	if (streamp != NULL) {
397 		/*
398 		 * We reset the input stream to the beginning, and copy it in
399 		 * sequence to the output stream, freeing the raw_collate field
400 		 * as we go.
401 		 */
402 		if (SOP_PRIME(streamp) != PRIME_SUCCEEDED)
403 			die(EMSG_BADPRIME);
404 		stream_dump(streamp, out_streamp);
405 	}
406 
407 	if (!(flags & ST_OPEN)) {
408 		SOP_FREE(out_streamp);
409 		(void) SOP_CLOSE(out_streamp);
410 	}
411 
412 	/*
413 	 * Now that we've written this stream to disk, we needn't protect any
414 	 * in-memory consumer.
415 	 */
416 	if (streamp != NULL)
417 		streamp->s_consumer = NULL;
418 
419 	return (out_streamp);
420 }
421 
422 void
stream_close_all_previous(stream_t * tail_streamp)423 stream_close_all_previous(stream_t *tail_streamp)
424 {
425 	stream_t *cur_streamp;
426 
427 	ASSERT(tail_streamp != NULL);
428 
429 	cur_streamp = tail_streamp->s_previous;
430 	while (cur_streamp != NULL) {
431 		(void) SOP_FREE(cur_streamp);
432 		if (SOP_IS_CLOSABLE(cur_streamp))
433 			(void) SOP_CLOSE(cur_streamp);
434 
435 		cur_streamp = cur_streamp->s_previous;
436 	}
437 }
438 
439 void
stream_unlink_temporary(stream_t * streamp)440 stream_unlink_temporary(stream_t *streamp)
441 {
442 	if (streamp->s_status & STREAM_TEMPORARY) {
443 		(void) SOP_FREE(streamp);
444 
445 		if (streamp->s_ops.sop_unlink)
446 			(void) SOP_UNLINK(streamp);
447 	}
448 }
449 
450 /*
451  * stream_insert() takes input from src stream, converts to each line to
452  * collatable form, and places a line_rec_t in dest stream, which is of type
453  * STREAM_ARRAY.
454  */
455 int
stream_insert(sort_t * S,stream_t * src,stream_t * dest)456 stream_insert(sort_t *S, stream_t *src, stream_t *dest)
457 {
458 	ssize_t i = dest->s_type.LA.s_array_size;
459 	line_rec_t *l_series;
460 	char *l_convert = dest->s_buffer;
461 	int return_val = ST_MEM_AVAIL;
462 	int fetch_result = NEXT_LINE_COMPLETE;
463 
464 	/*
465 	 * Scan through until total bytes allowed accumulated, and return.
466 	 * Use SOP_FETCH(src) so that this works for all stream types,
467 	 * and so that we can repeat until eos.
468 	 *
469 	 * For each new line, we move back sizeof (line_rec_t) from the end of
470 	 * the array buffer, and copy into the start of the array buffer.  When
471 	 * the pointers meet, or when we exhaust the current stream, we return.
472 	 * If we have not filled the current memory allocation, we return
473 	 * ST_MEM_AVAIL, else we return ST_MEM_FILLED.
474 	 */
475 	ASSERT(stream_is_primed(src));
476 	ASSERT(dest->s_status & STREAM_ARRAY);
477 
478 	/*LINTED ALIGNMENT*/
479 	l_series = (line_rec_t *)((caddr_t)dest->s_buffer
480 	    + dest->s_buffer_size) - dest->s_type.LA.s_array_size;
481 
482 	if (dest->s_type.LA.s_array_size)
483 		l_convert = l_series->l_collate.sp +
484 		    l_series->l_collate_length + src->s_element_size;
485 
486 	/*
487 	 * current line has been set prior to entry
488 	 */
489 	src->s_current.l_collate.sp = l_convert;
490 	src->s_current.l_collate_bufsize = (caddr_t)l_series
491 	    - (caddr_t)l_convert - sizeof (line_rec_t);
492 	src->s_current.l_raw_collate.sp = NULL;
493 
494 	if (src->s_current.l_collate_bufsize <= 0)
495 		return (ST_MEM_FILLED);
496 
497 	src->s_consumer = dest;
498 
499 	while (src->s_current.l_collate_bufsize > 0 &&
500 	    (src->s_current.l_collate_length = S->m_coll_convert(
501 	    S->m_fields_head, &src->s_current, FCV_FAIL,
502 	    S->m_field_separator)) >= 0) {
503 		ASSERT((char *)l_series > l_convert);
504 		l_series--;
505 		l_convert += src->s_current.l_collate_length;
506 
507 		if ((char *)l_series <= l_convert) {
508 			__S(stats_incr_insert_filled_downward());
509 			l_series++;
510 			return_val = ST_MEM_FILLED;
511 			break;
512 		}
513 
514 		/*
515 		 * There's no collision with the lower part of the buffer, so we
516 		 * can safely begin processing the line.  In the debug case, we
517 		 * test for uninitialized data by copying a non-zero pattern.
518 		 */
519 #ifdef DEBUG
520 		memset(l_series, 0x1ff11ff1, sizeof (line_rec_t));
521 #endif
522 
523 		copy_line_rec(&src->s_current, l_series);
524 		i++;
525 
526 		if (SOP_EOS(src) ||
527 		    (fetch_result = SOP_FETCH(src)) == NEXT_LINE_INCOMPLETE)
528 			break;
529 
530 		src->s_current.l_collate.sp = l_convert;
531 		src->s_current.l_collate_bufsize = (caddr_t)l_series
532 		    - (caddr_t)l_convert - sizeof (line_rec_t);
533 		src->s_current.l_raw_collate.sp = NULL;
534 	}
535 
536 	if (fetch_result == NEXT_LINE_INCOMPLETE) {
537 		__S(stats_incr_insert_filled_input());
538 		return_val = ST_MEM_FILLED;
539 	} else if (src->s_current.l_collate_length < 0 ||
540 	    src->s_current.l_collate_bufsize <= 0) {
541 		__S(stats_incr_insert_filled_upward());
542 		return_val = ST_MEM_FILLED;
543 	}
544 
545 	if (fetch_result != NEXT_LINE_INCOMPLETE &&
546 	    src->s_current.l_collate_length < 0 &&
547 	    i == 0)
548 		/*
549 		 * There's no room for conversion of our only line; need to
550 		 * execute with larger memory.
551 		 */
552 		die(EMSG_MEMORY);
553 
554 	/*
555 	 * Set up pointer array to line records.
556 	 */
557 	if (i > dest->s_type.LA.s_array_size)
558 		dest->s_type.LA.s_array = safe_realloc(dest->s_type.LA.s_array,
559 		    sizeof (line_rec_t *) * i);
560 	dest->s_type.LA.s_array_size = i;
561 
562 	i = 0;
563 	while (i < dest->s_type.LA.s_array_size) {
564 		dest->s_type.LA.s_array[i] = l_series;
565 		l_series++;
566 		i++;
567 	}
568 
569 	/*
570 	 * LINES_ARRAY streams are always open.
571 	 */
572 	stream_set(dest, STREAM_OPEN);
573 
574 	return (return_val);
575 }
576 
577 /*
578  * stream_swap_buffer() exchanges the stream's buffer with the proffered one;
579  * s_current is not adjusted so this is safe only for STREAM_INSTANT.
580  */
581 void
stream_swap_buffer(stream_t * str,char ** buf,size_t * size)582 stream_swap_buffer(stream_t *str, char **buf, size_t *size)
583 {
584 	void *tb = *buf;
585 	size_t ts = *size;
586 
587 	*buf = str->s_buffer;
588 	*size = str->s_buffer_size;
589 
590 	str->s_buffer = tb;
591 	str->s_buffer_size = ts;
592 }
593