1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved. 23 */ 24 25 /* 26 * Copyright (c) 2006 Oracle. All rights reserved. 27 * 28 * This software is available to you under a choice of one of two 29 * licenses. You may choose to be licensed under the terms of the GNU 30 * General Public License (GPL) Version 2, available from the file 31 * COPYING in the main directory of this source tree, or the 32 * OpenIB.org BSD license below: 33 * 34 * Redistribution and use in source and binary forms, with or 35 * without modification, are permitted provided that the following 36 * conditions are met: 37 * 38 * - Redistributions of source code must retain the above 39 * copyright notice, this list of conditions and the following 40 * disclaimer. 41 * 42 * - Redistributions in binary form must reproduce the above 43 * copyright notice, this list of conditions and the following 44 * disclaimer in the documentation and/or other materials 45 * provided with the distribution. 
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * Initialize a freshly allocated incoming message: one reference (owned by
 * the caller), not linked on any receive queue, associated with 'conn',
 * source address 'saddr', and no RDMA destination cookie yet.
 */
void
rdsv3_inc_init(struct rdsv3_incoming *inc, struct rdsv3_connection *conn,
    uint32_be_t saddr)
{
	RDSV3_DPRINTF5("rdsv3_inc_init", "Enter(inc: %p, conn: %p)", inc, conn);
	inc->i_refcount = 1;
	list_link_init(&inc->i_item);
	inc->i_conn = conn;
	inc->i_saddr = saddr;
	inc->i_rdma_cookie = 0;
}

/*
 * Take an additional reference on 'inc'.
 */
void
rdsv3_inc_addref(struct rdsv3_incoming *inc)
{
	RDSV3_DPRINTF4("rdsv3_inc_addref",
	    "addref inc %p ref %d", inc, atomic_get(&inc->i_refcount));
	atomic_add_32(&inc->i_refcount, 1);
}

/*
 * Drop a reference on 'inc'.  On the final put the message must no longer
 * be linked on any receive queue and is handed back to the transport's
 * inc_free() routine for disposal.
 */
void
rdsv3_inc_put(struct rdsv3_incoming *inc)
{
	RDSV3_DPRINTF4("rdsv3_inc_put", "put inc %p ref %d",
	    inc, atomic_get(&inc->i_refcount));
	if (atomic_dec_and_test(&inc->i_refcount)) {
		ASSERT(!list_link_active(&inc->i_item));

		inc->i_conn->c_trans->inc_free(inc);
	}
}

/*
 * Account 'delta' bytes of receive-buffer usage against 'rs' and update the
 * congestion map 'map' for the socket's port when the congestion state
 * changes: congested once usage exceeds the socket rcvbuf, uncongested only
 * after usage falls below half the rcvbuf (hysteresis, see below).
 * 'sk' is currently unused (hence ARGSUSED).
 */
/*ARGSUSED*/
static void
rdsv3_recv_rcvbuf_delta(struct rdsv3_sock *rs, struct rsock *sk,
    struct rdsv3_cong_map *map,
    int delta, uint16_be_t port)
{
	int now_congested;

	RDSV3_DPRINTF4("rdsv3_recv_rcvbuf_delta",
	    "Enter(rs: %p, map: %p, delta: %d, port: %d)",
	    rs, map, delta, port);

	if (delta == 0)
		return;

	rs->rs_rcv_bytes += delta;
	now_congested = rs->rs_rcv_bytes > rdsv3_sk_rcvbuf(rs);

	RDSV3_DPRINTF5("rdsv3_recv_rcvbuf_delta",
	    "rs %p (%u.%u.%u.%u:%u) recv bytes %d buf %d "
	    "now_cong %d delta %d",
	    rs, NIPQUAD(rs->rs_bound_addr),
	    (int)ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
	    rdsv3_sk_rcvbuf(rs), now_congested, delta);

	/* wasn't -> am congested */
	if (!rs->rs_congested && now_congested) {
		rs->rs_congested = 1;
		rdsv3_cong_set_bit(map, port);
		rdsv3_cong_queue_updates(map);
	}
	/* was -> aren't congested */
	/*
	 * Require more free space before reporting uncongested to prevent
	 * bouncing cong/uncong state too often
	 */
	else if (rs->rs_congested &&
	    (rs->rs_rcv_bytes < (rdsv3_sk_rcvbuf(rs)/2))) {
		rs->rs_congested = 0;
		rdsv3_cong_clear_bit(map, port);
		rdsv3_cong_queue_updates(map);
	}

	/* do nothing if no change in cong state */

	RDSV3_DPRINTF4("rdsv3_recv_rcvbuf_delta", "Return(rs: %p)", rs);
}

