xref: /linux/drivers/misc/sgi-xp/xpc_uv.c (revision c532de5a67a70f8533d495f8f2aaa9a0491c3ad0)
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2008-2009 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8 
9 /*
10  * Cross Partition Communication (XPC) uv-based functions.
11  *
12  *     Architecture specific implementation of common functions.
13  *
14  */
15 
16 #include <linux/kernel.h>
17 #include <linux/mm.h>
18 #include <linux/interrupt.h>
19 #include <linux/delay.h>
20 #include <linux/device.h>
21 #include <linux/cpu.h>
22 #include <linux/module.h>
23 #include <linux/err.h>
24 #include <linux/slab.h>
25 #include <linux/numa.h>
26 #include <asm/uv/uv_hub.h>
27 #include <asm/uv/bios.h>
28 #include <asm/uv/uv_irq.h>
29 #include "../sgi-gru/gru.h"
30 #include "../sgi-gru/grukservices.h"
31 #include "xpc.h"
32 
33 static struct xpc_heartbeat_uv *xpc_heartbeat_uv;
34 
35 #define XPC_ACTIVATE_MSG_SIZE_UV	(1 * GRU_CACHE_LINE_BYTES)
36 #define XPC_ACTIVATE_MQ_SIZE_UV		(4 * XP_MAX_NPARTITIONS_UV * \
37 					 XPC_ACTIVATE_MSG_SIZE_UV)
38 #define XPC_ACTIVATE_IRQ_NAME		"xpc_activate"
39 
40 #define XPC_NOTIFY_MSG_SIZE_UV		(2 * GRU_CACHE_LINE_BYTES)
41 #define XPC_NOTIFY_MQ_SIZE_UV		(4 * XP_MAX_NPARTITIONS_UV * \
42 					 XPC_NOTIFY_MSG_SIZE_UV)
43 #define XPC_NOTIFY_IRQ_NAME		"xpc_notify"
44 
45 static int xpc_mq_node = NUMA_NO_NODE;
46 
47 static struct xpc_gru_mq_uv *xpc_activate_mq_uv;
48 static struct xpc_gru_mq_uv *xpc_notify_mq_uv;
49 
50 static int
51 xpc_setup_partitions_uv(void)
52 {
53 	short partid;
54 	struct xpc_partition_uv *part_uv;
55 
56 	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
57 		part_uv = &xpc_partitions[partid].sn.uv;
58 
59 		mutex_init(&part_uv->cached_activate_gru_mq_desc_mutex);
60 		spin_lock_init(&part_uv->flags_lock);
61 		part_uv->remote_act_state = XPC_P_AS_INACTIVE;
62 	}
63 	return 0;
64 }
65 
66 static void
67 xpc_teardown_partitions_uv(void)
68 {
69 	short partid;
70 	struct xpc_partition_uv *part_uv;
71 	unsigned long irq_flags;
72 
73 	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
74 		part_uv = &xpc_partitions[partid].sn.uv;
75 
76 		if (part_uv->cached_activate_gru_mq_desc != NULL) {
77 			mutex_lock(&part_uv->cached_activate_gru_mq_desc_mutex);
78 			spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
79 			part_uv->flags &= ~XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
80 			spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
81 			kfree(part_uv->cached_activate_gru_mq_desc);
82 			part_uv->cached_activate_gru_mq_desc = NULL;
83 			mutex_unlock(&part_uv->
84 				     cached_activate_gru_mq_desc_mutex);
85 		}
86 	}
87 }
88 
89 static int
90 xpc_get_gru_mq_irq_uv(struct xpc_gru_mq_uv *mq, int cpu, char *irq_name)
91 {
92 	int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);
93 
94 	mq->irq = uv_setup_irq(irq_name, cpu, mq->mmr_blade, mq->mmr_offset,
95 			UV_AFFINITY_CPU);
96 	if (mq->irq < 0)
97 		return mq->irq;
98 
99 	mq->mmr_value = uv_read_global_mmr64(mmr_pnode, mq->mmr_offset);
100 
101 	return 0;
102 }
103 
104 static void
105 xpc_release_gru_mq_irq_uv(struct xpc_gru_mq_uv *mq)
106 {
107 	uv_teardown_irq(mq->irq);
108 }
109 
110 static int
111 xpc_gru_mq_watchlist_alloc_uv(struct xpc_gru_mq_uv *mq)
112 {
113 	int ret;
114 
115 	ret = uv_bios_mq_watchlist_alloc(uv_gpa(mq->address),
116 					 mq->order, &mq->mmr_offset);
117 	if (ret < 0) {
118 		dev_err(xpc_part, "uv_bios_mq_watchlist_alloc() failed, "
119 			"ret=%d\n", ret);
120 		return ret;
121 	}
122 
123 	mq->watchlist_num = ret;
124 	return 0;
125 }
126 
127 static void
128 xpc_gru_mq_watchlist_free_uv(struct xpc_gru_mq_uv *mq)
129 {
130 	int ret;
131 	int mmr_pnode = uv_blade_to_pnode(mq->mmr_blade);
132 
133 	ret = uv_bios_mq_watchlist_free(mmr_pnode, mq->watchlist_num);
134 	BUG_ON(ret != BIOS_STATUS_SUCCESS);
135 }
136 
137 static struct xpc_gru_mq_uv *
138 xpc_create_gru_mq_uv(unsigned int mq_size, int cpu, char *irq_name,
139 		     irq_handler_t irq_handler)
140 {
141 	enum xp_retval xp_ret;
142 	int ret;
143 	int nid;
144 	int nasid;
145 	int pg_order;
146 	struct page *page;
147 	struct xpc_gru_mq_uv *mq;
148 	struct uv_IO_APIC_route_entry *mmr_value;
149 
150 	mq = kmalloc(sizeof(struct xpc_gru_mq_uv), GFP_KERNEL);
151 	if (mq == NULL) {
152 		dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to kmalloc() "
153 			"a xpc_gru_mq_uv structure\n");
154 		ret = -ENOMEM;
155 		goto out_0;
156 	}
157 
158 	mq->gru_mq_desc = kzalloc(sizeof(struct gru_message_queue_desc),
159 				  GFP_KERNEL);
160 	if (mq->gru_mq_desc == NULL) {
161 		dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to kmalloc() "
162 			"a gru_message_queue_desc structure\n");
163 		ret = -ENOMEM;
164 		goto out_1;
165 	}
166 
167 	pg_order = get_order(mq_size);
168 	mq->order = pg_order + PAGE_SHIFT;
169 	mq_size = 1UL << mq->order;
170 
171 	mq->mmr_blade = uv_cpu_to_blade_id(cpu);
172 
173 	nid = cpu_to_node(cpu);
174 	page = __alloc_pages_node(nid,
175 				      GFP_KERNEL | __GFP_ZERO | __GFP_THISNODE,
176 				      pg_order);
177 	if (page == NULL) {
178 		dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to alloc %d "
179 			"bytes of memory on nid=%d for GRU mq\n", mq_size, nid);
180 		ret = -ENOMEM;
181 		goto out_2;
182 	}
183 	mq->address = page_address(page);
184 
185 	/* enable generation of irq when GRU mq operation occurs to this mq */
186 	ret = xpc_gru_mq_watchlist_alloc_uv(mq);
187 	if (ret != 0)
188 		goto out_3;
189 
190 	ret = xpc_get_gru_mq_irq_uv(mq, cpu, irq_name);
191 	if (ret != 0)
192 		goto out_4;
193 
194 	ret = request_irq(mq->irq, irq_handler, 0, irq_name, NULL);
195 	if (ret != 0) {
196 		dev_err(xpc_part, "request_irq(irq=%d) returned error=%d\n",
197 			mq->irq, -ret);
198 		goto out_5;
199 	}
200 
201 	nasid = UV_PNODE_TO_NASID(uv_cpu_to_pnode(cpu));
202 
203 	mmr_value = (struct uv_IO_APIC_route_entry *)&mq->mmr_value;
204 	ret = gru_create_message_queue(mq->gru_mq_desc, mq->address, mq_size,
205 				     nasid, mmr_value->vector, mmr_value->dest);
206 	if (ret != 0) {
207 		dev_err(xpc_part, "gru_create_message_queue() returned "
208 			"error=%d\n", ret);
209 		ret = -EINVAL;
210 		goto out_6;
211 	}
212 
213 	/* allow other partitions to access this GRU mq */
214 	xp_ret = xp_expand_memprotect(xp_pa(mq->address), mq_size);
215 	if (xp_ret != xpSuccess) {
216 		ret = -EACCES;
217 		goto out_6;
218 	}
219 
220 	return mq;
221 
222 	/* something went wrong */
223 out_6:
224 	free_irq(mq->irq, NULL);
225 out_5:
226 	xpc_release_gru_mq_irq_uv(mq);
227 out_4:
228 	xpc_gru_mq_watchlist_free_uv(mq);
229 out_3:
230 	free_pages((unsigned long)mq->address, pg_order);
231 out_2:
232 	kfree(mq->gru_mq_desc);
233 out_1:
234 	kfree(mq);
235 out_0:
236 	return ERR_PTR(ret);
237 }
238 
239 static void
240 xpc_destroy_gru_mq_uv(struct xpc_gru_mq_uv *mq)
241 {
242 	unsigned int mq_size;
243 	int pg_order;
244 	int ret;
245 
246 	/* disallow other partitions to access GRU mq */
247 	mq_size = 1UL << mq->order;
248 	ret = xp_restrict_memprotect(xp_pa(mq->address), mq_size);
249 	BUG_ON(ret != xpSuccess);
250 
251 	/* unregister irq handler and release mq irq/vector mapping */
252 	free_irq(mq->irq, NULL);
253 	xpc_release_gru_mq_irq_uv(mq);
254 
255 	/* disable generation of irq when GRU mq op occurs to this mq */
256 	xpc_gru_mq_watchlist_free_uv(mq);
257 
258 	pg_order = mq->order - PAGE_SHIFT;
259 	free_pages((unsigned long)mq->address, pg_order);
260 
261 	kfree(mq);
262 }
263 
264 static enum xp_retval
265 xpc_send_gru_msg(struct gru_message_queue_desc *gru_mq_desc, void *msg,
266 		 size_t msg_size)
267 {
268 	enum xp_retval xp_ret;
269 	int ret;
270 
271 	while (1) {
272 		ret = gru_send_message_gpa(gru_mq_desc, msg, msg_size);
273 		if (ret == MQE_OK) {
274 			xp_ret = xpSuccess;
275 			break;
276 		}
277 
278 		if (ret == MQE_QUEUE_FULL) {
279 			dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
280 				"error=MQE_QUEUE_FULL\n");
281 			/* !!! handle QLimit reached; delay & try again */
282 			/* ??? Do we add a limit to the number of retries? */
283 			(void)msleep_interruptible(10);
284 		} else if (ret == MQE_CONGESTION) {
285 			dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
286 				"error=MQE_CONGESTION\n");
287 			/* !!! handle LB Overflow; simply try again */
288 			/* ??? Do we add a limit to the number of retries? */
289 		} else {
290 			/* !!! Currently this is MQE_UNEXPECTED_CB_ERR */
291 			dev_err(xpc_chan, "gru_send_message_gpa() returned "
292 				"error=%d\n", ret);
293 			xp_ret = xpGruSendMqError;
294 			break;
295 		}
296 	}
297 	return xp_ret;
298 }
299 
300 static void
301 xpc_process_activate_IRQ_rcvd_uv(void)
302 {
303 	unsigned long irq_flags;
304 	short partid;
305 	struct xpc_partition *part;
306 	u8 act_state_req;
307 
308 	DBUG_ON(xpc_activate_IRQ_rcvd == 0);
309 
310 	spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
311 	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
312 		part = &xpc_partitions[partid];
313 
314 		if (part->sn.uv.act_state_req == 0)
315 			continue;
316 
317 		xpc_activate_IRQ_rcvd--;
318 		BUG_ON(xpc_activate_IRQ_rcvd < 0);
319 
320 		act_state_req = part->sn.uv.act_state_req;
321 		part->sn.uv.act_state_req = 0;
322 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
323 
324 		if (act_state_req == XPC_P_ASR_ACTIVATE_UV) {
325 			if (part->act_state == XPC_P_AS_INACTIVE)
326 				xpc_activate_partition(part);
327 			else if (part->act_state == XPC_P_AS_DEACTIVATING)
328 				XPC_DEACTIVATE_PARTITION(part, xpReactivating);
329 
330 		} else if (act_state_req == XPC_P_ASR_REACTIVATE_UV) {
331 			if (part->act_state == XPC_P_AS_INACTIVE)
332 				xpc_activate_partition(part);
333 			else
334 				XPC_DEACTIVATE_PARTITION(part, xpReactivating);
335 
336 		} else if (act_state_req == XPC_P_ASR_DEACTIVATE_UV) {
337 			XPC_DEACTIVATE_PARTITION(part, part->sn.uv.reason);
338 
339 		} else {
340 			BUG();
341 		}
342 
343 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
344 		if (xpc_activate_IRQ_rcvd == 0)
345 			break;
346 	}
347 	spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
348 
349 }
350 
351 static void
352 xpc_handle_activate_mq_msg_uv(struct xpc_partition *part,
353 			      struct xpc_activate_mq_msghdr_uv *msg_hdr,
354 			      int part_setup,
355 			      int *wakeup_hb_checker)
356 {
357 	unsigned long irq_flags;
358 	struct xpc_partition_uv *part_uv = &part->sn.uv;
359 	struct xpc_openclose_args *args;
360 
361 	part_uv->remote_act_state = msg_hdr->act_state;
362 
363 	switch (msg_hdr->type) {
364 	case XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV:
365 		/* syncing of remote_act_state was just done above */
366 		break;
367 
368 	case XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV: {
369 		struct xpc_activate_mq_msg_activate_req_uv *msg;
370 
371 		/*
372 		 * ??? Do we deal here with ts_jiffies being different
373 		 * ??? if act_state != XPC_P_AS_INACTIVE instead of
374 		 * ??? below?
375 		 */
376 		msg = container_of(msg_hdr, struct
377 				   xpc_activate_mq_msg_activate_req_uv, hdr);
378 
379 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
380 		if (part_uv->act_state_req == 0)
381 			xpc_activate_IRQ_rcvd++;
382 		part_uv->act_state_req = XPC_P_ASR_ACTIVATE_UV;
383 		part->remote_rp_pa = msg->rp_gpa; /* !!! _pa is _gpa */
384 		part->remote_rp_ts_jiffies = msg_hdr->rp_ts_jiffies;
385 		part_uv->heartbeat_gpa = msg->heartbeat_gpa;
386 
387 		if (msg->activate_gru_mq_desc_gpa !=
388 		    part_uv->activate_gru_mq_desc_gpa) {
389 			spin_lock(&part_uv->flags_lock);
390 			part_uv->flags &= ~XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
391 			spin_unlock(&part_uv->flags_lock);
392 			part_uv->activate_gru_mq_desc_gpa =
393 			    msg->activate_gru_mq_desc_gpa;
394 		}
395 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
396 
397 		(*wakeup_hb_checker)++;
398 		break;
399 	}
400 	case XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV: {
401 		struct xpc_activate_mq_msg_deactivate_req_uv *msg;
402 
403 		msg = container_of(msg_hdr, struct
404 				   xpc_activate_mq_msg_deactivate_req_uv, hdr);
405 
406 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
407 		if (part_uv->act_state_req == 0)
408 			xpc_activate_IRQ_rcvd++;
409 		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
410 		part_uv->reason = msg->reason;
411 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
412 
413 		(*wakeup_hb_checker)++;
414 		return;
415 	}
416 	case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV: {
417 		struct xpc_activate_mq_msg_chctl_closerequest_uv *msg;
418 
419 		if (!part_setup)
420 			break;
421 
422 		msg = container_of(msg_hdr, struct
423 				   xpc_activate_mq_msg_chctl_closerequest_uv,
424 				   hdr);
425 		args = &part->remote_openclose_args[msg->ch_number];
426 		args->reason = msg->reason;
427 
428 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
429 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREQUEST;
430 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
431 
432 		xpc_wakeup_channel_mgr(part);
433 		break;
434 	}
435 	case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV: {
436 		struct xpc_activate_mq_msg_chctl_closereply_uv *msg;
437 
438 		if (!part_setup)
439 			break;
440 
441 		msg = container_of(msg_hdr, struct
442 				   xpc_activate_mq_msg_chctl_closereply_uv,
443 				   hdr);
444 
445 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
446 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREPLY;
447 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
448 
449 		xpc_wakeup_channel_mgr(part);
450 		break;
451 	}
452 	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV: {
453 		struct xpc_activate_mq_msg_chctl_openrequest_uv *msg;
454 
455 		if (!part_setup)
456 			break;
457 
458 		msg = container_of(msg_hdr, struct
459 				   xpc_activate_mq_msg_chctl_openrequest_uv,
460 				   hdr);
461 		args = &part->remote_openclose_args[msg->ch_number];
462 		args->entry_size = msg->entry_size;
463 		args->local_nentries = msg->local_nentries;
464 
465 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
466 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREQUEST;
467 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
468 
469 		xpc_wakeup_channel_mgr(part);
470 		break;
471 	}
472 	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV: {
473 		struct xpc_activate_mq_msg_chctl_openreply_uv *msg;
474 
475 		if (!part_setup)
476 			break;
477 
478 		msg = container_of(msg_hdr, struct
479 				   xpc_activate_mq_msg_chctl_openreply_uv, hdr);
480 		args = &part->remote_openclose_args[msg->ch_number];
481 		args->remote_nentries = msg->remote_nentries;
482 		args->local_nentries = msg->local_nentries;
483 		args->local_msgqueue_pa = msg->notify_gru_mq_desc_gpa;
484 
485 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
486 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREPLY;
487 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
488 
489 		xpc_wakeup_channel_mgr(part);
490 		break;
491 	}
492 	case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENCOMPLETE_UV: {
493 		struct xpc_activate_mq_msg_chctl_opencomplete_uv *msg;
494 
495 		if (!part_setup)
496 			break;
497 
498 		msg = container_of(msg_hdr, struct
499 				xpc_activate_mq_msg_chctl_opencomplete_uv, hdr);
500 		spin_lock_irqsave(&part->chctl_lock, irq_flags);
501 		part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENCOMPLETE;
502 		spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
503 
504 		xpc_wakeup_channel_mgr(part);
505 	}
506 		fallthrough;
507 	case XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV:
508 		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
509 		part_uv->flags |= XPC_P_ENGAGED_UV;
510 		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
511 		break;
512 
513 	case XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV:
514 		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
515 		part_uv->flags &= ~XPC_P_ENGAGED_UV;
516 		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
517 		break;
518 
519 	default:
520 		dev_err(xpc_part, "received unknown activate_mq msg type=%d "
521 			"from partition=%d\n", msg_hdr->type, XPC_PARTID(part));
522 
523 		/* get hb checker to deactivate from the remote partition */
524 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
525 		if (part_uv->act_state_req == 0)
526 			xpc_activate_IRQ_rcvd++;
527 		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
528 		part_uv->reason = xpBadMsgType;
529 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
530 
531 		(*wakeup_hb_checker)++;
532 		return;
533 	}
534 
535 	if (msg_hdr->rp_ts_jiffies != part->remote_rp_ts_jiffies &&
536 	    part->remote_rp_ts_jiffies != 0) {
537 		/*
538 		 * ??? Does what we do here need to be sensitive to
539 		 * ??? act_state or remote_act_state?
540 		 */
541 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
542 		if (part_uv->act_state_req == 0)
543 			xpc_activate_IRQ_rcvd++;
544 		part_uv->act_state_req = XPC_P_ASR_REACTIVATE_UV;
545 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
546 
547 		(*wakeup_hb_checker)++;
548 	}
549 }
550 
551 static irqreturn_t
552 xpc_handle_activate_IRQ_uv(int irq, void *dev_id)
553 {
554 	struct xpc_activate_mq_msghdr_uv *msg_hdr;
555 	short partid;
556 	struct xpc_partition *part;
557 	int wakeup_hb_checker = 0;
558 	int part_referenced;
559 
560 	while (1) {
561 		msg_hdr = gru_get_next_message(xpc_activate_mq_uv->gru_mq_desc);
562 		if (msg_hdr == NULL)
563 			break;
564 
565 		partid = msg_hdr->partid;
566 		if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
567 			dev_err(xpc_part, "xpc_handle_activate_IRQ_uv() "
568 				"received invalid partid=0x%x in message\n",
569 				partid);
570 		} else {
571 			part = &xpc_partitions[partid];
572 
573 			part_referenced = xpc_part_ref(part);
574 			xpc_handle_activate_mq_msg_uv(part, msg_hdr,
575 						      part_referenced,
576 						      &wakeup_hb_checker);
577 			if (part_referenced)
578 				xpc_part_deref(part);
579 		}
580 
581 		gru_free_message(xpc_activate_mq_uv->gru_mq_desc, msg_hdr);
582 	}
583 
584 	if (wakeup_hb_checker)
585 		wake_up_interruptible(&xpc_activate_IRQ_wq);
586 
587 	return IRQ_HANDLED;
588 }
589 
590 static enum xp_retval
591 xpc_cache_remote_gru_mq_desc_uv(struct gru_message_queue_desc *gru_mq_desc,
592 				unsigned long gru_mq_desc_gpa)
593 {
594 	enum xp_retval ret;
595 
596 	ret = xp_remote_memcpy(uv_gpa(gru_mq_desc), gru_mq_desc_gpa,
597 			       sizeof(struct gru_message_queue_desc));
598 	if (ret == xpSuccess)
599 		gru_mq_desc->mq = NULL;
600 
601 	return ret;
602 }
603 
604 static enum xp_retval
605 xpc_send_activate_IRQ_uv(struct xpc_partition *part, void *msg, size_t msg_size,
606 			 int msg_type)
607 {
608 	struct xpc_activate_mq_msghdr_uv *msg_hdr = msg;
609 	struct xpc_partition_uv *part_uv = &part->sn.uv;
610 	struct gru_message_queue_desc *gru_mq_desc;
611 	unsigned long irq_flags;
612 	enum xp_retval ret;
613 
614 	DBUG_ON(msg_size > XPC_ACTIVATE_MSG_SIZE_UV);
615 
616 	msg_hdr->type = msg_type;
617 	msg_hdr->partid = xp_partition_id;
618 	msg_hdr->act_state = part->act_state;
619 	msg_hdr->rp_ts_jiffies = xpc_rsvd_page->ts_jiffies;
620 
621 	mutex_lock(&part_uv->cached_activate_gru_mq_desc_mutex);
622 again:
623 	if (!(part_uv->flags & XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV)) {
624 		gru_mq_desc = part_uv->cached_activate_gru_mq_desc;
625 		if (gru_mq_desc == NULL) {
626 			gru_mq_desc = kmalloc(sizeof(struct
627 					      gru_message_queue_desc),
628 					      GFP_ATOMIC);
629 			if (gru_mq_desc == NULL) {
630 				ret = xpNoMemory;
631 				goto done;
632 			}
633 			part_uv->cached_activate_gru_mq_desc = gru_mq_desc;
634 		}
635 
636 		ret = xpc_cache_remote_gru_mq_desc_uv(gru_mq_desc,
637 						      part_uv->
638 						      activate_gru_mq_desc_gpa);
639 		if (ret != xpSuccess)
640 			goto done;
641 
642 		spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
643 		part_uv->flags |= XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV;
644 		spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
645 	}
646 
647 	/* ??? Is holding a spin_lock (ch->lock) during this call a bad idea? */
648 	ret = xpc_send_gru_msg(part_uv->cached_activate_gru_mq_desc, msg,
649 			       msg_size);
650 	if (ret != xpSuccess) {
651 		smp_rmb();	/* ensure a fresh copy of part_uv->flags */
652 		if (!(part_uv->flags & XPC_P_CACHED_ACTIVATE_GRU_MQ_DESC_UV))
653 			goto again;
654 	}
655 done:
656 	mutex_unlock(&part_uv->cached_activate_gru_mq_desc_mutex);
657 	return ret;
658 }
659 
660 static void
661 xpc_send_activate_IRQ_part_uv(struct xpc_partition *part, void *msg,
662 			      size_t msg_size, int msg_type)
663 {
664 	enum xp_retval ret;
665 
666 	ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
667 	if (unlikely(ret != xpSuccess))
668 		XPC_DEACTIVATE_PARTITION(part, ret);
669 }
670 
671 static void
672 xpc_send_activate_IRQ_ch_uv(struct xpc_channel *ch, unsigned long *irq_flags,
673 			 void *msg, size_t msg_size, int msg_type)
674 {
675 	struct xpc_partition *part = &xpc_partitions[ch->partid];
676 	enum xp_retval ret;
677 
678 	ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
679 	if (unlikely(ret != xpSuccess)) {
680 		if (irq_flags != NULL)
681 			spin_unlock_irqrestore(&ch->lock, *irq_flags);
682 
683 		XPC_DEACTIVATE_PARTITION(part, ret);
684 
685 		if (irq_flags != NULL)
686 			spin_lock_irqsave(&ch->lock, *irq_flags);
687 	}
688 }
689 
690 static void
691 xpc_send_local_activate_IRQ_uv(struct xpc_partition *part, int act_state_req)
692 {
693 	unsigned long irq_flags;
694 	struct xpc_partition_uv *part_uv = &part->sn.uv;
695 
696 	/*
697 	 * !!! Make our side think that the remote partition sent an activate
698 	 * !!! mq message our way by doing what the activate IRQ handler would
699 	 * !!! do had one really been sent.
700 	 */
701 
702 	spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
703 	if (part_uv->act_state_req == 0)
704 		xpc_activate_IRQ_rcvd++;
705 	part_uv->act_state_req = act_state_req;
706 	spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
707 
708 	wake_up_interruptible(&xpc_activate_IRQ_wq);
709 }
710 
711 static enum xp_retval
712 xpc_get_partition_rsvd_page_pa_uv(void *buf, u64 *cookie, unsigned long *rp_pa,
713 				  size_t *len)
714 {
715 	s64 status;
716 	enum xp_retval ret;
717 
718 	status = uv_bios_reserved_page_pa((u64)buf, cookie, (u64 *)rp_pa,
719 					  (u64 *)len);
720 	if (status == BIOS_STATUS_SUCCESS)
721 		ret = xpSuccess;
722 	else if (status == BIOS_STATUS_MORE_PASSES)
723 		ret = xpNeedMoreInfo;
724 	else
725 		ret = xpBiosError;
726 
727 	return ret;
728 }
729 
730 static int
731 xpc_setup_rsvd_page_uv(struct xpc_rsvd_page *rp)
732 {
733 	xpc_heartbeat_uv =
734 	    &xpc_partitions[sn_partition_id].sn.uv.cached_heartbeat;
735 	rp->sn.uv.heartbeat_gpa = uv_gpa(xpc_heartbeat_uv);
736 	rp->sn.uv.activate_gru_mq_desc_gpa =
737 	    uv_gpa(xpc_activate_mq_uv->gru_mq_desc);
738 	return 0;
739 }
740 
741 static void
742 xpc_allow_hb_uv(short partid)
743 {
744 }
745 
746 static void
747 xpc_disallow_hb_uv(short partid)
748 {
749 }
750 
751 static void
752 xpc_disallow_all_hbs_uv(void)
753 {
754 }
755 
756 static void
757 xpc_increment_heartbeat_uv(void)
758 {
759 	xpc_heartbeat_uv->value++;
760 }
761 
762 static void
763 xpc_offline_heartbeat_uv(void)
764 {
765 	xpc_increment_heartbeat_uv();
766 	xpc_heartbeat_uv->offline = 1;
767 }
768 
769 static void
770 xpc_online_heartbeat_uv(void)
771 {
772 	xpc_increment_heartbeat_uv();
773 	xpc_heartbeat_uv->offline = 0;
774 }
775 
776 static void
777 xpc_heartbeat_init_uv(void)
778 {
779 	xpc_heartbeat_uv->value = 1;
780 	xpc_heartbeat_uv->offline = 0;
781 }
782 
783 static void
784 xpc_heartbeat_exit_uv(void)
785 {
786 	xpc_offline_heartbeat_uv();
787 }
788 
789 static enum xp_retval
790 xpc_get_remote_heartbeat_uv(struct xpc_partition *part)
791 {
792 	struct xpc_partition_uv *part_uv = &part->sn.uv;
793 	enum xp_retval ret;
794 
795 	ret = xp_remote_memcpy(uv_gpa(&part_uv->cached_heartbeat),
796 			       part_uv->heartbeat_gpa,
797 			       sizeof(struct xpc_heartbeat_uv));
798 	if (ret != xpSuccess)
799 		return ret;
800 
801 	if (part_uv->cached_heartbeat.value == part->last_heartbeat &&
802 	    !part_uv->cached_heartbeat.offline) {
803 
804 		ret = xpNoHeartbeat;
805 	} else {
806 		part->last_heartbeat = part_uv->cached_heartbeat.value;
807 	}
808 	return ret;
809 }
810 
811 static void
812 xpc_request_partition_activation_uv(struct xpc_rsvd_page *remote_rp,
813 				    unsigned long remote_rp_gpa, int nasid)
814 {
815 	short partid = remote_rp->SAL_partid;
816 	struct xpc_partition *part = &xpc_partitions[partid];
817 	struct xpc_activate_mq_msg_activate_req_uv msg;
818 
819 	part->remote_rp_pa = remote_rp_gpa; /* !!! _pa here is really _gpa */
820 	part->remote_rp_ts_jiffies = remote_rp->ts_jiffies;
821 	part->sn.uv.heartbeat_gpa = remote_rp->sn.uv.heartbeat_gpa;
822 	part->sn.uv.activate_gru_mq_desc_gpa =
823 	    remote_rp->sn.uv.activate_gru_mq_desc_gpa;
824 
825 	/*
826 	 * ??? Is it a good idea to make this conditional on what is
827 	 * ??? potentially stale state information?
828 	 */
829 	if (part->sn.uv.remote_act_state == XPC_P_AS_INACTIVE) {
830 		msg.rp_gpa = uv_gpa(xpc_rsvd_page);
831 		msg.heartbeat_gpa = xpc_rsvd_page->sn.uv.heartbeat_gpa;
832 		msg.activate_gru_mq_desc_gpa =
833 		    xpc_rsvd_page->sn.uv.activate_gru_mq_desc_gpa;
834 		xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
835 					   XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV);
836 	}
837 
838 	if (part->act_state == XPC_P_AS_INACTIVE)
839 		xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
840 }
841 
842 static void
843 xpc_request_partition_reactivation_uv(struct xpc_partition *part)
844 {
845 	xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
846 }
847 
848 static void
849 xpc_request_partition_deactivation_uv(struct xpc_partition *part)
850 {
851 	struct xpc_activate_mq_msg_deactivate_req_uv msg;
852 
853 	/*
854 	 * ??? Is it a good idea to make this conditional on what is
855 	 * ??? potentially stale state information?
856 	 */
857 	if (part->sn.uv.remote_act_state != XPC_P_AS_DEACTIVATING &&
858 	    part->sn.uv.remote_act_state != XPC_P_AS_INACTIVE) {
859 
860 		msg.reason = part->reason;
861 		xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
862 					 XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV);
863 	}
864 }
865 
866 static void
867 xpc_cancel_partition_deactivation_request_uv(struct xpc_partition *part)
868 {
869 	/* nothing needs to be done */
870 	return;
871 }
872 
873 static void
874 xpc_init_fifo_uv(struct xpc_fifo_head_uv *head)
875 {
876 	head->first = NULL;
877 	head->last = NULL;
878 	spin_lock_init(&head->lock);
879 	head->n_entries = 0;
880 }
881 
882 static void *
883 xpc_get_fifo_entry_uv(struct xpc_fifo_head_uv *head)
884 {
885 	unsigned long irq_flags;
886 	struct xpc_fifo_entry_uv *first;
887 
888 	spin_lock_irqsave(&head->lock, irq_flags);
889 	first = head->first;
890 	if (head->first != NULL) {
891 		head->first = first->next;
892 		if (head->first == NULL)
893 			head->last = NULL;
894 
895 		head->n_entries--;
896 		BUG_ON(head->n_entries < 0);
897 
898 		first->next = NULL;
899 	}
900 	spin_unlock_irqrestore(&head->lock, irq_flags);
901 	return first;
902 }
903 
904 static void
905 xpc_put_fifo_entry_uv(struct xpc_fifo_head_uv *head,
906 		      struct xpc_fifo_entry_uv *last)
907 {
908 	unsigned long irq_flags;
909 
910 	last->next = NULL;
911 	spin_lock_irqsave(&head->lock, irq_flags);
912 	if (head->last != NULL)
913 		head->last->next = last;
914 	else
915 		head->first = last;
916 	head->last = last;
917 	head->n_entries++;
918 	spin_unlock_irqrestore(&head->lock, irq_flags);
919 }
920 
921 static int
922 xpc_n_of_fifo_entries_uv(struct xpc_fifo_head_uv *head)
923 {
924 	return head->n_entries;
925 }
926 
927 /*
928  * Setup the channel structures that are uv specific.
929  */
930 static enum xp_retval
931 xpc_setup_ch_structures_uv(struct xpc_partition *part)
932 {
933 	struct xpc_channel_uv *ch_uv;
934 	int ch_number;
935 
936 	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
937 		ch_uv = &part->channels[ch_number].sn.uv;
938 
939 		xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
940 		xpc_init_fifo_uv(&ch_uv->recv_msg_list);
941 	}
942 
943 	return xpSuccess;
944 }
945 
946 /*
947  * Teardown the channel structures that are uv specific.
948  */
949 static void
950 xpc_teardown_ch_structures_uv(struct xpc_partition *part)
951 {
952 	/* nothing needs to be done */
953 	return;
954 }
955 
956 static enum xp_retval
957 xpc_make_first_contact_uv(struct xpc_partition *part)
958 {
959 	struct xpc_activate_mq_msg_uv msg;
960 
961 	/*
962 	 * We send a sync msg to get the remote partition's remote_act_state
963 	 * updated to our current act_state which at this point should
964 	 * be XPC_P_AS_ACTIVATING.
965 	 */
966 	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
967 				      XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV);
968 
969 	while (!((part->sn.uv.remote_act_state == XPC_P_AS_ACTIVATING) ||
970 		 (part->sn.uv.remote_act_state == XPC_P_AS_ACTIVE))) {
971 
972 		dev_dbg(xpc_part, "waiting to make first contact with "
973 			"partition %d\n", XPC_PARTID(part));
974 
975 		/* wait a 1/4 of a second or so */
976 		(void)msleep_interruptible(250);
977 
978 		if (part->act_state == XPC_P_AS_DEACTIVATING)
979 			return part->reason;
980 	}
981 
982 	return xpSuccess;
983 }
984 
985 static u64
986 xpc_get_chctl_all_flags_uv(struct xpc_partition *part)
987 {
988 	unsigned long irq_flags;
989 	union xpc_channel_ctl_flags chctl;
990 
991 	spin_lock_irqsave(&part->chctl_lock, irq_flags);
992 	chctl = part->chctl;
993 	if (chctl.all_flags != 0)
994 		part->chctl.all_flags = 0;
995 
996 	spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
997 	return chctl.all_flags;
998 }
999 
1000 static enum xp_retval
1001 xpc_allocate_send_msg_slot_uv(struct xpc_channel *ch)
1002 {
1003 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1004 	struct xpc_send_msg_slot_uv *msg_slot;
1005 	unsigned long irq_flags;
1006 	int nentries;
1007 	int entry;
1008 	size_t nbytes;
1009 
1010 	for (nentries = ch->local_nentries; nentries > 0; nentries--) {
1011 		nbytes = nentries * sizeof(struct xpc_send_msg_slot_uv);
1012 		ch_uv->send_msg_slots = kzalloc(nbytes, GFP_KERNEL);
1013 		if (ch_uv->send_msg_slots == NULL)
1014 			continue;
1015 
1016 		for (entry = 0; entry < nentries; entry++) {
1017 			msg_slot = &ch_uv->send_msg_slots[entry];
1018 
1019 			msg_slot->msg_slot_number = entry;
1020 			xpc_put_fifo_entry_uv(&ch_uv->msg_slot_free_list,
1021 					      &msg_slot->next);
1022 		}
1023 
1024 		spin_lock_irqsave(&ch->lock, irq_flags);
1025 		if (nentries < ch->local_nentries)
1026 			ch->local_nentries = nentries;
1027 		spin_unlock_irqrestore(&ch->lock, irq_flags);
1028 		return xpSuccess;
1029 	}
1030 
1031 	return xpNoMemory;
1032 }
1033 
1034 static enum xp_retval
1035 xpc_allocate_recv_msg_slot_uv(struct xpc_channel *ch)
1036 {
1037 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1038 	struct xpc_notify_mq_msg_uv *msg_slot;
1039 	unsigned long irq_flags;
1040 	int nentries;
1041 	int entry;
1042 	size_t nbytes;
1043 
1044 	for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
1045 		nbytes = nentries * ch->entry_size;
1046 		ch_uv->recv_msg_slots = kzalloc(nbytes, GFP_KERNEL);
1047 		if (ch_uv->recv_msg_slots == NULL)
1048 			continue;
1049 
1050 		for (entry = 0; entry < nentries; entry++) {
1051 			msg_slot = ch_uv->recv_msg_slots +
1052 			    entry * ch->entry_size;
1053 
1054 			msg_slot->hdr.msg_slot_number = entry;
1055 		}
1056 
1057 		spin_lock_irqsave(&ch->lock, irq_flags);
1058 		if (nentries < ch->remote_nentries)
1059 			ch->remote_nentries = nentries;
1060 		spin_unlock_irqrestore(&ch->lock, irq_flags);
1061 		return xpSuccess;
1062 	}
1063 
1064 	return xpNoMemory;
1065 }
1066 
1067 /*
1068  * Allocate msg_slots associated with the channel.
1069  */
1070 static enum xp_retval
1071 xpc_setup_msg_structures_uv(struct xpc_channel *ch)
1072 {
1073 	static enum xp_retval ret;
1074 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1075 
1076 	DBUG_ON(ch->flags & XPC_C_SETUP);
1077 
1078 	ch_uv->cached_notify_gru_mq_desc = kmalloc(sizeof(struct
1079 						   gru_message_queue_desc),
1080 						   GFP_KERNEL);
1081 	if (ch_uv->cached_notify_gru_mq_desc == NULL)
1082 		return xpNoMemory;
1083 
1084 	ret = xpc_allocate_send_msg_slot_uv(ch);
1085 	if (ret == xpSuccess) {
1086 
1087 		ret = xpc_allocate_recv_msg_slot_uv(ch);
1088 		if (ret != xpSuccess) {
1089 			kfree(ch_uv->send_msg_slots);
1090 			xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
1091 		}
1092 	}
1093 	return ret;
1094 }
1095 
1096 /*
1097  * Free up msg_slots and clear other stuff that were setup for the specified
1098  * channel.
1099  */
1100 static void
1101 xpc_teardown_msg_structures_uv(struct xpc_channel *ch)
1102 {
1103 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1104 
1105 	lockdep_assert_held(&ch->lock);
1106 
1107 	kfree(ch_uv->cached_notify_gru_mq_desc);
1108 	ch_uv->cached_notify_gru_mq_desc = NULL;
1109 
1110 	if (ch->flags & XPC_C_SETUP) {
1111 		xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
1112 		kfree(ch_uv->send_msg_slots);
1113 		xpc_init_fifo_uv(&ch_uv->recv_msg_list);
1114 		kfree(ch_uv->recv_msg_slots);
1115 	}
1116 }
1117 
1118 static void
1119 xpc_send_chctl_closerequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1120 {
1121 	struct xpc_activate_mq_msg_chctl_closerequest_uv msg;
1122 
1123 	msg.ch_number = ch->number;
1124 	msg.reason = ch->reason;
1125 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1126 				    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV);
1127 }
1128 
1129 static void
1130 xpc_send_chctl_closereply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1131 {
1132 	struct xpc_activate_mq_msg_chctl_closereply_uv msg;
1133 
1134 	msg.ch_number = ch->number;
1135 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1136 				    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV);
1137 }
1138 
1139 static void
1140 xpc_send_chctl_openrequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1141 {
1142 	struct xpc_activate_mq_msg_chctl_openrequest_uv msg;
1143 
1144 	msg.ch_number = ch->number;
1145 	msg.entry_size = ch->entry_size;
1146 	msg.local_nentries = ch->local_nentries;
1147 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1148 				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV);
1149 }
1150 
1151 static void
1152 xpc_send_chctl_openreply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1153 {
1154 	struct xpc_activate_mq_msg_chctl_openreply_uv msg;
1155 
1156 	msg.ch_number = ch->number;
1157 	msg.local_nentries = ch->local_nentries;
1158 	msg.remote_nentries = ch->remote_nentries;
1159 	msg.notify_gru_mq_desc_gpa = uv_gpa(xpc_notify_mq_uv->gru_mq_desc);
1160 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1161 				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV);
1162 }
1163 
1164 static void
1165 xpc_send_chctl_opencomplete_uv(struct xpc_channel *ch, unsigned long *irq_flags)
1166 {
1167 	struct xpc_activate_mq_msg_chctl_opencomplete_uv msg;
1168 
1169 	msg.ch_number = ch->number;
1170 	xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
1171 				    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENCOMPLETE_UV);
1172 }
1173 
1174 static void
1175 xpc_send_chctl_local_msgrequest_uv(struct xpc_partition *part, int ch_number)
1176 {
1177 	unsigned long irq_flags;
1178 
1179 	spin_lock_irqsave(&part->chctl_lock, irq_flags);
1180 	part->chctl.flags[ch_number] |= XPC_CHCTL_MSGREQUEST;
1181 	spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
1182 
1183 	xpc_wakeup_channel_mgr(part);
1184 }
1185 
1186 static enum xp_retval
1187 xpc_save_remote_msgqueue_pa_uv(struct xpc_channel *ch,
1188 			       unsigned long gru_mq_desc_gpa)
1189 {
1190 	struct xpc_channel_uv *ch_uv = &ch->sn.uv;
1191 
1192 	DBUG_ON(ch_uv->cached_notify_gru_mq_desc == NULL);
1193 	return xpc_cache_remote_gru_mq_desc_uv(ch_uv->cached_notify_gru_mq_desc,
1194 					       gru_mq_desc_gpa);
1195 }
1196 
1197 static void
1198 xpc_indicate_partition_engaged_uv(struct xpc_partition *part)
1199 {
1200 	struct xpc_activate_mq_msg_uv msg;
1201 
1202 	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
1203 				      XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV);
1204 }
1205 
1206 static void
1207 xpc_indicate_partition_disengaged_uv(struct xpc_partition *part)
1208 {
1209 	struct xpc_activate_mq_msg_uv msg;
1210 
1211 	xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
1212 				      XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV);
1213 }
1214 
1215 static void
1216 xpc_assume_partition_disengaged_uv(short partid)
1217 {
1218 	struct xpc_partition_uv *part_uv = &xpc_partitions[partid].sn.uv;
1219 	unsigned long irq_flags;
1220 
1221 	spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
1222 	part_uv->flags &= ~XPC_P_ENGAGED_UV;
1223 	spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
1224 }
1225 
1226 static int
1227 xpc_partition_engaged_uv(short partid)
1228 {
1229 	return (xpc_partitions[partid].sn.uv.flags & XPC_P_ENGAGED_UV) != 0;
1230 }
1231 
1232 static int
1233 xpc_any_partition_engaged_uv(void)
1234 {
1235 	struct xpc_partition_uv *part_uv;
1236 	short partid;
1237 
1238 	for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
1239 		part_uv = &xpc_partitions[partid].sn.uv;
1240 		if ((part_uv->flags & XPC_P_ENGAGED_UV) != 0)
1241 			return 1;
1242 	}
1243 	return 0;
1244 }
1245 
1246 static enum xp_retval
1247 xpc_allocate_msg_slot_uv(struct xpc_channel *ch, u32 flags,
1248 			 struct xpc_send_msg_slot_uv **address_of_msg_slot)
1249 {
1250 	enum xp_retval ret;
1251 	struct xpc_send_msg_slot_uv *msg_slot;
1252 	struct xpc_fifo_entry_uv *entry;
1253 
1254 	while (1) {
1255 		entry = xpc_get_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list);
1256 		if (entry != NULL)
1257 			break;
1258 
1259 		if (flags & XPC_NOWAIT)
1260 			return xpNoWait;
1261 
1262 		ret = xpc_allocate_msg_wait(ch);
1263 		if (ret != xpInterrupted && ret != xpTimeout)
1264 			return ret;
1265 	}
1266 
1267 	msg_slot = container_of(entry, struct xpc_send_msg_slot_uv, next);
1268 	*address_of_msg_slot = msg_slot;
1269 	return xpSuccess;
1270 }
1271 
1272 static void
1273 xpc_free_msg_slot_uv(struct xpc_channel *ch,
1274 		     struct xpc_send_msg_slot_uv *msg_slot)
1275 {
1276 	xpc_put_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list, &msg_slot->next);
1277 
1278 	/* wakeup anyone waiting for a free msg slot */
1279 	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
1280 		wake_up(&ch->msg_allocate_wq);
1281 }
1282 
1283 static void
1284 xpc_notify_sender_uv(struct xpc_channel *ch,
1285 		     struct xpc_send_msg_slot_uv *msg_slot,
1286 		     enum xp_retval reason)
1287 {
1288 	xpc_notify_func func = msg_slot->func;
1289 
1290 	if (func != NULL && cmpxchg(&msg_slot->func, func, NULL) == func) {
1291 
1292 		atomic_dec(&ch->n_to_notify);
1293 
1294 		dev_dbg(xpc_chan, "msg_slot->func() called, msg_slot=0x%p "
1295 			"msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
1296 			msg_slot->msg_slot_number, ch->partid, ch->number);
1297 
1298 		func(reason, ch->partid, ch->number, msg_slot->key);
1299 
1300 		dev_dbg(xpc_chan, "msg_slot->func() returned, msg_slot=0x%p "
1301 			"msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
1302 			msg_slot->msg_slot_number, ch->partid, ch->number);
1303 	}
1304 }
1305 
1306 static void
1307 xpc_handle_notify_mq_ack_uv(struct xpc_channel *ch,
1308 			    struct xpc_notify_mq_msg_uv *msg)
1309 {
1310 	struct xpc_send_msg_slot_uv *msg_slot;
1311 	int entry = msg->hdr.msg_slot_number % ch->local_nentries;
1312 
1313 	msg_slot = &ch->sn.uv.send_msg_slots[entry];
1314 
1315 	BUG_ON(msg_slot->msg_slot_number != msg->hdr.msg_slot_number);
1316 	msg_slot->msg_slot_number += ch->local_nentries;
1317 
1318 	if (msg_slot->func != NULL)
1319 		xpc_notify_sender_uv(ch, msg_slot, xpMsgDelivered);
1320 
1321 	xpc_free_msg_slot_uv(ch, msg_slot);
1322 }
1323 
1324 static void
1325 xpc_handle_notify_mq_msg_uv(struct xpc_partition *part,
1326 			    struct xpc_notify_mq_msg_uv *msg)
1327 {
1328 	struct xpc_partition_uv *part_uv = &part->sn.uv;
1329 	struct xpc_channel *ch;
1330 	struct xpc_channel_uv *ch_uv;
1331 	struct xpc_notify_mq_msg_uv *msg_slot;
1332 	unsigned long irq_flags;
1333 	int ch_number = msg->hdr.ch_number;
1334 
1335 	if (unlikely(ch_number >= part->nchannels)) {
1336 		dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received invalid "
1337 			"channel number=0x%x in message from partid=%d\n",
1338 			ch_number, XPC_PARTID(part));
1339 
1340 		/* get hb checker to deactivate from the remote partition */
1341 		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
1342 		if (part_uv->act_state_req == 0)
1343 			xpc_activate_IRQ_rcvd++;
1344 		part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
1345 		part_uv->reason = xpBadChannelNumber;
1346 		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
1347 
1348 		wake_up_interruptible(&xpc_activate_IRQ_wq);
1349 		return;
1350 	}
1351 
1352 	ch = &part->channels[ch_number];
1353 	xpc_msgqueue_ref(ch);
1354 
1355 	if (!(ch->flags & XPC_C_CONNECTED)) {
1356 		xpc_msgqueue_deref(ch);
1357 		return;
1358 	}
1359 
1360 	/* see if we're really dealing with an ACK for a previously sent msg */
1361 	if (msg->hdr.size == 0) {
1362 		xpc_handle_notify_mq_ack_uv(ch, msg);
1363 		xpc_msgqueue_deref(ch);
1364 		return;
1365 	}
1366 
1367 	/* we're dealing with a normal message sent via the notify_mq */
1368 	ch_uv = &ch->sn.uv;
1369 
1370 	msg_slot = ch_uv->recv_msg_slots +
1371 	    (msg->hdr.msg_slot_number % ch->remote_nentries) * ch->entry_size;
1372 
1373 	BUG_ON(msg_slot->hdr.size != 0);
1374 
1375 	memcpy(msg_slot, msg, msg->hdr.size);
1376 
1377 	xpc_put_fifo_entry_uv(&ch_uv->recv_msg_list, &msg_slot->hdr.u.next);
1378 
1379 	if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
1380 		/*
1381 		 * If there is an existing idle kthread get it to deliver
1382 		 * the payload, otherwise we'll have to get the channel mgr
1383 		 * for this partition to create a kthread to do the delivery.
1384 		 */
1385 		if (atomic_read(&ch->kthreads_idle) > 0)
1386 			wake_up_nr(&ch->idle_wq, 1);
1387 		else
1388 			xpc_send_chctl_local_msgrequest_uv(part, ch->number);
1389 	}
1390 	xpc_msgqueue_deref(ch);
1391 }
1392 
1393 static irqreturn_t
1394 xpc_handle_notify_IRQ_uv(int irq, void *dev_id)
1395 {
1396 	struct xpc_notify_mq_msg_uv *msg;
1397 	short partid;
1398 	struct xpc_partition *part;
1399 
1400 	while ((msg = gru_get_next_message(xpc_notify_mq_uv->gru_mq_desc)) !=
1401 	       NULL) {
1402 
1403 		partid = msg->hdr.partid;
1404 		if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
1405 			dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received "
1406 				"invalid partid=0x%x in message\n", partid);
1407 		} else {
1408 			part = &xpc_partitions[partid];
1409 
1410 			if (xpc_part_ref(part)) {
1411 				xpc_handle_notify_mq_msg_uv(part, msg);
1412 				xpc_part_deref(part);
1413 			}
1414 		}
1415 
1416 		gru_free_message(xpc_notify_mq_uv->gru_mq_desc, msg);
1417 	}
1418 
1419 	return IRQ_HANDLED;
1420 }
1421 
1422 static int
1423 xpc_n_of_deliverable_payloads_uv(struct xpc_channel *ch)
1424 {
1425 	return xpc_n_of_fifo_entries_uv(&ch->sn.uv.recv_msg_list);
1426 }
1427 
1428 static void
1429 xpc_process_msg_chctl_flags_uv(struct xpc_partition *part, int ch_number)
1430 {
1431 	struct xpc_channel *ch = &part->channels[ch_number];
1432 	int ndeliverable_payloads;
1433 
1434 	xpc_msgqueue_ref(ch);
1435 
1436 	ndeliverable_payloads = xpc_n_of_deliverable_payloads_uv(ch);
1437 
1438 	if (ndeliverable_payloads > 0 &&
1439 	    (ch->flags & XPC_C_CONNECTED) &&
1440 	    (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)) {
1441 
1442 		xpc_activate_kthreads(ch, ndeliverable_payloads);
1443 	}
1444 
1445 	xpc_msgqueue_deref(ch);
1446 }
1447 
1448 static enum xp_retval
1449 xpc_send_payload_uv(struct xpc_channel *ch, u32 flags, void *payload,
1450 		    u16 payload_size, u8 notify_type, xpc_notify_func func,
1451 		    void *key)
1452 {
1453 	enum xp_retval ret = xpSuccess;
1454 	struct xpc_send_msg_slot_uv *msg_slot = NULL;
1455 	struct xpc_notify_mq_msg_uv *msg;
1456 	u8 msg_buffer[XPC_NOTIFY_MSG_SIZE_UV];
1457 	size_t msg_size;
1458 
1459 	DBUG_ON(notify_type != XPC_N_CALL);
1460 
1461 	msg_size = sizeof(struct xpc_notify_mq_msghdr_uv) + payload_size;
1462 	if (msg_size > ch->entry_size)
1463 		return xpPayloadTooBig;
1464 
1465 	xpc_msgqueue_ref(ch);
1466 
1467 	if (ch->flags & XPC_C_DISCONNECTING) {
1468 		ret = ch->reason;
1469 		goto out_1;
1470 	}
1471 	if (!(ch->flags & XPC_C_CONNECTED)) {
1472 		ret = xpNotConnected;
1473 		goto out_1;
1474 	}
1475 
1476 	ret = xpc_allocate_msg_slot_uv(ch, flags, &msg_slot);
1477 	if (ret != xpSuccess)
1478 		goto out_1;
1479 
1480 	if (func != NULL) {
1481 		atomic_inc(&ch->n_to_notify);
1482 
1483 		msg_slot->key = key;
1484 		smp_wmb(); /* a non-NULL func must hit memory after the key */
1485 		msg_slot->func = func;
1486 
1487 		if (ch->flags & XPC_C_DISCONNECTING) {
1488 			ret = ch->reason;
1489 			goto out_2;
1490 		}
1491 	}
1492 
1493 	msg = (struct xpc_notify_mq_msg_uv *)&msg_buffer;
1494 	msg->hdr.partid = xp_partition_id;
1495 	msg->hdr.ch_number = ch->number;
1496 	msg->hdr.size = msg_size;
1497 	msg->hdr.msg_slot_number = msg_slot->msg_slot_number;
1498 	memcpy(&msg->payload, payload, payload_size);
1499 
1500 	ret = xpc_send_gru_msg(ch->sn.uv.cached_notify_gru_mq_desc, msg,
1501 			       msg_size);
1502 	if (ret == xpSuccess)
1503 		goto out_1;
1504 
1505 	XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
1506 out_2:
1507 	if (func != NULL) {
1508 		/*
1509 		 * Try to NULL the msg_slot's func field. If we fail, then
1510 		 * xpc_notify_senders_of_disconnect_uv() beat us to it, in which
1511 		 * case we need to pretend we succeeded to send the message
1512 		 * since the user will get a callout for the disconnect error
1513 		 * by xpc_notify_senders_of_disconnect_uv(), and to also get an
1514 		 * error returned here will confuse them. Additionally, since
1515 		 * in this case the channel is being disconnected we don't need
1516 		 * to put the msg_slot back on the free list.
1517 		 */
1518 		if (cmpxchg(&msg_slot->func, func, NULL) != func) {
1519 			ret = xpSuccess;
1520 			goto out_1;
1521 		}
1522 
1523 		msg_slot->key = NULL;
1524 		atomic_dec(&ch->n_to_notify);
1525 	}
1526 	xpc_free_msg_slot_uv(ch, msg_slot);
1527 out_1:
1528 	xpc_msgqueue_deref(ch);
1529 	return ret;
1530 }
1531 
1532 /*
1533  * Tell the callers of xpc_send_notify() that the status of their payloads
1534  * is unknown because the channel is now disconnecting.
1535  *
1536  * We don't worry about putting these msg_slots on the free list since the
1537  * msg_slots themselves are about to be kfree'd.
1538  */
1539 static void
1540 xpc_notify_senders_of_disconnect_uv(struct xpc_channel *ch)
1541 {
1542 	struct xpc_send_msg_slot_uv *msg_slot;
1543 	int entry;
1544 
1545 	DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
1546 
1547 	for (entry = 0; entry < ch->local_nentries; entry++) {
1548 
1549 		if (atomic_read(&ch->n_to_notify) == 0)
1550 			break;
1551 
1552 		msg_slot = &ch->sn.uv.send_msg_slots[entry];
1553 		if (msg_slot->func != NULL)
1554 			xpc_notify_sender_uv(ch, msg_slot, ch->reason);
1555 	}
1556 }
1557 
1558 /*
1559  * Get the next deliverable message's payload.
1560  */
1561 static void *
1562 xpc_get_deliverable_payload_uv(struct xpc_channel *ch)
1563 {
1564 	struct xpc_fifo_entry_uv *entry;
1565 	struct xpc_notify_mq_msg_uv *msg;
1566 	void *payload = NULL;
1567 
1568 	if (!(ch->flags & XPC_C_DISCONNECTING)) {
1569 		entry = xpc_get_fifo_entry_uv(&ch->sn.uv.recv_msg_list);
1570 		if (entry != NULL) {
1571 			msg = container_of(entry, struct xpc_notify_mq_msg_uv,
1572 					   hdr.u.next);
1573 			payload = &msg->payload;
1574 		}
1575 	}
1576 	return payload;
1577 }
1578 
1579 static void
1580 xpc_received_payload_uv(struct xpc_channel *ch, void *payload)
1581 {
1582 	struct xpc_notify_mq_msg_uv *msg;
1583 	enum xp_retval ret;
1584 
1585 	msg = container_of(payload, struct xpc_notify_mq_msg_uv, payload);
1586 
1587 	/* return an ACK to the sender of this message */
1588 
1589 	msg->hdr.partid = xp_partition_id;
1590 	msg->hdr.size = 0;	/* size of zero indicates this is an ACK */
1591 
1592 	ret = xpc_send_gru_msg(ch->sn.uv.cached_notify_gru_mq_desc, msg,
1593 			       sizeof(struct xpc_notify_mq_msghdr_uv));
1594 	if (ret != xpSuccess)
1595 		XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
1596 }
1597 
1598 static const struct xpc_arch_operations xpc_arch_ops_uv = {
1599 	.setup_partitions = xpc_setup_partitions_uv,
1600 	.teardown_partitions = xpc_teardown_partitions_uv,
1601 	.process_activate_IRQ_rcvd = xpc_process_activate_IRQ_rcvd_uv,
1602 	.get_partition_rsvd_page_pa = xpc_get_partition_rsvd_page_pa_uv,
1603 	.setup_rsvd_page = xpc_setup_rsvd_page_uv,
1604 
1605 	.allow_hb = xpc_allow_hb_uv,
1606 	.disallow_hb = xpc_disallow_hb_uv,
1607 	.disallow_all_hbs = xpc_disallow_all_hbs_uv,
1608 	.increment_heartbeat = xpc_increment_heartbeat_uv,
1609 	.offline_heartbeat = xpc_offline_heartbeat_uv,
1610 	.online_heartbeat = xpc_online_heartbeat_uv,
1611 	.heartbeat_init = xpc_heartbeat_init_uv,
1612 	.heartbeat_exit = xpc_heartbeat_exit_uv,
1613 	.get_remote_heartbeat = xpc_get_remote_heartbeat_uv,
1614 
1615 	.request_partition_activation =
1616 		xpc_request_partition_activation_uv,
1617 	.request_partition_reactivation =
1618 		xpc_request_partition_reactivation_uv,
1619 	.request_partition_deactivation =
1620 		xpc_request_partition_deactivation_uv,
1621 	.cancel_partition_deactivation_request =
1622 		xpc_cancel_partition_deactivation_request_uv,
1623 
1624 	.setup_ch_structures = xpc_setup_ch_structures_uv,
1625 	.teardown_ch_structures = xpc_teardown_ch_structures_uv,
1626 
1627 	.make_first_contact = xpc_make_first_contact_uv,
1628 
1629 	.get_chctl_all_flags = xpc_get_chctl_all_flags_uv,
1630 	.send_chctl_closerequest = xpc_send_chctl_closerequest_uv,
1631 	.send_chctl_closereply = xpc_send_chctl_closereply_uv,
1632 	.send_chctl_openrequest = xpc_send_chctl_openrequest_uv,
1633 	.send_chctl_openreply = xpc_send_chctl_openreply_uv,
1634 	.send_chctl_opencomplete = xpc_send_chctl_opencomplete_uv,
1635 	.process_msg_chctl_flags = xpc_process_msg_chctl_flags_uv,
1636 
1637 	.save_remote_msgqueue_pa = xpc_save_remote_msgqueue_pa_uv,
1638 
1639 	.setup_msg_structures = xpc_setup_msg_structures_uv,
1640 	.teardown_msg_structures = xpc_teardown_msg_structures_uv,
1641 
1642 	.indicate_partition_engaged = xpc_indicate_partition_engaged_uv,
1643 	.indicate_partition_disengaged = xpc_indicate_partition_disengaged_uv,
1644 	.assume_partition_disengaged = xpc_assume_partition_disengaged_uv,
1645 	.partition_engaged = xpc_partition_engaged_uv,
1646 	.any_partition_engaged = xpc_any_partition_engaged_uv,
1647 
1648 	.n_of_deliverable_payloads = xpc_n_of_deliverable_payloads_uv,
1649 	.send_payload = xpc_send_payload_uv,
1650 	.get_deliverable_payload = xpc_get_deliverable_payload_uv,
1651 	.received_payload = xpc_received_payload_uv,
1652 	.notify_senders_of_disconnect = xpc_notify_senders_of_disconnect_uv,
1653 };
1654 
1655 static int
1656 xpc_init_mq_node(int nid)
1657 {
1658 	int cpu;
1659 
1660 	cpus_read_lock();
1661 
1662 	for_each_cpu(cpu, cpumask_of_node(nid)) {
1663 		xpc_activate_mq_uv =
1664 			xpc_create_gru_mq_uv(XPC_ACTIVATE_MQ_SIZE_UV, nid,
1665 					     XPC_ACTIVATE_IRQ_NAME,
1666 					     xpc_handle_activate_IRQ_uv);
1667 		if (!IS_ERR(xpc_activate_mq_uv))
1668 			break;
1669 	}
1670 	if (IS_ERR(xpc_activate_mq_uv)) {
1671 		cpus_read_unlock();
1672 		return PTR_ERR(xpc_activate_mq_uv);
1673 	}
1674 
1675 	for_each_cpu(cpu, cpumask_of_node(nid)) {
1676 		xpc_notify_mq_uv =
1677 			xpc_create_gru_mq_uv(XPC_NOTIFY_MQ_SIZE_UV, nid,
1678 					     XPC_NOTIFY_IRQ_NAME,
1679 					     xpc_handle_notify_IRQ_uv);
1680 		if (!IS_ERR(xpc_notify_mq_uv))
1681 			break;
1682 	}
1683 	if (IS_ERR(xpc_notify_mq_uv)) {
1684 		xpc_destroy_gru_mq_uv(xpc_activate_mq_uv);
1685 		cpus_read_unlock();
1686 		return PTR_ERR(xpc_notify_mq_uv);
1687 	}
1688 
1689 	cpus_read_unlock();
1690 	return 0;
1691 }
1692 
1693 int
1694 xpc_init_uv(void)
1695 {
1696 	int nid;
1697 	int ret = 0;
1698 
1699 	xpc_arch_ops = xpc_arch_ops_uv;
1700 
1701 	if (sizeof(struct xpc_notify_mq_msghdr_uv) > XPC_MSG_HDR_MAX_SIZE) {
1702 		dev_err(xpc_part, "xpc_notify_mq_msghdr_uv is larger than %d\n",
1703 			XPC_MSG_HDR_MAX_SIZE);
1704 		return -E2BIG;
1705 	}
1706 
1707 	if (xpc_mq_node < 0)
1708 		for_each_online_node(nid) {
1709 			ret = xpc_init_mq_node(nid);
1710 
1711 			if (!ret)
1712 				break;
1713 		}
1714 	else
1715 		ret = xpc_init_mq_node(xpc_mq_node);
1716 
1717 	if (ret < 0)
1718 		dev_err(xpc_part, "xpc_init_mq_node() returned error=%d\n",
1719 			-ret);
1720 
1721 	return ret;
1722 }
1723 
1724 void
1725 xpc_exit_uv(void)
1726 {
1727 	xpc_destroy_gru_mq_uv(xpc_notify_mq_uv);
1728 	xpc_destroy_gru_mq_uv(xpc_activate_mq_uv);
1729 }
1730 
1731 module_param(xpc_mq_node, int, 0);
1732 MODULE_PARM_DESC(xpc_mq_node, "Node number on which to allocate message queues.");
1733