1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved. 23 */ 24 25 /* 26 * Copyright (c) 2006 Oracle. All rights reserved. 27 * 28 * This software is available to you under a choice of one of two 29 * licenses. You may choose to be licensed under the terms of the GNU 30 * General Public License (GPL) Version 2, available from the file 31 * COPYING in the main directory of this source tree, or the 32 * OpenIB.org BSD license below: 33 * 34 * Redistribution and use in source and binary forms, with or 35 * without modification, are permitted provided that the following 36 * conditions are met: 37 * 38 * - Redistributions of source code must retain the above 39 * copyright notice, this list of conditions and the following 40 * disclaimer. 41 * 42 * - Redistributions in binary form must reproduce the above 43 * copyright notice, this list of conditions and the following 44 * disclaimer in the documentation and/or other materials 45 * provided with the distribution. 
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/ib.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

/*
 * Translate an IBT work-completion status into an RDS RDMA notification
 * code and deliver it to whoever queued the RDMA on this message.
 *
 * A flushed WR (IBT_WC_WR_FLUSHED_ERR) is deliberately not reported:
 * flushes happen when the QP is torn down, and the connection-level
 * teardown path is responsible for notification in that case.
 */
static void
rdsv3_ib_send_rdma_complete(struct rdsv3_message *rm,
    int wc_status)
{
	int notify_status;

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);

	switch (wc_status) {
	case IBT_WC_WR_FLUSHED_ERR:
		return;

	case IBT_WC_SUCCESS:
		notify_status = RDSV3_RDMA_SUCCESS;
		break;

	case IBT_WC_REMOTE_ACCESS_ERR:
		notify_status = RDSV3_RDMA_REMOTE_ERROR;
		break;

	default:
		/* any other completion error collapses to "other" */
		notify_status = RDSV3_RDMA_OTHER_ERROR;
		break;
	}
	rdsv3_rdma_send_complete(rm, notify_status);

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);
}

static void rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev,
    uint_t num, struct rdsv3_rdma_sg scat[]);

/*
 * Undo the DMA mapping of an RDMA op's scatter list, if it is mapped.
 *
 * The cm_id may already be gone during connection teardown; in that case
 * we pass a NULL device and rdsv3_ib_dma_unmap_sg_rdma() falls back to
 * the HCA handle cached in the scatter array itself.
 */
void
rdsv3_ib_send_unmap_rdma(struct rdsv3_ib_connection *ic,
    struct rdsv3_rdma_op *op)
{
	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rdma", "ic: %p, op: %p", ic, op);
	if (op->r_mapped) {
		op->r_mapped = 0;
		if (ic->i_cm_id) {
			rdsv3_ib_dma_unmap_sg_rdma(ic->i_cm_id->device,
			    op->r_nents, op->r_rdma_sg);
		} else {
			rdsv3_ib_dma_unmap_sg_rdma((struct ib_device *)NULL,
			    op->r_nents, op->r_rdma_sg);
		}
	}
}

/*
 * Release a completed (or flushed) SEND's resources: unmap the message's
 * scatterlist, unmap and notify any attached RDMA op, wake waiters, and
 * drop the ring's reference on the message.  Called from the send CQ
 * handler and from ring teardown.
 */
static void
rdsv3_ib_send_unmap_rm(struct rdsv3_ib_connection *ic,
    struct rdsv3_ib_send_work *send,
    int wc_status)
{
	struct rdsv3_message *rm = send->s_rm;

	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rm", "ic %p send %p rm %p\n",
	    ic, send, rm);

	rdsv3_ib_dma_unmap_sg(ic->i_cm_id->device,
	    rm->m_sg, rm->m_nents);

	if (rm->m_rdma_op != NULL) {
		rdsv3_ib_send_unmap_rdma(ic, rm->m_rdma_op);

		/*
		 * If the user asked for a completion notification on this
		 * message, we can implement three different semantics:
		 * 1. Notify when we received the ACK on the RDS message
		 *    that was queued with the RDMA. This provides reliable
		 *    notification of RDMA status at the expense of a one-way
		 *    packet delay.
		 * 2. Notify when the IB stack gives us the completion
		 *    event for the RDMA operation.
		 * 3. Notify when the IB stack gives us the completion
		 *    event for the accompanying RDS messages.
		 * Here, we implement approach #3. To implement approach #2,
		 * call rdsv3_rdma_send_complete from the cq_handler.
		 * To implement #1, don't call rdsv3_rdma_send_complete at
		 * all, and fall back to the notify handling in the ACK
		 * processing code.
		 *
		 * Note: There's no need to explicitly sync any RDMA buffers
		 * using ib_dma_sync_sg_for_cpu - the completion for the RDMA
		 * operation itself unmapped the RDMA buffers, which takes
		 * care of synching.
		 */
		rdsv3_ib_send_rdma_complete(rm, wc_status);

		if (rm->m_rdma_op->r_write)
			rdsv3_stats_add(s_send_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
		else
			rdsv3_stats_add(s_recv_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
	}

	/*
	 * If anyone waited for this message to get flushed out, wake
	 * them up now
	 */
	rdsv3_message_unmapped(rm);

	rdsv3_message_put(rm);
	send->s_rm = NULL;
}

/*
 * Reset every send-work entry in the connection's send ring to an
 * unused state.  Called when the ring is (re)initialized.
 */
void
rdsv3_ib_send_init_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_init_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		send->s_rm = NULL;
		send->s_op = NULL;
	}
}

