1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #pragma ident "%Z%%M% %I% %E% SMI"
28
29 #include "streams_stdio.h"
30 #include "streams_common.h"
31
/*
 * State of the single, file-wide "shelf": whether a partial line is being
 * held while stream_stdio_fetch() reads the rest of it.  Because the
 * variable is file-static, it is shared by every stdio stream in this file.
 */
enum {
	SHELF_VACANT = 0,
	SHELF_OCCUPIED = 1
};
static int shelf = SHELF_VACANT;
35
36 /*
37 * Single-byte character file i/o-based streams implementation
38 *
39 * The routines in this file contain the implementation of the i/o streams
40 * interface for those situations where the input is via stdio.
41 *
 * The "shelf"
 * When the input buffer does not have enough room left to hold an entire
 * line, the partial line is "shelved", and the rest of the line is grafted
 * onto it on the subsequent read.
46 */
47 int
stream_stdio_open_for_write(stream_t * str)48 stream_stdio_open_for_write(stream_t *str)
49 {
50 stream_simple_file_t *SF = &(str->s_type.SF);
51
52 ASSERT(!(str->s_status & STREAM_OPEN));
53 ASSERT(!(str->s_status & STREAM_OUTPUT));
54
55 if (str->s_status & STREAM_NOTFILE)
56 SF->s_fd = fileno(stdout);
57 else
58 if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC |
59 O_WRONLY, OUTPUT_MODE)) < 0) {
60 if (errno == EMFILE || errno == ENFILE)
61 return (-1);
62 else
63 die(EMSG_OPEN, str->s_filename);
64 }
65
66 stream_set(str, STREAM_OPEN | STREAM_OUTPUT);
67
68 return (1);
69 }
70
71 /*
 * In the case of an instantaneous stream, we allocate a small buffer of
 * STDIO_VBUF_SIZE bytes here for the stream; otherwise, the s_buffer and
 * s_buffer_size members should have been set by stream_set_size() prior to
 * calling stream_prime().
75 *
76 * Repriming (priming an already primed stream) is done when we are reentering a
77 * file after having sorted a previous portion of the file.
78 */
79 static int
stream_stdio_prime(stream_t * str)80 stream_stdio_prime(stream_t *str)
81 {
82 stream_buffered_file_t *BF = &(str->s_type.BF);
83 char *current_position;
84 char *end_of_buffer;
85 char *next_nl;
86
87 ASSERT(!(str->s_status & STREAM_OUTPUT));
88 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
89 ASSERT(str->s_status & STREAM_OPEN);
90
91 if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) {
92 str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ |
93 PROT_WRITE, MAP_PRIVATE, 0);
94 if (str->s_buffer == MAP_FAILED)
95 die(EMSG_MMAP);
96 str->s_buffer_size = STDIO_VBUF_SIZE;
97 }
98
99 ASSERT(str->s_buffer != NULL);
100
101 if (stream_is_primed(str)) {
102 /*
103 * l_data_length is only set to -1 in the case of coincidental
104 * exhaustion of the input butter. This is thus the only case
105 * which involves no copying on a re-prime.
106 */
107 int shelf_state = shelf;
108
109 ASSERT(str->s_current.l_data_length >= -1);
110 (void) memcpy(str->s_buffer, str->s_current.l_data.sp,
111 str->s_current.l_data_length + 1);
112 str->s_current.l_data.sp = str->s_buffer;
113
114 /*
115 * If our current line is incomplete, we need to get the rest of
116 * the line--if we can't, then we've exhausted memory.
117 */
118 if ((str->s_current.l_data_length == -1 ||
119 shelf_state == SHELF_OCCUPIED ||
120 *(str->s_current.l_data.sp +
121 str->s_current.l_data_length) != '\n') &&
122 SOP_FETCH(str) == NEXT_LINE_INCOMPLETE &&
123 shelf_state == SHELF_OCCUPIED)
124 die(EMSG_MEMORY);
125
126 str->s_current.l_collate.sp = NULL;
127 str->s_current.l_collate_length = 0;
128
129 return (PRIME_SUCCEEDED);
130 }
131
132 stream_set(str, STREAM_PRIMED);
133
134 current_position = (char *)str->s_buffer;
135 end_of_buffer = (char *)str->s_buffer + str->s_buffer_size;
136
137 trip_eof(BF->s_fp);
138 if (!feof(BF->s_fp))
139 (void) fgets(current_position, end_of_buffer - current_position,
140 BF->s_fp);
141 else {
142 stream_set(str, STREAM_EOS_REACHED);
143 stream_unset(str, STREAM_PRIMED);
144 return (PRIME_FAILED_EMPTY_FILE);
145 }
146
147 str->s_current.l_data.sp = current_position;
148 /*
149 * Because one might run sort on a binary file, strlen() is no longer
150 * trustworthy--we must explicitly search for a newline.
151 */
152 if ((next_nl = memchr(current_position, '\n',
153 end_of_buffer - current_position)) == NULL) {
154 warn(WMSG_NEWLINE_ADDED, str->s_filename);
155 str->s_current.l_data_length = MIN(strlen(current_position),
156 end_of_buffer - current_position);
157 } else {
158 str->s_current.l_data_length = next_nl - current_position;
159 }
160
161 str->s_current.l_collate.sp = NULL;
162 str->s_current.l_collate_length = 0;
163
164 __S(stats_incr_fetches());
165 return (PRIME_SUCCEEDED);
166 }
167
168 /*
169 * stream_stdio_fetch() guarantees the return of a complete line, or a flag
170 * indicating that the complete line could not be read.
171 */
172 static ssize_t
stream_stdio_fetch(stream_t * str)173 stream_stdio_fetch(stream_t *str)
174 {
175 ssize_t dist_to_buf_end;
176 int ret_val;
177 char *graft_pt, *next_nl;
178
179 ASSERT(str->s_status & STREAM_OPEN);
180 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
181 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
182
183 graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1;
184
185 if (shelf == SHELF_VACANT) {
186 /*
187 * The graft point is the start of the current line.
188 */
189 str->s_current.l_data.sp = graft_pt;
190 } else if (str->s_current.l_data_length > -1) {
191 /*
192 * Correct for terminating NUL on shelved line. This NUL is
193 * only present if we didn't have the coincidental case
194 * mentioned in the comment below.
195 */
196 graft_pt--;
197 }
198
199 dist_to_buf_end = str->s_buffer_size - (graft_pt -
200 (char *)str->s_buffer);
201
202 if (dist_to_buf_end <= 1) {
203 /*
204 * fgets()'s behaviour in the case of a one-character buffer is
205 * somewhat unhelpful: it fills the buffer with '\0' and
206 * returns successfully (even if EOF has been reached for the
207 * file in question). Since we may be in the middle of a
208 * grafting operation, we leave early, maintaining the shelf in
209 * its current state.
210 */
211 str->s_current.l_data_length = -1;
212 return (NEXT_LINE_INCOMPLETE);
213 }
214
215 if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) {
216 if (feof(str->s_type.BF.s_fp))
217 stream_set(str, STREAM_EOS_REACHED);
218 else
219 die(EMSG_READ, str->s_filename);
220 }
221
222 trip_eof(str->s_type.BF.s_fp);
223 /*
224 * Because one might run sort on a binary file, strlen() is no longer
225 * trustworthy--we must explicitly search for a newline.
226 */
227 if ((next_nl = memchr(str->s_current.l_data.sp, '\n',
228 dist_to_buf_end)) == NULL) {
229 str->s_current.l_data_length = strlen(str->s_current.l_data.sp);
230 } else {
231 str->s_current.l_data_length = next_nl -
232 str->s_current.l_data.sp;
233 }
234
235 str->s_current.l_collate_length = 0;
236
237 if (*(str->s_current.l_data.sp + str->s_current.l_data_length) !=
238 '\n') {
239 if (!feof(str->s_type.BF.s_fp)) {
240 /*
241 * We were only able to read part of the line; note that
242 * we have something on the shelf for our next fetch.
243 * If the shelf was previously occupied, and we still
244 * can't get the entire line, then we need more
245 * resources.
246 */
247 if (shelf == SHELF_OCCUPIED)
248 die(EMSG_MEMORY);
249
250 shelf = SHELF_OCCUPIED;
251 ret_val = NEXT_LINE_INCOMPLETE;
252
253 __S(stats_incr_shelves());
254 } else {
255 stream_set(str, STREAM_EOS_REACHED);
256 warn(WMSG_NEWLINE_ADDED, str->s_filename);
257 }
258 } else {
259 shelf = SHELF_VACANT;
260 ret_val = NEXT_LINE_COMPLETE;
261 __S(stats_incr_fetches());
262 }
263
264 return (ret_val);
265 }
266
267 /*
 * stream_stdio_fetch_overwrite() is used when we are performing an operation
 * where we need the buffer contents only over a single period (merge and
 * check are operations of this kind). In this case, we read the current line
 * at the head of the stream's defined buffer. If we cannot read the entire
 * line, we have not allocated sufficient memory.
273 */
274 ssize_t
stream_stdio_fetch_overwrite(stream_t * str)275 stream_stdio_fetch_overwrite(stream_t *str)
276 {
277 ssize_t dist_to_buf_end;
278
279 ASSERT(str->s_status & STREAM_OPEN);
280 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
281 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
282
283 str->s_current.l_data.sp = str->s_buffer;
284 dist_to_buf_end = str->s_buffer_size;
285
286 if (fgets(str->s_current.l_data.sp, dist_to_buf_end,
287 str->s_type.BF.s_fp) == NULL) {
288 if (feof(str->s_type.BF.s_fp))
289 stream_set(str, STREAM_EOS_REACHED);
290 else
291 die(EMSG_READ, str->s_filename);
292 }
293
294 trip_eof(str->s_type.BF.s_fp);
295 str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1;
296 str->s_current.l_collate_length = 0;
297
298 if (str->s_current.l_data_length == -1 ||
299 *(str->s_current.l_data.sp + str->s_current.l_data_length) !=
300 '\n') {
301 if (!feof(str->s_type.BF.s_fp)) {
302 /*
303 * In the overwrite case, failure to read the entire
304 * line means our buffer size was insufficient (as we
305 * are using all of it). Exit, requesting more
306 * resources.
307 */
308 die(EMSG_MEMORY);
309 } else {
310 stream_set(str, STREAM_EOS_REACHED);
311 warn(WMSG_NEWLINE_ADDED, str->s_filename);
312 }
313 }
314
315 __S(stats_incr_fetches());
316 return (NEXT_LINE_COMPLETE);
317 }
318
319 int
stream_stdio_is_closable(stream_t * str)320 stream_stdio_is_closable(stream_t *str)
321 {
322 if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE))
323 return (1);
324 return (0);
325 }
326
327 int
stream_stdio_close(stream_t * str)328 stream_stdio_close(stream_t *str)
329 {
330 ASSERT(str->s_status & STREAM_OPEN);
331
332 if (!(str->s_status & STREAM_OUTPUT)) {
333 if (!(str->s_status & STREAM_NOTFILE))
334 (void) fclose(str->s_type.BF.s_fp);
335
336 if (str->s_type.BF.s_vbuf != NULL) {
337 free(str->s_type.BF.s_vbuf);
338 str->s_type.BF.s_vbuf = NULL;
339 }
340 } else {
341 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0)
342 (void) close(str->s_type.SF.s_fd);
343 else
344 die(EMSG_WRITE, str->s_filename);
345 }
346
347 stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT);
348 return (1);
349 }
350
351 static void
stream_stdio_send_eol(stream_t * str)352 stream_stdio_send_eol(stream_t *str)
353 {
354 ASSERT(str->s_status & STREAM_OPEN);
355 ASSERT(str->s_status & STREAM_OUTPUT);
356
357 if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0)
358 die(EMSG_WRITE, str->s_filename);
359 }
360
361 void
stream_stdio_flush(stream_t * str)362 stream_stdio_flush(stream_t *str)
363 {
364 ASSERT(str->s_status & STREAM_OPEN);
365 ASSERT(str->s_status & STREAM_OUTPUT);
366
367 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0)
368 die(EMSG_WRITE, str->s_filename);
369 }
370
371 static void
stream_stdio_put_line(stream_t * str,line_rec_t * line)372 stream_stdio_put_line(stream_t *str, line_rec_t *line)
373 {
374 ASSERT(str->s_status & STREAM_OPEN);
375 ASSERT(str->s_status & STREAM_OUTPUT);
376
377 if (line->l_data_length >= 0) {
378 if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp,
379 line->l_data_length) < 0)
380 die(EMSG_WRITE, str->s_filename);
381
382 stream_stdio_send_eol(str);
383 __S(stats_incr_puts());
384 }
385 safe_free(line->l_raw_collate.sp);
386 line->l_raw_collate.sp = NULL;
387 }
388
389 void
stream_stdio_put_line_unique(stream_t * str,line_rec_t * line)390 stream_stdio_put_line_unique(stream_t *str, line_rec_t *line)
391 {
392 static line_rec_t pvs;
393 static size_t collate_buf_len;
394
395 ASSERT(str->s_status & STREAM_OPEN);
396 ASSERT(str->s_status & STREAM_OUTPUT);
397
398 if (pvs.l_collate.sp != NULL &&
399 collated(&pvs, line, 0, COLL_UNIQUE) == 0) {
400 __S(stats_incr_not_unique());
401 return;
402 }
403
404 __S(stats_incr_put_unique());
405 stream_stdio_put_line(str, line);
406
407 if (line->l_collate_length + 1 > collate_buf_len) {
408 pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp,
409 line->l_collate_length + 1);
410 collate_buf_len = line->l_collate_length + 1;
411 }
412
413 (void) memcpy(pvs.l_collate.sp, line->l_collate.sp,
414 line->l_collate_length);
415 *(pvs.l_collate.sp + line->l_collate_length) = '\0';
416 pvs.l_collate_length = line->l_collate_length;
417 }
418
419 int
stream_stdio_unlink(stream_t * str)420 stream_stdio_unlink(stream_t *str)
421 {
422 if (!(str->s_status & STREAM_NOTFILE))
423 return (unlink(str->s_filename));
424
425 return (0);
426 }
427
428 int
stream_stdio_free(stream_t * str)429 stream_stdio_free(stream_t *str)
430 {
431 /*
432 * Unmap the memory we allocated for input, if it's valid to do so.
433 */
434 if (!(str->s_status & STREAM_OPEN) ||
435 (str->s_consumer != NULL &&
436 str->s_consumer->s_status & STREAM_NOT_FREEABLE))
437 return (0);
438
439 if (str->s_buffer != NULL) {
440 if (munmap(str->s_buffer, str->s_buffer_size) < 0)
441 die(EMSG_MUNMAP, "/dev/zero");
442 else {
443 str->s_buffer = NULL;
444 str->s_buffer_size = 0;
445 }
446 }
447
448 stream_unset(str, STREAM_PRIMED | STREAM_INSTANT);
449
450 return (1);
451 }
452
453 static int
stream_stdio_eos(stream_t * str)454 stream_stdio_eos(stream_t *str)
455 {
456 int retval = 0;
457
458 ASSERT(!(str->s_status & STREAM_OUTPUT));
459 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
460
461 if (str == NULL || str->s_status & STREAM_EOS_REACHED)
462 return (1);
463
464 trip_eof(str->s_type.BF.s_fp);
465 if (feof(str->s_type.BF.s_fp) &&
466 shelf == SHELF_VACANT &&
467 str->s_current.l_collate_length != -1) {
468 retval = 1;
469 stream_set(str, STREAM_EOS_REACHED);
470 }
471
472 return (retval);
473 }
474
475 /*ARGSUSED*/
476 static void
stream_stdio_release_line(stream_t * str)477 stream_stdio_release_line(stream_t *str)
478 {
479 }
480
481 const stream_ops_t stream_stdio_ops = {
482 stream_stdio_is_closable,
483 stream_stdio_close,
484 stream_stdio_eos,
485 stream_stdio_fetch,
486 stream_stdio_flush,
487 stream_stdio_free,
488 stream_stdio_open_for_write,
489 stream_stdio_prime,
490 stream_stdio_put_line,
491 stream_stdio_release_line,
492 stream_stdio_send_eol,
493 stream_stdio_unlink
494 };
495