xref: /illumos-gate/usr/src/uts/common/io/stream.c (revision f82c7503)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	All Rights Reserved	*/
23 
24 /*
25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  *
28  * Copyright 2021 Tintri by DDN, Inc. All rights reserved.
29  * Copyright 2022 Garrett D'Amore
30  */
31 
32 #include <sys/types.h>
33 #include <sys/param.h>
34 #include <sys/thread.h>
35 #include <sys/sysmacros.h>
36 #include <sys/stropts.h>
37 #include <sys/stream.h>
38 #include <sys/strsubr.h>
39 #include <sys/strsun.h>
40 #include <sys/conf.h>
41 #include <sys/debug.h>
42 #include <sys/cmn_err.h>
43 #include <sys/kmem.h>
44 #include <sys/atomic.h>
45 #include <sys/errno.h>
46 #include <sys/vtrace.h>
47 #include <sys/ftrace.h>
48 #include <sys/ontrap.h>
49 #include <sys/sdt.h>
50 #include <sys/strft.h>
51 
52 #ifdef DEBUG
53 #include <sys/kmem_impl.h>
54 #endif
55 
56 /*
57  * This file contains all the STREAMS utility routines that may
58  * be used by modules and drivers.
59  */
60 
61 /*
62  * STREAMS message allocator: principles of operation
63  *
64  * The streams message allocator consists of all the routines that
65  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
66  * dupb(), freeb() and freemsg().  What follows is a high-level view
67  * of how the allocator works.
68  *
69  * Every streams message consists of one or more mblks, a dblk, and data.
70  * All mblks for all types of messages come from a common mblk_cache.
71  * The dblk and data come in several flavors, depending on how the
72  * message is allocated:
73  *
74  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
75  *     fixed-size dblk/data caches. For message sizes that are multiples of
76  *     PAGESIZE, dblks are allocated separately from the buffer.
77  *     The associated buffer is allocated by the constructor using kmem_alloc().
78  *     For all other message sizes, dblk and its associated data is allocated
79  *     as a single contiguous chunk of memory.
80  *     Objects in these caches consist of a dblk plus its associated data.
81  *     allocb() determines the nearest-size cache by table lookup:
82  *     the dblk_cache[] array provides the mapping from size to dblk cache.
83  *
84  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
85  *     kmem_alloc()'ing a buffer for the data and supplying that
86  *     buffer to gesballoc(), described below.
87  *
88  * (3) The four flavors of [d]esballoc[a] are all implemented by a
89  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
90  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
91  *     db_lim and db_frtnp to describe the caller-supplied buffer.
92  *
93  * While there are several routines to allocate messages, there is only
94  * one routine to free messages: freeb().  freeb() simply invokes the
95  * dblk's free method, dbp->db_free(), which is set at allocation time.
96  *
97  * dupb() creates a new reference to a message by allocating a new mblk,
98  * incrementing the dblk reference count and setting the dblk's free
99  * method to dblk_decref().  The dblk's original free method is retained
100  * in db_lastfree.  dblk_decref() decrements the reference count on each
101  * freeb().  If this is not the last reference it just frees the mblk;
102  * if this *is* the last reference, it restores db_free to db_lastfree,
103  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
104  *
105  * The implementation makes aggressive use of kmem object caching for
106  * maximum performance.  This makes the code simple and compact, but
107  * also a bit abstruse in some places.  The invariants that constitute a
108  * message's constructed state, described below, are more subtle than usual.
109  *
110  * Every dblk has an "attached mblk" as part of its constructed state.
111  * The mblk is allocated by the dblk's constructor and remains attached
112  * until the message is either dup'ed or pulled up.  In the dupb() case
113  * the mblk association doesn't matter until the last free, at which time
114  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
115  * the mblk association because it swaps the leading mblks of two messages,
116  * so it is responsible for swapping their db_mblk pointers accordingly.
117  * From a constructed-state viewpoint it doesn't matter that a dblk's
118  * attached mblk can change while the message is allocated; all that
119  * matters is that the dblk has *some* attached mblk when it's freed.
120  *
121  * The sizes of the allocb() small-message caches are not magical.
122  * They represent a good trade-off between internal and external
123  * fragmentation for current workloads.  They should be reevaluated
124  * periodically, especially if allocations larger than DBLK_MAX_CACHE
125  * become common.  We use 64-byte alignment so that dblks don't
126  * straddle cache lines unnecessarily.
127  */
128 #define	DBLK_MAX_CACHE		73728
129 #define	DBLK_CACHE_ALIGN	64
130 #define	DBLK_MIN_SIZE		8
131 #define	DBLK_SIZE_SHIFT		3
132 
133 #ifdef _BIG_ENDIAN
134 #define	DBLK_RTFU_SHIFT(field)	\
135 	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
136 #else
137 #define	DBLK_RTFU_SHIFT(field)	\
138 	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
139 #endif
140 
141 #define	DBLK_RTFU(ref, type, flags, uioflag)	\
142 	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
143 	((type) << DBLK_RTFU_SHIFT(db_type)) | \
144 	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
145 	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
146 #define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
147 #define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
148 #define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
149 
150 static size_t dblk_sizes[] = {
151 #ifdef _LP64
152 	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
153 	8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
154 	40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
155 #else
156 	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
157 	8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
158 	40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
159 #endif
160 	DBLK_MAX_CACHE, 0
161 };
162 
163 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
164 static struct kmem_cache *mblk_cache;
165 static struct kmem_cache *dblk_esb_cache;
166 static struct kmem_cache *fthdr_cache;
167 static struct kmem_cache *ftblk_cache;
168 
169 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
170 static mblk_t *allocb_oversize(size_t size, int flags);
171 static int allocb_tryhard_fails;
172 static void frnop_func(void *arg);
173 frtn_t frnop = { frnop_func };
174 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
175 
176 static boolean_t rwnext_enter(queue_t *qp);
177 static void rwnext_exit(queue_t *qp);
178 
179 /*
180  * Patchable mblk/dblk kmem_cache flags.
181  */
182 int dblk_kmem_flags = 0;
183 int mblk_kmem_flags = 0;
184 
/*
 * kmem constructor for the fixed-size dblk caches.  cdrarg carries the
 * cache's message size (set up in streams_msg_init()).  Establishes the
 * dblk's constructed state: an attached mblk, a data buffer, and the
 * free methods.  Returns 0 on success, -1 on allocation failure.
 */
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	/* Every dblk carries an attached mblk as part of constructed state. */
	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		/* Page-multiple sizes: data buffer is allocated separately. */
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		/* Otherwise the data lives immediately after the dblk. */
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}
222 
/*
 * kmem constructor for the esballoc dblk cache.  The data buffer is
 * caller-supplied at gesballoc() time, so only the attached mblk and
 * the invariant fields are set up here; db_base/db_lim/db_free are
 * filled in by gesballoc().
 */
/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}
240 
/*
 * kmem constructor for a bcache (private buffer cache) dblk.  cdrarg is
 * the owning bcache_t; the data buffer comes from the bcache's own
 * buffer_cache rather than the global dblk caches.
 */
static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
	if (dbp->db_base == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	/* db_cache points at the bcache, not a kmem cache; see lastfree. */
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}
268 
/*
 * kmem destructor for the fixed-size dblk caches; undoes
 * dblk_constructor().  The separately-allocated data buffer is freed
 * only for page-multiple sizes, mirroring the constructor's choice.
 */
/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(msg_size != 0);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}
287 
/*
 * kmem destructor for bcache dblks; returns the data buffer to the
 * bcache's buffer cache and the attached mblk to the global mblk cache.
 */
static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);
	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}
