1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 1985-2011 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *                 Glenn Fowler <gsf@research.att.com>                  *
18 *                  David Korn <dgk@research.att.com>                   *
19 *                   Phong Vo <kpv@research.att.com>                    *
20 *                                                                      *
21 ***********************************************************************/
22 #include	"sfhdr.h"
23 
24 /*	Poll a set of streams to see if any is available for I/O.
25 **	Ready streams are moved to front of array but retain the
26 **	same relative order.
27 **
28 **	Written by Kiem-Phong Vo.
29 */
30 
31 #if __STD_C
sfpoll(Sfio_t ** fa,reg int n,int tm)32 int sfpoll(Sfio_t** fa, reg int n, int tm)
33 #else
34 int sfpoll(fa, n, tm)
35 Sfio_t**	fa;	/* array of streams to poll		*/
36 reg int		n;	/* number of streams in array		*/
37 int		tm;	/* time in millisecs for select/poll	*/
38 #endif
39 {
40 	reg int		r, c, m, np, eintr;
41 	reg Sfio_t*	f;
42 	reg int		*status, *check;
43 
44 	if(n <= 0 || !fa)
45 		return -1;
46 
47 	if(!(status = (int*)malloc(2*n*sizeof(int))) )
48 		return -1;
49 	check = status+n; /* streams that need polling */
50 
51 	/* a SF_READ stream is ready if there is buffered read data */
52 #define RDREADY(f)	(((f->mode&SF_READ) && f->next < f->endb) || \
53 			 ((f->mode&SF_WRITE) && f->proc && f->proc->ndata > 0) )
54 
55 	/* a SF_WRITE stream is ready if there is no write data */
56 #define WRREADY(f)	(!(f->mode&SF_WRITE) || f->next == f->data)
57 
58 #define HASAUXFD(f)	(f->proc && f->proc->file >= 0 && f->proc->file != f->file)
59 
60 	for(r = c = eintr = 0; r < n; ++r) /* compute streams that must be checked */
61 	{	f = fa[r];
62 		status[r] = 0;
63 
64 		/* terminate poll on interrupt? */
65 		if(f->flags&SF_IOINTR)
66 			eintr++;
67 		/* check accessibility */
68 		m = f->mode&SF_RDWR;
69 		if((int)f->mode != m && _sfmode(f,m,0) < 0)
70 			continue;
71 
72 		if((f->flags&SF_READ) && RDREADY(f))
73 			status[r] |= SF_READ;
74 
75 		if((f->flags&SF_WRITE) && WRREADY(f))
76 			status[r] |= SF_WRITE;
77 
78 		if((f->flags&SF_RDWR) == status[r])
79 			continue;
80 
81 		/* has discipline, ask its opinion */
82 		if(f->disc && f->disc->exceptf)
83 		{	if((m = (*f->disc->exceptf)(f,SF_DPOLL,&tm,f->disc)) < 0)
84 				continue;
85 			else if(m > 0)
86 			{	status[r] = m&SF_RDWR;
87 				continue;
88 			}
89 		}
90 
91 		if(f->extent < 0) /* unseekable stream, must poll/select */
92 			check[c++] = r;
93 		else /* seekable streams are always ready */
94 		{	if(f->flags&SF_READ)
95 				status[r] |= SF_READ;
96 			if(f->flags&SF_WRITE)
97 				status[r] |= SF_WRITE;
98 		}
99 	}
100 	/* terminate poll on interrupt only if all streams marked SF_IOINTR */
101 	eintr = eintr == n ? -1 : EINTR;
102 
103 	np = -1;
104 #if _lib_poll
105 	if(c > 0)
106 	{	struct pollfd*	fds;
107 
108 		/* construct the poll array */
109 		for(m = 0, r = 0; r < c; ++r, ++m)
110 		{	f = fa[check[r]];
111 			if(HASAUXFD(f))
112 				m += 1;
113 		}
114 		if(!(fds = (struct pollfd*)malloc(m*sizeof(struct pollfd))) )
115 			return -1;
116 
117 		for(m = 0, r = 0; r < c; ++r, ++m)
118 		{	f = fa[check[r]];
119 
120 			fds[m].fd = f->file;
121 			fds[m].events = fds[m].revents = 0;
122 
123 			if((f->flags&SF_WRITE) && !WRREADY(f) )
124 				fds[m].events |= POLLOUT;
125 
126 			if((f->flags&SF_READ)  && !RDREADY(f) )
127 			{	/* a sfpopen situation with two file descriptors */
128 				if((f->mode&SF_WRITE) && HASAUXFD(f))
129 				{	m += 1;
130 					fds[m].fd = f->proc->file;
131 					fds[m].revents = 0;
132 				}
133 
134 				fds[m].events |= POLLIN;
135 			}
136 		}
137 
138 		while((np = SFPOLL(fds,m,tm)) < 0 )
139 		{	if(errno == eintr || errno == EAGAIN)
140 				errno = 0;
141 			else	break;
142 		}
143 		if(np > 0) /* poll succeeded */
144 			np = c;
145 
146 		for(m = 0, r = 0; r < np; ++r, ++m)
147 		{	f = fa[check[r]];
148 
149 			if((f->flags&SF_WRITE) && !WRREADY(f) )
150 			{	if(fds[m].revents&POLLOUT)
151 					status[check[r]] |= SF_WRITE;
152 			}
153 
154 			if((f->flags&SF_READ)  && !RDREADY(f))
155 			{	if((f->mode&SF_WRITE) && HASAUXFD(f))
156 					m += 1;
157 				if(fds[m].revents&POLLIN)
158 					status[check[r]] |= SF_READ;
159 			}
160 		}
161 
162 		free((Void_t*)fds);
163 	}
164 #endif /*_lib_poll*/
165 
166 #if _lib_select
167 	if(np < 0 && c > 0)
168 	{	fd_set		rd, wr;
169 		struct timeval	tmb, *tmp;
170 
171 		FD_ZERO(&rd);
172 		FD_ZERO(&wr);
173 		m = 0;
174 		for(r = 0; r < c; ++r)
175 		{	f = fa[check[r]];
176 
177 			if(f->file > m)
178 				m = f->file;
179 
180 			if((f->flags&SF_WRITE) && !WRREADY(f))
181 				FD_SET(f->file,&wr);
182 
183 			if((f->flags&SF_READ)  && !RDREADY(f))
184 			{	if((f->mode&SF_WRITE) && HASAUXFD(f))
185 				{	if(f->proc->file > m)
186 						m = f->proc->file;
187 					FD_SET(f->proc->file, &rd);
188 				}
189 				else	FD_SET(f->file,&rd);
190 			}
191 		}
192 		if(tm < 0)
193 			tmp = NIL(struct timeval*);
194 		else
195 		{	tmp = &tmb;
196 			tmb.tv_sec = tm/SECOND;
197 			tmb.tv_usec = (tm%SECOND)*SECOND;
198 		}
199 
200 		while((np = select(m+1,&rd,&wr,NIL(fd_set*),tmp)) < 0 )
201 		{	if(errno == eintr)
202 				errno = 0;
203 			else	break;
204 		}
205 		if(np > 0)
206 			np = c;
207 
208 		for(r = 0; r < np; ++r)
209 		{	f = fa[check[r]];
210 
211 			if((f->flags&SF_WRITE) && !WRREADY(f) )
212 			{	if(FD_ISSET(f->file,&wr) )
213 					status[check[r]] |= SF_WRITE;
214 			}
215 
216 			if((f->flags&SF_READ) && !RDREADY(f) )
217 			{	if((f->mode&SF_WRITE) && HASAUXFD(f) )
218 				{	if(FD_ISSET(f->proc->file, &rd) )
219 						status[check[r]] |= SF_READ;
220 				}
221 				else
222 				{	if(FD_ISSET(f->file,&rd) )
223 						status[check[r]] |= SF_READ;
224 				}
225 			}
226 		}
227 	}
228 #endif /*_lib_select*/
229 
230 	for(r = c = 0; c < n; ++c)
231 	{	if(status[c] == 0)
232 			continue;
233 
234 		f = fa[c];
235 		f->val = (ssize_t)status[c];
236 
237 		/* announce status */
238 		if(f->disc && f->disc->exceptf)
239 			(*f->disc->exceptf)(f,SF_READY,(Void_t*)(long)status[c],f->disc);
240 
241 		if(c > r) /* move to front of list */
242 		{	fa[c] = fa[r];
243 			fa[r] = f;
244 		}
245 		r += 1;
246 	}
247 
248 	free((Void_t*)status);
249 	return r ? r : np < 0 ? -1 : 0;
250 }
251