1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License").  You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22/*
23 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24 * Use is subject to license terms.
25 */
26
27#pragma ident	"%Z%%M%	%I%	%E% SMI"
28
29#include "streams_mmap.h"
30#include "streams_common.h"
31
32/*
33 * Single-byte character memory map-based streams implementation
34 */
35
36static int
37stream_mmap_prime(stream_t *str)
38{
39	char *nl;
40
41	if (stream_is_primed(str))
42		return (PRIME_SUCCEEDED);
43
44	stream_set(str, STREAM_PRIMED);
45
46	if (str->s_buffer_size == 0) {
47		stream_set(str, STREAM_EOS_REACHED);
48		return (PRIME_FAILED_EMPTY_FILE);
49	}
50
51	str->s_current.l_data.sp = str->s_buffer;
52	str->s_type.SF.s_release_origin = str->s_buffer;
53	if ((nl = (char *)memchr(str->s_buffer, '\n', str->s_buffer_size)) ==
54	    NULL) {
55		warn(WMSG_NEWLINE_ADDED, str->s_filename);
56		str->s_current.l_data_length = str->s_buffer_size;
57	} else {
58		str->s_current.l_data_length = nl - (char *)str->s_buffer;
59	}
60
61	str->s_current.l_collate.sp = NULL;
62	str->s_current.l_collate_length = 0;
63
64	__S(stats_incr_fetches());
65	return (PRIME_SUCCEEDED);
66}
67
68/*
69 * stream_mmap_fetch() sets the fields of str->s_current to delimit the next
70 * line of the field.
71 */
72static ssize_t
73stream_mmap_fetch(stream_t *str)
74{
75	ssize_t dist_to_buf_end;
76	char *next_nl;
77
78	ASSERT(stream_is_primed(str));
79	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
80
81	/*
82	 * adding one for newline
83	 */
84	str->s_current.l_data.sp = str->s_current.l_data.sp +
85	    str->s_current.l_data_length + 1;
86
87	dist_to_buf_end = str->s_buffer_size - (str->s_current.l_data.sp
88	    - (char *)str->s_buffer);
89	ASSERT(dist_to_buf_end >= 0 && dist_to_buf_end <= str->s_buffer_size);
90
91	next_nl = memchr(str->s_current.l_data.sp, '\n', dist_to_buf_end);
92
93	if (next_nl)
94		str->s_current.l_data_length = next_nl
95		    - str->s_current.l_data.sp;
96	else {
97		warn(WMSG_NEWLINE_ADDED, str->s_filename);
98		str->s_current.l_data_length = dist_to_buf_end;
99	}
100
101	/*
102	 * adding one for newline
103	 */
104	if (str->s_current.l_data.sp + str->s_current.l_data_length + 1 >=
105	    (char *)str->s_buffer + str->s_buffer_size)
106		stream_set(str, STREAM_EOS_REACHED);
107
108	str->s_current.l_collate_length = 0;
109
110	__S(stats_incr_fetches());
111	return (NEXT_LINE_COMPLETE);
112}
113
114static int
115stream_mmap_is_closable(stream_t *str)
116{
117	if (str->s_status & STREAM_OPEN)
118		return (1);
119	return (0);
120}
121
122static int
123stream_mmap_close(stream_t *str)
124{
125	if (str->s_type.SF.s_fd > -1) {
126		(void) close(str->s_type.SF.s_fd);
127		stream_unset(str, STREAM_OPEN);
128		return (1);
129	}
130
131	return (0);
132}
133
134static int
135stream_mmap_free(stream_t *str)
136{
137	if (!(str->s_status & STREAM_OPEN) ||
138	    (str->s_consumer != NULL &&
139	    str->s_consumer->s_status & STREAM_NOT_FREEABLE))
140		return (0);
141
142	if (str->s_buffer == NULL)
143		return (1);
144
145	if (munmap(str->s_buffer, str->s_buffer_size) < 0)
146		die(EMSG_MUNMAP, str->s_filename);
147
148	str->s_buffer = NULL;
149	str->s_buffer_size = 0;
150
151	stream_unset(str, STREAM_PRIMED);
152
153	return (1);
154}
155
156static int
157stream_mmap_eos(stream_t *str)
158{
159	int retval = 0;
160
161	if (str == NULL || str->s_status & STREAM_EOS_REACHED)
162		return (1);
163
164	/*
165	 * If the file's size is known to be zero, then we are at EOS; the
166	 * remaining checks are only sensible if we successfully primed this
167	 * stream.  The additional character is for the optional newline.
168	 */
169	if (str->s_filesize == 0 ||
170	    (stream_is_primed(str) && str->s_current.l_data.sp -
171	    (char *)str->s_buffer + str->s_current.l_data_length + 1 >=
172	    str->s_buffer_size)) {
173		retval = 1;
174		stream_set(str, STREAM_EOS_REACHED);
175	}
176
177	return (retval);
178}
179
180#define	ALIGNED		(~(ulong_t)(PAGESIZE - 1))
181
182/*
183 * In certain cases, we know that we will never need the data on a page again
184 * for the duration of the sort.  (These cases are associated with merges
185 * involving temporary files.)  We can thus release all pages previous to the
186 * page containing the current line, using the MADV_DONTNEED flag to
187 * madvise(3C).  This additional memory management improves our chances of
188 * avoiding a paging situation, by evicting pages we know are of no use.
189 */
190static void
191stream_mmap_release_line(stream_t *str)
192{
193	caddr_t origin = str->s_type.SF.s_release_origin;
194	size_t release = 0;
195
196	while ((caddr_t)((ulong_t)str->s_current.l_data.sp & ALIGNED) -
197	    (origin + release) >= DEFAULT_RELEASE_SIZE)
198		release += DEFAULT_RELEASE_SIZE;
199
200	if (release == 0)
201		return;
202
203	if (madvise(origin, release, MADV_DONTNEED) == -1)
204		warn(gettext("madvise failed"));
205
206	str->s_type.SF.s_release_origin += release;
207}
208
209const stream_ops_t stream_mmap_ops = {
210	stream_mmap_is_closable,
211	stream_mmap_close,
212	stream_mmap_eos,
213	stream_mmap_fetch,
214	NULL,
215	stream_mmap_free,
216	NULL,
217	stream_mmap_prime,
218	NULL,
219	stream_mmap_release_line,
220	NULL,
221	stream_stdio_unlink
222};
223