xref: /illumos-gate/usr/src/cmd/sort/merge.c (revision 3b8e64428fecd54234c133286f9c0009ad8940b0)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 1998-2003 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include "merge.h"
28 
29 /*
30  * External merge sort
31  *
32  *   The following code implements the merge phase of sort(1) using a heap-based
33  *   priority queue.  Fast paths for merging two files as well as outputting a
34  *   single file are provided.
35  *
36  * Memory footprint management
37  *
38  *   The N-way fan-out of the merge phase can lead to compromising memory
39  *   consumption if not constrained, so two mechanisms are used to regulate
40  *   the memory footprint during the merge phase:
41  *
42  *   1.  Single use memory advice.  Since we proceed through each merge file in
43  *       order, any line we have output is never required again--at least, not
44  *       from that input file.  Accordingly, we use the SOP_RELEASE_LINE()
45  *       operation to advise that the memory backing the raw data for the stream
46  *       up to that line is no longer of interest.  (For certain classes of
47  *       streams, this leads to an madvise(3C) call with the MADV_DONTNEED
48  *       flag.)
49  *
50  *   2.  Number of merge files.  The number of merge files is constrained based
51  *       on the amount of physical memory specified via the -S option (or deemed
52  *       available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES).
53  *       The number of merge files is calculated based on the average resident
54  *       size of a stream that supports the SOP_RELEASE_LINE() operation; this
55  *       number is conservative for streams that do not support this operation.
56  *       A minimum of four subfiles will always be used, resource limits
57  *       permitting.
58  *
59  * Temporary filespace footprint management
60  *
61  *   Once the merge sort has utilized a temporary file, it may be deleted at
62  *   close, as it's not used again and preserving the files until exit may
63  *   compromise sort completion when limited temporary space is available.
64  */
65 
/* Number of streams currently held in the priority queue; 0 means empty. */
static int pq_N;
/*
 * Heap storage for the priority queue.  Entries occupy slots 1..pq_N (slot 0
 * is never used), which is why pqueue_init() allocates max_size + 1 pointers.
 */
static stream_t	**pq_queue;
/* Line comparison routine ordering the heap; installed by pqueue_init(). */
static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);

/*
 * Routine used to build the collation key of a line; merge() copies it from
 * S->m_coll_convert before any of the merge routines run.
 */
