xref: /illumos-gate/usr/src/lib/libc/port/rt/mqueue.c (revision e8031f0a8ed0e45c6d8847c5e09424e66fd34a4b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 
23 /*
24  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
25  * Use is subject to license terms.
26  */
27 
28 #pragma ident	"%Z%%M%	%I%	%E% SMI"
29 
30 #pragma weak mq_open = _mq_open
31 #pragma weak mq_close = _mq_close
32 #pragma weak mq_unlink = _mq_unlink
33 #pragma weak mq_send = _mq_send
34 #pragma weak mq_timedsend = _mq_timedsend
35 #pragma weak mq_reltimedsend_np = _mq_reltimedsend_np
36 #pragma weak mq_receive = _mq_receive
37 #pragma weak mq_timedreceive = _mq_timedreceive
38 #pragma weak mq_reltimedreceive_np = _mq_reltimedreceive_np
39 #pragma weak mq_notify = _mq_notify
40 #pragma weak mq_setattr = _mq_setattr
41 #pragma weak mq_getattr = _mq_getattr
42 
43 #include "c_synonyms.h"
44 #define	_KMEMUSER
45 #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
46 #undef	_KMEMUSER
47 #include <mqueue.h>
48 #include <sys/types.h>
49 #include <sys/file.h>
50 #include <sys/mman.h>
51 #include <errno.h>
52 #include <stdarg.h>
53 #include <limits.h>
54 #include <pthread.h>
55 #include <assert.h>
56 #include <string.h>
57 #include <unistd.h>
58 #include <stdlib.h>
59 #include <sys/stat.h>
60 #include <inttypes.h>
61 
62 #include "mqlib.h"
63 #include "pos4obj.h"
64 #include "pos4.h"
65 
66 /*
67  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
68  * If this assumption is somehow invalidated, mq_open() needs to be changed
69  * back to the old version which kept a count and enforced a limit.
70  * We make sure that this is pointed out to those changing <sys/param.h>
71  * by checking _MQ_OPEN_MAX at compile time.
72  */
73 #if _MQ_OPEN_MAX != -1
74 #error "librt:mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
75 #endif
76 
#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

/*
 * Debug-only consistency checks.  They expand to nothing unless DEBUG
 * is defined.  None of them carries a trailing semicolon; the caller
 * supplies it, so each use behaves like a single statement.
 */
#ifdef DEBUG
#define	MQ_ASSERT(x) \
	assert(x)

/*
 * _p is an offset into the queue region headed by _m: it must be
 * non-zero, MQ_ALIGNSIZE aligned and smaller than the region size.
 */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && !((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) && \
	    !((uintptr_t)(_m) + (uintptr_t)(_p) >= (uintptr_t)(_m) + \
	    (_m)->mq_totsize))

/* Assert that the current value of semaphore "sem" is at most "val". */
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) do { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert(_val <= (val)); \
} while (0)
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/*
 * Offset-to-pointer helpers.  Offsets are relative to the start of the
 * queue header "m".  HEAD_PTR/TAIL_PTR yield the address of the n'th
 * slot of the head/tail offset arrays located at mq_headpp/mq_tailpp.
 * Every argument is parenthesized so that expressions such as "&hdr"
 * or "a + b" may be passed safely.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
			(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))

/* Descriptor value that mq_is_valid() rejects without dereferencing it. */
#define	MQ_RESERVED	((mqdes_t *)-1)