302 
303 /* ARGSUSED */
304 static int
ftblk_constructor(void * buf,void * cdrarg,int kmflags)305 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
306 {
307 	ftblk_t *fbp = buf;
308 	int i;
309 
310 	bzero(fbp, sizeof (ftblk_t));
311 	if (str_ftstack != 0) {
312 		for (i = 0; i < FTBLK_EVNTS; i++)
313 			fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
314 	}
315 
316 	return (0);
317 }
318 
319 /* ARGSUSED */
320 static void
ftblk_destructor(void * buf,void * cdrarg)321 ftblk_destructor(void *buf, void *cdrarg)
322 {
323 	ftblk_t *fbp = buf;
324 	int i;
325 
326 	if (str_ftstack != 0) {
327 		for (i = 0; i < FTBLK_EVNTS; i++) {
328 			if (fbp->ev[i].stk != NULL) {
329 				kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
330 				fbp->ev[i].stk = NULL;
331 			}
332 		}
333 	}
334 }
335 
336 static int
fthdr_constructor(void * buf,void * cdrarg,int kmflags)337 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
338 {
339 	fthdr_t *fhp = buf;
340 
341 	return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
342 }
343 
344 static void
fthdr_destructor(void * buf,void * cdrarg)345 fthdr_destructor(void *buf, void *cdrarg)
346 {
347 	fthdr_t *fhp = buf;
348 
349 	ftblk_destructor(&fhp->first, cdrarg);
350 }
351 
352 void
streams_msg_init(void)353 streams_msg_init(void)
354 {
355 	char name[40];
356 	size_t size;
357 	size_t lastsize = DBLK_MIN_SIZE;
358 	size_t *sizep;
359 	struct kmem_cache *cp;
360 	size_t tot_size;
361 	int offset;
362 
363 	mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
364 	    NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
365 
366 	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
367 
368 		if ((offset = (size & PAGEOFFSET)) != 0) {
369 			/*
370 			 * We are in the middle of a page, dblk should
371 			 * be allocated on the same page
372 			 */
373 			tot_size = size + sizeof (dblk_t);
374 			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
375 			    < PAGESIZE);
376 			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
377 
378 		} else {
379 
380 			/*
381 			 * buf size is multiple of page size, dblk and
382 			 * buffer are allocated separately.
383 			 */
384 
385 			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
386 			tot_size = sizeof (dblk_t);
387 		}
388 
389 		(void) sprintf(name, "streams_dblk_%ld", size);
390 		cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
391 		    dblk_constructor, dblk_destructor, NULL, (void *)(size),
392 		    NULL, dblk_kmem_flags);
393 
394 		while (lastsize <= size) {
395 			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
396 			lastsize += DBLK_MIN_SIZE;
397 		}
398 	}
399 
400 	dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
401 	    DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
402 	    (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
403 	fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
404 	    fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
405 	ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
406 	    ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
407 
408 	/* initialize throttling queue for esballoc */
409 	esballoc_queue_init();
410 }
411 
/*
 * Allocate a message of at least 'size' bytes of data.  The priority
 * argument is unused.  Sizes up to DBLK_MAX_CACHE come from the
 * fixed-size dblk caches; larger requests go through allocb_oversize().
 * A size of 0 maps to the smallest cache.  Returns NULL on failure.
 */
/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index =  (size - 1)  >> DBLK_SIZE_SHIFT;

	/* NOTE: size == 0 wraps (size - 1) around, landing here too. */
	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	/* Set db_ref/db_type/db_flags/db_struioflag in one 32-bit store. */
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
447 
448 /*
449  * Allocate an mblk taking db_credp and db_cpid from the template.
450  * Allow the cred to be NULL.
451  */
452 mblk_t *
allocb_tmpl(size_t size,const mblk_t * tmpl)453 allocb_tmpl(size_t size, const mblk_t *tmpl)
454 {
455 	mblk_t *mp = allocb(size, 0);
456 
457 	if (mp != NULL) {
458 		dblk_t *src = tmpl->b_datap;
459 		dblk_t *dst = mp->b_datap;
460 		cred_t *cr;
461 		pid_t cpid;
462 
463 		cr = msg_getcred(tmpl, &cpid);
464 		if (cr != NULL)
465 			crhold(dst->db_credp = cr);
466 		dst->db_cpid = cpid;
467 		dst->db_type = src->db_type;
468 	}
469 	return (mp);
470 }
471 
472 mblk_t *
allocb_cred(size_t size,cred_t * cr,pid_t cpid)473 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
474 {
475 	mblk_t *mp = allocb(size, 0);
476 
477 	ASSERT(cr != NULL);
478 	if (mp != NULL) {
479 		dblk_t *dbp = mp->b_datap;
480 
481 		crhold(dbp->db_credp = cr);
482 		dbp->db_cpid = cpid;
483 	}
484 	return (mp);
485 }
486 
487 mblk_t *
allocb_cred_wait(size_t size,uint_t flags,int * error,cred_t * cr,pid_t cpid)488 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
489 {
490 	mblk_t *mp = allocb_wait(size, 0, flags, error);
491 
492 	ASSERT(cr != NULL);
493 	if (mp != NULL) {
494 		dblk_t *dbp = mp->b_datap;
495 
496 		crhold(dbp->db_credp = cr);
497 		dbp->db_cpid = cpid;
498 	}
499 
500 	return (mp);
501 }
502 
/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none found we return NULL (and set *cpidp to NOPID).
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;

#ifdef DEBUG
		/*
		 * Normally there should at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds. Thus we can only assert for the zoneid being the
		 * same. Due to Multi-level Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__getcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	if (cpidp != NULL)
		*cpidp = NOPID;
	return (NULL);
}
559 
/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references.
 *
 * Note: the returned cred already carries the reference that was in
 * db_credp; the caller inherits it rather than taking a new hold.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
	cred_t *cr = NULL;
	cred_t *cr2;
	mblk_t *mp2;

	while (mp != NULL) {
		dblk_t *dbp = mp->b_datap;

		cr = dbp->db_credp;
		if (cr == NULL) {
			mp = mp->b_cont;
			continue;
		}
		/* Only safe to clear db_credp with a single reference. */
		ASSERT(dbp->db_ref == 1);
		dbp->db_credp = NULL;
		if (cpidp != NULL)
			*cpidp = dbp->db_cpid;
#ifdef DEBUG
		/*
		 * Normally there should at most one db_credp in a message.
		 * But if there are multiple (as in the case of some M_IOC*
		 * and some internal messages in TCP/IP bind logic) then
		 * they must be identical in the normal case.
		 * However, a socket can be shared between different uids
		 * in which case data queued in TCP would be from different
		 * creds. Thus we can only assert for the zoneid being the
		 * same. Due to Multi-level Level Ports for TX, some
		 * cred_t can have a NULL cr_zone, and we skip the comparison
		 * in that case.
		 */
		mp2 = mp->b_cont;
		while (mp2 != NULL) {
			cr2 = DB_CRED(mp2);
			if (cr2 != NULL) {
				DTRACE_PROBE2(msg__extractcred,
				    cred_t *, cr, cred_t *, cr2);
				ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
				    crgetzone(cr) == NULL ||
				    crgetzone(cr2) == NULL);
			}
			mp2 = mp2->b_cont;
		}
#endif
		return (cr);
	}
	return (NULL);
}
620 /*
621  * Get the label for a message. Uses the first mblk in the message
622  * which has a non-NULL db_credp.
623  * Returns NULL if there is no credp.
624  */
625 extern struct ts_label_s *
msg_getlabel(const mblk_t * mp)626 msg_getlabel(const mblk_t *mp)
627 {
628 	cred_t *cr = msg_getcred(mp, NULL);
629 
630 	if (cr == NULL)
631 		return (NULL);
632 
633 	return (crgetlabel(cr));
634 }
635 
/*
 * Free a single message block.  Dispatches through the dblk's free
 * method (set at allocation or dup time); see the allocator overview
 * at the top of this file.  The flowtrace event must be logged before
 * db_free() runs, since that call may free mp and the dblk.
 */
void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}
649 
/*
 * Free an entire message: every mblk along the b_cont chain.
 * b_cont is captured before db_free() because that call may free
 * the current mblk out from under us.
 */
void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}
667 
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t	*
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t		*mp1;
	unsigned char	*old_rptr;
	ptrdiff_t	cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	/* Reuse the existing block only if we hold the sole reference. */
	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		/* Reset pointers to the start; data is copied back below. */
		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	/* If a new block was allocated, release the original. */
	if (mp != mp1)
		freeb(mp);

	return (mp1);
}
717 
/*
 * Last-reference free method for cache-allocated dblks: restore the
 * dblk to its constructed state (cred cleared, flags reset) and return
 * it, with its attached mblk, to its kmem cache.
 */
