xref: /illumos-gate/usr/src/cmd/fm/fmd/common/fmd_eventq.c (revision bbf21555)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
24  */
25 
26 #include <fmd_alloc.h>
27 #include <fmd_eventq.h>
28 #include <fmd_module.h>
29 #include <fmd_dispq.h>
30 #include <fmd_subr.h>
31 
32 #include <fmd.h>
33 
34 fmd_eventq_t *
fmd_eventq_create(fmd_module_t * mp,fmd_eventqstat_t * stats,pthread_mutex_t * stats_lock,uint_t limit)35 fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats,
36     pthread_mutex_t *stats_lock, uint_t limit)
37 {
38 	fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP);
39 
40 	(void) pthread_mutex_init(&eq->eq_lock, NULL);
41 	(void) pthread_cond_init(&eq->eq_cv, NULL);
42 
43 	eq->eq_mod = mp;
44 	eq->eq_stats = stats;
45 	eq->eq_stats_lock = stats_lock;
46 	eq->eq_limit = limit;
47 	eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq);
48 
49 	return (eq);
50 }
51 
52 void
fmd_eventq_destroy(fmd_eventq_t * eq)53 fmd_eventq_destroy(fmd_eventq_t *eq)
54 {
55 	fmd_eventqelem_t *eqe;
56 
57 	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
58 		fmd_list_delete(&eq->eq_list, eqe);
59 		fmd_event_rele(eqe->eqe_event);
60 		fmd_free(eqe, sizeof (fmd_eventqelem_t));
61 	}
62 
63 	fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid);
64 	fmd_free(eq, sizeof (fmd_eventq_t));
65 }
66 
67 static void
fmd_eventq_drop(fmd_eventq_t * eq,fmd_eventqelem_t * eqe)68 fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe)
69 {
70 	(void) pthread_mutex_lock(eq->eq_stats_lock);
71 	eq->eq_stats->eqs_dropped.fmds_value.ui64++;
72 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
73 
74 	fmd_event_rele(eqe->eqe_event);
75 	fmd_free(eqe, sizeof (fmd_eventqelem_t));
76 }
77 
78 void
fmd_eventq_drop_topo(fmd_eventq_t * eq)79 fmd_eventq_drop_topo(fmd_eventq_t *eq)
80 {
81 	fmd_eventqelem_t *eqe, *tmp;
82 	boolean_t got_fm_events = B_FALSE;
83 
84 	/*
85 	 * Here we iterate through the per-module event queue in order to remove
86 	 * redundant FMD_EVT_TOPO events.  The trick is to not drop a given
87 	 * topo event if there are any FM protocol events in the queue after
88 	 * it, as those events need to be processed with the correct topology.
89 	 */
90 	(void) pthread_mutex_lock(&eq->eq_lock);
91 	eqe = fmd_list_prev(&eq->eq_list);
92 	while (eqe) {
93 		if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_TOPO) {
94 			if (!got_fm_events) {
95 				tmp = eqe;
96 				eqe = fmd_list_prev(eqe);
97 				fmd_list_delete(&eq->eq_list, tmp);
98 				eq->eq_size--;
99 				fmd_eventq_drop(eq, tmp);
100 			} else {
101 				got_fm_events = B_FALSE;
102 				eqe = fmd_list_prev(eqe);
103 			}
104 		} else if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_PROTOCOL) {
105 			got_fm_events = B_TRUE;
106 			eqe = fmd_list_prev(eqe);
107 		} else
108 			eqe = fmd_list_prev(eqe);
109 	}
110 	(void) pthread_mutex_unlock(&eq->eq_lock);
111 }
112 
113 /*
114  * Update statistics when an event is dispatched and placed on a module's event
115  * queue.  This is essentially the same code as kstat_waitq_enter(9F).
116  */
117 static void
fmd_eventqstat_dispatch(fmd_eventq_t * eq)118 fmd_eventqstat_dispatch(fmd_eventq_t *eq)
119 {
120 	fmd_eventqstat_t *eqs = eq->eq_stats;
121 	hrtime_t new, delta;
122 	uint32_t wcnt;
123 
124 	(void) pthread_mutex_lock(eq->eq_stats_lock);
125 
126 	new = gethrtime();
127 	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
128 	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
129 	wcnt = eqs->eqs_wcnt.fmds_value.ui32++;
130 
131 	if (wcnt != 0) {
132 		eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
133 		eqs->eqs_wtime.fmds_value.ui64 += delta;
134 	}
135 
136 	eqs->eqs_dispatched.fmds_value.ui64++;
137 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
138 }
139 
140 void
fmd_eventq_insert_at_head(fmd_eventq_t * eq,fmd_event_t * ep)141 fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep)
142 {
143 	uint_t evt = FMD_EVENT_TYPE(ep);
144 	fmd_eventqelem_t *eqe;
145 	int ok;
146 
147 	/*
148 	 * If this event queue is acting as /dev/null, bounce the reference
149 	 * count to free an unreferenced event and just return immediately.
150 	 */
151 	if (eq->eq_limit == 0) {
152 		fmd_event_hold(ep);
153 		fmd_event_rele(ep);
154 		return;
155 	}
156 
157 	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
158 	fmd_event_hold(ep);
159 	eqe->eqe_event = ep;
160 
161 	(void) pthread_mutex_lock(&eq->eq_lock);
162 
163 	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
164 		if (evt != FMD_EVT_CTL)
165 			fmd_eventqstat_dispatch(eq);
166 
167 		fmd_list_prepend(&eq->eq_list, eqe);
168 		eq->eq_size++;
169 	}
170 
171 	(void) pthread_cond_broadcast(&eq->eq_cv);
172 	(void) pthread_mutex_unlock(&eq->eq_lock);
173 
174 	if (!ok)
175 		fmd_eventq_drop(eq, eqe);
176 }
177 
178 void
fmd_eventq_insert_at_time(fmd_eventq_t * eq,fmd_event_t * ep)179 fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep)
180 {
181 	uint_t evt = FMD_EVENT_TYPE(ep);
182 	hrtime_t hrt = fmd_event_hrtime(ep);
183 	fmd_eventqelem_t *eqe, *oqe;
184 	int ok;
185 
186 	/*
187 	 * If this event queue is acting as /dev/null, bounce the reference
188 	 * count to free an unreferenced event and just return immediately.
189 	 */
190 	if (eq->eq_limit == 0) {
191 		fmd_event_hold(ep);
192 		fmd_event_rele(ep);
193 		return;
194 	}
195 
196 	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
197 	fmd_event_hold(ep);
198 	eqe->eqe_event = ep;
199 
200 	(void) pthread_mutex_lock(&eq->eq_lock);
201 
202 	/*
203 	 * fmd makes no guarantees that events will be delivered in time order
204 	 * because its transport can make no such guarantees.  Instead we make
205 	 * a looser guarantee that an enqueued event will be dequeued before
206 	 * any newer *pending* events according to event time.  This permits us
207 	 * to state, for example, that a timer expiry event will be delivered
208 	 * prior to any enqueued event whose time is after the timer expired.
209 	 * We use a simple insertion sort for this task, as queue lengths are
210 	 * typically short and events do *tend* to be received chronologically.
211 	 */
212 	for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) {
213 		if (hrt >= fmd_event_hrtime(oqe->eqe_event))
214 			break; /* 'ep' is newer than the event in 'oqe' */
215 	}
216 
217 	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
218 		if (evt != FMD_EVT_CTL)
219 			fmd_eventqstat_dispatch(eq);
220 
221 		if (oqe == NULL)
222 			fmd_list_prepend(&eq->eq_list, eqe);
223 		else
224 			fmd_list_insert_after(&eq->eq_list, oqe, eqe);
225 		eq->eq_size++;
226 	}
227 
228 	(void) pthread_cond_broadcast(&eq->eq_cv);
229 	(void) pthread_mutex_unlock(&eq->eq_lock);
230 
231 	if (!ok)
232 		fmd_eventq_drop(eq, eqe);
233 }
234 
235 fmd_event_t *
fmd_eventq_delete(fmd_eventq_t * eq)236 fmd_eventq_delete(fmd_eventq_t *eq)
237 {
238 	fmd_eventqstat_t *eqs = eq->eq_stats;
239 	hrtime_t new, delta;
240 	uint32_t wcnt;
241 
242 	fmd_eventqelem_t *eqe;
243 	fmd_event_t *ep;
244 top:
245 	(void) pthread_mutex_lock(&eq->eq_lock);
246 
247 	while (!(eq->eq_flags & FMD_EVENTQ_ABORT) &&
248 	    (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND)))
249 		(void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock);
250 
251 	if (eq->eq_flags & FMD_EVENTQ_ABORT) {
252 		(void) pthread_mutex_unlock(&eq->eq_lock);
253 		return (NULL);
254 	}
255 
256 	eqe = fmd_list_next(&eq->eq_list);
257 	fmd_list_delete(&eq->eq_list, eqe);
258 	eq->eq_size--;
259 
260 	(void) pthread_mutex_unlock(&eq->eq_lock);
261 
262 	ep = eqe->eqe_event;
263 	fmd_free(eqe, sizeof (fmd_eventqelem_t));
264 
265 	/*
266 	 * If we dequeued a control event, release it and go back to sleep.
267 	 * fmd_event_rele() on the event will block as described in fmd_ctl.c.
268 	 * This effectively renders control events invisible to our callers
269 	 * as well as to statistics and observability tools (e.g. fmstat(8)).
270 	 */
271 	if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) {
272 		fmd_event_rele(ep);
273 		goto top;
274 	}
275 
276 	/*
277 	 * Before returning, update our statistics.  This code is essentially
278 	 * kstat_waitq_to_runq(9F), except simplified because our queues are
279 	 * always consumed by a single thread (i.e. runq len == 1).
280 	 */
281 	(void) pthread_mutex_lock(eq->eq_stats_lock);
282 
283 	new = gethrtime();
284 	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
285 
286 	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
287 	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
288 
289 	ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0);
290 	wcnt = eqs->eqs_wcnt.fmds_value.ui32--;
291 
292 	eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
293 	eqs->eqs_wtime.fmds_value.ui64 += delta;
294 
295 	if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL)
296 		eqs->eqs_prdequeued.fmds_value.ui64++;
297 
298 	eqs->eqs_dequeued.fmds_value.ui64++;
299 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
300 
301 	return (ep);
302 }
303 
304 /*
305  * Update statistics when an event is done being processed by the eventq's
306  * consumer thread.  This is essentially kstat_runq_exit(9F) simplified for
307  * our principle that a single thread consumes the queue (i.e. runq len == 1).
308  */
309 void
fmd_eventq_done(fmd_eventq_t * eq)310 fmd_eventq_done(fmd_eventq_t *eq)
311 {
312 	fmd_eventqstat_t *eqs = eq->eq_stats;
313 	hrtime_t new, delta;
314 
315 	(void) pthread_mutex_lock(eq->eq_stats_lock);
316 
317 	new = gethrtime();
318 	delta = new - eqs->eqs_dlastupdate.fmds_value.ui64;
319 
320 	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
321 	eqs->eqs_dtime.fmds_value.ui64 += delta;
322 
323 	(void) pthread_mutex_unlock(eq->eq_stats_lock);
324 }
325 
326 void
fmd_eventq_cancel(fmd_eventq_t * eq,uint_t type,void * data)327 fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data)
328 {
329 	fmd_eventqelem_t *eqe, *nqe;
330 
331 	(void) pthread_mutex_lock(&eq->eq_lock);
332 
333 	for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) {
334 		nqe = fmd_list_next(eqe);
335 
336 		if (fmd_event_match(eqe->eqe_event, type, data)) {
337 			fmd_list_delete(&eq->eq_list, eqe);
338 			eq->eq_size--;
339 			fmd_event_rele(eqe->eqe_event);
340 			fmd_free(eqe, sizeof (fmd_eventqelem_t));
341 		}
342 	}
343 
344 	(void) pthread_mutex_unlock(&eq->eq_lock);
345 }
346 
347 void
fmd_eventq_suspend(fmd_eventq_t * eq)348 fmd_eventq_suspend(fmd_eventq_t *eq)
349 {
350 	(void) pthread_mutex_lock(&eq->eq_lock);
351 	eq->eq_flags |= FMD_EVENTQ_SUSPEND;
352 	(void) pthread_mutex_unlock(&eq->eq_lock);
353 }
354 
355 void
fmd_eventq_resume(fmd_eventq_t * eq)356 fmd_eventq_resume(fmd_eventq_t *eq)
357 {
358 	(void) pthread_mutex_lock(&eq->eq_lock);
359 	eq->eq_flags &= ~FMD_EVENTQ_SUSPEND;
360 	(void) pthread_cond_broadcast(&eq->eq_cv);
361 	(void) pthread_mutex_unlock(&eq->eq_lock);
362 }
363 
364 void
fmd_eventq_abort(fmd_eventq_t * eq)365 fmd_eventq_abort(fmd_eventq_t *eq)
366 {
367 	fmd_eventqelem_t *eqe;
368 
369 	(void) pthread_mutex_lock(&eq->eq_lock);
370 
371 	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
372 		fmd_list_delete(&eq->eq_list, eqe);
373 		fmd_event_rele(eqe->eqe_event);
374 		fmd_free(eqe, sizeof (fmd_eventqelem_t));
375 	}
376 
377 	eq->eq_flags |= FMD_EVENTQ_ABORT;
378 	(void) pthread_cond_broadcast(&eq->eq_cv);
379 	(void) pthread_mutex_unlock(&eq->eq_lock);
380 }
381