/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2004 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #include "streams_stdio.h" #include "streams_common.h" #define SHELF_OCCUPIED 1 #define SHELF_VACANT 0 static int shelf = SHELF_VACANT; /* * Single-byte character file i/o-based streams implementation * * The routines in this file contain the implementation of the i/o streams * interface for those situations where the input is via stdio. * * The "shelf" * In the case where the input buffer contains insufficient room to hold the * entire line, the fractional line is shelved, and will be grafted to on the * subsequent read. */ int stream_stdio_open_for_write(stream_t *str) { stream_simple_file_t *SF = &(str->s_type.SF); ASSERT(!(str->s_status & STREAM_OPEN)); ASSERT(!(str->s_status & STREAM_OUTPUT)); if (str->s_status & STREAM_NOTFILE) SF->s_fd = fileno(stdout); else if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC | O_WRONLY, OUTPUT_MODE)) < 0) { if (errno == EMFILE || errno == ENFILE) return (-1); else die(EMSG_OPEN, str->s_filename); } stream_set(str, STREAM_OPEN | STREAM_OUTPUT); return (1); } /* * In the case of an instantaneous stream, we allocate a small buffer (64k) here * for the stream; otherwise, the s_buffer and s_buffer_size members should have * been set by stream_set_size() prior to calling stream_prime(). * * Repriming (priming an already primed stream) is done when we are reentering a * file after having sorted a previous portion of the file. */ static int stream_stdio_prime(stream_t *str) { stream_buffered_file_t *BF = &(str->s_type.BF); char *current_position; char *end_of_buffer; char *next_nl; ASSERT(!(str->s_status & STREAM_OUTPUT)); ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); ASSERT(str->s_status & STREAM_OPEN); if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) { str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE, 0); if (str->s_buffer == MAP_FAILED) die(EMSG_MMAP); str->s_buffer_size = STDIO_VBUF_SIZE; } ASSERT(str->s_buffer != NULL); if (stream_is_primed(str)) { /* * l_data_length is only set to -1 in the case of coincidental * exhaustion of the input butter. This is thus the only case * which involves no copying on a re-prime. */ int shelf_state = shelf; ASSERT(str->s_current.l_data_length >= -1); (void) memcpy(str->s_buffer, str->s_current.l_data.sp, str->s_current.l_data_length + 1); str->s_current.l_data.sp = str->s_buffer; /* * If our current line is incomplete, we need to get the rest of * the line--if we can't, then we've exhausted memory. */ if ((str->s_current.l_data_length == -1 || shelf_state == SHELF_OCCUPIED || *(str->s_current.l_data.sp + str->s_current.l_data_length) != '\n') && SOP_FETCH(str) == NEXT_LINE_INCOMPLETE && shelf_state == SHELF_OCCUPIED) die(EMSG_MEMORY); str->s_current.l_collate.sp = NULL; str->s_current.l_collate_length = 0; return (PRIME_SUCCEEDED); } stream_set(str, STREAM_PRIMED); current_position = (char *)str->s_buffer; end_of_buffer = (char *)str->s_buffer + str->s_buffer_size; trip_eof(BF->s_fp); if (!feof(BF->s_fp)) (void) fgets(current_position, end_of_buffer - current_position, BF->s_fp); else { stream_set(str, STREAM_EOS_REACHED); stream_unset(str, STREAM_PRIMED); return (PRIME_FAILED_EMPTY_FILE); } str->s_current.l_data.sp = current_position; /* * Because one might run sort on a binary file, strlen() is no longer * trustworthy--we must explicitly search for a newline. */ if ((next_nl = memchr(current_position, '\n', end_of_buffer - current_position)) == NULL) { warn(WMSG_NEWLINE_ADDED, str->s_filename); str->s_current.l_data_length = MIN(strlen(current_position), end_of_buffer - current_position); } else { str->s_current.l_data_length = next_nl - current_position; } str->s_current.l_collate.sp = NULL; str->s_current.l_collate_length = 0; __S(stats_incr_fetches()); return (PRIME_SUCCEEDED); } /* * stream_stdio_fetch() guarantees the return of a complete line, or a flag * indicating that the complete line could not be read. */ static ssize_t stream_stdio_fetch(stream_t *str) { ssize_t dist_to_buf_end; int ret_val; char *graft_pt, *next_nl; ASSERT(str->s_status & STREAM_OPEN); ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); ASSERT((str->s_status & STREAM_EOS_REACHED) == 0); graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1; if (shelf == SHELF_VACANT) { /* * The graft point is the start of the current line. */ str->s_current.l_data.sp = graft_pt; } else if (str->s_current.l_data_length > -1) { /* * Correct for terminating NUL on shelved line. This NUL is * only present if we didn't have the coincidental case * mentioned in the comment below. */ graft_pt--; } dist_to_buf_end = str->s_buffer_size - (graft_pt - (char *)str->s_buffer); if (dist_to_buf_end <= 1) { /* * fgets()'s behaviour in the case of a one-character buffer is * somewhat unhelpful: it fills the buffer with '\0' and * returns successfully (even if EOF has been reached for the * file in question). Since we may be in the middle of a * grafting operation, we leave early, maintaining the shelf in * its current state. */ str->s_current.l_data_length = -1; return (NEXT_LINE_INCOMPLETE); } if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) { if (feof(str->s_type.BF.s_fp)) stream_set(str, STREAM_EOS_REACHED); else die(EMSG_READ, str->s_filename); } trip_eof(str->s_type.BF.s_fp); /* * Because one might run sort on a binary file, strlen() is no longer * trustworthy--we must explicitly search for a newline. */ if ((next_nl = memchr(str->s_current.l_data.sp, '\n', dist_to_buf_end)) == NULL) { str->s_current.l_data_length = strlen(str->s_current.l_data.sp); } else { str->s_current.l_data_length = next_nl - str->s_current.l_data.sp; } str->s_current.l_collate_length = 0; if (*(str->s_current.l_data.sp + str->s_current.l_data_length) != '\n') { if (!feof(str->s_type.BF.s_fp)) { /* * We were only able to read part of the line; note that * we have something on the shelf for our next fetch. * If the shelf was previously occupied, and we still * can't get the entire line, then we need more * resources. */ if (shelf == SHELF_OCCUPIED) die(EMSG_MEMORY); shelf = SHELF_OCCUPIED; ret_val = NEXT_LINE_INCOMPLETE; __S(stats_incr_shelves()); } else { stream_set(str, STREAM_EOS_REACHED); warn(WMSG_NEWLINE_ADDED, str->s_filename); } } else { shelf = SHELF_VACANT; ret_val = NEXT_LINE_COMPLETE; __S(stats_incr_fetches()); } return (ret_val); } /* * stdio_fetch_overwrite() is used when we are performing an operation where we * need the buffer contents only over a single period. (merge and check are * operations of this kind.) In this case, we read the current line at the head * of the stream's defined buffer. If we cannot read the entire line, we have * not allocated sufficient memory. */ ssize_t stream_stdio_fetch_overwrite(stream_t *str) { ssize_t dist_to_buf_end; ASSERT(str->s_status & STREAM_OPEN); ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); ASSERT((str->s_status & STREAM_EOS_REACHED) == 0); str->s_current.l_data.sp = str->s_buffer; dist_to_buf_end = str->s_buffer_size; if (fgets(str->s_current.l_data.sp, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) { if (feof(str->s_type.BF.s_fp)) stream_set(str, STREAM_EOS_REACHED); else die(EMSG_READ, str->s_filename); } trip_eof(str->s_type.BF.s_fp); str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1; str->s_current.l_collate_length = 0; if (str->s_current.l_data_length == -1 || *(str->s_current.l_data.sp + str->s_current.l_data_length) != '\n') { if (!feof(str->s_type.BF.s_fp)) { /* * In the overwrite case, failure to read the entire * line means our buffer size was insufficient (as we * are using all of it). Exit, requesting more * resources. */ die(EMSG_MEMORY); } else { stream_set(str, STREAM_EOS_REACHED); warn(WMSG_NEWLINE_ADDED, str->s_filename); } } __S(stats_incr_fetches()); return (NEXT_LINE_COMPLETE); } int stream_stdio_is_closable(stream_t *str) { if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE)) return (1); return (0); } int stream_stdio_close(stream_t *str) { ASSERT(str->s_status & STREAM_OPEN); if (!(str->s_status & STREAM_OUTPUT)) { if (!(str->s_status & STREAM_NOTFILE)) (void) fclose(str->s_type.BF.s_fp); if (str->s_type.BF.s_vbuf != NULL) { free(str->s_type.BF.s_vbuf); str->s_type.BF.s_vbuf = NULL; } } else { if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0) (void) close(str->s_type.SF.s_fd); else die(EMSG_WRITE, str->s_filename); } stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT); return (1); } static void stream_stdio_send_eol(stream_t *str) { ASSERT(str->s_status & STREAM_OPEN); ASSERT(str->s_status & STREAM_OUTPUT); if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0) die(EMSG_WRITE, str->s_filename); } void stream_stdio_flush(stream_t *str) { ASSERT(str->s_status & STREAM_OPEN); ASSERT(str->s_status & STREAM_OUTPUT); if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0) die(EMSG_WRITE, str->s_filename); } static void stream_stdio_put_line(stream_t *str, line_rec_t *line) { ASSERT(str->s_status & STREAM_OPEN); ASSERT(str->s_status & STREAM_OUTPUT); if (line->l_data_length >= 0) { if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp, line->l_data_length) < 0) die(EMSG_WRITE, str->s_filename); stream_stdio_send_eol(str); __S(stats_incr_puts()); } safe_free(line->l_raw_collate.sp); line->l_raw_collate.sp = NULL; } void stream_stdio_put_line_unique(stream_t *str, line_rec_t *line) { static line_rec_t pvs; static size_t collate_buf_len; ASSERT(str->s_status & STREAM_OPEN); ASSERT(str->s_status & STREAM_OUTPUT); if (pvs.l_collate.sp != NULL && collated(&pvs, line, 0, COLL_UNIQUE) == 0) { __S(stats_incr_not_unique()); return; } __S(stats_incr_put_unique()); stream_stdio_put_line(str, line); if (line->l_collate_length + 1 > collate_buf_len) { pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp, line->l_collate_length + 1); collate_buf_len = line->l_collate_length + 1; } (void) memcpy(pvs.l_collate.sp, line->l_collate.sp, line->l_collate_length); *(pvs.l_collate.sp + line->l_collate_length) = '\0'; pvs.l_collate_length = line->l_collate_length; } int stream_stdio_unlink(stream_t *str) { if (!(str->s_status & STREAM_NOTFILE)) return (unlink(str->s_filename)); return (0); } int stream_stdio_free(stream_t *str) { /* * Unmap the memory we allocated for input, if it's valid to do so. */ if (!(str->s_status & STREAM_OPEN) || (str->s_consumer != NULL && str->s_consumer->s_status & STREAM_NOT_FREEABLE)) return (0); if (str->s_buffer != NULL) { if (munmap(str->s_buffer, str->s_buffer_size) < 0) die(EMSG_MUNMAP, "/dev/zero"); else { str->s_buffer = NULL; str->s_buffer_size = 0; } } stream_unset(str, STREAM_PRIMED | STREAM_INSTANT); return (1); } static int stream_stdio_eos(stream_t *str) { int retval = 0; ASSERT(!(str->s_status & STREAM_OUTPUT)); ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE)); if (str == NULL || str->s_status & STREAM_EOS_REACHED) return (1); trip_eof(str->s_type.BF.s_fp); if (feof(str->s_type.BF.s_fp) && shelf == SHELF_VACANT && str->s_current.l_collate_length != -1) { retval = 1; stream_set(str, STREAM_EOS_REACHED); } return (retval); } /*ARGSUSED*/ static void stream_stdio_release_line(stream_t *str) { } const stream_ops_t stream_stdio_ops = { stream_stdio_is_closable, stream_stdio_close, stream_stdio_eos, stream_stdio_fetch, stream_stdio_flush, stream_stdio_free, stream_stdio_open_for_write, stream_stdio_prime, stream_stdio_put_line, stream_stdio_release_line, stream_stdio_send_eol, stream_stdio_unlink };