/*
 * Tear down every in-flight entry in the send ring, treating each as if
 * its WR had been flushed.  Entries whose s_opcode is the 0xdd sentinel
 * were already retired by the CQ handler and are skipped.
 */
void
rdsv3_ib_send_clear_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		if (send->s_opcode == 0xdd)
			continue;
		if (send->s_rm)
			rdsv3_ib_send_unmap_rm(ic, send, IBT_WC_WR_FLUSHED_ERR);
		if (send->s_op)
			rdsv3_ib_send_unmap_rdma(ic, send->s_op);
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "Return: ic: %p", ic);
}

/*
 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 * operations performed in the send path. As the sender allocs and potentially
 * unallocs the next free entry in the ring it doesn't alter which is
 * the next to be freed, which is what this is concerned with.
 */
void
rdsv3_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
{
	struct rdsv3_connection *conn = context;
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	ibt_wc_t wc;
	struct rdsv3_ib_send_work *send;
	uint32_t completed, polled;
	uint32_t oldest;
	uint32_t i = 0;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_send_cq_comp_handler", "conn: %p cq: %p",
	    conn, cq);

	rdsv3_ib_stats_inc(s_ib_tx_cq_call);
	/*
	 * Re-arm the CQ before draining it so a completion arriving after
	 * the final poll still generates an interrupt.
	 */
	ret = ibt_enable_cq_notify(RDSV3_CQ2CQHDL(cq), IBT_NEXT_COMPLETION);
	if (ret)
		RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
		    "ib_req_notify_cq send failed: %d", ret);

	while (ibt_poll_cq(RDSV3_CQ2CQHDL(cq), &wc, 1, &polled) ==
	    IBT_SUCCESS) {
		RDSV3_DPRINTF5("rdsv3_ib_send_cq_comp_handler",
		    "swc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
		    (unsigned long long)wc.wc_id, wc.wc_status,
		    wc.wc_bytes_xfer, ntohl(wc.wc_immed_data));
		rdsv3_ib_stats_inc(s_ib_tx_cq_event);

		/* ACK WRs live outside the send ring; handle separately */
		if (wc.wc_id == RDSV3_IB_ACK_WR_ID) {
			if (ic->i_ack_queued + HZ/2 < jiffies)
				rdsv3_ib_stats_inc(s_ib_tx_stalled);
			rdsv3_ib_ack_send_complete(ic);
			continue;
		}

		oldest = rdsv3_ib_ring_oldest(&ic->i_send_ring);

		/* wc_id is the ring index of the completed WR */
		completed = rdsv3_ib_ring_completed(&ic->i_send_ring,
		    wc.wc_id, oldest);

		/* retire every ring entry up to and including wc_id */
		for (i = 0; i < completed; i++) {
			send = &ic->i_sends[oldest];

			/*
			 * In the error case, wc.opcode sometimes contains
			 * garbage
			 */
			switch (send->s_opcode) {
			case IBT_WRC_SEND:
				if (send->s_rm)
					rdsv3_ib_send_unmap_rm(ic, send,
					    wc.wc_status);
				break;
			case IBT_WRC_RDMAW:
			case IBT_WRC_RDMAR:
				/*
				 * Nothing to be done - the SG list will
				 * be unmapped
				 * when the SEND completes.
				 */
				break;
			default:
#ifndef __lock_lint
				RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
				    "RDS/IB: %s: unexpected opcode "
				    "0x%x in WR!",
				    __func__, send->s_opcode);
#endif
				break;
			}

			/* 0xdd marks the entry as retired (see clear_ring) */
			send->s_opcode = 0xdd;
			if (send->s_queued + HZ/2 < jiffies)
				rdsv3_ib_stats_inc(s_ib_tx_stalled);

			/*
			 * If a RDMA operation produced an error, signal
			 * this right away. If we don't, the subsequent SEND
			 * that goes with this RDMA will be canceled with
			 * ERR_WFLUSH, and the application never learns that
			 * the RDMA failed.
			 */
			if (wc.wc_status ==
			    IBT_WC_REMOTE_ACCESS_ERR && send->s_op) {
				struct rdsv3_message *rm;

				rm = rdsv3_send_get_message(conn, send->s_op);
				if (rm) {
					if (rm->m_rdma_op != NULL)
						rdsv3_ib_send_unmap_rdma(ic,
						    rm->m_rdma_op);
					rdsv3_ib_send_rdma_complete(rm,
					    wc.wc_status);
					rdsv3_message_put(rm);
				}
			}

			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
		}

		RDSV3_DPRINTF4("rdsv3_ib_send_cq_comp_handler", "compl: %d",
		    completed);
		rdsv3_ib_ring_free(&ic->i_send_ring, completed);

		/* ring space freed up - kick the send worker if it stalled */
		if (test_and_clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags) ||
		    test_bit(0, &conn->c_map_queued))
			rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);

		/* We expect errors as the qp is drained during shutdown */
		if (wc.wc_status != IBT_WC_SUCCESS && rdsv3_conn_up(conn)) {
			RDSV3_DPRINTF2("rdsv3_ib_send_cq_comp_handler",
			    "send completion on %u.%u.%u.%u "
			    "had status %u, disconnecting and reconnecting\n",
			    NIPQUAD(conn->c_faddr), wc.wc_status);
			rdsv3_conn_drop(conn);
		}
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_cq_comp_handler",
	    "Return: conn: %p, cq: %p", conn, cq);
}

