1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include "streams_mmap.h" 28 #include "streams_common.h" 29 30 /* 31 * Single-byte character memory map-based streams implementation 32 */ 33 34 static int 35 stream_mmap_prime(stream_t *str) 36 { 37 char *nl; 38 39 if (stream_is_primed(str)) 40 return (PRIME_SUCCEEDED); 41 42 stream_set(str, STREAM_PRIMED); 43 44 if (str->s_buffer_size == 0) { 45 stream_set(str, STREAM_EOS_REACHED); 46 return (PRIME_FAILED_EMPTY_FILE); 47 } 48 49 str->s_current.l_data.sp = str->s_buffer; 50 str->s_type.SF.s_release_origin = str->s_buffer; 51 if ((nl = (char *)memchr(str->s_buffer, '\n', str->s_buffer_size)) == 52 NULL) { 53 warn(WMSG_NEWLINE_ADDED, str->s_filename); 54 str->s_current.l_data_length = str->s_buffer_size; 55 } else { 56 str->s_current.l_data_length = nl - (char *)str->s_buffer; 57 } 58 59 str->s_current.l_collate.sp = NULL; 60 str->s_current.l_collate_length = 0; 61 62 __S(stats_incr_fetches()); 63 return (PRIME_SUCCEEDED); 64 } 65 66 /* 67 * stream_mmap_fetch() sets the fields of str->s_current to delimit the next 68 * line of the field. 69 */ 70 static ssize_t 71 stream_mmap_fetch(stream_t *str) 72 { 73 ssize_t dist_to_buf_end; 74 char *next_nl; 75 76 ASSERT(stream_is_primed(str)); 77 ASSERT((str->s_status & STREAM_EOS_REACHED) == 0); 78 79 /* 80 * adding one for newline 81 */ 82 str->s_current.l_data.sp = str->s_current.l_data.sp + 83 str->s_current.l_data_length + 1; 84 85 dist_to_buf_end = str->s_buffer_size - (str->s_current.l_data.sp 86 - (char *)str->s_buffer); 87 ASSERT(dist_to_buf_end >= 0 && dist_to_buf_end <= str->s_buffer_size); 88 89 next_nl = memchr(str->s_current.l_data.sp, '\n', dist_to_buf_end); 90 91 if (next_nl) 92 str->s_current.l_data_length = next_nl 93 - str->s_current.l_data.sp; 94 else { 95 warn(WMSG_NEWLINE_ADDED, str->s_filename); 96 str->s_current.l_data_length = dist_to_buf_end; 97 } 98 99 /* 100 * adding one for newline 101 */ 102 if (str->s_current.l_data.sp + str->s_current.l_data_length + 1 >= 103 (char *)str->s_buffer + str->s_buffer_size) 104 stream_set(str, STREAM_EOS_REACHED); 105 106 str->s_current.l_collate_length = 0; 107 108 __S(stats_incr_fetches()); 109 return (NEXT_LINE_COMPLETE); 110 } 111 112 static int 113 stream_mmap_is_closable(stream_t *str) 114 { 115 if (str->s_status & STREAM_OPEN) 116 return (1); 117 return (0); 118 } 119 120 static int 121 stream_mmap_close(stream_t *str) 122 { 123 if (str->s_type.SF.s_fd > -1) { 124 (void) close(str->s_type.SF.s_fd); 125 stream_unset(str, STREAM_OPEN); 126 return (1); 127 } 128 129 return (0); 130 } 131 132 static int 133 stream_mmap_free(stream_t *str) 134 { 135 if (!(str->s_status & STREAM_OPEN) || 136 (str->s_consumer != NULL && 137 str->s_consumer->s_status & STREAM_NOT_FREEABLE)) 138 return (0); 139 140 if (str->s_buffer == NULL) 141 return (1); 142 143 if (munmap(str->s_buffer, str->s_buffer_size) < 0) 144 die(EMSG_MUNMAP, str->s_filename); 145 146 str->s_buffer = NULL; 147 str->s_buffer_size = 0; 148 149 stream_unset(str, STREAM_PRIMED); 150 151 return (1); 152 } 153 154 static int 155 stream_mmap_eos(stream_t *str) 156 { 157 int retval = 0; 158 159 if (str == NULL || str->s_status & STREAM_EOS_REACHED) 160 return (1); 161 162 /* 163 * If the file's size is known to be zero, then we are at EOS; the 164 * remaining checks are only sensible if we successfully primed this 165 * stream. The additional character is for the optional newline. 166 */ 167 if (str->s_filesize == 0 || 168 (stream_is_primed(str) && str->s_current.l_data.sp - 169 (char *)str->s_buffer + str->s_current.l_data_length + 1 >= 170 str->s_buffer_size)) { 171 retval = 1; 172 stream_set(str, STREAM_EOS_REACHED); 173 } 174 175 return (retval); 176 } 177 178 #define ALIGNED (~(ulong_t)(PAGESIZE - 1)) 179 180 /* 181 * In certain cases, we know that we will never need the data on a page again 182 * for the duration of the sort. (These cases are associated with merges 183 * involving temporary files.) We can thus release all pages previous to the 184 * page containing the current line, using the MADV_DONTNEED flag to 185 * madvise(3C). This additional memory management improves our chances of 186 * avoiding a paging situation, by evicting pages we know are of no use. 187 */ 188 static void 189 stream_mmap_release_line(stream_t *str) 190 { 191 caddr_t origin = str->s_type.SF.s_release_origin; 192 size_t release = 0; 193 194 while ((caddr_t)((ulong_t)str->s_current.l_data.sp & ALIGNED) - 195 (origin + release) >= DEFAULT_RELEASE_SIZE) 196 release += DEFAULT_RELEASE_SIZE; 197 198 if (release == 0) 199 return; 200 201 if (madvise(origin, release, MADV_DONTNEED) == -1) 202 warn(gettext("madvise failed")); 203 204 str->s_type.SF.s_release_origin += release; 205 } 206 207 const stream_ops_t stream_mmap_ops = { 208 stream_mmap_is_closable, 209 stream_mmap_close, 210 stream_mmap_eos, 211 stream_mmap_fetch, 212 NULL, 213 stream_mmap_free, 214 NULL, 215 stream_mmap_prime, 216 NULL, 217 stream_mmap_release_line, 218 NULL, 219 stream_stdio_unlink 220 }; 221