1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 1998-2003 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include "merge.h"
28
29 /*
30 * External merge sort
31 *
32 * The following code implements the merge phase of sort(1) using a heap-based
33 * priority queue. Fast paths for merging two files as well as outputting a
34 * single file are provided.
35 *
36 * Memory footprint management
37 *
38 * The N-way fan-out of the merge phase can lead to compromising memory
39 * consumption if not constrained, so two mechanisms are used to regulate
40 * the memory footprint during the merge phase:
41 *
42 * 1. Single use memory advice. Since we proceed through each merge file in
43 * order, any line we have output is never required again--at least, not
44 * from that input file. Accordingly, we use the SOP_RELEASE_LINE()
45 * operation to advise that the memory backing the raw data for the stream
46 * up to that line is no longer of interest. (For certain classes of
47 * streams, this leads to an madvise(3C) call with the MADV_DONTNEED
48 * flag.)
49 *
50 * 2. Number of merge files. The number of merge files is constrained based
51 * on the amount of physical memory specified via the -S option (or deemed
52 * available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES).
53 * The number of merge files is calculated based on the average resident
54 * size of a stream that supports the SOP_RELEASE_LINE() operation; this
55 * number is conservative for streams that do not support this operation.
56 * A minimum of four subfiles will always be used, resource limits
57 * permitting.
58 *
59 * Temporary filespace footprint management
60 *
61 * Once the merge sort has utilized a temporary file, it may be deleted at
62 * close, as it's not used again and preserving the files until exit may
63 * compromise sort completion when limited temporary space is available.
64 */
65
/*
 * Priority queue state shared by the pqueue_*() and heap_*() routines.
 *
 * pq_N		number of streams currently held in the queue
 * pq_queue	heap array of stream pointers; slot 0 is unused and the
 *		live entries occupy indices 1 through pq_N
 * pq_coll_fcn	comparison function installed by pqueue_init() and used to
 *		order streams by their current line
 */
static int pq_N;
static stream_t **pq_queue;
static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);

/*
 * Collation key conversion routine for the merge; merge() sets it from
 * S->m_coll_convert before any of the merge routines run.
 */
