xref: /illumos-gate/usr/src/uts/common/os/strsubr.c (revision 9b664393)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
22 /*	  All Rights Reserved	*/
23 
24 
25 /*
26  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
27  * Use is subject to license terms.
28  * Copyright (c) 2016 by Delphix. All rights reserved.
29  * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
30  * Copyright 2018 Joyent, Inc.
31  * Copyright 2022 Garrett D'Amore
32  */
33 
34 #include <sys/types.h>
35 #include <sys/sysmacros.h>
36 #include <sys/param.h>
37 #include <sys/errno.h>
38 #include <sys/signal.h>
39 #include <sys/proc.h>
40 #include <sys/conf.h>
41 #include <sys/cred.h>
42 #include <sys/user.h>
43 #include <sys/vnode.h>
44 #include <sys/file.h>
45 #include <sys/session.h>
46 #include <sys/stream.h>
47 #include <sys/strsubr.h>
48 #include <sys/stropts.h>
49 #include <sys/poll.h>
50 #include <sys/systm.h>
51 #include <sys/cpuvar.h>
52 #include <sys/uio.h>
53 #include <sys/cmn_err.h>
54 #include <sys/priocntl.h>
55 #include <sys/procset.h>
56 #include <sys/vmem.h>
57 #include <sys/bitmap.h>
58 #include <sys/kmem.h>
59 #include <sys/siginfo.h>
60 #include <sys/vtrace.h>
61 #include <sys/callb.h>
62 #include <sys/debug.h>
63 #include <sys/modctl.h>
64 #include <sys/vmsystm.h>
65 #include <vm/page.h>
66 #include <sys/atomic.h>
67 #include <sys/suntpi.h>
68 #include <sys/strlog.h>
69 #include <sys/promif.h>
70 #include <sys/project.h>
71 #include <sys/vm.h>
72 #include <sys/taskq.h>
73 #include <sys/sunddi.h>
74 #include <sys/sunldi_impl.h>
75 #include <sys/strsun.h>
76 #include <sys/isa_defs.h>
77 #include <sys/pattr.h>
78 #include <sys/strft.h>
79 #include <sys/fs/snode.h>
80 #include <sys/zone.h>
81 #include <sys/open.h>
82 #include <sys/sunldi.h>
83 #include <sys/sad.h>
84 #include <sys/netstack.h>
85 
86 #define	O_SAMESTR(q)	(((q)->q_next) && \
87 	(((q)->q_flag & QREADR) == ((q)->q_next->q_flag & QREADR)))
88 
89 /*
90  * WARNING:
91  * The variables and routines in this file are private, belonging
92  * to the STREAMS subsystem. These should not be used by modules
93  * or drivers. Compatibility will not be guaranteed.
94  */
95 
96 /*
97  * Id value used to distinguish between different multiplexor links.
98  */
99 static int32_t lnk_id = 0;
100 
101 #define	STREAMS_LOPRI MINCLSYSPRI
102 static pri_t streams_lopri = STREAMS_LOPRI;
103 
104 #define	STRSTAT(x)	(str_statistics.x.value.ui64++)
105 typedef struct str_stat {
106 	kstat_named_t	sqenables;
107 	kstat_named_t	stenables;
108 	kstat_named_t	syncqservice;
109 	kstat_named_t	freebs;
110 	kstat_named_t	qwr_outer;
111 	kstat_named_t	rservice;
112 	kstat_named_t	strwaits;
113 	kstat_named_t	taskqfails;
114 	kstat_named_t	bufcalls;
115 	kstat_named_t	qhelps;
116 	kstat_named_t	qremoved;
117 	kstat_named_t	sqremoved;
118 	kstat_named_t	bcwaits;
119 	kstat_named_t	sqtoomany;
120 } str_stat_t;
121 
122 static str_stat_t str_statistics = {
123 	{ "sqenables",		KSTAT_DATA_UINT64 },
124 	{ "stenables",		KSTAT_DATA_UINT64 },
125 	{ "syncqservice",	KSTAT_DATA_UINT64 },
126 	{ "freebs",		KSTAT_DATA_UINT64 },
127 	{ "qwr_outer",		KSTAT_DATA_UINT64 },
128 	{ "rservice",		KSTAT_DATA_UINT64 },
129 	{ "strwaits",		KSTAT_DATA_UINT64 },
130 	{ "taskqfails",		KSTAT_DATA_UINT64 },
131 	{ "bufcalls",		KSTAT_DATA_UINT64 },
132 	{ "qhelps",		KSTAT_DATA_UINT64 },
133 	{ "qremoved",		KSTAT_DATA_UINT64 },
134 	{ "sqremoved",		KSTAT_DATA_UINT64 },
135 	{ "bcwaits",		KSTAT_DATA_UINT64 },
136 	{ "sqtoomany",		KSTAT_DATA_UINT64 },
137 };
138 
139 static kstat_t *str_kstat;
140 
141 /*
142  * qrunflag was used previously to control background scheduling of queues. It
143  * is not used anymore, but kept here in case some module still wants to access
144  * it via qready() and setqsched macros.
145  */
146 char qrunflag;			/*  Unused */
147 
148 /*
149  * Most of the streams scheduling is done via task queues. Task queues may fail
150  * for non-sleep dispatches, so there are two backup threads servicing failed
151  * requests for queues and syncqs. Both of these threads also service failed
152  * dispatches freebs requests. Queues are put in the list specified by `qhead'
153  * and `qtail' pointers, syncqs use `sqhead' and `sqtail' pointers and freebs
154  * requests are put into `freebs_list' which has no tail pointer. All three
155  * lists are protected by a single `service_queue' lock and use
156  * `services_to_run' condition variable for signaling background threads. Use of
157  * a single lock should not be a problem because it is only used under heavy
158  * loads when task queues start to fail and at that time it may be a good idea
159  * to throttle scheduling requests.
160  *
161  * NOTE: queues and syncqs should be scheduled by two separate threads because
162  * queue servicing may be blocked waiting for a syncq which may be also
163  * scheduled for background execution. This may create a deadlock when only one
164  * thread is used for both.
165  */
166 
167 static taskq_t *streams_taskq;		/* Used for most STREAMS scheduling */
168 
169 static kmutex_t service_queue;		/* protects all of servicing vars */
170 static kcondvar_t services_to_run;	/* wake up background service thread */
171 static kcondvar_t syncqs_to_run;	/* wake up background service thread */
172 
173 /*
174  * List of queues scheduled for background processing due to lack of resources
175  * in the task queues. Protected by service_queue lock;
176  */
177 static struct queue *qhead;
178 static struct queue *qtail;
179 
180 /*
181  * Same list for syncqs
182  */
183 static syncq_t *sqhead;
184 static syncq_t *sqtail;
185 
186 static mblk_t *freebs_list;	/* list of buffers to free */
187 
188 /*
189  * Backup threads for servicing queues and syncqs
190  */
191 kthread_t *streams_qbkgrnd_thread;
192 kthread_t *streams_sqbkgrnd_thread;
193 
194 /*
195  * Bufcalls related variables.
196  */
197 struct bclist	strbcalls;	/* list of waiting bufcalls */
198 kmutex_t	strbcall_lock;	/* protects bufcall list (strbcalls) */
199 kcondvar_t	strbcall_cv;	/* Signaling when a bufcall is added */
200 kmutex_t	bcall_monitor;	/* sleep/wakeup style monitor */
201 kcondvar_t	bcall_cv;	/* wait 'till executing bufcall completes */
202 kthread_t	*bc_bkgrnd_thread; /* Thread to service bufcall requests */
203 
204 kmutex_t	strresources;	/* protects global resources */
205 kmutex_t	muxifier;	/* single-threads multiplexor creation */
206 
207 static void	*str_stack_init(netstackid_t stackid, netstack_t *ns);
208 static void	str_stack_shutdown(netstackid_t stackid, void *arg);
209 static void	str_stack_fini(netstackid_t stackid, void *arg);
210 
211 /*
212  * run_queues is no longer used, but is kept in case some 3rd party
213  * module/driver decides to use it.
214  */
215 int run_queues = 0;
216 
217 /*
218  * sq_max_size is the depth of the syncq (in number of messages) before
219  * qfill_syncq() starts QFULL'ing destination queues. As its primary
220  * consumer - IP is no longer D_MTPERMOD, but there may be other
221  * modules/drivers depend on this syncq flow control, we prefer to
222  * choose a large number as the default value. For potential
223  * performance gain, this value is tunable in /etc/system.
224  */
225 int sq_max_size = 10000;
226 
227 /*
228  * The number of ciputctrl structures per syncq and stream we create when
229  * needed.
230  */
231 int n_ciputctrl;
232 int max_n_ciputctrl = 16;
233 /*
234  * If n_ciputctrl is < min_n_ciputctrl don't even create ciputctrl_cache.
235  */
236 int min_n_ciputctrl = 2;
237 
238 /*
239  * Per-driver/module syncqs
240  * ========================
241  *
242  * For drivers/modules that use PERMOD or outer syncqs we keep a list of
243  * perdm structures, new entries being added (and new syncqs allocated) when
244  * setq() encounters a module/driver with a streamtab that it hasn't seen
245  * before.
246  * The reason for this mechanism is that some modules and drivers share a
247  * common streamtab and it is necessary for those modules and drivers to also
248  * share a common PERMOD syncq.
249  *
250  * perdm_list --> dm_str == streamtab_1
251  *                dm_sq == syncq_1
252  *                dm_ref
253  *                dm_next --> dm_str == streamtab_2
254  *                            dm_sq == syncq_2
255  *                            dm_ref
256  *                            dm_next --> ... NULL
257  *
258  * The dm_ref field is incremented for each new driver/module that takes
259  * a reference to the perdm structure and hence shares the syncq.
260  * References are held in the fmodsw_impl_t structure for each STREAMS module
261  * or the dev_impl array (indexed by device major number) for each driver.
262  *
263  * perdm_list -> [dm_ref == 1] -> [dm_ref == 2] -> [dm_ref == 1] -> NULL
264  *		     ^                 ^ ^               ^
265  *                   |  ______________/  |               |
266  *                   | /                 |               |
267  * dev_impl:     ...|x|y|...          module A	      module B
268  *
269  * When a module/driver is unloaded the reference count is decremented and,
270  * when it falls to zero, the perdm structure is removed from the list and
271  * the syncq is freed (see rele_dm()).
272  */
273 perdm_t *perdm_list = NULL;
274 static krwlock_t perdm_rwlock;
275 cdevsw_impl_t *devimpl;
276 
277 extern struct qinit strdata;
278 extern struct qinit stwdata;
279 
280 static void runservice(queue_t *);
281 static void streams_bufcall_service(void);
282 static void streams_qbkgrnd_service(void);
283 static void streams_sqbkgrnd_service(void);
284 static syncq_t *new_syncq(void);
285 static void free_syncq(syncq_t *);
286 static void outer_insert(syncq_t *, syncq_t *);
287 static void outer_remove(syncq_t *, syncq_t *);
288 static void write_now(syncq_t *);
289 static void clr_qfull(queue_t *);
290 static void runbufcalls(void);
291 static void sqenable(syncq_t *);
292 static void sqfill_events(syncq_t *, queue_t *, mblk_t *, void (*)());
293 static void wait_q_syncq(queue_t *);
294 static void backenable_insertedq(queue_t *);
295 
296 static void queue_service(queue_t *);
297 static void stream_service(stdata_t *);
298 static void syncq_service(syncq_t *);
299 static void qwriter_outer_service(syncq_t *);
300 static void mblk_free(mblk_t *);
301 #ifdef DEBUG
302 static int qprocsareon(queue_t *);
303 #endif
304 
305 static void set_nfsrv_ptr(queue_t *, queue_t *, queue_t *, queue_t *);
306 static void reset_nfsrv_ptr(queue_t *, queue_t *);
307 void set_qfull(queue_t *);
308 
309 static void sq_run_events(syncq_t *);
310 static int propagate_syncq(queue_t *);
311 
312 static void	blocksq(syncq_t *, ushort_t, int);
313 static void	unblocksq(syncq_t *, ushort_t, int);
314 static int	dropsq(syncq_t *, uint16_t);
315 static void	emptysq(syncq_t *);
316 static sqlist_t *sqlist_alloc(struct stdata *, int);
317 static void	sqlist_free(sqlist_t *);
318 static sqlist_t	*sqlist_build(queue_t *, struct stdata *, boolean_t);
319 static void	sqlist_insert(sqlist_t *, syncq_t *);
320 static void	sqlist_insertall(sqlist_t *, queue_t *);
321 
322 static void	strsetuio(stdata_t *);
323 
324 struct kmem_cache *stream_head_cache;
325 struct kmem_cache *queue_cache;
326 struct kmem_cache *syncq_cache;
327 struct kmem_cache *qband_cache;
328 struct kmem_cache *linkinfo_cache;
329 struct kmem_cache *ciputctrl_cache = NULL;
330 
331 static linkinfo_t *linkinfo_list;
332 
333 /* Global esballoc throttling queue */
334 static esb_queue_t system_esbq;
335 
336 /* Array of esballoc throttling queues, of length esbq_nelem */
337 static esb_queue_t *volatile system_esbq_array;
338 static int esbq_nelem;
339 static kmutex_t esbq_lock;
340 static int esbq_log2_cpus_per_q = 0;
341 
342 /* Scale the system_esbq length by setting number of CPUs per queue. */
343 uint_t esbq_cpus_per_q = 1;
344 
345 /*
346  * esballoc tunable parameters.
347  */
348 int		esbq_max_qlen = 0x16;	/* throttled queue length */
349 clock_t		esbq_timeout = 0x8;	/* timeout to process esb queue */
350 
351 /*
352  * Routines to handle esballoc queueing.
353  */
354 static void esballoc_process_queue(esb_queue_t *);
355 static void esballoc_enqueue_mblk(mblk_t *);
356 static void esballoc_timer(void *);
357 static void esballoc_set_timer(esb_queue_t *, clock_t);
358 static void esballoc_mblk_free(mblk_t *);
359 
360 /*
361  *  Qinit structure and Module_info structures
362  *	for passthru read and write queues
363  */
364 
365 static int pass_rput(queue_t *, mblk_t *);
366 static int pass_wput(queue_t *, mblk_t *);
367 static queue_t *link_addpassthru(stdata_t *);
368 static void link_rempassthru(queue_t *);
369 
370 struct  module_info passthru_info = {
371 	0,
372 	"passthru",
373 	0,
374 	INFPSZ,
375 	STRHIGH,
376 	STRLOW
377 };
378 
379 struct  qinit passthru_rinit = {
380 	pass_rput,
381 	NULL,
382 	NULL,
383 	NULL,
384 	NULL,
385 	&passthru_info,
386 	NULL
387 };
388 
389 struct  qinit passthru_winit = {
390 	pass_wput,
391 	NULL,
392 	NULL,
393 	NULL,
394 	NULL,
395 	&passthru_info,
396 	NULL
397 };
398 
399 /*
400  * Verify correctness of list head/tail pointers.
401  */
402 #define	LISTCHECK(head, tail, link) {				\
403 	EQUIV(head, tail);					\
404 	IMPLY(tail != NULL, tail->link == NULL);		\
405 }
406 
407 /*
408  * Enqueue a list element `el' in the end of a list denoted by `head' and `tail'
409  * using a `link' field.
410  */
411 #define	ENQUEUE(el, head, tail, link) {				\
412 	ASSERT(el->link == NULL);				\
413 	LISTCHECK(head, tail, link);				\
414 	if (head == NULL)					\
415 		head = el;					\
416 	else							\
417 		tail->link = el;				\
418 	tail = el;						\
419 }
420 
421 /*
422  * Dequeue the first element of the list denoted by `head' and `tail' pointers
423  * using a `link' field and put result into `el'.
424  */
425 #define	DQ(el, head, tail, link) {				\
426 	LISTCHECK(head, tail, link);				\
427 	el = head;						\
428 	if (head != NULL) {					\
429 		head = head->link;				\
430 		if (head == NULL)				\
431 			tail = NULL;				\
432 		el->link = NULL;				\
433 	}							\
434 }
435 
436 /*
437  * Remove `el' from the list using `chase' and `curr' pointers and return result
438  * in `succeed'.
439  */
440 #define	RMQ(el, head, tail, link, chase, curr, succeed) {	\
441 	LISTCHECK(head, tail, link);				\
442 	chase = NULL;						\
443 	succeed = 0;						\
444 	for (curr = head; (curr != el) && (curr != NULL); curr = curr->link) \
445 		chase = curr;					\
446 	if (curr != NULL) {					\
447 		succeed = 1;					\
448 		ASSERT(curr == el);				\
449 		if (chase != NULL)				\
450 			chase->link = curr->link;		\
451 		else						\
452 			head = curr->link;			\
453 		curr->link = NULL;				\
454 		if (curr == tail)				\
455 			tail = chase;				\
456 	}							\
457 	LISTCHECK(head, tail, link);				\
458 }
459 
460 /* Handling of delayed messages on the inner syncq. */
461 
462 /*
463  * DEBUG versions should use function versions (to simplify tracing) and
464  * non-DEBUG kernels should use macro versions.
465  */
466 
467 /*
468  * Put a queue on the syncq list of queues.
469  * Assumes SQLOCK held.
470  */
471 #define	SQPUT_Q(sq, qp)							\
472 {									\
473 	ASSERT(MUTEX_HELD(SQLOCK(sq)));					\
474 	if (!(qp->q_sqflags & Q_SQQUEUED)) {				\
475 		/* The queue should not be linked anywhere */		\
476 		ASSERT((qp->q_sqprev == NULL) && (qp->q_sqnext == NULL)); \
477 		/* Head and tail may only be NULL simultaneously */	\
478 		EQUIV(sq->sq_head, sq->sq_tail);			\
479 		/* Queue may be only enqueued on its syncq */		\
480 		ASSERT(sq == qp->q_syncq);				\
481 		/* Check the correctness of SQ_MESSAGES flag */		\
482 		EQUIV(sq->sq_head, (sq->sq_flags & SQ_MESSAGES));	\
483 		/* Sanity check first/last elements of the list */	\
484 		IMPLY(sq->sq_head != NULL, sq->sq_head->q_sqprev == NULL);\
485 		IMPLY(sq->sq_tail != NULL, sq->sq_tail->q_sqnext == NULL);\
486 		/*							\
487 		 * Sanity check of priority field: empty queue should	\
488 		 * have zero priority					\
489 		 * and nqueues equal to zero.				\
490 		 */							\
491 		IMPLY(sq->sq_head == NULL, sq->sq_pri == 0);		\
492 		/* Sanity check of sq_nqueues field */			\
493 		EQUIV(sq->sq_head, sq->sq_nqueues);			\
494 		if (sq->sq_head == NULL) {				\
495 			sq->sq_head = sq->sq_tail = qp;			\
496 			sq->sq_flags |= SQ_MESSAGES;			\
497 		} else if (qp->q_spri == 0) {				\
498 			qp->q_sqprev = sq->sq_tail;			\
499 			sq->sq_tail->q_sqnext = qp;			\
500 			sq->sq_tail = qp;				\
501 		} else {						\
502 			/*						\
503 			 * Put this queue in priority order: higher	\
504 			 * priority gets closer to the head.		\
505 			 */						\
506 			queue_t **qpp = &sq->sq_tail;			\
507 			queue_t *qnext = NULL;				\
508 									\
509 			while (*qpp != NULL && qp->q_spri > (*qpp)->q_spri) { \
510 				qnext = *qpp;				\
511 				qpp = &(*qpp)->q_sqprev;		\
512 			}						\
513 			qp->q_sqnext = qnext;				\
514 			qp->q_sqprev = *qpp;				\
515 			if (*qpp != NULL) {				\
516 				(*qpp)->q_sqnext = qp;			\
517 			} else {					\
518 				sq->sq_head = qp;			\
519 				sq->sq_pri = sq->sq_head->q_spri;	\
520 			}						\
521 			*qpp = qp;					\
522 		}							\
523 		qp->q_sqflags |= Q_SQQUEUED;				\
524 		qp->q_sqtstamp = ddi_get_lbolt();			\
525 		sq->sq_nqueues++;					\
526 	}								\
527 }
528 
529 /*
530  * Remove a queue from the syncq list
531  * Assumes SQLOCK held.
532  */
533 #define	SQRM_Q(sq, qp)							\
534 	{								\
535 		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
536 		ASSERT(qp->q_sqflags & Q_SQQUEUED);			\
537 		ASSERT(sq->sq_head != NULL && sq->sq_tail != NULL);	\
538 		ASSERT((sq->sq_flags & SQ_MESSAGES) != 0);		\
539 		/* Check that the queue is actually in the list */	\
540 		ASSERT(qp->q_sqnext != NULL || sq->sq_tail == qp);	\
541 		ASSERT(qp->q_sqprev != NULL || sq->sq_head == qp);	\
542 		ASSERT(sq->sq_nqueues != 0);				\
543 		if (qp->q_sqprev == NULL) {				\
544 			/* First queue on list, make head q_sqnext */	\
545 			sq->sq_head = qp->q_sqnext;			\
546 		} else {						\
547 			/* Make prev->next == next */			\
548 			qp->q_sqprev->q_sqnext = qp->q_sqnext;		\
549 		}							\
550 		if (qp->q_sqnext == NULL) {				\
551 			/* Last queue on list, make tail sqprev */	\
552 			sq->sq_tail = qp->q_sqprev;			\
553 		} else {						\
554 			/* Make next->prev == prev */			\
555 			qp->q_sqnext->q_sqprev = qp->q_sqprev;		\
556 		}							\
557 		/* clear out references on this queue */		\
558 		qp->q_sqprev = qp->q_sqnext = NULL;			\
559 		qp->q_sqflags &= ~Q_SQQUEUED;				\
560 		/* If there is nothing queued, clear SQ_MESSAGES */	\
561 		if (sq->sq_head != NULL) {				\
562 			sq->sq_pri = sq->sq_head->q_spri;		\
563 		} else	{						\
564 			sq->sq_flags &= ~SQ_MESSAGES;			\
565 			sq->sq_pri = 0;					\
566 		}							\
567 		sq->sq_nqueues--;					\
568 		ASSERT(sq->sq_head != NULL || sq->sq_evhead != NULL ||	\
569 		    (sq->sq_flags & SQ_QUEUED) == 0);			\
570 	}
571 
572 /* Hide the definition from the header file. */
573 #ifdef SQPUT_MP
574 #undef SQPUT_MP
575 #endif
576 
577 /*
578  * Put a message on the queue syncq.
579  * Assumes QLOCK held.
580  */
581 #define	SQPUT_MP(qp, mp)						\
582 	{								\
583 		ASSERT(MUTEX_HELD(QLOCK(qp)));				\
584 		ASSERT(qp->q_sqhead == NULL ||				\
585 		    (qp->q_sqtail != NULL &&				\
586 		    qp->q_sqtail->b_next == NULL));			\
587 		qp->q_syncqmsgs++;					\
588 		ASSERT(qp->q_syncqmsgs != 0);	/* Wraparound */	\
589 		if (qp->q_sqhead == NULL) {				\
590 			qp->q_sqhead = qp->q_sqtail = mp;		\
591 		} else {						\
592 			qp->q_sqtail->b_next = mp;			\
593 			qp->q_sqtail = mp;				\
594 		}							\
595 		ASSERT(qp->q_syncqmsgs > 0);				\
596 		set_qfull(qp);						\
597 	}
598 
599 #define	SQ_PUTCOUNT_SETFAST_LOCKED(sq) {				\
600 		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
601 		if ((sq)->sq_ciputctrl != NULL) {			\
602 			int i;						\
603 			int nlocks = (sq)->sq_nciputctrl;		\
604 			ciputctrl_t *cip = (sq)->sq_ciputctrl;		\
605 			ASSERT((sq)->sq_type & SQ_CIPUT);		\
606 			for (i = 0; i <= nlocks; i++) {			\
607 				ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
608 				cip[i].ciputctrl_count |= SQ_FASTPUT;	\
609 			}						\
610 		}							\
611 	}
612 
613 
614 #define	SQ_PUTCOUNT_CLRFAST_LOCKED(sq) {				\
615 		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
616 		if ((sq)->sq_ciputctrl != NULL) {			\
617 			int i;						\
618 			int nlocks = (sq)->sq_nciputctrl;		\
619 			ciputctrl_t *cip = (sq)->sq_ciputctrl;		\
620 			ASSERT((sq)->sq_type & SQ_CIPUT);		\
621 			for (i = 0; i <= nlocks; i++) {			\
622 				ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
623 				cip[i].ciputctrl_count &= ~SQ_FASTPUT;	\
624 			}						\
625 		}							\
626 	}
627 
628 /*
629  * Run service procedures for all queues in the stream head.
630  */
631 #define	STR_SERVICE(stp, q) {						\
632 	ASSERT(MUTEX_HELD(&stp->sd_qlock));				\
633 	while (stp->sd_qhead != NULL) {					\
634 		DQ(q, stp->sd_qhead, stp->sd_qtail, q_link);		\
635 		ASSERT(stp->sd_nqueues > 0);				\
636 		stp->sd_nqueues--;					\
637 		ASSERT(!(q->q_flag & QINSERVICE));			\
638 		mutex_exit(&stp->sd_qlock);				\
639 		queue_service(q);					\
640 		mutex_enter(&stp->sd_qlock);				\
641 	}								\
642 	ASSERT(stp->sd_nqueues == 0);					\
643 	ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL));	\
644 }
645 
646 /*
647  * Constructor/destructor routines for the stream head cache
648  */
649 /* ARGSUSED */
650 static int
stream_head_constructor(void * buf,void * cdrarg,int kmflags)651 stream_head_constructor(void *buf, void *cdrarg, int kmflags)
652 {
653 	stdata_t *stp = buf;
654 
655 	mutex_init(&stp->sd_lock, NULL, MUTEX_DEFAULT, NULL);
656 	mutex_init(&stp->sd_reflock, NULL, MUTEX_DEFAULT, NULL);
657 	mutex_init(&stp->sd_qlock, NULL, MUTEX_DEFAULT, NULL);
658 	cv_init(&stp->sd_monitor, NULL, CV_DEFAULT, NULL);
659 	cv_init(&stp->sd_iocmonitor, NULL, CV_DEFAULT, NULL);
660 	cv_init(&stp->sd_refmonitor, NULL, CV_DEFAULT, NULL);
661 	cv_init(&stp->sd_qcv, NULL, CV_DEFAULT, NULL);
662 	cv_init(&stp->sd_zcopy_wait, NULL, CV_DEFAULT, NULL);
663 	stp->sd_wrq = NULL;
664 
665 	return (0);
666 }
667 
668 /* ARGSUSED */
669 static void
stream_head_destructor(void * buf,void * cdrarg)670 stream_head_destructor(void *buf, void *cdrarg)
671 {
672 	stdata_t *stp = buf;
673 
674 	mutex_destroy(&stp->sd_lock);
675 	mutex_destroy(&stp->sd_reflock);
676 	mutex_destroy(&stp->sd_qlock);
677 	cv_destroy(&stp->sd_monitor);
678 	cv_destroy(&stp->sd_iocmonitor);
679 	cv_destroy(&stp->sd_refmonitor);
680 	cv_destroy(&stp->sd_qcv);
681 	cv_destroy(&stp->sd_zcopy_wait);
682 }
683 
684 /*
685  * Constructor/destructor routines for the queue cache
686  */
687 /* ARGSUSED */
688 static int
queue_constructor(void * buf,void * cdrarg,int kmflags)689 queue_constructor(void *buf, void *cdrarg, int kmflags)
690 {
691 	queinfo_t *qip = buf;
692 	queue_t *qp = &qip->qu_rqueue;
693 	queue_t *wqp = &qip->qu_wqueue;
694 	syncq_t	*sq = &qip->qu_syncq;
695 
696 	qp->q_first = NULL;
697 	qp->q_link = NULL;
698 	qp->q_count = 0;
699 	qp->q_mblkcnt = 0;
700 	qp->q_sqhead = NULL;
701 	qp->q_sqtail = NULL;
702 	qp->q_sqnext = NULL;
703 	qp->q_sqprev = NULL;
704 	qp->q_sqflags = 0;
705 	qp->q_rwcnt = 0;
706 	qp->q_spri = 0;
707 
708 	mutex_init(QLOCK(qp), NULL, MUTEX_DEFAULT, NULL);
709 	cv_init(&qp->q_wait, NULL, CV_DEFAULT, NULL);
710 
711 	wqp->q_first = NULL;
712 	wqp->q_link = NULL;
713 	wqp->q_count = 0;
714 	wqp->q_mblkcnt = 0;
715 	wqp->q_sqhead = NULL;
716 	wqp->q_sqtail = NULL;
717 	wqp->q_sqnext = NULL;
718 	wqp->q_sqprev = NULL;
719 	wqp->q_sqflags = 0;
720 	wqp->q_rwcnt = 0;
721 	wqp->q_spri = 0;
722 
723 	mutex_init(QLOCK(wqp), NULL, MUTEX_DEFAULT, NULL);
724 	cv_init(&wqp->q_wait, NULL, CV_DEFAULT, NULL);
725 
726 	sq->sq_head = NULL;
727 	sq->sq_tail = NULL;
728 	sq->sq_evhead = NULL;
729 	sq->sq_evtail = NULL;
730 	sq->sq_callbpend = NULL;
731 	sq->sq_outer = NULL;
732 	sq->sq_onext = NULL;
733 	sq->sq_oprev = NULL;
734 	sq->sq_next = NULL;
735 	sq->sq_svcflags = 0;
736 	sq->sq_servcount = 0;
737 	sq->sq_needexcl = 0;
738 	sq->sq_nqueues = 0;
739 	sq->sq_pri = 0;
740 
741 	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
742 	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
743 	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);
744 
745 	return (0);
746 }
747 
748 /* ARGSUSED */
749 static void
queue_destructor(void * buf,void * cdrarg)750 queue_destructor(void *buf, void *cdrarg)
751 {
752 	queinfo_t *qip = buf;
753 	queue_t *qp = &qip->qu_rqueue;
754 	queue_t *wqp = &qip->qu_wqueue;
755 	syncq_t	*sq = &qip->qu_syncq;
756 
757 	ASSERT(qp->q_sqhead == NULL);
758 	ASSERT(wqp->q_sqhead == NULL);
759 	ASSERT(qp->q_sqnext == NULL);
760 	ASSERT(wqp->q_sqnext == NULL);
761 	ASSERT(qp->q_rwcnt == 0);
762 	ASSERT(wqp->q_rwcnt == 0);
763 
764 	mutex_destroy(&qp->q_lock);
765 	cv_destroy(&qp->q_wait);
766 
767 	mutex_destroy(&wqp->q_lock);
768 	cv_destroy(&wqp->q_wait);
769 
770 	mutex_destroy(&sq->sq_lock);
771 	cv_destroy(&sq->sq_wait);
772 	cv_destroy(&sq->sq_exitwait);
773 }
774 
775 /*
776  * Constructor/destructor routines for the syncq cache
777  */
778 /* ARGSUSED */
779 static int
syncq_constructor(void * buf,void * cdrarg,int kmflags)780 syncq_constructor(void *buf, void *cdrarg, int kmflags)
781 {
782 	syncq_t	*sq = buf;
783 
784 	bzero(buf, sizeof (syncq_t));
785 
786 	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
787 	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
788 	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);
789 
790 	return (0);
791 }
792 
793 /* ARGSUSED */
794 static void
syncq_destructor(void * buf,void * cdrarg)795 syncq_destructor(void *buf, void *cdrarg)
796 {
797 	syncq_t	*sq = buf;
798 
799 	ASSERT(sq->sq_head == NULL);
800 	ASSERT(sq->sq_tail == NULL);
801 	ASSERT(sq->sq_evhead == NULL);
802 	ASSERT(sq->sq_evtail == NULL);
803 	ASSERT(sq->sq_callbpend == NULL);
804 	ASSERT(sq->sq_callbflags == 0);
805 	ASSERT(sq->sq_outer == NULL);
806 	ASSERT(sq->sq_onext == NULL);
807 	ASSERT(sq->sq_oprev == NULL);
808 	ASSERT(sq->sq_next == NULL);
809 	ASSERT(sq->sq_needexcl == 0);
810 	ASSERT(sq->sq_svcflags == 0);
811 	ASSERT(sq->sq_servcount == 0);
812 	ASSERT(sq->sq_nqueues == 0);
813 	ASSERT(sq->sq_pri == 0);
814 	ASSERT(sq->sq_count == 0);
815 	ASSERT(sq->sq_rmqcount == 0);
816 	ASSERT(sq->sq_cancelid == 0);
817 	ASSERT(sq->sq_ciputctrl == NULL);
818 	ASSERT(sq->sq_nciputctrl == 0);
819 	ASSERT(sq->sq_type == 0);
820 	ASSERT(sq->sq_flags == 0);
821 
822 	mutex_destroy(&sq->sq_lock);
823 	cv_destroy(&sq->sq_wait);
824 	cv_destroy(&sq->sq_exitwait);
825 }
826 
827 /* ARGSUSED */
828 static int
ciputctrl_constructor(void * buf,void * cdrarg,int kmflags)829 ciputctrl_constructor(void *buf, void *cdrarg, int kmflags)
830 {
831 	ciputctrl_t *cip = buf;
832 	int i;
833 
834 	for (i = 0; i < n_ciputctrl; i++) {
835 		cip[i].ciputctrl_count = SQ_FASTPUT;
836 		mutex_init(&cip[i].ciputctrl_lock, NULL, MUTEX_DEFAULT, NULL);
837 	}
838 
839 	return (0);
840 }
841 
842 /* ARGSUSED */
843 static void
ciputctrl_destructor(void * buf,void * cdrarg)844 ciputctrl_destructor(void *buf, void *cdrarg)
845 {
846 	ciputctrl_t *cip = buf;
847 	int i;
848 
849 	for (i = 0; i < n_ciputctrl; i++) {
850 		ASSERT(cip[i].ciputctrl_count & SQ_FASTPUT);
851 		mutex_destroy(&cip[i].ciputctrl_lock);
852 	}
853 }
854 
855 /*
856  * Init routine run from main at boot time.
857  */
858 void
strinit(void)859 strinit(void)
860 {
861 	int ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);
862 
863 	stream_head_cache = kmem_cache_create("stream_head_cache",
864 	    sizeof (stdata_t), 0,
865 	    stream_head_constructor, stream_head_destructor, NULL,
866 	    NULL, NULL, 0);
867 
868 	queue_cache = kmem_cache_create("queue_cache", sizeof (queinfo_t), 0,
869 	    queue_constructor, queue_destructor, NULL, NULL, NULL, 0);
870 
871 	syncq_cache = kmem_cache_create("syncq_cache", sizeof (syncq_t), 0,
872 	    syncq_constructor, syncq_destructor, NULL, NULL, NULL, 0);
873 
874 	qband_cache = kmem_cache_create("qband_cache",
875 	    sizeof (qband_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
876 
877 	linkinfo_cache = kmem_cache_create("linkinfo_cache",
878 	    sizeof (linkinfo_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
879 
880 	n_ciputctrl = ncpus;
881 	n_ciputctrl = 1 << highbit(n_ciputctrl - 1);
882 	ASSERT(n_ciputctrl >= 1);
883 	n_ciputctrl = MIN(n_ciputctrl, max_n_ciputctrl);
884 	if (n_ciputctrl >= min_n_ciputctrl) {
885 		ciputctrl_cache = kmem_cache_create("ciputctrl_cache",
886 		    sizeof (ciputctrl_t) * n_ciputctrl,
887 		    sizeof (ciputctrl_t), ciputctrl_constructor,
888 		    ciputctrl_destructor, NULL, NULL, NULL, 0);
889 	}
890 
891 	streams_taskq = system_taskq;
892 
893 	if (streams_taskq == NULL)
894 		panic("strinit: no memory for streams taskq!");
895 
896 	bc_bkgrnd_thread = thread_create(NULL, 0,
897 	    streams_bufcall_service, NULL, 0, &p0, TS_RUN, streams_lopri);
898 
899 	streams_qbkgrnd_thread = thread_create(NULL, 0,
900 	    streams_qbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);
901 
902 	streams_sqbkgrnd_thread = thread_create(NULL, 0,
903 	    streams_sqbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);
904 
905 	/*
906 	 * Create STREAMS kstats.
907 	 */
908 	str_kstat = kstat_create("streams", 0, "strstat",
909 	    "net", KSTAT_TYPE_NAMED,
910 	    sizeof (str_statistics) / sizeof (kstat_named_t),
911 	    KSTAT_FLAG_VIRTUAL);
912 
913 	if (str_kstat != NULL) {
914 		str_kstat->ks_data = &str_statistics;
915 		kstat_install(str_kstat);
916 	}
917 
918 	/*
919 	 * TPI support routine initialisation.
920 	 */
921 	tpi_init();
922 
923 	/*
924 	 * Handle to have autopush and persistent link information per
925 	 * zone.
926 	 * Note: uses shutdown hook instead of destroy hook so that the
927 	 * persistent links can be torn down before the destroy hooks
928 	 * in the TCP/IP stack are called.
929 	 */
930 	netstack_register(NS_STR, str_stack_init, str_stack_shutdown,
931 	    str_stack_fini);
932 }
933 
934 void
str_sendsig(vnode_t * vp,int event,uchar_t band,int error)935 str_sendsig(vnode_t *vp, int event, uchar_t band, int error)
936 {
937 	struct stdata *stp;
938 
939 	ASSERT(vp->v_stream);
940 	stp = vp->v_stream;
941 	/* Have to hold sd_lock to prevent siglist from changing */
942 	mutex_enter(&stp->sd_lock);
943 	if (stp->sd_sigflags & event)
944 		strsendsig(stp->sd_siglist, event, band, error);
945 	mutex_exit(&stp->sd_lock);
946 }
947 
948 /*
949  * Send the "sevent" set of signals to a process.
950  * This might send more than one signal if the process is registered
951  * for multiple events. The caller should pass in an sevent that only
952  * includes the events for which the process has registered.
953  */
954 static void
dosendsig(proc_t * proc,int events,int sevent,k_siginfo_t * info,uchar_t band,int error)955 dosendsig(proc_t *proc, int events, int sevent, k_siginfo_t *info,
956     uchar_t band, int error)
957 {
958 	ASSERT(MUTEX_HELD(&proc->p_lock));
959 
960 	info->si_band = 0;
961 	info->si_errno = 0;
962 
963 	if (sevent & S_ERROR) {
964 		sevent &= ~S_ERROR;
965 		info->si_code = POLL_ERR;
966 		info->si_errno = error;
967 		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
968 		    "strsendsig:proc %p info %p", proc, info);
969 		sigaddq(proc, NULL, info, KM_NOSLEEP);
970 		info->si_errno = 0;
971 	}
972 	if (sevent & S_HANGUP) {
973 		sevent &= ~S_HANGUP;
974 		info->si_code = POLL_HUP;
975 		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
976 		    "strsendsig:proc %p info %p", proc, info);
977 		sigaddq(proc, NULL, info, KM_NOSLEEP);
978 	}
979 	if (sevent & S_HIPRI) {
980 		sevent &= ~S_HIPRI;
981 		info->si_code = POLL_PRI;
982 		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
983 		    "strsendsig:proc %p info %p", proc, info);
984 		sigaddq(proc, NULL, info, KM_NOSLEEP);
985 	}
986 	if (sevent & S_RDBAND) {
987 		sevent &= ~S_RDBAND;
988 		if (events & S_BANDURG)
989 			sigtoproc(proc, NULL, SIGURG);
990 		else
991 			sigtoproc(proc, NULL, SIGPOLL);
992 	}
993 	if (sevent & S_WRBAND) {
994 		sevent &= ~S_WRBAND;
995 		sigtoproc(proc, NULL, SIGPOLL);
996 	}
997 	if (sevent & S_INPUT) {
998 		sevent &= ~S_INPUT;
999 		info->si_code = POLL_IN;
1000 		info->si_band = band;
1001 		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
1002 		    "strsendsig:proc %p info %p", proc, info);
1003 		sigaddq(proc, NULL, info, KM_NOSLEEP);
1004 		info->si_band = 0;
1005 	}
1006 	if (sevent & S_OUTPUT) {
1007 		sevent &= ~S_OUTPUT;
1008 		info->si_code = POLL_OUT;
1009 		info->si_band = band;
1010 		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
1011 		    "strsendsig:proc %p info %p", proc, info);
1012 		sigaddq(proc, NULL, info, KM_NOSLEEP);
1013 		info->si_band = 0;
1014 	}
1015 	if (sevent & S_MSG) {
1016 		sevent &= ~S_MSG;
1017 		info->si_code = POLL_MSG;
1018 		info->si_band = band;
1019 		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
1020 		    "strsendsig:proc %p info %p", proc, info);
1021 		sigaddq(proc, NULL, info, KM_NOSLEEP);
1022 		info->si_band = 0;
1023 	}
1024 	if (sevent & S_RDNORM) {
1025 		sevent &= ~S_RDNORM;
1026 		sigtoproc(proc, NULL, SIGPOLL);
1027 	}
1028 	if (sevent != 0) {
1029 		panic("strsendsig: unknown event(s) %x", sevent);
1030 	}
1031 }
1032 
1033 /*
1034  * Send SIGPOLL/SIGURG signal to all processes and process groups
1035  * registered on the given signal list that want a signal for at
1036  * least one of the specified events.
1037  *
1038  * Must be called with exclusive access to siglist (caller holding sd_lock).
1039  *
1040  * strioctl(I_SETSIG/I_ESETSIG) will only change siglist when holding
1041  * sd_lock and the ioctl code maintains a PID_HOLD on the pid structure
1042  * while it is in the siglist.
1043  *
1044  * For performance reasons (MP scalability) the code drops pidlock
1045  * when sending signals to a single process.
1046  * When sending to a process group the code holds
1047  * pidlock to prevent the membership in the process group from changing
1048  * while walking the p_pglink list.
1049  */
1050 void
strsendsig(strsig_t * siglist,int event,uchar_t band,int error)1051 strsendsig(strsig_t *siglist, int event, uchar_t band, int error)
1052 {
1053 	strsig_t *ssp;
1054 	k_siginfo_t info;
1055 	struct pid *pidp;
1056 	proc_t  *proc;
1057 
1058 	info.si_signo = SIGPOLL;
1059 	info.si_errno = 0;
1060 	for (ssp = siglist; ssp; ssp = ssp->ss_next) {
1061 		int sevent;
1062 
1063 		sevent = ssp->ss_events & event;
1064 		if (sevent == 0)
1065 			continue;
1066 
1067 		if ((pidp = ssp->ss_pidp) == NULL) {
1068 			/* pid was released but still on event list */
1069 			continue;
1070 		}
1071 
1072 
1073 		if (ssp->ss_pid > 0) {
1074 			/*
1075 			 * XXX This unfortunately still generates
1076 			 * a signal when a fd is closed but
1077 			 * the proc is active.
1078 			 */
1079 			ASSERT(ssp->ss_pid == pidp->pid_id);
1080 
1081 			mutex_enter(&pidlock);
1082 			proc = prfind_zone(pidp->pid_id, ALL_ZONES);
1083 			if (proc == NULL) {
1084 				mutex_exit(&pidlock);
1085 				continue;
1086 			}
1087 			mutex_enter(&proc->p_lock);
1088 			mutex_exit(&pidlock);
1089 			dosendsig(proc, ssp->ss_events, sevent, &info,
1090 			    band, error);
1091 			mutex_exit(&proc->p_lock);
1092 		} else {
1093 			/*
1094 			 * Send to process group. Hold pidlock across
1095 			 * calls to dosendsig().
1096 			 */
1097 			pid_t pgrp = -ssp->ss_pid;
1098 
1099 			mutex_enter(&pidlock);
1100 			proc = pgfind_zone(pgrp, ALL_ZONES);
1101 			while (proc != NULL) {
1102 				mutex_enter(&proc->p_lock);
1103 				dosendsig(proc, ssp->ss_events, sevent,
1104 				    &info, band, error);
1105 				mutex_exit(&proc->p_lock);
1106 				proc = proc->p_pglink;
1107 			}
1108 			mutex_exit(&pidlock);
1109 		}
1110 	}
1111 }
1112 
1113 /*
1114  * Attach a stream device or module.
1115  * qp is a read queue; the new queue goes in so its next
1116  * read ptr is the argument, and the write queue corresponding
1117  * to the argument points to this queue. Return 0 on success,
1118  * or a non-zero errno on failure.
1119  */
1120 int
qattach(queue_t * qp,dev_t * devp,int oflag,cred_t * crp,fmodsw_impl_t * fp,boolean_t is_insert)1121 qattach(queue_t *qp, dev_t *devp, int oflag, cred_t *crp, fmodsw_impl_t *fp,
1122     boolean_t is_insert)
1123 {
1124 	major_t			major;
1125 	cdevsw_impl_t		*dp;
1126 	struct streamtab	*str;
1127 	queue_t			*rq;
1128 	queue_t			*wrq;
1129 	uint32_t		qflag;
1130 	uint32_t		sqtype;
1131 	perdm_t			*dmp;
1132 	int			error;
1133 	int			sflag;
1134 
1135 	rq = allocq();
1136 	wrq = _WR(rq);
1137 	STREAM(rq) = STREAM(wrq) = STREAM(qp);
1138 
1139 	if (fp != NULL) {
1140 		str = fp->f_str;
1141 		qflag = fp->f_qflag;
1142 		sqtype = fp->f_sqtype;
1143 		dmp = fp->f_dmp;
1144 		IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
1145 		sflag = MODOPEN;
1146 
1147 		/*
1148 		 * stash away a pointer to the module structure so we can
1149 		 * unref it in qdetach.
1150 		 */
1151 		rq->q_fp = fp;
1152 	} else {
1153 		ASSERT(!is_insert);
1154 
1155 		major = getmajor(*devp);
1156 		dp = &devimpl[major];
1157 
1158 		str = dp->d_str;
1159 		ASSERT(str == STREAMSTAB(major));
1160 
1161 		qflag = dp->d_qflag;
1162 		ASSERT(qflag & QISDRV);
1163 		sqtype = dp->d_sqtype;
1164 
1165 		/* create perdm_t if needed */
1166 		if (NEED_DM(dp->d_dmp, qflag))
1167 			dp->d_dmp = hold_dm(str, qflag, sqtype);
1168 
1169 		dmp = dp->d_dmp;
1170 		sflag = 0;
1171 	}
1172 
1173 	TRACE_2(TR_FAC_STREAMS_FR, TR_QATTACH_FLAGS,
1174 	    "qattach:qflag == %X(%X)", qflag, *devp);
1175 
1176 	/* setq might sleep in allocator - avoid holding locks. */
1177 	setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype, B_FALSE);
1178 
1179 	/*
1180 	 * Before calling the module's open routine, set up the q_next
1181 	 * pointer for inserting a module in the middle of a stream.
1182 	 *
1183 	 * Note that we can always set _QINSERTING and set up q_next
1184 	 * pointer for both inserting and pushing a module.  Then there
1185 	 * is no need for the is_insert parameter.  In insertq(), called
1186 	 * by qprocson(), assume that q_next of the new module always points
1187 	 * to the correct queue and use it for insertion.  Everything should
1188 	 * work out fine.  But in the first release of _I_INSERT, we
1189 	 * distinguish between inserting and pushing to make sure that
1190 	 * pushing a module follows the same code path as before.
1191 	 */
1192 	if (is_insert) {
1193 		rq->q_flag |= _QINSERTING;
1194 		rq->q_next = qp;
1195 	}
1196 
1197 	/*
1198 	 * If there is an outer perimeter get exclusive access during
1199 	 * the open procedure.  Bump up the reference count on the queue.
1200 	 */
1201 	entersq(rq->q_syncq, SQ_OPENCLOSE);
1202 	error = (*rq->q_qinfo->qi_qopen)(rq, devp, oflag, sflag, crp);
1203 	if (error != 0)
1204 		goto failed;
1205 	leavesq(rq->q_syncq, SQ_OPENCLOSE);
1206 	ASSERT(qprocsareon(rq));
1207 	return (0);
1208 
1209 failed:
1210 	rq->q_flag &= ~_QINSERTING;
1211 	if (backq(wrq) != NULL && backq(wrq)->q_next == wrq)
1212 		qprocsoff(rq);
1213 	leavesq(rq->q_syncq, SQ_OPENCLOSE);
1214 	rq->q_next = wrq->q_next = NULL;
1215 	qdetach(rq, 0, 0, crp, B_FALSE);
1216 	return (error);
1217 }
1218 
1219 /*
1220  * Handle second open of stream. For modules, set the
1221  * last argument to MODOPEN and do not pass any open flags.
1222  * Ignore dummydev since this is not the first open.
1223  */
1224 int
qreopen(queue_t * qp,dev_t * devp,int flag,cred_t * crp)1225 qreopen(queue_t *qp, dev_t *devp, int flag, cred_t *crp)
1226 {
1227 	int	error;
1228 	dev_t dummydev;
1229 	queue_t *wqp = _WR(qp);
1230 
1231 	ASSERT(qp->q_flag & QREADR);
1232 	entersq(qp->q_syncq, SQ_OPENCLOSE);
1233 
1234 	dummydev = *devp;
1235 	if (error = ((*qp->q_qinfo->qi_qopen)(qp, &dummydev,
1236 	    (wqp->q_next ? 0 : flag), (wqp->q_next ? MODOPEN : 0), crp))) {
1237 		leavesq(qp->q_syncq, SQ_OPENCLOSE);
1238 		mutex_enter(&STREAM(qp)->sd_lock);
1239 		qp->q_stream->sd_flag |= STREOPENFAIL;
1240 		mutex_exit(&STREAM(qp)->sd_lock);
1241 		return (error);
1242 	}
1243 	leavesq(qp->q_syncq, SQ_OPENCLOSE);
1244 
1245 	/*
1246 	 * successful open should have done qprocson()
1247 	 */
1248 	ASSERT(qprocsareon(_RD(qp)));
1249 	return (0);
1250 }
1251 
1252 /*
1253  * Detach a stream module or device.
1254  * If clmode == 1 then the module or driver was opened and its
1255  * close routine must be called. If clmode == 0, the module
1256  * or driver was never opened or the open failed, and so its close
1257  * should not be called.
1258  */
1259 void
qdetach(queue_t * qp,int clmode,int flag,cred_t * crp,boolean_t is_remove)1260 qdetach(queue_t *qp, int clmode, int flag, cred_t *crp, boolean_t is_remove)
1261 {
1262 	queue_t *wqp = _WR(qp);
1263 	ASSERT(STREAM(qp)->sd_flag & (STRCLOSE|STWOPEN|STRPLUMB));
1264 
1265 	if (STREAM_NEEDSERVICE(STREAM(qp)))
1266 		stream_runservice(STREAM(qp));
1267 
1268 	if (clmode) {
1269 		/*
1270 		 * Make sure that all the messages on the write side syncq are
1271 		 * processed and nothing is left. Since we are closing, no new
1272 		 * messages may appear there.
1273 		 */
1274 		wait_q_syncq(wqp);
1275 
1276 		entersq(qp->q_syncq, SQ_OPENCLOSE);
1277 		if (is_remove) {
1278 			mutex_enter(QLOCK(qp));
1279 			qp->q_flag |= _QREMOVING;
1280 			mutex_exit(QLOCK(qp));
1281 		}
1282 		(*qp->q_qinfo->qi_qclose)(qp, flag, crp);
1283 		/*
1284 		 * Check that qprocsoff() was actually called.
1285 		 */
1286 		ASSERT((qp->q_flag & QWCLOSE) && (wqp->q_flag & QWCLOSE));
1287 
1288 		leavesq(qp->q_syncq, SQ_OPENCLOSE);
1289 	} else {
1290 		disable_svc(qp);
1291 	}
1292 
1293 	/*
1294 	 * Allow any threads blocked in entersq to proceed and discover
1295 	 * the QWCLOSE is set.
1296 	 * Note: This assumes that all users of entersq check QWCLOSE.
1297 	 * Currently runservice is the only entersq that can happen
1298 	 * after removeq has finished.
1299 	 * Removeq will have discarded all messages destined to the closing
1300 	 * pair of queues from the syncq.
1301 	 * NOTE: Calling a function inside an assert is unconventional.
1302 	 * However, it does not cause any problem since flush_syncq() does
1303 	 * not change any state except when it returns non-zero i.e.
1304 	 * when the assert will trigger.
1305 	 */
1306 	ASSERT(flush_syncq(qp->q_syncq, qp) == 0);
1307 	ASSERT(flush_syncq(wqp->q_syncq, wqp) == 0);
1308 	ASSERT((qp->q_flag & QPERMOD) ||
1309 	    ((qp->q_syncq->sq_head == NULL) &&
1310 	    (wqp->q_syncq->sq_head == NULL)));
1311 
1312 	/* release any fmodsw_impl_t structure held on behalf of the queue */
1313 	ASSERT(qp->q_fp != NULL || qp->q_flag & QISDRV);
1314 	if (qp->q_fp != NULL)
1315 		fmodsw_rele(qp->q_fp);
1316 
1317 	/* freeq removes us from the outer perimeter if any */
1318 	freeq(qp);
1319 }
1320 
1321 /* Prevent service procedures from being called */
1322 void
disable_svc(queue_t * qp)1323 disable_svc(queue_t *qp)
1324 {
1325 	queue_t *wqp = _WR(qp);
1326 
1327 	ASSERT(qp->q_flag & QREADR);
1328 	mutex_enter(QLOCK(qp));
1329 	qp->q_flag |= QWCLOSE;
1330 	mutex_exit(QLOCK(qp));
1331 	mutex_enter(QLOCK(wqp));
1332 	wqp->q_flag |= QWCLOSE;
1333 	mutex_exit(QLOCK(wqp));
1334 }
1335 
1336 /* Allow service procedures to be called again */
1337 void
enable_svc(queue_t * qp)1338 enable_svc(queue_t *qp)
1339 {
1340 	queue_t *wqp = _WR(qp);
1341 
1342 	ASSERT(qp->q_flag & QREADR);
1343 	mutex_enter(QLOCK(qp));
1344 	qp->q_flag &= ~QWCLOSE;
1345 	mutex_exit(QLOCK(qp));
1346 	mutex_enter(QLOCK(wqp));
1347 	wqp->q_flag &= ~QWCLOSE;
1348 	mutex_exit(QLOCK(wqp));
1349 }
1350 
1351 /*
1352  * Remove queue from qhead/qtail if it is enabled.
1353  * Only reset QENAB if the queue was removed from the runlist.
1354  * A queue goes through 3 stages:
1355  *	It is on the service list and QENAB is set.
1356  *	It is removed from the service list but QENAB is still set.
1357  *	QENAB gets changed to QINSERVICE.
1358  *	QINSERVICE is reset (when the service procedure is done)
1359  * Thus we can not reset QENAB unless we actually removed it from the service
1360  * queue.
1361  */
1362 void
remove_runlist(queue_t * qp)1363 remove_runlist(queue_t *qp)
1364 {
1365 	if (qp->q_flag & QENAB && qhead != NULL) {
1366 		queue_t *q_chase;
1367 		queue_t *q_curr;
1368 		int removed;
1369 
1370 		mutex_enter(&service_queue);
1371 		RMQ(qp, qhead, qtail, q_link, q_chase, q_curr, removed);
1372 		mutex_exit(&service_queue);
1373 		if (removed) {
1374 			STRSTAT(qremoved);
1375 			qp->q_flag &= ~QENAB;
1376 		}
1377 	}
1378 }
1379 
1380 
1381 /*
1382  * Wait for any pending service processing to complete.
1383  * The removal of queues from the runlist is not atomic with the
1384  * clearing of the QENABLED flag and setting the INSERVICE flag.
1385  * consequently it is possible for remove_runlist in strclose
1386  * to not find the queue on the runlist but for it to be QENABLED
1387  * and not yet INSERVICE -> hence wait_svc needs to check QENABLED
1388  * as well as INSERVICE.
1389  */
1390 void
wait_svc(queue_t * qp)1391 wait_svc(queue_t *qp)
1392 {
1393 	queue_t *wqp = _WR(qp);
1394 
1395 	ASSERT(qp->q_flag & QREADR);
1396 
1397 	/*
1398 	 * Try to remove queues from qhead/qtail list.
1399 	 */
1400 	if (qhead != NULL) {
1401 		remove_runlist(qp);
1402 		remove_runlist(wqp);
1403 	}
1404 	/*
1405 	 * Wait till the syncqs associated with the queue disappear from the
1406 	 * background processing list.
1407 	 * This only needs to be done for non-PERMOD perimeters since
1408 	 * for PERMOD perimeters the syncq may be shared and will only be freed
1409 	 * when the last module/driver is unloaded.
1410 	 * If for PERMOD perimeters queue was on the syncq list, removeq()
1411 	 * should call propagate_syncq() or drain_syncq() for it. Both of these
1412 	 * functions remove the queue from its syncq list, so sqthread will not
1413 	 * try to access the queue.
1414 	 */
1415 	if (!(qp->q_flag & QPERMOD)) {
1416 		syncq_t *rsq = qp->q_syncq;
1417 		syncq_t *wsq = wqp->q_syncq;
1418 
1419 		/*
1420 		 * Disable rsq and wsq and wait for any background processing of
1421 		 * syncq to complete.
1422 		 */
1423 		wait_sq_svc(rsq);
1424 		if (wsq != rsq)
1425 			wait_sq_svc(wsq);
1426 	}
1427 
1428 	mutex_enter(QLOCK(qp));
1429 	while (qp->q_flag & (QINSERVICE|QENAB))
1430 		cv_wait(&qp->q_wait, QLOCK(qp));
1431 	mutex_exit(QLOCK(qp));
1432 	mutex_enter(QLOCK(wqp));
1433 	while (wqp->q_flag & (QINSERVICE|QENAB))
1434 		cv_wait(&wqp->q_wait, QLOCK(wqp));
1435 	mutex_exit(QLOCK(wqp));
1436 }
1437 
1438 /*
1439  * Put ioctl data from userland buffer `arg' into the mblk chain `bp'.
1440  * `flag' must always contain either K_TO_K or U_TO_K; STR_NOSIG may
1441  * also be set, and is passed through to allocb_cred_wait().
1442  *
1443  * Returns errno on failure, zero on success.
1444  */
1445 int
putiocd(mblk_t * bp,char * arg,int flag,cred_t * cr)1446 putiocd(mblk_t *bp, char *arg, int flag, cred_t *cr)
1447 {
1448 	mblk_t *tmp;
1449 	ssize_t  count;
1450 	int error = 0;
1451 
1452 	ASSERT((flag & (U_TO_K | K_TO_K)) == U_TO_K ||
1453 	    (flag & (U_TO_K | K_TO_K)) == K_TO_K);
1454 
1455 	if (bp->b_datap->db_type == M_IOCTL) {
1456 		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
1457 	} else {
1458 		ASSERT(bp->b_datap->db_type == M_COPYIN);
1459 		count = ((struct copyreq *)bp->b_rptr)->cq_size;
1460 	}
1461 	/*
1462 	 * strdoioctl validates ioc_count, so if this assert fails it
1463 	 * cannot be due to user error.
1464 	 */
1465 	ASSERT(count >= 0);
1466 
1467 	if ((tmp = allocb_cred_wait(count, (flag & STR_NOSIG), &error, cr,
1468 	    curproc->p_pid)) == NULL) {
1469 		return (error);
1470 	}
1471 	error = strcopyin(arg, tmp->b_wptr, count, flag & (U_TO_K|K_TO_K));
1472 	if (error != 0) {
1473 		freeb(tmp);
1474 		return (error);
1475 	}
1476 	DB_CPID(tmp) = curproc->p_pid;
1477 	tmp->b_wptr += count;
1478 	bp->b_cont = tmp;
1479 
1480 	return (0);
1481 }
1482 
1483 /*
1484  * Copy ioctl data to user-land. Return non-zero errno on failure,
1485  * 0 for success.
1486  */
1487 int
getiocd(mblk_t * bp,char * arg,int copymode)1488 getiocd(mblk_t *bp, char *arg, int copymode)
1489 {
1490 	ssize_t count;
1491 	size_t  n;
1492 	int	error;
1493 
1494 	if (bp->b_datap->db_type == M_IOCACK)
1495 		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
1496 	else {
1497 		ASSERT(bp->b_datap->db_type == M_COPYOUT);
1498 		count = ((struct copyreq *)bp->b_rptr)->cq_size;
1499 	}
1500 	ASSERT(count >= 0);
1501 
1502 	for (bp = bp->b_cont; bp && count;
1503 	    count -= n, bp = bp->b_cont, arg += n) {
1504 		n = MIN(count, bp->b_wptr - bp->b_rptr);
1505 		error = strcopyout(bp->b_rptr, arg, n, copymode);
1506 		if (error)
1507 			return (error);
1508 	}
1509 	ASSERT(count == 0);
1510 	return (0);
1511 }
1512 
1513 /*
1514  * Allocate a linkinfo entry given the write queue of the
1515  * bottom module of the top stream and the write queue of the
1516  * stream head of the bottom stream.
1517  */
1518 linkinfo_t *
alloclink(queue_t * qup,queue_t * qdown,file_t * fpdown)1519 alloclink(queue_t *qup, queue_t *qdown, file_t *fpdown)
1520 {
1521 	linkinfo_t *linkp;
1522 
1523 	linkp = kmem_cache_alloc(linkinfo_cache, KM_SLEEP);
1524 
1525 	linkp->li_lblk.l_qtop = qup;
1526 	linkp->li_lblk.l_qbot = qdown;
1527 	linkp->li_fpdown = fpdown;
1528 
1529 	mutex_enter(&strresources);
1530 	linkp->li_next = linkinfo_list;
1531 	linkp->li_prev = NULL;
1532 	if (linkp->li_next)
1533 		linkp->li_next->li_prev = linkp;
1534 	linkinfo_list = linkp;
1535 	linkp->li_lblk.l_index = ++lnk_id;
1536 	ASSERT(lnk_id != 0);	/* this should never wrap in practice */
1537 	mutex_exit(&strresources);
1538 
1539 	return (linkp);
1540 }
1541 
1542 /*
1543  * Free a linkinfo entry.
1544  */
1545 void
lbfree(linkinfo_t * linkp)1546 lbfree(linkinfo_t *linkp)
1547 {
1548 	mutex_enter(&strresources);
1549 	if (linkp->li_next)
1550 		linkp->li_next->li_prev = linkp->li_prev;
1551 	if (linkp->li_prev)
1552 		linkp->li_prev->li_next = linkp->li_next;
1553 	else
1554 		linkinfo_list = linkp->li_next;
1555 	mutex_exit(&strresources);
1556 
1557 	kmem_cache_free(linkinfo_cache, linkp);
1558 }
1559 
1560 /*
1561  * Check for a potential linking cycle.
1562  * Return 1 if a link will result in a cycle,
1563  * and 0 otherwise.
1564  */
1565 int
linkcycle(stdata_t * upstp,stdata_t * lostp,str_stack_t * ss)1566 linkcycle(stdata_t *upstp, stdata_t *lostp, str_stack_t *ss)
1567 {
1568 	struct mux_node *np;
1569 	struct mux_edge *ep;
1570 	int i;
1571 	major_t lomaj;
1572 	major_t upmaj;
1573 	/*
1574 	 * if the lower stream is a pipe/FIFO, return, since link
1575 	 * cycles can not happen on pipes/FIFOs
1576 	 */
1577 	if (lostp->sd_vnode->v_type == VFIFO)
1578 		return (0);
1579 
1580 	for (i = 0; i < ss->ss_devcnt; i++) {
1581 		np = &ss->ss_mux_nodes[i];
1582 		MUX_CLEAR(np);
1583 	}
1584 	lomaj = getmajor(lostp->sd_vnode->v_rdev);
1585 	upmaj = getmajor(upstp->sd_vnode->v_rdev);
1586 	np = &ss->ss_mux_nodes[lomaj];
1587 	for (;;) {
1588 		if (!MUX_DIDVISIT(np)) {
1589 			if (np->mn_imaj == upmaj)
1590 				return (1);
1591 			if (np->mn_outp == NULL) {
1592 				MUX_VISIT(np);
1593 				if (np->mn_originp == NULL)
1594 					return (0);
1595 				np = np->mn_originp;
1596 				continue;
1597 			}
1598 			MUX_VISIT(np);
1599 			np->mn_startp = np->mn_outp;
1600 		} else {
1601 			if (np->mn_startp == NULL) {
1602 				if (np->mn_originp == NULL)
1603 					return (0);
1604 				else {
1605 					np = np->mn_originp;
1606 					continue;
1607 				}
1608 			}
1609 			/*
1610 			 * If ep->me_nodep is a FIFO (me_nodep == NULL),
1611 			 * ignore the edge and move on. ep->me_nodep gets
1612 			 * set to NULL in mux_addedge() if it is a FIFO.
1613 			 *
1614 			 */
1615 			ep = np->mn_startp;
1616 			np->mn_startp = ep->me_nextp;
1617 			if (ep->me_nodep == NULL)
1618 				continue;
1619 			ep->me_nodep->mn_originp = np;
1620 			np = ep->me_nodep;
1621 		}
1622 	}
1623 }
1624 
1625 /*
1626  * Find linkinfo entry corresponding to the parameters.
1627  */
1628 linkinfo_t *
findlinks(stdata_t * stp,int index,int type,str_stack_t * ss)1629 findlinks(stdata_t *stp, int index, int type, str_stack_t *ss)
1630 {
1631 	linkinfo_t *linkp;
1632 	struct mux_edge *mep;
1633 	struct mux_node *mnp;
1634 	queue_t *qup;
1635 
1636 	mutex_enter(&strresources);
1637 	if ((type & LINKTYPEMASK) == LINKNORMAL) {
1638 		qup = getendq(stp->sd_wrq);
1639 		for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
1640 			if ((qup == linkp->li_lblk.l_qtop) &&
1641 			    (!index || (index == linkp->li_lblk.l_index))) {
1642 				mutex_exit(&strresources);
1643 				return (linkp);
1644 			}
1645 		}
1646 	} else {
1647 		ASSERT((type & LINKTYPEMASK) == LINKPERSIST);
1648 		mnp = &ss->ss_mux_nodes[getmajor(stp->sd_vnode->v_rdev)];
1649 		mep = mnp->mn_outp;
1650 		while (mep) {
1651 			if ((index == 0) || (index == mep->me_muxid))
1652 				break;
1653 			mep = mep->me_nextp;
1654 		}
1655 		if (!mep) {
1656 			mutex_exit(&strresources);
1657 			return (NULL);
1658 		}
1659 		for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
1660 			if ((!linkp->li_lblk.l_qtop) &&
1661 			    (mep->me_muxid == linkp->li_lblk.l_index)) {
1662 				mutex_exit(&strresources);
1663 				return (linkp);
1664 			}
1665 		}
1666 	}
1667 	mutex_exit(&strresources);
1668 	return (NULL);
1669 }
1670 
1671 /*
1672  * Given a queue ptr, follow the chain of q_next pointers until you reach the
1673  * last queue on the chain and return it.
1674  */
1675 queue_t *
getendq(queue_t * q)1676 getendq(queue_t *q)
1677 {
1678 	ASSERT(q != NULL);
1679 	while (_SAMESTR(q))
1680 		q = q->q_next;
1681 	return (q);
1682 }
1683 
1684 /*
1685  * Wait for the syncq count to drop to zero.
1686  * sq could be either outer or inner.
1687  */
1688 
1689 static void
wait_syncq(syncq_t * sq)1690 wait_syncq(syncq_t *sq)
1691 {
1692 	uint16_t count;
1693 
1694 	mutex_enter(SQLOCK(sq));
1695 	count = sq->sq_count;
1696 	SQ_PUTLOCKS_ENTER(sq);
1697 	SUM_SQ_PUTCOUNTS(sq, count);
1698 	while (count != 0) {
1699 		sq->sq_flags |= SQ_WANTWAKEUP;
1700 		SQ_PUTLOCKS_EXIT(sq);
1701 		cv_wait(&sq->sq_wait, SQLOCK(sq));
1702 		count = sq->sq_count;
1703 		SQ_PUTLOCKS_ENTER(sq);
1704 		SUM_SQ_PUTCOUNTS(sq, count);
1705 	}
1706 	SQ_PUTLOCKS_EXIT(sq);
1707 	mutex_exit(SQLOCK(sq));
1708 }
1709 
1710 /*
1711  * Wait while there are any messages for the queue in its syncq.
1712  */
1713 static void
wait_q_syncq(queue_t * q)1714 wait_q_syncq(queue_t *q)
1715 {
1716 	if ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
1717 		syncq_t *sq = q->q_syncq;
1718 
1719 		mutex_enter(SQLOCK(sq));
1720 		while ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
1721 			sq->sq_flags |= SQ_WANTWAKEUP;
1722 			cv_wait(&sq->sq_wait, SQLOCK(sq));
1723 		}
1724 		mutex_exit(SQLOCK(sq));
1725 	}
1726 }
1727 
1728 
1729 int
mlink_file(vnode_t * vp,int cmd,struct file * fpdown,cred_t * crp,int * rvalp,int lhlink)1730 mlink_file(vnode_t *vp, int cmd, struct file *fpdown, cred_t *crp, int *rvalp,
1731     int lhlink)
1732 {
1733 	struct stdata *stp;
1734 	struct strioctl strioc;
1735 	struct linkinfo *linkp;
1736 	struct stdata *stpdown;
1737 	struct streamtab *str;
1738 	queue_t *passq;
1739 	syncq_t *passyncq;
1740 	queue_t *rq;
1741 	cdevsw_impl_t *dp;
1742 	uint32_t qflag;
1743 	uint32_t sqtype;
1744 	perdm_t *dmp;
1745 	int error = 0;
1746 	netstack_t *ns;
1747 	str_stack_t *ss;
1748 
1749 	stp = vp->v_stream;
1750 	TRACE_1(TR_FAC_STREAMS_FR,
1751 	    TR_I_LINK, "I_LINK/I_PLINK:stp %p", stp);
1752 	/*
1753 	 * Test for invalid upper stream
1754 	 */
1755 	if (stp->sd_flag & STRHUP) {
1756 		return (ENXIO);
1757 	}
1758 	if (vp->v_type == VFIFO) {
1759 		return (EINVAL);
1760 	}
1761 	if (stp->sd_strtab == NULL) {
1762 		return (EINVAL);
1763 	}
1764 	if (!stp->sd_strtab->st_muxwinit) {
1765 		return (EINVAL);
1766 	}
1767 	if (fpdown == NULL) {
1768 		return (EBADF);
1769 	}
1770 	ns = netstack_find_by_cred(crp);
1771 	ASSERT(ns != NULL);
1772 	ss = ns->netstack_str;
1773 	ASSERT(ss != NULL);
1774 
1775 	if (getmajor(stp->sd_vnode->v_rdev) >= ss->ss_devcnt) {
1776 		netstack_rele(ss->ss_netstack);
1777 		return (EINVAL);
1778 	}
1779 	mutex_enter(&muxifier);
1780 	if (stp->sd_flag & STPLEX) {
1781 		mutex_exit(&muxifier);
1782 		netstack_rele(ss->ss_netstack);
1783 		return (ENXIO);
1784 	}
1785 
1786 	/*
1787 	 * Test for invalid lower stream.
1788 	 * The check for the v_type != VFIFO and having a major
1789 	 * number not >= devcnt is done to avoid problems with
1790 	 * adding mux_node entry past the end of mux_nodes[].
1791 	 * For FIFO's we don't add an entry so this isn't a
1792 	 * problem.
1793 	 */
1794 	if (((stpdown = fpdown->f_vnode->v_stream) == NULL) ||
1795 	    (stpdown == stp) || (stpdown->sd_flag &
1796 	    (STPLEX|STRHUP|STRDERR|STWRERR|IOCWAIT|STRPLUMB)) ||
1797 	    ((stpdown->sd_vnode->v_type != VFIFO) &&
1798 	    (getmajor(stpdown->sd_vnode->v_rdev) >= ss->ss_devcnt)) ||
1799 	    linkcycle(stp, stpdown, ss)) {
1800 		mutex_exit(&muxifier);
1801 		netstack_rele(ss->ss_netstack);
1802 		return (EINVAL);
1803 	}
1804 	TRACE_1(TR_FAC_STREAMS_FR,
1805 	    TR_STPDOWN, "stpdown:%p", stpdown);
1806 	rq = getendq(stp->sd_wrq);
1807 	if (cmd == I_PLINK)
1808 		rq = NULL;
1809 
1810 	linkp = alloclink(rq, stpdown->sd_wrq, fpdown);
1811 
1812 	strioc.ic_cmd = cmd;
1813 	strioc.ic_timout = INFTIM;
1814 	strioc.ic_len = sizeof (struct linkblk);
1815 	strioc.ic_dp = (char *)&linkp->li_lblk;
1816 
1817 	/*
1818 	 * STRPLUMB protects plumbing changes and should be set before
1819 	 * link_addpassthru()/link_rempassthru() are called, so it is set here
1820 	 * and cleared in the end of mlink when passthru queue is removed.
1821 	 * Setting of STRPLUMB prevents reopens of the stream while passthru
1822 	 * queue is in-place (it is not a proper module and doesn't have open
1823 	 * entry point).
1824 	 *
1825 	 * STPLEX prevents any threads from entering the stream from above. It
1826 	 * can't be set before the call to link_addpassthru() because putnext
1827 	 * from below may cause stream head I/O routines to be called and these
1828 	 * routines assert that STPLEX is not set. After link_addpassthru()
1829 	 * nothing may come from below since the pass queue syncq is blocked.
1830 	 * Note also that STPLEX should be cleared before the call to
1831 	 * link_rempassthru() since when messages start flowing to the stream
1832 	 * head (e.g. because of message propagation from the pass queue) stream
1833 	 * head I/O routines may be called with STPLEX flag set.
1834 	 *
1835 	 * When STPLEX is set, nothing may come into the stream from above and
1836 	 * it is safe to do a setq which will change stream head. So, the
1837 	 * correct sequence of actions is:
1838 	 *
1839 	 * 1) Set STRPLUMB
1840 	 * 2) Call link_addpassthru()
1841 	 * 3) Set STPLEX
1842 	 * 4) Call setq and update the stream state
1843 	 * 5) Clear STPLEX
1844 	 * 6) Call link_rempassthru()
1845 	 * 7) Clear STRPLUMB
1846 	 *
1847 	 * The same sequence applies to munlink() code.
1848 	 */
1849 	mutex_enter(&stpdown->sd_lock);
1850 	stpdown->sd_flag |= STRPLUMB;
1851 	mutex_exit(&stpdown->sd_lock);
1852 	/*
1853 	 * Add passthru queue below lower mux. This will block
1854 	 * syncqs of lower muxs read queue during I_LINK/I_UNLINK.
1855 	 */
1856 	passq = link_addpassthru(stpdown);
1857 
1858 	mutex_enter(&stpdown->sd_lock);
1859 	stpdown->sd_flag |= STPLEX;
1860 	mutex_exit(&stpdown->sd_lock);
1861 
1862 	rq = _RD(stpdown->sd_wrq);
1863 	/*
1864 	 * There may be messages in the streamhead's syncq due to messages
1865 	 * that arrived before link_addpassthru() was done. To avoid
1866 	 * background processing of the syncq happening simultaneous with
1867 	 * setq processing, we disable the streamhead syncq and wait until
1868 	 * existing background thread finishes working on it.
1869 	 */
1870 	wait_sq_svc(rq->q_syncq);
1871 	passyncq = passq->q_syncq;
1872 	if (!(passyncq->sq_flags & SQ_BLOCKED))
1873 		blocksq(passyncq, SQ_BLOCKED, 0);
1874 
1875 	ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
1876 	ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
1877 	rq->q_ptr = _WR(rq)->q_ptr = NULL;
1878 
1879 	/* setq might sleep in allocator - avoid holding locks. */
1880 	/* Note: we are holding muxifier here. */
1881 
1882 	str = stp->sd_strtab;
1883 	dp = &devimpl[getmajor(vp->v_rdev)];
1884 	ASSERT(dp->d_str == str);
1885 
1886 	qflag = dp->d_qflag;
1887 	sqtype = dp->d_sqtype;
1888 
1889 	/* create perdm_t if needed */
1890 	if (NEED_DM(dp->d_dmp, qflag))
1891 		dp->d_dmp = hold_dm(str, qflag, sqtype);
1892 
1893 	dmp = dp->d_dmp;
1894 
1895 	setq(rq, str->st_muxrinit, str->st_muxwinit, dmp, qflag, sqtype,
1896 	    B_TRUE);
1897 
1898 	/*
1899 	 * XXX Remove any "odd" messages from the queue.
1900 	 * Keep only M_DATA, M_PROTO, M_PCPROTO.
1901 	 */
1902 	error = strdoioctl(stp, &strioc, FNATIVE,
1903 	    K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
1904 	if (error != 0)
1905 		goto cleanup;
1906 
1907 	mutex_enter(&fpdown->f_tlock);
1908 	fpdown->f_count++;
1909 	mutex_exit(&fpdown->f_tlock);
1910 
1911 	/*
1912 	 * if we've made it here the linkage is all set up so we should also
1913 	 * set up the layered driver linkages
1914 	 */
1915 
1916 	ASSERT((cmd == I_LINK) || (cmd == I_PLINK));
1917 	if (cmd == I_LINK) {
1918 		error = ldi_mlink_fp(stp, fpdown, lhlink, LINKNORMAL);
1919 	} else {
1920 		error = ldi_mlink_fp(stp, fpdown, lhlink, LINKPERSIST);
1921 	}
1922 
1923 	if (error != 0) {
1924 		mutex_enter(&fpdown->f_tlock);
1925 		fpdown->f_count--;
1926 		mutex_exit(&fpdown->f_tlock);
1927 		goto cleanup;
1928 	}
1929 
1930 	link_rempassthru(passq);
1931 
1932 	mux_addedge(stp, stpdown, linkp->li_lblk.l_index, ss);
1933 
1934 	/*
1935 	 * Mark the upper stream as having dependent links
1936 	 * so that strclose can clean it up.
1937 	 */
1938 	if (cmd == I_LINK) {
1939 		mutex_enter(&stp->sd_lock);
1940 		stp->sd_flag |= STRHASLINKS;
1941 		mutex_exit(&stp->sd_lock);
1942 	}
1943 	/*
1944 	 * Wake up any other processes that may have been
1945 	 * waiting on the lower stream. These will all
1946 	 * error out.
1947 	 */
1948 	mutex_enter(&stpdown->sd_lock);
1949 	/* The passthru module is removed so we may release STRPLUMB */
1950 	stpdown->sd_flag &= ~STRPLUMB;
1951 	cv_broadcast(&rq->q_wait);
1952 	cv_broadcast(&_WR(rq)->q_wait);
1953 	cv_broadcast(&stpdown->sd_monitor);
1954 	mutex_exit(&stpdown->sd_lock);
1955 	mutex_exit(&muxifier);
1956 	*rvalp = linkp->li_lblk.l_index;
1957 	netstack_rele(ss->ss_netstack);
1958 	return (0);
1959 
1960 cleanup:
1961 	lbfree(linkp);
1962 
1963 	if (!(passyncq->sq_flags & SQ_BLOCKED))
1964 		blocksq(passyncq, SQ_BLOCKED, 0);
1965 	/*
1966 	 * Restore the stream head queue and then remove
1967 	 * the passq. Turn off STPLEX before we turn on
1968 	 * the stream by removing the passq.
1969 	 */
1970 	rq->q_ptr = _WR(rq)->q_ptr = stpdown;
1971 	setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO,
1972 	    B_TRUE);
1973 
1974 	mutex_enter(&stpdown->sd_lock);
1975 	stpdown->sd_flag &= ~STPLEX;
1976 	mutex_exit(&stpdown->sd_lock);
1977 
1978 	link_rempassthru(passq);
1979 
1980 	mutex_enter(&stpdown->sd_lock);
1981 	stpdown->sd_flag &= ~STRPLUMB;
1982 	/* Wakeup anyone waiting for STRPLUMB to clear. */
1983 	cv_broadcast(&stpdown->sd_monitor);
1984 	mutex_exit(&stpdown->sd_lock);
1985 
1986 	mutex_exit(&muxifier);
1987 	netstack_rele(ss->ss_netstack);
1988 	return (error);
1989 }
1990 
1991 int
mlink(vnode_t * vp,int cmd,int arg,cred_t * crp,int * rvalp,int lhlink)1992 mlink(vnode_t *vp, int cmd, int arg, cred_t *crp, int *rvalp, int lhlink)
1993 {
1994 	int		ret;
1995 	struct file	*fpdown;
1996 
1997 	fpdown = getf(arg);
1998 	ret = mlink_file(vp, cmd, fpdown, crp, rvalp, lhlink);
1999 	if (fpdown != NULL)
2000 		releasef(arg);
2001 	return (ret);
2002 }
2003 
2004 /*
2005  * Unlink a multiplexor link. Stp is the controlling stream for the
2006  * link, and linkp points to the link's entry in the linkinfo list.
2007  * The muxifier lock must be held on entry and is dropped on exit.
2008  *
2009  * NOTE : Currently it is assumed that mux would process all the messages
2010  * sitting on it's queue before ACKing the UNLINK. It is the responsibility
2011  * of the mux to handle all the messages that arrive before UNLINK.
2012  * If the mux has to send down messages on its lower stream before
2013  * ACKing I_UNLINK, then it *should* know to handle messages even
2014  * after the UNLINK is acked (actually it should be able to handle till we
2015  * re-block the read side of the pass queue here). If the mux does not
2016  * open up the lower stream, any messages that arrive during UNLINK
2017  * will be put in the stream head. In the case of lower stream opening
2018  * up, some messages might land in the stream head depending on when
2019  * the message arrived and when the read side of the pass queue was
2020  * re-blocked.
2021  */
2022 int
munlink(stdata_t * stp,linkinfo_t * linkp,int flag,cred_t * crp,int * rvalp,str_stack_t * ss)2023 munlink(stdata_t *stp, linkinfo_t *linkp, int flag, cred_t *crp, int *rvalp,
2024     str_stack_t *ss)
2025 {
2026 	struct strioctl strioc;
2027 	struct stdata *stpdown;
2028 	queue_t *rq, *wrq;
2029 	queue_t	*passq;
2030 	syncq_t *passyncq;
2031 	int error = 0;
2032 	file_t *fpdown;
2033 
2034 	ASSERT(MUTEX_HELD(&muxifier));
2035 
2036 	stpdown = linkp->li_fpdown->f_vnode->v_stream;
2037 
2038 	/*
2039 	 * See the comment in mlink() concerning STRPLUMB/STPLEX flags.
2040 	 */
2041 	mutex_enter(&stpdown->sd_lock);
2042 	stpdown->sd_flag |= STRPLUMB;
2043 	mutex_exit(&stpdown->sd_lock);
2044 
2045 	/*
2046 	 * Add passthru queue below lower mux. This will block
2047 	 * syncqs of lower muxs read queue during I_LINK/I_UNLINK.
2048 	 */
2049 	passq = link_addpassthru(stpdown);
2050 
2051 	if ((flag & LINKTYPEMASK) == LINKNORMAL)
2052 		strioc.ic_cmd = I_UNLINK;
2053 	else
2054 		strioc.ic_cmd = I_PUNLINK;
2055 	strioc.ic_timout = INFTIM;
2056 	strioc.ic_len = sizeof (struct linkblk);
2057 	strioc.ic_dp = (char *)&linkp->li_lblk;
2058 
2059 	error = strdoioctl(stp, &strioc, FNATIVE,
2060 	    K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
2061 
2062 	/*
2063 	 * If there was an error and this is not called via strclose,
2064 	 * return to the user. Otherwise, pretend there was no error
2065 	 * and close the link.
2066 	 */
2067 	if (error) {
2068 		if (flag & LINKCLOSE) {
2069 			cmn_err(CE_WARN, "KERNEL: munlink: could not perform "
2070 			    "unlink ioctl, closing anyway (%d)\n", error);
2071 		} else {
2072 			link_rempassthru(passq);
2073 			mutex_enter(&stpdown->sd_lock);
2074 			stpdown->sd_flag &= ~STRPLUMB;
2075 			cv_broadcast(&stpdown->sd_monitor);
2076 			mutex_exit(&stpdown->sd_lock);
2077 			mutex_exit(&muxifier);
2078 			return (error);
2079 		}
2080 	}
2081 
2082 	mux_rmvedge(stp, linkp->li_lblk.l_index, ss);
2083 	fpdown = linkp->li_fpdown;
2084 	lbfree(linkp);
2085 
2086 	/*
2087 	 * We go ahead and drop muxifier here--it's a nasty global lock that
2088 	 * can slow others down. It's okay to since attempts to mlink() this
2089 	 * stream will be stopped because STPLEX is still set in the stdata
2090 	 * structure, and munlink() is stopped because mux_rmvedge() and
2091 	 * lbfree() have removed it from mux_nodes[] and linkinfo_list,
2092 	 * respectively.  Note that we defer the closef() of fpdown until
2093 	 * after we drop muxifier since strclose() can call munlinkall().
2094 	 */
2095 	mutex_exit(&muxifier);
2096 
2097 	wrq = stpdown->sd_wrq;
2098 	rq = _RD(wrq);
2099 
2100 	/*
2101 	 * Get rid of outstanding service procedure runs, before we make
2102 	 * it a stream head, since a stream head doesn't have any service
2103 	 * procedure.
2104 	 */
2105 	disable_svc(rq);
2106 	wait_svc(rq);
2107 
2108 	/*
2109 	 * Since we don't disable the syncq for QPERMOD, we wait for whatever
2110 	 * is queued up to be finished. mux should take care that nothing is
2111 	 * send down to this queue. We should do it now as we're going to block
2112 	 * passyncq if it was unblocked.
2113 	 */
2114 	if (wrq->q_flag & QPERMOD) {
2115 		syncq_t	*sq = wrq->q_syncq;
2116 
2117 		mutex_enter(SQLOCK(sq));
2118 		while (wrq->q_sqflags & Q_SQQUEUED) {
2119 			sq->sq_flags |= SQ_WANTWAKEUP;
2120 			cv_wait(&sq->sq_wait, SQLOCK(sq));
2121 		}
2122 		mutex_exit(SQLOCK(sq));
2123 	}
2124 	passyncq = passq->q_syncq;
2125 	if (!(passyncq->sq_flags & SQ_BLOCKED)) {
2126 
2127 		syncq_t *sq, *outer;
2128 
2129 		/*
2130 		 * Messages could be flowing from underneath. We will
2131 		 * block the read side of the passq. This would be
2132 		 * sufficient for QPAIR and QPERQ muxes to ensure
2133 		 * that no data is flowing up into this queue
2134 		 * and hence no thread active in this instance of
2135 		 * lower mux. But for QPERMOD and QMTOUTPERIM there
2136 		 * could be messages on the inner and outer/inner
2137 		 * syncqs respectively. We will wait for them to drain.
2138 		 * Because passq is blocked messages end up in the syncq
2139 		 * And qfill_syncq could possibly end up setting QFULL
2140 		 * which will access the rq->q_flag. Hence, we have to
2141 		 * acquire the QLOCK in setq.
2142 		 *
2143 		 * XXX Messages can also flow from top into this
2144 		 * queue though the unlink is over (Ex. some instance
2145 		 * in putnext() called from top that has still not
2146 		 * accessed this queue. And also putq(lowerq) ?).
2147 		 * Solution : How about blocking the l_qtop queue ?
2148 		 * Do we really care about such pure D_MP muxes ?
2149 		 */
2150 
2151 		blocksq(passyncq, SQ_BLOCKED, 0);
2152 
2153 		sq = rq->q_syncq;
2154 		if ((outer = sq->sq_outer) != NULL) {
2155 
2156 			/*
2157 			 * We have to just wait for the outer sq_count
2158 			 * drop to zero. As this does not prevent new
2159 			 * messages to enter the outer perimeter, this
2160 			 * is subject to starvation.
2161 			 *
2162 			 * NOTE :Because of blocksq above, messages could
2163 			 * be in the inner syncq only because of some
2164 			 * thread holding the outer perimeter exclusively.
2165 			 * Hence it would be sufficient to wait for the
2166 			 * exclusive holder of the outer perimeter to drain
2167 			 * the inner and outer syncqs. But we will not depend
2168 			 * on this feature and hence check the inner syncqs
2169 			 * separately.
2170 			 */
2171 			wait_syncq(outer);
2172 		}
2173 
2174 
2175 		/*
2176 		 * There could be messages destined for
2177 		 * this queue. Let the exclusive holder
2178 		 * drain it.
2179 		 */
2180 
2181 		wait_syncq(sq);
2182 		ASSERT((rq->q_flag & QPERMOD) ||
2183 		    ((rq->q_syncq->sq_head == NULL) &&
2184 		    (_WR(rq)->q_syncq->sq_head == NULL)));
2185 	}
2186 
2187 	/*
2188 	 * We haven't taken care of QPERMOD case yet. QPERMOD is a special
2189 	 * case as we don't disable its syncq or remove it off the syncq
2190 	 * service list.
2191 	 */
2192 	if (rq->q_flag & QPERMOD) {
2193 		syncq_t	*sq = rq->q_syncq;
2194 
2195 		mutex_enter(SQLOCK(sq));
2196 		while (rq->q_sqflags & Q_SQQUEUED) {
2197 			sq->sq_flags |= SQ_WANTWAKEUP;
2198 			cv_wait(&sq->sq_wait, SQLOCK(sq));
2199 		}
2200 		mutex_exit(SQLOCK(sq));
2201 	}
2202 
2203 	/*
2204 	 * flush_syncq changes states only when there are some messages to
2205 	 * free, i.e. when it returns non-zero value to return.
2206 	 */
2207 	ASSERT(flush_syncq(rq->q_syncq, rq) == 0);
2208 	ASSERT(flush_syncq(wrq->q_syncq, wrq) == 0);
2209 
2210 	/*
2211 	 * Nobody else should know about this queue now.
2212 	 * If the mux did not process the messages before
2213 	 * acking the I_UNLINK, free them now.
2214 	 */
2215 
2216 	flushq(rq, FLUSHALL);
2217 	flushq(_WR(rq), FLUSHALL);
2218 
2219 	/*
2220 	 * Convert the mux lower queue into a stream head queue.
2221 	 * Turn off STPLEX before we turn on the stream by removing the passq.
2222 	 */
2223 	rq->q_ptr = wrq->q_ptr = stpdown;
2224 	setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO, B_TRUE);
2225 
2226 	ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
2227 	ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
2228 
2229 	enable_svc(rq);
2230 
2231 	/*
2232 	 * Now it is a proper stream, so STPLEX is cleared. But STRPLUMB still
2233 	 * needs to be set to prevent reopen() of the stream - such reopen may
2234 	 * try to call non-existent pass queue open routine and panic.
2235 	 */
2236 	mutex_enter(&stpdown->sd_lock);
2237 	stpdown->sd_flag &= ~STPLEX;
2238 	mutex_exit(&stpdown->sd_lock);
2239 
2240 	ASSERT(((flag & LINKTYPEMASK) == LINKNORMAL) ||
2241 	    ((flag & LINKTYPEMASK) == LINKPERSIST));
2242 
2243 	/* clean up the layered driver linkages */
2244 	if ((flag & LINKTYPEMASK) == LINKNORMAL) {
2245 		VERIFY0(ldi_munlink_fp(stp, fpdown, LINKNORMAL));
2246 	} else {
2247 		VERIFY0(ldi_munlink_fp(stp, fpdown, LINKPERSIST));
2248 	}
2249 
2250 	link_rempassthru(passq);
2251 
2252 	/*
2253 	 * Now all plumbing changes are finished and STRPLUMB is no
2254 	 * longer needed.
2255 	 */
2256 	mutex_enter(&stpdown->sd_lock);
2257 	stpdown->sd_flag &= ~STRPLUMB;
2258 	cv_broadcast(&stpdown->sd_monitor);
2259 	mutex_exit(&stpdown->sd_lock);
2260 
2261 	(void) closef(fpdown);
2262 	return (0);
2263 }
2264 
2265 /*
2266  * Unlink all multiplexor links for which stp is the controlling stream.
2267  * Return 0, or a non-zero errno on failure.
2268  */
2269 int
munlinkall(stdata_t * stp,int flag,cred_t * crp,int * rvalp,str_stack_t * ss)2270 munlinkall(stdata_t *stp, int flag, cred_t *crp, int *rvalp, str_stack_t *ss)
2271 {
2272 	linkinfo_t *linkp;
2273 	int error = 0;
2274 
2275 	mutex_enter(&muxifier);
2276 	while (linkp = findlinks(stp, 0, flag, ss)) {
2277 		/*
2278 		 * munlink() releases the muxifier lock.
2279 		 */
2280 		if (error = munlink(stp, linkp, flag, crp, rvalp, ss))
2281 			return (error);
2282 		mutex_enter(&muxifier);
2283 	}
2284 	mutex_exit(&muxifier);
2285 	return (0);
2286 }
2287 
2288 /*
2289  * A multiplexor link has been made. Add an
2290  * edge to the directed graph.
2291  */
2292 void
mux_addedge(stdata_t * upstp,stdata_t * lostp,int muxid,str_stack_t * ss)2293 mux_addedge(stdata_t *upstp, stdata_t *lostp, int muxid, str_stack_t *ss)
2294 {
2295 	struct mux_node *np;
2296 	struct mux_edge *ep;
2297 	major_t upmaj;
2298 	major_t lomaj;
2299 
2300 	upmaj = getmajor(upstp->sd_vnode->v_rdev);
2301 	lomaj = getmajor(lostp->sd_vnode->v_rdev);
2302 	np = &ss->ss_mux_nodes[upmaj];
2303 	if (np->mn_outp) {
2304 		ep = np->mn_outp;
2305 		while (ep->me_nextp)
2306 			ep = ep->me_nextp;
2307 		ep->me_nextp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
2308 		ep = ep->me_nextp;
2309 	} else {
2310 		np->mn_outp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
2311 		ep = np->mn_outp;
2312 	}
2313 	ep->me_nextp = NULL;
2314 	ep->me_muxid = muxid;
2315 	/*
2316 	 * Save the dev_t for the purposes of str_stack_shutdown.
2317 	 * str_stack_shutdown assumes that the device allows reopen, since
2318 	 * this dev_t is the one after any cloning by xx_open().
2319 	 * Would prefer finding the dev_t from before any cloning,
2320 	 * but specfs doesn't retain that.
2321 	 */
2322 	ep->me_dev = upstp->sd_vnode->v_rdev;
2323 	if (lostp->sd_vnode->v_type == VFIFO)
2324 		ep->me_nodep = NULL;
2325 	else
2326 		ep->me_nodep = &ss->ss_mux_nodes[lomaj];
2327 }
2328 
2329 /*
2330  * A multiplexor link has been removed. Remove the
2331  * edge in the directed graph.
2332  */
2333 void
mux_rmvedge(stdata_t * upstp,int muxid,str_stack_t * ss)2334 mux_rmvedge(stdata_t *upstp, int muxid, str_stack_t *ss)
2335 {
2336 	struct mux_node *np;
2337 	struct mux_edge *ep;
2338 	struct mux_edge *pep = NULL;
2339 	major_t upmaj;
2340 
2341 	upmaj = getmajor(upstp->sd_vnode->v_rdev);
2342 	np = &ss->ss_mux_nodes[upmaj];
2343 	ASSERT(np->mn_outp != NULL);
2344 	ep = np->mn_outp;
2345 	while (ep) {
2346 		if (ep->me_muxid == muxid) {
2347 			if (pep)
2348 				pep->me_nextp = ep->me_nextp;
2349 			else
2350 				np->mn_outp = ep->me_nextp;
2351 			kmem_free(ep, sizeof (struct mux_edge));
2352 			return;
2353 		}
2354 		pep = ep;
2355 		ep = ep->me_nextp;
2356 	}
2357 	ASSERT(0);	/* should not reach here */
2358 }
2359 
2360 /*
2361  * Translate the device flags (from conf.h) to the corresponding
2362  * qflag and sq_flag (type) values.
2363  */
2364 int
devflg_to_qflag(struct streamtab * stp,uint32_t devflag,uint32_t * qflagp,uint32_t * sqtypep)2365 devflg_to_qflag(struct streamtab *stp, uint32_t devflag, uint32_t *qflagp,
2366     uint32_t *sqtypep)
2367 {
2368 	uint32_t qflag = 0;
2369 	uint32_t sqtype = 0;
2370 
2371 	if (devflag & _D_OLD)
2372 		goto bad;
2373 
2374 	/* Inner perimeter presence and scope */
2375 	switch (devflag & D_MTINNER_MASK) {
2376 	case D_MP:
2377 		qflag |= QMTSAFE;
2378 		sqtype |= SQ_CI;
2379 		break;
2380 	case D_MTPERQ|D_MP:
2381 		qflag |= QPERQ;
2382 		break;
2383 	case D_MTQPAIR|D_MP:
2384 		qflag |= QPAIR;
2385 		break;
2386 	case D_MTPERMOD|D_MP:
2387 		qflag |= QPERMOD;
2388 		break;
2389 	default:
2390 		goto bad;
2391 	}
2392 
2393 	/* Outer perimeter */
2394 	if (devflag & D_MTOUTPERIM) {
2395 		switch (devflag & D_MTINNER_MASK) {
2396 		case D_MP:
2397 		case D_MTPERQ|D_MP:
2398 		case D_MTQPAIR|D_MP:
2399 			break;
2400 		default:
2401 			goto bad;
2402 		}
2403 		qflag |= QMTOUTPERIM;
2404 	}
2405 
2406 	/* Inner perimeter modifiers */
2407 	if (devflag & D_MTINNER_MOD) {
2408 		switch (devflag & D_MTINNER_MASK) {
2409 		case D_MP:
2410 			goto bad;
2411 		default:
2412 			break;
2413 		}
2414 		if (devflag & D_MTPUTSHARED)
2415 			sqtype |= SQ_CIPUT;
2416 		if (devflag & _D_MTOCSHARED) {
2417 			/*
2418 			 * The code in putnext assumes that it has the
2419 			 * highest concurrency by not checking sq_count.
2420 			 * Thus _D_MTOCSHARED can only be supported when
2421 			 * D_MTPUTSHARED is set.
2422 			 */
2423 			if (!(devflag & D_MTPUTSHARED))
2424 				goto bad;
2425 			sqtype |= SQ_CIOC;
2426 		}
2427 		if (devflag & _D_MTCBSHARED) {
2428 			/*
2429 			 * The code in putnext assumes that it has the
2430 			 * highest concurrency by not checking sq_count.
2431 			 * Thus _D_MTCBSHARED can only be supported when
2432 			 * D_MTPUTSHARED is set.
2433 			 */
2434 			if (!(devflag & D_MTPUTSHARED))
2435 				goto bad;
2436 			sqtype |= SQ_CICB;
2437 		}
2438 		if (devflag & _D_MTSVCSHARED) {
2439 			/*
2440 			 * The code in putnext assumes that it has the
2441 			 * highest concurrency by not checking sq_count.
2442 			 * Thus _D_MTSVCSHARED can only be supported when
2443 			 * D_MTPUTSHARED is set. Also _D_MTSVCSHARED is
2444 			 * supported only for QPERMOD.
2445 			 */
2446 			if (!(devflag & D_MTPUTSHARED) || !(qflag & QPERMOD))
2447 				goto bad;
2448 			sqtype |= SQ_CISVC;
2449 		}
2450 	}
2451 
2452 	/* Default outer perimeter concurrency */
2453 	sqtype |= SQ_CO;
2454 
2455 	/* Outer perimeter modifiers */
2456 	if (devflag & D_MTOCEXCL) {
2457 		if (!(devflag & D_MTOUTPERIM)) {
2458 			/* No outer perimeter */
2459 			goto bad;
2460 		}
2461 		sqtype &= ~SQ_COOC;
2462 	}
2463 
2464 	/* Synchronous Streams extended qinit structure */
2465 	if (devflag & D_SYNCSTR)
2466 		qflag |= QSYNCSTR;
2467 
2468 	/*
2469 	 * Private flag used by a transport module to indicate
2470 	 * to sockfs that it supports direct-access mode without
2471 	 * having to go through STREAMS.
2472 	 */
2473 	if (devflag & _D_DIRECT) {
2474 		/* Reject unless the module is fully-MT (no perimeter) */
2475 		if ((qflag & QMT_TYPEMASK) != QMTSAFE)
2476 			goto bad;
2477 		qflag |= _QDIRECT;
2478 	}
2479 
2480 	/*
2481 	 * Private flag used to indicate that a streams module should only
2482 	 * be pushed once. The TTY streams modules have this flag since if
2483 	 * libc believes itself to be an xpg4 process then it will
2484 	 * automatically and unconditionally push them when a PTS device is
2485 	 * opened. If an application is not aware of this then without this
2486 	 * flag we would end up with duplicate modules.
2487 	 */
2488 	if (devflag & _D_SINGLE_INSTANCE)
2489 		qflag |= _QSINGLE_INSTANCE;
2490 
2491 	*qflagp = qflag;
2492 	*sqtypep = sqtype;
2493 	return (0);
2494 
2495 bad:
2496 	cmn_err(CE_WARN,
2497 	    "stropen: bad MT flags (0x%x) in driver '%s'",
2498 	    (int)(qflag & D_MTSAFETY_MASK),
2499 	    stp->st_rdinit->qi_minfo->mi_idname);
2500 
2501 	return (EINVAL);
2502 }
2503 
2504 /*
2505  * Set the interface values for a pair of queues (qinit structure,
2506  * packet sizes, water marks).
2507  * setq assumes that the caller does not have a claim (entersq or claimq)
2508  * on the queue.
2509  */
2510 void
setq(queue_t * rq,struct qinit * rinit,struct qinit * winit,perdm_t * dmp,uint32_t qflag,uint32_t sqtype,boolean_t lock_needed)2511 setq(queue_t *rq, struct qinit *rinit, struct qinit *winit,
2512     perdm_t *dmp, uint32_t qflag, uint32_t sqtype, boolean_t lock_needed)
2513 {
2514 	queue_t *wq;
2515 	syncq_t	*sq, *outer;
2516 
2517 	ASSERT(rq->q_flag & QREADR);
2518 	ASSERT((qflag & QMT_TYPEMASK) != 0);
2519 	IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
2520 
2521 	wq = _WR(rq);
2522 	rq->q_qinfo = rinit;
2523 	rq->q_hiwat = rinit->qi_minfo->mi_hiwat;
2524 	rq->q_lowat = rinit->qi_minfo->mi_lowat;
2525 	rq->q_minpsz = rinit->qi_minfo->mi_minpsz;
2526 	rq->q_maxpsz = rinit->qi_minfo->mi_maxpsz;
2527 	wq->q_qinfo = winit;
2528 	wq->q_hiwat = winit->qi_minfo->mi_hiwat;
2529 	wq->q_lowat = winit->qi_minfo->mi_lowat;
2530 	wq->q_minpsz = winit->qi_minfo->mi_minpsz;
2531 	wq->q_maxpsz = winit->qi_minfo->mi_maxpsz;
2532 
2533 	/* Remove old syncqs */
2534 	sq = rq->q_syncq;
2535 	outer = sq->sq_outer;
2536 	if (outer != NULL) {
2537 		ASSERT(wq->q_syncq->sq_outer == outer);
2538 		outer_remove(outer, rq->q_syncq);
2539 		if (wq->q_syncq != rq->q_syncq)
2540 			outer_remove(outer, wq->q_syncq);
2541 	}
2542 	ASSERT(sq->sq_outer == NULL);
2543 	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
2544 
2545 	if (sq != SQ(rq)) {
2546 		if (!(rq->q_flag & QPERMOD))
2547 			free_syncq(sq);
2548 		if (wq->q_syncq == rq->q_syncq)
2549 			wq->q_syncq = NULL;
2550 		rq->q_syncq = NULL;
2551 	}
2552 	if (wq->q_syncq != NULL && wq->q_syncq != sq &&
2553 	    wq->q_syncq != SQ(rq)) {
2554 		free_syncq(wq->q_syncq);
2555 		wq->q_syncq = NULL;
2556 	}
2557 	ASSERT(rq->q_syncq == NULL || (rq->q_syncq->sq_head == NULL &&
2558 	    rq->q_syncq->sq_tail == NULL));
2559 	ASSERT(wq->q_syncq == NULL || (wq->q_syncq->sq_head == NULL &&
2560 	    wq->q_syncq->sq_tail == NULL));
2561 
2562 	if (!(rq->q_flag & QPERMOD) &&
2563 	    rq->q_syncq != NULL && rq->q_syncq->sq_ciputctrl != NULL) {
2564 		ASSERT(rq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
2565 		SUMCHECK_CIPUTCTRL_COUNTS(rq->q_syncq->sq_ciputctrl,
2566 		    rq->q_syncq->sq_nciputctrl, 0);
2567 		ASSERT(ciputctrl_cache != NULL);
2568 		kmem_cache_free(ciputctrl_cache, rq->q_syncq->sq_ciputctrl);
2569 		rq->q_syncq->sq_ciputctrl = NULL;
2570 		rq->q_syncq->sq_nciputctrl = 0;
2571 	}
2572 
2573 	if (!(wq->q_flag & QPERMOD) &&
2574 	    wq->q_syncq != NULL && wq->q_syncq->sq_ciputctrl != NULL) {
2575 		ASSERT(wq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
2576 		SUMCHECK_CIPUTCTRL_COUNTS(wq->q_syncq->sq_ciputctrl,
2577 		    wq->q_syncq->sq_nciputctrl, 0);
2578 		ASSERT(ciputctrl_cache != NULL);
2579 		kmem_cache_free(ciputctrl_cache, wq->q_syncq->sq_ciputctrl);
2580 		wq->q_syncq->sq_ciputctrl = NULL;
2581 		wq->q_syncq->sq_nciputctrl = 0;
2582 	}
2583 
2584 	sq = SQ(rq);
2585 	ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
2586 	ASSERT(sq->sq_outer == NULL);
2587 	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
2588 
2589 	/*
2590 	 * Create syncqs based on qflag and sqtype. Set the SQ_TYPES_IN_FLAGS
2591 	 * bits in sq_flag based on the sqtype.
2592 	 */
2593 	ASSERT((sq->sq_flags & ~SQ_TYPES_IN_FLAGS) == 0);
2594 
2595 	rq->q_syncq = wq->q_syncq = sq;
2596 	sq->sq_type = sqtype;
2597 	sq->sq_flags = (sqtype & SQ_TYPES_IN_FLAGS);
2598 
2599 	/*
2600 	 *  We are making sq_svcflags zero,
2601 	 *  resetting SQ_DISABLED in case it was set by
2602 	 *  wait_svc() in the munlink path.
2603 	 *
2604 	 */
2605 	ASSERT((sq->sq_svcflags & SQ_SERVICE) == 0);
2606 	sq->sq_svcflags = 0;
2607 
2608 	/*
2609 	 * We need to acquire the lock here for the mlink and munlink case,
2610 	 * where canputnext, backenable, etc can access the q_flag.
2611 	 */
2612 	if (lock_needed) {
2613 		mutex_enter(QLOCK(rq));
2614 		rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
2615 		mutex_exit(QLOCK(rq));
2616 		mutex_enter(QLOCK(wq));
2617 		wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
2618 		mutex_exit(QLOCK(wq));
2619 	} else {
2620 		rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
2621 		wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
2622 	}
2623 
2624 	if (qflag & QPERQ) {
2625 		/* Allocate a separate syncq for the write side */
2626 		sq = new_syncq();
2627 		sq->sq_type = rq->q_syncq->sq_type;
2628 		sq->sq_flags = rq->q_syncq->sq_flags;
2629 		ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
2630 		    sq->sq_oprev == NULL);
2631 		wq->q_syncq = sq;
2632 	}
2633 	if (qflag & QPERMOD) {
2634 		sq = dmp->dm_sq;
2635 
2636 		/*
2637 		 * Assert that we do have an inner perimeter syncq and that it
2638 		 * does not have an outer perimeter associated with it.
2639 		 */
2640 		ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
2641 		    sq->sq_oprev == NULL);
2642 		rq->q_syncq = wq->q_syncq = sq;
2643 	}
2644 	if (qflag & QMTOUTPERIM) {
2645 		outer = dmp->dm_sq;
2646 
2647 		ASSERT(outer->sq_outer == NULL);
2648 		outer_insert(outer, rq->q_syncq);
2649 		if (wq->q_syncq != rq->q_syncq)
2650 			outer_insert(outer, wq->q_syncq);
2651 	}
2652 	ASSERT((rq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
2653 	    (rq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
2654 	ASSERT((wq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
2655 	    (wq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
2656 	ASSERT((rq->q_flag & QMT_TYPEMASK) == (qflag & QMT_TYPEMASK));
2657 
2658 	/*
2659 	 * Initialize struio() types.
2660 	 */
2661 	rq->q_struiot =
2662 	    (rq->q_flag & QSYNCSTR) ? rinit->qi_struiot : STRUIOT_NONE;
2663 	wq->q_struiot =
2664 	    (wq->q_flag & QSYNCSTR) ? winit->qi_struiot : STRUIOT_NONE;
2665 }
2666 
2667 perdm_t *
hold_dm(struct streamtab * str,uint32_t qflag,uint32_t sqtype)2668 hold_dm(struct streamtab *str, uint32_t qflag, uint32_t sqtype)
2669 {
2670 	syncq_t	*sq;
2671 	perdm_t	**pp;
2672 	perdm_t	*p;
2673 	perdm_t	*dmp;
2674 
2675 	ASSERT(str != NULL);
2676 	ASSERT(qflag & (QPERMOD | QMTOUTPERIM));
2677 
2678 	rw_enter(&perdm_rwlock, RW_READER);
2679 	for (p = perdm_list; p != NULL; p = p->dm_next) {
2680 		if (p->dm_str == str) {	/* found one */
2681 			atomic_inc_32(&(p->dm_ref));
2682 			rw_exit(&perdm_rwlock);
2683 			return (p);
2684 		}
2685 	}
2686 	rw_exit(&perdm_rwlock);
2687 
2688 	sq = new_syncq();
2689 	if (qflag & QPERMOD) {
2690 		sq->sq_type = sqtype | SQ_PERMOD;
2691 		sq->sq_flags = sqtype & SQ_TYPES_IN_FLAGS;
2692 	} else {
2693 		ASSERT(qflag & QMTOUTPERIM);
2694 		sq->sq_onext = sq->sq_oprev = sq;
2695 	}
2696 
2697 	dmp = kmem_alloc(sizeof (perdm_t), KM_SLEEP);
2698 	dmp->dm_sq = sq;
2699 	dmp->dm_str = str;
2700 	dmp->dm_ref = 1;
2701 	dmp->dm_next = NULL;
2702 
2703 	rw_enter(&perdm_rwlock, RW_WRITER);
2704 	for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next)) {
2705 		if (p->dm_str == str) {	/* already present */
2706 			p->dm_ref++;
2707 			rw_exit(&perdm_rwlock);
2708 			free_syncq(sq);
2709 			kmem_free(dmp, sizeof (perdm_t));
2710 			return (p);
2711 		}
2712 	}
2713 
2714 	*pp = dmp;
2715 	rw_exit(&perdm_rwlock);
2716 	return (dmp);
2717 }
2718 
2719 void
rele_dm(perdm_t * dmp)2720 rele_dm(perdm_t *dmp)
2721 {
2722 	perdm_t **pp;
2723 	perdm_t *p;
2724 
2725 	rw_enter(&perdm_rwlock, RW_WRITER);
2726 	ASSERT(dmp->dm_ref > 0);
2727 
2728 	if (--dmp->dm_ref > 0) {
2729 		rw_exit(&perdm_rwlock);
2730 		return;
2731 	}
2732 
2733 	for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next))
2734 		if (p == dmp)
2735 			break;
2736 	ASSERT(p == dmp);
2737 	*pp = p->dm_next;
2738 	rw_exit(&perdm_rwlock);
2739 
2740 	/*
2741 	 * Wait for any background processing that relies on the
2742 	 * syncq to complete before it is freed.
2743 	 */
2744 	wait_sq_svc(p->dm_sq);
2745 	free_syncq(p->dm_sq);
2746 	kmem_free(p, sizeof (perdm_t));
2747 }
2748 
2749 /*
2750  * Make a protocol message given control and data buffers.
2751  * n.b., this can block; be careful of what locks you hold when calling it.
2752  *
2753  * If sd_maxblk is less than *iosize this routine can fail part way through
2754  * (due to an allocation failure). In this case on return *iosize will contain
2755  * the amount that was consumed. Otherwise *iosize will not be modified
2756  * i.e. it will contain the amount that was consumed.
2757  */
2758 int
strmakemsg(struct strbuf * mctl,ssize_t * iosize,struct uio * uiop,stdata_t * stp,int32_t flag,mblk_t ** mpp)2759 strmakemsg(
2760 	struct strbuf *mctl,
2761 	ssize_t *iosize,
2762 	struct uio *uiop,
2763 	stdata_t *stp,
2764 	int32_t flag,
2765 	mblk_t **mpp)
2766 {
2767 	mblk_t *mpctl = NULL;
2768 	mblk_t *mpdata = NULL;
2769 	int error;
2770 
2771 	ASSERT(uiop != NULL);
2772 
2773 	*mpp = NULL;
2774 	/* Create control part, if any */
2775 	if ((mctl != NULL) && (mctl->len >= 0)) {
2776 		error = strmakectl(mctl, flag, uiop->uio_fmode, &mpctl);
2777 		if (error)
2778 			return (error);
2779 	}
2780 	/* Create data part, if any */
2781 	if (*iosize >= 0) {
2782 		error = strmakedata(iosize, uiop, stp, flag, &mpdata);
2783 		if (error) {
2784 			freemsg(mpctl);
2785 			return (error);
2786 		}
2787 	}
2788 	if (mpctl != NULL) {
2789 		if (mpdata != NULL)
2790 			linkb(mpctl, mpdata);
2791 		*mpp = mpctl;
2792 	} else {
2793 		*mpp = mpdata;
2794 	}
2795 	return (0);
2796 }
2797 
2798 /*
2799  * Make the control part of a protocol message given a control buffer.
2800  * n.b., this can block; be careful of what locks you hold when calling it.
2801  */
2802 int
strmakectl(struct strbuf * mctl,int32_t flag,int32_t fflag,mblk_t ** mpp)2803 strmakectl(
2804 	struct strbuf *mctl,
2805 	int32_t flag,
2806 	int32_t fflag,
2807 	mblk_t **mpp)
2808 {
2809 	mblk_t *bp = NULL;
2810 	unsigned char msgtype;
2811 	int error = 0;
2812 	cred_t *cr = CRED();
2813 
2814 	/* We do not support interrupt threads using the stream head to send */
2815 	ASSERT(cr != NULL);
2816 
2817 	*mpp = NULL;
2818 	/*
2819 	 * Create control part of message, if any.
2820 	 */
2821 	if ((mctl != NULL) && (mctl->len >= 0)) {
2822 		caddr_t base;
2823 		int ctlcount;
2824 		int allocsz;
2825 
2826 		if (flag & RS_HIPRI)
2827 			msgtype = M_PCPROTO;
2828 		else
2829 			msgtype = M_PROTO;
2830 
2831 		ctlcount = mctl->len;
2832 		base = mctl->buf;
2833 
2834 		/*
2835 		 * Give modules a better chance to reuse M_PROTO/M_PCPROTO
2836 		 * blocks by increasing the size to something more usable.
2837 		 */
2838 		allocsz = MAX(ctlcount, 64);
2839 
2840 		/*
2841 		 * Range checking has already been done; simply try
2842 		 * to allocate a message block for the ctl part.
2843 		 */
2844 		while ((bp = allocb_cred(allocsz, cr,
2845 		    curproc->p_pid)) == NULL) {
2846 			if (fflag & (FNDELAY|FNONBLOCK))
2847 				return (EAGAIN);
2848 			if (error = strwaitbuf(allocsz, BPRI_MED))
2849 				return (error);
2850 		}
2851 
2852 		bp->b_datap->db_type = msgtype;
2853 		if (copyin(base, bp->b_wptr, ctlcount)) {
2854 			freeb(bp);
2855 			return (EFAULT);
2856 		}
2857 		bp->b_wptr += ctlcount;
2858 	}
2859 	*mpp = bp;
2860 	return (0);
2861 }
2862 
2863 /*
2864  * Make a protocol message given data buffers.
2865  * n.b., this can block; be careful of what locks you hold when calling it.
2866  *
2867  * If sd_maxblk is less than *iosize this routine can fail part way through
2868  * (due to an allocation failure). In this case on return *iosize will contain
2869  * the amount that was consumed. Otherwise *iosize will not be modified
2870  * i.e. it will contain the amount that was consumed.
2871  */
2872 int
strmakedata(ssize_t * iosize,struct uio * uiop,stdata_t * stp,int32_t flag,mblk_t ** mpp)2873 strmakedata(
2874 	ssize_t   *iosize,
2875 	struct uio *uiop,
2876 	stdata_t *stp,
2877 	int32_t flag,
2878 	mblk_t **mpp)
2879 {
2880 	mblk_t *mp = NULL;
2881 	mblk_t *bp;
2882 	int wroff = (int)stp->sd_wroff;
2883 	int tail_len = (int)stp->sd_tail;
2884 	int extra = wroff + tail_len;
2885 	int error = 0;
2886 	ssize_t maxblk;
2887 	ssize_t count = *iosize;
2888 	cred_t *cr;
2889 
2890 	*mpp = NULL;
2891 	if (count < 0)
2892 		return (0);
2893 
2894 	/* We do not support interrupt threads using the stream head to send */
2895 	cr = CRED();
2896 	ASSERT(cr != NULL);
2897 
2898 	maxblk = stp->sd_maxblk;
2899 	if (maxblk == INFPSZ)
2900 		maxblk = count;
2901 
2902 	/*
2903 	 * Create data part of message, if any.
2904 	 */
2905 	do {
2906 		ssize_t size;
2907 		dblk_t  *dp;
2908 
2909 		ASSERT(uiop);
2910 
2911 		size = MIN(count, maxblk);
2912 
2913 		while ((bp = allocb_cred(size + extra, cr,
2914 		    curproc->p_pid)) == NULL) {
2915 			error = EAGAIN;
2916 			if ((uiop->uio_fmode & (FNDELAY|FNONBLOCK)) ||
2917 			    (error = strwaitbuf(size + extra, BPRI_MED)) != 0) {
2918 				if (count == *iosize) {
2919 					freemsg(mp);
2920 					return (error);
2921 				} else {
2922 					*iosize -= count;
2923 					*mpp = mp;
2924 					return (0);
2925 				}
2926 			}
2927 		}
2928 		dp = bp->b_datap;
2929 		dp->db_cpid = curproc->p_pid;
2930 		ASSERT(wroff <= dp->db_lim - bp->b_wptr);
2931 		bp->b_wptr = bp->b_rptr = bp->b_rptr + wroff;
2932 
2933 		if (flag & STRUIO_POSTPONE) {
2934 			/*
2935 			 * Setup the stream uio portion of the
2936 			 * dblk for subsequent use by struioget().
2937 			 */
2938 			dp->db_struioflag = STRUIO_SPEC;
2939 			dp->db_cksumstart = 0;
2940 			dp->db_cksumstuff = 0;
2941 			dp->db_cksumend = size;
2942 			*(long long *)dp->db_struioun.data = 0ll;
2943 			bp->b_wptr += size;
2944 		} else {
2945 			if (stp->sd_copyflag & STRCOPYCACHED)
2946 				uiop->uio_extflg |= UIO_COPY_CACHED;
2947 
2948 			if (size != 0) {
2949 				error = uiomove(bp->b_wptr, size, UIO_WRITE,
2950 				    uiop);
2951 				if (error != 0) {
2952 					freeb(bp);
2953 					freemsg(mp);
2954 					return (error);
2955 				}
2956 			}
2957 			bp->b_wptr += size;
2958 
2959 			if (stp->sd_wputdatafunc != NULL) {
2960 				mblk_t *newbp;
2961 
2962 				newbp = (stp->sd_wputdatafunc)(stp->sd_vnode,
2963 				    bp, NULL, NULL, NULL, NULL);
2964 				if (newbp == NULL) {
2965 					freeb(bp);
2966 					freemsg(mp);
2967 					return (ECOMM);
2968 				}
2969 				bp = newbp;
2970 			}
2971 		}
2972 
2973 		count -= size;
2974 
2975 		if (mp == NULL)
2976 			mp = bp;
2977 		else
2978 			linkb(mp, bp);
2979 	} while (count > 0);
2980 
2981 	*mpp = mp;
2982 	return (0);
2983 }
2984 
2985 /*
2986  * Wait for a buffer to become available. Return non-zero errno
2987  * if not able to wait, 0 if buffer is probably there.
2988  */
2989 int
strwaitbuf(size_t size,int pri)2990 strwaitbuf(size_t size, int pri)
2991 {
2992 	bufcall_id_t id;
2993 
2994 	mutex_enter(&bcall_monitor);
2995 	if ((id = bufcall(size, pri, (void (*)(void *))cv_broadcast,
2996 	    &ttoproc(curthread)->p_flag_cv)) == 0) {
2997 		mutex_exit(&bcall_monitor);
2998 		return (ENOSR);
2999 	}
3000 	if (!cv_wait_sig(&(ttoproc(curthread)->p_flag_cv), &bcall_monitor)) {
3001 		unbufcall(id);
3002 		mutex_exit(&bcall_monitor);
3003 		return (EINTR);
3004 	}
3005 	unbufcall(id);
3006 	mutex_exit(&bcall_monitor);
3007 	return (0);
3008 }
3009 
3010 /*
3011  * This function waits for a read or write event to happen on a stream.
3012  * fmode can specify FNDELAY and/or FNONBLOCK.
3013  * The timeout is in ms with -1 meaning infinite.
3014  * The flag values work as follows:
3015  *	READWAIT	Check for read side errors, send M_READ
3016  *	GETWAIT		Check for read side errors, no M_READ
3017  *	WRITEWAIT	Check for write side errors.
3018  *	NOINTR		Do not return error if nonblocking or timeout.
3019  *	STR_NOERROR	Ignore all errors except STPLEX.
3020  *	STR_NOSIG	Ignore/hold signals during the duration of the call.
3021  *	STR_PEEK	Pass through the strgeterr().
3022  */
3023 int
strwaitq(stdata_t * stp,int flag,ssize_t count,int fmode,clock_t timout,int * done)3024 strwaitq(stdata_t *stp, int flag, ssize_t count, int fmode, clock_t timout,
3025     int *done)
3026 {
3027 	int slpflg, errs;
3028 	int error;
3029 	kcondvar_t *sleepon;
3030 	mblk_t *mp;
3031 	ssize_t *rd_count;
3032 	clock_t rval;
3033 
3034 	ASSERT(MUTEX_HELD(&stp->sd_lock));
3035 	if ((flag & READWAIT) || (flag & GETWAIT)) {
3036 		slpflg = RSLEEP;
3037 		sleepon = &_RD(stp->sd_wrq)->q_wait;
3038 		errs = STRDERR|STPLEX;
3039 	} else {
3040 		slpflg = WSLEEP;
3041 		sleepon = &stp->sd_wrq->q_wait;
3042 		errs = STWRERR|STRHUP|STPLEX;
3043 	}
3044 	if (flag & STR_NOERROR)
3045 		errs = STPLEX;
3046 
3047 	if (stp->sd_wakeq & slpflg) {
3048 		/*
3049 		 * A strwakeq() is pending, no need to sleep.
3050 		 */
3051 		stp->sd_wakeq &= ~slpflg;
3052 		*done = 0;
3053 		return (0);
3054 	}
3055 
3056 	if (stp->sd_flag & errs) {
3057 		/*
3058 		 * Check for errors before going to sleep since the
3059 		 * caller might not have checked this while holding
3060 		 * sd_lock.
3061 		 */
3062 		error = strgeterr(stp, errs, (flag & STR_PEEK));
3063 		if (error != 0) {
3064 			*done = 1;
3065 			return (error);
3066 		}
3067 	}
3068 
3069 	/*
3070 	 * If any module downstream has requested read notification
3071 	 * by setting SNDMREAD flag using M_SETOPTS, send a message
3072 	 * down stream.
3073 	 */
3074 	if ((flag & READWAIT) && (stp->sd_flag & SNDMREAD)) {
3075 		mutex_exit(&stp->sd_lock);
3076 		if (!(mp = allocb_wait(sizeof (ssize_t), BPRI_MED,
3077 		    (flag & STR_NOSIG), &error))) {
3078 			mutex_enter(&stp->sd_lock);
3079 			*done = 1;
3080 			return (error);
3081 		}
3082 		mp->b_datap->db_type = M_READ;
3083 		rd_count = (ssize_t *)mp->b_wptr;
3084 		*rd_count = count;
3085 		mp->b_wptr += sizeof (ssize_t);
3086 		/*
3087 		 * Send the number of bytes requested by the
3088 		 * read as the argument to M_READ.
3089 		 */
3090 		stream_willservice(stp);
3091 		putnext(stp->sd_wrq, mp);
3092 		stream_runservice(stp);
3093 		mutex_enter(&stp->sd_lock);
3094 
3095 		/*
3096 		 * If any data arrived due to inline processing
3097 		 * of putnext(), don't sleep.
3098 		 */
3099 		if (_RD(stp->sd_wrq)->q_first != NULL) {
3100 			*done = 0;
3101 			return (0);
3102 		}
3103 	}
3104 
3105 	if (fmode & (FNDELAY|FNONBLOCK)) {
3106 		if (!(flag & NOINTR))
3107 			error = EAGAIN;
3108 		else
3109 			error = 0;
3110 		*done = 1;
3111 		return (error);
3112 	}
3113 
3114 	stp->sd_flag |= slpflg;
3115 	TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAIT2,
3116 	    "strwaitq sleeps (2):%p, %X, %lX, %X, %p",
3117 	    stp, flag, count, fmode, done);
3118 
3119 	rval = str_cv_wait(sleepon, &stp->sd_lock, timout, flag & STR_NOSIG);
3120 	if (rval > 0) {
3121 		/* EMPTY */
3122 		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAKE2,
3123 		    "strwaitq awakes(2):%X, %X, %X, %X, %X",
3124 		    stp, flag, count, fmode, done);
3125 	} else if (rval == 0) {
3126 		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_INTR2,
3127 		    "strwaitq interrupt #2:%p, %X, %lX, %X, %p",
3128 		    stp, flag, count, fmode, done);
3129 		stp->sd_flag &= ~slpflg;
3130 		cv_broadcast(sleepon);
3131 		if (!(flag & NOINTR))
3132 			error = EINTR;
3133 		else
3134 			error = 0;
3135 		*done = 1;
3136 		return (error);
3137 	} else {
3138 		/* timeout */
3139 		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_TIME,
3140 		    "strwaitq timeout:%p, %X, %lX, %X, %p",
3141 		    stp, flag, count, fmode, done);
3142 		*done = 1;
3143 		if (!(flag & NOINTR))
3144 			return (ETIME);
3145 		else
3146 			return (0);
3147 	}
3148 	/*
3149 	 * If the caller implements delayed errors (i.e. queued after data)
3150 	 * we can not check for errors here since data as well as an
3151 	 * error might have arrived at the stream head. We return to
3152 	 * have the caller check the read queue before checking for errors.
3153 	 */
3154 	if ((stp->sd_flag & errs) && !(flag & STR_DELAYERR)) {
3155 		error = strgeterr(stp, errs, (flag & STR_PEEK));
3156 		if (error != 0) {
3157 			*done = 1;
3158 			return (error);
3159 		}
3160 	}
3161 	*done = 0;
3162 	return (0);
3163 }
3164 
3165 /*
3166  * Perform job control discipline access checks.
3167  * Return 0 for success and the errno for failure.
3168  */
3169 
3170 #define	cantsend(p, t, sig) \
3171 	(sigismember(&(p)->p_ignore, sig) || signal_is_blocked((t), sig))
3172 
3173 int
straccess(struct stdata * stp,enum jcaccess mode)3174 straccess(struct stdata *stp, enum jcaccess mode)
3175 {
3176 	extern kcondvar_t lbolt_cv;	/* XXX: should be in a header file */
3177 	kthread_t *t = curthread;
3178 	proc_t *p = ttoproc(t);
3179 	sess_t *sp;
3180 
3181 	ASSERT(mutex_owned(&stp->sd_lock));
3182 
3183 	if (stp->sd_sidp == NULL || stp->sd_vnode->v_type == VFIFO)
3184 		return (0);
3185 
3186 	mutex_enter(&p->p_lock);		/* protects p_pgidp */
3187 
3188 	for (;;) {
3189 		mutex_enter(&p->p_splock);	/* protects p->p_sessp */
3190 		sp = p->p_sessp;
3191 		mutex_enter(&sp->s_lock);	/* protects sp->* */
3192 
3193 		/*
3194 		 * If this is not the calling process's controlling terminal
3195 		 * or if the calling process is already in the foreground
3196 		 * then allow access.
3197 		 */
3198 		if (sp->s_dev != stp->sd_vnode->v_rdev ||
3199 		    p->p_pgidp == stp->sd_pgidp) {
3200 			mutex_exit(&sp->s_lock);
3201 			mutex_exit(&p->p_splock);
3202 			mutex_exit(&p->p_lock);
3203 			return (0);
3204 		}
3205 
3206 		/*
3207 		 * Check to see if controlling terminal has been deallocated.
3208 		 */
3209 		if (sp->s_vp == NULL) {
3210 			if (!cantsend(p, t, SIGHUP))
3211 				sigtoproc(p, t, SIGHUP);
3212 			mutex_exit(&sp->s_lock);
3213 			mutex_exit(&p->p_splock);
3214 			mutex_exit(&p->p_lock);
3215 			return (EIO);
3216 		}
3217 
3218 		mutex_exit(&