1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 23 /* 24 * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 25 * Use is subject to license terms. 26 */ 27 28 #pragma ident "%Z%%M% %I% %E% SMI" 29 30 #pragma weak mq_open = _mq_open 31 #pragma weak mq_close = _mq_close 32 #pragma weak mq_unlink = _mq_unlink 33 #pragma weak mq_send = _mq_send 34 #pragma weak mq_timedsend = _mq_timedsend 35 #pragma weak mq_reltimedsend_np = _mq_reltimedsend_np 36 #pragma weak mq_receive = _mq_receive 37 #pragma weak mq_timedreceive = _mq_timedreceive 38 #pragma weak mq_reltimedreceive_np = _mq_reltimedreceive_np 39 #pragma weak mq_notify = _mq_notify 40 #pragma weak mq_setattr = _mq_setattr 41 #pragma weak mq_getattr = _mq_getattr 42 43 #include "c_synonyms.h" 44 #define _KMEMUSER 45 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */ 46 #undef _KMEMUSER 47 #include <mqueue.h> 48 #include <sys/types.h> 49 #include <sys/file.h> 50 #include <sys/mman.h> 51 #include <errno.h> 52 #include <stdarg.h> 53 #include <limits.h> 54 #include <pthread.h> 55 #include <assert.h> 56 #include <string.h> 57 #include <unistd.h> 58 #include <stdlib.h> 59 
#include <sys/stat.h>
#include <inttypes.h>

#include "mqlib.h"
#include "pos4obj.h"
#include "pos4.h"

/*
 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
 * If this assumption is somehow invalidated, mq_open() needs to be changed
 * back to the old version which kept a count and enforced a limit.
 * We make sure that this is pointed out to those changing <sys/param.h>
 * by checking _MQ_OPEN_MAX at compile time.
 */
#if _MQ_OPEN_MAX != -1
#error "librt:mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif

#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

#ifdef DEBUG
#define	MQ_ASSERT(x) \
	assert(x);

/* Assert that offset _p is 64-bit aligned and lies within queue _m. */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
	    _m->mq_totsize));

/* Assert that the current value of semaphore 'sem' is <= 'val'. */
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert((_val) <= val); }
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/*
 * All queue links are stored as byte offsets from the start of the shared
 * mapping (each process may map the file at a different address).  These
 * macros convert an offset into a usable pointer within mapping 'm'.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))

#define	MQ_RESERVED	((mqdes_t *)-1)

/* abs_rel argument values for the common timed send/receive helpers */
#define	ABS_TIME	0
#define	REL_TIME	1

/*
 * Sanity-check a message queue descriptor before dereferencing it.
 * Returns non-zero iff mqdp looks like a live descriptor (non-NULL,
 * not the reserved sentinel, 8-byte aligned, and carrying MQ_MAGIC).
 */
static int
mq_is_valid(mqdes_t *mqdp)
{
	/*
	 * Any use of a message queue after it was closed is
	 * undefined.  But the standard strongly favours EBADF
	 * returns.  Before we dereference which could be fatal,
	 * we first do some pointer sanity checks.
	 */
	if (mqdp != NULL && mqdp != MQ_RESERVED &&
	    ((uintptr_t)mqdp & 0x7) == 0) {
		return (mqdp->mqd_magic == MQ_MAGIC);
	}

	return (0);
}

/*
 * Initialize a newly-created (zero-filled) message queue header in the
 * shared mapping: the four process-shared semaphores, the size limits,
 * the per-priority head/tail offset tables, and the free list that
 * chains all maxmsg message buffers together.
 */