static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t);
71 
72 static int
73 prepare_output_stream(stream_t *ostrp, sort_t *S)
74 {
75 	stream_clear(ostrp);
76 	stream_unset(ostrp, STREAM_OPEN);
77 
78 	stream_set(ostrp,
79 	    (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) |
80 	    (S->m_unique_lines ? STREAM_UNIQUE : 0));
81 
82 	if (S->m_output_to_stdout) {
83 		stream_set(ostrp, STREAM_NOTFILE);
84 		ostrp->s_filename = (char *)filename_stdout;
85 	} else
86 		ostrp->s_filename = S->m_output_filename;
87 
88 	return (SOP_OPEN_FOR_WRITE(ostrp));
89 }
90 
91 static void
92 merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp,
93     vchar_t field_separator)
94 {
95 	size_t element_size = strp->s_element_size;
96 	size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
97 
98 	if (strp->s_status & STREAM_SINGLE || strp->s_status & STREAM_WIDE)
99 		stream_set(strp, STREAM_INSTANT);
100 
101 	if (SOP_PRIME(strp) == PRIME_SUCCEEDED) {
102 		strp->s_current.l_collate_bufsize = initial_size;
103 		strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
104 
105 		(void) mg_coll_convert(fields_chain, &strp->s_current,
106 		    FCV_REALLOC, field_separator);
107 		SOP_PUT_LINE(outstrp, &strp->s_current);
108 		SOP_RELEASE_LINE(strp);
109 
110 		while (!SOP_EOS(strp)) {
111 			SOP_FETCH(strp);
112 			if (strp->s_current.l_collate_length == 0)
113 				(void) mg_coll_convert(fields_chain,
114 				    &strp->s_current, FCV_REALLOC,
115 				    field_separator);
116 			SOP_PUT_LINE(outstrp, &strp->s_current);
117 			SOP_RELEASE_LINE(strp);
118 		}
119 
120 		(void) SOP_CLOSE(strp);
121 		SOP_FLUSH(outstrp);
122 	}
123 }
124 
125 static void
126 merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b,
127     stream_t *outstrp, vchar_t field_separator, flag_t coll_flags)
128 {
129 	int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
130 	size_t element_size = str_a->s_element_size;
131 	size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
132 
133 	ASSERT(str_a->s_element_size == str_b->s_element_size);
134 
135 	if (str_a->s_element_size == sizeof (char))
136 		collate_fcn = collated;
137 	else
138 		collate_fcn = collated_wide;
139 
140 	if (str_a->s_status & STREAM_SINGLE || str_a->s_status & STREAM_WIDE)
141 		stream_set(str_a, STREAM_INSTANT);
142 	if (str_b->s_status & STREAM_SINGLE || str_b->s_status & STREAM_WIDE)
143 		stream_set(str_b, STREAM_INSTANT);
144 
145 	if (SOP_PRIME(str_a) != PRIME_SUCCEEDED) {
146 		if (SOP_PRIME(str_b) != PRIME_SUCCEEDED)
147 			return;
148 
149 		merge_one_stream(fields_chain, str_b, outstrp,
150 		    field_separator);
151 		return;
152 	}
153 
154 	if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) {
155 		merge_one_stream(fields_chain, str_a, outstrp,
156 		    field_separator);
157 		return;
158 	}
159 
160 	str_a->s_current.l_collate_bufsize =
161 	    str_b->s_current.l_collate_bufsize = initial_size;
162 
163 	str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
164 	str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
165 
166 	(void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC,
167 	    field_separator);
168 	(void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC,
169 	    field_separator);
170 
171 	for (;;) {
172 		if (collate_fcn(&str_a->s_current, &str_b->s_current, 0,
173 		    coll_flags) < 0) {
174 			SOP_PUT_LINE(outstrp, &str_a->s_current);
175 			SOP_RELEASE_LINE(str_a);
176 			if (SOP_EOS(str_a)) {
177 				(void) SOP_CLOSE(str_a);
178 				str_a = str_b;
179 				break;
180 			}
181 			SOP_FETCH(str_a);
182 			if (str_a->s_current.l_collate_length != 0)
183 				continue;
184 			(void) mg_coll_convert(fields_chain, &str_a->s_current,
185 			    FCV_REALLOC, field_separator);
186 		} else {
187 			SOP_PUT_LINE(outstrp, &str_b->s_current);
188 			SOP_RELEASE_LINE(str_b);
189 			if (SOP_EOS(str_b)) {
190 				SOP_CLOSE(str_b);
191 				break;
192 			}
193 			SOP_FETCH(str_b);
194 			if (str_b->s_current.l_collate_length != 0)
195 				continue;
196 			(void) mg_coll_convert(fields_chain, &str_b->s_current,
197 			    FCV_REALLOC, field_separator);
198 		}
199 	}
200 
201 	SOP_PUT_LINE(outstrp, &str_a->s_current);
202 	SOP_RELEASE_LINE(str_a);
203 
204 	while (!SOP_EOS(str_a)) {
205 		SOP_FETCH(str_a);
206 		if (str_a->s_current.l_collate_length == 0)
207 			(void) mg_coll_convert(fields_chain, &str_a->s_current,
208 			    FCV_REALLOC, field_separator);
209 		SOP_PUT_LINE(outstrp, &str_a->s_current);
210 		SOP_RELEASE_LINE(str_a);
211 	}
212 
213 	(void) SOP_CLOSE(str_a);
214 	SOP_FLUSH(outstrp);
215 }
216 
217 /*
218  * priority queue routines
219  *   used for merges involving more than two sources
220  */
221 static void
222 heap_up(stream_t **A, int k, flag_t coll_flags)
223 {
224 	while (k > 1 &&
225 	    pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0,
226 	    coll_flags) > 0) {
227 		swap((void **)&pq_queue[k], (void **)&pq_queue[k / 2]);
228 		k /= 2;
229 	}
230 }
231 
232 static void
233 heap_down(stream_t **A, int k, int N, flag_t coll_flags)
234 {
235 	int	j;
236 
237 	while (2 * k <= N) {
238 		j = 2 * k;
239 		if (j < N && pq_coll_fcn(&A[j]->s_current,
240 		    &A[j + 1]->s_current, 0, coll_flags) > 0)
241 			j++;
242 		if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0,
243 		    coll_flags) <= 0)
244 			break;
245 		swap((void **)&pq_queue[k], (void **)&pq_queue[j]);
246 		k = j;
247 	}
248 }
249 
250 static int
251 pqueue_empty()
252 {
253 	return (pq_N == 0);
254 }
255 
256 static void
257 pqueue_init(size_t max_size,
258     int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t))
259 {
260 	pq_queue = safe_realloc(NULL, sizeof (stream_t *) * (max_size + 1));
261 	pq_N = 0;
262 	pq_coll_fcn = coll_fcn;
263 }
264 
265 static void
266 pqueue_insert(stream_t *source, flag_t coll_flags)
267 {
268 	pq_queue[++pq_N] = source;
269 	heap_up(pq_queue, pq_N, coll_flags);
270 }
271 
272 static stream_t *
273 pqueue_head(flag_t coll_flags)
274 {
275 	swap((void **)&pq_queue[1], (void **)&pq_queue[pq_N]);
276 	heap_down(pq_queue, 1, pq_N - 1, coll_flags);
277 	return (pq_queue[pq_N--]);
278 }
279 
/*
 * merge_n_streams() - general merge of a chain of streams via the priority
 * queue, performed in as many passes as needed.
 *
 *   S			sort state (locale width, memory budget, field
 *			definitions and separator are read from it)
 *   head_streamp	first stream of the chain to be merged
 *   n_streams		number of streams in the chain; sizes the queue
 *   out_streamp	stream to be prepared as the final output
 *   coll_flags		flags passed through to the comparison routine
 *
 * Each pass opens as many streams as it can, starting at bot_streamp, and
 * merges the range [top_streamp, bot_streamp).  If every remaining stream
 * could be opened (bot_streamp == NULL) the pass writes to the final output
 * and is the last one; otherwise it writes to a temporary stream obtained
 * from stream_push_to_temporary() and another pass follows.
 *
 * NOTE(review): the temporary stream is presumably linked onto the end of
 * the chain so that a later pass picks it up -- confirm against
 * stream_push_to_temporary().
 */
