1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22/*
23 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24 * Use is subject to license terms.
25 */
26
27#include "lint.h"
28#include "mtlib.h"
29#define	_KMEMUSER
30#include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
31#undef	_KMEMUSER
32#include <mqueue.h>
33#include <sys/types.h>
34#include <sys/file.h>
35#include <sys/mman.h>
36#include <errno.h>
37#include <stdarg.h>
38#include <limits.h>
39#include <pthread.h>
40#include <assert.h>
41#include <string.h>
42#include <unistd.h>
43#include <stdlib.h>
44#include <sys/stat.h>
45#include <inttypes.h>
46#include "sigev_thread.h"
47#include "pos4obj.h"
48
49/*
50 * Default values per message queue
51 */
52#define	MQ_MAXMSG	128
53#define	MQ_MAXSIZE	1024
54
55#define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
56
57/*
58 * Message header which is part of messages in link list
59 */
typedef struct {
	/* offsets are relative to the start of the mmap()ed region, */
	/* never raw pointers: each process may map the queue at a */
	/* different virtual address (see MQ_PTR()) */
	uint64_t 	msg_next;	/* offset of next message in the link */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;
64
65/*
66 * message queue description
67 */
struct mq_dn {
	/*
	 * Kept in its own small shared mapping, created and immediately
	 * unlinked by mq_open(), so each open gets a private O_NONBLOCK
	 * flag word that is still shared across fork().
	 */
	size_t		mqdn_flags;	/* open description flags */
};
71
72/*
73 * message queue descriptor structure
74 */
typedef struct mq_des {
	/* process-private: an mqd_t handed to callers is really a */
	/* pointer to one of these, malloc()ed in mq_open() */
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	int		mqd_magic;	/* magic # to identify mq_des */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open	description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
} mqdes_t;
85
86/*
87 * message queue common header, part of the mmap()ed file.
88 * Since message queues may be shared between 32- and 64-bit processes,
89 * care must be taken to make sure that the elements of this structure
90 * are identical for both _LP64 and _ILP32 cases.
91 */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this	*/
	/* (mq_open() reads just this field to decide whether the queue	*/
	/* file has been fully initialized; 0 means "not yet")		*/
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	uint32_t	mq_mask;	/* priority bitmask */
	uint64_t	mq_freep;	/* free message's head pointer */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	/* the sync objects below are initialized process-shared */
	/* (USYNC_PROCESS | LOCK_ROBUST mutex, pshared semaphores) */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	sem_t		mq_notfull;	/* mq_send()'s block on this */
	sem_t		mq_notempty;	/* mq_receive()'s block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;
112
113/*
114 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
115 * If this assumption is somehow invalidated, mq_open() needs to be changed
116 * back to the old version which kept a count and enforced a limit.
117 * We make sure that this is pointed out to those changing <sys/param.h>
118 * by checking _MQ_OPEN_MAX at compile time.
119 */
120#if _MQ_OPEN_MAX != -1
121#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
122#endif
123
#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x);

/* assert that offset _p is non-zero, aligned, and inside mapping _m */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
	    _m->mq_totsize));

/* assert that the current value of semaphore sem does not exceed val */
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= val); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/* translate queue-relative offset n into a pointer within mapping m */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
/* address of the head/tail slot for priority n in mapping m */
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))

/* sentinel descriptor value, rejected by mq_is_valid() */
#define	MQ_RESERVED	((mqdes_t *)-1)

/* abs_rel argument values for the common timed send/receive code */
#define	ABS_TIME	0
#define	REL_TIME	1
154
155static mutex_t mq_list_lock = DEFAULTMUTEX;
156static mqdes_t *mq_list = NULL;
157
158extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
159
160static int
161mq_is_valid(mqdes_t *mqdp)
162{
163	/*
164	 * Any use of a message queue after it was closed is
165	 * undefined.  But the standard strongly favours EBADF
166	 * returns.  Before we dereference which could be fatal,
167	 * we first do some pointer sanity checks.
168	 */
169	if (mqdp != NULL && mqdp != MQ_RESERVED &&
170	    ((uintptr_t)mqdp & 0x7) == 0) {
171		return (mqdp->mqd_magic == MQ_MAGIC);
172	}
173
174	return (0);
175}
176
/*
 * One-time initialization of a freshly created (zero-filled) message
 * queue header.  Called from mq_open() while the pos4obj creation lock
 * is still held, so no other process can observe a half-built queue.
 */
