xref: /titanic_51/usr/src/cmd/sort/common/merge.c (revision 7c478bd95313f5f23a4c958a745db2134aa03244)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 1998-2003 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "merge.h"
30 
31 /*
32  * External merge sort
33  *
34  *   The following code implements the merge phase of sort(1) using a heap-based
35  *   priority queue.  Fast paths for merging two files as well as outputting a
36  *   single file are provided.
37  *
38  * Memory footprint management
39  *
40  *   The N-way fan-out of the merge phase can lead to compromising memory
41  *   consumption if not constrained, so two mechanisms are used to regulate
42  *   the memory footprint during the merge phase:
43  *
44  *   1.  Single use memory advice.  Since we proceed through each merge file in
45  *       order, any line we have output is never required again--at least, not
46  *       from that input file.  Accordingly, we use the SOP_RELEASE_LINE()
47  *       operation to advise that the memory backing the raw data for the stream
48  *       up to that line is no longer of interest.  (For certain classes of
49  *       streams, this leads to an madvise(3C) call with the MADV_DONTNEED
50  *       flag.)
51  *
52  *   2.  Number of merge files.  The number of merge files is constrained based
53  *       on the amount of physical memory specified via the -S option (or deemed
54  *       available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES).
55  *       The number of merge files is calculated based on the average resident
56  *       size of a stream that supports the SOP_RELEASE_LINE() operation; this
57  *       number is conservative for streams that do not support this operation.
58  *       A minimum of four subfiles will always be used, resource limits
59  *       permitting.
60  *
61  * Temporary filespace footprint management
62  *
63  *   Once the merge sort has utilized a temporary file, it may be deleted at
64  *   close, as it's not used again and preserving the files until exit may
65  *   compromise sort completion when limited temporary space is available.
66  */
67 
/*
 * Priority queue state shared by the pqueue_*() and heap_*() routines.
 *
 * pq_N is the number of streams currently held in the queue.  pq_queue is
 * the heap array itself; it is indexed from 1 (slot 0 is allocated but never
 * used).  pq_coll_fcn is the line comparison routine installed by
 * pqueue_init().
 */
static int pq_N;
static stream_t	**pq_queue;
static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);

/*
 * Routine used to generate the collation key for a line record; set by
 * merge() from the sort_t's m_coll_convert member before any merging starts.
 */
