xref: /illumos-gate/usr/src/uts/i86pc/os/x_call.c (revision f22cbd2db87ae3945ed6a9166f8b9d61b65c6ab9)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /*
26  * Copyright (c) 2010, Intel Corporation.
27  * All rights reserved.
28  */
29 
30 #include <sys/types.h>
31 #include <sys/param.h>
32 #include <sys/t_lock.h>
33 #include <sys/thread.h>
34 #include <sys/cpuvar.h>
35 #include <sys/x_call.h>
36 #include <sys/xc_levels.h>
37 #include <sys/cpu.h>
38 #include <sys/psw.h>
39 #include <sys/sunddi.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/archsystm.h>
43 #include <sys/machsystm.h>
44 #include <sys/mutex_impl.h>
45 #include <sys/stack.h>
46 #include <sys/promif.h>
47 #include <sys/x86_archext.h>
48 
49 /*
50  * Implementation for cross-processor calls via interprocessor interrupts
51  *
52  * This implementation uses a message passing architecture to allow multiple
53  * concurrent cross calls to be in flight at any given time. We use the cmpxchg
54  * instruction, aka atomic_cas_ptr(), to implement simple efficient work
55  * queues for message passing between CPUs with almost no need for regular
56  * locking.  See xc_extract() and xc_insert() below.
57  *
58  * The general idea is that initiating a cross call means putting a message
59  * on a target(s) CPU's work queue. Any synchronization is handled by passing
60  * the message back and forth between initiator and target(s).
61  *
62  * Every CPU has xc_work_cnt, which indicates it has messages to process.
63  * This value is incremented as message traffic is initiated and decremented
64  * with every message that finishes all processing.
65  *
66  * The code needs no mfence or other membar_*() calls. The uses of
67  * atomic_cas_ptr(), atomic_cas_32() and atomic_dec_32() for the message
68  * passing are implemented with LOCK prefix instructions which are
69  * equivalent to mfence.
70  *
71  * One interesting aspect of this implementation is that it allows 2 or more
72  * CPUs to initiate cross calls to intersecting sets of CPUs at the same time.
73  * The cross call processing by the CPUs will happen in any order with only
74  * a guarantee, for xc_call() and xc_sync(), that an initiator won't return
75  * from cross calls before all slaves have invoked the function.
76  *
77  * The reason for this asynchronous approach is to allow for fast global
78  * TLB shootdowns. If all CPUs, say N, tried to do a global TLB invalidation
79  * on a different Virtual Address at the same time, the old code required
80  * N squared IPIs. With this method, depending on timing, it could happen
81  * with just N IPIs.
82  */
83 
84 /*
85  * The default is to not enable collecting counts of IPI information, since
86  * the updating of shared cachelines could cause excess bus traffic.
87  */
88 uint_t xc_collect_enable = 0;
89 uint64_t xc_total_cnt = 0;	/* total #IPIs sent for cross calls */
90 uint64_t xc_multi_cnt = 0;	/* # times we piggy backed on another IPI */
91 
/*
 * Message states. The normal life cycles are listed below; "->" marks a
 * transition made by the slave cpu and "=>" one made by the master cpu as
 * the message is passed back and forth.
 *
 * FREE => ASYNC ->                       DONE => FREE
 * FREE => CALL ->                        DONE => FREE
 * FREE => SYNC -> WAITING => RELEASED -> DONE => FREE
 *
 * The interesting case is ASYNC. You might ask why it does not go straight
 * to FREE instead of passing through DONE. If it did, a master able to
 * generate ASYNC messages faster than the slave can process them might
 * exhaust its own xc_free list. That could be dealt with by more
 * complicated handling, but since nothing important uses ASYNC it has not
 * been bothered with.
 */
