1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #pragma ident "%Z%%M% %I% %E% SMI" 27 28 #include "streams.h" 29 30 static const stream_ops_t invalid_ops = { 31 NULL, 32 NULL, 33 NULL, 34 NULL, 35 NULL, 36 NULL, 37 NULL, 38 NULL, 39 NULL, 40 NULL 41 }; 42 43 stream_t * 44 stream_new(int src) 45 { 46 stream_t *str = safe_realloc(NULL, sizeof (stream_t)); 47 48 stream_clear(str); 49 stream_set(str, src); 50 51 return (str); 52 } 53 54 void 55 stream_set(stream_t *str, flag_t flags) 56 { 57 if (flags & STREAM_SOURCE_MASK) { 58 ASSERT((flags & STREAM_SOURCE_MASK) == STREAM_ARRAY || 59 (flags & STREAM_SOURCE_MASK) == STREAM_SINGLE || 60 (flags & STREAM_SOURCE_MASK) == STREAM_MMAP || 61 (flags & STREAM_SOURCE_MASK) == STREAM_WIDE); 62 63 str->s_status &= ~STREAM_SOURCE_MASK; 64 str->s_status |= flags & STREAM_SOURCE_MASK; 65 66 switch (flags & STREAM_SOURCE_MASK) { 67 case STREAM_NO_SOURCE: 68 str->s_element_size = 0; 69 str->s_ops = invalid_ops; 70 return; 71 case STREAM_ARRAY: 72 /* 73 * Array streams inherit element size. 74 */ 75 str->s_ops = stream_array_ops; 76 break; 77 case STREAM_MMAP: 78 str->s_element_size = sizeof (char); 79 str->s_ops = stream_mmap_ops; 80 break; 81 case STREAM_SINGLE: 82 str->s_element_size = sizeof (char); 83 str->s_ops = stream_stdio_ops; 84 break; 85 case STREAM_WIDE: 86 str->s_element_size = sizeof (wchar_t); 87 str->s_ops = stream_wide_ops; 88 break; 89 default: 90 die(EMSG_UNKN_STREAM, str->s_status); 91 } 92 } 93 94 str->s_status |= (flags & ~STREAM_SOURCE_MASK); 95 96 if (str->s_status & STREAM_UNIQUE) 97 switch (str->s_status & STREAM_SOURCE_MASK) { 98 case STREAM_SINGLE : 99 str->s_ops.sop_put_line = 100 stream_stdio_put_line_unique; 101 break; 102 case STREAM_WIDE : 103 str->s_ops.sop_put_line = 104 stream_wide_put_line_unique; 105 break; 106 default : 107 break; 108 } 109 110 if (str->s_status & STREAM_INSTANT) 111 switch (str->s_status & STREAM_SOURCE_MASK) { 112 case STREAM_SINGLE : 113 str->s_ops.sop_fetch = 114 stream_stdio_fetch_overwrite; 115 break; 116 case STREAM_WIDE : 117 str->s_ops.sop_fetch = 118 stream_wide_fetch_overwrite; 119 break; 120 default : 121 break; 122 } 123 } 124 125 void 126 stream_unset(stream_t *streamp, flag_t flags) 127 { 128 ASSERT(!(flags & STREAM_SOURCE_MASK)); 129 130 streamp->s_status &= ~(flags & ~STREAM_SOURCE_MASK); 131 } 132 133 int 134 stream_is_primed(stream_t *streamp) 135 { 136 return (streamp->s_status & STREAM_PRIMED); 137 } 138 139 void 140 stream_clear(stream_t *str) 141 { 142 (void) memset(str, 0, sizeof (stream_t)); 143 } 144 145 static void 146 stream_copy(stream_t *dest, stream_t *src) 147 { 148 (void) memcpy(dest, src, sizeof (stream_t)); 149 } 150 151 void 152 stream_stat_chain(stream_t *strp) 153 { 154 struct stat buf; 155 stream_t *cur_strp = strp; 156 157 while (cur_strp != NULL) { 158 if (cur_strp->s_status & STREAM_NOTFILE || 159 cur_strp->s_status & STREAM_ARRAY) { 160 cur_strp = cur_strp->s_next; 161 continue; 162 } 163 164 if (stat(cur_strp->s_filename, &buf) < 0) 165 die(EMSG_STAT, cur_strp->s_filename); 166 167 cur_strp->s_dev = buf.st_dev; 168 cur_strp->s_ino = buf.st_ino; 169 cur_strp->s_filesize = buf.st_size; 170 171 cur_strp = cur_strp->s_next; 172 } 173 } 174 175 uint_t 176 stream_count_chain(stream_t *str) 177 { 178 uint_t n = 0; 179 180 while (str != NULL) { 181 n++; 182 str = str->s_next; 183 } 184 185 return (n); 186 } 187 188 int 189 stream_open_for_read(sort_t *S, stream_t *str) 190 { 191 int fd; 192 193 ASSERT(!(str->s_status & STREAM_OUTPUT)); 194 195 /* 196 * STREAM_ARRAY streams are open by definition. 197 */ 198 if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY) { 199 stream_set(str, STREAM_ARRAY | STREAM_OPEN); 200 return (1); 201 } 202 203 /* 204 * Set data type according to locale for input from stdin. 205 */ 206 if (str->s_status & STREAM_NOTFILE) { 207 str->s_type.BF.s_fp = stdin; 208 stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? 209 STREAM_SINGLE : STREAM_WIDE)); 210 return (1); 211 } 212 213 ASSERT(str->s_filename); 214 215 #ifndef DEBUG_DISALLOW_MMAP 216 if (S->m_single_byte_locale && 217 str->s_filesize > 0 && 218 str->s_filesize < SSIZE_MAX) { 219 /* 220 * make mmap() attempt; set s_status and return if successful 221 */ 222 fd = open(str->s_filename, O_RDONLY); 223 if (fd < 0) { 224 if (errno == EMFILE || errno == ENFILE) 225 return (-1); 226 else 227 die(EMSG_OPEN, str->s_filename); 228 } 229 str->s_buffer = mmap(0, str->s_filesize, PROT_READ, 230 MAP_SHARED, fd, 0); 231 232 if (str->s_buffer != MAP_FAILED) { 233 str->s_buffer_size = str->s_filesize; 234 str->s_type.SF.s_fd = fd; 235 236 stream_set(str, STREAM_MMAP | STREAM_OPEN); 237 stream_unset(str, STREAM_PRIMED); 238 return (1); 239 } 240 241 /* 242 * Otherwise the mmap() failed due to address space exhaustion; 243 * since we have already opened the file, we close it and drop 244 * into the normal (STDIO) case. 245 */ 246 (void) close(fd); 247 str->s_buffer = NULL; 248 } 249 #endif /* DEBUG_DISALLOW_MMAP */ 250 251 if ((str->s_type.BF.s_fp = fopen(str->s_filename, "r")) == NULL) { 252 if (errno == EMFILE || errno == ENFILE) 253 return (-1); 254 else 255 die(EMSG_OPEN, str->s_filename); 256 } 257 258 str->s_type.BF.s_vbuf = safe_realloc(NULL, STDIO_VBUF_SIZE); 259 if (setvbuf(str->s_type.BF.s_fp, str->s_type.BF.s_vbuf, _IOFBF, 260 STDIO_VBUF_SIZE) != 0) { 261 safe_free(str->s_type.BF.s_vbuf); 262 str->s_type.BF.s_vbuf = NULL; 263 } 264 265 stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? STREAM_SINGLE : 266 STREAM_WIDE)); 267 stream_unset(str, STREAM_PRIMED); 268 269 return (1); 270 } 271 272 void 273 stream_set_size(stream_t *str, size_t new_size) 274 { 275 /* 276 * p_new_size is new_size rounded upwards to nearest multiple of 277 * PAGESIZE, since mmap() is going to reserve it in any case. This 278 * ensures that the far end of the buffer is also aligned, such that we 279 * obtain aligned pointers if we choose to subtract from it. 280 */ 281 size_t p_new_size = (new_size + PAGESIZE) & ~(PAGESIZE - 1); 282 283 if (str->s_buffer_size == p_new_size) 284 return; 285 286 if (str->s_buffer != NULL) 287 (void) munmap(str->s_buffer, str->s_buffer_size); 288 289 if (new_size == 0) { 290 str->s_buffer = NULL; 291 str->s_buffer_size = 0; 292 return; 293 } 294 295 str->s_buffer = xzmap(0, p_new_size, PROT_READ | PROT_WRITE, 296 MAP_PRIVATE, 0); 297 298 if (str->s_buffer == MAP_FAILED) 299 die(EMSG_MMAP); 300 301 str->s_buffer_size = p_new_size; 302 } 303 304 void 305 stream_add_file_to_chain(stream_t **str_chain, char *filename) 306 { 307 stream_t *str; 308 309 str = stream_new(STREAM_NO_SOURCE); 310 311 str->s_filename = filename; 312 str->s_type.SF.s_fd = -1; 313 314 stream_push_to_chain(str_chain, str); 315 } 316 317 void 318 stream_push_to_chain(stream_t **str_chain, stream_t *streamp) 319 { 320 stream_t *cur_streamp = *str_chain; 321 322 if (cur_streamp == NULL) { 323 *str_chain = streamp; 324 streamp->s_next = NULL; 325 return; 326 } 327 328 while (cur_streamp->s_next != NULL) 329 cur_streamp = cur_streamp->s_next; 330 331 cur_streamp->s_next = streamp; 332 streamp->s_previous = cur_streamp; 333 streamp->s_next = NULL; 334 } 335 336 static void 337 stream_dump(stream_t *str_in, stream_t *str_out) 338 { 339 ASSERT(!(str_in->s_status & STREAM_OUTPUT)); 340 ASSERT(str_out->s_status & STREAM_OUTPUT); 341 342 SOP_PUT_LINE(str_out, &str_in->s_current); 343 344 while (!SOP_EOS(str_in)) { 345 SOP_FETCH(str_in); 346 SOP_PUT_LINE(str_out, &str_in->s_current); 347 } 348 } 349 350 /* 351 * stream_push_to_temporary() with flags set to ST_CACHE merely copies the 352 * stream_t pointer onto the chain. With flags set to ST_NOCACHE, the stream is 353 * written out to a file. Stream pointers passed to stream_push_to_temporary() 354 * must refer to allocated objects, and not to objects created on function 355 * stacks. Finally, if strp == NULL, stream_push_to_temporary() creates and 356 * pushes the new stream; the output stream is left open if ST_OPEN is set. 357 */ 358 stream_t * 359 stream_push_to_temporary(stream_t **str_chain, stream_t *streamp, int flags) 360 { 361 stream_t *out_streamp; 362 363 if (flags & ST_CACHE) { 364 ASSERT(streamp->s_status & STREAM_ARRAY); 365 stream_set(streamp, STREAM_NOT_FREEABLE | STREAM_TEMPORARY); 366 stream_push_to_chain(str_chain, streamp); 367 return (streamp); 368 } 369 370 out_streamp = safe_realloc(NULL, sizeof (stream_t)); 371 372 if (streamp != NULL) { 373 stream_copy(out_streamp, streamp); 374 stream_unset(out_streamp, STREAM_OPEN); 375 ASSERT(streamp->s_element_size == sizeof (char) || 376 streamp->s_element_size == sizeof (wchar_t)); 377 stream_set(out_streamp, 378 streamp->s_element_size == 1 ? STREAM_SINGLE : STREAM_WIDE); 379 out_streamp->s_buffer = NULL; 380 out_streamp->s_buffer_size = 0; 381 } else { 382 stream_clear(out_streamp); 383 stream_set(out_streamp, flags & ST_WIDE ? STREAM_WIDE : 384 STREAM_SINGLE); 385 } 386 387 (void) bump_file_template(); 388 out_streamp->s_filename = strdup(get_file_template()); 389 390 if (SOP_OPEN_FOR_WRITE(out_streamp) == -1) 391 return (NULL); 392 393 stream_set(out_streamp, STREAM_TEMPORARY); 394 stream_push_to_chain(str_chain, out_streamp); 395 396 if (streamp != NULL) { 397 /* 398 * We reset the input stream to the beginning, and copy it in 399 * sequence to the output stream, freeing the raw_collate field 400 * as we go. 401 */ 402 if (SOP_PRIME(streamp) != PRIME_SUCCEEDED) 403 die(EMSG_BADPRIME); 404 stream_dump(streamp, out_streamp); 405 } 406 407 if (!(flags & ST_OPEN)) { 408 SOP_FREE(out_streamp); 409 (void) SOP_CLOSE(out_streamp); 410 } 411 412 /* 413 * Now that we've written this stream to disk, we needn't protect any 414 * in-memory consumer. 415 */ 416 if (streamp != NULL) 417 streamp->s_consumer = NULL; 418 419 return (out_streamp); 420 } 421 422 void 423 stream_close_all_previous(stream_t *tail_streamp) 424 { 425 stream_t *cur_streamp; 426 427 ASSERT(tail_streamp != NULL); 428 429 cur_streamp = tail_streamp->s_previous; 430 while (cur_streamp != NULL) { 431 (void) SOP_FREE(cur_streamp); 432 if (SOP_IS_CLOSABLE(cur_streamp)) 433 (void) SOP_CLOSE(cur_streamp); 434 435 cur_streamp = cur_streamp->s_previous; 436 } 437 } 438 439 void 440 stream_unlink_temporary(stream_t *streamp) 441 { 442 if (streamp->s_status & STREAM_TEMPORARY) { 443 (void) SOP_FREE(streamp); 444 445 if (streamp->s_ops.sop_unlink) 446 (void) SOP_UNLINK(streamp); 447 } 448 } 449 450 /* 451 * stream_insert() takes input from src stream, converts to each line to 452 * collatable form, and places a line_rec_t in dest stream, which is of type 453 * STREAM_ARRAY. 454 */ 455 int 456 stream_insert(sort_t *S, stream_t *src, stream_t *dest) 457 { 458 ssize_t i = dest->s_type.LA.s_array_size; 459 line_rec_t *l_series; 460 char *l_convert = dest->s_buffer; 461 int return_val = ST_MEM_AVAIL; 462 int fetch_result = NEXT_LINE_COMPLETE; 463 464 /* 465 * Scan through until total bytes allowed accumulated, and return. 466 * Use SOP_FETCH(src) so that this works for all stream types, 467 * and so that we can repeat until eos. 468 * 469 * For each new line, we move back sizeof (line_rec_t) from the end of 470 * the array buffer, and copy into the start of the array buffer. When 471 * the pointers meet, or when we exhaust the current stream, we return. 472 * If we have not filled the current memory allocation, we return 473 * ST_MEM_AVAIL, else we return ST_MEM_FILLED. 474 */ 475 ASSERT(stream_is_primed(src)); 476 ASSERT(dest->s_status & STREAM_ARRAY); 477 478 /*LINTED ALIGNMENT*/ 479 l_series = (line_rec_t *)((caddr_t)dest->s_buffer 480 + dest->s_buffer_size) - dest->s_type.LA.s_array_size; 481 482 if (dest->s_type.LA.s_array_size) 483 l_convert = l_series->l_collate.sp + 484 l_series->l_collate_length + src->s_element_size; 485 486 /* 487 * current line has been set prior to entry 488 */ 489 src->s_current.l_collate.sp = l_convert; 490 src->s_current.l_collate_bufsize = (caddr_t)l_series 491 - (caddr_t)l_convert - sizeof (line_rec_t); 492 src->s_current.l_raw_collate.sp = NULL; 493 494 if (src->s_current.l_collate_bufsize <= 0) 495 return (ST_MEM_FILLED); 496 497 src->s_consumer = dest; 498 499 while (src->s_current.l_collate_bufsize > 0 && 500 (src->s_current.l_collate_length = S->m_coll_convert( 501 S->m_fields_head, &src->s_current, FCV_FAIL, 502 S->m_field_separator)) >= 0) { 503 ASSERT((char *)l_series > l_convert); 504 l_series--; 505 l_convert += src->s_current.l_collate_length; 506 507 if ((char *)l_series <= l_convert) { 508 __S(stats_incr_insert_filled_downward()); 509 l_series++; 510 return_val = ST_MEM_FILLED; 511 break; 512 } 513 514 /* 515 * There's no collision with the lower part of the buffer, so we 516 * can safely begin processing the line. In the debug case, we 517 * test for uninitialized data by copying a non-zero pattern. 518 */ 519 #ifdef DEBUG 520 memset(l_series, 0x1ff11ff1, sizeof (line_rec_t)); 521 #endif 522 523 copy_line_rec(&src->s_current, l_series); 524 i++; 525 526 if (SOP_EOS(src) || 527 (fetch_result = SOP_FETCH(src)) == NEXT_LINE_INCOMPLETE) 528 break; 529 530 src->s_current.l_collate.sp = l_convert; 531 src->s_current.l_collate_bufsize = (caddr_t)l_series 532 - (caddr_t)l_convert - sizeof (line_rec_t); 533 src->s_current.l_raw_collate.sp = NULL; 534 } 535 536 if (fetch_result == NEXT_LINE_INCOMPLETE) { 537 __S(stats_incr_insert_filled_input()); 538 return_val = ST_MEM_FILLED; 539 } else if (src->s_current.l_collate_length < 0 || 540 src->s_current.l_collate_bufsize <= 0) { 541 __S(stats_incr_insert_filled_upward()); 542 return_val = ST_MEM_FILLED; 543 } 544 545 if (fetch_result != NEXT_LINE_INCOMPLETE && 546 src->s_current.l_collate_length < 0 && 547 i == 0) 548 /* 549 * There's no room for conversion of our only line; need to 550 * execute with larger memory. 551 */ 552 die(EMSG_MEMORY); 553 554 /* 555 * Set up pointer array to line records. 556 */ 557 if (i > dest->s_type.LA.s_array_size) 558 dest->s_type.LA.s_array = safe_realloc(dest->s_type.LA.s_array, 559 sizeof (line_rec_t *) * i); 560 dest->s_type.LA.s_array_size = i; 561 562 i = 0; 563 while (i < dest->s_type.LA.s_array_size) { 564 dest->s_type.LA.s_array[i] = l_series; 565 l_series++; 566 i++; 567 } 568 569 /* 570 * LINES_ARRAY streams are always open. 571 */ 572 stream_set(dest, STREAM_OPEN); 573 574 return (return_val); 575 } 576 577 /* 578 * stream_swap_buffer() exchanges the stream's buffer with the proffered one; 579 * s_current is not adjusted so this is safe only for STREAM_INSTANT. 580 */ 581 void 582 stream_swap_buffer(stream_t *str, char **buf, size_t *size) 583 { 584 void *tb = *buf; 585 size_t ts = *size; 586 587 *buf = str->s_buffer; 588 *size = str->s_buffer_size; 589 590 str->s_buffer = tb; 591 str->s_buffer_size = ts; 592 } 593