xref: /illumos-gate/usr/src/cmd/sort/streams_stdio.c (revision 101e15b5)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include "streams_stdio.h"
28 #include "streams_common.h"
29 
/*
 * States of the "shelf", which records whether a partially read line is
 * waiting to be completed by a later read.
 */
#define	SHELF_OCCUPIED	1
#define	SHELF_VACANT	0
/* Current shelf state: a single file-scope flag used by every routine below. */
static int shelf = SHELF_VACANT;
33 
34 /*
35  * Single-byte character file i/o-based streams implementation
36  *
37  *   The routines in this file contain the implementation of the i/o streams
38  *   interface for those situations where the input is via stdio.
39  *
40  * The "shelf"
 *   In the case where the input buffer contains insufficient room to hold the
 *   entire line, the fractional line is shelved, and the remainder of the line
 *   is grafted onto it by the subsequent read.
44  */
45 int
stream_stdio_open_for_write(stream_t * str)46 stream_stdio_open_for_write(stream_t *str)
47 {
48 	stream_simple_file_t	*SF = &(str->s_type.SF);
49 
50 	ASSERT(!(str->s_status & STREAM_OPEN));
51 	ASSERT(!(str->s_status & STREAM_OUTPUT));
52 
53 	if (str->s_status & STREAM_NOTFILE)
54 		SF->s_fd = fileno(stdout);
55 	else
56 		if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC |
57 		    O_WRONLY, OUTPUT_MODE)) < 0) {
58 			if (errno == EMFILE || errno == ENFILE)
59 				return (-1);
60 			else
61 				die(EMSG_OPEN, str->s_filename);
62 		}
63 
64 	stream_set(str, STREAM_OPEN | STREAM_OUTPUT);
65 
66 	return (1);
67 }
68 
69 /*
70  * In the case of an instantaneous stream, we allocate a small buffer (64k) here
71  * for the stream; otherwise, the s_buffer and s_buffer_size members should have
72  * been set by stream_set_size() prior to calling stream_prime().
73  *
74  * Repriming (priming an already primed stream) is done when we are reentering a
75  * file after having sorted a previous portion of the file.
76  */
77 static int
stream_stdio_prime(stream_t * str)78 stream_stdio_prime(stream_t *str)
79 {
80 	stream_buffered_file_t *BF = &(str->s_type.BF);
81 	char *current_position;
82 	char *end_of_buffer;
83 	char *next_nl;
84 
85 	ASSERT(!(str->s_status & STREAM_OUTPUT));
86 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
87 	ASSERT(str->s_status & STREAM_OPEN);
88 
89 	if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) {
90 		str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ |
91 		    PROT_WRITE, MAP_PRIVATE, 0);
92 		if (str->s_buffer == MAP_FAILED)
93 			die(EMSG_MMAP);
94 		str->s_buffer_size = STDIO_VBUF_SIZE;
95 	}
96 
97 	ASSERT(str->s_buffer != NULL);
98 
99 	if (stream_is_primed(str)) {
100 		/*
101 		 * l_data_length is only set to -1 in the case of coincidental
102 		 * exhaustion of the input butter.  This is thus the only case
103 		 * which involves no copying on a re-prime.
104 		 */
105 		int shelf_state = shelf;
106 
107 		ASSERT(str->s_current.l_data_length >= -1);
108 		(void) memcpy(str->s_buffer, str->s_current.l_data.sp,
109 		    str->s_current.l_data_length + 1);
110 		str->s_current.l_data.sp = str->s_buffer;
111 
112 		/*
113 		 * If our current line is incomplete, we need to get the rest of
114 		 * the line--if we can't, then we've exhausted memory.
115 		 */
116 		if ((str->s_current.l_data_length == -1 ||
117 		    shelf_state == SHELF_OCCUPIED ||
118 		    *(str->s_current.l_data.sp +
119 		    str->s_current.l_data_length) != '\n') &&
120 		    SOP_FETCH(str) == NEXT_LINE_INCOMPLETE &&
121 		    shelf_state == SHELF_OCCUPIED)
122 			die(EMSG_MEMORY);
123 
124 		str->s_current.l_collate.sp = NULL;
125 		str->s_current.l_collate_length = 0;
126 
127 		return (PRIME_SUCCEEDED);
128 	}
129 
130 	stream_set(str, STREAM_PRIMED);
131 
132 	current_position = (char *)str->s_buffer;
133 	end_of_buffer = (char *)str->s_buffer + str->s_buffer_size;
134 
135 	trip_eof(BF->s_fp);
136 	if (!feof(BF->s_fp))
137 		(void) fgets(current_position, end_of_buffer - current_position,
138 		    BF->s_fp);
139 	else {
140 		stream_set(str, STREAM_EOS_REACHED);
141 		stream_unset(str, STREAM_PRIMED);
142 		return (PRIME_FAILED_EMPTY_FILE);
143 	}
144 
145 	str->s_current.l_data.sp = current_position;
146 	/*
147 	 * Because one might run sort on a binary file, strlen() is no longer
148 	 * trustworthy--we must explicitly search for a newline.
149 	 */
150 	if ((next_nl = memchr(current_position, '\n',
151 	    end_of_buffer - current_position)) == NULL) {
152 		warn(WMSG_NEWLINE_ADDED, str->s_filename);
153 		str->s_current.l_data_length = MIN(strlen(current_position),
154 		    end_of_buffer - current_position);
155 	} else {
156 		str->s_current.l_data_length = next_nl - current_position;
157 	}
158 
159 	str->s_current.l_collate.sp = NULL;
160 	str->s_current.l_collate_length = 0;
161 
162 	__S(stats_incr_fetches());
163 	return (PRIME_SUCCEEDED);
164 }
165 
166 /*
167  * stream_stdio_fetch() guarantees the return of a complete line, or a flag
168  * indicating that the complete line could not be read.
169  */
170 static ssize_t
stream_stdio_fetch(stream_t * str)171 stream_stdio_fetch(stream_t *str)
172 {
173 	ssize_t	dist_to_buf_end;
174 	int ret_val;
175 	char *graft_pt, *next_nl;
176 
177 	ASSERT(str->s_status & STREAM_OPEN);
178 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
179 	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
180 
181 	graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1;
182 
183 	if (shelf == SHELF_VACANT) {
184 		/*
185 		 * The graft point is the start of the current line.
186 		 */
187 		str->s_current.l_data.sp = graft_pt;
188 	} else if (str->s_current.l_data_length > -1) {
189 		/*
190 		 * Correct for terminating NUL on shelved line.  This NUL is
191 		 * only present if we didn't have the coincidental case
192 		 * mentioned in the comment below.
193 		 */
194 		graft_pt--;
195 	}
196 
197 	dist_to_buf_end = str->s_buffer_size - (graft_pt -
198 	    (char *)str->s_buffer);
199 
200 	if (dist_to_buf_end <= 1) {
201 		/*
202 		 * fgets()'s behaviour in the case of a one-character buffer is
203 		 * somewhat unhelpful:  it fills the buffer with '\0' and
204 		 * returns successfully (even if EOF has been reached for the
205 		 * file in question).  Since we may be in the middle of a
206 		 * grafting operation, we leave early, maintaining the shelf in
207 		 * its current state.
208 		 */
209 		str->s_current.l_data_length = -1;
210 		return (NEXT_LINE_INCOMPLETE);
211 	}
212 
213 	if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) {
214 		if (feof(str->s_type.BF.s_fp))
215 			stream_set(str, STREAM_EOS_REACHED);
216 		else
217 			die(EMSG_READ, str->s_filename);
218 	}
219 
220 	trip_eof(str->s_type.BF.s_fp);
221 	/*
222 	 * Because one might run sort on a binary file, strlen() is no longer
223 	 * trustworthy--we must explicitly search for a newline.
224 	 */
225 	if ((next_nl = memchr(str->s_current.l_data.sp, '\n',
226 	    dist_to_buf_end)) == NULL) {
227 		str->s_current.l_data_length = strlen(str->s_current.l_data.sp);
228 	} else {
229 		str->s_current.l_data_length = next_nl -
230 		    str->s_current.l_data.sp;
231 	}
232 
233 	str->s_current.l_collate_length = 0;
234 
235 	if (*(str->s_current.l_data.sp + str->s_current.l_data_length) !=
236 	    '\n') {
237 		if (!feof(str->s_type.BF.s_fp)) {
238 			/*
239 			 * We were only able to read part of the line; note that
240 			 * we have something on the shelf for our next fetch.
241 			 * If the shelf was previously occupied, and we still
242 			 * can't get the entire line, then we need more
243 			 * resources.
244 			 */
245 			if (shelf == SHELF_OCCUPIED)
246 				die(EMSG_MEMORY);
247 
248 			shelf = SHELF_OCCUPIED;
249 			ret_val = NEXT_LINE_INCOMPLETE;
250 
251 			__S(stats_incr_shelves());
252 		} else {
253 			stream_set(str, STREAM_EOS_REACHED);
254 			warn(WMSG_NEWLINE_ADDED, str->s_filename);
255 		}
256 	} else {
257 		shelf = SHELF_VACANT;
258 		ret_val = NEXT_LINE_COMPLETE;
259 		__S(stats_incr_fetches());
260 	}
261 
262 	return (ret_val);
263 }
264 
265 /*
266  * stdio_fetch_overwrite() is used when we are performing an operation where we
267  * need the buffer contents only over a single period.  (merge and check are
268  * operations of this kind.)  In this case, we read the current line at the head
269  * of the stream's defined buffer.  If we cannot read the entire line, we have
270  * not allocated sufficient memory.
271  */
272 ssize_t
stream_stdio_fetch_overwrite(stream_t * str)273 stream_stdio_fetch_overwrite(stream_t *str)
274 {
275 	ssize_t	dist_to_buf_end;
276 
277 	ASSERT(str->s_status & STREAM_OPEN);
278 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
279 	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
280 
281 	str->s_current.l_data.sp = str->s_buffer;
282 	dist_to_buf_end = str->s_buffer_size;
283 
284 	if (fgets(str->s_current.l_data.sp, dist_to_buf_end,
285 	    str->s_type.BF.s_fp) == NULL) {
286 		if (feof(str->s_type.BF.s_fp))
287 			stream_set(str, STREAM_EOS_REACHED);
288 		else
289 			die(EMSG_READ, str->s_filename);
290 	}
291 
292 	trip_eof(str->s_type.BF.s_fp);
293 	str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1;
294 	str->s_current.l_collate_length = 0;
295 
296 	if (str->s_current.l_data_length == -1 ||
297 	    *(str->s_current.l_data.sp + str->s_current.l_data_length) !=
298 	    '\n') {
299 		if (!feof(str->s_type.BF.s_fp)) {
300 			/*
301 			 * In the overwrite case, failure to read the entire
302 			 * line means our buffer size was insufficient (as we
303 			 * are using all of it).  Exit, requesting more
304 			 * resources.
305 			 */
306 			die(EMSG_MEMORY);
307 		} else {
308 			stream_set(str, STREAM_EOS_REACHED);
309 			warn(WMSG_NEWLINE_ADDED, str->s_filename);
310 		}
311 	}
312 
313 	__S(stats_incr_fetches());
314 	return (NEXT_LINE_COMPLETE);
315 }
316 
317 int
stream_stdio_is_closable(stream_t * str)318 stream_stdio_is_closable(stream_t *str)
319 {
320 	if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE))
321 		return (1);
322 	return (0);
323 }
324 
325 int
stream_stdio_close(stream_t * str)326 stream_stdio_close(stream_t *str)
327 {
328 	ASSERT(str->s_status & STREAM_OPEN);
329 
330 	if (!(str->s_status & STREAM_OUTPUT)) {
331 		if (!(str->s_status & STREAM_NOTFILE))
332 			(void) fclose(str->s_type.BF.s_fp);
333 
334 		if (str->s_type.BF.s_vbuf != NULL) {
335 			free(str->s_type.BF.s_vbuf);
336 			str->s_type.BF.s_vbuf = NULL;
337 		}
338 	} else {
339 		if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0)
340 			(void) close(str->s_type.SF.s_fd);
341 		else
342 			die(EMSG_WRITE, str->s_filename);
343 	}
344 
345 	stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT);
346 	return (1);
347 }
348 
349 static void
stream_stdio_send_eol(stream_t * str)350 stream_stdio_send_eol(stream_t *str)
351 {
352 	ASSERT(str->s_status & STREAM_OPEN);
353 	ASSERT(str->s_status & STREAM_OUTPUT);
354 
355 	if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0)
356 		die(EMSG_WRITE, str->s_filename);
357 }
358 
359 void
stream_stdio_flush(stream_t * str)360 stream_stdio_flush(stream_t *str)
361 {
362 	ASSERT(str->s_status & STREAM_OPEN);
363 	ASSERT(str->s_status & STREAM_OUTPUT);
364 
365 	if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0)
366 		die(EMSG_WRITE, str->s_filename);
367 }
368 
369 static void
stream_stdio_put_line(stream_t * str,line_rec_t * line)370 stream_stdio_put_line(stream_t *str, line_rec_t *line)
371 {
372 	ASSERT(str->s_status & STREAM_OPEN);
373 	ASSERT(str->s_status & STREAM_OUTPUT);
374 
375 	if (line->l_data_length >= 0) {
376 		if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp,
377 		    line->l_data_length) < 0)
378 			die(EMSG_WRITE, str->s_filename);
379 
380 		stream_stdio_send_eol(str);
381 		__S(stats_incr_puts());
382 	}
383 	safe_free(line->l_raw_collate.sp);
384 	line->l_raw_collate.sp = NULL;
385 }
386 
387 void
stream_stdio_put_line_unique(stream_t * str,line_rec_t * line)388 stream_stdio_put_line_unique(stream_t *str, line_rec_t *line)
389 {
390 	static line_rec_t pvs;
391 	static size_t collate_buf_len;
392 
393 	ASSERT(str->s_status & STREAM_OPEN);
394 	ASSERT(str->s_status & STREAM_OUTPUT);
395 
396 	if (pvs.l_collate.sp != NULL &&
397 	    collated(&pvs, line, 0, COLL_UNIQUE) == 0) {
398 		__S(stats_incr_not_unique());
399 		return;
400 	}
401 
402 	__S(stats_incr_put_unique());
403 	stream_stdio_put_line(str, line);
404 
405 	if (line->l_collate_length + 1 > collate_buf_len) {
406 		pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp,
407 		    line->l_collate_length + 1);
408 		collate_buf_len = line->l_collate_length + 1;
409 	}
410 
411 	(void) memcpy(pvs.l_collate.sp, line->l_collate.sp,
412 	    line->l_collate_length);
413 	*(pvs.l_collate.sp + line->l_collate_length) = '\0';
414 	pvs.l_collate_length = line->l_collate_length;
415 }
416 
417 int
stream_stdio_unlink(stream_t * str)418 stream_stdio_unlink(stream_t *str)
419 {
420 	if (!(str->s_status & STREAM_NOTFILE))
421 		return (unlink(str->s_filename));
422 
423 	return (0);
424 }
425 
426 int
stream_stdio_free(stream_t * str)427 stream_stdio_free(stream_t *str)
428 {
429 	/*
430 	 * Unmap the memory we allocated for input, if it's valid to do so.
431 	 */
432 	if (!(str->s_status & STREAM_OPEN) ||
433 	    (str->s_consumer != NULL &&
434 	    str->s_consumer->s_status & STREAM_NOT_FREEABLE))
435 		return (0);
436 
437 	if (str->s_buffer != NULL) {
438 		if (munmap(str->s_buffer, str->s_buffer_size) < 0)
439 			die(EMSG_MUNMAP, "/dev/zero");
440 		else {
441 			str->s_buffer = NULL;
442 			str->s_buffer_size = 0;
443 		}
444 	}
445 
446 	stream_unset(str, STREAM_PRIMED | STREAM_INSTANT);
447 
448 	return (1);
449 }
450 
451 static int
stream_stdio_eos(stream_t * str)452 stream_stdio_eos(stream_t *str)
453 {
454 	int retval = 0;
455 
456 	ASSERT(!(str->s_status & STREAM_OUTPUT));
457 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
458 
459 	if (str == NULL || str->s_status & STREAM_EOS_REACHED)
460 		return (1);
461 
462 	trip_eof(str->s_type.BF.s_fp);
463 	if (feof(str->s_type.BF.s_fp) &&
464 	    shelf == SHELF_VACANT &&
465 	    str->s_current.l_collate_length != -1) {
466 		retval = 1;
467 		stream_set(str, STREAM_EOS_REACHED);
468 	}
469 
470 	return (retval);
471 }
472 
/*
 * stream_stdio_release_line() is the release-line entry in stream_stdio_ops.
 * This implementation deliberately does nothing with its argument.
 */
/*ARGSUSED*/
static void
stream_stdio_release_line(stream_t *str)
{
}
478 
/*
 * Operations vector for stdio-based streams.
 *
 * The initializers are positional, so their order must follow the member
 * order of stream_ops_t (declared outside this file -- check it before adding
 * or reordering entries).  stream_stdio_fetch_overwrite() and
 * stream_stdio_put_line_unique() are not part of this table.
 */
const stream_ops_t stream_stdio_ops = {
	stream_stdio_is_closable,
	stream_stdio_close,
	stream_stdio_eos,
	stream_stdio_fetch,
	stream_stdio_flush,
	stream_stdio_free,
	stream_stdio_open_for_write,
	stream_stdio_prime,
	stream_stdio_put_line,
	stream_stdio_release_line,
	stream_stdio_send_eol,
	stream_stdio_unlink
};
493