1 /*********************************************************************** 2 * * 3 * This software is part of the ast package * 4 * Copyright (c) 1985-2008 AT&T Intellectual Property * 5 * and is licensed under the * 6 * Common Public License, Version 1.0 * 7 * by AT&T Intellectual Property * 8 * * 9 * A copy of the License is available at * 10 * http://www.opensource.org/licenses/cpl1.0.txt * 11 * (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9) * 12 * * 13 * Information and Software Systems Research * 14 * AT&T Research * 15 * Florham Park NJ * 16 * * 17 * Glenn Fowler <gsf@research.att.com> * 18 * David Korn <dgk@research.att.com> * 19 * Phong Vo <kpv@research.att.com> * 20 * * 21 ***********************************************************************/ 22 #include "sfhdr.h" 23 24 /* Poll a set of streams to see if any is available for I/O. 25 ** Ready streams are moved to front of array but retain the 26 ** same relative order. 27 ** 28 ** Written by Kiem-Phong Vo. 29 */ 30 31 #if __STD_C 32 int sfpoll(Sfio_t** fa, reg int n, int tm) 33 #else 34 int sfpoll(fa, n, tm) 35 Sfio_t** fa; /* array of streams to poll */ 36 reg int n; /* number of streams in array */ 37 int tm; /* time in millisecs for select/poll */ 38 #endif 39 { 40 reg int r, c, m, np; 41 reg Sfio_t* f; 42 reg int *status, *check; 43 44 if(n <= 0 || !fa) 45 return -1; 46 47 if(!(status = (int*)malloc(2*n*sizeof(int))) ) 48 return -1; 49 check = status+n; /* streams that need polling */ 50 51 /* a SF_READ stream is ready if there is buffered read data */ 52 #define RDREADY(f) (((f->mode&SF_READ) && f->next < f->endb) || \ 53 ((f->mode&SF_WRITE) && f->proc && f->proc->ndata > 0) ) 54 55 /* a SF_WRITE stream is ready if there is no write data */ 56 #define WRREADY(f) (!(f->mode&SF_WRITE) || f->next == f->data) 57 58 #define HASAUXFD(f) (f->proc && f->proc->file >= 0 && f->proc->file != f->file) 59 60 for(r = c = 0; r < n; ++r) /* compute streams that must be checked */ 61 { f = fa[r]; 62 status[r] = 0; 63 64 /* check accessibility */ 65 m = f->mode&SF_RDWR; 66 if((int)f->mode != m && _sfmode(f,m,0) < 0) 67 continue; 68 69 if((f->flags&SF_READ) && RDREADY(f)) 70 status[r] |= SF_READ; 71 72 if((f->flags&SF_WRITE) && WRREADY(f)) 73 status[r] |= SF_WRITE; 74 75 if((f->flags&SF_RDWR) == status[r]) 76 continue; 77 78 /* has discipline, ask its opinion */ 79 if(f->disc && f->disc->exceptf) 80 { if((m = (*f->disc->exceptf)(f,SF_DPOLL,&tm,f->disc)) < 0) 81 continue; 82 else if(m > 0) 83 { status[r] = m&SF_RDWR; 84 continue; 85 } 86 } 87 88 if(f->extent < 0) /* unseekable stream, must poll/select */ 89 check[c++] = r; 90 else /* seekable streams are always ready */ 91 { if(f->flags&SF_READ) 92 status[r] |= SF_READ; 93 if(f->flags&SF_WRITE) 94 status[r] |= SF_WRITE; 95 } 96 } 97 98 np = -1; 99 #if _lib_poll 100 if(c > 0) 101 { struct pollfd* fds; 102 103 /* construct the poll array */ 104 for(m = 0, r = 0; r < c; ++r, ++m) 105 { f = fa[check[r]]; 106 if(HASAUXFD(f)) 107 m += 1; 108 } 109 if(!(fds = (struct pollfd*)malloc(m*sizeof(struct pollfd))) ) 110 return -1; 111 112 for(m = 0, r = 0; r < c; ++r, ++m) 113 { f = fa[check[r]]; 114 115 fds[m].fd = f->file; 116 fds[m].events = fds[m].revents = 0; 117 118 if((f->flags&SF_WRITE) && !WRREADY(f) ) 119 fds[m].events |= POLLOUT; 120 121 if((f->flags&SF_READ) && !RDREADY(f) ) 122 { /* a sfpopen situation with two file descriptors */ 123 if((f->mode&SF_WRITE) && HASAUXFD(f)) 124 { m += 1; 125 fds[m].fd = f->proc->file; 126 fds[m].revents = 0; 127 } 128 129 fds[m].events |= POLLIN; 130 } 131 } 132 133 while((np = SFPOLL(fds,m,tm)) < 0 ) 134 { if(errno == EINTR || errno == EAGAIN) 135 errno = 0; 136 else break; 137 } 138 if(np > 0) /* poll succeeded */ 139 np = c; 140 141 for(m = 0, r = 0; r < np; ++r, ++m) 142 { f = fa[check[r]]; 143 144 if((f->flags&SF_WRITE) && !WRREADY(f) ) 145 { if(fds[m].revents&POLLOUT) 146 status[check[r]] |= SF_WRITE; 147 } 148 149 if((f->flags&SF_READ) && !RDREADY(f)) 150 { if((f->mode&SF_WRITE) && HASAUXFD(f)) 151 m += 1; 152 if(fds[m].revents&POLLIN) 153 status[check[r]] |= SF_READ; 154 } 155 } 156 157 free((Void_t*)fds); 158 } 159 #endif /*_lib_poll*/ 160 161 #if _lib_select 162 if(np < 0 && c > 0) 163 { fd_set rd, wr; 164 struct timeval tmb, *tmp; 165 166 FD_ZERO(&rd); 167 FD_ZERO(&wr); 168 m = 0; 169 for(r = 0; r < c; ++r) 170 { f = fa[check[r]]; 171 172 if(f->file > m) 173 m = f->file; 174 175 if((f->flags&SF_WRITE) && !WRREADY(f)) 176 FD_SET(f->file,&wr); 177 178 if((f->flags&SF_READ) && !RDREADY(f)) 179 { if((f->mode&SF_WRITE) && HASAUXFD(f)) 180 { if(f->proc->file > m) 181 m = f->proc->file; 182 FD_SET(f->proc->file, &rd); 183 } 184 else FD_SET(f->file,&rd); 185 } 186 } 187 if(tm < 0) 188 tmp = NIL(struct timeval*); 189 else 190 { tmp = &tmb; 191 tmb.tv_sec = tm/SECOND; 192 tmb.tv_usec = (tm%SECOND)*SECOND; 193 } 194 195 while((np = select(m+1,&rd,&wr,NIL(fd_set*),tmp)) < 0 ) 196 { if(errno == EINTR) 197 errno = 0; 198 else break; 199 } 200 if(np > 0) 201 np = c; 202 203 for(r = 0; r < np; ++r) 204 { f = fa[check[r]]; 205 206 if((f->flags&SF_WRITE) && !WRREADY(f) ) 207 { if(FD_ISSET(f->file,&wr) ) 208 status[check[r]] |= SF_WRITE; 209 } 210 211 if((f->flags&SF_READ) && !RDREADY(f) ) 212 { if((f->mode&SF_WRITE) && HASAUXFD(f) ) 213 { if(FD_ISSET(f->proc->file, &rd) ) 214 status[check[r]] |= SF_READ; 215 } 216 else 217 { if(FD_ISSET(f->file,&rd) ) 218 status[check[r]] |= SF_READ; 219 } 220 } 221 } 222 } 223 #endif /*_lib_select*/ 224 225 for(r = c = 0; c < n; ++c) 226 { if(status[c] == 0) 227 continue; 228 229 f = fa[c]; 230 f->val = (ssize_t)status[c]; 231 232 /* announce status */ 233 if(f->disc && f->disc->exceptf) 234 (*f->disc->exceptf)(f,SF_READY,(Void_t*)(long)status[c],f->disc); 235 236 if(c > r) /* move to front of list */ 237 { fa[c] = fa[r]; 238 fa[r] = f; 239 } 240 r += 1; 241 } 242 243 free((Void_t*)status); 244 return r; 245 } 246