1/***********************************************************************
2*                                                                      *
3*               This software is part of the ast package               *
4*          Copyright (c) 1985-2010 AT&T Intellectual Property          *
5*                      and is licensed under the                       *
6*                  Common Public License, Version 1.0                  *
7*                    by AT&T Intellectual Property                     *
8*                                                                      *
9*                A copy of the License is available at                 *
10*            http://www.opensource.org/licenses/cpl1.0.txt             *
11*         (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9)         *
12*                                                                      *
13*              Information and Software Systems Research               *
14*                            AT&T Research                             *
15*                           Florham Park NJ                            *
16*                                                                      *
17*                 Glenn Fowler <gsf@research.att.com>                  *
18*                  David Korn <dgk@research.att.com>                   *
19*                   Phong Vo <kpv@research.att.com>                    *
20*                                                                      *
21***********************************************************************/
22#include	"sfdchdr.h"
23
24/*	Discipline to invoke UNIX processes as data filters.
25**	These processes must be able to fit in pipelines.
26**
27**	Written by Kiem-Phong Vo, kpv@research.att.com, 03/18/1998.
28*/
29
30typedef struct _filter_s
31{	Sfdisc_t	disc;		/* discipline structure	*/
32	Sfio_t*		filter;		/* the filter stream	*/
33	char*		next;		/* data unwritten 	*/
34	char*		endb;		/* end of data		*/
35	char		raw[4096];	/* raw data buffer	*/
36} Filter_t;
37
38/* read data from the filter */
39#if __STD_C
40static ssize_t filterread(Sfio_t* f, Void_t* buf, size_t n, Sfdisc_t* disc)
41#else
42static ssize_t filterread(f, buf, n, disc)
43Sfio_t*		f;	/* stream reading from */
44Void_t*		buf;	/* buffer to read into */
45size_t		n;	/* number of bytes requested */
46Sfdisc_t*	disc;	/* discipline */
47#endif
48{
49	Filter_t*	fi;
50	ssize_t		r, w;
51
52	fi = (Filter_t*)disc;
53	for(;;)
54	{
55		/* get some raw data to stuff down the pipe */
56		if(fi->next && fi->next >= fi->endb )
57		{	if((r = sfrd(f,fi->raw,sizeof(fi->raw),disc)) > 0)
58			{	fi->next = fi->raw;
59				fi->endb = fi->raw+r;
60			}
61			else
62			{	/* eof, close write end of pipes */
63				sfset(fi->filter,SF_READ,0);
64				close(sffileno(fi->filter));
65				sfset(fi->filter,SF_READ,1);
66				fi->next = fi->endb = NIL(char*);
67			}
68		}
69
70		if(fi->next && (w = fi->endb - fi->next) > 0 )
71		{	/* see if pipe is ready for write */
72			sfset(fi->filter, SF_READ, 0);
73			r = sfpoll(&fi->filter, 1, 1);
74			sfset(fi->filter, SF_READ, 1);
75
76			if(r == 1) /* non-blocking write */
77			{	errno = 0;
78				if((w = sfwr(fi->filter, fi->next, w, 0)) > 0)
79					fi->next += w;
80				else if(errno != EAGAIN)
81					return 0;
82			}
83		}
84
85		/* see if pipe is ready for read */
86		sfset(fi->filter, SF_WRITE, 0);
87		w = sfpoll(&fi->filter, 1, fi->next ? 1 : -1);
88		sfset(fi->filter, SF_WRITE, 1);
89
90		if(!fi->next || w == 1) /* non-blocking read */
91		{	errno = 0;
92			if((r = sfrd(fi->filter, buf, n, 0)) > 0)
93				return r;
94			if(errno != EAGAIN)
95				return 0;
96		}
97	}
98}
99
100#if __STD_C
101static ssize_t filterwrite(Sfio_t* f, const Void_t* buf, size_t n, Sfdisc_t* disc)
102#else
103static ssize_t filterwrite(f, buf, n, disc)
104Sfio_t*		f;	/* stream writing to */
105Void_t*		buf;	/* buffer to write into */
106size_t		n;	/* number of bytes requested */
107Sfdisc_t*	disc;	/* discipline */
108#endif
109{
110	return -1;
111}
112
113/* for the duration of this discipline, the stream is unseekable */
114#if __STD_C
115static Sfoff_t filterseek(Sfio_t* f, Sfoff_t addr, int offset, Sfdisc_t* disc)
116#else
117static Sfoff_t filterseek(f, addr, offset, disc)
118Sfio_t*		f;
119Sfoff_t		addr;
120int		offset;
121Sfdisc_t*	disc;
122#endif
123{	f = NIL(Sfio_t*);
124	addr = 0;
125	offset = 0;
126	disc = NIL(Sfdisc_t*);
127	return (Sfoff_t)(-1);
128}
129
130/* on close, remove the discipline */
131#if __STD_C
132static int filterexcept(Sfio_t* f, int type, Void_t* data, Sfdisc_t* disc)
133#else
134static int filterexcept(f,type,data,disc)
135Sfio_t*		f;
136int		type;
137Void_t*		data;
138Sfdisc_t*	disc;
139#endif
140{
141	if(type == SF_FINAL || type == SF_DPOP)
142	{	sfclose(((Filter_t*)disc)->filter);
143		free(disc);
144	}
145
146	return 0;
147}
148
149#if __STD_C
150int sfdcfilter(Sfio_t* f, const char* cmd)
151#else
152int sfdcfilter(f, cmd)
153Sfio_t*	f;	/* stream to filter data	*/
154char*	cmd;	/* program to run as a filter	*/
155#endif
156{
157	reg Filter_t*	fi;
158	reg Sfio_t*	filter;
159
160	/* open filter for read&write */
161	if(!(filter = sfpopen(NIL(Sfio_t*),cmd,"r+")) )
162		return -1;
163
164	/* unbuffered stream */
165	sfsetbuf(filter,NIL(Void_t*),0);
166
167	if(!(fi = (Filter_t*)malloc(sizeof(Filter_t))) )
168	{	sfclose(filter);
169		return -1;
170	}
171
172	fi->disc.readf = filterread;
173	fi->disc.writef = filterwrite;
174	fi->disc.seekf = filterseek;
175	fi->disc.exceptf = filterexcept;
176	fi->filter = filter;
177	fi->next = fi->endb = fi->raw;
178
179	if(sfdisc(f,(Sfdisc_t*)fi) != (Sfdisc_t*)fi)
180	{	sfclose(filter);
181		free(fi);
182		return -1;
183	}
184
185	return 0;
186}
187