static void
mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
{
	int i;
	uint64_t temp;
	uint64_t currentp;
	uint64_t nextp;

	/*
	 * We only need to initialize the non-zero fields.  The use of
	 * ftruncate() on the message queue file assures that the
	 * pages will be zfod.
	 */
	(void) sem_init(&mqhp->mq_exclusive, 1, 1);
	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
	(void) sem_init(&mqhp->mq_notempty, 1, 0);
	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);

	mqhp->mq_maxsz = msgsize;
	mqhp->mq_maxmsg = maxmsg;

	/*
	 * As of this writing (1997), there are 32 message queue priorities.
	 * If this is to change, then the size of the mq_mask will also
	 * have to change.  If NDEBUG isn't defined, assert that
	 * _MQ_PRIO_MAX hasn't changed.
	 */
	mqhp->mq_maxprio = _MQ_PRIO_MAX;
	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);

	mqhp->mq_magic = MQ_MAGIC;

	/*
	 * Since the message queue can be mapped into different
	 * virtual address ranges by different processes, we don't
	 * keep track of pointers, only offsets into the shared region.
	 */
	mqhp->mq_headpp = sizeof (mqhdr_t);
	mqhp->mq_tailpp = mqhp->mq_headpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);
	mqhp->mq_freep = mqhp->mq_tailpp +
	    mqhp->mq_maxprio * sizeof (uint64_t);

	currentp = mqhp->mq_freep;
	MQ_PTR(mqhp, currentp)->msg_next = 0;

	/*
	 * Round the per-message payload up to MQ_ALIGNSIZE so every
	 * msghdr_t in the free list stays 64-bit aligned.
	 */
	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
	for (i = 1; i < mqhp->mq_maxmsg; i++) {
		nextp = currentp + sizeof (msghdr_t) + temp;
		MQ_PTR(mqhp, currentp)->msg_next = nextp;
		MQ_PTR(mqhp, nextp)->msg_next = 0;
		currentp = nextp;
	}
}

/*
 * Dequeue the highest-priority message into msgp (and its priority into
 * *msg_prio if non-NULL), returning its length.  The dequeued buffer is
 * pushed back onto the free list.  Caller must hold mq_exclusive and
 * must have already consumed a mq_notempty token, so the queue is
 * guaranteed non-empty here.
 */
static size_t
mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_exclusive, 0);

	/*
	 * Get the head and tail pointers for the queue of maximum
	 * priority.  We shouldn't be here unless there is a message for
	 * us, so it's fair to assert that both the head and tail
	 * pointers are non-NULL.
	 */
	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);

	if (msg_prio != NULL)
		*msg_prio = mqhp->mq_curmaxprio;

	currentp = *headpp;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	if ((*headpp = curbuf->msg_next) == NULL) {
		/*
		 * We just nuked the last message in this priority's queue.
		 * Twiddle this priority's bit, and then find the next bit
		 * tipped.
		 */
		uint_t prio = mqhp->mq_curmaxprio;

		mqhp->mq_mask &= ~(1u << prio);

		for (; prio != 0; prio--)
			if (mqhp->mq_mask & (1u << prio))
				break;
		mqhp->mq_curmaxprio = prio;

		*tailpp = NULL;
	}

	/*
	 * Copy the message, and put the buffer back on the free list.
	 */
	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
	curbuf->msg_next = mqhp->mq_freep;
	mqhp->mq_freep = currentp;

	return (curbuf->msg_len);
}


/*
 * Enqueue a message of the given length at the tail of priority 'prio',
 * taking a buffer from the free list.  Caller must hold mq_exclusive and
 * must have already consumed a mq_notfull token, so a free buffer is
 * guaranteed to exist here.
 */
static void
mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
{
	uint64_t currentp;
	msghdr_t *curbuf;
	uint64_t *headpp;
	uint64_t *tailpp;

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_exclusive, 0);

	/*
	 * Grab a free message block, and link it in.  We shouldn't
	 * be here unless there is room in the queue for us; it's
	 * fair to assert that the free pointer is non-NULL.
	 */
	currentp = mqhp->mq_freep;
	MQ_ASSERT_PTR(mqhp, currentp);
	curbuf = MQ_PTR(mqhp, currentp);

	/*
	 * Remove a message from the free list, and copy in the new contents.
	 */
	mqhp->mq_freep = curbuf->msg_next;
	curbuf->msg_next = NULL;
	(void) memcpy((char *)&curbuf[1], msgp, len);
	curbuf->msg_len = len;

	headpp = HEAD_PTR(mqhp, prio);
	tailpp = TAIL_PTR(mqhp, prio);

	if (*tailpp == 0) {
		/*
		 * This is the first message on this queue.  Set the
		 * head and tail pointers, and tip the appropriate bit
		 * in the priority mask.
		 */
		*headpp = currentp;
		*tailpp = currentp;
		mqhp->mq_mask |= (1u << prio);
		if (prio > mqhp->mq_curmaxprio)
			mqhp->mq_curmaxprio = prio;
	} else {
		MQ_ASSERT_PTR(mqhp, *tailpp);
		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
		*tailpp = currentp;
	}
}