/*
 * This is the main function for allocating credits when sending
 * messages.
 *
 * Conceptually, we have two counters:
 *  - send credits: this tells us how many WRs we're allowed
 *    to submit without overrunning the receiver's queue. For
 *    each SEND WR we post, we decrement this by one.
 *
 *  - posted credits: this tells us how many WRs we recently
 *    posted to the receive queue. This value is transferred
 *    to the peer as a "credit update" in a RDS header field.
 *    Every time we transmit credits to the peer, we subtract
 *    the amount of transferred credits from this counter.
 *
 * It is essential that we avoid situations where both sides have
 * exhausted their send credits, and are unable to send new credits
 * to the peer. We achieve this by requiring that we send at least
 * one credit update to the peer before exhausting our credits.
 * When new credits arrive, we subtract one credit that is withheld
 * until we've posted new buffers and are ready to transmit these
 * credits (see rdsv3_ib_send_add_credits below).
 *
 * The RDS send code is essentially single-threaded; rdsv3_send_xmit
 * grabs c_send_lock to ensure exclusive access to the send ring.
 * However, the ACK sending code is independent and can race with
 * message SENDs.
 *
 * In the send path, we need to update the counters for send credits
 * and the counter of posted buffers atomically - when we use the
 * last available credit, we cannot allow another thread to race us
 * and grab the posted credits counter. Hence, we have to use a
 * spinlock to protect the credit counter, or use atomics.
 *
 * Spinlocks shared between the send and the receive path are bad,
 * because they create unnecessary delays. An early implementation
 * using a spinlock showed a 5% degradation in throughput at some
 * loads.
 *
 * This implementation avoids spinlocks completely, putting both
 * counters into a single atomic, and updating that atomic using
 * atomic_add (in the receive path, when receiving fresh credits),
 * and using atomic_cmpxchg when updating the two counters.
 */
int
rdsv3_ib_send_grab_credits(struct rdsv3_ib_connection *ic,
    uint32_t wanted, uint32_t *adv_credits, int need_posted, int max_posted)
{
	unsigned int avail, posted, got = 0, advertise;
	long oldval, newval;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d %d",
	    ic, wanted, *adv_credits, need_posted, max_posted);

	*adv_credits = 0;
	/* no flow control: every caller gets what it asked for */
	if (!ic->i_flowctl)
		return (wanted);

try_again:
	advertise = 0;
	/* snapshot both counters from the single packed atomic */
	oldval = newval = atomic_get(&ic->i_credits);
	posted = IB_GET_POST_CREDITS(oldval);
	avail = IB_GET_SEND_CREDITS(oldval);

	RDSV3_DPRINTF5("rdsv3_ib_send_grab_credits",
	    "wanted (%u): credits=%u posted=%u\n", wanted, avail, posted);

	/* The last credit must be used to send a credit update. */
	if (avail && !posted)
		avail--;

	if (avail < wanted) {
		struct rdsv3_connection *conn = ic->i_cm_id->context;

		/* Oops, there aren't that many credits left! */
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		got = avail;
	} else {
		/* Sometimes you get what you want, lalala. */
		got = wanted;
	}
	newval -= IB_SET_SEND_CREDITS(got);

	/*
	 * If need_posted is non-zero, then the caller wants
	 * the posted regardless of whether any send credits are
	 * available.
	 */
	if (posted && (got || need_posted)) {
		advertise = min(posted, max_posted);
		newval -= IB_SET_POST_CREDITS(advertise);
	}

	/* Finally bill everything; retry from scratch if we raced */
	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
		goto try_again;

	*adv_credits = advertise;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d %d",
	    ic, got, *adv_credits, need_posted, max_posted);
	return (got);
}

/*
 * Credit-update received from the peer: add to our send credits and
 * restart the send worker if it had stalled for lack of credits.
 */
