1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include "streams_stdio.h"
28 #include "streams_common.h"
29
/*
 * Shelf state.  "shelf" is a single file-scope flag, so it is shared by every
 * stream serviced by the routines in this file.  stream_stdio_fetch() marks it
 * SHELF_OCCUPIED when only part of a line fit in the buffer, and SHELF_VACANT
 * once a complete line has been read.
 */
#define SHELF_OCCUPIED 1
#define SHELF_VACANT 0
static int shelf = SHELF_VACANT;
33
34 /*
35 * Single-byte character file i/o-based streams implementation
36 *
37 * The routines in this file contain the implementation of the i/o streams
38 * interface for those situations where the input is via stdio.
39 *
40 * The "shelf"
41 * In the case where the input buffer contains insufficient room to hold the
42 * entire line, the fractional line is shelved, and will be grafted to on the
43 * subsequent read.
44 */
45 int
stream_stdio_open_for_write(stream_t * str)46 stream_stdio_open_for_write(stream_t *str)
47 {
48 stream_simple_file_t *SF = &(str->s_type.SF);
49
50 ASSERT(!(str->s_status & STREAM_OPEN));
51 ASSERT(!(str->s_status & STREAM_OUTPUT));
52
53 if (str->s_status & STREAM_NOTFILE)
54 SF->s_fd = fileno(stdout);
55 else
56 if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC |
57 O_WRONLY, OUTPUT_MODE)) < 0) {
58 if (errno == EMFILE || errno == ENFILE)
59 return (-1);
60 else
61 die(EMSG_OPEN, str->s_filename);
62 }
63
64 stream_set(str, STREAM_OPEN | STREAM_OUTPUT);
65
66 return (1);
67 }
68
69 /*
70 * In the case of an instantaneous stream, we allocate a small buffer (64k) here
71 * for the stream; otherwise, the s_buffer and s_buffer_size members should have
72 * been set by stream_set_size() prior to calling stream_prime().
73 *
74 * Repriming (priming an already primed stream) is done when we are reentering a
75 * file after having sorted a previous portion of the file.
76 */
77 static int
stream_stdio_prime(stream_t * str)78 stream_stdio_prime(stream_t *str)
79 {
80 stream_buffered_file_t *BF = &(str->s_type.BF);
81 char *current_position;
82 char *end_of_buffer;
83 char *next_nl;
84
85 ASSERT(!(str->s_status & STREAM_OUTPUT));
86 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
87 ASSERT(str->s_status & STREAM_OPEN);
88
89 if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) {
90 str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ |
91 PROT_WRITE, MAP_PRIVATE, 0);
92 if (str->s_buffer == MAP_FAILED)
93 die(EMSG_MMAP);
94 str->s_buffer_size = STDIO_VBUF_SIZE;
95 }
96
97 ASSERT(str->s_buffer != NULL);
98
99 if (stream_is_primed(str)) {
100 /*
101 * l_data_length is only set to -1 in the case of coincidental
102 * exhaustion of the input butter. This is thus the only case
103 * which involves no copying on a re-prime.
104 */
105 int shelf_state = shelf;
106
107 ASSERT(str->s_current.l_data_length >= -1);
108 (void) memcpy(str->s_buffer, str->s_current.l_data.sp,
109 str->s_current.l_data_length + 1);
110 str->s_current.l_data.sp = str->s_buffer;
111
112 /*
113 * If our current line is incomplete, we need to get the rest of
114 * the line--if we can't, then we've exhausted memory.
115 */
116 if ((str->s_current.l_data_length == -1 ||
117 shelf_state == SHELF_OCCUPIED ||
118 *(str->s_current.l_data.sp +
119 str->s_current.l_data_length) != '\n') &&
120 SOP_FETCH(str) == NEXT_LINE_INCOMPLETE &&
121 shelf_state == SHELF_OCCUPIED)
122 die(EMSG_MEMORY);
123
124 str->s_current.l_collate.sp = NULL;
125 str->s_current.l_collate_length = 0;
126
127 return (PRIME_SUCCEEDED);
128 }
129
130 stream_set(str, STREAM_PRIMED);
131
132 current_position = (char *)str->s_buffer;
133 end_of_buffer = (char *)str->s_buffer + str->s_buffer_size;
134
135 trip_eof(BF->s_fp);
136 if (!feof(BF->s_fp))
137 (void) fgets(current_position, end_of_buffer - current_position,
138 BF->s_fp);
139 else {
140 stream_set(str, STREAM_EOS_REACHED);
141 stream_unset(str, STREAM_PRIMED);
142 return (PRIME_FAILED_EMPTY_FILE);
143 }
144
145 str->s_current.l_data.sp = current_position;
146 /*
147 * Because one might run sort on a binary file, strlen() is no longer
148 * trustworthy--we must explicitly search for a newline.
149 */
150 if ((next_nl = memchr(current_position, '\n',
151 end_of_buffer - current_position)) == NULL) {
152 warn(WMSG_NEWLINE_ADDED, str->s_filename);
153 str->s_current.l_data_length = MIN(strlen(current_position),
154 end_of_buffer - current_position);
155 } else {
156 str->s_current.l_data_length = next_nl - current_position;
157 }
158
159 str->s_current.l_collate.sp = NULL;
160 str->s_current.l_collate_length = 0;
161
162 __S(stats_incr_fetches());
163 return (PRIME_SUCCEEDED);
164 }
165
166 /*
167 * stream_stdio_fetch() guarantees the return of a complete line, or a flag
168 * indicating that the complete line could not be read.
169 */
170 static ssize_t
stream_stdio_fetch(stream_t * str)171 stream_stdio_fetch(stream_t *str)
172 {
173 ssize_t dist_to_buf_end;
174 int ret_val;
175 char *graft_pt, *next_nl;
176
177 ASSERT(str->s_status & STREAM_OPEN);
178 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
179 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
180
181 graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1;
182
183 if (shelf == SHELF_VACANT) {
184 /*
185 * The graft point is the start of the current line.
186 */
187 str->s_current.l_data.sp = graft_pt;
188 } else if (str->s_current.l_data_length > -1) {
189 /*
190 * Correct for terminating NUL on shelved line. This NUL is
191 * only present if we didn't have the coincidental case
192 * mentioned in the comment below.
193 */
194 graft_pt--;
195 }
196
197 dist_to_buf_end = str->s_buffer_size - (graft_pt -
198 (char *)str->s_buffer);
199
200 if (dist_to_buf_end <= 1) {
201 /*
202 * fgets()'s behaviour in the case of a one-character buffer is
203 * somewhat unhelpful: it fills the buffer with '\0' and
204 * returns successfully (even if EOF has been reached for the
205 * file in question). Since we may be in the middle of a
206 * grafting operation, we leave early, maintaining the shelf in
207 * its current state.
208 */
209 str->s_current.l_data_length = -1;
210 return (NEXT_LINE_INCOMPLETE);
211 }
212
213 if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) {
214 if (feof(str->s_type.BF.s_fp))
215 stream_set(str, STREAM_EOS_REACHED);
216 else
217 die(EMSG_READ, str->s_filename);
218 }
219
220 trip_eof(str->s_type.BF.s_fp);
221 /*
222 * Because one might run sort on a binary file, strlen() is no longer
223 * trustworthy--we must explicitly search for a newline.
224 */
225 if ((next_nl = memchr(str->s_current.l_data.sp, '\n',
226 dist_to_buf_end)) == NULL) {
227 str->s_current.l_data_length = strlen(str->s_current.l_data.sp);
228 } else {
229 str->s_current.l_data_length = next_nl -
230 str->s_current.l_data.sp;
231 }
232
233 str->s_current.l_collate_length = 0;
234
235 if (*(str->s_current.l_data.sp + str->s_current.l_data_length) !=
236 '\n') {
237 if (!feof(str->s_type.BF.s_fp)) {
238 /*
239 * We were only able to read part of the line; note that
240 * we have something on the shelf for our next fetch.
241 * If the shelf was previously occupied, and we still
242 * can't get the entire line, then we need more
243 * resources.
244 */
245 if (shelf == SHELF_OCCUPIED)
246 die(EMSG_MEMORY);
247
248 shelf = SHELF_OCCUPIED;
249 ret_val = NEXT_LINE_INCOMPLETE;
250
251 __S(stats_incr_shelves());
252 } else {
253 stream_set(str, STREAM_EOS_REACHED);
254 warn(WMSG_NEWLINE_ADDED, str->s_filename);
255 }
256 } else {
257 shelf = SHELF_VACANT;
258 ret_val = NEXT_LINE_COMPLETE;
259 __S(stats_incr_fetches());
260 }
261
262 return (ret_val);
263 }
264
265 /*
266 * stdio_fetch_overwrite() is used when we are performing an operation where we
267 * need the buffer contents only over a single period. (merge and check are
268 * operations of this kind.) In this case, we read the current line at the head
269 * of the stream's defined buffer. If we cannot read the entire line, we have
270 * not allocated sufficient memory.
271 */
272 ssize_t
stream_stdio_fetch_overwrite(stream_t * str)273 stream_stdio_fetch_overwrite(stream_t *str)
274 {
275 ssize_t dist_to_buf_end;
276
277 ASSERT(str->s_status & STREAM_OPEN);
278 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
279 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
280
281 str->s_current.l_data.sp = str->s_buffer;
282 dist_to_buf_end = str->s_buffer_size;
283
284 if (fgets(str->s_current.l_data.sp, dist_to_buf_end,
285 str->s_type.BF.s_fp) == NULL) {
286 if (feof(str->s_type.BF.s_fp))
287 stream_set(str, STREAM_EOS_REACHED);
288 else
289 die(EMSG_READ, str->s_filename);
290 }
291
292 trip_eof(str->s_type.BF.s_fp);
293 str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1;
294 str->s_current.l_collate_length = 0;
295
296 if (str->s_current.l_data_length == -1 ||
297 *(str->s_current.l_data.sp + str->s_current.l_data_length) !=
298 '\n') {
299 if (!feof(str->s_type.BF.s_fp)) {
300 /*
301 * In the overwrite case, failure to read the entire
302 * line means our buffer size was insufficient (as we
303 * are using all of it). Exit, requesting more
304 * resources.
305 */
306 die(EMSG_MEMORY);
307 } else {
308 stream_set(str, STREAM_EOS_REACHED);
309 warn(WMSG_NEWLINE_ADDED, str->s_filename);
310 }
311 }
312
313 __S(stats_incr_fetches());
314 return (NEXT_LINE_COMPLETE);
315 }
316
317 int
stream_stdio_is_closable(stream_t * str)318 stream_stdio_is_closable(stream_t *str)
319 {
320 if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE))
321 return (1);
322 return (0);
323 }
324
325 int
stream_stdio_close(stream_t * str)326 stream_stdio_close(stream_t *str)
327 {
328 ASSERT(str->s_status & STREAM_OPEN);
329
330 if (!(str->s_status & STREAM_OUTPUT)) {
331 if (!(str->s_status & STREAM_NOTFILE))
332 (void) fclose(str->s_type.BF.s_fp);
333
334 if (str->s_type.BF.s_vbuf != NULL) {
335 free(str->s_type.BF.s_vbuf);
336 str->s_type.BF.s_vbuf = NULL;
337 }
338 } else {
339 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0)
340 (void) close(str->s_type.SF.s_fd);
341 else
342 die(EMSG_WRITE, str->s_filename);
343 }
344
345 stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT);
346 return (1);
347 }
348
349 static void
stream_stdio_send_eol(stream_t * str)350 stream_stdio_send_eol(stream_t *str)
351 {
352 ASSERT(str->s_status & STREAM_OPEN);
353 ASSERT(str->s_status & STREAM_OUTPUT);
354
355 if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0)
356 die(EMSG_WRITE, str->s_filename);
357 }
358
359 void
stream_stdio_flush(stream_t * str)360 stream_stdio_flush(stream_t *str)
361 {
362 ASSERT(str->s_status & STREAM_OPEN);
363 ASSERT(str->s_status & STREAM_OUTPUT);
364
365 if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0)
366 die(EMSG_WRITE, str->s_filename);
367 }
368
369 static void
stream_stdio_put_line(stream_t * str,line_rec_t * line)370 stream_stdio_put_line(stream_t *str, line_rec_t *line)
371 {
372 ASSERT(str->s_status & STREAM_OPEN);
373 ASSERT(str->s_status & STREAM_OUTPUT);
374
375 if (line->l_data_length >= 0) {
376 if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp,
377 line->l_data_length) < 0)
378 die(EMSG_WRITE, str->s_filename);
379
380 stream_stdio_send_eol(str);
381 __S(stats_incr_puts());
382 }
383 safe_free(line->l_raw_collate.sp);
384 line->l_raw_collate.sp = NULL;
385 }
386
387 void
stream_stdio_put_line_unique(stream_t * str,line_rec_t * line)388 stream_stdio_put_line_unique(stream_t *str, line_rec_t *line)
389 {
390 static line_rec_t pvs;
391 static size_t collate_buf_len;
392
393 ASSERT(str->s_status & STREAM_OPEN);
394 ASSERT(str->s_status & STREAM_OUTPUT);
395
396 if (pvs.l_collate.sp != NULL &&
397 collated(&pvs, line, 0, COLL_UNIQUE) == 0) {
398 __S(stats_incr_not_unique());
399 return;
400 }
401
402 __S(stats_incr_put_unique());
403 stream_stdio_put_line(str, line);
404
405 if (line->l_collate_length + 1 > collate_buf_len) {
406 pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp,
407 line->l_collate_length + 1);
408 collate_buf_len = line->l_collate_length + 1;
409 }
410
411 (void) memcpy(pvs.l_collate.sp, line->l_collate.sp,
412 line->l_collate_length);
413 *(pvs.l_collate.sp + line->l_collate_length) = '\0';
414 pvs.l_collate_length = line->l_collate_length;
415 }
416
417 int
stream_stdio_unlink(stream_t * str)418 stream_stdio_unlink(stream_t *str)
419 {
420 if (!(str->s_status & STREAM_NOTFILE))
421 return (unlink(str->s_filename));
422
423 return (0);
424 }
425
426 int
stream_stdio_free(stream_t * str)427 stream_stdio_free(stream_t *str)
428 {
429 /*
430 * Unmap the memory we allocated for input, if it's valid to do so.
431 */
432 if (!(str->s_status & STREAM_OPEN) ||
433 (str->s_consumer != NULL &&
434 str->s_consumer->s_status & STREAM_NOT_FREEABLE))
435 return (0);
436
437 if (str->s_buffer != NULL) {
438 if (munmap(str->s_buffer, str->s_buffer_size) < 0)
439 die(EMSG_MUNMAP, "/dev/zero");
440 else {
441 str->s_buffer = NULL;
442 str->s_buffer_size = 0;
443 }
444 }
445
446 stream_unset(str, STREAM_PRIMED | STREAM_INSTANT);
447
448 return (1);
449 }
450
451 static int
stream_stdio_eos(stream_t * str)452 stream_stdio_eos(stream_t *str)
453 {
454 int retval = 0;
455
456 ASSERT(!(str->s_status & STREAM_OUTPUT));
457 ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
458
459 if (str == NULL || str->s_status & STREAM_EOS_REACHED)
460 return (1);
461
462 trip_eof(str->s_type.BF.s_fp);
463 if (feof(str->s_type.BF.s_fp) &&
464 shelf == SHELF_VACANT &&
465 str->s_current.l_collate_length != -1) {
466 retval = 1;
467 stream_set(str, STREAM_EOS_REACHED);
468 }
469
470 return (retval);
471 }
472
/*
 * stream_stdio_release_line() is intentionally empty; it does nothing with
 * str.  It is installed in the release-line slot of stream_stdio_ops below.
 */
/*ARGSUSED*/
static void
stream_stdio_release_line(stream_t *str)
{
}
478
/*
 * Operations vector for stdio-based streams.  The initializers are positional,
 * so their order must match the member order of stream_ops_t (declared
 * outside this file).
 */
const stream_ops_t stream_stdio_ops = {
	stream_stdio_is_closable,
	stream_stdio_close,
	stream_stdio_eos,
	stream_stdio_fetch,
	stream_stdio_flush,
	stream_stdio_free,
	stream_stdio_open_for_write,
	stream_stdio_prime,
	stream_stdio_put_line,
	stream_stdio_release_line,
	stream_stdio_send_eol,
	stream_stdio_unlink
};
493