/*
 * Process all extension headers that come with this message.
 */
static void
rdsv3_recv_incoming_exthdrs(struct rdsv3_incoming *inc, struct rdsv3_sock *rs)
{
	struct rdsv3_header *hdr = &inc->i_hdr;
	unsigned int pos = 0, type, len;
	union {
		struct rdsv3_ext_header_version version;
		struct rdsv3_ext_header_rdma rdma;
		struct rdsv3_ext_header_rdma_dest rdma_dest;
	} buffer;

	RDSV3_DPRINTF4("rdsv3_recv_incoming_exthdrs", "Enter");
	while (1) {
		/* len is in/out: capacity going in, extension size coming out */
		len = sizeof (buffer);
		type = rdsv3_message_next_extension(hdr, &pos, &buffer, &len);
		if (type == RDSV3_EXTHDR_NONE)
			break;
		RDSV3_DPRINTF4("recv_incoming_exthdrs", "type %d", type);
		/* Process extension header here */
		switch (type) {
		case RDSV3_EXTHDR_RDMA:
			/* peer finished with our MR; release the rkey */
			rdsv3_rdma_unuse(rs, ntohl(buffer.rdma.h_rdma_rkey),
			    0);
			break;

		case RDSV3_EXTHDR_RDMA_DEST:
			/*
			 * We ignore the size for now. We could stash it
			 * somewhere and use it for error checking.
			 */
			inc->i_rdma_cookie = rdsv3_rdma_make_cookie(
			    ntohl(buffer.rdma_dest.h_rdma_rkey),
			    ntohl(buffer.rdma_dest.h_rdma_offset));

			break;
		}
	}
	RDSV3_DPRINTF4("rdsv3_recv_incoming_exthdrs", "Return");
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time. This lets us reassemble in the conn
 * instead of per-flow which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival. It does mean
 * that small messages will wait behind large ones. Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn. This lets loopback, who only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
/* ARGSUSED */
void
rdsv3_recv_incoming(struct rdsv3_connection *conn, uint32_be_t saddr,
    uint32_be_t daddr, struct rdsv3_incoming *inc, int gfp)
{
	struct rdsv3_sock *rs = NULL;
	struct rsock *sk;

	inc->i_conn = conn;
	inc->i_rx_jiffies = jiffies;

	RDSV3_DPRINTF5("rdsv3_recv_incoming",
	    "conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
	    "flags 0x%x rx_jiffies %lu", conn,
	    (unsigned long long)conn->c_next_rx_seq,
	    inc,
	    (unsigned long long)ntohll(inc->i_hdr.h_sequence),
	    ntohl(inc->i_hdr.h_len),
	    ntohs(inc->i_hdr.h_sport),
	    ntohs(inc->i_hdr.h_dport),
	    inc->i_hdr.h_flags,
	    inc->i_rx_jiffies);

	/*
	 * Sequence numbers should only increase. Messages get their
	 * sequence number as they're queued in a sending conn. They
	 * can be dropped, though, if the sending socket is closed before
	 * they hit the wire. So sequence numbers can skip forward
	 * under normal operation. They can also drop back in the conn
	 * failover case as previously sent messages are resent down the
	 * new instance of a conn. We drop those, otherwise we have
	 * to assume that the next valid seq does not come after a
	 * hole in the fragment stream.
	 *
	 * The headers don't give us a way to realize if fragments of
	 * a message have been dropped. We assume that frags that arrive
	 * to a flow are part of the current message on the flow that is
	 * being reassembled. This means that senders can't drop messages
	 * from the sending conn until all their frags are sent.
	 *
	 * XXX we could spend more on the wire to get more robust failure
	 * detection, arguably worth it to avoid data corruption.
	 */
	if (ntohll(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
	    (inc->i_hdr.h_flags & RDSV3_FLAG_RETRANSMITTED)) {
		rdsv3_stats_inc(s_recv_drop_old_seq);
		goto out;
	}
	conn->c_next_rx_seq = ntohll(inc->i_hdr.h_sequence) + 1;

	/* dport 0 is a ping; answer it here, never queue it to a socket */
	if (rdsv3_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
		rdsv3_stats_inc(s_recv_ping);
		(void) rdsv3_send_pong(conn, inc->i_hdr.h_sport);
		goto out;
	}

	/* on success rdsv3_find_bound returns a held socket reference */
	rs = rdsv3_find_bound(daddr, inc->i_hdr.h_dport);
	if (!rs) {
		rdsv3_stats_inc(s_recv_drop_no_sock);
		goto out;
	}

	/* Process extension headers */
	rdsv3_recv_incoming_exthdrs(inc, rs);

	/* We can be racing with rdsv3_release() which marks the socket dead. */
	sk = rdsv3_rs_to_sk(rs);

	/* serialize with rdsv3_release -> sock_orphan */
	rw_enter(&rs->rs_recv_lock, RW_WRITER);
	if (!rdsv3_sk_sock_flag(sk, SOCK_DEAD)) {
		int error, bytes;
		RDSV3_DPRINTF5("rdsv3_recv_incoming",
		    "adding inc %p to rs %p's recv queue", inc, rs);
		rdsv3_stats_inc(s_recv_queued);
		rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
		    ntohl(inc->i_hdr.h_len),
		    inc->i_hdr.h_dport);
		/* the queue now owns a reference to inc */
		rdsv3_inc_addref(inc);
		list_insert_tail(&rs->rs_recv_queue, inc);
		/* snapshot byte count while still under the lock */
		bytes = rs->rs_rcv_bytes;
		rw_exit(&rs->rs_recv_lock);

		__rdsv3_wake_sk_sleep(sk);

		/* wake up anyone waiting in poll */
		sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
		    bytes, 0, &error, NULL);
		if (error != 0) {
			RDSV3_DPRINTF2("rdsv3_recv_incoming",
			    "su_recv returned: %d", error);
		}
	} else {
		rdsv3_stats_inc(s_recv_drop_dead_sock);
		rw_exit(&rs->rs_recv_lock);
	}

out:
	if (rs)
		rdsv3_sock_put(rs);
}