static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t);
71
72 static int
prepare_output_stream(stream_t * ostrp,sort_t * S)73 prepare_output_stream(stream_t *ostrp, sort_t *S)
74 {
75 stream_clear(ostrp);
76 stream_unset(ostrp, STREAM_OPEN);
77
78 stream_set(ostrp,
79 (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) |
80 (S->m_unique_lines ? STREAM_UNIQUE : 0));
81
82 if (S->m_output_to_stdout) {
83 stream_set(ostrp, STREAM_NOTFILE);
84 ostrp->s_filename = (char *)filename_stdout;
85 } else
86 ostrp->s_filename = S->m_output_filename;
87
88 return (SOP_OPEN_FOR_WRITE(ostrp));
89 }
90
91 static void
merge_one_stream(field_t * fields_chain,stream_t * strp,stream_t * outstrp,vchar_t field_separator)92 merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp,
93 vchar_t field_separator)
94 {
95 size_t element_size = strp->s_element_size;
96 size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
97
98 if (strp->s_status & STREAM_SINGLE || strp->s_status & STREAM_WIDE)
99 stream_set(strp, STREAM_INSTANT);
100
101 if (SOP_PRIME(strp) == PRIME_SUCCEEDED) {
102 strp->s_current.l_collate_bufsize = initial_size;
103 strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
104
105 (void) mg_coll_convert(fields_chain, &strp->s_current,
106 FCV_REALLOC, field_separator);
107 SOP_PUT_LINE(outstrp, &strp->s_current);
108 SOP_RELEASE_LINE(strp);
109
110 while (!SOP_EOS(strp)) {
111 SOP_FETCH(strp);
112 if (strp->s_current.l_collate_length == 0)
113 (void) mg_coll_convert(fields_chain,
114 &strp->s_current, FCV_REALLOC,
115 field_separator);
116 SOP_PUT_LINE(outstrp, &strp->s_current);
117 SOP_RELEASE_LINE(strp);
118 }
119
120 (void) SOP_CLOSE(strp);
121 SOP_FLUSH(outstrp);
122 }
123 }
124
125 static void
merge_two_streams(field_t * fields_chain,stream_t * str_a,stream_t * str_b,stream_t * outstrp,vchar_t field_separator,flag_t coll_flags)126 merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b,
127 stream_t *outstrp, vchar_t field_separator, flag_t coll_flags)
128 {
129 int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
130 size_t element_size = str_a->s_element_size;
131 size_t initial_size = INITIAL_COLLATION_SIZE * element_size;
132
133 ASSERT(str_a->s_element_size == str_b->s_element_size);
134
135 if (str_a->s_element_size == sizeof (char))
136 collate_fcn = collated;
137 else
138 collate_fcn = collated_wide;
139
140 if (str_a->s_status & STREAM_SINGLE || str_a->s_status & STREAM_WIDE)
141 stream_set(str_a, STREAM_INSTANT);
142 if (str_b->s_status & STREAM_SINGLE || str_b->s_status & STREAM_WIDE)
143 stream_set(str_b, STREAM_INSTANT);
144
145 if (SOP_PRIME(str_a) != PRIME_SUCCEEDED) {
146 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED)
147 return;
148
149 merge_one_stream(fields_chain, str_b, outstrp,
150 field_separator);
151 return;
152 }
153
154 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) {
155 merge_one_stream(fields_chain, str_a, outstrp,
156 field_separator);
157 return;
158 }
159
160 str_a->s_current.l_collate_bufsize =
161 str_b->s_current.l_collate_bufsize = initial_size;
162
163 str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
164 str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
165
166 (void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC,
167 field_separator);
168 (void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC,
169 field_separator);
170
171 for (;;) {
172 if (collate_fcn(&str_a->s_current, &str_b->s_current, 0,
173 coll_flags) < 0) {
174 SOP_PUT_LINE(outstrp, &str_a->s_current);
175 SOP_RELEASE_LINE(str_a);
176 if (SOP_EOS(str_a)) {
177 (void) SOP_CLOSE(str_a);
178 str_a = str_b;
179 break;
180 }
181 SOP_FETCH(str_a);
182 if (str_a->s_current.l_collate_length != 0)
183 continue;
184 (void) mg_coll_convert(fields_chain, &str_a->s_current,
185 FCV_REALLOC, field_separator);
186 } else {
187 SOP_PUT_LINE(outstrp, &str_b->s_current);
188 SOP_RELEASE_LINE(str_b);
189 if (SOP_EOS(str_b)) {
190 SOP_CLOSE(str_b);
191 break;
192 }
193 SOP_FETCH(str_b);
194 if (str_b->s_current.l_collate_length != 0)
195 continue;
196 (void) mg_coll_convert(fields_chain, &str_b->s_current,
197 FCV_REALLOC, field_separator);
198 }
199 }
200
201 SOP_PUT_LINE(outstrp, &str_a->s_current);
202 SOP_RELEASE_LINE(str_a);
203
204 while (!SOP_EOS(str_a)) {
205 SOP_FETCH(str_a);
206 if (str_a->s_current.l_collate_length == 0)
207 (void) mg_coll_convert(fields_chain, &str_a->s_current,
208 FCV_REALLOC, field_separator);
209 SOP_PUT_LINE(outstrp, &str_a->s_current);
210 SOP_RELEASE_LINE(str_a);
211 }
212
213 (void) SOP_CLOSE(str_a);
214 SOP_FLUSH(outstrp);
215 }
216
217 /*
218 * priority queue routines
219 * used for merges involving more than two sources
220 */
221 static void
heap_up(stream_t ** A,int k,flag_t coll_flags)222 heap_up(stream_t **A, int k, flag_t coll_flags)
223 {
224 while (k > 1 &&
225 pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0,
226 coll_flags) > 0) {
227 swap((void **)&pq_queue[k], (void **)&pq_queue[k / 2]);
228 k /= 2;
229 }
230 }
231
232 static void
heap_down(stream_t ** A,int k,int N,flag_t coll_flags)233 heap_down(stream_t **A, int k, int N, flag_t coll_flags)
234 {
235 int j;
236
237 while (2 * k <= N) {
238 j = 2 * k;
239 if (j < N && pq_coll_fcn(&A[j]->s_current,
240 &A[j + 1]->s_current, 0, coll_flags) > 0)
241 j++;
242 if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0,
243 coll_flags) <= 0)
244 break;
245 swap((void **)&pq_queue[k], (void **)&pq_queue[j]);
246 k = j;
247 }
248 }
249
250 static int
pqueue_empty()251 pqueue_empty()
252 {
253 return (pq_N == 0);
254 }
255
256 static void
pqueue_init(size_t max_size,int (* coll_fcn)(line_rec_t *,line_rec_t *,ssize_t,flag_t))257 pqueue_init(size_t max_size,
258 int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t))
259 {
260 pq_queue = safe_realloc(NULL, sizeof (stream_t *) * (max_size + 1));
261 pq_N = 0;
262 pq_coll_fcn = coll_fcn;
263 }
264
265 static void
pqueue_insert(stream_t * source,flag_t coll_flags)266 pqueue_insert(stream_t *source, flag_t coll_flags)
267 {
268 pq_queue[++pq_N] = source;
269 heap_up(pq_queue, pq_N, coll_flags);
270 }
271
272 static stream_t *
pqueue_head(flag_t coll_flags)273 pqueue_head(flag_t coll_flags)
274 {
275 swap((void **)&pq_queue[1], (void **)&pq_queue[pq_N]);
276 heap_down(pq_queue, 1, pq_N - 1, coll_flags);
277 return (pq_queue[pq_N--]);
278 }
279
280 static void
merge_n_streams(sort_t * S,stream_t * head_streamp,int n_streams,stream_t * out_streamp,flag_t coll_flags)281 merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams,
282 stream_t *out_streamp, flag_t coll_flags)
283 {
284 stream_t *top_streamp;
285 stream_t *cur_streamp;
286 stream_t *bot_streamp;
287 stream_t *loop_out_streamp;
288 flag_t is_single_byte = S->m_single_byte_locale;
289
290 int n_opens = 0;
291 int threshold_opens;
292
293 threshold_opens = MAX(4,
294 2 * S->m_memory_available / DEFAULT_RELEASE_SIZE);
295
296 pqueue_init(n_streams, is_single_byte ? collated : collated_wide);
297
298 top_streamp = bot_streamp = head_streamp;
299
300 for (;;) {
301 hold_file_descriptor();
302 while (bot_streamp != NULL) {
303
304 if (n_opens > threshold_opens ||
305 stream_open_for_read(S, bot_streamp) == -1) {
306 /*
307 * Available file descriptors would exceed
308 * memory target or have been exhausted; back
309 * off to the last valid, primed stream.
310 */
311 bot_streamp = bot_streamp->s_previous;
312 break;
313 }
314
315 if (bot_streamp->s_status & STREAM_SINGLE ||
316 bot_streamp->s_status & STREAM_WIDE)
317 stream_set(bot_streamp, STREAM_INSTANT);
318
319 bot_streamp = bot_streamp->s_next;
320 n_opens++;
321 }
322 release_file_descriptor();
323
324 if (bot_streamp == NULL) {
325 if (prepare_output_stream(out_streamp, S) != -1)
326 loop_out_streamp = out_streamp;
327 else
328 die(EMSG_DESCRIPTORS);
329 } else {
330 loop_out_streamp = stream_push_to_temporary(
331 &head_streamp, NULL, ST_OPEN | ST_NOCACHE |
332 (is_single_byte ? 0 : ST_WIDE));
333
334 if (loop_out_streamp == NULL ||
335 top_streamp == bot_streamp)
336 /*
337 * We need three file descriptors to make
338 * progress; if top_streamp == bot_streamp, then
339 * we have only two.
340 */
341 die(EMSG_DESCRIPTORS);
342 }
343
344 for (cur_streamp = top_streamp; cur_streamp != bot_streamp;
345 cur_streamp = cur_streamp->s_next) {
346 /*
347 * Empty stream?
348 */
349 if (!(cur_streamp->s_status & STREAM_ARRAY) &&
350 SOP_EOS(cur_streamp)) {
351 stream_unlink_temporary(cur_streamp);
352 continue;
353 }
354
355 /*
356 * Given that stream is not empty, any error in priming
357 * must be fatal.
358 */
359 if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED)
360 die(EMSG_BADPRIME);
361
362 cur_streamp->s_current.l_collate_bufsize =
363 INITIAL_COLLATION_SIZE;
364 cur_streamp->s_current.l_collate.sp =
365 safe_realloc(NULL, INITIAL_COLLATION_SIZE);
366 (void) mg_coll_convert(S->m_fields_head,
367 &cur_streamp->s_current, FCV_REALLOC,
368 S->m_field_separator);
369
370 pqueue_insert(cur_streamp, coll_flags);
371 }
372
373 while (!pqueue_empty()) {
374 cur_streamp = pqueue_head(coll_flags);
375
376 SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current);
377 SOP_RELEASE_LINE(cur_streamp);
378
379 if (!SOP_EOS(cur_streamp)) {
380 SOP_FETCH(cur_streamp);
381 (void) mg_coll_convert(S->m_fields_head,
382 &cur_streamp->s_current, FCV_REALLOC,
383 S->m_field_separator);
384 pqueue_insert(cur_streamp, coll_flags);
385 }
386 }
387
388 cur_streamp = top_streamp;
389 while (cur_streamp != bot_streamp) {
390 if (!(cur_streamp->s_status & STREAM_ARRAY))
391 safe_free(cur_streamp->s_current.l_collate.sp);
392 cur_streamp->s_current.l_collate.sp = NULL;
393
394 (void) SOP_FREE(cur_streamp);
395 stream_unlink_temporary(cur_streamp);
396 (void) SOP_CLOSE(cur_streamp);
397
398 cur_streamp = cur_streamp->s_next;
399 }
400
401 (void) SOP_FLUSH(loop_out_streamp);
402
403 if (bot_streamp == NULL)
404 break;
405
406 if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) {
407 (void) SOP_CLOSE(loop_out_streamp);
408 /*
409 * Get file size so that we may treat intermediate files
410 * with our stream_mmap facilities.
411 */
412 stream_stat_chain(loop_out_streamp);
413 __S(stats_incr_merge_files());
414 }
415
416 n_opens = 0;
417
418 top_streamp = bot_streamp;
419 bot_streamp = bot_streamp->s_next;
420 }
421 }
422
423 void
merge(sort_t * S)424 merge(sort_t *S)
425 {
426 stream_t *merge_chain;
427 stream_t *cur_streamp;
428 stream_t out_stream;
429 uint_t n_merges;
430 flag_t coll_flags;
431
432 if (S->m_merge_only) {
433 merge_chain = S->m_input_streams;
434 set_cleanup_chain(&S->m_input_streams);
435 } else {
436 /*
437 * Otherwise we're inheriting the temporary output files from
438 * our internal sort.
439 */
440 merge_chain = S->m_temporary_streams;
441 stream_stat_chain(merge_chain);
442 __S(stats_set_merge_files(stream_count_chain(merge_chain)));
443 }
444
445 if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
446 coll_flags = COLL_REVERSE;
447 else
448 coll_flags = 0;
449 if (S->m_entire_line)
450 coll_flags |= COLL_UNIQUE;
451
452 n_merges = stream_count_chain(merge_chain);
453
454 mg_coll_convert = S->m_coll_convert;
455 cur_streamp = merge_chain;
456
457 switch (n_merges) {
458 case 0:
459 /*
460 * No files for merge.
461 */
462 warn(gettext("no files available to merge\n"));
463 break;
464 case 1:
465 /*
466 * Fast path: only one file for merge.
467 */
468 (void) stream_open_for_read(S, cur_streamp);
469 (void) prepare_output_stream(&out_stream, S);
470 merge_one_stream(S->m_fields_head, cur_streamp,
471 &out_stream, S->m_field_separator);
472 break;
473 case 2:
474 /*
475 * Fast path: only two files for merge.
476 */
477 (void) stream_open_for_read(S, cur_streamp);
478 (void) stream_open_for_read(S, cur_streamp->s_next);
479 if (prepare_output_stream(&out_stream, S) == -1)
480 die(EMSG_DESCRIPTORS);
481 merge_two_streams(S->m_fields_head, cur_streamp,
482 cur_streamp->s_next, &out_stream,
483 S->m_field_separator, coll_flags);
484 break;
485 default:
486 /*
487 * Full merge.
488 */
489 merge_n_streams(S, cur_streamp, n_merges, &out_stream,
490 coll_flags);
491 break;
492 }
493
494 remove_output_guard();
495 }
496