/*
 * Copyright 2016 Jakub Klama
 * All rights reserved
 *
 * Copyright 2020 Joyent, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>

#include "lib9p.h"
#include "threadpool.h"

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

static void *
l9p_responder(void *arg)
{
    struct l9p_threadpool *tp;
    struct l9p_worker *worker = arg;
    struct l9p_request *req;

    tp = worker->ltw_tp;
    for (;;) {
        /* get next reply to send */
        if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
            break;
        while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting) {
            (void) pthread_cond_wait(&tp->ltp_reply_cv,
                &tp->ltp_mtx);
        }
        if (worker->ltw_exiting) {
            (void) pthread_mutex_unlock(&tp->ltp_mtx);
            break;
        }

        /* off reply queue */
        req = STAILQ_FIRST(&tp->ltp_replyq);
        STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

        /* request is now in final glide path, can't be Tflush-ed */
        req->lr_workstate = L9P_WS_REPLYING;

        /* any flushers waiting for this request can go now */
        if (req->lr_flushstate != L9P_FLUSH_NONE)
            l9p_threadpool_rflush(tp, req);

        if (pthread_mutex_unlock(&tp->ltp_mtx) != 0)
            break;

        /* send response */
        l9p_respond(req, false, true);
    }
    return (NULL);
}

static void *
l9p_worker(void *arg)
{
    struct l9p_threadpool *tp;
    struct l9p_worker *worker = arg;
    struct l9p_request *req;

    tp = worker->ltw_tp;
    if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
        return (NULL);
    for (;;) {
        while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting) {
            (void) pthread_cond_wait(&tp->ltp_work_cv,
                &tp->ltp_mtx);
        }
        if (worker->ltw_exiting)
            break;

        /* off work queue; now work-in-progress, by us */
        req = STAILQ_FIRST(&tp->ltp_workq);
        STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
        req->lr_workstate = L9P_WS_INPROGRESS;
        req->lr_worker = worker;
        (void) pthread_mutex_unlock(&tp->ltp_mtx);

        /* actually try the request */
        req->lr_error = l9p_dispatch_request(req);

        /* move to responder queue, updating work-state */
        if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
            return (NULL);
        req->lr_workstate = L9P_WS_RESPQUEUED;
        req->lr_worker = NULL;
        STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

        /* signal the responder */
        (void) pthread_cond_signal(&tp->ltp_reply_cv);
    }
    (void) pthread_mutex_unlock(&tp->ltp_mtx);
    return (NULL);
}
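
/*
 * To summarize the two loops above: a normal request moves through
 * lr_workstate as
 *
 *    L9P_WS_NOTSTARTED    on the work queue (see l9p_threadpool_run)
 *    L9P_WS_INPROGRESS    being run by a worker thread
 *    L9P_WS_RESPQUEUED    on the reply queue
 *    L9P_WS_REPLYING      the responder is sending its reply
 *
 * while a Tflush bypasses the work queue entirely and runs in the
 * receiving thread as L9P_WS_IMMEDIATE.  Once a request is visible
 * on either queue, its state is inspected and changed only while
 * holding ltp_mtx.
 */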

/*
 * Just before finally replying to a request that got touched by
 * a Tflush request, we enqueue its flushers (requests of type
 * Tflush, which are now on the flushee's lr_flushq) onto the
 * response queue.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
    struct l9p_request *flusher;

    /*
     * https://swtch.com/plan9port/man/man9/flush.html says:
     *
     * "Should multiple Tflushes be received for a pending
     * request, they must be answered in order.  A Rflush for
     * any of the multiple Tflushes implies an answer for all
     * previous ones.  Therefore, should a server receive a
     * request and then multiple flushes for that request, it
     * need respond only to the last flush."  This means
     * we could march through the queue of flushers here,
     * marking all but the last one as "to be dropped" rather
     * than "to be replied-to".
     *
     * However, we'll leave that for later, if ever -- it
     * should be harmless to respond to each, in order.
     */
    STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
        flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
        if (not the last) {
            flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
            /* or, flusher->lr_drop = true ? */
        }
#endif
        STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
    }
}

int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
    struct l9p_worker *worker;
#if defined(__FreeBSD__)
    char threadname[16];
#endif
    int error;
    int i, nworkers, nresponders;

    if (size <= 0)
        return (EINVAL);
#ifdef __illumos__
    pthread_mutexattr_t attr;

    if ((error = pthread_mutexattr_init(&attr)) != 0)
        return (error);
    if ((error = pthread_mutexattr_settype(&attr,
        PTHREAD_MUTEX_ERRORCHECK)) != 0) {
        return (error);
    }
    error = pthread_mutex_init(&tp->ltp_mtx, &attr);
#else
    error = pthread_mutex_init(&tp->ltp_mtx, NULL);
#endif
    if (error)
        return (error);
    error = pthread_cond_init(&tp->ltp_work_cv, NULL);
    if (error)
        goto fail_work_cv;
    error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
    if (error)
        goto fail_reply_cv;

    STAILQ_INIT(&tp->ltp_workq);
    STAILQ_INIT(&tp->ltp_replyq);
    LIST_INIT(&tp->ltp_workers);

    nresponders = 0;
    nworkers = 0;
    for (i = 0; i <= size; i++) {
        worker = calloc(1, sizeof(struct l9p_worker));
        /* allocation failure is fatal on every platform */
        if (worker == NULL) {
            error = ENOMEM;
            break;
        }
        worker->ltw_tp = tp;
        worker->ltw_responder = i == 0;
        error = pthread_create(&worker->ltw_thread, NULL,
            worker->ltw_responder ? l9p_responder : l9p_worker,
            (void *)worker);
        if (error) {
            free(worker);
            break;
        }
        if (worker->ltw_responder)
            nresponders++;
        else
            nworkers++;

#if defined(__FreeBSD__)
        if (worker->ltw_responder) {
            pthread_set_name_np(worker->ltw_thread, "9p-responder");
        } else {
            sprintf(threadname, "9p-worker:%d", i - 1);
            pthread_set_name_np(worker->ltw_thread, threadname);
        }
#elif defined(__illumos__)
        if (worker->ltw_responder) {
            (void) pthread_setname_np(worker->ltw_thread,
                "9p-responder");
        } else {
            char threadname[PTHREAD_MAX_NAMELEN_NP];

            (void) snprintf(threadname, sizeof (threadname),
                "9p-worker:%d", i - 1);
            (void) pthread_setname_np(worker->ltw_thread,
                threadname);
        }
#endif

        LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
    }
    if (nresponders == 0 || nworkers == 0) {
        /* need the one responder, and at least one worker */
        l9p_threadpool_shutdown(tp);
        return (error);
    }
    return (0);

    /*
     * We could avoid these labels by having multiple destroy
     * paths (one for each error case), or by having booleans
     * for which variables were initialized.  Neither is very
     * appealing...
     */
fail_reply_cv:
    (void) pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
    (void) pthread_mutex_destroy(&tp->ltp_mtx);

    return (error);
}
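
/*
 * A sketch of the intended call, with a hypothetical caller and
 * error handling elided -- the pool lives in the connection's
 * lc_tp field:
 *
 *    struct l9p_connection *conn = ...;
 *
 *    error = l9p_threadpool_init(&conn->lc_tp, 4);
 *
 * Note that size counts workers only: a pool of size 4 starts five
 * threads, the responder (i == 0) plus four workers.
 */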

/*
 * Run a request, usually by queueing it.
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

    /*
     * Flush requests must be handled specially, since they
     * can cancel / kill off regular requests.  (But we can
     * run them through the regular dispatch mechanism.)
     */
    if (req->lr_req.hdr.type == L9P_TFLUSH) {
        /* not on a work queue yet so we can touch state */
        req->lr_workstate = L9P_WS_IMMEDIATE;
        (void) l9p_dispatch_request(req);
    } else {
        if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
            return;
        req->lr_workstate = L9P_WS_NOTSTARTED;
        STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
        (void) pthread_cond_signal(&tp->ltp_work_cv);
        (void) pthread_mutex_unlock(&tp->ltp_mtx);
    }
}
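
/*
 * To trace the flush path end to end: a Tflush enters above as
 * L9P_WS_IMMEDIATE, l9p_dispatch_request() routes it to
 * l9p_threadpool_tflush() below, and its reply is queued either
 * right away (no flushee to be found) or later, when the responder
 * drains the flushee's lr_flushq in l9p_threadpool_rflush().
 */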

/*
 * Run a Tflush request.  Called via l9p_dispatch_request() since
 * it has some debug code in it, but not called from worker thread.
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
    struct l9p_connection *conn;
    struct l9p_threadpool *tp;
    struct l9p_request *flushee;
    uint16_t oldtag;
    enum l9p_flushstate nstate = L9P_FLUSH_NONE;
    int err;

    /*
     * Find what we're supposed to flush (the flushee, as it were).
     */
    req->lr_error = 0;    /* Tflush always succeeds */
    conn = req->lr_conn;
    tp = &conn->lc_tp;
    oldtag = req->lr_req.tflush.oldtag;
    if ((err = ht_wrlock(&conn->lc_requests)) != 0)
        return (err);
    flushee = ht_find_locked(&conn->lc_requests, oldtag);
    if (flushee == NULL) {
        /*
         * Nothing to flush!  The old request must have
         * been done and gone already.  Just queue this
         * Tflush for a success reply.
         */
        (void) ht_unlock(&conn->lc_requests);
        if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0)
            return (err);
        goto done;
    }

    /*
     * Found the original request.  We'll need to inspect its
     * work-state to figure out what to do.
     */
    if ((err = pthread_mutex_lock(&tp->ltp_mtx)) != 0) {
        (void) ht_unlock(&conn->lc_requests);
        return (err);
    }
    (void) ht_unlock(&conn->lc_requests);

    switch (flushee->lr_workstate) {

    case L9P_WS_NOTSTARTED:
        /*
         * Flushee is on work queue, but not yet being
         * handled by a worker.
         *
         * The documentation -- see
         * http://ericvh.github.io/9p-rfc/rfc9p2000.html
         * https://swtch.com/plan9port/man/man9/flush.html
         * -- says that "the server should answer the
         * flush message immediately".  However, Linux
         * sends flush requests for operations that
         * must finish, such as Tclunk, and it's not
         * possible to *answer* the flush request until
         * it has been handled (if necessary) or aborted
         * (if allowed).
         *
         * We therefore now just mark the original request
         * as to-be-flushed, and let the request-handler
         * do whatever is appropriate.  NOTE: we could have
         * a table of "requests that can be aborted without
         * being run" vs "requests that must be run to be
         * aborted", but for now that seems like an
         * unnecessary complication.
         */
        nstate = L9P_FLUSH_REQUESTED_PRE_START;
        break;

    case L9P_WS_IMMEDIATE:
        /*
         * This state only applies to Tflush requests, and
         * flushing a Tflush is illegal.  But we'll do nothing
         * special here, which will make us act like a flush
         * request for the flushee that arrived too late to
         * do anything about the flushee.
         */
        nstate = L9P_FLUSH_REQUESTED_POST_START;
        break;

    case L9P_WS_INPROGRESS:
        /*
         * Worker thread flushee->lr_worker is working on it.
         * Kick it to get it out of blocking system calls.
         * (This requires that it carefully set up some
         * signal handlers, and may be FreeBSD-dependent,
         * it probably cannot be handled this way on MacOS.)
         */
#ifdef notyet
        pthread_kill(...);
#endif
        nstate = L9P_FLUSH_REQUESTED_POST_START;
        break;

    case L9P_WS_RESPQUEUED:
        /*
         * The flushee is already in the response queue.
         * We'll just mark it as having had some flush
         * action applied.
         */
        nstate = L9P_FLUSH_TOOLATE;
        break;

    case L9P_WS_REPLYING:
        /*
         * Although we found the flushee, it's too late to
         * make us depend on it: it's already heading out
         * the door as a reply.
         *
         * We don't want to do anything to the flushee.
         * Instead, we want to work the same way as if
         * we had never found the tag.
         */
        goto done;
    }

    /*
     * Now add us to the list of Tflush-es that are waiting
     * for the flushee (creating the list if needed, i.e., if
     * this is the first Tflush for the flushee).  We (req)
     * will get queued for reply later, when the responder
     * processes the flushee and calls l9p_threadpool_rflush().
     */
    if (flushee->lr_flushstate == L9P_FLUSH_NONE)
        STAILQ_INIT(&flushee->lr_flushq);
    flushee->lr_flushstate = nstate;
    STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

    (void) pthread_mutex_unlock(&tp->ltp_mtx);

    return (0);

done:
    /*
     * This immediate op is ready to be replied-to now, so just
     * stick it onto the reply queue.
     */
    req->lr_workstate = L9P_WS_RESPQUEUED;
    STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
    (void) pthread_mutex_unlock(&tp->ltp_mtx);
    (void) pthread_cond_signal(&tp->ltp_reply_cv);
    return (0);
}

int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
    struct l9p_worker *worker, *tmp;

    LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
        if (pthread_mutex_lock(&tp->ltp_mtx) != 0)
            continue;
        worker->ltw_exiting = true;
        if (worker->ltw_responder)
            (void) pthread_cond_signal(&tp->ltp_reply_cv);
        else
            (void) pthread_cond_broadcast(&tp->ltp_work_cv);
        (void) pthread_mutex_unlock(&tp->ltp_mtx);
        (void) pthread_join(worker->ltw_thread, NULL);
        LIST_REMOVE(worker, ltw_link);
        free(worker);
    }
    (void) pthread_cond_destroy(&tp->ltp_reply_cv);
    (void) pthread_cond_destroy(&tp->ltp_work_cv);
    (void) pthread_mutex_destroy(&tp->ltp_mtx);

    return (0);
}
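
/*
 * A note on shutdown ordering (an assumption about the caller, not
 * enforced here): the connection should stop queueing new requests
 * before l9p_threadpool_shutdown() runs.  Workers that see
 * ltw_exiting break out without draining ltp_workq, so any request
 * still queued at that point is abandoned and never replied to:
 *
 *    (void) l9p_threadpool_shutdown(&conn->lc_tp);
 */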