/*
 * be very careful here. This is being called as the condition in
 * wait_event_*() needs to cope with being called many times.
307 */ 308 static int 309 rdsv3_next_incoming(struct rdsv3_sock *rs, struct rdsv3_incoming **inc) 310 { 311 if (!*inc) { 312 rw_enter(&rs->rs_recv_lock, RW_READER); 313 if (!list_is_empty(&rs->rs_recv_queue)) { 314 *inc = list_head(&rs->rs_recv_queue); 315 rdsv3_inc_addref(*inc); 316 } 317 rw_exit(&rs->rs_recv_lock); 318 } 319 320 return (*inc != NULL); 321 } 322 323 static int 324 rdsv3_still_queued(struct rdsv3_sock *rs, struct rdsv3_incoming *inc, 325 int drop) 326 { 327 struct rsock *sk = rdsv3_rs_to_sk(rs); 328 int ret = 0; 329 330 RDSV3_DPRINTF4("rdsv3_still_queued", "Enter rs: %p inc: %p drop: %d", 331 rs, inc, drop); 332 333 rw_enter(&rs->rs_recv_lock, RW_WRITER); 334 if (list_link_active(&inc->i_item)) { 335 ret = 1; 336 if (drop) { 337 /* XXX make sure this i_conn is reliable */ 338 rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong, 339 -ntohl(inc->i_hdr.h_len), 340 inc->i_hdr.h_dport); 341 list_remove_node(&inc->i_item); 342 rdsv3_inc_put(inc); 343 } 344 } 345 rw_exit(&rs->rs_recv_lock); 346 347 RDSV3_DPRINTF5("rdsv3_still_queued", 348 "inc %p rs %p still %d dropped %d", inc, rs, ret, drop); 349 return (ret); 350 } 351 352 /* 353 * Pull errors off the error queue. 354 * If msghdr is NULL, we will just purge the error queue. 355 */ 356 int 357 rdsv3_notify_queue_get(struct rdsv3_sock *rs, struct msghdr *msghdr) 358 { 359 struct rdsv3_notifier *notifier; 360 struct rdsv3_rdma_notify cmsg; 361 unsigned int count = 0, max_messages = ~0U; 362 list_t copy; 363 int err = 0; 364 365 RDSV3_DPRINTF4("rdsv3_notify_queue_get", "Enter(rs: %p)", rs); 366 367 list_create(©, sizeof (struct rdsv3_notifier), 368 offsetof(struct rdsv3_notifier, n_list)); 369 370 371 /* 372 * put_cmsg copies to user space and thus may sleep. We can't do this 373 * with rs_lock held, so first grab as many notifications as we can 374 * stuff 375 * in the user provided cmsg buffer. 
We don't try to copy more, to avoid 376 * losing notifications - except when the buffer is so small that 377 * it wouldn't 378 * even hold a single notification. Then we give him as much of this 379 * single 380 * msg as we can squeeze in, and set MSG_CTRUNC. 381 */ 382 if (msghdr) { 383 max_messages = 384 msghdr->msg_controllen / CMSG_SPACE(sizeof (cmsg)); 385 if (!max_messages) 386 max_messages = 1; 387 } 388 389 mutex_enter(&rs->rs_lock); 390 while (!list_is_empty(&rs->rs_notify_queue) && count < max_messages) { 391 notifier = list_remove_head(&rs->rs_notify_queue); 392 list_insert_tail(©, notifier); 393 count++; 394 } 395 mutex_exit(&rs->rs_lock); 396 397 if (!count) 398 return (0); 399 400 while (!list_is_empty(©)) { 401 notifier = list_remove_head(©); 402 403 if (msghdr) { 404 cmsg.user_token = notifier->n_user_token; 405 cmsg.status = notifier->n_status; 406 407 err = rdsv3_put_cmsg(msghdr, SOL_RDS, 408 RDSV3_CMSG_RDMA_STATUS, sizeof (cmsg), &cmsg); 409 if (err) 410 break; 411 } 412 413 kmem_free(notifier, sizeof (struct rdsv3_notifier)); 414 } 415 416 /* 417 * If we bailed out because of an error in put_cmsg, 418 * we may be left with one or more notifications that we 419 * didn't process. Return them to the head of the list. 
420 */ 421 if (!list_is_empty(©)) { 422 mutex_enter(&rs->rs_lock); 423 list_splice(©, &rs->rs_notify_queue); 424 mutex_exit(&rs->rs_lock); 425 } 426 427 RDSV3_DPRINTF4("rdsv3_notify_queue_get", "Return(rs: %p)", rs); 428 429 return (err); 430 } 431 432 /* 433 * Queue a congestion notification 434 */ 435 static int 436 rdsv3_notify_cong(struct rdsv3_sock *rs, struct msghdr *msghdr) 437 { 438 uint64_t notify = rs->rs_cong_notify; 439 int err; 440 441 err = rdsv3_put_cmsg(msghdr, SOL_RDS, RDSV3_CMSG_CONG_UPDATE, 442 sizeof (notify), ¬ify); 443 if (err) 444 return (err); 445 446 mutex_enter(&rs->rs_lock); 447 rs->rs_cong_notify &= ~notify; 448 mutex_exit(&rs->rs_lock); 449 450 return (0); 451 } 452 453 /* 454 * Receive any control messages. 455 */ 456 static int 457 rdsv3_cmsg_recv(struct rdsv3_incoming *inc, struct msghdr *msg) 458 { 459 return (rdsv3_put_cmsg(msg, SOL_RDS, RDSV3_CMSG_RDMA_DEST, 460 sizeof (inc->i_rdma_cookie), &inc->i_rdma_cookie)); 461 } 462 463 int 464 rdsv3_recvmsg(struct rdsv3_sock *rs, uio_t *uio, 465 struct nmsghdr *msg, size_t size, int msg_flags) 466 { 467 struct rsock *sk = rdsv3_rs_to_sk(rs); 468 long timeo; 469 int ret = 0; 470 struct sockaddr_in *sin = NULL; 471 struct rdsv3_incoming *inc = NULL; 472 boolean_t nonblock = B_FALSE; 473 474 RDSV3_DPRINTF4("rdsv3_recvmsg", 475 "Enter(rs: %p size: %d msg_flags: 0x%x)", rs, size, msg_flags); 476 477 if ((uio->uio_fmode & (FNDELAY | FNONBLOCK)) || 478 (msg_flags & MSG_DONTWAIT)) 479 nonblock = B_TRUE; 480 481 /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. 
*/ 482 timeo = rdsv3_rcvtimeo(sk, nonblock); 483 484 if (msg_flags & MSG_OOB) 485 goto out; 486 487 /* mark the first cmsg position */ 488 if (msg) { 489 msg->msg_control = NULL; 490 } 491 492 while (1) { 493 /* 494 * If there are pending notifications, do those - 495 * and nothing else 496 */ 497 if (!list_is_empty(&rs->rs_notify_queue)) { 498 ret = rdsv3_notify_queue_get(rs, msg); 499 500 if (msg && msg->msg_namelen) { 501 sin = kmem_zalloc(sizeof (struct sockaddr_in), 502 KM_SLEEP); 503 sin->sin_family = AF_INET_OFFLOAD; 504 if (inc) { 505 sin->sin_port = inc->i_hdr.h_sport; 506 sin->sin_addr.s_addr = inc->i_saddr; 507 } 508 msg->msg_namelen = sizeof (struct sockaddr_in); 509 msg->msg_name = sin; 510 } 511 break; 512 } 513 514 if (rs->rs_cong_notify) { 515 ret = rdsv3_notify_cong(rs, msg); 516 goto out; 517 } 518 519 if (!rdsv3_next_incoming(rs, &inc)) { 520 if (nonblock) { 521 ret = -EAGAIN; 522 break; 523 } 524 525 RDSV3_DPRINTF3("rdsv3_recvmsg", 526 "Before wait (rs: %p)", rs); 527 528 #if 0 529 ret = rdsv3_wait_sig(sk->sk_sleep, 530 !(list_is_empty(&rs->rs_notify_queue) && 531 !rs->rs_cong_notify && 532 !rdsv3_next_incoming(rs, &inc))); 533 if (ret == 0) { 534 /* signal/timeout pending */ 535 RDSV3_DPRINTF2("rdsv3_recvmsg", 536 "woke due to signal"); 537 ret = -ERESTART; 538 } 539 #else 540 mutex_enter(&sk->sk_sleep->waitq_mutex); 541 sk->sk_sleep->waitq_waiters++; 542 while ((list_is_empty(&rs->rs_notify_queue) && 543 !rs->rs_cong_notify && 544 !rdsv3_next_incoming(rs, &inc))) { 545 ret = cv_wait_sig(&sk->sk_sleep->waitq_cv, 546 &sk->sk_sleep->waitq_mutex); 547 if (ret == 0) { 548 /* signal/timeout pending */ 549 RDSV3_DPRINTF2("rdsv3_recvmsg", 550 "woke due to signal"); 551 ret = -ERESTART; 552 break; 553 } 554 } 555 sk->sk_sleep->waitq_waiters--; 556 mutex_exit(&sk->sk_sleep->waitq_mutex); 557 #endif 558 559 RDSV3_DPRINTF5("rdsv3_recvmsg", 560 "recvmsg woke rs: %p inc %p ret %d", 561 rs, inc, -ret); 562 563 if (ret < 0) 564 break; 565 566 /* 567 * if the 
wakeup was due to rs_notify_queue or 568 * rs_cong_notify then we need to handle those first. 569 */ 570 continue; 571 } 572 573 RDSV3_DPRINTF5("rdsv3_recvmsg", 574 "copying inc %p from %u.%u.%u.%u:%u to user", inc, 575 NIPQUAD(inc->i_conn->c_faddr), 576 ntohs(inc->i_hdr.h_sport)); 577 578 ret = inc->i_conn->c_trans->inc_copy_to_user(inc, uio, size); 579 if (ret < 0) 580 break; 581 582 /* 583 * if the message we just copied isn't at the head of the 584 * recv queue then someone else raced us to return it, try 585 * to get the next message. 586 */ 587 if (!rdsv3_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) { 588 rdsv3_inc_put(inc); 589 inc = NULL; 590 rdsv3_stats_inc(s_recv_deliver_raced); 591 continue; 592 } 593 594 if (ret < ntohl(inc->i_hdr.h_len)) { 595 if (msg_flags & MSG_TRUNC) 596 ret = ntohl(inc->i_hdr.h_len); 597 msg->msg_flags |= MSG_TRUNC; 598 } 599 600 if (rdsv3_cmsg_recv(inc, msg)) { 601 ret = -EFAULT; 602 goto out; 603 } 604 605 rdsv3_stats_inc(s_recv_delivered); 606 607 if (msg->msg_namelen) { 608 sin = kmem_alloc(sizeof (struct sockaddr_in), KM_SLEEP); 609 sin->sin_family = AF_INET_OFFLOAD; 610 sin->sin_port = inc->i_hdr.h_sport; 611 sin->sin_addr.s_addr = inc->i_saddr; 612 (void) memset(sin->sin_zero, 0, 613 sizeof (sin->sin_zero)); 614 msg->msg_namelen = sizeof (struct sockaddr_in); 615 msg->msg_name = sin; 616 } 617 break; 618 } 619 620 if (inc) 621 rdsv3_inc_put(inc); 622 623 out: 624 RDSV3_DPRINTF4("rdsv3_recvmsg", "Return(rs: %p, ret: %d)", rs, ret); 625 626 return (ret); 627 } 628 629 /* 630 * The socket is being shut down and we're asked to drop messages that were 631 * queued for recvmsg. The caller has unbound the socket so the receive path 632 * won't queue any more incoming fragments or messages on the socket. 
633 */ 634 void 635 rdsv3_clear_recv_queue(struct rdsv3_sock *rs) 636 { 637 struct rsock *sk = rdsv3_rs_to_sk(rs); 638 struct rdsv3_incoming *inc, *tmp; 639 640 RDSV3_DPRINTF4("rdsv3_clear_recv_queue", "Enter(rs: %p)", rs); 641 642 rw_enter(&rs->rs_recv_lock, RW_WRITER); 643 RDSV3_FOR_EACH_LIST_NODE_SAFE(inc, tmp, &rs->rs_recv_queue, i_item) { 644 rdsv3_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong, 645 -ntohl(inc->i_hdr.h_len), 646 inc->i_hdr.h_dport); 647 list_remove_node(&inc->i_item); 648 rdsv3_inc_put(inc); 649 } 650 rw_exit(&rs->rs_recv_lock); 651 652 RDSV3_DPRINTF4("rdsv3_clear_recv_queue", "Return(rs: %p)", rs); 653 } 654 655 /* 656 * inc->i_saddr isn't used here because it is only set in the receive 657 * path. 658 */ 659 void 660 rdsv3_inc_info_copy(struct rdsv3_incoming *inc, 661 struct rdsv3_info_iterator *iter, 662 uint32_be_t saddr, uint32_be_t daddr, int flip) 663 { 664 struct rdsv3_info_message minfo; 665 666 minfo.seq = ntohll(inc->i_hdr.h_sequence); 667 minfo.len = ntohl(inc->i_hdr.h_len); 668 669 if (flip) { 670 minfo.laddr = daddr; 671 minfo.faddr = saddr; 672 minfo.lport = inc->i_hdr.h_dport; 673 minfo.fport = inc->i_hdr.h_sport; 674 } else { 675 minfo.laddr = saddr; 676 minfo.faddr = daddr; 677 minfo.lport = inc->i_hdr.h_sport; 678 minfo.fport = inc->i_hdr.h_dport; 679 } 680 681 rdsv3_info_copy(iter, &minfo, sizeof (minfo)); 682 } 683