void
rdsv3_ib_send_add_credits(struct rdsv3_connection *conn, unsigned int credits)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	if (credits == 0)
		return;

	RDSV3_DPRINTF5("rdsv3_ib_send_add_credits",
	    "credits (%u): current=%u%s\n",
	    credits,
	    IB_GET_SEND_CREDITS(atomic_get(&ic->i_credits)),
	    test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags) ?
	    ", ll_send_full" : "");

	atomic_add_32(&ic->i_credits, IB_SET_SEND_CREDITS(credits));
	if (test_and_clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);

	/* sanity: a single update should never carry 16384+ credits */
	ASSERT(!(IB_GET_SEND_CREDITS(credits) >= 16384));

	rdsv3_ib_stats_inc(s_ib_rx_credit_updates);

	RDSV3_DPRINTF4("rdsv3_ib_send_add_credits",
	    "Return: conn: %p, credits: %d",
	    conn, credits);
}

/*
 * Record newly posted receive buffers as advertisable credits, and
 * request an ACK once enough have accumulated to be worth telling the
 * peer about.
 */
void
rdsv3_ib_advertise_credits(struct rdsv3_connection *conn, unsigned int posted)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_advertise_credits", "conn: %p, posted: %d",
	    conn, posted);

	if (posted == 0)
		return;

	atomic_add_32(&ic->i_credits, IB_SET_POST_CREDITS(posted));

	/*
	 * Decide whether to send an update to the peer now.
	 * If we would send a credit update for every single buffer we
	 * post, we would end up with an ACK storm (ACK arrives,
	 * consumes buffer, we refill the ring, send ACK to remote
	 * advertising the newly posted buffer... ad inf)
	 *
	 * Performance pretty much depends on how often we send
	 * credit updates - too frequent updates mean lots of ACKs.
	 * Too infrequent updates, and the peer will run out of
	 * credits and has to throttle.
	 * For the time being, 16 seems to be a good compromise.
	 */
	if (IB_GET_POST_CREDITS(atomic_get(&ic->i_credits)) >= 16)
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}

/*
 * Fill in one SEND work request.  SGE 0 always carries the RDS header
 * (from the long-lived ring of pre-mapped headers at i_send_hdrs_dma);
 * SGEs 1..n carry up to 'length' bytes of payload starting 'off' bytes
 * into 'scat'.  For a zero-length message only the header SGE is used.
 */
static inline void
rdsv3_ib_xmit_populate_wr(struct rdsv3_ib_connection *ic,
    ibt_send_wr_t *wr, unsigned int pos,
    struct rdsv3_scatterlist *scat, unsigned int off, unsigned int length,
    int send_flags)
{
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "ic: %p, wr: %p scat: %p %d %d %d %d",
	    ic, wr, scat, pos, off, length, send_flags);

	/* wr_id is the ring slot; the CQ handler uses it to retire entries */
	wr->wr_id = pos;
	wr->wr_trans = IBT_RC_SRV;
	wr->wr_flags = send_flags;
	wr->wr_opcode = IBT_WRC_SEND;

	if (length != 0) {
		int ix, len, assigned;
		ibt_wr_ds_t *sgl;

		ASSERT(length <= scat->length - off);

		sgl = scat->sgl;
		if (off != 0) {
			/* find the right sgl to begin with */
			while (sgl->ds_len <= off) {
				off -= sgl->ds_len;
				sgl++;
			}
		}

		ix = 1;	/* first data sgl is at 1 */
		assigned = 0;
		len = length;
		do {
			sge = &wr->wr_sgl[ix++];
			sge->ds_va = sgl->ds_va + off;
			assigned = min(len, sgl->ds_len - off);
			sge->ds_len = assigned;
			sge->ds_key = sgl->ds_key;
			len -= assigned;
			if (len != 0) {
				sgl++;
				off = 0;
			}
		} while (len > 0);

		wr->wr_nds = ix;
	} else {
		/*
		 * We're sending a packet with no payload. There is only
		 * one SGE
		 */
		wr->wr_nds = 1;
	}

	/* header SGE is always slot 0, set up last */
	sge = &wr->wr_sgl[0];
	sge->ds_va = ic->i_send_hdrs_dma + (pos * sizeof (struct rdsv3_header));
	sge->ds_len = sizeof (struct rdsv3_header);
	sge->ds_key = ic->i_mr->lkey;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "Return: ic: %p, wr: %p scat: %p", ic, wr, scat);
}

/*
 * This can be called multiple times for a given message. The first time
 * we see a message we map its scatterlist into the IB device so that
 * we can provide that mapped address to the IB scatter gather entries
 * in the IB work requests. We translate the scatterlist into a series
 * of work requests that fragment the message. These work requests complete
 * in order so we pass ownership of the message to the completion handler
 * once we send the final fragment.
 *
 * The RDS core uses the c_send_lock to only enter this function once
 * per connection. This makes sure that the tx ring alloc/unalloc pairs
 * don't get out of sync and confuse the ring.
 */
