/*
 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
 * All rights reserved
 *
 * Copyright 2020 Joyent, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"
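
/*
 * Thread pool layout (see l9p_threadpool_init() below): a single
 * responder thread drains the reply queue (ltp_replyq) and transmits
 * each completed reply, while the worker threads pull requests off the
 * work queue (ltp_workq), dispatch them, and hand the results to the
 * responder.  Both queues, and the per-request work/flush state, are
 * protected by the single pool mutex ltp_mtx.
 */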

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

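/*
 * The responder thread: take each request off the reply queue and
 * send its reply.  If the request was the target of one or more
 * Tflush requests, queue those flushers for reply as well (see
 * l9p_threadpool_rflush() below).
 */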
static void *
l9p_responder(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	for (;;) {
		/* get next reply to send */

		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			break;
		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting) {
			(void) pthread_cond_wait(&tp->ltp_reply_cv,
			    &tp->ltp_mtx);
		}
		if (worker->ltw_exiting) {
			(void) pthread_mutex_unlock(&tp->ltp_mtx);
			break;
		}

		/* off reply queue */
		req = STAILQ_FIRST(&tp->ltp_replyq);
		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

		/* request is now in final glide path, can't be Tflush-ed */
		req->lr_workstate = L9P_WS_REPLYING;

		/* any flushers waiting for this request can go now */
		if (req->lr_flushstate != L9P_FLUSH_NONE)
			l9p_threadpool_rflush(tp, req);

		if (pthread_mutex_unlock(&tp->ltp_mtx) != 0)
			break;

		/* send response */
		l9p_respond(req, false, true);
	}
	return (NULL);
}

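/*
 * A worker thread: pull the next request off the work queue, run it
 * via l9p_dispatch_request(), then move it to the reply queue and
 * wake the responder.
 */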
static void *
l9p_worker(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
		return (NULL);
	for (;;) {
		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting) {
			(void) pthread_cond_wait(&tp->ltp_work_cv,
			    &tp->ltp_mtx);
		}
		if (worker->ltw_exiting)
			break;

		/* off work queue; now work-in-progress, by us */
		req = STAILQ_FIRST(&tp->ltp_workq);
		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
		req->lr_workstate = L9P_WS_INPROGRESS;
		req->lr_worker = worker;
		(void) pthread_mutex_unlock(&tp->ltp_mtx);

		/* actually try the request */
		req->lr_error = l9p_dispatch_request(req);

		/* move to responder queue, updating work-state */
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			return (NULL);
		req->lr_workstate = L9P_WS_RESPQUEUED;
		req->lr_worker = NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

		/* signal the responder */
		(void) pthread_cond_signal(&tp->ltp_reply_cv);
	}
	(void) pthread_mutex_unlock(&tp->ltp_mtx);
	return (NULL);
}

/*
 * Just before finally replying to a request that got touched by
 * a Tflush request, we enqueue its flushers (requests of type
 * Tflush, which are now on the flushee's lr_flushq) onto the
 * response queue.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
	struct l9p_request *flusher;

	/*
	 * https://swtch.com/plan9port/man/man9/flush.html says:
	 *
	 * "Should multiple Tflushes be received for a pending
	 * request, they must be answered in order.  A Rflush for
	 * any of the multiple Tflushes implies an answer for all
	 * previous ones.  Therefore, should a server receive a
	 * request and then multiple flushes for that request, it
	 * need respond only to the last flush."  This means
	 * we could march through the queue of flushers here,
	 * marking all but the last one as "to be dropped" rather
	 * than "to be replied-to".
	 *
	 * However, we'll leave that for later, if ever -- it
	 * should be harmless to respond to each, in order.
	 */
	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
		flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
		if (not the last) {
			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
			/* or, flusher->lr_drop = true ? */
		}
#endif
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
	}
}

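/*
 * Create the thread pool: one responder thread plus "size" worker
 * threads, all sharing ltp_mtx and the two queues.  Returns 0 as long
 * as the responder and at least one worker could be created; otherwise
 * tears down whatever was created and returns an error (errno value).
 */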
int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
	struct l9p_worker *worker;
#if defined(__FreeBSD__)
	char threadname[16];
#endif
	int error;
	int i, nworkers, nresponders;

	if (size <= 0)
		return (EINVAL);
#ifdef __illumos__
	pthread_mutexattr_t attr;

	if ((error = pthread_mutexattr_init(&attr)) != 0)
		return (error);
	if ((error = pthread_mutexattr_settype(&attr,
	    PTHREAD_MUTEX_ERRORCHECK)) != 0) {
		return (error);
	}
	error = pthread_mutex_init(&tp->ltp_mtx, &attr);
#else
	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
#endif
	if (error)
		return (error);
	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
	if (error)
		goto fail_work_cv;
	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
	if (error)
		goto fail_reply_cv;

	STAILQ_INIT(&tp->ltp_workq);
	STAILQ_INIT(&tp->ltp_replyq);
	LIST_INIT(&tp->ltp_workers);

	nresponders = 0;
	nworkers = 0;
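	/*
	 * Note that the loop below runs size+1 times: slot 0 becomes
	 * the responder thread, and slots 1..size become the workers
	 * (hence the "i - 1" in the worker thread names).
	 */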
	for (i = 0; i <= size; i++) {
		worker = calloc(1, sizeof(struct l9p_worker));
#ifdef __illumos__
		if (worker == NULL)
			break;
#endif
		worker->ltw_tp = tp;
		worker->ltw_responder = i == 0;
		error = pthread_create(&worker->ltw_thread, NULL,
		    worker->ltw_responder ? l9p_responder : l9p_worker,
		    (void *)worker);
		if (error) {
			free(worker);
			break;
		}
		if (worker->ltw_responder)
			nresponders++;
		else
			nworkers++;

#if defined(__FreeBSD__)
		if (worker->ltw_responder) {
			pthread_set_name_np(worker->ltw_thread, "9p-responder");
		} else {
			sprintf(threadname, "9p-worker:%d", i - 1);
			pthread_set_name_np(worker->ltw_thread, threadname);
		}
#elif defined(__illumos__)
		if (worker->ltw_responder) {
			(void) pthread_setname_np(worker->ltw_thread,
			    "9p-responder");
		} else {
			char threadname[PTHREAD_MAX_NAMELEN_NP];

			(void) snprintf(threadname, sizeof (threadname),
			    "9p-worker:%d", i - 1);
			(void) pthread_setname_np(worker->ltw_thread,
			    threadname);
		}
#endif

		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
	}
	if (nresponders == 0 || nworkers == 0) {
		/* need the one responder, and at least one worker */
		l9p_threadpool_shutdown(tp);
		return (error);
	}
	return (0);

	/*
	 * We could avoid these labels by having multiple destroy
	 * paths (one for each error case), or by having booleans
	 * for which variables were initialized.  Neither is very
	 * appealing...
	 */
fail_reply_cv:
	(void) pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
	(void) pthread_mutex_destroy(&tp->ltp_mtx);

	return (error);
}

/*
 * Run a request, usually by queueing it.
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

	/*
	 * Flush requests must be handled specially, since they
	 * can cancel / kill off regular requests.  (But we can
	 * run them through the regular dispatch mechanism.)
	 */
	if (req->lr_req.hdr.type == L9P_TFLUSH) {
		/* not on a work queue yet so we can touch state */
		req->lr_workstate = L9P_WS_IMMEDIATE;
		(void) l9p_dispatch_request(req);
	} else {
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			return;
		req->lr_workstate = L9P_WS_NOTSTARTED;
		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
		(void) pthread_cond_signal(&tp->ltp_work_cv);
		(void) pthread_mutex_unlock(&tp->ltp_mtx);
	}
}

/*
 * Run a Tflush request.  It is dispatched via l9p_dispatch_request()
 * (since that has some debug code in it), but it is not called from a
 * worker thread: Tflush is handled immediately in l9p_threadpool_run().
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
	struct l9p_connection *conn;
	struct l9p_threadpool *tp;
	struct l9p_request *flushee;
	uint16_t oldtag;
	enum l9p_flushstate nstate = L9P_FLUSH_NONE;
	int err;

	/*
	 * Find what we're supposed to flush (the flushee, as it were).
	 */
	req->lr_error = 0;	/* Tflush always succeeds */
	conn = req->lr_conn;
	tp = &conn->lc_tp;
	oldtag = req->lr_req.tflush.oldtag;
	if ((err = ht_wrlock(&conn->lc_requests)) != 0)
		return (err);
	flushee = ht_find_locked(&conn->lc_requests, oldtag);
	if (flushee == NULL) {
		/*
		 * Nothing to flush!  The old request must have
		 * been done and gone already.  Just queue this
		 * Tflush for a success reply.
		 */
		(void) ht_unlock(&conn->lc_requests);
		if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0)
			return (err);
		goto done;
	}

	/*
	 * Found the original request.  We'll need to inspect its
	 * work-state to figure out what to do.
	 */
	if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0) {
		(void) ht_unlock(&conn->lc_requests);
		return (err);
	}
	(void) ht_unlock(&conn->lc_requests);

	switch (flushee->lr_workstate) {

	case L9P_WS_NOTSTARTED:
		/*
		 * Flushee is on work queue, but not yet being
		 * handled by a worker.
		 *
		 * The documentation -- see
		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
		 * https://swtch.com/plan9port/man/man9/flush.html
		 * -- says that "the server should answer the
		 * flush message immediately".  However, Linux
		 * sends flush requests for operations that
		 * must finish, such as Tclunk, and it's not
		 * possible to *answer* the flush request until
		 * it has been handled (if necessary) or aborted
		 * (if allowed).
		 *
		 * We therefore now just mark the original request
		 * and let the request-handler do whatever is
		 * appropriate.  NOTE: we could have a table of
		 * "requests that can be aborted without being
		 * run" vs "requests that must be run to be
		 * aborted", but for now that seems like an
		 * unnecessary complication.
		 */
		nstate = L9P_FLUSH_REQUESTED_PRE_START;
		break;

	case L9P_WS_IMMEDIATE:
		/*
		 * This state only applies to Tflush requests, and
		 * flushing a Tflush is illegal.  But we'll do nothing
		 * special here, which will make us act like a flush
		 * request for the flushee that arrived too late to
		 * do anything about the flushee.
		 */
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_INPROGRESS:
		/*
		 * Worker thread flushee->lr_worker is working on it.
		 * Kick it to get it out of blocking system calls.
		 * (This requires that it carefully set up some
		 * signal handlers, and may be FreeBSD-dependent;
		 * it probably cannot be handled this way on macOS.)
		 */
#ifdef notyet
		pthread_kill(...);
#endif
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_RESPQUEUED:
		/*
		 * The flushee is already in the response queue.
		 * We'll just mark it as having had some flush
		 * action applied.
		 */
		nstate = L9P_FLUSH_TOOLATE;
		break;

	case L9P_WS_REPLYING:
		/*
		 * Although we found the flushee, it's too late to
		 * make us depend on it: it's already heading out
		 * the door as a reply.
		 *
		 * We don't want to do anything to the flushee.
		 * Instead, we want to work the same way as if
		 * we had never found the tag.
		 */
		goto done;
	}

	/*
	 * Now add us to the list of Tflush-es that are waiting
	 * for the flushee (creating the list if needed, i.e., if
	 * this is the first Tflush for the flushee).  We (req)
	 * will get queued for reply later, when the responder
	 * processes the flushee and calls l9p_threadpool_rflush().
	 */
	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
		STAILQ_INIT(&flushee->lr_flushq);
	flushee->lr_flushstate = nstate;
	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

	(void) pthread_mutex_unlock(&tp->ltp_mtx);

	return (0);

done:
	/*
	 * This immediate op is ready to be replied-to now, so just
	 * stick it onto the reply queue.
	 */
	req->lr_workstate = L9P_WS_RESPQUEUED;
	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
	(void) pthread_mutex_unlock(&tp->ltp_mtx);
	(void) pthread_cond_signal(&tp->ltp_reply_cv);
	return (0);
}

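/*
 * Shut the pool down: ask every thread (responder and workers) to
 * exit, join each one, and then destroy the shared condition
 * variables and mutex.  Always returns 0.
 */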
int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
	struct l9p_worker *worker, *tmp;

	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			continue;
		worker->ltw_exiting = true;
		if (worker->ltw_responder)
			(void) pthread_cond_signal(&tp->ltp_reply_cv);
		else
			(void) pthread_cond_broadcast(&tp->ltp_work_cv);
		(void) pthread_mutex_unlock(&tp->ltp_mtx);
		(void) pthread_join(worker->ltw_thread, NULL);
		LIST_REMOVE(worker, ltw_link);
		free(worker);
	}
	(void) pthread_cond_destroy(&tp->ltp_reply_cv);
	(void) pthread_cond_destroy(&tp->ltp_work_cv);
	(void) pthread_mutex_destroy(&tp->ltp_mtx);

	return (0);
}