xref: /illumos-gate/usr/src/lib/mergeq/workq.c (revision bc1f688b)
1 /*
2  * This file and its contents are supplied under the terms of the
3  * Common Development and Distribution License ("CDDL"), version 1.0.
4  * You may only use this file in accordance with the terms of version
5  * 1.0 of the CDDL.
6  *
7  * A full copy of the text of the CDDL should have accompanied this
8  * source.  A copy of the CDDL is also available via the Internet at
9  * http://www.illumos.org/license/CDDL.
10  */
11 
12 /*
13  * Copyright 2015 Joyent, Inc.
14  */
15 
16 /*
17  * Work queue
18  *
19  * A multi-threaded work queue.
20  *
21  * The general design of this is to add a fixed number of items to the queue and
22  * then drain them with the specified number of threads.
23  */
24 
25 #include <strings.h>
26 #include <sys/debug.h>
27 #include <thread.h>
28 #include <synch.h>
29 #include <errno.h>
30 #include <limits.h>
31 #include <stdlib.h>
32 
33 #include "workq.h"
34 
35 struct workq {
36 	mutex_t wq_lock;	/* Protects below items */
37 	cond_t wq_cond;		/* Condition variable */
38 	void **wq_items;	/* Array of items to process */
39 	size_t wq_nitems;	/* Number of items in queue */
40 	size_t wq_cap;		/* Queue capacity */
41 	size_t wq_next;		/* Next item to process */
42 	uint_t wq_ndthreads;	/* Desired number of threads */
43 	thread_t *wq_thrs;	/* Actual threads */
44 	workq_proc_f *wq_func;	/* Processing function */
45 	void *wq_arg;		/* Argument for processing */
46 	boolean_t wq_working;	/* Are we actively using it? */
47 	boolean_t wq_iserror;	/* Have we encountered an error? */
48 	int wq_error;		/* Error value, if any */
49 };
50 
51 #define	WORKQ_DEFAULT_CAP	64
52 
53 static int
workq_error(int err)54 workq_error(int err)
55 {
56 	VERIFY(err != 0);
57 	errno = err;
58 	return (WORKQ_ERROR);
59 }
60 
61 void
workq_fini(workq_t * wqp)62 workq_fini(workq_t *wqp)
63 {
64 	if (wqp == NULL)
65 		return;
66 
67 	VERIFY(wqp->wq_working != B_TRUE);
68 	VERIFY0(mutex_destroy(&wqp->wq_lock));
69 	VERIFY0(cond_destroy(&wqp->wq_cond));
70 	if (wqp->wq_cap > 0)
71 		workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
72 	if (wqp->wq_ndthreads > 0)
73 		workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads);
74 	workq_free(wqp, sizeof (workq_t));
75 }
76 
77 int
workq_init(workq_t ** outp,uint_t nthrs)78 workq_init(workq_t **outp, uint_t nthrs)
79 {
80 	int ret;
81 	workq_t *wqp;
82 
83 	wqp = workq_alloc(sizeof (workq_t));
84 	if (wqp == NULL)
85 		return (workq_error(ENOMEM));
86 
87 	bzero(wqp, sizeof (workq_t));
88 	wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP);
89 	if (wqp->wq_items == NULL) {
90 		workq_free(wqp, sizeof (workq_t));
91 		return (workq_error(ENOMEM));
92 	}
93 	bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
94 
95 	wqp->wq_ndthreads = nthrs - 1;
96 	if (wqp->wq_ndthreads > 0) {
97 		wqp->wq_thrs = workq_alloc(sizeof (thread_t) *
98 		    wqp->wq_ndthreads);
99 		if (wqp->wq_thrs == NULL) {
100 			workq_free(wqp->wq_items, sizeof (void *) *
101 			    WORKQ_DEFAULT_CAP);
102 			workq_free(wqp, sizeof (workq_t));
103 			return (workq_error(ENOMEM));
104 		}
105 	}
106 
107 	if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
108 	    NULL)) != 0) {
109 		if (wqp->wq_ndthreads > 0) {
110 			workq_free(wqp->wq_thrs,
111 			    sizeof (thread_t) * wqp->wq_ndthreads);
112 		}
113 		workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
114 		workq_free(wqp, sizeof (workq_t));
115 		return (workq_error(ret));
116 	}
117 
118 	if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) {
119 		VERIFY0(mutex_destroy(&wqp->wq_lock));
120 		if (wqp->wq_ndthreads > 0) {
121 			workq_free(wqp->wq_thrs,
122 			    sizeof (thread_t) * wqp->wq_ndthreads);
123 		}
124 		workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
125 		workq_free(wqp, sizeof (workq_t));
126 		return (workq_error(ret));
127 	}
128 
129 	wqp->wq_cap = WORKQ_DEFAULT_CAP;
130 	*outp = wqp;
131 	return (0);
132 }
133 
134 static void
workq_reset(workq_t * wqp)135 workq_reset(workq_t *wqp)
136 {
137 	VERIFY(MUTEX_HELD(&wqp->wq_lock));
138 	VERIFY(wqp->wq_working == B_FALSE);
139 	if (wqp->wq_cap > 0)
140 		bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
141 	wqp->wq_nitems = 0;
142 	wqp->wq_next = 0;
143 	wqp->wq_func = NULL;
144 	wqp->wq_arg = NULL;
145 	wqp->wq_iserror = B_FALSE;
146 	wqp->wq_error = 0;
147 }
148 
149 static int
workq_grow(workq_t * wqp)150 workq_grow(workq_t *wqp)
151 {
152 	size_t ncap;
153 	void **items;
154 
155 	VERIFY(MUTEX_HELD(&wqp->wq_lock));
156 	VERIFY(wqp->wq_working == B_FALSE);
157 
158 	if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP)
159 		return (ENOSPC);
160 
161 	ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP;
162 	items = workq_alloc(ncap * sizeof (void *));
163 	if (items == NULL)
164 		return (ENOMEM);
165 
166 	bzero(items, ncap * sizeof (void *));
167 	bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *));
168 	workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
169 	wqp->wq_items = items;
170 	wqp->wq_cap = ncap;
171 	return (0);
172 }
173 
174 int
workq_add(workq_t * wqp,void * item)175 workq_add(workq_t *wqp, void *item)
176 {
177 	VERIFY0(mutex_lock(&wqp->wq_lock));
178 	if (wqp->wq_working == B_TRUE) {
179 		VERIFY0(mutex_unlock(&wqp->wq_lock));
180 		return (workq_error(ENXIO));
181 	}
182 
183 	if (wqp->wq_nitems == wqp->wq_cap) {
184 		int ret;
185 
186 		if ((ret = workq_grow(wqp)) != 0) {
187 			VERIFY0(mutex_unlock(&wqp->wq_lock));
188 			return (workq_error(ret));
189 		}
190 	}
191 
192 	wqp->wq_items[wqp->wq_nitems] = item;
193 	wqp->wq_nitems++;
194 
195 	VERIFY0(mutex_unlock(&wqp->wq_lock));
196 
197 	return (0);
198 }
199 
200 static void *
workq_pop(workq_t * wqp)201 workq_pop(workq_t *wqp)
202 {
203 	void *out;
204 
205 	VERIFY(MUTEX_HELD(&wqp->wq_lock));
206 	VERIFY(wqp->wq_next < wqp->wq_nitems);
207 
208 	out = wqp->wq_items[wqp->wq_next];
209 	wqp->wq_items[wqp->wq_next] = NULL;
210 	wqp->wq_next++;
211 
212 	return (out);
213 }
214 
215 static void *
workq_thr_work(void * arg)216 workq_thr_work(void *arg)
217 {
218 	workq_t *wqp = arg;
219 
220 	VERIFY0(mutex_lock(&wqp->wq_lock));
221 	VERIFY(wqp->wq_working == B_TRUE);
222 
223 	for (;;) {
224 		int ret;
225 		void *item;
226 
227 		if (wqp->wq_iserror == B_TRUE ||
228 		    wqp->wq_next == wqp->wq_nitems) {
229 			VERIFY0(mutex_unlock(&wqp->wq_lock));
230 			return (NULL);
231 		}
232 
233 		item = workq_pop(wqp);
234 
235 		VERIFY0(mutex_unlock(&wqp->wq_lock));
236 		ret = wqp->wq_func(item, wqp->wq_arg);
237 		VERIFY0(mutex_lock(&wqp->wq_lock));
238 
239 		if (ret != 0) {
240 			if (wqp->wq_iserror == B_FALSE) {
241 				wqp->wq_iserror = B_TRUE;
242 				wqp->wq_error = ret;
243 			}
244 			VERIFY0(mutex_unlock(&wqp->wq_lock));
245 			return (NULL);
246 		}
247 	}
248 }
249 
250 int
workq_work(workq_t * wqp,workq_proc_f * func,void * arg,int * errp)251 workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp)
252 {
253 	int i, ret;
254 	boolean_t seterr = B_FALSE;
255 
256 	if (wqp == NULL || func == NULL)
257 		return (workq_error(EINVAL));
258 
259 	VERIFY0(mutex_lock(&wqp->wq_lock));
260 	if (wqp->wq_working == B_TRUE) {
261 		VERIFY0(mutex_unlock(&wqp->wq_lock));
262 		return (workq_error(EBUSY));
263 	}
264 
265 	if (wqp->wq_nitems == 0) {
266 		workq_reset(wqp);
267 		VERIFY0(mutex_unlock(&wqp->wq_lock));
268 		return (0);
269 	}
270 
271 	wqp->wq_func = func;
272 	wqp->wq_arg = arg;
273 	wqp->wq_next = 0;
274 	wqp->wq_working = B_TRUE;
275 
276 	ret = 0;
277 	for (i = 0; i < wqp->wq_ndthreads; i++) {
278 		ret = thr_create(NULL, 0, workq_thr_work, wqp, 0,
279 		    &wqp->wq_thrs[i]);
280 		if (ret != 0) {
281 			wqp->wq_iserror = B_TRUE;
282 		}
283 	}
284 
285 	VERIFY0(mutex_unlock(&wqp->wq_lock));
286 	if (ret == 0)
287 		(void) workq_thr_work(wqp);
288 
289 	for (i = 0; i < wqp->wq_ndthreads; i++) {
290 		VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL));
291 	}
292 
293 	VERIFY0(mutex_lock(&wqp->wq_lock));
294 	wqp->wq_working = B_FALSE;
295 	if (ret == 0 && wqp->wq_iserror == B_TRUE) {
296 		ret = WORKQ_UERROR;
297 		if (errp != NULL)
298 			*errp = wqp->wq_error;
299 	} else if (ret != 0) {
300 		VERIFY(wqp->wq_iserror == B_FALSE);
301 		seterr = B_TRUE;
302 	}
303 
304 	workq_reset(wqp);
305 	VERIFY0(mutex_unlock(&wqp->wq_lock));
306 
307 	if (seterr == B_TRUE)
308 		return (workq_error(ret));
309 
310 	return (ret);
311 }
312