1 /*
2 * Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
3 * All rights reserved
4 *
5 * Copyright 2020 Joyent, Inc.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted providing that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
24 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
25 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 *
28 */
29
30 #include <errno.h>
31 #include <stdlib.h>
32 #include <pthread.h>
33 #if defined(__FreeBSD__)
34 #include <pthread_np.h>
35 #endif
36 #include <sys/queue.h>
37 #include "lib9p.h"
38 #include "threadpool.h"
39
40 static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
41 struct l9p_request *req);
42
/*
 * Responder thread main loop.
 *
 * There is exactly one responder per pool (created as worker index 0
 * in l9p_threadpool_init), so replies are transmitted one at a time,
 * in reply-queue order.  The thread sleeps on ltp_reply_cv until a
 * finished request appears on ltp_replyq (or ltw_exiting is set),
 * then marks the request L9P_WS_REPLYING -- which makes it immune to
 * any further Tflush -- releases any flushers that were waiting on
 * it, and finally sends the response outside the lock.
 *
 * Returns NULL; the return value is unused by pthread_join callers.
 */
static void *
l9p_responder(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	for (;;) {
		/* get next reply to send */

		/* a failed lock leaves the pool unusable; just exit */
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			break;
		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting) {
			(void) pthread_cond_wait(&tp->ltp_reply_cv,
			    &tp->ltp_mtx);
		}
		if (worker->ltw_exiting) {
			(void) pthread_mutex_unlock(&tp->ltp_mtx);
			break;
		}

		/* off reply queue */
		req = STAILQ_FIRST(&tp->ltp_replyq);
		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

		/* request is now in final glide path, can't be Tflush-ed */
		req->lr_workstate = L9P_WS_REPLYING;

		/*
		 * Any flushers waiting for this request can go now.
		 * This must happen under ltp_mtx: rflush moves the
		 * flushers onto ltp_replyq.
		 */
		if (req->lr_flushstate != L9P_FLUSH_NONE)
			l9p_threadpool_rflush(tp, req);

		if (pthread_mutex_unlock(&tp->ltp_mtx) != 0)
			break;

		/* send response (off-lock; may block on I/O) */
		l9p_respond(req, false, true);
	}
	return (NULL);
}
84
/*
 * Worker thread main loop.
 *
 * Each worker sleeps on ltp_work_cv until a request appears on
 * ltp_workq (or ltw_exiting is set), claims the head of the queue,
 * dispatches it via l9p_dispatch_request() with the pool mutex
 * dropped, then moves the finished request to the reply queue and
 * wakes the responder.
 *
 * Note the lock discipline: ltp_mtx is held on entry to each loop
 * iteration and across the queue manipulation at both ends; it is
 * released only while the (potentially slow) dispatch runs.
 *
 * Returns NULL; the return value is unused by pthread_join callers.
 */