/* Timeout interpretation selectors for the timed send/receive helpers. */
#define	ABS_TIME	0
#define	REL_TIME	1
108 
109 static int
110 mq_is_valid(mqdes_t *mqdp)
111 {
112 	/*
113 	 * Any use of a message queue after it was closed is
114 	 * undefined.  But the standard strongly favours EBADF
115 	 * returns.  Before we dereference which could be fatal,
116 	 * we first do some pointer sanity checks.
117 	 */
118 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
119 	    ((uintptr_t)mqdp & 0x7) == 0) {
120 		return (mqdp->mqd_magic == MQ_MAGIC);
121 	}
122 
123 	return (0);
124 }
125 
126 static void
127 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
128 {
129 	int		i;
130 	uint64_t	temp;
131 	uint64_t	currentp;
132 	uint64_t	nextp;
133 
134 	/*
135 	 * We only need to initialize the non-zero fields.  The use of
136 	 * ftruncate() on the message queue file assures that the
137 	 * pages will be zfod.
138 	 */
139 	(void) sem_init(&mqhp->mq_exclusive, 1, 1);
140 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
141 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
142 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
143 
144 	mqhp->mq_maxsz = msgsize;
145 	mqhp->mq_maxmsg = maxmsg;
146 
147 	/*
148 	 * As of this writing (1997), there are 32 message queue priorities.
149 	 * If this is to change, then the size of the mq_mask will also
150 	 * have to change.  If NDEBUG isn't defined, assert that
151 	 * _MQ_PRIO_MAX hasn't changed.
152 	 */
153 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
154 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
155 
156 	mqhp->mq_magic = MQ_MAGIC;
157 
158 	/*
159 	 * Since the message queue can be mapped into different
160 	 * virtual address ranges by different processes, we don't
161 	 * keep track of pointers, only offsets into the shared region.
162 	 */
163 	mqhp->mq_headpp = sizeof (mqhdr_t);
164 	mqhp->mq_tailpp = mqhp->mq_headpp +
165 		mqhp->mq_maxprio * sizeof (uint64_t);
166 	mqhp->mq_freep = mqhp->mq_tailpp +
167 		mqhp->mq_maxprio * sizeof (uint64_t);
168 
169 	currentp = mqhp->mq_freep;
170 	MQ_PTR(mqhp, currentp)->msg_next = 0;
171 
172 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
173 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
174 		nextp = currentp + sizeof (msghdr_t) + temp;
175 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
176 		MQ_PTR(mqhp, nextp)->msg_next = 0;
177 		currentp = nextp;
178 	}
179 }
180 
181 static size_t
182 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
183 {
184 	uint64_t currentp;
185 	msghdr_t *curbuf;
186 	uint64_t *headpp;
187 	uint64_t *tailpp;
188 
189 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_exclusive, 0);
190 
191 	/*
192 	 * Get the head and tail pointers for the queue of maximum
193 	 * priority.  We shouldn't be here unless there is a message for
194 	 * us, so it's fair to assert that both the head and tail
195 	 * pointers are non-NULL.
196 	 */
197 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
198 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
199 
200 	if (msg_prio != NULL)
201 		*msg_prio = mqhp->mq_curmaxprio;
202 
203 	currentp = *headpp;
204 	MQ_ASSERT_PTR(mqhp, currentp);
205 	curbuf = MQ_PTR(mqhp, currentp);
206 
207 	if ((*headpp = curbuf->msg_next) == NULL) {
208 		/*
209 		 * We just nuked the last message in this priority's queue.
210 		 * Twiddle this priority's bit, and then find the next bit
211 		 * tipped.
212 		 */
213 		uint_t prio = mqhp->mq_curmaxprio;
214 
215 		mqhp->mq_mask &= ~(1u << prio);
216 
217 		for (; prio != 0; prio--)
218 			if (mqhp->mq_mask & (1u << prio))
219 				break;
220 		mqhp->mq_curmaxprio = prio;
221 
222 		*tailpp = NULL;
223 	}
224 
225 	/*
226 	 * Copy the message, and put the buffer back on the free list.
227 	 */
228 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
229 	curbuf->msg_next = mqhp->mq_freep;
230 	mqhp->mq_freep = currentp;
231 
232 	return (curbuf->msg_len);
233 }
234 
235 
236 static void
237 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
238 {
239 	uint64_t currentp;
240 	msghdr_t *curbuf;
241 	uint64_t *headpp;
242 	uint64_t *tailpp;
243 
244 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_exclusive, 0);
245 
246 	/*
247 	 * Grab a free message block, and link it in.  We shouldn't
248 	 * be here unless there is room in the queue for us;  it's
249 	 * fair to assert that the free pointer is non-NULL.
250 	 */
251 	currentp = mqhp->mq_freep;
252 	MQ_ASSERT_PTR(mqhp, currentp);
253 	curbuf = MQ_PTR(mqhp, currentp);
254 
255 	/*
256 	 * Remove a message from the free list, and copy in the new contents.
257 	 */
258 	mqhp->mq_freep = curbuf->msg_next;
259 	curbuf->msg_next = NULL;
260 	(void) memcpy((char *)&curbuf[1], msgp, len);
261 	curbuf->msg_len = len;
262 
263 	headpp = HEAD_PTR(mqhp, prio);
264 	tailpp = TAIL_PTR(mqhp, prio);
265 
266 	if (*tailpp == 0) {
267 		/*
268 		 * This is the first message on this queue.  Set the
269 		 * head and tail pointers, and tip the appropriate bit
270 		 * in the priority mask.
271 		 */
272 		*headpp = currentp;
273 		*tailpp = currentp;
274 		mqhp->mq_mask |= (1u << prio);
275 		if (prio > mqhp->mq_curmaxprio)
276 			mqhp->mq_curmaxprio = prio;
277 	} else {
278 		MQ_ASSERT_PTR(mqhp, *tailpp);
279 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
280 		*tailpp = currentp;
281 	}
282 }
283 
284 mqd_t
285 _mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
286 {
287 	va_list		ap;
288 	mode_t		mode;
289 	struct mq_attr	*attr;
290 	int		fd;
291 	int		err;
292 	int		cr_flag = 0;
293 	int		locked = 0;
294 	uint64_t	total_size;
295 	size_t		msgsize;
296 	ssize_t		maxmsg;
297 	uint64_t	temp;
298 	void		*ptr;
299 	mqdes_t		*mqdp;
300 	mqhdr_t		*mqhp;
301 	struct mq_dn	*mqdnp;
302 
303 	if (__pos4obj_check(path) == -1)
304 		return ((mqd_t)-1);
305 
306 	/* acquire MSGQ lock to have atomic operation */
307 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
308 		goto out;
309 	locked = 1;
310 
311 	va_start(ap, oflag);
312 	/* filter oflag to have READ/WRITE/CREATE modes only */
313 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
314 	if ((oflag & O_CREAT) != 0) {
315 		mode = va_arg(ap, mode_t);
316 		attr = va_arg(ap, struct mq_attr *);
317 	}
318 	va_end(ap);
319 
320 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
321 	    mode, &cr_flag)) < 0)
322 		goto out;
323 
324 	/* closing permission file */
325 	(void) __close_nc(fd);
326 
327 	/* Try to open/create data file */
328 	if (cr_flag) {
329 		cr_flag = PFILE_CREATE;
330 		if (attr == NULL) {
331 			maxmsg = MQ_MAXMSG;
332 			msgsize = MQ_MAXSIZE;
333 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
334 			errno = EINVAL;
335 			goto out;
336 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
337 			errno = ENOSPC;
338 			goto out;
339 		} else {
340 			maxmsg = attr->mq_maxmsg;
341 			msgsize = attr->mq_msgsize;
342 		}
343 
344 		/* adjust for message size at word boundary */
345 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
346 
347 		total_size = sizeof (mqhdr_t) +
348 			maxmsg * (temp + sizeof (msghdr_t)) +
349 			2 * _MQ_PRIO_MAX * sizeof (uint64_t);
350 
351 		if (total_size > SSIZE_MAX) {
352 			errno = ENOSPC;
353 			goto out;
354 		}
355 
356 		/*
357 		 * data file is opened with read/write to those
358 		 * who have read or write permission
359 		 */
360 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
361 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
362 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
363 			goto out;
364 
365 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
366 
367 		/* force permissions to avoid umask effect */
368 		if (fchmod(fd, mode) < 0)
369 			goto out;
370 
371 		if (ftruncate64(fd, (off64_t)total_size) < 0)
372 			goto out;
373 	} else {
374 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
375 		    O_RDWR, 0666, &err)) < 0)
376 			goto out;
377 		cr_flag = DFILE_OPEN;
378 
379 		/* Message queue has not been initialized yet */
380 		if (read(fd, &total_size, sizeof (total_size)) !=
381 		    sizeof (total_size) || total_size == 0) {
382 			errno = ENOENT;
383 			goto out;
384 		}
385 
386 		/* Message queue too big for this process to handle */
387 		if (total_size > SSIZE_MAX) {
388 			errno = EFBIG;
389 			goto out;
390 		}
391 	}
392 
393 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
394 		errno = ENOMEM;
395 		goto out;
396 	}
397 	cr_flag |= ALLOC_MEM;
398 
399 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
400 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
401 		goto out;
402 	mqhp = ptr;
403 	cr_flag |= DFILE_MMAP;
404 
405 	/* closing data file */
406 	(void) __close_nc(fd);
407 	cr_flag &= ~DFILE_OPEN;
408 
409 	/*
410 	 * create, unlink, size, mmap, and close description file
411 	 * all for a flag word in anonymous shared memory
412 	 */
413 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
414 	    0666, &err)) < 0)
415 		goto out;
416 	cr_flag |= DFILE_OPEN;
417 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
418 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
419 		goto out;
420 
421 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
422 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
423 		goto out;
424 	mqdnp = ptr;
425 	cr_flag |= MQDNP_MMAP;
426 
427 	(void) __close_nc(fd);
428 	cr_flag &= ~DFILE_OPEN;
429 
430 	/*
431 	 * we follow the same strategy as filesystem open() routine,
432 	 * where fcntl.h flags are changed to flags defined in file.h.
433 	 */
434 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
435 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
436 
437 	/* new message queue requires initialization */
438 	if ((cr_flag & DFILE_CREATE) != 0) {
439 		/* message queue header has to be initialized */
440 		mq_init(mqhp, msgsize, maxmsg);
441 		mqhp->mq_totsize = total_size;
442 	}
443 	mqdp->mqd_mq = mqhp;
444 	mqdp->mqd_mqdn = mqdnp;
445 	mqdp->mqd_magic = MQ_MAGIC;
446 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0)
447 		return ((mqd_t)mqdp);
448 
449 	locked = 0;	/* fall into the error case */
450 out:
451 	err = errno;
452 	if ((cr_flag & DFILE_OPEN) != 0)
453 		(void) __close_nc(fd);
454 	if ((cr_flag & DFILE_CREATE) != 0)
455 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
456 	if ((cr_flag & PFILE_CREATE) != 0)
457 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
458 	if ((cr_flag & ALLOC_MEM) != 0)
459 		free((void *)mqdp);
460 	if ((cr_flag & DFILE_MMAP) != 0)
461 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
462 	if ((cr_flag & MQDNP_MMAP) != 0)
463 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
464 	if (locked)
465 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
466 	errno = err;
467 	return ((mqd_t)-1);
468 }
469 
470 int
471 _mq_close(mqd_t mqdes)
472 {
473 	mqdes_t *mqdp = (mqdes_t *)mqdes;
474 	mqhdr_t *mqhp;
475 	struct mq_dn *mqdnp;
476 	int canstate;
477 
478 	if (!mq_is_valid(mqdp)) {
479 		errno = EBADF;
480 		return (-1);
481 	}
482 
483 	mqhp = mqdp->mqd_mq;
484 	mqdnp = mqdp->mqd_mqdn;
485 
486 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
487 	while (sem_wait(&mqhp->mq_exclusive) == -1 && errno == EINTR)
488 		continue;
489 	(void) pthread_setcancelstate(canstate, NULL);
490 
491 	if (mqhp->mq_des == (uintptr_t)mqdp &&
492 	    mqhp->mq_sigid.sn_pid == getpid()) {
493 		/* Notification is set for this descriptor, remove it */
494 		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
495 		mqhp->mq_sigid.sn_pid = 0;
496 		mqhp->mq_des = 0;
497 	}
498 	(void) sem_post(&mqhp->mq_exclusive);
499 
500 	/* Invalidate the descriptor before freeing it */
501 	mqdp->mqd_magic = 0;
502 	free(mqdp);
503 
504 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
505 	return (munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize));
506 }
507 
508 int
509 _mq_unlink(const char *path)
510 {
511 	int err;
512 
513 	if (__pos4obj_check(path) < 0)
514 		return (-1);
515 
516 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
517 		return (-1);
518 	}
519 
520 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
521 
522 	if (err == 0 || (err == -1 && errno == EEXIST)) {
523 		errno = 0;
524 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
525 	}
526 
527 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
528 		return (-1);
529 
530 	return (err);
531 
532 }
533 
/*
 * Common implementation of mq_send(), mq_timedsend() and
 * mq_reltimedsend_np().
 *
 *	mqdes		queue descriptor; must be valid and open for writing
 *	msg_ptr		message payload
 *	msg_len		payload length; must not exceed the queue's mq_maxsz
 *	msg_prio	priority; must be below the queue's mq_maxprio
 *	timeout		NULL to wait indefinitely, otherwise a time limit
 *	abs_rel		ABS_TIME or REL_TIME: how timeout is interpreted
 *
 * Returns 0 on success, or -1 with errno set (EBADF, EINVAL, EMSGSIZE,
 * or whatever the failing sem_*wait() call reported).
 *
 * Semaphore protocol: a slot is reserved on mq_notfull first, then
 * mq_exclusive is taken to modify the queue, and mq_notempty is posted
 * while mq_exclusive is still held.
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int canstate;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	/* Reserve a free slot; non-blocking descriptors never sleep here. */
	if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 * Cancellation is disabled around the wait so that the slot
	 * reserved above cannot be lost to a cancellation.
	 */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	err = sem_wait(&mqhp->mq_exclusive);
	(void) pthread_setcancelstate(canstate, NULL);
	if (err == -1) {
		/*
		 * We must have been interrupted by a signal.
		 * Post on mq_notfull so someone else can take our slot.
		 */
		(void) sem_post(&mqhp->mq_notfull);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;  another
	 * receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy;  all we can
	 * do is hope to minimize the window as much as possible.  In
	 * general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if we
	 * want to kick the notification before the mq_putmsg(), but the
	 * actual signotify() won't be done until the message is on
	 * the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;
		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);

	/*
	 * The ordering here is important.  We want to make sure that
	 * one has to have mq_exclusive before being able to kick
	 * mq_notempty.
	 */
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* Deliver the notification, then clear the registration. */
		(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
		mqhp->mq_sigid.sn_pid = 0;
		mqhp->mq_des = 0;
	}

	(void) sem_post(&mqhp->mq_exclusive);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));

	return (0);
}
649 
650 int
651 _mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
652 {
653 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
654 		NULL, ABS_TIME));
655 }
656 
657 int
658 _mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
659 	uint_t msg_prio, const timespec_t *abs_timeout)
660 {
661 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
662 		abs_timeout, ABS_TIME));
663 }
664 
665 int
666 _mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
667 	uint_t msg_prio, const timespec_t *rel_timeout)
668 {
669 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
670 		rel_timeout, REL_TIME));
671 }
672 
673 static void
674 decrement_rblocked(mqhdr_t *mqhp)
675 {
676 	int canstate;
677 
678 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
679 	while (sem_wait(&mqhp->mq_rblocked) == -1)
680 		continue;
681 	(void) pthread_setcancelstate(canstate, NULL);
682 }
683 
/*
 * Common implementation of mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().
 *
 *	mqdes		queue descriptor; must be valid and open for reading
 *	msg_ptr		buffer that receives the message
 *	msg_len		buffer size; must be at least the queue's mq_maxsz
 *	msg_prio	if non-NULL, receives the message's priority
 *	timeout		NULL to wait indefinitely, otherwise a time limit
 *	abs_rel		ABS_TIME or REL_TIME: how timeout is interpreted
 *
 * Returns the length of the received message, or -1 with errno set
 * (EBADF, EMSGSIZE, or whatever the failing sem_*wait() call reported).
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int canstate;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		/*
		 * The cleanup handler is popped with a non-zero argument,
		 * so decrement_rblocked() also runs on the normal
		 * (non-cancelled) path once the wait returns.
		 */
		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	/*
	 * A message has been claimed; take the exclusive lock with
	 * cancellation disabled so the claim cannot be lost to a
	 * cancellation.
	 */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	err = sem_wait(&mqhp->mq_exclusive);
	(void) pthread_setcancelstate(canstate, NULL);
	if (err == -1) {
		/*
		 * We must have been interrupted by a signal.
		 * Post on mq_notempty so someone else can take our message.
		 */
		(void) sem_post(&mqhp->mq_notempty);
		return (-1);
	}

	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);

	/* A slot is free again; wake a sender, then release the lock. */
	(void) sem_post(&mqhp->mq_notfull);
	(void) sem_post(&mqhp->mq_exclusive);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));

	return (msg_size);
}
774 
775 ssize_t
776 _mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
777 {
778 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
779 		NULL, ABS_TIME));
780 }
781 
782 ssize_t
783 _mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
784 	uint_t *msg_prio, const timespec_t *abs_timeout)
785 {
786 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
787 		abs_timeout, ABS_TIME));
788 }
789 
790 ssize_t
791 _mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
792 	uint_t *msg_prio, const timespec_t *rel_timeout)
793 {
794 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
795 		rel_timeout, REL_TIME));
796 }
797 
/*
 * Register or remove message-arrival notification for a queue.
 *
 *	mqdes		queue descriptor
 *	notification	NULL to remove the registration made through this
 *			descriptor by this process; otherwise the
 *			notification to register (SIGEV_NONE or
 *			SIGEV_SIGNAL)
 *
 * Returns 0 on success, or -1 with errno set: EBADF for an invalid
 * descriptor, EBUSY when asked to remove a registration this
 * descriptor/process does not own, ENOSYS for SIGEV_THREAD, EINVAL
 * for any other notification type, or the error left by __signotify().
 *
 * The queue's mq_exclusive semaphore is held while the registration
 * fields are examined and updated, and is released on every exit path.
 */