#define	XC_MSG_FREE	(0)	/* msg in xc_free queue */
#define	XC_MSG_ASYNC	(1)	/* msg in slave xc_msgbox */
#define	XC_MSG_CALL	(2)	/* msg in slave xc_msgbox */
#define	XC_MSG_SYNC	(3)	/* msg in slave xc_msgbox */
#define	XC_MSG_WAITING	(4)	/* msg in master xc_msgbox or xc_waiters */
#define	XC_MSG_RELEASED	(5)	/* msg in slave xc_msgbox */
#define	XC_MSG_DONE	(6)	/* msg in master xc_msgbox */
114 
115 /*
116  * We allow for one high priority message at a time to happen in the system.
117  * This is used for panic, kmdb, etc., so no locking is done.
118  */
119 static volatile cpuset_t xc_priority_set_store;
120 static volatile ulong_t *xc_priority_set = CPUSET2BV(xc_priority_set_store);
121 static xc_data_t xc_priority_data;
122 
123 /*
124  * Wrappers to avoid C compiler warnings due to volatile. The atomic bit
125  * operations don't accept volatile bit vectors - which is a bit silly.
126  */
127 #define	XC_BT_SET(vector, b)	BT_ATOMIC_SET((ulong_t *)(vector), (b))
128 #define	XC_BT_CLEAR(vector, b)	BT_ATOMIC_CLEAR((ulong_t *)(vector), (b))
129 
130 /*
131  * Decrement a CPU's work count
132  */
133 static void
134 xc_decrement(struct machcpu *mcpu)
135 {
136 	atomic_dec_32(&mcpu->xc_work_cnt);
137 }
138 
139 /*
140  * Increment a CPU's work count and return the old value
141  */
142 static int
143 xc_increment(struct machcpu *mcpu)
144 {
145 	int old;
146 	do {
147 		old = mcpu->xc_work_cnt;
148 	} while (atomic_cas_32(&mcpu->xc_work_cnt, old, old + 1) != old);
149 	return (old);
150 }
151 
152 /*
153  * Put a message into a queue. The insertion is atomic no matter
154  * how many different inserts/extracts to the same queue happen.
155  */
156 static void
157 xc_insert(void *queue, xc_msg_t *msg)
158 {
159 	xc_msg_t *old_head;
160 
161 	/*
162 	 * FREE messages should only ever be getting inserted into
163 	 * the xc_master CPUs xc_free queue.
164 	 */
165 	ASSERT(msg->xc_command != XC_MSG_FREE ||
166 	    cpu[msg->xc_master] == NULL || /* possible only during init */
167 	    queue == &cpu[msg->xc_master]->cpu_m.xc_free);
168 
169 	do {
170 		old_head = (xc_msg_t *)*(volatile xc_msg_t **)queue;
171 		msg->xc_next = old_head;
172 	} while (atomic_cas_ptr(queue, old_head, msg) != old_head);
173 }
174 
175 /*
176  * Extract a message from a queue. The extraction is atomic only
177  * when just one thread does extractions from the queue.
178  * If the queue is empty, NULL is returned.
179  */
180 static xc_msg_t *
181 xc_extract(xc_msg_t **queue)
182 {
183 	xc_msg_t *old_head;
184 
185 	do {
186 		old_head = (xc_msg_t *)*(volatile xc_msg_t **)queue;
187 		if (old_head == NULL)
188 			return (old_head);
189 	} while (atomic_cas_ptr(queue, old_head, old_head->xc_next) !=
190 	    old_head);
191 	old_head->xc_next = NULL;
192 	return (old_head);
193 }
194 
195 /*
196  * Initialize the machcpu fields used for cross calls
197  */
198 static uint_t xc_initialized = 0;
199 
200 void
201 xc_init_cpu(struct cpu *cpup)
202 {
203 	xc_msg_t *msg;
204 	int c;
205 
206 	/*
207 	 * Allocate message buffers for the new CPU.
208 	 */
209 	for (c = 0; c < max_ncpus; ++c) {
210 		if (plat_dr_support_cpu()) {
211 			/*
212 			 * Allocate a message buffer for every CPU possible
213 			 * in system, including our own, and add them to our xc
214 			 * message queue.
215 			 */
216 			msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
217 			msg->xc_command = XC_MSG_FREE;
218 			msg->xc_master = cpup->cpu_id;
219 			xc_insert(&cpup->cpu_m.xc_free, msg);
220 		} else if (cpu[c] != NULL && cpu[c] != cpup) {
221 			/*
222 			 * Add a new message buffer to each existing CPU's free
223 			 * list, as well as one for my list for each of them.
224 			 * Note: cpu0 is statically inserted into cpu[] array,
225 			 * so need to check cpu[c] isn't cpup itself to avoid
226 			 * allocating extra message buffers for cpu0.
227 			 */
228 			msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
229 			msg->xc_command = XC_MSG_FREE;
230 			msg->xc_master = c;
231 			xc_insert(&cpu[c]->cpu_m.xc_free, msg);
232 
233 			msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
234 			msg->xc_command = XC_MSG_FREE;
235 			msg->xc_master = cpup->cpu_id;
236 			xc_insert(&cpup->cpu_m.xc_free, msg);
237 		}
238 	}
239 
240 	if (!plat_dr_support_cpu()) {
241 		/*
242 		 * Add one for self messages if CPU hotplug is disabled.
243 		 */
244 		msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
245 		msg->xc_command = XC_MSG_FREE;
246 		msg->xc_master = cpup->cpu_id;
247 		xc_insert(&cpup->cpu_m.xc_free, msg);
248 	}
249 
250 	if (!xc_initialized)
251 		xc_initialized = 1;
252 }
253 
254 void
255 xc_fini_cpu(struct cpu *cpup)
256 {
257 	xc_msg_t *msg;
258 
259 	ASSERT((cpup->cpu_flags & CPU_READY) == 0);
260 	ASSERT(cpup->cpu_m.xc_msgbox == NULL);
261 	ASSERT(cpup->cpu_m.xc_work_cnt == 0);
262 
263 	while ((msg = xc_extract(&cpup->cpu_m.xc_free)) != NULL) {
264 		kmem_free(msg, sizeof (*msg));
265 	}
266 }
267 
268 #define	XC_FLUSH_MAX_WAITS		1000
269 
270 /* Flush inflight message buffers. */
271 int
272 xc_flush_cpu(struct cpu *cpup)
273 {
274 	int i;
275 
276 	ASSERT((cpup->cpu_flags & CPU_READY) == 0);
277 
278 	/*
279 	 * Pause all working CPUs, which ensures that there's no CPU in
280 	 * function xc_common().
281 	 * This is used to work around a race condition window in xc_common()
282 	 * between checking CPU_READY flag and increasing working item count.
283 	 */
284 	pause_cpus(cpup, NULL);
285 	start_cpus();
286 
287 	for (i = 0; i < XC_FLUSH_MAX_WAITS; i++) {
288 		if (cpup->cpu_m.xc_work_cnt == 0) {
289 			break;
290 		}
291 		DELAY(1);
292 	}
293 	for (; i < XC_FLUSH_MAX_WAITS; i++) {
294 		if (!BT_TEST(xc_priority_set, cpup->cpu_id)) {
295 			break;
296 		}
297 		DELAY(1);
298 	}
299 
300 	return (i >= XC_FLUSH_MAX_WAITS ? ETIME : 0);
301 }
302 
303 /*
304  * X-call message processing routine. Note that this is used by both
305  * senders and recipients of messages.
306  *
307  * We're protected against changing CPUs by either being in a high-priority
308  * interrupt, having preemption disabled or by having a raised SPL.
309  */
310 /*ARGSUSED*/
311 uint_t
312 xc_serv(caddr_t arg1, caddr_t arg2)
313 {
314 	struct machcpu *mcpup = &(CPU->cpu_m);
315 	xc_msg_t *msg;
316 	xc_data_t *data;
317 	xc_msg_t *xc_waiters = NULL;
318 	uint32_t num_waiting = 0;
319 	xc_func_t func;
320 	xc_arg_t a1;
321 	xc_arg_t a2;
322 	xc_arg_t a3;
323 	uint_t rc = DDI_INTR_UNCLAIMED;
324 
325 	while (mcpup->xc_work_cnt != 0) {
326 		rc = DDI_INTR_CLAIMED;
327 
328 		/*
329 		 * We may have to wait for a message to arrive.
330 		 */
331 		for (msg = NULL; msg == NULL;
332 		    msg = xc_extract(&mcpup->xc_msgbox)) {
333 
334 			/*
335 			 * Alway check for and handle a priority message.
336 			 */
337 			if (BT_TEST(xc_priority_set, CPU->cpu_id)) {
338 				func = xc_priority_data.xc_func;
339 				a1 = xc_priority_data.xc_a1;
340 				a2 = xc_priority_data.xc_a2;
341 				a3 = xc_priority_data.xc_a3;
342 				XC_BT_CLEAR(xc_priority_set, CPU->cpu_id);
343 				xc_decrement(mcpup);
344 				func(a1, a2, a3);
345 				if (mcpup->xc_work_cnt == 0)
346 					return (rc);
347 			}
348 
349 			/*
350 			 * wait for a message to arrive
351 			 */
352 			SMT_PAUSE();
353 		}
354 
355 
356 		/*
357 		 * process the message
358 		 */
359 		switch (msg->xc_command) {
360 
361 		/*
362 		 * ASYNC gives back the message immediately, then we do the
363 		 * function and return with no more waiting.
364 		 */
365 		case XC_MSG_ASYNC:
366 			data = &cpu[msg->xc_master]->cpu_m.xc_data;
367 			func = data->xc_func;
368 			a1 = data->xc_a1;
369 			a2 = data->xc_a2;
370 			a3 = data->xc_a3;
371 			msg->xc_command = XC_MSG_DONE;
372 			xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
373 			if (func != NULL)
374 				(void) (*func)(a1, a2, a3);
375 			xc_decrement(mcpup);
376 			break;
377 
378 		/*
379 		 * SYNC messages do the call, then send it back to the master
380 		 * in WAITING mode
381 		 */
382 		case XC_MSG_SYNC:
383 			data = &cpu[msg->xc_master]->cpu_m.xc_data;
384 			if (data->xc_func != NULL)
385 				(void) (*data->xc_func)(data->xc_a1,
386 				    data->xc_a2, data->xc_a3);
387 			msg->xc_command = XC_MSG_WAITING;
388 			xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
389 			break;
390 
391 		/*
392 		 * WAITING messsages are collected by the master until all
393 		 * have arrived. Once all arrive, we release them back to
394 		 * the slaves
395 		 */
396 		case XC_MSG_WAITING:
397 			xc_insert(&xc_waiters, msg);
398 			if (++num_waiting < mcpup->xc_wait_cnt)
399 				break;
400 			while ((msg = xc_extract(&xc_waiters)) != NULL) {
401 				msg->xc_command = XC_MSG_RELEASED;
402 				xc_insert(&cpu[msg->xc_slave]->cpu_m.xc_msgbox,
403 				    msg);
404 				--num_waiting;
405 			}
406 			if (num_waiting != 0)
407 				panic("wrong number waiting");
408 			mcpup->xc_wait_cnt = 0;
409 			break;
410 
411 		/*
412 		 * CALL messages do the function and then, like RELEASE,
413 		 * send the message is back to master as DONE.
414 		 */
415 		case XC_MSG_CALL:
416 			data = &cpu[msg->xc_master]->cpu_m.xc_data;
417 			if (data->xc_func != NULL)
418 				(void) (*data->xc_func)(data->xc_a1,
419 				    data->xc_a2, data->xc_a3);
420 			/*FALLTHROUGH*/
421 		case XC_MSG_RELEASED:
422 			msg->xc_command = XC_MSG_DONE;
423 			xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
424 			xc_decrement(mcpup);
425 			break;
426 
427 		/*
428 		 * DONE means a slave has completely finished up.
429 		 * Once we collect all the DONE messages, we'll exit
430 		 * processing too.
431 		 */
432 		case XC_MSG_DONE:
433 			msg->xc_command = XC_MSG_FREE;
434 			xc_insert(&mcpup->xc_free, msg);
435 			xc_decrement(mcpup);
436 			break;
437 
438 		case XC_MSG_FREE:
439 			panic("free message 0x%p in msgbox", (void *)msg);
440 			break;
441 
442 		default:
443 			panic("bad message 0x%p in msgbox", (void *)msg);
444 			break;
445 		}
446 	}
447 	return (rc);
448 }
449 
450 /*
451  * Initiate cross call processing.
452  */
453 static void
454 xc_common(
455 	xc_func_t func,
456 	xc_arg_t arg1,
457 	xc_arg_t arg2,
458 	xc_arg_t arg3,
459 	ulong_t *set,
460 	uint_t command)
461 {
462 	int c;
463 	struct cpu *cpup;
464 	xc_msg_t *msg;
465 	xc_data_t *data;
466 	int cnt;
467 	int save_spl;
468 
469 	if (!xc_initialized) {
470 		if (BT_TEST(set, CPU->cpu_id) && (CPU->cpu_flags & CPU_READY) &&
471 		    func != NULL)
472 			(void) (*func)(arg1, arg2, arg3);
473 		return;
474 	}
475 
476 	save_spl = splr(ipltospl(XC_HI_PIL));
477 
478 	/*
479 	 * fill in cross call data
480 	 */
481 	data = &CPU->cpu_m.xc_data;
482 	data->xc_func = func;
483 	data->xc_a1 = arg1;
484 	data->xc_a2 = arg2;
485 	data->xc_a3 = arg3;
486 
487 	/*
488 	 * Post messages to all CPUs involved that are CPU_READY
489 	 */
490 	CPU->cpu_m.xc_wait_cnt = 0;
491 	for (c = 0; c < max_ncpus; ++c) {
492 		if (!BT_TEST(set, c))
493 			continue;
494 		cpup = cpu[c];
495 		if (cpup == NULL || !(cpup->cpu_flags & CPU_READY))
496 			continue;
497 
498 		/*
499 		 * Fill out a new message.
500 		 */
501 		msg = xc_extract(&CPU->cpu_m.xc_free);
502 		if (msg == NULL)
503 			panic("Ran out of free xc_msg_t's");
504 		msg->xc_command = command;
505 		if (msg->xc_master != CPU->cpu_id)
506 			panic("msg %p has wrong xc_master", (void *)msg);
507 		msg->xc_slave = c;
508 
509 		/*
510 		 * Increment my work count for all messages that I'll
511 		 * transition from DONE to FREE.
512 		 * Also remember how many XC_MSG_WAITINGs to look for
513 		 */
514 		(void) xc_increment(&CPU->cpu_m);
515 		if (command == XC_MSG_SYNC)
516 			++CPU->cpu_m.xc_wait_cnt;
517 
518 		/*
519 		 * Increment the target CPU work count then insert the message
520 		 * in the target msgbox. If I post the first bit of work
521 		 * for the target to do, send an IPI to the target CPU.
522 		 */
523 		cnt = xc_increment(&cpup->cpu_m);
524 		xc_insert(&cpup->cpu_m.xc_msgbox, msg);
525 		if (cpup != CPU) {
526 			if (cnt == 0) {
527 				CPU_STATS_ADDQ(CPU, sys, xcalls, 1);
528 				send_dirint(c, XC_HI_PIL);
529 				if (xc_collect_enable)
530 					++xc_total_cnt;
531 			} else if (xc_collect_enable) {
532 				++xc_multi_cnt;
533 			}
534 		}
535 	}
536 
537 	/*
538 	 * Now drop into the message handler until all work is done
539 	 */
540 	(void) xc_serv(NULL, NULL);
541 	splx(save_spl);
542 }
543 
544 /*
545  * Push out a priority cross call.
546  */
547 static void
548 xc_priority_common(
549 	xc_func_t func,
550 	xc_arg_t arg1,
551 	xc_arg_t arg2,
552 	xc_arg_t arg3,
553 	ulong_t *set)
554 {
555 	int i;
556 	int c;
557 	struct cpu *cpup;
558 
559 	/*
560 	 * Wait briefly for any previous xc_priority to have finished.
561 	 */
562 	for (c = 0; c < max_ncpus; ++c) {
563 		cpup = cpu[c];
564 		if (cpup == NULL || !(cpup->cpu_flags & CPU_READY))
565 			continue;
566 
567 		/*
568 		 * The value of 40000 here is from old kernel code. It
569 		 * really should be changed to some time based value, since
570 		 * under a hypervisor, there's no guarantee a remote CPU
571 		 * is even scheduled.
572 		 */
573 		for (i = 0; BT_TEST(xc_priority_set, c) && i < 40000; ++i)
574 			SMT_PAUSE();
575 
576 		/*
577 		 * Some CPU did not respond to a previous priority request. It's
578 		 * probably deadlocked with interrupts blocked or some such
579 		 * problem. We'll just erase the previous request - which was
580 		 * most likely a kmdb_enter that has already expired - and plow
581 		 * ahead.
582 		 */
583 		if (BT_TEST(xc_priority_set, c)) {
584 			XC_BT_CLEAR(xc_priority_set, c);
585 			if (cpup->cpu_m.xc_work_cnt > 0)
586 				xc_decrement(&cpup->cpu_m);
587 		}
588 	}
589 
590 	/*
591 	 * fill in cross call data
592 	 */
593 	xc_priority_data.xc_func = func;
594 	xc_priority_data.xc_a1 = arg1;
595 	xc_priority_data.xc_a2 = arg2;
596 	xc_priority_data.xc_a3 = arg3;
597 
598 	/*
599 	 * Post messages to all CPUs involved that are CPU_READY
600 	 * We'll always IPI, plus bang on the xc_msgbox for i86_mwait()
601 	 */
602 	for (c = 0; c < max_ncpus; ++c) {
603 		if (!BT_TEST(set, c))
604 			continue;
605 		cpup = cpu[c];
606 		if (cpup == NULL || !(cpup->cpu_flags & CPU_READY) ||
607 		    cpup == CPU)
608 			continue;
609 		(void) xc_increment(&cpup->cpu_m);
610 		XC_BT_SET(xc_priority_set, c);
611 		send_dirint(c, XC_HI_PIL);
612 		for (i = 0; i < 10; ++i) {
613 			(void) atomic_cas_ptr(&cpup->cpu_m.xc_msgbox,
614 			    cpup->cpu_m.xc_msgbox, cpup->cpu_m.xc_msgbox);
615 		}
616 	}
617 }
618 
619 /*
620  * Do cross call to all other CPUs with absolutely no waiting or handshaking.
621  * This should only be used for extraordinary operations, like panic(), which
622  * need to work, in some fashion, in a not completely functional system.
623  * All other uses that want minimal waiting should use xc_call_nowait().
624  */
625 void
626 xc_priority(
627 	xc_arg_t arg1,
628 	xc_arg_t arg2,
629 	xc_arg_t arg3,
630 	ulong_t *set,
631 	xc_func_t func)
632 {
633 	extern int IGNORE_KERNEL_PREEMPTION;
634 	int save_spl = splr(ipltospl(XC_HI_PIL));
635 	int save_kernel_preemption = IGNORE_KERNEL_PREEMPTION;
636 
637 	IGNORE_KERNEL_PREEMPTION = 1;
638 	xc_priority_common((xc_func_t)func, arg1, arg2, arg3, set);
639 	IGNORE_KERNEL_PREEMPTION = save_kernel_preemption;
640 	splx(save_spl);
641 }
642 
643 /*
644  * Wrapper for kmdb to capture other CPUs, causing them to enter the debugger.
645  */
646 void
647 kdi_xc_others(int this_cpu, void (*func)(void))
648 {
649 	extern int IGNORE_KERNEL_PREEMPTION;
650 	int save_kernel_preemption;
651 	cpuset_t set;
652 
653 	if (!xc_initialized)
654 		return;
655 
656 	save_kernel_preemption = IGNORE_KERNEL_PREEMPTION;
657 	IGNORE_KERNEL_PREEMPTION = 1;
658 	CPUSET_ALL_BUT(set, this_cpu);
659 	xc_priority_common((xc_func_t)func, 0, 0, 0, CPUSET2BV(set));
660 	IGNORE_KERNEL_PREEMPTION = save_kernel_preemption;
661 }
662 
663 
664 
665 /*
666  * Invoke function on specified processors. Remotes may continue after
667  * service with no waiting. xc_call_nowait() may return immediately too.
668  */
669 void
670 xc_call_nowait(
671 	xc_arg_t arg1,
672 	xc_arg_t arg2,
673 	xc_arg_t arg3,
674 	ulong_t *set,
675 	xc_func_t func)
676 {
677 	xc_common(func, arg1, arg2, arg3, set, XC_MSG_ASYNC);
678 }
679 
680 /*
681  * Invoke function on specified processors. Remotes may continue after
682  * service with no waiting. xc_call() returns only after remotes have finished.
683  */
684 void
685 xc_call(
686 	xc_arg_t arg1,
687 	xc_arg_t arg2,
688 	xc_arg_t arg3,
689 	ulong_t *set,
690 	xc_func_t func)
691 {
692 	xc_common(func, arg1, arg2, arg3, set, XC_MSG_CALL);
693 }
694 
695 /*
696  * Invoke function on specified processors. Remotes wait until all have
697  * finished. xc_sync() also waits until all remotes have finished.
698  */
699 void
700 xc_sync(
701 	xc_arg_t arg1,
702 	xc_arg_t arg2,
703 	xc_arg_t arg3,
704 	ulong_t *set,
705 	xc_func_t func)
706 {
707 	xc_common(func, arg1, arg2, arg3, set, XC_MSG_SYNC);
708 }
709