1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #include "streams.h" 27 28 static const stream_ops_t invalid_ops = { 29 NULL, 30 NULL, 31 NULL, 32 NULL, 33 NULL, 34 NULL, 35 NULL, 36 NULL, 37 NULL, 38 NULL 39 }; 40 41 stream_t * 42 stream_new(int src) 43 { 44 stream_t *str = safe_realloc(NULL, sizeof (stream_t)); 45 46 stream_clear(str); 47 stream_set(str, src); 48 49 return (str); 50 } 51 52 void 53 stream_set(stream_t *str, flag_t flags) 54 { 55 if (flags & STREAM_SOURCE_MASK) { 56 ASSERT((flags & STREAM_SOURCE_MASK) == STREAM_ARRAY || 57 (flags & STREAM_SOURCE_MASK) == STREAM_SINGLE || 58 (flags & STREAM_SOURCE_MASK) == STREAM_MMAP || 59 (flags & STREAM_SOURCE_MASK) == STREAM_WIDE); 60 61 str->s_status &= ~STREAM_SOURCE_MASK; 62 str->s_status |= flags & STREAM_SOURCE_MASK; 63 64 switch (flags & STREAM_SOURCE_MASK) { 65 case STREAM_NO_SOURCE: 66 str->s_element_size = 0; 67 str->s_ops = invalid_ops; 68 return; 69 case STREAM_ARRAY: 70 /* 71 * Array streams inherit element size. 72 */ 73 str->s_ops = stream_array_ops; 74 break; 75 case STREAM_MMAP: 76 str->s_element_size = sizeof (char); 77 str->s_ops = stream_mmap_ops; 78 break; 79 case STREAM_SINGLE: 80 str->s_element_size = sizeof (char); 81 str->s_ops = stream_stdio_ops; 82 break; 83 case STREAM_WIDE: 84 str->s_element_size = sizeof (wchar_t); 85 str->s_ops = stream_wide_ops; 86 break; 87 default: 88 die(EMSG_UNKN_STREAM, str->s_status); 89 } 90 } 91 92 str->s_status |= (flags & ~STREAM_SOURCE_MASK); 93 94 if (str->s_status & STREAM_UNIQUE) 95 switch (str->s_status & STREAM_SOURCE_MASK) { 96 case STREAM_SINGLE : 97 str->s_ops.sop_put_line = 98 stream_stdio_put_line_unique; 99 break; 100 case STREAM_WIDE : 101 str->s_ops.sop_put_line = 102 stream_wide_put_line_unique; 103 break; 104 default : 105 break; 106 } 107 108 if (str->s_status & STREAM_INSTANT) 109 switch (str->s_status & STREAM_SOURCE_MASK) { 110 case STREAM_SINGLE : 111 str->s_ops.sop_fetch = 112 stream_stdio_fetch_overwrite; 113 break; 114 case STREAM_WIDE : 115 str->s_ops.sop_fetch = 116 stream_wide_fetch_overwrite; 117 break; 118 default : 119 break; 120 } 121 } 122 123 void 124 stream_unset(stream_t *streamp, flag_t flags) 125 { 126 ASSERT(!(flags & STREAM_SOURCE_MASK)); 127 128 streamp->s_status &= ~(flags & ~STREAM_SOURCE_MASK); 129 } 130 131 int 132 stream_is_primed(stream_t *streamp) 133 { 134 return (streamp->s_status & STREAM_PRIMED); 135 } 136 137 void 138 stream_clear(stream_t *str) 139 { 140 (void) memset(str, 0, sizeof (stream_t)); 141 } 142 143 static void 144 stream_copy(stream_t *dest, stream_t *src) 145 { 146 (void) memcpy(dest, src, sizeof (stream_t)); 147 } 148 149 void 150 stream_stat_chain(stream_t *strp) 151 { 152 struct stat buf; 153 stream_t *cur_strp = strp; 154 155 while (cur_strp != NULL) { 156 if (cur_strp->s_status & STREAM_NOTFILE || 157 cur_strp->s_status & STREAM_ARRAY) { 158 cur_strp = cur_strp->s_next; 159 continue; 160 } 161 162 if (stat(cur_strp->s_filename, &buf) < 0) 163 die(EMSG_STAT, cur_strp->s_filename); 164 165 cur_strp->s_dev = buf.st_dev; 166 cur_strp->s_ino = buf.st_ino; 167 cur_strp->s_filesize = buf.st_size; 168 169 cur_strp = cur_strp->s_next; 170 } 171 } 172 173 uint_t 174 stream_count_chain(stream_t *str) 175 { 176 uint_t n = 0; 177 178 while (str != NULL) { 179 n++; 180 str = str->s_next; 181 } 182 183 return (n); 184 } 185 186 int 187 stream_open_for_read(sort_t *S, stream_t *str) 188 { 189 int fd; 190 191 ASSERT(!(str->s_status & STREAM_OUTPUT)); 192 193 /* 194 * STREAM_ARRAY streams are open by definition. 195 */ 196 if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY) { 197 stream_set(str, STREAM_ARRAY | STREAM_OPEN); 198 return (1); 199 } 200 201 /* 202 * Set data type according to locale for input from stdin. 203 */ 204 if (str->s_status & STREAM_NOTFILE) { 205 str->s_type.BF.s_fp = stdin; 206 stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? 207 STREAM_SINGLE : STREAM_WIDE)); 208 return (1); 209 } 210 211 ASSERT(str->s_filename); 212 213 #ifndef DEBUG_DISALLOW_MMAP 214 if (S->m_single_byte_locale && 215 str->s_filesize > 0 && 216 str->s_filesize < SSIZE_MAX) { 217 /* 218 * make mmap() attempt; set s_status and return if successful 219 */ 220 fd = open(str->s_filename, O_RDONLY); 221 if (fd < 0) { 222 if (errno == EMFILE || errno == ENFILE) 223 return (-1); 224 else 225 die(EMSG_OPEN, str->s_filename); 226 } 227 str->s_buffer = mmap(0, str->s_filesize, PROT_READ, 228 MAP_SHARED, fd, 0); 229 230 if (str->s_buffer != MAP_FAILED) { 231 str->s_buffer_size = str->s_filesize; 232 str->s_type.SF.s_fd = fd; 233 234 stream_set(str, STREAM_MMAP | STREAM_OPEN); 235 stream_unset(str, STREAM_PRIMED); 236 return (1); 237 } 238 239 /* 240 * Otherwise the mmap() failed due to address space exhaustion; 241 * since we have already opened the file, we close it and drop 242 * into the normal (STDIO) case. 243 */ 244 (void) close(fd); 245 str->s_buffer = NULL; 246 } 247 #endif /* DEBUG_DISALLOW_MMAP */ 248 249 if ((str->s_type.BF.s_fp = fopen(str->s_filename, "r")) == NULL) { 250 if (errno == EMFILE || errno == ENFILE) 251 return (-1); 252 else 253 die(EMSG_OPEN, str->s_filename); 254 } 255 256 str->s_type.BF.s_vbuf = safe_realloc(NULL, STDIO_VBUF_SIZE); 257 if (setvbuf(str->s_type.BF.s_fp, str->s_type.BF.s_vbuf, _IOFBF, 258 STDIO_VBUF_SIZE) != 0) { 259 safe_free(str->s_type.BF.s_vbuf); 260 str->s_type.BF.s_vbuf = NULL; 261 } 262 263 stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? STREAM_SINGLE : 264 STREAM_WIDE)); 265 stream_unset(str, STREAM_PRIMED); 266 267 return (1); 268 } 269 270 void 271 stream_set_size(stream_t *str, size_t new_size) 272 { 273 /* 274 * p_new_size is new_size rounded upwards to nearest multiple of 275 * PAGESIZE, since mmap() is going to reserve it in any case. This 276 * ensures that the far end of the buffer is also aligned, such that we 277 * obtain aligned pointers if we choose to subtract from it. 278 */ 279 size_t p_new_size = (new_size + PAGESIZE) & ~(PAGESIZE - 1); 280 281 if (str->s_buffer_size == p_new_size) 282 return; 283 284 if (str->s_buffer != NULL) 285 (void) munmap(str->s_buffer, str->s_buffer_size); 286 287 if (new_size == 0) { 288 str->s_buffer = NULL; 289 str->s_buffer_size = 0; 290 return; 291 } 292 293 str->s_buffer = xzmap(0, p_new_size, PROT_READ | PROT_WRITE, 294 MAP_PRIVATE, 0); 295 296 if (str->s_buffer == MAP_FAILED) 297 die(EMSG_MMAP); 298 299 str->s_buffer_size = p_new_size; 300 } 301 302 void 303 stream_add_file_to_chain(stream_t **str_chain, char *filename) 304 { 305 stream_t *str; 306 307 str = stream_new(STREAM_NO_SOURCE); 308 309 str->s_filename = filename; 310 str->s_type.SF.s_fd = -1; 311 312 stream_push_to_chain(str_chain, str); 313 } 314 315 void 316 stream_push_to_chain(stream_t **str_chain, stream_t *streamp) 317 { 318 stream_t *cur_streamp = *str_chain; 319 320 if (cur_streamp == NULL) { 321 *str_chain = streamp; 322 streamp->s_next = NULL; 323 return; 324 } 325 326 while (cur_streamp->s_next != NULL) 327 cur_streamp = cur_streamp->s_next; 328 329 cur_streamp->s_next = streamp; 330 streamp->s_previous = cur_streamp; 331 streamp->s_next = NULL; 332 } 333 334 static void 335 stream_dump(stream_t *str_in, stream_t *str_out) 336 { 337 ASSERT(!(str_in->s_status & STREAM_OUTPUT)); 338 ASSERT(str_out->s_status & STREAM_OUTPUT); 339 340 SOP_PUT_LINE(str_out, &str_in->s_current); 341 342 while (!SOP_EOS(str_in)) { 343 SOP_FETCH(str_in); 344 SOP_PUT_LINE(str_out, &str_in->s_current); 345 } 346 } 347 348 /* 349 * stream_push_to_temporary() with flags set to ST_CACHE merely copies the 350 * stream_t pointer onto the chain. With flags set to ST_NOCACHE, the stream is 351 * written out to a file. Stream pointers passed to stream_push_to_temporary() 352 * must refer to allocated objects, and not to objects created on function 353 * stacks. Finally, if strp == NULL, stream_push_to_temporary() creates and 354 * pushes the new stream; the output stream is left open if ST_OPEN is set. 355 */ 356 stream_t * 357 stream_push_to_temporary(stream_t **str_chain, stream_t *streamp, int flags) 358 { 359 stream_t *out_streamp; 360 361 if (flags & ST_CACHE) { 362 ASSERT(streamp->s_status & STREAM_ARRAY); 363 stream_set(streamp, STREAM_NOT_FREEABLE | STREAM_TEMPORARY); 364 stream_push_to_chain(str_chain, streamp); 365 return (streamp); 366 } 367 368 out_streamp = safe_realloc(NULL, sizeof (stream_t)); 369 370 if (streamp != NULL) { 371 stream_copy(out_streamp, streamp); 372 stream_unset(out_streamp, STREAM_OPEN); 373 ASSERT(streamp->s_element_size == sizeof (char) || 374 streamp->s_element_size == sizeof (wchar_t)); 375 stream_set(out_streamp, 376 streamp->s_element_size == 1 ? STREAM_SINGLE : STREAM_WIDE); 377 out_streamp->s_buffer = NULL; 378 out_streamp->s_buffer_size = 0; 379 } else { 380 stream_clear(out_streamp); 381 stream_set(out_streamp, flags & ST_WIDE ? STREAM_WIDE : 382 STREAM_SINGLE); 383 } 384 385 (void) bump_file_template(); 386 out_streamp->s_filename = strdup(get_file_template()); 387 388 if (SOP_OPEN_FOR_WRITE(out_streamp) == -1) 389 return (NULL); 390 391 stream_set(out_streamp, STREAM_TEMPORARY); 392 stream_push_to_chain(str_chain, out_streamp); 393 394 if (streamp != NULL) { 395 /* 396 * We reset the input stream to the beginning, and copy it in 397 * sequence to the output stream, freeing the raw_collate field 398 * as we go. 399 */ 400 if (SOP_PRIME(streamp) != PRIME_SUCCEEDED) 401 die(EMSG_BADPRIME); 402 stream_dump(streamp, out_streamp); 403 } 404 405 if (!(flags & ST_OPEN)) { 406 SOP_FREE(out_streamp); 407 (void) SOP_CLOSE(out_streamp); 408 } 409 410 /* 411 * Now that we've written this stream to disk, we needn't protect any 412 * in-memory consumer. 413 */ 414 if (streamp != NULL) 415 streamp->s_consumer = NULL; 416 417 return (out_streamp); 418 } 419 420 void 421 stream_close_all_previous(stream_t *tail_streamp) 422 { 423 stream_t *cur_streamp; 424 425 ASSERT(tail_streamp != NULL); 426 427 cur_streamp = tail_streamp->s_previous; 428 while (cur_streamp != NULL) { 429 (void) SOP_FREE(cur_streamp); 430 if (SOP_IS_CLOSABLE(cur_streamp)) 431 (void) SOP_CLOSE(cur_streamp); 432 433 cur_streamp = cur_streamp->s_previous; 434 } 435 } 436 437 void 438 stream_unlink_temporary(stream_t *streamp) 439 { 440 if (streamp->s_status & STREAM_TEMPORARY) { 441 (void) SOP_FREE(streamp); 442 443 if (streamp->s_ops.sop_unlink) 444 (void) SOP_UNLINK(streamp); 445 } 446 } 447 448 /* 449 * stream_insert() takes input from src stream, converts to each line to 450 * collatable form, and places a line_rec_t in dest stream, which is of type 451 * STREAM_ARRAY. 452 */ 453 int 454 stream_insert(sort_t *S, stream_t *src, stream_t *dest) 455 { 456 ssize_t i = dest->s_type.LA.s_array_size; 457 line_rec_t *l_series; 458 char *l_convert = dest->s_buffer; 459 int return_val = ST_MEM_AVAIL; 460 int fetch_result = NEXT_LINE_COMPLETE; 461 462 /* 463 * Scan through until total bytes allowed accumulated, and return. 464 * Use SOP_FETCH(src) so that this works for all stream types, 465 * and so that we can repeat until eos. 466 * 467 * For each new line, we move back sizeof (line_rec_t) from the end of 468 * the array buffer, and copy into the start of the array buffer. When 469 * the pointers meet, or when we exhaust the current stream, we return. 470 * If we have not filled the current memory allocation, we return 471 * ST_MEM_AVAIL, else we return ST_MEM_FILLED. 472 */ 473 ASSERT(stream_is_primed(src)); 474 ASSERT(dest->s_status & STREAM_ARRAY); 475 476 /*LINTED ALIGNMENT*/ 477 l_series = (line_rec_t *)((caddr_t)dest->s_buffer 478 + dest->s_buffer_size) - dest->s_type.LA.s_array_size; 479 480 if (dest->s_type.LA.s_array_size) 481 l_convert = l_series->l_collate.sp + 482 l_series->l_collate_length + src->s_element_size; 483 484 /* 485 * current line has been set prior to entry 486 */ 487 src->s_current.l_collate.sp = l_convert; 488 src->s_current.l_collate_bufsize = (caddr_t)l_series 489 - (caddr_t)l_convert - sizeof (line_rec_t); 490 src->s_current.l_raw_collate.sp = NULL; 491 492 if (src->s_current.l_collate_bufsize <= 0) 493 return (ST_MEM_FILLED); 494 495 src->s_consumer = dest; 496 497 while (src->s_current.l_collate_bufsize > 0 && 498 (src->s_current.l_collate_length = S->m_coll_convert( 499 S->m_fields_head, &src->s_current, FCV_FAIL, 500 S->m_field_separator)) >= 0) { 501 ASSERT((char *)l_series > l_convert); 502 l_series--; 503 l_convert += src->s_current.l_collate_length; 504 505 if ((char *)l_series <= l_convert) { 506 __S(stats_incr_insert_filled_downward()); 507 l_series++; 508 return_val = ST_MEM_FILLED; 509 break; 510 } 511 512 /* 513 * There's no collision with the lower part of the buffer, so we 514 * can safely begin processing the line. In the debug case, we 515 * test for uninitialized data by copying a non-zero pattern. 516 */ 517 #ifdef DEBUG 518 memset(l_series, 0x1ff11ff1, sizeof (line_rec_t)); 519 #endif 520 521 copy_line_rec(&src->s_current, l_series); 522 i++; 523 524 if (SOP_EOS(src) || 525 (fetch_result = SOP_FETCH(src)) == NEXT_LINE_INCOMPLETE) 526 break; 527 528 src->s_current.l_collate.sp = l_convert; 529 src->s_current.l_collate_bufsize = (caddr_t)l_series 530 - (caddr_t)l_convert - sizeof (line_rec_t); 531 src->s_current.l_raw_collate.sp = NULL; 532 } 533 534 if (fetch_result == NEXT_LINE_INCOMPLETE) { 535 __S(stats_incr_insert_filled_input()); 536 return_val = ST_MEM_FILLED; 537 } else if (src->s_current.l_collate_length < 0 || 538 src->s_current.l_collate_bufsize <= 0) { 539 __S(stats_incr_insert_filled_upward()); 540 return_val = ST_MEM_FILLED; 541 } 542 543 if (fetch_result != NEXT_LINE_INCOMPLETE && 544 src->s_current.l_collate_length < 0 && 545 i == 0) 546 /* 547 * There's no room for conversion of our only line; need to 548 * execute with larger memory. 549 */ 550 die(EMSG_MEMORY); 551 552 /* 553 * Set up pointer array to line records. 554 */ 555 if (i > dest->s_type.LA.s_array_size) 556 dest->s_type.LA.s_array = safe_realloc(dest->s_type.LA.s_array, 557 sizeof (line_rec_t *) * i); 558 dest->s_type.LA.s_array_size = i; 559 560 i = 0; 561 while (i < dest->s_type.LA.s_array_size) { 562 dest->s_type.LA.s_array[i] = l_series; 563 l_series++; 564 i++; 565 } 566 567 /* 568 * LINES_ARRAY streams are always open. 569 */ 570 stream_set(dest, STREAM_OPEN); 571 572 return (return_val); 573 } 574 575 /* 576 * stream_swap_buffer() exchanges the stream's buffer with the proffered one; 577 * s_current is not adjusted so this is safe only for STREAM_INSTANT. 578 */ 579 void 580 stream_swap_buffer(stream_t *str, char **buf, size_t *size) 581 { 582 void *tb = *buf; 583 size_t ts = *size; 584 585 *buf = str->s_buffer; 586 *size = str->s_buffer_size; 587 588 str->s_buffer = tb; 589 str->s_buffer_size = ts; 590 } 591