/*
 * Open (and possibly create) a named POSIX message queue.
 *
 * The queue lives in three pos4obj-managed files: a permission file, a
 * data file holding the shared mqhdr_t plus message buffers (mapped
 * MAP_SHARED into every opener), and a per-open "description" file that
 * is mapped then immediately unlinked to give an anonymous shared flag
 * word (mq_dn) for O_NONBLOCK state.  Everything is serialized under a
 * per-name lock; on any failure, cr_flag records which resources were
 * acquired so the out: label can unwind exactly those.
 *
 * Returns a descriptor cast to mqd_t, or (mqd_t)-1 with errno set.
 */
mqd_t
_mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
{
	va_list ap;
	mode_t mode;
	struct mq_attr *attr;
	int fd;
	int err;
	int cr_flag = 0;
	int locked = 0;
	uint64_t total_size;
	size_t msgsize;
	ssize_t maxmsg;
	uint64_t temp;
	void *ptr;
	mqdes_t *mqdp;
	mqhdr_t *mqhp;
	struct mq_dn *mqdnp;

	if (__pos4obj_check(path) == -1)
		return ((mqd_t)-1);

	/* acquire MSGQ lock to have atomic operation */
	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
		goto out;
	locked = 1;

	va_start(ap, oflag);
	/* filter oflag to have READ/WRITE/CREATE modes only */
	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
	if ((oflag & O_CREAT) != 0) {
		mode = va_arg(ap, mode_t);
		attr = va_arg(ap, struct mq_attr *);
	}
	va_end(ap);

	/*
	 * NOTE(review): when O_CREAT is not set, 'mode' is passed here
	 * uninitialized -- presumably __pos4obj_open() ignores it in
	 * that case, but confirm against pos4obj.c.
	 */
	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
	    mode, &cr_flag)) < 0)
		goto out;

	/* closing permission file */
	(void) __close_nc(fd);

	/* Try to open/create data file */
	if (cr_flag) {
		cr_flag = PFILE_CREATE;
		if (attr == NULL) {
			/* no attributes supplied: fall back to defaults */
			maxmsg = MQ_MAXMSG;
			msgsize = MQ_MAXSIZE;
		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
			errno = EINVAL;
			goto out;
		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
			/* mq_notfull is initialized to maxmsg; cap it */
			errno = ENOSPC;
			goto out;
		} else {
			maxmsg = attr->mq_maxmsg;
			msgsize = attr->mq_msgsize;
		}

		/* adjust for message size at word boundary */
		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);

		/* header + per-message (payload + msghdr) + head/tail tables */
		total_size = sizeof (mqhdr_t) +
		    maxmsg * (temp + sizeof (msghdr_t)) +
		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);

		if (total_size > SSIZE_MAX) {
			errno = ENOSPC;
			goto out;
		}

		/*
		 * data file is opened with read/write to those
		 * who have read or write permission
		 */
		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
			goto out;

		cr_flag |= DFILE_CREATE | DFILE_OPEN;

		/* force permissions to avoid umask effect */
		if (fchmod(fd, mode) < 0)
			goto out;

		if (ftruncate64(fd, (off64_t)total_size) < 0)
			goto out;
	} else {
		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
		    O_RDWR, 0666, &err)) < 0)
			goto out;
		cr_flag = DFILE_OPEN;

		/*
		 * The first field of mqhdr_t at offset 0 is mq_totsize,
		 * written last by the creator (after mq_init); if it
		 * reads as 0 the queue has not been initialized yet.
		 */
		if (read(fd, &total_size, sizeof (total_size)) !=
		    sizeof (total_size) || total_size == 0) {
			errno = ENOENT;
			goto out;
		}

		/* Message queue too big for this process to handle */
		if (total_size > SSIZE_MAX) {
			errno = EFBIG;
			goto out;
		}
	}

	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
		errno = ENOMEM;
		goto out;
	}
	cr_flag |= ALLOC_MEM;

	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqhp = ptr;
	cr_flag |= DFILE_MMAP;

	/* closing data file */
	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * create, unlink, size, mmap, and close description file
	 * all for a flag word in anonymous shared memory
	 */
	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
	    0666, &err)) < 0)
		goto out;
	cr_flag |= DFILE_OPEN;
	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
		goto out;

	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
		goto out;
	mqdnp = ptr;
	cr_flag |= MQDNP_MMAP;

	(void) __close_nc(fd);
	cr_flag &= ~DFILE_OPEN;

	/*
	 * we follow the same strategy as filesystem open() routine,
	 * where fcntl.h flags are changed to flags defined in file.h.
	 */
	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);

	/* new message queue requires initialization */
	if ((cr_flag & DFILE_CREATE) != 0) {
		/* message queue header has to be initialized */
		mq_init(mqhp, msgsize, maxmsg);
		/* written last: readers use non-zero mq_totsize as "ready" */
		mqhp->mq_totsize = total_size;
	}
	mqdp->mqd_mq = mqhp;
	mqdp->mqd_mqdn = mqdnp;
	mqdp->mqd_magic = MQ_MAGIC;
	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0)
		return ((mqd_t)mqdp);

	locked = 0;	/* fall into the error case */
