1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2002-2003, Network Appliance, Inc. All rights reserved.
24  */
25 
26 /*
27  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
28  * Use is subject to license terms.
29  */
30 
31 /*
32  *
33  * MODULE: dapl_evd_util.c
34  *
35  * PURPOSE: Manage EVD Info structure
36  *
37  * $Id: dapl_evd_util.c,v 1.41 2003/08/20 13:18:36 sjs2 Exp $
38  */
39 
40 #include <sys/time.h>
41 #include <strings.h>
42 #include "dapl_evd_util.h"
43 #include "dapl_ia_util.h"
44 #include "dapl_cno_util.h"
45 #include "dapl_ring_buffer_util.h"
46 #include "dapl_adapter_util.h"
47 #include "dapl_tavor_ibtf_impl.h"
48 #include "dapl_cookie.h"
49 #include "dapl.h"
50 
51 
52 #ifdef	DAPL_DBG	/* For debugging.  */
53 static void
54 dapli_evd_eh_print_cqe(
55 	IN  ib_work_completion_t	cqe);
56 #endif
57 
58 static DAT_BOOLEAN
59 dapli_evd_cqe_to_event(
60     IN DAPL_EVD			*evd_ptr,
61     IN ib_work_completion_t	*cqe_ptr,
62     IN DAT_BOOLEAN		process_premature_events,
63     OUT DAT_EVENT		*event_ptr);
64 
65 static DAT_RETURN
66 dapli_evd_event_alloc(
67 	IN  DAPL_EVD		*evd_ptr,
68 	IN  DAPL_CNO		*cno_ptr,
69 	IN  DAT_COUNT		qlen);
70 
71 
72 /*
73  * dapls_evd_internal_create
74  *
75  * actually create the evd.  this is called after all parameter checking
76  * has been performed in dapl_ep_create.  it is also called from dapl_ia_open
77  * to create the default async evd.
78  *
79  * Input:
80  * 	ia_ptr
81  *	cno_ptr
82  *	qlen
83  *	evd_flags
84  *
85  * Output:
86  * 	evd_ptr_ptr
87  *
88  * Returns:
89  * 	DAT_SUCCESS or an error code on failure
90  *
91  */
92 
93 DAT_RETURN
94 dapls_evd_internal_create(
95     DAPL_IA		*ia_ptr,
96     DAPL_CNO		*cno_ptr,
97     DAT_COUNT		min_qlen,
98     DAT_EVD_FLAGS	evd_flags,
99     DAPL_EVD		**evd_ptr_ptr)
100 {
101 	DAPL_EVD	*evd_ptr;
102 	DAT_COUNT	cq_len;
103 	DAT_RETURN	dat_status;
104 
105 	dat_status	= DAT_SUCCESS;
106 	*evd_ptr_ptr	= NULL;
107 	cq_len		= min_qlen;
108 
109 	evd_ptr = dapls_evd_alloc(ia_ptr,
110 	    cno_ptr,
111 	    evd_flags,
112 	    min_qlen);
113 	if (!evd_ptr) {
114 		dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,
115 		    DAT_RESOURCE_MEMORY);
116 		goto bail;
117 	}
118 
119 	/*
120 	 * If we are dealing with event streams besides a CQ event stream,
121 	 * be conservative and set producer side locking.  Otherwise, no.
122 	 */
123 	evd_ptr->evd_producer_locking_needed =
124 	    ((evd_flags & ~ (DAT_EVD_DTO_FLAG|DAT_EVD_RMR_BIND_FLAG)) != 0);
125 
126 	/* Before we setup any callbacks, transition state to OPEN.  */
127 	evd_ptr->evd_state = DAPL_EVD_STATE_OPEN;
128 
129 	/*
130 	 * we need to call cq_alloc even for connection/cr/async evds
131 	 * since all the allocation happens there.
132 	 */
133 	dat_status = dapls_ib_cq_alloc(ia_ptr,
134 	    evd_ptr, cno_ptr, &cq_len);
135 	if (dat_status != DAT_SUCCESS) {
136 		goto bail;
137 	}
138 
139 	dat_status = dapls_ib_setup_async_callback(
140 	    ia_ptr,
141 	    DAPL_ASYNC_CQ_COMPLETION,
142 	    (unsigned int *) evd_ptr->ib_cq_handle,
143 	    (ib_async_handler_t)dapl_evd_dto_callback,
144 	    evd_ptr);
145 	if (dat_status != DAT_SUCCESS) {
146 		goto bail;
147 	}
148 	/*
149 	 * cq_notify is not required here since the CQ is polled
150 	 * whenever evd_wait is called anyway.
151 	 * dat_status = dapls_set_cq_notify(ia_ptr, evd_ptr);
152 	 */
153 
154 	/*
155 	 * We now have an accurate count of events, so allocate them into
156 	 * the EVD
157 	 */
158 	dat_status = dapli_evd_event_alloc(evd_ptr, cno_ptr, cq_len);
159 	if (dat_status != DAT_SUCCESS) {
160 		goto bail;
161 	}
162 
163 	/* We're assuming success in the following.   */
164 	dapl_os_assert(dat_status == DAT_SUCCESS);
165 	dapl_ia_link_evd(ia_ptr, evd_ptr);
166 	*evd_ptr_ptr = evd_ptr;
167 
168 bail:
169 	if (dat_status != DAT_SUCCESS) {
170 		if (evd_ptr) {
171 			(void) dapls_evd_dealloc(evd_ptr);
172 		}
173 	}
174 
175 	return (dat_status);
176 }
177 
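/*
 * Illustrative sketch (not part of the build): a caller needing a default
 * async EVD might invoke this routine roughly as follows.  The qlen of 8
 * and the variable names are assumptions for illustration only.
 *
 *	DAPL_EVD	*async_evd;
 *	DAT_RETURN	status;
 *
 *	status = dapls_evd_internal_create(ia_ptr, NULL, 8,
 *	    DAT_EVD_ASYNC_FLAG, &async_evd);
 *	if (status != DAT_SUCCESS) {
 *		return (status);
 *	}
 */
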
178 /*
179  * dapls_evd_alloc
180  *
181  * alloc and initialize an EVD struct
182  *
183  * Input:
184  * 	ia
185  *
186  * Output:
187  * 	evd_ptr
188  *
189  * Returns:
190  * 	evd_ptr, or NULL on failure
191  *
192  */
193 DAPL_EVD *
194 dapls_evd_alloc(
195     IN DAPL_IA		*ia_ptr,
196     IN DAPL_CNO		*cno_ptr,
197     IN DAT_EVD_FLAGS	evd_flags,
198     IN DAT_COUNT	qlen) /* ARGSUSED */
199 {
200 	DAPL_EVD	*evd_ptr;
201 
202 	evd_ptr    = NULL;
203 
204 	/* Allocate EVD */
205 	evd_ptr = (DAPL_EVD *)dapl_os_alloc(sizeof (DAPL_EVD));
206 	if (!evd_ptr) {
207 		goto bail;
208 	}
209 
210 	/* zero the structure */
211 	(void) dapl_os_memzero(evd_ptr, sizeof (DAPL_EVD));
212 
213 	/*
214 	 * initialize the header
215 	 */
216 	evd_ptr->header.provider		= ia_ptr->header.provider;
217 	evd_ptr->header.magic			= DAPL_MAGIC_EVD;
218 	evd_ptr->header.handle_type		= DAT_HANDLE_TYPE_EVD;
219 	evd_ptr->header.owner_ia		= ia_ptr;
220 	evd_ptr->header.user_context.as_64	= 0;
221 	evd_ptr->header.user_context.as_ptr	= NULL;
222 	dapl_llist_init_entry(&evd_ptr->header.ia_list_entry);
223 	dapl_os_lock_init(&evd_ptr->header.lock);
224 
225 	/*
226 	 * Initialize the body
227 	 */
228 	evd_ptr->evd_state	= DAPL_EVD_STATE_INITIAL;
229 	evd_ptr->evd_flags	= evd_flags;
230 	evd_ptr->evd_enabled	= DAT_TRUE;
231 	evd_ptr->evd_waitable	= DAT_TRUE;
232 	evd_ptr->evd_producer_locking_needed = 1; /* Conservative value.  */
233 	evd_ptr->ib_cq_handle	= IB_INVALID_HANDLE;
234 	evd_ptr->evd_ref_count	= 0;
235 	evd_ptr->catastrophic_overflow = DAT_FALSE;
236 	evd_ptr->qlen		= qlen;
237 
238 	dapl_llist_init_entry(&evd_ptr->cno_list_entry);
239 	evd_ptr->completion_type = DAPL_EVD_STATE_THRESHOLD;
240 	(void) dapl_os_wait_object_init(&evd_ptr->wait_object);
241 
242 bail:
243 	return (evd_ptr);
244 }
245 
246 
247 /*
248  * dapli_evd_event_alloc
249  *
250  * alloc events into an EVD.
251  *
252  * Input:
253  * 	evd_ptr
254  *	qlen
255  *
256  * Output:
257  * 	NONE
258  *
259  * Returns:
260  * 	DAT_SUCCESS
261  *	ERROR
262  *
263  */
264 DAT_RETURN
265 dapli_evd_event_alloc(
266     IN DAPL_EVD		*evd_ptr,
267     IN  DAPL_CNO	*cno_ptr,
268     IN DAT_COUNT	qlen)
269 {
270 	DAT_EVENT	*event_ptr;
271 	DAT_COUNT	i;
272 	DAT_RETURN	dat_status;
273 
274 	dat_status = DAT_SUCCESS;
275 	event_ptr  = NULL;
276 
277 	/* Allocate EVENTs */
278 	event_ptr = (DAT_EVENT *) dapl_os_alloc(qlen * sizeof (DAT_EVENT));
279 	if (!event_ptr) {
280 		goto bail;
281 	}
282 	evd_ptr->events = event_ptr;
283 	evd_ptr->qlen = qlen;
284 
285 	/* allocate free event queue */
286 	dat_status = dapls_rbuf_alloc(&evd_ptr->free_event_queue, qlen);
287 	if (dat_status != DAT_SUCCESS) {
288 		goto bail;
289 	}
290 
291 	/* allocate pending event queue */
292 	dat_status = dapls_rbuf_alloc(&evd_ptr->pending_event_queue, qlen);
293 	if (dat_status != DAT_SUCCESS) {
294 		goto bail;
295 	}
296 
297 	/* add events to free event queue */
298 	for (i = 0; i < qlen; i++) {
299 		dat_status = dapls_rbuf_add(&evd_ptr->free_event_queue,
300 		    (void *)event_ptr);
301 		dapl_os_assert(dat_status == DAT_SUCCESS);
302 		event_ptr++;
303 	}
304 	evd_ptr->cq_notified = DAT_FALSE;
305 	evd_ptr->cq_notified_when = 0;
306 	evd_ptr->cno_active_count = 0;
307 	if (cno_ptr != NULL) {
308 		dapl_os_lock(&cno_ptr->header.lock);
309 		dapl_llist_add_head(&cno_ptr->evd_list_head,
310 		    &evd_ptr->cno_list_entry, evd_ptr);
311 		/* Take a reference count on the CNO */
312 		dapl_os_atomic_inc(&cno_ptr->cno_ref_count);
313 		dapl_os_unlock(&cno_ptr->header.lock);
314 	}
315 	evd_ptr->cno_ptr = cno_ptr;
316 	evd_ptr->threshold = 0;
317 
318 bail:
319 	return (dat_status);
320 }
321 
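/*
 * Illustrative sketch (not part of the build): the two ring buffers set up
 * above implement the event life cycle.  An event moves from the free queue
 * to the pending queue when it is posted, and back to the free queue once
 * the consumer has dequeued it.  Roughly:
 *
 *	DAT_EVENT	*ev;
 *
 *	ev = (DAT_EVENT *)dapls_rbuf_remove(&evd_ptr->free_event_queue);
 *	if (ev != NULL) {
 *		ev->event_number = DAT_SOFTWARE_EVENT;
 *		(void) dapls_rbuf_add(&evd_ptr->pending_event_queue,
 *		    (void *)ev);
 *	}
 */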
322 
323 /*
324  * dapls_evd_dealloc
325  *
326  * Free the passed in EVD structure. If an error occurs, this function
327  * will clean up all of the internal data structures and report the
328  * error.
329  *
330  * Input:
331  * 	evd_ptr
332  *
333  * Output:
334  * 	none
335  *
336  * Returns:
337  * 	status
338  *
339  */
340 DAT_RETURN
341 dapls_evd_dealloc(
342     IN DAPL_EVD		*evd_ptr)
343 {
344 	DAT_RETURN	dat_status;
345 	DAPL_IA	*ia_ptr;
346 
347 	dat_status = DAT_SUCCESS;
348 
349 	dapl_os_assert(evd_ptr->header.magic == DAPL_MAGIC_EVD);
350 	dapl_os_assert(evd_ptr->evd_ref_count == 0);
351 
352 	/*
353 	 * Destroy the CQ first, to keep any more callbacks from coming
354 	 * up from it.
355 	 */
356 	if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
357 		ia_ptr = evd_ptr->header.owner_ia;
358 
359 		dat_status = dapls_ib_cq_free(ia_ptr, evd_ptr);
360 		if (dat_status != DAT_SUCCESS) {
361 			goto bail;
362 		}
363 	}
364 
365 	/*
366 	 * We should now be safe to invalidate the EVD; reset the
367 	 * magic to prevent reuse.
368 	 */
369 	evd_ptr->header.magic = DAPL_MAGIC_INVALID;
370 
371 	/* Release reference on the CNO if it exists */
372 	if (evd_ptr->cno_ptr != NULL) {
373 		dapl_os_lock(&evd_ptr->cno_ptr->header.lock);
374 		(void) dapl_llist_remove_entry(&evd_ptr->cno_ptr->evd_list_head,
375 		    &evd_ptr->cno_list_entry);
376 		dapl_os_atomic_dec(&evd_ptr->cno_ptr->cno_ref_count);
377 		dapl_os_unlock(&evd_ptr->cno_ptr->header.lock);
378 	}
379 
380 	/*
381 	 * If the ring buffer allocation failed, then the dapls_rbuf_destroy
382 	 * function will detect that the ring buffer's internal data (e.g.
383 	 * the base pointer) is invalid and will handle it appropriately
384 	 */
385 	dapls_rbuf_destroy(&evd_ptr->free_event_queue);
386 	dapls_rbuf_destroy(&evd_ptr->pending_event_queue);
387 
388 	if (evd_ptr->events) {
389 		dapl_os_free(evd_ptr->events,
390 		    evd_ptr->qlen * sizeof (DAT_EVENT));
391 	}
392 
393 	(void) dapl_os_wait_object_destroy(&evd_ptr->wait_object);
394 	dapl_os_free(evd_ptr, sizeof (DAPL_EVD));
395 
396 bail:
397 	return (dat_status);
398 }
399 
400 
401 /*
402  * dapli_evd_eh_print_cqe
403  *
404  * Input:
405  *	cqe
406  *
407  * Output:
408  *	none
409  *
410  * Prints out a CQE for debug purposes
411  *
412  */
413 
414 #ifdef	DAPL_DBG	/* For debugging.  */
415 void
416 dapli_evd_eh_print_cqe(
417     IN 	ib_work_completion_t	cqe)
418 {
419 	static char *optable[] = {
420 		"",
421 		"OP_SEND",
422 		"OP_RDMA_READ",
423 		"OP_RDMA_WRITE",
424 		"OP_COMP_AND_SWAP",
425 		"OP_FETCH_AND_ADD",
426 		"OP_BIND_MW",
427 		"OP_RECEIVE",
428 		"OP_RECEIVE_RDMAWI",
429 		0
430 	};
431 	DAPL_COOKIE		*dto_cookie;
432 
433 	dto_cookie = (DAPL_COOKIE *) (uintptr_t)DAPL_GET_CQE_WRID(&cqe);
434 
435 	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
436 	    "\t >>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<\n");
437 	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
438 	    "\t dapl_evd_dto_callback : CQE \n");
439 	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
440 	    "\t\t work_req_id 0x%llx\n", DAPL_GET_CQE_WRID(&cqe));
441 	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
442 	    "\t\t op_type: %s\n", optable[DAPL_GET_CQE_OPTYPE(&cqe)]);
443 	if ((DAPL_GET_CQE_OPTYPE(&cqe) == OP_SEND) ||
444 	    (DAPL_GET_CQE_OPTYPE(&cqe) == OP_RDMA_WRITE)) {
445 		dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
446 		    "\t\t bytes_num %d\n", dto_cookie->val.dto.size);
447 	} else {
448 		dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
449 		    "\t\t bytes_num %d\n", DAPL_GET_CQE_BYTESNUM(&cqe));
450 	}
451 	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
452 	    "\t\t status %d\n", DAPL_GET_CQE_STATUS(&cqe));
453 	dapl_dbg_log(DAPL_DBG_TYPE_CALLBACK,
454 	    "\t >>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<\n");
455 }
456 #endif
457 
458 /*
459  * Event posting code follows.
460  */
461 
462 /*
463  * These next two functions (dapli_evd_get_event and dapli_evd_post_event)
464  * are a pair.  They are always called together, from one of the functions
465  * at the end of this file (dapl_evd_post_*_event).
466  *
467  * Note that if producer side locking is enabled, the first one takes the
468  * EVD lock and the second releases it.
469  */
470 
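/*
 * Illustrative sketch (not part of the build) of the pairing described
 * above; note the hand-off of the EVD lock when producer locking is on:
 *
 *	event_ptr = dapli_evd_get_event(evd_ptr);
 *		(takes evd_ptr->header.lock if producer locking is needed;
 *		 the lock is released internally if no event is available)
 *	if (event_ptr != NULL) {
 *		event_ptr->evd_handle = (DAT_EVD_HANDLE)evd_ptr;
 *		event_ptr->event_number = DAT_SOFTWARE_EVENT;
 *		dapli_evd_post_event(evd_ptr, event_ptr);
 *		(releases evd_ptr->header.lock if it was taken)
 *	}
 */
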
471 /*
472  * dapli_evd_get_event
473  *
474  * Get an event struct from the evd.  The caller should fill in the event
475  * and call dapl_evd_post_event.
476  *
477  * If there are no events available, an overflow event is generated to the
478  * async EVD handler.
479  *
480  * If this EVD requires producer locking, a successful return implies
481  * that the lock is held.
482  *
483  * Input:
484  * 	evd_ptr
485  *
486  * Output:
487  *	event
488  *
489  */
490 
491 static DAT_EVENT *
492 dapli_evd_get_event(
493     DAPL_EVD *evd_ptr)
494 {
495 	DAT_EVENT	*event;
496 
497 	if (evd_ptr->evd_producer_locking_needed) {
498 		dapl_os_lock(&evd_ptr->header.lock);
499 	}
500 
501 	event = (DAT_EVENT *)dapls_rbuf_remove(&evd_ptr->free_event_queue);
502 
503 	/* Release the lock if it was taken and the call failed.  */
504 	if (!event && evd_ptr->evd_producer_locking_needed) {
505 		dapl_os_unlock(&evd_ptr->header.lock);
506 	}
507 
508 	return (event);
509 }
510 
511 /*
512  * dapli_evd_post_event
513  *
514  * Post the <event> to the evd.  If possible, invoke the evd's CNO.
515  * Otherwise post the event on the pending queue.
516  *
517  * If producer side locking is required, the EVD lock must be held upon
518  * entry to this function.
519  *
520  * Input:
521  * 	evd_ptr
522  * 	event
523  *
524  * Output:
525  *	none
526  *
527  */
528 
529 static void
530 dapli_evd_post_event(
531     IN	DAPL_EVD	*evd_ptr,
532     IN	const DAT_EVENT	*event_ptr)
533 {
534 	DAT_RETURN	dat_status;
535 	DAPL_CNO 	*cno_to_trigger = NULL;
536 
537 	dapl_dbg_log(DAPL_DBG_TYPE_EVD,
538 	    "dapli_evd_post_event: Called with event # %x\n",
539 	    event_ptr->event_number);
540 
541 	dat_status = dapls_rbuf_add(&evd_ptr->pending_event_queue,
542 	    (void *)event_ptr);
543 	dapl_os_assert(dat_status == DAT_SUCCESS);
544 
545 	dapl_os_assert(evd_ptr->evd_state == DAPL_EVD_STATE_WAITED ||
546 	    evd_ptr->evd_state == DAPL_EVD_STATE_OPEN);
547 
548 	if (evd_ptr->evd_state == DAPL_EVD_STATE_OPEN) {
549 		/* No waiter.  Arrange to trigger a CNO if it exists.  */
550 
551 		if (evd_ptr->evd_enabled) {
552 			cno_to_trigger = evd_ptr->cno_ptr;
553 		}
554 		if (evd_ptr->evd_producer_locking_needed) {
555 			dapl_os_unlock(&evd_ptr->header.lock);
556 		}
557 	} else {
558 		/*
559 		 * This routine gets called
560 		 *  - In the context of the waiting thread when CQ, CM or ASYNC
561 		 *    events need to be put on to the EVD ring buffer.
562 		 *  - Due to a post of a software event.
563 		 *
564 		 * In the first case the waiting thread is pulling the events
565 		 * from various streams into the evd so there is no need to
566 		 * wake any thread. In the second case if the evd is in waited
567 		 * state then we need to wakeup the waiting thread.
568 		 */
569 		if (event_ptr->event_number == DAT_SOFTWARE_EVENT) {
570 			/*
571 			 * We're in DAPL_EVD_STATE_WAITED.  Take the lock if
572 			 * we don't have it, recheck, and signal.
573 			 */
574 
575 			if (!evd_ptr->evd_producer_locking_needed) {
576 				dapl_os_lock(&evd_ptr->header.lock);
577 			}
578 
579 			if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED) {
580 				dapl_os_unlock(&evd_ptr->header.lock);
581 				(void) dapls_ib_event_wakeup(evd_ptr);
582 			} else {
583 				dapl_os_unlock(&evd_ptr->header.lock);
584 			}
585 		} else {
586 			if (evd_ptr->evd_producer_locking_needed) {
587 				dapl_os_unlock(&evd_ptr->header.lock);
588 			}
589 		}
590 	}
591 
592 	if (cno_to_trigger != NULL) {
593 		dapl_cno_trigger(cno_to_trigger, evd_ptr);
594 	}
595 }
596 
597 /*
598  * dapli_evd_post_event_nosignal
599  *
600  * Post the <event> to the evd.  Do not do any wakeup processing.
601  * This function should only be called if it is known that there are
602  * no waiters that it is appropriate to wakeup on this EVD.  An example
603  * of such a situation is during internal dat_evd_wait() processing.
604  *
605  * If producer side locking is required, the EVD lock must be held upon
606  * entry to this function.
607  *
608  * Input:
609  * 	evd_ptr
610  * 	event
611  *
612  * Output:
613  *	none
614  *
615  */
616 
617 static void
618 dapli_evd_post_event_nosignal(
619     IN	DAPL_EVD	*evd_ptr,
620     IN	const DAT_EVENT	*event_ptr)
621 {
622 	DAT_RETURN	dat_status;
623 
624 	dapl_dbg_log(DAPL_DBG_TYPE_EVD,
625 	    "dapli_evd_post_event_nosignal: Called with event # %x\n",
626 	    event_ptr->event_number);
627 
628 	dat_status = dapls_rbuf_add(&evd_ptr->pending_event_queue,
629 	    (void *)event_ptr);
630 	dapl_os_assert(dat_status == DAT_SUCCESS);
631 
632 	dapl_os_assert(evd_ptr->evd_state == DAPL_EVD_STATE_WAITED ||
633 	    evd_ptr->evd_state == DAPL_EVD_STATE_OPEN);
634 
635 	if (evd_ptr->evd_producer_locking_needed) {
636 		dapl_os_unlock(&evd_ptr->header.lock);
637 	}
638 }
639 
640 /*
641  * dapli_evd_format_overflow_event
642  *
643  * format an overflow event for posting
644  *
645  * Input:
646  * 	evd_ptr
647  * 	event_ptr
648  *
649  * Output:
650  *	none
651  *
652  */
653 static void
654 dapli_evd_format_overflow_event(
655 	IN  DAPL_EVD  *evd_ptr,
656 	OUT DAT_EVENT *event_ptr)
657 {
658 	DAPL_IA *ia_ptr;
659 
660 	ia_ptr = evd_ptr->header.owner_ia;
661 
662 	event_ptr->evd_handle   = (DAT_EVD_HANDLE)evd_ptr;
663 	event_ptr->event_number = DAT_ASYNC_ERROR_EVD_OVERFLOW;
664 	event_ptr->event_data.asynch_error_event_data.dat_handle =
665 	    (DAT_HANDLE)ia_ptr;
666 }
667 
668 /*
669  * dapli_evd_post_overflow_event
670  *
671  * post an overflow event
672  *
673  * Input:
674  * 	async_evd_ptr
675  * 	evd_ptr
676  *
677  * Output:
678  *	none
679  *
680  */
681 static void
682 dapli_evd_post_overflow_event(
683     IN  DAPL_EVD  *async_evd_ptr,
684     IN  DAPL_EVD  *overflow_evd_ptr)
685 {
686 	DAT_EVENT *overflow_event;
687 
688 	/*
689 	 * The overflow_evd_ptr might be the same as the async_evd_ptr.
690 	 * In that case we've got a catastrophic overflow.
691 	 */
692 	if (async_evd_ptr == overflow_evd_ptr) {
693 		async_evd_ptr->catastrophic_overflow = DAT_TRUE;
694 		async_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD;
695 		return;
696 	}
697 
698 	overflow_event = dapli_evd_get_event(overflow_evd_ptr);
699 	if (!overflow_event) {
700 		/* no free event available to report the overflow */
701 		overflow_evd_ptr->catastrophic_overflow = DAT_TRUE;
702 		overflow_evd_ptr->evd_state = DAPL_EVD_STATE_DEAD;
703 		return;
704 	}
705 	dapli_evd_format_overflow_event(overflow_evd_ptr, overflow_event);
706 	dapli_evd_post_event(overflow_evd_ptr, overflow_event);
707 }
708 
709 static DAT_EVENT *
710 dapli_evd_get_and_init_event(
711     IN DAPL_EVD				*evd_ptr,
712     IN DAT_EVENT_NUMBER			event_number)
713 {
714 	DAT_EVENT 		*event_ptr;
715 
716 	event_ptr = dapli_evd_get_event(evd_ptr);
717 	if (NULL == event_ptr) {
718 		dapli_evd_post_overflow_event(
719 		    evd_ptr->header.owner_ia->async_error_evd, evd_ptr);
720 	} else {
721 		event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr;
722 		event_ptr->event_number = event_number;
723 	}
724 
725 	return (event_ptr);
726 }
727 
728 DAT_RETURN
729 dapls_evd_post_cr_arrival_event(
730     IN DAPL_EVD				*evd_ptr,
731     IN DAT_EVENT_NUMBER			event_number,
732     IN DAT_SP_HANDLE			sp_handle,
733     DAT_IA_ADDRESS_PTR			ia_address_ptr,
734     DAT_CONN_QUAL			conn_qual,
735     DAT_CR_HANDLE			cr_handle)
736 {
737 	DAT_EVENT 		*event_ptr;
738 	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
739 	/*
740 	 * Note event lock may be held on successful return
741 	 * to be released by dapli_evd_post_event(), if producer side locking
742 	 * is needed.
743 	 */
744 
745 	if (!event_ptr) {
746 		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
747 	}
748 
749 	event_ptr->event_data.cr_arrival_event_data.sp_handle = sp_handle;
750 	event_ptr->event_data.cr_arrival_event_data.local_ia_address_ptr
751 	    = ia_address_ptr;
752 	event_ptr->event_data.cr_arrival_event_data.conn_qual = conn_qual;
753 	event_ptr->event_data.cr_arrival_event_data.cr_handle = cr_handle;
754 
755 	dapli_evd_post_event(evd_ptr, event_ptr);
756 	return (DAT_SUCCESS);
757 }
758 
759 
760 DAT_RETURN
761 dapls_evd_post_connection_event(
762     IN DAPL_EVD				*evd_ptr,
763     IN DAT_EVENT_NUMBER			event_number,
764     IN DAT_EP_HANDLE			ep_handle,
765     IN DAT_COUNT			private_data_size,
766     IN DAT_PVOID			private_data)
767 {
768 	DAT_EVENT 		*event_ptr;
769 	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
770 	/*
771 	 * Note event lock may be held on successful return
772 	 * to be released by dapli_evd_post_event(), if producer side locking
773 	 * is needed.
774 	 */
775 
776 	if (!event_ptr) {
777 		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
778 	}
779 
780 	event_ptr->event_data.connect_event_data.ep_handle = ep_handle;
781 	event_ptr->event_data.connect_event_data.private_data_size
782 	    = private_data_size;
783 	event_ptr->event_data.connect_event_data.private_data = private_data;
784 
785 	dapli_evd_post_event(evd_ptr, event_ptr);
786 	return (DAT_SUCCESS);
787 }
788 
789 
790 DAT_RETURN
791 dapls_evd_post_async_error_event(
792     IN DAPL_EVD				*evd_ptr,
793     IN DAT_EVENT_NUMBER			event_number,
794     IN DAT_IA_HANDLE			ia_handle)
795 {
796 	DAT_EVENT 		*event_ptr;
797 	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
798 	/*
799 	 * Note event lock may be held on successful return
800 	 * to be released by dapli_evd_post_event(), if producer side locking
801 	 * is needed.
802 	 */
803 
804 	if (!event_ptr) {
805 		return (DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY);
806 	}
807 
808 	event_ptr->event_data.asynch_error_event_data.dat_handle = ia_handle;
809 
810 	dapli_evd_post_event(evd_ptr, event_ptr);
811 	return (DAT_SUCCESS);
812 }
813 
814 
815 DAT_RETURN
816 dapls_evd_post_software_event(
817     IN DAPL_EVD				*evd_ptr,
818     IN DAT_EVENT_NUMBER			event_number,
819     IN DAT_PVOID			pointer)
820 {
821 	DAT_EVENT 		*event_ptr;
822 	event_ptr = dapli_evd_get_and_init_event(evd_ptr, event_number);
823 	/*
824 	 * Note event lock may be held on successful return
825 	 * to be released by dapli_evd_post_event(), if producer side locking
826 	 * is needed.
827 	 */
828 
829 	if (!event_ptr) {
830 		return (DAT_QUEUE_FULL);
831 	}
832 
833 	event_ptr->event_data.software_event_data.pointer = pointer;
834 
835 	dapli_evd_post_event(evd_ptr, event_ptr);
836 	return (DAT_SUCCESS);
837 }
838 
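/*
 * Illustrative sketch (not part of the build): a provider-internal caller
 * queueing a user-visible software event could use the helper above as
 * follows; the user_context name is an assumption for illustration only.
 *
 *	DAT_RETURN	status;
 *
 *	status = dapls_evd_post_software_event(evd_ptr,
 *	    DAT_SOFTWARE_EVENT, (DAT_PVOID)user_context);
 *	if (status != DAT_SUCCESS) {
 *		return (status);
 *	}
 */
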
839 void
840 dapls_evd_post_premature_events(IN DAPL_EP *ep_ptr)
841 {
842 	DAPL_EVD		*evd_ptr;
843 	DAT_EVENT		*event;
844 	ib_work_completion_t	*cqe;
845 	uint32_t		qpn;
846 	int			prm_idx;
847 	int			nevents;
848 	int			i;
849 
850 	dapls_ib_poll_premature_events(ep_ptr, &cqe, &nevents);
851 	/* premature events are always recv events */
852 	evd_ptr = ep_ptr->param.recv_evd_handle;
853 	qpn = ep_ptr->qpn;
854 
855 	i = 0;
856 	prm_idx = 0;
857 	while (i < nevents) {
858 		/*
859 		 * If srq_attached, premature events cannot exceed max_recv_dtos
860 		 */
861 		dapl_os_assert(!ep_ptr->srq_attached ||
862 		    (prm_idx <= ((DAPL_SRQ *)ep_ptr->param.srq_handle)->
863 		    param.max_recv_dtos));
864 
865 		/*
866 		 * The SRQ premature event list could potentially have
867 		 * holes (ie. free entries in the middle) or premature
868 		 * events for other QPs. These need to be skipped.
869 		 */
870 		if (ep_ptr->srq_attached &&
871 		    (!DAPL_CQE_IS_VALID(&cqe[prm_idx]) ||
872 		    (DAPL_GET_CQE_QPN(&cqe[prm_idx]) != qpn))) {
873 			prm_idx++;
874 			continue;
875 		}
876 
877 		dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
878 		    " Premature DTO processing\n");
879 
880 #ifdef	DAPL_DBG	/* For debugging.  */
881 		dapli_evd_eh_print_cqe(cqe[i]);
882 #endif
883 		/*
884 		 * Can use DAT_DTO_COMPLETION_EVENT because
885 		 * dapli_evd_cqe_to_event will overwrite.
886 		 */
887 		event = dapli_evd_get_and_init_event(evd_ptr,
888 		    DAT_DTO_COMPLETION_EVENT);
889 		if (event == NULL) {
890 			/* We've already attempted the overflow post, return */
891 			return;
892 		}
893 		(void) dapli_evd_cqe_to_event(evd_ptr, &cqe[i], DAT_TRUE,
894 		    event);
895 		dapli_evd_post_event_nosignal(evd_ptr, event);
896 		/*
897 		 * For SRQ attached QPs recycle the premature event
898 		 */
899 		if (ep_ptr->srq_attached) {
900 			dapls_ib_free_premature_events(ep_ptr, prm_idx);
901 			prm_idx++;
902 		}
903 		i++;
904 	}
905 }
906 
907 /*
908  * dapli_evd_cqe_to_event
909  *
910  * Convert a CQE into an event structure.
911  *
912  * Input:
913  *	evd_ptr
914  * 	cqe_ptr
915  *
916  * Output:
917  * 	event_ptr
918  *
919  * Returns:
920  * 	DAT_TRUE, or DAT_FALSE if the CQE was deferred as a premature event
921  *
922  */
923 static DAT_BOOLEAN
924 dapli_evd_cqe_to_event(
925     IN DAPL_EVD			*evd_ptr,
926     IN ib_work_completion_t	*cqe_ptr,
927     IN DAT_BOOLEAN		process_premature_events,
928     OUT DAT_EVENT		*event_ptr)
929 {
930 	DAPL_EP			*ep_ptr;
931 	DAPL_SRQ		*srq_ptr;
932 	DAPL_COOKIE		*cookie;
933 	DAT_EP_STATE		ep_state;
934 	ib_qp_handle_t		qp;
935 	ib_uint32_t		ib_status;
936 	ib_uint32_t		ibtype;
937 	int			srq_enabled;
938 	int			dto_error = 0;
939 
940 
941 	/*
942 	 * All that can be relied on if the status is bad is the status
943 	 * and WRID.
944 	 */
945 	ib_status = DAPL_GET_CQE_STATUS(cqe_ptr);
946 
947 	cookie = (DAPL_COOKIE *)((uintptr_t)DAPL_GET_CQE_WRID(cqe_ptr));
948 	dapl_os_assert((NULL != cookie));
949 
950 	if (cookie->queue_type == DAPL_COOKIE_QUEUE_EP) {
951 		srq_enabled = 0;
952 		ep_ptr = cookie->queue.ep;
953 	} else {
954 		srq_enabled = 1;
955 		srq_ptr = cookie->queue.srq;
956 		dapl_os_assert(NULL != srq_ptr);
957 		dapl_os_assert(srq_ptr->header.magic == DAPL_MAGIC_SRQ);
958 		ib_status = DAPL_GET_CQE_STATUS(cqe_ptr);
959 		ep_ptr = dapls_ib_srq_lookup_ep(srq_ptr, cqe_ptr);
960 	}
961 
962 	dapl_os_assert((NULL != ep_ptr));
963 	dapl_os_assert((ep_ptr->header.magic == DAPL_MAGIC_EP) ||
964 	    (ep_ptr->header.magic == DAPL_MAGIC_EP_EXIT));
965 
966 	event_ptr->evd_handle = (DAT_EVD_HANDLE) evd_ptr;
967 
968 	/*
969 	 * Check if the DTO completion arrived before CONNECTION_ESTABLISHED
970 	 * event -
971 	 *
972 	 * Send DTOs can occur only if the ep state is CONNECTED/DISCONNECTED,
973 	 * therefore they cannot occur before the connection established event.
974 	 * A receive DTO can potentially complete before the connection
975 	 * established event has been delivered to the client. In this case,
976 	 * if the ep state is ACTIVE_CONNECTION_PENDING (active side) or
977 	 * COMPLETION_PENDING (passive side) the event is put in a special
978 	 * event queue in the qp_handle.
979 	 *
980 	 */
981 	if (!process_premature_events &&
982 	    (cookie->type == DAPL_COOKIE_TYPE_DTO) &&
983 	    (ib_status == IB_COMP_ST_SUCCESS)) {
984 		ep_state = ep_ptr->param.ep_state;
985 		qp = ep_ptr->qp_handle;
986 		if ((ep_state == DAT_EP_STATE_ACTIVE_CONNECTION_PENDING) ||
987 		    (ep_state == DAT_EP_STATE_COMPLETION_PENDING) ||
988 		    (qp->qp_num_premature_events > 0)) {
989 			/*
990 			 * not yet ready to put the event in the evd ring
991 			 * buffer
992 			 */
993 			dapls_ib_store_premature_events(qp, cqe_ptr);
994 			return (DAT_FALSE);
995 		}
996 	}
997 
998 	switch (cookie->type) {
999 	case DAPL_COOKIE_TYPE_DTO:
1000 	{
1001 		DAPL_COOKIE_BUFFER	*buffer;
1002 
1003 		if (DAPL_DTO_TYPE_RECV == cookie->val.dto.type) {
1004 			if (srq_enabled) {
1005 				dapl_os_atomic_dec(&srq_ptr->recv_count);
1006 				buffer = &srq_ptr->recv_buffer;
1007 			} else {
1008 				dapl_os_atomic_dec(&ep_ptr->recv_count);
1009 				buffer = &ep_ptr->recv_buffer;
1010 			}
1011 		} else {
1012 			dapl_os_atomic_dec(&ep_ptr->req_count);
1013 			buffer = &ep_ptr->req_buffer;
1014 		}
1015 
1016 		event_ptr->event_number = DAT_DTO_COMPLETION_EVENT;
1017 		event_ptr->event_data.dto_completion_event_data.ep_handle =
1018 		    ep_ptr;
1019 		event_ptr->event_data.dto_completion_event_data.user_cookie =
1020 		    cookie->val.dto.cookie;
1021 
1022 		switch (ib_status) {
1023 		case IB_COMP_ST_SUCCESS:
1024 		{
1025 			ibtype = DAPL_GET_CQE_OPTYPE(cqe_ptr);
1026 
1027 			event_ptr->event_data.dto_completion_event_data.status =
1028 			    DAT_DTO_SUCCESS;
1029 			dapl_os_assert((ibtype == OP_SEND &&
1030 			    cookie->val.dto.type == DAPL_DTO_TYPE_SEND) ||
1031 			    (ibtype == OP_RECEIVE &&
1032 			    cookie->val.dto.type == DAPL_DTO_TYPE_RECV) ||
1033 			    (ibtype == OP_RDMA_WRITE &&
1034 			    cookie->val.dto.type ==
1035 			    DAPL_DTO_TYPE_RDMA_WRITE) ||
1036 			    (ibtype == OP_RDMA_READ &&
1037 			    cookie->val.dto.type ==
1038 			    DAPL_DTO_TYPE_RDMA_READ));
1039 			break;
1040 		}
1041 		case IB_COMP_ST_LOCAL_LEN_ERR:
1042 		{
1043 			event_ptr->event_data.dto_completion_event_data.status =
1044 			    DAT_DTO_ERR_LOCAL_LENGTH;
1045 			break;
1046 		}
1047 		case IB_COMP_ST_LOCAL_PROTECT_ERR:
1048 		{
1049 			event_ptr->event_data.dto_completion_event_data.status =
1050 			    DAT_DTO_ERR_LOCAL_PROTECTION;
1051 			break;
1052 		}
1053 		case IB_COMP_ST_WR_FLUSHED_ERR:
1054 		{
1055 			event_ptr->event_data.dto_completion_event_data.status =
1056 			    DAT_DTO_ERR_FLUSHED;
1057 			break;
1058 		}
1059 		case IB_COMP_ST_BAD_RESPONSE_ERR:
1060 		{
1061 			event_ptr->event_data.dto_completion_event_data.status =
1062 			    DAT_DTO_ERR_BAD_RESPONSE;
1063 			break;
1064 		}
1065 		case IB_COMP_ST_REM_REQ_ERR:
1066 		case IB_COMP_ST_REM_OP_ERR:
1067 		{
1068 			event_ptr->event_data.dto_completion_event_data.status =
1069 			    DAT_DTO_ERR_REMOTE_RESPONDER;
1070 			break;
1071 		}
1072 		case IB_COMP_ST_REM_ACC_ERR:
1073 		{
1074 			event_ptr->event_data.dto_completion_event_data.status =
1075 			    DAT_DTO_ERR_REMOTE_ACCESS;
1076 			break;
1077 		}
1078 		/*
1079 		 * Unsupported RD errors
1080 		 * case IB_COMP_ST_EE_STATE_ERR:
1081 		 * case IB_COMP_ST_EE_CTX_NO_ERR:
1082 		 */
1083 		case IB_COMP_ST_TRANSP_COUNTER:
1084 		{
1085 			event_ptr->event_data.dto_completion_event_data.status =
1086 			    DAT_DTO_ERR_TRANSPORT;
1087 			break;
1088 		}
1089 		case IB_COMP_ST_RNR_COUNTER:
1090 		{
1091 			event_ptr->event_data.dto_completion_event_data.status =
1092 			    DAT_DTO_ERR_RECEIVER_NOT_READY;
1093 			break;
1094 		}
1095 		case IB_COMP_ST_MW_BIND_ERR:
1096 		{
1097 			event_ptr->event_data.dto_completion_event_data.status =
1098 			    DAT_RMR_OPERATION_FAILED;
1099 			break;
1100 		}
1101 		case IB_COMP_ST_LOCAL_OP_ERR:
1102 		{
1103 			event_ptr->event_data.dto_completion_event_data.status =
1104 			    DAT_DTO_ERR_LOCAL_EP;
1105 			break;
1106 		}
1107 		default:
1108 		{
1109 			dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
1110 			    " DTO completion ERROR: %d: op %#x\n",
1111 			    DAPL_GET_CQE_STATUS(cqe_ptr),
1112 			    DAPL_GET_CQE_OPTYPE(cqe_ptr));
1113 			event_ptr->event_data.dto_completion_event_data.status =
1114 			    DAT_DTO_FAILURE;
1115 			break;
1116 		}
1117 		}
1118 
1119 		/* Most error DTO ops result in disconnecting the EP */
1120 		if ((event_ptr->event_data.dto_completion_event_data.status !=
1121 		    DAT_DTO_SUCCESS) &&
1122 		    (event_ptr->event_data.dto_completion_event_data.status !=
1123 		    DAT_RMR_OPERATION_FAILED)) {
1124 			dto_error = 1;
1125 			dapl_dbg_log(DAPL_DBG_TYPE_DTO_COMP_ERR,
1126 			    " DTO completion ERROR: %d: op %#x\n",
1127 			    DAPL_GET_CQE_STATUS(cqe_ptr),
1128 			    DAPL_GET_CQE_OPTYPE(cqe_ptr));
1129 		}
1130 
1131 		if (cookie->val.dto.type == DAPL_DTO_TYPE_SEND ||
1132 		    cookie->val.dto.type == DAPL_DTO_TYPE_RDMA_WRITE) {
1133 			/* Get size from DTO; CQE value may be off.  */
1134 			event_ptr->event_data.dto_completion_event_data.
1135 			    transfered_length = cookie->val.dto.size;
1136 		} else {
1137 			event_ptr->event_data.dto_completion_event_data.
1138 			    transfered_length = DAPL_GET_CQE_BYTESNUM(cqe_ptr);
1139 		}
1140 
1141 		dapls_cookie_dealloc(buffer, cookie);
1142 		break;
1143 	}
1144 
1145 	case DAPL_COOKIE_TYPE_RMR:
1146 	{
1147 		dapl_os_atomic_dec(&ep_ptr->req_count);
1148 
1149 		event_ptr->event_number = DAT_RMR_BIND_COMPLETION_EVENT;
1150 
1151 		event_ptr->event_data.rmr_completion_event_data.rmr_handle =
1152 		    cookie->val.rmr.rmr;
1153 		event_ptr->event_data.rmr_completion_event_data.user_cookie =
1154 		    cookie->val.rmr.cookie;
1155 		if (ib_status == IB_COMP_ST_SUCCESS) {
1156 			ibtype = DAPL_GET_CQE_OPTYPE(cqe_ptr);
1157 
1158 			event_ptr->event_data.rmr_completion_event_data.status =
1159 			    DAT_RMR_BIND_SUCCESS;
1160 			dapl_os_assert(ibtype == OP_BIND_MW);
1161 		} else {
1162 			event_ptr->event_data.rmr_completion_event_data.status =
1163 			    DAT_RMR_BIND_FAILURE;
1164 			dto_error = 1;
1165 		}
1166 
1167 		dapls_cookie_dealloc(&ep_ptr->req_buffer, cookie);
1168 		break;
1169 	}
1170 	default:
1171 	{
1172 		dapl_os_assert(!"Invalid Operation type");
1173 		break;
1174 	}
1175 	}
1176 
1177 	/*
1178 	 * A failed DTO will cause the connection to be broken
1179 	 */
1180 	if ((dto_error) && (ep_ptr->param.ep_state == DAT_EP_STATE_CONNECTED)) {
1181 		ep_ptr->param.ep_state = DAT_EP_STATE_DISCONNECTED;
1182 		/*
1183 		 * Disconnect at the IB level.
1184 		 */
1185 		dapls_ib_disconnect_clean(ep_ptr, DAT_TRUE, IB_CME_CONNECTED);
1186 	}
1187 	/* convert a premature recv to an error flush on disconnect */
1188 	if (process_premature_events && (ep_ptr->param.ep_state ==
1189 	    DAT_EP_STATE_DISCONNECTED) && (ib_status == IB_COMP_ST_SUCCESS)) {
1190 		dapl_os_assert(ibtype == OP_RECEIVE &&
1191 		    cookie->val.dto.type == DAPL_DTO_TYPE_RECV);
1192 		event_ptr->event_data.dto_completion_event_data.status =
1193 		    DAT_DTO_ERR_FLUSHED;
1194 	}
1195 	return (DAT_TRUE);
1196 }
1197 
1198 /*
1199  * dapls_evd_copy_cq
1200  *
1201  * Copy all entries on a CQ associated with the EVD onto that EVD
1202  * Up to caller to handle races, if any.  Note that no EVD waiters will
1203  * be awoken by this copy.
1204  *
1205  * Input:
1206  *	evd_ptr
1207  *
1208  * Output:
1209  * 	nevents
1210  *
1211  * Returns:
1212  * 	none
1213  *
1214  */
1215 void
1216 dapls_evd_copy_cq(
1217 	DAPL_EVD	*evd_ptr,
1218 	int		*nevents)
1219 {
1220 	ib_work_completion_t	cqe[MAX_CQES_PER_POLL];
1221 	DAT_RETURN		dat_status;
1222 	ib_cq_handle_t		cq_handle;
1223 	DAT_EVENT		*event;
1224 	uint_t			num_cqes_polled = 0;
1225 	int			cqe_events;
1226 	int			i;
1227 
1228 	cq_handle = evd_ptr->ib_cq_handle;
1229 
1230 	*nevents = 0;
1231 
1232 	if (cq_handle == IB_INVALID_HANDLE) {
1233 		/* Nothing to do if no CQ.  */
1234 		return;
1235 	}
1236 	dat_status = DAPL_POLL(evd_ptr)(cq_handle,
1237 	    cqe, MAX_CQES_PER_POLL, &num_cqes_polled);
1238 
1239 	if (dat_status == DAT_SUCCESS) {
1240 		dapl_dbg_log(DAPL_DBG_TYPE_EVD, "dapls_evd_copy_cq: %u\n",
1241 		    num_cqes_polled);
1242 		cqe_events = 0;
1243 		for (i = 0; i < num_cqes_polled; i++) {
1244 #ifdef	DAPL_DBG	/* For debugging.  */
1245 			dapli_evd_eh_print_cqe(cqe[i]);
1246 #endif
1247 
1248 			/*
1249 			 * Can use DAT_DTO_COMPLETION_EVENT because
1250 			 * dapli_evd_cqe_to_event will overwrite.
1251 			 */
1252 
1253 			event = dapli_evd_get_and_init_event(
1254 			    evd_ptr, DAT_DTO_COMPLETION_EVENT);
1255 			if (event == NULL) {
1256 				/*
1257 				 * We've already attempted the overflow post.
1258 				 */
1259 				return;
1260 			}
1261 			if (dapli_evd_cqe_to_event(evd_ptr, &cqe[i], DAT_FALSE,
1262 			    event)) {
1263 				dapli_evd_post_event_nosignal(evd_ptr, event);
1264 				cqe_events++;
1265 			} else {
1266 				dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1267 				    "dapls_evd_copy_cq: premature event\n");
1268 				/*
1269 				 * We've deferred processing the CQE, so add
1270 				 * the event_ptr back to free queue
1271 				 */
1272 				dat_status = dapls_rbuf_add(&evd_ptr->
1273 				    free_event_queue, (void *)event);
1274 				dapl_os_assert(dat_status == DAT_SUCCESS);
1275 				if (evd_ptr->evd_producer_locking_needed) {
1276 					dapl_os_unlock(&evd_ptr->header.lock);
1277 				}
1278 			}
1279 		}
1280 		*nevents = cqe_events;
1281 	} else if (DAT_GET_TYPE(dat_status) != DAT_QUEUE_EMPTY) {
1282 		dapl_dbg_log(DAPL_DBG_TYPE_ERR,
1283 		    "dapls_evd_copy_cq: dapls_ib_completion_poll "
1284 		    "returned 0x%x\n", dat_status);
1285 		dapl_os_assert(!"Bad return from dapls_ib_completion_poll");
1286 	}
1287 }
1288 
1289 /*
1290  * dapls_evd_copy_events
1291  *
1292  * Copy all events associated with the EVD onto that EVD
1293  *
1294  * Input:
1295  *	evd_ptr
1296  *	timeout
1297  *
1298  * Output:
1299  * 	none
1300  *
1301  * Returns:
1302  * 	DAT_SUCCESS or an error code
1303  *
1304  */
1305 DAT_RETURN
1306 dapls_evd_copy_events(
1307     DAPL_EVD 	*evd_ptr,
1308     DAT_TIMEOUT timeout)
1309 {
1310 	dapl_ib_event_t	evp_arr[NUM_EVENTS_PER_POLL];
1311 	dapl_ib_event_t	*evpp_start;
1312 	dapl_ib_event_t	*evpp;
1313 	DAPL_IA		*ia_ptr;
1314 	DAT_RETURN	dat_status;
1315 	int		waited;
1316 	uint64_t	curr_time;
1317 	uint64_t	final_time;
1318 	uint64_t	time_left;
1319 	int		events_needed = 0;
1320 	int		nevents = 0;
1321 	int		num_cqe = 0;
1322 	int		num_ke = 0; /* kernel events - CM or ASYNC events */
1323 	int		i;
1324 
1325 	/* rbuf count is zero on entry */
1326 
1327 	if (evd_ptr->evd_flags & (DAT_EVD_CONNECTION_FLAG |
1328 	    DAT_EVD_CR_FLAG | DAT_EVD_ASYNC_FLAG)) {
1329 		if (evd_ptr->threshold <= NUM_EVENTS_PER_POLL) {
1330 			evpp = evp_arr;
1331 		} else {
1332 			/* need to allocate on the heap */
1333 			evpp = (dapl_ib_event_t *)dapl_os_alloc(
1334 			    evd_ptr->threshold * sizeof (dapl_ib_event_t));
1335 			if (evpp == NULL) {
1336 				return (DAT_INSUFFICIENT_RESOURCES);
1337 			}
1338 		}
1339 		evpp_start = evpp;
1340 		/* for evd_dequeue, check for ke before returning Q_EMPTY */
1341 		if (evd_ptr->threshold == 0 && timeout == 0)
1342 			evd_ptr->threshold = 1;
1343 	} else {
1344 		evpp = NULL;
1345 		evpp_start = NULL;
1346 	}
1347 	ia_ptr = evd_ptr->header.owner_ia;
1348 	waited = 0;
1349 	dat_status = DAT_SUCCESS;
1350 
1351 	/* calculate various time wait elements */
1352 	if (timeout == 0) {
1353 		final_time = 0;
1354 		time_left = 0;
1355 	} else if (timeout == DAT_TIMEOUT_INFINITE) {
1356 		/*
1357 		 * The real value of DAT_TIMEOUT_INFINITE is fairly small
1358 		 * ~71 mins, to prevent premature timeouts map it to
1359 		 * 1 year.  NOTE: 64-bit integers are needed here
1360 		 * because 32 bits is not enough.  Other types,
1361 		 * such as clock_t are not 64-bit, so are not
1362 		 * sufficient for this.  Similarly, hrtime_t is
1363 		 * defined as a "nanosecond counter", which does not
1364 		 * match our need for time in microseconds, so we
1365 		 * just use the more general uint64_t here.
1366 		 */
1367 #define	DAPL_ONE_YEAR_IN_USEC	((365 * 24 * 3600) * 1000000LL)
1368 		curr_time = gethrtime();
1369 		time_left = DAPL_ONE_YEAR_IN_USEC;
1370 		final_time = curr_time + DAPL_ONE_YEAR_IN_USEC * 1000;
1371 	} else {
1372 		/*
1373 		 * Compute the maximum time by which the routine needs to
1374 		 * return.  DAT_TIMEOUT_INFINITE is defined as ~0 but it is of
1375 		 * type int, so mask the MSB to avoid overflow.
1376 		 */
1377 		curr_time = gethrtime();
1378 		final_time = curr_time + (uint64_t)(timeout&0x7fffffff)*1000;
1379 		time_left = (final_time - curr_time)/1000;
1380 	}
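	/*
	 * Worked example (illustrative, timeout value assumed): with
	 * timeout = 2000000 usec (2 seconds), final_time is set to
	 * curr_time + 2000000 * 1000 since gethrtime() counts nanoseconds,
	 * while time_left = (final_time - curr_time) / 1000 keeps the
	 * remaining wait in microseconds for the polling calls below.
	 */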
1381 
1382 	do {
1383 		/*
1384 		 * If this evd has a CQ event stream check the CQs first
1385 		 */
1386 		if (evd_ptr->evd_flags & (DAT_EVD_DTO_FLAG |
1387 		    DAT_EVD_RMR_BIND_FLAG)) {
1388 			/*
1389 			 * Poll CQ for events, update the total number of CQEs
1390 			 * so far
1391 			 */
1392 			nevents = 0;
1393 			dapls_evd_copy_cq(evd_ptr, &nevents);
1394 			num_cqe += nevents;
1395 			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1396 			    "dapls_evd_copy_event: copy_cq num_cqe(%d)\n",
1397 			    num_cqe);
1398 		}
1399 
1400 		/*
1401 		 * We use the dapls_rbuf_count since it includes
1402 		 *  - CQ events pulled by dapls_evd_copy_cq
1403 		 *  - events added by dat_evd_post_se()
1404 		 */
1405 		events_needed = evd_ptr->threshold - num_ke -
1406 		    dapls_rbuf_count(&evd_ptr->pending_event_queue);
1407 
1408 		/*
1409 		 * check for pending events
1410 		 * note: threshold=0 implies dapl_evd_dequeue
1411 		 */
1412 		if (events_needed < 0) {
1413 			/* There are more than sufficient events */
1414 			break;
1415 		} else if (events_needed == 0) {
1416 			/* report queue empty on dat_evd_dequeue */
1417 			/* non CQ events are expected to be polled */
1418 			/* by dat_evd_wait */
1419 			if (evd_ptr->threshold == 0)
1420 				dat_status =  DAT_ERROR(DAT_QUEUE_EMPTY, 0);
1421 			/*
1422 			 * when threshold > 0, we have sufficient events
1423 			 */
1424 			break;
1425 		} else {
1426 			/*
1427 			 * When we reach here dat_evd_wait can return on any
1428 			 * DTO completion, since a threshold > 1 is taken as
1429 			 * a hint only.
1430 			 */
1431 			if (num_cqe)
1432 				break;
1433 		}
1434 
1435 		/* check we've already waited */
1436 		if (waited > 0) {
1437 			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1438 			    "dapls_evd_copy_event: waited[%d]\n", waited);
1439 			if (dat_status != DAT_SUCCESS)
1440 				break;
1441 			curr_time = gethrtime();
1442 			/* exit on time expired */
1443 			if (curr_time >= final_time)
1444 				break;
1445 			time_left = (final_time - curr_time)/1000;
1446 		}
1447 
1448 		/* check for DTO type evd's */
1449 		if (evd_ptr->evd_flags & (DAT_EVD_DTO_FLAG |
1450 		    DAT_EVD_RMR_BIND_FLAG)) {
1451 			if (events_needed == 1) {
1452 				/*
1453 				 * Need only one event so enable cq
1454 				 * notification
1455 				 */
1456 				/*
1457 				 * XXX: Things need to be modified here to
1458 				 * implement the NOTIFICATION suppression
1459 				 * correctly - relies on THRESHOLD flag
1460 				 * and UNSIGNALLED flag to be stored
1461 				 * in the evd.
1462 				 */
1463 				dat_status = dapls_set_cq_notify(ia_ptr,
1464 				    evd_ptr);
1465 				if (dat_status != DAT_SUCCESS) {
1466 					dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1467 					    "dapls_evd_copy_event:"
1468 					    " set_cq_notify(%d)\n", dat_status);
1469 					return (dat_status);
1470 				}
1471 			} else if (events_needed > 1) {
1472 				/*
1473 				 * We need multiple events, so enable the CQ
1474 				 * for notification on N events.
1475 				 * dat_status = dapls_set_cqN_notify(ia_ptr,
1476 				 * evd_ptr, (uint32_t)events_needed);
1477 				 */
1478 				dat_status = dapls_set_cq_notify(ia_ptr,
1479 				    evd_ptr);
1480 				if (dat_status != DAT_SUCCESS) {
1481 					dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1482 					    "dapls_evd_copy_event:"
1483 					    " set_cqN_notify:%d\n", dat_status);
1484 					return (dat_status);
1485 				}
1486 			}
1487 
1488 			/*
1489 			 * Per the Tavor PRM, if completions occur after
1490 			 * polling the CQ and before arming it, the CQ handler
1491 			 * fires immediately upon arming.  Hence it recommends
1492 			 * that a re-poll of the CQ can be skipped as an
1493 			 * optimization.
1494 			 */
1495 		}
1496 
1497 		nevents = 0;
1498 
1499 		/*
1500 		 * A non-NULL evpp_start denotes that one of
1501 		 * DAT_EVD_CONNECTION_FLAG, DAT_EVD_CR_FLAG, DAT_EVD_ASYNC_FLAG
1502 		 * is set, so we also need to check for events from the kernel
1503 		 */
1504 		if (evpp_start) {
1505 			/*
1506 			 * Even if dat_status is not DAT_SUCCESS, nevents
1507 			 * could be non-zero.
1508 			 */
1509 			dat_status = dapls_ib_event_poll(evd_ptr, time_left,
1510 			    (evd_ptr->threshold - (num_cqe + num_ke)), evpp,
1511 			    &nevents);
1512 			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1513 			    "dapls_evd_copy_event: poll returned 0x%x(%d)\n",
1514 			    dat_status, nevents);
1515 
1516 			num_ke += nevents;
1517 			evpp += nevents;
1518 		} else {
1519 			/* perform a timewait */
1520 			dat_status = dapls_ib_event_poll(evd_ptr, time_left,
1521 			    0, NULL, &nevents);
1522 			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1523 			    "dapls_evd_copy_event: poll(cq_notification) "
1524 			    "returned 0x%x\n", dat_status);
1525 			if (DAT_GET_TYPE(dat_status) == DAT_INTERRUPTED_CALL)
1526 				return (dat_status);
1527 		}
1528 
1529 		waited++;
1530 	} while (dapls_rbuf_count(&evd_ptr->pending_event_queue) + num_ke <
1531 	    evd_ptr->threshold);
1532 
1533 	/* process the cm events now */
1534 	for (i = 0; i < num_ke; i++) {
1535 		switch (evpp_start[i].ibe_ev_family) {
1536 		case DAPL_CR_EVENTS: /* PASSIVE side events */
1537 		case DAPL_PASSIVE_CONNECTION_EVENTS:
1538 			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1539 			    "dapls_evd_copy_event: Passive side Event %d\n",
1540 			    evpp_start[i].ibe_ce.ibce_event);
1541 			dapls_cr_callback((ib_cm_handle_t)
1542 			    evpp_start[i].ibe_ce.ibce_psep_cookie,
1543 			    evpp_start[i].ibe_ce.ibce_event,
1544 			    evpp_start[i].ibe_ce.ibce_priv_data_ptr, (void *)
1545 			    (uintptr_t)evpp_start[i].ibe_ce.ibce_cookie);
1546 			break;
1547 		case DAPL_ACTIVE_CONNECTION_EVENTS: /* ACTIVE side events */
1548 			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1549 			    "dapls_evd_copy_event: Active Conn Event %d\n",
1550 			    evpp_start[i].ibe_ce.ibce_event);
1551 			dapl_evd_connection_callback((ib_cm_handle_t)
1552 			    IB_INVALID_HANDLE,
1553 			    evpp_start[i].ibe_ce.ibce_event,
1554 			    evpp_start[i].ibe_ce.ibce_priv_data_ptr, (void *)
1555 			    (uintptr_t)evpp_start[i].ibe_ce.ibce_cookie);
1556 			break;
1557 		case DAPL_ASYNC_EVENTS:
1558 			dapl_dbg_log(DAPL_DBG_TYPE_EVD,
1559 			    "dapls_evd_copy_event: Async Event %d\n",
1560 			    evpp_start[i].ibe_async.ibae_type);
1561 			dapls_ib_async_callback(evd_ptr,
1562 			    ia_ptr->hca_ptr->ib_hca_handle,
1563 			    &(evpp_start[i].ibe_async), ia_ptr);
1564 			break;
1565 		default:
1566 			dapl_dbg_log(DAPL_DBG_TYPE_ERR,
1567 			    "dapls_evd_copy_event: dapls_ib_event_poll %d "
1568 			    "returned 0x%x\n", i, evpp_start[i].ibe_ev_family);
1569 			dapl_os_assert(!"Bad return from dapls_ib_event_poll");
1570 			break;
1571 		}
1572 	}
1573 
1574 	return (dat_status);
1575 }
1576 
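/*
 * Illustrative sketch (not part of the build): a waiter such as
 * dapl_evd_wait() is assumed to set the threshold and then let the routine
 * above gather events before dequeuing them, roughly:
 *
 *	evd_ptr->threshold = threshold;
 *	dat_status = dapls_evd_copy_events(evd_ptr, timeout);
 *	...
 *	event = (DAT_EVENT *)
 *	    dapls_rbuf_remove(&evd_ptr->pending_event_queue);
 */
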
1577 /*
1578  * dapls_evd_cq_poll_to_event
1579  *
1580  * Attempt to dequeue a single CQE from a CQ and turn it into
1581  * an event.
1582  *
1583  * Input:
1584  *	evd_ptr
1585  *
1586  * Output:
1587  * 	event
1588  *
1589  * Returns:
1590  * 	Status of operation
1591  *
1592  */
1593 DAT_RETURN
1594 dapls_evd_cq_poll_to_event(
1595     IN DAPL_EVD 	*evd_ptr,
1596     OUT DAT_EVENT	*event)
1597 {
1598 	DAT_RETURN		dat_status;
1599 	ib_work_completion_t	cur_cqe;
1600 
1601 	/* skip one layer of do-nothing function */
1602 	dat_status = DAPL_POLL1(evd_ptr)(evd_ptr->ib_cq_handle, &cur_cqe);
1603 
1604 	if (dat_status == DAT_SUCCESS) {
1605 #ifdef	DAPL_DBG	/* For debugging.  */
1606 		dapli_evd_eh_print_cqe(cur_cqe);
1607 #endif
1608 		(void) dapli_evd_cqe_to_event(evd_ptr, &cur_cqe, DAT_FALSE,
1609 		    event);
1610 	}
1611 
1612 	return (dat_status);
1613 }
1614 
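/*
 * Illustrative sketch (not part of the build): a fast-path dequeue might
 * use the helper above to pull a single completion straight into the
 * caller's event structure; the surrounding checks here are assumptions
 * for illustration only.
 *
 *	DAT_EVENT	event;
 *	DAT_RETURN	dat_status;
 *
 *	dat_status = dapls_evd_cq_poll_to_event(evd_ptr, &event);
 *	if (dat_status != DAT_SUCCESS &&
 *	    DAT_GET_TYPE(dat_status) != DAT_QUEUE_EMPTY) {
 *		return (dat_status);
 *	}
 */
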
1615 /*
1616  * Local variables:
1617  *  c-indent-level: 4
1618  *  c-basic-offset: 4
1619  *  tab-width: 8
1620  * End:
1621  */
1622