static void
mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
{
	int		i;
	uint64_t	temp;
	uint64_t	currentp;
	uint64_t	nextp;

	/*
	 * We only need to initialize the non-zero fields.  The use of
	 * ftruncate() on the message queue file assures that the
	 * pages will be zero-filled.
	 */
	(void) mutex_init(&mqhp->mq_exclusive,
	    USYNC_PROCESS | LOCK_ROBUST, NULL);
	/* all semaphores are process-shared (pshared argument != 0) */
	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
	(void) sem_init(&mqhp->mq_notempty, 1, 0);
	(void) sem_init(&mqhp->mq_spawner, 1, 0);
	/* mq_notfull starts at maxmsg: one count per free message slot */
	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);

	mqhp->mq_maxsz = msgsize;
	mqhp->mq_maxmsg = maxmsg;

	/*
	 * As of this writing (1997), there are 32 message queue priorities.
	 * If this is to change, then the size of the mq_mask will
	 * also have to change.  If DEBUG is defined, assert that
	 * _MQ_PRIO_MAX hasn't changed.
	 */
	mqhp->mq_maxprio = _MQ_PRIO_MAX;
#if defined(DEBUG)
	/* LINTED always true */
	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
#endif

	/*
	 * Since the message queue can be mapped into different
	 * virtual address ranges by different processes, we don't
	 * keep track of pointers, only offsets into the shared region.
	 *
	 * Layout: header, then _MQ_PRIO_MAX head slots, then
	 * _MQ_PRIO_MAX tail slots, then the message buffers.
	 */
	mqhp->mq_headpp = sizeof (mqhdr_t);
	mqhp->mq_tailpp = mqhp->mq_headpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);
	mqhp->mq_freep = mqhp->mq_tailpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);

	/*
	 * Carve the remainder of the region into maxmsg fixed-size
	 * buffers (msghdr_t + aligned payload) and chain every one of
	 * them onto the free list.
	 */
	currentp = mqhp->mq_freep;
	MQ_PTR(mqhp, currentp)->msg_next = 0;

	/* round the payload size up to an MQ_ALIGNSIZE boundary */
	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
	for (i = 1; i < mqhp->mq_maxmsg; i++) {
		nextp = currentp + sizeof (msghdr_t) + temp;
		MQ_PTR(mqhp, currentp)->msg_next = nextp;
		MQ_PTR(mqhp, nextp)->msg_next = 0;
		currentp = nextp;
	}
}
234
/*
 * Dequeue the highest-priority message into *msgp and return its
 * length.  The caller must hold mq_exclusive and must already have
 * consumed a count from the mq_notempty semaphore, so the queue is
 * known to be non-empty here.  If msg_prio is non-NULL the message's
 * priority is stored through it.
 */
static size_t
mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Get the head and tail pointers for the queue of maximum
	 * priority.  We shouldn't be here unless there is a message for
	 * us, so it's fair to assert that both the head and tail
	 * pointers are non-NULL.
	 */
	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);

	if (msg_prio != NULL)
		*msg_prio = mqhp->mq_curmaxprio;

	currentp = *headpp;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	if ((*headpp = curbuf->msg_next) == 0) {
		/*
		 * We just nuked the last message in this priority's queue.
		 * Twiddle this priority's bit, and then find the next bit
		 * tipped.
		 */
		uint_t prio = mqhp->mq_curmaxprio;

		mqhp->mq_mask &= ~(1u << prio);

		/* scan downward for the next non-empty priority (0 if none) */
		for (; prio != 0; prio--)
			if (mqhp->mq_mask & (1u << prio))
				break;
		mqhp->mq_curmaxprio = prio;

		*tailpp = 0;
	}

	/*
	 * Copy the message, and put the buffer back on the free list.
	 * Only msg_next is overwritten when relinking, so msg_len is
	 * still valid for the return statement below.
	 */
	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
	curbuf->msg_next = mqhp->mq_freep;
	mqhp->mq_freep = currentp;

	return (curbuf->msg_len);
}
288
289
/*
 * Append a message of length len at the given priority.  The caller
 * must hold mq_exclusive and must already have consumed a count from
 * the mq_notfull semaphore, so a free buffer is known to exist.
 */