int
rdsv3_ib_xmit(struct rdsv3_connection *conn, struct rdsv3_message *rm,
    unsigned int hdr_off, unsigned int sg, unsigned int off)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct ib_device *dev = ic->i_cm_id->device;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_ib_send_work *first;
	struct rdsv3_ib_send_work *prev;
	ibt_send_wr_t *wr;
	struct rdsv3_scatterlist *scat;
	uint32_t pos;
	uint32_t i;
	uint32_t work_alloc;
	uint32_t credit_alloc;
	uint32_t posted;
	uint32_t adv_credits = 0;
	int send_flags = 0;
	int sent;
	int ret;
	int flow_controlled = 0;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "conn: %p, rm: %p", conn, rm);

	ASSERT(!(off % RDSV3_FRAG_SIZE));
	ASSERT(!(hdr_off != 0 && hdr_off != sizeof (struct rdsv3_header)));

	/* Do not send cong updates to IB loopback */
	if (conn->c_loopback &&
	    rm->m_inc.i_hdr.h_flags & RDSV3_FLAG_CONG_BITMAP) {
		rdsv3_cong_map_updated(conn->c_fcong, ~(uint64_t)0);
		return (sizeof (struct rdsv3_header) + RDSV3_CONG_MAP_BYTES);
	}

#ifndef __lock_lint
	/* FIXME we may overallocate here */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0)
		i = 1;
	else
		i = ceil(ntohl(rm->m_inc.i_hdr.h_len), RDSV3_FRAG_SIZE);
