xref: /illumos-gate/usr/src/uts/i86pc/os/x_call.c (revision a3114836)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2010, Intel Corporation.
27  * All rights reserved.
28  */
29 
30 #include <sys/types.h>
31 #include <sys/param.h>
32 #include <sys/t_lock.h>
33 #include <sys/thread.h>
34 #include <sys/cpuvar.h>
35 #include <sys/x_call.h>
36 #include <sys/xc_levels.h>
37 #include <sys/cpu.h>
38 #include <sys/psw.h>
39 #include <sys/sunddi.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/archsystm.h>
43 #include <sys/machsystm.h>
44 #include <sys/mutex_impl.h>
45 #include <sys/stack.h>
46 #include <sys/promif.h>
47 #include <sys/x86_archext.h>
48 
49 /*
50  * Implementation for cross-processor calls via interprocessor interrupts
51  *
52  * This implementation uses a message passing architecture to allow multiple
53  * concurrent cross calls to be in flight at any given time. We use the cmpxchg
54  * instruction, aka casptr(), to implement simple efficient work queues for
55  * message passing between CPUs with almost no need for regular locking.
56  * See xc_extract() and xc_insert() below.
57  *
58  * The general idea is that initiating a cross call means putting a message
59  * on each target CPU's work queue. Any synchronization is handled by passing
60  * the message back and forth between initiator and target(s).
61  *
62  * Every CPU has xc_work_cnt, which indicates it has messages to process.
63  * This value is incremented as message traffic is initiated and decremented
64  * with every message that finishes all processing.
65  *
66  * The code needs no mfence or other membar_*() calls. The uses of
67  * casptr(), cas32() and atomic_dec_32() for the message passing are
68  * implemented with LOCK prefix instructions which are equivalent to mfence.
69  *
70  * One interesting aspect of this implementation is that it allows 2 or more
71  * CPUs to initiate cross calls to intersecting sets of CPUs at the same time.
72  * The cross call processing by the CPUs may happen in any order; the only
73  * guarantee, for xc_call() and xc_sync(), is that an initiator won't return
74  * from a cross call before all slaves have invoked the function.
75  *
76  * The reason for this asynchronous approach is to allow for fast global
77  * TLB shootdowns. Consider all N CPUs doing a global TLB invalidation on
78  * a different virtual address at the same time: the old code required
79  * N squared IPIs, while with this method, depending on timing, it can
80  * happen with just N IPIs.
81  */
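
/*
 * As a concrete illustration of the TLB shootdown case mentioned above, a
 * caller might use the interfaces defined at the bottom of this file roughly
 * like this (my_tlb_inval() and va are hypothetical, not code from this
 * file, and a real caller would restrict the CPU set to the CPUs that
 * actually need the invalidation):
 *
 *	cpuset_t set;
 *
 *	CPUSET_ALL(set);
 *	xc_call((xc_arg_t)va, 0, 0, CPUSET2BV(set), my_tlb_inval);
 *
 * xc_call() returns only after every CPU_READY CPU in the set has run
 * my_tlb_inval().
 */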
82 
83 /*
84  * The default is not to enable collection of IPI counts, since updating
85  * the shared cachelines could cause excess bus traffic.
86  */
87 uint_t xc_collect_enable = 0;
88 uint64_t xc_total_cnt = 0;	/* total #IPIs sent for cross calls */
89 uint64_t xc_multi_cnt = 0;	/* # times we piggybacked on another IPI */
90 
91 /*
92  * Values for message states. Here are the normal transitions. A transition
93  * of "->" happens in the slave cpu and "=>" happens in the master cpu as
94  * the messages are passed back and forth.
95  *
96  * FREE => ASYNC ->                       DONE => FREE
97  * FREE => CALL ->                        DONE => FREE
98  * FREE => SYNC -> WAITING => RELEASED -> DONE => FREE
99  *
100  * The interesting one above is ASYNC. You might ask, why not go directly
101  * to FREE, instead of DONE. If it did that, it might be possible to exhaust
102  * the master's xc_free list if a master can generate ASYNC messages faster
103  * than the slave can process them. That could be handled with more
104  * complicated code, but since nothing important uses ASYNC, I've not bothered.
105  */
106 #define	XC_MSG_FREE	(0)	/* msg in xc_free queue */
107 #define	XC_MSG_ASYNC	(1)	/* msg in slave xc_msgbox */
108 #define	XC_MSG_CALL	(2)	/* msg in slave xc_msgbox */
109 #define	XC_MSG_SYNC	(3)	/* msg in slave xc_msgbox */
110 #define	XC_MSG_WAITING	(4)	/* msg in master xc_msgbox or xc_waiters */
111 #define	XC_MSG_RELEASED	(5)	/* msg in slave xc_msgbox */
112 #define	XC_MSG_DONE	(6)	/* msg in master xc_msgbox */
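
/*
 * For illustration only (not part of this file): a hypothetical helper
 * expressing the slave-side ("->") transitions from the diagram above
 * would be:
 *
 *	static boolean_t
 *	xc_valid_slave_transition(uint_t from, uint_t to)
 *	{
 *		return ((from == XC_MSG_ASYNC && to == XC_MSG_DONE) ||
 *		    (from == XC_MSG_CALL && to == XC_MSG_DONE) ||
 *		    (from == XC_MSG_SYNC && to == XC_MSG_WAITING) ||
 *		    (from == XC_MSG_RELEASED && to == XC_MSG_DONE));
 *	}
 */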
113 
114 /*
115  * We allow for only one high priority message at a time in the system.
116  * This is used for panic, kmdb, etc., so no locking is done.
117  */
118 static volatile cpuset_t xc_priority_set_store;
119 static volatile ulong_t *xc_priority_set = CPUSET2BV(xc_priority_set_store);
120 static xc_data_t xc_priority_data;
121 
122 /*
123  * Wrappers to avoid C compiler warnings due to volatile. The atomic bit
124  * operations don't accept volatile bit vectors - which is a bit silly.
125  */
126 #define	XC_BT_SET(vector, b)	BT_ATOMIC_SET((ulong_t *)(vector), (b))
127 #define	XC_BT_CLEAR(vector, b)	BT_ATOMIC_CLEAR((ulong_t *)(vector), (b))
128 
129 /*
130  * Decrement a CPU's work count
131  */
132 static void
133 xc_decrement(struct machcpu *mcpu)
134 {
135 	atomic_dec_32(&mcpu->xc_work_cnt);
136 }
137 
138 /*
139  * Increment a CPU's work count and return the old value
140  */
141 static int
142 xc_increment(struct machcpu *mcpu)
143 {
144 	int old;
145 	do {
146 		old = mcpu->xc_work_cnt;
147 	} while (cas32((uint32_t *)&mcpu->xc_work_cnt, old, old + 1) != old);
148 	return (old);
149 }
150 
151 /*
152  * Put a message into a queue. The insertion is atomic no matter
153  * how many different inserts/extracts to the same queue happen.
154  */
155 static void
156 xc_insert(void *queue, xc_msg_t *msg)
157 {
158 	xc_msg_t *old_head;
159 
160 	/*
161 	 * FREE messages should only ever be inserted into
162 	 * the xc_master CPU's xc_free queue.
163 	 */
164 	ASSERT(msg->xc_command != XC_MSG_FREE ||
165 	    cpu[msg->xc_master] == NULL || /* possible only during init */
166 	    queue == &cpu[msg->xc_master]->cpu_m.xc_free);
167 
168 	do {
169 		old_head = (xc_msg_t *)*(volatile xc_msg_t **)queue;
170 		msg->xc_next = old_head;
171 	} while (casptr(queue, old_head, msg) != old_head);
172 }
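
/*
 * For readers unfamiliar with casptr(): the loop above is the classic
 * lock-free stack push. A rough analog, with queue typed as xc_msg_t **
 * and using the GCC __atomic builtins (illustrative only; this file uses
 * casptr()), would be:
 *
 *	do {
 *		old_head = *queue;
 *		msg->xc_next = old_head;
 *	} while (!__atomic_compare_exchange_n(queue, &old_head, msg,
 *	    0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
 */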
173 
174 /*
175  * Extract a message from a queue. The extraction is atomic only when just
176  * one thread extracts from the queue; multiple extractors would be exposed
177  * to the classic ABA problem. If the queue is empty, NULL is returned.
178  */
179 static xc_msg_t *
180 xc_extract(xc_msg_t **queue)
181 {
182 	xc_msg_t *old_head;
183 
184 	do {
185 		old_head = (xc_msg_t *)*(volatile xc_msg_t **)queue;
186 		if (old_head == NULL)
187 			return (old_head);
188 	} while (casptr(queue, old_head, old_head->xc_next) != old_head);
189 	old_head->xc_next = NULL;
190 	return (old_head);
191 }
192 
193 /*
194  * Initialize the machcpu fields used for cross calls
195  */
196 static uint_t xc_initialized = 0;
197 
198 void
199 xc_init_cpu(struct cpu *cpup)
200 {
201 	xc_msg_t *msg;
202 	int c;
203 
204 	/*
205 	 * Allocate message buffers for the new CPU.
206 	 */
207 	for (c = 0; c < max_ncpus; ++c) {
208 		if (plat_dr_support_cpu()) {
209 			/*
210 			 * Allocate a message buffer for every possible CPU
211 			 * in the system, including our own, and add them to
212 			 * our xc_free message list.
213 			 */
214 			msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
215 			msg->xc_command = XC_MSG_FREE;
216 			msg->xc_master = cpup->cpu_id;
217 			xc_insert(&cpup->cpu_m.xc_free, msg);
218 		} else if (cpu[c] != NULL && cpu[c] != cpup) {
219 			 * Add a new message buffer to each existing CPU's free
220 			 * list, as well as one to my own list for each of them.
221 			 * Note: cpu0 is statically inserted into the cpu[] array,
222 			 * so we need to check that cpu[c] isn't cpup itself to
223 			 * avoid allocating extra message buffers for cpu0.
224 			 * allocating extra message buffers for cpu0.
225 			 */
226 			msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
227 			msg->xc_command = XC_MSG_FREE;
228 			msg->xc_master = c;
229 			xc_insert(&cpu[c]->cpu_m.xc_free, msg);
230 
231 			msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
232 			msg->xc_command = XC_MSG_FREE;
233 			msg->xc_master = cpup->cpu_id;
234 			xc_insert(&cpup->cpu_m.xc_free, msg);
235 		}
236 	}
237 
238 	if (!plat_dr_support_cpu()) {
239 		/*
240 		 * Add one for self messages if CPU hotplug is disabled.
241 		 */
242 		msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
243 		msg->xc_command = XC_MSG_FREE;
244 		msg->xc_master = cpup->cpu_id;
245 		xc_insert(&cpup->cpu_m.xc_free, msg);
246 	}
247 
248 	if (!xc_initialized)
249 		xc_initialized = 1;
250 }
251 
252 void
253 xc_fini_cpu(struct cpu *cpup)
254 {
255 	xc_msg_t *msg;
256 
257 	ASSERT((cpup->cpu_flags & CPU_READY) == 0);
258 	ASSERT(cpup->cpu_m.xc_msgbox == NULL);
259 	ASSERT(cpup->cpu_m.xc_work_cnt == 0);
260 
261 	while ((msg = xc_extract(&cpup->cpu_m.xc_free)) != NULL) {
262 		kmem_free(msg, sizeof (*msg));
263 	}
264 }
265 
266 #define	XC_FLUSH_MAX_WAITS		1000
267 
268 /* Flush inflight message buffers. */
269 int
270 xc_flush_cpu(struct cpu *cpup)
271 {
272 	int i;
273 
274 	ASSERT((cpup->cpu_flags & CPU_READY) == 0);
275 
276 	/*
277 	 * Pause all working CPUs, which ensures that there's no CPU in
278 	 * function xc_common().
279 	 * between checking the CPU_READY flag and increasing the work item count.
280 	 * between checking CPU_READY flag and increasing working item count.
281 	 */
282 	pause_cpus(cpup);
283 	start_cpus();
284 
285 	for (i = 0; i < XC_FLUSH_MAX_WAITS; i++) {
286 		if (cpup->cpu_m.xc_work_cnt == 0) {
287 			break;
288 		}
289 		DELAY(1);
290 	}
291 	for (; i < XC_FLUSH_MAX_WAITS; i++) {
292 		if (!BT_TEST(xc_priority_set, cpup->cpu_id)) {
293 			break;
294 		}
295 		DELAY(1);
296 	}
297 
298 	return (i >= XC_FLUSH_MAX_WAITS ? ETIME : 0);
299 }
300 
301 /*
302  * X-call message processing routine. Note that this is used by both
303  * senders and recipients of messages.
304  *
305  * We're protected against changing CPUs by either being in a high-priority
306  * interrupt, having preemption disabled or by having a raised SPL.
307  */
308 /*ARGSUSED*/
309 uint_t
310 xc_serv(caddr_t arg1, caddr_t arg2)
311 {
312 	struct machcpu *mcpup = &(CPU->cpu_m);
313 	xc_msg_t *msg;
314 	xc_data_t *data;
315 	xc_msg_t *xc_waiters = NULL;
316 	uint32_t num_waiting = 0;
317 	xc_func_t func;
318 	xc_arg_t a1;
319 	xc_arg_t a2;
320 	xc_arg_t a3;
321 	uint_t rc = DDI_INTR_UNCLAIMED;
322 
323 	while (mcpup->xc_work_cnt != 0) {
324 		rc = DDI_INTR_CLAIMED;
325 
326 		/*
327 		 * We may have to wait for a message to arrive.
328 		 */
329 		for (msg = NULL; msg == NULL;
330 		    msg = xc_extract(&mcpup->xc_msgbox)) {
331 
332 			/*
333 			 * Always check for and handle a priority message.
334 			 */
335 			if (BT_TEST(xc_priority_set, CPU->cpu_id)) {
336 				func = xc_priority_data.xc_func;
337 				a1 = xc_priority_data.xc_a1;
338 				a2 = xc_priority_data.xc_a2;
339 				a3 = xc_priority_data.xc_a3;
340 				XC_BT_CLEAR(xc_priority_set, CPU->cpu_id);
341 				xc_decrement(mcpup);
342 				func(a1, a2, a3);
343 				if (mcpup->xc_work_cnt == 0)
344 					return (rc);
345 			}
346 
347 			/*
348 			 * wait for a message to arrive
349 			 */
350 			SMT_PAUSE();
351 		}
352 
353 
354 		/*
355 		 * process the message
356 		 */
357 		switch (msg->xc_command) {
358 
359 		/*
360 		 * ASYNC gives back the message immediately, then we do the
361 		 * function and return with no more waiting.
362 		 */
363 		case XC_MSG_ASYNC:
364 			data = &cpu[msg->xc_master]->cpu_m.xc_data;
365 			func = data->xc_func;
366 			a1 = data->xc_a1;
367 			a2 = data->xc_a2;
368 			a3 = data->xc_a3;
369 			msg->xc_command = XC_MSG_DONE;
370 			xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
371 			if (func != NULL)
372 				(void) (*func)(a1, a2, a3);
373 			xc_decrement(mcpup);
374 			break;
375 
376 		/*
377 		 * SYNC messages do the call, then send the message back to the
378 		 * master in WAITING mode.
379 		 */
380 		case XC_MSG_SYNC:
381 			data = &cpu[msg->xc_master]->cpu_m.xc_data;
382 			if (data->xc_func != NULL)
383 				(void) (*data->xc_func)(data->xc_a1,
384 				    data->xc_a2, data->xc_a3);
385 			msg->xc_command = XC_MSG_WAITING;
386 			xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
387 			break;
388 
389 		/*
390 		 * WAITING messages are collected by the master until all
391 		 * have arrived. Once all arrive, we release them back to
392 		 * the slaves.
393 		 */
394 		case XC_MSG_WAITING:
395 			xc_insert(&xc_waiters, msg);
396 			if (++num_waiting < mcpup->xc_wait_cnt)
397 				break;
398 			while ((msg = xc_extract(&xc_waiters)) != NULL) {
399 				msg->xc_command = XC_MSG_RELEASED;
400 				xc_insert(&cpu[msg->xc_slave]->cpu_m.xc_msgbox,
401 				    msg);
402 				--num_waiting;
403 			}
404 			if (num_waiting != 0)
405 				panic("wrong number waiting");
406 			mcpup->xc_wait_cnt = 0;
407 			break;
408 
409 		/*
410 		 * CALL messages do the function and then, like RELEASED,
411 		 * send the message back to the master as DONE.
412 		 */
413 		case XC_MSG_CALL:
414 			data = &cpu[msg->xc_master]->cpu_m.xc_data;
415 			if (data->xc_func != NULL)
416 				(void) (*data->xc_func)(data->xc_a1,
417 				    data->xc_a2, data->xc_a3);
418 			/*FALLTHROUGH*/
419 		case XC_MSG_RELEASED:
420 			msg->xc_command = XC_MSG_DONE;
421 			xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
422 			xc_decrement(mcpup);
423 			break;
424 
425 		/*
426 		 * DONE means a slave has completely finished up.
427 		 * Once we collect all the DONE messages, we'll exit
428 		 * processing too.
429 		 */
430 		case XC_MSG_DONE:
431 			msg->xc_command = XC_MSG_FREE;
432 			xc_insert(&mcpup->xc_free, msg);
433 			xc_decrement(mcpup);
434 			break;
435 
436 		case XC_MSG_FREE:
437 			panic("free message 0x%p in msgbox", (void *)msg);
438 			break;
439 
440 		default:
441 			panic("bad message 0x%p in msgbox", (void *)msg);
442 			break;
443 		}
444 	}
445 	return (rc);
446 }
447 
448 /*
449  * Initiate cross call processing.
450  */
451 static void
452 xc_common(
453 	xc_func_t func,
454 	xc_arg_t arg1,
455 	xc_arg_t arg2,
456 	xc_arg_t arg3,
457 	ulong_t *set,
458 	uint_t command)
459 {
460 	int c;
461 	struct cpu *cpup;
462 	xc_msg_t *msg;
463 	xc_data_t *data;
464 	int cnt;
465 	int save_spl;
466 
467 	if (!xc_initialized) {
468 		if (BT_TEST(set, CPU->cpu_id) && (CPU->cpu_flags & CPU_READY) &&
469 		    func != NULL)
470 			(void) (*func)(arg1, arg2, arg3);
471 		return;
472 	}
473 
474 	save_spl = splr(ipltospl(XC_HI_PIL));
475 
476 	/*
477 	 * fill in cross call data
478 	 */
479 	data = &CPU->cpu_m.xc_data;
480 	data->xc_func = func;
481 	data->xc_a1 = arg1;
482 	data->xc_a2 = arg2;
483 	data->xc_a3 = arg3;
484 
485 	/*
486 	 * Post messages to all CPUs involved that are CPU_READY
487 	 */
488 	CPU->cpu_m.xc_wait_cnt = 0;
489 	for (c = 0; c < max_ncpus; ++c) {
490 		if (!BT_TEST(set, c))
491 			continue;
492 		cpup = cpu[c];
493 		if (cpup == NULL || !(cpup->cpu_flags & CPU_READY))
494 			continue;
495 
496 		/*
497 		 * Fill out a new message.
498 		 */
499 		msg = xc_extract(&CPU->cpu_m.xc_free);
500 		if (msg == NULL)
501 			panic("Ran out of free xc_msg_t's");
502 		msg->xc_command = command;
503 		if (msg->xc_master != CPU->cpu_id)
504 			panic("msg %p has wrong xc_master", (void *)msg);
505 		msg->xc_slave = c;
506 
507 		/*
508 		 * Increment my work count for all messages that I'll
509 		 * transition from DONE to FREE.
510 		 * Also remember how many XC_MSG_WAITINGs to look for
511 		 */
512 		(void) xc_increment(&CPU->cpu_m);
513 		if (command == XC_MSG_SYNC)
514 			++CPU->cpu_m.xc_wait_cnt;
515 
516 		/*
517 		 * Increment the target CPU work count then insert the message
518 		 * in the target msgbox. If I post the first bit of work
519 		 * for the target to do, send an IPI to the target CPU.
520 		 */
521 		cnt = xc_increment(&cpup->cpu_m);
522 		xc_insert(&cpup->cpu_m.xc_msgbox, msg);
523 		if (cpup != CPU) {
524 			if (cnt == 0) {
525 				CPU_STATS_ADDQ(CPU, sys, xcalls, 1);
526 				send_dirint(c, XC_HI_PIL);
527 				if (xc_collect_enable)
528 					++xc_total_cnt;
529 			} else if (xc_collect_enable) {
530 				++xc_multi_cnt;
531 			}
532 		}
533 	}
534 
535 	/*
536 	 * Now drop into the message handler until all work is done
537 	 */
538 	(void) xc_serv(NULL, NULL);
539 	splx(save_spl);
540 }
541 
542 /*
543  * Push out a priority cross call.
544  */
545 static void
546 xc_priority_common(
547 	xc_func_t func,
548 	xc_arg_t arg1,
549 	xc_arg_t arg2,
550 	xc_arg_t arg3,
551 	ulong_t *set)
552 {
553 	int i;
554 	int c;
555 	struct cpu *cpup;
556 
557 	/*
558 	 * Wait briefly for any previous xc_priority to have finished.
559 	 */
560 	for (c = 0; c < max_ncpus; ++c) {
561 		cpup = cpu[c];
562 		if (cpup == NULL || !(cpup->cpu_flags & CPU_READY))
563 			continue;
564 
565 		/*
566 		 * The value of 40000 here is from old kernel code. It
567 		 * really should be changed to some time-based value since,
568 		 * under a hypervisor, there's no guarantee a remote CPU
569 		 * is even scheduled.
570 		 */
571 		for (i = 0; BT_TEST(xc_priority_set, c) && i < 40000; ++i)
572 			SMT_PAUSE();
573 
574 		/*
575 		 * Some CPU did not respond to a previous priority request. It's
576 		 * probably deadlocked with interrupts blocked or some such
577 		 * problem. We'll just erase the previous request - which was
578 		 * most likely a kmdb_enter that has already expired - and plow
579 		 * ahead.
580 		 */
581 		if (BT_TEST(xc_priority_set, c)) {
582 			XC_BT_CLEAR(xc_priority_set, c);
583 			if (cpup->cpu_m.xc_work_cnt > 0)
584 				xc_decrement(&cpup->cpu_m);
585 		}
586 	}
587 
588 	/*
589 	 * fill in cross call data
590 	 */
591 	xc_priority_data.xc_func = func;
592 	xc_priority_data.xc_a1 = arg1;
593 	xc_priority_data.xc_a2 = arg2;
594 	xc_priority_data.xc_a3 = arg3;
595 
596 	/*
597 	 * Post messages to all CPUs involved that are CPU_READY
598 	 * We'll always IPI, plus bang on the xc_msgbox for i86_mwait()
599 	 */
600 	for (c = 0; c < max_ncpus; ++c) {
601 		if (!BT_TEST(set, c))
602 			continue;
603 		cpup = cpu[c];
604 		if (cpup == NULL || !(cpup->cpu_flags & CPU_READY) ||
605 		    cpup == CPU)
606 			continue;
607 		(void) xc_increment(&cpup->cpu_m);
608 		XC_BT_SET(xc_priority_set, c);
609 		send_dirint(c, XC_HI_PIL);
610 		for (i = 0; i < 10; ++i) {
611 			(void) casptr(&cpup->cpu_m.xc_msgbox,
612 			    cpup->cpu_m.xc_msgbox, cpup->cpu_m.xc_msgbox);
613 		}
614 	}
615 }
616 
617 /*
618  * Do a cross call to all other CPUs with absolutely no waiting or handshaking.
619  * This should only be used for extraordinary operations, like panic(), which
620  * need to work, in some fashion, in a not completely functional system.
621  * All other uses that want minimal waiting should use xc_call_nowait().
622  */
623 void
624 xc_priority(
625 	xc_arg_t arg1,
626 	xc_arg_t arg2,
627 	xc_arg_t arg3,
628 	ulong_t *set,
629 	xc_func_t func)
630 {
631 	extern int IGNORE_KERNEL_PREEMPTION;
632 	int save_spl = splr(ipltospl(XC_HI_PIL));
633 	int save_kernel_preemption = IGNORE_KERNEL_PREEMPTION;
634 
635 	IGNORE_KERNEL_PREEMPTION = 1;
636 	xc_priority_common((xc_func_t)func, arg1, arg2, arg3, set);
637 	IGNORE_KERNEL_PREEMPTION = save_kernel_preemption;
638 	splx(save_spl);
639 }
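
/*
 * Illustrative sketch only: a panic-style caller could stop all other CPUs
 * with something like the following (my_stop() is a hypothetical callback,
 * not a function defined in this file):
 *
 *	cpuset_t set;
 *
 *	CPUSET_ALL_BUT(set, CPU->cpu_id);
 *	xc_priority(0, 0, 0, CPUSET2BV(set), my_stop);
 *
 * kdi_xc_others() below uses the same pattern via xc_priority_common().
 */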
640 
641 /*
642  * Wrapper for kmdb to capture other CPUs, causing them to enter the debugger.
643  */
644 void
645 kdi_xc_others(int this_cpu, void (*func)(void))
646 {
647 	extern int IGNORE_KERNEL_PREEMPTION;
648 	int save_kernel_preemption;
649 	cpuset_t set;
650 
651 	if (!xc_initialized)
652 		return;
653 
654 	save_kernel_preemption = IGNORE_KERNEL_PREEMPTION;
655 	IGNORE_KERNEL_PREEMPTION = 1;
656 	CPUSET_ALL_BUT(set, this_cpu);
657 	xc_priority_common((xc_func_t)func, 0, 0, 0, CPUSET2BV(set));
658 	IGNORE_KERNEL_PREEMPTION = save_kernel_preemption;
659 }
660 
661 
662 
663 /*
664  * Invoke function on specified processors. Remotes may continue after
665  * service with no waiting. xc_call_nowait() may return immediately too.
666  */
667 void
668 xc_call_nowait(
669 	xc_arg_t arg1,
670 	xc_arg_t arg2,
671 	xc_arg_t arg3,
672 	ulong_t *set,
673 	xc_func_t func)
674 {
675 	xc_common(func, arg1, arg2, arg3, set, XC_MSG_ASYNC);
676 }
677 
678 /*
679  * Invoke function on specified processors. Remotes may continue after
680  * service with no waiting. xc_call() returns only after remotes have finished.
681  */
682 void
683 xc_call(
684 	xc_arg_t arg1,
685 	xc_arg_t arg2,
686 	xc_arg_t arg3,
687 	ulong_t *set,
688 	xc_func_t func)
689 {
690 	xc_common(func, arg1, arg2, arg3, set, XC_MSG_CALL);
691 }
692 
693 /*
694  * Invoke function on specified processors. Remotes wait until all have
695  * finished. xc_sync() also waits until all remotes have finished.
696  */
697 void
698 xc_sync(
699 	xc_arg_t arg1,
700 	xc_arg_t arg2,
701 	xc_arg_t arg3,
702 	ulong_t *set,
703 	xc_func_t func)
704 {
705 	xc_common(func, arg1, arg2, arg3, set, XC_MSG_SYNC);
706 }
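
/*
 * To summarize the differences between the three variants above, here is a
 * hypothetical caller (my_func() and a1 are made-up names, not from this
 * file):
 *
 *	cpuset_t set;
 *
 *	CPUSET_ALL(set);
 *
 *	xc_call_nowait(a1, 0, 0, CPUSET2BV(set), my_func);
 *		- may return before the remotes have run my_func()
 *	xc_call(a1, 0, 0, CPUSET2BV(set), my_func);
 *		- returns only after every CPU_READY CPU in the set has run
 *		  my_func()
 *	xc_sync(a1, 0, 0, CPUSET2BV(set), my_func);
 *		- in addition, no remote continues until all have run my_func()
 */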
707