1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 1998-2003 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include "merge.h" 28 29 /* 30 * External merge sort 31 * 32 * The following code implements the merge phase of sort(1) using a heap-based 33 * priority queue. Fast paths for merging two files as well as outputting a 34 * single file are provided. 35 * 36 * Memory footprint management 37 * 38 * The N-way fan-out of the merge phase can lead to compromising memory 39 * consumption if not constrained, so two mechanisms are used to regulate 40 * the memory footprint during the merge phase: 41 * 42 * 1. Single use memory advice. Since we proceed through each merge file in 43 * order, any line we have output is never required again--at least, not 44 * from that input file. Accordingly, we use the SOP_RELEASE_LINE() 45 * operation to advise that the memory backing the raw data for the stream 46 * up to that line is no longer of interest. (For certain classes of 47 * streams, this leads to an madvise(3C) call with the MADV_DONTNEED 48 * flag.) 49 * 50 * 2. Number of merge files. The number of merge files is constrained based 51 * on the amount of physical memory specified via the -S option (or deemed 52 * available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES). 53 * The number of merge files is calculated based on the average resident 54 * size of a stream that supports the SOP_RELEASE_LINE() operation; this 55 * number is conservative for streams that do not support this operation. 56 * A minimum of four subfiles will always be used, resource limits 57 * permitting. 58 * 59 * Temporary filespace footprint management 60 * 61 * Once the merge sort has utilized a temporary file, it may be deleted at 62 * close, as it's not used again and preserving the files until exit may 63 * compromise sort completion when limited temporary space is available. 64 */ 65 66 static int pq_N; 67 static stream_t **pq_queue; 68 static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t); 69 70 static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t); 71 72 static int 73 prepare_output_stream(stream_t *ostrp, sort_t *S) 74 { 75 stream_clear(ostrp); 76 stream_unset(ostrp, STREAM_OPEN); 77 78 stream_set(ostrp, 79 (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) | 80 (S->m_unique_lines ? STREAM_UNIQUE : 0)); 81 82 if (S->m_output_to_stdout) { 83 stream_set(ostrp, STREAM_NOTFILE); 84 ostrp->s_filename = (char *)filename_stdout; 85 } else 86 ostrp->s_filename = S->m_output_filename; 87 88 return (SOP_OPEN_FOR_WRITE(ostrp)); 89 } 90 91 static void 92 merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp, 93 vchar_t field_separator) 94 { 95 size_t element_size = strp->s_element_size; 96 size_t initial_size = INITIAL_COLLATION_SIZE * element_size; 97 98 if (strp->s_status & STREAM_SINGLE || strp->s_status & STREAM_WIDE) 99 stream_set(strp, STREAM_INSTANT); 100 101 if (SOP_PRIME(strp) == PRIME_SUCCEEDED) { 102 strp->s_current.l_collate_bufsize = initial_size; 103 strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size); 104 105 (void) mg_coll_convert(fields_chain, &strp->s_current, 106 FCV_REALLOC, field_separator); 107 SOP_PUT_LINE(outstrp, &strp->s_current); 108 SOP_RELEASE_LINE(strp); 109 110 while (!SOP_EOS(strp)) { 111 SOP_FETCH(strp); 112 if (strp->s_current.l_collate_length == 0) 113 (void) mg_coll_convert(fields_chain, 114 &strp->s_current, FCV_REALLOC, 115 field_separator); 116 SOP_PUT_LINE(outstrp, &strp->s_current); 117 SOP_RELEASE_LINE(strp); 118 } 119 120 (void) SOP_CLOSE(strp); 121 SOP_FLUSH(outstrp); 122 } 123 } 124 125 static void 126 merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b, 127 stream_t *outstrp, vchar_t field_separator, flag_t coll_flags) 128 { 129 int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t); 130 size_t element_size = str_a->s_element_size; 131 size_t initial_size = INITIAL_COLLATION_SIZE * element_size; 132 133 ASSERT(str_a->s_element_size == str_b->s_element_size); 134 135 if (str_a->s_element_size == sizeof (char)) 136 collate_fcn = collated; 137 else 138 collate_fcn = collated_wide; 139 140 if (str_a->s_status & STREAM_SINGLE || str_a->s_status & STREAM_WIDE) 141 stream_set(str_a, STREAM_INSTANT); 142 if (str_b->s_status & STREAM_SINGLE || str_b->s_status & STREAM_WIDE) 143 stream_set(str_b, STREAM_INSTANT); 144 145 if (SOP_PRIME(str_a) != PRIME_SUCCEEDED) { 146 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) 147 return; 148 149 merge_one_stream(fields_chain, str_b, outstrp, 150 field_separator); 151 return; 152 } 153 154 if (SOP_PRIME(str_b) != PRIME_SUCCEEDED) { 155 merge_one_stream(fields_chain, str_a, outstrp, 156 field_separator); 157 return; 158 } 159 160 str_a->s_current.l_collate_bufsize = 161 str_b->s_current.l_collate_bufsize = initial_size; 162 163 str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size); 164 str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size); 165 166 (void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC, 167 field_separator); 168 (void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC, 169 field_separator); 170 171 for (;;) { 172 if (collate_fcn(&str_a->s_current, &str_b->s_current, 0, 173 coll_flags) < 0) { 174 SOP_PUT_LINE(outstrp, &str_a->s_current); 175 SOP_RELEASE_LINE(str_a); 176 if (SOP_EOS(str_a)) { 177 (void) SOP_CLOSE(str_a); 178 str_a = str_b; 179 break; 180 } 181 SOP_FETCH(str_a); 182 if (str_a->s_current.l_collate_length != 0) 183 continue; 184 (void) mg_coll_convert(fields_chain, &str_a->s_current, 185 FCV_REALLOC, field_separator); 186 } else { 187 SOP_PUT_LINE(outstrp, &str_b->s_current); 188 SOP_RELEASE_LINE(str_b); 189 if (SOP_EOS(str_b)) { 190 SOP_CLOSE(str_b); 191 break; 192 } 193 SOP_FETCH(str_b); 194 if (str_b->s_current.l_collate_length != 0) 195 continue; 196 (void) mg_coll_convert(fields_chain, &str_b->s_current, 197 FCV_REALLOC, field_separator); 198 } 199 } 200 201 SOP_PUT_LINE(outstrp, &str_a->s_current); 202 SOP_RELEASE_LINE(str_a); 203 204 while (!SOP_EOS(str_a)) { 205 SOP_FETCH(str_a); 206 if (str_a->s_current.l_collate_length == 0) 207 (void) mg_coll_convert(fields_chain, &str_a->s_current, 208 FCV_REALLOC, field_separator); 209 SOP_PUT_LINE(outstrp, &str_a->s_current); 210 SOP_RELEASE_LINE(str_a); 211 } 212 213 (void) SOP_CLOSE(str_a); 214 SOP_FLUSH(outstrp); 215 } 216 217 /* 218 * priority queue routines 219 * used for merges involving more than two sources 220 */ 221 static void 222 heap_up(stream_t **A, int k, flag_t coll_flags) 223 { 224 while (k > 1 && 225 pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0, 226 coll_flags) > 0) { 227 swap((void **)&pq_queue[k], (void **)&pq_queue[k / 2]); 228 k /= 2; 229 } 230 } 231 232 static void 233 heap_down(stream_t **A, int k, int N, flag_t coll_flags) 234 { 235 int j; 236 237 while (2 * k <= N) { 238 j = 2 * k; 239 if (j < N && pq_coll_fcn(&A[j]->s_current, 240 &A[j + 1]->s_current, 0, coll_flags) > 0) 241 j++; 242 if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0, 243 coll_flags) <= 0) 244 break; 245 swap((void **)&pq_queue[k], (void **)&pq_queue[j]); 246 k = j; 247 } 248 } 249 250 static int 251 pqueue_empty() 252 { 253 return (pq_N == 0); 254 } 255 256 static void 257 pqueue_init(size_t max_size, 258 int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t)) 259 { 260 pq_queue = safe_realloc(NULL, sizeof (stream_t *) * (max_size + 1)); 261 pq_N = 0; 262 pq_coll_fcn = coll_fcn; 263 } 264 265 static void 266 pqueue_insert(stream_t *source, flag_t coll_flags) 267 { 268 pq_queue[++pq_N] = source; 269 heap_up(pq_queue, pq_N, coll_flags); 270 } 271 272 static stream_t * 273 pqueue_head(flag_t coll_flags) 274 { 275 swap((void **)&pq_queue[1], (void **)&pq_queue[pq_N]); 276 heap_down(pq_queue, 1, pq_N - 1, coll_flags); 277 return (pq_queue[pq_N--]); 278 } 279 280 static void 281 merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams, 282 stream_t *out_streamp, flag_t coll_flags) 283 { 284 stream_t *top_streamp; 285 stream_t *cur_streamp; 286 stream_t *bot_streamp; 287 stream_t *loop_out_streamp; 288 flag_t is_single_byte = S->m_single_byte_locale; 289 290 int n_opens = 0; 291 int threshold_opens; 292 293 threshold_opens = MAX(4, 294 2 * S->m_memory_available / DEFAULT_RELEASE_SIZE); 295 296 pqueue_init(n_streams, is_single_byte ? collated : collated_wide); 297 298 top_streamp = bot_streamp = head_streamp; 299 300 for (;;) { 301 hold_file_descriptor(); 302 while (bot_streamp != NULL) { 303 304 if (n_opens > threshold_opens || 305 stream_open_for_read(S, bot_streamp) == -1) { 306 /* 307 * Available file descriptors would exceed 308 * memory target or have been exhausted; back 309 * off to the last valid, primed stream. 310 */ 311 bot_streamp = bot_streamp->s_previous; 312 break; 313 } 314 315 if (bot_streamp->s_status & STREAM_SINGLE || 316 bot_streamp->s_status & STREAM_WIDE) 317 stream_set(bot_streamp, STREAM_INSTANT); 318 319 bot_streamp = bot_streamp->s_next; 320 n_opens++; 321 } 322 release_file_descriptor(); 323 324 if (bot_streamp == NULL) { 325 if (prepare_output_stream(out_streamp, S) != -1) 326 loop_out_streamp = out_streamp; 327 else 328 die(EMSG_DESCRIPTORS); 329 } else { 330 loop_out_streamp = stream_push_to_temporary( 331 &head_streamp, NULL, ST_OPEN | ST_NOCACHE | 332 (is_single_byte ? 0 : ST_WIDE)); 333 334 if (loop_out_streamp == NULL || 335 top_streamp == bot_streamp) 336 /* 337 * We need three file descriptors to make 338 * progress; if top_streamp == bot_streamp, then 339 * we have only two. 340 */ 341 die(EMSG_DESCRIPTORS); 342 } 343 344 for (cur_streamp = top_streamp; cur_streamp != bot_streamp; 345 cur_streamp = cur_streamp->s_next) { 346 /* 347 * Empty stream? 348 */ 349 if (!(cur_streamp->s_status & STREAM_ARRAY) && 350 SOP_EOS(cur_streamp)) { 351 stream_unlink_temporary(cur_streamp); 352 continue; 353 } 354 355 /* 356 * Given that stream is not empty, any error in priming 357 * must be fatal. 358 */ 359 if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED) 360 die(EMSG_BADPRIME); 361 362 cur_streamp->s_current.l_collate_bufsize = 363 INITIAL_COLLATION_SIZE; 364 cur_streamp->s_current.l_collate.sp = 365 safe_realloc(NULL, INITIAL_COLLATION_SIZE); 366 (void) mg_coll_convert(S->m_fields_head, 367 &cur_streamp->s_current, FCV_REALLOC, 368 S->m_field_separator); 369 370 pqueue_insert(cur_streamp, coll_flags); 371 } 372 373 while (!pqueue_empty()) { 374 cur_streamp = pqueue_head(coll_flags); 375 376 SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current); 377 SOP_RELEASE_LINE(cur_streamp); 378 379 if (!SOP_EOS(cur_streamp)) { 380 SOP_FETCH(cur_streamp); 381 (void) mg_coll_convert(S->m_fields_head, 382 &cur_streamp->s_current, FCV_REALLOC, 383 S->m_field_separator); 384 pqueue_insert(cur_streamp, coll_flags); 385 } 386 } 387 388 cur_streamp = top_streamp; 389 while (cur_streamp != bot_streamp) { 390 if (!(cur_streamp->s_status & STREAM_ARRAY)) 391 safe_free(cur_streamp->s_current.l_collate.sp); 392 cur_streamp->s_current.l_collate.sp = NULL; 393 394 (void) SOP_FREE(cur_streamp); 395 stream_unlink_temporary(cur_streamp); 396 (void) SOP_CLOSE(cur_streamp); 397 398 cur_streamp = cur_streamp->s_next; 399 } 400 401 (void) SOP_FLUSH(loop_out_streamp); 402 403 if (bot_streamp == NULL) 404 break; 405 406 if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) { 407 (void) SOP_CLOSE(loop_out_streamp); 408 /* 409 * Get file size so that we may treat intermediate files 410 * with our stream_mmap facilities. 411 */ 412 stream_stat_chain(loop_out_streamp); 413 __S(stats_incr_merge_files()); 414 } 415 416 n_opens = 0; 417 418 top_streamp = bot_streamp; 419 bot_streamp = bot_streamp->s_next; 420 } 421 } 422 423 void 424 merge(sort_t *S) 425 { 426 stream_t *merge_chain; 427 stream_t *cur_streamp; 428 stream_t out_stream; 429 uint_t n_merges; 430 flag_t coll_flags; 431 432 if (S->m_merge_only) { 433 merge_chain = S->m_input_streams; 434 set_cleanup_chain(&S->m_input_streams); 435 } else { 436 /* 437 * Otherwise we're inheriting the temporary output files from 438 * our internal sort. 439 */ 440 merge_chain = S->m_temporary_streams; 441 stream_stat_chain(merge_chain); 442 __S(stats_set_merge_files(stream_count_chain(merge_chain))); 443 } 444 445 if (S->m_field_options & FIELD_REVERSE_COMPARISONS) 446 coll_flags = COLL_REVERSE; 447 else 448 coll_flags = 0; 449 if (S->m_entire_line) 450 coll_flags |= COLL_UNIQUE; 451 452 n_merges = stream_count_chain(merge_chain); 453 454 mg_coll_convert = S->m_coll_convert; 455 cur_streamp = merge_chain; 456 457 switch (n_merges) { 458 case 0: 459 /* 460 * No files for merge. 461 */ 462 warn(gettext("no files available to merge\n")); 463 break; 464 case 1: 465 /* 466 * Fast path: only one file for merge. 467 */ 468 (void) stream_open_for_read(S, cur_streamp); 469 (void) prepare_output_stream(&out_stream, S); 470 merge_one_stream(S->m_fields_head, cur_streamp, 471 &out_stream, S->m_field_separator); 472 break; 473 case 2: 474 /* 475 * Fast path: only two files for merge. 476 */ 477 (void) stream_open_for_read(S, cur_streamp); 478 (void) stream_open_for_read(S, cur_streamp->s_next); 479 if (prepare_output_stream(&out_stream, S) == -1) 480 die(EMSG_DESCRIPTORS); 481 merge_two_streams(S->m_fields_head, cur_streamp, 482 cur_streamp->s_next, &out_stream, 483 S->m_field_separator, coll_flags); 484 break; 485 default: 486 /* 487 * Full merge. 488 */ 489 merge_n_streams(S, cur_streamp, n_merges, &out_stream, 490 coll_flags); 491 break; 492 } 493 494 remove_output_guard(); 495 } 496