static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED and/or UIOA flag(s) */
	dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

	kmem_cache_free(dbp->db_cache, dbp);
}
741 
/*
 * Free method installed by dupb(): lock-free decrement of the dblk
 * reference count.  Non-last references just free the mblk; the last
 * reference re-attaches its mblk to the dblk and runs the original
 * free method saved in db_lastfree.
 */
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	/* Last reference: this mblk becomes the dblk's attached mblk. */
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}
766 
/*
 * Create a new reference to a message block: allocate a fresh mblk
 * sharing mp's dblk and data, and atomically bump db_ref via a CAS
 * loop on the packed RTFU word.  Fails (returns NULL) if no mblk can
 * be allocated or db_ref is already at DBLK_REFMAX.
 */
mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	/* With multiple references, frees go through dblk_decref(). */
	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
	    oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}
806 
/*
 * Last-reference free method for desballoc'ed dblks: invoke the
 * caller's free routine for the external buffer synchronously, then
 * reset the dblk and return it to the esballoc dblk cache.
 */
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	/* Caller's free routine releases the external data buffer. */
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}
828 
/*
 * No-op free routine; used as the free_func in the file-scope frnop
 * frtn_t for buffers that need no caller-side cleanup.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
834 
/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 *
 * The variants with a 'd' prefix (desballoc, desballoca)
 *	directly free the mblk when it loses its last ref,
 *	where the other variants free asynchronously.
 * The variants with an 'a' suffix (esballoca, desballoca)
 *	add an extra ref, effectively letting the streams subsystem
 *	know that the message data should not be modified.
 *	(eg. see db_ref checks in reallocb and elsewhere)
 *
 * The method used by the 'a' suffix functions to keep the dblk
 * db_ref > 1 is non-obvious.  The macro DBLK_RTFU(2,...) passed to
 * gesballoc sets the initial db_ref = 2 and sets the DBLK_REFMIN
 * bit in db_flags.  In dblk_decref() that flag essentially means
 * the dblk has one extra ref, so the "last ref" is one, not zero.
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
    void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	/* Describe the caller-supplied buffer. */
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	/* Single 32-bit store of db_ref/db_type/db_flags/db_struioflag. */
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}
881 
882 /*ARGSUSED*/
883 mblk_t *
esballoc(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)884 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
885 {
886 	mblk_t *mp;
887 
888 	/*
889 	 * Note that this is structured to allow the common case (i.e.
890 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
891 	 * call optimization.
892 	 */
893 	if (!str_ftnever) {
894 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
895 		    frp, freebs_enqueue, KM_NOSLEEP);
896 
897 		if (mp != NULL)
898 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
899 		return (mp);
900 	}
901 
902 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
903 	    frp, freebs_enqueue, KM_NOSLEEP));
904 }
905 
906 /*
907  * Same as esballoc() but sleeps waiting for memory.
908  */
909 /*ARGSUSED*/
910 mblk_t *
esballoc_wait(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)911 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
912 {
913 	mblk_t *mp;
914 
915 	/*
916 	 * Note that this is structured to allow the common case (i.e.
917 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
918 	 * call optimization.
919 	 */
920 	if (!str_ftnever) {
921 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
922 		    frp, freebs_enqueue, KM_SLEEP);
923 
924 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
925 		return (mp);
926 	}
927 
928 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
929 	    frp, freebs_enqueue, KM_SLEEP));
930 }
931 
932 /*ARGSUSED*/
933 mblk_t *
desballoc(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)934 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
935 {
936 	mblk_t *mp;
937 
938 	/*
939 	 * Note that this is structured to allow the common case (i.e.
940 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
941 	 * call optimization.
942 	 */
943 	if (!str_ftnever) {
944 		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
945 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
946 
947 		if (mp != NULL)
948 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
949 		return (mp);
950 	}
951 
952 	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
953 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
954 }
955 
956 /*ARGSUSED*/
957 mblk_t *
esballoca(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)958 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
959 {
960 	mblk_t *mp;
961 
962 	/*
963 	 * Note that this is structured to allow the common case (i.e.
964 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
965 	 * call optimization.
966 	 */
967 	if (!str_ftnever) {
968 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
969 		    frp, freebs_enqueue, KM_NOSLEEP);
970 
971 		if (mp != NULL)
972 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
973 		return (mp);
974 	}
975 
976 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
977 	    frp, freebs_enqueue, KM_NOSLEEP));
978 }
979 
980 /*
981  * Same as esballoca() but sleeps waiting for memory.
982  */
983 mblk_t *
esballoca_wait(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)984 esballoca_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
985 {
986 	mblk_t *mp;
987 
988 	/*
989 	 * Note that this is structured to allow the common case (i.e.
990 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
991 	 * call optimization.
992 	 */
993 	if (!str_ftnever) {
994 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
995 		    frp, freebs_enqueue, KM_SLEEP);
996 
997 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
998 		return (mp);
999 	}
1000 
1001 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1002 	    frp, freebs_enqueue, KM_SLEEP));
1003 }
1004 
1005 /*ARGSUSED*/
1006 mblk_t *
desballoca(unsigned char * base,size_t size,uint_t pri,frtn_t * frp)1007 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
1008 {
1009 	mblk_t *mp;
1010 
1011 	/*
1012 	 * Note that this is structured to allow the common case (i.e.
1013 	 * STREAMS flowtracing disabled) to call gesballoc() with tail
1014 	 * call optimization.
1015 	 */
1016 	if (!str_ftnever) {
1017 		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1018 		    frp, dblk_lastfree_desb, KM_NOSLEEP);
1019 
1020 		if (mp != NULL)
1021 			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
1022 		return (mp);
1023 	}
1024 
1025 	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
1026 	    frp, dblk_lastfree_desb, KM_NOSLEEP));
1027 }
1028 
/*
 * Last-reference free routine for dblks allocated from a private buffer
 * cache (bcache_allocb()).  Returns the dblk to its bcache and, if this
 * was the final outstanding allocation of a cache whose destruction was
 * deferred by bcache_destroy(), tears the whole bcache down.
 */
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	/*
	 * bcache_destroy() bumps bcp->destroy instead of freeing when
	 * allocations are still outstanding; the thread that drops the
	 * count to zero completes the deferred teardown here.  Note the
	 * mutex is exited before being destroyed along with the bcache.
	 */
	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}
1061 
1062 bcache_t *
bcache_create(char * name,size_t size,uint_t align)1063 bcache_create(char *name, size_t size, uint_t align)
1064 {
1065 	bcache_t *bcp;
1066 	char buffer[255];
1067 
1068 	ASSERT((align & (align - 1)) == 0);
1069 
1070 	if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1071 		return (NULL);
1072 
1073 	bcp->size = size;
1074 	bcp->align = align;
1075 	bcp->alloc = 0;
1076 	bcp->destroy = 0;
1077 
1078 	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1079 
1080 	(void) sprintf(buffer, "%s_buffer_cache", name);
1081 	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1082 	    NULL, NULL, NULL, 0);
1083 	(void) sprintf(buffer, "%s_dblk_cache", name);
1084 	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1085 	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1086 	    NULL, (void *)bcp, NULL, 0);
1087 
1088 	return (bcp);
1089 }
1090 
1091 void
bcache_destroy(bcache_t * bcp)1092 bcache_destroy(bcache_t *bcp)
1093 {
1094 	ASSERT(bcp != NULL);
1095 
1096 	mutex_enter(&bcp->mutex);
1097 	if (bcp->alloc == 0) {
1098 		kmem_cache_destroy(bcp->dblk_cache);
1099 		kmem_cache_destroy(bcp->buffer_cache);
1100 		mutex_exit(&bcp->mutex);
1101 		mutex_destroy(&bcp->mutex);
1102 		kmem_free(bcp, sizeof (bcache_t));
1103 	} else {
1104 		bcp->destroy++;
1105 		mutex_exit(&bcp->mutex);
1106 	}
1107 }
1108 
/*
 * Allocate an mblk from a private buffer cache.  Returns NULL if the
 * cache is being destroyed or the dblk allocation fails; 'pri' is
 * unused (retained for interface compatibility).
 */