static void
merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams,
    stream_t *out_streamp, flag_t coll_flags)
{
	stream_t *top_streamp;		/* first stream of the current pass */
	stream_t *cur_streamp;		/* scratch cursor */
	stream_t *bot_streamp;		/* first stream NOT in this pass */
	stream_t *loop_out_streamp;	/* output of the current pass */
	flag_t is_single_byte = S->m_single_byte_locale;

	int n_opens = 0;
	int threshold_opens;

	/* Cap on opens per pass, derived from the memory budget; at least 4. */
	threshold_opens = MAX(4,
	    2 * S->m_memory_available / DEFAULT_RELEASE_SIZE);

	pqueue_init(n_streams, is_single_byte ? collated : collated_wide);

	top_streamp = bot_streamp = head_streamp;

	for (;;) {
		/*
		 * NOTE(review): hold/release presumably reserve a descriptor
		 * for this pass's output while inputs are opened -- confirm.
		 */
		hold_file_descriptor();
		while (bot_streamp != NULL) {

			if (n_opens > threshold_opens ||
			    stream_open_for_read(S, bot_streamp) == -1) {
				/*
				 * Available file descriptors would exceed
				 * memory target or have been exhausted; back
				 * off to the last valid, primed stream.
				 */
				bot_streamp = bot_streamp->s_previous;
				break;
			}

			if (bot_streamp->s_status & STREAM_SINGLE ||
			    bot_streamp->s_status & STREAM_WIDE)
				stream_set(bot_streamp, STREAM_INSTANT);

			bot_streamp = bot_streamp->s_next;
			n_opens++;
		}
		release_file_descriptor();

		if (bot_streamp == NULL) {
			/* Everything left is open: this is the final pass. */
			if (prepare_output_stream(out_streamp, S) != -1)
				loop_out_streamp = out_streamp;
			else
				die(EMSG_DESCRIPTORS);
		} else {
			/* Intermediate pass: merge into a temporary stream. */
			loop_out_streamp = stream_push_to_temporary(
			    &head_streamp, NULL, ST_OPEN | ST_NOCACHE |
			    (is_single_byte ? 0 : ST_WIDE));

			if (loop_out_streamp == NULL ||
			    top_streamp == bot_streamp)
				/*
				 * We need three file descriptors to make
				 * progress; if top_streamp == bot_streamp, then
				 * we have only two.
				 */
				die(EMSG_DESCRIPTORS);
		}

		/* Prime each stream of this pass and queue it on its key. */
		for (cur_streamp = top_streamp; cur_streamp != bot_streamp;
		    cur_streamp = cur_streamp->s_next) {
			/*
			 * Empty stream?
			 */
			if (!(cur_streamp->s_status & STREAM_ARRAY) &&
			    SOP_EOS(cur_streamp)) {
				stream_unlink_temporary(cur_streamp);
				continue;
			}

			/*
			 * Given that stream is not empty, any error in priming
			 * must be fatal.
			 */
			if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED)
				die(EMSG_BADPRIME);

			cur_streamp->s_current.l_collate_bufsize =
			    INITIAL_COLLATION_SIZE;
			cur_streamp->s_current.l_collate.sp =
			    safe_realloc(NULL, INITIAL_COLLATION_SIZE);
			(void) mg_coll_convert(S->m_fields_head,
			    &cur_streamp->s_current, FCV_REALLOC,
			    S->m_field_separator);

			pqueue_insert(cur_streamp, coll_flags);
		}

		/*
		 * Repeatedly emit the line of the head stream; a stream with
		 * more input is advanced, re-keyed and queued again, while an
		 * exhausted one simply drops out of the queue.
		 */
		while (!pqueue_empty()) {
			cur_streamp = pqueue_head(coll_flags);

			SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current);
			SOP_RELEASE_LINE(cur_streamp);

			if (!SOP_EOS(cur_streamp)) {
				SOP_FETCH(cur_streamp);
				(void) mg_coll_convert(S->m_fields_head,
				    &cur_streamp->s_current, FCV_REALLOC,
				    S->m_field_separator);
				pqueue_insert(cur_streamp, coll_flags);
			}
		}

		/* Release the resources of every stream used in this pass. */
		cur_streamp = top_streamp;
		while (cur_streamp != bot_streamp) {
			/* The key buffer is freed only for non-array streams. */
			if (!(cur_streamp->s_status & STREAM_ARRAY))
				safe_free(cur_streamp->s_current.l_collate.sp);
			cur_streamp->s_current.l_collate.sp = NULL;

			(void) SOP_FREE(cur_streamp);
			stream_unlink_temporary(cur_streamp);
			(void) SOP_CLOSE(cur_streamp);

			cur_streamp = cur_streamp->s_next;
		}

		(void) SOP_FLUSH(loop_out_streamp);

		/* The final pass wrote the real output; nothing more to do. */
		if (bot_streamp == NULL)
			break;

		if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) {
			(void) SOP_CLOSE(loop_out_streamp);
			/*
			 * Get file size so that we may treat intermediate files
			 * with our stream_mmap facilities.
			 */
			stream_stat_chain(loop_out_streamp);
			__S(stats_incr_merge_files());
		}

		n_opens = 0;

		/*
		 * The next pass begins at the stream we backed off to, which
		 * was excluded from the pass just completed.
		 */
		top_streamp = bot_streamp;
		bot_streamp = bot_streamp->s_next;
	}
}
422 
423 void
424 merge(sort_t *S)
425 {
426 	stream_t *merge_chain;
427 	stream_t *cur_streamp;
428 	stream_t out_stream;
429 	uint_t n_merges;
430 	flag_t coll_flags;
431 
432 	if (S->m_merge_only) {
433 		merge_chain = S->m_input_streams;
434 		set_cleanup_chain(&S->m_input_streams);
435 	} else {
436 		/*
437 		 * Otherwise we're inheriting the temporary output files from
438 		 * our internal sort.
439 		 */
440 		merge_chain = S->m_temporary_streams;
441 		stream_stat_chain(merge_chain);
442 		__S(stats_set_merge_files(stream_count_chain(merge_chain)));
443 	}
444 
445 	if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
446 		coll_flags = COLL_REVERSE;
447 	else
448 		coll_flags = 0;
449 	if (S->m_entire_line)
450 		coll_flags |= COLL_UNIQUE;
451 
452 	n_merges = stream_count_chain(merge_chain);
453 
454 	mg_coll_convert = S->m_coll_convert;
455 	cur_streamp = merge_chain;
456 
457 	switch (n_merges) {
458 		case 0:
459 			/*
460 			 * No files for merge.
461 			 */
462 			warn(gettext("no files available to merge\n"));
463 			break;
464 		case 1:
465 			/*
466 			 * Fast path: only one file for merge.
467 			 */
468 			(void) stream_open_for_read(S, cur_streamp);
469 			(void) prepare_output_stream(&out_stream, S);
470 			merge_one_stream(S->m_fields_head, cur_streamp,
471 			    &out_stream, S->m_field_separator);
472 			break;
473 		case 2:
474 			/*
475 			 * Fast path: only two files for merge.
476 			 */
477 			(void) stream_open_for_read(S, cur_streamp);
478 			(void) stream_open_for_read(S, cur_streamp->s_next);
479 			if (prepare_output_stream(&out_stream, S) == -1)
480 				die(EMSG_DESCRIPTORS);
481 			merge_two_streams(S->m_fields_head, cur_streamp,
482 			    cur_streamp->s_next, &out_stream,
483 			    S->m_field_separator, coll_flags);
484 			break;
485 		default:
486 			/*
487 			 * Full merge.
488 			 */
489 			merge_n_streams(S, cur_streamp, n_merges, &out_stream,
490 			    coll_flags);
491 			break;
492 	}
493 
494 	remove_output_guard();
495 }
496