static void
mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));

	/*
	 * Grab a free message block, and link it in.  We shouldn't
	 * be here unless there is room in the queue for us;  it's
	 * fair to assert that the free pointer is non-NULL.
	 */
	currentp = mqhp->mq_freep;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	/*
	 * Remove a message from the free list, and copy in the new contents.
	 * The payload lives immediately after the msghdr_t (&curbuf[1]).
	 */
	mqhp->mq_freep = curbuf->msg_next;
	curbuf->msg_next = 0;
	(void) memcpy((char *)&curbuf[1], msgp, len);
	curbuf->msg_len = len;

	headpp = HEAD_PTR(mqhp, prio);
	tailpp = TAIL_PTR(mqhp, prio);

	if (*tailpp == 0) {
		/*
		 * This is the first message on this queue.  Set the
		 * head and tail pointers, and tip the appropriate bit
		 * in the priority mask.
		 */
		*headpp = currentp;
		*tailpp = currentp;
		mqhp->mq_mask |= (1u << prio);
		if (prio > mqhp->mq_curmaxprio)
			mqhp->mq_curmaxprio = prio;
	} else {
		/* non-empty priority queue: append at the tail */
		MQ_ASSERT_PTR(mqhp, *tailpp);
		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
		*tailpp = currentp;
	}
}
337
338/*
339 * Send a notification and also delete the registration.
340 */
341static void
342do_notify(mqhdr_t *mqhp)
343{
344	(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
345	if (mqhp->mq_ntype == SIGEV_THREAD ||
346	    mqhp->mq_ntype == SIGEV_PORT)
347		(void) sem_post(&mqhp->mq_spawner);
348	mqhp->mq_ntype = 0;
349	mqhp->mq_des = 0;
350}
351
352/*
353 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
354 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
355 * they fail with errno == EBADMSG.  Trigger any registered notification.
356 */
static void
owner_dead(mqdes_t *mqdp, int error)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;

	/* poison this descriptor; mq_close_cleanup() will skip the unlock */
	mqdp->mqd_ownerdead = 1;
	/* wake any senders/receivers blocked on the queue semaphores */
	(void) sem_post(&mqhp->mq_notfull);
	(void) sem_post(&mqhp->mq_notempty);
	if (error == EOWNERDEAD) {
		/*
		 * EOWNERDEAD means we actually acquired the (inconsistent)
		 * lock, so deliver any pending notification and release it.
		 * For ENOTRECOVERABLE we never got the lock at all.
		 */
		if (mqhp->mq_sigid.sn_pid != 0)
			do_notify(mqhp);
		(void) mutex_unlock(&mqhp->mq_exclusive);
	}
	errno = EBADMSG;
}
372
/*
 * Open (and possibly create) a named POSIX message queue.
 * With O_CREAT the variadic arguments are (mode_t mode,
 * struct mq_attr *attr); attr == NULL selects the defaults.
 * Returns an mqd_t (really a malloc()ed mqdes_t *) or (mqd_t)-1.
 *
 * Three pos4obj files back a queue: a permission file (existence and
 * access control), the mmap()ed data file holding mqhdr_t + messages,
 * and a per-open description file holding struct mq_dn.  The cr_flag
 * bits track exactly what has been created/opened/mapped so the
 * error path at "out" can unwind precisely.
 */
mqd_t
mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list		ap;
	mode_t		mode = 0;
	struct mq_attr	*attr = NULL;
	int		fd;
	int		err;
	int		cr_flag = 0;
	int		locked = 0;
	uint64_t	total_size;
	size_t		msgsize;
	ssize_t		maxmsg;
	uint64_t	temp;
	void		*ptr;
	mqdes_t		*mqdp;
	mqhdr_t		*mqhp;
	struct mq_dn	*mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	/* cr_flag is set non-zero by __pos4obj_open() if we created it */
	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			/* mq_notfull is initialized to maxmsg; must fit */
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		/* header + per-message (header + payload) + head/tail slots */
		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		/* zero-fills the file; mq_init() relies on this */
		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/*
		 * Message queue has not been initialized yet.
		 * mq_totsize is the first field of mqhdr_t and is only
		 * written after mq_init() completes, so a short read or
		 * a zero value means the creator hasn't finished.
		 */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}
	cr_flag |= ALLOC_MEM;

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as filesystem open() routine,
	 * where fcntl.h flags are changed to flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		/*
		 * Written last: a non-zero mq_totsize is what tells other
		 * openers (the ENOENT check above) that we are done.
		 */
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	mqdp->mqd_tcd = NULL;
	mqdp->mqd_ownerdead = 0;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
		/* link onto the fork-safety list (see postfork1_child_sigev_mq) */
		lmutex_lock(&mq_list_lock);
		mqdp->mqd_next = mq_list;
		mqdp->mqd_prev = NULL;
		if (mq_list)
			mq_list->mqd_prev = mqdp;
		mq_list = mqdp;
		lmutex_unlock(&mq_list_lock);
		return ((mqd_t)mqdp);
	}

	locked = 0;	/* fall into the error case */
