1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include "streams_stdio.h" 28 #include "streams_common.h" 29 30 #define SHELF_OCCUPIED 1 31 #define SHELF_VACANT 0 32 static int shelf = SHELF_VACANT; 33 34 /* 35 * Single-byte character file i/o-based streams implementation 36 * 37 * The routines in this file contain the implementation of the i/o streams 38 * interface for those situations where the input is via stdio. 39 * 40 * The "shelf" 41 * In the case where the input buffer contains insufficient room to hold the 42 * entire line, the fractional line is shelved, and will be grafted to on the 43 * subsequent read. 44 */ 45 int 46 stream_stdio_open_for_write(stream_t *str) 47 { 48 stream_simple_file_t *SF = &(str->s_type.SF); 49 50 ASSERT(!(str->s_status & STREAM_OPEN)); 51 ASSERT(!(str->s_status & STREAM_OUTPUT)); 52 53 if (str->s_status & STREAM_NOTFILE) 54 SF->s_fd = fileno(stdout); 55 else 56 if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC | 57 O_WRONLY, OUTPUT_MODE)) < 0) { 58 if (errno == EMFILE || errno == ENFILE) 59 return (-1); 60 else 61 die(EMSG_OPEN, str->s_filename); 62 } 63 64 stream_set(str, STREAM_OPEN | STREAM_OUTPUT); 65 66 return (1); 67 } 68 69 /* 70 * In the case of an instantaneous stream, we allocate a small buffer (64k) here 71 * for the stream; otherwise, the s_buffer and s_buffer_size members should have 72 * been set by stream_set_size() prior to calling stream_prime(). 73 * 74 * Repriming (priming an already primed stream) is done when we are reentering a 75 * file after having sorted a previous portion of the file. 76 */ 77 static int 78 stream_stdio_prime(stream_t *str) 79 { 80 stream_buffered_file_t *BF = &(str->s_type.BF); 81 char *current_position; 82 char *end_of_buffer; 83 char *next_nl; 84 85 ASSERT(!(str->s_status & STREAM_OUTPUT)); 86 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 87 ASSERT(str->s_status & STREAM_OPEN); 88 89 if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) { 90 str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ | 91 PROT_WRITE, MAP_PRIVATE, 0); 92 if (str->s_buffer == MAP_FAILED) 93 die(EMSG_MMAP); 94 str->s_buffer_size = STDIO_VBUF_SIZE; 95 } 96 97 ASSERT(str->s_buffer != NULL); 98 99 if (stream_is_primed(str)) { 100 /* 101 * l_data_length is only set to -1 in the case of coincidental 102 * exhaustion of the input butter. This is thus the only case 103 * which involves no copying on a re-prime. 104 */ 105 int shelf_state = shelf; 106 107 ASSERT(str->s_current.l_data_length >= -1); 108 (void) memcpy(str->s_buffer, str->s_current.l_data.sp, 109 str->s_current.l_data_length + 1); 110 str->s_current.l_data.sp = str->s_buffer; 111 112 /* 113 * If our current line is incomplete, we need to get the rest of 114 * the line--if we can't, then we've exhausted memory. 115 */ 116 if ((str->s_current.l_data_length == -1 || 117 shelf_state == SHELF_OCCUPIED || 118 *(str->s_current.l_data.sp + 119 str->s_current.l_data_length) != '\n') && 120 SOP_FETCH(str) == NEXT_LINE_INCOMPLETE && 121 shelf_state == SHELF_OCCUPIED) 122 die(EMSG_MEMORY); 123 124 str->s_current.l_collate.sp = NULL; 125 str->s_current.l_collate_length = 0; 126 127 return (PRIME_SUCCEEDED); 128 } 129 130 stream_set(str, STREAM_PRIMED); 131 132 current_position = (char *)str->s_buffer; 133 end_of_buffer = (char *)str->s_buffer + str->s_buffer_size; 134 135 trip_eof(BF->s_fp); 136 if (!feof(BF->s_fp)) 137 (void) fgets(current_position, end_of_buffer - current_position, 138 BF->s_fp); 139 else { 140 stream_set(str, STREAM_EOS_REACHED); 141 stream_unset(str, STREAM_PRIMED); 142 return (PRIME_FAILED_EMPTY_FILE); 143 } 144 145 str->s_current.l_data.sp = current_position; 146 /* 147 * Because one might run sort on a binary file, strlen() is no longer 148 * trustworthy--we must explicitly search for a newline. 149 */ 150 if ((next_nl = memchr(current_position, '\n', 151 end_of_buffer - current_position)) == NULL) { 152 warn(WMSG_NEWLINE_ADDED, str->s_filename); 153 str->s_current.l_data_length = MIN(strlen(current_position), 154 end_of_buffer - current_position); 155 } else { 156 str->s_current.l_data_length = next_nl - current_position; 157 } 158 159 str->s_current.l_collate.sp = NULL; 160 str->s_current.l_collate_length = 0; 161 162 __S(stats_incr_fetches()); 163 return (PRIME_SUCCEEDED); 164 } 165 166 /* 167 * stream_stdio_fetch() guarantees the return of a complete line, or a flag 168 * indicating that the complete line could not be read. 169 */ 170 static ssize_t 171 stream_stdio_fetch(stream_t *str) 172 { 173 ssize_t dist_to_buf_end; 174 int ret_val; 175 char *graft_pt, *next_nl; 176 177 ASSERT(str->s_status & STREAM_OPEN); 178 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 179 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0); 180 181 graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1; 182 183 if (shelf == SHELF_VACANT) { 184 /* 185 * The graft point is the start of the current line. 186 */ 187 str->s_current.l_data.sp = graft_pt; 188 } else if (str->s_current.l_data_length > -1) { 189 /* 190 * Correct for terminating NUL on shelved line. This NUL is 191 * only present if we didn't have the coincidental case 192 * mentioned in the comment below. 193 */ 194 graft_pt--; 195 } 196 197 dist_to_buf_end = str->s_buffer_size - (graft_pt - 198 (char *)str->s_buffer); 199 200 if (dist_to_buf_end <= 1) { 201 /* 202 * fgets()'s behaviour in the case of a one-character buffer is 203 * somewhat unhelpful: it fills the buffer with '\0' and 204 * returns successfully (even if EOF has been reached for the 205 * file in question). Since we may be in the middle of a 206 * grafting operation, we leave early, maintaining the shelf in 207 * its current state. 208 */ 209 str->s_current.l_data_length = -1; 210 return (NEXT_LINE_INCOMPLETE); 211 } 212 213 if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) { 214 if (feof(str->s_type.BF.s_fp)) 215 stream_set(str, STREAM_EOS_REACHED); 216 else 217 die(EMSG_READ, str->s_filename); 218 } 219 220 trip_eof(str->s_type.BF.s_fp); 221 /* 222 * Because one might run sort on a binary file, strlen() is no longer 223 * trustworthy--we must explicitly search for a newline. 224 */ 225 if ((next_nl = memchr(str->s_current.l_data.sp, '\n', 226 dist_to_buf_end)) == NULL) { 227 str->s_current.l_data_length = strlen(str->s_current.l_data.sp); 228 } else { 229 str->s_current.l_data_length = next_nl - 230 str->s_current.l_data.sp; 231 } 232 233 str->s_current.l_collate_length = 0; 234 235 if (*(str->s_current.l_data.sp + str->s_current.l_data_length) != 236 '\n') { 237 if (!feof(str->s_type.BF.s_fp)) { 238 /* 239 * We were only able to read part of the line; note that 240 * we have something on the shelf for our next fetch. 241 * If the shelf was previously occupied, and we still 242 * can't get the entire line, then we need more 243 * resources. 244 */ 245 if (shelf == SHELF_OCCUPIED) 246 die(EMSG_MEMORY); 247 248 shelf = SHELF_OCCUPIED; 249 ret_val = NEXT_LINE_INCOMPLETE; 250 251 __S(stats_incr_shelves()); 252 } else { 253 stream_set(str, STREAM_EOS_REACHED); 254 warn(WMSG_NEWLINE_ADDED, str->s_filename); 255 } 256 } else { 257 shelf = SHELF_VACANT; 258 ret_val = NEXT_LINE_COMPLETE; 259 __S(stats_incr_fetches()); 260 } 261 262 return (ret_val); 263 } 264 265 /* 266 * stdio_fetch_overwrite() is used when we are performing an operation where we 267 * need the buffer contents only over a single period. (merge and check are 268 * operations of this kind.) In this case, we read the current line at the head 269 * of the stream's defined buffer. If we cannot read the entire line, we have 270 * not allocated sufficient memory. 271 */ 272 ssize_t 273 stream_stdio_fetch_overwrite(stream_t *str) 274 { 275 ssize_t dist_to_buf_end; 276 277 ASSERT(str->s_status & STREAM_OPEN); 278 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 279 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0); 280 281 str->s_current.l_data.sp = str->s_buffer; 282 dist_to_buf_end = str->s_buffer_size; 283 284 if (fgets(str->s_current.l_data.sp, dist_to_buf_end, 285 str->s_type.BF.s_fp) == NULL) { 286 if (feof(str->s_type.BF.s_fp)) 287 stream_set(str, STREAM_EOS_REACHED); 288 else 289 die(EMSG_READ, str->s_filename); 290 } 291 292 trip_eof(str->s_type.BF.s_fp); 293 str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1; 294 str->s_current.l_collate_length = 0; 295 296 if (str->s_current.l_data_length == -1 || 297 *(str->s_current.l_data.sp + str->s_current.l_data_length) != 298 '\n') { 299 if (!feof(str->s_type.BF.s_fp)) { 300 /* 301 * In the overwrite case, failure to read the entire 302 * line means our buffer size was insufficient (as we 303 * are using all of it). Exit, requesting more 304 * resources. 305 */ 306 die(EMSG_MEMORY); 307 } else { 308 stream_set(str, STREAM_EOS_REACHED); 309 warn(WMSG_NEWLINE_ADDED, str->s_filename); 310 } 311 } 312 313 __S(stats_incr_fetches()); 314 return (NEXT_LINE_COMPLETE); 315 } 316 317 int 318 stream_stdio_is_closable(stream_t *str) 319 { 320 if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE)) 321 return (1); 322 return (0); 323 } 324 325 int 326 stream_stdio_close(stream_t *str) 327 { 328 ASSERT(str->s_status & STREAM_OPEN); 329 330 if (!(str->s_status & STREAM_OUTPUT)) { 331 if (!(str->s_status & STREAM_NOTFILE)) 332 (void) fclose(str->s_type.BF.s_fp); 333 334 if (str->s_type.BF.s_vbuf != NULL) { 335 free(str->s_type.BF.s_vbuf); 336 str->s_type.BF.s_vbuf = NULL; 337 } 338 } else { 339 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0) 340 (void) close(str->s_type.SF.s_fd); 341 else 342 die(EMSG_WRITE, str->s_filename); 343 } 344 345 stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT); 346 return (1); 347 } 348 349 static void 350 stream_stdio_send_eol(stream_t *str) 351 { 352 ASSERT(str->s_status & STREAM_OPEN); 353 ASSERT(str->s_status & STREAM_OUTPUT); 354 355 if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0) 356 die(EMSG_WRITE, str->s_filename); 357 } 358 359 void 360 stream_stdio_flush(stream_t *str) 361 { 362 ASSERT(str->s_status & STREAM_OPEN); 363 ASSERT(str->s_status & STREAM_OUTPUT); 364 365 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0) 366 die(EMSG_WRITE, str->s_filename); 367 } 368 369 static void 370 stream_stdio_put_line(stream_t *str, line_rec_t *line) 371 { 372 ASSERT(str->s_status & STREAM_OPEN); 373 ASSERT(str->s_status & STREAM_OUTPUT); 374 375 if (line->l_data_length >= 0) { 376 if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp, 377 line->l_data_length) < 0) 378 die(EMSG_WRITE, str->s_filename); 379 380 stream_stdio_send_eol(str); 381 __S(stats_incr_puts()); 382 } 383 safe_free(line->l_raw_collate.sp); 384 line->l_raw_collate.sp = NULL; 385 } 386 387 void 388 stream_stdio_put_line_unique(stream_t *str, line_rec_t *line) 389 { 390 static line_rec_t pvs; 391 static size_t collate_buf_len; 392 393 ASSERT(str->s_status & STREAM_OPEN); 394 ASSERT(str->s_status & STREAM_OUTPUT); 395 396 if (pvs.l_collate.sp != NULL && 397 collated(&pvs, line, 0, COLL_UNIQUE) == 0) { 398 __S(stats_incr_not_unique()); 399 return; 400 } 401 402 __S(stats_incr_put_unique()); 403 stream_stdio_put_line(str, line); 404 405 if (line->l_collate_length + 1 > collate_buf_len) { 406 pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp, 407 line->l_collate_length + 1); 408 collate_buf_len = line->l_collate_length + 1; 409 } 410 411 (void) memcpy(pvs.l_collate.sp, line->l_collate.sp, 412 line->l_collate_length); 413 *(pvs.l_collate.sp + line->l_collate_length) = '\0'; 414 pvs.l_collate_length = line->l_collate_length; 415 } 416 417 int 418 stream_stdio_unlink(stream_t *str) 419 { 420 if (!(str->s_status & STREAM_NOTFILE)) 421 return (unlink(str->s_filename)); 422 423 return (0); 424 } 425 426 int 427 stream_stdio_free(stream_t *str) 428 { 429 /* 430 * Unmap the memory we allocated for input, if it's valid to do so. 431 */ 432 if (!(str->s_status & STREAM_OPEN) || 433 (str->s_consumer != NULL && 434 str->s_consumer->s_status & STREAM_NOT_FREEABLE)) 435 return (0); 436 437 if (str->s_buffer != NULL) { 438 if (munmap(str->s_buffer, str->s_buffer_size) < 0) 439 die(EMSG_MUNMAP, "/dev/zero"); 440 else { 441 str->s_buffer = NULL; 442 str->s_buffer_size = 0; 443 } 444 } 445 446 stream_unset(str, STREAM_PRIMED | STREAM_INSTANT); 447 448 return (1); 449 } 450 451 static int 452 stream_stdio_eos(stream_t *str) 453 { 454 int retval = 0; 455 456 ASSERT(!(str->s_status & STREAM_OUTPUT)); 457 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); 458 459 if (str == NULL || str->s_status & STREAM_EOS_REACHED) 460 return (1); 461 462 trip_eof(str->s_type.BF.s_fp); 463 if (feof(str->s_type.BF.s_fp) && 464 shelf == SHELF_VACANT && 465 str->s_current.l_collate_length != -1) { 466 retval = 1; 467 stream_set(str, STREAM_EOS_REACHED); 468 } 469 470 return (retval); 471 } 472 473 /*ARGSUSED*/ 474 static void 475 stream_stdio_release_line(stream_t *str) 476 { 477 } 478 479 const stream_ops_t stream_stdio_ops = { 480 stream_stdio_is_closable, 481 stream_stdio_close, 482 stream_stdio_eos, 483 stream_stdio_fetch, 484 stream_stdio_flush, 485 stream_stdio_free, 486 stream_stdio_open_for_write, 487 stream_stdio_prime, 488 stream_stdio_put_line, 489 stream_stdio_release_line, 490 stream_stdio_send_eol, 491 stream_stdio_unlink 492 }; 493