out:
	/* unwind exactly the resources recorded in cr_flag */
	err = errno;
	if ((cr_flag & DFILE_OPEN) != 0)
		(void) __close_nc(fd);
	if ((cr_flag & DFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
	if ((cr_flag & PFILE_CREATE) != 0)
		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
	if ((cr_flag & ALLOC_MEM) != 0)
		free((void *)mqdp);
	if ((cr_flag & DFILE_MMAP) != 0)
		(void) munmap((caddr_t)mqhp, (size_t)total_size);
	if ((cr_flag & MQDNP_MMAP) != 0)
		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	if (locked)
		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
	errno = err;
	return ((mqd_t)-1);
}

/*
 * Close a message queue descriptor: cancel any notification this
 * descriptor registered, invalidate and free the descriptor, and
 * unmap both shared regions.  Returns 0, or -1 with errno = EBADF
 * for an invalid descriptor.
 */
int
_mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	struct mq_dn *mqdnp;
	int canstate;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	mqdnp = mqdp->mqd_mqdn;

	/* take mq_exclusive uncancellably, retrying on EINTR */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	while (sem_wait(&mqhp->mq_exclusive) == -1 && errno == EINTR)
		continue;
	(void) pthread_setcancelstate(canstate, NULL);

	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* Notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_sigid.sn_pid = 0;
		mqhp->mq_des = 0;
	}
	(void) sem_post(&mqhp->mq_exclusive);

	/* Invalidate the descriptor before freeing it */
	mqdp->mqd_magic = 0;
	free(mqdp);

	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
	return (munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize));
}

/*
 * Remove a named message queue: unlink its permission file and, if that
 * succeeds (or the name is already partially removed), its data file.
 * Serialized under the per-name lock like _mq_open().
 */
int
_mq_unlink(const char *path)
{
	int err;

	if (__pos4obj_check(path) < 0)
		return (-1);

	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
		return (-1);
	}

	err = __pos4obj_unlink(path, MQ_PERM_TYPE);

	if (err == 0 || (err == -1 && errno == EEXIST)) {
		errno = 0;
		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
	}

	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
		return (-1);

	return (err);

}

/*
 * Common worker for mq_send / mq_timedsend / mq_reltimedsend_np.
 * Waits (per O_NONBLOCK / timeout / abs_rel) for space, then enqueues
 * the message under mq_exclusive, firing the POSIX notification when
 * the queue goes empty -> non-empty with no blocked receivers.
 * Returns 0, or -1 with errno set.
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int canstate;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	err = sem_wait(&mqhp->mq_exclusive);
	(void) pthread_setcancelstate(canstate, NULL);
	if (err == -1) {
		/*
		 * We must have been interrupted by a signal.
		 * Post on mq_notfull so someone else can take our slot.
		 */
		(void) sem_post(&mqhp->mq_notfull);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification.  POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers.  Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking; another
	 * receiver could intervene in the meantime.  Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.  In
	 * general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked.  We'll determine if we
	 * want to kick the notification before the mq_putmsg(), but the
	 * actual signotify() won't be done until the message is on
	 * the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;
		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);

	/*
	 * The ordering here is important.  We want to make sure that
	 * one has to have mq_exclusive before being able to kick
	 * mq_notempty.
	 */
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notification fires once, then the registration clears */
		(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
		mqhp->mq_sigid.sn_pid = 0;
		mqhp->mq_des = 0;
	}

	(void) sem_post(&mqhp->mq_exclusive);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));

	return (0);
}

/* mq_send(3RT): blocking send with no timeout. */
int
_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

/* mq_timedsend(3RT): send bounded by an absolute timeout. */
int
_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

/* mq_reltimedsend_np(3RT): send bounded by a relative timeout. */
int
_mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
	uint_t msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}

/*
 * Cancellation-cleanup handler: uncancellably take back the mq_rblocked
 * token posted before a receiver went to sleep (loops on any sem_wait
 * failure, not just EINTR).
 */