/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	/* refuse new allocations once destruction has been requested */
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	/* count the allocation while still holding the cache lock */
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	/* initialize the mblk exactly as the regular allocb() path does */
	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
1145 
/*
 * Last-reference free routine for oversize mblks (allocb_oversize()):
 * frees the separately kmem_alloc'd data buffer, then returns the dblk
 * to its kmem cache.
 */
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* the data buffer was kmem_alloc'd separately; free it first */
	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}
1165 
/*
 * Allocate an mblk whose buffer is too large for the fixed dblk caches.
 * The buffer is kmem_alloc'd separately and wrapped via gesballoc();
 * on failure of the wrap, the buffer is freed here since ownership
 * never transferred to the mblk.
 */
static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}
1184 
1185 mblk_t *
allocb_tryhard(size_t target_size)1186 allocb_tryhard(size_t target_size)
1187 {
1188 	size_t size;
1189 	mblk_t *bp;
1190 
1191 	for (size = target_size; size < target_size + 512;
1192 	    size += DBLK_CACHE_ALIGN)
1193 		if ((bp = allocb(size, BPRI_HI)) != NULL)
1194 			return (bp);
1195 	allocb_tryhard_fails++;
1196 	return (NULL);
1197 }
1198 
1199 /*
1200  * This routine is consolidation private for STREAMS internal use
1201  * This routine may only be called from sync routines (i.e., not
1202  * from put or service procedures).  It is located here (rather
1203  * than strsubr.c) so that we don't have to expose all of the
1204  * allocb() implementation details in header files.
1205  */
1206 mblk_t *
allocb_wait(size_t size,uint_t pri,uint_t flags,int * error)1207 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1208 {
1209 	dblk_t *dbp;
1210 	mblk_t *mp;
1211 	size_t index;
1212 
1213 	index = (size -1) >> DBLK_SIZE_SHIFT;
1214 
1215 	if (flags & STR_NOSIG) {
1216 		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1217 			if (size != 0) {
1218 				mp = allocb_oversize(size, KM_SLEEP);
1219 				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1220 				    (uintptr_t)mp);
1221 				return (mp);
1222 			}
1223 			index = 0;
1224 		}
1225 
1226 		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1227 		mp = dbp->db_mblk;
1228 		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1229 		mp->b_next = mp->b_prev = mp->b_cont = NULL;
1230 		mp->b_rptr = mp->b_wptr = dbp->db_base;
1231 		mp->b_queue = NULL;
1232 		MBLK_BAND_FLAG_WORD(mp) = 0;
1233 		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1234 
1235 		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1236 
1237 	} else {
1238 		while ((mp = allocb(size, pri)) == NULL) {
1239 			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1240 				return (NULL);
1241 		}
1242 	}
1243 
1244 	return (mp);
1245 }
1246 
1247 /*
1248  * Call function 'func' with 'arg' when a class zero block can
1249  * be allocated with priority 'pri'.
1250  */
1251 bufcall_id_t
esbbcall(uint_t pri,void (* func)(void *),void * arg)1252 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1253 {
1254 	return (bufcall(1, pri, func, arg));
1255 }
1256 
1257 /*
1258  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1259  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1260  * This provides consistency for all internal allocators of ioctl.
1261  */
1262 mblk_t *
mkiocb(uint_t cmd)1263 mkiocb(uint_t cmd)
1264 {
1265 	struct iocblk	*ioc;
1266 	mblk_t		*mp;
1267 
1268 	/*
1269 	 * Allocate enough space for any of the ioctl related messages.
1270 	 */
1271 	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1272 		return (NULL);
1273 
1274 	bzero(mp->b_rptr, sizeof (union ioctypes));
1275 
1276 	/*
1277 	 * Set the mblk_t information and ptrs correctly.
1278 	 */
1279 	mp->b_wptr += sizeof (struct iocblk);
1280 	mp->b_datap->db_type = M_IOCTL;
1281 
1282 	/*
1283 	 * Fill in the fields.
1284 	 */
1285 	ioc		= (struct iocblk *)mp->b_rptr;
1286 	ioc->ioc_cmd	= cmd;
1287 	ioc->ioc_cr	= kcred;
1288 	ioc->ioc_id	= getiocseqno();
1289 	ioc->ioc_flag	= IOC_NATIVE;
1290 	return (mp);
1291 }
1292 
1293 /*
1294  * test if block of given size can be allocated with a request of
1295  * the given priority.
1296  * 'pri' is no longer used, but is retained for compatibility.
1297  */
1298 /* ARGSUSED */
1299 int
testb(size_t size,uint_t pri)1300 testb(size_t size, uint_t pri)
1301 {
1302 	return ((size + sizeof (dblk_t)) <= kmem_avail());
1303 }
1304 
1305 /*
1306  * Call function 'func' with argument 'arg' when there is a reasonably
1307  * good chance that a block of size 'size' can be allocated.
1308  * 'pri' is no longer used, but is retained for compatibility.
1309  */
1310 /* ARGSUSED */
1311 bufcall_id_t
bufcall(size_t size,uint_t pri,void (* func)(void *),void * arg)1312 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1313 {
1314 	static long bid = 1;	/* always odd to save checking for zero */
1315 	bufcall_id_t bc_id;
1316 	struct strbufcall *bcp;
1317 
1318 	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1319 		return (0);
1320 
1321 	bcp->bc_func = func;
1322 	bcp->bc_arg = arg;
1323 	bcp->bc_size = size;
1324 	bcp->bc_next = NULL;
1325 	bcp->bc_executor = NULL;
1326 
1327 	mutex_enter(&strbcall_lock);
1328 	/*
1329 	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1330 	 * should be no references to bcp since it may be freed by
1331 	 * runbufcalls(). Since bcp_id field is returned, we save its value in
1332 	 * the local var.
1333 	 */
1334 	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */
1335 
1336 	/*
1337 	 * add newly allocated stream event to existing
1338 	 * linked list of events.
1339 	 */
1340 	if (strbcalls.bc_head == NULL) {
1341 		strbcalls.bc_head = strbcalls.bc_tail = bcp;
1342 	} else {
1343 		strbcalls.bc_tail->bc_next = bcp;
1344 		strbcalls.bc_tail = bcp;
1345 	}
1346 
1347 	cv_signal(&strbcall_cv);
1348 	mutex_exit(&strbcall_lock);
1349 	return (bc_id);
1350 }
1351 
1352 /*
1353  * Cancel a bufcall request.
1354  */
1355 void
unbufcall(bufcall_id_t id)1356 unbufcall(bufcall_id_t id)
1357 {
1358 	strbufcall_t *bcp, *pbcp;
1359 
1360 	mutex_enter(&strbcall_lock);
1361 again:
1362 	pbcp = NULL;
1363 	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1364 		if (id == bcp->bc_id)
1365 			break;
1366 		pbcp = bcp;
1367 	}
1368 	if (bcp) {
1369 		if (bcp->bc_executor != NULL) {
1370 			if (bcp->bc_executor != curthread) {
1371 				cv_wait(&bcall_cv, &strbcall_lock);
1372 				goto again;
1373 			}
1374 		} else {
1375 			if (pbcp)
1376 				pbcp->bc_next = bcp->bc_next;
1377 			else
1378 				strbcalls.bc_head = bcp->bc_next;
1379 			if (bcp == strbcalls.bc_tail)
1380 				strbcalls.bc_tail = pbcp;
1381 			kmem_free(bcp, sizeof (strbufcall_t));
1382 		}
1383 	}
1384 	mutex_exit(&strbcall_lock);
1385 }
1386 
1387 /*
1388  * Duplicate a message block by block (uses dupb), returning
1389  * a pointer to the duplicate message.
1390  * Returns a non-NULL value only if the entire message
1391  * was dup'd.
1392  */
1393 mblk_t *
dupmsg(mblk_t * bp)1394 dupmsg(mblk_t *bp)
1395 {
1396 	mblk_t *head, *nbp;
1397 
1398 	if (!bp || !(nbp = head = dupb(bp)))
1399 		return (NULL);
1400 
1401 	while (bp->b_cont) {
1402 		if (!(nbp->b_cont = dupb(bp->b_cont))) {
1403 			freemsg(head);
1404 			return (NULL);
1405 		}
1406 		nbp = nbp->b_cont;
1407 		bp = bp->b_cont;
1408 	}
1409 	return (head);
1410 }
1411 
/*
 * Duplicate an mblk without "loaning" zero-copy buffers: a block marked
 * STRUIO_ZC must be deep-copied (copyb) rather than reference-shared
 * (dupb).
 */
