1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License").  You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22/*
23 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24 * Use is subject to license terms.
25 */
26
27/*
28 * A synchronized FIFO queue for inter-thread producer-consumer semantics.
29 * This queue will handle multiple writers and readers simultaneously.
30 *
31 * The following operations are provided:
32 * slp_new_queue:	create a new queue
33 * slp_enqueue:		place a message at the end of the queue
34 * slp_enqueue_at_head:	place a message the the start of the queue
35 * slp_dequeue:		remove and return the next message on the queue
36 *				(waits indefinately)
37 * slp_dequeue_timed:	remove and return the next message on the queue
38 *				(waits only for a specified time)
39 * slp_flush_queue:	flushes and frees all messages on a queue
40 * slp_destroy_queue:	frees an empty queue.
41 */
42
43#include <stdio.h>
44#include <stdlib.h>
45#include <thread.h>
46#include <synch.h>
47#include <syslog.h>
48#include <slp.h>
49#include <slp-internal.h>
50
51/* Private implementation details */
52struct queue_entry {
53	void *msg;
54	struct queue_entry *next;
55};
56typedef struct queue_entry slp_queue_entry_t;
57
58struct queue {
59	slp_queue_entry_t *head;
60	slp_queue_entry_t *tail;
61	mutex_t *lock;
62	cond_t *wait;
63	int count;
64};
65
66/*
67 * Creates, initializes, and returns a new queue.
68 * If an initialization error occured, returns NULL and sets err to
69 * the appropriate SLP error code.
70 * queues can operate in one of two modes: timed-wait, and infinite
71 * wait. The timeout parameter specifies which of these modes should
72 * be enabled for the new queue.
73 */
74slp_queue_t *slp_new_queue(SLPError *err) {
75	mutex_t *lock;
76	cond_t *wait;
77	struct queue *q;
78
79	*err = SLP_OK;
80
81	/* initialize new mutex and semaphore */
82	if ((lock = calloc(1, sizeof (*lock))) == NULL) {
83		*err = SLP_MEMORY_ALLOC_FAILED;
84		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
85		return (NULL);
86	}
87
88	/* intialize condition vars */
89	if (!(wait = calloc(1, sizeof (*wait)))) {
90		*err = SLP_MEMORY_ALLOC_FAILED;
91		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
92		return (NULL);
93	}
94	(void) cond_init(wait, USYNC_THREAD, NULL);
95
96	/* create the queue */
97	if ((q = malloc(sizeof (*q))) == NULL) {
98		*err = SLP_MEMORY_ALLOC_FAILED;
99		slp_err(LOG_CRIT, 0, "slp_new_queue", "out of memory");
100		return (NULL);
101	}
102
103	q->head = NULL;
104	q->lock = lock;
105	q->wait = wait;
106	q->count = 0;
107
108	return (q);
109}
110
111/*
112 * Adds msg to the tail of queue q.
113 * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED
114 * if it couldn't allocate memory.
115 */
116SLPError slp_enqueue(slp_queue_t *qa, void *msg) {
117	slp_queue_entry_t *qe;
118	struct queue *q = qa;
119
120	if ((qe = malloc(sizeof (*qe))) == NULL) {
121		slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
122		return (SLP_MEMORY_ALLOC_FAILED);
123	}
124
125	(void) mutex_lock(q->lock);
126	qe->msg = msg;
127	qe->next = NULL;
128	if (q->head != NULL) {	/* queue is not emptry */
129		q->tail->next = qe;
130		q->tail = qe;
131	} else {		/* queue is empty */
132		q->head = q->tail = qe;
133	}
134	q->count++;
135	(void) mutex_unlock(q->lock);
136	(void) cond_signal(q->wait);
137
138	return (SLP_OK);
139}
140
141/*
142 * Inserts a message at the head of the queue. This is useful for inserting
143 * things like cancel messages.
144 */
145SLPError slp_enqueue_at_head(slp_queue_t *qa, void *msg) {
146	slp_queue_entry_t *qe;
147	struct queue *q = qa;
148
149	if ((qe = malloc(sizeof (*qe))) == NULL) {
150		slp_err(LOG_CRIT, 0, "slp_enqueue", "out of memory");
151		return (SLP_MEMORY_ALLOC_FAILED);
152	}
153
154	(void) mutex_lock(q->lock);
155	qe->msg = msg;
156	qe->next = q->head;
157	q->head = qe;
158
159	q->count++;
160	(void) mutex_unlock(q->lock);
161	(void) cond_signal(q->wait);
162
163	return (SLP_OK);
164}
165
166/*
167 * The core functionality for dequeue.
168 */
169static void *dequeue_nolock(struct queue *q) {
170	void *msg;
171	slp_queue_entry_t *qe = q->head;
172
173	if (!qe)
174		return (NULL);	/* shouldn't get here */
175	msg = qe->msg;
176	if (!qe->next)		/* last one in queue */
177		q->head = q->tail = NULL;
178	else
179		q->head = qe->next;
180	free(qe);
181	q->count--;
182	return (msg);
183}
184
185/*
186 * Returns the first message waiting or arriving in the queue, or if no
187 * message is available after waiting the amount of time specified in
188 * 'to', returns NULL, and sets 'etimed' to true. If an error occured,
189 * returns NULL and sets 'etimed' to false.
190 */
191void *slp_dequeue_timed(slp_queue_t *qa, timestruc_t *to, SLPBoolean *etimed) {
192	int err;
193	void *ans;
194	struct queue *q = qa;
195
196	if (etimed)
197		*etimed = SLP_FALSE;
198
199	(void) mutex_lock(q->lock);
200	if (q->count > 0) {
201		/* something's in the q, so no need to wait */
202		goto msg_available;
203	}
204
205	/* else wait */
206	while (q->count == 0) {
207		if (to) {
208			err = cond_timedwait(q->wait, q->lock, to);
209		} else {
210			err = cond_wait(q->wait, q->lock);
211		}
212		if (err == ETIME) {
213			(void) mutex_unlock(q->lock);
214			*etimed = SLP_TRUE;
215			return (NULL);
216		}
217	}
218
219msg_available:
220	ans = dequeue_nolock(q);
221	(void) mutex_unlock(q->lock);
222	return (ans);
223}
224
225/*
226 * Removes the first message from the queue and returns it.
227 * Returns NULL only on internal error.
228 */
229void *slp_dequeue(slp_queue_t *qa) {
230	return (slp_dequeue_timed(qa, NULL, NULL));
231}
232
233/*
234 * Flushes the queue, using the caller-specified free function to
235 * free each message in the queue.
236 */
237void slp_flush_queue(slp_queue_t *qa, void (*free_f)(void *)) {
238	slp_queue_entry_t *p, *pn;
239	struct queue *q = qa;
240
241	for (p = q->head; p; p = pn) {
242		pn = p->next;
243		free_f(p);
244	}
245}
246
247/*
248 * Frees a queue.
249 * The queue must be empty before it can be destroyed; slp_flush_queue
250 * can be used to empty a queue.
251 */
252void slp_destroy_queue(slp_queue_t *qa) {
253	struct queue *q = qa;
254
255	(void) mutex_destroy(q->lock);
256	(void) cond_destroy(q->wait);
257	free(q->lock);
258	free(q->wait);
259	free(q);
260}
261