1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "streams_stdio.h" 30 #include "streams_common.h" 31 32 #define SHELF_OCCUPIED 1 33 #define SHELF_VACANT 0 34 static int shelf = SHELF_VACANT; 35 36 /* 37 * Single-byte character file i/o-based streams implementation 38 * 39 * The routines in this file contain the implementation of the i/o streams 40 * interface for those situations where the input is via stdio. 41 * 42 * The "shelf" 43 * In the case where the input buffer contains insufficient room to hold the 44 * entire line, the fractional line is shelved, and will be grafted to on the 45 * subsequent read. 46 */ 47 int 48 stream_stdio_open_for_write(stream_t *str) 49 { 50 stream_simple_file_t *SF = &(str->s_type.SF); 51 52 ASSERT(!(str->s_status & STREAM_OPEN)); 53 ASSERT(!(str->s_status & STREAM_OUTPUT)); 54 55 if (str->s_status & STREAM_NOTFILE) 56 SF->s_fd = fileno(stdout); 57 else 58 if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC | 59 O_WRONLY, OUTPUT_MODE)) < 0) { 60 if (errno == EMFILE || errno == ENFILE) 61 return (-1); 62 else 63 die(EMSG_OPEN, str->s_filename); 64 } 65 66 stream_set(str, STREAM_OPEN | STREAM_OUTPUT); 67 68 return (1); 69 } 70 71 /* 72 * In the case of an instantaneous stream, we allocate a small buffer (64k) here 73 * for the stream; otherwise, the s_buffer and s_buffer_size members should have 74 * been set by stream_set_size() prior to calling stream_prime(). 75 * 76 * Repriming (priming an already primed stream) is done when we are reentering a 77 * file after having sorted a previous portion of the file. 78 */ 79 static int 80 stream_stdio_prime(stream_t *str) 81 { 82 stream_buffered_file_t *BF = &(str->s_type.BF); 83 char *current_position; 84 char *end_of_buffer; 85 char *next_nl; 86 87 ASSERT(!(str->s_status & STREAM_OUTPUT)); 88 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 89 ASSERT(str->s_status & STREAM_OPEN); 90 91 if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) { 92 str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ | 93 PROT_WRITE, MAP_PRIVATE, 0); 94 if (str->s_buffer == MAP_FAILED) 95 die(EMSG_MMAP); 96 str->s_buffer_size = STDIO_VBUF_SIZE; 97 } 98 99 ASSERT(str->s_buffer != NULL); 100 101 if (stream_is_primed(str)) { 102 /* 103 * l_data_length is only set to -1 in the case of coincidental 104 * exhaustion of the input butter. This is thus the only case 105 * which involves no copying on a re-prime. 106 */ 107 int shelf_state = shelf; 108 109 ASSERT(str->s_current.l_data_length >= -1); 110 (void) memcpy(str->s_buffer, str->s_current.l_data.sp, 111 str->s_current.l_data_length + 1); 112 str->s_current.l_data.sp = str->s_buffer; 113 114 /* 115 * If our current line is incomplete, we need to get the rest of 116 * the line--if we can't, then we've exhausted memory. 117 */ 118 if ((str->s_current.l_data_length == -1 || 119 shelf_state == SHELF_OCCUPIED || 120 *(str->s_current.l_data.sp + 121 str->s_current.l_data_length) != '\n') && 122 SOP_FETCH(str) == NEXT_LINE_INCOMPLETE && 123 shelf_state == SHELF_OCCUPIED) 124 die(EMSG_MEMORY); 125 126 str->s_current.l_collate.sp = NULL; 127 str->s_current.l_collate_length = 0; 128 129 return (PRIME_SUCCEEDED); 130 } 131 132 stream_set(str, STREAM_PRIMED); 133 134 current_position = (char *)str->s_buffer; 135 end_of_buffer = (char *)str->s_buffer + str->s_buffer_size; 136 137 trip_eof(BF->s_fp); 138 if (!feof(BF->s_fp)) 139 (void) fgets(current_position, end_of_buffer - current_position, 140 BF->s_fp); 141 else { 142 stream_set(str, STREAM_EOS_REACHED); 143 stream_unset(str, STREAM_PRIMED); 144 return (PRIME_FAILED_EMPTY_FILE); 145 } 146 147 str->s_current.l_data.sp = current_position; 148 /* 149 * Because one might run sort on a binary file, strlen() is no longer 150 * trustworthy--we must explicitly search for a newline. 151 */ 152 if ((next_nl = memchr(current_position, '\n', 153 end_of_buffer - current_position)) == NULL) { 154 warn(WMSG_NEWLINE_ADDED, str->s_filename); 155 str->s_current.l_data_length = MIN(strlen(current_position), 156 end_of_buffer - current_position); 157 } else { 158 str->s_current.l_data_length = next_nl - current_position; 159 } 160 161 str->s_current.l_collate.sp = NULL; 162 str->s_current.l_collate_length = 0; 163 164 __S(stats_incr_fetches()); 165 return (PRIME_SUCCEEDED); 166 } 167 168 /* 169 * stream_stdio_fetch() guarantees the return of a complete line, or a flag 170 * indicating that the complete line could not be read. 171 */ 172 static ssize_t 173 stream_stdio_fetch(stream_t *str) 174 { 175 ssize_t dist_to_buf_end; 176 int ret_val; 177 char *graft_pt, *next_nl; 178 179 ASSERT(str->s_status & STREAM_OPEN); 180 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 181 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0); 182 183 graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1; 184 185 if (shelf == SHELF_VACANT) { 186 /* 187 * The graft point is the start of the current line. 188 */ 189 str->s_current.l_data.sp = graft_pt; 190 } else if (str->s_current.l_data_length > -1) { 191 /* 192 * Correct for terminating NUL on shelved line. This NUL is 193 * only present if we didn't have the coincidental case 194 * mentioned in the comment below. 195 */ 196 graft_pt--; 197 } 198 199 dist_to_buf_end = str->s_buffer_size - (graft_pt - 200 (char *)str->s_buffer); 201 202 if (dist_to_buf_end <= 1) { 203 /* 204 * fgets()'s behaviour in the case of a one-character buffer is 205 * somewhat unhelpful: it fills the buffer with '\0' and 206 * returns successfully (even if EOF has been reached for the 207 * file in question). Since we may be in the middle of a 208 * grafting operation, we leave early, maintaining the shelf in 209 * its current state. 210 */ 211 str->s_current.l_data_length = -1; 212 return (NEXT_LINE_INCOMPLETE); 213 } 214 215 if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) { 216 if (feof(str->s_type.BF.s_fp)) 217 stream_set(str, STREAM_EOS_REACHED); 218 else 219 die(EMSG_READ, str->s_filename); 220 } 221 222 trip_eof(str->s_type.BF.s_fp); 223 /* 224 * Because one might run sort on a binary file, strlen() is no longer 225 * trustworthy--we must explicitly search for a newline. 226 */ 227 if ((next_nl = memchr(str->s_current.l_data.sp, '\n', 228 dist_to_buf_end)) == NULL) { 229 str->s_current.l_data_length = strlen(str->s_current.l_data.sp); 230 } else { 231 str->s_current.l_data_length = next_nl - 232 str->s_current.l_data.sp; 233 } 234 235 str->s_current.l_collate_length = 0; 236 237 if (*(str->s_current.l_data.sp + str->s_current.l_data_length) != 238 '\n') { 239 if (!feof(str->s_type.BF.s_fp)) { 240 /* 241 * We were only able to read part of the line; note that 242 * we have something on the shelf for our next fetch. 243 * If the shelf was previously occupied, and we still 244 * can't get the entire line, then we need more 245 * resources. 246 */ 247 if (shelf == SHELF_OCCUPIED) 248 die(EMSG_MEMORY); 249 250 shelf = SHELF_OCCUPIED; 251 ret_val = NEXT_LINE_INCOMPLETE; 252 253 __S(stats_incr_shelves()); 254 } else { 255 stream_set(str, STREAM_EOS_REACHED); 256 warn(WMSG_NEWLINE_ADDED, str->s_filename); 257 } 258 } else { 259 shelf = SHELF_VACANT; 260 ret_val = NEXT_LINE_COMPLETE; 261 __S(stats_incr_fetches()); 262 } 263 264 return (ret_val); 265 } 266 267 /* 268 * stdio_fetch_overwrite() is used when we are performing an operation where we 269 * need the buffer contents only over a single period. (merge and check are 270 * operations of this kind.) In this case, we read the current line at the head 271 * of the stream's defined buffer. If we cannot read the entire line, we have 272 * not allocated sufficient memory. 273 */ 274 ssize_t 275 stream_stdio_fetch_overwrite(stream_t *str) 276 { 277 ssize_t dist_to_buf_end; 278 279 ASSERT(str->s_status & STREAM_OPEN); 280 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 281 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0); 282 283 str->s_current.l_data.sp = str->s_buffer; 284 dist_to_buf_end = str->s_buffer_size; 285 286 if (fgets(str->s_current.l_data.sp, dist_to_buf_end, 287 str->s_type.BF.s_fp) == NULL) { 288 if (feof(str->s_type.BF.s_fp)) 289 stream_set(str, STREAM_EOS_REACHED); 290 else 291 die(EMSG_READ, str->s_filename); 292 } 293 294 trip_eof(str->s_type.BF.s_fp); 295 str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1; 296 str->s_current.l_collate_length = 0; 297 298 if (str->s_current.l_data_length == -1 || 299 *(str->s_current.l_data.sp + str->s_current.l_data_length) != 300 '\n') { 301 if (!feof(str->s_type.BF.s_fp)) { 302 /* 303 * In the overwrite case, failure to read the entire 304 * line means our buffer size was insufficient (as we 305 * are using all of it). Exit, requesting more 306 * resources. 307 */ 308 die(EMSG_MEMORY); 309 } else { 310 stream_set(str, STREAM_EOS_REACHED); 311 warn(WMSG_NEWLINE_ADDED, str->s_filename); 312 } 313 } 314 315 __S(stats_incr_fetches()); 316 return (NEXT_LINE_COMPLETE); 317 } 318 319 int 320 stream_stdio_is_closable(stream_t *str) 321 { 322 if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE)) 323 return (1); 324 return (0); 325 } 326 327 int 328 stream_stdio_close(stream_t *str) 329 { 330 ASSERT(str->s_status & STREAM_OPEN); 331 332 if (!(str->s_status & STREAM_OUTPUT)) { 333 if (!(str->s_status & STREAM_NOTFILE)) 334 (void) fclose(str->s_type.BF.s_fp); 335 336 if (str->s_type.BF.s_vbuf != NULL) { 337 free(str->s_type.BF.s_vbuf); 338 str->s_type.BF.s_vbuf = NULL; 339 } 340 } else { 341 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0) 342 (void) close(str->s_type.SF.s_fd); 343 else 344 die(EMSG_WRITE, str->s_filename); 345 } 346 347 stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT); 348 return (1); 349 } 350 351 static void 352 stream_stdio_send_eol(stream_t *str) 353 { 354 ASSERT(str->s_status & STREAM_OPEN); 355 ASSERT(str->s_status & STREAM_OUTPUT); 356 357 if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0) 358 die(EMSG_WRITE, str->s_filename); 359 } 360 361 void 362 stream_stdio_flush(stream_t *str) 363 { 364 ASSERT(str->s_status & STREAM_OPEN); 365 ASSERT(str->s_status & STREAM_OUTPUT); 366 367 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0) 368 die(EMSG_WRITE, str->s_filename); 369 } 370 371 static void 372 stream_stdio_put_line(stream_t *str, line_rec_t *line) 373 { 374 ASSERT(str->s_status & STREAM_OPEN); 375 ASSERT(str->s_status & STREAM_OUTPUT); 376 377 if (line->l_data_length >= 0) { 378 if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp, 379 line->l_data_length) < 0) 380 die(EMSG_WRITE, str->s_filename); 381 382 stream_stdio_send_eol(str); 383 __S(stats_incr_puts()); 384 } 385 safe_free(line->l_raw_collate.sp); 386 line->l_raw_collate.sp = NULL; 387 } 388 389 void 390 stream_stdio_put_line_unique(stream_t *str, line_rec_t *line) 391 { 392 static line_rec_t pvs; 393 static size_t collate_buf_len; 394 395 ASSERT(str->s_status & STREAM_OPEN); 396 ASSERT(str->s_status & STREAM_OUTPUT); 397 398 if (pvs.l_collate.sp != NULL && 399 collated(&pvs, line, 0, COLL_UNIQUE) == 0) { 400 __S(stats_incr_not_unique()); 401 return; 402 } 403 404 __S(stats_incr_put_unique()); 405 stream_stdio_put_line(str, line); 406 407 if (line->l_collate_length + 1 > collate_buf_len) { 408 pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp, 409 line->l_collate_length + 1); 410 collate_buf_len = line->l_collate_length + 1; 411 } 412 413 (void) memcpy(pvs.l_collate.sp, line->l_collate.sp, 414 line->l_collate_length); 415 *(pvs.l_collate.sp + line->l_collate_length) = '\0'; 416 pvs.l_collate_length = line->l_collate_length; 417 } 418 419 int 420 stream_stdio_unlink(stream_t *str) 421 { 422 if (!(str->s_status & STREAM_NOTFILE)) 423 return (unlink(str->s_filename)); 424 425 return (0); 426 } 427 428 int 429 stream_stdio_free(stream_t *str) 430 { 431 /* 432 * Unmap the memory we allocated for input, if it's valid to do so. 433 */ 434 if (!(str->s_status & STREAM_OPEN) || 435 (str->s_consumer != NULL && 436 str->s_consumer->s_status & STREAM_NOT_FREEABLE)) 437 return (0); 438 439 if (str->s_buffer != NULL) { 440 if (munmap(str->s_buffer, str->s_buffer_size) < 0) 441 die(EMSG_MUNMAP, "/dev/zero"); 442 else { 443 str->s_buffer = NULL; 444 str->s_buffer_size = 0; 445 } 446 } 447 448 stream_unset(str, STREAM_PRIMED | STREAM_INSTANT); 449 450 return (1); 451 } 452 453 static int 454 stream_stdio_eos(stream_t *str) 455 { 456 int retval = 0; 457 458 ASSERT(!(str->s_status & STREAM_OUTPUT)); 459 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 460 461 if (str == NULL || str->s_status & STREAM_EOS_REACHED) 462 return (1); 463 464 trip_eof(str->s_type.BF.s_fp); 465 if (feof(str->s_type.BF.s_fp) && 466 shelf == SHELF_VACANT && 467 str->s_current.l_collate_length != -1) { 468 retval = 1; 469 stream_set(str, STREAM_EOS_REACHED); 470 } 471 472 return (retval); 473 } 474 475 /*ARGSUSED*/ 476 static void 477 stream_stdio_release_line(stream_t *str) 478 { 479 } 480 481 const stream_ops_t stream_stdio_ops = { 482 stream_stdio_is_closable, 483 stream_stdio_close, 484 stream_stdio_eos, 485 stream_stdio_fetch, 486 stream_stdio_flush, 487 stream_stdio_free, 488 stream_stdio_open_for_write, 489 stream_stdio_prime, 490 stream_stdio_put_line, 491 stream_stdio_release_line, 492 stream_stdio_send_eol, 493 stream_stdio_unlink 494 }; 495