1 /*********************************************************************** 2 * * 3 * This software is part of the ast package * 4 * Copyright (c) 1985-2011 AT&T Intellectual Property * 5 * and is licensed under the * 6 * Eclipse Public License, Version 1.0 * 7 * by AT&T Intellectual Property * 8 * * 9 * A copy of the License is available at * 10 * http://www.eclipse.org/org/documents/epl-v10.html * 11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) * 12 * * 13 * Information and Software Systems Research * 14 * AT&T Research * 15 * Florham Park NJ * 16 * * 17 * Glenn Fowler <gsf@research.att.com> * 18 * David Korn <dgk@research.att.com> * 19 * Phong Vo <kpv@research.att.com> * 20 * * 21 ***********************************************************************/ 22 #include "sfdchdr.h" 23 24 /* Discipline to invoke UNIX processes as data filters. 25 ** These processes must be able to fit in pipelines. 26 ** 27 ** Written by Kiem-Phong Vo, kpv@research.att.com, 03/18/1998. 28 */ 29 30 typedef struct _filter_s 31 { Sfdisc_t disc; /* discipline structure */ 32 Sfio_t* filter; /* the filter stream */ 33 char* next; /* data unwritten */ 34 char* endb; /* end of data */ 35 char raw[4096]; /* raw data buffer */ 36 } Filter_t; 37 38 /* read data from the filter */ 39 #if __STD_C 40 static ssize_t filterread(Sfio_t* f, Void_t* buf, size_t n, Sfdisc_t* disc) 41 #else 42 static ssize_t filterread(f, buf, n, disc) 43 Sfio_t* f; /* stream reading from */ 44 Void_t* buf; /* buffer to read into */ 45 size_t n; /* number of bytes requested */ 46 Sfdisc_t* disc; /* discipline */ 47 #endif 48 { 49 Filter_t* fi; 50 ssize_t r, w; 51 52 fi = (Filter_t*)disc; 53 for(;;) 54 { 55 /* get some raw data to stuff down the pipe */ 56 if(fi->next && fi->next >= fi->endb ) 57 { if((r = sfrd(f,fi->raw,sizeof(fi->raw),disc)) > 0) 58 { fi->next = fi->raw; 59 fi->endb = fi->raw+r; 60 } 61 else 62 { /* eof, close write end of pipes */ 63 sfset(fi->filter,SF_READ,0); 64 close(sffileno(fi->filter)); 65 sfset(fi->filter,SF_READ,1); 66 fi->next = fi->endb = NIL(char*); 67 } 68 } 69 70 if(fi->next && (w = fi->endb - fi->next) > 0 ) 71 { /* see if pipe is ready for write */ 72 sfset(fi->filter, SF_READ, 0); 73 r = sfpoll(&fi->filter, 1, 1); 74 sfset(fi->filter, SF_READ, 1); 75 76 if(r == 1) /* non-blocking write */ 77 { errno = 0; 78 if((w = sfwr(fi->filter, fi->next, w, 0)) > 0) 79 fi->next += w; 80 else if(errno != EAGAIN) 81 return 0; 82 } 83 } 84 85 /* see if pipe is ready for read */ 86 sfset(fi->filter, SF_WRITE, 0); 87 w = sfpoll(&fi->filter, 1, fi->next ? 1 : -1); 88 sfset(fi->filter, SF_WRITE, 1); 89 90 if(!fi->next || w == 1) /* non-blocking read */ 91 { errno = 0; 92 if((r = sfrd(fi->filter, buf, n, 0)) > 0) 93 return r; 94 if(errno != EAGAIN) 95 return 0; 96 } 97 } 98 } 99 100 #if __STD_C 101 static ssize_t filterwrite(Sfio_t* f, const Void_t* buf, size_t n, Sfdisc_t* disc) 102 #else 103 static ssize_t filterwrite(f, buf, n, disc) 104 Sfio_t* f; /* stream writing to */ 105 Void_t* buf; /* buffer to write into */ 106 size_t n; /* number of bytes requested */ 107 Sfdisc_t* disc; /* discipline */ 108 #endif 109 { 110 return -1; 111 } 112 113 /* for the duration of this discipline, the stream is unseekable */ 114 #if __STD_C 115 static Sfoff_t filterseek(Sfio_t* f, Sfoff_t addr, int offset, Sfdisc_t* disc) 116 #else 117 static Sfoff_t filterseek(f, addr, offset, disc) 118 Sfio_t* f; 119 Sfoff_t addr; 120 int offset; 121 Sfdisc_t* disc; 122 #endif 123 { f = NIL(Sfio_t*); 124 addr = 0; 125 offset = 0; 126 disc = NIL(Sfdisc_t*); 127 return (Sfoff_t)(-1); 128 } 129 130 /* on close, remove the discipline */ 131 #if __STD_C 132 static int filterexcept(Sfio_t* f, int type, Void_t* data, Sfdisc_t* disc) 133 #else 134 static int filterexcept(f,type,data,disc) 135 Sfio_t* f; 136 int type; 137 Void_t* data; 138 Sfdisc_t* disc; 139 #endif 140 { 141 if(type == SF_FINAL || type == SF_DPOP) 142 { sfclose(((Filter_t*)disc)->filter); 143 free(disc); 144 } 145 146 return 0; 147 } 148 149 #if __STD_C 150 int sfdcfilter(Sfio_t* f, const char* cmd) 151 #else 152 int sfdcfilter(f, cmd) 153 Sfio_t* f; /* stream to filter data */ 154 char* cmd; /* program to run as a filter */ 155 #endif 156 { 157 reg Filter_t* fi; 158 reg Sfio_t* filter; 159 160 /* open filter for read&write */ 161 if(!(filter = sfpopen(NIL(Sfio_t*),cmd,"r+")) ) 162 return -1; 163 164 /* unbuffered stream */ 165 sfsetbuf(filter,NIL(Void_t*),0); 166 167 if(!(fi = (Filter_t*)malloc(sizeof(Filter_t))) ) 168 { sfclose(filter); 169 return -1; 170 } 171 172 fi->disc.readf = filterread; 173 fi->disc.writef = filterwrite; 174 fi->disc.seekf = filterseek; 175 fi->disc.exceptf = filterexcept; 176 fi->filter = filter; 177 fi->next = fi->endb = fi->raw; 178 179 if(sfdisc(f,(Sfdisc_t*)fi) != (Sfdisc_t*)fi) 180 { sfclose(filter); 181 free(fi); 182 return -1; 183 } 184 185 return 0; 186 } 187