#endif

	/* reserve one ring entry per fragment up front */
	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, i, &pos);
	if (work_alloc == 0) {
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		ret = -ENOMEM;
		goto out;
	}

	/* under flow control, trim the allocation to the credits we hold */
	credit_alloc = work_alloc;
	if (ic->i_flowctl) {
		credit_alloc = rdsv3_ib_send_grab_credits(ic, work_alloc,
		    &posted, 0, RDSV3_MAX_ADV_CREDIT);
		adv_credits += posted;
		if (credit_alloc < work_alloc) {
			rdsv3_ib_ring_unalloc(&ic->i_send_ring,
			    work_alloc - credit_alloc);
			work_alloc = credit_alloc;
			flow_controlled++;
		}
		if (work_alloc == 0) {
			set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
			rdsv3_ib_stats_inc(s_ib_tx_throttle);
			ret = -ENOMEM;
			goto out;
		}
	}

	/* map the message the first time we see it */
	if (ic->i_rm == NULL) {
		/*
		 * printk(KERN_NOTICE
		 * "rdsv3_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
		 * be16_to_cpu(rm->m_inc.i_hdr.h_dport),
		 * rm->m_inc.i_hdr.h_flags,
		 * be32_to_cpu(rm->m_inc.i_hdr.h_len));
		 */
		if (rm->m_nents) {
			rm->m_count = rdsv3_ib_dma_map_sg(dev,
			    rm->m_sg, rm->m_nents);
			RDSV3_DPRINTF5("rdsv3_ib_xmit",
			    "ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
			if (rm->m_count == 0) {
				rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
				rdsv3_ib_ring_unalloc(&ic->i_send_ring,
				    work_alloc);
				ret = -ENOMEM; /* XXX ? */
				RDSV3_DPRINTF2("rdsv3_ib_xmit",
				    "fail: ic %p mapping rm %p: %d\n",
				    ic, rm, rm->m_count);
				goto out;
			}
		} else {
			rm->m_count = 0;
		}

		ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
		ic->i_unsignaled_bytes = rdsv3_ib_sysctl_max_unsig_bytes;
		rdsv3_message_addref(rm);
		ic->i_rm = rm;

		/* Finalize the header */
		if (test_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_ACK_REQUIRED;
		if (test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_RETRANSMITTED;

		/*
		 * If it has a RDMA op, tell the peer we did it. This is
		 * used by the peer to release use-once RDMA MRs.
		 */
		if (rm->m_rdma_op) {
			struct rdsv3_ext_header_rdma ext_hdr;

			ext_hdr.h_rdma_rkey = htonl(rm->m_rdma_op->r_key);
			(void) rdsv3_message_add_extension(&rm->m_inc.i_hdr,
			    RDSV3_EXTHDR_RDMA, &ext_hdr,
			    sizeof (ext_hdr));
		}
		if (rm->m_rdma_cookie) {
			(void) rdsv3_message_add_rdma_dest_extension(
			    &rm->m_inc.i_hdr,
			    rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
			    rdsv3_rdma_cookie_offset(rm->m_rdma_cookie));
		}

		/*
		 * Note - rdsv3_ib_piggyb_ack clears the ACK_REQUIRED bit, so
		 * we should not do this unless we have a chance of at least
		 * sticking the header into the send ring. Which is why we
		 * should call rdsv3_ib_ring_alloc first.
		 */
		rm->m_inc.i_hdr.h_ack = htonll(rdsv3_ib_piggyb_ack(ic));
		rdsv3_message_make_checksum(&rm->m_inc.i_hdr);

		/*
		 * Update adv_credits since we reset the ACK_REQUIRED bit.
		 */
		(void) rdsv3_ib_send_grab_credits(ic, 0, &posted, 1,
		    RDSV3_MAX_ADV_CREDIT - adv_credits);
		adv_credits += posted;
		ASSERT(adv_credits <= 255);
	} else if (ic->i_rm != rm)
		RDSV3_PANIC();

	send = &ic->i_sends[pos];
	first = send;
	prev = NULL;
	scat = &rm->m_sg[sg];
	sent = 0;
	i = 0;

	/*
	 * Sometimes you want to put a fence between an RDMA
	 * READ and the following SEND.
	 * We could either do this all the time
	 * or when requested by the user. Right now, we let
	 * the application choose.
	 */
	if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
		send_flags = IBT_WR_SEND_FENCE;

	/*
	 * We could be copying the header into the unused tail of the page.
	 * That would need to be changed in the future when those pages might
	 * be mapped userspace pages or page cache pages. So instead we always
	 * use a second sge and our long-lived ring of mapped headers. We send
	 * the header after the data so that the data payload can be aligned on
	 * the receiver.
	 */

	/*
	 * handle a 0-len message: build the single header-only WR, then
	 * jump into the loop body below just to tack the header on.
	 */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0) {
		wr = &ic->i_send_wrs[0];
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, NULL, 0, 0, send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;
		goto add_header;
	}

	/* if there's data reference it with a chain of work reqs */
	for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
		unsigned int len;

		send = &ic->i_sends[pos];

		wr = &ic->i_send_wrs[i];
		len = min(RDSV3_FRAG_SIZE,
		    rdsv3_ib_sg_dma_len(dev, scat) - off);
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, scat, off, len,
		    send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;

		/*
		 * We want to delay signaling completions just enough to get
		 * the batching benefits but not so much that we create dead
		 * time on the wire.
		 */
		if (ic->i_unsignaled_wrs-- == 0) {
			ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		ic->i_unsignaled_bytes -= len;
		if (ic->i_unsignaled_bytes <= 0) {
			ic->i_unsignaled_bytes =
			    rdsv3_ib_sysctl_max_unsig_bytes;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		/*
		 * Always signal the last one if we're stopping due to flow
		 * control.
		 */
		if (flow_controlled && i == (work_alloc-1)) {
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		RDSV3_DPRINTF5("rdsv3_ib_xmit", "send %p wr %p num_sge %u \n",
		    send, wr, wr->wr_nds);

		sent += len;
		off += len;
		if (off == rdsv3_ib_sg_dma_len(dev, scat)) {
			scat++;
			off = 0;
		}

add_header:
		/*
		 * Tack on the header after the data. The header SGE
		 * should already
		 * have been set up to point to the right header buffer.
		 */
		(void) memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
		    sizeof (struct rdsv3_header));

		if (0) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			RDSV3_DPRINTF2("rdsv3_ib_xmit",
			    "send WR dport=%u flags=0x%x len=%d",
			    ntohs(hdr->h_dport),
			    hdr->h_flags,
			    ntohl(hdr->h_len));
		}
		if (adv_credits) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			/* add credit and redo the header checksum */
			hdr->h_credit = adv_credits;
			rdsv3_message_make_checksum(hdr);
			adv_credits = 0;
			rdsv3_ib_stats_inc(s_ib_tx_credit_updates);
		}

		prev = send;

		pos = (pos + 1) % ic->i_send_ring.w_nr;
	}

	/*
	 * Account the RDS header in the number of bytes we sent, but just
	 * once. The caller has no concept of fragmentation.
	 */
	if (hdr_off == 0)
		sent += sizeof (struct rdsv3_header);

	/* if we finished the message then send completion owns it */
	if (scat == &rm->m_sg[rm->m_count]) {
		prev->s_rm = ic->i_rm;
		wr->wr_flags |= IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		ic->i_rm = NULL;
	}

	/* return any ring entries we reserved but did not use */
	if (i < work_alloc) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
		work_alloc = i;
	}
	if (ic->i_flowctl && i < credit_alloc)
		rdsv3_ib_send_add_credits(conn, credit_alloc - i);

	/* XXX need to worry about failed_wr and partial sends. */
	ret = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, i, &posted);
	if (posted != i) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "ic %p first %p nwr: %d ret %d:%d",
		    ic, first, i, ret, posted);
	}
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "RDS/IB: ib_post_send to %u.%u.%u.%u "
		    "returned %d\n", NIPQUAD(conn->c_faddr), ret);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		/* give message ownership back to the connection for retry */
		if (prev->s_rm) {
			ic->i_rm = prev->s_rm;
			prev->s_rm = NULL;
		}
#if 1
		RDSV3_DPRINTF2("rdsv3_ib_xmit", "ibt_post_send FAIL");
		ret = -EAGAIN;
#else
		/* Finesse this later */
		RDSV3_PANIC();