#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

/*
 * Like dupmsg(), but deep-copies any zero-copy (STRUIO_ZC) blocks so the
 * caller never shares a loaned-up buffer.  Only M_DATA messages are
 * handled; returns NULL on any failure (partial result is freed).
 */
mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
1435 
1436 /*
1437  * Copy data from message and data block to newly allocated message and
1438  * data block. Returns new message block pointer, or NULL if error.
1439  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1440  * as in the original even when db_base is not word aligned. (bug 1052877)
1441  */
1442 mblk_t *
copyb(mblk_t * bp)1443 copyb(mblk_t *bp)
1444 {
1445 	mblk_t	*nbp;
1446 	dblk_t	*dp, *ndp;
1447 	uchar_t *base;
1448 	size_t	size;
1449 	size_t	unaligned;
1450 
1451 	ASSERT(bp->b_wptr >= bp->b_rptr);
1452 
1453 	dp = bp->b_datap;
1454 	if (dp->db_fthdr != NULL)
1455 		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1456 
1457 	size = dp->db_lim - dp->db_base;
1458 	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1459 	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1460 		return (NULL);
1461 	nbp->b_flag = bp->b_flag;
1462 	nbp->b_band = bp->b_band;
1463 	ndp = nbp->b_datap;
1464 
1465 	/*
1466 	 * Copy the various checksum information that came in
1467 	 * originally.
1468 	 */
1469 	ndp->db_cksumstart = dp->db_cksumstart;
1470 	ndp->db_cksumend = dp->db_cksumend;
1471 	ndp->db_cksumstuff = dp->db_cksumstuff;
1472 	bcopy(dp->db_struioun.data, ndp->db_struioun.data,
1473 	    sizeof (dp->db_struioun.data));
1474 
1475 	/*
1476 	 * Well, here is a potential issue.  If we are trying to
1477 	 * trace a flow, and we copy the message, we might lose
1478 	 * information about where this message might have been.
1479 	 * So we should inherit the FT data.  On the other hand,
1480 	 * a user might be interested only in alloc to free data.
1481 	 * So I guess the real answer is to provide a tunable.
1482 	 */
1483 	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1484 
1485 	base = ndp->db_base + unaligned;
1486 	bcopy(dp->db_base, ndp->db_base + unaligned, size);
1487 
1488 	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1489 	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1490 
1491 	return (nbp);
1492 }
1493 
1494 /*
1495  * Copy data from message to newly allocated message using new
1496  * data blocks.  Returns a pointer to the new message, or NULL if error.
1497  */
1498 mblk_t *
copymsg(mblk_t * bp)1499 copymsg(mblk_t *bp)
1500 {
1501 	mblk_t *head, *nbp;
1502 
1503 	if (!bp || !(nbp = head = copyb(bp)))
1504 		return (NULL);
1505 
1506 	while (bp->b_cont) {
1507 		if (!(nbp->b_cont = copyb(bp->b_cont))) {
1508 			freemsg(head);
1509 			return (NULL);
1510 		}
1511 		nbp = nbp->b_cont;
1512 		bp = bp->b_cont;
1513 	}
1514 	return (head);
1515 }
1516 
1517 /*
1518  * link a message block to tail of message
1519  */
1520 void
linkb(mblk_t * mp,mblk_t * bp)1521 linkb(mblk_t *mp, mblk_t *bp)
1522 {
1523 	ASSERT(mp && bp);
1524 
1525 	for (; mp->b_cont; mp = mp->b_cont)
1526 		;
1527 	mp->b_cont = bp;
1528 }
1529 
1530 /*
1531  * unlink a message block from head of message
1532  * return pointer to new message.
1533  * NULL if message becomes empty.
1534  */
1535 mblk_t *
unlinkb(mblk_t * bp)1536 unlinkb(mblk_t *bp)
1537 {
1538 	mblk_t *bp1;
1539 
1540 	bp1 = bp->b_cont;
1541 	bp->b_cont = NULL;
1542 	return (bp1);
1543 }
1544 
1545 /*
1546  * remove a message block "bp" from message "mp"
1547  *
1548  * Return pointer to new message or NULL if no message remains.
1549  * Return -1 if bp is not found in message.
1550  */
1551 mblk_t *
rmvb(mblk_t * mp,mblk_t * bp)1552 rmvb(mblk_t *mp, mblk_t *bp)
1553 {
1554 	mblk_t *tmp;
1555 	mblk_t *lastp = NULL;
1556 
1557 	ASSERT(mp && bp);
1558 	for (tmp = mp; tmp; tmp = tmp->b_cont) {
1559 		if (tmp == bp) {
1560 			if (lastp)
1561 				lastp->b_cont = tmp->b_cont;
1562 			else
1563 				mp = tmp->b_cont;
1564 			tmp->b_cont = NULL;
1565 			return (mp);
1566 		}
1567 		lastp = tmp;
1568 	}
1569 	return ((mblk_t *)-1);
1570 }
1571 
1572 /*
1573  * Concatenate and align first len bytes of common
1574  * message type.  Len == -1, means concat everything.
1575  * Returns 1 on success, 0 on failure
1576  * After the pullup, mp points to the pulled up data.
1577  */
1578 int
pullupmsg(mblk_t * mp,ssize_t len)1579 pullupmsg(mblk_t *mp, ssize_t len)
1580 {
1581 	mblk_t *bp, *b_cont;
1582 	dblk_t *dbp;
1583 	ssize_t n;
1584 
1585 	ASSERT(mp->b_datap->db_ref > 0);
1586 	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1587 
1588 	if (len == -1) {
1589 		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1590 			return (1);
1591 		len = xmsgsize(mp);
1592 	} else {
1593 		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1594 		ASSERT(first_mblk_len >= 0);
1595 		/*
1596 		 * If the length is less than that of the first mblk,
1597 		 * we want to pull up the message into an aligned mblk.
1598 		 * Though not part of the spec, some callers assume it.
1599 		 */
1600 		if (len <= first_mblk_len) {
1601 			if (str_aligned(mp->b_rptr))
1602 				return (1);
1603 			len = first_mblk_len;
1604 		} else if (xmsgsize(mp) < len)
1605 			return (0);
1606 	}
1607 
1608 	if ((bp = allocb_tmpl(len, mp)) == NULL)
1609 		return (0);
1610 
1611 	dbp = bp->b_datap;
1612 	*bp = *mp;		/* swap mblks so bp heads the old msg... */
1613 	mp->b_datap = dbp;	/* ... and mp heads the new message */
1614 	mp->b_datap->db_mblk = mp;
1615 	bp->b_datap->db_mblk = bp;
1616 	mp->b_rptr = mp->b_wptr = dbp->db_base;
1617 
1618 	do {
1619 		ASSERT(bp->b_datap->db_ref > 0);
1620 		ASSERT(bp->b_wptr >= bp->b_rptr);
1621 		n = MIN(bp->b_wptr - bp->b_rptr, len);
1622 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1623 		if (n > 0)
1624 			bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1625 		mp->b_wptr += n;
1626 		bp->b_rptr += n;
1627 		len -= n;
1628 		if (bp->b_rptr != bp->b_wptr)
1629 			break;
1630 		b_cont = bp->b_cont;
1631 		freeb(bp);
1632 		bp = b_cont;
1633 	} while (len && bp);
1634 
1635 	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
1636 
1637 	return (1);
1638 }
1639 
1640 /*
1641  * Concatenate and align at least the first len bytes of common message
1642  * type.  Len == -1 means concatenate everything.  The original message is
1643  * unaltered.  Returns a pointer to a new message on success, otherwise
1644  * returns NULL.
1645  */
1646 mblk_t *
msgpullup(mblk_t * mp,ssize_t len)1647 msgpullup(mblk_t *mp, ssize_t len)
1648 {
1649 	mblk_t	*newmp;
1650 	ssize_t	totlen;
1651 	ssize_t	n;
1652 
1653 	totlen = xmsgsize(mp);
1654 
1655 	if ((len > 0) && (len > totlen))
1656 		return (NULL);
1657 
1658 	/*
1659 	 * Copy all of the first msg type into one new mblk, then dupmsg
1660 	 * and link the rest onto this.
1661 	 */
1662 
1663 	len = totlen;
1664 
1665 	if ((newmp = allocb_tmpl(len, mp)) == NULL)
1666 		return (NULL);
1667 
1668 	newmp->b_flag = mp->b_flag;
1669 	newmp->b_band = mp->b_band;
1670 
1671 	while (len > 0) {
1672 		n = mp->b_wptr - mp->b_rptr;
1673 		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
1674 		if (n > 0)
1675 			bcopy(mp->b_rptr, newmp->b_wptr, n);
1676 		newmp->b_wptr += n;
1677 		len -= n;
1678 		mp = mp->b_cont;
1679 	}
1680 
1681 	if (mp != NULL) {
1682 		newmp->b_cont = dupmsg(mp);
1683 		if (newmp->b_cont == NULL) {
1684 			freemsg(newmp);
1685 			return (NULL);
1686 		}
1687 	}
1688 
1689 	return (newmp);
1690 }
1691 
1692 /*
1693  * Trim bytes from message
1694  *  len > 0, trim from head
1695  *  len < 0, trim from tail
1696  * Returns 1 on success, 0 on failure.
1697  */
1698 int
adjmsg(mblk_t * mp,ssize_t len)1699 adjmsg(mblk_t *mp, ssize_t len)
1700 {
1701 	mblk_t *bp;
1702 	mblk_t *save_bp = NULL;
1703 	mblk_t *prev_bp;
1704 	mblk_t *bcont;
1705 	unsigned char type;
1706 	ssize_t n;
1707 	int fromhead;
1708 	int first;
1709 
1710 	ASSERT(mp != NULL);
1711 
1712 	if (len < 0) {
1713 		fromhead = 0;
1714 		len = -len;
1715 	} else {
1716 		fromhead = 1;
1717 	}
1718 
1719 	if (xmsgsize(mp) < len)
1720 		return (0);
1721 
1722 	if (fromhead) {
1723 		first = 1;
1724 		while (len) {
1725 			ASSERT(mp->b_wptr >= mp->b_rptr);
1726 			n = MIN(mp->b_wptr - mp->b_rptr, len);
1727 			mp->b_rptr += n;
1728 			len -= n;
1729 
1730 			/*
1731 			 * If this is not the first zero length
1732 			 * message remove it
1733 			 */
1734 			if (!first && (mp->b_wptr == mp->b_rptr)) {
1735 				bcont = mp->b_cont;
1736 				freeb(mp);
1737 				mp = save_bp->b_cont = bcont;
1738 			} else {
1739 				save_bp = mp;
1740 				mp = mp->b_cont;
1741 			}
1742 			first = 0;
1743 		}
1744 	} else {
1745 		type = mp->b_datap->db_type;
1746 		while (len) {
1747 			bp = mp;
1748 			save_bp = NULL;
1749 
1750 			/*
1751 			 * Find the last message of same type
1752 			 */
1753 			while (bp && bp->b_datap->db_type == type) {
1754 				ASSERT(bp->b_wptr >= bp->b_rptr);
1755 				prev_bp = save_bp;
1756 				save_bp = bp;
1757 				bp = bp->b_cont;
1758 			}
1759 			if (save_bp == NULL)
1760 				break;
1761 			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1762 			save_bp->b_wptr -= n;
1763 			len -= n;
1764 
1765 			/*
1766 			 * If this is not the first message
1767 			 * and we have taken away everything
1768 			 * from this message, remove it
1769 			 */
1770 
1771 			if ((save_bp != mp) &&
1772 			    (save_bp->b_wptr == save_bp->b_rptr)) {
1773 				bcont = save_bp->b_cont;
1774 				freeb(save_bp);
1775 				prev_bp->b_cont = bcont;
1776 			}
1777 		}
1778 	}
1779 	return (1);
1780 }
1781 
1782 /*
1783  * get number of data bytes in message
1784  */
1785 size_t
msgdsize(mblk_t * bp)1786 msgdsize(mblk_t *bp)
1787 {
1788 	size_t count = 0;
1789 
1790 	for (; bp; bp = bp->b_cont)
1791 		if (bp->b_datap->db_type == M_DATA) {
1792 			ASSERT(bp->b_wptr >= bp->b_rptr);
1793 			count += bp->b_wptr - bp->b_rptr;
1794 		}
1795 	return (count);
1796 }
1797 
1798 /*
1799  * Get a message off head of queue
1800  *
1801  * If queue has no buffers then mark queue
1802  * with QWANTR. (queue wants to be read by
1803  * someone when data becomes available)
1804  *
1805  * If there is something to take off then do so.
1806  * If queue falls below hi water mark turn off QFULL
1807  * flag.  Decrement weighted count of queue.
1808  * Also turn off QWANTR because queue is being read.
1809  *
1810  * The queue count is maintained on a per-band basis.
1811  * Priority band 0 (normal messages) uses q_count,
1812  * q_lowat, etc.  Non-zero priority bands use the
1813  * fields in their respective qband structures
1814  * (qb_count, qb_lowat, etc.)  All messages appear
1815  * on the same list, linked via their b_next pointers.
1816  * q_first is the head of the list.  q_count does
1817  * not reflect the size of all the messages on the
1818  * queue.  It only reflects those messages in the
1819  * normal band of flow.  The one exception to this
1820  * deals with high priority messages.  They are in
1821  * their own conceptual "band", but are accounted
1822  * against q_count.
1823  *
1824  * If queue count is below the lo water mark and QWANTW
1825  * is set, enable the closest backq which has a service
1826  * procedure and turn off the QWANTW flag.
1827  *
1828  * getq could be built on top of rmvq, but isn't because
1829  * of performance considerations.
1830  *
1831  * A note on the use of q_count and q_mblkcnt:
1832  *   q_count is the traditional byte count for messages that
1833  *   have been put on a queue.  Documentation tells us that
1834  *   we shouldn't rely on that count, but some drivers/modules
1835  *   do.  What was needed, however, is a mechanism to prevent
1836  *   runaway streams from consuming all of the resources,
1837  *   and particularly be able to flow control zero-length
1838  *   messages.  q_mblkcnt is used for this purpose.  It
1839  *   counts the number of mblk's that are being put on
1840  *   the queue.  The intention here, is that each mblk should
1841  *   contain one byte of data and, for the purpose of
1842  *   flow-control, logically does.  A queue will become
1843  *   full when EITHER of these values (q_count and q_mblkcnt)
1844  *   reach the highwater mark.  It will clear when BOTH
1845  *   of them drop below the highwater mark.  And it will
1846  *   backenable when BOTH of them drop below the lowwater
1847  *   mark.
1848  *   With this algorithm, a driver/module might be able
1849  *   to find a reasonably accurate q_count, and the
1850  *   framework can still try and limit resource usage.
1851  */
1852 mblk_t *
getq(queue_t * q)1853 getq(queue_t *q)
1854 {
1855 	mblk_t *bp;
1856 	uchar_t band = 0;
1857 
1858 	bp = getq_noenab(q, 0);
1859 	if (bp != NULL)
1860 		band = bp->b_band;
1861 
1862 	/*
1863 	 * Inlined from qbackenable().
1864 	 * Quick check without holding the lock.
1865 	 */
1866 	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1867 		return (bp);
1868 
1869 	qbackenable(q, band);
1870 	return (bp);
1871 }
1872 
1873 /*
1874  * Returns the number of bytes in a message (a message is defined as a
1875  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1876  * also return the number of distinct mblks in the message.
1877  */
1878 int
mp_cont_len(mblk_t * bp,int * mblkcnt)1879 mp_cont_len(mblk_t *bp, int *mblkcnt)
1880 {
1881 	mblk_t	*mp;
1882 	int	mblks = 0;
1883 	int	bytes = 0;
1884 
1885 	for (mp = bp; mp != NULL; mp = mp->b_cont) {
1886 		bytes += MBLKL(mp);
1887 		mblks++;
1888 	}
1889 
1890 	if (mblkcnt != NULL)
1891 		*mblkcnt = mblks;
1892 
1893 	return (bytes);
1894 }
1895 
1896 /*
1897  * Like getq() but does not backenable.  This is used by the stream
1898  * head when a putback() is likely.  The caller must call qbackenable()
1899  * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return. If the current amount on the
1902  * queue is less than this then the entire message will be returned.
1903  * A value of 0 returns the entire message and is equivalent to the old
1904  * default behaviour prior to the addition of the rbytes argument.
1905  */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		/* empty queue: note that a reader is waiting */
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				bytecnt += MBLKL(mp1);
				if (bytecnt  >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue. Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * the new write offset (b_wptr) of what we
				 * are taking. However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of the
				 * mblk represents some arbitrary point within
				 * the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with. If mp1 is NULL, we are taking the
				 * whole message. No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block. In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			/* normal band: adjust the per-queue counters */
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			/* priority band: adjust that band's counters */
			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);

	return (bp);
}
2077 
2078 /*
2079  * Determine if a backenable is needed after removing a message in the
2080  * specified band.
2081  * NOTE: This routine assumes that something like getq_noenab() has been
2082  * already called.
2083  *
2084  * For the read side it is ok to hold sd_lock across calling this (and the
2085  * stream head often does).
2086  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2087  */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Decide under QLOCK whether a backenable is due: the band (or the
	 * normal queue) must have a waiting writer and have drained below
	 * its low-water mark, or have no low-water threshold at all.
	 */
	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		/* Walk the singly-linked qband list to band's entry */
		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	/* Wakeups happen with QLOCK dropped; strwakeq() takes sd_lock. */
	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}
