[GFS2] Make the new argument to gfs2_trans_add_bh() actually do something
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58
59 #include "dlm_internal.h"
60 #include "memory.h"
61 #include "lowcomms.h"
62 #include "requestqueue.h"
63 #include "util.h"
64 #include "dir.h"
65 #include "member.h"
66 #include "lockspace.h"
67 #include "ast.h"
68 #include "lock.h"
69 #include "rcom.h"
70 #include "recover.h"
71 #include "lvb_table.h"
72 #include "config.h"
73
74 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
75 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
76 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
80 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_remove(struct dlm_rsb *r);
82 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84                                     struct dlm_message *ms);
85 static int receive_extralen(struct dlm_message *ms);
86
87 /*
88  * Lock compatibilty matrix - thanks Steve
89  * UN = Unlocked state. Not really a state, used as a flag
90  * PD = Padding. Used to make the matrix a nice power of two in size
91  * Other states are the same as the VMS DLM.
92  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
93  */
94
95 static const int __dlm_compat_matrix[8][8] = {
96       /* UN NL CR CW PR PW EX PD */
97         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
98         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
99         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
100         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
101         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
102         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
103         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
104         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
105 };
106
107 /*
108  * This defines the direction of transfer of LVB data.
109  * Granted mode is the row; requested mode is the column.
110  * Usage: matrix[grmode+1][rqmode+1]
111  * 1 = LVB is returned to the caller
112  * 0 = LVB is written to the resource
113  * -1 = nothing happens to the LVB
114  */
115
116 const int dlm_lvb_operations[8][8] = {
117         /* UN   NL  CR  CW  PR  PW  EX  PD*/
118         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
119         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
120         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
121         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
122         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
123         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
124         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
125         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
126 };
127 EXPORT_SYMBOL_GPL(dlm_lvb_operations);
128
129 #define modes_compat(gr, rq) \
130         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
131
132 int dlm_modes_compat(int mode1, int mode2)
133 {
134         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
135 }
136
137 /*
138  * Compatibility matrix for conversions with QUECVT set.
139  * Granted mode is the row; requested mode is the column.
140  * Usage: matrix[grmode+1][rqmode+1]
141  */
142
143 static const int __quecvt_compat_matrix[8][8] = {
144       /* UN NL CR CW PR PW EX PD */
145         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
146         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
147         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
148         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
149         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
150         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
152         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
153 };
154
155 static void dlm_print_lkb(struct dlm_lkb *lkb)
156 {
157         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
159                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
160                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
161                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
162 }
163
164 void dlm_print_rsb(struct dlm_rsb *r)
165 {
166         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
167                r->res_nodeid, r->res_flags, r->res_first_lkid,
168                r->res_recover_locks_count, r->res_name);
169 }
170
171 /* Threads cannot use the lockspace while it's being recovered */
172
173 static inline void lock_recovery(struct dlm_ls *ls)
174 {
175         down_read(&ls->ls_in_recovery);
176 }
177
178 static inline void unlock_recovery(struct dlm_ls *ls)
179 {
180         up_read(&ls->ls_in_recovery);
181 }
182
183 static inline int lock_recovery_try(struct dlm_ls *ls)
184 {
185         return down_read_trylock(&ls->ls_in_recovery);
186 }
187
188 static inline int can_be_queued(struct dlm_lkb *lkb)
189 {
190         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
191 }
192
193 static inline int force_blocking_asts(struct dlm_lkb *lkb)
194 {
195         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
196 }
197
198 static inline int is_demoted(struct dlm_lkb *lkb)
199 {
200         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
201 }
202
203 static inline int is_remote(struct dlm_rsb *r)
204 {
205         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
206         return !!r->res_nodeid;
207 }
208
209 static inline int is_process_copy(struct dlm_lkb *lkb)
210 {
211         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
212 }
213
214 static inline int is_master_copy(struct dlm_lkb *lkb)
215 {
216         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
217                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
218         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? TRUE : FALSE;
219 }
220
221 static inline int middle_conversion(struct dlm_lkb *lkb)
222 {
223         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
224             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
225                 return TRUE;
226         return FALSE;
227 }
228
229 static inline int down_conversion(struct dlm_lkb *lkb)
230 {
231         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
232 }
233
234 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
235 {
236         if (is_master_copy(lkb))
237                 return;
238
239         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
240
241         lkb->lkb_lksb->sb_status = rv;
242         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
243
244         dlm_add_ast(lkb, AST_COMP);
245 }
246
247 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
248 {
249         if (is_master_copy(lkb))
250                 send_bast(r, lkb, rqmode);
251         else {
252                 lkb->lkb_bastmode = rqmode;
253                 dlm_add_ast(lkb, AST_BAST);
254         }
255 }
256
257 /*
258  * Basic operations on rsb's and lkb's
259  */
260
261 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
262 {
263         struct dlm_rsb *r;
264
265         r = allocate_rsb(ls, len);
266         if (!r)
267                 return NULL;
268
269         r->res_ls = ls;
270         r->res_length = len;
271         memcpy(r->res_name, name, len);
272         init_MUTEX(&r->res_sem);
273
274         INIT_LIST_HEAD(&r->res_lookup);
275         INIT_LIST_HEAD(&r->res_grantqueue);
276         INIT_LIST_HEAD(&r->res_convertqueue);
277         INIT_LIST_HEAD(&r->res_waitqueue);
278         INIT_LIST_HEAD(&r->res_root_list);
279         INIT_LIST_HEAD(&r->res_recover_list);
280
281         return r;
282 }
283
284 static int search_rsb_list(struct list_head *head, char *name, int len,
285                            unsigned int flags, struct dlm_rsb **r_ret)
286 {
287         struct dlm_rsb *r;
288         int error = 0;
289
290         list_for_each_entry(r, head, res_hashchain) {
291                 if (len == r->res_length && !memcmp(name, r->res_name, len))
292                         goto found;
293         }
294         return -ENOENT;
295
296  found:
297         if (r->res_nodeid && (flags & R_MASTER))
298                 error = -ENOTBLK;
299         *r_ret = r;
300         return error;
301 }
302
303 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
304                        unsigned int flags, struct dlm_rsb **r_ret)
305 {
306         struct dlm_rsb *r;
307         int error;
308
309         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
310         if (!error) {
311                 kref_get(&r->res_ref);
312                 goto out;
313         }
314         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
315         if (error)
316                 goto out;
317
318         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
319
320         if (dlm_no_directory(ls))
321                 goto out;
322
323         if (r->res_nodeid == -1) {
324                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
325                 r->res_first_lkid = 0;
326         } else if (r->res_nodeid > 0) {
327                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
328                 r->res_first_lkid = 0;
329         } else {
330                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
331                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
332         }
333  out:
334         *r_ret = r;
335         return error;
336 }
337
338 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
339                       unsigned int flags, struct dlm_rsb **r_ret)
340 {
341         int error;
342         write_lock(&ls->ls_rsbtbl[b].lock);
343         error = _search_rsb(ls, name, len, b, flags, r_ret);
344         write_unlock(&ls->ls_rsbtbl[b].lock);
345         return error;
346 }
347
348 /*
349  * Find rsb in rsbtbl and potentially create/add one
350  *
351  * Delaying the release of rsb's has a similar benefit to applications keeping
352  * NL locks on an rsb, but without the guarantee that the cached master value
353  * will still be valid when the rsb is reused.  Apps aren't always smart enough
354  * to keep NL locks on an rsb that they may lock again shortly; this can lead
355  * to excessive master lookups and removals if we don't delay the release.
356  *
357  * Searching for an rsb means looking through both the normal list and toss
358  * list.  When found on the toss list the rsb is moved to the normal list with
359  * ref count of 1; when found on normal list the ref count is incremented.
360  */
361
362 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
363                     unsigned int flags, struct dlm_rsb **r_ret)
364 {
365         struct dlm_rsb *r, *tmp;
366         uint32_t hash, bucket;
367         int error = 0;
368
369         if (dlm_no_directory(ls))
370                 flags |= R_CREATE;
371
372         hash = jhash(name, namelen, 0);
373         bucket = hash & (ls->ls_rsbtbl_size - 1);
374
375         error = search_rsb(ls, name, namelen, bucket, flags, &r);
376         if (!error)
377                 goto out;
378
379         if (error == -ENOENT && !(flags & R_CREATE))
380                 goto out;
381
382         /* the rsb was found but wasn't a master copy */
383         if (error == -ENOTBLK)
384                 goto out;
385
386         error = -ENOMEM;
387         r = create_rsb(ls, name, namelen);
388         if (!r)
389                 goto out;
390
391         r->res_hash = hash;
392         r->res_bucket = bucket;
393         r->res_nodeid = -1;
394         kref_init(&r->res_ref);
395
396         /* With no directory, the master can be set immediately */
397         if (dlm_no_directory(ls)) {
398                 int nodeid = dlm_dir_nodeid(r);
399                 if (nodeid == dlm_our_nodeid())
400                         nodeid = 0;
401                 r->res_nodeid = nodeid;
402         }
403
404         write_lock(&ls->ls_rsbtbl[bucket].lock);
405         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
406         if (!error) {
407                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
408                 free_rsb(r);
409                 r = tmp;
410                 goto out;
411         }
412         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
413         write_unlock(&ls->ls_rsbtbl[bucket].lock);
414         error = 0;
415  out:
416         *r_ret = r;
417         return error;
418 }
419
420 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
421                  unsigned int flags, struct dlm_rsb **r_ret)
422 {
423         return find_rsb(ls, name, namelen, flags, r_ret);
424 }
425
426 /* This is only called to add a reference when the code already holds
427    a valid reference to the rsb, so there's no need for locking. */
428
429 static inline void hold_rsb(struct dlm_rsb *r)
430 {
431         kref_get(&r->res_ref);
432 }
433
434 void dlm_hold_rsb(struct dlm_rsb *r)
435 {
436         hold_rsb(r);
437 }
438
439 static void toss_rsb(struct kref *kref)
440 {
441         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
442         struct dlm_ls *ls = r->res_ls;
443
444         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
445         kref_init(&r->res_ref);
446         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
447         r->res_toss_time = jiffies;
448         if (r->res_lvbptr) {
449                 free_lvb(r->res_lvbptr);
450                 r->res_lvbptr = NULL;
451         }
452 }
453
454 /* When all references to the rsb are gone it's transfered to
455    the tossed list for later disposal. */
456
457 static void put_rsb(struct dlm_rsb *r)
458 {
459         struct dlm_ls *ls = r->res_ls;
460         uint32_t bucket = r->res_bucket;
461
462         write_lock(&ls->ls_rsbtbl[bucket].lock);
463         kref_put(&r->res_ref, toss_rsb);
464         write_unlock(&ls->ls_rsbtbl[bucket].lock);
465 }
466
467 void dlm_put_rsb(struct dlm_rsb *r)
468 {
469         put_rsb(r);
470 }
471
472 /* See comment for unhold_lkb */
473
474 static void unhold_rsb(struct dlm_rsb *r)
475 {
476         int rv;
477         rv = kref_put(&r->res_ref, toss_rsb);
478         DLM_ASSERT(!rv, dlm_print_rsb(r););
479 }
480
481 static void kill_rsb(struct kref *kref)
482 {
483         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
484
485         /* All work is done after the return from kref_put() so we
486            can release the write_lock before the remove and free. */
487
488         DLM_ASSERT(list_empty(&r->res_lookup),);
489         DLM_ASSERT(list_empty(&r->res_grantqueue),);
490         DLM_ASSERT(list_empty(&r->res_convertqueue),);
491         DLM_ASSERT(list_empty(&r->res_waitqueue),);
492         DLM_ASSERT(list_empty(&r->res_root_list),);
493         DLM_ASSERT(list_empty(&r->res_recover_list),);
494 }
495
496 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
497    The rsb must exist as long as any lkb's for it do. */
498
499 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
500 {
501         hold_rsb(r);
502         lkb->lkb_resource = r;
503 }
504
505 static void detach_lkb(struct dlm_lkb *lkb)
506 {
507         if (lkb->lkb_resource) {
508                 put_rsb(lkb->lkb_resource);
509                 lkb->lkb_resource = NULL;
510         }
511 }
512
513 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
514 {
515         struct dlm_lkb *lkb, *tmp;
516         uint32_t lkid = 0;
517         uint16_t bucket;
518
519         lkb = allocate_lkb(ls);
520         if (!lkb)
521                 return -ENOMEM;
522
523         lkb->lkb_nodeid = -1;
524         lkb->lkb_grmode = DLM_LOCK_IV;
525         kref_init(&lkb->lkb_ref);
526
527         get_random_bytes(&bucket, sizeof(bucket));
528         bucket &= (ls->ls_lkbtbl_size - 1);
529
530         write_lock(&ls->ls_lkbtbl[bucket].lock);
531
532         /* counter can roll over so we must verify lkid is not in use */
533
534         while (lkid == 0) {
535                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
536
537                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
538                                     lkb_idtbl_list) {
539                         if (tmp->lkb_id != lkid)
540                                 continue;
541                         lkid = 0;
542                         break;
543                 }
544         }
545
546         lkb->lkb_id = lkid;
547         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
548         write_unlock(&ls->ls_lkbtbl[bucket].lock);
549
550         *lkb_ret = lkb;
551         return 0;
552 }
553
554 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
555 {
556         uint16_t bucket = lkid & 0xFFFF;
557         struct dlm_lkb *lkb;
558
559         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
560                 if (lkb->lkb_id == lkid)
561                         return lkb;
562         }
563         return NULL;
564 }
565
566 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
567 {
568         struct dlm_lkb *lkb;
569         uint16_t bucket = lkid & 0xFFFF;
570
571         if (bucket >= ls->ls_lkbtbl_size)
572                 return -EBADSLT;
573
574         read_lock(&ls->ls_lkbtbl[bucket].lock);
575         lkb = __find_lkb(ls, lkid);
576         if (lkb)
577                 kref_get(&lkb->lkb_ref);
578         read_unlock(&ls->ls_lkbtbl[bucket].lock);
579
580         *lkb_ret = lkb;
581         return lkb ? 0 : -ENOENT;
582 }
583
584 static void kill_lkb(struct kref *kref)
585 {
586         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
587
588         /* All work is done after the return from kref_put() so we
589            can release the write_lock before the detach_lkb */
590
591         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
592 }
593
594 static int put_lkb(struct dlm_lkb *lkb)
595 {
596         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
597         uint16_t bucket = lkb->lkb_id & 0xFFFF;
598
599         write_lock(&ls->ls_lkbtbl[bucket].lock);
600         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
601                 list_del(&lkb->lkb_idtbl_list);
602                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
603
604                 detach_lkb(lkb);
605
606                 /* for local/process lkbs, lvbptr points to caller's lksb */
607                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
608                         free_lvb(lkb->lkb_lvbptr);
609                 if (lkb->lkb_range)
610                         free_range(lkb->lkb_range);
611                 free_lkb(lkb);
612                 return 1;
613         } else {
614                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
615                 return 0;
616         }
617 }
618
619 int dlm_put_lkb(struct dlm_lkb *lkb)
620 {
621         return put_lkb(lkb);
622 }
623
624 /* This is only called to add a reference when the code already holds
625    a valid reference to the lkb, so there's no need for locking. */
626
627 static inline void hold_lkb(struct dlm_lkb *lkb)
628 {
629         kref_get(&lkb->lkb_ref);
630 }
631
632 /* This is called when we need to remove a reference and are certain
633    it's not the last ref.  e.g. del_lkb is always called between a
634    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
635    put_lkb would work fine, but would involve unnecessary locking */
636
637 static inline void unhold_lkb(struct dlm_lkb *lkb)
638 {
639         int rv;
640         rv = kref_put(&lkb->lkb_ref, kill_lkb);
641         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
642 }
643
644 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
645                             int mode)
646 {
647         struct dlm_lkb *lkb = NULL;
648
649         list_for_each_entry(lkb, head, lkb_statequeue)
650                 if (lkb->lkb_rqmode < mode)
651                         break;
652
653         if (!lkb)
654                 list_add_tail(new, head);
655         else
656                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
657 }
658
659 /* add/remove lkb to rsb's grant/convert/wait queue */
660
661 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
662 {
663         kref_get(&lkb->lkb_ref);
664
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666
667         lkb->lkb_status = status;
668
669         switch (status) {
670         case DLM_LKSTS_WAITING:
671                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
672                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
673                 else
674                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
675                 break;
676         case DLM_LKSTS_GRANTED:
677                 /* convention says granted locks kept in order of grmode */
678                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
679                                 lkb->lkb_grmode);
680                 break;
681         case DLM_LKSTS_CONVERT:
682                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
683                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
684                 else
685                         list_add_tail(&lkb->lkb_statequeue,
686                                       &r->res_convertqueue);
687                 break;
688         default:
689                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
690         }
691 }
692
693 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
694 {
695         lkb->lkb_status = 0;
696         list_del(&lkb->lkb_statequeue);
697         unhold_lkb(lkb);
698 }
699
700 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
701 {
702         hold_lkb(lkb);
703         del_lkb(r, lkb);
704         add_lkb(r, lkb, sts);
705         unhold_lkb(lkb);
706 }
707
708 /* add/remove lkb from global waiters list of lkb's waiting for
709    a reply from a remote node */
710
711 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
712 {
713         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
714
715         down(&ls->ls_waiters_sem);
716         if (lkb->lkb_wait_type) {
717                 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
718                 goto out;
719         }
720         lkb->lkb_wait_type = mstype;
721         kref_get(&lkb->lkb_ref);
722         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
723  out:
724         up(&ls->ls_waiters_sem);
725 }
726
727 static int _remove_from_waiters(struct dlm_lkb *lkb)
728 {
729         int error = 0;
730
731         if (!lkb->lkb_wait_type) {
732                 log_print("remove_from_waiters error");
733                 error = -EINVAL;
734                 goto out;
735         }
736         lkb->lkb_wait_type = 0;
737         list_del(&lkb->lkb_wait_reply);
738         unhold_lkb(lkb);
739  out:
740         return error;
741 }
742
743 static int remove_from_waiters(struct dlm_lkb *lkb)
744 {
745         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
746         int error;
747
748         down(&ls->ls_waiters_sem);
749         error = _remove_from_waiters(lkb);
750         up(&ls->ls_waiters_sem);
751         return error;
752 }
753
754 static void dir_remove(struct dlm_rsb *r)
755 {
756         int to_nodeid;
757
758         if (dlm_no_directory(r->res_ls))
759                 return;
760
761         to_nodeid = dlm_dir_nodeid(r);
762         if (to_nodeid != dlm_our_nodeid())
763                 send_remove(r);
764         else
765                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
766                                      r->res_name, r->res_length);
767 }
768
769 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
770    found since they are in order of newest to oldest? */
771
772 static int shrink_bucket(struct dlm_ls *ls, int b)
773 {
774         struct dlm_rsb *r;
775         int count = 0, found;
776
777         for (;;) {
778                 found = FALSE;
779                 write_lock(&ls->ls_rsbtbl[b].lock);
780                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
781                                             res_hashchain) {
782                         if (!time_after_eq(jiffies, r->res_toss_time +
783                                            dlm_config.toss_secs * HZ))
784                                 continue;
785                         found = TRUE;
786                         break;
787                 }
788
789                 if (!found) {
790                         write_unlock(&ls->ls_rsbtbl[b].lock);
791                         break;
792                 }
793
794                 if (kref_put(&r->res_ref, kill_rsb)) {
795                         list_del(&r->res_hashchain);
796                         write_unlock(&ls->ls_rsbtbl[b].lock);
797
798                         if (is_master(r))
799                                 dir_remove(r);
800                         free_rsb(r);
801                         count++;
802                 } else {
803                         write_unlock(&ls->ls_rsbtbl[b].lock);
804                         log_error(ls, "tossed rsb in use %s", r->res_name);
805                 }
806         }
807
808         return count;
809 }
810
811 void dlm_scan_rsbs(struct dlm_ls *ls)
812 {
813         int i;
814
815         if (dlm_locking_stopped(ls))
816                 return;
817
818         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
819                 shrink_bucket(ls, i);
820                 cond_resched();
821         }
822 }
823
824 /* lkb is master or local copy */
825
826 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
827 {
828         int b, len = r->res_ls->ls_lvblen;
829
830         /* b=1 lvb returned to caller
831            b=0 lvb written to rsb or invalidated
832            b=-1 do nothing */
833
834         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
835
836         if (b == 1) {
837                 if (!lkb->lkb_lvbptr)
838                         return;
839
840                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
841                         return;
842
843                 if (!r->res_lvbptr)
844                         return;
845
846                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
847                 lkb->lkb_lvbseq = r->res_lvbseq;
848
849         } else if (b == 0) {
850                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
851                         rsb_set_flag(r, RSB_VALNOTVALID);
852                         return;
853                 }
854
855                 if (!lkb->lkb_lvbptr)
856                         return;
857
858                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
859                         return;
860
861                 if (!r->res_lvbptr)
862                         r->res_lvbptr = allocate_lvb(r->res_ls);
863
864                 if (!r->res_lvbptr)
865                         return;
866
867                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
868                 r->res_lvbseq++;
869                 lkb->lkb_lvbseq = r->res_lvbseq;
870                 rsb_clear_flag(r, RSB_VALNOTVALID);
871         }
872
873         if (rsb_flag(r, RSB_VALNOTVALID))
874                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
875 }
876
877 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
878 {
879         if (lkb->lkb_grmode < DLM_LOCK_PW)
880                 return;
881
882         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
883                 rsb_set_flag(r, RSB_VALNOTVALID);
884                 return;
885         }
886
887         if (!lkb->lkb_lvbptr)
888                 return;
889
890         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
891                 return;
892
893         if (!r->res_lvbptr)
894                 r->res_lvbptr = allocate_lvb(r->res_ls);
895
896         if (!r->res_lvbptr)
897                 return;
898
899         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
900         r->res_lvbseq++;
901         rsb_clear_flag(r, RSB_VALNOTVALID);
902 }
903
904 /* lkb is process copy (pc) */
905
906 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
907                             struct dlm_message *ms)
908 {
909         int b;
910
911         if (!lkb->lkb_lvbptr)
912                 return;
913
914         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
915                 return;
916
917         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
918         if (b == 1) {
919                 int len = receive_extralen(ms);
920                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
921                 lkb->lkb_lvbseq = ms->m_lvbseq;
922         }
923 }
924
925 /* Manipulate lkb's on rsb's convert/granted/waiting queues
926    remove_lock -- used for unlock, removes lkb from granted
927    revert_lock -- used for cancel, moves lkb from convert to granted
928    grant_lock  -- used for request and convert, adds lkb to granted or
929                   moves lkb from convert or waiting to granted
930
931    Each of these is used for master or local copy lkb's.  There is
932    also a _pc() variation used to make the corresponding change on
933    a process copy (pc) lkb. */
934
935 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
936 {
937         del_lkb(r, lkb);
938         lkb->lkb_grmode = DLM_LOCK_IV;
939         /* this unhold undoes the original ref from create_lkb()
940            so this leads to the lkb being freed */
941         unhold_lkb(lkb);
942 }
943
944 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
945 {
946         set_lvb_unlock(r, lkb);
947         _remove_lock(r, lkb);
948 }
949
950 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
951 {
952         _remove_lock(r, lkb);
953 }
954
955 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
956 {
957         lkb->lkb_rqmode = DLM_LOCK_IV;
958
959         switch (lkb->lkb_status) {
960         case DLM_LKSTS_CONVERT:
961                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
962                 break;
963         case DLM_LKSTS_WAITING:
964                 del_lkb(r, lkb);
965                 lkb->lkb_grmode = DLM_LOCK_IV;
966                 /* this unhold undoes the original ref from create_lkb()
967                    so this leads to the lkb being freed */
968                 unhold_lkb(lkb);
969                 break;
970         default:
971                 log_print("invalid status for revert %d", lkb->lkb_status);
972         }
973 }
974
975 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
976 {
977         revert_lock(r, lkb);
978 }
979
980 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
981 {
982         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
983                 lkb->lkb_grmode = lkb->lkb_rqmode;
984                 if (lkb->lkb_status)
985                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
986                 else
987                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
988         }
989
990         lkb->lkb_rqmode = DLM_LOCK_IV;
991
992         if (lkb->lkb_range) {
993                 lkb->lkb_range[GR_RANGE_START] = lkb->lkb_range[RQ_RANGE_START];
994                 lkb->lkb_range[GR_RANGE_END] = lkb->lkb_range[RQ_RANGE_END];
995         }
996 }
997
998 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
999 {
1000         set_lvb_lock(r, lkb);
1001         _grant_lock(r, lkb);
1002         lkb->lkb_highbast = 0;
1003 }
1004
1005 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1006                           struct dlm_message *ms)
1007 {
1008         set_lvb_lock_pc(r, lkb, ms);
1009         _grant_lock(r, lkb);
1010 }
1011
1012 /* called by grant_pending_locks() which means an async grant message must
1013    be sent to the requesting node in addition to granting the lock if the
1014    lkb belongs to a remote node. */
1015
1016 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1017 {
1018         grant_lock(r, lkb);
1019         if (is_master_copy(lkb))
1020                 send_grant(r, lkb);
1021         else
1022                 queue_cast(r, lkb, 0);
1023 }
1024
1025 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1026 {
1027         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1028                                            lkb_statequeue);
1029         if (lkb->lkb_id == first->lkb_id)
1030                 return TRUE;
1031
1032         return FALSE;
1033 }
1034
1035 /* Return 1 if the locks' ranges overlap.  If the lkb has no range then it is
1036    assumed to cover 0-ffffffff.ffffffff */
1037
1038 static inline int ranges_overlap(struct dlm_lkb *lkb1, struct dlm_lkb *lkb2)
1039 {
1040         if (!lkb1->lkb_range || !lkb2->lkb_range)
1041                 return TRUE;
1042
1043         if (lkb1->lkb_range[RQ_RANGE_END] < lkb2->lkb_range[GR_RANGE_START] ||
1044             lkb1->lkb_range[RQ_RANGE_START] > lkb2->lkb_range[GR_RANGE_END])
1045                 return FALSE;
1046
1047         return TRUE;
1048 }
1049
1050 /* Check if the given lkb conflicts with another lkb on the queue. */
1051
1052 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1053 {
1054         struct dlm_lkb *this;
1055
1056         list_for_each_entry(this, head, lkb_statequeue) {
1057                 if (this == lkb)
1058                         continue;
1059                 if (ranges_overlap(lkb, this) && !modes_compat(this, lkb))
1060                         return TRUE;
1061         }
1062         return FALSE;
1063 }
1064
1065 /*
1066  * "A conversion deadlock arises with a pair of lock requests in the converting
1067  * queue for one resource.  The granted mode of each lock blocks the requested
1068  * mode of the other lock."
1069  *
1070  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1071  * convert queue from being granted, then demote lkb (set grmode to NL).
1072  * This second form requires that we check for conv-deadlk even when
1073  * now == 0 in _can_be_granted().
1074  *
1075  * Example:
1076  * Granted Queue: empty
1077  * Convert Queue: NL->EX (first lock)
1078  *                PR->EX (second lock)
1079  *
1080  * The first lock can't be granted because of the granted mode of the second
1081  * lock and the second lock can't be granted because it's not first in the
1082  * list.  We demote the granted mode of the second lock (the lkb passed to this
1083  * function).
1084  *
1085  * After the resolution, the "grant pending" function needs to go back and try
1086  * to grant locks on the convert queue again since the first lock can now be
1087  * granted.
1088  */
1089
1090 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1091 {
1092         struct dlm_lkb *this, *first = NULL, *self = NULL;
1093
1094         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1095                 if (!first)
1096                         first = this;
1097                 if (this == lkb) {
1098                         self = lkb;
1099                         continue;
1100                 }
1101
1102                 if (!ranges_overlap(lkb, this))
1103                         continue;
1104
1105                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1106                         return TRUE;
1107         }
1108
1109         /* if lkb is on the convert queue and is preventing the first
1110            from being granted, then there's deadlock and we demote lkb.
1111            multiple converting locks may need to do this before the first
1112            converting lock can be granted. */
1113
1114         if (self && self != first) {
1115                 if (!modes_compat(lkb, first) &&
1116                     !queue_conflict(&rsb->res_grantqueue, first))
1117                         return TRUE;
1118         }
1119
1120         return FALSE;
1121 }
1122
1123 /*
1124  * Return 1 if the lock can be granted, 0 otherwise.
1125  * Also detect and resolve conversion deadlocks.
1126  *
1127  * lkb is the lock to be granted
1128  *
1129  * now is 1 if the function is being called in the context of the
1130  * immediate request, it is 0 if called later, after the lock has been
1131  * queued.
1132  *
1133  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1134  */
1135
1136 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1137 {
1138         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1139
1140         /*
1141          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1142          * a new request for a NL mode lock being blocked.
1143          *
1144          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1145          * request, then it would be granted.  In essence, the use of this flag
1146          * tells the Lock Manager to expedite theis request by not considering
1147          * what may be in the CONVERTING or WAITING queues...  As of this
1148          * writing, the EXPEDITE flag can be used only with new requests for NL
1149          * mode locks.  This flag is not valid for conversion requests.
1150          *
1151          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1152          * conversion or used with a non-NL requested mode.  We also know an
1153          * EXPEDITE request is always granted immediately, so now must always
1154          * be 1.  The full condition to grant an expedite request: (now &&
1155          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1156          * therefore be shortened to just checking the flag.
1157          */
1158
1159         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1160                 return TRUE;
1161
1162         /*
1163          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1164          * added to the remaining conditions.
1165          */
1166
1167         if (queue_conflict(&r->res_grantqueue, lkb))
1168                 goto out;
1169
1170         /*
1171          * 6-3: By default, a conversion request is immediately granted if the
1172          * requested mode is compatible with the modes of all other granted
1173          * locks
1174          */
1175
1176         if (queue_conflict(&r->res_convertqueue, lkb))
1177                 goto out;
1178
1179         /*
1180          * 6-5: But the default algorithm for deciding whether to grant or
1181          * queue conversion requests does not by itself guarantee that such
1182          * requests are serviced on a "first come first serve" basis.  This, in
1183          * turn, can lead to a phenomenon known as "indefinate postponement".
1184          *
1185          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1186          * the system service employed to request a lock conversion.  This flag
1187          * forces certain conversion requests to be queued, even if they are
1188          * compatible with the granted modes of other locks on the same
1189          * resource.  Thus, the use of this flag results in conversion requests
1190          * being ordered on a "first come first servce" basis.
1191          *
1192          * DCT: This condition is all about new conversions being able to occur
1193          * "in place" while the lock remains on the granted queue (assuming
1194          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1195          * doesn't _have_ to go onto the convert queue where it's processed in
1196          * order.  The "now" variable is necessary to distinguish converts
1197          * being received and processed for the first time now, because once a
1198          * convert is moved to the conversion queue the condition below applies
1199          * requiring fifo granting.
1200          */
1201
1202         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1203                 return TRUE;
1204
1205         /*
1206          * When using range locks the NOORDER flag is set to avoid the standard
1207          * vms rules on grant order.
1208          */
1209
1210         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1211                 return TRUE;
1212
1213         /*
1214          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1215          * granted until all other conversion requests ahead of it are granted
1216          * and/or canceled.
1217          */
1218
1219         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1220                 return TRUE;
1221
1222         /*
1223          * 6-4: By default, a new request is immediately granted only if all
1224          * three of the following conditions are satisfied when the request is
1225          * issued:
1226          * - The queue of ungranted conversion requests for the resource is
1227          *   empty.
1228          * - The queue of ungranted new requests for the resource is empty.
1229          * - The mode of the new request is compatible with the most
1230          *   restrictive mode of all granted locks on the resource.
1231          */
1232
1233         if (now && !conv && list_empty(&r->res_convertqueue) &&
1234             list_empty(&r->res_waitqueue))
1235                 return TRUE;
1236
1237         /*
1238          * 6-4: Once a lock request is in the queue of ungranted new requests,
1239          * it cannot be granted until the queue of ungranted conversion
1240          * requests is empty, all ungranted new requests ahead of it are
1241          * granted and/or canceled, and it is compatible with the granted mode
1242          * of the most restrictive lock granted on the resource.
1243          */
1244
1245         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1246             first_in_list(lkb, &r->res_waitqueue))
1247                 return TRUE;
1248
1249  out:
1250         /*
1251          * The following, enabled by CONVDEADLK, departs from VMS.
1252          */
1253
1254         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1255             conversion_deadlock_detect(r, lkb)) {
1256                 lkb->lkb_grmode = DLM_LOCK_NL;
1257                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1258         }
1259
1260         return FALSE;
1261 }
1262
1263 /*
1264  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1265  * simple way to provide a big optimization to applications that can use them.
1266  */
1267
1268 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1269 {
1270         uint32_t flags = lkb->lkb_exflags;
1271         int rv;
1272         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1273
1274         rv = _can_be_granted(r, lkb, now);
1275         if (rv)
1276                 goto out;
1277
1278         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1279                 goto out;
1280
1281         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1282                 alt = DLM_LOCK_PR;
1283         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1284                 alt = DLM_LOCK_CW;
1285
1286         if (alt) {
1287                 lkb->lkb_rqmode = alt;
1288                 rv = _can_be_granted(r, lkb, now);
1289                 if (rv)
1290                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1291                 else
1292                         lkb->lkb_rqmode = rqmode;
1293         }
1294  out:
1295         return rv;
1296 }
1297
1298 static int grant_pending_convert(struct dlm_rsb *r, int high)
1299 {
1300         struct dlm_lkb *lkb, *s;
1301         int hi, demoted, quit, grant_restart, demote_restart;
1302
1303         quit = 0;
1304  restart:
1305         grant_restart = 0;
1306         demote_restart = 0;
1307         hi = DLM_LOCK_IV;
1308
1309         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1310                 demoted = is_demoted(lkb);
1311                 if (can_be_granted(r, lkb, FALSE)) {
1312                         grant_lock_pending(r, lkb);
1313                         grant_restart = 1;
1314                 } else {
1315                         hi = max_t(int, lkb->lkb_rqmode, hi);
1316                         if (!demoted && is_demoted(lkb))
1317                                 demote_restart = 1;
1318                 }
1319         }
1320
1321         if (grant_restart)
1322                 goto restart;
1323         if (demote_restart && !quit) {
1324                 quit = 1;
1325                 goto restart;
1326         }
1327
1328         return max_t(int, high, hi);
1329 }
1330
1331 static int grant_pending_wait(struct dlm_rsb *r, int high)
1332 {
1333         struct dlm_lkb *lkb, *s;
1334
1335         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1336                 if (can_be_granted(r, lkb, FALSE))
1337                         grant_lock_pending(r, lkb);
1338                 else
1339                         high = max_t(int, lkb->lkb_rqmode, high);
1340         }
1341
1342         return high;
1343 }
1344
1345 static void grant_pending_locks(struct dlm_rsb *r)
1346 {
1347         struct dlm_lkb *lkb, *s;
1348         int high = DLM_LOCK_IV;
1349
1350         DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1351
1352         high = grant_pending_convert(r, high);
1353         high = grant_pending_wait(r, high);
1354
1355         if (high == DLM_LOCK_IV)
1356                 return;
1357
1358         /*
1359          * If there are locks left on the wait/convert queue then send blocking
1360          * ASTs to granted locks based on the largest requested mode (high)
1361          * found above.  This can generate spurious blocking ASTs for range
1362          * locks. FIXME: highbast < high comparison not valid for PR/CW.
1363          */
1364
1365         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1366                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1367                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1368                         queue_bast(r, lkb, high);
1369                         lkb->lkb_highbast = high;
1370                 }
1371         }
1372 }
1373
1374 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1375                             struct dlm_lkb *lkb)
1376 {
1377         struct dlm_lkb *gr;
1378
1379         list_for_each_entry(gr, head, lkb_statequeue) {
1380                 if (gr->lkb_bastaddr &&
1381                     gr->lkb_highbast < lkb->lkb_rqmode &&
1382                     ranges_overlap(lkb, gr) && !modes_compat(gr, lkb)) {
1383                         queue_bast(r, gr, lkb->lkb_rqmode);
1384                         gr->lkb_highbast = lkb->lkb_rqmode;
1385                 }
1386         }
1387 }
1388
1389 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1390 {
1391         send_bast_queue(r, &r->res_grantqueue, lkb);
1392 }
1393
1394 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1395 {
1396         send_bast_queue(r, &r->res_grantqueue, lkb);
1397         send_bast_queue(r, &r->res_convertqueue, lkb);
1398 }
1399
1400 /* set_master(r, lkb) -- set the master nodeid of a resource
1401
1402    The purpose of this function is to set the nodeid field in the given
1403    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1404    known, it can just be copied to the lkb and the function will return
1405    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1406    before it can be copied to the lkb.
1407
1408    When the rsb nodeid is being looked up remotely, the initial lkb
1409    causing the lookup is kept on the ls_waiters list waiting for the
1410    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1411    on the rsb's res_lookup list until the master is verified.
1412
1413    Return values:
1414    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1415    1: the rsb master is not available and the lkb has been placed on
1416       a wait queue
1417 */
1418
1419 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1420 {
1421         struct dlm_ls *ls = r->res_ls;
1422         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1423
1424         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1425                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1426                 r->res_first_lkid = lkb->lkb_id;
1427                 lkb->lkb_nodeid = r->res_nodeid;
1428                 return 0;
1429         }
1430
1431         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1432                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1433                 return 1;
1434         }
1435
1436         if (r->res_nodeid == 0) {
1437                 lkb->lkb_nodeid = 0;
1438                 return 0;
1439         }
1440
1441         if (r->res_nodeid > 0) {
1442                 lkb->lkb_nodeid = r->res_nodeid;
1443                 return 0;
1444         }
1445
1446         DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1447
1448         dir_nodeid = dlm_dir_nodeid(r);
1449
1450         if (dir_nodeid != our_nodeid) {
1451                 r->res_first_lkid = lkb->lkb_id;
1452                 send_lookup(r, lkb);
1453                 return 1;
1454         }
1455
1456         for (;;) {
1457                 /* It's possible for dlm_scand to remove an old rsb for
1458                    this same resource from the toss list, us to create
1459                    a new one, look up the master locally, and find it
1460                    already exists just before dlm_scand does the
1461                    dir_remove() on the previous rsb. */
1462
1463                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1464                                        r->res_length, &ret_nodeid);
1465                 if (!error)
1466                         break;
1467                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1468                 schedule();
1469         }
1470
1471         if (ret_nodeid == our_nodeid) {
1472                 r->res_first_lkid = 0;
1473                 r->res_nodeid = 0;
1474                 lkb->lkb_nodeid = 0;
1475         } else {
1476                 r->res_first_lkid = lkb->lkb_id;
1477                 r->res_nodeid = ret_nodeid;
1478                 lkb->lkb_nodeid = ret_nodeid;
1479         }
1480         return 0;
1481 }
1482
1483 static void process_lookup_list(struct dlm_rsb *r)
1484 {
1485         struct dlm_lkb *lkb, *safe;
1486
1487         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1488                 list_del(&lkb->lkb_rsb_lookup);
1489                 _request_lock(r, lkb);
1490                 schedule();
1491         }
1492 }
1493
1494 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1495
1496 static void confirm_master(struct dlm_rsb *r, int error)
1497 {
1498         struct dlm_lkb *lkb;
1499
1500         if (!r->res_first_lkid)
1501                 return;
1502
1503         switch (error) {
1504         case 0:
1505         case -EINPROGRESS:
1506                 r->res_first_lkid = 0;
1507                 process_lookup_list(r);
1508                 break;
1509
1510         case -EAGAIN:
1511                 /* the remote master didn't queue our NOQUEUE request;
1512                    make a waiting lkb the first_lkid */
1513
1514                 r->res_first_lkid = 0;
1515
1516                 if (!list_empty(&r->res_lookup)) {
1517                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1518                                          lkb_rsb_lookup);
1519                         list_del(&lkb->lkb_rsb_lookup);
1520                         r->res_first_lkid = lkb->lkb_id;
1521                         _request_lock(r, lkb);
1522                 } else
1523                         r->res_nodeid = -1;
1524                 break;
1525
1526         default:
1527                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1528         }
1529 }
1530
1531 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1532                          int namelen, uint32_t parent_lkid, void *ast,
1533                          void *astarg, void *bast, struct dlm_range *range,
1534                          struct dlm_args *args)
1535 {
1536         int rv = -EINVAL;
1537
1538         /* check for invalid arg usage */
1539
1540         if (mode < 0 || mode > DLM_LOCK_EX)
1541                 goto out;
1542
1543         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1544                 goto out;
1545
1546         if (flags & DLM_LKF_CANCEL)
1547                 goto out;
1548
1549         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1550                 goto out;
1551
1552         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1553                 goto out;
1554
1555         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1556                 goto out;
1557
1558         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1559                 goto out;
1560
1561         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1562                 goto out;
1563
1564         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1565                 goto out;
1566
1567         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1568                 goto out;
1569
1570         if (!ast || !lksb)
1571                 goto out;
1572
1573         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1574                 goto out;
1575
1576         /* parent/child locks not yet supported */
1577         if (parent_lkid)
1578                 goto out;
1579
1580         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1581                 goto out;
1582
1583         /* these args will be copied to the lkb in validate_lock_args,
1584            it cannot be done now because when converting locks, fields in
1585            an active lkb cannot be modified before locking the rsb */
1586
1587         args->flags = flags;
1588         args->astaddr = ast;
1589         args->astparam = (long) astarg;
1590         args->bastaddr = bast;
1591         args->mode = mode;
1592         args->lksb = lksb;
1593         args->range = range;
1594         rv = 0;
1595  out:
1596         return rv;
1597 }
1598
1599 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1600 {
1601         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1602                       DLM_LKF_FORCEUNLOCK))
1603                 return -EINVAL;
1604
1605         args->flags = flags;
1606         args->astparam = (long) astarg;
1607         return 0;
1608 }
1609
1610 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1611                               struct dlm_args *args)
1612 {
1613         int rv = -EINVAL;
1614
1615         if (args->flags & DLM_LKF_CONVERT) {
1616                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1617                         goto out;
1618
1619                 if (args->flags & DLM_LKF_QUECVT &&
1620                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1621                         goto out;
1622
1623                 rv = -EBUSY;
1624                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1625                         goto out;
1626
1627                 if (lkb->lkb_wait_type)
1628                         goto out;
1629         }
1630
1631         lkb->lkb_exflags = args->flags;
1632         lkb->lkb_sbflags = 0;
1633         lkb->lkb_astaddr = args->astaddr;
1634         lkb->lkb_astparam = args->astparam;
1635         lkb->lkb_bastaddr = args->bastaddr;
1636         lkb->lkb_rqmode = args->mode;
1637         lkb->lkb_lksb = args->lksb;
1638         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1639         lkb->lkb_ownpid = (int) current->pid;
1640
1641         rv = 0;
1642         if (!args->range)
1643                 goto out;
1644
1645         if (!lkb->lkb_range) {
1646                 rv = -ENOMEM;
1647                 lkb->lkb_range = allocate_range(ls);
1648                 if (!lkb->lkb_range)
1649                         goto out;
1650                 /* This is needed for conversions that contain ranges
1651                    where the original lock didn't but it's harmless for
1652                    new locks too. */
1653                 lkb->lkb_range[GR_RANGE_START] = 0LL;
1654                 lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
1655         }
1656
1657         lkb->lkb_range[RQ_RANGE_START] = args->range->ra_start;
1658         lkb->lkb_range[RQ_RANGE_END] = args->range->ra_end;
1659         lkb->lkb_flags |= DLM_IFL_RANGE;
1660         rv = 0;
1661  out:
1662         return rv;
1663 }
1664
1665 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1666 {
1667         int rv = -EINVAL;
1668
1669         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1670                 goto out;
1671
1672         if (args->flags & DLM_LKF_FORCEUNLOCK)
1673                 goto out_ok;
1674
1675         if (args->flags & DLM_LKF_CANCEL &&
1676             lkb->lkb_status == DLM_LKSTS_GRANTED)
1677                 goto out;
1678
1679         if (!(args->flags & DLM_LKF_CANCEL) &&
1680             lkb->lkb_status != DLM_LKSTS_GRANTED)
1681                 goto out;
1682
1683         rv = -EBUSY;
1684         if (lkb->lkb_wait_type)
1685                 goto out;
1686
1687  out_ok:
1688         lkb->lkb_exflags = args->flags;
1689         lkb->lkb_sbflags = 0;
1690         lkb->lkb_astparam = args->astparam;
1691
1692         rv = 0;
1693  out:
1694         return rv;
1695 }
1696
1697 /*
1698  * Four stage 4 varieties:
1699  * do_request(), do_convert(), do_unlock(), do_cancel()
1700  * These are called on the master node for the given lock and
1701  * from the central locking logic.
1702  */
1703
1704 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1705 {
1706         int error = 0;
1707
1708         if (can_be_granted(r, lkb, TRUE)) {
1709                 grant_lock(r, lkb);
1710                 queue_cast(r, lkb, 0);
1711                 goto out;
1712         }
1713
1714         if (can_be_queued(lkb)) {
1715                 error = -EINPROGRESS;
1716                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1717                 send_blocking_asts(r, lkb);
1718                 goto out;
1719         }
1720
1721         error = -EAGAIN;
1722         if (force_blocking_asts(lkb))
1723                 send_blocking_asts_all(r, lkb);
1724         queue_cast(r, lkb, -EAGAIN);
1725
1726  out:
1727         return error;
1728 }
1729
1730 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1731 {
1732         int error = 0;
1733
1734         /* changing an existing lock may allow others to be granted */
1735
1736         if (can_be_granted(r, lkb, TRUE)) {
1737                 grant_lock(r, lkb);
1738                 queue_cast(r, lkb, 0);
1739                 grant_pending_locks(r);
1740                 goto out;
1741         }
1742
1743         if (can_be_queued(lkb)) {
1744                 if (is_demoted(lkb))
1745                         grant_pending_locks(r);
1746                 error = -EINPROGRESS;
1747                 del_lkb(r, lkb);
1748                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1749                 send_blocking_asts(r, lkb);
1750                 goto out;
1751         }
1752
1753         error = -EAGAIN;
1754         if (force_blocking_asts(lkb))
1755                 send_blocking_asts_all(r, lkb);
1756         queue_cast(r, lkb, -EAGAIN);
1757
1758  out:
1759         return error;
1760 }
1761
1762 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1763 {
1764         remove_lock(r, lkb);
1765         queue_cast(r, lkb, -DLM_EUNLOCK);
1766         grant_pending_locks(r);
1767         return -DLM_EUNLOCK;
1768 }
1769
1770 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1771 {
1772         revert_lock(r, lkb);
1773         queue_cast(r, lkb, -DLM_ECANCEL);
1774         grant_pending_locks(r);
1775         return -DLM_ECANCEL;
1776 }
1777
1778 /*
1779  * Four stage 3 varieties:
1780  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1781  */
1782
1783 /* add a new lkb to a possibly new rsb, called by requesting process */
1784
1785 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1786 {
1787         int error;
1788
1789         /* set_master: sets lkb nodeid from r */
1790
1791         error = set_master(r, lkb);
1792         if (error < 0)
1793                 goto out;
1794         if (error) {
1795                 error = 0;
1796                 goto out;
1797         }
1798
1799         if (is_remote(r))
1800                 /* receive_request() calls do_request() on remote node */
1801                 error = send_request(r, lkb);
1802         else
1803                 error = do_request(r, lkb);
1804  out:
1805         return error;
1806 }
1807
1808 /* change some property of an existing lkb, e.g. mode, range */
1809
1810 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1811 {
1812         int error;
1813
1814         if (is_remote(r))
1815                 /* receive_convert() calls do_convert() on remote node */
1816                 error = send_convert(r, lkb);
1817         else
1818                 error = do_convert(r, lkb);
1819
1820         return error;
1821 }
1822
1823 /* remove an existing lkb from the granted queue */
1824
1825 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1826 {
1827         int error;
1828
1829         if (is_remote(r))
1830                 /* receive_unlock() calls do_unlock() on remote node */
1831                 error = send_unlock(r, lkb);
1832         else
1833                 error = do_unlock(r, lkb);
1834
1835         return error;
1836 }
1837
1838 /* remove an existing lkb from the convert or wait queue */
1839
1840 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1841 {
1842         int error;
1843
1844         if (is_remote(r))
1845                 /* receive_cancel() calls do_cancel() on remote node */
1846                 error = send_cancel(r, lkb);
1847         else
1848                 error = do_cancel(r, lkb);
1849
1850         return error;
1851 }
1852
1853 /*
1854  * Four stage 2 varieties:
1855  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1856  */
1857
1858 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1859                         int len, struct dlm_args *args)
1860 {
1861         struct dlm_rsb *r;
1862         int error;
1863
1864         error = validate_lock_args(ls, lkb, args);
1865         if (error)
1866                 goto out;
1867
1868         error = find_rsb(ls, name, len, R_CREATE, &r);
1869         if (error)
1870                 goto out;
1871
1872         lock_rsb(r);
1873
1874         attach_lkb(r, lkb);
1875         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1876
1877         error = _request_lock(r, lkb);
1878
1879         unlock_rsb(r);
1880         put_rsb(r);
1881
1882  out:
1883         return error;
1884 }
1885
1886 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1887                         struct dlm_args *args)
1888 {
1889         struct dlm_rsb *r;
1890         int error;
1891
1892         r = lkb->lkb_resource;
1893
1894         hold_rsb(r);
1895         lock_rsb(r);
1896
1897         error = validate_lock_args(ls, lkb, args);
1898         if (error)
1899                 goto out;
1900
1901         error = _convert_lock(r, lkb);
1902  out:
1903         unlock_rsb(r);
1904         put_rsb(r);
1905         return error;
1906 }
1907
1908 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1909                        struct dlm_args *args)
1910 {
1911         struct dlm_rsb *r;
1912         int error;
1913
1914         r = lkb->lkb_resource;
1915
1916         hold_rsb(r);
1917         lock_rsb(r);
1918
1919         error = validate_unlock_args(lkb, args);
1920         if (error)
1921                 goto out;
1922
1923         error = _unlock_lock(r, lkb);
1924  out:
1925         unlock_rsb(r);
1926         put_rsb(r);
1927         return error;
1928 }
1929
1930 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1931                        struct dlm_args *args)
1932 {
1933         struct dlm_rsb *r;
1934         int error;
1935
1936         r = lkb->lkb_resource;
1937
1938         hold_rsb(r);
1939         lock_rsb(r);
1940
1941         error = validate_unlock_args(lkb, args);
1942         if (error)
1943                 goto out;
1944
1945         error = _cancel_lock(r, lkb);
1946  out:
1947         unlock_rsb(r);
1948         put_rsb(r);
1949         return error;
1950 }
1951
1952 /*
1953  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1954  */
1955
1956 int dlm_lock(dlm_lockspace_t *lockspace,
1957              int mode,
1958              struct dlm_lksb *lksb,
1959              uint32_t flags,
1960              void *name,
1961              unsigned int namelen,
1962              uint32_t parent_lkid,
1963              void (*ast) (void *astarg),
1964              void *astarg,
1965              void (*bast) (void *astarg, int mode),
1966              struct dlm_range *range)
1967 {
1968         struct dlm_ls *ls;
1969         struct dlm_lkb *lkb;
1970         struct dlm_args args;
1971         int error, convert = flags & DLM_LKF_CONVERT;
1972
1973         ls = dlm_find_lockspace_local(lockspace);
1974         if (!ls)
1975                 return -EINVAL;
1976
1977         lock_recovery(ls);
1978
1979         if (convert)
1980                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1981         else
1982                 error = create_lkb(ls, &lkb);
1983
1984         if (error)
1985                 goto out;
1986
1987         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1988                               astarg, bast, range, &args);
1989         if (error)
1990                 goto out_put;
1991
1992         if (convert)
1993                 error = convert_lock(ls, lkb, &args);
1994         else
1995                 error = request_lock(ls, lkb, name, namelen, &args);
1996
1997         if (error == -EINPROGRESS)
1998                 error = 0;
1999  out_put:
2000         if (convert || error)
2001                 put_lkb(lkb);
2002         if (error == -EAGAIN)
2003                 error = 0;
2004  out:
2005         unlock_recovery(ls);
2006         dlm_put_lockspace(ls);
2007         return error;
2008 }
2009
2010 int dlm_unlock(dlm_lockspace_t *lockspace,
2011                uint32_t lkid,
2012                uint32_t flags,
2013                struct dlm_lksb *lksb,
2014                void *astarg)
2015 {
2016         struct dlm_ls *ls;
2017         struct dlm_lkb *lkb;
2018         struct dlm_args args;
2019         int error;
2020
2021         ls = dlm_find_lockspace_local(lockspace);
2022         if (!ls)
2023                 return -EINVAL;
2024
2025         lock_recovery(ls);
2026
2027         error = find_lkb(ls, lkid, &lkb);
2028         if (error)
2029                 goto out;
2030
2031         error = set_unlock_args(flags, astarg, &args);
2032         if (error)
2033                 goto out_put;
2034
2035         if (flags & DLM_LKF_CANCEL)
2036                 error = cancel_lock(ls, lkb, &args);
2037         else
2038                 error = unlock_lock(ls, lkb, &args);
2039
2040         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2041                 error = 0;
2042  out_put:
2043         put_lkb(lkb);
2044  out:
2045         unlock_recovery(ls);
2046         dlm_put_lockspace(ls);
2047         return error;
2048 }
2049
2050 /*
2051  * send/receive routines for remote operations and replies
2052  *
2053  * send_args
2054  * send_common
2055  * send_request                 receive_request
2056  * send_convert                 receive_convert
2057  * send_unlock                  receive_unlock
2058  * send_cancel                  receive_cancel
2059  * send_grant                   receive_grant
2060  * send_bast                    receive_bast
2061  * send_lookup                  receive_lookup
2062  * send_remove                  receive_remove
2063  *
2064  *                              send_common_reply
2065  * receive_request_reply        send_request_reply
2066  * receive_convert_reply        send_convert_reply
2067  * receive_unlock_reply         send_unlock_reply
2068  * receive_cancel_reply         send_cancel_reply
2069  * receive_lookup_reply         send_lookup_reply
2070  */
2071
2072 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2073                           int to_nodeid, int mstype,
2074                           struct dlm_message **ms_ret,
2075                           struct dlm_mhandle **mh_ret)
2076 {
2077         struct dlm_message *ms;
2078         struct dlm_mhandle *mh;
2079         char *mb;
2080         int mb_len = sizeof(struct dlm_message);
2081
2082         switch (mstype) {
2083         case DLM_MSG_REQUEST:
2084         case DLM_MSG_LOOKUP:
2085         case DLM_MSG_REMOVE:
2086                 mb_len += r->res_length;
2087                 break;
2088         case DLM_MSG_CONVERT:
2089         case DLM_MSG_UNLOCK:
2090         case DLM_MSG_REQUEST_REPLY:
2091         case DLM_MSG_CONVERT_REPLY:
2092         case DLM_MSG_GRANT:
2093                 if (lkb && lkb->lkb_lvbptr)
2094                         mb_len += r->res_ls->ls_lvblen;
2095                 break;
2096         }
2097
2098         /* get_buffer gives us a message handle (mh) that we need to
2099            pass into lowcomms_commit and a message buffer (mb) that we
2100            write our data into */
2101
2102         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2103         if (!mh)
2104                 return -ENOBUFS;
2105
2106         memset(mb, 0, mb_len);
2107
2108         ms = (struct dlm_message *) mb;
2109
2110         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2111         ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2112         ms->m_header.h_nodeid = dlm_our_nodeid();
2113         ms->m_header.h_length = mb_len;
2114         ms->m_header.h_cmd = DLM_MSG;
2115
2116         ms->m_type = mstype;
2117
2118         *mh_ret = mh;
2119         *ms_ret = ms;
2120         return 0;
2121 }
2122
2123 /* further lowcomms enhancements or alternate implementations may make
2124    the return value from this function useful at some point */
2125
2126 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2127 {
2128         dlm_message_out(ms);
2129         dlm_lowcomms_commit_buffer(mh);
2130         return 0;
2131 }
2132
2133 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2134                       struct dlm_message *ms)
2135 {
2136         ms->m_nodeid   = lkb->lkb_nodeid;
2137         ms->m_pid      = lkb->lkb_ownpid;
2138         ms->m_lkid     = lkb->lkb_id;
2139         ms->m_remid    = lkb->lkb_remid;
2140         ms->m_exflags  = lkb->lkb_exflags;
2141         ms->m_sbflags  = lkb->lkb_sbflags;
2142         ms->m_flags    = lkb->lkb_flags;
2143         ms->m_lvbseq   = lkb->lkb_lvbseq;
2144         ms->m_status   = lkb->lkb_status;
2145         ms->m_grmode   = lkb->lkb_grmode;
2146         ms->m_rqmode   = lkb->lkb_rqmode;
2147         ms->m_hash     = r->res_hash;
2148
2149         /* m_result and m_bastmode are set from function args,
2150            not from lkb fields */
2151
2152         if (lkb->lkb_bastaddr)
2153                 ms->m_asts |= AST_BAST;
2154         if (lkb->lkb_astaddr)
2155                 ms->m_asts |= AST_COMP;
2156
2157         if (lkb->lkb_range) {
2158                 ms->m_range[0] = lkb->lkb_range[RQ_RANGE_START];
2159                 ms->m_range[1] = lkb->lkb_range[RQ_RANGE_END];
2160         }
2161
2162         if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2163                 memcpy(ms->m_extra, r->res_name, r->res_length);
2164
2165         else if (lkb->lkb_lvbptr)
2166                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2167
2168 }
2169
2170 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2171 {
2172         struct dlm_message *ms;
2173         struct dlm_mhandle *mh;
2174         int to_nodeid, error;
2175
2176         add_to_waiters(lkb, mstype);
2177
2178         to_nodeid = r->res_nodeid;
2179
2180         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2181         if (error)
2182                 goto fail;
2183
2184         send_args(r, lkb, ms);
2185
2186         error = send_message(mh, ms);
2187         if (error)
2188                 goto fail;
2189         return 0;
2190
2191  fail:
2192         remove_from_waiters(lkb);
2193         return error;
2194 }
2195
2196 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197 {
2198         return send_common(r, lkb, DLM_MSG_REQUEST);
2199 }
2200
2201 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2202 {
2203         int error;
2204
2205         error = send_common(r, lkb, DLM_MSG_CONVERT);
2206
2207         /* down conversions go without a reply from the master */
2208         if (!error && down_conversion(lkb)) {
2209                 remove_from_waiters(lkb);
2210                 r->res_ls->ls_stub_ms.m_result = 0;
2211                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2212         }
2213
2214         return error;
2215 }
2216
2217 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2218    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2219    that the master is still correct. */
2220
2221 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2222 {
2223         return send_common(r, lkb, DLM_MSG_UNLOCK);
2224 }
2225
2226 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2227 {
2228         return send_common(r, lkb, DLM_MSG_CANCEL);
2229 }
2230
2231 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2232 {
2233         struct dlm_message *ms;
2234         struct dlm_mhandle *mh;
2235         int to_nodeid, error;
2236
2237         to_nodeid = lkb->lkb_nodeid;
2238
2239         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2240         if (error)
2241                 goto out;
2242
2243         send_args(r, lkb, ms);
2244
2245         ms->m_result = 0;
2246
2247         error = send_message(mh, ms);
2248  out:
2249         return error;
2250 }
2251
2252 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2253 {
2254         struct dlm_message *ms;
2255         struct dlm_mhandle *mh;
2256         int to_nodeid, error;
2257
2258         to_nodeid = lkb->lkb_nodeid;
2259
2260         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2261         if (error)
2262                 goto out;
2263
2264         send_args(r, lkb, ms);
2265
2266         ms->m_bastmode = mode;
2267
2268         error = send_message(mh, ms);
2269  out:
2270         return error;
2271 }
2272
2273 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2274 {
2275         struct dlm_message *ms;
2276         struct dlm_mhandle *mh;
2277         int to_nodeid, error;
2278
2279         add_to_waiters(lkb, DLM_MSG_LOOKUP);
2280
2281         to_nodeid = dlm_dir_nodeid(r);
2282
2283         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2284         if (error)
2285                 goto fail;
2286
2287         send_args(r, lkb, ms);
2288
2289         error = send_message(mh, ms);
2290         if (error)
2291                 goto fail;
2292         return 0;
2293
2294  fail:
2295         remove_from_waiters(lkb);
2296         return error;
2297 }
2298
2299 static int send_remove(struct dlm_rsb *r)
2300 {
2301         struct dlm_message *ms;
2302         struct dlm_mhandle *mh;
2303         int to_nodeid, error;
2304
2305         to_nodeid = dlm_dir_nodeid(r);
2306
2307         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2308         if (error)
2309                 goto out;
2310
2311         memcpy(ms->m_extra, r->res_name, r->res_length);
2312         ms->m_hash = r->res_hash;
2313
2314         error = send_message(mh, ms);
2315  out:
2316         return error;
2317 }
2318
2319 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2320                              int mstype, int rv)
2321 {
2322         struct dlm_message *ms;
2323         struct dlm_mhandle *mh;
2324         int to_nodeid, error;
2325
2326         to_nodeid = lkb->lkb_nodeid;
2327
2328         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2329         if (error)
2330                 goto out;
2331
2332         send_args(r, lkb, ms);
2333
2334         ms->m_result = rv;
2335
2336         error = send_message(mh, ms);
2337  out:
2338         return error;
2339 }
2340
2341 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2342 {
2343         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2344 }
2345
2346 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2347 {
2348         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2349 }
2350
2351 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2352 {
2353         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2354 }
2355
2356 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2357 {
2358         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2359 }
2360
2361 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2362                              int ret_nodeid, int rv)
2363 {
2364         struct dlm_rsb *r = &ls->ls_stub_rsb;
2365         struct dlm_message *ms;
2366         struct dlm_mhandle *mh;
2367         int error, nodeid = ms_in->m_header.h_nodeid;
2368
2369         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2370         if (error)
2371                 goto out;
2372
2373         ms->m_lkid = ms_in->m_lkid;
2374         ms->m_result = rv;
2375         ms->m_nodeid = ret_nodeid;
2376
2377         error = send_message(mh, ms);
2378  out:
2379         return error;
2380 }
2381
2382 /* which args we save from a received message depends heavily on the type
2383    of message, unlike the send side where we can safely send everything about
2384    the lkb for any type of message */
2385
2386 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2387 {
2388         lkb->lkb_exflags = ms->m_exflags;
2389         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2390                          (ms->m_flags & 0x0000FFFF);
2391 }
2392
2393 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2394 {
2395         lkb->lkb_sbflags = ms->m_sbflags;
2396         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2397                          (ms->m_flags & 0x0000FFFF);
2398 }
2399
2400 static int receive_extralen(struct dlm_message *ms)
2401 {
2402         return (ms->m_header.h_length - sizeof(struct dlm_message));
2403 }
2404
2405 static int receive_range(struct dlm_ls *ls, struct dlm_lkb *lkb,
2406                          struct dlm_message *ms)
2407 {
2408         if (lkb->lkb_flags & DLM_IFL_RANGE) {
2409                 if (!lkb->lkb_range)
2410                         lkb->lkb_range = allocate_range(ls);
2411                 if (!lkb->lkb_range)
2412                         return -ENOMEM;
2413                 lkb->lkb_range[RQ_RANGE_START] = ms->m_range[0];
2414                 lkb->lkb_range[RQ_RANGE_END] = ms->m_range[1];
2415         }
2416         return 0;
2417 }
2418
2419 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2420                        struct dlm_message *ms)
2421 {
2422         int len;
2423
2424         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2425                 if (!lkb->lkb_lvbptr)
2426                         lkb->lkb_lvbptr = allocate_lvb(ls);
2427                 if (!lkb->lkb_lvbptr)
2428                         return -ENOMEM;
2429                 len = receive_extralen(ms);
2430                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2431         }
2432         return 0;
2433 }
2434
2435 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2436                                 struct dlm_message *ms)
2437 {
2438         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2439         lkb->lkb_ownpid = ms->m_pid;
2440         lkb->lkb_remid = ms->m_lkid;
2441         lkb->lkb_grmode = DLM_LOCK_IV;
2442         lkb->lkb_rqmode = ms->m_rqmode;
2443         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2444         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2445
2446         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2447
2448         if (receive_range(ls, lkb, ms))
2449                 return -ENOMEM;
2450
2451         if (receive_lvb(ls, lkb, ms))
2452                 return -ENOMEM;
2453
2454         return 0;
2455 }
2456
2457 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2458                                 struct dlm_message *ms)
2459 {
2460         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2461                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2462                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2463                           lkb->lkb_id, lkb->lkb_remid);
2464                 return -EINVAL;
2465         }
2466
2467         if (!is_master_copy(lkb))
2468                 return -EINVAL;
2469
2470         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2471                 return -EBUSY;
2472
2473         if (receive_range(ls, lkb, ms))
2474                 return -ENOMEM;
2475         if (lkb->lkb_range) {
2476                 lkb->lkb_range[GR_RANGE_START] = 0LL;
2477                 lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
2478         }
2479
2480         if (receive_lvb(ls, lkb, ms))
2481                 return -ENOMEM;
2482
2483         lkb->lkb_rqmode = ms->m_rqmode;
2484         lkb->lkb_lvbseq = ms->m_lvbseq;
2485
2486         return 0;
2487 }
2488
2489 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2490                                struct dlm_message *ms)
2491 {
2492         if (!is_master_copy(lkb))
2493                 return -EINVAL;
2494         if (receive_lvb(ls, lkb, ms))
2495                 return -ENOMEM;
2496         return 0;
2497 }
2498
2499 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2500    uses to send a reply and that the remote end uses to process the reply. */
2501
2502 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2503 {
2504         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2505         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2506         lkb->lkb_remid = ms->m_lkid;
2507 }
2508
2509 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2510 {
2511         struct dlm_lkb *lkb;
2512         struct dlm_rsb *r;
2513         int error, namelen;
2514
2515         error = create_lkb(ls, &lkb);
2516         if (error)
2517                 goto fail;
2518
2519         receive_flags(lkb, ms);
2520         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2521         error = receive_request_args(ls, lkb, ms);
2522         if (error) {
2523                 put_lkb(lkb);
2524                 goto fail;
2525         }
2526
2527         namelen = receive_extralen(ms);
2528
2529         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2530         if (error) {
2531                 put_lkb(lkb);
2532                 goto fail;
2533         }
2534
2535         lock_rsb(r);
2536
2537         attach_lkb(r, lkb);
2538         error = do_request(r, lkb);
2539         send_request_reply(r, lkb, error);
2540
2541         unlock_rsb(r);
2542         put_rsb(r);
2543
2544         if (error == -EINPROGRESS)
2545                 error = 0;
2546         if (error)
2547                 put_lkb(lkb);
2548         return;
2549
2550  fail:
2551         setup_stub_lkb(ls, ms);
2552         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2553 }
2554
2555 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2556 {
2557         struct dlm_lkb *lkb;
2558         struct dlm_rsb *r;
2559         int error, reply = TRUE;
2560
2561         error = find_lkb(ls, ms->m_remid, &lkb);
2562         if (error)
2563                 goto fail;
2564
2565         r = lkb->lkb_resource;
2566
2567         hold_rsb(r);
2568         lock_rsb(r);
2569
2570         receive_flags(lkb, ms);
2571         error = receive_convert_args(ls, lkb, ms);
2572         if (error)
2573                 goto out;
2574         reply = !down_conversion(lkb);
2575
2576         error = do_convert(r, lkb);
2577  out:
2578         if (reply)
2579                 send_convert_reply(r, lkb, error);
2580
2581         unlock_rsb(r);
2582         put_rsb(r);
2583         put_lkb(lkb);
2584         return;
2585
2586  fail:
2587         setup_stub_lkb(ls, ms);
2588         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2589 }
2590
2591 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2592 {
2593         struct dlm_lkb *lkb;
2594         struct dlm_rsb *r;
2595         int error;
2596
2597         error = find_lkb(ls, ms->m_remid, &lkb);
2598         if (error)
2599                 goto fail;
2600
2601         r = lkb->lkb_resource;
2602
2603         hold_rsb(r);
2604         lock_rsb(r);
2605
2606         receive_flags(lkb, ms);
2607         error = receive_unlock_args(ls, lkb, ms);
2608         if (error)
2609                 goto out;
2610
2611         error = do_unlock(r, lkb);
2612  out:
2613         send_unlock_reply(r, lkb, error);
2614
2615         unlock_rsb(r);
2616         put_rsb(r);
2617         put_lkb(lkb);
2618         return;
2619
2620  fail:
2621         setup_stub_lkb(ls, ms);
2622         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2623 }
2624
2625 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2626 {
2627         struct dlm_lkb *lkb;
2628         struct dlm_rsb *r;
2629         int error;
2630
2631         error = find_lkb(ls, ms->m_remid, &lkb);
2632         if (error)
2633                 goto fail;
2634
2635         receive_flags(lkb, ms);
2636
2637         r = lkb->lkb_resource;
2638
2639         hold_rsb(r);
2640         lock_rsb(r);
2641
2642         error = do_cancel(r, lkb);
2643         send_cancel_reply(r, lkb, error);
2644
2645         unlock_rsb(r);
2646         put_rsb(r);
2647         put_lkb(lkb);
2648         return;
2649
2650  fail:
2651         setup_stub_lkb(ls, ms);
2652         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2653 }
2654
2655 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2656 {
2657         struct dlm_lkb *lkb;
2658         struct dlm_rsb *r;
2659         int error;
2660
2661         error = find_lkb(ls, ms->m_remid, &lkb);
2662         if (error) {
2663                 log_error(ls, "receive_grant no lkb");
2664                 return;
2665         }
2666         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2667
2668         r = lkb->lkb_resource;
2669
2670         hold_rsb(r);
2671         lock_rsb(r);
2672
2673         receive_flags_reply(lkb, ms);
2674         grant_lock_pc(r, lkb, ms);
2675         queue_cast(r, lkb, 0);
2676
2677         unlock_rsb(r);
2678         put_rsb(r);
2679         put_lkb(lkb);
2680 }
2681
2682 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2683 {
2684         struct dlm_lkb *lkb;
2685         struct dlm_rsb *r;
2686         int error;
2687
2688         error = find_lkb(ls, ms->m_remid, &lkb);
2689         if (error) {
2690                 log_error(ls, "receive_bast no lkb");
2691                 return;
2692         }
2693         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2694
2695         r = lkb->lkb_resource;
2696
2697         hold_rsb(r);
2698         lock_rsb(r);
2699
2700         queue_bast(r, lkb, ms->m_bastmode);
2701
2702         unlock_rsb(r);
2703         put_rsb(r);
2704         put_lkb(lkb);
2705 }
2706
2707 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2708 {
2709         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2710
2711         from_nodeid = ms->m_header.h_nodeid;
2712         our_nodeid = dlm_our_nodeid();
2713
2714         len = receive_extralen(ms);
2715
2716         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2717         if (dir_nodeid != our_nodeid) {
2718                 log_error(ls, "lookup dir_nodeid %d from %d",
2719                           dir_nodeid, from_nodeid);
2720                 error = -EINVAL;
2721                 ret_nodeid = -1;
2722                 goto out;
2723         }
2724
2725         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2726
2727         /* Optimization: we're master so treat lookup as a request */
2728         if (!error && ret_nodeid == our_nodeid) {
2729                 receive_request(ls, ms);
2730                 return;
2731         }
2732  out:
2733         send_lookup_reply(ls, ms, ret_nodeid, error);
2734 }
2735
2736 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2737 {
2738         int len, dir_nodeid, from_nodeid;
2739
2740         from_nodeid = ms->m_header.h_nodeid;
2741
2742         len = receive_extralen(ms);
2743
2744         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2745         if (dir_nodeid != dlm_our_nodeid()) {
2746                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2747                           dir_nodeid, from_nodeid);
2748                 return;
2749         }
2750
2751         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2752 }
2753
2754 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2755 {
2756         struct dlm_lkb *lkb;
2757         struct dlm_rsb *r;
2758         int error, mstype;
2759
2760         error = find_lkb(ls, ms->m_remid, &lkb);
2761         if (error) {
2762                 log_error(ls, "receive_request_reply no lkb");
2763                 return;
2764         }
2765         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2766
2767         mstype = lkb->lkb_wait_type;
2768         error = remove_from_waiters(lkb);
2769         if (error) {
2770                 log_error(ls, "receive_request_reply not on waiters");
2771                 goto out;
2772         }
2773
2774         /* this is the value returned from do_request() on the master */
2775         error = ms->m_result;
2776
2777         r = lkb->lkb_resource;
2778         hold_rsb(r);
2779         lock_rsb(r);
2780
2781         /* Optimization: the dir node was also the master, so it took our
2782            lookup as a request and sent request reply instead of lookup reply */
2783         if (mstype == DLM_MSG_LOOKUP) {
2784                 r->res_nodeid = ms->m_header.h_nodeid;
2785                 lkb->lkb_nodeid = r->res_nodeid;
2786         }
2787
2788         switch (error) {
2789         case -EAGAIN:
2790                 /* request would block (be queued) on remote master;
2791                    the unhold undoes the original ref from create_lkb()
2792                    so it leads to the lkb being freed */
2793                 queue_cast(r, lkb, -EAGAIN);
2794                 confirm_master(r, -EAGAIN);
2795                 unhold_lkb(lkb);
2796                 break;
2797
2798         case -EINPROGRESS:
2799         case 0:
2800                 /* request was queued or granted on remote master */
2801                 receive_flags_reply(lkb, ms);
2802                 lkb->lkb_remid = ms->m_lkid;
2803                 if (error)
2804                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
2805                 else {
2806                         grant_lock_pc(r, lkb, ms);
2807                         queue_cast(r, lkb, 0);
2808                 }
2809                 confirm_master(r, error);
2810                 break;
2811
2812         case -ENOENT:
2813         case -ENOTBLK:
2814                 /* find_rsb failed to find rsb or rsb wasn't master */
2815                 r->res_nodeid = -1;
2816                 lkb->lkb_nodeid = -1;
2817                 _request_lock(r, lkb);
2818                 break;
2819
2820         default:
2821                 log_error(ls, "receive_request_reply error %d", error);
2822         }
2823
2824         unlock_rsb(r);
2825         put_rsb(r);
2826  out:
2827         put_lkb(lkb);
2828 }
2829
2830 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2831                                     struct dlm_message *ms)
2832 {
2833         int error = ms->m_result;
2834
2835         /* this is the value returned from do_convert() on the master */
2836
2837         switch (error) {
2838         case -EAGAIN:
2839                 /* convert would block (be queued) on remote master */
2840                 queue_cast(r, lkb, -EAGAIN);
2841                 break;
2842
2843         case -EINPROGRESS:
2844                 /* convert was queued on remote master */
2845                 del_lkb(r, lkb);
2846                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2847                 break;
2848
2849         case 0:
2850                 /* convert was granted on remote master */
2851                 receive_flags_reply(lkb, ms);
2852                 grant_lock_pc(r, lkb, ms);
2853                 queue_cast(r, lkb, 0);
2854                 break;
2855
2856         default:
2857                 log_error(r->res_ls, "receive_convert_reply error %d", error);
2858         }
2859 }
2860
2861 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2862 {
2863         struct dlm_rsb *r = lkb->lkb_resource;
2864
2865         hold_rsb(r);
2866         lock_rsb(r);
2867
2868         __receive_convert_reply(r, lkb, ms);
2869
2870         unlock_rsb(r);
2871         put_rsb(r);
2872 }
2873
2874 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2875 {
2876         struct dlm_lkb *lkb;
2877         int error;
2878
2879         error = find_lkb(ls, ms->m_remid, &lkb);
2880         if (error) {
2881                 log_error(ls, "receive_convert_reply no lkb");
2882                 return;
2883         }
2884         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2885
2886         error = remove_from_waiters(lkb);
2887         if (error) {
2888                 log_error(ls, "receive_convert_reply not on waiters");
2889                 goto out;
2890         }
2891
2892         _receive_convert_reply(lkb, ms);
2893  out:
2894         put_lkb(lkb);
2895 }
2896
2897 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2898 {
2899         struct dlm_rsb *r = lkb->lkb_resource;
2900         int error = ms->m_result;
2901
2902         hold_rsb(r);
2903         lock_rsb(r);
2904
2905         /* this is the value returned from do_unlock() on the master */
2906
2907         switch (error) {
2908         case -DLM_EUNLOCK:
2909                 receive_flags_reply(lkb, ms);
2910                 remove_lock_pc(r, lkb);
2911                 queue_cast(r, lkb, -DLM_EUNLOCK);
2912                 break;
2913         default:
2914                 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2915         }
2916
2917         unlock_rsb(r);
2918         put_rsb(r);
2919 }
2920
2921 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2922 {
2923         struct dlm_lkb *lkb;
2924         int error;
2925
2926         error = find_lkb(ls, ms->m_remid, &lkb);
2927         if (error) {
2928                 log_error(ls, "receive_unlock_reply no lkb");
2929                 return;
2930         }
2931         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2932
2933         error = remove_from_waiters(lkb);
2934         if (error) {
2935                 log_error(ls, "receive_unlock_reply not on waiters");
2936                 goto out;
2937         }
2938
2939         _receive_unlock_reply(lkb, ms);
2940  out:
2941         put_lkb(lkb);
2942 }
2943
2944 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2945 {
2946         struct dlm_rsb *r = lkb->lkb_resource;
2947         int error = ms->m_result;
2948
2949         hold_rsb(r);
2950         lock_rsb(r);
2951
2952         /* this is the value returned from do_cancel() on the master */
2953
2954         switch (error) {
2955         case -DLM_ECANCEL:
2956                 receive_flags_reply(lkb, ms);
2957                 revert_lock_pc(r, lkb);
2958                 queue_cast(r, lkb, -DLM_ECANCEL);
2959                 break;
2960         default:
2961                 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2962         }
2963
2964         unlock_rsb(r);
2965         put_rsb(r);
2966 }
2967
2968 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2969 {
2970         struct dlm_lkb *lkb;
2971         int error;
2972
2973         error = find_lkb(ls, ms->m_remid, &lkb);
2974         if (error) {
2975                 log_error(ls, "receive_cancel_reply no lkb");
2976                 return;
2977         }
2978         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2979
2980         error = remove_from_waiters(lkb);
2981         if (error) {
2982                 log_error(ls, "receive_cancel_reply not on waiters");
2983                 goto out;
2984         }
2985
2986         _receive_cancel_reply(lkb, ms);
2987  out:
2988         put_lkb(lkb);
2989 }
2990
2991 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2992 {
2993         struct dlm_lkb *lkb;
2994         struct dlm_rsb *r;
2995         int error, ret_nodeid;
2996
2997         error = find_lkb(ls, ms->m_lkid, &lkb);
2998         if (error) {
2999                 log_error(ls, "receive_lookup_reply no lkb");
3000                 return;
3001         }
3002
3003         error = remove_from_waiters(lkb);
3004         if (error) {
3005                 log_error(ls, "receive_lookup_reply not on waiters");
3006                 goto out;
3007         }
3008
3009         /* this is the value returned by dlm_dir_lookup on dir node
3010            FIXME: will a non-zero error ever be returned? */
3011         error = ms->m_result;
3012
3013         r = lkb->lkb_resource;
3014         hold_rsb(r);
3015         lock_rsb(r);
3016
3017         ret_nodeid = ms->m_nodeid;
3018         if (ret_nodeid == dlm_our_nodeid()) {
3019                 r->res_nodeid = 0;
3020                 ret_nodeid = 0;
3021                 r->res_first_lkid = 0;
3022         } else {
3023                 /* set_master() will copy res_nodeid to lkb_nodeid */
3024                 r->res_nodeid = ret_nodeid;
3025         }
3026
3027         _request_lock(r, lkb);
3028
3029         if (!ret_nodeid)
3030                 process_lookup_list(r);
3031
3032         unlock_rsb(r);
3033         put_rsb(r);
3034  out:
3035         put_lkb(lkb);
3036 }
3037
3038 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3039 {
3040         struct dlm_message *ms = (struct dlm_message *) hd;
3041         struct dlm_ls *ls;
3042         int error;
3043
3044         if (!recovery)
3045                 dlm_message_in(ms);
3046
3047         ls = dlm_find_lockspace_global(hd->h_lockspace);
3048         if (!ls) {
3049                 log_print("drop message %d from %d for unknown lockspace %d",
3050                           ms->m_type, nodeid, hd->h_lockspace);
3051                 return -EINVAL;
3052         }
3053
3054         /* recovery may have just ended leaving a bunch of backed-up requests
3055            in the requestqueue; wait while dlm_recoverd clears them */
3056
3057         if (!recovery)
3058                 dlm_wait_requestqueue(ls);
3059
3060         /* recovery may have just started while there were a bunch of
3061            in-flight requests -- save them in requestqueue to be processed
3062            after recovery.  we can't let dlm_recvd block on the recovery
3063            lock.  if dlm_recoverd is calling this function to clear the
3064            requestqueue, it needs to be interrupted (-EINTR) if another
3065            recovery operation is starting. */
3066
3067         while (1) {
3068                 if (dlm_locking_stopped(ls)) {
3069                         if (!recovery)
3070                                 dlm_add_requestqueue(ls, nodeid, hd);
3071                         error = -EINTR;
3072                         goto out;
3073                 }
3074
3075                 if (lock_recovery_try(ls))
3076                         break;
3077                 schedule();
3078         }
3079
3080         switch (ms->m_type) {
3081
3082         /* messages sent to a master node */
3083
3084         case DLM_MSG_REQUEST:
3085                 receive_request(ls, ms);
3086                 break;
3087
3088         case DLM_MSG_CONVERT:
3089                 receive_convert(ls, ms);
3090                 break;
3091
3092         case DLM_MSG_UNLOCK:
3093                 receive_unlock(ls, ms);
3094                 break;
3095
3096         case DLM_MSG_CANCEL:
3097                 receive_cancel(ls, ms);
3098                 break;
3099
3100         /* messages sent from a master node (replies to above) */
3101
3102         case DLM_MSG_REQUEST_REPLY:
3103                 receive_request_reply(ls, ms);
3104                 break;
3105
3106         case DLM_MSG_CONVERT_REPLY:
3107                 receive_convert_reply(ls, ms);
3108                 break;
3109
3110         case DLM_MSG_UNLOCK_REPLY:
3111                 receive_unlock_reply(ls, ms);
3112                 break;
3113
3114         case DLM_MSG_CANCEL_REPLY:
3115                 receive_cancel_reply(ls, ms);
3116                 break;
3117
3118         /* messages sent from a master node (only two types of async msg) */
3119
3120         case DLM_MSG_GRANT:
3121                 receive_grant(ls, ms);
3122                 break;
3123
3124         case DLM_MSG_BAST:
3125                 receive_bast(ls, ms);
3126                 break;
3127
3128         /* messages sent to a dir node */
3129
3130         case DLM_MSG_LOOKUP:
3131                 receive_lookup(ls, ms);
3132                 break;
3133
3134         case DLM_MSG_REMOVE:
3135                 receive_remove(ls, ms);
3136                 break;
3137
3138         /* messages sent from a dir node (remove has no reply) */
3139
3140         case DLM_MSG_LOOKUP_REPLY:
3141                 receive_lookup_reply(ls, ms);
3142                 break;
3143
3144         default:
3145                 log_error(ls, "unknown message type %d", ms->m_type);
3146         }
3147
3148         unlock_recovery(ls);
3149  out:
3150         dlm_put_lockspace(ls);
3151         dlm_astd_wake();
3152         return 0;
3153 }
3154
3155
3156 /*
3157  * Recovery related
3158  */
3159
3160 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3161 {
3162         if (middle_conversion(lkb)) {
3163                 hold_lkb(lkb);
3164                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3165                 _remove_from_waiters(lkb);
3166                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3167
3168                 /* Same special case as in receive_rcom_lock_args() */
3169                 lkb->lkb_grmode = DLM_LOCK_IV;
3170                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3171                 unhold_lkb(lkb);
3172
3173         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3174                 lkb->lkb_flags |= DLM_IFL_RESEND;
3175         }
3176
3177         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3178            conversions are async; there's no reply from the remote master */
3179 }
3180
3181 /* A waiting lkb needs recovery if the master node has failed, or
3182    the master node is changing (only when no directory is used) */
3183
3184 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3185 {
3186         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3187                 return 1;
3188
3189         if (!dlm_no_directory(ls))
3190                 return 0;
3191
3192         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3193                 return 1;
3194
3195         return 0;
3196 }
3197
3198 /* Recovery for locks that are waiting for replies from nodes that are now
3199    gone.  We can just complete unlocks and cancels by faking a reply from the
3200    dead node.  Requests and up-conversions we flag to be resent after
3201    recovery.  Down-conversions can just be completed with a fake reply like
3202    unlocks.  Conversions between PR and CW need special attention. */
3203
3204 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3205 {
3206         struct dlm_lkb *lkb, *safe;
3207
3208         down(&ls->ls_waiters_sem);
3209
3210         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3211                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3212                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3213
3214                 /* all outstanding lookups, regardless of destination  will be
3215                    resent after recovery is done */
3216
3217                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3218                         lkb->lkb_flags |= DLM_IFL_RESEND;
3219                         continue;
3220                 }
3221
3222                 if (!waiter_needs_recovery(ls, lkb))
3223                         continue;
3224
3225                 switch (lkb->lkb_wait_type) {
3226
3227                 case DLM_MSG_REQUEST:
3228                         lkb->lkb_flags |= DLM_IFL_RESEND;
3229                         break;
3230
3231                 case DLM_MSG_CONVERT:
3232                         recover_convert_waiter(ls, lkb);
3233                         break;
3234
3235                 case DLM_MSG_UNLOCK:
3236                         hold_lkb(lkb);
3237                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3238                         _remove_from_waiters(lkb);
3239                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3240                         put_lkb(lkb);
3241                         break;
3242
3243                 case DLM_MSG_CANCEL:
3244                         hold_lkb(lkb);
3245                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3246                         _remove_from_waiters(lkb);
3247                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3248                         put_lkb(lkb);
3249                         break;
3250
3251                 default:
3252                         log_error(ls, "invalid lkb wait_type %d",
3253                                   lkb->lkb_wait_type);
3254                 }
3255         }
3256         up(&ls->ls_waiters_sem);
3257 }
3258
3259 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3260 {
3261         struct dlm_lkb *lkb;
3262         int rv = 0;
3263
3264         down(&ls->ls_waiters_sem);
3265         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3266                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3267                         rv = lkb->lkb_wait_type;
3268                         _remove_from_waiters(lkb);
3269                         lkb->lkb_flags &= ~DLM_IFL_RESEND;
3270                         break;
3271                 }
3272         }
3273         up(&ls->ls_waiters_sem);
3274
3275         if (!rv)
3276                 lkb = NULL;
3277         *lkb_ret = lkb;
3278         return rv;
3279 }
3280
3281 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3282    master or dir-node for r.  Processing the lkb may result in it being placed
3283    back on waiters. */
3284
3285 int dlm_recover_waiters_post(struct dlm_ls *ls)
3286 {
3287         struct dlm_lkb *lkb;
3288         struct dlm_rsb *r;
3289         int error = 0, mstype;
3290
3291         while (1) {
3292                 if (dlm_locking_stopped(ls)) {
3293                         log_debug(ls, "recover_waiters_post aborted");
3294                         error = -EINTR;
3295                         break;
3296                 }
3297
3298                 mstype = remove_resend_waiter(ls, &lkb);
3299                 if (!mstype)
3300                         break;
3301
3302                 r = lkb->lkb_resource;
3303
3304                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3305                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3306
3307                 switch (mstype) {
3308
3309                 case DLM_MSG_LOOKUP:
3310                         hold_rsb(r);
3311                         lock_rsb(r);
3312                         _request_lock(r, lkb);
3313                         if (is_master(r))
3314                                 confirm_master(r, 0);
3315                         unlock_rsb(r);
3316                         put_rsb(r);
3317                         break;
3318
3319                 case DLM_MSG_REQUEST:
3320                         hold_rsb(r);
3321                         lock_rsb(r);
3322                         _request_lock(r, lkb);
3323                         unlock_rsb(r);
3324                         put_rsb(r);
3325                         break;
3326
3327                 case DLM_MSG_CONVERT:
3328                         hold_rsb(r);
3329                         lock_rsb(r);
3330                         _convert_lock(r, lkb);
3331                         unlock_rsb(r);
3332                         put_rsb(r);
3333                         break;
3334
3335                 default:
3336                         log_error(ls, "recover_waiters_post type %d", mstype);
3337                 }
3338         }
3339
3340         return error;
3341 }
3342
3343 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3344                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3345 {
3346         struct dlm_ls *ls = r->res_ls;
3347         struct dlm_lkb *lkb, *safe;
3348
3349         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3350                 if (test(ls, lkb)) {
3351                         del_lkb(r, lkb);
3352                         /* this put should free the lkb */
3353                         if (!put_lkb(lkb))
3354                                 log_error(ls, "purged lkb not released");
3355                 }
3356         }
3357 }
3358
3359 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3360 {
3361         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3362 }
3363
3364 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3365 {
3366         return is_master_copy(lkb);
3367 }
3368
3369 static void purge_dead_locks(struct dlm_rsb *r)
3370 {
3371         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3372         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3373         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3374 }
3375
3376 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3377 {
3378         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3379         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3380         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3381 }
3382
3383 /* Get rid of locks held by nodes that are gone. */
3384
3385 int dlm_purge_locks(struct dlm_ls *ls)
3386 {
3387         struct dlm_rsb *r;
3388
3389         log_debug(ls, "dlm_purge_locks");
3390
3391         down_write(&ls->ls_root_sem);
3392         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3393                 hold_rsb(r);
3394                 lock_rsb(r);
3395                 if (is_master(r))
3396                         purge_dead_locks(r);
3397                 unlock_rsb(r);
3398                 unhold_rsb(r);
3399
3400                 schedule();
3401         }
3402         up_write(&ls->ls_root_sem);
3403
3404         return 0;
3405 }
3406
3407 int dlm_grant_after_purge(struct dlm_ls *ls)
3408 {
3409         struct dlm_rsb *r;
3410         int i;
3411
3412         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
3413                 read_lock(&ls->ls_rsbtbl[i].lock);
3414                 list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
3415                         hold_rsb(r);
3416                         lock_rsb(r);
3417                         if (is_master(r)) {
3418                                 grant_pending_locks(r);
3419                                 confirm_master(r, 0);
3420                         }
3421                         unlock_rsb(r);
3422                         put_rsb(r);
3423                 }
3424                 read_unlock(&ls->ls_rsbtbl[i].lock);
3425         }
3426
3427         return 0;
3428 }
3429
3430 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3431                                          uint32_t remid)
3432 {
3433         struct dlm_lkb *lkb;
3434
3435         list_for_each_entry(lkb, head, lkb_statequeue) {
3436                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3437                         return lkb;
3438         }
3439         return NULL;
3440 }
3441
3442 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3443                                     uint32_t remid)
3444 {
3445         struct dlm_lkb *lkb;
3446
3447         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3448         if (lkb)
3449                 return lkb;
3450         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3451         if (lkb)
3452                 return lkb;
3453         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3454         if (lkb)
3455                 return lkb;
3456         return NULL;
3457 }
3458
3459 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3460                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3461 {
3462         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3463         int lvblen;
3464
3465         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3466         lkb->lkb_ownpid = rl->rl_ownpid;
3467         lkb->lkb_remid = rl->rl_lkid;
3468         lkb->lkb_exflags = rl->rl_exflags;
3469         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3470         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3471         lkb->lkb_lvbseq = rl->rl_lvbseq;
3472         lkb->lkb_rqmode = rl->rl_rqmode;
3473         lkb->lkb_grmode = rl->rl_grmode;
3474         /* don't set lkb_status because add_lkb wants to itself */
3475
3476         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3477         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3478
3479         if (lkb->lkb_flags & DLM_IFL_RANGE) {
3480                 lkb->lkb_range = allocate_range(ls);
3481                 if (!lkb->lkb_range)
3482                         return -ENOMEM;
3483                 memcpy(lkb->lkb_range, rl->rl_range, 4*sizeof(uint64_t));
3484         }
3485
3486         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3487                 lkb->lkb_lvbptr = allocate_lvb(ls);
3488                 if (!lkb->lkb_lvbptr)
3489                         return -ENOMEM;
3490                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3491                          sizeof(struct rcom_lock);
3492                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3493         }
3494
3495         /* Conversions between PR and CW (middle modes) need special handling.
3496            The real granted mode of these converting locks cannot be determined
3497            until all locks have been rebuilt on the rsb (recover_conversion) */
3498
3499         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3500                 rl->rl_status = DLM_LKSTS_CONVERT;
3501                 lkb->lkb_grmode = DLM_LOCK_IV;
3502                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3503         }
3504
3505         return 0;
3506 }
3507
3508 /* This lkb may have been recovered in a previous aborted recovery so we need
3509    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3510    If so we just send back a standard reply.  If not, we create a new lkb with
3511    the given values and send back our lkid.  We send back our lkid by sending
3512    back the rcom_lock struct we got but with the remid field filled in. */
3513
3514 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3515 {
3516         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3517         struct dlm_rsb *r;
3518         struct dlm_lkb *lkb;
3519         int error;
3520
3521         if (rl->rl_parent_lkid) {
3522                 error = -EOPNOTSUPP;
3523                 goto out;
3524         }
3525
3526         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3527         if (error)
3528                 goto out;
3529
3530         lock_rsb(r);
3531
3532         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3533         if (lkb) {
3534                 error = -EEXIST;
3535                 goto out_remid;
3536         }
3537
3538         error = create_lkb(ls, &lkb);
3539         if (error)
3540                 goto out_unlock;
3541
3542         error = receive_rcom_lock_args(ls, lkb, r, rc);
3543         if (error) {
3544                 put_lkb(lkb);
3545                 goto out_unlock;
3546         }
3547
3548         attach_lkb(r, lkb);
3549         add_lkb(r, lkb, rl->rl_status);
3550         error = 0;
3551
3552  out_remid:
3553         /* this is the new value returned to the lock holder for
3554            saving in its process-copy lkb */
3555         rl->rl_remid = lkb->lkb_id;
3556
3557  out_unlock:
3558         unlock_rsb(r);
3559         put_rsb(r);
3560  out:
3561         if (error)
3562                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3563         rl->rl_result = error;
3564         return error;
3565 }
3566
3567 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3568 {
3569         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3570         struct dlm_rsb *r;
3571         struct dlm_lkb *lkb;
3572         int error;
3573
3574         error = find_lkb(ls, rl->rl_lkid, &lkb);
3575         if (error) {
3576                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3577                 return error;
3578         }
3579
3580         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3581
3582         error = rl->rl_result;
3583
3584         r = lkb->lkb_resource;
3585         hold_rsb(r);
3586         lock_rsb(r);
3587
3588         switch (error) {
3589         case -EEXIST:
3590                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3591                 /* fall through */
3592         case 0:
3593                 lkb->lkb_remid = rl->rl_remid;
3594                 break;
3595         default:
3596                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3597                           error, lkb->lkb_id);
3598         }
3599
3600         /* an ack for dlm_recover_locks() which waits for replies from
3601            all the locks it sends to new masters */
3602         dlm_recovered_lock(r);
3603
3604         unlock_rsb(r);
3605         put_rsb(r);
3606         put_lkb(lkb);
3607
3608         return 0;
3609 }
3610