1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 1998-2003 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #pragma ident "%Z%%M% %I% %E% SMI"
28
29 #include "merge.h"
30
31 /*
32 * External merge sort
33 *
34 * The following code implements the merge phase of sort(1) using a heap-based
35 * priority queue. Fast paths for merging two files as well as outputting a
36 * single file are provided.
37 *
38 * Memory footprint management
39 *
40 * The N-way fan-out of the merge phase can lead to compromising memory
41 * consumption if not constrained, so two mechanisms are used to regulate
42 * the memory footprint during the merge phase:
43 *
44 * 1. Single use memory advice. Since we proceed through each merge file in
45 * order, any line we have output is never required again--at least, not
46 * from that input file. Accordingly, we use the SOP_RELEASE_LINE()
47 * operation to advise that the memory backing the raw data for the stream
48 * up to that line is no longer of interest. (For certain classes of
49 * streams, this leads to an madvise(3C) call with the MADV_DONTNEED
50 * flag.)
51 *
52 * 2. Number of merge files. The number of merge files is constrained based
53 * on the amount of physical memory specified via the -S option (or deemed
54 * available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES).
55 * The number of merge files is calculated based on the average resident
56 * size of a stream that supports the SOP_RELEASE_LINE() operation; this
57 * number is conservative for streams that do not support this operation.
58 * A minimum of four subfiles will always be used, resource limits
59 * permitting.
60 *
61 * Temporary filespace footprint management
62 *
63 * Once the merge sort has utilized a temporary file, it may be deleted at
64 * close, as it's not used again and preserving the files until exit may
65 * compromise sort completion when limited temporary space is available.
66 */
67
68 static int pq_N;
69 static stream_t **pq_queue;
70 static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
71
72 static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t);
73
74 static int
prepare_output_stream(stream_t * ostrp,sort_t * S)75 prepare_output_stream(stream_t *ostrp, sort_t *S)
76 {
77 stream_clear(ostrp);
78 stream_unset(ostrp, STREAM_OPEN);
79
80 stream_set(ostrp,
81 (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) |
82 (S->m_unique_lines ? STREAM_UNIQUE : 0));
83
84 if (S->m_output_to_stdout) {
85 stream_set(ostrp, STREAM_NOTFILE);
86 ostrp->s_filename = (char *)filename_stdout;
87 } else
88 ostrp->s_filename = S->m_output_filename;
89
90 return (SOP_OPEN_FOR_WRITE(ostrp));
91 }
92
93 static void
merge_one_stream(field_t * fields_chain,stream_t * strp,stream_t * outstrp,vchar_t field_separator)94 merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp,
95 vchar_t field_separator)
96 {
97 size_t element_size = strp->s_element_size;
98 size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
99
100 if (strp->s_status & STREAM_SINGLE || strp->s_status & STREAM_WIDE)
101 stream_set(strp, STREAM_INSTANT);
102
103 if (SOP_PRIME(strp) == PRIME_SUCCEEDED) {
104 strp->s_current.l_collate_bufsize = initial_size;
105 strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
106
107 (void) mg_coll_convert(fields_chain, &strp->s_current,
108 FCV_REALLOC, field_separator);
109 SOP_PUT_LINE(outstrp, &strp->s_current);
110 SOP_RELEASE_LINE(strp);
111
112 while (!SOP_EOS(strp)) {
113 SOP_FETCH(strp);
114 if (strp->s_current.l_collate_length == 0)
115 (void) mg_coll_convert(fields_chain,
116 &strp->s_current, FCV_REALLOC,
117 field_separator);
118 SOP_PUT_LINE(outstrp, &strp->s_current);
119 SOP_RELEASE_LINE(strp);
120 }
121
122 (void) SOP_CLOSE(strp);
123 SOP_FLUSH(outstrp);
124 }
125 }
126
127 static void
merge_two_streams(field_t * fields_chain,stream_t * str_a,stream_t * str_b,stream_t * outstrp,vchar_t field_separator,flag_t coll_flags)128 merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b,
129 stream_t *outstrp, vchar_t field_separator, flag_t coll_flags)
130 {
131 int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
132 size_t element_size = str_a->s_element_size;
133 size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
134
135 ASSERT(str_a->s_element_size == str_b->s_element_size);
136
137 if (str_a->s_element_size == sizeof (char))
138 collate_fcn = collated;
139 else
140 collate_fcn = collated_wide;
141
142 if (str_a->s_status & STREAM_SINGLE || str_a->s_status & STREAM_WIDE)
143 stream_set(str_a, STREAM_INSTANT);
144 if (str_b->s_status & STREAM_SINGLE || str_b->s_status & STREAM_WIDE)
145 stream_set(str_b, STREAM_INSTANT);
146
147 if (SOP_PRIME(str_a) != PRIME_SUCCEEDED) {
148 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED)
149 return;
150
151 merge_one_stream(fields_chain, str_b, outstrp,
152 field_separator);
153 return;
154 }
155
156 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) {
157 merge_one_stream(fields_chain, str_a, outstrp,
158 field_separator);
159 return;
160 }
161
162 str_a->s_current.l_collate_bufsize =
163 str_b->s_current.l_collate_bufsize = initial_size;
164
165 str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
166 str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
167
168 (void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC,
169 field_separator);
170 (void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC,
171 field_separator);
172
173 for (;;) {
174 if (collate_fcn(&str_a->s_current, &str_b->s_current, 0,
175 coll_flags) < 0) {
176 SOP_PUT_LINE(outstrp, &str_a->s_current);
177 SOP_RELEASE_LINE(str_a);
178 if (SOP_EOS(str_a)) {
179 (void) SOP_CLOSE(str_a);
180 str_a = str_b;
181 break;
182 }
183 SOP_FETCH(str_a);
184 if (str_a->s_current.l_collate_length != 0)
185 continue;
186 (void) mg_coll_convert(fields_chain, &str_a->s_current,
187 FCV_REALLOC, field_separator);
188 } else {
189 SOP_PUT_LINE(outstrp, &str_b->s_current);
190 SOP_RELEASE_LINE(str_b);
191 if (SOP_EOS(str_b)) {
192 SOP_CLOSE(str_b);
193 break;
194 }
195 SOP_FETCH(str_b);
196 if (str_b->s_current.l_collate_length != 0)
197 continue;
198 (void) mg_coll_convert(fields_chain, &str_b->s_current,
199 FCV_REALLOC, field_separator);
200 }
201 }
202
203 SOP_PUT_LINE(outstrp, &str_a->s_current);
204 SOP_RELEASE_LINE(str_a);
205
206 while (!SOP_EOS(str_a)) {
207 SOP_FETCH(str_a);
208 if (str_a->s_current.l_collate_length == 0)
209 (void) mg_coll_convert(fields_chain, &str_a->s_current,
210 FCV_REALLOC, field_separator);
211 SOP_PUT_LINE(outstrp, &str_a->s_current);
212 SOP_RELEASE_LINE(str_a);
213 }
214
215 (void) SOP_CLOSE(str_a);
216 SOP_FLUSH(outstrp);
217 }
218
219 /*
220 * priority queue routines
221 * used for merges involving more than two sources
222 */
223 static void
heap_up(stream_t ** A,int k,flag_t coll_flags)224 heap_up(stream_t **A, int k, flag_t coll_flags)
225 {
226 while (k > 1 &&
227 pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0,
228 coll_flags) > 0) {
229 swap((void **)&pq_queue[k], (void **)&pq_queue[k / 2]);
230 k /= 2;
231 }
232 }
233
234 static void
heap_down(stream_t ** A,int k,int N,flag_t coll_flags)235 heap_down(stream_t **A, int k, int N, flag_t coll_flags)
236 {
237 int j;
238
239 while (2 * k <= N) {
240 j = 2 * k;
241 if (j < N && pq_coll_fcn(&A[j]->s_current,
242 &A[j + 1]->s_current, 0, coll_flags) > 0)
243 j++;
244 if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0,
245 coll_flags) <= 0)
246 break;
247 swap((void **)&pq_queue[k], (void **)&pq_queue[j]);
248 k = j;
249 }
250 }
251
252 static int
pqueue_empty()253 pqueue_empty()
254 {
255 return (pq_N == 0);
256 }
257
258 static void
pqueue_init(size_t max_size,int (* coll_fcn)(line_rec_t *,line_rec_t *,ssize_t,flag_t))259 pqueue_init(size_t max_size,
260 int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t))
261 {
262 pq_queue = safe_realloc(NULL, sizeof (stream_t *) * (max_size + 1));
263 pq_N = 0;
264 pq_coll_fcn = coll_fcn;
265 }
266
267 static void
pqueue_insert(stream_t * source,flag_t coll_flags)268 pqueue_insert(stream_t *source, flag_t coll_flags)
269 {
270 pq_queue[++pq_N] = source;
271 heap_up(pq_queue, pq_N, coll_flags);
272 }
273
274 static stream_t *
pqueue_head(flag_t coll_flags)275 pqueue_head(flag_t coll_flags)
276 {
277 swap((void **)&pq_queue[1], (void **)&pq_queue[pq_N]);
278 heap_down(pq_queue, 1, pq_N - 1, coll_flags);
279 return (pq_queue[pq_N--]);
280 }
281
282 static void
merge_n_streams(sort_t * S,stream_t * head_streamp,int n_streams,stream_t * out_streamp,flag_t coll_flags)283 merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams,
284 stream_t *out_streamp, flag_t coll_flags)
285 {
286 stream_t *top_streamp;
287 stream_t *cur_streamp;
288 stream_t *bot_streamp;
289 stream_t *loop_out_streamp;
290 flag_t is_single_byte = S->m_single_byte_locale;
291
292 int n_opens = 0;
293 int threshold_opens;
294
295 threshold_opens = MAX(4,
296 2 * S->m_memory_available / DEFAULT_RELEASE_SIZE);
297
298 pqueue_init(n_streams, is_single_byte ? collated : collated_wide);
299
300 top_streamp = bot_streamp = head_streamp;
301
302 for (;;) {
303 hold_file_descriptor();
304 while (bot_streamp != NULL) {
305
306 if (n_opens > threshold_opens ||
307 stream_open_for_read(S, bot_streamp) == -1) {
308 /*
309 * Available file descriptors would exceed
310 * memory target or have been exhausted; back
311 * off to the last valid, primed stream.
312 */
313 bot_streamp = bot_streamp->s_previous;
314 break;
315 }
316
317 if (bot_streamp->s_status & STREAM_SINGLE ||
318 bot_streamp->s_status & STREAM_WIDE)
319 stream_set(bot_streamp, STREAM_INSTANT);
320
321 bot_streamp = bot_streamp->s_next;
322 n_opens++;
323 }
324 release_file_descriptor();
325
326 if (bot_streamp == NULL) {
327 if (prepare_output_stream(out_streamp, S) != -1)
328 loop_out_streamp = out_streamp;
329 else
330 die(EMSG_DESCRIPTORS);
331 } else {
332 loop_out_streamp = stream_push_to_temporary(
333 &head_streamp, NULL, ST_OPEN | ST_NOCACHE |
334 (is_single_byte ? 0 : ST_WIDE));
335
336 if (loop_out_streamp == NULL ||
337 top_streamp == bot_streamp)
338 /*
339 * We need three file descriptors to make
340 * progress; if top_streamp == bot_streamp, then
341 * we have only two.
342 */
343 die(EMSG_DESCRIPTORS);
344 }
345
346 for (cur_streamp = top_streamp; cur_streamp != bot_streamp;
347 cur_streamp = cur_streamp->s_next) {
348 /*
349 * Empty stream?
350 */
351 if (!(cur_streamp->s_status & STREAM_ARRAY) &&
352 SOP_EOS(cur_streamp)) {
353 stream_unlink_temporary(cur_streamp);
354 continue;
355 }
356
357 /*
358 * Given that stream is not empty, any error in priming
359 * must be fatal.
360 */
361 if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED)
362 die(EMSG_BADPRIME);
363
364 cur_streamp->s_current.l_collate_bufsize =
365 INITIAL_COLLATION_SIZE;
366 cur_streamp->s_current.l_collate.sp =
367 safe_realloc(NULL, INITIAL_COLLATION_SIZE);
368 (void) mg_coll_convert(S->m_fields_head,
369 &cur_streamp->s_current, FCV_REALLOC,
370 S->m_field_separator);
371
372 pqueue_insert(cur_streamp, coll_flags);
373 }
374
375 while (!pqueue_empty()) {
376 cur_streamp = pqueue_head(coll_flags);
377
378 SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current);
379 SOP_RELEASE_LINE(cur_streamp);
380
381 if (!SOP_EOS(cur_streamp)) {
382 SOP_FETCH(cur_streamp);
383 (void) mg_coll_convert(S->m_fields_head,
384 &cur_streamp->s_current, FCV_REALLOC,
385 S->m_field_separator);
386 pqueue_insert(cur_streamp, coll_flags);
387 }
388 }
389
390 cur_streamp = top_streamp;
391 while (cur_streamp != bot_streamp) {
392 if (!(cur_streamp->s_status & STREAM_ARRAY))
393 safe_free(cur_streamp->s_current.l_collate.sp);
394 cur_streamp->s_current.l_collate.sp = NULL;
395
396 (void) SOP_FREE(cur_streamp);
397 stream_unlink_temporary(cur_streamp);
398 (void) SOP_CLOSE(cur_streamp);
399
400 cur_streamp = cur_streamp->s_next;
401 }
402
403 (void) SOP_FLUSH(loop_out_streamp);
404
405 if (bot_streamp == NULL)
406 break;
407
408 if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) {
409 (void) SOP_CLOSE(loop_out_streamp);
410 /*
411 * Get file size so that we may treat intermediate files
412 * with our stream_mmap facilities.
413 */
414 stream_stat_chain(loop_out_streamp);
415 __S(stats_incr_merge_files());
416 }
417
418 n_opens = 0;
419
420 top_streamp = bot_streamp;
421 bot_streamp = bot_streamp->s_next;
422 }
423 }
424
425 void
merge(sort_t * S)426 merge(sort_t *S)
427 {
428 stream_t *merge_chain;
429 stream_t *cur_streamp;
430 stream_t out_stream;
431 uint_t n_merges;
432 flag_t coll_flags;
433
434 if (S->m_merge_only) {
435 merge_chain = S->m_input_streams;
436 set_cleanup_chain(&S->m_input_streams);
437 } else {
438 /*
439 * Otherwise we're inheriting the temporary output files from
440 * our internal sort.
441 */
442 merge_chain = S->m_temporary_streams;
443 stream_stat_chain(merge_chain);
444 __S(stats_set_merge_files(stream_count_chain(merge_chain)));
445 }
446
447 if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
448 coll_flags = COLL_REVERSE;
449 else
450 coll_flags = 0;
451 if (S->m_entire_line)
452 coll_flags |= COLL_UNIQUE;
453
454 n_merges = stream_count_chain(merge_chain);
455
456 mg_coll_convert = S->m_coll_convert;
457 cur_streamp = merge_chain;
458
459 switch (n_merges) {
460 case 0:
461 /*
462 * No files for merge.
463 */
464 warn(gettext("no files available to merge\n"));
465 break;
466 case 1:
467 /*
468 * Fast path: only one file for merge.
469 */
470 (void) stream_open_for_read(S, cur_streamp);
471 (void) prepare_output_stream(&out_stream, S);
472 merge_one_stream(S->m_fields_head, cur_streamp,
473 &out_stream, S->m_field_separator);
474 break;
475 case 2:
476 /*
477 * Fast path: only two files for merge.
478 */
479 (void) stream_open_for_read(S, cur_streamp);
480 (void) stream_open_for_read(S, cur_streamp->s_next);
481 if (prepare_output_stream(&out_stream, S) == -1)
482 die(EMSG_DESCRIPTORS);
483 merge_two_streams(S->m_fields_head, cur_streamp,
484 cur_streamp->s_next, &out_stream,
485 S->m_field_separator, coll_flags);
486 break;
487 default:
488 /*
489 * Full merge.
490 */
491 merge_n_streams(S, cur_streamp, n_merges, &out_stream,
492 coll_flags);
493 break;
494 }
495
496 remove_output_guard();
497 }
498