/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2008 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) uv-based functions.
 *
 *     Architecture-specific implementation of common functions.
 */

#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/delay.h>
#include <linux/device.h>
#include <asm/uv/uv_hub.h>
#include "../sgi-gru/gru.h"
#include "../sgi-gru/grukservices.h"
#include "xpc.h"

static atomic64_t xpc_heartbeat_uv;
static DECLARE_BITMAP(xpc_heartbeating_to_mask_uv, XP_MAX_NPARTITIONS_UV);

#define XPC_ACTIVATE_MSG_SIZE_UV        (1 * GRU_CACHE_LINE_BYTES)
#define XPC_NOTIFY_MSG_SIZE_UV          (2 * GRU_CACHE_LINE_BYTES)

#define XPC_ACTIVATE_MQ_SIZE_UV (4 * XP_MAX_NPARTITIONS_UV * \
                                 XPC_ACTIVATE_MSG_SIZE_UV)
#define XPC_NOTIFY_MQ_SIZE_UV   (4 * XP_MAX_NPARTITIONS_UV * \
                                 XPC_NOTIFY_MSG_SIZE_UV)
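
/*
 * Sizing sketch (constants assumed from xp.h and gru.h, not restated here):
 * with XP_MAX_NPARTITIONS_UV == 256 and GRU_CACHE_LINE_BYTES == 64, the
 * activate mq works out to 4 * 256 * 64 bytes (64 KiB) and the notify mq
 * to 4 * 256 * 128 bytes (128 KiB), i.e. room for four message slots per
 * possible partition in each queue.
 */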

static void *xpc_activate_mq_uv;
static void *xpc_notify_mq_uv;

static int
xpc_setup_partitions_sn_uv(void)
{
        short partid;
        struct xpc_partition_uv *part_uv;

        for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
                part_uv = &xpc_partitions[partid].sn.uv;

                spin_lock_init(&part_uv->flags_lock);
                part_uv->remote_act_state = XPC_P_AS_INACTIVE;
        }
        return 0;
}

static void *
xpc_create_gru_mq_uv(unsigned int mq_size, int cpuid, unsigned int irq,
                     irq_handler_t irq_handler)
{
        int ret;
        int nid;
        int mq_order;
        struct page *page;
        void *mq;

        nid = cpu_to_node(cpuid);
        mq_order = get_order(mq_size);
        page = alloc_pages_node(nid, GFP_KERNEL | __GFP_ZERO | GFP_THISNODE,
                                mq_order);
        if (page == NULL) {
                dev_err(xpc_part, "xpc_create_gru_mq_uv() failed to alloc %d "
                        "bytes of memory on nid=%d for GRU mq\n", mq_size, nid);
                return NULL;
        }

        mq = page_address(page);
        ret = gru_create_message_queue(mq, mq_size);
        if (ret != 0) {
                dev_err(xpc_part, "gru_create_message_queue() returned "
                        "error=%d\n", ret);
                free_pages((unsigned long)mq, mq_order);
                return NULL;
        }

        /* !!! Need to do some other things to set up IRQ */

        ret = request_irq(irq, irq_handler, 0, "xpc", NULL);
        if (ret != 0) {
                dev_err(xpc_part, "request_irq(irq=%d) returned error=%d\n",
                        irq, ret);
                free_pages((unsigned long)mq, mq_order);
                return NULL;
        }

        /* !!! enable generation of irq when GRU mq op occurs to this mq */

        /* ??? allow other partitions to access GRU mq? */

        return mq;
}

static void
xpc_destroy_gru_mq_uv(void *mq, unsigned int mq_size, unsigned int irq)
{
        /* ??? disallow other partitions to access GRU mq? */

        /* !!! disable generation of irq when GRU mq op occurs to this mq */

        free_irq(irq, NULL);

        free_pages((unsigned long)mq, get_order(mq_size));
}

static enum xp_retval
xpc_send_gru_msg(unsigned long mq_gpa, void *msg, size_t msg_size)
{
        enum xp_retval xp_ret;
        int ret;

        while (1) {
                ret = gru_send_message_gpa(mq_gpa, msg, msg_size);
                if (ret == MQE_OK) {
                        xp_ret = xpSuccess;
                        break;
                }

                if (ret == MQE_QUEUE_FULL) {
                        dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
                                "error=MQE_QUEUE_FULL\n");
                        /* !!! handle QLimit reached; delay & try again */
                        /* ??? Do we add a limit to the number of retries? */
                        (void)msleep_interruptible(10);
                } else if (ret == MQE_CONGESTION) {
                        dev_dbg(xpc_chan, "gru_send_message_gpa() returned "
                                "error=MQE_CONGESTION\n");
                        /* !!! handle LB Overflow; simply try again */
                        /* ??? Do we add a limit to the number of retries? */
                } else {
                        /* !!! Currently this is MQE_UNEXPECTED_CB_ERR */
                        dev_err(xpc_chan, "gru_send_message_gpa() returned "
                                "error=%d\n", ret);
                        xp_ret = xpGruSendMqError;
                        break;
                }
        }
        return xp_ret;
}

static void
xpc_process_activate_IRQ_rcvd_uv(void)
{
        unsigned long irq_flags;
        short partid;
        struct xpc_partition *part;
        u8 act_state_req;

        DBUG_ON(xpc_activate_IRQ_rcvd == 0);

        spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
        for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
                part = &xpc_partitions[partid];

                if (part->sn.uv.act_state_req == 0)
                        continue;

                xpc_activate_IRQ_rcvd--;
                BUG_ON(xpc_activate_IRQ_rcvd < 0);

                act_state_req = part->sn.uv.act_state_req;
                part->sn.uv.act_state_req = 0;
                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

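                /*
                 * Note: the lock is dropped across the activate/deactivate
                 * calls below, which may block; they must not be made while
                 * holding xpc_activate_IRQ_rcvd_lock (the lock is retaken
                 * at the bottom of the loop).
                 */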
                if (act_state_req == XPC_P_ASR_ACTIVATE_UV) {
                        if (part->act_state == XPC_P_AS_INACTIVE)
                                xpc_activate_partition(part);
                        else if (part->act_state == XPC_P_AS_DEACTIVATING)
                                XPC_DEACTIVATE_PARTITION(part, xpReactivating);

                } else if (act_state_req == XPC_P_ASR_REACTIVATE_UV) {
                        if (part->act_state == XPC_P_AS_INACTIVE)
                                xpc_activate_partition(part);
                        else
                                XPC_DEACTIVATE_PARTITION(part, xpReactivating);

                } else if (act_state_req == XPC_P_ASR_DEACTIVATE_UV) {
                        XPC_DEACTIVATE_PARTITION(part, part->sn.uv.reason);

                } else {
                        BUG();
                }

                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
                if (xpc_activate_IRQ_rcvd == 0)
                        break;
        }
        spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);
}

static void
xpc_handle_activate_mq_msg_uv(struct xpc_partition *part,
                              struct xpc_activate_mq_msghdr_uv *msg_hdr,
                              int *wakeup_hb_checker)
{
        unsigned long irq_flags;
        struct xpc_partition_uv *part_uv = &part->sn.uv;
        struct xpc_openclose_args *args;

        part_uv->remote_act_state = msg_hdr->act_state;

        switch (msg_hdr->type) {
        case XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV:
                /* syncing of remote_act_state was just done above */
                break;

        case XPC_ACTIVATE_MQ_MSG_INC_HEARTBEAT_UV: {
                struct xpc_activate_mq_msg_heartbeat_req_uv *msg;

                msg = container_of(msg_hdr,
                                   struct xpc_activate_mq_msg_heartbeat_req_uv,
                                   hdr);
                part_uv->heartbeat = msg->heartbeat;
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_OFFLINE_HEARTBEAT_UV: {
                struct xpc_activate_mq_msg_heartbeat_req_uv *msg;

                msg = container_of(msg_hdr,
                                   struct xpc_activate_mq_msg_heartbeat_req_uv,
                                   hdr);
                part_uv->heartbeat = msg->heartbeat;

                spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
                part_uv->flags |= XPC_P_HEARTBEAT_OFFLINE_UV;
                spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_ONLINE_HEARTBEAT_UV: {
                struct xpc_activate_mq_msg_heartbeat_req_uv *msg;

                msg = container_of(msg_hdr,
                                   struct xpc_activate_mq_msg_heartbeat_req_uv,
                                   hdr);
                part_uv->heartbeat = msg->heartbeat;

                spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
                part_uv->flags &= ~XPC_P_HEARTBEAT_OFFLINE_UV;
                spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV: {
                struct xpc_activate_mq_msg_activate_req_uv *msg;

                /*
                 * ??? Do we deal here with ts_jiffies being different
                 * ??? if act_state != XPC_P_AS_INACTIVE instead of
                 * ??? below?
                 */
                msg = container_of(msg_hdr, struct
                                   xpc_activate_mq_msg_activate_req_uv, hdr);

                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
                if (part_uv->act_state_req == 0)
                        xpc_activate_IRQ_rcvd++;
                part_uv->act_state_req = XPC_P_ASR_ACTIVATE_UV;
                part->remote_rp_pa = msg->rp_gpa; /* !!! _pa is _gpa */
                part->remote_rp_ts_jiffies = msg_hdr->rp_ts_jiffies;
                part_uv->remote_activate_mq_gpa = msg->activate_mq_gpa;
                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

                (*wakeup_hb_checker)++;
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV: {
                struct xpc_activate_mq_msg_deactivate_req_uv *msg;

                msg = container_of(msg_hdr, struct
                                   xpc_activate_mq_msg_deactivate_req_uv, hdr);

                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
                if (part_uv->act_state_req == 0)
                        xpc_activate_IRQ_rcvd++;
                part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
                part_uv->reason = msg->reason;
                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

                (*wakeup_hb_checker)++;
                return;
        }
        case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV: {
                struct xpc_activate_mq_msg_chctl_closerequest_uv *msg;

                msg = container_of(msg_hdr, struct
                                   xpc_activate_mq_msg_chctl_closerequest_uv,
                                   hdr);
                args = &part->remote_openclose_args[msg->ch_number];
                args->reason = msg->reason;

                spin_lock_irqsave(&part->chctl_lock, irq_flags);
                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREQUEST;
                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

                xpc_wakeup_channel_mgr(part);
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV: {
                struct xpc_activate_mq_msg_chctl_closereply_uv *msg;

                msg = container_of(msg_hdr, struct
                                   xpc_activate_mq_msg_chctl_closereply_uv,
                                   hdr);

                spin_lock_irqsave(&part->chctl_lock, irq_flags);
                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_CLOSEREPLY;
                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

                xpc_wakeup_channel_mgr(part);
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV: {
                struct xpc_activate_mq_msg_chctl_openrequest_uv *msg;

                msg = container_of(msg_hdr, struct
                                   xpc_activate_mq_msg_chctl_openrequest_uv,
                                   hdr);
                args = &part->remote_openclose_args[msg->ch_number];
                args->entry_size = msg->entry_size;
                args->local_nentries = msg->local_nentries;

                spin_lock_irqsave(&part->chctl_lock, irq_flags);
                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREQUEST;
                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

                xpc_wakeup_channel_mgr(part);
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV: {
                struct xpc_activate_mq_msg_chctl_openreply_uv *msg;

                msg = container_of(msg_hdr, struct
                                   xpc_activate_mq_msg_chctl_openreply_uv, hdr);
                args = &part->remote_openclose_args[msg->ch_number];
                args->remote_nentries = msg->remote_nentries;
                args->local_nentries = msg->local_nentries;
                args->local_msgqueue_pa = msg->local_notify_mq_gpa;

                spin_lock_irqsave(&part->chctl_lock, irq_flags);
                part->chctl.flags[msg->ch_number] |= XPC_CHCTL_OPENREPLY;
                spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

                xpc_wakeup_channel_mgr(part);
                break;
        }
        case XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV:
                spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
                part_uv->flags |= XPC_P_ENGAGED_UV;
                spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
                break;

        case XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV:
                spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
                part_uv->flags &= ~XPC_P_ENGAGED_UV;
                spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
                break;

        default:
                dev_err(xpc_part, "received unknown activate_mq msg type=%d "
                        "from partition=%d\n", msg_hdr->type, XPC_PARTID(part));

                /* get hb checker to deactivate from the remote partition */
                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
                if (part_uv->act_state_req == 0)
                        xpc_activate_IRQ_rcvd++;
                part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
                part_uv->reason = xpBadMsgType;
                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

                (*wakeup_hb_checker)++;
                return;
        }

        if (msg_hdr->rp_ts_jiffies != part->remote_rp_ts_jiffies &&
            part->remote_rp_ts_jiffies != 0) {
                /*
                 * ??? Does what we do here need to be sensitive to
                 * ??? act_state or remote_act_state?
                 */
                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
                if (part_uv->act_state_req == 0)
                        xpc_activate_IRQ_rcvd++;
                part_uv->act_state_req = XPC_P_ASR_REACTIVATE_UV;
                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

                (*wakeup_hb_checker)++;
        }
}

static irqreturn_t
xpc_handle_activate_IRQ_uv(int irq, void *dev_id)
{
        struct xpc_activate_mq_msghdr_uv *msg_hdr;
        short partid;
        struct xpc_partition *part;
        int wakeup_hb_checker = 0;

        while ((msg_hdr = gru_get_next_message(xpc_activate_mq_uv)) != NULL) {

                partid = msg_hdr->partid;
                if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
                        dev_err(xpc_part, "xpc_handle_activate_IRQ_uv() "
                                "received invalid partid=0x%x in message\n",
                                partid);
                } else {
                        part = &xpc_partitions[partid];
                        if (xpc_part_ref(part)) {
                                xpc_handle_activate_mq_msg_uv(part, msg_hdr,
                                                            &wakeup_hb_checker);
                                xpc_part_deref(part);
                        }
                }

                gru_free_message(xpc_activate_mq_uv, msg_hdr);
        }

        if (wakeup_hb_checker)
                wake_up_interruptible(&xpc_activate_IRQ_wq);

        return IRQ_HANDLED;
}

static enum xp_retval
xpc_send_activate_IRQ_uv(struct xpc_partition *part, void *msg, size_t msg_size,
                         int msg_type)
{
        struct xpc_activate_mq_msghdr_uv *msg_hdr = msg;

        DBUG_ON(msg_size > XPC_ACTIVATE_MSG_SIZE_UV);

        msg_hdr->type = msg_type;
        msg_hdr->partid = XPC_PARTID(part);
        msg_hdr->act_state = part->act_state;
        msg_hdr->rp_ts_jiffies = xpc_rsvd_page->ts_jiffies;

        /* ??? Is holding a spin_lock (ch->lock) during this call a bad idea? */
        return xpc_send_gru_msg(part->sn.uv.remote_activate_mq_gpa, msg,
                                msg_size);
}

static void
xpc_send_activate_IRQ_part_uv(struct xpc_partition *part, void *msg,
                              size_t msg_size, int msg_type)
{
        enum xp_retval ret;

        ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
        if (unlikely(ret != xpSuccess))
                XPC_DEACTIVATE_PARTITION(part, ret);
}

static void
xpc_send_activate_IRQ_ch_uv(struct xpc_channel *ch, unsigned long *irq_flags,
                         void *msg, size_t msg_size, int msg_type)
{
        struct xpc_partition *part = &xpc_partitions[ch->partid];
        enum xp_retval ret;

        ret = xpc_send_activate_IRQ_uv(part, msg, msg_size, msg_type);
        if (unlikely(ret != xpSuccess)) {
                if (irq_flags != NULL)
                        spin_unlock_irqrestore(&ch->lock, *irq_flags);

                XPC_DEACTIVATE_PARTITION(part, ret);

                if (irq_flags != NULL)
                        spin_lock_irqsave(&ch->lock, *irq_flags);
        }
}
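
/*
 * Note (inferred): ch->lock is dropped around the XPC_DEACTIVATE_PARTITION()
 * call above because deactivation can re-enter channel code that needs to
 * take ch->lock itself.
 */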

static void
xpc_send_local_activate_IRQ_uv(struct xpc_partition *part, int act_state_req)
{
        unsigned long irq_flags;
        struct xpc_partition_uv *part_uv = &part->sn.uv;

        /*
         * !!! Make our side think that the remote partition sent an activate
         * !!! message our way by doing what the activate IRQ handler would
         * !!! do had one really been sent.
         */

        spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
        if (part_uv->act_state_req == 0)
                xpc_activate_IRQ_rcvd++;
        part_uv->act_state_req = act_state_req;
        spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

        wake_up_interruptible(&xpc_activate_IRQ_wq);
}

static enum xp_retval
xpc_get_partition_rsvd_page_pa_uv(void *buf, u64 *cookie, unsigned long *rp_pa,
                                  size_t *len)
{
        /* !!! call the UV version of sn_partition_reserved_page_pa() */
        return xpUnsupported;
}

static int
xpc_setup_rsvd_page_sn_uv(struct xpc_rsvd_page *rp)
{
        rp->sn.activate_mq_gpa = uv_gpa(xpc_activate_mq_uv);
        return 0;
}

static void
xpc_send_heartbeat_uv(int msg_type)
{
        short partid;
        struct xpc_partition *part;
        struct xpc_activate_mq_msg_heartbeat_req_uv msg;

        /*
         * !!! On uv we're broadcasting a heartbeat message every 5 seconds,
         * !!! whereas on sn2 we bte_copy the heartbeat info every 20 seconds.
         * !!! This is an increase in numalink traffic.
         * ??? Is this good?
         */

        msg.heartbeat = atomic64_inc_return(&xpc_heartbeat_uv);

        partid = find_first_bit(xpc_heartbeating_to_mask_uv,
                                XP_MAX_NPARTITIONS_UV);

        while (partid < XP_MAX_NPARTITIONS_UV) {
                part = &xpc_partitions[partid];

                xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
                                              msg_type);

                partid = find_next_bit(xpc_heartbeating_to_mask_uv,
                                       XP_MAX_NPARTITIONS_UV, partid + 1);
        }
}

static void
xpc_increment_heartbeat_uv(void)
{
        xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_INC_HEARTBEAT_UV);
}

static void
xpc_offline_heartbeat_uv(void)
{
        xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_OFFLINE_HEARTBEAT_UV);
}

static void
xpc_online_heartbeat_uv(void)
{
        xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_ONLINE_HEARTBEAT_UV);
}

static void
xpc_heartbeat_init_uv(void)
{
        atomic64_set(&xpc_heartbeat_uv, 0);
        bitmap_zero(xpc_heartbeating_to_mask_uv, XP_MAX_NPARTITIONS_UV);
        xpc_heartbeating_to_mask = &xpc_heartbeating_to_mask_uv[0];
}

static void
xpc_heartbeat_exit_uv(void)
{
        xpc_send_heartbeat_uv(XPC_ACTIVATE_MQ_MSG_OFFLINE_HEARTBEAT_UV);
}

static enum xp_retval
xpc_get_remote_heartbeat_uv(struct xpc_partition *part)
{
        struct xpc_partition_uv *part_uv = &part->sn.uv;
        enum xp_retval ret = xpNoHeartbeat;

        if (part_uv->remote_act_state != XPC_P_AS_INACTIVE &&
            part_uv->remote_act_state != XPC_P_AS_DEACTIVATING) {

                if (part_uv->heartbeat != part->last_heartbeat ||
                    (part_uv->flags & XPC_P_HEARTBEAT_OFFLINE_UV)) {

                        part->last_heartbeat = part_uv->heartbeat;
                        ret = xpSuccess;
                }
        }
        return ret;
}

static void
xpc_request_partition_activation_uv(struct xpc_rsvd_page *remote_rp,
                                    unsigned long remote_rp_gpa, int nasid)
{
        short partid = remote_rp->SAL_partid;
        struct xpc_partition *part = &xpc_partitions[partid];
        struct xpc_activate_mq_msg_activate_req_uv msg;

        part->remote_rp_pa = remote_rp_gpa; /* !!! _pa here is really _gpa */
        part->remote_rp_ts_jiffies = remote_rp->ts_jiffies;
        part->sn.uv.remote_activate_mq_gpa = remote_rp->sn.activate_mq_gpa;

        /*
         * ??? Is it a good idea to make this conditional on what is
         * ??? potentially stale state information?
         */
        if (part->sn.uv.remote_act_state == XPC_P_AS_INACTIVE) {
                msg.rp_gpa = uv_gpa(xpc_rsvd_page);
                msg.activate_mq_gpa = xpc_rsvd_page->sn.activate_mq_gpa;
                xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
                                           XPC_ACTIVATE_MQ_MSG_ACTIVATE_REQ_UV);
        }

        if (part->act_state == XPC_P_AS_INACTIVE)
                xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
}

static void
xpc_request_partition_reactivation_uv(struct xpc_partition *part)
{
        xpc_send_local_activate_IRQ_uv(part, XPC_P_ASR_ACTIVATE_UV);
}

static void
xpc_request_partition_deactivation_uv(struct xpc_partition *part)
{
        struct xpc_activate_mq_msg_deactivate_req_uv msg;

        /*
         * ??? Is it a good idea to make this conditional on what is
         * ??? potentially stale state information?
         */
        if (part->sn.uv.remote_act_state != XPC_P_AS_DEACTIVATING &&
            part->sn.uv.remote_act_state != XPC_P_AS_INACTIVE) {

                msg.reason = part->reason;
                xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
                                         XPC_ACTIVATE_MQ_MSG_DEACTIVATE_REQ_UV);
        }
}

static void
xpc_cancel_partition_deactivation_request_uv(struct xpc_partition *part)
{
        /* nothing needs to be done */
}

static void
xpc_init_fifo_uv(struct xpc_fifo_head_uv *head)
{
        head->first = NULL;
        head->last = NULL;
        spin_lock_init(&head->lock);
        head->n_entries = 0;
}

static void *
xpc_get_fifo_entry_uv(struct xpc_fifo_head_uv *head)
{
        unsigned long irq_flags;
        struct xpc_fifo_entry_uv *first;

        spin_lock_irqsave(&head->lock, irq_flags);
        first = head->first;
        if (head->first != NULL) {
                head->first = first->next;
                if (head->first == NULL)
                        head->last = NULL;

                head->n_entries--;
                BUG_ON(head->n_entries < 0);

                first->next = NULL;
        }
        spin_unlock_irqrestore(&head->lock, irq_flags);
        return first;
}

static void
xpc_put_fifo_entry_uv(struct xpc_fifo_head_uv *head,
                      struct xpc_fifo_entry_uv *last)
{
        unsigned long irq_flags;

        last->next = NULL;
        spin_lock_irqsave(&head->lock, irq_flags);
        if (head->last != NULL)
                head->last->next = last;
        else
                head->first = last;
        head->last = last;
        head->n_entries++;
        spin_unlock_irqrestore(&head->lock, irq_flags);
}

static int
xpc_n_of_fifo_entries_uv(struct xpc_fifo_head_uv *head)
{
        return head->n_entries;
}
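
/*
 * Usage sketch for the fifo helpers above (illustrative only): entries are
 * embedded in their containing structure as a struct xpc_fifo_entry_uv
 * field and are recovered with container_of(), e.g.:
 *
 *      xpc_put_fifo_entry_uv(&head, &msg_slot->next);
 *      entry = xpc_get_fifo_entry_uv(&head);
 *      if (entry != NULL)
 *              msg_slot = container_of(entry,
 *                                      struct xpc_send_msg_slot_uv, next);
 *
 * This is exactly how msg_slot_free_list is used by the msg_slot code below.
 */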

/*
 * Set up the channel structures that are uv-specific.
 */
static enum xp_retval
xpc_setup_ch_structures_sn_uv(struct xpc_partition *part)
{
        struct xpc_channel_uv *ch_uv;
        int ch_number;

        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch_uv = &part->channels[ch_number].sn.uv;

                xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
                xpc_init_fifo_uv(&ch_uv->recv_msg_list);
        }

        return xpSuccess;
}

/*
 * Tear down the channel structures that are uv-specific.
 */
static void
xpc_teardown_ch_structures_sn_uv(struct xpc_partition *part)
{
        /* nothing needs to be done */
}

static enum xp_retval
xpc_make_first_contact_uv(struct xpc_partition *part)
{
        struct xpc_activate_mq_msg_uv msg;

        /*
         * We send a sync msg to get the remote partition's remote_act_state
         * updated to our current act_state, which at this point should
         * be XPC_P_AS_ACTIVATING.
         */
        xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
                                      XPC_ACTIVATE_MQ_MSG_SYNC_ACT_STATE_UV);

        while (part->sn.uv.remote_act_state != XPC_P_AS_ACTIVATING) {

                dev_dbg(xpc_part, "waiting to make first contact with "
                        "partition %d\n", XPC_PARTID(part));

                /* wait 1/4 of a second or so */
                (void)msleep_interruptible(250);

                if (part->act_state == XPC_P_AS_DEACTIVATING)
                        return part->reason;
        }

        return xpSuccess;
}

static u64
xpc_get_chctl_all_flags_uv(struct xpc_partition *part)
{
        unsigned long irq_flags;
        union xpc_channel_ctl_flags chctl;

        spin_lock_irqsave(&part->chctl_lock, irq_flags);
        chctl = part->chctl;
        if (chctl.all_flags != 0)
                part->chctl.all_flags = 0;

        spin_unlock_irqrestore(&part->chctl_lock, irq_flags);
        return chctl.all_flags;
}

static enum xp_retval
xpc_allocate_send_msg_slot_uv(struct xpc_channel *ch)
{
        struct xpc_channel_uv *ch_uv = &ch->sn.uv;
        struct xpc_send_msg_slot_uv *msg_slot;
        unsigned long irq_flags;
        int nentries;
        int entry;
        size_t nbytes;

        for (nentries = ch->local_nentries; nentries > 0; nentries--) {
                nbytes = nentries * sizeof(struct xpc_send_msg_slot_uv);
                ch_uv->send_msg_slots = kzalloc(nbytes, GFP_KERNEL);
                if (ch_uv->send_msg_slots == NULL)
                        continue;

                for (entry = 0; entry < nentries; entry++) {
                        msg_slot = &ch_uv->send_msg_slots[entry];

                        msg_slot->msg_slot_number = entry;
                        xpc_put_fifo_entry_uv(&ch_uv->msg_slot_free_list,
                                              &msg_slot->next);
                }

                spin_lock_irqsave(&ch->lock, irq_flags);
                if (nentries < ch->local_nentries)
                        ch->local_nentries = nentries;
                spin_unlock_irqrestore(&ch->lock, irq_flags);
                return xpSuccess;
        }

        return xpNoMemory;
}
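
/*
 * Note (inferred from the loop above): if the full local_nentries worth of
 * send msg_slots cannot be allocated, progressively smaller arrays are
 * tried, and ch->local_nentries is shrunk to match whatever size succeeded.
 * xpc_allocate_recv_msg_slot_uv() below applies the same fallback to
 * ch->remote_nentries.
 */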

static enum xp_retval
xpc_allocate_recv_msg_slot_uv(struct xpc_channel *ch)
{
        struct xpc_channel_uv *ch_uv = &ch->sn.uv;
        struct xpc_notify_mq_msg_uv *msg_slot;
        unsigned long irq_flags;
        int nentries;
        int entry;
        size_t nbytes;

        for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
                nbytes = nentries * ch->entry_size;
                ch_uv->recv_msg_slots = kzalloc(nbytes, GFP_KERNEL);
                if (ch_uv->recv_msg_slots == NULL)
                        continue;

                for (entry = 0; entry < nentries; entry++) {
                        msg_slot = ch_uv->recv_msg_slots + entry *
                            ch->entry_size;

                        msg_slot->hdr.msg_slot_number = entry;
                }

                spin_lock_irqsave(&ch->lock, irq_flags);
                if (nentries < ch->remote_nentries)
                        ch->remote_nentries = nentries;
                spin_unlock_irqrestore(&ch->lock, irq_flags);
                return xpSuccess;
        }

        return xpNoMemory;
}

/*
 * Allocate msg_slots associated with the channel.
 */
static enum xp_retval
xpc_setup_msg_structures_uv(struct xpc_channel *ch)
{
        enum xp_retval ret;
        struct xpc_channel_uv *ch_uv = &ch->sn.uv;

        DBUG_ON(ch->flags & XPC_C_SETUP);

        ret = xpc_allocate_send_msg_slot_uv(ch);
        if (ret == xpSuccess) {

                ret = xpc_allocate_recv_msg_slot_uv(ch);
                if (ret != xpSuccess) {
                        kfree(ch_uv->send_msg_slots);
                        xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
                }
        }
        return ret;
}

/*
 * Free up msg_slots and clear other stuff that was set up for the specified
 * channel.
 */
static void
xpc_teardown_msg_structures_uv(struct xpc_channel *ch)
{
        struct xpc_channel_uv *ch_uv = &ch->sn.uv;

        DBUG_ON(!spin_is_locked(&ch->lock));

        ch_uv->remote_notify_mq_gpa = 0;

        if (ch->flags & XPC_C_SETUP) {
                xpc_init_fifo_uv(&ch_uv->msg_slot_free_list);
                kfree(ch_uv->send_msg_slots);
                xpc_init_fifo_uv(&ch_uv->recv_msg_list);
                kfree(ch_uv->recv_msg_slots);
        }
}

static void
xpc_send_chctl_closerequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
        struct xpc_activate_mq_msg_chctl_closerequest_uv msg;

        msg.ch_number = ch->number;
        msg.reason = ch->reason;
        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
                                    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREQUEST_UV);
}

static void
xpc_send_chctl_closereply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
        struct xpc_activate_mq_msg_chctl_closereply_uv msg;

        msg.ch_number = ch->number;
        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
                                    XPC_ACTIVATE_MQ_MSG_CHCTL_CLOSEREPLY_UV);
}

static void
xpc_send_chctl_openrequest_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
        struct xpc_activate_mq_msg_chctl_openrequest_uv msg;

        msg.ch_number = ch->number;
        msg.entry_size = ch->entry_size;
        msg.local_nentries = ch->local_nentries;
        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
                                    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREQUEST_UV);
}

static void
xpc_send_chctl_openreply_uv(struct xpc_channel *ch, unsigned long *irq_flags)
{
        struct xpc_activate_mq_msg_chctl_openreply_uv msg;

        msg.ch_number = ch->number;
        msg.local_nentries = ch->local_nentries;
        msg.remote_nentries = ch->remote_nentries;
        msg.local_notify_mq_gpa = uv_gpa(xpc_notify_mq_uv);
        xpc_send_activate_IRQ_ch_uv(ch, irq_flags, &msg, sizeof(msg),
                                    XPC_ACTIVATE_MQ_MSG_CHCTL_OPENREPLY_UV);
}

static void
xpc_send_chctl_local_msgrequest_uv(struct xpc_partition *part, int ch_number)
{
        unsigned long irq_flags;

        spin_lock_irqsave(&part->chctl_lock, irq_flags);
        part->chctl.flags[ch_number] |= XPC_CHCTL_MSGREQUEST;
        spin_unlock_irqrestore(&part->chctl_lock, irq_flags);

        xpc_wakeup_channel_mgr(part);
}

static void
xpc_save_remote_msgqueue_pa_uv(struct xpc_channel *ch,
                               unsigned long msgqueue_pa)
{
        ch->sn.uv.remote_notify_mq_gpa = msgqueue_pa;
}

static void
xpc_indicate_partition_engaged_uv(struct xpc_partition *part)
{
        struct xpc_activate_mq_msg_uv msg;

        xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
                                      XPC_ACTIVATE_MQ_MSG_MARK_ENGAGED_UV);
}

static void
xpc_indicate_partition_disengaged_uv(struct xpc_partition *part)
{
        struct xpc_activate_mq_msg_uv msg;

        xpc_send_activate_IRQ_part_uv(part, &msg, sizeof(msg),
                                      XPC_ACTIVATE_MQ_MSG_MARK_DISENGAGED_UV);
}

static void
xpc_assume_partition_disengaged_uv(short partid)
{
        struct xpc_partition_uv *part_uv = &xpc_partitions[partid].sn.uv;
        unsigned long irq_flags;

        spin_lock_irqsave(&part_uv->flags_lock, irq_flags);
        part_uv->flags &= ~XPC_P_ENGAGED_UV;
        spin_unlock_irqrestore(&part_uv->flags_lock, irq_flags);
}

static int
xpc_partition_engaged_uv(short partid)
{
        return (xpc_partitions[partid].sn.uv.flags & XPC_P_ENGAGED_UV) != 0;
}

static int
xpc_any_partition_engaged_uv(void)
{
        struct xpc_partition_uv *part_uv;
        short partid;

        for (partid = 0; partid < XP_MAX_NPARTITIONS_UV; partid++) {
                part_uv = &xpc_partitions[partid].sn.uv;
                if ((part_uv->flags & XPC_P_ENGAGED_UV) != 0)
                        return 1;
        }
        return 0;
}

static enum xp_retval
xpc_allocate_msg_slot_uv(struct xpc_channel *ch, u32 flags,
                         struct xpc_send_msg_slot_uv **address_of_msg_slot)
{
        enum xp_retval ret;
        struct xpc_send_msg_slot_uv *msg_slot;
        struct xpc_fifo_entry_uv *entry;

        while (1) {
                entry = xpc_get_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list);
                if (entry != NULL)
                        break;

                if (flags & XPC_NOWAIT)
                        return xpNoWait;

                ret = xpc_allocate_msg_wait(ch);
                if (ret != xpInterrupted && ret != xpTimeout)
                        return ret;
        }

        msg_slot = container_of(entry, struct xpc_send_msg_slot_uv, next);
        *address_of_msg_slot = msg_slot;
        return xpSuccess;
}

static void
xpc_free_msg_slot_uv(struct xpc_channel *ch,
                     struct xpc_send_msg_slot_uv *msg_slot)
{
        xpc_put_fifo_entry_uv(&ch->sn.uv.msg_slot_free_list, &msg_slot->next);

        /* wake up anyone waiting for a free msg slot */
        if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
                wake_up(&ch->msg_allocate_wq);
}

static void
xpc_notify_sender_uv(struct xpc_channel *ch,
                     struct xpc_send_msg_slot_uv *msg_slot,
                     enum xp_retval reason)
{
        xpc_notify_func func = msg_slot->func;

        if (func != NULL && cmpxchg(&msg_slot->func, func, NULL) == func) {

                atomic_dec(&ch->n_to_notify);

                dev_dbg(xpc_chan, "msg_slot->func() called, msg_slot=0x%p "
                        "msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
                        msg_slot->msg_slot_number, ch->partid, ch->number);

                func(reason, ch->partid, ch->number, msg_slot->key);

                dev_dbg(xpc_chan, "msg_slot->func() returned, msg_slot=0x%p "
                        "msg_slot_number=%d partid=%d channel=%d\n", msg_slot,
                        msg_slot->msg_slot_number, ch->partid, ch->number);
        }
}

static void
xpc_handle_notify_mq_ack_uv(struct xpc_channel *ch,
                            struct xpc_notify_mq_msg_uv *msg)
{
        struct xpc_send_msg_slot_uv *msg_slot;
        int entry = msg->hdr.msg_slot_number % ch->local_nentries;

        msg_slot = &ch->sn.uv.send_msg_slots[entry];

        BUG_ON(msg_slot->msg_slot_number != msg->hdr.msg_slot_number);
        msg_slot->msg_slot_number += ch->local_nentries;

        if (msg_slot->func != NULL)
                xpc_notify_sender_uv(ch, msg_slot, xpMsgDelivered);

        xpc_free_msg_slot_uv(ch, msg_slot);
}
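
/*
 * Worked example of the msg_slot numbering (illustrative): with
 * ch->local_nentries == 4, the slot at array index 1 carries
 * msg_slot_number 1 on its first use, 5 on its second, 9 on its third,
 * and so on; the receiver advances its copy by ch->remote_nentries in
 * xpc_received_payload_uv().  "number % nentries" thus always recovers
 * the array index, while the full number lets the BUG_ON() above catch
 * a stale or mismatched ACK.
 */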

static void
xpc_handle_notify_mq_msg_uv(struct xpc_partition *part,
                            struct xpc_notify_mq_msg_uv *msg)
{
        struct xpc_partition_uv *part_uv = &part->sn.uv;
        struct xpc_channel *ch;
        struct xpc_channel_uv *ch_uv;
        struct xpc_notify_mq_msg_uv *msg_slot;
        unsigned long irq_flags;
        int ch_number = msg->hdr.ch_number;

        if (unlikely(ch_number >= part->nchannels)) {
                dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received invalid "
                        "channel number=0x%x in message from partid=%d\n",
                        ch_number, XPC_PARTID(part));

                /* get hb checker to deactivate from the remote partition */
                spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
                if (part_uv->act_state_req == 0)
                        xpc_activate_IRQ_rcvd++;
                part_uv->act_state_req = XPC_P_ASR_DEACTIVATE_UV;
                part_uv->reason = xpBadChannelNumber;
                spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

                wake_up_interruptible(&xpc_activate_IRQ_wq);
                return;
        }

        ch = &part->channels[ch_number];
        xpc_msgqueue_ref(ch);

        if (!(ch->flags & XPC_C_CONNECTED)) {
                xpc_msgqueue_deref(ch);
                return;
        }

        /* see if we're really dealing with an ACK for a previously sent msg */
        if (msg->hdr.size == 0) {
                xpc_handle_notify_mq_ack_uv(ch, msg);
                xpc_msgqueue_deref(ch);
                return;
        }

        /* we're dealing with a normal message sent via the notify_mq */
        ch_uv = &ch->sn.uv;

        msg_slot = (struct xpc_notify_mq_msg_uv *)((u64)ch_uv->recv_msg_slots +
                    (msg->hdr.msg_slot_number % ch->remote_nentries) *
                    ch->entry_size);

        BUG_ON(msg->hdr.msg_slot_number != msg_slot->hdr.msg_slot_number);
        BUG_ON(msg_slot->hdr.size != 0);

        memcpy(msg_slot, msg, msg->hdr.size);

        xpc_put_fifo_entry_uv(&ch_uv->recv_msg_list, &msg_slot->hdr.u.next);

        if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
                /*
                 * If there is an existing idle kthread get it to deliver
                 * the payload, otherwise we'll have to get the channel mgr
                 * for this partition to create a kthread to do the delivery.
                 */
                if (atomic_read(&ch->kthreads_idle) > 0)
                        wake_up_nr(&ch->idle_wq, 1);
                else
                        xpc_send_chctl_local_msgrequest_uv(part, ch->number);
        }
        xpc_msgqueue_deref(ch);
}

static irqreturn_t
xpc_handle_notify_IRQ_uv(int irq, void *dev_id)
{
        struct xpc_notify_mq_msg_uv *msg;
        short partid;
        struct xpc_partition *part;

        while ((msg = gru_get_next_message(xpc_notify_mq_uv)) != NULL) {

                partid = msg->hdr.partid;
                if (partid < 0 || partid >= XP_MAX_NPARTITIONS_UV) {
                        dev_err(xpc_part, "xpc_handle_notify_IRQ_uv() received "
                                "invalid partid=0x%x in message\n", partid);
                } else {
                        part = &xpc_partitions[partid];

                        if (xpc_part_ref(part)) {
                                xpc_handle_notify_mq_msg_uv(part, msg);
                                xpc_part_deref(part);
                        }
                }

                gru_free_message(xpc_notify_mq_uv, msg);
        }

        return IRQ_HANDLED;
}

static int
xpc_n_of_deliverable_payloads_uv(struct xpc_channel *ch)
{
        return xpc_n_of_fifo_entries_uv(&ch->sn.uv.recv_msg_list);
}

static void
xpc_process_msg_chctl_flags_uv(struct xpc_partition *part, int ch_number)
{
        struct xpc_channel *ch = &part->channels[ch_number];
        int ndeliverable_payloads;

        xpc_msgqueue_ref(ch);

        ndeliverable_payloads = xpc_n_of_deliverable_payloads_uv(ch);

        if (ndeliverable_payloads > 0 &&
            (ch->flags & XPC_C_CONNECTED) &&
            (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)) {

                xpc_activate_kthreads(ch, ndeliverable_payloads);
        }

        xpc_msgqueue_deref(ch);
}

static enum xp_retval
xpc_send_payload_uv(struct xpc_channel *ch, u32 flags, void *payload,
                    u16 payload_size, u8 notify_type, xpc_notify_func func,
                    void *key)
{
        enum xp_retval ret = xpSuccess;
        struct xpc_send_msg_slot_uv *msg_slot = NULL;
        struct xpc_notify_mq_msg_uv *msg;
        u8 msg_buffer[XPC_NOTIFY_MSG_SIZE_UV];
        size_t msg_size;

        DBUG_ON(notify_type != XPC_N_CALL);

        msg_size = sizeof(struct xpc_notify_mq_msghdr_uv) + payload_size;
        if (msg_size > ch->entry_size)
                return xpPayloadTooBig;

        xpc_msgqueue_ref(ch);

        if (ch->flags & XPC_C_DISCONNECTING) {
                ret = ch->reason;
                goto out_1;
        }
        if (!(ch->flags & XPC_C_CONNECTED)) {
                ret = xpNotConnected;
                goto out_1;
        }

        ret = xpc_allocate_msg_slot_uv(ch, flags, &msg_slot);
        if (ret != xpSuccess)
                goto out_1;

        if (func != NULL) {
                atomic_inc(&ch->n_to_notify);

                msg_slot->key = key;
                wmb(); /* a non-NULL func must hit memory after the key */
                msg_slot->func = func;

                if (ch->flags & XPC_C_DISCONNECTING) {
                        ret = ch->reason;
                        goto out_2;
                }
        }

        msg = (struct xpc_notify_mq_msg_uv *)&msg_buffer;
        msg->hdr.partid = xp_partition_id;
        msg->hdr.ch_number = ch->number;
        msg->hdr.size = msg_size;
        msg->hdr.msg_slot_number = msg_slot->msg_slot_number;
        memcpy(&msg->payload, payload, payload_size);

        ret = xpc_send_gru_msg(ch->sn.uv.remote_notify_mq_gpa, msg, msg_size);
        if (ret == xpSuccess)
                goto out_1;

        XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);
out_2:
        if (func != NULL) {
                /*
                 * Try to NULL the msg_slot's func field. If we fail, then
                 * xpc_notify_senders_of_disconnect_uv() beat us to it, in
                 * which case we need to pretend we succeeded in sending the
                 * message: the user will already get a callout for the
                 * disconnect error from xpc_notify_senders_of_disconnect_uv(),
                 * and also getting an error returned here would only confuse
                 * them. Additionally, since in this case the channel is being
                 * disconnected we don't need to put the msg_slot back on the
                 * free list.
                 */
                if (cmpxchg(&msg_slot->func, func, NULL) != func) {
                        ret = xpSuccess;
                        goto out_1;
                }

                msg_slot->key = NULL;
                atomic_dec(&ch->n_to_notify);
        }
        xpc_free_msg_slot_uv(ch, msg_slot);
out_1:
        xpc_msgqueue_deref(ch);
        return ret;
}

/*
 * Tell the callers of xpc_send_notify() that the status of their payloads
 * is unknown because the channel is now disconnecting.
 *
 * We don't worry about putting these msg_slots on the free list since the
 * msg_slots themselves are about to be kfree'd.
 */
static void
xpc_notify_senders_of_disconnect_uv(struct xpc_channel *ch)
{
        struct xpc_send_msg_slot_uv *msg_slot;
        int entry;

        DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));

        for (entry = 0; entry < ch->local_nentries; entry++) {

                if (atomic_read(&ch->n_to_notify) == 0)
                        break;

                msg_slot = &ch->sn.uv.send_msg_slots[entry];
                if (msg_slot->func != NULL)
                        xpc_notify_sender_uv(ch, msg_slot, ch->reason);
        }
}

/*
 * Get the next deliverable message's payload.
 */
static void *
xpc_get_deliverable_payload_uv(struct xpc_channel *ch)
{
        struct xpc_fifo_entry_uv *entry;
        struct xpc_notify_mq_msg_uv *msg;
        void *payload = NULL;

        if (!(ch->flags & XPC_C_DISCONNECTING)) {
                entry = xpc_get_fifo_entry_uv(&ch->sn.uv.recv_msg_list);
                if (entry != NULL) {
                        msg = container_of(entry, struct xpc_notify_mq_msg_uv,
                                           hdr.u.next);
                        payload = &msg->payload;
                }
        }
        return payload;
}

static void
xpc_received_payload_uv(struct xpc_channel *ch, void *payload)
{
        struct xpc_notify_mq_msg_uv *msg;
        enum xp_retval ret;

        msg = container_of(payload, struct xpc_notify_mq_msg_uv, payload);

        /* return an ACK to the sender of this message */

        msg->hdr.partid = xp_partition_id;
        msg->hdr.size = 0;      /* size of zero indicates this is an ACK */

        ret = xpc_send_gru_msg(ch->sn.uv.remote_notify_mq_gpa, msg,
                               sizeof(struct xpc_notify_mq_msghdr_uv));
        if (ret != xpSuccess)
                XPC_DEACTIVATE_PARTITION(&xpc_partitions[ch->partid], ret);

        msg->hdr.msg_slot_number += ch->remote_nentries;
}
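
/*
 * Note on the ACK protocol (as implemented above): an ACK is just the
 * message header sent back with hdr.size == 0; the sending side treats any
 * zero-sized message arriving on its notify mq as an ACK (see the size
 * check in xpc_handle_notify_mq_msg_uv()).
 */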

int
xpc_init_uv(void)
{
        xpc_setup_partitions_sn = xpc_setup_partitions_sn_uv;
        xpc_process_activate_IRQ_rcvd = xpc_process_activate_IRQ_rcvd_uv;
        xpc_get_partition_rsvd_page_pa = xpc_get_partition_rsvd_page_pa_uv;
        xpc_setup_rsvd_page_sn = xpc_setup_rsvd_page_sn_uv;
        xpc_increment_heartbeat = xpc_increment_heartbeat_uv;
        xpc_offline_heartbeat = xpc_offline_heartbeat_uv;
        xpc_online_heartbeat = xpc_online_heartbeat_uv;
        xpc_heartbeat_init = xpc_heartbeat_init_uv;
        xpc_heartbeat_exit = xpc_heartbeat_exit_uv;
        xpc_get_remote_heartbeat = xpc_get_remote_heartbeat_uv;

        xpc_request_partition_activation = xpc_request_partition_activation_uv;
        xpc_request_partition_reactivation =
            xpc_request_partition_reactivation_uv;
        xpc_request_partition_deactivation =
            xpc_request_partition_deactivation_uv;
        xpc_cancel_partition_deactivation_request =
            xpc_cancel_partition_deactivation_request_uv;

        xpc_setup_ch_structures_sn = xpc_setup_ch_structures_sn_uv;
        xpc_teardown_ch_structures_sn = xpc_teardown_ch_structures_sn_uv;

        xpc_make_first_contact = xpc_make_first_contact_uv;

        xpc_get_chctl_all_flags = xpc_get_chctl_all_flags_uv;
        xpc_send_chctl_closerequest = xpc_send_chctl_closerequest_uv;
        xpc_send_chctl_closereply = xpc_send_chctl_closereply_uv;
        xpc_send_chctl_openrequest = xpc_send_chctl_openrequest_uv;
        xpc_send_chctl_openreply = xpc_send_chctl_openreply_uv;

        xpc_save_remote_msgqueue_pa = xpc_save_remote_msgqueue_pa_uv;

        xpc_setup_msg_structures = xpc_setup_msg_structures_uv;
        xpc_teardown_msg_structures = xpc_teardown_msg_structures_uv;

        xpc_indicate_partition_engaged = xpc_indicate_partition_engaged_uv;
        xpc_indicate_partition_disengaged =
            xpc_indicate_partition_disengaged_uv;
        xpc_assume_partition_disengaged = xpc_assume_partition_disengaged_uv;
        xpc_partition_engaged = xpc_partition_engaged_uv;
        xpc_any_partition_engaged = xpc_any_partition_engaged_uv;

        xpc_n_of_deliverable_payloads = xpc_n_of_deliverable_payloads_uv;
        xpc_process_msg_chctl_flags = xpc_process_msg_chctl_flags_uv;
        xpc_send_payload = xpc_send_payload_uv;
        xpc_notify_senders_of_disconnect = xpc_notify_senders_of_disconnect_uv;
        xpc_get_deliverable_payload = xpc_get_deliverable_payload_uv;
        xpc_received_payload = xpc_received_payload_uv;

        if (sizeof(struct xpc_notify_mq_msghdr_uv) > XPC_MSG_HDR_MAX_SIZE) {
                dev_err(xpc_part, "xpc_notify_mq_msghdr_uv is larger than %d\n",
                        XPC_MSG_HDR_MAX_SIZE);
                return -E2BIG;
        }

        /* ??? The cpuid argument's value is 0, is that what we want? */
        /* !!! The irq argument's value isn't correct. */
        xpc_activate_mq_uv = xpc_create_gru_mq_uv(XPC_ACTIVATE_MQ_SIZE_UV, 0, 0,
                                                  xpc_handle_activate_IRQ_uv);
        if (xpc_activate_mq_uv == NULL)
                return -ENOMEM;

        /* ??? The cpuid argument's value is 0, is that what we want? */
        /* !!! The irq argument's value isn't correct. */
        xpc_notify_mq_uv = xpc_create_gru_mq_uv(XPC_NOTIFY_MQ_SIZE_UV, 0, 0,
                                                xpc_handle_notify_IRQ_uv);
        if (xpc_notify_mq_uv == NULL) {
                /* !!! The irq argument's value isn't correct. */
                xpc_destroy_gru_mq_uv(xpc_activate_mq_uv,
                                      XPC_ACTIVATE_MQ_SIZE_UV, 0);
                return -ENOMEM;
        }

        return 0;
}
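
/*
 * xpc_init_uv() above wires the generic XPC operations (declared in xpc.h)
 * to their uv implementations; the sn2 backend presumably installs its own
 * set the same way, which is how the common XPC code stays
 * architecture-agnostic.
 */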

void
xpc_exit_uv(void)
{
        /* !!! The irq argument's value isn't correct. */
        xpc_destroy_gru_mq_uv(xpc_notify_mq_uv, XPC_NOTIFY_MQ_SIZE_UV, 0);

        /* !!! The irq argument's value isn't correct. */
        xpc_destroy_gru_mq_uv(xpc_activate_mq_uv, XPC_ACTIVATE_MQ_SIZE_UV, 0);
}