int
_mq_notify(mqd_t mqdes, const struct sigevent *notification)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int canstate;
	siginfo_t mq_siginfo;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	/* Take the queue lock with cancellation disabled, retrying on EINTR. */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	while (sem_wait(&mqhp->mq_exclusive) == -1 && errno == EINTR)
		continue;
	(void) pthread_setcancelstate(canstate, NULL);

	if (notification == NULL) {
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/*
			 * Remove signotify_id if queue is registered with
			 * this process
			 */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			mqhp->mq_sigid.sn_pid = 0;
			mqhp->mq_des = 0;
		} else {
			/*
			 * if registered with another process or mqdes
			 */
			errno = EBUSY;
			goto bad;
		}
	} else {
		/*
		 * Register notification with this process.
		 */

		switch (notification->sigev_notify) {
		case SIGEV_NONE:
			/* no signal requested: signo 0 (si_value left unset) */
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = notification->sigev_signo;
			mq_siginfo.si_value = notification->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			/* thread notification is not implemented here */
			errno = ENOSYS;
			goto bad;
		default:
			errno = EINVAL;
			goto bad;
		}

		/*
		 * Either notification is not present, or if
		 * notification is already present, but the process
		 * which registered notification does not exist then
		 * kernel can register notification for current process.
		 */

		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		/* remember which descriptor owns the registration */
		mqhp->mq_des = (uintptr_t)mqdp;
	}

	(void) sem_post(&mqhp->mq_exclusive);
	return (0);