2164 
2165 /*
2166  * Remove a message from a queue.  The queue count and other
2167  * flow control parameters are adjusted and the back queue
2168  * enabled if necessary.
2169  *
2170  * rmvq can be called with the stream frozen, but other utility functions
2171  * holding QLOCK, and by streams modules without any locks/frozen.
2172  */
2173 void
rmvq(queue_t * q,mblk_t * mp)2174 rmvq(queue_t *q, mblk_t *mp)
2175 {
2176 	ASSERT(mp != NULL);
2177 
2178 	rmvq_noenab(q, mp);
2179 	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2180 		/*
2181 		 * qbackenable can handle a frozen stream but not a "random"
2182 		 * qlock being held. Drop lock across qbackenable.
2183 		 */
2184 		mutex_exit(QLOCK(q));
2185 		qbackenable(q, mp->b_band);
2186 		mutex_enter(QLOCK(q));
2187 	} else {
2188 		qbackenable(q, mp->b_band);
2189 	}
2190 }
2191 
2192 /*
2193  * Like rmvq() but without any backenabling.
2194  * This exists to handle SR_CONSOL_DATA in strrput().
2195  */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * Three entry states are supported: the stream is frozen by this
	 * thread, the caller already holds QLOCK (pretend to be the
	 * freezer so the lock is not dropped on exit), or no lock is
	 * held and QLOCK is acquired here.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		/*
		 * If mp delimits its band, advance qb_first/qb_last to the
		 * neighboring message only if that neighbor is still in
		 * the same band; otherwise the band becomes empty.
		 */
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		/* Clear QFULL once both counts drop below the high water mark */
		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat))) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
}
2272 
2273 /*
2274  * Empty a queue.
2275  * If flag is set, remove all messages.  Otherwise, remove
2276  * only non-control messages.  If queue falls below its low
2277  * water mark, and QWANTW is set, enable the nearest upstream
2278  * service procedure.
2279  *
2280  * Historical note: when merging the M_FLUSH code in strrput with this
2281  * code one difference was discovered. flushq did not have a check
2282  * for q_lowat == 0 in the backenabling test.
2283  *
2284  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2285  * if one exists on the queue.
2286  */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char	qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	/*
	 * Detach the entire message chain and zero all flow-control
	 * accounting under QLOCK; the messages are then freed (or
	 * requeued) with the lock dropped.
	 */
	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);

		/*
		 * Survivors are put back via putq(): an M_PCPROTO when
		 * pcproto_flag says to keep it, and non-data messages
		 * when only data is being flushed.
		 */
		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	/*
	 * Record, per band (index 0 is the normal band), whether a writer
	 * is waiting and the band has drained below its low-water mark
	 * (or has no low-water threshold at all).
	 */
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}
2366 
2367 /*
2368  * The real flushing takes place in flushq_common. This is done so that
 * a flag can specify whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
2371  */
void
flushq(queue_t *q, int flag)
{
	/* Flush without preserving any M_PCPROTO message (pcproto_flag = 0). */
	flushq_common(q, flag, 0);
}
2377 
2378 /*
2379  * Flush the queue of messages of the given priority band.
2380  * There is some duplication of code between flushq and flushband.
2381  * This is because we want to optimize the code as much as possible.
2382  * The assumption is that there will be more messages in the normal
2383  * (priority 0) band than in any other.
2384  *
2385  * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0
2388  * case. That check does not match the man page for flushband and was not
2389  * in the strrput flush code hence it was removed.
2390  */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	/* A band that was never allocated has nothing to flush. */
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		/*
		 * Band 0: detach the whole chain and zero the accounting
		 * under QLOCK, then free band-0 candidates and requeue
		 * everything else with the lock dropped.
		 */
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		/* Backenable if a writer waits and we drained below q_lowat. */
		mutex_enter(QLOCK(q));
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		/* One past the band's last message bounds the walk below. */
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}
2480 
2481 /*
2482  * Return 1 if the queue is not full.  If the queue is full, return
2483  * 0 (may not put message) and set QWANTW flag (caller wants to write
2484  * to the queue).
2485  */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	/* Unlocked fast path: not full, so the caller may put. */
	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	/*
	 * Re-check under QLOCK so setting QWANTW cannot race with a
	 * concurrent drain that clears QFULL.
	 */
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}
2512 
2513 /*
2514  * This is the new canput for use with priority bands.  Return 1 if the
2515  * band is not full.  If the band is full, return 0 (may not put message)
2516  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2517  * write to the queue).
2518  */
int
bcanput(queue_t *q, unsigned char pri)
{
	qband_t *qbp;

	/* NOTE(review): pri is traced with %p here — looks like a stale
	 * format; verify against the TRACE_2 consumers. */
	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
	if (!q)
		return (0);

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	mutex_enter(QLOCK(q));
	if (pri == 0) {
		/* Normal band: QFULL/QWANTW on the queue itself. */
		if (q->q_flag & QFULL) {
			q->q_flag |= QWANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	} else {	/* pri != 0 */
		if (pri > q->q_nband) {
			/*
			 * No band exists yet, so return success.
			 */
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 1);
			return (1);
		}
		/* Walk the qband list to this priority's entry. */
		qbp = q->q_bandp;
		while (--pri)
			qbp = qbp->qb_next;
		if (qbp->qb_flag & QB_FULL) {
			qbp->qb_flag |= QB_WANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
			    "bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	}
	mutex_exit(QLOCK(q));
	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
	    "bcanput:%p %X %d", q, pri, 1);
	return (1);
}
2566 
2567 /*
2568  * Put a message on a queue.
2569  *
2570  * Messages are enqueued on a priority basis.  The priority classes
2571  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2572  * and B_NORMAL (type < QPCTL && band == 0).
2573  *
2574  * Add appropriate weighted data block sizes to queue count.
2575  * If queue hits high water mark then set QFULL flag.
2576  *
2577  * If QNOENAB is not set (putq is allowed to enable the queue),
2578  * enable the queue only if the message is PRIORITY,
2579  * or the QWANTR flag is set (indicating that the service procedure
2580  * is ready to read the queue.  This implies that a service
2581  * procedure must NEVER put a high priority message back on its own
2582  * queue, as this would result in an infinite loop (!).
2583  */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* Acquire QLOCK unless this thread has the stream frozen. */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* High-priority message: skip past all QPCTL messages */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* First message of its band: find its slot by band order */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/* Charge the band (or the queue for band 0) and mark full if so. */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);

	/*
	 * Enable the queue for high-priority messages, or (when enabling
	 * is permitted) for banded messages or a waiting reader.
	 */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2768 
