1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1985-2010 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Common Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.opensource.org/licenses/cpl1.0.txt *
11 * (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <gsf@research.att.com> *
18 * David Korn <dgk@research.att.com> *
19 * Phong Vo <kpv@research.att.com> *
20 * *
21 ***********************************************************************/
22 #include "sfdchdr.h"
23
/*	Discipline to invoke UNIX processes as data filters.
**	These processes must be able to fit in pipelines: the discipline
**	writes the raw stream data to the process and reads the filtered
**	data back from it.
**
**	Written by Kiem-Phong Vo, kpv@research.att.com, 03/18/1998.
*/
29
30 typedef struct _filter_s
31 { Sfdisc_t disc; /* discipline structure */
32 Sfio_t* filter; /* the filter stream */
33 char* next; /* data unwritten */
34 char* endb; /* end of data */
35 char raw[4096]; /* raw data buffer */
36 } Filter_t;
37
38 /* read data from the filter */
39 #if __STD_C
filterread(Sfio_t * f,Void_t * buf,size_t n,Sfdisc_t * disc)40 static ssize_t filterread(Sfio_t* f, Void_t* buf, size_t n, Sfdisc_t* disc)
41 #else
42 static ssize_t filterread(f, buf, n, disc)
43 Sfio_t* f; /* stream reading from */
44 Void_t* buf; /* buffer to read into */
45 size_t n; /* number of bytes requested */
46 Sfdisc_t* disc; /* discipline */
47 #endif
48 {
49 Filter_t* fi;
50 ssize_t r, w;
51
52 fi = (Filter_t*)disc;
53 for(;;)
54 {
55 /* get some raw data to stuff down the pipe */
56 if(fi->next && fi->next >= fi->endb )
57 { if((r = sfrd(f,fi->raw,sizeof(fi->raw),disc)) > 0)
58 { fi->next = fi->raw;
59 fi->endb = fi->raw+r;
60 }
61 else
62 { /* eof, close write end of pipes */
63 sfset(fi->filter,SF_READ,0);
64 close(sffileno(fi->filter));
65 sfset(fi->filter,SF_READ,1);
66 fi->next = fi->endb = NIL(char*);
67 }
68 }
69
70 if(fi->next && (w = fi->endb - fi->next) > 0 )
71 { /* see if pipe is ready for write */
72 sfset(fi->filter, SF_READ, 0);
73 r = sfpoll(&fi->filter, 1, 1);
74 sfset(fi->filter, SF_READ, 1);
75
76 if(r == 1) /* non-blocking write */
77 { errno = 0;
78 if((w = sfwr(fi->filter, fi->next, w, 0)) > 0)
79 fi->next += w;
80 else if(errno != EAGAIN)
81 return 0;
82 }
83 }
84
85 /* see if pipe is ready for read */
86 sfset(fi->filter, SF_WRITE, 0);
87 w = sfpoll(&fi->filter, 1, fi->next ? 1 : -1);
88 sfset(fi->filter, SF_WRITE, 1);
89
90 if(!fi->next || w == 1) /* non-blocking read */
91 { errno = 0;
92 if((r = sfrd(fi->filter, buf, n, 0)) > 0)
93 return r;
94 if(errno != EAGAIN)
95 return 0;
96 }
97 }
98 }
99
100 #if __STD_C
filterwrite(Sfio_t * f,const Void_t * buf,size_t n,Sfdisc_t * disc)101 static ssize_t filterwrite(Sfio_t* f, const Void_t* buf, size_t n, Sfdisc_t* disc)
102 #else
103 static ssize_t filterwrite(f, buf, n, disc)
104 Sfio_t* f; /* stream writing to */
105 Void_t* buf; /* buffer to write into */
106 size_t n; /* number of bytes requested */
107 Sfdisc_t* disc; /* discipline */
108 #endif
109 {
110 return -1;
111 }
112
113 /* for the duration of this discipline, the stream is unseekable */
114 #if __STD_C
filterseek(Sfio_t * f,Sfoff_t addr,int offset,Sfdisc_t * disc)115 static Sfoff_t filterseek(Sfio_t* f, Sfoff_t addr, int offset, Sfdisc_t* disc)
116 #else
117 static Sfoff_t filterseek(f, addr, offset, disc)
118 Sfio_t* f;
119 Sfoff_t addr;
120 int offset;
121 Sfdisc_t* disc;
122 #endif
123 { f = NIL(Sfio_t*);
124 addr = 0;
125 offset = 0;
126 disc = NIL(Sfdisc_t*);
127 return (Sfoff_t)(-1);
128 }
129
130 /* on close, remove the discipline */
131 #if __STD_C
filterexcept(Sfio_t * f,int type,Void_t * data,Sfdisc_t * disc)132 static int filterexcept(Sfio_t* f, int type, Void_t* data, Sfdisc_t* disc)
133 #else
134 static int filterexcept(f,type,data,disc)
135 Sfio_t* f;
136 int type;
137 Void_t* data;
138 Sfdisc_t* disc;
139 #endif
140 {
141 if(type == SF_FINAL || type == SF_DPOP)
142 { sfclose(((Filter_t*)disc)->filter);
143 free(disc);
144 }
145
146 return 0;
147 }
148
149 #if __STD_C
sfdcfilter(Sfio_t * f,const char * cmd)150 int sfdcfilter(Sfio_t* f, const char* cmd)
151 #else
152 int sfdcfilter(f, cmd)
153 Sfio_t* f; /* stream to filter data */
154 char* cmd; /* program to run as a filter */
155 #endif
156 {
157 reg Filter_t* fi;
158 reg Sfio_t* filter;
159
160 /* open filter for read&write */
161 if(!(filter = sfpopen(NIL(Sfio_t*),cmd,"r+")) )
162 return -1;
163
164 /* unbuffered stream */
165 sfsetbuf(filter,NIL(Void_t*),0);
166
167 if(!(fi = (Filter_t*)malloc(sizeof(Filter_t))) )
168 { sfclose(filter);
169 return -1;
170 }
171
172 fi->disc.readf = filterread;
173 fi->disc.writef = filterwrite;
174 fi->disc.seekf = filterseek;
175 fi->disc.exceptf = filterexcept;
176 fi->filter = filter;
177 fi->next = fi->endb = fi->raw;
178
179 if(sfdisc(f,(Sfdisc_t*)fi) != (Sfdisc_t*)fi)
180 { sfclose(filter);
181 free(fi);
182 return -1;
183 }
184
185 return 0;
186 }
187