out:
	/* unwind exactly what cr_flag says we did, preserving errno */
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	if ((cr_flag & ALLOC_MEM) != 0)
		free((void *)mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}
568
/*
 * Final teardown for mq_close(), run as a pthread cancellation cleanup
 * handler so it executes even if del_sigev_mq() is a cancellation point
 * that fires.  Invalidates and frees the descriptor, removes it from
 * the fork-safety list, and unmaps both shared regions.
 */
static void
mq_close_cleanup(mqdes_t *mqdp)
{
	mqhdr_t *mqhp = mqdp->mqd_mq;
	struct mq_dn *mqdnp = mqdp->mqd_mqdn;

	/* invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	/* if the robust mutex owner died, we never acquired mq_exclusive */
	if (!mqdp->mqd_ownerdead)
		(void) mutex_unlock(&mqhp->mq_exclusive);

	lmutex_lock(&mq_list_lock);
	if (mqdp->mqd_next)
		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
	if (mqdp->mqd_prev)
		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
	if (mq_list == mqdp)
		mq_list = mqdp->mqd_next;
	lmutex_unlock(&mq_list_lock);

	free(mqdp);
	/* read mq_totsize before the header mapping disappears */
	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
}
593
/*
 * Close a message queue descriptor.  The shared queue itself persists
 * until mq_unlink(); only this process's descriptor, its notification
 * registration and its mappings are torn down (in mq_close_cleanup()).
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex: a previous owner died holding the lock */
		mqdp->mqd_ownerdead = 1;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	/* the cleanup handler runs even if del_sigev_mq() is cancelled */
	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);		/* finish in the cleanup handler */

	return (0);
}
632
633int
634mq_unlink(const char *path)
635{
636	int err;
637
638	if (__pos4obj_check(path) < 0)
639		return (-1);
640
641	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
642		return (-1);
643	}
644
645	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
646
647	if (err == 0 || (err == -1 && errno == EEXIST)) {
648		errno = 0;
649		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
650	}
651
652	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
653		return (-1);
654
655	return (err);
656
657}
658
/*
 * Common code for mq_send(), mq_timedsend() and mq_reltimedsend_np().
 * timeout == NULL means block indefinitely; otherwise abs_rel selects
 * whether timeout is absolute (ABS_TIME) or relative (REL_TIME).
 * Returns 0 on success, -1 with errno set on failure.
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	/* wait for (or, with O_NONBLOCK, poll for) a free message slot */
	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex owner died; fail with EBADMSG */
		owner_dead(mqdp, err);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		do_notify(mqhp);
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}
760
761int
762mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
763{
764	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
765	    NULL, ABS_TIME));
766}
767
768int
769mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
770	uint_t msg_prio, const timespec_t *abs_timeout)
771{
772	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
773	    abs_timeout, ABS_TIME));
774}
775
776int
777mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
778	uint_t msg_prio, const timespec_t *rel_timeout)
779{
780	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
781	    rel_timeout, REL_TIME));
782}
783
784static void
785decrement_rblocked(mqhdr_t *mqhp)
786{
787	int cancel_state;
788
789	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
790	while (sem_wait(&mqhp->mq_rblocked) == -1)
791		continue;
792	(void) pthread_setcancelstate(cancel_state, NULL);
793}
794
/*
 * Common code for mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().  timeout == NULL means block indefinitely;
 * otherwise abs_rel selects absolute (ABS_TIME) or relative (REL_TIME)
 * interpretation.  Returns the message length, or -1 with errno set.
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* the buffer must be able to hold the largest possible message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		/* always pop-and-run: rblocked must be decremented */
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/* robust mutex owner died; fail with EBADMSG */
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
875
876ssize_t
877mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
878{
879	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
880	    NULL, ABS_TIME));
881}
882
883ssize_t
884mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
885	uint_t *msg_prio, const timespec_t *abs_timeout)
886{
887	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
888	    abs_timeout, ABS_TIME));
889}
890
891ssize_t
892mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
893	uint_t *msg_prio, const timespec_t *rel_timeout)
894{
895	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
896	    rel_timeout, REL_TIME));
897}
898
899/*
900 * Only used below, in mq_notify().
901 * We already have a spawner thread.
902 * Verify that the attributes match; cancel it if necessary.
903 */
904static int
905cancel_if_necessary(thread_communication_data_t *tcdp,
906	const struct sigevent *sigevp)
907{
908	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
909	    sigevp->sigev_notify_attributes);
910
911	if (do_cancel) {
912		/*
913		 * Attributes don't match, cancel the spawner thread.
914		 */
915		(void) pthread_cancel(tcdp->tcd_server_id);
916	} else {
917		/*
918		 * Reuse the existing spawner thread with possibly
919		 * changed notification function and value.
920		 */
921		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
922		tcdp->tcd_notif.sigev_signo = 0;
923		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
924		tcdp->tcd_notif.sigev_notify_function =
925		    sigevp->sigev_notify_function;
926	}
927
928	return (do_cancel);
929}
930
/*
 * Register (sigevp != NULL) or remove (sigevp == NULL) asynchronous
 * notification for this queue.  mq_des/mq_sigid in the shared header
 * record which descriptor/process currently owns the registration.
 * SIGEV_THREAD and SIGEV_PORT are serviced by a per-descriptor
 * spawner thread woken through the mq_spawner semaphore.
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/*
		 * Robust mutex owner died.  Treat this call as a removal
		 * request; the non-zero 'error' makes the exit path below
		 * report EBADMSG.
		 */
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/* first pass: validate and normalize the sigevent */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			/* sival_ptr carries a port_notify_t for SIGEV_PORT */
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/* second pass: build mq_siginfo and set up any spawner */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			/* drop an existing spawner if its attributes differ */
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* hand the spawner its parameters and enable it */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		/* lock acquisition failed above: always report EBADMSG */
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
1074
1075int
1076mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1077{
1078	mqdes_t *mqdp = (mqdes_t *)mqdes;
1079	mqhdr_t *mqhp;
1080	uint_t	flag = 0;
1081
1082	if (!mq_is_valid(mqdp)) {
1083		errno = EBADF;
1084		return (-1);
1085	}
1086
1087	/* store current attributes */
1088	if (omqstat != NULL) {
1089		int	count;
1090
1091		mqhp = mqdp->mqd_mq;
1092		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1093		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1094		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1095		(void) sem_getvalue(&mqhp->mq_notempty, &count);
1096		omqstat->mq_curmsgs = count;
1097	}
1098
1099	/* set description attributes */
1100	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1101		flag = FNONBLOCK;
1102	mqdp->mqd_mqdn->mqdn_flags = flag;
1103
1104	return (0);
1105}
1106
1107int
1108mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1109{
1110	mqdes_t *mqdp = (mqdes_t *)mqdes;
1111	mqhdr_t *mqhp;
1112	int count;
1113
1114	if (!mq_is_valid(mqdp)) {
1115		errno = EBADF;
1116		return (-1);
1117	}
1118
1119	mqhp = mqdp->mqd_mq;
1120
1121	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1122	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1123	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1124	(void) sem_getvalue(&mqhp->mq_notempty, &count);
1125	mqstat->mq_curmsgs = count;
1126	return (0);
1127}
1128
1129/*
1130 * Cleanup after fork1() in the child process.
1131 */
1132void
1133postfork1_child_sigev_mq(void)
1134{
1135	thread_communication_data_t *tcdp;
1136	mqdes_t *mqdp;
1137
1138	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1139		if ((tcdp = mqdp->mqd_tcd) != NULL) {
1140			mqdp->mqd_tcd = NULL;
1141			tcd_teardown(tcdp);
1142		}
1143	}
1144}
1145