1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1985-2010 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Common Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.opensource.org/licenses/cpl1.0.txt *
11 * (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <gsf@research.att.com> *
18 * David Korn <dgk@research.att.com> *
19 * Phong Vo <kpv@research.att.com> *
20 * *
21 ***********************************************************************/
22 #include "sfhdr.h"
23
24 /* Poll a set of streams to see if any is available for I/O.
25 ** Ready streams are moved to front of array but retain the
26 ** same relative order.
27 **
28 ** Written by Kiem-Phong Vo.
29 */
30
31 #if __STD_C
sfpoll(Sfio_t ** fa,reg int n,int tm)32 int sfpoll(Sfio_t** fa, reg int n, int tm)
33 #else
34 int sfpoll(fa, n, tm)
35 Sfio_t** fa; /* array of streams to poll */
36 reg int n; /* number of streams in array */
37 int tm; /* time in millisecs for select/poll */
38 #endif
39 {
40 reg int r, c, m, np;
41 reg Sfio_t* f;
42 reg int *status, *check;
43
44 if(n <= 0 || !fa)
45 return -1;
46
47 if(!(status = (int*)malloc(2*n*sizeof(int))) )
48 return -1;
49 check = status+n; /* streams that need polling */
50
51 /* a SF_READ stream is ready if there is buffered read data */
52 #define RDREADY(f) (((f->mode&SF_READ) && f->next < f->endb) || \
53 ((f->mode&SF_WRITE) && f->proc && f->proc->ndata > 0) )
54
55 /* a SF_WRITE stream is ready if there is no write data */
56 #define WRREADY(f) (!(f->mode&SF_WRITE) || f->next == f->data)
57
58 #define HASAUXFD(f) (f->proc && f->proc->file >= 0 && f->proc->file != f->file)
59
60 for(r = c = 0; r < n; ++r) /* compute streams that must be checked */
61 { f = fa[r];
62 status[r] = 0;
63
64 /* check accessibility */
65 m = f->mode&SF_RDWR;
66 if((int)f->mode != m && _sfmode(f,m,0) < 0)
67 continue;
68
69 if((f->flags&SF_READ) && RDREADY(f))
70 status[r] |= SF_READ;
71
72 if((f->flags&SF_WRITE) && WRREADY(f))
73 status[r] |= SF_WRITE;
74
75 if((f->flags&SF_RDWR) == status[r])
76 continue;
77
78 /* has discipline, ask its opinion */
79 if(f->disc && f->disc->exceptf)
80 { if((m = (*f->disc->exceptf)(f,SF_DPOLL,&tm,f->disc)) < 0)
81 continue;
82 else if(m > 0)
83 { status[r] = m&SF_RDWR;
84 continue;
85 }
86 }
87
88 if(f->extent < 0) /* unseekable stream, must poll/select */
89 check[c++] = r;
90 else /* seekable streams are always ready */
91 { if(f->flags&SF_READ)
92 status[r] |= SF_READ;
93 if(f->flags&SF_WRITE)
94 status[r] |= SF_WRITE;
95 }
96 }
97
98 np = -1;
99 #if _lib_poll
100 if(c > 0)
101 { struct pollfd* fds;
102
103 /* construct the poll array */
104 for(m = 0, r = 0; r < c; ++r, ++m)
105 { f = fa[check[r]];
106 if(HASAUXFD(f))
107 m += 1;
108 }
109 if(!(fds = (struct pollfd*)malloc(m*sizeof(struct pollfd))) )
110 return -1;
111
112 for(m = 0, r = 0; r < c; ++r, ++m)
113 { f = fa[check[r]];
114
115 fds[m].fd = f->file;
116 fds[m].events = fds[m].revents = 0;
117
118 if((f->flags&SF_WRITE) && !WRREADY(f) )
119 fds[m].events |= POLLOUT;
120
121 if((f->flags&SF_READ) && !RDREADY(f) )
122 { /* a sfpopen situation with two file descriptors */
123 if((f->mode&SF_WRITE) && HASAUXFD(f))
124 { m += 1;
125 fds[m].fd = f->proc->file;
126 fds[m].revents = 0;
127 }
128
129 fds[m].events |= POLLIN;
130 }
131 }
132
133 while((np = SFPOLL(fds,m,tm)) < 0 )
134 { if(errno == EINTR || errno == EAGAIN)
135 errno = 0;
136 else break;
137 }
138 if(np > 0) /* poll succeeded */
139 np = c;
140
141 for(m = 0, r = 0; r < np; ++r, ++m)
142 { f = fa[check[r]];
143
144 if((f->flags&SF_WRITE) && !WRREADY(f) )
145 { if(fds[m].revents&POLLOUT)
146 status[check[r]] |= SF_WRITE;
147 }
148
149 if((f->flags&SF_READ) && !RDREADY(f))
150 { if((f->mode&SF_WRITE) && HASAUXFD(f))
151 m += 1;
152 if(fds[m].revents&POLLIN)
153 status[check[r]] |= SF_READ;
154 }
155 }
156
157 free((Void_t*)fds);
158 }
159 #endif /*_lib_poll*/
160
161 #if _lib_select
162 if(np < 0 && c > 0)
163 { fd_set rd, wr;
164 struct timeval tmb, *tmp;
165
166 FD_ZERO(&rd);
167 FD_ZERO(&wr);
168 m = 0;
169 for(r = 0; r < c; ++r)
170 { f = fa[check[r]];
171
172 if(f->file > m)
173 m = f->file;
174
175 if((f->flags&SF_WRITE) && !WRREADY(f))
176 FD_SET(f->file,&wr);
177
178 if((f->flags&SF_READ) && !RDREADY(f))
179 { if((f->mode&SF_WRITE) && HASAUXFD(f))
180 { if(f->proc->file > m)
181 m = f->proc->file;
182 FD_SET(f->proc->file, &rd);
183 }
184 else FD_SET(f->file,&rd);
185 }
186 }
187 if(tm < 0)
188 tmp = NIL(struct timeval*);
189 else
190 { tmp = &tmb;
191 tmb.tv_sec = tm/SECOND;
192 tmb.tv_usec = (tm%SECOND)*SECOND;
193 }
194
195 while((np = select(m+1,&rd,&wr,NIL(fd_set*),tmp)) < 0 )
196 { if(errno == EINTR)
197 errno = 0;
198 else break;
199 }
200 if(np > 0)
201 np = c;
202
203 for(r = 0; r < np; ++r)
204 { f = fa[check[r]];
205
206 if((f->flags&SF_WRITE) && !WRREADY(f) )
207 { if(FD_ISSET(f->file,&wr) )
208 status[check[r]] |= SF_WRITE;
209 }
210
211 if((f->flags&SF_READ) && !RDREADY(f) )
212 { if((f->mode&SF_WRITE) && HASAUXFD(f) )
213 { if(FD_ISSET(f->proc->file, &rd) )
214 status[check[r]] |= SF_READ;
215 }
216 else
217 { if(FD_ISSET(f->file,&rd) )
218 status[check[r]] |= SF_READ;
219 }
220 }
221 }
222 }
223 #endif /*_lib_select*/
224
225 for(r = c = 0; c < n; ++c)
226 { if(status[c] == 0)
227 continue;
228
229 f = fa[c];
230 f->val = (ssize_t)status[c];
231
232 /* announce status */
233 if(f->disc && f->disc->exceptf)
234 (*f->disc->exceptf)(f,SF_READY,(Void_t*)(long)status[c],f->disc);
235
236 if(c > r) /* move to front of list */
237 { fa[c] = fa[r];
238 fa[r] = f;
239 }
240 r += 1;
241 }
242
243 free((Void_t*)status);
244 return r;
245 }
246