xref: /titanic_50/usr/src/cmd/sort/common/streams_stdio.c (revision 7c478bd95313f5f23a4c958a745db2134aa03244)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "streams_stdio.h"
30 #include "streams_common.h"
31 
32 #define	SHELF_OCCUPIED	1
33 #define	SHELF_VACANT	0
34 static int shelf = SHELF_VACANT;
35 
36 /*
37  * Single-byte character file i/o-based streams implementation
38  *
39  *   The routines in this file contain the implementation of the i/o streams
40  *   interface for those situations where the input is via stdio.
41  *
42  * The "shelf"
43  *   In the case where the input buffer contains insufficient room to hold the
44  *   entire line, the fractional line is shelved, and will be grafted to on the
45  *   subsequent read.
46  */
47 int
stream_stdio_open_for_write(stream_t * str)48 stream_stdio_open_for_write(stream_t *str)
49 {
50 	stream_simple_file_t	*SF = &(str->s_type.SF);
51 
52 	ASSERT(!(str->s_status & STREAM_OPEN));
53 	ASSERT(!(str->s_status & STREAM_OUTPUT));
54 
55 	if (str->s_status & STREAM_NOTFILE)
56 		SF->s_fd = fileno(stdout);
57 	else
58 		if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC |
59 		    O_WRONLY, OUTPUT_MODE)) < 0) {
60 			if (errno == EMFILE || errno == ENFILE)
61 				return (-1);
62 			else
63 				die(EMSG_OPEN, str->s_filename);
64 		}
65 
66 	stream_set(str, STREAM_OPEN | STREAM_OUTPUT);
67 
68 	return (1);
69 }
70 
71 /*
72  * In the case of an instantaneous stream, we allocate a small buffer (64k) here
73  * for the stream; otherwise, the s_buffer and s_buffer_size members should have
74  * been set by stream_set_size() prior to calling stream_prime().
75  *
76  * Repriming (priming an already primed stream) is done when we are reentering a
77  * file after having sorted a previous portion of the file.
78  */
79 static int
stream_stdio_prime(stream_t * str)80 stream_stdio_prime(stream_t *str)
81 {
82 	stream_buffered_file_t *BF = &(str->s_type.BF);
83 	char *current_position;
84 	char *end_of_buffer;
85 	char *next_nl;
86 
87 	ASSERT(!(str->s_status & STREAM_OUTPUT));
88 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
89 	ASSERT(str->s_status & STREAM_OPEN);
90 
91 	if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) {
92 		str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ |
93 		    PROT_WRITE, MAP_PRIVATE, 0);
94 		if (str->s_buffer == MAP_FAILED)
95 			die(EMSG_MMAP);
96 		str->s_buffer_size = STDIO_VBUF_SIZE;
97 	}
98 
99 	ASSERT(str->s_buffer != NULL);
100 
101 	if (stream_is_primed(str)) {
102 		/*
103 		 * l_data_length is only set to -1 in the case of coincidental
104 		 * exhaustion of the input butter.  This is thus the only case
105 		 * which involves no copying on a re-prime.
106 		 */
107 		int shelf_state = shelf;
108 
109 		ASSERT(str->s_current.l_data_length >= -1);
110 		(void) memcpy(str->s_buffer, str->s_current.l_data.sp,
111 		    str->s_current.l_data_length + 1);
112 		str->s_current.l_data.sp = str->s_buffer;
113 
114 		/*
115 		 * If our current line is incomplete, we need to get the rest of
116 		 * the line--if we can't, then we've exhausted memory.
117 		 */
118 		if ((str->s_current.l_data_length == -1 ||
119 		    shelf_state == SHELF_OCCUPIED ||
120 		    *(str->s_current.l_data.sp +
121 		    str->s_current.l_data_length) != '\n') &&
122 		    SOP_FETCH(str) == NEXT_LINE_INCOMPLETE &&
123 		    shelf_state == SHELF_OCCUPIED)
124 			die(EMSG_MEMORY);
125 
126 		str->s_current.l_collate.sp = NULL;
127 		str->s_current.l_collate_length = 0;
128 
129 		return (PRIME_SUCCEEDED);
130 	}
131 
132 	stream_set(str, STREAM_PRIMED);
133 
134 	current_position = (char *)str->s_buffer;
135 	end_of_buffer = (char *)str->s_buffer + str->s_buffer_size;
136 
137 	trip_eof(BF->s_fp);
138 	if (!feof(BF->s_fp))
139 		(void) fgets(current_position, end_of_buffer - current_position,
140 		    BF->s_fp);
141 	else {
142 		stream_set(str, STREAM_EOS_REACHED);
143 		stream_unset(str, STREAM_PRIMED);
144 		return (PRIME_FAILED_EMPTY_FILE);
145 	}
146 
147 	str->s_current.l_data.sp = current_position;
148 	/*
149 	 * Because one might run sort on a binary file, strlen() is no longer
150 	 * trustworthy--we must explicitly search for a newline.
151 	 */
152 	if ((next_nl = memchr(current_position, '\n',
153 	    end_of_buffer - current_position)) == NULL) {
154 		warn(WMSG_NEWLINE_ADDED, str->s_filename);
155 		str->s_current.l_data_length = MIN(strlen(current_position),
156 		    end_of_buffer - current_position);
157 	} else {
158 		str->s_current.l_data_length = next_nl - current_position;
159 	}
160 
161 	str->s_current.l_collate.sp = NULL;
162 	str->s_current.l_collate_length = 0;
163 
164 	__S(stats_incr_fetches());
165 	return (PRIME_SUCCEEDED);
166 }
167 
168 /*
169  * stream_stdio_fetch() guarantees the return of a complete line, or a flag
170  * indicating that the complete line could not be read.
171  */
172 static ssize_t
stream_stdio_fetch(stream_t * str)173 stream_stdio_fetch(stream_t *str)
174 {
175 	ssize_t	dist_to_buf_end;
176 	int ret_val;
177 	char *graft_pt, *next_nl;
178 
179 	ASSERT(str->s_status & STREAM_OPEN);
180 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
181 	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
182 
183 	graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1;
184 
185 	if (shelf == SHELF_VACANT) {
186 		/*
187 		 * The graft point is the start of the current line.
188 		 */
189 		str->s_current.l_data.sp = graft_pt;
190 	} else if (str->s_current.l_data_length > -1) {
191 		/*
192 		 * Correct for terminating NUL on shelved line.  This NUL is
193 		 * only present if we didn't have the coincidental case
194 		 * mentioned in the comment below.
195 		 */
196 		graft_pt--;
197 	}
198 
199 	dist_to_buf_end = str->s_buffer_size - (graft_pt -
200 	    (char *)str->s_buffer);
201 
202 	if (dist_to_buf_end <= 1) {
203 		/*
204 		 * fgets()'s behaviour in the case of a one-character buffer is
205 		 * somewhat unhelpful:  it fills the buffer with '\0' and
206 		 * returns successfully (even if EOF has been reached for the
207 		 * file in question).  Since we may be in the middle of a
208 		 * grafting operation, we leave early, maintaining the shelf in
209 		 * its current state.
210 		 */
211 		str->s_current.l_data_length = -1;
212 		return (NEXT_LINE_INCOMPLETE);
213 	}
214 
215 	if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) {
216 		if (feof(str->s_type.BF.s_fp))
217 			stream_set(str, STREAM_EOS_REACHED);
218 		else
219 			die(EMSG_READ, str->s_filename);
220 	}
221 
222 	trip_eof(str->s_type.BF.s_fp);
223 	/*
224 	 * Because one might run sort on a binary file, strlen() is no longer
225 	 * trustworthy--we must explicitly search for a newline.
226 	 */
227 	if ((next_nl = memchr(str->s_current.l_data.sp, '\n',
228 	    dist_to_buf_end)) == NULL) {
229 		str->s_current.l_data_length = strlen(str->s_current.l_data.sp);
230 	} else {
231 		str->s_current.l_data_length = next_nl -
232 		    str->s_current.l_data.sp;
233 	}
234 
235 	str->s_current.l_collate_length = 0;
236 
237 	if (*(str->s_current.l_data.sp + str->s_current.l_data_length) !=
238 	    '\n') {
239 		if (!feof(str->s_type.BF.s_fp)) {
240 			/*
241 			 * We were only able to read part of the line; note that
242 			 * we have something on the shelf for our next fetch.
243 			 * If the shelf was previously occupied, and we still
244 			 * can't get the entire line, then we need more
245 			 * resources.
246 			 */
247 			if (shelf == SHELF_OCCUPIED)
248 				die(EMSG_MEMORY);
249 
250 			shelf = SHELF_OCCUPIED;
251 			ret_val = NEXT_LINE_INCOMPLETE;
252 
253 			__S(stats_incr_shelves());
254 		} else {
255 			stream_set(str, STREAM_EOS_REACHED);
256 			warn(WMSG_NEWLINE_ADDED, str->s_filename);
257 		}
258 	} else {
259 		shelf = SHELF_VACANT;
260 		ret_val = NEXT_LINE_COMPLETE;
261 		__S(stats_incr_fetches());
262 	}
263 
264 	return (ret_val);
265 }
266 
267 /*
268  * stdio_fetch_overwrite() is used when we are performing an operation where we
269  * need the buffer contents only over a single period.  (merge and check are
270  * operations of this kind.)  In this case, we read the current line at the head
271  * of the stream's defined buffer.  If we cannot read the entire line, we have
272  * not allocated sufficient memory.
273  */
274 ssize_t
stream_stdio_fetch_overwrite(stream_t * str)275 stream_stdio_fetch_overwrite(stream_t *str)
276 {
277 	ssize_t	dist_to_buf_end;
278 
279 	ASSERT(str->s_status & STREAM_OPEN);
280 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
281 	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
282 
283 	str->s_current.l_data.sp = str->s_buffer;
284 	dist_to_buf_end = str->s_buffer_size;
285 
286 	if (fgets(str->s_current.l_data.sp, dist_to_buf_end,
287 	    str->s_type.BF.s_fp) == NULL) {
288 		if (feof(str->s_type.BF.s_fp))
289 			stream_set(str, STREAM_EOS_REACHED);
290 		else
291 			die(EMSG_READ, str->s_filename);
292 	}
293 
294 	trip_eof(str->s_type.BF.s_fp);
295 	str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1;
296 	str->s_current.l_collate_length = 0;
297 
298 	if (str->s_current.l_data_length == -1 ||
299 	    *(str->s_current.l_data.sp + str->s_current.l_data_length) !=
300 	    '\n') {
301 		if (!feof(str->s_type.BF.s_fp)) {
302 			/*
303 			 * In the overwrite case, failure to read the entire
304 			 * line means our buffer size was insufficient (as we
305 			 * are using all of it).  Exit, requesting more
306 			 * resources.
307 			 */
308 			die(EMSG_MEMORY);
309 		} else {
310 			stream_set(str, STREAM_EOS_REACHED);
311 			warn(WMSG_NEWLINE_ADDED, str->s_filename);
312 		}
313 	}
314 
315 	__S(stats_incr_fetches());
316 	return (NEXT_LINE_COMPLETE);
317 }
318 
319 int
stream_stdio_is_closable(stream_t * str)320 stream_stdio_is_closable(stream_t *str)
321 {
322 	if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE))
323 		return (1);
324 	return (0);
325 }
326 
327 int
stream_stdio_close(stream_t * str)328 stream_stdio_close(stream_t *str)
329 {
330 	ASSERT(str->s_status & STREAM_OPEN);
331 
332 	if (!(str->s_status & STREAM_OUTPUT)) {
333 		if (!(str->s_status & STREAM_NOTFILE))
334 			(void) fclose(str->s_type.BF.s_fp);
335 
336 		if (str->s_type.BF.s_vbuf != NULL) {
337 			free(str->s_type.BF.s_vbuf);
338 			str->s_type.BF.s_vbuf = NULL;
339 		}
340 	} else {
341 		if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0)
342 			(void) close(str->s_type.SF.s_fd);
343 		else
344 			die(EMSG_WRITE, str->s_filename);
345 	}
346 
347 	stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT);
348 	return (1);
349 }
350 
351 static void
stream_stdio_send_eol(stream_t * str)352 stream_stdio_send_eol(stream_t *str)
353 {
354 	ASSERT(str->s_status & STREAM_OPEN);
355 	ASSERT(str->s_status & STREAM_OUTPUT);
356 
357 	if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0)
358 		die(EMSG_WRITE, str->s_filename);
359 }
360 
361 void
stream_stdio_flush(stream_t * str)362 stream_stdio_flush(stream_t *str)
363 {
364 	ASSERT(str->s_status & STREAM_OPEN);
365 	ASSERT(str->s_status & STREAM_OUTPUT);
366 
367 	if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0)
368 		die(EMSG_WRITE, str->s_filename);
369 }
370 
371 static void
stream_stdio_put_line(stream_t * str,line_rec_t * line)372 stream_stdio_put_line(stream_t *str, line_rec_t *line)
373 {
374 	ASSERT(str->s_status & STREAM_OPEN);
375 	ASSERT(str->s_status & STREAM_OUTPUT);
376 
377 	if (line->l_data_length >= 0) {
378 		if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp,
379 		    line->l_data_length) < 0)
380 			die(EMSG_WRITE, str->s_filename);
381 
382 		stream_stdio_send_eol(str);
383 		__S(stats_incr_puts());
384 	}
385 	safe_free(line->l_raw_collate.sp);
386 	line->l_raw_collate.sp = NULL;
387 }
388 
389 void
stream_stdio_put_line_unique(stream_t * str,line_rec_t * line)390 stream_stdio_put_line_unique(stream_t *str, line_rec_t *line)
391 {
392 	static line_rec_t pvs;
393 	static size_t collate_buf_len;
394 
395 	ASSERT(str->s_status & STREAM_OPEN);
396 	ASSERT(str->s_status & STREAM_OUTPUT);
397 
398 	if (pvs.l_collate.sp != NULL &&
399 	    collated(&pvs, line, 0, COLL_UNIQUE) == 0) {
400 		__S(stats_incr_not_unique());
401 		return;
402 	}
403 
404 	__S(stats_incr_put_unique());
405 	stream_stdio_put_line(str, line);
406 
407 	if (line->l_collate_length + 1 > collate_buf_len) {
408 		pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp,
409 		    line->l_collate_length + 1);
410 		collate_buf_len = line->l_collate_length + 1;
411 	}
412 
413 	(void) memcpy(pvs.l_collate.sp, line->l_collate.sp,
414 	    line->l_collate_length);
415 	*(pvs.l_collate.sp + line->l_collate_length) = '\0';
416 	pvs.l_collate_length = line->l_collate_length;
417 }
418 
419 int
stream_stdio_unlink(stream_t * str)420 stream_stdio_unlink(stream_t *str)
421 {
422 	if (!(str->s_status & STREAM_NOTFILE))
423 		return (unlink(str->s_filename));
424 
425 	return (0);
426 }
427 
428 int
stream_stdio_free(stream_t * str)429 stream_stdio_free(stream_t *str)
430 {
431 	/*
432 	 * Unmap the memory we allocated for input, if it's valid to do so.
433 	 */
434 	if (!(str->s_status & STREAM_OPEN) ||
435 	    (str->s_consumer != NULL &&
436 	    str->s_consumer->s_status & STREAM_NOT_FREEABLE))
437 		return (0);
438 
439 	if (str->s_buffer != NULL) {
440 		if (munmap(str->s_buffer, str->s_buffer_size) < 0)
441 			die(EMSG_MUNMAP, "/dev/zero");
442 		else {
443 			str->s_buffer = NULL;
444 			str->s_buffer_size = 0;
445 		}
446 	}
447 
448 	stream_unset(str, STREAM_PRIMED | STREAM_INSTANT);
449 
450 	return (1);
451 }
452 
453 static int
stream_stdio_eos(stream_t * str)454 stream_stdio_eos(stream_t *str)
455 {
456 	int retval = 0;
457 
458 	ASSERT(!(str->s_status & STREAM_OUTPUT));
459 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
460 
461 	if (str == NULL || str->s_status & STREAM_EOS_REACHED)
462 		return (1);
463 
464 	trip_eof(str->s_type.BF.s_fp);
465 	if (feof(str->s_type.BF.s_fp) &&
466 	    shelf == SHELF_VACANT &&
467 	    str->s_current.l_collate_length != -1) {
468 		retval = 1;
469 		stream_set(str, STREAM_EOS_REACHED);
470 	}
471 
472 	return (retval);
473 }
474 
475 /*ARGSUSED*/
476 static void
stream_stdio_release_line(stream_t * str)477 stream_stdio_release_line(stream_t *str)
478 {
479 }
480 
481 const stream_ops_t stream_stdio_ops = {
482 	stream_stdio_is_closable,
483 	stream_stdio_close,
484 	stream_stdio_eos,
485 	stream_stdio_fetch,
486 	stream_stdio_flush,
487 	stream_stdio_free,
488 	stream_stdio_open_for_write,
489 	stream_stdio_prime,
490 	stream_stdio_put_line,
491 	stream_stdio_release_line,
492 	stream_stdio_send_eol,
493 	stream_stdio_unlink
494 };
495