#endif
		goto out;
	}

	ret = sent;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "Return: conn: %p, rm: %p", conn, rm);
out:
	ASSERT(!adv_credits);
	return (ret);
}

/*
 * Unmap an RDMA scatter array mapped by rdsv3_ib_dma_map_sg_rdma(),
 * freeing the per-entry SGL allocations and IOV map handles.  With a
 * NULL dev (connection already torn down) the HCA handle cached in
 * scat[0] is used instead.
 */
static void
rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev, uint_t num,
    struct rdsv3_rdma_sg scat[])
{
	ibt_hca_hdl_t hca_hdl;
	int i;
	int num_sgl;

	RDSV3_DPRINTF4("rdsv3_ib_dma_unmap_sg", "rdma_sg: %p", scat);

	if (dev) {
		hca_hdl = ib_get_ibt_hca_hdl(dev);
	} else {
		hca_hdl = scat[0].hca_hdl;
		RDSV3_DPRINTF2("rdsv3_ib_dma_unmap_sg_rdma",
		    "NULL dev use cached hca_hdl %p", hca_hdl);
	}

	if (hca_hdl == NULL)
		return;
	scat[0].hca_hdl = NULL;

	for (i = 0; i < num; i++) {
		if (scat[i].mihdl != NULL) {
			/* must match the sizing in rdsv3_ib_dma_map_sg_rdma */
			num_sgl = (scat[i].iovec.bytes / PAGESIZE) + 2;
			kmem_free(scat[i].swr.wr_sgl,
			    (num_sgl * sizeof (ibt_wr_ds_t)));
			scat[i].swr.wr_sgl = NULL;
			(void) ibt_unmap_mem_iov(hca_hdl, scat[i].mihdl);
			scat[i].mihdl = NULL;
		} else
			break;	/* entries are filled in order; stop early */
	}
}

/*
 * Map each user iovec in 'scat' for DMA via ibt_map_mem_iov(), filling
 * in the per-entry SGLs.  Returns the number of WRs needed to carry all
 * resulting SGEs at RDSV3_IB_MAX_SGE per WR, or 0 on failure (with any
 * partial mappings undone).  The HCA handle is cached in scat[0] so the
 * mapping can be undone even after the cm_id goes away.
 */
/* ARGSUSED */
uint_t
rdsv3_ib_dma_map_sg_rdma(struct ib_device *dev, struct rdsv3_rdma_sg scat[],
    uint_t num, struct rdsv3_scatterlist **scatl)
{
	ibt_hca_hdl_t hca_hdl;
	ibt_iov_attr_t iov_attr;
	struct buf *bp;
	uint_t i, j, k;
	uint_t count;
	struct rdsv3_scatterlist *sg;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "scat: %p, num: %d",
	    scat, num);

	hca_hdl = ib_get_ibt_hca_hdl(dev);
	scat[0].hca_hdl = hca_hdl;
	bzero(&iov_attr, sizeof (ibt_iov_attr_t));
	iov_attr.iov_flags = IBT_IOV_BUF;
	iov_attr.iov_lso_hdr_sz = 0;

	for (i = 0, count = 0; i < num; i++) {
		/* transpose umem_cookie to buf structure */
		bp = ddi_umem_iosetup(scat[i].umem_cookie,
		    scat[i].iovec.addr & PAGEOFFSET, scat[i].iovec.bytes,
		    B_WRITE, 0, 0, NULL, DDI_UMEM_SLEEP);
		if (bp == NULL) {
			/* free resources and return error */
			goto out;
		}
		/* setup ibt_map_mem_iov() attributes */
		iov_attr.iov_buf = bp;
		iov_attr.iov_wr_nds = (scat[i].iovec.bytes / PAGESIZE) + 2;
		scat[i].swr.wr_sgl =
		    kmem_zalloc(iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t),
		    KM_SLEEP);

		ret = ibt_map_mem_iov(hca_hdl, &iov_attr,
		    (ibt_all_wr_t *)&scat[i].swr, &scat[i].mihdl);
		freerbuf(bp);
		if (ret != IBT_SUCCESS) {
			RDSV3_DPRINTF2("rdsv3_ib_dma_map_sg_rdma",
			    "ibt_map_mem_iov returned: %d", ret);
			/* free resources and return error */
			kmem_free(scat[i].swr.wr_sgl,
			    iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t));
			goto out;
		}
		count += scat[i].swr.wr_nds;

#ifdef DEBUG
		for (j = 0; j < scat[i].swr.wr_nds; j++) {
			RDSV3_DPRINTF5("rdsv3_ib_dma_map_sg_rdma",
			    "sgl[%d] va %llx len %x", j,
			    scat[i].swr.wr_sgl[j].ds_va,
			    scat[i].swr.wr_sgl[j].ds_len);
		}
