xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rdsv3/ib_recv.c (revision d2b539e744e90927cf7a57df3475145c279d68f9)
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/types.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/ib.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

static struct kmem_cache *rdsv3_ib_incoming_slab;
static atomic_t	rdsv3_ib_allocation = ATOMIC_INIT(0);

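/*
 * Pre-build the header scatter/gather entry for every slot in the receive
 * ring; the data sge is filled in later, when each work request is
 * refilled and posted.
 */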
void
rdsv3_ib_recv_init_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_recv_work *recv;
	struct rdsv3_header *hdrp;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_recv_init_ring", "ic: %p", ic);

	hdrp = ic->i_recv_hdrs;
	for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
		recv->r_ibinc = NULL;
		recv->r_frag = NULL;

		/* initialize the hdr sgl permanently */
		recv->r_sge[0].ds_va = (ib_vaddr_t)(uintptr_t)hdrp++;
		recv->r_sge[0].ds_len = sizeof (struct rdsv3_header);
		recv->r_sge[0].ds_key = ic->i_mr->lkey;
	}
}

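/*
 * Release whatever a single receive ring entry is still holding: drop the
 * reference on any pending incoming message and return the data fragment
 * to the per-device fragment cache.
 */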
static void
rdsv3_ib_recv_clear_one(struct rdsv3_ib_connection *ic,
    struct rdsv3_ib_recv_work *recv)
{
	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_one", "ic: %p, recv: %p",
	    ic, recv);

	if (recv->r_ibinc) {
		rdsv3_inc_put(&recv->r_ibinc->ii_inc);
		recv->r_ibinc = NULL;
	}
	if (recv->r_frag) {
		kmem_cache_free(ic->rds_ibdev->ib_frag_slab, recv->r_frag);
		recv->r_frag = NULL;
	}

	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_one", "Return: ic: %p, recv: %p",
	    ic, recv);
}

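/* Tear down every entry of a connection's receive ring. */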
void
rdsv3_ib_recv_clear_ring(struct rdsv3_ib_connection *ic)
{
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_ring", "ic: %p", ic);

	for (i = 0; i < ic->i_recv_ring.w_nr; i++)
		rdsv3_ib_recv_clear_one(ic, &ic->i_recvs[i]);
}

extern int atomic_add_unless(atomic_t *, uint_t, ulong_t);

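/*
 * Make sure one receive ring entry has both an incoming-message descriptor
 * and a data fragment allocated, and point its data sge at the fragment.
 * Returns -ENOMEM if either allocation fails or the global allocation
 * limit has been reached.
 */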
static int
rdsv3_ib_recv_refill_one(struct rdsv3_connection *conn,
    struct rdsv3_ib_recv_work *recv, int kmflags)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	ibt_mi_hdl_t mi_hdl;
	ibt_iov_attr_t iov_attr;
	ibt_iov_t iov_arr[1];

	RDSV3_DPRINTF5("rdsv3_ib_recv_refill_one", "conn: %p, recv: %p",
	    conn, recv);

	if (recv->r_ibinc == NULL) {
		if (!atomic_add_unless(&rdsv3_ib_allocation, 1,
		    rdsv3_ib_sysctl_max_recv_allocation)) {
			rdsv3_ib_stats_inc(s_ib_rx_alloc_limit);
			goto out;
		}
		recv->r_ibinc = kmem_cache_alloc(rdsv3_ib_incoming_slab,
		    kmflags);
		if (recv->r_ibinc == NULL) {
			atomic_add_32(&rdsv3_ib_allocation, -1);
			goto out;
		}
		rdsv3_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
	}

	if (recv->r_frag == NULL) {
		recv->r_frag = kmem_cache_alloc(ic->rds_ibdev->ib_frag_slab,
		    kmflags);
		if (recv->r_frag == NULL)
			goto out;
	}

	/* Data sge, structure copy */
	recv->r_sge[1] = recv->r_frag->f_sge;

	RDSV3_DPRINTF5("rdsv3_ib_recv_refill_one", "Return: conn: %p, recv: %p",
	    conn, recv);

	return (0);
out:
	return (-ENOMEM);
}

/*
 * This tries to allocate and post unused work requests after making sure that
 * they have all the allocations they need to queue received fragments into
 * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
 * pairs don't go unmatched.
 *
 * -1 is returned if posting fails due to temporary resource exhaustion.
 */
int
rdsv3_ib_recv_refill(struct rdsv3_connection *conn, int kmflags, int prefill)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct rdsv3_ib_recv_work *recv;
	unsigned int posted = 0;
	int ret = 0, avail;
	uint32_t pos, i;

	RDSV3_DPRINTF4("rdsv3_ib_recv_refill", "conn: %p, prefill: %d",
	    conn, prefill);

	if (prefill || rdsv3_conn_up(conn)) {
		uint_t w_nr = ic->i_recv_ring.w_nr;

		avail = rdsv3_ib_ring_alloc(&ic->i_recv_ring, w_nr, &pos);
		if ((avail <= 0) || (pos >= w_nr)) {
			RDSV3_DPRINTF2("rdsv3_ib_recv_refill",
			    "Argh - ring alloc returned pos=%u, avail: %d",
			    pos, avail);
			return (-EINVAL);
		}

		/* populate the WRs */
		for (i = 0; i < avail; i++) {
			recv = &ic->i_recvs[pos];
			ret = rdsv3_ib_recv_refill_one(conn, recv, kmflags);
			if (ret) {
				rdsv3_ib_ring_unalloc(&ic->i_recv_ring,
				    avail - i);
				break;
			}
			ic->i_recv_wrs[i].wr_id = (ibt_wrid_t)(uintptr_t)recv;
			ic->i_recv_wrs[i].wr_nds = RDSV3_IB_RECV_SGE;
			ic->i_recv_wrs[i].wr_sgl = &recv->r_sge[0];

			pos = (pos + 1) % w_nr;
		}

		if (i) {
			/* post the WRs at one shot */
			ret = ibt_post_recv(ib_get_ibt_channel_hdl(ic->i_cm_id),
			    &ic->i_recv_wrs[0], i, &posted);
			RDSV3_DPRINTF3("rdsv3_ib_recv_refill",
			    "attempted: %d posted: %d WRs ret %d",
			    i, posted, ret);
			if (ret) {
				RDSV3_DPRINTF2("rdsv3_ib_recv_refill",
				    "recv post on %u.%u.%u.%u returned %d, "
				    "disconnecting and reconnecting",
				    NIPQUAD(conn->c_faddr), ret);
				rdsv3_ib_ring_unalloc(&ic->i_recv_ring,
				    i - posted);
				rdsv3_conn_drop(conn);
			}
		}
	}

	/* We're doing flow control - update the window. */
	if (ic->i_flowctl && posted)
		rdsv3_ib_advertise_credits(conn, posted);

	RDSV3_DPRINTF4("rdsv3_ib_recv_refill", "Return: conn: %p, posted: %d",
	    conn, posted);
	return (ret);
}

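/*
 * Free all the data fragments that were queued on an incoming message,
 * returning them to the per-device fragment cache.
 */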
void
rdsv3_ib_inc_purge(struct rdsv3_incoming *inc)
{
	struct rdsv3_ib_incoming *ibinc;
	struct rdsv3_page_frag *frag;
	struct rdsv3_page_frag *pos;
	struct rdsv3_ib_connection *ic =
	    (struct rdsv3_ib_connection *)inc->i_conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_inc_purge", "inc: %p", inc);

	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
	RDSV3_DPRINTF5("rdsv3_ib_inc_purge",
	    "purging ibinc %p inc %p\n", ibinc, inc);

	RDSV3_FOR_EACH_LIST_NODE_SAFE(frag, pos, &ibinc->ii_frags, f_item) {
		list_remove_node(&frag->f_item);
		kmem_cache_free(ic->rds_ibdev->ib_frag_slab, frag);
	}

	RDSV3_DPRINTF4("rdsv3_ib_inc_purge", "Return: inc: %p", inc);
}

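/*
 * Final teardown of an incoming message: purge its fragments, free the
 * containing rdsv3_ib_incoming, and credit the global allocation counter
 * back.
 */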
void
rdsv3_ib_inc_free(struct rdsv3_incoming *inc)
{
	struct rdsv3_ib_incoming *ibinc;

	RDSV3_DPRINTF4("rdsv3_ib_inc_free", "inc: %p", inc);

	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);

	rdsv3_ib_inc_purge(inc);
	RDSV3_DPRINTF5("rdsv3_ib_inc_free", "freeing ibinc %p inc %p",
	    ibinc, inc);
	ASSERT(list_is_empty(&ibinc->ii_frags));
	kmem_cache_free(rdsv3_ib_incoming_slab, ibinc);
	atomic_dec_uint(&rdsv3_ib_allocation);

	RDSV3_DPRINTF4("rdsv3_ib_inc_free", "Return: inc: %p", inc);
}

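/*
 * Copy a received message out of its chain of fragments into the caller's
 * uio, stopping at the smaller of the requested size and the message
 * length.  Returns the number of bytes copied.
 */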
int
rdsv3_ib_inc_copy_to_user(struct rdsv3_incoming *inc, uio_t *uiop,
    size_t size)
{
	struct rdsv3_ib_incoming *ibinc;
	struct rdsv3_page_frag *frag;
	unsigned long to_copy;
	unsigned long frag_off = 0;
	int copied = 0;
	int ret;
	uint32_t len;

	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
	frag = list_head(&ibinc->ii_frags);
	len = ntohl(inc->i_hdr.h_len);

	RDSV3_DPRINTF4("rdsv3_ib_inc_copy_to_user", "inc: %p, size: %d len: %d",
	    inc, size, len);

	while (copied < size && copied < len) {
		if (frag_off == RDSV3_FRAG_SIZE) {
			frag = list_next(&ibinc->ii_frags, frag);
			frag_off = 0;
		}

		to_copy = min(len - copied, RDSV3_FRAG_SIZE - frag_off);
		to_copy = min(size - copied, to_copy);

		RDSV3_DPRINTF5("rdsv3_ib_inc_copy_to_user",
		    "%lu bytes to user %p from frag [%p, %u] + %lu",
		    to_copy, uiop,
		    frag->f_page, frag->f_offset, frag_off);

		ret = uiomove((caddr_t)(frag->f_page +
		    frag->f_offset + frag_off),
		    to_copy, UIO_READ, uiop);
		if (ret) {
			RDSV3_DPRINTF2("rdsv3_ib_inc_copy_to_user",
			    "uiomove (%d) returned: %d", to_copy, ret);
			break;
		}

		frag_off += to_copy;
		copied += to_copy;
	}

	RDSV3_DPRINTF4("rdsv3_ib_inc_copy_to_user",
	    "Return: inc: %p, copied: %d", inc, copied);

	return (copied);
}

/* ic starts out kmem_zalloc()ed */
void
rdsv3_ib_recv_init_ack(struct rdsv3_ib_connection *ic)
{
	ibt_send_wr_t *wr = &ic->i_ack_wr;
	ibt_wr_ds_t *sge = &ic->i_ack_sge;

	RDSV3_DPRINTF4("rdsv3_ib_recv_init_ack", "ic: %p", ic);

	sge->ds_va = ic->i_ack_dma;
	sge->ds_len = sizeof (struct rdsv3_header);
	sge->ds_key = ic->i_mr->lkey;

	wr->wr_sgl = sge;
	wr->wr_nds = 1;
	wr->wr_opcode = IBT_WRC_SEND;
	wr->wr_id = RDSV3_IB_ACK_WR_ID;
	wr->wr_flags = IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
}

/*
 * You'd think that with reliable IB connections you wouldn't need to ack
 * messages that have been received.  The problem is that IB hardware generates
 * an ack message before it has DMAed the message into memory.  This creates a
 * potential message loss if the HCA is disabled for any reason between when it
 * sends the ack and before the message is DMAed and processed.  This is only a
 * potential issue if another HCA is available for fail-over.
 *
 * When the remote host receives our ack they'll free the sent message from
 * their send queue.  To decrease the latency of this we always send an ack
 * immediately after we've received messages.
 *
 * For simplicity, we only have one ack in flight at a time.  This puts
 * pressure on senders to have deep enough send queues to absorb the latency of
 * a single ack frame being in flight.  This might not be good enough.
 *
 * This is implemented by having a long-lived send_wr and sge which point to a
 * statically allocated ack frame.  This ack wr does not fall under the ring
 * accounting that the tx and rx wrs do.  The QP attribute specifically makes
 * room for it beyond the ring size.  Send completion notices its special
 * wr_id and avoids working with the ring in that case.
 */
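/*
 * Record the next sequence number to be acknowledged, and note that an
 * ACK must go out if the sender asked for one.  i_ack_lock is taken
 * because plain 64-bit stores aren't atomic on all platforms (see above).
 */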
static void
rdsv3_ib_set_ack(struct rdsv3_ib_connection *ic, uint64_t seq,
    int ack_required)
{
	RDSV3_DPRINTF4("rdsv3_ib_set_ack", "ic: %p, seq: %lld ack: %d",
	    ic, seq, ack_required);

	mutex_enter(&ic->i_ack_lock);
	ic->i_ack_next = seq;
	if (ack_required)
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
	mutex_exit(&ic->i_ack_lock);
}

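/*
 * Fetch the sequence number to acknowledge, clearing IB_ACK_REQUESTED on
 * the way out.
 */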
static uint64_t
rdsv3_ib_get_ack(struct rdsv3_ib_connection *ic)
{
	uint64_t seq;

	RDSV3_DPRINTF4("rdsv3_ib_get_ack", "ic: %p", ic);

	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);

	mutex_enter(&ic->i_ack_lock);
	seq = ic->i_ack_next;
	mutex_exit(&ic->i_ack_lock);

	return (seq);
}

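/*
 * Build an ACK-only header in the pre-registered ack buffer and post it on
 * the dedicated ack work request.  On failure the ack flags are reset so
 * another ACK will be attempted, and the connection is dropped.
 */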
static void
rdsv3_ib_send_ack(struct rdsv3_ib_connection *ic, unsigned int adv_credits)
{
	struct rdsv3_header *hdr = ic->i_ack;
	uint64_t seq;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "ic: %p adv_credits: %d",
	    ic, adv_credits);

	seq = rdsv3_ib_get_ack(ic);

	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "send_ack: ic %p ack %llu",
	    ic, (unsigned long long) seq);
	rdsv3_message_populate_header(hdr, 0, 0, 0);
	hdr->h_ack = htonll(seq);
	hdr->h_credit = adv_credits;
	rdsv3_message_make_checksum(hdr);
	ic->i_ack_queued = jiffies;

	ret = ibt_post_send(RDSV3_QP2CHANHDL(ic->i_cm_id->qp), &ic->i_ack_wr, 1,
	    NULL);
	if (ret) {
		/*
		 * Failed to send. Release the WR, and
		 * force another ACK.
		 */
		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
		rdsv3_ib_stats_inc(s_ib_ack_send_failure);
		RDSV3_DPRINTF2("rdsv3_ib_send_ack", "sending ack failed\n");
		rdsv3_conn_drop(ic->conn);
	} else {
		rdsv3_ib_stats_inc(s_ib_ack_sent);
	}
	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "Return: ic: %p adv_credits: %d",
	    ic, adv_credits);
}

/*
 * There are 3 ways of getting acknowledgements to the peer:
 *  1.	We call rdsv3_ib_attempt_ack from the recv completion handler
 *	to send an ACK-only frame.
 *	However, there can be only one such frame in the send queue
 *	at any time, so we may have to postpone it.
 *  2.	When another (data) packet is transmitted while there's
 *	an ACK in the queue, we piggyback the ACK sequence number
 *	on the data packet.
 *  3.	If the ACK WR is done sending, we get called from the
 *	send queue completion handler, and check whether there's
 *	another ACK pending (postponed because the WR was on the
 *	queue). If so, we transmit it.
 *
 * We maintain 2 variables:
 *  -	i_ack_flags, which keeps track of whether the ACK WR
 *	is currently in the send queue or not (IB_ACK_IN_FLIGHT)
 *  -	i_ack_next, which is the last sequence number we received
 *
 * Potentially, send queue and receive queue handlers can run concurrently.
 * It would be nice to not have to use a spinlock to synchronize things,
 * but the one problem that rules this out is that 64bit updates are
 * not atomic on all platforms. Things would be a lot simpler if
 * we had atomic64 or maybe cmpxchg64 everywhere.
 *
 * Reconnecting complicates this picture just slightly. When we
 * reconnect, we may be seeing duplicate packets. The peer
 * is retransmitting them, because it hasn't seen an ACK for
 * them. It is important that we ACK these.
 *
 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
 * this flag set *MUST* be acknowledged immediately.
 */

/*
 * When we get here, we're called from the recv queue handler.
 * Check whether we ought to transmit an ACK.
 */
void
rdsv3_ib_attempt_ack(struct rdsv3_ib_connection *ic)
{
	unsigned int adv_credits;

	RDSV3_DPRINTF4("rdsv3_ib_attempt_ack", "ic: %p", ic);

	if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
		return;

	if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
		rdsv3_ib_stats_inc(s_ib_ack_send_delayed);
		return;
	}

	/* Can we get a send credit? */
	if (!rdsv3_ib_send_grab_credits(ic, 1, &adv_credits, 0)) {
		rdsv3_ib_stats_inc(s_ib_tx_throttle);
		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
		return;
	}

	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
	rdsv3_ib_send_ack(ic, adv_credits);

	RDSV3_DPRINTF4("rdsv3_ib_attempt_ack", "Return: ic: %p", ic);
}

/*
 * We get here from the send completion handler, when the
 * adapter tells us the ACK frame was sent.
 */
void
rdsv3_ib_ack_send_complete(struct rdsv3_ib_connection *ic)
{
	RDSV3_DPRINTF4("rdsv3_ib_ack_send_complete", "ic: %p", ic);
	clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
	rdsv3_ib_attempt_ack(ic);
}

/*
 * This is called by the regular xmit code when it wants to piggyback
 * an ACK on an outgoing frame.
 */
uint64_t
rdsv3_ib_piggyb_ack(struct rdsv3_ib_connection *ic)
{
	RDSV3_DPRINTF4("rdsv3_ib_piggyb_ack", "ic: %p", ic);
	if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags)) {
		rdsv3_ib_stats_inc(s_ib_ack_send_piggybacked);
	}
	return (rdsv3_ib_get_ack(ic));
}

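/*
 * Locate the RDS header for a completed receive.  For the protocol
 * versions supported here the header lives in the separate header buffer
 * posted with the work request; anything older gets NULL and the caller
 * drops the connection.
 */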
static struct rdsv3_header *
rdsv3_ib_get_header(struct rdsv3_connection *conn,
    struct rdsv3_ib_recv_work *recv,
    uint32_t data_len)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	void *hdr_buff = &ic->i_recv_hdrs[recv - ic->i_recvs];

	RDSV3_DPRINTF4("rdsv3_ib_get_header", "conn: %p, recv: %p len: %d",
	    conn, recv, data_len);

	/*
	 * Support header at the front (RDS 3.1+) as well as header-at-end.
	 *
	 * Cases:
	 * 1) header all in header buff (great!)
	 * 2) header all in data page (copy all to header buff)
	 * 3) header split across hdr buf + data page
	 *    (move bit in hdr buff to end before copying other bit from
	 *    data page)
	 */
	if (conn->c_version > RDS_PROTOCOL_3_0 || data_len == RDSV3_FRAG_SIZE)
		return (hdr_buff);
	/*
	 * XXX - Need to discuss the support for version < RDS_PROTOCOL_3_1.
	 */
	if (conn->c_version == RDS_PROTOCOL_3_0)
		return (hdr_buff);

	/* version < RDS_PROTOCOL_3_0 */
	RDSV3_DPRINTF2("rdsv3_ib_get_header",
	    "NULL header (version: 0x%x, data_len: %d)", conn->c_version,
	    data_len);
	return (NULL);
}

/*
 * It's kind of lame that we're copying from the posted receive pages into
 * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
 * them.  But receiving new congestion bitmaps should be a *rare* event, so
 * hopefully we won't need to invest that complexity in making it more
 * efficient.  By copying we can share a simpler core with TCP which has to
 * copy.
 */
static void
rdsv3_ib_cong_recv(struct rdsv3_connection *conn,
    struct rdsv3_ib_incoming *ibinc)
{
	struct rdsv3_cong_map *map;
	unsigned int map_off;
	unsigned int map_page;
	struct rdsv3_page_frag *frag;
	unsigned long frag_off;
	unsigned long to_copy;
	unsigned long copied;
	uint64_t uncongested = 0;
	caddr_t addr;

	RDSV3_DPRINTF4("rdsv3_ib_cong_recv", "conn: %p, ibinc: %p",
	    conn, ibinc);

	/* catch completely corrupt packets */
	if (ntohl(ibinc->ii_inc.i_hdr.h_len) != RDSV3_CONG_MAP_BYTES)
		return;

	map = conn->c_fcong;
	map_page = 0;
	map_off = 0;

	frag = list_head(&ibinc->ii_frags);
	frag_off = 0;

	copied = 0;

	while (copied < RDSV3_CONG_MAP_BYTES) {
		uint64_t *src, *dst;
		unsigned int k;

		to_copy = min(RDSV3_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
		ASSERT(!(to_copy & 7)); /* Must be 64bit aligned. */

		addr = frag->f_page + frag->f_offset;

		src = (uint64_t *)(addr + frag_off);
		dst = (uint64_t *)(map->m_page_addrs[map_page] + map_off);
		RDSV3_DPRINTF4("rdsv3_ib_cong_recv",
		    "src: %p dst: %p copied: %d", src, dst, copied);
		for (k = 0; k < to_copy; k += 8) {
			/*
			 * Record ports that became uncongested, i.e.
			 * bits that changed from 0 to 1.
			 */
			uncongested |= ~(*src) & *dst;
			*dst++ = *src++;
		}

		copied += to_copy;
		RDSV3_DPRINTF4("rdsv3_ib_cong_recv",
		    "src: %p dst: %p copied: %d", src, dst, copied);

		map_off += to_copy;
		if (map_off == PAGE_SIZE) {
			map_off = 0;
			map_page++;
		}

		frag_off += to_copy;
		if (frag_off == RDSV3_FRAG_SIZE) {
			frag = list_next(&ibinc->ii_frags, frag);
			frag_off = 0;
		}
	}

#if 0
XXX
	/* the congestion map is in little endian order */
	uncongested = le64_to_cpu(uncongested);
#endif

	rdsv3_cong_map_updated(map, uncongested);

	RDSV3_DPRINTF4("rdsv3_ib_cong_recv", "Return: conn: %p, ibinc: %p",
	    conn, ibinc);
}

/*
 * Rings are posted with all the allocations they'll need to queue the
 * incoming message to the receiving socket so this can't fail.
 * All fragments start with a header, so we can make sure we're not receiving
 * garbage, and we can tell a small 8 byte fragment from an ACK frame.
 */
struct rdsv3_ib_ack_state {
	uint64_t		ack_next;
	uint64_t		ack_recv;
	unsigned int	ack_required:1;
	unsigned int	ack_next_valid:1;
	unsigned int	ack_recv_valid:1;
};

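/*
 * Handle one completed receive: validate and parse the header, account the
 * piggybacked ACK and credit update, and either treat the frame as an
 * ACK-only packet or queue its fragment onto the message being
 * reassembled, delivering the message once the last fragment arrives.
 */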
static void
rdsv3_ib_process_recv(struct rdsv3_connection *conn,
    struct rdsv3_ib_recv_work *recv, uint32_t data_len,
    struct rdsv3_ib_ack_state *state)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct rdsv3_ib_incoming *ibinc = ic->i_ibinc;
	struct rdsv3_header *ihdr, *hdr;

	/* XXX shut down the connection if port 0,0 are seen? */

	RDSV3_DPRINTF5("rdsv3_ib_process_recv",
	    "ic %p ibinc %p recv %p byte len %u", ic, ibinc, recv, data_len);

	if (data_len < sizeof (struct rdsv3_header)) {
		RDSV3_DPRINTF2("rdsv3_ib_process_recv",
		    "incoming message from %u.%u.%u.%u didn't include a "
		    "header, disconnecting and reconnecting",
		    NIPQUAD(conn->c_faddr));
		rdsv3_conn_drop(conn);
		return;
	}
	data_len -= sizeof (struct rdsv3_header);

	if ((ihdr = rdsv3_ib_get_header(conn, recv, data_len)) == NULL) {
		RDSV3_DPRINTF2("rdsv3_ib_process_recv", "incoming message "
		    "from %u.%u.%u.%u didn't have a proper version (0x%x) or "
		    "data_len (0x%x), disconnecting and "
		    "reconnecting",
		    NIPQUAD(conn->c_faddr), conn->c_version, data_len);
		rdsv3_conn_drop(conn);
		return;
	}

	/* Validate the checksum. */
	if (!rdsv3_message_verify_checksum(ihdr)) {
		RDSV3_DPRINTF2("rdsv3_ib_process_recv", "incoming message "
		    "from %u.%u.%u.%u has corrupted header - "
		    "forcing a reconnect",
		    NIPQUAD(conn->c_faddr));
		rdsv3_conn_drop(conn);
		rdsv3_stats_inc(s_recv_drop_bad_checksum);
		return;
	}

	/* Process the ACK sequence which comes with every packet */
	state->ack_recv = ntohll(ihdr->h_ack);
	state->ack_recv_valid = 1;

	/* Process the credits update if there was one */
	if (ihdr->h_credit)
		rdsv3_ib_send_add_credits(conn, ihdr->h_credit);

	if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
		/*
		 * This is an ACK-only packet. The fact that it gets
		 * special treatment here is that historically, ACKs
		 * were rather special beasts.
		 */
		rdsv3_ib_stats_inc(s_ib_ack_received);

		return;
	}

	/*
	 * If we don't already have an inc on the connection then this
	 * fragment has a header and starts a message; copy its header
	 * into the inc and save the inc so we can hang upcoming fragments
	 * off its list.
	 */
	if (ibinc == NULL) {
		ibinc = recv->r_ibinc;
		recv->r_ibinc = NULL;
		ic->i_ibinc = ibinc;

		hdr = &ibinc->ii_inc.i_hdr;
		(void) memcpy(hdr, ihdr, sizeof (*hdr));
		ic->i_recv_data_rem = ntohl(hdr->h_len);

		RDSV3_DPRINTF5("rdsv3_ib_process_recv",
		    "ic %p ibinc %p rem %u flag 0x%x", ic, ibinc,
		    ic->i_recv_data_rem, hdr->h_flags);
	} else {
		hdr = &ibinc->ii_inc.i_hdr;
		/*
		 * We can't just use memcmp here; fragments of a
		 * single message may carry different ACKs
		 */
		if (hdr->h_sequence != ihdr->h_sequence ||
		    hdr->h_len != ihdr->h_len ||
		    hdr->h_sport != ihdr->h_sport ||
		    hdr->h_dport != ihdr->h_dport) {
			RDSV3_DPRINTF2("rdsv3_ib_process_recv",
			    "fragment header mismatch; forcing reconnect");
			rdsv3_conn_drop(conn);
			return;
		}
	}

	list_insert_tail(&ibinc->ii_frags, recv->r_frag);
	recv->r_frag = NULL;

	if (ic->i_recv_data_rem > RDSV3_FRAG_SIZE)
		ic->i_recv_data_rem -= RDSV3_FRAG_SIZE;
	else {
		ic->i_recv_data_rem = 0;
		ic->i_ibinc = NULL;

		if (ibinc->ii_inc.i_hdr.h_flags == RDSV3_FLAG_CONG_BITMAP)
			rdsv3_ib_cong_recv(conn, ibinc);
		else {
			rdsv3_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
			    &ibinc->ii_inc, KM_NOSLEEP);
			state->ack_next = ntohll(hdr->h_sequence);
			state->ack_next_valid = 1;
		}

		/*
		 * Evaluate the ACK_REQUIRED flag *after* we received
		 * the complete frame, and after bumping the next_rx
		 * sequence.
		 */
		if (hdr->h_flags & RDSV3_FLAG_ACK_REQUIRED) {
			rdsv3_stats_inc(s_recv_ack_required);
			state->ack_required = 1;
		}

		rdsv3_inc_put(&ibinc->ii_inc);
	}

	RDSV3_DPRINTF4("rdsv3_ib_process_recv",
	    "Return: conn: %p recv: %p len: %d state: %p",
	    conn, recv, data_len, state);
}

/*
 * Plucking the oldest entry from the ring can be done concurrently with
 * the thread refilling the ring.  Each ring operation is protected by
 * spinlocks and the transient state of refilling doesn't change the
 * recording of which entry is oldest.
 *
 * This relies on IB only calling one cq comp_handler for each cq so that
 * there will only be one caller of rdsv3_recv_incoming() per RDS connection.
 */

void
rdsv3_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
{
	struct rdsv3_connection *conn = context;
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_recv_cq_comp_handler",
	    "Enter(conn: %p cq: %p)", conn, cq);

	rdsv3_ib_stats_inc(s_ib_rx_cq_call);

	(void) ddi_taskq_dispatch(ic->i_recv_tasklet, rdsv3_ib_recv_tasklet_fn,
	    (void *)ic, DDI_SLEEP);
}

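/*
 * Drain the receive completion queue, handing each completion for a live
 * connection to rdsv3_ib_process_recv and freeing its ring slot as we go.
 */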
static inline void
rdsv3_poll_cq(struct rdsv3_ib_connection *ic, struct rdsv3_ib_ack_state *state)
{
	struct rdsv3_connection *conn = ic->conn;
	ibt_wc_t wc;
	struct rdsv3_ib_recv_work *recv;
	uint_t polled;

	while (ibt_poll_cq(RDSV3_CQ2CQHDL(ic->i_recv_cq), &wc, 1, &polled) ==
	    IBT_SUCCESS) {
		RDSV3_DPRINTF5("rdsv3_ib_recv_cq_comp_handler",
		    "rwc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
		    (unsigned long long)wc.wc_id, wc.wc_status,
		    wc.wc_bytes_xfer, ntohl(wc.wc_immed_data));
		rdsv3_ib_stats_inc(s_ib_rx_cq_event);

		recv = (struct rdsv3_ib_recv_work *)(uintptr_t)wc.wc_id;

		/*
		 * Also process recvs in connecting state because it is possible
		 * to get a recv completion _before_ the rdmacm ESTABLISHED
		 * event is processed.
		 */
		if (rdsv3_conn_up(conn) || rdsv3_conn_connecting(conn)) {
			/*
			 * We expect errors as the qp is drained during
			 * shutdown
			 */
			if (wc.wc_status == IBT_WC_SUCCESS) {
				rdsv3_ib_process_recv(conn, recv,
				    wc.wc_bytes_xfer, state);
			} else {
				RDSV3_DPRINTF2("rdsv3_ib_recv_cq_comp_handler",
				    "recv completion on "
				    "%u.%u.%u.%u had status %u, "
				    "disconnecting and reconnecting\n",
				    NIPQUAD(conn->c_faddr),
				    wc.wc_status);
				rdsv3_conn_drop(conn);
			}
		}

		rdsv3_ib_ring_free(&ic->i_recv_ring, 1);
	}
}

static processorid_t rdsv3_taskq_bind_cpuid = 0;
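/*
 * Receive taskq handler: bind the worker thread to a CPU if it isn't
 * already, poll the CQ, re-arm the completion notification and poll again
 * to close the race, then apply the collected ACK state and schedule a
 * refill if the ring is running low.
 */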
void
rdsv3_ib_recv_tasklet_fn(void *data)
{
	struct rdsv3_ib_connection *ic = (struct rdsv3_ib_connection *)data;
	struct rdsv3_connection *conn = ic->conn;
	struct rdsv3_ib_ack_state state = { 0, };
	cpu_t	*cp;

	RDSV3_DPRINTF4("rdsv3_ib_recv_tasklet_fn", "Enter: ic: %p", ic);

	/* If not already bound, bind this thread to a CPU */
	if (ic->i_recv_tasklet_cpuid != rdsv3_taskq_bind_cpuid) {
		cp = cpu[rdsv3_taskq_bind_cpuid];
		mutex_enter(&cpu_lock);
		if (cpu_is_online(cp)) {
			if (ic->i_recv_tasklet_cpuid >= 0)
				thread_affinity_clear(curthread);
			thread_affinity_set(curthread, rdsv3_taskq_bind_cpuid);
			ic->i_recv_tasklet_cpuid = rdsv3_taskq_bind_cpuid;
		}
		mutex_exit(&cpu_lock);
	}

	rdsv3_poll_cq(ic, &state);
	(void) ibt_enable_cq_notify(RDSV3_CQ2CQHDL(ic->i_recv_cq),
	    IBT_NEXT_SOLICITED);
	rdsv3_poll_cq(ic, &state);

	if (state.ack_next_valid)
		rdsv3_ib_set_ack(ic, state.ack_next, state.ack_required);
	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
		rdsv3_send_drop_acked(conn, state.ack_recv, NULL);
		ic->i_ack_recv = state.ack_recv;
	}
	if (rdsv3_conn_up(conn))
		rdsv3_ib_attempt_ack(ic);

	/*
	 * If we ever end up with a really empty receive ring, we're
	 * in deep trouble, as the sender will definitely see RNR
	 * timeouts.
	 */
	if (rdsv3_ib_ring_empty(&ic->i_recv_ring))
		rdsv3_ib_stats_inc(s_ib_rx_ring_empty);

	/*
	 * If the ring is running low, then schedule the thread to refill.
	 */
	if (rdsv3_ib_ring_low(&ic->i_recv_ring) &&
	    (rdsv3_conn_up(conn) || rdsv3_conn_connecting(conn)))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_recv_w, 0);

	RDSV3_DPRINTF4("rdsv3_ib_recv_tasklet_fn", "Return: ic: %p", ic);
}

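/*
 * Thread-context receive entry point: top up the receive ring under
 * i_recv_mutex and nudge out a pending ACK if the connection is up.
 * Returns -ENOMEM so the caller backs off when the refill hits a
 * temporary posting failure.
 */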
int
rdsv3_ib_recv(struct rdsv3_connection *conn)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	int ret = 0;

	RDSV3_DPRINTF4("rdsv3_ib_recv", "conn %p\n", conn);

	/*
	 * If we get a temporary posting failure in this context then
	 * we're really low and we want the caller to back off for a bit.
	 */
	mutex_enter(&ic->i_recv_mutex);
	if (rdsv3_ib_recv_refill(conn, KM_NOSLEEP, 0))
		ret = -ENOMEM;
	else
		rdsv3_ib_stats_inc(s_ib_rx_refill_from_thread);
	mutex_exit(&ic->i_recv_mutex);

	if (rdsv3_conn_up(conn))
		rdsv3_ib_attempt_ack(ic);

	RDSV3_DPRINTF4("rdsv3_ib_recv", "Return: conn: %p", conn);

	return (ret);
}

uint_t	MaxRecvMemory = 128 * 1024 * 1024;

extern int rdsv3_ib_inc_constructor(void *buf, void *arg, int kmflags);
extern void rdsv3_ib_inc_destructor(void *buf, void *arg);

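/*
 * Module init: size the global receive allocation limit (128 MB worth of
 * fragments) and create the kmem cache for incoming message descriptors.
 */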
int
rdsv3_ib_recv_init(void)
{
	RDSV3_DPRINTF4("rdsv3_ib_recv_init", "Enter");

	/* XXX - hard code it to 128 MB */
	rdsv3_ib_sysctl_max_recv_allocation = MaxRecvMemory / RDSV3_FRAG_SIZE;

	rdsv3_ib_incoming_slab = kmem_cache_create("rdsv3_ib_incoming",
	    sizeof (struct rdsv3_ib_incoming), 0, rdsv3_ib_inc_constructor,
	    rdsv3_ib_inc_destructor, NULL, NULL, NULL, 0);
	if (rdsv3_ib_incoming_slab == NULL) {
		RDSV3_DPRINTF2("rdsv3_ib_recv_init", "kmem_cache_create "
		    "failed");
		return (-ENOMEM);
	}

	RDSV3_DPRINTF4("rdsv3_ib_recv_init", "Return");
	return (0);
}

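/* Module teardown: destroy the incoming message descriptor cache. */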
void
rdsv3_ib_recv_exit(void)
{
	RDSV3_DPRINTF4("rdsv3_ib_recv_exit", "Enter");
	kmem_cache_destroy(rdsv3_ib_incoming_slab);
	RDSV3_DPRINTF4("rdsv3_ib_recv_exit", "Return");
}