1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
24  */
25 
26 /*
27  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
28  * Use is subject to license terms.
29  */
30 
31 /*
32  *
33  * MODULE: dapl_ring_buffer_util.c
34  *
35  * PURPOSE: Ring buffer management
36  * Description: Support and management functions for ring buffers
37  *
38  * $Id: dapl_ring_buffer_util.c,v 1.9 2003/07/08 14:23:35 sjs2 Exp $
39  */
40 
41 #include "dapl_ring_buffer_util.h"
42 
43 /*
44  * dapls_rbuf_alloc
45  *
46  * Given a DAPL_RING_BUFFER, initialize it and provide memory for
47  * the ringbuf itself. A passed in size will be adjusted to the next
48  * largest power of two number to simplify management.
49  *
50  * Input:
51  *	rbuf		pointer to DAPL_RING_BUFFER
52  *	size		number of elements to allocate & manage
53  *
54  * Output:
55  *	none
56  *
57  * Returns:
58  *	DAT_SUCCESS
59  *	DAT_INSUFFICIENT_RESOURCES
60  *
61  */
62 DAT_RETURN
dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf,IN DAT_COUNT size)63 dapls_rbuf_alloc(
64 	INOUT	DAPL_RING_BUFFER	*rbuf,
65 	IN	DAT_COUNT		 size)
66 {
67 	unsigned int			rsize;	/* real size */
68 
69 	/*
70 	 * The circular buffer must be allocated one too large.
71 	 * This eliminates any need for a distinct counter, as that
72 	 * having the two pointers equal always means "empty" -- never "full"
73 	 */
74 	size++;
75 
76 	/* Put size on a power of 2 boundary */
77 	rsize = 1;
78 	while ((DAT_COUNT)rsize < size) {
79 		rsize <<= 1;
80 	}
81 
82 	rbuf->base = (void *) dapl_os_alloc(rsize * sizeof (void *));
83 	if (rbuf->base != NULL) {
84 		rbuf->lim = rsize - 1;
85 		rbuf->head = 0;
86 		rbuf->tail = 0;
87 		dapl_os_lock_init(&rbuf->lock);
88 	} else {
89 		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
90 	}
91 
92 	return (DAT_SUCCESS);
93 }
94 
95 
96 /*
97  * dapls_rbuf_realloc
98  *
99  * Resizes an empty DAPL_RING_BUFFER. This function is not thread safe;
100  * adding or removing elements from a ring buffer while resizing
101  * will have indeterminate results.
102  *
103  * Input:
104  *	rbuf		pointer to DAPL_RING_BUFFER
105  *	size		number of elements to allocate & manage
106  *
107  * Output:
108  *	none
109  *
110  * Returns:
111  *	DAT_SUCCESS
112  *	DAT_INVALID_STATE
113  *	DAT_INSUFFICIENT_RESOURCES
114  *
115  */
116 DAT_RETURN
dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf,IN DAT_COUNT size)117 dapls_rbuf_realloc(
118 	INOUT	DAPL_RING_BUFFER	*rbuf,
119 	IN	DAT_COUNT		 size)
120 {
121 	int			rsize;		/* real size */
122 	DAT_RETURN		dat_status;
123 
124 	dat_status = DAT_SUCCESS;
125 
126 	/* if the ring buffer is not empty */
127 	if (rbuf->head != rbuf->tail) {
128 		dat_status = DAT_ERROR(DAT_INVALID_STATE, 0);
129 		goto bail;
130 	}
131 
132 	/* Put size on a power of 2 boundary */
133 	rsize = 1;
134 	while (rsize < size) {
135 		rsize <<= 1;
136 	}
137 
138 	rbuf->base = (void *)dapl_os_realloc(rbuf->base,
139 	    rsize * sizeof (void *));
140 	if (NULL == rbuf->base) {
141 		dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,
142 		    DAT_RESOURCE_MEMORY);
143 		goto bail;
144 	}
145 
146 	rbuf->lim = rsize - 1;
147 
148 bail:
149 	return (dat_status);
150 }
151 
152 
153 /*
154  * dapls_rbuf_destroy
155  *
156  * Release the buffer and reset pointers to a DAPL_RING_BUFFER
157  *
158  * Input:
159  *	rbuf		pointer to DAPL_RING_BUFFER
160  *
161  * Output:
162  *	none
163  *
164  * Returns:
165  *	none
166  *
167  */
168 void
dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf)169 dapls_rbuf_destroy(
170 	IN  DAPL_RING_BUFFER		*rbuf)
171 {
172 	if ((NULL == rbuf) ||
173 	    (NULL == rbuf->base)) {
174 		return;
175 	}
176 
177 	dapl_os_lock_destroy(&rbuf->lock);
178 	dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof (void *));
179 	rbuf->base = NULL;
180 	rbuf->lim = 0;
181 }
182 
183 /*
184  * dapls_rbuf_add
185  *
186  * Add an entry to the ring buffer
187  *
188  * Input:
189  *	rbuf		pointer to DAPL_RING_BUFFER
190  *	entry		entry to add
191  *
192  * Output:
193  *	none
194  *
195  * Returns:
196  *	DAT_SUCCESS
197  *	DAT_INSUFFICIENT_RESOURCES         (queue full)
198  *
199  */
200 DAT_RETURN
dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf,IN void * entry)201 dapls_rbuf_add(
202 	IN  DAPL_RING_BUFFER		*rbuf,
203 	IN  void			*entry)
204 {
205 	DAPL_ATOMIC		pos;
206 
207 	dapl_os_lock(&rbuf->lock);
208 	pos = rbuf->head;
209 	if (((pos + 1) & rbuf->lim) != rbuf->tail) {
210 		rbuf->base[pos] = entry;
211 		rbuf->head = (pos + 1) & rbuf->lim;
212 		dapl_os_unlock(&rbuf->lock);
213 		return (DAT_SUCCESS);
214 	}
215 
216 	dapl_os_unlock(&rbuf->lock);
217 	return (DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY));
218 }
219 
220 
221 /*
222  * dapls_rbuf_remove
223  *
224  * Remove an entry from the ring buffer
225  *
226  * Input:
227  *	rbuf		pointer to DAPL_RING_BUFFER
228  *
229  * Output:
230  *	entry		entry removed from the ring buffer
231  *
232  * Returns:
233  *	a pointer to a buffer entry
234  */
235 void *
dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf)236 dapls_rbuf_remove(
237 	IN  DAPL_RING_BUFFER	*rbuf)
238 {
239 	DAPL_ATOMIC		pos;
240 
241 	dapl_os_lock(&rbuf->lock);
242 	if (rbuf->head != rbuf->tail) {
243 		pos = rbuf->tail;
244 		rbuf->tail = (pos + 1) & rbuf->lim;
245 		dapl_os_unlock(&rbuf->lock);
246 		return (rbuf->base[pos]);
247 	}
248 
249 	dapl_os_unlock(&rbuf->lock);
250 	return (NULL);
251 }
252 
253 
254 /*
255  * dapli_rbuf_count
256  *
257  * Return the number of entries in use in the ring buffer
258  *
259  * Input:
260  *	rbuf		pointer to DAPL_RING_BUFFER
261  *
262  * Output:
263  *	none
264  *
265  * Returns:
266  *	count of entries
267  *
268  */
269 DAT_COUNT
dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf)270 dapls_rbuf_count(
271 	IN DAPL_RING_BUFFER *rbuf)
272 {
273 	int head;
274 	int tail;
275 
276 	dapl_os_lock(&rbuf->lock);
277 	head = rbuf->head;
278 	tail = rbuf->tail;
279 	dapl_os_unlock(&rbuf->lock);
280 	if (head == tail)
281 		return (0);
282 	if (head > tail)
283 		return (head - tail);
284 	/* add 1 to lim as it is a mask, number of entries - 1 */
285 	return ((rbuf->lim + 1 - tail + head));
286 }
287