static void *
l9p_worker(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
		return (NULL);
	for (;;) {
		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting) {
			(void) pthread_cond_wait(&tp->ltp_work_cv,
			    &tp->ltp_mtx);
		}
		if (worker->ltw_exiting)
			break;

		/* off work queue; now work-in-progress, by us */
		req = STAILQ_FIRST(&tp->ltp_workq);
		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
		req->lr_workstate = L9P_WS_INPROGRESS;
		req->lr_worker = worker;
		(void) pthread_mutex_unlock(&tp->ltp_mtx);

		/* actually try the request */
		req->lr_error = l9p_dispatch_request(req);

		/* move to responder queue, updating work-state */
		if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
			return (NULL);
		req->lr_workstate = L9P_WS_RESPQUEUED;
		req->lr_worker = NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

		/* signal the responder */
		(void) pthread_cond_signal(&tp->ltp_reply_cv);
	}
	(void) pthread_mutex_unlock(&tp->ltp_mtx);
	return (NULL);
}
126
127 /*
128 * Just before finally replying to a request that got touched by
129 * a Tflush request, we enqueue its flushers (requests of type
130 * Tflush, which are now on the flushee's lr_flushq) onto the
131 * response queue.
132 */
133 static void
l9p_threadpool_rflush(struct l9p_threadpool * tp,struct l9p_request * req)134 l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
135 {
136 struct l9p_request *flusher;
137
138 /*
139 * https://swtch.com/plan9port/man/man9/flush.html says:
140 *
141 * "Should multiple Tflushes be received for a pending
142 * request, they must be answered in order. A Rflush for
143 * any of the multiple Tflushes implies an answer for all
144 * previous ones. Therefore, should a server receive a
145 * request and then multiple flushes for that request, it
146 * need respond only to the last flush." This means
147 * we could march through the queue of flushers here,
148 * marking all but the last one as "to be dropped" rather
149 * than "to be replied-to".
150 *
151 * However, we'll leave that for later, if ever -- it
152 * should be harmless to respond to each, in order.
153 */
154 STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
155 flusher->lr_workstate = L9P_WS_RESPQUEUED;
156 #ifdef notdef
157 if (not the last) {
158 flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
159 /* or, flusher->lr_drop = true ? */
160 }
161 #endif
162 STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
163 }
164 }
165
166 int
l9p_threadpool_init(struct l9p_threadpool * tp,int size)167 l9p_threadpool_init(struct l9p_threadpool *tp, int size)
168 {
169 struct l9p_worker *worker;
170 #if defined(__FreeBSD__)
171 char threadname[16];
172 #endif
173 int error;
174 int i, nworkers, nresponders;
175
176 if (size <= 0)
177 return (EINVAL);
178 #ifdef __illumos__
179 pthread_mutexattr_t attr;
180
181 if ((error = pthread_mutexattr_init(&attr)) != 0)
182 return (error);
183 if ((error = pthread_mutexattr_settype(&attr,
184 PTHREAD_MUTEX_ERRORCHECK)) != 0) {
185 return (error);
186 }
187 error = pthread_mutex_init(&tp->ltp_mtx, &attr);
188 #else
189 error = pthread_mutex_init(&tp->ltp_mtx, NULL);
190 #endif
191 if (error)
192 return (error);
193 error = pthread_cond_init(&tp->ltp_work_cv, NULL);
194 if (error)
195 goto fail_work_cv;
196 error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
197 if (error)
198 goto fail_reply_cv;
199
200 STAILQ_INIT(&tp->ltp_workq);
201 STAILQ_INIT(&tp->ltp_replyq);
202 LIST_INIT(&tp->ltp_workers);
203
204 nresponders = 0;
205 nworkers = 0;
206 for (i = 0; i <= size; i++) {
207 worker = calloc(1, sizeof(struct l9p_worker));
208 #ifdef __illumos__
209 if (worker == NULL)
210 break;
211 #endif
212 worker->ltw_tp = tp;
213 worker->ltw_responder = i == 0;
214 error = pthread_create(&worker->ltw_thread, NULL,
215 worker->ltw_responder ? l9p_responder : l9p_worker,
216 (void *)worker);
217 if (error) {
218 free(worker);
219 break;
220 }
221 if (worker->ltw_responder)
222 nresponders++;
223 else
224 nworkers++;
225
226 #if defined(__FreeBSD__)
227 if (worker->ltw_responder) {
228 pthread_set_name_np(worker->ltw_thread, "9p-responder");
229 } else {
230 sprintf(threadname, "9p-worker:%d", i - 1);
231 pthread_set_name_np(worker->ltw_thread, threadname);
232 }
233 #elif defined(__illumos__)
234 if (worker->ltw_responder) {
235 (void) pthread_setname_np(worker->ltw_thread,
236 "9p-responder");
237 } else {
238 char threadname[PTHREAD_MAX_NAMELEN_NP];
239
240 (void) snprintf(threadname, sizeof (threadname),
241 "9p-worker:%d", i - 1);
242 (void) pthread_setname_np(worker->ltw_thread,
243 threadname);
244 }
245 #endif
246
247 LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
248 }
249 if (nresponders == 0 || nworkers == 0) {
250 /* need the one responder, and at least one worker */
251 l9p_threadpool_shutdown(tp);
252 return (error);
253 }
254 return (0);
255
256 /*
257 * We could avoid these labels by having multiple destroy
258 * paths (one for each error case), or by having booleans
259 * for which variables were initialized. Neither is very
260 * appealing...
261 */
262 fail_reply_cv:
263 (void) pthread_cond_destroy(&tp->ltp_work_cv);
264 fail_work_cv:
265 (void) pthread_mutex_destroy(&tp->ltp_mtx);
266
267 return (error);
268 }
269
270 /*
271 * Run a request, usually by queueing it.
272 */
273 void
l9p_threadpool_run(struct l9p_threadpool * tp,struct l9p_request * req)274 l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
275 {
276
277 /*
278 * Flush requests must be handled specially, since they
279 * can cancel / kill off regular requests. (But we can
280 * run them through the regular dispatch mechanism.)
281 */
282 if (req->lr_req.hdr.type == L9P_TFLUSH) {
283 /* not on a work queue yet so we can touch state */
284 req->lr_workstate = L9P_WS_IMMEDIATE;
285 (void) l9p_dispatch_request(req);
286 } else {
287 if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
288 return;
289 req->lr_workstate = L9P_WS_NOTSTARTED;
290 STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
291 (void) pthread_cond_signal(&tp->ltp_work_cv);
292 (void) pthread_mutex_unlock(&tp->ltp_mtx);
293 }
294 }
295
296 /*
297 * Run a Tflush request. Called via l9p_dispatch_request() since
298 * it has some debug code in it, but not called from worker thread.
299 */
300 int
l9p_threadpool_tflush(struct l9p_request * req)301 l9p_threadpool_tflush(struct l9p_request *req)
302 {
303 struct l9p_connection *conn;
304 struct l9p_threadpool *tp;
305 struct l9p_request *flushee;
306 uint16_t oldtag;
307 enum l9p_flushstate nstate = L9P_FLUSH_NONE;
308 int err;
309
310 /*
311 * Find what we're supposed to flush (the flushee, as it were).
312 */
313 req->lr_error = 0; /* Tflush always succeeds */
314 conn = req->lr_conn;
315 tp = &conn->lc_tp;
316 oldtag = req->lr_req.tflush.oldtag;
317 if ((err = ht_wrlock(&conn->lc_requests)) != 0)
318 return (err);
319 flushee = ht_find_locked(&conn->lc_requests, oldtag);
320 if (flushee == NULL) {
321 /*
322 * Nothing to flush! The old request must have
323 * been done and gone already. Just queue this
324 * Tflush for a success reply.
325 */
326 (void) ht_unlock(&conn->lc_requests);
327 if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0)
328 return (err);
329 goto done;
330 }
331
332 /*
333 * Found the original request. We'll need to inspect its
334 * work-state to figure out what to do.
335 */
336 if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0) {
337 (void) ht_unlock(&conn->lc_requests);
338 return (err);
339 }
340 (void) ht_unlock(&conn->lc_requests);
341
342 switch (flushee->lr_workstate) {
343
344 case L9P_WS_NOTSTARTED:
345 /*
346 * Flushee is on work queue, but not yet being
347 * handled by a worker.
348 *
349 * The documentation -- see
350 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
351 * https://swtch.com/plan9port/man/man9/flush.html
352 * -- says that "the server should answer the
353 * flush message immediately". However, Linux
354 * sends flush requests for operations that
355 * must finish, such as Tclunk, and it's not
356 * possible to *answer* the flush request until
357 * it has been handled (if necessary) or aborted
358 * (if allowed).
359 *
360 * We therefore now just the original request
361 * and let the request-handler do whatever is
362 * appropriate. NOTE: we could have a table of
363 * "requests that can be aborted without being
364 * run" vs "requests that must be run to be
365 * aborted", but for now that seems like an
366 * unnecessary complication.
367 */
368 nstate = L9P_FLUSH_REQUESTED_PRE_START;
369 break;
370
371 case L9P_WS_IMMEDIATE:
372 /*
373 * This state only applies to Tflush requests, and
374 * flushing a Tflush is illegal. But we'll do nothing
375 * special here, which will make us act like a flush
376 * request for the flushee that arrived too late to
377 * do anything about the flushee.
378 */
379 nstate = L9P_FLUSH_REQUESTED_POST_START;
380 break;
381
382 case L9P_WS_INPROGRESS:
383 /*
384 * Worker thread flushee->lr_worker is working on it.
385 * Kick it to get it out of blocking system calls.
386 * (This requires that it carefully set up some
387 * signal handlers, and may be FreeBSD-dependent,
388 * it probably cannot be handled this way on MacOS.)
389 */
390 #ifdef notyet
391 pthread_kill(...);
392 #endif
393 nstate = L9P_FLUSH_REQUESTED_POST_START;
394 break;
395
396 case L9P_WS_RESPQUEUED:
397 /*
398 * The flushee is already in the response queue.
399 * We'll just mark it as having had some flush
400 * action applied.
401 */
402 nstate = L9P_FLUSH_TOOLATE;
403 break;
404
405 case L9P_WS_REPLYING:
406 /*
407 * Although we found the flushee, it's too late to
408 * make us depend on it: it's already heading out
409 * the door as a reply.
410 *
411 * We don't want to do anything to the flushee.
412 * Instead, we want to work the same way as if
413 * we had never found the tag.
414 */
415 goto done;
416 }
417
418 /*
419 * Now add us to the list of Tflush-es that are waiting
420 * for the flushee (creating the list if needed, i.e., if
421 * this is the first Tflush for the flushee). We (req)
422 * will get queued for reply later, when the responder
423 * processes the flushee and calls l9p_threadpool_rflush().
424 */
425 if (flushee->lr_flushstate == L9P_FLUSH_NONE)
426 STAILQ_INIT(&flushee->lr_flushq);
427 flushee->lr_flushstate = nstate;
428 STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);
429
430 (void) pthread_mutex_unlock(&tp->ltp_mtx);
431
432 return (0);
433
434 done:
435 /*
436 * This immediate op is ready to be replied-to now, so just
437 * stick it onto the reply queue.
438 */
439 req->lr_workstate = L9P_WS_RESPQUEUED;
440 STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
441 (void) pthread_mutex_unlock(&tp->ltp_mtx);
442 (void) pthread_cond_signal(&tp->ltp_reply_cv);
443 return (0);
444 }
445
446 int
l9p_threadpool_shutdown(struct l9p_threadpool * tp)447 l9p_threadpool_shutdown(struct l9p_threadpool *tp)
448 {
449 struct l9p_worker *worker, *tmp;
450
451 LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
452 if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
453 continue;
454 worker->ltw_exiting = true;
455 if (worker->ltw_responder)
456 (void) pthread_cond_signal(&tp->ltp_reply_cv);
457 else
458 (void) pthread_cond_broadcast(&tp->ltp_work_cv);
459 (void) pthread_mutex_unlock(&tp->ltp_mtx);
460 (void) pthread_join(worker->ltw_thread, NULL);
461 LIST_REMOVE(worker, ltw_link);
462 free(worker);
463 }
464 (void) pthread_cond_destroy(&tp->ltp_reply_cv);
465 (void) pthread_cond_destroy(&tp->ltp_work_cv);
466 (void) pthread_mutex_destroy(&tp->ltp_mtx);
467
468 return (0);
469 }
470