static void
decrement_rblocked(mqhdr_t *mqhp)
{
	int canstate;

	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	while (sem_wait(&mqhp->mq_rblocked) == -1)
		continue;
	(void) pthread_setcancelstate(canstate, NULL);
}

/*
 * Common worker for mq_receive / mq_timedreceive / mq_reltimedreceive_np.
 * Waits (per O_NONBLOCK / timeout / abs_rel) for a message, advertising
 * itself as a blocked receiver via mq_rblocked while asleep, then
 * dequeues under mq_exclusive.  Returns the message length, or -1 with
 * errno set.
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t msg_size;
	int canstate;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* POSIX: the receive buffer must hold a maximum-size message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		/* pop(1) runs the handler on both normal and cancel paths */
		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	err = sem_wait(&mqhp->mq_exclusive);
	(void) pthread_setcancelstate(canstate, NULL);
	if (err == -1) {
		/*
		 * We must have been interrupted by a signal.
		 * Post on mq_notempty to put back the token we consumed
		 * above, so another receiver can claim the message.
		 */
		(void) sem_post(&mqhp->mq_notempty);
		return (-1);
	}

	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);

	(void) sem_post(&mqhp->mq_notfull);
	(void) sem_post(&mqhp->mq_exclusive);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));

	return (msg_size);
}

/* mq_receive(3RT): blocking receive with no timeout. */
ssize_t
_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    NULL, ABS_TIME));
}

/* mq_timedreceive(3RT): receive bounded by an absolute timeout. */
ssize_t
_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *abs_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    abs_timeout, ABS_TIME));
}

/* mq_reltimedreceive_np(3RT): receive bounded by a relative timeout. */
ssize_t
_mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *rel_timeout)
{
	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
	    rel_timeout, REL_TIME));
}

/*
 * Register (notification != NULL) or remove (notification == NULL) this
 * process's empty-to-non-empty notification for the queue, via the
 * private __signotify() syscall.  SIGEV_THREAD is not supported here
 * (ENOSYS).  Returns 0, or -1 with errno set.
 */
int
_mq_notify(mqd_t mqdes, const struct sigevent *notification)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int canstate;
	siginfo_t mq_siginfo;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	/* take mq_exclusive uncancellably, retrying on EINTR */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
	while (sem_wait(&mqhp->mq_exclusive) == -1 && errno == EINTR)
		continue;
	(void) pthread_setcancelstate(canstate, NULL);

	if (notification == NULL) {
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/*
			 * Remove signotify_id if queue is registered with
			 * this process
			 */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			mqhp->mq_sigid.sn_pid = 0;
			mqhp->mq_des = 0;
		} else {
			/*
			 * if registered with another process or mqdes
			 */
			errno = EBUSY;
			goto bad;
		}
	} else {
		/*
		 * Register notification with this process.
		 */

		switch (notification->sigev_notify) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = notification->sigev_signo;
			mq_siginfo.si_value = notification->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			errno = ENOSYS;
			goto bad;
		default:
			errno = EINVAL;
			goto bad;
		}

		/*
		 * Either notification is not present, or if
		 * notification is already present, but the process
		 * which registered notification does not exist then
		 * kernel can register notification for current process.
		 */

		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_des = (uintptr_t)mqdp;
	}

	(void) sem_post(&mqhp->mq_exclusive);
	return (0);

bad:
	(void) sem_post(&mqhp->mq_exclusive);
	return (-1);
}

/*
 * Set the per-open O_NONBLOCK flag (the only settable attribute),
 * optionally returning the previous attributes in *omqstat.
 *
 * NOTE(review): mqstat is dereferenced without a NULL check -- POSIX
 * does not require one, but callers must pass a valid pointer.
 */
int
_mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	uint_t flag = 0;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	/* store current attributes */
	if (omqstat != NULL) {
		int count;

		mqhp = mqdp->mqd_mq;
		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
		/* current depth == value of the mq_notempty semaphore */
		(void) sem_getvalue(&mqhp->mq_notempty, &count);
		omqstat->mq_curmsgs = count;
	}

	/* set description attributes */
	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
		flag = FNONBLOCK;
	mqdp->mqd_mqdn->mqdn_flags = flag;

	return (0);
}

/*
 * Return the queue's attributes: per-open flags, creation-time limits,
 * and the current message count (read from the mq_notempty semaphore).
 */
int
_mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int count;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
	(void) sem_getvalue(&mqhp->mq_notempty, &count);
	mqstat->mq_curmsgs = count;
	return (0);
}