xref: /linux/drivers/misc/sgi-xp/xpc_uv.c (revision 367b8112fe2ea5c39a7bb4d263dcdd9b612fae18)
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2008 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8 
9 /*
10  * Cross Partition Communication (XPC) uv-based functions.
11  *
12  *     Architecture specific implementation of common functions.
13  *
14  */
15 
16 #include <linux/kernel.h>
17 #include <linux/mm.h>
18 #include <linux/interrupt.h>
19 #include <linux/delay.h>
20 #include <linux/device.h>
21 #include <asm/uv/uv_hub.h>
22 #include "../sgi-gru/gru.h"
23 #include "../sgi-gru/grukservices.h"
24 #include "xpc.h"
25 
26 static atomic64_t xpc_heartbeat_uv;
27 static DECLARE_BITMAP(xpc_heartbeating_to_mask_uv, XP_MAX_NPARTITIONS_UV);
28 
29 #define XPC_ACTIVATE_MSG_SIZE_UV	(1 * GRU_CACHE_LINE_BYTES)
30 #define XPC_NOTIFY_MSG_SIZE_UV		(2 * GRU_CACHE_LINE_BYTES)
31 
32 #define XPC_ACTIVATE_MQ_SIZE_UV	(4 * XP_MAX_NPARTITIONS_UV * \
33 				 XPC_ACTIVATE_MSG_SIZE_UV)
34 #define XPC_NOTIFY_MQ_SIZE_UV	(4 * XP_MAX_NPARTITIONS_UV * \
35 				 XPC_NOTIFY_MSG_SIZE_UV)
36 
37 static void *xpc_activate_mq_uv;
38 static void *xpc_notify_mq_uv;
39 
40 static int
41 xpc_setup_partitions_sn_uv(void)
42 {
43 	short partid;
44 	struct xpc_partition_uv *part_uv;
45 
46 	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
47 		part_uv = &xpc_partitions[partid].sn.uv;
48 
49 		spin_lock_init(&part_uv->flags_lock);
50 		part_uv->remote_act_state = XPC_P_AS_INACTIVE;
51 	}
52 	return 0;
53 }
54 
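/*
 * Allocate a zeroed, node-local set of pages for a GRU message queue, hand
 * them to gru_create_message_queue(), and register irq_handler for the
 * queue's interrupt.  As the !!! notes below indicate, the IRQ wiring in
 * this revision is still incomplete.
 */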
55 static void *
56 xpc_create_gru_mq_uv(unsigned int mq_size, int cpuid, unsigned int irq,
57 		     irq_handler_t irq_handler)
58 {
59 	int ret;
60 	int nid;
61 	int mq_order;
62 	struct page *page;
63 	void *mq;
64 
65 	nid = cpu_to_node(cpuid);
66 	mq_order = get_order(mq_size);
67 	page = alloc_pages_node(nid, GFP_KERNEL | __GFP_ZERO | GFP_THISNODE,
68 				mq_order);
69 	if (page == NULL) {
70 		dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to alloc %d "
71 			"bytes of memory on nid=%d for GRU mq\n", mq_size, nid);
72 		return NULL;
73 	}
74 
75 	mq = page_address(page);
76 	ret = gru_create_message_queue(mq, mq_size);
77 	if (ret != 0) {
78 		dev_err(xpc_part, "gru_create_message_queue() returned "
79 			"error=%d\n", ret);
80 		free_pages((unsigned long)mq, mq_order);
81 		return NULL;
82 	}
83 
84 	/* !!! Need to do some other things to set up IRQ */
85 
86 	ret = request_irq(irq, irq_handler, 0, "xpc", NULL);
87 	if (ret != 0) {
88 		dev_err(xpc_part, "request_irq(irq=%d) returned error=%d\n",
89 			irq, ret);
90 		free_pages((unsigned long)mq, mq_order);
91 		return NULL;
92 	}
93 
94 	/* !!! enable generation of irq when GRU mq op occurs to this mq */
95 
96 	/* ??? allow other partitions to access GRU mq? */
97 
98 	return mq;
99 }
100 
101 static void
102 xpc_destroy_gru_mq_uv(void *mq, unsigned int mq_size, unsigned int irq)
103 {
104 	/* ??? disallow other partitions to access GRU mq? */
105 	/* ??? disallow other partitions from accessing GRU mq? */
106 	/* !!! disable generation of irq when GRU mq op occurs to this mq */
107 
108 	free_irq(irq, NULL);
109 
110 	free_pages((unsigned long)mq, get_order(mq_size));
111 }
112 
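/*
 * Deliver a message to the GRU message queue whose global physical address
 * is mq_gpa.  Transient failures (MQE_QUEUE_FULL, MQE_CONGESTION) are
 * retried indefinitely, sleeping briefly when the queue is full; any other
 * error is reported as xpGruSendMqError.
 */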
113 static enum xp_retval
114 xpc_send_gru_msg(unsigned long mq_gpa, void *msg, size_t msg_size)
115 {
116 	enum xp_retval xp_ret;
117 	int ret;
118 
119 	while (1) {
120 		ret = gru_send_message_gpa(mq_gpa, msg, msg_size);
121 		if (ret == MQE_OK) {
122 			xp_ret = xpSuccess;
123 			break;
124 		}
125 
126 		if (ret == MQE_QUEUE_FULL) {
127 			dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
128 				"error=MQE_QUEUE_FULL\n");
129 			/* !!! handle QLimit reached; delay & try again */
130 			/* ??? Do we add a limit to the number of retries? */
131 			(void)msleep_interruptible(10);
132 		} else if (ret == MQE_CONGESTION) {
133 			dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
134 				"error=MQE_CONGESTION\n");
135 			/* !!! handle LB Overflow; simply try again */
136 			/* ??? Do we add a limit to the number of retries? */
137 		} else {
138 			/* !!! Currently this is MQE_UNEXPECTED_CB_ERR */
139 			dev_err(xpc_chan, "gru_send_message_gpa() returned "
140 				"error=%d\n", ret);
141 			xp_ret = xpGruSendMqError;
142 			break;
143 		}
144 	}
145 	return xp_ret;
146 }
147 
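/*
 * Act on the activate requests (act_state_req) that the activate message
 * queue's interrupt handler has recorded for each partition, activating or
 * deactivating partitions as requested.  The IRQ-received lock is dropped
 * around each (de)activation call.
 */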
148 static void
149 xpc_process_activate_IRQ_rcvd_uv(void)
150 {
151 	unsigned long irq_flags;
152 	short partid;
153 	struct xpc_partition *part;
154 	u8 act_state_req;
155 
156 	DBUG_ON(xpc_activate_IRQ_rcvd == 0);
157 
158 	spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
159 	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
160 		part = &xpc_partitions[partid];
161 
162 		if (part->sn.uv.act_state_req == 0)
163 			continue;
164 
165 		xpc_activate_IRQ_rcvd--;
166 		BUG_ON(xpc_activate_IRQ_rcvd < 0);
167 
168 		act_state_req = part->sn.uv.act_state_req;
169 		part->sn.uv.act_state_req = 0;
170 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
171 
172 		if (act_state_req == XPC_P_ASR_ACTIVATE_UV) {
173 			if (part->act_state == XPC_P_AS_INACTIVE)
174 				xpc_activate_partition(part);
175 			else if (part->act_state == XPC_P_AS_DEACTIVATING)
176 				XPC_DEACTIVATE_PARTITION(part, xpReactivating);
177 
178 		} else if (act_state_req == XPC_P_ASR_REACTIVATE_UV) {
179 			if (part->act_state == XPC_P_AS_INACTIVE)
180 				xpc_activate_partition(part);
181 			else
182 				XPC_DEACTIVATE_PARTITION(part, xpReactivating);
183 
184 		} else if (act_state_req == XPC_P_ASR_DEACTIVATE_UV) {
185 			XPC_DEACTIVATE_PARTITION(part, part->sn.uv.reason);
186 
187 		} else {
188 			BUG();
189 		}
190 
191 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
192 		if (xpc_activate_IRQ_rcvd == 0)
193 			break;
194 	}
195 	spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
196 
197 }
198 
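/*
 * Handle a single message pulled off the activate message queue: update the
 * remote partition's heartbeat and act_state info, record channel control
 * flags, or note an activate/deactivate request.  *wakeup_hb_checker is
 * incremented whenever the heartbeat checker needs to run to act on a newly
 * recorded act_state_req.
 */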
199 static void
200 xpc_handle_activate_mq_msg_uv(struct xpc_partition *part,
201 			      struct xpc_activate_mq_msghdr_uv *msg_hdr,
202 			      int *wakeup_hb_checker)
203 {
204 	unsigned long irq_flags;
205 	struct xpc_partition_uv *part_uv = &part->sn.uv;
206 	struct xpc_openclose_args *args;
207 
208 	part_uv->remote_act_state = msg_hdr->act_state;
209 
210 	switch (msg_hdr->type) {
211 	case XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV:
212 		/* syncing of remote_act_state was just done above */
213 		break;
214 
215 	case XPC_ACTIVATE_MQ_MSG_INC_HEARTBEAT_UV: {
216 		struct xpc_activate_mq_msg_heartbeat_req_uv *msg;
217 
218 		msg = container_of(msg_hdr,
219 				   struct xpc_activate_mq_msg_heartbeat_req_uv,
220 				   hdr);
221 		part_uv->heartbeat = msg->heartbeat;
222 		break;
223 	}
224 	case XPC_ACTIVATE_MQ_MSG_OFFLINE_HEARTBEAT_UV: {
225 		struct xpc_activate_mq_msg_heartbeat_req_uv *msg;
226 
227 		msg = container_of(msg_hdr,
228 				   struct xpc_activate_mq_msg_heartbeat_req_uv,
229 				   hdr);
230 		part_uv->heartbeat = msg->heartbeat;
231 
232 		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
233 		part_uv->flags |= XPC_P_HEARTBEAT_OFFLINE_UV;
234 		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
235 		break;
236 	}
237 	case XPC_ACTIVATE_MQ_MSG_ONLINE_HEARTBEAT_UV: {
238 		struct xpc_activate_mq_msg_heartbeat_req_uv *msg;
239 
240 		msg = container_of(msg_hdr,
241 				   struct xpc_activate_mq_msg_heartbeat_req_uv,
242 				   hdr);
243 		part_uv->heartbeat = msg->heartbeat;
244 
245 		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
246 		part_uv->flags &= ~XPC_P_HEARTBEAT_OFFLINE_UV;
247 		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
248 		break;
249 	}
250 	case XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV: {
251 		struct xpc_activate_mq_msg_activate_req_uv *msg;
252 
253 		/*
254 		 * ??? Do we deal here with ts_jiffies being different
255 		 * ??? if act_state != XPC_P_AS_INACTIVE instead of
256 		 * ??? below?
257 		 */
258 		msg = container_of(msg_hdr, struct
259 				   xpc_activate_mq_msg_activate_req_uv, hdr);
260 
261 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
262 		if (part_uv->act_state_req == 0)
263 			xpc_activate_IRQ_rcvd++;
264 		part_uv->act_state_req = XPC_P_ASR_ACTIVATE_UV;
265 		part->remote_rp_pa = msg->rp_gpa; /* !!! _pa is _gpa */
266 		part->remote_rp_ts_jiffies = msg_hdr->rp_ts_jiffies;
267 		part_uv->remote_activate_mq_gpa = msg->activate_mq_gpa;
268 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
269 
270 		(*wakeup_hb_checker)++;
271 		break;
272 	}
273 	case XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV: {
274 		struct xpc_activate_mq_msg_deactivate_req_uv *msg;
275 
276 		msg = container_of(msg_hdr, struct
277 				   xpc_activate_mq_msg_deactivate_req_uv, hdr);
278 
279 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
280 		if (part_uv->act_state_req == 0)
281 			xpc_activate_IRQ_rcvd++;
282 		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
283 		part_uv->reason = msg->reason;
284 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
285 
286 		(*wakeup_hb_checker)++;
287 		return;
288 	}
289 	case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV: {
290 		struct xpc_activate_mq_msg_chctl_closerequest_uv *msg;
291 
292 		msg = container_of(msg_hdr, struct
293 				   xpc_activate_mq_msg_chctl_closerequest_uv,
294 				   hdr);
295 		args = &part->remote_openclose_args[msg->ch_number];
296 		args->reason = msg->reason;
297 
298 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
299 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREQUEST;
300 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
301 
302 		xpc_wakeup_channel_mgr(part);
303 		break;
304 	}
305 	case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV: {
306 		struct xpc_activate_mq_msg_chctl_closereply_uv *msg;
307 
308 		msg = container_of(msg_hdr, struct
309 				   xpc_activate_mq_msg_chctl_closereply_uv,
310 				   hdr);
311 
312 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
313 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREPLY;
314 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
315 
316 		xpc_wakeup_channel_mgr(part);
317 		break;
318 	}
319 	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV: {
320 		struct xpc_activate_mq_msg_chctl_openrequest_uv *msg;
321 
322 		msg = container_of(msg_hdr, struct
323 				   xpc_activate_mq_msg_chctl_openrequest_uv,
324 				   hdr);
325 		args = &part->remote_openclose_args[msg->ch_number];
326 		args->entry_size = msg->entry_size;
327 		args->local_nentries = msg->local_nentries;
328 
329 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
330 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREQUEST;
331 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
332 
333 		xpc_wakeup_channel_mgr(part);
334 		break;
335 	}
336 	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV: {
337 		struct xpc_activate_mq_msg_chctl_openreply_uv *msg;
338 
339 		msg = container_of(msg_hdr, struct
340 				   xpc_activate_mq_msg_chctl_openreply_uv, hdr);
341 		args = &part->remote_openclose_args[msg->ch_number];
342 		args->remote_nentries = msg->remote_nentries;
343 		args->local_nentries = msg->local_nentries;
344 		args->local_msgqueue_pa = msg->local_notify_mq_gpa;
345 
346 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
347 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREPLY;
348 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
349 
350 		xpc_wakeup_channel_mgr(part);
351 		break;
352 	}
353 	case XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV:
354 		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
355 		part_uv->flags |= XPC_P_ENGAGED_UV;
356 		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
357 		break;
358 
359 	case XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV:
360 		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
361 		part_uv->flags &= ~XPC_P_ENGAGED_UV;
362 		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
363 		break;
364 
365 	default:
366 		dev_err(xpc_part, "received unknown activate_mq msg type=%d "
367 			"from partition=%d\n", msg_hdr->type, XPC_PARTID(part));
368 
369 		/* get hb checker to deactivate from the remote partition */
370 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
371 		if (part_uv->act_state_req == 0)
372 			xpc_activate_IRQ_rcvd++;
373 		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
374 		part_uv->reason = xpBadMsgType;
375 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
376 
377 		(*wakeup_hb_checker)++;
378 		return;
379 	}
380 
381 	if (msg_hdr->rp_ts_jiffies != part->remote_rp_ts_jiffies &&
382 	    part->remote_rp_ts_jiffies != 0) {
383 		/*
384 		 * ??? Does what we do here need to be sensitive to
385 		 * ??? act_state or remote_act_state?
386 		 */
387 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
388 		if (part_uv->act_state_req == 0)
389 			xpc_activate_IRQ_rcvd++;
390 		part_uv->act_state_req = XPC_P_ASR_REACTIVATE_UV;
391 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
392 
393 		(*wakeup_hb_checker)++;
394 	}
395 }
396 
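/*
 * Interrupt handler for the activate GRU message queue.  Drain all pending
 * messages, dispatch each to its partition's handler, and wake the
 * heartbeat checker if any message requires it.
 */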
397 static irqreturn_t
398 xpc_handle_activate_IRQ_uv(int irq, void *dev_id)
399 {
400 	struct xpc_activate_mq_msghdr_uv *msg_hdr;
401 	short partid;
402 	struct xpc_partition *part;
403 	int wakeup_hb_checker = 0;
404 
405 	while ((msg_hdr = gru_get_next_message(xpc_activate_mq_uv)) != NULL) {
406 
407 		partid = msg_hdr->partid;
408 		if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
409 			dev_err(xpc_part, "xpc_handle_activate_IRQ_uv() "
410 				"received invalid partid=0x%x in message\n",
411 				partid);
412 		} else {
413 			part = &xpc_partitions[partid];
414 			if (xpc_part_ref(part)) {
415 				xpc_handle_activate_mq_msg_uv(part, msg_hdr,
416 							    &wakeup_hb_checker);
417 				xpc_part_deref(part);
418 			}
419 		}
420 
421 		gru_free_message(xpc_activate_mq_uv, msg_hdr);
422 	}
423 
424 	if (wakeup_hb_checker)
425 		wake_up_interruptible(&xpc_activate_IRQ_wq);
426 
427 	return IRQ_HANDLED;
428 }
429 
430 static enum xp_retval
431 xpc_send_activate_IRQ_uv(struct xpc_partition *part, void *msg, size_t msg_size,
432 			 int msg_type)
433 {
434 	struct xpc_activate_mq_msghdr_uv *msg_hdr = msg;
435 
436 	DBUG_ON(msg_size > XPC_ACTIVATE_MSG_SIZE_UV);
437 
438 	msg_hdr->type = msg_type;
439 	msg_hdr->partid = XPC_PARTID(part);
440 	msg_hdr->act_state = part->act_state;
441 	msg_hdr->rp_ts_jiffies = xpc_rsvd_page->ts_jiffies;
442 
443 	/* ??? Is holding a spin_lock (ch->lock) during this call a bad idea? */
444 	return xpc_send_gru_msg(part->sn.uv.remote_activate_mq_gpa, msg,
445 				msg_size);
446 }
447 
448 static void
449 xpc_send_activate_IRQ_part_uv(struct xpc_partition *part, void *msg,
450 			      size_t msg_size, int msg_type)
451 {
452 	enum xp_retval ret;
453 
454 	ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
455 	if (unlikely(ret != xpSuccess))
456 		XPC_DEACTIVATE_PARTITION(part, ret);
457 }
458 
459 static void
460 xpc_send_activate_IRQ_ch_uv(struct xpc_channel *ch, unsigned long *irq_flags,
461 			 void *msg, size_t msg_size, int msg_type)
462 {
463 	struct xpc_partition *part = &xpc_partitions[ch->number];
464 	enum xp_retval ret;
465 
466 	ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
467 	if (unlikely(ret != xpSuccess)) {
468 		if (irq_flags != NULL)
469 			spin_unlock_irqrestore(&ch->lock, *irq_flags);
470 
471 		XPC_DEACTIVATE_PARTITION(part, ret);
472 
473 		if (irq_flags != NULL)
474 			spin_lock_irqsave(&ch->lock, *irq_flags);
475 	}
476 }
477 
478 static void
479 xpc_send_local_activate_IRQ_uv(struct xpc_partition *part, int act_state_req)
480 {
481 	unsigned long irq_flags;
482 	struct xpc_partition_uv *part_uv = &part->sn.uv;
483 
484 	/*
485 	 * !!! Make our side think that the remote partition sent an activate
486 	 * !!! message our way by doing what the activate IRQ handler would
487 	 * !!! do had one really been sent.
488 	 */
489 
490 	spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
491 	if (part_uv->act_state_req == 0)
492 		xpc_activate_IRQ_rcvd++;
493 	part_uv->act_state_req = act_state_req;
494 	spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
495 
496 	wake_up_interruptible(&xpc_activate_IRQ_wq);
497 }
498 
499 static enum xp_retval
500 xpc_get_partition_rsvd_page_pa_uv(void *buf, u64 *cookie, unsigned long *rp_pa,
501 				  size_t *len)
502 {
503 	/* !!! call the UV version of sn_partition_reserved_page_pa() */
504 	return xpUnsupported;
505 }
506 
507 static int
508 xpc_setup_rsvd_page_sn_uv(struct xpc_rsvd_page *rp)
509 {
510 	rp->sn.activate_mq_gpa = uv_gpa(xpc_activate_mq_uv);
511 	return 0;
512 }
513 
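/*
 * Broadcast a heartbeat message of the given type to every partition we are
 * currently heartbeating to.
 */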
514 static void
515 xpc_send_heartbeat_uv(int msg_type)
516 {
517 	short partid;
518 	struct xpc_partition *part;
519 	struct xpc_activate_mq_msg_heartbeat_req_uv msg;
520 
521 	/*
522 	 * !!! On uv we're broadcasting a heartbeat message every 5 seconds,
523 	 * !!! whereas on sn2 we bte_copy the heartbeat info every 20
524 	 * !!! seconds. This is an increase in numalink traffic.
525 	 * ??? Is this good?
526 	 */
527 
528 	msg.heartbeat = atomic64_inc_return(&xpc_heartbeat_uv);
529 
530 	partid = find_first_bit(xpc_heartbeating_to_mask_uv,
531 				XP_MAX_NPARTITIONS_UV);
532 
533 	while (partid < XP_MAX_NPARTITIONS_UV) {
534 		part = &xpc_partitions[partid];
535 
536 		xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
537 					      msg_type);
538 
539 		partid = find_next_bit(xpc_heartbeating_to_mask_uv,
540 				       XP_MAX_NPARTITIONS_UV, partid + 1);
541 	}
542 }
543 
544 static void
545 xpc_increment_heartbeat_uv(void)
546 {
547 	xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_INC_HEARTBEAT_UV);
548 }
549 
550 static void
551 xpc_offline_heartbeat_uv(void)
552 {
553 	xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_OFFLINE_HEARTBEAT_UV);
554 }
555 
556 static void
557 xpc_online_heartbeat_uv(void)
558 {
559 	xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_ONLINE_HEARTBEAT_UV);
560 }
561 
562 static void
563 xpc_heartbeat_init_uv(void)
564 {
565 	atomic64_set(&xpc_heartbeat_uv, 0);
566 	bitmap_zero(xpc_heartbeating_to_mask_uv, XP_MAX_NPARTITIONS_UV);
567 	xpc_heartbeating_to_mask = &xpc_heartbeating_to_mask_uv[0];
568 }
569 
570 static void
571 xpc_heartbeat_exit_uv(void)
572 {
573 	xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_OFFLINE_HEARTBEAT_UV);
574 }
575 
576 static enum xp_retval
577 xpc_get_remote_heartbeat_uv(struct xpc_partition *part)
578 {
579 	struct xpc_partition_uv *part_uv = &part->sn.uv;
580 	enum xp_retval ret = xpNoHeartbeat;
581 
582 	if (part_uv->remote_act_state != XPC_P_AS_INACTIVE &&
583 	    part_uv->remote_act_state != XPC_P_AS_DEACTIVATING) {
584 
585 		if (part_uv->heartbeat != part->last_heartbeat ||
586 		    (part_uv->flags & XPC_P_HEARTBEAT_OFFLINE_UV)) {
587 
588 			part->last_heartbeat = part_uv->heartbeat;
589 			ret = xpSuccess;
590 		}
591 	}
592 	return ret;
593 }
594 
595 static void
596 xpc_request_partition_activation_uv(struct xpc_rsvd_page *remote_rp,
597 				    unsigned long remote_rp_gpa, int nasid)
598 {
599 	short partid = remote_rp->SAL_partid;
600 	struct xpc_partition *part = &xpc_partitions[partid];
601 	struct xpc_activate_mq_msg_activate_req_uv msg;
602 
603 	part->remote_rp_pa = remote_rp_gpa; /* !!! _pa here is really _gpa */
604 	part->remote_rp_ts_jiffies = remote_rp->ts_jiffies;
605 	part->sn.uv.remote_activate_mq_gpa = remote_rp->sn.activate_mq_gpa;
606 
607 	/*
608 	 * ??? Is it a good idea to make this conditional on what is
609 	 * ??? potentially stale state information?
610 	 */
611 	if (part->sn.uv.remote_act_state == XPC_P_AS_INACTIVE) {
612 		msg.rp_gpa = uv_gpa(xpc_rsvd_page);
613 		msg.activate_mq_gpa = xpc_rsvd_page->sn.activate_mq_gpa;
614 		xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
615 					   XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV);
616 	}
617 
618 	if (part->act_state == XPC_P_AS_INACTIVE)
619 		xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
620 }
621 
622 static void
623 xpc_request_partition_reactivation_uv(struct xpc_partition *part)
624 {
625 	xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
626 }
627 
628 static void
629 xpc_request_partition_deactivation_uv(struct xpc_partition *part)
630 {
631 	struct xpc_activate_mq_msg_deactivate_req_uv msg;
632 
633 	/*
634 	 * ??? Is it a good idea to make this conditional on what is
635 	 * ??? potentially stale state information?
636 	 */
637 	if (part->sn.uv.remote_act_state != XPC_P_AS_DEACTIVATING &&
638 	    part->sn.uv.remote_act_state != XPC_P_AS_INACTIVE) {
639 
640 		msg.reason = part->reason;
641 		xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
642 					 XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV);
643 	}
644 }
645 
646 static void
647 xpc_cancel_partition_deactivation_request_uv(struct xpc_partition *part)
648 {
649 	/* nothing needs to be done */
650 	return;
651 }
652 
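/*
 * The following is a simple singly-linked FIFO.  Entries are embedded in
 * the structures that use them (struct xpc_fifo_entry_uv) and the list is
 * protected by head->lock; get removes from the front, put appends to the
 * tail, and n_entries tracks how many entries are currently queued.
 */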
653 static void
654 xpc_init_fifo_uv(struct xpc_fifo_head_uv *head)
655 {
656 	head->first = NULL;
657 	head->last = NULL;
658 	spin_lock_init(&head->lock);
659 	head->n_entries = 0;
660 }
661 
662 static void *
663 xpc_get_fifo_entry_uv(struct xpc_fifo_head_uv *head)
664 {
665 	unsigned long irq_flags;
666 	struct xpc_fifo_entry_uv *first;
667 
668 	spin_lock_irqsave(&head->lock, irq_flags);
669 	first = head->first;
670 	if (head->first != NULL) {
671 		head->first = first->next;
672 		if (head->first == NULL)
673 			head->last = NULL;
674 		head->n_entries--;	/* an entry is being removed */
675 		first->next = NULL;	/* only dereference first when it exists */
676 	}
677 	spin_unlock_irqrestore(&head->lock, irq_flags);
678 	return first;
679 }
680 
681 static void
682 xpc_put_fifo_entry_uv(struct xpc_fifo_head_uv *head,
683 		      struct xpc_fifo_entry_uv *last)
684 {
685 	unsigned long irq_flags;
686 
687 	last->next = NULL;
688 	spin_lock_irqsave(&head->lock, irq_flags);
689 	if (head->last != NULL)
690 		head->last->next = last;
691 	else
692 		head->first = last;
693 	head->last = last;
694 	head->n_entries++;	/* an entry is being added */
695 	BUG_ON(head->n_entries <= 0);
696 	spin_unlock_irqrestore(&head->lock, irq_flags);
697 }
698 
699 static int
700 xpc_n_of_fifo_entries_uv(struct xpc_fifo_head_uv *head)
701 {
702 	return head->n_entries;
703 }
704 
705 /*
706  * Set up the channel structures that are uv specific.
707  */
708 static enum xp_retval
709 xpc_setup_ch_structures_sn_uv(struct xpc_partition *part)
710 {
711 	struct xpc_channel_uv *ch_uv;
712 	int ch_number;
713 
714 	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
715 		ch_uv = &part->channels[ch_number].sn.uv;
716 
717 		xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
718 		xpc_init_fifo_uv(&ch_uv->recv_msg_list);
719 	}
720 
721 	return xpSuccess;
722 }
723 
724 /*
725  * Tear down the channel structures that are uv specific.
726  */
727 static void
728 xpc_teardown_ch_structures_sn_uv(struct xpc_partition *part)
729 {
730 	/* nothing needs to be done */
731 	return;
732 }
733 
734 static enum xp_retval
735 xpc_make_first_contact_uv(struct xpc_partition *part)
736 {
737 	struct xpc_activate_mq_msg_uv msg;
738 
739 	/*
740 	 * We send a sync msg to get the remote partition's remote_act_state
741 	 * updated to our current act_state which at this point should
742 	 * be XPC_P_AS_ACTIVATING.
743 	 */
744 	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
745 				      XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV);
746 
747 	while (part->sn.uv.remote_act_state != XPC_P_AS_ACTIVATING) {
748 
749 		dev_dbg(xpc_part, "waiting to make first contact with "
750 			"partition %d\n", XPC_PARTID(part));
751 
752 		/* wait 1/4 of a second or so */
753 		(void)msleep_interruptible(250);
754 
755 		if (part->act_state == XPC_P_AS_DEACTIVATING)
756 			return part->reason;
757 	}
758 
759 	return xpSuccess;
760 }
761 
762 static u64
763 xpc_get_chctl_all_flags_uv(struct xpc_partition *part)
764 {
765 	unsigned long irq_flags;
766 	union xpc_channel_ctl_flags chctl;
767 
768 	spin_lock_irqsave(&part->chctl_lock, irq_flags);
769 	chctl = part->chctl;
770 	if (chctl.all_flags != 0)
771 		part->chctl.all_flags = 0;
772 
773 	spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
774 	return chctl.all_flags;
775 }
776 
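/*
 * Allocate the array of send message slots for a channel and string them
 * onto the channel's free list.  If the allocation fails, retry with
 * progressively fewer entries, recording the reduced count in
 * ch->local_nentries.
 */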
777 static enum xp_retval
778 xpc_allocate_send_msg_slot_uv(struct xpc_channel *ch)
779 {
780 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
781 	struct xpc_send_msg_slot_uv *msg_slot;
782 	unsigned long irq_flags;
783 	int nentries;
784 	int entry;
785 	size_t nbytes;
786 
787 	for (nentries = ch->local_nentries; nentries > 0; nentries--) {
788 		nbytes = nentries * sizeof(struct xpc_send_msg_slot_uv);
789 		ch_uv->send_msg_slots = kzalloc(nbytes, GFP_KERNEL);
790 		if (ch_uv->send_msg_slots == NULL)
791 			continue;
792 
793 		for (entry = 0; entry < nentries; entry++) {
794 			msg_slot = &ch_uv->send_msg_slots[entry];
795 
796 			msg_slot->msg_slot_number = entry;
797 			xpc_put_fifo_entry_uv(&ch_uv->msg_slot_free_list,
798 					      &msg_slot->next);
799 		}
800 
801 		spin_lock_irqsave(&ch->lock, irq_flags);
802 		if (nentries < ch->local_nentries)
803 			ch->local_nentries = nentries;
804 		spin_unlock_irqrestore(&ch->lock, irq_flags);
805 		return xpSuccess;
806 	}
807 
808 	return xpNoMemory;
809 }
810 
811 static enum xp_retval
812 xpc_allocate_recv_msg_slot_uv(struct xpc_channel *ch)
813 {
814 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
815 	struct xpc_notify_mq_msg_uv *msg_slot;
816 	unsigned long irq_flags;
817 	int nentries;
818 	int entry;
819 	size_t nbytes;
820 
821 	for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
822 		nbytes = nentries * ch->entry_size;
823 		ch_uv->recv_msg_slots = kzalloc(nbytes, GFP_KERNEL);
824 		if (ch_uv->recv_msg_slots == NULL)
825 			continue;
826 
827 		for (entry = 0; entry < nentries; entry++) {
828 			msg_slot = ch_uv->recv_msg_slots + entry *
829 			    ch->entry_size;
830 
831 			msg_slot->hdr.msg_slot_number = entry;
832 		}
833 
834 		spin_lock_irqsave(&ch->lock, irq_flags);
835 		if (nentries < ch->remote_nentries)
836 			ch->remote_nentries = nentries;
837 		spin_unlock_irqrestore(&ch->lock, irq_flags);
838 		return xpSuccess;
839 	}
840 
841 	return xpNoMemory;
842 }
843 
844 /*
845  * Allocate msg_slots associated with the channel.
846  */
847 static enum xp_retval
848 xpc_setup_msg_structures_uv(struct xpc_channel *ch)
849 {
850 	enum xp_retval ret;
851 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
852 
853 	DBUG_ON(ch->flags & XPC_C_SETUP);
854 
855 	ret = xpc_allocate_send_msg_slot_uv(ch);
856 	if (ret == xpSuccess) {
857 
858 		ret = xpc_allocate_recv_msg_slot_uv(ch);
859 		if (ret != xpSuccess) {
860 			kfree(ch_uv->send_msg_slots);
861 			xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
862 		}
863 	}
864 	return ret;
865 }
866 
867 /*
868  * Free up msg_slots and clear other stuff that was set up for the specified
869  * channel.
870  */
871 static void
872 xpc_teardown_msg_structures_uv(struct xpc_channel *ch)
873 {
874 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
875 
876 	DBUG_ON(!spin_is_locked(&ch->lock));
877 
878 	ch_uv->remote_notify_mq_gpa = 0;
879 
880 	if (ch->flags & XPC_C_SETUP) {
881 		xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
882 		kfree(ch_uv->send_msg_slots);
883 		xpc_init_fifo_uv(&ch_uv->recv_msg_list);
884 		kfree(ch_uv->recv_msg_slots);
885 	}
886 }
887 
888 static void
889 xpc_send_chctl_closerequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
890 {
891 	struct xpc_activate_mq_msg_chctl_closerequest_uv msg;
892 
893 	msg.ch_number = ch->number;
894 	msg.reason = ch->reason;
895 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
896 				    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV);
897 }
898 
899 static void
900 xpc_send_chctl_closereply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
901 {
902 	struct xpc_activate_mq_msg_chctl_closereply_uv msg;
903 
904 	msg.ch_number = ch->number;
905 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
906 				    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV);
907 }
908 
909 static void
910 xpc_send_chctl_openrequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
911 {
912 	struct xpc_activate_mq_msg_chctl_openrequest_uv msg;
913 
914 	msg.ch_number = ch->number;
915 	msg.entry_size = ch->entry_size;
916 	msg.local_nentries = ch->local_nentries;
917 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
918 				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV);
919 }
920 
921 static void
922 xpc_send_chctl_openreply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
923 {
924 	struct xpc_activate_mq_msg_chctl_openreply_uv msg;
925 
926 	msg.ch_number = ch->number;
927 	msg.local_nentries = ch->local_nentries;
928 	msg.remote_nentries = ch->remote_nentries;
929 	msg.local_notify_mq_gpa = uv_gpa(xpc_notify_mq_uv);
930 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
931 				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV);
932 }
933 
934 static void
935 xpc_send_chctl_local_msgrequest_uv(struct xpc_partition *part, int ch_number)
936 {
937 	unsigned long irq_flags;
938 
939 	spin_lock_irqsave(&part->chctl_lock, irq_flags);
940 	part->chctl.flags[ch_number] |= XPC_CHCTL_MSGREQUEST;
941 	spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
942 
943 	xpc_wakeup_channel_mgr(part);
944 }
945 
946 static void
947 xpc_save_remote_msgqueue_pa_uv(struct xpc_channel *ch,
948 			       unsigned long msgqueue_pa)
949 {
950 	ch->sn.uv.remote_notify_mq_gpa = msgqueue_pa;
951 }
952 
953 static void
954 xpc_indicate_partition_engaged_uv(struct xpc_partition *part)
955 {
956 	struct xpc_activate_mq_msg_uv msg;
957 
958 	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
959 				      XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV);
960 }
961 
962 static void
963 xpc_indicate_partition_disengaged_uv(struct xpc_partition *part)
964 {
965 	struct xpc_activate_mq_msg_uv msg;
966 
967 	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
968 				      XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV);
969 }
970 
971 static void
972 xpc_assume_partition_disengaged_uv(short partid)
973 {
974 	struct xpc_partition_uv *part_uv = &xpc_partitions[partid].sn.uv;
975 	unsigned long irq_flags;
976 
977 	spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
978 	part_uv->flags &= ~XPC_P_ENGAGED_UV;
979 	spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
980 }
981 
982 static int
983 xpc_partition_engaged_uv(short partid)
984 {
985 	return (xpc_partitions[partid].sn.uv.flags & XPC_P_ENGAGED_UV) != 0;
986 }
987 
988 static int
989 xpc_any_partition_engaged_uv(void)
990 {
991 	struct xpc_partition_uv *part_uv;
992 	short partid;
993 
994 	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
995 		part_uv = &xpc_partitions[partid].sn.uv;
996 		if ((part_uv->flags & XPC_P_ENGAGED_UV) != 0)
997 			return 1;
998 	}
999 	return 0;
1000 }
1001 
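/*
 * Reserve a free send message slot.  With XPC_NOWAIT return xpNoWait
 * immediately when none is available; otherwise sleep in
 * xpc_allocate_msg_wait() and retry after an interruption or timeout,
 * propagating any other result to the caller.
 */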
1002 static enum xp_retval
1003 xpc_allocate_msg_slot_uv(struct xpc_channel *ch, u32 flags,
1004 			 struct xpc_send_msg_slot_uv **address_of_msg_slot)
1005 {
1006 	enum xp_retval ret;
1007 	struct xpc_send_msg_slot_uv *msg_slot;
1008 	struct xpc_fifo_entry_uv *entry;
1009 
1010 	while (1) {
1011 		entry = xpc_get_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list);
1012 		if (entry != NULL)
1013 			break;
1014 
1015 		if (flags & XPC_NOWAIT)
1016 			return xpNoWait;
1017 
1018 		ret = xpc_allocate_msg_wait(ch);
1019 		if (ret != xpInterrupted && ret != xpTimeout)
1020 			return ret;
1021 	}
1022 
1023 	msg_slot = container_of(entry, struct xpc_send_msg_slot_uv, next);
1024 	*address_of_msg_slot = msg_slot;
1025 	return xpSuccess;
1026 }
1027 
1028 static void
1029 xpc_free_msg_slot_uv(struct xpc_channel *ch,
1030 		     struct xpc_send_msg_slot_uv *msg_slot)
1031 {
1032 	xpc_put_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list, &msg_slot->next);
1033 
1034 	/* wakeup anyone waiting for a free msg slot */
1035 	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
1036 		wake_up(&ch->msg_allocate_wq);
1037 }
1038 
1039 static void
1040 xpc_notify_sender_uv(struct xpc_channel *ch,
1041 		     struct xpc_send_msg_slot_uv *msg_slot,
1042 		     enum xp_retval reason)
1043 {
1044 	xpc_notify_func func = msg_slot->func;
1045 
1046 	if (func != NULL && cmpxchg(&msg_slot->func, func, NULL) == func) {
1047 
1048 		atomic_dec(&ch->n_to_notify);
1049 
1050 		dev_dbg(xpc_chan, "msg_slot->func() called, msg_slot=0x%p "
1051 			"msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
1052 			msg_slot->msg_slot_number, ch->partid, ch->number);
1053 
1054 		func(reason, ch->partid, ch->number, msg_slot->key);
1055 
1056 		dev_dbg(xpc_chan, "msg_slot->func() returned, msg_slot=0x%p "
1057 			"msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
1058 			msg_slot->msg_slot_number, ch->partid, ch->number);
1059 	}
1060 }
1061 
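/*
 * The remote partition has acknowledged one of our notify_mq messages.
 * Locate the corresponding send slot, advance its slot number by
 * local_nentries for its next use, invoke any registered notify func with
 * xpMsgDelivered, and return the slot to the free list.
 */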
1062 static void
1063 xpc_handle_notify_mq_ack_uv(struct xpc_channel *ch,
1064 			    struct xpc_notify_mq_msg_uv *msg)
1065 {
1066 	struct xpc_send_msg_slot_uv *msg_slot;
1067 	int entry = msg->hdr.msg_slot_number % ch->local_nentries;
1068 
1069 	msg_slot = &ch->sn.uv.send_msg_slots[entry];
1070 
1071 	BUG_ON(msg_slot->msg_slot_number != msg->hdr.msg_slot_number);
1072 	msg_slot->msg_slot_number += ch->local_nentries;
1073 
1074 	if (msg_slot->func != NULL)
1075 		xpc_notify_sender_uv(ch, msg_slot, xpMsgDelivered);
1076 
1077 	xpc_free_msg_slot_uv(ch, msg_slot);
1078 }
1079 
1080 static void
1081 xpc_handle_notify_mq_msg_uv(struct xpc_partition *part,
1082 			    struct xpc_notify_mq_msg_uv *msg)
1083 {
1084 	struct xpc_partition_uv *part_uv = &part->sn.uv;
1085 	struct xpc_channel *ch;
1086 	struct xpc_channel_uv *ch_uv;
1087 	struct xpc_notify_mq_msg_uv *msg_slot;
1088 	unsigned long irq_flags;
1089 	int ch_number = msg->hdr.ch_number;
1090 
1091 	if (unlikely(ch_number >= part->nchannels)) {
1092 		dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received invalid "
1093 			"channel number=0x%x in message from partid=%d\n",
1094 			ch_number, XPC_PARTID(part));
1095 
1096 		/* get hb checker to deactivate from the remote partition */
1097 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
1098 		if (part_uv->act_state_req == 0)
1099 			xpc_activate_IRQ_rcvd++;
1100 		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
1101 		part_uv->reason = xpBadChannelNumber;
1102 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
1103 
1104 		wake_up_interruptible(&xpc_activate_IRQ_wq);
1105 		return;
1106 	}
1107 
1108 	ch = &part->channels[ch_number];
1109 	xpc_msgqueue_ref(ch);
1110 
1111 	if (!(ch->flags & XPC_C_CONNECTED)) {
1112 		xpc_msgqueue_deref(ch);
1113 		return;
1114 	}
1115 
1116 	/* see if we're really dealing with an ACK for a previously sent msg */
1117 	if (msg->hdr.size == 0) {
1118 		xpc_handle_notify_mq_ack_uv(ch, msg);
1119 		xpc_msgqueue_deref(ch);
1120 		return;
1121 	}
1122 
1123 	/* we're dealing with a normal message sent via the notify_mq */
1124 	ch_uv = &ch->sn.uv;
1125 
1126 	msg_slot = (struct xpc_notify_mq_msg_uv *)((u64)ch_uv->recv_msg_slots +
1127 		    (msg->hdr.msg_slot_number % ch->remote_nentries) *
1128 		    ch->entry_size);
1129 
1130 	BUG_ON(msg->hdr.msg_slot_number != msg_slot->hdr.msg_slot_number);
1131 	BUG_ON(msg_slot->hdr.size != 0);
1132 
1133 	memcpy(msg_slot, msg, msg->hdr.size);
1134 
1135 	xpc_put_fifo_entry_uv(&ch_uv->recv_msg_list, &msg_slot->hdr.u.next);
1136 
1137 	if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
1138 		/*
1139 		 * If there is an existing idle kthread get it to deliver
1140 		 * the payload, otherwise we'll have to get the channel mgr
1141 		 * for this partition to create a kthread to do the delivery.
1142 		 */
1143 		if (atomic_read(&ch->kthreads_idle) > 0)
1144 			wake_up_nr(&ch->idle_wq, 1);
1145 		else
1146 			xpc_send_chctl_local_msgrequest_uv(part, ch->number);
1147 	}
1148 	xpc_msgqueue_deref(ch);
1149 }
1150 
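/*
 * Interrupt handler for the notify GRU message queue.  Drain all pending
 * messages and pass each one to the handler for the partition it came from.
 */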
1151 static irqreturn_t
1152 xpc_handle_notify_IRQ_uv(int irq, void *dev_id)
1153 {
1154 	struct xpc_notify_mq_msg_uv *msg;
1155 	short partid;
1156 	struct xpc_partition *part;
1157 
1158 	while ((msg = gru_get_next_message(xpc_notify_mq_uv)) != NULL) {
1159 
1160 		partid = msg->hdr.partid;
1161 		if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
1162 			dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received "
1163 				"invalid partid=0x%x in message\n", partid);
1164 		} else {
1165 			part = &xpc_partitions[partid];
1166 
1167 			if (xpc_part_ref(part)) {
1168 				xpc_handle_notify_mq_msg_uv(part, msg);
1169 				xpc_part_deref(part);
1170 			}
1171 		}
1172 
1173 		gru_free_message(xpc_notify_mq_uv, msg);
1174 	}
1175 
1176 	return IRQ_HANDLED;
1177 }
1178 
1179 static int
1180 xpc_n_of_deliverable_payloads_uv(struct xpc_channel *ch)
1181 {
1182 	return xpc_n_of_fifo_entries_uv(&ch->sn.uv.recv_msg_list);
1183 }
1184 
1185 static void
1186 xpc_process_msg_chctl_flags_uv(struct xpc_partition *part, int ch_number)
1187 {
1188 	struct xpc_channel *ch = &part->channels[ch_number];
1189 	int ndeliverable_payloads;
1190 
1191 	xpc_msgqueue_ref(ch);
1192 
1193 	ndeliverable_payloads = xpc_n_of_deliverable_payloads_uv(ch);
1194 
1195 	if (ndeliverable_payloads > 0 &&
1196 	    (ch->flags & XPC_C_CONNECTED) &&
1197 	    (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)) {
1198 
1199 		xpc_activate_kthreads(ch, ndeliverable_payloads);
1200 	}
1201 
1202 	xpc_msgqueue_deref(ch);
1203 }
1204 
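/*
 * Send a payload to the specified channel.  The payload is copied behind a
 * notify_mq message header in an on-stack buffer, a send slot is reserved
 * (optionally recording a notify func/key), and the message is delivered to
 * the remote partition's notify_mq via xpc_send_gru_msg().  On failure the
 * notify registration is undone and the slot is freed.
 */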
1205 static enum xp_retval
1206 xpc_send_payload_uv(struct xpc_channel *ch, u32 flags, void *payload,
1207 		    u16 payload_size, u8 notify_type, xpc_notify_func func,
1208 		    void *key)
1209 {
1210 	enum xp_retval ret = xpSuccess;
1211 	struct xpc_send_msg_slot_uv *msg_slot = NULL;
1212 	struct xpc_notify_mq_msg_uv *msg;
1213 	u8 msg_buffer[XPC_NOTIFY_MSG_SIZE_UV];
1214 	size_t msg_size;
1215 
1216 	DBUG_ON(notify_type != XPC_N_CALL);
1217 
1218 	msg_size = sizeof(struct xpc_notify_mq_msghdr_uv) + payload_size;
1219 	if (msg_size > ch->entry_size)
1220 		return xpPayloadTooBig;
1221 
1222 	xpc_msgqueue_ref(ch);
1223 
1224 	if (ch->flags & XPC_C_DISCONNECTING) {
1225 		ret = ch->reason;
1226 		goto out_1;
1227 	}
1228 	if (!(ch->flags & XPC_C_CONNECTED)) {
1229 		ret = xpNotConnected;
1230 		goto out_1;
1231 	}
1232 
1233 	ret = xpc_allocate_msg_slot_uv(ch, flags, &msg_slot);
1234 	if (ret != xpSuccess)
1235 		goto out_1;
1236 
1237 	if (func != NULL) {
1238 		atomic_inc(&ch->n_to_notify);
1239 
1240 		msg_slot->key = key;
1241 		wmb(); /* a non-NULL func must hit memory after the key */
1242 		msg_slot->func = func;
1243 
1244 		if (ch->flags & XPC_C_DISCONNECTING) {
1245 			ret = ch->reason;
1246 			goto out_2;
1247 		}
1248 	}
1249 
1250 	msg = (struct xpc_notify_mq_msg_uv *)&msg_buffer;
1251 	msg->hdr.partid = xp_partition_id;
1252 	msg->hdr.ch_number = ch->number;
1253 	msg->hdr.size = msg_size;
1254 	msg->hdr.msg_slot_number = msg_slot->msg_slot_number;
1255 	memcpy(&msg->payload, payload, payload_size);
1256 
1257 	ret = xpc_send_gru_msg(ch->sn.uv.remote_notify_mq_gpa, msg, msg_size);
1258 	if (ret == xpSuccess)
1259 		goto out_1;
1260 
1261 	XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
1262 out_2:
1263 	if (func != NULL) {
1264 		/*
1265 		 * Try to NULL the msg_slot's func field. If we fail, then
1266 		 * xpc_notify_senders_of_disconnect_uv() beat us to it, in which
1267 		 * case we need to pretend we succeeded in sending the message,
1268 		 * since the user will get a callout for the disconnect error
1269 		 * from xpc_notify_senders_of_disconnect_uv() and getting an
1270 		 * error returned here as well would confuse them. Additionally,
1271 		 * since in this case the channel is being disconnected we don't
1272 		 * need to put the msg_slot back on the free list.
1273 		 */
1274 		if (cmpxchg(&msg_slot->func, func, NULL) != func) {
1275 			ret = xpSuccess;
1276 			goto out_1;
1277 		}
1278 
1279 		msg_slot->key = NULL;
1280 		atomic_dec(&ch->n_to_notify);
1281 	}
1282 	xpc_free_msg_slot_uv(ch, msg_slot);
1283 out_1:
1284 	xpc_msgqueue_deref(ch);
1285 	return ret;
1286 }
1287 
1288 /*
1289  * Tell the callers of xpc_send_notify() that the status of their payloads
1290  * is unknown because the channel is now disconnecting.
1291  *
1292  * We don't worry about putting these msg_slots on the free list since the
1293  * msg_slots themselves are about to be kfree'd.
1294  */
1295 static void
1296 xpc_notify_senders_of_disconnect_uv(struct xpc_channel *ch)
1297 {
1298 	struct xpc_send_msg_slot_uv *msg_slot;
1299 	int entry;
1300 
1301 	DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
1302 
1303 	for (entry = 0; entry < ch->local_nentries; entry++) {
1304 
1305 		if (atomic_read(&ch->n_to_notify) == 0)
1306 			break;
1307 
1308 		msg_slot = &ch->sn.uv.send_msg_slots[entry];
1309 		if (msg_slot->func != NULL)
1310 			xpc_notify_sender_uv(ch, msg_slot, ch->reason);
1311 	}
1312 }
1313 
1314 /*
1315  * Get the next deliverable message's payload.
1316  */
1317 static void *
1318 xpc_get_deliverable_payload_uv(struct xpc_channel *ch)
1319 {
1320 	struct xpc_fifo_entry_uv *entry;
1321 	struct xpc_notify_mq_msg_uv *msg;
1322 	void *payload = NULL;
1323 
1324 	if (!(ch->flags & XPC_C_DISCONNECTING)) {
1325 		entry = xpc_get_fifo_entry_uv(&ch->sn.uv.recv_msg_list);
1326 		if (entry != NULL) {
1327 			msg = container_of(entry, struct xpc_notify_mq_msg_uv,
1328 					   hdr.u.next);
1329 			payload = &msg->payload;
1330 		}
1331 	}
1332 	return payload;
1333 }
1334 
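/*
 * The caller is done with a delivered payload.  Reuse the message that
 * carried it to send a header-only ACK (hdr.size == 0) back to the sender,
 * then advance the slot number for the slot's next use.
 */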
1335 static void
1336 xpc_received_payload_uv(struct xpc_channel *ch, void *payload)
1337 {
1338 	struct xpc_notify_mq_msg_uv *msg;
1339 	enum xp_retval ret;
1340 
1341 	msg = container_of(payload, struct xpc_notify_mq_msg_uv, payload);
1342 
1343 	/* return an ACK to the sender of this message */
1344 
1345 	msg->hdr.partid = xp_partition_id;
1346 	msg->hdr.size = 0;	/* size of zero indicates this is an ACK */
1347 
1348 	ret = xpc_send_gru_msg(ch->sn.uv.remote_notify_mq_gpa, msg,
1349 			       sizeof(struct xpc_notify_mq_msghdr_uv));
1350 	if (ret != xpSuccess)
1351 		XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
1352 
1353 	msg->hdr.msg_slot_number += ch->remote_nentries;
1354 }
1355 
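/*
 * Point the architecture-independent XPC entry points at their uv-specific
 * implementations and create the activate and notify GRU message queues.
 */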
1356 int
1357 xpc_init_uv(void)
1358 {
1359 	xpc_setup_partitions_sn = xpc_setup_partitions_sn_uv;
1360 	xpc_process_activate_IRQ_rcvd = xpc_process_activate_IRQ_rcvd_uv;
1361 	xpc_get_partition_rsvd_page_pa = xpc_get_partition_rsvd_page_pa_uv;
1362 	xpc_setup_rsvd_page_sn = xpc_setup_rsvd_page_sn_uv;
1363 	xpc_increment_heartbeat = xpc_increment_heartbeat_uv;
1364 	xpc_offline_heartbeat = xpc_offline_heartbeat_uv;
1365 	xpc_online_heartbeat = xpc_online_heartbeat_uv;
1366 	xpc_heartbeat_init = xpc_heartbeat_init_uv;
1367 	xpc_heartbeat_exit = xpc_heartbeat_exit_uv;
1368 	xpc_get_remote_heartbeat = xpc_get_remote_heartbeat_uv;
1369 
1370 	xpc_request_partition_activation = xpc_request_partition_activation_uv;
1371 	xpc_request_partition_reactivation =
1372 	    xpc_request_partition_reactivation_uv;
1373 	xpc_request_partition_deactivation =
1374 	    xpc_request_partition_deactivation_uv;
1375 	xpc_cancel_partition_deactivation_request =
1376 	    xpc_cancel_partition_deactivation_request_uv;
1377 
1378 	xpc_setup_ch_structures_sn = xpc_setup_ch_structures_sn_uv;
1379 	xpc_teardown_ch_structures_sn = xpc_teardown_ch_structures_sn_uv;
1380 
1381 	xpc_make_first_contact = xpc_make_first_contact_uv;
1382 
1383 	xpc_get_chctl_all_flags = xpc_get_chctl_all_flags_uv;
1384 	xpc_send_chctl_closerequest = xpc_send_chctl_closerequest_uv;
1385 	xpc_send_chctl_closereply = xpc_send_chctl_closereply_uv;
1386 	xpc_send_chctl_openrequest = xpc_send_chctl_openrequest_uv;
1387 	xpc_send_chctl_openreply = xpc_send_chctl_openreply_uv;
1388 
1389 	xpc_save_remote_msgqueue_pa = xpc_save_remote_msgqueue_pa_uv;
1390 
1391 	xpc_setup_msg_structures = xpc_setup_msg_structures_uv;
1392 	xpc_teardown_msg_structures = xpc_teardown_msg_structures_uv;
1393 
1394 	xpc_indicate_partition_engaged = xpc_indicate_partition_engaged_uv;
1395 	xpc_indicate_partition_disengaged =
1396 	    xpc_indicate_partition_disengaged_uv;
1397 	xpc_assume_partition_disengaged = xpc_assume_partition_disengaged_uv;
1398 	xpc_partition_engaged = xpc_partition_engaged_uv;
1399 	xpc_any_partition_engaged = xpc_any_partition_engaged_uv;
1400 
1401 	xpc_n_of_deliverable_payloads = xpc_n_of_deliverable_payloads_uv;
1402 	xpc_process_msg_chctl_flags = xpc_process_msg_chctl_flags_uv;
1403 	xpc_send_payload = xpc_send_payload_uv;
1404 	xpc_notify_senders_of_disconnect = xpc_notify_senders_of_disconnect_uv;
1405 	xpc_get_deliverable_payload = xpc_get_deliverable_payload_uv;
1406 	xpc_received_payload = xpc_received_payload_uv;
1407 
1408 	if (sizeof(struct xpc_notify_mq_msghdr_uv) > XPC_MSG_HDR_MAX_SIZE) {
1409 		dev_err(xpc_part, "xpc_notify_mq_msghdr_uv is larger than %d\n",
1410 			XPC_MSG_HDR_MAX_SIZE);
1411 		return -E2BIG;
1412 	}
1413 
1414 	/* ??? The cpuid argument's value is 0, is that what we want? */
1415 	/* !!! The irq argument's value isn't correct. */
1416 	xpc_activate_mq_uv = xpc_create_gru_mq_uv(XPC_ACTIVATE_MQ_SIZE_UV, 0, 0,
1417 						  xpc_handle_activate_IRQ_uv);
1418 	if (xpc_activate_mq_uv == NULL)
1419 		return -ENOMEM;
1420 
1421 	/* ??? The cpuid argument's value is 0, is that what we want? */
1422 	/* !!! The irq argument's value isn't correct. */
1423 	xpc_notify_mq_uv = xpc_create_gru_mq_uv(XPC_NOTIFY_MQ_SIZE_UV, 0, 0,
1424 						xpc_handle_notify_IRQ_uv);
1425 	if (xpc_notify_mq_uv == NULL) {
1426 		/* !!! The irq argument's value isn't correct. */
1427 		xpc_destroy_gru_mq_uv(xpc_activate_mq_uv,
1428 				      XPC_ACTIVATE_MQ_SIZE_UV, 0);
1429 		return -ENOMEM;
1430 	}
1431 
1432 	return 0;
1433 }
1434 
1435 void
1436 xpc_exit_uv(void)
1437 {
1438 	/* !!! The irq argument's value isn't correct. */
1439 	xpc_destroy_gru_mq_uv(xpc_notify_mq_uv, XPC_NOTIFY_MQ_SIZE_UV, 0);
1440 
1441 	/* !!! The irq argument's value isn't correct. */
1442 	xpc_destroy_gru_mq_uv(xpc_activate_mq_uv, XPC_ACTIVATE_MQ_SIZE_UV, 0);
1443 }
1444