static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t);
73 
74 static int
75 prepare_output_stream(stream_t *ostrp, sort_t *S)
76 {
77 	stream_clear(ostrp);
78 	stream_unset(ostrp, STREAM_OPEN);
79 
80 	stream_set(ostrp,
81 	    (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) |
82 	    (S->m_unique_lines ? STREAM_UNIQUE : 0));
83 
84 	if (S->m_output_to_stdout) {
85 		stream_set(ostrp, STREAM_NOTFILE);
86 		ostrp->s_filename = (char *)filename_stdout;
87 	} else
88 		ostrp->s_filename = S->m_output_filename;
89 
90 	return (SOP_OPEN_FOR_WRITE(ostrp));
91 }
92 
93 static void
94 merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp,
95     vchar_t field_separator)
96 {
97 	size_t element_size = strp->s_element_size;
98 	size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
99 
100 	if (strp->s_status & STREAM_SINGLE || strp->s_status & STREAM_WIDE)
101 		stream_set(strp, STREAM_INSTANT);
102 
103 	if (SOP_PRIME(strp) == PRIME_SUCCEEDED) {
104 		strp->s_current.l_collate_bufsize = initial_size;
105 		strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
106 
107 		(void) mg_coll_convert(fields_chain, &strp->s_current,
108 		    FCV_REALLOC, field_separator);
109 		SOP_PUT_LINE(outstrp, &strp->s_current);
110 		SOP_RELEASE_LINE(strp);
111 
112 		while (!SOP_EOS(strp)) {
113 			SOP_FETCH(strp);
114 			if (strp->s_current.l_collate_length == 0)
115 				(void) mg_coll_convert(fields_chain,
116 				    &strp->s_current, FCV_REALLOC,
117 				    field_separator);
118 			SOP_PUT_LINE(outstrp, &strp->s_current);
119 			SOP_RELEASE_LINE(strp);
120 		}
121 
122 		(void) SOP_CLOSE(strp);
123 		SOP_FLUSH(outstrp);
124 	}
125 }
126 
127 static void
128 merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b,
129     stream_t *outstrp, vchar_t field_separator, flag_t coll_flags)
130 {
131 	int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
132 	size_t element_size = str_a->s_element_size;
133 	size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
134 
135 	ASSERT(str_a->s_element_size == str_b->s_element_size);
136 
137 	if (str_a->s_element_size == sizeof (char))
138 		collate_fcn = collated;
139 	else
140 		collate_fcn = collated_wide;
141 
142 	if (str_a->s_status & STREAM_SINGLE || str_a->s_status & STREAM_WIDE)
143 		stream_set(str_a, STREAM_INSTANT);
144 	if (str_b->s_status & STREAM_SINGLE || str_b->s_status & STREAM_WIDE)
145 		stream_set(str_b, STREAM_INSTANT);
146 
147 	if (SOP_PRIME(str_a) != PRIME_SUCCEEDED) {
148 		if (SOP_PRIME(str_b) != PRIME_SUCCEEDED)
149 			return;
150 
151 		merge_one_stream(fields_chain, str_b, outstrp,
152 		    field_separator);
153 		return;
154 	}
155 
156 	if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) {
157 		merge_one_stream(fields_chain, str_a, outstrp,
158 		    field_separator);
159 		return;
160 	}
161 
162 	str_a->s_current.l_collate_bufsize =
163 	    str_b->s_current.l_collate_bufsize = initial_size;
164 
165 	str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
166 	str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
167 
168 	(void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC,
169 	    field_separator);
170 	(void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC,
171 	    field_separator);
172 
173 	for (;;) {
174 		if (collate_fcn(&str_a->s_current, &str_b->s_current, 0,
175 		    coll_flags) < 0) {
176 			SOP_PUT_LINE(outstrp, &str_a->s_current);
177 			SOP_RELEASE_LINE(str_a);
178 			if (SOP_EOS(str_a)) {
179 				(void) SOP_CLOSE(str_a);
180 				str_a = str_b;
181 				break;
182 			}
183 			SOP_FETCH(str_a);
184 			if (str_a->s_current.l_collate_length != 0)
185 				continue;
186 			(void) mg_coll_convert(fields_chain, &str_a->s_current,
187 			    FCV_REALLOC, field_separator);
188 		} else {
189 			SOP_PUT_LINE(outstrp, &str_b->s_current);
190 			SOP_RELEASE_LINE(str_b);
191 			if (SOP_EOS(str_b)) {
192 				SOP_CLOSE(str_b);
193 				break;
194 			}
195 			SOP_FETCH(str_b);
196 			if (str_b->s_current.l_collate_length != 0)
197 				continue;
198 			(void) mg_coll_convert(fields_chain, &str_b->s_current,
199 			    FCV_REALLOC, field_separator);
200 		}
201 	}
202 
203 	SOP_PUT_LINE(outstrp, &str_a->s_current);
204 	SOP_RELEASE_LINE(str_a);
205 
206 	while (!SOP_EOS(str_a)) {
207 		SOP_FETCH(str_a);
208 		if (str_a->s_current.l_collate_length == 0)
209 			(void) mg_coll_convert(fields_chain, &str_a->s_current,
210 			    FCV_REALLOC, field_separator);
211 		SOP_PUT_LINE(outstrp, &str_a->s_current);
212 		SOP_RELEASE_LINE(str_a);
213 	}
214 
215 	(void) SOP_CLOSE(str_a);
216 	SOP_FLUSH(outstrp);
217 }
218 
219 /*
220  * priority queue routines
221  *   used for merges involving more than two sources
222  */
223 static void
224 heap_up(stream_t **A, int k, flag_t coll_flags)
225 {
226 	while (k > 1 &&
227 	    pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0,
228 	    coll_flags) > 0) {
229 		swap((void **)&pq_queue[k], (void **)&pq_queue[k / 2]);
230 		k /= 2;
231 	}
232 }
233 
234 static void
235 heap_down(stream_t **A, int k, int N, flag_t coll_flags)
236 {
237 	int	j;
238 
239 	while (2 * k <= N) {
240 		j = 2 * k;
241 		if (j < N && pq_coll_fcn(&A[j]->s_current,
242 		    &A[j + 1]->s_current, 0, coll_flags) > 0)
243 			j++;
244 		if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0,
245 		    coll_flags) <= 0)
246 			break;
247 		swap((void **)&pq_queue[k], (void **)&pq_queue[j]);
248 		k = j;
249 	}
250 }
251 
252 static int
253 pqueue_empty()
254 {
255 	return (pq_N == 0);
256 }
257 
258 static void
259 pqueue_init(size_t max_size,
260     int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t))
261 {
262 	pq_queue = safe_realloc(NULL, sizeof (stream_t *) * (max_size + 1));
263 	pq_N = 0;
264 	pq_coll_fcn = coll_fcn;
265 }
266 
267 static void
268 pqueue_insert(stream_t *source, flag_t coll_flags)
269 {
270 	pq_queue[++pq_N] = source;
271 	heap_up(pq_queue, pq_N, coll_flags);
272 }
273 
274 static stream_t *
275 pqueue_head(flag_t coll_flags)
276 {
277 	swap((void **)&pq_queue[1], (void **)&pq_queue[pq_N]);
278 	heap_down(pq_queue, 1, pq_N - 1, coll_flags);
279 	return (pq_queue[pq_N--]);
280 }
281 
282 static void
283 merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams,
284     stream_t *out_streamp, flag_t coll_flags)
285 {
286 	stream_t *top_streamp;
287 	stream_t *cur_streamp;
288 	stream_t *bot_streamp;
289 	stream_t *loop_out_streamp;
290 	flag_t is_single_byte = S->m_single_byte_locale;
291 
292 	int n_opens = 0;
293 	int threshold_opens;
294 
295 	threshold_opens = MAX(4,
296 	    2 * S->m_memory_available / DEFAULT_RELEASE_SIZE);
297 
298 	pqueue_init(n_streams, is_single_byte ? collated : collated_wide);
299 
300 	top_streamp = bot_streamp = head_streamp;
301 
302 	for (;;) {
303 		hold_file_descriptor();
304 		while (bot_streamp != NULL) {
305 
306 			if (n_opens > threshold_opens ||
307 			    stream_open_for_read(S, bot_streamp) == -1) {
308 				/*
309 				 * Available file descriptors would exceed
310 				 * memory target or have been exhausted; back
311 				 * off to the last valid, primed stream.
312 				 */
313 				bot_streamp = bot_streamp->s_previous;
314 				break;
315 			}
316 
317 			if (bot_streamp->s_status & STREAM_SINGLE ||
318 			    bot_streamp->s_status & STREAM_WIDE)
319 				stream_set(bot_streamp, STREAM_INSTANT);
320 
321 			bot_streamp = bot_streamp->s_next;
322 			n_opens++;
323 		}
324 		release_file_descriptor();
325 
326 		if (bot_streamp == NULL) {
327 			if (prepare_output_stream(out_streamp, S) != -1)
328 				loop_out_streamp = out_streamp;
329 			else
330 				die(EMSG_DESCRIPTORS);
331 		} else {
332 			loop_out_streamp = stream_push_to_temporary(
333 			    &head_streamp, NULL, ST_OPEN | ST_NOCACHE |
334 			    (is_single_byte ? 0 : ST_WIDE));
335 
336 			if (loop_out_streamp == NULL ||
337 			    top_streamp == bot_streamp)
338 				/*
339 				 * We need three file descriptors to make
340 				 * progress; if top_streamp == bot_streamp, then
341 				 * we have only two.
342 				 */
343 				die(EMSG_DESCRIPTORS);
344 		}
345 
346 		for (cur_streamp = top_streamp; cur_streamp != bot_streamp;
347 		    cur_streamp = cur_streamp->s_next) {
348 			/*
349 			 * Empty stream?
350 			 */
351 			if (!(cur_streamp->s_status & STREAM_ARRAY) &&
352 			    SOP_EOS(cur_streamp)) {
353 				stream_unlink_temporary(cur_streamp);
354 				continue;
355 			}
356 
357 			/*
358 			 * Given that stream is not empty, any error in priming
359 			 * must be fatal.
360 			 */
361 			if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED)
362 				die(EMSG_BADPRIME);
363 
364 			cur_streamp->s_current.l_collate_bufsize =
365 				INITIAL_COLLATION_SIZE;
366 			cur_streamp->s_current.l_collate.sp =
367 			    safe_realloc(NULL, INITIAL_COLLATION_SIZE);
368 			(void) mg_coll_convert(S->m_fields_head,
369 			    &cur_streamp->s_current, FCV_REALLOC,
370 			    S->m_field_separator);
371 
372 			pqueue_insert(cur_streamp, coll_flags);
373 		}
374 
375 		while (!pqueue_empty()) {
376 			cur_streamp = pqueue_head(coll_flags);
377 
378 			SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current);
379 			SOP_RELEASE_LINE(cur_streamp);
380 
381 			if (!SOP_EOS(cur_streamp)) {
382 				SOP_FETCH(cur_streamp);
383 				(void) mg_coll_convert(S->m_fields_head,
384 				    &cur_streamp->s_current, FCV_REALLOC,
385 				    S->m_field_separator);
386 				pqueue_insert(cur_streamp, coll_flags);
387 			}
388 		}
389 
390 		cur_streamp = top_streamp;
391 		while (cur_streamp != bot_streamp) {
392 			if (!(cur_streamp->s_status & STREAM_ARRAY))
393 				safe_free(cur_streamp->s_current.l_collate.sp);
394 			cur_streamp->s_current.l_collate.sp = NULL;
395 
396 			(void) SOP_FREE(cur_streamp);
397 			stream_unlink_temporary(cur_streamp);
398 			(void) SOP_CLOSE(cur_streamp);
399 
400 			cur_streamp = cur_streamp->s_next;
401 		}
402 
403 		(void) SOP_FLUSH(loop_out_streamp);
404 
405 		if (bot_streamp == NULL)
406 			break;
407 
408 		if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) {
409 			(void) SOP_CLOSE(loop_out_streamp);
410 			/*
411 			 * Get file size so that we may treat intermediate files
412 			 * with our stream_mmap facilities.
413 			 */
414 			stream_stat_chain(loop_out_streamp);
415 			__S(stats_incr_merge_files());
416 		}
417 
418 		n_opens = 0;
419 
420 		top_streamp = bot_streamp;
421 		bot_streamp = bot_streamp->s_next;
422 	}
423 }
424 
425 void
426 merge(sort_t *S)
427 {
428 	stream_t *merge_chain;
429 	stream_t *cur_streamp;
430 	stream_t out_stream;
431 	uint_t n_merges;
432 	flag_t coll_flags;
433 
434 	if (S->m_merge_only) {
435 		merge_chain = S->m_input_streams;
436 		set_cleanup_chain(&S->m_input_streams);
437 	} else {
438 		/*
439 		 * Otherwise we're inheriting the temporary output files from
440 		 * our internal sort.
441 		 */
442 		merge_chain = S->m_temporary_streams;
443 		stream_stat_chain(merge_chain);
444 		__S(stats_set_merge_files(stream_count_chain(merge_chain)));
445 	}
446 
447 	if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
448 		coll_flags = COLL_REVERSE;
449 	else
450 		coll_flags = 0;
451 	if (S->m_entire_line)
452 		coll_flags |= COLL_UNIQUE;
453 
454 	n_merges = stream_count_chain(merge_chain);
455 
456 	mg_coll_convert = S->m_coll_convert;
457 	cur_streamp = merge_chain;
458 
459 	switch (n_merges) {
460 		case 0:
461 			/*
462 			 * No files for merge.
463 			 */
464 			warn(gettext("no files available to merge\n"));
465 			break;
466 		case 1:
467 			/*
468 			 * Fast path: only one file for merge.
469 			 */
470 			(void) stream_open_for_read(S, cur_streamp);
471 			(void) prepare_output_stream(&out_stream, S);
472 			merge_one_stream(S->m_fields_head, cur_streamp,
473 			    &out_stream, S->m_field_separator);
474 			break;
475 		case 2:
476 			/*
477 			 * Fast path: only two files for merge.
478 			 */
479 			(void) stream_open_for_read(S, cur_streamp);
480 			(void) stream_open_for_read(S, cur_streamp->s_next);
481 			if (prepare_output_stream(&out_stream, S) == -1)
482 				die(EMSG_DESCRIPTORS);
483 			merge_two_streams(S->m_fields_head, cur_streamp,
484 			    cur_streamp->s_next, &out_stream,
485 			    S->m_field_separator, coll_flags);
486 			break;
487 		default:
488 			/*
489 			 * Full merge.
490 			 */
491 			merge_n_streams(S, cur_streamp, n_merges, &out_stream,
492 			    coll_flags);
493 			break;
494 	}
495 
496 	remove_output_guard();
497 }
498