xref: /illumos-gate/usr/src/uts/common/inet/squeue.c (revision 233fee3f31fd346be76e19861bfbff832c1768f1)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Copyright 2017 Joyent, Inc.
27  */
28 
29 /*
30  * Squeues: General purpose serialization mechanism
31  * ------------------------------------------------
32  *
33  * Background:
34  * -----------
35  *
36  * This is a general purpose high-performance serialization mechanism
37  * currently used by TCP/IP. It is implement by means of a per CPU queue,
38  * a worker thread and a polling thread with are bound to the CPU
39  * associated with the squeue. The squeue is strictly FIFO for both read
40  * and write side and only one thread can process it at any given time.
41  * The design goal of squeue was to offer a very high degree of
42  * parallelization (on a per H/W execution pipeline basis) with at
43  * most one queuing.
44  *
45  * The modules needing protection typically calls SQUEUE_ENTER_ONE() or
46  * SQUEUE_ENTER() macro as soon as a thread enter the module
47  * from either direction. For each packet, the processing function
48  * and argument is stored in the mblk itself. When the packet is ready
49  * to be processed, the squeue retrieves the stored function and calls
50  * it with the supplied argument and the pointer to the packet itself.
51  * The called function can assume that no other thread is processing
52  * the squeue when it is executing.
53  *
54  * Squeue/connection binding:
55  * --------------------------
56  *
57  * TCP/IP uses an IP classifier in conjunction with squeue where specific
58  * connections are assigned to specific squeue (based on various policies),
59  * at the connection creation time. Once assigned, the connection to
60  * squeue mapping is never changed and all future packets for that
61  * connection are processed on that squeue. The connection ("conn") to
62  * squeue mapping is stored in "conn_t" member "conn_sqp".
63  *
64  * Since the processing of the connection cuts across multiple layers
65  * but still allows packets for different connnection to be processed on
66  * other CPU/squeues, squeues are also termed as "Vertical Perimeter" or
67  * "Per Connection Vertical Perimeter".
68  *
69  * Processing Model:
70  * -----------------
71  *
72  * Squeue doesn't necessary processes packets with its own worker thread.
73  * The callers can pick if they just want to queue the packet, process
74  * their packet if nothing is queued or drain and process. The first two
75  * modes are typically employed when the packet was generated while
76  * already doing the processing behind the squeue and last mode (drain
77  * and process) is typically employed when the thread is entering squeue
78  * for the first time. The squeue still imposes a finite time limit
79  * for which a external thread can do processing after which it switches
80  * processing to its own worker thread.
81  *
82  * Once created, squeues are never deleted. Hence squeue pointers are
83  * always valid. This means that functions outside the squeue can still
84  * refer safely to conn_sqp and their is no need for ref counts.
85  *
86  * Only a thread executing in the squeue can change the squeue of the
87  * connection. It does so by calling a squeue framework function to do this.
88  * After changing the squeue, the thread must leave the squeue. It must not
89  * continue to execute any code that needs squeue protection.
90  *
91  * The squeue framework, after entering the squeue, checks if the current
92  * squeue matches the conn_sqp. If the check fails, the packet is delivered
93  * to right squeue.
94  *
95  * Polling Model:
96  * --------------
97  *
98  * Squeues can control the rate of packet arrival into itself from the
99  * NIC or specific Rx ring within a NIC. As part of capability negotiation
100  * between IP and MAC layer, squeue are created for each TCP soft ring
101  * (or TCP Rx ring - to be implemented in future). As part of this
102  * negotiation, squeues get a cookie for underlying soft ring or Rx
103  * ring, a function to turn off incoming packets and a function to call
104  * to poll for packets. This helps schedule the receive side packet
105  * processing so that queue backlog doesn't build up and packet processing
106  * doesn't keep getting disturbed by high priority interrupts. As part
107  * of this mode, as soon as a backlog starts building, squeue turns off
108  * the interrupts and switches to poll mode. In poll mode, when poll
109  * thread goes down to retrieve packets, it retrieves them in the form of
110  * a chain which improves performance even more. As the squeue/softring
111  * system gets more packets, it gets more efficient by switching to
112  * polling more often and dealing with larger packet chains.
113  *
114  */
115 
116 #include <sys/types.h>
117 #include <sys/cmn_err.h>
118 #include <sys/debug.h>
119 #include <sys/kmem.h>
120 #include <sys/cpuvar.h>
121 #include <sys/condvar_impl.h>
122 #include <sys/systm.h>
123 #include <sys/callb.h>
124 #include <sys/sdt.h>
125 #include <sys/ddi.h>
126 #include <sys/sunddi.h>
127 #include <sys/stack.h>
128 #include <sys/archsystm.h>
129 
130 #include <inet/ipclassifier.h>
131 #include <inet/udp_impl.h>
132 
133 #include <sys/squeue_impl.h>
134 
135 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
136 static void squeue_worker(squeue_t *sqp);
137 static void squeue_polling_thread(squeue_t *sqp);
138 static void squeue_worker_wakeup(squeue_t *sqp);
139 
140 kmem_cache_t *squeue_cache;
141 
142 #define	SQUEUE_MSEC_TO_NSEC 1000000
143 
144 int squeue_drain_ms = 20;
145 
146 /* The values above converted to ticks or nano seconds */
147 static uint_t squeue_drain_ns = 0;
148 
149 uintptr_t squeue_drain_stack_needed = 10240;
150 uint_t squeue_drain_stack_toodeep;
151 
152 #define	MAX_BYTES_TO_PICKUP	150000
153 
154 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
155 	/*							\
156 	 * Enqueue our mblk chain.				\
157 	 */							\
158 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
159 								\
160 	if ((sqp)->sq_last != NULL)				\
161 		(sqp)->sq_last->b_next = (mp);			\
162 	else							\
163 		(sqp)->sq_first = (mp);				\
164 	(sqp)->sq_last = (tail);				\
165 	(sqp)->sq_count += (cnt);				\
166 	ASSERT((sqp)->sq_count > 0);				\
167 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
168 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
169 								\
170 }
171 
172 /*
173  * Blank the receive ring (in this case it is the soft ring). When
174  * blanked, the soft ring will not send any more packets up.
175  * Blanking may not succeed when there is a CPU already in the soft
176  * ring sending packets up. In that case, SQS_POLLING will not be
177  * set.
178  */
179 #define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
180 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
181 	if (sq_poll_capable) {					\
182 		ASSERT(rx_ring != NULL);			\
183 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
184 		if (!(sqp->sq_state & SQS_POLLING)) {		\
185 			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
186 				sqp->sq_state |= SQS_POLLING;	\
187 		}						\
188 	}							\
189 }
190 
191 #define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
192 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
193 	if (sq_poll_capable) {					\
194 		ASSERT(rx_ring != NULL);			\
195 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
196 		if (sqp->sq_state & SQS_POLLING) {		\
197 			sqp->sq_state &= ~SQS_POLLING;		\
198 			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
199 		}						\
200 	}							\
201 }
202 
203 /* Wakeup poll thread only if SQS_POLLING is set */
204 #define	SQS_POLL_RING(sqp) {			\
205 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
206 	if (sqp->sq_state & SQS_POLLING) {			\
207 		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
208 		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
209 			sqp->sq_state |= SQS_GET_PKTS;		\
210 			cv_signal(&sqp->sq_poll_cv);		\
211 		}						\
212 	}							\
213 }
214 
215 #ifdef DEBUG
216 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
217 	(sqp)->sq_curmp = (mp);					\
218 	(sqp)->sq_curproc = (proc);				\
219 	(sqp)->sq_connp = (connp);				\
220 	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
221 }
222 
223 #define	SQUEUE_DBG_CLEAR(sqp)	{				\
224 	(sqp)->sq_curmp = NULL;					\
225 	(sqp)->sq_curproc = NULL;				\
226 	(sqp)->sq_connp = NULL;					\
227 }
228 #else
229 #define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
230 #define	SQUEUE_DBG_CLEAR(sqp)
231 #endif
232 
233 void
234 squeue_init(void)
235 {
236 	squeue_cache = kmem_cache_create("squeue_cache",
237 	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
238 
239 	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
240 }
241 
242 squeue_t *
243 squeue_create(pri_t pri)
244 {
245 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
246 
247 	bzero(sqp, sizeof (squeue_t));
248 	sqp->sq_bind = PBIND_NONE;
249 	sqp->sq_priority = pri;
250 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
251 	    sqp, 0, &p0, TS_RUN, pri);
252 
253 	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
254 	    sqp, 0, &p0, TS_RUN, pri);
255 
256 	sqp->sq_enter = squeue_enter;
257 	sqp->sq_drain = squeue_drain;
258 
259 	return (sqp);
260 }
261 
262 /*
263  * Bind squeue worker thread to the specified CPU, given by CPU id.
264  * If the CPU id  value is -1, bind the worker thread to the value
265  * specified in sq_bind field. If a thread is already bound to a
266  * different CPU, unbind it from the old CPU and bind to the new one.
267  */
268 
269 void
270 squeue_bind(squeue_t *sqp, processorid_t bind)
271 {
272 	mutex_enter(&sqp->sq_lock);
273 	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
274 	ASSERT(MUTEX_HELD(&cpu_lock));
275 
276 	if (sqp->sq_state & SQS_BOUND) {
277 		if (sqp->sq_bind == bind) {
278 			mutex_exit(&sqp->sq_lock);
279 			return;
280 		}
281 		thread_affinity_clear(sqp->sq_worker);
282 	} else {
283 		sqp->sq_state |= SQS_BOUND;
284 	}
285 
286 	if (bind != PBIND_NONE)
287 		sqp->sq_bind = bind;
288 
289 	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
290 	mutex_exit(&sqp->sq_lock);
291 }
292 
293 void
294 squeue_unbind(squeue_t *sqp)
295 {
296 	mutex_enter(&sqp->sq_lock);
297 	if (!(sqp->sq_state & SQS_BOUND)) {
298 		mutex_exit(&sqp->sq_lock);
299 		return;
300 	}
301 
302 	sqp->sq_state &= ~SQS_BOUND;
303 	thread_affinity_clear(sqp->sq_worker);
304 	mutex_exit(&sqp->sq_lock);
305 }
306 
307 /*
308  * squeue_enter() - enter squeue sqp with mblk mp (which can be
309  * a chain), while tail points to the end and cnt in number of
310  * mblks in the chain.
311  *
312  * For a chain of single packet (i.e. mp == tail), go through the
313  * fast path if no one is processing the squeue and nothing is queued.
314  *
315  * The proc and arg for each mblk is already stored in the mblk in
316  * appropriate places.
317  *
318  * The process_flag specifies if we are allowed to process the mblk
319  * and drain in the entering thread context. If process_flag is
320  * SQ_FILL, then we just queue the mblk and return (after signaling
321  * the worker thread if no one else is processing the squeue).
322  *
323  * The ira argument can be used when the count is one.
324  * For a chain the caller needs to prepend any needed mblks from
325  * ip_recv_attr_to_mblk().
326  */
327 /* ARGSUSED */
328 void
329 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
330     ip_recv_attr_t *ira, int process_flag, uint8_t tag)
331 {
332 	conn_t		*connp;
333 	sqproc_t	proc;
334 	hrtime_t	now;
335 
336 	ASSERT(sqp != NULL);
337 	ASSERT(mp != NULL);
338 	ASSERT(tail != NULL);
339 	ASSERT(cnt > 0);
340 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
341 	ASSERT(ira == NULL || cnt == 1);
342 
343 	mutex_enter(&sqp->sq_lock);
344 
345 	/*
346 	 * Try to process the packet if SQ_FILL flag is not set and
347 	 * we are allowed to process the squeue. The SQ_NODRAIN is
348 	 * ignored if the packet chain consists of more than 1 packet.
349 	 */
350 	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
351 	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
352 		/*
353 		 * See if anything is already queued. If we are the
354 		 * first packet, do inline processing else queue the
355 		 * packet and do the drain.
356 		 */
357 		if (sqp->sq_first == NULL && cnt == 1) {
358 			/*
359 			 * Fast-path, ok to process and nothing queued.
360 			 */
361 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
362 			sqp->sq_run = curthread;
363 			mutex_exit(&sqp->sq_lock);
364 
365 			/*
366 			 * We are the chain of 1 packet so
367 			 * go through this fast path.
368 			 */
369 			ASSERT(mp->b_prev != NULL);
370 			ASSERT(mp->b_queue != NULL);
371 			connp = (conn_t *)mp->b_prev;
372 			mp->b_prev = NULL;
373 			proc = (sqproc_t)mp->b_queue;
374 			mp->b_queue = NULL;
375 			ASSERT(proc != NULL && connp != NULL);
376 			ASSERT(mp->b_next == NULL);
377 
378 			/*
379 			 * Handle squeue switching. More details in the
380 			 * block comment at the top of the file
381 			 */
382 			if (connp->conn_sqp == sqp) {
383 				SQUEUE_DBG_SET(sqp, mp, proc, connp,
384 				    tag);
385 				connp->conn_on_sqp = B_TRUE;
386 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
387 				    sqp, mblk_t *, mp, conn_t *, connp);
388 				(*proc)(connp, mp, sqp, ira);
389 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
390 				    sqp, conn_t *, connp);
391 				connp->conn_on_sqp = B_FALSE;
392 				SQUEUE_DBG_CLEAR(sqp);
393 				CONN_DEC_REF(connp);
394 			} else {
395 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
396 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
397 			}
398 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
399 			mutex_enter(&sqp->sq_lock);
400 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
401 			sqp->sq_run = NULL;
402 			if (sqp->sq_first == NULL ||
403 			    process_flag == SQ_NODRAIN) {
404 				/*
405 				 * If work or control actions are pending, wake
406 				 * up the worker thread.
407 				 */
408 				if (sqp->sq_first != NULL ||
409 				    sqp->sq_state & SQS_WORKER_THR_CONTROL) {
410 					squeue_worker_wakeup(sqp);
411 				}
412 				mutex_exit(&sqp->sq_lock);
413 				return;
414 			}
415 		} else {
416 			if (ira != NULL) {
417 				mblk_t	*attrmp;
418 
419 				ASSERT(cnt == 1);
420 				attrmp = ip_recv_attr_to_mblk(ira);
421 				if (attrmp == NULL) {
422 					mutex_exit(&sqp->sq_lock);
423 					ip_drop_input("squeue: "
424 					    "ip_recv_attr_to_mblk",
425 					    mp, NULL);
426 					/* Caller already set b_prev/b_next */
427 					mp->b_prev = mp->b_next = NULL;
428 					freemsg(mp);
429 					return;
430 				}
431 				ASSERT(attrmp->b_cont == NULL);
432 				attrmp->b_cont = mp;
433 				/* Move connp and func to new */
434 				attrmp->b_queue = mp->b_queue;
435 				mp->b_queue = NULL;
436 				attrmp->b_prev = mp->b_prev;
437 				mp->b_prev = NULL;
438 
439 				ASSERT(mp == tail);
440 				tail = mp = attrmp;
441 			}
442 
443 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
444 #ifdef DEBUG
445 			mp->b_tag = tag;
446 #endif
447 		}
448 		/*
449 		 * We are here because either we couldn't do inline
450 		 * processing (because something was already queued),
451 		 * or we had a chain of more than one packet,
452 		 * or something else arrived after we were done with
453 		 * inline processing.
454 		 */
455 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
456 		ASSERT(sqp->sq_first != NULL);
457 		now = gethrtime();
458 		sqp->sq_run = curthread;
459 		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
460 
461 		/*
462 		 * If we didn't do a complete drain, the worker
463 		 * thread was already signalled by squeue_drain.
464 		 * In case any control actions are pending, wake
465 		 * up the worker.
466 		 */
467 		sqp->sq_run = NULL;
468 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
469 			squeue_worker_wakeup(sqp);
470 		}
471 	} else {
472 		/*
473 		 * We let a thread processing a squeue reenter only
474 		 * once. This helps the case of incoming connection
475 		 * where a SYN-ACK-ACK that triggers the conn_ind
476 		 * doesn't have to queue the packet if listener and
477 		 * eager are on the same squeue. Also helps the
478 		 * loopback connection where the two ends are bound
479 		 * to the same squeue (which is typical on single
480 		 * CPU machines).
481 		 *
482 		 * We let the thread reenter only once for the fear
483 		 * of stack getting blown with multiple traversal.
484 		 */
485 		connp = (conn_t *)mp->b_prev;
486 		if (!(sqp->sq_state & SQS_REENTER) &&
487 		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
488 		    (sqp->sq_run == curthread) && (cnt == 1) &&
489 		    (connp->conn_on_sqp == B_FALSE)) {
490 			sqp->sq_state |= SQS_REENTER;
491 			mutex_exit(&sqp->sq_lock);
492 
493 			ASSERT(mp->b_prev != NULL);
494 			ASSERT(mp->b_queue != NULL);
495 
496 			mp->b_prev = NULL;
497 			proc = (sqproc_t)mp->b_queue;
498 			mp->b_queue = NULL;
499 
500 			/*
501 			 * Handle squeue switching. More details in the
502 			 * block comment at the top of the file
503 			 */
504 			if (connp->conn_sqp == sqp) {
505 				connp->conn_on_sqp = B_TRUE;
506 				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
507 				    sqp, mblk_t *, mp, conn_t *, connp);
508 				(*proc)(connp, mp, sqp, ira);
509 				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
510 				    sqp, conn_t *, connp);
511 				connp->conn_on_sqp = B_FALSE;
512 				CONN_DEC_REF(connp);
513 			} else {
514 				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
515 				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
516 			}
517 
518 			mutex_enter(&sqp->sq_lock);
519 			sqp->sq_state &= ~SQS_REENTER;
520 			mutex_exit(&sqp->sq_lock);
521 			return;
522 		}
523 
524 		/*
525 		 * Queue is already being processed or there is already
526 		 * one or more paquets on the queue. Enqueue the
527 		 * packet and wakeup the squeue worker thread if the
528 		 * squeue is not being processed.
529 		 */
530 #ifdef DEBUG
531 		mp->b_tag = tag;
532 #endif
533 		if (ira != NULL) {
534 			mblk_t	*attrmp;
535 
536 			ASSERT(cnt == 1);
537 			attrmp = ip_recv_attr_to_mblk(ira);
538 			if (attrmp == NULL) {
539 				mutex_exit(&sqp->sq_lock);
540 				ip_drop_input("squeue: ip_recv_attr_to_mblk",
541 				    mp, NULL);
542 				/* Caller already set b_prev/b_next */
543 				mp->b_prev = mp->b_next = NULL;
544 				freemsg(mp);
545 				return;
546 			}
547 			ASSERT(attrmp->b_cont == NULL);
548 			attrmp->b_cont = mp;
549 			/* Move connp and func to new */
550 			attrmp->b_queue = mp->b_queue;
551 			mp->b_queue = NULL;
552 			attrmp->b_prev = mp->b_prev;
553 			mp->b_prev = NULL;
554 
555 			ASSERT(mp == tail);
556 			tail = mp = attrmp;
557 		}
558 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
559 		/*
560 		 * If the worker isn't running or control actions are pending,
561 		 * wake it it up now.
562 		 */
563 		if ((sqp->sq_state & SQS_PROC) == 0 ||
564 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL) != 0) {
565 			squeue_worker_wakeup(sqp);
566 		}
567 	}
568 	mutex_exit(&sqp->sq_lock);
569 }
570 
571 /*
572  * PRIVATE FUNCTIONS
573  */
574 
575 
576 /*
577  * Wake up worker thread for squeue to process queued work.
578  */
579 static void
580 squeue_worker_wakeup(squeue_t *sqp)
581 {
582 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
583 
584 	cv_signal(&sqp->sq_worker_cv);
585 	sqp->sq_awoken = gethrtime();
586 }
587 
588 static void
589 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
590 {
591 	mblk_t		*mp;
592 	mblk_t 		*head;
593 	sqproc_t 	proc;
594 	conn_t		*connp;
595 	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
596 	hrtime_t 	now;
597 	boolean_t	sq_poll_capable;
598 	ip_recv_attr_t	*ira, iras;
599 
600 	/*
601 	 * Before doing any work, check our stack depth; if we're not a
602 	 * worker thread for this squeue and we're beginning to get tight on
603 	 * on stack, kick the worker, bump a counter and return.
604 	 */
605 	if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
606 	    (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
607 		ASSERT(mutex_owned(&sqp->sq_lock));
608 		squeue_worker_wakeup(sqp);
609 		squeue_drain_stack_toodeep++;
610 		return;
611 	}
612 
613 	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
614 again:
615 	ASSERT(mutex_owned(&sqp->sq_lock));
616 	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
617 	    SQS_POLL_QUIESCE_DONE)));
618 
619 	head = sqp->sq_first;
620 	sqp->sq_first = NULL;
621 	sqp->sq_last = NULL;
622 	sqp->sq_count = 0;
623 
624 	sqp->sq_state |= SQS_PROC | proc_type;
625 
626 	/*
627 	 * We have backlog built up. Switch to polling mode if the
628 	 * device underneath allows it. Need to do it so that
629 	 * more packets don't come in and disturb us (by contending
630 	 * for sq_lock or higher priority thread preempting us).
631 	 *
632 	 * The worker thread is allowed to do active polling while we
633 	 * just disable the interrupts for drain by non worker (kernel
634 	 * or userland) threads so they can peacefully process the
635 	 * packets during time allocated to them.
636 	 */
637 	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
638 	mutex_exit(&sqp->sq_lock);
639 
640 	while ((mp = head) != NULL) {
641 
642 		head = mp->b_next;
643 		mp->b_next = NULL;
644 
645 		proc = (sqproc_t)mp->b_queue;
646 		mp->b_queue = NULL;
647 		connp = (conn_t *)mp->b_prev;
648 		mp->b_prev = NULL;
649 
650 		/* Is there an ip_recv_attr_t to handle? */
651 		if (ip_recv_attr_is_mblk(mp)) {
652 			mblk_t	*attrmp = mp;
653 
654 			ASSERT(attrmp->b_cont != NULL);
655 
656 			mp = attrmp->b_cont;
657 			attrmp->b_cont = NULL;
658 			ASSERT(mp->b_queue == NULL);
659 			ASSERT(mp->b_prev == NULL);
660 
661 			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
662 				/* The ill or ip_stack_t disappeared on us */
663 				ip_drop_input("ip_recv_attr_from_mblk",
664 				    mp, NULL);
665 				ira_cleanup(&iras, B_TRUE);
666 				CONN_DEC_REF(connp);
667 				continue;
668 			}
669 			ira = &iras;
670 		} else {
671 			ira = NULL;
672 		}
673 
674 
675 		/*
676 		 * Handle squeue switching. More details in the
677 		 * block comment at the top of the file
678 		 */
679 		if (connp->conn_sqp == sqp) {
680 			SQUEUE_DBG_SET(sqp, mp, proc, connp,
681 			    mp->b_tag);
682 			connp->conn_on_sqp = B_TRUE;
683 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
684 			    sqp, mblk_t *, mp, conn_t *, connp);
685 			(*proc)(connp, mp, sqp, ira);
686 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
687 			    sqp, conn_t *, connp);
688 			connp->conn_on_sqp = B_FALSE;
689 			CONN_DEC_REF(connp);
690 		} else {
691 			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
692 			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
693 		}
694 		if (ira != NULL)
695 			ira_cleanup(ira, B_TRUE);
696 	}
697 
698 	SQUEUE_DBG_CLEAR(sqp);
699 
700 	mutex_enter(&sqp->sq_lock);
701 
702 	/*
703 	 * Check if there is still work to do (either more arrived or timer
704 	 * expired). If we are the worker thread and we are polling capable,
705 	 * continue doing the work since no one else is around to do the
706 	 * work anyway (but signal the poll thread to retrieve some packets
707 	 * in the meanwhile). If we are not the worker thread, just
708 	 * signal the worker thread to take up the work if processing time
709 	 * has expired.
710 	 */
711 	if (sqp->sq_first != NULL) {
712 		/*
713 		 * Still more to process. If time quanta not expired, we
714 		 * should let the drain go on. The worker thread is allowed
715 		 * to drain as long as there is anything left.
716 		 */
717 		now = gethrtime();
718 		if ((now < expire) || (proc_type == SQS_WORKER)) {
719 			/*
720 			 * If time not expired or we are worker thread and
721 			 * this squeue is polling capable, continue to do
722 			 * the drain.
723 			 *
724 			 * We turn off interrupts for all userland threads
725 			 * doing drain but we do active polling only for
726 			 * worker thread.
727 			 *
728 			 * Calling SQS_POLL_RING() even in the case of
729 			 * SQS_POLLING_ON() not succeeding is ok as
730 			 * SQS_POLL_RING() will not wake up poll thread
731 			 * if SQS_POLLING bit is not set.
732 			 */
733 			if (proc_type == SQS_WORKER)
734 				SQS_POLL_RING(sqp);
735 			goto again;
736 		}
737 
738 		squeue_worker_wakeup(sqp);
739 	}
740 
741 	/*
742 	 * If the poll thread is already running, just return. The
743 	 * poll thread continues to hold the proc and will finish
744 	 * processing.
745 	 */
746 	if (sqp->sq_state & SQS_GET_PKTS) {
747 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
748 		    SQS_POLL_QUIESCE_DONE)));
749 		sqp->sq_state &= ~proc_type;
750 		return;
751 	}
752 
753 	/*
754 	 *
755 	 * If we are the worker thread and no work is left, send the poll
756 	 * thread down once more to see if something arrived. Otherwise,
757 	 * turn the interrupts back on and we are done.
758 	 */
759 	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
760 		/*
761 		 * Do one last check to see if anything arrived
762 		 * in the NIC. We leave the SQS_PROC set to ensure
763 		 * that poll thread keeps the PROC and can decide
764 		 * if it needs to turn polling off or continue
765 		 * processing.
766 		 *
767 		 * If we drop the SQS_PROC here and poll thread comes
768 		 * up empty handed, it can not safely turn polling off
769 		 * since someone else could have acquired the PROC
770 		 * and started draining. The previously running poll
771 		 * thread and the current thread doing drain would end
772 		 * up in a race for turning polling on/off and more
773 		 * complex code would be required to deal with it.
774 		 *
775 		 * Its lot simpler for drain to hand the SQS_PROC to
776 		 * poll thread (if running) and let poll thread finish
777 		 * without worrying about racing with any other thread.
778 		 */
779 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
780 		    SQS_POLL_QUIESCE_DONE)));
781 		SQS_POLL_RING(sqp);
782 		sqp->sq_state &= ~proc_type;
783 	} else {
784 		/*
785 		 * The squeue is either not capable of polling or the
786 		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
787 		 * unsuccessful or poll thread already finished
788 		 * processing and didn't find anything. Since there
789 		 * is nothing queued and we already turn polling on
790 		 * (for all threads doing drain), we should turn
791 		 * polling off and relinquish the PROC.
792 		 */
793 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
794 		    SQS_POLL_QUIESCE_DONE)));
795 		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
796 		sqp->sq_state &= ~(SQS_PROC | proc_type);
797 		/*
798 		 * If we are not the worker and there is a pending quiesce
799 		 * event, wake up the worker
800 		 */
801 		if ((proc_type != SQS_WORKER) &&
802 		    (sqp->sq_state & SQS_WORKER_THR_CONTROL)) {
803 			squeue_worker_wakeup(sqp);
804 		}
805 	}
806 }
807 
808 /*
809  * Quiesce, Restart, or Cleanup of the squeue poll thread.
810  *
811  * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
812  * not attempt to poll the underlying soft ring any more. The quiesce is
813  * triggered by the mac layer when it wants to quiesce a soft ring. Typically
814  * control operations such as changing the fanout of a NIC or VNIC (dladm
815  * setlinkprop) need to quiesce data flow before changing the wiring.
816  * The operation is done by the mac layer, but it calls back into IP to
817  * quiesce the soft ring. After completing the operation (say increase or
818  * decrease of the fanout) the mac layer then calls back into IP to restart
819  * the quiesced soft ring.
820  *
821  * Cleanup: This is triggered when the squeue binding to a soft ring is
822  * removed permanently. Typically interface plumb and unplumb would trigger
823  * this. It can also be triggered from the mac layer when a soft ring is
824  * being deleted say as the result of a fanout reduction. Since squeues are
825  * never deleted, the cleanup marks the squeue as fit for recycling and
826  * moves it to the zeroth squeue set.
827  */
828 static void
829 squeue_poll_thr_control(squeue_t *sqp)
830 {
831 	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
832 		/* Restart implies a previous quiesce */
833 		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
834 		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
835 		    SQS_POLL_THR_RESTART);
836 		sqp->sq_state |= SQS_POLL_CAPAB;
837 		cv_signal(&sqp->sq_worker_cv);
838 		return;
839 	}
840 
841 	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
842 		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
843 		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
844 		cv_signal(&sqp->sq_worker_cv);
845 		return;
846 	}
847 }
848 
849 /*
850  * POLLING Notes
851  *
852  * With polling mode, we want to do as much processing as we possibly can
853  * in worker thread context. The sweet spot is worker thread keeps doing
854  * work all the time in polling mode and writers etc. keep dumping packets
855  * to worker thread. Occassionally, we send the poll thread (running at
856  * lower priority to NIC to get the chain of packets to feed to worker).
857  * Sending the poll thread down to NIC is dependant on 3 criterions
858  *
859  * 1) Its always driven from squeue_drain and only if worker thread is
860  *	doing the drain.
861  * 2) We clear the backlog once and more packets arrived in between.
862  *	Before starting drain again, send the poll thread down if
863  *	the drain is being done by worker thread.
864  * 3) Before exiting the squeue_drain, if the poll thread is not already
865  *	working and we are the worker thread, try to poll one more time.
866  *
867  * For latency sake, we do allow any thread calling squeue_enter
868  * to process its packet provided:
869  *
870  * 1) Nothing is queued
871  * 2) If more packets arrived in between, the non worker thread are allowed
872  *	to do the drain till their time quanta expired provided SQS_GET_PKTS
873  *	wasn't set in between.
874  *
875  * Avoiding deadlocks with interrupts
876  * ==================================
877  *
878  * One of the big problem is that we can't send poll_thr down while holding
879  * the sq_lock since the thread can block. So we drop the sq_lock before
880  * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
881  * poll thread is running so that no other thread can acquire the
882  * perimeter in between. If the squeue_drain gets done (no more work
883  * left), it leaves the SQS_PROC set if poll thread is running.
884  */
885 
886 /*
887  * This is the squeue poll thread. In poll mode, it polls the underlying
888  * TCP softring and feeds packets into the squeue. The worker thread then
889  * drains the squeue. The poll thread also responds to control signals for
890  * quiesceing, restarting, or cleanup of an squeue. These are driven by
891  * control operations like plumb/unplumb or as a result of dynamic Rx ring
892  * related operations that are driven from the mac layer.
893  */
894 static void
895 squeue_polling_thread(squeue_t *sqp)
896 {
897 	kmutex_t *lock = &sqp->sq_lock;
898 	kcondvar_t *async = &sqp->sq_poll_cv;
899 	ip_mac_rx_t sq_get_pkts;
900 	ip_accept_t ip_accept;
901 	ill_rx_ring_t *sq_rx_ring;
902 	ill_t *sq_ill;
903 	mblk_t *head, *tail, *mp;
904 	uint_t cnt;
905 	void *sq_mac_handle;
906 	callb_cpr_t cprinfo;
907 	size_t bytes_to_pickup;
908 	uint32_t ctl_state;
909 
910 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
911 	mutex_enter(lock);
912 
913 	for (;;) {
914 		CALLB_CPR_SAFE_BEGIN(&cprinfo);
915 		cv_wait(async, lock);
916 		CALLB_CPR_SAFE_END(&cprinfo, lock);
917 
918 		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
919 		    SQS_POLL_THR_QUIESCED);
920 		if (ctl_state != 0) {
921 			/*
922 			 * If the squeue is quiesced, then wait for a control
923 			 * request. A quiesced squeue must not poll the
924 			 * underlying soft ring.
925 			 */
926 			if (ctl_state == SQS_POLL_THR_QUIESCED)
927 				continue;
928 			/*
929 			 * Act on control requests to quiesce, cleanup or
930 			 * restart an squeue
931 			 */
932 			squeue_poll_thr_control(sqp);
933 			continue;
934 		}
935 
936 		if (!(sqp->sq_state & SQS_POLL_CAPAB))
937 			continue;
938 
939 		ASSERT((sqp->sq_state &
940 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
941 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
942 
943 poll_again:
944 		sq_rx_ring = sqp->sq_rx_ring;
945 		sq_get_pkts = sq_rx_ring->rr_rx;
946 		sq_mac_handle = sq_rx_ring->rr_rx_handle;
947 		ip_accept = sq_rx_ring->rr_ip_accept;
948 		sq_ill = sq_rx_ring->rr_ill;
949 		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
950 		mutex_exit(lock);
951 		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
952 		mp = NULL;
953 		if (head != NULL) {
954 			/*
955 			 * We got the packet chain from the mac layer. It
956 			 * would be nice to be able to process it inline
957 			 * for better performance but we need to give
958 			 * IP a chance to look at this chain to ensure
959 			 * that packets are really meant for this squeue
960 			 * and do the IP processing.
961 			 */
962 			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
963 			    &tail, &cnt);
964 		}
965 		mutex_enter(lock);
966 		if (mp != NULL) {
967 			/*
968 			 * The ip_accept function has already added an
969 			 * ip_recv_attr_t mblk if that is needed.
970 			 */
971 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
972 		}
973 		ASSERT((sqp->sq_state &
974 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
975 		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
976 
977 		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
978 			/*
979 			 * We have packets to process and worker thread
980 			 * is not running.  Check to see if poll thread is
981 			 * allowed to process. Let it do processing only if it
982 			 * picked up some packets from the NIC otherwise
983 			 * wakeup the worker thread.
984 			 */
985 			if (mp != NULL) {
986 				hrtime_t  now;
987 
988 				now = gethrtime();
989 				sqp->sq_run = curthread;
990 				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
991 				    squeue_drain_ns);
992 				sqp->sq_run = NULL;
993 
994 				if (sqp->sq_first == NULL)
995 					goto poll_again;
996 
997 				/*
998 				 * Couldn't do the entire drain because the
999 				 * time limit expired, let the
1000 				 * worker thread take over.
1001 				 */
1002 			}
1003 
1004 			/*
1005 			 * Put the SQS_PROC_HELD on so the worker
1006 			 * thread can distinguish where its called from. We
1007 			 * can remove the SQS_PROC flag here and turn off the
1008 			 * polling so that it wouldn't matter who gets the
1009 			 * processing but we get better performance this way
1010 			 * and save the cost of turn polling off and possibly
1011 			 * on again as soon as we start draining again.
1012 			 *
1013 			 * We can't remove the SQS_PROC flag without turning
1014 			 * polling off until we can guarantee that control
1015 			 * will return to squeue_drain immediately.
1016 			 */
1017 			sqp->sq_state |= SQS_PROC_HELD;
1018 			sqp->sq_state &= ~SQS_GET_PKTS;
1019 			squeue_worker_wakeup(sqp);
1020 		} else if (sqp->sq_first == NULL &&
1021 		    !(sqp->sq_state & SQS_WORKER)) {
1022 			/*
1023 			 * Nothing queued and worker thread not running.
1024 			 * Since we hold the proc, no other thread is
1025 			 * processing the squeue. This means that there
1026 			 * is no work to be done and nothing is queued
1027 			 * in squeue or in NIC. Turn polling off and go
1028 			 * back to interrupt mode.
1029 			 */
1030 			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
1031 			/* LINTED: constant in conditional context */
1032 			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
1033 
1034 			/*
1035 			 * If there is a pending control operation
1036 			 * wake up the worker, since it is currently
1037 			 * not running.
1038 			 */
1039 			if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1040 				squeue_worker_wakeup(sqp);
1041 			}
1042 		} else {
1043 			/*
1044 			 * Worker thread is already running. We don't need
1045 			 * to do anything. Indicate that poll thread is done.
1046 			 */
1047 			sqp->sq_state &= ~SQS_GET_PKTS;
1048 		}
1049 		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
1050 			/*
1051 			 * Act on control requests to quiesce, cleanup or
1052 			 * restart an squeue
1053 			 */
1054 			squeue_poll_thr_control(sqp);
1055 		}
1056 	}
1057 }
1058 
1059 /*
1060  * The squeue worker thread acts on any control requests to quiesce, cleanup
1061  * or restart an ill_rx_ring_t by calling this function. The worker thread
1062  * synchronizes with the squeue poll thread to complete the request and finally
1063  * wakes up the requestor when the request is completed.
1064  */
1065 static void
1066 squeue_worker_thr_control(squeue_t *sqp)
1067 {
1068 	ill_t	*ill;
1069 	ill_rx_ring_t	*rx_ring;
1070 
1071 	ASSERT(MUTEX_HELD(&sqp->sq_lock));
1072 
1073 	if (sqp->sq_state & SQS_POLL_RESTART) {
1074 		/* Restart implies a previous quiesce. */
1075 		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
1076 		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
1077 		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
1078 		/*
1079 		 * Request the squeue poll thread to restart and wait till
1080 		 * it actually restarts.
1081 		 */
1082 		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
1083 		sqp->sq_state |= SQS_POLL_THR_RESTART;
1084 		cv_signal(&sqp->sq_poll_cv);
1085 		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
1086 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1087 		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
1088 		    SQS_WORKER);
1089 		/*
1090 		 * Signal any waiter that is waiting for the restart
1091 		 * to complete
1092 		 */
1093 		sqp->sq_state |= SQS_POLL_RESTART_DONE;
1094 		cv_signal(&sqp->sq_ctrlop_done_cv);
1095 		return;
1096 	}
1097 
1098 	if (sqp->sq_state & SQS_PROC_HELD) {
1099 		/* The squeue poll thread handed control to us */
1100 		ASSERT(sqp->sq_state & SQS_PROC);
1101 	}
1102 
1103 	/*
1104 	 * Prevent any other thread from processing the squeue
1105 	 * until we finish the control actions by setting SQS_PROC.
1106 	 * But allow ourself to reenter by setting SQS_WORKER
1107 	 */
1108 	sqp->sq_state |= (SQS_PROC | SQS_WORKER);
1109 
1110 	/* Signal the squeue poll thread and wait for it to quiesce itself */
1111 	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
1112 		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
1113 		cv_signal(&sqp->sq_poll_cv);
1114 		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
1115 			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1116 	}
1117 
1118 	rx_ring = sqp->sq_rx_ring;
1119 	ill = rx_ring->rr_ill;
1120 	/*
1121 	 * The lock hierarchy is as follows.
1122 	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
1123 	 */
1124 	mutex_exit(&sqp->sq_lock);
1125 	mutex_enter(&ill->ill_lock);
1126 	mutex_enter(&sqp->sq_lock);
1127 
1128 	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
1129 	    sqp->sq_rx_ring);
1130 	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
1131 	if (sqp->sq_state & SQS_POLL_CLEANUP) {
1132 		/*
1133 		 * Disassociate this squeue from its ill_rx_ring_t.
1134 		 * The rr_sqp, sq_rx_ring fields are protected by the
1135 		 * corresponding squeue, ill_lock* and sq_lock. Holding any
1136 		 * of them will ensure that the ring to squeue mapping does
1137 		 * not change.
1138 		 */
1139 		ASSERT(!(sqp->sq_state & SQS_DEFAULT));
1140 
1141 		sqp->sq_rx_ring = NULL;
1142 		rx_ring->rr_sqp = NULL;
1143 
1144 		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
1145 		    SQS_POLL_QUIESCE_DONE);
1146 		sqp->sq_ill = NULL;
1147 
1148 		rx_ring->rr_rx_handle = NULL;
1149 		rx_ring->rr_intr_handle = NULL;
1150 		rx_ring->rr_intr_enable = NULL;
1151 		rx_ring->rr_intr_disable = NULL;
1152 		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
1153 	} else {
1154 		sqp->sq_state &= ~SQS_POLL_QUIESCE;
1155 		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
1156 	}
1157 	/*
1158 	 * Signal any waiter that is waiting for the quiesce or cleanup
1159 	 * to complete and also wait for it to actually see and reset the
1160 	 * SQS_POLL_CLEANUP_DONE.
1161 	 */
1162 	cv_signal(&sqp->sq_ctrlop_done_cv);
1163 	mutex_exit(&ill->ill_lock);
1164 	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
1165 		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1166 		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
1167 	}
1168 }
1169 
1170 static void
1171 squeue_worker(squeue_t *sqp)
1172 {
1173 	kmutex_t *lock = &sqp->sq_lock;
1174 	kcondvar_t *async = &sqp->sq_worker_cv;
1175 	callb_cpr_t cprinfo;
1176 	hrtime_t now;
1177 
1178 	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
1179 	mutex_enter(lock);
1180 
1181 	for (;;) {
1182 		for (;;) {
1183 			/*
1184 			 * If the poll thread has handed control to us
1185 			 * we need to break out of the wait.
1186 			 */
1187 			if (sqp->sq_state & SQS_PROC_HELD)
1188 				break;
1189 
1190 			/*
1191 			 * If the squeue is not being processed and we either
1192 			 * have messages to drain or some thread has signaled
1193 			 * some control activity we need to break
1194 			 */
1195 			if (!(sqp->sq_state & SQS_PROC) &&
1196 			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
1197 			    (sqp->sq_first != NULL)))
1198 				break;
1199 
1200 			/*
1201 			 * If we have started some control action, then check
1202 			 * for the SQS_WORKER flag (since we don't
1203 			 * release the squeue) to make sure we own the squeue
1204 			 * and break out
1205 			 */
1206 			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
1207 			    (sqp->sq_state & SQS_WORKER))
1208 				break;
1209 
1210 			CALLB_CPR_SAFE_BEGIN(&cprinfo);
1211 			cv_wait(async, lock);
1212 			CALLB_CPR_SAFE_END(&cprinfo, lock);
1213 		}
1214 		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1215 			squeue_worker_thr_control(sqp);
1216 			continue;
1217 		}
1218 		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
1219 		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
1220 		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
1221 
1222 		if (sqp->sq_state & SQS_PROC_HELD)
1223 			sqp->sq_state &= ~SQS_PROC_HELD;
1224 
1225 		now = gethrtime();
1226 		sqp->sq_run = curthread;
1227 		sqp->sq_drain(sqp, SQS_WORKER, now +  squeue_drain_ns);
1228 		sqp->sq_run = NULL;
1229 	}
1230 }
1231 
1232 uintptr_t *
1233 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1234 {
1235 	ASSERT(p < SQPRIVATE_MAX);
1236 
1237 	return (&sqp->sq_private[p]);
1238 }
1239 
1240 /* ARGSUSED */
1241 void
1242 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
1243 {
1244 	conn_t *connp = (conn_t *)arg;
1245 	squeue_t *sqp = connp->conn_sqp;
1246 
1247 	/*
1248 	 * Mark the squeue as paused before waking up the thread stuck
1249 	 * in squeue_synch_enter().
1250 	 */
1251 	mutex_enter(&sqp->sq_lock);
1252 	sqp->sq_state |= SQS_PAUSE;
1253 
1254 	/*
1255 	 * Notify the thread that it's OK to proceed; that is done by
1256 	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
1257 	 */
1258 	ASSERT(mp->b_flag & MSGWAITSYNC);
1259 	mp->b_flag &= ~MSGWAITSYNC;
1260 	cv_broadcast(&connp->conn_sq_cv);
1261 
1262 	/*
1263 	 * We are doing something on behalf of another thread, so we have to
1264 	 * pause and wait until it finishes.
1265 	 */
1266 	while (sqp->sq_state & SQS_PAUSE) {
1267 		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
1268 	}
1269 	mutex_exit(&sqp->sq_lock);
1270 }
1271 
1272 int
1273 squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
1274 {
1275 	squeue_t *sqp;
1276 
1277 again:
1278 	sqp = connp->conn_sqp;
1279 
1280 	mutex_enter(&sqp->sq_lock);
1281 	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
1282 		/*
1283 		 * We are OK to proceed if the squeue is empty, and
1284 		 * no one owns the squeue.
1285 		 *
1286 		 * The caller won't own the squeue as this is called from the
1287 		 * application.
1288 		 */
1289 		ASSERT(sqp->sq_run == NULL);
1290 
1291 		sqp->sq_state |= SQS_PROC;
1292 		sqp->sq_run = curthread;
1293 		mutex_exit(&sqp->sq_lock);
1294 
1295 		/*
1296 		 * Handle squeue switching. The conn's squeue can only change
1297 		 * while there is a thread in the squeue, which is why we do
1298 		 * the check after entering the squeue. If it has changed, exit
1299 		 * this squeue and redo everything with the new sqeueue.
1300 		 */
1301 		if (sqp != connp->conn_sqp) {
1302 			mutex_enter(&sqp->sq_lock);
1303 			sqp->sq_state &= ~SQS_PROC;
1304 			sqp->sq_run = NULL;
1305 			mutex_exit(&sqp->sq_lock);
1306 			goto again;
1307 		}
1308 #if SQUEUE_DEBUG
1309 		sqp->sq_curmp = NULL;
1310 		sqp->sq_curproc = NULL;
1311 		sqp->sq_connp = connp;
1312 #endif
1313 		connp->conn_on_sqp = B_TRUE;
1314 		return (0);
1315 	} else {
1316 		mblk_t  *mp;
1317 
1318 		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
1319 		if (mp == NULL) {
1320 			mutex_exit(&sqp->sq_lock);
1321 			return (ENOMEM);
1322 		}
1323 
1324 		/*
1325 		 * We mark the mblk as awaiting synchronous squeue access
1326 		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
1327 		 * fires, MSGWAITSYNC is cleared, at which point we know we
1328 		 * have exclusive access.
1329 		 */
1330 		mp->b_flag |= MSGWAITSYNC;
1331 
1332 		CONN_INC_REF(connp);
1333 		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
1334 		ENQUEUE_CHAIN(sqp, mp, mp, 1);
1335 
1336 		ASSERT(sqp->sq_run != curthread);
1337 
1338 		/* Wait until the enqueued mblk get processed. */
1339 		while (mp->b_flag & MSGWAITSYNC)
1340 			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
1341 		mutex_exit(&sqp->sq_lock);
1342 
1343 		if (use_mp == NULL)
1344 			freeb(mp);
1345 
1346 		return (0);
1347 	}
1348 }
1349 
1350 void
1351 squeue_synch_exit(conn_t *connp)
1352 {
1353 	squeue_t *sqp = connp->conn_sqp;
1354 
1355 	mutex_enter(&sqp->sq_lock);
1356 	if (sqp->sq_run == curthread) {
1357 		ASSERT(sqp->sq_state & SQS_PROC);
1358 
1359 		sqp->sq_state &= ~SQS_PROC;
1360 		sqp->sq_run = NULL;
1361 		connp->conn_on_sqp = B_FALSE;
1362 
1363 		if (sqp->sq_first != NULL) {
1364 			/*
1365 			 * If this was a normal thread, then it would
1366 			 * (most likely) continue processing the pending
1367 			 * requests. Since the just completed operation
1368 			 * was executed synchronously, the thread should
1369 			 * not be delayed. To compensate, wake up the
1370 			 * worker thread right away when there are outstanding
1371 			 * requests.
1372 			 */
1373 			squeue_worker_wakeup(sqp);
1374 		}
1375 	} else {
1376 		/*
1377 		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
1378 		 * and wake up the squeue owner, such that owner can continue
1379 		 * processing.
1380 		 */
1381 		ASSERT(sqp->sq_state & SQS_PAUSE);
1382 		sqp->sq_state &= ~SQS_PAUSE;
1383 
1384 		/* There should be only one thread blocking on sq_synch_cv. */
1385 		cv_signal(&sqp->sq_synch_cv);
1386 	}
1387 	mutex_exit(&sqp->sq_lock);
1388 }
1389