[GFS2] Fix ref count bug that used to bite us on umount
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58
59 #include "dlm_internal.h"
60 #include "memory.h"
61 #include "lowcomms.h"
62 #include "requestqueue.h"
63 #include "util.h"
64 #include "dir.h"
65 #include "member.h"
66 #include "lockspace.h"
67 #include "ast.h"
68 #include "lock.h"
69 #include "rcom.h"
70 #include "recover.h"
71 #include "lvb_table.h"
72 #include "config.h"
73
74 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
75 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
76 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
80 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_remove(struct dlm_rsb *r);
82 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84                                     struct dlm_message *ms);
85 static int receive_extralen(struct dlm_message *ms);
86
87 /*
88  * Lock compatibilty matrix - thanks Steve
89  * UN = Unlocked state. Not really a state, used as a flag
90  * PD = Padding. Used to make the matrix a nice power of two in size
91  * Other states are the same as the VMS DLM.
92  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
93  */
94
95 static const int __dlm_compat_matrix[8][8] = {
96       /* UN NL CR CW PR PW EX PD */
97         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
98         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
99         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
100         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
101         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
102         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
103         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
104         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
105 };
106
107 /*
108  * This defines the direction of transfer of LVB data.
109  * Granted mode is the row; requested mode is the column.
110  * Usage: matrix[grmode+1][rqmode+1]
111  * 1 = LVB is returned to the caller
112  * 0 = LVB is written to the resource
113  * -1 = nothing happens to the LVB
114  */
115
116 const int dlm_lvb_operations[8][8] = {
117         /* UN   NL  CR  CW  PR  PW  EX  PD*/
118         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
119         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
120         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
121         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
122         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
123         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
124         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
125         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
126 };
127 EXPORT_SYMBOL_GPL(dlm_lvb_operations);
128
129 #define modes_compat(gr, rq) \
130         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
131
132 int dlm_modes_compat(int mode1, int mode2)
133 {
134         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
135 }
136
137 /*
138  * Compatibility matrix for conversions with QUECVT set.
139  * Granted mode is the row; requested mode is the column.
140  * Usage: matrix[grmode+1][rqmode+1]
141  */
142
143 static const int __quecvt_compat_matrix[8][8] = {
144       /* UN NL CR CW PR PW EX PD */
145         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
146         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
147         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
148         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
149         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
150         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
152         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
153 };
154
155 static void dlm_print_lkb(struct dlm_lkb *lkb)
156 {
157         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
159                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
160                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
161                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
162 }
163
164 void dlm_print_rsb(struct dlm_rsb *r)
165 {
166         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
167                r->res_nodeid, r->res_flags, r->res_first_lkid,
168                r->res_recover_locks_count, r->res_name);
169 }
170
171 /* Threads cannot use the lockspace while it's being recovered */
172
173 static inline void lock_recovery(struct dlm_ls *ls)
174 {
175         down_read(&ls->ls_in_recovery);
176 }
177
178 static inline void unlock_recovery(struct dlm_ls *ls)
179 {
180         up_read(&ls->ls_in_recovery);
181 }
182
183 static inline int lock_recovery_try(struct dlm_ls *ls)
184 {
185         return down_read_trylock(&ls->ls_in_recovery);
186 }
187
188 static inline int can_be_queued(struct dlm_lkb *lkb)
189 {
190         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
191 }
192
193 static inline int force_blocking_asts(struct dlm_lkb *lkb)
194 {
195         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
196 }
197
198 static inline int is_demoted(struct dlm_lkb *lkb)
199 {
200         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
201 }
202
203 static inline int is_remote(struct dlm_rsb *r)
204 {
205         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
206         return !!r->res_nodeid;
207 }
208
209 static inline int is_process_copy(struct dlm_lkb *lkb)
210 {
211         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
212 }
213
214 static inline int is_master_copy(struct dlm_lkb *lkb)
215 {
216         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
217                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
218         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
219 }
220
221 static inline int middle_conversion(struct dlm_lkb *lkb)
222 {
223         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
224             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
225                 return 1;
226         return 0;
227 }
228
229 static inline int down_conversion(struct dlm_lkb *lkb)
230 {
231         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
232 }
233
234 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
235 {
236         if (is_master_copy(lkb))
237                 return;
238
239         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
240
241         lkb->lkb_lksb->sb_status = rv;
242         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
243
244         dlm_add_ast(lkb, AST_COMP);
245 }
246
247 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
248 {
249         if (is_master_copy(lkb))
250                 send_bast(r, lkb, rqmode);
251         else {
252                 lkb->lkb_bastmode = rqmode;
253                 dlm_add_ast(lkb, AST_BAST);
254         }
255 }
256
257 /*
258  * Basic operations on rsb's and lkb's
259  */
260
261 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
262 {
263         struct dlm_rsb *r;
264
265         r = allocate_rsb(ls, len);
266         if (!r)
267                 return NULL;
268
269         r->res_ls = ls;
270         r->res_length = len;
271         memcpy(r->res_name, name, len);
272         mutex_init(&r->res_mutex);
273
274         INIT_LIST_HEAD(&r->res_lookup);
275         INIT_LIST_HEAD(&r->res_grantqueue);
276         INIT_LIST_HEAD(&r->res_convertqueue);
277         INIT_LIST_HEAD(&r->res_waitqueue);
278         INIT_LIST_HEAD(&r->res_root_list);
279         INIT_LIST_HEAD(&r->res_recover_list);
280
281         return r;
282 }
283
284 static int search_rsb_list(struct list_head *head, char *name, int len,
285                            unsigned int flags, struct dlm_rsb **r_ret)
286 {
287         struct dlm_rsb *r;
288         int error = 0;
289
290         list_for_each_entry(r, head, res_hashchain) {
291                 if (len == r->res_length && !memcmp(name, r->res_name, len))
292                         goto found;
293         }
294         return -ENOENT;
295
296  found:
297         if (r->res_nodeid && (flags & R_MASTER))
298                 error = -ENOTBLK;
299         *r_ret = r;
300         return error;
301 }
302
303 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
304                        unsigned int flags, struct dlm_rsb **r_ret)
305 {
306         struct dlm_rsb *r;
307         int error;
308
309         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
310         if (!error) {
311                 kref_get(&r->res_ref);
312                 goto out;
313         }
314         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
315         if (error)
316                 goto out;
317
318         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
319
320         if (dlm_no_directory(ls))
321                 goto out;
322
323         if (r->res_nodeid == -1) {
324                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
325                 r->res_first_lkid = 0;
326         } else if (r->res_nodeid > 0) {
327                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
328                 r->res_first_lkid = 0;
329         } else {
330                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
331                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
332         }
333  out:
334         *r_ret = r;
335         return error;
336 }
337
338 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
339                       unsigned int flags, struct dlm_rsb **r_ret)
340 {
341         int error;
342         write_lock(&ls->ls_rsbtbl[b].lock);
343         error = _search_rsb(ls, name, len, b, flags, r_ret);
344         write_unlock(&ls->ls_rsbtbl[b].lock);
345         return error;
346 }
347
348 /*
349  * Find rsb in rsbtbl and potentially create/add one
350  *
351  * Delaying the release of rsb's has a similar benefit to applications keeping
352  * NL locks on an rsb, but without the guarantee that the cached master value
353  * will still be valid when the rsb is reused.  Apps aren't always smart enough
354  * to keep NL locks on an rsb that they may lock again shortly; this can lead
355  * to excessive master lookups and removals if we don't delay the release.
356  *
357  * Searching for an rsb means looking through both the normal list and toss
358  * list.  When found on the toss list the rsb is moved to the normal list with
359  * ref count of 1; when found on normal list the ref count is incremented.
360  */
361
362 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
363                     unsigned int flags, struct dlm_rsb **r_ret)
364 {
365         struct dlm_rsb *r, *tmp;
366         uint32_t hash, bucket;
367         int error = 0;
368
369         if (dlm_no_directory(ls))
370                 flags |= R_CREATE;
371
372         hash = jhash(name, namelen, 0);
373         bucket = hash & (ls->ls_rsbtbl_size - 1);
374
375         error = search_rsb(ls, name, namelen, bucket, flags, &r);
376         if (!error)
377                 goto out;
378
379         if (error == -ENOENT && !(flags & R_CREATE))
380                 goto out;
381
382         /* the rsb was found but wasn't a master copy */
383         if (error == -ENOTBLK)
384                 goto out;
385
386         error = -ENOMEM;
387         r = create_rsb(ls, name, namelen);
388         if (!r)
389                 goto out;
390
391         r->res_hash = hash;
392         r->res_bucket = bucket;
393         r->res_nodeid = -1;
394         kref_init(&r->res_ref);
395
396         /* With no directory, the master can be set immediately */
397         if (dlm_no_directory(ls)) {
398                 int nodeid = dlm_dir_nodeid(r);
399                 if (nodeid == dlm_our_nodeid())
400                         nodeid = 0;
401                 r->res_nodeid = nodeid;
402         }
403
404         write_lock(&ls->ls_rsbtbl[bucket].lock);
405         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
406         if (!error) {
407                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
408                 free_rsb(r);
409                 r = tmp;
410                 goto out;
411         }
412         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
413         write_unlock(&ls->ls_rsbtbl[bucket].lock);
414         error = 0;
415  out:
416         *r_ret = r;
417         return error;
418 }
419
420 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
421                  unsigned int flags, struct dlm_rsb **r_ret)
422 {
423         return find_rsb(ls, name, namelen, flags, r_ret);
424 }
425
426 /* This is only called to add a reference when the code already holds
427    a valid reference to the rsb, so there's no need for locking. */
428
429 static inline void hold_rsb(struct dlm_rsb *r)
430 {
431         kref_get(&r->res_ref);
432 }
433
434 void dlm_hold_rsb(struct dlm_rsb *r)
435 {
436         hold_rsb(r);
437 }
438
439 static void toss_rsb(struct kref *kref)
440 {
441         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
442         struct dlm_ls *ls = r->res_ls;
443
444         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
445         kref_init(&r->res_ref);
446         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
447         r->res_toss_time = jiffies;
448         if (r->res_lvbptr) {
449                 free_lvb(r->res_lvbptr);
450                 r->res_lvbptr = NULL;
451         }
452 }
453
454 /* When all references to the rsb are gone it's transfered to
455    the tossed list for later disposal. */
456
457 static void put_rsb(struct dlm_rsb *r)
458 {
459         struct dlm_ls *ls = r->res_ls;
460         uint32_t bucket = r->res_bucket;
461
462         write_lock(&ls->ls_rsbtbl[bucket].lock);
463         kref_put(&r->res_ref, toss_rsb);
464         write_unlock(&ls->ls_rsbtbl[bucket].lock);
465 }
466
467 void dlm_put_rsb(struct dlm_rsb *r)
468 {
469         put_rsb(r);
470 }
471
472 /* See comment for unhold_lkb */
473
474 static void unhold_rsb(struct dlm_rsb *r)
475 {
476         int rv;
477         rv = kref_put(&r->res_ref, toss_rsb);
478         DLM_ASSERT(!rv, dlm_print_rsb(r););
479 }
480
481 static void kill_rsb(struct kref *kref)
482 {
483         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
484
485         /* All work is done after the return from kref_put() so we
486            can release the write_lock before the remove and free. */
487
488         DLM_ASSERT(list_empty(&r->res_lookup),);
489         DLM_ASSERT(list_empty(&r->res_grantqueue),);
490         DLM_ASSERT(list_empty(&r->res_convertqueue),);
491         DLM_ASSERT(list_empty(&r->res_waitqueue),);
492         DLM_ASSERT(list_empty(&r->res_root_list),);
493         DLM_ASSERT(list_empty(&r->res_recover_list),);
494 }
495
496 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
497    The rsb must exist as long as any lkb's for it do. */
498
499 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
500 {
501         hold_rsb(r);
502         lkb->lkb_resource = r;
503 }
504
505 static void detach_lkb(struct dlm_lkb *lkb)
506 {
507         if (lkb->lkb_resource) {
508                 put_rsb(lkb->lkb_resource);
509                 lkb->lkb_resource = NULL;
510         }
511 }
512
513 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
514 {
515         struct dlm_lkb *lkb, *tmp;
516         uint32_t lkid = 0;
517         uint16_t bucket;
518
519         lkb = allocate_lkb(ls);
520         if (!lkb)
521                 return -ENOMEM;
522
523         lkb->lkb_nodeid = -1;
524         lkb->lkb_grmode = DLM_LOCK_IV;
525         kref_init(&lkb->lkb_ref);
526
527         get_random_bytes(&bucket, sizeof(bucket));
528         bucket &= (ls->ls_lkbtbl_size - 1);
529
530         write_lock(&ls->ls_lkbtbl[bucket].lock);
531
532         /* counter can roll over so we must verify lkid is not in use */
533
534         while (lkid == 0) {
535                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
536
537                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
538                                     lkb_idtbl_list) {
539                         if (tmp->lkb_id != lkid)
540                                 continue;
541                         lkid = 0;
542                         break;
543                 }
544         }
545
546         lkb->lkb_id = lkid;
547         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
548         write_unlock(&ls->ls_lkbtbl[bucket].lock);
549
550         *lkb_ret = lkb;
551         return 0;
552 }
553
554 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
555 {
556         uint16_t bucket = lkid & 0xFFFF;
557         struct dlm_lkb *lkb;
558
559         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
560                 if (lkb->lkb_id == lkid)
561                         return lkb;
562         }
563         return NULL;
564 }
565
566 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
567 {
568         struct dlm_lkb *lkb;
569         uint16_t bucket = lkid & 0xFFFF;
570
571         if (bucket >= ls->ls_lkbtbl_size)
572                 return -EBADSLT;
573
574         read_lock(&ls->ls_lkbtbl[bucket].lock);
575         lkb = __find_lkb(ls, lkid);
576         if (lkb)
577                 kref_get(&lkb->lkb_ref);
578         read_unlock(&ls->ls_lkbtbl[bucket].lock);
579
580         *lkb_ret = lkb;
581         return lkb ? 0 : -ENOENT;
582 }
583
584 static void kill_lkb(struct kref *kref)
585 {
586         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
587
588         /* All work is done after the return from kref_put() so we
589            can release the write_lock before the detach_lkb */
590
591         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
592 }
593
594 /* __put_lkb() is used when an lkb may not have an rsb attached to
595    it so we need to provide the lockspace explicitly */
596
597 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
598 {
599         uint16_t bucket = lkb->lkb_id & 0xFFFF;
600
601         write_lock(&ls->ls_lkbtbl[bucket].lock);
602         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
603                 list_del(&lkb->lkb_idtbl_list);
604                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
605
606                 detach_lkb(lkb);
607
608                 /* for local/process lkbs, lvbptr points to caller's lksb */
609                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
610                         free_lvb(lkb->lkb_lvbptr);
611                 free_lkb(lkb);
612                 return 1;
613         } else {
614                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
615                 return 0;
616         }
617 }
618
619 int dlm_put_lkb(struct dlm_lkb *lkb)
620 {
621         struct dlm_ls *ls;
622
623         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
624         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
625
626         ls = lkb->lkb_resource->res_ls;
627         return __put_lkb(ls, lkb);
628 }
629
630 /* This is only called to add a reference when the code already holds
631    a valid reference to the lkb, so there's no need for locking. */
632
633 static inline void hold_lkb(struct dlm_lkb *lkb)
634 {
635         kref_get(&lkb->lkb_ref);
636 }
637
638 /* This is called when we need to remove a reference and are certain
639    it's not the last ref.  e.g. del_lkb is always called between a
640    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
641    put_lkb would work fine, but would involve unnecessary locking */
642
643 static inline void unhold_lkb(struct dlm_lkb *lkb)
644 {
645         int rv;
646         rv = kref_put(&lkb->lkb_ref, kill_lkb);
647         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
648 }
649
650 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
651                             int mode)
652 {
653         struct dlm_lkb *lkb = NULL;
654
655         list_for_each_entry(lkb, head, lkb_statequeue)
656                 if (lkb->lkb_rqmode < mode)
657                         break;
658
659         if (!lkb)
660                 list_add_tail(new, head);
661         else
662                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
663 }
664
665 /* add/remove lkb to rsb's grant/convert/wait queue */
666
667 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
668 {
669         kref_get(&lkb->lkb_ref);
670
671         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
672
673         lkb->lkb_status = status;
674
675         switch (status) {
676         case DLM_LKSTS_WAITING:
677                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
678                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
679                 else
680                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
681                 break;
682         case DLM_LKSTS_GRANTED:
683                 /* convention says granted locks kept in order of grmode */
684                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
685                                 lkb->lkb_grmode);
686                 break;
687         case DLM_LKSTS_CONVERT:
688                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
689                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
690                 else
691                         list_add_tail(&lkb->lkb_statequeue,
692                                       &r->res_convertqueue);
693                 break;
694         default:
695                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
696         }
697 }
698
699 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
700 {
701         lkb->lkb_status = 0;
702         list_del(&lkb->lkb_statequeue);
703         unhold_lkb(lkb);
704 }
705
706 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
707 {
708         hold_lkb(lkb);
709         del_lkb(r, lkb);
710         add_lkb(r, lkb, sts);
711         unhold_lkb(lkb);
712 }
713
714 /* add/remove lkb from global waiters list of lkb's waiting for
715    a reply from a remote node */
716
717 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
718 {
719         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
720
721         mutex_lock(&ls->ls_waiters_mutex);
722         if (lkb->lkb_wait_type) {
723                 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
724                 goto out;
725         }
726         lkb->lkb_wait_type = mstype;
727         kref_get(&lkb->lkb_ref);
728         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
729  out:
730         mutex_unlock(&ls->ls_waiters_mutex);
731 }
732
733 static int _remove_from_waiters(struct dlm_lkb *lkb)
734 {
735         int error = 0;
736
737         if (!lkb->lkb_wait_type) {
738                 log_print("remove_from_waiters error");
739                 error = -EINVAL;
740                 goto out;
741         }
742         lkb->lkb_wait_type = 0;
743         list_del(&lkb->lkb_wait_reply);
744         unhold_lkb(lkb);
745  out:
746         return error;
747 }
748
749 static int remove_from_waiters(struct dlm_lkb *lkb)
750 {
751         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
752         int error;
753
754         mutex_lock(&ls->ls_waiters_mutex);
755         error = _remove_from_waiters(lkb);
756         mutex_unlock(&ls->ls_waiters_mutex);
757         return error;
758 }
759
760 static void dir_remove(struct dlm_rsb *r)
761 {
762         int to_nodeid;
763
764         if (dlm_no_directory(r->res_ls))
765                 return;
766
767         to_nodeid = dlm_dir_nodeid(r);
768         if (to_nodeid != dlm_our_nodeid())
769                 send_remove(r);
770         else
771                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
772                                      r->res_name, r->res_length);
773 }
774
775 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
776    found since they are in order of newest to oldest? */
777
778 static int shrink_bucket(struct dlm_ls *ls, int b)
779 {
780         struct dlm_rsb *r;
781         int count = 0, found;
782
783         for (;;) {
784                 found = 0;
785                 write_lock(&ls->ls_rsbtbl[b].lock);
786                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
787                                             res_hashchain) {
788                         if (!time_after_eq(jiffies, r->res_toss_time +
789                                            dlm_config.toss_secs * HZ))
790                                 continue;
791                         found = 1;
792                         break;
793                 }
794
795                 if (!found) {
796                         write_unlock(&ls->ls_rsbtbl[b].lock);
797                         break;
798                 }
799
800                 if (kref_put(&r->res_ref, kill_rsb)) {
801                         list_del(&r->res_hashchain);
802                         write_unlock(&ls->ls_rsbtbl[b].lock);
803
804                         if (is_master(r))
805                                 dir_remove(r);
806                         free_rsb(r);
807                         count++;
808                 } else {
809                         write_unlock(&ls->ls_rsbtbl[b].lock);
810                         log_error(ls, "tossed rsb in use %s", r->res_name);
811                 }
812         }
813
814         return count;
815 }
816
817 void dlm_scan_rsbs(struct dlm_ls *ls)
818 {
819         int i;
820
821         if (dlm_locking_stopped(ls))
822                 return;
823
824         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
825                 shrink_bucket(ls, i);
826                 cond_resched();
827         }
828 }
829
830 /* lkb is master or local copy */
831
832 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
833 {
834         int b, len = r->res_ls->ls_lvblen;
835
836         /* b=1 lvb returned to caller
837            b=0 lvb written to rsb or invalidated
838            b=-1 do nothing */
839
840         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
841
842         if (b == 1) {
843                 if (!lkb->lkb_lvbptr)
844                         return;
845
846                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
847                         return;
848
849                 if (!r->res_lvbptr)
850                         return;
851
852                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
853                 lkb->lkb_lvbseq = r->res_lvbseq;
854
855         } else if (b == 0) {
856                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
857                         rsb_set_flag(r, RSB_VALNOTVALID);
858                         return;
859                 }
860
861                 if (!lkb->lkb_lvbptr)
862                         return;
863
864                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
865                         return;
866
867                 if (!r->res_lvbptr)
868                         r->res_lvbptr = allocate_lvb(r->res_ls);
869
870                 if (!r->res_lvbptr)
871                         return;
872
873                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
874                 r->res_lvbseq++;
875                 lkb->lkb_lvbseq = r->res_lvbseq;
876                 rsb_clear_flag(r, RSB_VALNOTVALID);
877         }
878
879         if (rsb_flag(r, RSB_VALNOTVALID))
880                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
881 }
882
883 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
884 {
885         if (lkb->lkb_grmode < DLM_LOCK_PW)
886                 return;
887
888         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
889                 rsb_set_flag(r, RSB_VALNOTVALID);
890                 return;
891         }
892
893         if (!lkb->lkb_lvbptr)
894                 return;
895
896         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
897                 return;
898
899         if (!r->res_lvbptr)
900                 r->res_lvbptr = allocate_lvb(r->res_ls);
901
902         if (!r->res_lvbptr)
903                 return;
904
905         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
906         r->res_lvbseq++;
907         rsb_clear_flag(r, RSB_VALNOTVALID);
908 }
909
910 /* lkb is process copy (pc) */
911
912 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
913                             struct dlm_message *ms)
914 {
915         int b;
916
917         if (!lkb->lkb_lvbptr)
918                 return;
919
920         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
921                 return;
922
923         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
924         if (b == 1) {
925                 int len = receive_extralen(ms);
926                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
927                 lkb->lkb_lvbseq = ms->m_lvbseq;
928         }
929 }
930
931 /* Manipulate lkb's on rsb's convert/granted/waiting queues
932    remove_lock -- used for unlock, removes lkb from granted
933    revert_lock -- used for cancel, moves lkb from convert to granted
934    grant_lock  -- used for request and convert, adds lkb to granted or
935                   moves lkb from convert or waiting to granted
936
937    Each of these is used for master or local copy lkb's.  There is
938    also a _pc() variation used to make the corresponding change on
939    a process copy (pc) lkb. */
940
941 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
942 {
943         del_lkb(r, lkb);
944         lkb->lkb_grmode = DLM_LOCK_IV;
945         /* this unhold undoes the original ref from create_lkb()
946            so this leads to the lkb being freed */
947         unhold_lkb(lkb);
948 }
949
950 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
951 {
952         set_lvb_unlock(r, lkb);
953         _remove_lock(r, lkb);
954 }
955
956 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
957 {
958         _remove_lock(r, lkb);
959 }
960
961 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
962 {
963         lkb->lkb_rqmode = DLM_LOCK_IV;
964
965         switch (lkb->lkb_status) {
966         case DLM_LKSTS_CONVERT:
967                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
968                 break;
969         case DLM_LKSTS_WAITING:
970                 del_lkb(r, lkb);
971                 lkb->lkb_grmode = DLM_LOCK_IV;
972                 /* this unhold undoes the original ref from create_lkb()
973                    so this leads to the lkb being freed */
974                 unhold_lkb(lkb);
975                 break;
976         default:
977                 log_print("invalid status for revert %d", lkb->lkb_status);
978         }
979 }
980
981 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
982 {
983         revert_lock(r, lkb);
984 }
985
986 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
987 {
988         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
989                 lkb->lkb_grmode = lkb->lkb_rqmode;
990                 if (lkb->lkb_status)
991                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
992                 else
993                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
994         }
995
996         lkb->lkb_rqmode = DLM_LOCK_IV;
997 }
998
999 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1000 {
1001         set_lvb_lock(r, lkb);
1002         _grant_lock(r, lkb);
1003         lkb->lkb_highbast = 0;
1004 }
1005
1006 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1007                           struct dlm_message *ms)
1008 {
1009         set_lvb_lock_pc(r, lkb, ms);
1010         _grant_lock(r, lkb);
1011 }
1012
1013 /* called by grant_pending_locks() which means an async grant message must
1014    be sent to the requesting node in addition to granting the lock if the
1015    lkb belongs to a remote node. */
1016
1017 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1018 {
1019         grant_lock(r, lkb);
1020         if (is_master_copy(lkb))
1021                 send_grant(r, lkb);
1022         else
1023                 queue_cast(r, lkb, 0);
1024 }
1025
1026 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1027 {
1028         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1029                                            lkb_statequeue);
1030         if (lkb->lkb_id == first->lkb_id)
1031                 return 1;
1032
1033         return 0;
1034 }
1035
1036 /* Check if the given lkb conflicts with another lkb on the queue. */
1037
1038 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1039 {
1040         struct dlm_lkb *this;
1041
1042         list_for_each_entry(this, head, lkb_statequeue) {
1043                 if (this == lkb)
1044                         continue;
1045                 if (!modes_compat(this, lkb))
1046                         return 1;
1047         }
1048         return 0;
1049 }
1050
1051 /*
1052  * "A conversion deadlock arises with a pair of lock requests in the converting
1053  * queue for one resource.  The granted mode of each lock blocks the requested
1054  * mode of the other lock."
1055  *
1056  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1057  * convert queue from being granted, then demote lkb (set grmode to NL).
1058  * This second form requires that we check for conv-deadlk even when
1059  * now == 0 in _can_be_granted().
1060  *
1061  * Example:
1062  * Granted Queue: empty
1063  * Convert Queue: NL->EX (first lock)
1064  *                PR->EX (second lock)
1065  *
1066  * The first lock can't be granted because of the granted mode of the second
1067  * lock and the second lock can't be granted because it's not first in the
1068  * list.  We demote the granted mode of the second lock (the lkb passed to this
1069  * function).
1070  *
1071  * After the resolution, the "grant pending" function needs to go back and try
1072  * to grant locks on the convert queue again since the first lock can now be
1073  * granted.
1074  */
1075
1076 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1077 {
1078         struct dlm_lkb *this, *first = NULL, *self = NULL;
1079
1080         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1081                 if (!first)
1082                         first = this;
1083                 if (this == lkb) {
1084                         self = lkb;
1085                         continue;
1086                 }
1087
1088                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1089                         return 1;
1090         }
1091
1092         /* if lkb is on the convert queue and is preventing the first
1093            from being granted, then there's deadlock and we demote lkb.
1094            multiple converting locks may need to do this before the first
1095            converting lock can be granted. */
1096
1097         if (self && self != first) {
1098                 if (!modes_compat(lkb, first) &&
1099                     !queue_conflict(&rsb->res_grantqueue, first))
1100                         return 1;
1101         }
1102
1103         return 0;
1104 }
1105
1106 /*
1107  * Return 1 if the lock can be granted, 0 otherwise.
1108  * Also detect and resolve conversion deadlocks.
1109  *
1110  * lkb is the lock to be granted
1111  *
1112  * now is 1 if the function is being called in the context of the
1113  * immediate request, it is 0 if called later, after the lock has been
1114  * queued.
1115  *
1116  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1117  */
1118
1119 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1120 {
1121         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1122
1123         /*
1124          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1125          * a new request for a NL mode lock being blocked.
1126          *
1127          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1128          * request, then it would be granted.  In essence, the use of this flag
1129          * tells the Lock Manager to expedite theis request by not considering
1130          * what may be in the CONVERTING or WAITING queues...  As of this
1131          * writing, the EXPEDITE flag can be used only with new requests for NL
1132          * mode locks.  This flag is not valid for conversion requests.
1133          *
1134          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1135          * conversion or used with a non-NL requested mode.  We also know an
1136          * EXPEDITE request is always granted immediately, so now must always
1137          * be 1.  The full condition to grant an expedite request: (now &&
1138          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1139          * therefore be shortened to just checking the flag.
1140          */
1141
1142         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1143                 return 1;
1144
1145         /*
1146          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1147          * added to the remaining conditions.
1148          */
1149
1150         if (queue_conflict(&r->res_grantqueue, lkb))
1151                 goto out;
1152
1153         /*
1154          * 6-3: By default, a conversion request is immediately granted if the
1155          * requested mode is compatible with the modes of all other granted
1156          * locks
1157          */
1158
1159         if (queue_conflict(&r->res_convertqueue, lkb))
1160                 goto out;
1161
1162         /*
1163          * 6-5: But the default algorithm for deciding whether to grant or
1164          * queue conversion requests does not by itself guarantee that such
1165          * requests are serviced on a "first come first serve" basis.  This, in
1166          * turn, can lead to a phenomenon known as "indefinate postponement".
1167          *
1168          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1169          * the system service employed to request a lock conversion.  This flag
1170          * forces certain conversion requests to be queued, even if they are
1171          * compatible with the granted modes of other locks on the same
1172          * resource.  Thus, the use of this flag results in conversion requests
1173          * being ordered on a "first come first servce" basis.
1174          *
1175          * DCT: This condition is all about new conversions being able to occur
1176          * "in place" while the lock remains on the granted queue (assuming
1177          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1178          * doesn't _have_ to go onto the convert queue where it's processed in
1179          * order.  The "now" variable is necessary to distinguish converts
1180          * being received and processed for the first time now, because once a
1181          * convert is moved to the conversion queue the condition below applies
1182          * requiring fifo granting.
1183          */
1184
1185         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1186                 return 1;
1187
1188         /*
1189          * The NOORDER flag is set to avoid the standard vms rules on grant
1190          * order.
1191          */
1192
1193         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1194                 return 1;
1195
1196         /*
1197          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1198          * granted until all other conversion requests ahead of it are granted
1199          * and/or canceled.
1200          */
1201
1202         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1203                 return 1;
1204
1205         /*
1206          * 6-4: By default, a new request is immediately granted only if all
1207          * three of the following conditions are satisfied when the request is
1208          * issued:
1209          * - The queue of ungranted conversion requests for the resource is
1210          *   empty.
1211          * - The queue of ungranted new requests for the resource is empty.
1212          * - The mode of the new request is compatible with the most
1213          *   restrictive mode of all granted locks on the resource.
1214          */
1215
1216         if (now && !conv && list_empty(&r->res_convertqueue) &&
1217             list_empty(&r->res_waitqueue))
1218                 return 1;
1219
1220         /*
1221          * 6-4: Once a lock request is in the queue of ungranted new requests,
1222          * it cannot be granted until the queue of ungranted conversion
1223          * requests is empty, all ungranted new requests ahead of it are
1224          * granted and/or canceled, and it is compatible with the granted mode
1225          * of the most restrictive lock granted on the resource.
1226          */
1227
1228         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1229             first_in_list(lkb, &r->res_waitqueue))
1230                 return 1;
1231
1232  out:
1233         /*
1234          * The following, enabled by CONVDEADLK, departs from VMS.
1235          */
1236
1237         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1238             conversion_deadlock_detect(r, lkb)) {
1239                 lkb->lkb_grmode = DLM_LOCK_NL;
1240                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1241         }
1242
1243         return 0;
1244 }
1245
1246 /*
1247  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1248  * simple way to provide a big optimization to applications that can use them.
1249  */
1250
1251 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1252 {
1253         uint32_t flags = lkb->lkb_exflags;
1254         int rv;
1255         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1256
1257         rv = _can_be_granted(r, lkb, now);
1258         if (rv)
1259                 goto out;
1260
1261         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1262                 goto out;
1263
1264         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1265                 alt = DLM_LOCK_PR;
1266         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1267                 alt = DLM_LOCK_CW;
1268
1269         if (alt) {
1270                 lkb->lkb_rqmode = alt;
1271                 rv = _can_be_granted(r, lkb, now);
1272                 if (rv)
1273                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1274                 else
1275                         lkb->lkb_rqmode = rqmode;
1276         }
1277  out:
1278         return rv;
1279 }
1280
1281 static int grant_pending_convert(struct dlm_rsb *r, int high)
1282 {
1283         struct dlm_lkb *lkb, *s;
1284         int hi, demoted, quit, grant_restart, demote_restart;
1285
1286         quit = 0;
1287  restart:
1288         grant_restart = 0;
1289         demote_restart = 0;
1290         hi = DLM_LOCK_IV;
1291
1292         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1293                 demoted = is_demoted(lkb);
1294                 if (can_be_granted(r, lkb, 0)) {
1295                         grant_lock_pending(r, lkb);
1296                         grant_restart = 1;
1297                 } else {
1298                         hi = max_t(int, lkb->lkb_rqmode, hi);
1299                         if (!demoted && is_demoted(lkb))
1300                                 demote_restart = 1;
1301                 }
1302         }
1303
1304         if (grant_restart)
1305                 goto restart;
1306         if (demote_restart && !quit) {
1307                 quit = 1;
1308                 goto restart;
1309         }
1310
1311         return max_t(int, high, hi);
1312 }
1313
1314 static int grant_pending_wait(struct dlm_rsb *r, int high)
1315 {
1316         struct dlm_lkb *lkb, *s;
1317
1318         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1319                 if (can_be_granted(r, lkb, 0))
1320                         grant_lock_pending(r, lkb);
1321                 else
1322                         high = max_t(int, lkb->lkb_rqmode, high);
1323         }
1324
1325         return high;
1326 }
1327
1328 static void grant_pending_locks(struct dlm_rsb *r)
1329 {
1330         struct dlm_lkb *lkb, *s;
1331         int high = DLM_LOCK_IV;
1332
1333         DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1334
1335         high = grant_pending_convert(r, high);
1336         high = grant_pending_wait(r, high);
1337
1338         if (high == DLM_LOCK_IV)
1339                 return;
1340
1341         /*
1342          * If there are locks left on the wait/convert queue then send blocking
1343          * ASTs to granted locks based on the largest requested mode (high)
1344          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1345          */
1346
1347         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1348                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1349                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1350                         queue_bast(r, lkb, high);
1351                         lkb->lkb_highbast = high;
1352                 }
1353         }
1354 }
1355
1356 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1357                             struct dlm_lkb *lkb)
1358 {
1359         struct dlm_lkb *gr;
1360
1361         list_for_each_entry(gr, head, lkb_statequeue) {
1362                 if (gr->lkb_bastaddr &&
1363                     gr->lkb_highbast < lkb->lkb_rqmode &&
1364                     !modes_compat(gr, lkb)) {
1365                         queue_bast(r, gr, lkb->lkb_rqmode);
1366                         gr->lkb_highbast = lkb->lkb_rqmode;
1367                 }
1368         }
1369 }
1370
1371 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1372 {
1373         send_bast_queue(r, &r->res_grantqueue, lkb);
1374 }
1375
1376 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1377 {
1378         send_bast_queue(r, &r->res_grantqueue, lkb);
1379         send_bast_queue(r, &r->res_convertqueue, lkb);
1380 }
1381
1382 /* set_master(r, lkb) -- set the master nodeid of a resource
1383
1384    The purpose of this function is to set the nodeid field in the given
1385    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1386    known, it can just be copied to the lkb and the function will return
1387    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1388    before it can be copied to the lkb.
1389
1390    When the rsb nodeid is being looked up remotely, the initial lkb
1391    causing the lookup is kept on the ls_waiters list waiting for the
1392    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1393    on the rsb's res_lookup list until the master is verified.
1394
1395    Return values:
1396    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1397    1: the rsb master is not available and the lkb has been placed on
1398       a wait queue
1399 */
1400
1401 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1402 {
1403         struct dlm_ls *ls = r->res_ls;
1404         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1405
1406         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1407                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1408                 r->res_first_lkid = lkb->lkb_id;
1409                 lkb->lkb_nodeid = r->res_nodeid;
1410                 return 0;
1411         }
1412
1413         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1414                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1415                 return 1;
1416         }
1417
1418         if (r->res_nodeid == 0) {
1419                 lkb->lkb_nodeid = 0;
1420                 return 0;
1421         }
1422
1423         if (r->res_nodeid > 0) {
1424                 lkb->lkb_nodeid = r->res_nodeid;
1425                 return 0;
1426         }
1427
1428         DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1429
1430         dir_nodeid = dlm_dir_nodeid(r);
1431
1432         if (dir_nodeid != our_nodeid) {
1433                 r->res_first_lkid = lkb->lkb_id;
1434                 send_lookup(r, lkb);
1435                 return 1;
1436         }
1437
1438         for (;;) {
1439                 /* It's possible for dlm_scand to remove an old rsb for
1440                    this same resource from the toss list, us to create
1441                    a new one, look up the master locally, and find it
1442                    already exists just before dlm_scand does the
1443                    dir_remove() on the previous rsb. */
1444
1445                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1446                                        r->res_length, &ret_nodeid);
1447                 if (!error)
1448                         break;
1449                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1450                 schedule();
1451         }
1452
1453         if (ret_nodeid == our_nodeid) {
1454                 r->res_first_lkid = 0;
1455                 r->res_nodeid = 0;
1456                 lkb->lkb_nodeid = 0;
1457         } else {
1458                 r->res_first_lkid = lkb->lkb_id;
1459                 r->res_nodeid = ret_nodeid;
1460                 lkb->lkb_nodeid = ret_nodeid;
1461         }
1462         return 0;
1463 }
1464
1465 static void process_lookup_list(struct dlm_rsb *r)
1466 {
1467         struct dlm_lkb *lkb, *safe;
1468
1469         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1470                 list_del(&lkb->lkb_rsb_lookup);
1471                 _request_lock(r, lkb);
1472                 schedule();
1473         }
1474 }
1475
1476 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1477
1478 static void confirm_master(struct dlm_rsb *r, int error)
1479 {
1480         struct dlm_lkb *lkb;
1481
1482         if (!r->res_first_lkid)
1483                 return;
1484
1485         switch (error) {
1486         case 0:
1487         case -EINPROGRESS:
1488                 r->res_first_lkid = 0;
1489                 process_lookup_list(r);
1490                 break;
1491
1492         case -EAGAIN:
1493                 /* the remote master didn't queue our NOQUEUE request;
1494                    make a waiting lkb the first_lkid */
1495
1496                 r->res_first_lkid = 0;
1497
1498                 if (!list_empty(&r->res_lookup)) {
1499                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1500                                          lkb_rsb_lookup);
1501                         list_del(&lkb->lkb_rsb_lookup);
1502                         r->res_first_lkid = lkb->lkb_id;
1503                         _request_lock(r, lkb);
1504                 } else
1505                         r->res_nodeid = -1;
1506                 break;
1507
1508         default:
1509                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1510         }
1511 }
1512
1513 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1514                          int namelen, uint32_t parent_lkid, void *ast,
1515                          void *astarg, void *bast, struct dlm_args *args)
1516 {
1517         int rv = -EINVAL;
1518
1519         /* check for invalid arg usage */
1520
1521         if (mode < 0 || mode > DLM_LOCK_EX)
1522                 goto out;
1523
1524         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1525                 goto out;
1526
1527         if (flags & DLM_LKF_CANCEL)
1528                 goto out;
1529
1530         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1531                 goto out;
1532
1533         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1534                 goto out;
1535
1536         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1537                 goto out;
1538
1539         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1540                 goto out;
1541
1542         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1543                 goto out;
1544
1545         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1546                 goto out;
1547
1548         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1549                 goto out;
1550
1551         if (!ast || !lksb)
1552                 goto out;
1553
1554         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1555                 goto out;
1556
1557         /* parent/child locks not yet supported */
1558         if (parent_lkid)
1559                 goto out;
1560
1561         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1562                 goto out;
1563
1564         /* these args will be copied to the lkb in validate_lock_args,
1565            it cannot be done now because when converting locks, fields in
1566            an active lkb cannot be modified before locking the rsb */
1567
1568         args->flags = flags;
1569         args->astaddr = ast;
1570         args->astparam = (long) astarg;
1571         args->bastaddr = bast;
1572         args->mode = mode;
1573         args->lksb = lksb;
1574         rv = 0;
1575  out:
1576         return rv;
1577 }
1578
1579 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1580 {
1581         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1582                       DLM_LKF_FORCEUNLOCK))
1583                 return -EINVAL;
1584
1585         args->flags = flags;
1586         args->astparam = (long) astarg;
1587         return 0;
1588 }
1589
1590 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1591                               struct dlm_args *args)
1592 {
1593         int rv = -EINVAL;
1594
1595         if (args->flags & DLM_LKF_CONVERT) {
1596                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1597                         goto out;
1598
1599                 if (args->flags & DLM_LKF_QUECVT &&
1600                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1601                         goto out;
1602
1603                 rv = -EBUSY;
1604                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1605                         goto out;
1606
1607                 if (lkb->lkb_wait_type)
1608                         goto out;
1609         }
1610
1611         lkb->lkb_exflags = args->flags;
1612         lkb->lkb_sbflags = 0;
1613         lkb->lkb_astaddr = args->astaddr;
1614         lkb->lkb_astparam = args->astparam;
1615         lkb->lkb_bastaddr = args->bastaddr;
1616         lkb->lkb_rqmode = args->mode;
1617         lkb->lkb_lksb = args->lksb;
1618         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1619         lkb->lkb_ownpid = (int) current->pid;
1620         rv = 0;
1621  out:
1622         return rv;
1623 }
1624
1625 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1626 {
1627         int rv = -EINVAL;
1628
1629         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1630                 goto out;
1631
1632         if (args->flags & DLM_LKF_FORCEUNLOCK)
1633                 goto out_ok;
1634
1635         if (args->flags & DLM_LKF_CANCEL &&
1636             lkb->lkb_status == DLM_LKSTS_GRANTED)
1637                 goto out;
1638
1639         if (!(args->flags & DLM_LKF_CANCEL) &&
1640             lkb->lkb_status != DLM_LKSTS_GRANTED)
1641                 goto out;
1642
1643         rv = -EBUSY;
1644         if (lkb->lkb_wait_type)
1645                 goto out;
1646
1647  out_ok:
1648         lkb->lkb_exflags = args->flags;
1649         lkb->lkb_sbflags = 0;
1650         lkb->lkb_astparam = args->astparam;
1651
1652         rv = 0;
1653  out:
1654         return rv;
1655 }
1656
1657 /*
1658  * Four stage 4 varieties:
1659  * do_request(), do_convert(), do_unlock(), do_cancel()
1660  * These are called on the master node for the given lock and
1661  * from the central locking logic.
1662  */
1663
1664 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1665 {
1666         int error = 0;
1667
1668         if (can_be_granted(r, lkb, 1)) {
1669                 grant_lock(r, lkb);
1670                 queue_cast(r, lkb, 0);
1671                 goto out;
1672         }
1673
1674         if (can_be_queued(lkb)) {
1675                 error = -EINPROGRESS;
1676                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1677                 send_blocking_asts(r, lkb);
1678                 goto out;
1679         }
1680
1681         error = -EAGAIN;
1682         if (force_blocking_asts(lkb))
1683                 send_blocking_asts_all(r, lkb);
1684         queue_cast(r, lkb, -EAGAIN);
1685
1686  out:
1687         return error;
1688 }
1689
1690 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1691 {
1692         int error = 0;
1693
1694         /* changing an existing lock may allow others to be granted */
1695
1696         if (can_be_granted(r, lkb, 1)) {
1697                 grant_lock(r, lkb);
1698                 queue_cast(r, lkb, 0);
1699                 grant_pending_locks(r);
1700                 goto out;
1701         }
1702
1703         if (can_be_queued(lkb)) {
1704                 if (is_demoted(lkb))
1705                         grant_pending_locks(r);
1706                 error = -EINPROGRESS;
1707                 del_lkb(r, lkb);
1708                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1709                 send_blocking_asts(r, lkb);
1710                 goto out;
1711         }
1712
1713         error = -EAGAIN;
1714         if (force_blocking_asts(lkb))
1715                 send_blocking_asts_all(r, lkb);
1716         queue_cast(r, lkb, -EAGAIN);
1717
1718  out:
1719         return error;
1720 }
1721
1722 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723 {
1724         remove_lock(r, lkb);
1725         queue_cast(r, lkb, -DLM_EUNLOCK);
1726         grant_pending_locks(r);
1727         return -DLM_EUNLOCK;
1728 }
1729
1730 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1731 {
1732         revert_lock(r, lkb);
1733         queue_cast(r, lkb, -DLM_ECANCEL);
1734         grant_pending_locks(r);
1735         return -DLM_ECANCEL;
1736 }
1737
1738 /*
1739  * Four stage 3 varieties:
1740  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1741  */
1742
1743 /* add a new lkb to a possibly new rsb, called by requesting process */
1744
1745 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1746 {
1747         int error;
1748
1749         /* set_master: sets lkb nodeid from r */
1750
1751         error = set_master(r, lkb);
1752         if (error < 0)
1753                 goto out;
1754         if (error) {
1755                 error = 0;
1756                 goto out;
1757         }
1758
1759         if (is_remote(r))
1760                 /* receive_request() calls do_request() on remote node */
1761                 error = send_request(r, lkb);
1762         else
1763                 error = do_request(r, lkb);
1764  out:
1765         return error;
1766 }
1767
1768 /* change some property of an existing lkb, e.g. mode */
1769
1770 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1771 {
1772         int error;
1773
1774         if (is_remote(r))
1775                 /* receive_convert() calls do_convert() on remote node */
1776                 error = send_convert(r, lkb);
1777         else
1778                 error = do_convert(r, lkb);
1779
1780         return error;
1781 }
1782
1783 /* remove an existing lkb from the granted queue */
1784
1785 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1786 {
1787         int error;
1788
1789         if (is_remote(r))
1790                 /* receive_unlock() calls do_unlock() on remote node */
1791                 error = send_unlock(r, lkb);
1792         else
1793                 error = do_unlock(r, lkb);
1794
1795         return error;
1796 }
1797
1798 /* remove an existing lkb from the convert or wait queue */
1799
1800 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1801 {
1802         int error;
1803
1804         if (is_remote(r))
1805                 /* receive_cancel() calls do_cancel() on remote node */
1806                 error = send_cancel(r, lkb);
1807         else
1808                 error = do_cancel(r, lkb);
1809
1810         return error;
1811 }
1812
1813 /*
1814  * Four stage 2 varieties:
1815  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1816  */
1817
1818 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1819                         int len, struct dlm_args *args)
1820 {
1821         struct dlm_rsb *r;
1822         int error;
1823
1824         error = validate_lock_args(ls, lkb, args);
1825         if (error)
1826                 goto out;
1827
1828         error = find_rsb(ls, name, len, R_CREATE, &r);
1829         if (error)
1830                 goto out;
1831
1832         lock_rsb(r);
1833
1834         attach_lkb(r, lkb);
1835         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1836
1837         error = _request_lock(r, lkb);
1838
1839         unlock_rsb(r);
1840         put_rsb(r);
1841
1842  out:
1843         return error;
1844 }
1845
1846 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1847                         struct dlm_args *args)
1848 {
1849         struct dlm_rsb *r;
1850         int error;
1851
1852         r = lkb->lkb_resource;
1853
1854         hold_rsb(r);
1855         lock_rsb(r);
1856
1857         error = validate_lock_args(ls, lkb, args);
1858         if (error)
1859                 goto out;
1860
1861         error = _convert_lock(r, lkb);
1862  out:
1863         unlock_rsb(r);
1864         put_rsb(r);
1865         return error;
1866 }
1867
1868 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1869                        struct dlm_args *args)
1870 {
1871         struct dlm_rsb *r;
1872         int error;
1873
1874         r = lkb->lkb_resource;
1875
1876         hold_rsb(r);
1877         lock_rsb(r);
1878
1879         error = validate_unlock_args(lkb, args);
1880         if (error)
1881                 goto out;
1882
1883         error = _unlock_lock(r, lkb);
1884  out:
1885         unlock_rsb(r);
1886         put_rsb(r);
1887         return error;
1888 }
1889
1890 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1891                        struct dlm_args *args)
1892 {
1893         struct dlm_rsb *r;
1894         int error;
1895
1896         r = lkb->lkb_resource;
1897
1898         hold_rsb(r);
1899         lock_rsb(r);
1900
1901         error = validate_unlock_args(lkb, args);
1902         if (error)
1903                 goto out;
1904
1905         error = _cancel_lock(r, lkb);
1906  out:
1907         unlock_rsb(r);
1908         put_rsb(r);
1909         return error;
1910 }
1911
1912 /*
1913  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1914  */
1915
1916 int dlm_lock(dlm_lockspace_t *lockspace,
1917              int mode,
1918              struct dlm_lksb *lksb,
1919              uint32_t flags,
1920              void *name,
1921              unsigned int namelen,
1922              uint32_t parent_lkid,
1923              void (*ast) (void *astarg),
1924              void *astarg,
1925              void (*bast) (void *astarg, int mode))
1926 {
1927         struct dlm_ls *ls;
1928         struct dlm_lkb *lkb;
1929         struct dlm_args args;
1930         int error, convert = flags & DLM_LKF_CONVERT;
1931
1932         ls = dlm_find_lockspace_local(lockspace);
1933         if (!ls)
1934                 return -EINVAL;
1935
1936         lock_recovery(ls);
1937
1938         if (convert)
1939                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1940         else
1941                 error = create_lkb(ls, &lkb);
1942
1943         if (error)
1944                 goto out;
1945
1946         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1947                               astarg, bast, &args);
1948         if (error)
1949                 goto out_put;
1950
1951         if (convert)
1952                 error = convert_lock(ls, lkb, &args);
1953         else
1954                 error = request_lock(ls, lkb, name, namelen, &args);
1955
1956         if (error == -EINPROGRESS)
1957                 error = 0;
1958  out_put:
1959         if (convert || error)
1960                 __put_lkb(ls, lkb);
1961         if (error == -EAGAIN)
1962                 error = 0;
1963  out:
1964         unlock_recovery(ls);
1965         dlm_put_lockspace(ls);
1966         return error;
1967 }
1968
1969 int dlm_unlock(dlm_lockspace_t *lockspace,
1970                uint32_t lkid,
1971                uint32_t flags,
1972                struct dlm_lksb *lksb,
1973                void *astarg)
1974 {
1975         struct dlm_ls *ls;
1976         struct dlm_lkb *lkb;
1977         struct dlm_args args;
1978         int error;
1979
1980         ls = dlm_find_lockspace_local(lockspace);
1981         if (!ls)
1982                 return -EINVAL;
1983
1984         lock_recovery(ls);
1985
1986         error = find_lkb(ls, lkid, &lkb);
1987         if (error)
1988                 goto out;
1989
1990         error = set_unlock_args(flags, astarg, &args);
1991         if (error)
1992                 goto out_put;
1993
1994         if (flags & DLM_LKF_CANCEL)
1995                 error = cancel_lock(ls, lkb, &args);
1996         else
1997                 error = unlock_lock(ls, lkb, &args);
1998
1999         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2000                 error = 0;
2001  out_put:
2002         dlm_put_lkb(lkb);
2003  out:
2004         unlock_recovery(ls);
2005         dlm_put_lockspace(ls);
2006         return error;
2007 }
2008
2009 /*
2010  * send/receive routines for remote operations and replies
2011  *
2012  * send_args
2013  * send_common
2014  * send_request                 receive_request
2015  * send_convert                 receive_convert
2016  * send_unlock                  receive_unlock
2017  * send_cancel                  receive_cancel
2018  * send_grant                   receive_grant
2019  * send_bast                    receive_bast
2020  * send_lookup                  receive_lookup
2021  * send_remove                  receive_remove
2022  *
2023  *                              send_common_reply
2024  * receive_request_reply        send_request_reply
2025  * receive_convert_reply        send_convert_reply
2026  * receive_unlock_reply         send_unlock_reply
2027  * receive_cancel_reply         send_cancel_reply
2028  * receive_lookup_reply         send_lookup_reply
2029  */
2030
2031 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2032                           int to_nodeid, int mstype,
2033                           struct dlm_message **ms_ret,
2034                           struct dlm_mhandle **mh_ret)
2035 {
2036         struct dlm_message *ms;
2037         struct dlm_mhandle *mh;
2038         char *mb;
2039         int mb_len = sizeof(struct dlm_message);
2040
2041         switch (mstype) {
2042         case DLM_MSG_REQUEST:
2043         case DLM_MSG_LOOKUP:
2044         case DLM_MSG_REMOVE:
2045                 mb_len += r->res_length;
2046                 break;
2047         case DLM_MSG_CONVERT:
2048         case DLM_MSG_UNLOCK:
2049         case DLM_MSG_REQUEST_REPLY:
2050         case DLM_MSG_CONVERT_REPLY:
2051         case DLM_MSG_GRANT:
2052                 if (lkb && lkb->lkb_lvbptr)
2053                         mb_len += r->res_ls->ls_lvblen;
2054                 break;
2055         }
2056
2057         /* get_buffer gives us a message handle (mh) that we need to
2058            pass into lowcomms_commit and a message buffer (mb) that we
2059            write our data into */
2060
2061         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2062         if (!mh)
2063                 return -ENOBUFS;
2064
2065         memset(mb, 0, mb_len);
2066
2067         ms = (struct dlm_message *) mb;
2068
2069         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2070         ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2071         ms->m_header.h_nodeid = dlm_our_nodeid();
2072         ms->m_header.h_length = mb_len;
2073         ms->m_header.h_cmd = DLM_MSG;
2074
2075         ms->m_type = mstype;
2076
2077         *mh_ret = mh;
2078         *ms_ret = ms;
2079         return 0;
2080 }
2081
2082 /* further lowcomms enhancements or alternate implementations may make
2083    the return value from this function useful at some point */
2084
2085 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2086 {
2087         dlm_message_out(ms);
2088         dlm_lowcomms_commit_buffer(mh);
2089         return 0;
2090 }
2091
2092 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2093                       struct dlm_message *ms)
2094 {
2095         ms->m_nodeid   = lkb->lkb_nodeid;
2096         ms->m_pid      = lkb->lkb_ownpid;
2097         ms->m_lkid     = lkb->lkb_id;
2098         ms->m_remid    = lkb->lkb_remid;
2099         ms->m_exflags  = lkb->lkb_exflags;
2100         ms->m_sbflags  = lkb->lkb_sbflags;
2101         ms->m_flags    = lkb->lkb_flags;
2102         ms->m_lvbseq   = lkb->lkb_lvbseq;
2103         ms->m_status   = lkb->lkb_status;
2104         ms->m_grmode   = lkb->lkb_grmode;
2105         ms->m_rqmode   = lkb->lkb_rqmode;
2106         ms->m_hash     = r->res_hash;
2107
2108         /* m_result and m_bastmode are set from function args,
2109            not from lkb fields */
2110
2111         if (lkb->lkb_bastaddr)
2112                 ms->m_asts |= AST_BAST;
2113         if (lkb->lkb_astaddr)
2114                 ms->m_asts |= AST_COMP;
2115
2116         if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2117                 memcpy(ms->m_extra, r->res_name, r->res_length);
2118
2119         else if (lkb->lkb_lvbptr)
2120                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2121
2122 }
2123
2124 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2125 {
2126         struct dlm_message *ms;
2127         struct dlm_mhandle *mh;
2128         int to_nodeid, error;
2129
2130         add_to_waiters(lkb, mstype);
2131
2132         to_nodeid = r->res_nodeid;
2133
2134         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2135         if (error)
2136                 goto fail;
2137
2138         send_args(r, lkb, ms);
2139
2140         error = send_message(mh, ms);
2141         if (error)
2142                 goto fail;
2143         return 0;
2144
2145  fail:
2146         remove_from_waiters(lkb);
2147         return error;
2148 }
2149
2150 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2151 {
2152         return send_common(r, lkb, DLM_MSG_REQUEST);
2153 }
2154
2155 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2156 {
2157         int error;
2158
2159         error = send_common(r, lkb, DLM_MSG_CONVERT);
2160
2161         /* down conversions go without a reply from the master */
2162         if (!error && down_conversion(lkb)) {
2163                 remove_from_waiters(lkb);
2164                 r->res_ls->ls_stub_ms.m_result = 0;
2165                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2166         }
2167
2168         return error;
2169 }
2170
2171 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2172    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2173    that the master is still correct. */
2174
2175 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2176 {
2177         return send_common(r, lkb, DLM_MSG_UNLOCK);
2178 }
2179
2180 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2181 {
2182         return send_common(r, lkb, DLM_MSG_CANCEL);
2183 }
2184
2185 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2186 {
2187         struct dlm_message *ms;
2188         struct dlm_mhandle *mh;
2189         int to_nodeid, error;
2190
2191         to_nodeid = lkb->lkb_nodeid;
2192
2193         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2194         if (error)
2195                 goto out;
2196
2197         send_args(r, lkb, ms);
2198
2199         ms->m_result = 0;
2200
2201         error = send_message(mh, ms);
2202  out:
2203         return error;
2204 }
2205
2206 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2207 {
2208         struct dlm_message *ms;
2209         struct dlm_mhandle *mh;
2210         int to_nodeid, error;
2211
2212         to_nodeid = lkb->lkb_nodeid;
2213
2214         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2215         if (error)
2216                 goto out;
2217
2218         send_args(r, lkb, ms);
2219
2220         ms->m_bastmode = mode;
2221
2222         error = send_message(mh, ms);
2223  out:
2224         return error;
2225 }
2226
2227 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2228 {
2229         struct dlm_message *ms;
2230         struct dlm_mhandle *mh;
2231         int to_nodeid, error;
2232
2233         add_to_waiters(lkb, DLM_MSG_LOOKUP);
2234
2235         to_nodeid = dlm_dir_nodeid(r);
2236
2237         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2238         if (error)
2239                 goto fail;
2240
2241         send_args(r, lkb, ms);
2242
2243         error = send_message(mh, ms);
2244         if (error)
2245                 goto fail;
2246         return 0;
2247
2248  fail:
2249         remove_from_waiters(lkb);
2250         return error;
2251 }
2252
2253 static int send_remove(struct dlm_rsb *r)
2254 {
2255         struct dlm_message *ms;
2256         struct dlm_mhandle *mh;
2257         int to_nodeid, error;
2258
2259         to_nodeid = dlm_dir_nodeid(r);
2260
2261         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2262         if (error)
2263                 goto out;
2264
2265         memcpy(ms->m_extra, r->res_name, r->res_length);
2266         ms->m_hash = r->res_hash;
2267
2268         error = send_message(mh, ms);
2269  out:
2270         return error;
2271 }
2272
2273 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2274                              int mstype, int rv)
2275 {
2276         struct dlm_message *ms;
2277         struct dlm_mhandle *mh;
2278         int to_nodeid, error;
2279
2280         to_nodeid = lkb->lkb_nodeid;
2281
2282         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2283         if (error)
2284                 goto out;
2285
2286         send_args(r, lkb, ms);
2287
2288         ms->m_result = rv;
2289
2290         error = send_message(mh, ms);
2291  out:
2292         return error;
2293 }
2294
2295 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2296 {
2297         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2298 }
2299
2300 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2301 {
2302         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2303 }
2304
2305 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2306 {
2307         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2308 }
2309
2310 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2311 {
2312         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2313 }
2314
2315 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2316                              int ret_nodeid, int rv)
2317 {
2318         struct dlm_rsb *r = &ls->ls_stub_rsb;
2319         struct dlm_message *ms;
2320         struct dlm_mhandle *mh;
2321         int error, nodeid = ms_in->m_header.h_nodeid;
2322
2323         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2324         if (error)
2325                 goto out;
2326
2327         ms->m_lkid = ms_in->m_lkid;
2328         ms->m_result = rv;
2329         ms->m_nodeid = ret_nodeid;
2330
2331         error = send_message(mh, ms);
2332  out:
2333         return error;
2334 }
2335
2336 /* which args we save from a received message depends heavily on the type
2337    of message, unlike the send side where we can safely send everything about
2338    the lkb for any type of message */
2339
2340 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2341 {
2342         lkb->lkb_exflags = ms->m_exflags;
2343         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2344                          (ms->m_flags & 0x0000FFFF);
2345 }
2346
2347 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2348 {
2349         lkb->lkb_sbflags = ms->m_sbflags;
2350         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2351                          (ms->m_flags & 0x0000FFFF);
2352 }
2353
2354 static int receive_extralen(struct dlm_message *ms)
2355 {
2356         return (ms->m_header.h_length - sizeof(struct dlm_message));
2357 }
2358
2359 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2360                        struct dlm_message *ms)
2361 {
2362         int len;
2363
2364         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2365                 if (!lkb->lkb_lvbptr)
2366                         lkb->lkb_lvbptr = allocate_lvb(ls);
2367                 if (!lkb->lkb_lvbptr)
2368                         return -ENOMEM;
2369                 len = receive_extralen(ms);
2370                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2371         }
2372         return 0;
2373 }
2374
2375 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2376                                 struct dlm_message *ms)
2377 {
2378         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2379         lkb->lkb_ownpid = ms->m_pid;
2380         lkb->lkb_remid = ms->m_lkid;
2381         lkb->lkb_grmode = DLM_LOCK_IV;
2382         lkb->lkb_rqmode = ms->m_rqmode;
2383         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2384         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2385
2386         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2387
2388         if (receive_lvb(ls, lkb, ms))
2389                 return -ENOMEM;
2390
2391         return 0;
2392 }
2393
2394 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2395                                 struct dlm_message *ms)
2396 {
2397         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2398                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2399                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2400                           lkb->lkb_id, lkb->lkb_remid);
2401                 return -EINVAL;
2402         }
2403
2404         if (!is_master_copy(lkb))
2405                 return -EINVAL;
2406
2407         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2408                 return -EBUSY;
2409
2410         if (receive_lvb(ls, lkb, ms))
2411                 return -ENOMEM;
2412
2413         lkb->lkb_rqmode = ms->m_rqmode;
2414         lkb->lkb_lvbseq = ms->m_lvbseq;
2415
2416         return 0;
2417 }
2418
2419 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2420                                struct dlm_message *ms)
2421 {
2422         if (!is_master_copy(lkb))
2423                 return -EINVAL;
2424         if (receive_lvb(ls, lkb, ms))
2425                 return -ENOMEM;
2426         return 0;
2427 }
2428
2429 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2430    uses to send a reply and that the remote end uses to process the reply. */
2431
2432 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2433 {
2434         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2435         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2436         lkb->lkb_remid = ms->m_lkid;
2437 }
2438
2439 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2440 {
2441         struct dlm_lkb *lkb;
2442         struct dlm_rsb *r;
2443         int error, namelen;
2444
2445         error = create_lkb(ls, &lkb);
2446         if (error)
2447                 goto fail;
2448
2449         receive_flags(lkb, ms);
2450         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2451         error = receive_request_args(ls, lkb, ms);
2452         if (error) {
2453                 __put_lkb(ls, lkb);
2454                 goto fail;
2455         }
2456
2457         namelen = receive_extralen(ms);
2458
2459         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2460         if (error) {
2461                 __put_lkb(ls, lkb);
2462                 goto fail;
2463         }
2464
2465         lock_rsb(r);
2466
2467         attach_lkb(r, lkb);
2468         error = do_request(r, lkb);
2469         send_request_reply(r, lkb, error);
2470
2471         unlock_rsb(r);
2472         put_rsb(r);
2473
2474         if (error == -EINPROGRESS)
2475                 error = 0;
2476         if (error)
2477                 dlm_put_lkb(lkb);
2478         return;
2479
2480  fail:
2481         setup_stub_lkb(ls, ms);
2482         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2483 }
2484
2485 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2486 {
2487         struct dlm_lkb *lkb;
2488         struct dlm_rsb *r;
2489         int error, reply = 1;
2490
2491         error = find_lkb(ls, ms->m_remid, &lkb);
2492         if (error)
2493                 goto fail;
2494
2495         r = lkb->lkb_resource;
2496
2497         hold_rsb(r);
2498         lock_rsb(r);
2499
2500         receive_flags(lkb, ms);
2501         error = receive_convert_args(ls, lkb, ms);
2502         if (error)
2503                 goto out;
2504         reply = !down_conversion(lkb);
2505
2506         error = do_convert(r, lkb);
2507  out:
2508         if (reply)
2509                 send_convert_reply(r, lkb, error);
2510
2511         unlock_rsb(r);
2512         put_rsb(r);
2513         dlm_put_lkb(lkb);
2514         return;
2515
2516  fail:
2517         setup_stub_lkb(ls, ms);
2518         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2519 }
2520
2521 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2522 {
2523         struct dlm_lkb *lkb;
2524         struct dlm_rsb *r;
2525         int error;
2526
2527         error = find_lkb(ls, ms->m_remid, &lkb);
2528         if (error)
2529                 goto fail;
2530
2531         r = lkb->lkb_resource;
2532
2533         hold_rsb(r);
2534         lock_rsb(r);
2535
2536         receive_flags(lkb, ms);
2537         error = receive_unlock_args(ls, lkb, ms);
2538         if (error)
2539                 goto out;
2540
2541         error = do_unlock(r, lkb);
2542  out:
2543         send_unlock_reply(r, lkb, error);
2544
2545         unlock_rsb(r);
2546         put_rsb(r);
2547         dlm_put_lkb(lkb);
2548         return;
2549
2550  fail:
2551         setup_stub_lkb(ls, ms);
2552         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2553 }
2554
2555 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2556 {
2557         struct dlm_lkb *lkb;
2558         struct dlm_rsb *r;
2559         int error;
2560
2561         error = find_lkb(ls, ms->m_remid, &lkb);
2562         if (error)
2563                 goto fail;
2564
2565         receive_flags(lkb, ms);
2566
2567         r = lkb->lkb_resource;
2568
2569         hold_rsb(r);
2570         lock_rsb(r);
2571
2572         error = do_cancel(r, lkb);
2573         send_cancel_reply(r, lkb, error);
2574
2575         unlock_rsb(r);
2576         put_rsb(r);
2577         dlm_put_lkb(lkb);
2578         return;
2579
2580  fail:
2581         setup_stub_lkb(ls, ms);
2582         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2583 }
2584
2585 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2586 {
2587         struct dlm_lkb *lkb;
2588         struct dlm_rsb *r;
2589         int error;
2590
2591         error = find_lkb(ls, ms->m_remid, &lkb);
2592         if (error) {
2593                 log_error(ls, "receive_grant no lkb");
2594                 return;
2595         }
2596         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2597
2598         r = lkb->lkb_resource;
2599
2600         hold_rsb(r);
2601         lock_rsb(r);
2602
2603         receive_flags_reply(lkb, ms);
2604         grant_lock_pc(r, lkb, ms);
2605         queue_cast(r, lkb, 0);
2606
2607         unlock_rsb(r);
2608         put_rsb(r);
2609         dlm_put_lkb(lkb);
2610 }
2611
2612 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2613 {
2614         struct dlm_lkb *lkb;
2615         struct dlm_rsb *r;
2616         int error;
2617
2618         error = find_lkb(ls, ms->m_remid, &lkb);
2619         if (error) {
2620                 log_error(ls, "receive_bast no lkb");
2621                 return;
2622         }
2623         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2624
2625         r = lkb->lkb_resource;
2626
2627         hold_rsb(r);
2628         lock_rsb(r);
2629
2630         queue_bast(r, lkb, ms->m_bastmode);
2631
2632         unlock_rsb(r);
2633         put_rsb(r);
2634         dlm_put_lkb(lkb);
2635 }
2636
2637 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2638 {
2639         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2640
2641         from_nodeid = ms->m_header.h_nodeid;
2642         our_nodeid = dlm_our_nodeid();
2643
2644         len = receive_extralen(ms);
2645
2646         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2647         if (dir_nodeid != our_nodeid) {
2648                 log_error(ls, "lookup dir_nodeid %d from %d",
2649                           dir_nodeid, from_nodeid);
2650                 error = -EINVAL;
2651                 ret_nodeid = -1;
2652                 goto out;
2653         }
2654
2655         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2656
2657         /* Optimization: we're master so treat lookup as a request */
2658         if (!error && ret_nodeid == our_nodeid) {
2659                 receive_request(ls, ms);
2660                 return;
2661         }
2662  out:
2663         send_lookup_reply(ls, ms, ret_nodeid, error);
2664 }
2665
2666 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2667 {
2668         int len, dir_nodeid, from_nodeid;
2669
2670         from_nodeid = ms->m_header.h_nodeid;
2671
2672         len = receive_extralen(ms);
2673
2674         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2675         if (dir_nodeid != dlm_our_nodeid()) {
2676                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2677                           dir_nodeid, from_nodeid);
2678                 return;
2679         }
2680
2681         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2682 }
2683
2684 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2685 {
2686         struct dlm_lkb *lkb;
2687         struct dlm_rsb *r;
2688         int error, mstype;
2689
2690         error = find_lkb(ls, ms->m_remid, &lkb);
2691         if (error) {
2692                 log_error(ls, "receive_request_reply no lkb");
2693                 return;
2694         }
2695         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2696
2697         mstype = lkb->lkb_wait_type;
2698         error = remove_from_waiters(lkb);
2699         if (error) {
2700                 log_error(ls, "receive_request_reply not on waiters");
2701                 goto out;
2702         }
2703
2704         /* this is the value returned from do_request() on the master */
2705         error = ms->m_result;
2706
2707         r = lkb->lkb_resource;
2708         hold_rsb(r);
2709         lock_rsb(r);
2710
2711         /* Optimization: the dir node was also the master, so it took our
2712            lookup as a request and sent request reply instead of lookup reply */
2713         if (mstype == DLM_MSG_LOOKUP) {
2714                 r->res_nodeid = ms->m_header.h_nodeid;
2715                 lkb->lkb_nodeid = r->res_nodeid;
2716         }
2717
2718         switch (error) {
2719         case -EAGAIN:
2720                 /* request would block (be queued) on remote master;
2721                    the unhold undoes the original ref from create_lkb()
2722                    so it leads to the lkb being freed */
2723                 queue_cast(r, lkb, -EAGAIN);
2724                 confirm_master(r, -EAGAIN);
2725                 unhold_lkb(lkb);
2726                 break;
2727
2728         case -EINPROGRESS:
2729         case 0:
2730                 /* request was queued or granted on remote master */
2731                 receive_flags_reply(lkb, ms);
2732                 lkb->lkb_remid = ms->m_lkid;
2733                 if (error)
2734                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
2735                 else {
2736                         grant_lock_pc(r, lkb, ms);
2737                         queue_cast(r, lkb, 0);
2738                 }
2739                 confirm_master(r, error);
2740                 break;
2741
2742         case -ENOENT:
2743         case -ENOTBLK:
2744                 /* find_rsb failed to find rsb or rsb wasn't master */
2745                 r->res_nodeid = -1;
2746                 lkb->lkb_nodeid = -1;
2747                 _request_lock(r, lkb);
2748                 break;
2749
2750         default:
2751                 log_error(ls, "receive_request_reply error %d", error);
2752         }
2753
2754         unlock_rsb(r);
2755         put_rsb(r);
2756  out:
2757         dlm_put_lkb(lkb);
2758 }
2759
2760 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2761                                     struct dlm_message *ms)
2762 {
2763         int error = ms->m_result;
2764
2765         /* this is the value returned from do_convert() on the master */
2766
2767         switch (error) {
2768         case -EAGAIN:
2769                 /* convert would block (be queued) on remote master */
2770                 queue_cast(r, lkb, -EAGAIN);
2771                 break;
2772
2773         case -EINPROGRESS:
2774                 /* convert was queued on remote master */
2775                 del_lkb(r, lkb);
2776                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2777                 break;
2778
2779         case 0:
2780                 /* convert was granted on remote master */
2781                 receive_flags_reply(lkb, ms);
2782                 grant_lock_pc(r, lkb, ms);
2783                 queue_cast(r, lkb, 0);
2784                 break;
2785
2786         default:
2787                 log_error(r->res_ls, "receive_convert_reply error %d", error);
2788         }
2789 }
2790
2791 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2792 {
2793         struct dlm_rsb *r = lkb->lkb_resource;
2794
2795         hold_rsb(r);
2796         lock_rsb(r);
2797
2798         __receive_convert_reply(r, lkb, ms);
2799
2800         unlock_rsb(r);
2801         put_rsb(r);
2802 }
2803
2804 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2805 {
2806         struct dlm_lkb *lkb;
2807         int error;
2808
2809         error = find_lkb(ls, ms->m_remid, &lkb);
2810         if (error) {
2811                 log_error(ls, "receive_convert_reply no lkb");
2812                 return;
2813         }
2814         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2815
2816         error = remove_from_waiters(lkb);
2817         if (error) {
2818                 log_error(ls, "receive_convert_reply not on waiters");
2819                 goto out;
2820         }
2821
2822         _receive_convert_reply(lkb, ms);
2823  out:
2824         dlm_put_lkb(lkb);
2825 }
2826
2827 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2828 {
2829         struct dlm_rsb *r = lkb->lkb_resource;
2830         int error = ms->m_result;
2831
2832         hold_rsb(r);
2833         lock_rsb(r);
2834
2835         /* this is the value returned from do_unlock() on the master */
2836
2837         switch (error) {
2838         case -DLM_EUNLOCK:
2839                 receive_flags_reply(lkb, ms);
2840                 remove_lock_pc(r, lkb);
2841                 queue_cast(r, lkb, -DLM_EUNLOCK);
2842                 break;
2843         default:
2844                 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2845         }
2846
2847         unlock_rsb(r);
2848         put_rsb(r);
2849 }
2850
2851 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2852 {
2853         struct dlm_lkb *lkb;
2854         int error;
2855
2856         error = find_lkb(ls, ms->m_remid, &lkb);
2857         if (error) {
2858                 log_error(ls, "receive_unlock_reply no lkb");
2859                 return;
2860         }
2861         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2862
2863         error = remove_from_waiters(lkb);
2864         if (error) {
2865                 log_error(ls, "receive_unlock_reply not on waiters");
2866                 goto out;
2867         }
2868
2869         _receive_unlock_reply(lkb, ms);
2870  out:
2871         dlm_put_lkb(lkb);
2872 }
2873
2874 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2875 {
2876         struct dlm_rsb *r = lkb->lkb_resource;
2877         int error = ms->m_result;
2878
2879         hold_rsb(r);
2880         lock_rsb(r);
2881
2882         /* this is the value returned from do_cancel() on the master */
2883
2884         switch (error) {
2885         case -DLM_ECANCEL:
2886                 receive_flags_reply(lkb, ms);
2887                 revert_lock_pc(r, lkb);
2888                 queue_cast(r, lkb, -DLM_ECANCEL);
2889                 break;
2890         default:
2891                 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2892         }
2893
2894         unlock_rsb(r);
2895         put_rsb(r);
2896 }
2897
2898 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2899 {
2900         struct dlm_lkb *lkb;
2901         int error;
2902
2903         error = find_lkb(ls, ms->m_remid, &lkb);
2904         if (error) {
2905                 log_error(ls, "receive_cancel_reply no lkb");
2906                 return;
2907         }
2908         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2909
2910         error = remove_from_waiters(lkb);
2911         if (error) {
2912                 log_error(ls, "receive_cancel_reply not on waiters");
2913                 goto out;
2914         }
2915
2916         _receive_cancel_reply(lkb, ms);
2917  out:
2918         dlm_put_lkb(lkb);
2919 }
2920
2921 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2922 {
2923         struct dlm_lkb *lkb;
2924         struct dlm_rsb *r;
2925         int error, ret_nodeid;
2926
2927         error = find_lkb(ls, ms->m_lkid, &lkb);
2928         if (error) {
2929                 log_error(ls, "receive_lookup_reply no lkb");
2930                 return;
2931         }
2932
2933         error = remove_from_waiters(lkb);
2934         if (error) {
2935                 log_error(ls, "receive_lookup_reply not on waiters");
2936                 goto out;
2937         }
2938
2939         /* this is the value returned by dlm_dir_lookup on dir node
2940            FIXME: will a non-zero error ever be returned? */
2941         error = ms->m_result;
2942
2943         r = lkb->lkb_resource;
2944         hold_rsb(r);
2945         lock_rsb(r);
2946
2947         ret_nodeid = ms->m_nodeid;
2948         if (ret_nodeid == dlm_our_nodeid()) {
2949                 r->res_nodeid = 0;
2950                 ret_nodeid = 0;
2951                 r->res_first_lkid = 0;
2952         } else {
2953                 /* set_master() will copy res_nodeid to lkb_nodeid */
2954                 r->res_nodeid = ret_nodeid;
2955         }
2956
2957         _request_lock(r, lkb);
2958
2959         if (!ret_nodeid)
2960                 process_lookup_list(r);
2961
2962         unlock_rsb(r);
2963         put_rsb(r);
2964  out:
2965         dlm_put_lkb(lkb);
2966 }
2967
2968 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
2969 {
2970         struct dlm_message *ms = (struct dlm_message *) hd;
2971         struct dlm_ls *ls;
2972         int error;
2973
2974         if (!recovery)
2975                 dlm_message_in(ms);
2976
2977         ls = dlm_find_lockspace_global(hd->h_lockspace);
2978         if (!ls) {
2979                 log_print("drop message %d from %d for unknown lockspace %d",
2980                           ms->m_type, nodeid, hd->h_lockspace);
2981                 return -EINVAL;
2982         }
2983
2984         /* recovery may have just ended leaving a bunch of backed-up requests
2985            in the requestqueue; wait while dlm_recoverd clears them */
2986
2987         if (!recovery)
2988                 dlm_wait_requestqueue(ls);
2989
2990         /* recovery may have just started while there were a bunch of
2991            in-flight requests -- save them in requestqueue to be processed
2992            after recovery.  we can't let dlm_recvd block on the recovery
2993            lock.  if dlm_recoverd is calling this function to clear the
2994            requestqueue, it needs to be interrupted (-EINTR) if another
2995            recovery operation is starting. */
2996
2997         while (1) {
2998                 if (dlm_locking_stopped(ls)) {
2999                         if (!recovery)
3000                                 dlm_add_requestqueue(ls, nodeid, hd);
3001                         error = -EINTR;
3002                         goto out;
3003                 }
3004
3005                 if (lock_recovery_try(ls))
3006                         break;
3007                 schedule();
3008         }
3009
3010         switch (ms->m_type) {
3011
3012         /* messages sent to a master node */
3013
3014         case DLM_MSG_REQUEST:
3015                 receive_request(ls, ms);
3016                 break;
3017
3018         case DLM_MSG_CONVERT:
3019                 receive_convert(ls, ms);
3020                 break;
3021
3022         case DLM_MSG_UNLOCK:
3023                 receive_unlock(ls, ms);
3024                 break;
3025
3026         case DLM_MSG_CANCEL:
3027                 receive_cancel(ls, ms);
3028                 break;
3029
3030         /* messages sent from a master node (replies to above) */
3031
3032         case DLM_MSG_REQUEST_REPLY:
3033                 receive_request_reply(ls, ms);
3034                 break;
3035
3036         case DLM_MSG_CONVERT_REPLY:
3037                 receive_convert_reply(ls, ms);
3038                 break;
3039
3040         case DLM_MSG_UNLOCK_REPLY:
3041                 receive_unlock_reply(ls, ms);
3042                 break;
3043
3044         case DLM_MSG_CANCEL_REPLY:
3045                 receive_cancel_reply(ls, ms);
3046                 break;
3047
3048         /* messages sent from a master node (only two types of async msg) */
3049
3050         case DLM_MSG_GRANT:
3051                 receive_grant(ls, ms);
3052                 break;
3053
3054         case DLM_MSG_BAST:
3055                 receive_bast(ls, ms);
3056                 break;
3057
3058         /* messages sent to a dir node */
3059
3060         case DLM_MSG_LOOKUP:
3061                 receive_lookup(ls, ms);
3062                 break;
3063
3064         case DLM_MSG_REMOVE:
3065                 receive_remove(ls, ms);
3066                 break;
3067
3068         /* messages sent from a dir node (remove has no reply) */
3069
3070         case DLM_MSG_LOOKUP_REPLY:
3071                 receive_lookup_reply(ls, ms);
3072                 break;
3073
3074         default:
3075                 log_error(ls, "unknown message type %d", ms->m_type);
3076         }
3077
3078         unlock_recovery(ls);
3079  out:
3080         dlm_put_lockspace(ls);
3081         dlm_astd_wake();
3082         return 0;
3083 }
3084
3085
3086 /*
3087  * Recovery related
3088  */
3089
3090 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3091 {
3092         if (middle_conversion(lkb)) {
3093                 hold_lkb(lkb);
3094                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3095                 _remove_from_waiters(lkb);
3096                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3097
3098                 /* Same special case as in receive_rcom_lock_args() */
3099                 lkb->lkb_grmode = DLM_LOCK_IV;
3100                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3101                 unhold_lkb(lkb);
3102
3103         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3104                 lkb->lkb_flags |= DLM_IFL_RESEND;
3105         }
3106
3107         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3108            conversions are async; there's no reply from the remote master */
3109 }
3110
3111 /* A waiting lkb needs recovery if the master node has failed, or
3112    the master node is changing (only when no directory is used) */
3113
3114 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3115 {
3116         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3117                 return 1;
3118
3119         if (!dlm_no_directory(ls))
3120                 return 0;
3121
3122         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3123                 return 1;
3124
3125         return 0;
3126 }
3127
3128 /* Recovery for locks that are waiting for replies from nodes that are now
3129    gone.  We can just complete unlocks and cancels by faking a reply from the
3130    dead node.  Requests and up-conversions we flag to be resent after
3131    recovery.  Down-conversions can just be completed with a fake reply like
3132    unlocks.  Conversions between PR and CW need special attention. */
3133
3134 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3135 {
3136         struct dlm_lkb *lkb, *safe;
3137
3138         mutex_lock(&ls->ls_waiters_mutex);
3139
3140         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3141                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3142                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3143
3144                 /* all outstanding lookups, regardless of destination  will be
3145                    resent after recovery is done */
3146
3147                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3148                         lkb->lkb_flags |= DLM_IFL_RESEND;
3149                         continue;
3150                 }
3151
3152                 if (!waiter_needs_recovery(ls, lkb))
3153                         continue;
3154
3155                 switch (lkb->lkb_wait_type) {
3156
3157                 case DLM_MSG_REQUEST:
3158                         lkb->lkb_flags |= DLM_IFL_RESEND;
3159                         break;
3160
3161                 case DLM_MSG_CONVERT:
3162                         recover_convert_waiter(ls, lkb);
3163                         break;
3164
3165                 case DLM_MSG_UNLOCK:
3166                         hold_lkb(lkb);
3167                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3168                         _remove_from_waiters(lkb);
3169                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3170                         dlm_put_lkb(lkb);
3171                         break;
3172
3173                 case DLM_MSG_CANCEL:
3174                         hold_lkb(lkb);
3175                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3176                         _remove_from_waiters(lkb);
3177                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3178                         dlm_put_lkb(lkb);
3179                         break;
3180
3181                 default:
3182                         log_error(ls, "invalid lkb wait_type %d",
3183                                   lkb->lkb_wait_type);
3184                 }
3185         }
3186         mutex_unlock(&ls->ls_waiters_mutex);
3187 }
3188
3189 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3190 {
3191         struct dlm_lkb *lkb;
3192         int rv = 0;
3193
3194         mutex_lock(&ls->ls_waiters_mutex);
3195         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3196                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3197                         rv = lkb->lkb_wait_type;
3198                         _remove_from_waiters(lkb);
3199                         lkb->lkb_flags &= ~DLM_IFL_RESEND;
3200                         break;
3201                 }
3202         }
3203         mutex_unlock(&ls->ls_waiters_mutex);
3204
3205         if (!rv)
3206                 lkb = NULL;
3207         *lkb_ret = lkb;
3208         return rv;
3209 }
3210
3211 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3212    master or dir-node for r.  Processing the lkb may result in it being placed
3213    back on waiters. */
3214
3215 int dlm_recover_waiters_post(struct dlm_ls *ls)
3216 {
3217         struct dlm_lkb *lkb;
3218         struct dlm_rsb *r;
3219         int error = 0, mstype;
3220
3221         while (1) {
3222                 if (dlm_locking_stopped(ls)) {
3223                         log_debug(ls, "recover_waiters_post aborted");
3224                         error = -EINTR;
3225                         break;
3226                 }
3227
3228                 mstype = remove_resend_waiter(ls, &lkb);
3229                 if (!mstype)
3230                         break;
3231
3232                 r = lkb->lkb_resource;
3233
3234                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3235                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3236
3237                 switch (mstype) {
3238
3239                 case DLM_MSG_LOOKUP:
3240                         hold_rsb(r);
3241                         lock_rsb(r);
3242                         _request_lock(r, lkb);
3243                         if (is_master(r))
3244                                 confirm_master(r, 0);
3245                         unlock_rsb(r);
3246                         put_rsb(r);
3247                         break;
3248
3249                 case DLM_MSG_REQUEST:
3250                         hold_rsb(r);
3251                         lock_rsb(r);
3252                         _request_lock(r, lkb);
3253                         unlock_rsb(r);
3254                         put_rsb(r);
3255                         break;
3256
3257                 case DLM_MSG_CONVERT:
3258                         hold_rsb(r);
3259                         lock_rsb(r);
3260                         _convert_lock(r, lkb);
3261                         unlock_rsb(r);
3262                         put_rsb(r);
3263                         break;
3264
3265                 default:
3266                         log_error(ls, "recover_waiters_post type %d", mstype);
3267                 }
3268         }
3269
3270         return error;
3271 }
3272
3273 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3274                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3275 {
3276         struct dlm_ls *ls = r->res_ls;
3277         struct dlm_lkb *lkb, *safe;
3278
3279         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3280                 if (test(ls, lkb)) {
3281                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3282                         del_lkb(r, lkb);
3283                         /* this put should free the lkb */
3284                         if (!dlm_put_lkb(lkb))
3285                                 log_error(ls, "purged lkb not released");
3286                 }
3287         }
3288 }
3289
3290 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3291 {
3292         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3293 }
3294
3295 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3296 {
3297         return is_master_copy(lkb);
3298 }
3299
3300 static void purge_dead_locks(struct dlm_rsb *r)
3301 {
3302         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3303         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3304         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3305 }
3306
3307 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3308 {
3309         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3310         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3311         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3312 }
3313
3314 /* Get rid of locks held by nodes that are gone. */
3315
3316 int dlm_purge_locks(struct dlm_ls *ls)
3317 {
3318         struct dlm_rsb *r;
3319
3320         log_debug(ls, "dlm_purge_locks");
3321
3322         down_write(&ls->ls_root_sem);
3323         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3324                 hold_rsb(r);
3325                 lock_rsb(r);
3326                 if (is_master(r))
3327                         purge_dead_locks(r);
3328                 unlock_rsb(r);
3329                 unhold_rsb(r);
3330
3331                 schedule();
3332         }
3333         up_write(&ls->ls_root_sem);
3334
3335         return 0;
3336 }
3337
3338 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3339 {
3340         struct dlm_rsb *r, *r_ret = NULL;
3341
3342         read_lock(&ls->ls_rsbtbl[bucket].lock);
3343         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3344                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3345                         continue;
3346                 hold_rsb(r);
3347                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3348                 r_ret = r;
3349                 break;
3350         }
3351         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3352         return r_ret;
3353 }
3354
3355 void dlm_grant_after_purge(struct dlm_ls *ls)
3356 {
3357         struct dlm_rsb *r;
3358         int i;
3359
3360         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3361                 r = find_purged_rsb(ls, i);
3362                 if (!r)
3363                         continue;
3364                 lock_rsb(r);
3365                 if (is_master(r)) {
3366                         grant_pending_locks(r);
3367                         confirm_master(r, 0);
3368                 }
3369                 unlock_rsb(r);
3370                 put_rsb(r);
3371         }
3372 }
3373
3374 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3375                                          uint32_t remid)
3376 {
3377         struct dlm_lkb *lkb;
3378
3379         list_for_each_entry(lkb, head, lkb_statequeue) {
3380                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3381                         return lkb;
3382         }
3383         return NULL;
3384 }
3385
3386 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3387                                     uint32_t remid)
3388 {
3389         struct dlm_lkb *lkb;
3390
3391         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3392         if (lkb)
3393                 return lkb;
3394         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3395         if (lkb)
3396                 return lkb;
3397         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3398         if (lkb)
3399                 return lkb;
3400         return NULL;
3401 }
3402
3403 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3404                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3405 {
3406         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3407         int lvblen;
3408
3409         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3410         lkb->lkb_ownpid = rl->rl_ownpid;
3411         lkb->lkb_remid = rl->rl_lkid;
3412         lkb->lkb_exflags = rl->rl_exflags;
3413         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3414         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3415         lkb->lkb_lvbseq = rl->rl_lvbseq;
3416         lkb->lkb_rqmode = rl->rl_rqmode;
3417         lkb->lkb_grmode = rl->rl_grmode;
3418         /* don't set lkb_status because add_lkb wants to itself */
3419
3420         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3421         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3422
3423         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3424                 lkb->lkb_lvbptr = allocate_lvb(ls);
3425                 if (!lkb->lkb_lvbptr)
3426                         return -ENOMEM;
3427                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3428                          sizeof(struct rcom_lock);
3429                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3430         }
3431
3432         /* Conversions between PR and CW (middle modes) need special handling.
3433            The real granted mode of these converting locks cannot be determined
3434            until all locks have been rebuilt on the rsb (recover_conversion) */
3435
3436         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3437                 rl->rl_status = DLM_LKSTS_CONVERT;
3438                 lkb->lkb_grmode = DLM_LOCK_IV;
3439                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3440         }
3441
3442         return 0;
3443 }
3444
3445 /* This lkb may have been recovered in a previous aborted recovery so we need
3446    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3447    If so we just send back a standard reply.  If not, we create a new lkb with
3448    the given values and send back our lkid.  We send back our lkid by sending
3449    back the rcom_lock struct we got but with the remid field filled in. */
3450
3451 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3452 {
3453         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3454         struct dlm_rsb *r;
3455         struct dlm_lkb *lkb;
3456         int error;
3457
3458         if (rl->rl_parent_lkid) {
3459                 error = -EOPNOTSUPP;
3460                 goto out;
3461         }
3462
3463         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3464         if (error)
3465                 goto out;
3466
3467         lock_rsb(r);
3468
3469         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3470         if (lkb) {
3471                 error = -EEXIST;
3472                 goto out_remid;
3473         }
3474
3475         error = create_lkb(ls, &lkb);
3476         if (error)
3477                 goto out_unlock;
3478
3479         error = receive_rcom_lock_args(ls, lkb, r, rc);
3480         if (error) {
3481                 __put_lkb(ls, lkb);
3482                 goto out_unlock;
3483         }
3484
3485         attach_lkb(r, lkb);
3486         add_lkb(r, lkb, rl->rl_status);
3487         error = 0;
3488
3489  out_remid:
3490         /* this is the new value returned to the lock holder for
3491            saving in its process-copy lkb */
3492         rl->rl_remid = lkb->lkb_id;
3493
3494  out_unlock:
3495         unlock_rsb(r);
3496         put_rsb(r);
3497  out:
3498         if (error)
3499                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3500         rl->rl_result = error;
3501         return error;
3502 }
3503
3504 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3505 {
3506         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3507         struct dlm_rsb *r;
3508         struct dlm_lkb *lkb;
3509         int error;
3510
3511         error = find_lkb(ls, rl->rl_lkid, &lkb);
3512         if (error) {
3513                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3514                 return error;
3515         }
3516
3517         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3518
3519         error = rl->rl_result;
3520
3521         r = lkb->lkb_resource;
3522         hold_rsb(r);
3523         lock_rsb(r);
3524
3525         switch (error) {
3526         case -EEXIST:
3527                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3528                 /* fall through */
3529         case 0:
3530                 lkb->lkb_remid = rl->rl_remid;
3531                 break;
3532         default:
3533                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3534                           error, lkb->lkb_id);
3535         }
3536
3537         /* an ack for dlm_recover_locks() which waits for replies from
3538            all the locks it sends to new masters */
3539         dlm_recovered_lock(r);
3540
3541         unlock_rsb(r);
3542         put_rsb(r);
3543         dlm_put_lkb(lkb);
3544
3545         return 0;
3546 }
3547