2769 /*
2770  * Put stuff back at beginning of Q according to priority order.
2771  * See comment on putq above for details.
2772  */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	/* Acquire QLOCK unless this thread has the stream frozen. */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		/* Grow the qband list on the fly if this band is new. */
		if (bp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* Band currently empty: find its slot by band order. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* Skip high-priority and higher-band messages. */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/* Charge the band (or the queue for band 0) and mark full if so. */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2949 
2950 /*
2951  * Insert a message before an existing message on the queue.  If the
2952  * existing message is NULL, the new messages is placed on the end of
2953  * the queue.  The queue class of the new message is ignored.  However,
2954  * the priority band of the new message must adhere to the following
2955  * ordering:
2956  *
2957  *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2958  *
2959  * All flow control parameters are updated.
2960  *
2961  * insq can be called with the stream frozen, but other utility functions
2962  * holding QLOCK, and by streams modules without any locks/frozen.
2963  */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * Three entry states are supported: stream frozen by this thread,
	 * QLOCK already held by the caller (pretend to be the freezer so
	 * the lock is not dropped on exit), or no lock held.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	/* Validate the ordering invariant documented above. */
	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		/* Grow the qband list on the fly if this band is new. */
		if (mp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/* Link mp before emp, or at the tail when emp is NULL. */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/*
			 * mp becomes the band's first (last) message when
			 * its predecessor (successor) is absent or is in
			 * a different band.
			 */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
3090 
3091 /*
3092  * Create and put a control message on queue.
3093  */
3094 int
putctl(queue_t * q,int type)3095 putctl(queue_t *q, int type)
3096 {
3097 	mblk_t *bp;
3098 
3099 	if ((datamsg(type) && (type != M_DELAY)) ||
3100 	    (bp = allocb_tryhard(0)) == NULL)
3101 		return (0);
3102 	bp->b_datap->db_type = (unsigned char) type;
3103 
3104 	put(q, bp);
3105 
3106 	return (1);
3107 }
3108 
3109 /*
3110  * Control message with a single-byte parameter
3111  */
3112 int
putctl1(queue_t * q,int type,int param)3113 putctl1(queue_t *q, int type, int param)
3114 {
3115 	mblk_t *bp;
3116 
3117 	if ((datamsg(type) && (type != M_DELAY)) ||
3118 	    (bp = allocb_tryhard(1)) == NULL)
3119 		return (0);
3120 	bp->b_datap->db_type = (unsigned char)type;
3121 	*bp->b_wptr++ = (unsigned char)param;
3122 
3123 	put(q, bp);
3124 
3125 	return (1);
3126 }
3127 
3128 int
putnextctl1(queue_t * q,int type,int param)3129 putnextctl1(queue_t *q, int type, int param)
3130 {
3131 	mblk_t *bp;
3132 
3133 	if ((datamsg(type) && (type != M_DELAY)) ||
3134 	    ((bp = allocb_tryhard(1)) == NULL))
3135 		return (0);
3136 
3137 	bp->b_datap->db_type = (unsigned char)type;
3138 	*bp->b_wptr++ = (unsigned char)param;
3139 
3140 	putnext(q, bp);
3141 
3142 	return (1);
3143 }
3144 
3145 int
putnextctl(queue_t * q,int type)3146 putnextctl(queue_t *q, int type)
3147 {
3148 	mblk_t *bp;
3149 
3150 	if ((datamsg(type) && (type != M_DELAY)) ||
3151 	    ((bp = allocb_tryhard(0)) == NULL))
3152 		return (0);
3153 	bp->b_datap->db_type = (unsigned char)type;
3154 
3155 	putnext(q, bp);
3156 
3157 	return (1);
3158 }
3159 
3160 /*
3161  * Return the queue upstream from this one
3162  */
3163 queue_t *
backq(queue_t * q)3164 backq(queue_t *q)
3165 {
3166 	q = _OTHERQ(q);
3167 	if (q->q_next) {
3168 		q = q->q_next;
3169 		return (_OTHERQ(q));
3170 	}
3171 	return (NULL);
3172 }
3173 
3174 /*
3175  * Send a block back up the queue in reverse from this
3176  * one (e.g. to respond to ioctls)
3177  */
3178 void
qreply(queue_t * q,mblk_t * bp)3179 qreply(queue_t *q, mblk_t *bp)
3180 {
3181 	ASSERT(q && bp);
3182 
3183 	putnext(_OTHERQ(q), bp);
3184 }
3185 
3186 /*
3187  * Streams Queue Scheduling
3188  *
3189  * Queues are enabled through qenable() when they have messages to
3190  * process.  They are serviced by queuerun(), which runs each enabled
3191  * queue's service procedure.  The call to queuerun() is processor
3192  * dependent - the general principle is that it be run whenever a queue
3193  * is enabled but before returning to user level.  For system calls,
3194  * the function runqueues() is called if their action causes a queue
3195  * to be enabled.  For device interrupts, queuerun() should be
3196  * called before returning from the last level of interrupt.  Beyond
3197  * this, no timing assumptions should be made about queue scheduling.
3198  */
3199 
/*
 * Enable a queue: put it on list of those whose service procedures are
 * ready to run and set up the scheduling mechanism.
 * The broadcast is done outside the mutex -> to avoid the woken thread
 * from contending with the mutex. This is OK 'cos the queue has been
 * enqueued on the runlist and flagged safely at this point.
 */
void
qenable(queue_t *q)
{
	/* All the real work happens in qenable_locked(), under QLOCK. */
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}
3214 /*
3215  * Return number of messages on queue
3216  */
3217 int
qsize(queue_t * qp)3218 qsize(queue_t *qp)
3219 {
3220 	int count = 0;
3221 	mblk_t *mp;
3222 
3223 	mutex_enter(QLOCK(qp));
3224 	for (mp = qp->q_first; mp; mp = mp->b_next)
3225 		count++;
3226 	mutex_exit(QLOCK(qp));
3227 	return (count);
3228 }
3229 
/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
 */
void
noenable(queue_t *q)
{
	/* Set the QNOENB flag under QLOCK. */
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}
3241 
void
enableok(queue_t *q)
{
	/* Clear the QNOENB flag under QLOCK so putq() can enable the queue. */
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}
3249 
3250 /*
3251  * Set queue fields.
3252  */
3253 int
strqset(queue_t * q,qfields_t what,unsigned char pri,intptr_t val)3254 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3255 {
3256 	qband_t *qbp = NULL;
3257 	queue_t	*wrq;
3258 	int error = 0;
3259 	kthread_id_t freezer;
3260 
3261 	freezer = STREAM(q)->sd_freezer;
3262 	if (freezer == curthread) {
3263 		ASSERT(frozenstr(q));
3264 		ASSERT(MUTEX_HELD(QLOCK(q)));
3265 	} else
3266 		mutex_enter(QLOCK(q));
3267 
3268 	if (what >= QBAD) {
3269 		error = EINVAL;
3270 		goto done;
3271 	}
3272 	if (pri != 0) {
3273 		int i;
3274 		qband_t **qbpp;
3275 
3276 		if (pri > q->q_nband) {
3277 			qbpp = &q->