#endif
		RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma",
		    "iovec.bytes: 0x%x scat[%d]swr.wr_nds: %d",
		    scat[i].iovec.bytes, i, scat[i].swr.wr_nds);
	}

	/* total SGEs -> number of WRs at RDSV3_IB_MAX_SGE per WR */
	count = ((count - 1) / RDSV3_IB_MAX_SGE) + 1;
	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "Ret: num: %d", count);
	return (count);

out:
	rdsv3_ib_dma_unmap_sg_rdma(dev, num, scat);
	return (0);
}

/*
 * Post the work requests for an RDMA READ or WRITE op.  The op's mapped
 * scatter lists are transposed into a chain of WRs of at most
 * RDSV3_IB_MAX_SGE SGEs each; we insist on enough ring entries for the
 * entire op rather than supporting partial RDMA transfers.
 */
int
rdsv3_ib_xmit_rdma(struct rdsv3_connection *conn, struct rdsv3_rdma_op *op)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_rdma_sg *scat;
	uint64_t remote_addr;
	uint32_t pos;
	uint32_t work_alloc;
	uint32_t i, j, k, idx;
	uint32_t left, count;
	uint32_t posted;
	int sent;
	ibt_status_t status;
	ibt_send_wr_t *wr;
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "rdsv3_ib_conn: %p", ic);

	/* map the message the first time we see it */
	if (!op->r_mapped) {
		op->r_count = rdsv3_ib_dma_map_sg_rdma(ic->i_cm_id->device,
		    op->r_rdma_sg, op->r_nents, &op->r_sg);
		RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma", "ic %p mapping op %p: %d",
		    ic, op, op->r_count);
		if (op->r_count == 0) {
			rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
			RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
			    "fail: ic %p mapping op %p: %d",
			    ic, op, op->r_count);
			return (-ENOMEM); /* XXX ? */
		}
		op->r_mapped = 1;
	}

	/*
	 * Instead of knowing how to return a partial rdma read/write
	 * we insist that there
	 * be enough work requests to send the entire message.
	 */
	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, op->r_count, &pos);
	if (work_alloc != op->r_count) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		return (-ENOMEM);
	}

	/*
	 * take the scatter list and transpose into a list of
	 * send wr's each with a scatter list of RDSV3_IB_MAX_SGE
	 */
	scat = &op->r_rdma_sg[0];
	sent = 0;
	remote_addr = op->r_remote_addr;

	for (i = 0, k = 0; i < op->r_nents; i++) {
		left = scat[i].swr.wr_nds;
		for (idx = 0; left > 0; k++) {
			send = &ic->i_sends[pos];
			send->s_queued = jiffies;
			send->s_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			send->s_op = op;

			wr = &ic->i_send_wrs[k];
			wr->wr_flags = 0;
			wr->wr_id = pos;
			wr->wr_trans = IBT_RC_SRV;
			wr->wr_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			wr->wr.rc.rcwr.rdma.rdma_raddr = remote_addr;
			wr->wr.rc.rcwr.rdma.rdma_rkey = op->r_key;

			if (left > RDSV3_IB_MAX_SGE) {
				count = RDSV3_IB_MAX_SGE;
				left -= RDSV3_IB_MAX_SGE;
			} else {
				count = left;
				left = 0;
			}
			wr->wr_nds = count;

			for (j = 0; j < count; j++) {
				sge = &wr->wr_sgl[j];
				*sge = scat[i].swr.wr_sgl[idx];
				remote_addr += scat[i].swr.wr_sgl[idx].ds_len;
				sent += scat[i].swr.wr_sgl[idx].ds_len;
				idx++;
				RDSV3_DPRINTF4("xmit_rdma",
				    "send_wrs[%d]sgl[%d] va %llx len %x",
				    k, j, sge->ds_va, sge->ds_len);
			}
			RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma",
			    "wr[%d] %p key: %x code: %d tlen: %d",
			    k, wr, wr->wr.rc.rcwr.rdma.rdma_rkey,
			    wr->wr_opcode, sent);

			/*
			 * We want to delay signaling completions just enough
			 * to get the batching benefits but not so much that
			 * we create dead time on the wire.
			 */
			if (ic->i_unsignaled_wrs-- == 0) {
				ic->i_unsignaled_wrs =
				    rdsv3_ib_sysctl_max_unsig_wrs;
				wr->wr_flags = IBT_WR_SEND_SIGNAL;
			}

			pos = (pos + 1) % ic->i_send_ring.w_nr;
		}
	}

	status = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, k, &posted);
	if (status != IBT_SUCCESS) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
		    "RDS/IB: rdma ib_post_send returned %d", status);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
	}
	return (status);
}

/*
 * Called after the transport finishes a send pass; retry any ACK or
 * window update that was previously blocked by flow control.
 */
void
rdsv3_ib_xmit_complete(struct rdsv3_connection *conn)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_complete", "conn: %p", conn);

	/*
	 * We may have a pending ACK or window update we were unable
	 * to send previously (due to flow control). Try again.
	 */
	rdsv3_ib_attempt_ack(ic);
}