bad:
	/* error exit: release the queue lock, errno is already set */
	(void) sem_post(&mqhp->mq_exclusive);
	return (-1);
}
877 
878 int
879 _mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
880 {
881 	mqdes_t *mqdp = (mqdes_t *)mqdes;
882 	mqhdr_t *mqhp;
883 	uint_t	flag = 0;
884 
885 	if (!mq_is_valid(mqdp)) {
886 		errno = EBADF;
887 		return (-1);
888 	}
889 
890 	/* store current attributes */
891 	if (omqstat != NULL) {
892 		int	count;
893 
894 		mqhp = mqdp->mqd_mq;
895 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
896 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
897 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
898 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
899 		omqstat->mq_curmsgs = count;
900 	}
901 
902 	/* set description attributes */
903 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
904 		flag = FNONBLOCK;
905 	mqdp->mqd_mqdn->mqdn_flags = flag;
906 
907 	return (0);
908 }
909 
910 int
911 _mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
912 {
913 	mqdes_t *mqdp = (mqdes_t *)mqdes;
914 	mqhdr_t *mqhp;
915 	int count;
916 
917 	if (!mq_is_valid(mqdp)) {
918 		errno = EBADF;
919 		return (-1);
920 	}
921 
922 	mqhp = mqdp->mqd_mq;
923 
924 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
925 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
926 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
927 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
928 	mqstat->mq_curmsgs = count;
929 	return (0);
930 }
931