1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 1998-2003 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "merge.h" 30 31 /* 32 * External merge sort 33 * 34 * The following code implements the merge phase of sort(1) using a heap-based 35 * priority queue. Fast paths for merging two files as well as outputting a 36 * single file are provided. 37 * 38 * Memory footprint management 39 * 40 * The N-way fan-out of the merge phase can lead to compromising memory 41 * consumption if not constrained, so two mechanisms are used to regulate 42 * the memory footprint during the merge phase: 43 * 44 * 1. Single use memory advice. Since we proceed through each merge file in 45 * order, any line we have output is never required again--at least, not 46 * from that input file. Accordingly, we use the SOP_RELEASE_LINE() 47 * operation to advise that the memory backing the raw data for the stream 48 * up to that line is no longer of interest. (For certain classes of 49 * streams, this leads to an madvise(3C) call with the MADV_DONTNEED 50 * flag.) 51 * 52 * 2. Number of merge files. The number of merge files is constrained based 53 * on the amount of physical memory specified via the -S option (or deemed 54 * available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES). 55 * The number of merge files is calculated based on the average resident 56 * size of a stream that supports the SOP_RELEASE_LINE() operation; this 57 * number is conservative for streams that do not support this operation. 58 * A minimum of four subfiles will always be used, resource limits 59 * permitting. 60 * 61 * Temporary filespace footprint management 62 * 63 * Once the merge sort has utilized a temporary file, it may be deleted at 64 * close, as it's not used again and preserving the files until exit may 65 * compromise sort completion when limited temporary space is available. 66 */ 67 68 static int pq_N; 69 static stream_t **pq_queue; 70 static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t); 71 72 static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t); 73 74 static int 75 prepare_output_stream(stream_t *ostrp, sort_t *S) 76 { 77 stream_clear(ostrp); 78 stream_unset(ostrp, STREAM_OPEN); 79 80 stream_set(ostrp, 81 (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) | 82 (S->m_unique_lines ? STREAM_UNIQUE : 0)); 83 84 if (S->m_output_to_stdout) { 85 stream_set(ostrp, STREAM_NOTFILE); 86 ostrp->s_filename = (char *)filename_stdout; 87 } else 88 ostrp->s_filename = S->m_output_filename; 89 90 return (SOP_OPEN_FOR_WRITE(ostrp)); 91 } 92 93 static void 94 merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp, 95 vchar_t field_separator) 96 { 97 size_t element_size = strp->s_element_size; 98 size_t initial_size = INITIAL_COLLATION_SIZE * element_size; 99 100 if (strp->s_status & STREAM_SINGLE || strp->s_status & STREAM_WIDE) 101 stream_set(strp, STREAM_INSTANT); 102 103 if (SOP_PRIME(strp) == PRIME_SUCCEEDED) { 104 strp->s_current.l_collate_bufsize = initial_size; 105 strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size); 106 107 (void) mg_coll_convert(fields_chain, &strp->s_current, 108 FCV_REALLOC, field_separator); 109 SOP_PUT_LINE(outstrp, &strp->s_current); 110 SOP_RELEASE_LINE(strp); 111 112 while (!SOP_EOS(strp)) { 113 SOP_FETCH(strp); 114 if (strp->s_current.l_collate_length == 0) 115 (void) mg_coll_convert(fields_chain, 116 &strp->s_current, FCV_REALLOC, 117 field_separator); 118 SOP_PUT_LINE(outstrp, &strp->s_current); 119 SOP_RELEASE_LINE(strp); 120 } 121 122 (void) SOP_CLOSE(strp); 123 SOP_FLUSH(outstrp); 124 } 125 } 126 127 static void 128 merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b, 129 stream_t *outstrp, vchar_t field_separator, flag_t coll_flags) 130 { 131 int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t); 132 size_t element_size = str_a->s_element_size; 133 size_t initial_size = INITIAL_COLLATION_SIZE * element_size; 134 135 ASSERT(str_a->s_element_size == str_b->s_element_size); 136 137 if (str_a->s_element_size == sizeof (char)) 138 collate_fcn = collated; 139 else 140 collate_fcn = collated_wide; 141 142 if (str_a->s_status & STREAM_SINGLE || str_a->s_status & STREAM_WIDE) 143 stream_set(str_a, STREAM_INSTANT); 144 if (str_b->s_status & STREAM_SINGLE || str_b->s_status & STREAM_WIDE) 145 stream_set(str_b, STREAM_INSTANT); 146 147 if (SOP_PRIME(str_a) != PRIME_SUCCEEDED) { 148 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) 149 return; 150 151 merge_one_stream(fields_chain, str_b, outstrp, 152 field_separator); 153 return; 154 } 155 156 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) { 157 merge_one_stream(fields_chain, str_a, outstrp, 158 field_separator); 159 return; 160 } 161 162 str_a->s_current.l_collate_bufsize = 163 str_b->s_current.l_collate_bufsize = initial_size; 164 165 str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size); 166 str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size); 167 168 (void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC, 169 field_separator); 170 (void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC, 171 field_separator); 172 173 for (;;) { 174 if (collate_fcn(&str_a->s_current, &str_b->s_current, 0, 175 coll_flags) < 0) { 176 SOP_PUT_LINE(outstrp, &str_a->s_current); 177 SOP_RELEASE_LINE(str_a); 178 if (SOP_EOS(str_a)) { 179 (void) SOP_CLOSE(str_a); 180 str_a = str_b; 181 break; 182 } 183 SOP_FETCH(str_a); 184 if (str_a->s_current.l_collate_length != 0) 185 continue; 186 (void) mg_coll_convert(fields_chain, &str_a->s_current, 187 FCV_REALLOC, field_separator); 188 } else { 189 SOP_PUT_LINE(outstrp, &str_b->s_current); 190 SOP_RELEASE_LINE(str_b); 191 if (SOP_EOS(str_b)) { 192 SOP_CLOSE(str_b); 193 break; 194 } 195 SOP_FETCH(str_b); 196 if (str_b->s_current.l_collate_length != 0) 197 continue; 198 (void) mg_coll_convert(fields_chain, &str_b->s_current, 199 FCV_REALLOC, field_separator); 200 } 201 } 202 203 SOP_PUT_LINE(outstrp, &str_a->s_current); 204 SOP_RELEASE_LINE(str_a); 205 206 while (!SOP_EOS(str_a)) { 207 SOP_FETCH(str_a); 208 if (str_a->s_current.l_collate_length == 0) 209 (void) mg_coll_convert(fields_chain, &str_a->s_current, 210 FCV_REALLOC, field_separator); 211 SOP_PUT_LINE(outstrp, &str_a->s_current); 212 SOP_RELEASE_LINE(str_a); 213 } 214 215 (void) SOP_CLOSE(str_a); 216 SOP_FLUSH(outstrp); 217 } 218 219 /* 220 * priority queue routines 221 * used for merges involving more than two sources 222 */ 223 static void 224 heap_up(stream_t **A, int k, flag_t coll_flags) 225 { 226 while (k > 1 && 227 pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0, 228 coll_flags) > 0) { 229 swap((void **)&pq_queue[k], (void **)&pq_queue[k / 2]); 230 k /= 2; 231 } 232 } 233 234 static void 235 heap_down(stream_t **A, int k, int N, flag_t coll_flags) 236 { 237 int j; 238 239 while (2 * k <= N) { 240 j = 2 * k; 241 if (j < N && pq_coll_fcn(&A[j]->s_current, 242 &A[j + 1]->s_current, 0, coll_flags) > 0) 243 j++; 244 if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0, 245 coll_flags) <= 0) 246 break; 247 swap((void **)&pq_queue[k], (void **)&pq_queue[j]); 248 k = j; 249 } 250 } 251 252 static int 253 pqueue_empty() 254 { 255 return (pq_N == 0); 256 } 257 258 static void 259 pqueue_init(size_t max_size, 260 int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t)) 261 { 262 pq_queue = safe_realloc(NULL, sizeof (stream_t *) * (max_size + 1)); 263 pq_N = 0; 264 pq_coll_fcn = coll_fcn; 265 } 266 267 static void 268 pqueue_insert(stream_t *source, flag_t coll_flags) 269 { 270 pq_queue[++pq_N] = source; 271 heap_up(pq_queue, pq_N, coll_flags); 272 } 273 274 static stream_t * 275 pqueue_head(flag_t coll_flags) 276 { 277 swap((void **)&pq_queue[1], (void **)&pq_queue[pq_N]); 278 heap_down(pq_queue, 1, pq_N - 1, coll_flags); 279 return (pq_queue[pq_N--]); 280 } 281 282 static void 283 merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams, 284 stream_t *out_streamp, flag_t coll_flags) 285 { 286 stream_t *top_streamp; 287 stream_t *cur_streamp; 288 stream_t *bot_streamp; 289 stream_t *loop_out_streamp; 290 flag_t is_single_byte = S->m_single_byte_locale; 291 292 int n_opens = 0; 293 int threshold_opens; 294 295 threshold_opens = MAX(4, 296 2 * S->m_memory_available / DEFAULT_RELEASE_SIZE); 297 298 pqueue_init(n_streams, is_single_byte ? collated : collated_wide); 299 300 top_streamp = bot_streamp = head_streamp; 301 302 for (;;) { 303 hold_file_descriptor(); 304 while (bot_streamp != NULL) { 305 306 if (n_opens > threshold_opens || 307 stream_open_for_read(S, bot_streamp) == -1) { 308 /* 309 * Available file descriptors would exceed 310 * memory target or have been exhausted; back 311 * off to the last valid, primed stream. 312 */ 313 bot_streamp = bot_streamp->s_previous; 314 break; 315 } 316 317 if (bot_streamp->s_status & STREAM_SINGLE || 318 bot_streamp->s_status & STREAM_WIDE) 319 stream_set(bot_streamp, STREAM_INSTANT); 320 321 bot_streamp = bot_streamp->s_next; 322 n_opens++; 323 } 324 release_file_descriptor(); 325 326 if (bot_streamp == NULL) { 327 if (prepare_output_stream(out_streamp, S) != -1) 328 loop_out_streamp = out_streamp; 329 else 330 die(EMSG_DESCRIPTORS); 331 } else { 332 loop_out_streamp = stream_push_to_temporary( 333 &head_streamp, NULL, ST_OPEN | ST_NOCACHE | 334 (is_single_byte ? 0 : ST_WIDE)); 335 336 if (loop_out_streamp == NULL || 337 top_streamp == bot_streamp) 338 /* 339 * We need three file descriptors to make 340 * progress; if top_streamp == bot_streamp, then 341 * we have only two. 342 */ 343 die(EMSG_DESCRIPTORS); 344 } 345 346 for (cur_streamp = top_streamp; cur_streamp != bot_streamp; 347 cur_streamp = cur_streamp->s_next) { 348 /* 349 * Empty stream? 350 */ 351 if (!(cur_streamp->s_status & STREAM_ARRAY) && 352 SOP_EOS(cur_streamp)) { 353 stream_unlink_temporary(cur_streamp); 354 continue; 355 } 356 357 /* 358 * Given that stream is not empty, any error in priming 359 * must be fatal. 360 */ 361 if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED) 362 die(EMSG_BADPRIME); 363 364 cur_streamp->s_current.l_collate_bufsize = 365 INITIAL_COLLATION_SIZE; 366 cur_streamp->s_current.l_collate.sp = 367 safe_realloc(NULL, INITIAL_COLLATION_SIZE); 368 (void) mg_coll_convert(S->m_fields_head, 369 &cur_streamp->s_current, FCV_REALLOC, 370 S->m_field_separator); 371 372 pqueue_insert(cur_streamp, coll_flags); 373 } 374 375 while (!pqueue_empty()) { 376 cur_streamp = pqueue_head(coll_flags); 377 378 SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current); 379 SOP_RELEASE_LINE(cur_streamp); 380 381 if (!SOP_EOS(cur_streamp)) { 382 SOP_FETCH(cur_streamp); 383 (void) mg_coll_convert(S->m_fields_head, 384 &cur_streamp->s_current, FCV_REALLOC, 385 S->m_field_separator); 386 pqueue_insert(cur_streamp, coll_flags); 387 } 388 } 389 390 cur_streamp = top_streamp; 391 while (cur_streamp != bot_streamp) { 392 if (!(cur_streamp->s_status & STREAM_ARRAY)) 393 safe_free(cur_streamp->s_current.l_collate.sp); 394 cur_streamp->s_current.l_collate.sp = NULL; 395 396 (void) SOP_FREE(cur_streamp); 397 stream_unlink_temporary(cur_streamp); 398 (void) SOP_CLOSE(cur_streamp); 399 400 cur_streamp = cur_streamp->s_next; 401 } 402 403 (void) SOP_FLUSH(loop_out_streamp); 404 405 if (bot_streamp == NULL) 406 break; 407 408 if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) { 409 (void) SOP_CLOSE(loop_out_streamp); 410 /* 411 * Get file size so that we may treat intermediate files 412 * with our stream_mmap facilities. 413 */ 414 stream_stat_chain(loop_out_streamp); 415 __S(stats_incr_merge_files()); 416 } 417 418 n_opens = 0; 419 420 top_streamp = bot_streamp; 421 bot_streamp = bot_streamp->s_next; 422 } 423 } 424 425 void 426 merge(sort_t *S) 427 { 428 stream_t *merge_chain; 429 stream_t *cur_streamp; 430 stream_t out_stream; 431 uint_t n_merges; 432 flag_t coll_flags; 433 434 if (S->m_merge_only) { 435 merge_chain = S->m_input_streams; 436 set_cleanup_chain(&S->m_input_streams); 437 } else { 438 /* 439 * Otherwise we're inheriting the temporary output files from 440 * our internal sort. 441 */ 442 merge_chain = S->m_temporary_streams; 443 stream_stat_chain(merge_chain); 444 __S(stats_set_merge_files(stream_count_chain(merge_chain))); 445 } 446 447 if (S->m_field_options & FIELD_REVERSE_COMPARISONS) 448 coll_flags = COLL_REVERSE; 449 else 450 coll_flags = 0; 451 if (S->m_entire_line) 452 coll_flags |= COLL_UNIQUE; 453 454 n_merges = stream_count_chain(merge_chain); 455 456 mg_coll_convert = S->m_coll_convert; 457 cur_streamp = merge_chain; 458 459 switch (n_merges) { 460 case 0: 461 /* 462 * No files for merge. 463 */ 464 warn(gettext("no files available to merge\n")); 465 break; 466 case 1: 467 /* 468 * Fast path: only one file for merge. 469 */ 470 (void) stream_open_for_read(S, cur_streamp); 471 (void) prepare_output_stream(&out_stream, S); 472 merge_one_stream(S->m_fields_head, cur_streamp, 473 &out_stream, S->m_field_separator); 474 break; 475 case 2: 476 /* 477 * Fast path: only two files for merge. 478 */ 479 (void) stream_open_for_read(S, cur_streamp); 480 (void) stream_open_for_read(S, cur_streamp->s_next); 481 if (prepare_output_stream(&out_stream, S) == -1) 482 die(EMSG_DESCRIPTORS); 483 merge_two_streams(S->m_fields_head, cur_streamp, 484 cur_streamp->s_next, &out_stream, 485 S->m_field_separator, coll_flags); 486 break; 487 default: 488 /* 489 * Full merge. 490 */ 491 merge_n_streams(S, cur_streamp, n_merges, &out_stream, 492 coll_flags); 493 break; 494 } 495 496 remove_output_guard(); 497 } 498