Merge branch 'x86/core' into x86/apic
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         if (is_master_copy(lkb))
322                 send_bast(r, lkb, rqmode);
323         else {
324                 lkb->lkb_bastmode = rqmode;
325                 dlm_add_ast(lkb, AST_BAST);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         *r_ret = NULL;
367         return -EBADR;
368
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392
393         if (dlm_no_directory(ls))
394                 goto out;
395
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         write_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         write_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r, *tmp;
439         uint32_t hash, bucket;
440         int error = -EINVAL;
441
442         if (namelen > DLM_RESNAME_MAXLEN)
443                 goto out;
444
445         if (dlm_no_directory(ls))
446                 flags |= R_CREATE;
447
448         error = 0;
449         hash = jhash(name, namelen, 0);
450         bucket = hash & (ls->ls_rsbtbl_size - 1);
451
452         error = search_rsb(ls, name, namelen, bucket, flags, &r);
453         if (!error)
454                 goto out;
455
456         if (error == -EBADR && !(flags & R_CREATE))
457                 goto out;
458
459         /* the rsb was found but wasn't a master copy */
460         if (error == -ENOTBLK)
461                 goto out;
462
463         error = -ENOMEM;
464         r = create_rsb(ls, name, namelen);
465         if (!r)
466                 goto out;
467
468         r->res_hash = hash;
469         r->res_bucket = bucket;
470         r->res_nodeid = -1;
471         kref_init(&r->res_ref);
472
473         /* With no directory, the master can be set immediately */
474         if (dlm_no_directory(ls)) {
475                 int nodeid = dlm_dir_nodeid(r);
476                 if (nodeid == dlm_our_nodeid())
477                         nodeid = 0;
478                 r->res_nodeid = nodeid;
479         }
480
481         write_lock(&ls->ls_rsbtbl[bucket].lock);
482         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483         if (!error) {
484                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
485                 dlm_free_rsb(r);
486                 r = tmp;
487                 goto out;
488         }
489         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490         write_unlock(&ls->ls_rsbtbl[bucket].lock);
491         error = 0;
492  out:
493         *r_ret = r;
494         return error;
495 }
496
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499
500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502         kref_get(&r->res_ref);
503 }
504
505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507         hold_rsb(r);
508 }
509
510 static void toss_rsb(struct kref *kref)
511 {
512         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513         struct dlm_ls *ls = r->res_ls;
514
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516         kref_init(&r->res_ref);
517         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518         r->res_toss_time = jiffies;
519         if (r->res_lvbptr) {
520                 dlm_free_lvb(r->res_lvbptr);
521                 r->res_lvbptr = NULL;
522         }
523 }
524
525 /* When all references to the rsb are gone it's transfered to
526    the tossed list for later disposal. */
527
528 static void put_rsb(struct dlm_rsb *r)
529 {
530         struct dlm_ls *ls = r->res_ls;
531         uint32_t bucket = r->res_bucket;
532
533         write_lock(&ls->ls_rsbtbl[bucket].lock);
534         kref_put(&r->res_ref, toss_rsb);
535         write_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537
538 void dlm_put_rsb(struct dlm_rsb *r)
539 {
540         put_rsb(r);
541 }
542
543 /* See comment for unhold_lkb */
544
545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547         int rv;
548         rv = kref_put(&r->res_ref, toss_rsb);
549         DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551
552 static void kill_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555
556         /* All work is done after the return from kref_put() so we
557            can release the write_lock before the remove and free. */
558
559         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569
570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572         hold_rsb(r);
573         lkb->lkb_resource = r;
574 }
575
576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578         if (lkb->lkb_resource) {
579                 put_rsb(lkb->lkb_resource);
580                 lkb->lkb_resource = NULL;
581         }
582 }
583
584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586         struct dlm_lkb *lkb, *tmp;
587         uint32_t lkid = 0;
588         uint16_t bucket;
589
590         lkb = dlm_allocate_lkb(ls);
591         if (!lkb)
592                 return -ENOMEM;
593
594         lkb->lkb_nodeid = -1;
595         lkb->lkb_grmode = DLM_LOCK_IV;
596         kref_init(&lkb->lkb_ref);
597         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599         INIT_LIST_HEAD(&lkb->lkb_time_list);
600
601         get_random_bytes(&bucket, sizeof(bucket));
602         bucket &= (ls->ls_lkbtbl_size - 1);
603
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605
606         /* counter can roll over so we must verify lkid is not in use */
607
608         while (lkid == 0) {
609                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610
611                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612                                     lkb_idtbl_list) {
613                         if (tmp->lkb_id != lkid)
614                                 continue;
615                         lkid = 0;
616                         break;
617                 }
618         }
619
620         lkb->lkb_id = lkid;
621         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622         write_unlock(&ls->ls_lkbtbl[bucket].lock);
623
624         *lkb_ret = lkb;
625         return 0;
626 }
627
628 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
629 {
630         struct dlm_lkb *lkb;
631         uint16_t bucket = (lkid >> 16);
632
633         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
634                 if (lkb->lkb_id == lkid)
635                         return lkb;
636         }
637         return NULL;
638 }
639
640 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
641 {
642         struct dlm_lkb *lkb;
643         uint16_t bucket = (lkid >> 16);
644
645         if (bucket >= ls->ls_lkbtbl_size)
646                 return -EBADSLT;
647
648         read_lock(&ls->ls_lkbtbl[bucket].lock);
649         lkb = __find_lkb(ls, lkid);
650         if (lkb)
651                 kref_get(&lkb->lkb_ref);
652         read_unlock(&ls->ls_lkbtbl[bucket].lock);
653
654         *lkb_ret = lkb;
655         return lkb ? 0 : -ENOENT;
656 }
657
658 static void kill_lkb(struct kref *kref)
659 {
660         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661
662         /* All work is done after the return from kref_put() so we
663            can release the write_lock before the detach_lkb */
664
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670
671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673         uint16_t bucket = (lkb->lkb_id >> 16);
674
675         write_lock(&ls->ls_lkbtbl[bucket].lock);
676         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677                 list_del(&lkb->lkb_idtbl_list);
678                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679
680                 detach_lkb(lkb);
681
682                 /* for local/process lkbs, lvbptr points to caller's lksb */
683                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
684                         dlm_free_lvb(lkb->lkb_lvbptr);
685                 dlm_free_lkb(lkb);
686                 return 1;
687         } else {
688                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
689                 return 0;
690         }
691 }
692
693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695         struct dlm_ls *ls;
696
697         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699
700         ls = lkb->lkb_resource->res_ls;
701         return __put_lkb(ls, lkb);
702 }
703
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706
707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709         kref_get(&lkb->lkb_ref);
710 }
711
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716
717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719         int rv;
720         rv = kref_put(&lkb->lkb_ref, kill_lkb);
721         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725                             int mode)
726 {
727         struct dlm_lkb *lkb = NULL;
728
729         list_for_each_entry(lkb, head, lkb_statequeue)
730                 if (lkb->lkb_rqmode < mode)
731                         break;
732
733         if (!lkb)
734                 list_add_tail(new, head);
735         else
736                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746
747         lkb->lkb_status = status;
748
749         switch (status) {
750         case DLM_LKSTS_WAITING:
751                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
752                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
753                 else
754                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 break;
756         case DLM_LKSTS_GRANTED:
757                 /* convention says granted locks kept in order of grmode */
758                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
759                                 lkb->lkb_grmode);
760                 break;
761         case DLM_LKSTS_CONVERT:
762                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
763                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
764                 else
765                         list_add_tail(&lkb->lkb_statequeue,
766                                       &r->res_convertqueue);
767                 break;
768         default:
769                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
770         }
771 }
772
773 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
774 {
775         lkb->lkb_status = 0;
776         list_del(&lkb->lkb_statequeue);
777         unhold_lkb(lkb);
778 }
779
780 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
781 {
782         hold_lkb(lkb);
783         del_lkb(r, lkb);
784         add_lkb(r, lkb, sts);
785         unhold_lkb(lkb);
786 }
787
788 static int msg_reply_type(int mstype)
789 {
790         switch (mstype) {
791         case DLM_MSG_REQUEST:
792                 return DLM_MSG_REQUEST_REPLY;
793         case DLM_MSG_CONVERT:
794                 return DLM_MSG_CONVERT_REPLY;
795         case DLM_MSG_UNLOCK:
796                 return DLM_MSG_UNLOCK_REPLY;
797         case DLM_MSG_CANCEL:
798                 return DLM_MSG_CANCEL_REPLY;
799         case DLM_MSG_LOOKUP:
800                 return DLM_MSG_LOOKUP_REPLY;
801         }
802         return -1;
803 }
804
805 /* add/remove lkb from global waiters list of lkb's waiting for
806    a reply from a remote node */
807
808 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
809 {
810         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
811         int error = 0;
812
813         mutex_lock(&ls->ls_waiters_mutex);
814
815         if (is_overlap_unlock(lkb) ||
816             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
817                 error = -EINVAL;
818                 goto out;
819         }
820
821         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
822                 switch (mstype) {
823                 case DLM_MSG_UNLOCK:
824                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
825                         break;
826                 case DLM_MSG_CANCEL:
827                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
828                         break;
829                 default:
830                         error = -EBUSY;
831                         goto out;
832                 }
833                 lkb->lkb_wait_count++;
834                 hold_lkb(lkb);
835
836                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
837                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
838                           lkb->lkb_wait_count, lkb->lkb_flags);
839                 goto out;
840         }
841
842         DLM_ASSERT(!lkb->lkb_wait_count,
843                    dlm_print_lkb(lkb);
844                    printk("wait_count %d\n", lkb->lkb_wait_count););
845
846         lkb->lkb_wait_count++;
847         lkb->lkb_wait_type = mstype;
848         hold_lkb(lkb);
849         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
850  out:
851         if (error)
852                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
853                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
854                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
855         mutex_unlock(&ls->ls_waiters_mutex);
856         return error;
857 }
858
859 /* We clear the RESEND flag because we might be taking an lkb off the waiters
860    list as part of process_requestqueue (e.g. a lookup that has an optimized
861    request reply on the requestqueue) between dlm_recover_waiters_pre() which
862    set RESEND and dlm_recover_waiters_post() */
863
864 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
865 {
866         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
867         int overlap_done = 0;
868
869         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
870                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
871                 overlap_done = 1;
872                 goto out_del;
873         }
874
875         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
876                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
877                 overlap_done = 1;
878                 goto out_del;
879         }
880
881         /* N.B. type of reply may not always correspond to type of original
882            msg due to lookup->request optimization, verify others? */
883
884         if (lkb->lkb_wait_type) {
885                 lkb->lkb_wait_type = 0;
886                 goto out_del;
887         }
888
889         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
890                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
891         return -1;
892
893  out_del:
894         /* the force-unlock/cancel has completed and we haven't recvd a reply
895            to the op that was in progress prior to the unlock/cancel; we
896            give up on any reply to the earlier op.  FIXME: not sure when/how
897            this would happen */
898
899         if (overlap_done && lkb->lkb_wait_type) {
900                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
901                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
902                 lkb->lkb_wait_count--;
903                 lkb->lkb_wait_type = 0;
904         }
905
906         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
907
908         lkb->lkb_flags &= ~DLM_IFL_RESEND;
909         lkb->lkb_wait_count--;
910         if (!lkb->lkb_wait_count)
911                 list_del_init(&lkb->lkb_wait_reply);
912         unhold_lkb(lkb);
913         return 0;
914 }
915
916 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
917 {
918         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
919         int error;
920
921         mutex_lock(&ls->ls_waiters_mutex);
922         error = _remove_from_waiters(lkb, mstype);
923         mutex_unlock(&ls->ls_waiters_mutex);
924         return error;
925 }
926
927 /* Handles situations where we might be processing a "fake" or "stub" reply in
928    which we can't try to take waiters_mutex again. */
929
930 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
931 {
932         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
933         int error;
934
935         if (ms != &ls->ls_stub_ms)
936                 mutex_lock(&ls->ls_waiters_mutex);
937         error = _remove_from_waiters(lkb, ms->m_type);
938         if (ms != &ls->ls_stub_ms)
939                 mutex_unlock(&ls->ls_waiters_mutex);
940         return error;
941 }
942
943 static void dir_remove(struct dlm_rsb *r)
944 {
945         int to_nodeid;
946
947         if (dlm_no_directory(r->res_ls))
948                 return;
949
950         to_nodeid = dlm_dir_nodeid(r);
951         if (to_nodeid != dlm_our_nodeid())
952                 send_remove(r);
953         else
954                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
955                                      r->res_name, r->res_length);
956 }
957
958 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
959    found since they are in order of newest to oldest? */
960
961 static int shrink_bucket(struct dlm_ls *ls, int b)
962 {
963         struct dlm_rsb *r;
964         int count = 0, found;
965
966         for (;;) {
967                 found = 0;
968                 write_lock(&ls->ls_rsbtbl[b].lock);
969                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
970                                             res_hashchain) {
971                         if (!time_after_eq(jiffies, r->res_toss_time +
972                                            dlm_config.ci_toss_secs * HZ))
973                                 continue;
974                         found = 1;
975                         break;
976                 }
977
978                 if (!found) {
979                         write_unlock(&ls->ls_rsbtbl[b].lock);
980                         break;
981                 }
982
983                 if (kref_put(&r->res_ref, kill_rsb)) {
984                         list_del(&r->res_hashchain);
985                         write_unlock(&ls->ls_rsbtbl[b].lock);
986
987                         if (is_master(r))
988                                 dir_remove(r);
989                         dlm_free_rsb(r);
990                         count++;
991                 } else {
992                         write_unlock(&ls->ls_rsbtbl[b].lock);
993                         log_error(ls, "tossed rsb in use %s", r->res_name);
994                 }
995         }
996
997         return count;
998 }
999
1000 void dlm_scan_rsbs(struct dlm_ls *ls)
1001 {
1002         int i;
1003
1004         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1005                 shrink_bucket(ls, i);
1006                 if (dlm_locking_stopped(ls))
1007                         break;
1008                 cond_resched();
1009         }
1010 }
1011
1012 static void add_timeout(struct dlm_lkb *lkb)
1013 {
1014         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1015
1016         if (is_master_copy(lkb)) {
1017                 lkb->lkb_timestamp = jiffies;
1018                 return;
1019         }
1020
1021         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1022             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1023                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1024                 goto add_it;
1025         }
1026         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1027                 goto add_it;
1028         return;
1029
1030  add_it:
1031         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1032         mutex_lock(&ls->ls_timeout_mutex);
1033         hold_lkb(lkb);
1034         lkb->lkb_timestamp = jiffies;
1035         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1036         mutex_unlock(&ls->ls_timeout_mutex);
1037 }
1038
1039 static void del_timeout(struct dlm_lkb *lkb)
1040 {
1041         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1042
1043         mutex_lock(&ls->ls_timeout_mutex);
1044         if (!list_empty(&lkb->lkb_time_list)) {
1045                 list_del_init(&lkb->lkb_time_list);
1046                 unhold_lkb(lkb);
1047         }
1048         mutex_unlock(&ls->ls_timeout_mutex);
1049 }
1050
1051 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1052    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1053    and then lock rsb because of lock ordering in add_timeout.  We may need
1054    to specify some special timeout-related bits in the lkb that are just to
1055    be accessed under the timeout_mutex. */
1056
1057 void dlm_scan_timeout(struct dlm_ls *ls)
1058 {
1059         struct dlm_rsb *r;
1060         struct dlm_lkb *lkb;
1061         int do_cancel, do_warn;
1062
1063         for (;;) {
1064                 if (dlm_locking_stopped(ls))
1065                         break;
1066
1067                 do_cancel = 0;
1068                 do_warn = 0;
1069                 mutex_lock(&ls->ls_timeout_mutex);
1070                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1071
1072                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1073                             time_after_eq(jiffies, lkb->lkb_timestamp +
1074                                           lkb->lkb_timeout_cs * HZ/100))
1075                                 do_cancel = 1;
1076
1077                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1078                             time_after_eq(jiffies, lkb->lkb_timestamp +
1079                                            dlm_config.ci_timewarn_cs * HZ/100))
1080                                 do_warn = 1;
1081
1082                         if (!do_cancel && !do_warn)
1083                                 continue;
1084                         hold_lkb(lkb);
1085                         break;
1086                 }
1087                 mutex_unlock(&ls->ls_timeout_mutex);
1088
1089                 if (!do_cancel && !do_warn)
1090                         break;
1091
1092                 r = lkb->lkb_resource;
1093                 hold_rsb(r);
1094                 lock_rsb(r);
1095
1096                 if (do_warn) {
1097                         /* clear flag so we only warn once */
1098                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1099                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1100                                 del_timeout(lkb);
1101                         dlm_timeout_warn(lkb);
1102                 }
1103
1104                 if (do_cancel) {
1105                         log_debug(ls, "timeout cancel %x node %d %s",
1106                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1107                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1108                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1109                         del_timeout(lkb);
1110                         _cancel_lock(r, lkb);
1111                 }
1112
1113                 unlock_rsb(r);
1114                 unhold_rsb(r);
1115                 dlm_put_lkb(lkb);
1116         }
1117 }
1118
1119 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1120    dlm_recoverd before checking/setting ls_recover_begin. */
1121
1122 void dlm_adjust_timeouts(struct dlm_ls *ls)
1123 {
1124         struct dlm_lkb *lkb;
1125         long adj = jiffies - ls->ls_recover_begin;
1126
1127         ls->ls_recover_begin = 0;
1128         mutex_lock(&ls->ls_timeout_mutex);
1129         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1130                 lkb->lkb_timestamp += adj;
1131         mutex_unlock(&ls->ls_timeout_mutex);
1132 }
1133
1134 /* lkb is master or local copy */
1135
1136 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1137 {
1138         int b, len = r->res_ls->ls_lvblen;
1139
1140         /* b=1 lvb returned to caller
1141            b=0 lvb written to rsb or invalidated
1142            b=-1 do nothing */
1143
1144         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1145
1146         if (b == 1) {
1147                 if (!lkb->lkb_lvbptr)
1148                         return;
1149
1150                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1151                         return;
1152
1153                 if (!r->res_lvbptr)
1154                         return;
1155
1156                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1157                 lkb->lkb_lvbseq = r->res_lvbseq;
1158
1159         } else if (b == 0) {
1160                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1161                         rsb_set_flag(r, RSB_VALNOTVALID);
1162                         return;
1163                 }
1164
1165                 if (!lkb->lkb_lvbptr)
1166                         return;
1167
1168                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1169                         return;
1170
1171                 if (!r->res_lvbptr)
1172                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1173
1174                 if (!r->res_lvbptr)
1175                         return;
1176
1177                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1178                 r->res_lvbseq++;
1179                 lkb->lkb_lvbseq = r->res_lvbseq;
1180                 rsb_clear_flag(r, RSB_VALNOTVALID);
1181         }
1182
1183         if (rsb_flag(r, RSB_VALNOTVALID))
1184                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1185 }
1186
1187 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1188 {
1189         if (lkb->lkb_grmode < DLM_LOCK_PW)
1190                 return;
1191
1192         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1193                 rsb_set_flag(r, RSB_VALNOTVALID);
1194                 return;
1195         }
1196
1197         if (!lkb->lkb_lvbptr)
1198                 return;
1199
1200         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1201                 return;
1202
1203         if (!r->res_lvbptr)
1204                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1205
1206         if (!r->res_lvbptr)
1207                 return;
1208
1209         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1210         r->res_lvbseq++;
1211         rsb_clear_flag(r, RSB_VALNOTVALID);
1212 }
1213
1214 /* lkb is process copy (pc) */
1215
1216 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1217                             struct dlm_message *ms)
1218 {
1219         int b;
1220
1221         if (!lkb->lkb_lvbptr)
1222                 return;
1223
1224         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1225                 return;
1226
1227         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1228         if (b == 1) {
1229                 int len = receive_extralen(ms);
1230                 if (len > DLM_RESNAME_MAXLEN)
1231                         len = DLM_RESNAME_MAXLEN;
1232                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1233                 lkb->lkb_lvbseq = ms->m_lvbseq;
1234         }
1235 }
1236
1237 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1238    remove_lock -- used for unlock, removes lkb from granted
1239    revert_lock -- used for cancel, moves lkb from convert to granted
1240    grant_lock  -- used for request and convert, adds lkb to granted or
1241                   moves lkb from convert or waiting to granted
1242
1243    Each of these is used for master or local copy lkb's.  There is
1244    also a _pc() variation used to make the corresponding change on
1245    a process copy (pc) lkb. */
1246
1247 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1248 {
1249         del_lkb(r, lkb);
1250         lkb->lkb_grmode = DLM_LOCK_IV;
1251         /* this unhold undoes the original ref from create_lkb()
1252            so this leads to the lkb being freed */
1253         unhold_lkb(lkb);
1254 }
1255
1256 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1257 {
1258         set_lvb_unlock(r, lkb);
1259         _remove_lock(r, lkb);
1260 }
1261
1262 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1263 {
1264         _remove_lock(r, lkb);
1265 }
1266
1267 /* returns: 0 did nothing
1268             1 moved lock to granted
1269            -1 removed lock */
1270
1271 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1272 {
1273         int rv = 0;
1274
1275         lkb->lkb_rqmode = DLM_LOCK_IV;
1276
1277         switch (lkb->lkb_status) {
1278         case DLM_LKSTS_GRANTED:
1279                 break;
1280         case DLM_LKSTS_CONVERT:
1281                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1282                 rv = 1;
1283                 break;
1284         case DLM_LKSTS_WAITING:
1285                 del_lkb(r, lkb);
1286                 lkb->lkb_grmode = DLM_LOCK_IV;
1287                 /* this unhold undoes the original ref from create_lkb()
1288                    so this leads to the lkb being freed */
1289                 unhold_lkb(lkb);
1290                 rv = -1;
1291                 break;
1292         default:
1293                 log_print("invalid status for revert %d", lkb->lkb_status);
1294         }
1295         return rv;
1296 }
1297
1298 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1299 {
1300         return revert_lock(r, lkb);
1301 }
1302
1303 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1304 {
1305         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1306                 lkb->lkb_grmode = lkb->lkb_rqmode;
1307                 if (lkb->lkb_status)
1308                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1309                 else
1310                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1311         }
1312
1313         lkb->lkb_rqmode = DLM_LOCK_IV;
1314 }
1315
1316 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1317 {
1318         set_lvb_lock(r, lkb);
1319         _grant_lock(r, lkb);
1320         lkb->lkb_highbast = 0;
1321 }
1322
1323 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1324                           struct dlm_message *ms)
1325 {
1326         set_lvb_lock_pc(r, lkb, ms);
1327         _grant_lock(r, lkb);
1328 }
1329
1330 /* called by grant_pending_locks() which means an async grant message must
1331    be sent to the requesting node in addition to granting the lock if the
1332    lkb belongs to a remote node. */
1333
1334 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1335 {
1336         grant_lock(r, lkb);
1337         if (is_master_copy(lkb))
1338                 send_grant(r, lkb);
1339         else
1340                 queue_cast(r, lkb, 0);
1341 }
1342
1343 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1344    change the granted/requested modes.  We're munging things accordingly in
1345    the process copy.
1346    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1347    conversion deadlock
1348    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1349    compatible with other granted locks */
1350
1351 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1352 {
1353         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1354                 log_print("munge_demoted %x invalid reply type %d",
1355                           lkb->lkb_id, ms->m_type);
1356                 return;
1357         }
1358
1359         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1360                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1361                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1362                 return;
1363         }
1364
1365         lkb->lkb_grmode = DLM_LOCK_NL;
1366 }
1367
1368 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1369 {
1370         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1371             ms->m_type != DLM_MSG_GRANT) {
1372                 log_print("munge_altmode %x invalid reply type %d",
1373                           lkb->lkb_id, ms->m_type);
1374                 return;
1375         }
1376
1377         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1378                 lkb->lkb_rqmode = DLM_LOCK_PR;
1379         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1380                 lkb->lkb_rqmode = DLM_LOCK_CW;
1381         else {
1382                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1383                 dlm_print_lkb(lkb);
1384         }
1385 }
1386
1387 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1388 {
1389         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1390                                            lkb_statequeue);
1391         if (lkb->lkb_id == first->lkb_id)
1392                 return 1;
1393
1394         return 0;
1395 }
1396
1397 /* Check if the given lkb conflicts with another lkb on the queue. */
1398
1399 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1400 {
1401         struct dlm_lkb *this;
1402
1403         list_for_each_entry(this, head, lkb_statequeue) {
1404                 if (this == lkb)
1405                         continue;
1406                 if (!modes_compat(this, lkb))
1407                         return 1;
1408         }
1409         return 0;
1410 }
1411
1412 /*
1413  * "A conversion deadlock arises with a pair of lock requests in the converting
1414  * queue for one resource.  The granted mode of each lock blocks the requested
1415  * mode of the other lock."
1416  *
1417  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1418  * convert queue from being granted, then deadlk/demote lkb.
1419  *
1420  * Example:
1421  * Granted Queue: empty
1422  * Convert Queue: NL->EX (first lock)
1423  *                PR->EX (second lock)
1424  *
1425  * The first lock can't be granted because of the granted mode of the second
1426  * lock and the second lock can't be granted because it's not first in the
1427  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1428  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1429  * flag set and return DEMOTED in the lksb flags.
1430  *
1431  * Originally, this function detected conv-deadlk in a more limited scope:
1432  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1433  * - if lkb1 was the first entry in the queue (not just earlier), and was
1434  *   blocked by the granted mode of lkb2, and there was nothing on the
1435  *   granted queue preventing lkb1 from being granted immediately, i.e.
1436  *   lkb2 was the only thing preventing lkb1 from being granted.
1437  *
1438  * That second condition meant we'd only say there was conv-deadlk if
1439  * resolving it (by demotion) would lead to the first lock on the convert
1440  * queue being granted right away.  It allowed conversion deadlocks to exist
1441  * between locks on the convert queue while they couldn't be granted anyway.
1442  *
1443  * Now, we detect and take action on conversion deadlocks immediately when
1444  * they're created, even if they may not be immediately consequential.  If
1445  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1446  * mode that would prevent lkb1's conversion from being granted, we do a
1447  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1448  * I think this means that the lkb_is_ahead condition below should always
1449  * be zero, i.e. there will never be conv-deadlk between two locks that are
1450  * both already on the convert queue.
1451  */
1452
1453 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1454 {
1455         struct dlm_lkb *lkb1;
1456         int lkb_is_ahead = 0;
1457
1458         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1459                 if (lkb1 == lkb2) {
1460                         lkb_is_ahead = 1;
1461                         continue;
1462                 }
1463
1464                 if (!lkb_is_ahead) {
1465                         if (!modes_compat(lkb2, lkb1))
1466                                 return 1;
1467                 } else {
1468                         if (!modes_compat(lkb2, lkb1) &&
1469                             !modes_compat(lkb1, lkb2))
1470                                 return 1;
1471                 }
1472         }
1473         return 0;
1474 }
1475
1476 /*
1477  * Return 1 if the lock can be granted, 0 otherwise.
1478  * Also detect and resolve conversion deadlocks.
1479  *
1480  * lkb is the lock to be granted
1481  *
1482  * now is 1 if the function is being called in the context of the
1483  * immediate request, it is 0 if called later, after the lock has been
1484  * queued.
1485  *
1486  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1487  */
1488
1489 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1490 {
1491         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1492
1493         /*
1494          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1495          * a new request for a NL mode lock being blocked.
1496          *
1497          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1498          * request, then it would be granted.  In essence, the use of this flag
1499          * tells the Lock Manager to expedite theis request by not considering
1500          * what may be in the CONVERTING or WAITING queues...  As of this
1501          * writing, the EXPEDITE flag can be used only with new requests for NL
1502          * mode locks.  This flag is not valid for conversion requests.
1503          *
1504          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1505          * conversion or used with a non-NL requested mode.  We also know an
1506          * EXPEDITE request is always granted immediately, so now must always
1507          * be 1.  The full condition to grant an expedite request: (now &&
1508          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1509          * therefore be shortened to just checking the flag.
1510          */
1511
1512         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1513                 return 1;
1514
1515         /*
1516          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1517          * added to the remaining conditions.
1518          */
1519
1520         if (queue_conflict(&r->res_grantqueue, lkb))
1521                 goto out;
1522
1523         /*
1524          * 6-3: By default, a conversion request is immediately granted if the
1525          * requested mode is compatible with the modes of all other granted
1526          * locks
1527          */
1528
1529         if (queue_conflict(&r->res_convertqueue, lkb))
1530                 goto out;
1531
1532         /*
1533          * 6-5: But the default algorithm for deciding whether to grant or
1534          * queue conversion requests does not by itself guarantee that such
1535          * requests are serviced on a "first come first serve" basis.  This, in
1536          * turn, can lead to a phenomenon known as "indefinate postponement".
1537          *
1538          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1539          * the system service employed to request a lock conversion.  This flag
1540          * forces certain conversion requests to be queued, even if they are
1541          * compatible with the granted modes of other locks on the same
1542          * resource.  Thus, the use of this flag results in conversion requests
1543          * being ordered on a "first come first servce" basis.
1544          *
1545          * DCT: This condition is all about new conversions being able to occur
1546          * "in place" while the lock remains on the granted queue (assuming
1547          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1548          * doesn't _have_ to go onto the convert queue where it's processed in
1549          * order.  The "now" variable is necessary to distinguish converts
1550          * being received and processed for the first time now, because once a
1551          * convert is moved to the conversion queue the condition below applies
1552          * requiring fifo granting.
1553          */
1554
1555         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1556                 return 1;
1557
1558         /*
1559          * The NOORDER flag is set to avoid the standard vms rules on grant
1560          * order.
1561          */
1562
1563         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1564                 return 1;
1565
1566         /*
1567          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1568          * granted until all other conversion requests ahead of it are granted
1569          * and/or canceled.
1570          */
1571
1572         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1573                 return 1;
1574
1575         /*
1576          * 6-4: By default, a new request is immediately granted only if all
1577          * three of the following conditions are satisfied when the request is
1578          * issued:
1579          * - The queue of ungranted conversion requests for the resource is
1580          *   empty.
1581          * - The queue of ungranted new requests for the resource is empty.
1582          * - The mode of the new request is compatible with the most
1583          *   restrictive mode of all granted locks on the resource.
1584          */
1585
1586         if (now && !conv && list_empty(&r->res_convertqueue) &&
1587             list_empty(&r->res_waitqueue))
1588                 return 1;
1589
1590         /*
1591          * 6-4: Once a lock request is in the queue of ungranted new requests,
1592          * it cannot be granted until the queue of ungranted conversion
1593          * requests is empty, all ungranted new requests ahead of it are
1594          * granted and/or canceled, and it is compatible with the granted mode
1595          * of the most restrictive lock granted on the resource.
1596          */
1597
1598         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1599             first_in_list(lkb, &r->res_waitqueue))
1600                 return 1;
1601  out:
1602         return 0;
1603 }
1604
1605 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1606                           int *err)
1607 {
1608         int rv;
1609         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1610         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1611
1612         if (err)
1613                 *err = 0;
1614
1615         rv = _can_be_granted(r, lkb, now);
1616         if (rv)
1617                 goto out;
1618
1619         /*
1620          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1621          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1622          * cancels one of the locks.
1623          */
1624
1625         if (is_convert && can_be_queued(lkb) &&
1626             conversion_deadlock_detect(r, lkb)) {
1627                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1628                         lkb->lkb_grmode = DLM_LOCK_NL;
1629                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1630                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1631                         if (err)
1632                                 *err = -EDEADLK;
1633                         else {
1634                                 log_print("can_be_granted deadlock %x now %d",
1635                                           lkb->lkb_id, now);
1636                                 dlm_dump_rsb(r);
1637                         }
1638                 }
1639                 goto out;
1640         }
1641
1642         /*
1643          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1644          * to grant a request in a mode other than the normal rqmode.  It's a
1645          * simple way to provide a big optimization to applications that can
1646          * use them.
1647          */
1648
1649         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1650                 alt = DLM_LOCK_PR;
1651         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1652                 alt = DLM_LOCK_CW;
1653
1654         if (alt) {
1655                 lkb->lkb_rqmode = alt;
1656                 rv = _can_be_granted(r, lkb, now);
1657                 if (rv)
1658                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1659                 else
1660                         lkb->lkb_rqmode = rqmode;
1661         }
1662  out:
1663         return rv;
1664 }
1665
1666 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1667    for locks pending on the convert list.  Once verified (watch for these
1668    log_prints), we should be able to just call _can_be_granted() and not
1669    bother with the demote/deadlk cases here (and there's no easy way to deal
1670    with a deadlk here, we'd have to generate something like grant_lock with
1671    the deadlk error.) */
1672
1673 /* Returns the highest requested mode of all blocked conversions; sets
1674    cw if there's a blocked conversion to DLM_LOCK_CW. */
1675
1676 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1677 {
1678         struct dlm_lkb *lkb, *s;
1679         int hi, demoted, quit, grant_restart, demote_restart;
1680         int deadlk;
1681
1682         quit = 0;
1683  restart:
1684         grant_restart = 0;
1685         demote_restart = 0;
1686         hi = DLM_LOCK_IV;
1687
1688         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1689                 demoted = is_demoted(lkb);
1690                 deadlk = 0;
1691
1692                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1693                         grant_lock_pending(r, lkb);
1694                         grant_restart = 1;
1695                         continue;
1696                 }
1697
1698                 if (!demoted && is_demoted(lkb)) {
1699                         log_print("WARN: pending demoted %x node %d %s",
1700                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1701                         demote_restart = 1;
1702                         continue;
1703                 }
1704
1705                 if (deadlk) {
1706                         log_print("WARN: pending deadlock %x node %d %s",
1707                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1708                         dlm_dump_rsb(r);
1709                         continue;
1710                 }
1711
1712                 hi = max_t(int, lkb->lkb_rqmode, hi);
1713
1714                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1715                         *cw = 1;
1716         }
1717
1718         if (grant_restart)
1719                 goto restart;
1720         if (demote_restart && !quit) {
1721                 quit = 1;
1722                 goto restart;
1723         }
1724
1725         return max_t(int, high, hi);
1726 }
1727
1728 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1729 {
1730         struct dlm_lkb *lkb, *s;
1731
1732         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1733                 if (can_be_granted(r, lkb, 0, NULL))
1734                         grant_lock_pending(r, lkb);
1735                 else {
1736                         high = max_t(int, lkb->lkb_rqmode, high);
1737                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1738                                 *cw = 1;
1739                 }
1740         }
1741
1742         return high;
1743 }
1744
1745 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1746    on either the convert or waiting queue.
1747    high is the largest rqmode of all locks blocked on the convert or
1748    waiting queue. */
1749
1750 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1751 {
1752         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1753                 if (gr->lkb_highbast < DLM_LOCK_EX)
1754                         return 1;
1755                 return 0;
1756         }
1757
1758         if (gr->lkb_highbast < high &&
1759             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1760                 return 1;
1761         return 0;
1762 }
1763
1764 static void grant_pending_locks(struct dlm_rsb *r)
1765 {
1766         struct dlm_lkb *lkb, *s;
1767         int high = DLM_LOCK_IV;
1768         int cw = 0;
1769
1770         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1771
1772         high = grant_pending_convert(r, high, &cw);
1773         high = grant_pending_wait(r, high, &cw);
1774
1775         if (high == DLM_LOCK_IV)
1776                 return;
1777
1778         /*
1779          * If there are locks left on the wait/convert queue then send blocking
1780          * ASTs to granted locks based on the largest requested mode (high)
1781          * found above.
1782          */
1783
1784         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1785                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1786                         if (cw && high == DLM_LOCK_PR &&
1787                             lkb->lkb_grmode == DLM_LOCK_PR)
1788                                 queue_bast(r, lkb, DLM_LOCK_CW);
1789                         else
1790                                 queue_bast(r, lkb, high);
1791                         lkb->lkb_highbast = high;
1792                 }
1793         }
1794 }
1795
1796 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1797 {
1798         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1799             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1800                 if (gr->lkb_highbast < DLM_LOCK_EX)
1801                         return 1;
1802                 return 0;
1803         }
1804
1805         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1806                 return 1;
1807         return 0;
1808 }
1809
1810 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1811                             struct dlm_lkb *lkb)
1812 {
1813         struct dlm_lkb *gr;
1814
1815         list_for_each_entry(gr, head, lkb_statequeue) {
1816                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1817                         queue_bast(r, gr, lkb->lkb_rqmode);
1818                         gr->lkb_highbast = lkb->lkb_rqmode;
1819                 }
1820         }
1821 }
1822
1823 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1824 {
1825         send_bast_queue(r, &r->res_grantqueue, lkb);
1826 }
1827
1828 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1829 {
1830         send_bast_queue(r, &r->res_grantqueue, lkb);
1831         send_bast_queue(r, &r->res_convertqueue, lkb);
1832 }
1833
1834 /* set_master(r, lkb) -- set the master nodeid of a resource
1835
1836    The purpose of this function is to set the nodeid field in the given
1837    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1838    known, it can just be copied to the lkb and the function will return
1839    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1840    before it can be copied to the lkb.
1841
1842    When the rsb nodeid is being looked up remotely, the initial lkb
1843    causing the lookup is kept on the ls_waiters list waiting for the
1844    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1845    on the rsb's res_lookup list until the master is verified.
1846
1847    Return values:
1848    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1849    1: the rsb master is not available and the lkb has been placed on
1850       a wait queue
1851 */
1852
1853 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1854 {
1855         struct dlm_ls *ls = r->res_ls;
1856         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1857
1858         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1859                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1860                 r->res_first_lkid = lkb->lkb_id;
1861                 lkb->lkb_nodeid = r->res_nodeid;
1862                 return 0;
1863         }
1864
1865         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1866                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1867                 return 1;
1868         }
1869
1870         if (r->res_nodeid == 0) {
1871                 lkb->lkb_nodeid = 0;
1872                 return 0;
1873         }
1874
1875         if (r->res_nodeid > 0) {
1876                 lkb->lkb_nodeid = r->res_nodeid;
1877                 return 0;
1878         }
1879
1880         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1881
1882         dir_nodeid = dlm_dir_nodeid(r);
1883
1884         if (dir_nodeid != our_nodeid) {
1885                 r->res_first_lkid = lkb->lkb_id;
1886                 send_lookup(r, lkb);
1887                 return 1;
1888         }
1889
1890         for (i = 0; i < 2; i++) {
1891                 /* It's possible for dlm_scand to remove an old rsb for
1892                    this same resource from the toss list, us to create
1893                    a new one, look up the master locally, and find it
1894                    already exists just before dlm_scand does the
1895                    dir_remove() on the previous rsb. */
1896
1897                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1898                                        r->res_length, &ret_nodeid);
1899                 if (!error)
1900                         break;
1901                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1902                 schedule();
1903         }
1904         if (error && error != -EEXIST)
1905                 return error;
1906
1907         if (ret_nodeid == our_nodeid) {
1908                 r->res_first_lkid = 0;
1909                 r->res_nodeid = 0;
1910                 lkb->lkb_nodeid = 0;
1911         } else {
1912                 r->res_first_lkid = lkb->lkb_id;
1913                 r->res_nodeid = ret_nodeid;
1914                 lkb->lkb_nodeid = ret_nodeid;
1915         }
1916         return 0;
1917 }
1918
1919 static void process_lookup_list(struct dlm_rsb *r)
1920 {
1921         struct dlm_lkb *lkb, *safe;
1922
1923         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1924                 list_del_init(&lkb->lkb_rsb_lookup);
1925                 _request_lock(r, lkb);
1926                 schedule();
1927         }
1928 }
1929
1930 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1931
1932 static void confirm_master(struct dlm_rsb *r, int error)
1933 {
1934         struct dlm_lkb *lkb;
1935
1936         if (!r->res_first_lkid)
1937                 return;
1938
1939         switch (error) {
1940         case 0:
1941         case -EINPROGRESS:
1942                 r->res_first_lkid = 0;
1943                 process_lookup_list(r);
1944                 break;
1945
1946         case -EAGAIN:
1947         case -EBADR:
1948         case -ENOTBLK:
1949                 /* the remote request failed and won't be retried (it was
1950                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1951                    lkb the first_lkid */
1952
1953                 r->res_first_lkid = 0;
1954
1955                 if (!list_empty(&r->res_lookup)) {
1956                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1957                                          lkb_rsb_lookup);
1958                         list_del_init(&lkb->lkb_rsb_lookup);
1959                         r->res_first_lkid = lkb->lkb_id;
1960                         _request_lock(r, lkb);
1961                 }
1962                 break;
1963
1964         default:
1965                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1966         }
1967 }
1968
1969 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1970                          int namelen, unsigned long timeout_cs,
1971                          void (*ast) (void *astparam),
1972                          void *astparam,
1973                          void (*bast) (void *astparam, int mode),
1974                          struct dlm_args *args)
1975 {
1976         int rv = -EINVAL;
1977
1978         /* check for invalid arg usage */
1979
1980         if (mode < 0 || mode > DLM_LOCK_EX)
1981                 goto out;
1982
1983         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1984                 goto out;
1985
1986         if (flags & DLM_LKF_CANCEL)
1987                 goto out;
1988
1989         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1990                 goto out;
1991
1992         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1993                 goto out;
1994
1995         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1996                 goto out;
1997
1998         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1999                 goto out;
2000
2001         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2002                 goto out;
2003
2004         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2005                 goto out;
2006
2007         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2008                 goto out;
2009
2010         if (!ast || !lksb)
2011                 goto out;
2012
2013         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2014                 goto out;
2015
2016         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2017                 goto out;
2018
2019         /* these args will be copied to the lkb in validate_lock_args,
2020            it cannot be done now because when converting locks, fields in
2021            an active lkb cannot be modified before locking the rsb */
2022
2023         args->flags = flags;
2024         args->astfn = ast;
2025         args->astparam = astparam;
2026         args->bastfn = bast;
2027         args->timeout = timeout_cs;
2028         args->mode = mode;
2029         args->lksb = lksb;
2030         rv = 0;
2031  out:
2032         return rv;
2033 }
2034
2035 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2036 {
2037         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2038                       DLM_LKF_FORCEUNLOCK))
2039                 return -EINVAL;
2040
2041         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2042                 return -EINVAL;
2043
2044         args->flags = flags;
2045         args->astparam = astarg;
2046         return 0;
2047 }
2048
2049 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2050                               struct dlm_args *args)
2051 {
2052         int rv = -EINVAL;
2053
2054         if (args->flags & DLM_LKF_CONVERT) {
2055                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2056                         goto out;
2057
2058                 if (args->flags & DLM_LKF_QUECVT &&
2059                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2060                         goto out;
2061
2062                 rv = -EBUSY;
2063                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2064                         goto out;
2065
2066                 if (lkb->lkb_wait_type)
2067                         goto out;
2068
2069                 if (is_overlap(lkb))
2070                         goto out;
2071         }
2072
2073         lkb->lkb_exflags = args->flags;
2074         lkb->lkb_sbflags = 0;
2075         lkb->lkb_astfn = args->astfn;
2076         lkb->lkb_astparam = args->astparam;
2077         lkb->lkb_bastfn = args->bastfn;
2078         lkb->lkb_rqmode = args->mode;
2079         lkb->lkb_lksb = args->lksb;
2080         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2081         lkb->lkb_ownpid = (int) current->pid;
2082         lkb->lkb_timeout_cs = args->timeout;
2083         rv = 0;
2084  out:
2085         return rv;
2086 }
2087
2088 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2089    for success */
2090
2091 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2092    because there may be a lookup in progress and it's valid to do
2093    cancel/unlockf on it */
2094
2095 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2096 {
2097         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2098         int rv = -EINVAL;
2099
2100         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2101                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2102                 dlm_print_lkb(lkb);
2103                 goto out;
2104         }
2105
2106         /* an lkb may still exist even though the lock is EOL'ed due to a
2107            cancel, unlock or failed noqueue request; an app can't use these
2108            locks; return same error as if the lkid had not been found at all */
2109
2110         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2111                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2112                 rv = -ENOENT;
2113                 goto out;
2114         }
2115
2116         /* an lkb may be waiting for an rsb lookup to complete where the
2117            lookup was initiated by another lock */
2118
2119         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2120                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2121                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2122                         list_del_init(&lkb->lkb_rsb_lookup);
2123                         queue_cast(lkb->lkb_resource, lkb,
2124                                    args->flags & DLM_LKF_CANCEL ?
2125                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2126                         unhold_lkb(lkb); /* undoes create_lkb() */
2127                 }
2128                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2129                 rv = -EBUSY;
2130                 goto out;
2131         }
2132
2133         /* cancel not allowed with another cancel/unlock in progress */
2134
2135         if (args->flags & DLM_LKF_CANCEL) {
2136                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2137                         goto out;
2138
2139                 if (is_overlap(lkb))
2140                         goto out;
2141
2142                 /* don't let scand try to do a cancel */
2143                 del_timeout(lkb);
2144
2145                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2146                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2147                         rv = -EBUSY;
2148                         goto out;
2149                 }
2150
2151                 switch (lkb->lkb_wait_type) {
2152                 case DLM_MSG_LOOKUP:
2153                 case DLM_MSG_REQUEST:
2154                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2155                         rv = -EBUSY;
2156                         goto out;
2157                 case DLM_MSG_UNLOCK:
2158                 case DLM_MSG_CANCEL:
2159                         goto out;
2160                 }
2161                 /* add_to_waiters() will set OVERLAP_CANCEL */
2162                 goto out_ok;
2163         }
2164
2165         /* do we need to allow a force-unlock if there's a normal unlock
2166            already in progress?  in what conditions could the normal unlock
2167            fail such that we'd want to send a force-unlock to be sure? */
2168
2169         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2170                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2171                         goto out;
2172
2173                 if (is_overlap_unlock(lkb))
2174                         goto out;
2175
2176                 /* don't let scand try to do a cancel */
2177                 del_timeout(lkb);
2178
2179                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2180                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2181                         rv = -EBUSY;
2182                         goto out;
2183                 }
2184
2185                 switch (lkb->lkb_wait_type) {
2186                 case DLM_MSG_LOOKUP:
2187                 case DLM_MSG_REQUEST:
2188                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2189                         rv = -EBUSY;
2190                         goto out;
2191                 case DLM_MSG_UNLOCK:
2192                         goto out;
2193                 }
2194                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2195                 goto out_ok;
2196         }
2197
2198         /* normal unlock not allowed if there's any op in progress */
2199         rv = -EBUSY;
2200         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2201                 goto out;
2202
2203  out_ok:
2204         /* an overlapping op shouldn't blow away exflags from other op */
2205         lkb->lkb_exflags |= args->flags;
2206         lkb->lkb_sbflags = 0;
2207         lkb->lkb_astparam = args->astparam;
2208         rv = 0;
2209  out:
2210         if (rv)
2211                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2212                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2213                           args->flags, lkb->lkb_wait_type,
2214                           lkb->lkb_resource->res_name);
2215         return rv;
2216 }
2217
2218 /*
2219  * Four stage 4 varieties:
2220  * do_request(), do_convert(), do_unlock(), do_cancel()
2221  * These are called on the master node for the given lock and
2222  * from the central locking logic.
2223  */
2224
2225 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2226 {
2227         int error = 0;
2228
2229         if (can_be_granted(r, lkb, 1, NULL)) {
2230                 grant_lock(r, lkb);
2231                 queue_cast(r, lkb, 0);
2232                 goto out;
2233         }
2234
2235         if (can_be_queued(lkb)) {
2236                 error = -EINPROGRESS;
2237                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2238                 send_blocking_asts(r, lkb);
2239                 add_timeout(lkb);
2240                 goto out;
2241         }
2242
2243         error = -EAGAIN;
2244         if (force_blocking_asts(lkb))
2245                 send_blocking_asts_all(r, lkb);
2246         queue_cast(r, lkb, -EAGAIN);
2247
2248  out:
2249         return error;
2250 }
2251
2252 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2253 {
2254         int error = 0;
2255         int deadlk = 0;
2256
2257         /* changing an existing lock may allow others to be granted */
2258
2259         if (can_be_granted(r, lkb, 1, &deadlk)) {
2260                 grant_lock(r, lkb);
2261                 queue_cast(r, lkb, 0);
2262                 grant_pending_locks(r);
2263                 goto out;
2264         }
2265
2266         /* can_be_granted() detected that this lock would block in a conversion
2267            deadlock, so we leave it on the granted queue and return EDEADLK in
2268            the ast for the convert. */
2269
2270         if (deadlk) {
2271                 /* it's left on the granted queue */
2272                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2273                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2274                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2275                 revert_lock(r, lkb);
2276                 queue_cast(r, lkb, -EDEADLK);
2277                 error = -EDEADLK;
2278                 goto out;
2279         }
2280
2281         /* is_demoted() means the can_be_granted() above set the grmode
2282            to NL, and left us on the granted queue.  This auto-demotion
2283            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2284            now grantable.  We have to try to grant other converting locks
2285            before we try again to grant this one. */
2286
2287         if (is_demoted(lkb)) {
2288                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2289                 if (_can_be_granted(r, lkb, 1)) {
2290                         grant_lock(r, lkb);
2291                         queue_cast(r, lkb, 0);
2292                         grant_pending_locks(r);
2293                         goto out;
2294                 }
2295                 /* else fall through and move to convert queue */
2296         }
2297
2298         if (can_be_queued(lkb)) {
2299                 error = -EINPROGRESS;
2300                 del_lkb(r, lkb);
2301                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2302                 send_blocking_asts(r, lkb);
2303                 add_timeout(lkb);
2304                 goto out;
2305         }
2306
2307         error = -EAGAIN;
2308         if (force_blocking_asts(lkb))
2309                 send_blocking_asts_all(r, lkb);
2310         queue_cast(r, lkb, -EAGAIN);
2311
2312  out:
2313         return error;
2314 }
2315
2316 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2317 {
2318         remove_lock(r, lkb);
2319         queue_cast(r, lkb, -DLM_EUNLOCK);
2320         grant_pending_locks(r);
2321         return -DLM_EUNLOCK;
2322 }
2323
2324 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2325  
2326 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2327 {
2328         int error;
2329
2330         error = revert_lock(r, lkb);
2331         if (error) {
2332                 queue_cast(r, lkb, -DLM_ECANCEL);
2333                 grant_pending_locks(r);
2334                 return -DLM_ECANCEL;
2335         }
2336         return 0;
2337 }
2338
2339 /*
2340  * Four stage 3 varieties:
2341  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2342  */
2343
2344 /* add a new lkb to a possibly new rsb, called by requesting process */
2345
2346 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2347 {
2348         int error;
2349
2350         /* set_master: sets lkb nodeid from r */
2351
2352         error = set_master(r, lkb);
2353         if (error < 0)
2354                 goto out;
2355         if (error) {
2356                 error = 0;
2357                 goto out;
2358         }
2359
2360         if (is_remote(r))
2361                 /* receive_request() calls do_request() on remote node */
2362                 error = send_request(r, lkb);
2363         else
2364                 error = do_request(r, lkb);
2365  out:
2366         return error;
2367 }
2368
2369 /* change some property of an existing lkb, e.g. mode */
2370
2371 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2372 {
2373         int error;
2374
2375         if (is_remote(r))
2376                 /* receive_convert() calls do_convert() on remote node */
2377                 error = send_convert(r, lkb);
2378         else
2379                 error = do_convert(r, lkb);
2380
2381         return error;
2382 }
2383
2384 /* remove an existing lkb from the granted queue */
2385
2386 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2387 {
2388         int error;
2389
2390         if (is_remote(r))
2391                 /* receive_unlock() calls do_unlock() on remote node */
2392                 error = send_unlock(r, lkb);
2393         else
2394                 error = do_unlock(r, lkb);
2395
2396         return error;
2397 }
2398
2399 /* remove an existing lkb from the convert or wait queue */
2400
2401 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2402 {
2403         int error;
2404
2405         if (is_remote(r))
2406                 /* receive_cancel() calls do_cancel() on remote node */
2407                 error = send_cancel(r, lkb);
2408         else
2409                 error = do_cancel(r, lkb);
2410
2411         return error;
2412 }
2413
2414 /*
2415  * Four stage 2 varieties:
2416  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2417  */
2418
2419 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2420                         int len, struct dlm_args *args)
2421 {
2422         struct dlm_rsb *r;
2423         int error;
2424
2425         error = validate_lock_args(ls, lkb, args);
2426         if (error)
2427                 goto out;
2428
2429         error = find_rsb(ls, name, len, R_CREATE, &r);
2430         if (error)
2431                 goto out;
2432
2433         lock_rsb(r);
2434
2435         attach_lkb(r, lkb);
2436         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2437
2438         error = _request_lock(r, lkb);
2439
2440         unlock_rsb(r);
2441         put_rsb(r);
2442
2443  out:
2444         return error;
2445 }
2446
2447 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2448                         struct dlm_args *args)
2449 {
2450         struct dlm_rsb *r;
2451         int error;
2452
2453         r = lkb->lkb_resource;
2454
2455         hold_rsb(r);
2456         lock_rsb(r);
2457
2458         error = validate_lock_args(ls, lkb, args);
2459         if (error)
2460                 goto out;
2461
2462         error = _convert_lock(r, lkb);
2463  out:
2464         unlock_rsb(r);
2465         put_rsb(r);
2466         return error;
2467 }
2468
2469 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2470                        struct dlm_args *args)
2471 {
2472         struct dlm_rsb *r;
2473         int error;
2474
2475         r = lkb->lkb_resource;
2476
2477         hold_rsb(r);
2478         lock_rsb(r);
2479
2480         error = validate_unlock_args(lkb, args);
2481         if (error)
2482                 goto out;
2483
2484         error = _unlock_lock(r, lkb);
2485  out:
2486         unlock_rsb(r);
2487         put_rsb(r);
2488         return error;
2489 }
2490
2491 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2492                        struct dlm_args *args)
2493 {
2494         struct dlm_rsb *r;
2495         int error;
2496
2497         r = lkb->lkb_resource;
2498
2499         hold_rsb(r);
2500         lock_rsb(r);
2501
2502         error = validate_unlock_args(lkb, args);
2503         if (error)
2504                 goto out;
2505
2506         error = _cancel_lock(r, lkb);
2507  out:
2508         unlock_rsb(r);
2509         put_rsb(r);
2510         return error;
2511 }
2512
2513 /*
2514  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2515  */
2516
2517 int dlm_lock(dlm_lockspace_t *lockspace,
2518              int mode,
2519              struct dlm_lksb *lksb,
2520              uint32_t flags,
2521              void *name,
2522              unsigned int namelen,
2523              uint32_t parent_lkid,
2524              void (*ast) (void *astarg),
2525              void *astarg,
2526              void (*bast) (void *astarg, int mode))
2527 {
2528         struct dlm_ls *ls;
2529         struct dlm_lkb *lkb;
2530         struct dlm_args args;
2531         int error, convert = flags & DLM_LKF_CONVERT;
2532
2533         ls = dlm_find_lockspace_local(lockspace);
2534         if (!ls)
2535                 return -EINVAL;
2536
2537         dlm_lock_recovery(ls);
2538
2539         if (convert)
2540                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2541         else
2542                 error = create_lkb(ls, &lkb);
2543
2544         if (error)
2545                 goto out;
2546
2547         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2548                               astarg, bast, &args);
2549         if (error)
2550                 goto out_put;
2551
2552         if (convert)
2553                 error = convert_lock(ls, lkb, &args);
2554         else
2555                 error = request_lock(ls, lkb, name, namelen, &args);
2556
2557         if (error == -EINPROGRESS)
2558                 error = 0;
2559  out_put:
2560         if (convert || error)
2561                 __put_lkb(ls, lkb);
2562         if (error == -EAGAIN || error == -EDEADLK)
2563                 error = 0;
2564  out:
2565         dlm_unlock_recovery(ls);
2566         dlm_put_lockspace(ls);
2567         return error;
2568 }
2569
2570 int dlm_unlock(dlm_lockspace_t *lockspace,
2571                uint32_t lkid,
2572                uint32_t flags,
2573                struct dlm_lksb *lksb,
2574                void *astarg)
2575 {
2576         struct dlm_ls *ls;
2577         struct dlm_lkb *lkb;
2578         struct dlm_args args;
2579         int error;
2580
2581         ls = dlm_find_lockspace_local(lockspace);
2582         if (!ls)
2583                 return -EINVAL;
2584
2585         dlm_lock_recovery(ls);
2586
2587         error = find_lkb(ls, lkid, &lkb);
2588         if (error)
2589                 goto out;
2590
2591         error = set_unlock_args(flags, astarg, &args);
2592         if (error)
2593                 goto out_put;
2594
2595         if (flags & DLM_LKF_CANCEL)
2596                 error = cancel_lock(ls, lkb, &args);
2597         else
2598                 error = unlock_lock(ls, lkb, &args);
2599
2600         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2601                 error = 0;
2602         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2603                 error = 0;
2604  out_put:
2605         dlm_put_lkb(lkb);
2606  out:
2607         dlm_unlock_recovery(ls);
2608         dlm_put_lockspace(ls);
2609         return error;
2610 }
2611
2612 /*
2613  * send/receive routines for remote operations and replies
2614  *
2615  * send_args
2616  * send_common
2617  * send_request                 receive_request
2618  * send_convert                 receive_convert
2619  * send_unlock                  receive_unlock
2620  * send_cancel                  receive_cancel
2621  * send_grant                   receive_grant
2622  * send_bast                    receive_bast
2623  * send_lookup                  receive_lookup
2624  * send_remove                  receive_remove
2625  *
2626  *                              send_common_reply
2627  * receive_request_reply        send_request_reply
2628  * receive_convert_reply        send_convert_reply
2629  * receive_unlock_reply         send_unlock_reply
2630  * receive_cancel_reply         send_cancel_reply
2631  * receive_lookup_reply         send_lookup_reply
2632  */
2633
2634 static int _create_message(struct dlm_ls *ls, int mb_len,
2635                            int to_nodeid, int mstype,
2636                            struct dlm_message **ms_ret,
2637                            struct dlm_mhandle **mh_ret)
2638 {
2639         struct dlm_message *ms;
2640         struct dlm_mhandle *mh;
2641         char *mb;
2642
2643         /* get_buffer gives us a message handle (mh) that we need to
2644            pass into lowcomms_commit and a message buffer (mb) that we
2645            write our data into */
2646
2647         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2648         if (!mh)
2649                 return -ENOBUFS;
2650
2651         memset(mb, 0, mb_len);
2652
2653         ms = (struct dlm_message *) mb;
2654
2655         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2656         ms->m_header.h_lockspace = ls->ls_global_id;
2657         ms->m_header.h_nodeid = dlm_our_nodeid();
2658         ms->m_header.h_length = mb_len;
2659         ms->m_header.h_cmd = DLM_MSG;
2660
2661         ms->m_type = mstype;
2662
2663         *mh_ret = mh;
2664         *ms_ret = ms;
2665         return 0;
2666 }
2667
2668 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2669                           int to_nodeid, int mstype,
2670                           struct dlm_message **ms_ret,
2671                           struct dlm_mhandle **mh_ret)
2672 {
2673         int mb_len = sizeof(struct dlm_message);
2674
2675         switch (mstype) {
2676         case DLM_MSG_REQUEST:
2677         case DLM_MSG_LOOKUP:
2678         case DLM_MSG_REMOVE:
2679                 mb_len += r->res_length;
2680                 break;
2681         case DLM_MSG_CONVERT:
2682         case DLM_MSG_UNLOCK:
2683         case DLM_MSG_REQUEST_REPLY:
2684         case DLM_MSG_CONVERT_REPLY:
2685         case DLM_MSG_GRANT:
2686                 if (lkb && lkb->lkb_lvbptr)
2687                         mb_len += r->res_ls->ls_lvblen;
2688                 break;
2689         }
2690
2691         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2692                                ms_ret, mh_ret);
2693 }
2694
2695 /* further lowcomms enhancements or alternate implementations may make
2696    the return value from this function useful at some point */
2697
2698 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2699 {
2700         dlm_message_out(ms);
2701         dlm_lowcomms_commit_buffer(mh);
2702         return 0;
2703 }
2704
2705 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2706                       struct dlm_message *ms)
2707 {
2708         ms->m_nodeid   = lkb->lkb_nodeid;
2709         ms->m_pid      = lkb->lkb_ownpid;
2710         ms->m_lkid     = lkb->lkb_id;
2711         ms->m_remid    = lkb->lkb_remid;
2712         ms->m_exflags  = lkb->lkb_exflags;
2713         ms->m_sbflags  = lkb->lkb_sbflags;
2714         ms->m_flags    = lkb->lkb_flags;
2715         ms->m_lvbseq   = lkb->lkb_lvbseq;
2716         ms->m_status   = lkb->lkb_status;
2717         ms->m_grmode   = lkb->lkb_grmode;
2718         ms->m_rqmode   = lkb->lkb_rqmode;
2719         ms->m_hash     = r->res_hash;
2720
2721         /* m_result and m_bastmode are set from function args,
2722            not from lkb fields */
2723
2724         if (lkb->lkb_bastfn)
2725                 ms->m_asts |= AST_BAST;
2726         if (lkb->lkb_astfn)
2727                 ms->m_asts |= AST_COMP;
2728
2729         /* compare with switch in create_message; send_remove() doesn't
2730            use send_args() */
2731
2732         switch (ms->m_type) {
2733         case DLM_MSG_REQUEST:
2734         case DLM_MSG_LOOKUP:
2735                 memcpy(ms->m_extra, r->res_name, r->res_length);
2736                 break;
2737         case DLM_MSG_CONVERT:
2738         case DLM_MSG_UNLOCK:
2739         case DLM_MSG_REQUEST_REPLY:
2740         case DLM_MSG_CONVERT_REPLY:
2741         case DLM_MSG_GRANT:
2742                 if (!lkb->lkb_lvbptr)
2743                         break;
2744                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2745                 break;
2746         }
2747 }
2748
2749 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2750 {
2751         struct dlm_message *ms;
2752         struct dlm_mhandle *mh;
2753         int to_nodeid, error;
2754
2755         error = add_to_waiters(lkb, mstype);
2756         if (error)
2757                 return error;
2758
2759         to_nodeid = r->res_nodeid;
2760
2761         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2762         if (error)
2763                 goto fail;
2764
2765         send_args(r, lkb, ms);
2766
2767         error = send_message(mh, ms);
2768         if (error)
2769                 goto fail;
2770         return 0;
2771
2772  fail:
2773         remove_from_waiters(lkb, msg_reply_type(mstype));
2774         return error;
2775 }
2776
2777 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2778 {
2779         return send_common(r, lkb, DLM_MSG_REQUEST);
2780 }
2781
2782 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2783 {
2784         int error;
2785
2786         error = send_common(r, lkb, DLM_MSG_CONVERT);
2787
2788         /* down conversions go without a reply from the master */
2789         if (!error && down_conversion(lkb)) {
2790                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2791                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2792                 r->res_ls->ls_stub_ms.m_result = 0;
2793                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2794                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2795         }
2796
2797         return error;
2798 }
2799
2800 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2801    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2802    that the master is still correct. */
2803
2804 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2805 {
2806         return send_common(r, lkb, DLM_MSG_UNLOCK);
2807 }
2808
2809 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2810 {
2811         return send_common(r, lkb, DLM_MSG_CANCEL);
2812 }
2813
2814 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2815 {
2816         struct dlm_message *ms;
2817         struct dlm_mhandle *mh;
2818         int to_nodeid, error;
2819
2820         to_nodeid = lkb->lkb_nodeid;
2821
2822         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2823         if (error)
2824                 goto out;
2825
2826         send_args(r, lkb, ms);
2827
2828         ms->m_result = 0;
2829
2830         error = send_message(mh, ms);
2831  out:
2832         return error;
2833 }
2834
2835 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2836 {
2837         struct dlm_message *ms;
2838         struct dlm_mhandle *mh;
2839         int to_nodeid, error;
2840
2841         to_nodeid = lkb->lkb_nodeid;
2842
2843         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2844         if (error)
2845                 goto out;
2846
2847         send_args(r, lkb, ms);
2848
2849         ms->m_bastmode = mode;
2850
2851         error = send_message(mh, ms);
2852  out:
2853         return error;
2854 }
2855
2856 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2857 {
2858         struct dlm_message *ms;
2859         struct dlm_mhandle *mh;
2860         int to_nodeid, error;
2861
2862         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2863         if (error)
2864                 return error;
2865
2866         to_nodeid = dlm_dir_nodeid(r);
2867
2868         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2869         if (error)
2870                 goto fail;
2871
2872         send_args(r, lkb, ms);
2873
2874         error = send_message(mh, ms);
2875         if (error)
2876                 goto fail;
2877         return 0;
2878
2879  fail:
2880         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2881         return error;
2882 }
2883
2884 static int send_remove(struct dlm_rsb *r)
2885 {
2886         struct dlm_message *ms;
2887         struct dlm_mhandle *mh;
2888         int to_nodeid, error;
2889
2890         to_nodeid = dlm_dir_nodeid(r);
2891
2892         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2893         if (error)
2894                 goto out;
2895
2896         memcpy(ms->m_extra, r->res_name, r->res_length);
2897         ms->m_hash = r->res_hash;
2898
2899         error = send_message(mh, ms);
2900  out:
2901         return error;
2902 }
2903
2904 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2905                              int mstype, int rv)
2906 {
2907         struct dlm_message *ms;
2908         struct dlm_mhandle *mh;
2909         int to_nodeid, error;
2910
2911         to_nodeid = lkb->lkb_nodeid;
2912
2913         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2914         if (error)
2915                 goto out;
2916
2917         send_args(r, lkb, ms);
2918
2919         ms->m_result = rv;
2920
2921         error = send_message(mh, ms);
2922  out:
2923         return error;
2924 }
2925
2926 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2927 {
2928         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2929 }
2930
2931 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2932 {
2933         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2934 }
2935
2936 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2937 {
2938         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2939 }
2940
2941 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2942 {
2943         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2944 }
2945
2946 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2947                              int ret_nodeid, int rv)
2948 {
2949         struct dlm_rsb *r = &ls->ls_stub_rsb;
2950         struct dlm_message *ms;
2951         struct dlm_mhandle *mh;
2952         int error, nodeid = ms_in->m_header.h_nodeid;
2953
2954         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2955         if (error)
2956                 goto out;
2957
2958         ms->m_lkid = ms_in->m_lkid;
2959         ms->m_result = rv;
2960         ms->m_nodeid = ret_nodeid;
2961
2962         error = send_message(mh, ms);
2963  out:
2964         return error;
2965 }
2966
2967 /* which args we save from a received message depends heavily on the type
2968    of message, unlike the send side where we can safely send everything about
2969    the lkb for any type of message */
2970
2971 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2972 {
2973         lkb->lkb_exflags = ms->m_exflags;
2974         lkb->lkb_sbflags = ms->m_sbflags;
2975         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2976                          (ms->m_flags & 0x0000FFFF);
2977 }
2978
2979 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2980 {
2981         lkb->lkb_sbflags = ms->m_sbflags;
2982         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2983                          (ms->m_flags & 0x0000FFFF);
2984 }
2985
2986 static int receive_extralen(struct dlm_message *ms)
2987 {
2988         return (ms->m_header.h_length - sizeof(struct dlm_message));
2989 }
2990
2991 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2992                        struct dlm_message *ms)
2993 {
2994         int len;
2995
2996         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2997                 if (!lkb->lkb_lvbptr)
2998                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2999                 if (!lkb->lkb_lvbptr)
3000                         return -ENOMEM;
3001                 len = receive_extralen(ms);
3002                 if (len > DLM_RESNAME_MAXLEN)
3003                         len = DLM_RESNAME_MAXLEN;
3004                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3005         }
3006         return 0;
3007 }
3008
3009 static void fake_bastfn(void *astparam, int mode)
3010 {
3011         log_print("fake_bastfn should not be called");
3012 }
3013
3014 static void fake_astfn(void *astparam)
3015 {
3016         log_print("fake_astfn should not be called");
3017 }
3018
3019 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3020                                 struct dlm_message *ms)
3021 {
3022         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3023         lkb->lkb_ownpid = ms->m_pid;
3024         lkb->lkb_remid = ms->m_lkid;
3025         lkb->lkb_grmode = DLM_LOCK_IV;
3026         lkb->lkb_rqmode = ms->m_rqmode;
3027
3028         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3029         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3030
3031         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3032                 /* lkb was just created so there won't be an lvb yet */
3033                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3034                 if (!lkb->lkb_lvbptr)
3035                         return -ENOMEM;
3036         }
3037
3038         return 0;
3039 }
3040
3041 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3042                                 struct dlm_message *ms)
3043 {
3044         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3045                 return -EBUSY;
3046
3047         if (receive_lvb(ls, lkb, ms))
3048                 return -ENOMEM;
3049
3050         lkb->lkb_rqmode = ms->m_rqmode;
3051         lkb->lkb_lvbseq = ms->m_lvbseq;
3052
3053         return 0;
3054 }
3055
3056 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3057                                struct dlm_message *ms)
3058 {
3059         if (receive_lvb(ls, lkb, ms))
3060                 return -ENOMEM;
3061         return 0;
3062 }
3063
3064 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3065    uses to send a reply and that the remote end uses to process the reply. */
3066
3067 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3068 {
3069         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3070         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3071         lkb->lkb_remid = ms->m_lkid;
3072 }
3073
3074 /* This is called after the rsb is locked so that we can safely inspect
3075    fields in the lkb. */
3076
3077 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3078 {
3079         int from = ms->m_header.h_nodeid;
3080         int error = 0;
3081
3082         switch (ms->m_type) {
3083         case DLM_MSG_CONVERT:
3084         case DLM_MSG_UNLOCK:
3085         case DLM_MSG_CANCEL:
3086                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3087                         error = -EINVAL;
3088                 break;
3089
3090         case DLM_MSG_CONVERT_REPLY:
3091         case DLM_MSG_UNLOCK_REPLY:
3092         case DLM_MSG_CANCEL_REPLY:
3093         case DLM_MSG_GRANT:
3094         case DLM_MSG_BAST:
3095                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3096                         error = -EINVAL;
3097                 break;
3098
3099         case DLM_MSG_REQUEST_REPLY:
3100                 if (!is_process_copy(lkb))
3101                         error = -EINVAL;
3102                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3103                         error = -EINVAL;
3104                 break;
3105
3106         default:
3107                 error = -EINVAL;
3108         }
3109
3110         if (error)
3111                 log_error(lkb->lkb_resource->res_ls,
3112                           "ignore invalid message %d from %d %x %x %x %d",
3113                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3114                           lkb->lkb_flags, lkb->lkb_nodeid);
3115         return error;
3116 }
3117
3118 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3119 {
3120         struct dlm_lkb *lkb;
3121         struct dlm_rsb *r;
3122         int error, namelen;
3123
3124         error = create_lkb(ls, &lkb);
3125         if (error)
3126                 goto fail;
3127
3128         receive_flags(lkb, ms);
3129         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3130         error = receive_request_args(ls, lkb, ms);
3131         if (error) {
3132                 __put_lkb(ls, lkb);
3133                 goto fail;
3134         }
3135
3136         namelen = receive_extralen(ms);
3137
3138         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3139         if (error) {
3140                 __put_lkb(ls, lkb);
3141                 goto fail;
3142         }
3143
3144         lock_rsb(r);
3145
3146         attach_lkb(r, lkb);
3147         error = do_request(r, lkb);
3148         send_request_reply(r, lkb, error);
3149
3150         unlock_rsb(r);
3151         put_rsb(r);
3152
3153         if (error == -EINPROGRESS)
3154                 error = 0;
3155         if (error)
3156                 dlm_put_lkb(lkb);
3157         return;
3158
3159  fail:
3160         setup_stub_lkb(ls, ms);
3161         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3162 }
3163
3164 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3165 {
3166         struct dlm_lkb *lkb;
3167         struct dlm_rsb *r;
3168         int error, reply = 1;
3169
3170         error = find_lkb(ls, ms->m_remid, &lkb);
3171         if (error)
3172                 goto fail;
3173
3174         r = lkb->lkb_resource;
3175
3176         hold_rsb(r);
3177         lock_rsb(r);
3178
3179         error = validate_message(lkb, ms);
3180         if (error)
3181                 goto out;
3182
3183         receive_flags(lkb, ms);
3184         error = receive_convert_args(ls, lkb, ms);
3185         if (error)
3186                 goto out_reply;
3187         reply = !down_conversion(lkb);
3188
3189         error = do_convert(r, lkb);
3190  out_reply:
3191         if (reply)
3192                 send_convert_reply(r, lkb, error);
3193  out:
3194         unlock_rsb(r);
3195         put_rsb(r);
3196         dlm_put_lkb(lkb);
3197         return;
3198
3199  fail:
3200         setup_stub_lkb(ls, ms);
3201         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3202 }
3203
3204 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3205 {
3206         struct dlm_lkb *lkb;
3207         struct dlm_rsb *r;
3208         int error;
3209
3210         error = find_lkb(ls, ms->m_remid, &lkb);
3211         if (error)
3212                 goto fail;
3213
3214         r = lkb->lkb_resource;
3215
3216         hold_rsb(r);
3217         lock_rsb(r);
3218
3219         error = validate_message(lkb, ms);
3220         if (error)
3221                 goto out;
3222
3223         receive_flags(lkb, ms);
3224         error = receive_unlock_args(ls, lkb, ms);
3225         if (error)
3226                 goto out_reply;
3227
3228         error = do_unlock(r, lkb);
3229  out_reply:
3230         send_unlock_reply(r, lkb, error);
3231  out:
3232         unlock_rsb(r);
3233         put_rsb(r);
3234         dlm_put_lkb(lkb);
3235         return;
3236
3237  fail:
3238         setup_stub_lkb(ls, ms);
3239         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3240 }
3241
3242 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3243 {
3244         struct dlm_lkb *lkb;
3245         struct dlm_rsb *r;
3246         int error;
3247
3248         error = find_lkb(ls, ms->m_remid, &lkb);
3249         if (error)
3250                 goto fail;
3251
3252         receive_flags(lkb, ms);
3253
3254         r = lkb->lkb_resource;
3255
3256         hold_rsb(r);
3257         lock_rsb(r);
3258
3259         error = validate_message(lkb, ms);
3260         if (error)
3261                 goto out;
3262
3263         error = do_cancel(r, lkb);
3264         send_cancel_reply(r, lkb, error);
3265  out:
3266         unlock_rsb(r);
3267         put_rsb(r);
3268         dlm_put_lkb(lkb);
3269         return;
3270
3271  fail:
3272         setup_stub_lkb(ls, ms);
3273         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3274 }
3275
3276 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3277 {
3278         struct dlm_lkb *lkb;
3279         struct dlm_rsb *r;
3280         int error;
3281
3282         error = find_lkb(ls, ms->m_remid, &lkb);
3283         if (error) {
3284                 log_debug(ls, "receive_grant from %d no lkb %x",
3285                           ms->m_header.h_nodeid, ms->m_remid);
3286                 return;
3287         }
3288
3289         r = lkb->lkb_resource;
3290
3291         hold_rsb(r);
3292         lock_rsb(r);
3293
3294         error = validate_message(lkb, ms);
3295         if (error)
3296                 goto out;
3297
3298         receive_flags_reply(lkb, ms);
3299         if (is_altmode(lkb))
3300                 munge_altmode(lkb, ms);
3301         grant_lock_pc(r, lkb, ms);
3302         queue_cast(r, lkb, 0);
3303  out:
3304         unlock_rsb(r);
3305         put_rsb(r);
3306         dlm_put_lkb(lkb);
3307 }
3308
3309 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3310 {
3311         struct dlm_lkb *lkb;
3312         struct dlm_rsb *r;
3313         int error;
3314
3315         error = find_lkb(ls, ms->m_remid, &lkb);
3316         if (error) {
3317                 log_debug(ls, "receive_bast from %d no lkb %x",
3318                           ms->m_header.h_nodeid, ms->m_remid);
3319                 return;
3320         }
3321
3322         r = lkb->lkb_resource;
3323
3324         hold_rsb(r);
3325         lock_rsb(r);
3326
3327         error = validate_message(lkb, ms);
3328         if (error)
3329                 goto out;
3330
3331         queue_bast(r, lkb, ms->m_bastmode);
3332  out:
3333         unlock_rsb(r);
3334         put_rsb(r);
3335         dlm_put_lkb(lkb);
3336 }
3337
3338 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3339 {
3340         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3341
3342         from_nodeid = ms->m_header.h_nodeid;
3343         our_nodeid = dlm_our_nodeid();
3344
3345         len = receive_extralen(ms);
3346
3347         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3348         if (dir_nodeid != our_nodeid) {
3349                 log_error(ls, "lookup dir_nodeid %d from %d",
3350                           dir_nodeid, from_nodeid);
3351                 error = -EINVAL;
3352                 ret_nodeid = -1;
3353                 goto out;
3354         }
3355
3356         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3357
3358         /* Optimization: we're master so treat lookup as a request */
3359         if (!error && ret_nodeid == our_nodeid) {
3360                 receive_request(ls, ms);
3361                 return;
3362         }
3363  out:
3364         send_lookup_reply(ls, ms, ret_nodeid, error);
3365 }
3366
3367 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3368 {
3369         int len, dir_nodeid, from_nodeid;
3370
3371         from_nodeid = ms->m_header.h_nodeid;
3372
3373         len = receive_extralen(ms);
3374
3375         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3376         if (dir_nodeid != dlm_our_nodeid()) {
3377                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3378                           dir_nodeid, from_nodeid);
3379                 return;
3380         }
3381
3382         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3383 }
3384
3385 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3386 {
3387         do_purge(ls, ms->m_nodeid, ms->m_pid);
3388 }
3389
3390 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3391 {
3392         struct dlm_lkb *lkb;
3393         struct dlm_rsb *r;
3394         int error, mstype, result;
3395
3396         error = find_lkb(ls, ms->m_remid, &lkb);
3397         if (error) {
3398                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3399                           ms->m_header.h_nodeid, ms->m_remid);
3400                 return;
3401         }
3402
3403         r = lkb->lkb_resource;
3404         hold_rsb(r);
3405         lock_rsb(r);
3406
3407         error = validate_message(lkb, ms);
3408         if (error)
3409                 goto out;
3410
3411         mstype = lkb->lkb_wait_type;
3412         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3413         if (error)
3414                 goto out;
3415
3416         /* Optimization: the dir node was also the master, so it took our
3417            lookup as a request and sent request reply instead of lookup reply */
3418         if (mstype == DLM_MSG_LOOKUP) {
3419                 r->res_nodeid = ms->m_header.h_nodeid;
3420                 lkb->lkb_nodeid = r->res_nodeid;
3421         }
3422
3423         /* this is the value returned from do_request() on the master */
3424         result = ms->m_result;
3425
3426         switch (result) {
3427         case -EAGAIN:
3428                 /* request would block (be queued) on remote master */
3429                 queue_cast(r, lkb, -EAGAIN);
3430                 confirm_master(r, -EAGAIN);
3431                 unhold_lkb(lkb); /* undoes create_lkb() */
3432                 break;
3433
3434         case -EINPROGRESS:
3435         case 0:
3436                 /* request was queued or granted on remote master */
3437                 receive_flags_reply(lkb, ms);
3438                 lkb->lkb_remid = ms->m_lkid;
3439                 if (is_altmode(lkb))
3440                         munge_altmode(lkb, ms);
3441                 if (result) {
3442                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3443                         add_timeout(lkb);
3444                 } else {
3445                         grant_lock_pc(r, lkb, ms);
3446                         queue_cast(r, lkb, 0);
3447                 }
3448                 confirm_master(r, result);
3449                 break;
3450
3451         case -EBADR:
3452         case -ENOTBLK:
3453                 /* find_rsb failed to find rsb or rsb wasn't master */
3454                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3455                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3456                 r->res_nodeid = -1;
3457                 lkb->lkb_nodeid = -1;
3458
3459                 if (is_overlap(lkb)) {
3460                         /* we'll ignore error in cancel/unlock reply */
3461                         queue_cast_overlap(r, lkb);
3462                         confirm_master(r, result);
3463                         unhold_lkb(lkb); /* undoes create_lkb() */
3464                 } else
3465                         _request_lock(r, lkb);
3466                 break;
3467
3468         default:
3469                 log_error(ls, "receive_request_reply %x error %d",
3470                           lkb->lkb_id, result);
3471         }
3472
3473         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3474                 log_debug(ls, "receive_request_reply %x result %d unlock",
3475                           lkb->lkb_id, result);
3476                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3477                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3478                 send_unlock(r, lkb);
3479         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3480                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3481                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3482                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3483                 send_cancel(r, lkb);
3484         } else {
3485                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3486                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3487         }
3488  out:
3489         unlock_rsb(r);
3490         put_rsb(r);
3491         dlm_put_lkb(lkb);
3492 }
3493
3494 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3495                                     struct dlm_message *ms)
3496 {
3497         /* this is the value returned from do_convert() on the master */
3498         switch (ms->m_result) {
3499         case -EAGAIN:
3500                 /* convert would block (be queued) on remote master */
3501                 queue_cast(r, lkb, -EAGAIN);
3502                 break;
3503
3504         case -EDEADLK:
3505                 receive_flags_reply(lkb, ms);
3506                 revert_lock_pc(r, lkb);
3507                 queue_cast(r, lkb, -EDEADLK);
3508                 break;
3509
3510         case -EINPROGRESS:
3511                 /* convert was queued on remote master */
3512                 receive_flags_reply(lkb, ms);
3513                 if (is_demoted(lkb))
3514                         munge_demoted(lkb, ms);
3515                 del_lkb(r, lkb);
3516                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3517                 add_timeout(lkb);
3518                 break;
3519
3520         case 0:
3521                 /* convert was granted on remote master */
3522                 receive_flags_reply(lkb, ms);
3523                 if (is_demoted(lkb))
3524                         munge_demoted(lkb, ms);
3525                 grant_lock_pc(r, lkb, ms);
3526                 queue_cast(r, lkb, 0);
3527                 break;
3528
3529         default:
3530                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3531                           lkb->lkb_id, ms->m_result);
3532         }
3533 }
3534
3535 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3536 {
3537         struct dlm_rsb *r = lkb->lkb_resource;
3538         int error;
3539
3540         hold_rsb(r);
3541         lock_rsb(r);
3542
3543         error = validate_message(lkb, ms);
3544         if (error)
3545                 goto out;
3546
3547         /* stub reply can happen with waiters_mutex held */
3548         error = remove_from_waiters_ms(lkb, ms);
3549         if (error)
3550                 goto out;
3551
3552         __receive_convert_reply(r, lkb, ms);
3553  out:
3554         unlock_rsb(r);
3555         put_rsb(r);
3556 }
3557
3558 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3559 {
3560         struct dlm_lkb *lkb;
3561         int error;
3562
3563         error = find_lkb(ls, ms->m_remid, &lkb);
3564         if (error) {
3565                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3566                           ms->m_header.h_nodeid, ms->m_remid);
3567                 return;
3568         }
3569
3570         _receive_convert_reply(lkb, ms);
3571         dlm_put_lkb(lkb);
3572 }
3573
3574 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3575 {
3576         struct dlm_rsb *r = lkb->lkb_resource;
3577         int error;
3578
3579         hold_rsb(r);
3580         lock_rsb(r);
3581
3582         error = validate_message(lkb, ms);
3583         if (error)
3584                 goto out;
3585
3586         /* stub reply can happen with waiters_mutex held */
3587         error = remove_from_waiters_ms(lkb, ms);
3588         if (error)
3589                 goto out;
3590
3591         /* this is the value returned from do_unlock() on the master */
3592
3593         switch (ms->m_result) {
3594         case -DLM_EUNLOCK:
3595                 receive_flags_reply(lkb, ms);
3596                 remove_lock_pc(r, lkb);
3597                 queue_cast(r, lkb, -DLM_EUNLOCK);
3598                 break;
3599         case -ENOENT:
3600                 break;
3601         default:
3602                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3603                           lkb->lkb_id, ms->m_result);
3604         }
3605  out:
3606         unlock_rsb(r);
3607         put_rsb(r);
3608 }
3609
3610 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3611 {
3612         struct dlm_lkb *lkb;
3613         int error;
3614
3615         error = find_lkb(ls, ms->m_remid, &lkb);
3616         if (error) {
3617                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3618                           ms->m_header.h_nodeid, ms->m_remid);
3619                 return;
3620         }
3621
3622         _receive_unlock_reply(lkb, ms);
3623         dlm_put_lkb(lkb);
3624 }
3625
3626 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3627 {
3628         struct dlm_rsb *r = lkb->lkb_resource;
3629         int error;
3630
3631         hold_rsb(r);
3632         lock_rsb(r);
3633
3634         error = validate_message(lkb, ms);
3635         if (error)
3636                 goto out;
3637
3638         /* stub reply can happen with waiters_mutex held */
3639         error = remove_from_waiters_ms(lkb, ms);
3640         if (error)
3641                 goto out;
3642
3643         /* this is the value returned from do_cancel() on the master */
3644
3645         switch (ms->m_result) {
3646         case -DLM_ECANCEL:
3647                 receive_flags_reply(lkb, ms);
3648                 revert_lock_pc(r, lkb);
3649                 queue_cast(r, lkb, -DLM_ECANCEL);
3650                 break;
3651         case 0:
3652                 break;
3653         default:
3654                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3655                           lkb->lkb_id, ms->m_result);
3656         }
3657  out:
3658         unlock_rsb(r);
3659         put_rsb(r);
3660 }
3661
3662 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3663 {
3664         struct dlm_lkb *lkb;
3665         int error;
3666
3667         error = find_lkb(ls, ms->m_remid, &lkb);
3668         if (error) {
3669                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3670                           ms->m_header.h_nodeid, ms->m_remid);
3671                 return;
3672         }
3673
3674         _receive_cancel_reply(lkb, ms);
3675         dlm_put_lkb(lkb);
3676 }
3677
3678 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3679 {
3680         struct dlm_lkb *lkb;
3681         struct dlm_rsb *r;
3682         int error, ret_nodeid;
3683
3684         error = find_lkb(ls, ms->m_lkid, &lkb);
3685         if (error) {
3686                 log_error(ls, "receive_lookup_reply no lkb");
3687                 return;
3688         }
3689
3690         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3691            FIXME: will a non-zero error ever be returned? */
3692
3693         r = lkb->lkb_resource;
3694         hold_rsb(r);
3695         lock_rsb(r);
3696
3697         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3698         if (error)
3699                 goto out;
3700
3701         ret_nodeid = ms->m_nodeid;
3702         if (ret_nodeid == dlm_our_nodeid()) {
3703                 r->res_nodeid = 0;
3704                 ret_nodeid = 0;
3705                 r->res_first_lkid = 0;
3706         } else {
3707                 /* set_master() will copy res_nodeid to lkb_nodeid */
3708                 r->res_nodeid = ret_nodeid;
3709         }
3710
3711         if (is_overlap(lkb)) {
3712                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3713                           lkb->lkb_id, lkb->lkb_flags);
3714                 queue_cast_overlap(r, lkb);
3715                 unhold_lkb(lkb); /* undoes create_lkb() */
3716                 goto out_list;
3717         }
3718
3719         _request_lock(r, lkb);
3720
3721  out_list:
3722         if (!ret_nodeid)
3723                 process_lookup_list(r);
3724  out:
3725         unlock_rsb(r);
3726         put_rsb(r);
3727         dlm_put_lkb(lkb);
3728 }
3729
3730 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3731 {
3732         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3733                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3734                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3735                           ms->m_remid, ms->m_result);
3736                 return;
3737         }
3738
3739         switch (ms->m_type) {
3740
3741         /* messages sent to a master node */
3742
3743         case DLM_MSG_REQUEST:
3744                 receive_request(ls, ms);
3745                 break;
3746
3747         case DLM_MSG_CONVERT:
3748                 receive_convert(ls, ms);
3749                 break;
3750
3751         case DLM_MSG_UNLOCK:
3752                 receive_unlock(ls, ms);
3753                 break;
3754
3755         case DLM_MSG_CANCEL:
3756                 receive_cancel(ls, ms);
3757                 break;
3758
3759         /* messages sent from a master node (replies to above) */
3760
3761         case DLM_MSG_REQUEST_REPLY:
3762                 receive_request_reply(ls, ms);
3763                 break;
3764
3765         case DLM_MSG_CONVERT_REPLY:
3766                 receive_convert_reply(ls, ms);
3767                 break;
3768
3769         case DLM_MSG_UNLOCK_REPLY:
3770                 receive_unlock_reply(ls, ms);
3771                 break;
3772
3773         case DLM_MSG_CANCEL_REPLY:
3774                 receive_cancel_reply(ls, ms);
3775                 break;
3776
3777         /* messages sent from a master node (only two types of async msg) */
3778
3779         case DLM_MSG_GRANT:
3780                 receive_grant(ls, ms);
3781                 break;
3782
3783         case DLM_MSG_BAST:
3784                 receive_bast(ls, ms);
3785                 break;
3786
3787         /* messages sent to a dir node */
3788
3789         case DLM_MSG_LOOKUP:
3790                 receive_lookup(ls, ms);
3791                 break;
3792
3793         case DLM_MSG_REMOVE:
3794                 receive_remove(ls, ms);
3795                 break;
3796
3797         /* messages sent from a dir node (remove has no reply) */
3798
3799         case DLM_MSG_LOOKUP_REPLY:
3800                 receive_lookup_reply(ls, ms);
3801                 break;
3802
3803         /* other messages */
3804
3805         case DLM_MSG_PURGE:
3806                 receive_purge(ls, ms);
3807                 break;
3808
3809         default:
3810                 log_error(ls, "unknown message type %d", ms->m_type);
3811         }
3812
3813         dlm_astd_wake();
3814 }
3815
3816 /* If the lockspace is in recovery mode (locking stopped), then normal
3817    messages are saved on the requestqueue for processing after recovery is
3818    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3819    messages off the requestqueue before we process new ones. This occurs right
3820    after recovery completes when we transition from saving all messages on
3821    requestqueue, to processing all the saved messages, to processing new
3822    messages as they arrive. */
3823
3824 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3825                                 int nodeid)
3826 {
3827         if (dlm_locking_stopped(ls)) {
3828                 dlm_add_requestqueue(ls, nodeid, ms);
3829         } else {
3830                 dlm_wait_requestqueue(ls);
3831                 _receive_message(ls, ms);
3832         }
3833 }
3834
3835 /* This is called by dlm_recoverd to process messages that were saved on
3836    the requestqueue. */
3837
3838 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3839 {
3840         _receive_message(ls, ms);
3841 }
3842
3843 /* This is called by the midcomms layer when something is received for
3844    the lockspace.  It could be either a MSG (normal message sent as part of
3845    standard locking activity) or an RCOM (recovery message sent as part of
3846    lockspace recovery). */
3847
3848 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3849 {
3850         struct dlm_header *hd = &p->header;
3851         struct dlm_ls *ls;
3852         int type = 0;
3853
3854         switch (hd->h_cmd) {
3855         case DLM_MSG:
3856                 dlm_message_in(&p->message);
3857                 type = p->message.m_type;
3858                 break;
3859         case DLM_RCOM:
3860                 dlm_rcom_in(&p->rcom);
3861                 type = p->rcom.rc_type;
3862                 break;
3863         default:
3864                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3865                 return;
3866         }
3867
3868         if (hd->h_nodeid != nodeid) {
3869                 log_print("invalid h_nodeid %d from %d lockspace %x",
3870                           hd->h_nodeid, nodeid, hd->h_lockspace);
3871                 return;
3872         }
3873
3874         ls = dlm_find_lockspace_global(hd->h_lockspace);
3875         if (!ls) {
3876                 if (dlm_config.ci_log_debug)
3877                         log_print("invalid lockspace %x from %d cmd %d type %d",
3878                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3879
3880                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3881                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3882                 return;
3883         }
3884
3885         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3886            be inactive (in this ls) before transitioning to recovery mode */
3887
3888         down_read(&ls->ls_recv_active);
3889         if (hd->h_cmd == DLM_MSG)
3890                 dlm_receive_message(ls, &p->message, nodeid);
3891         else
3892                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3893         up_read(&ls->ls_recv_active);
3894
3895         dlm_put_lockspace(ls);
3896 }
3897
3898 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3899 {
3900         if (middle_conversion(lkb)) {
3901                 hold_lkb(lkb);
3902                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3903                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3904                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3905                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3906                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3907
3908                 /* Same special case as in receive_rcom_lock_args() */
3909                 lkb->lkb_grmode = DLM_LOCK_IV;
3910                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3911                 unhold_lkb(lkb);
3912
3913         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3914                 lkb->lkb_flags |= DLM_IFL_RESEND;
3915         }
3916
3917         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3918            conversions are async; there's no reply from the remote master */
3919 }
3920
3921 /* A waiting lkb needs recovery if the master node has failed, or
3922    the master node is changing (only when no directory is used) */
3923
3924 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3925 {
3926         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3927                 return 1;
3928
3929         if (!dlm_no_directory(ls))
3930                 return 0;
3931
3932         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3933                 return 1;
3934
3935         return 0;
3936 }
3937
3938 /* Recovery for locks that are waiting for replies from nodes that are now
3939    gone.  We can just complete unlocks and cancels by faking a reply from the
3940    dead node.  Requests and up-conversions we flag to be resent after
3941    recovery.  Down-conversions can just be completed with a fake reply like
3942    unlocks.  Conversions between PR and CW need special attention. */
3943
3944 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3945 {
3946         struct dlm_lkb *lkb, *safe;
3947         int wait_type, stub_unlock_result, stub_cancel_result;
3948
3949         mutex_lock(&ls->ls_waiters_mutex);
3950
3951         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3952                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3953                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3954
3955                 /* all outstanding lookups, regardless of destination  will be
3956                    resent after recovery is done */
3957
3958                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3959                         lkb->lkb_flags |= DLM_IFL_RESEND;
3960                         continue;
3961                 }
3962
3963                 if (!waiter_needs_recovery(ls, lkb))
3964                         continue;
3965
3966                 wait_type = lkb->lkb_wait_type;
3967                 stub_unlock_result = -DLM_EUNLOCK;
3968                 stub_cancel_result = -DLM_ECANCEL;
3969
3970                 /* Main reply may have been received leaving a zero wait_type,
3971                    but a reply for the overlapping op may not have been
3972                    received.  In that case we need to fake the appropriate
3973                    reply for the overlap op. */
3974
3975                 if (!wait_type) {
3976                         if (is_overlap_cancel(lkb)) {
3977                                 wait_type = DLM_MSG_CANCEL;
3978                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3979                                         stub_cancel_result = 0;
3980                         }
3981                         if (is_overlap_unlock(lkb)) {
3982                                 wait_type = DLM_MSG_UNLOCK;
3983                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3984                                         stub_unlock_result = -ENOENT;
3985                         }
3986
3987                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
3988                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
3989                                   stub_cancel_result, stub_unlock_result);
3990                 }
3991
3992                 switch (wait_type) {
3993
3994                 case DLM_MSG_REQUEST:
3995                         lkb->lkb_flags |= DLM_IFL_RESEND;
3996                         break;
3997
3998                 case DLM_MSG_CONVERT:
3999                         recover_convert_waiter(ls, lkb);
4000                         break;
4001
4002                 case DLM_MSG_UNLOCK:
4003                         hold_lkb(lkb);
4004                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4005                         ls->ls_stub_ms.m_result = stub_unlock_result;
4006                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4007                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4008                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4009                         dlm_put_lkb(lkb);
4010                         break;
4011
4012                 case DLM_MSG_CANCEL:
4013                         hold_lkb(lkb);
4014                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4015                         ls->ls_stub_ms.m_result = stub_cancel_result;
4016                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4017                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4018                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4019                         dlm_put_lkb(lkb);
4020                         break;
4021
4022                 default:
4023                         log_error(ls, "invalid lkb wait_type %d %d",
4024                                   lkb->lkb_wait_type, wait_type);
4025                 }
4026                 schedule();
4027         }
4028         mutex_unlock(&ls->ls_waiters_mutex);
4029 }
4030
4031 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4032 {
4033         struct dlm_lkb *lkb;
4034         int found = 0;
4035
4036         mutex_lock(&ls->ls_waiters_mutex);
4037         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4038                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4039                         hold_lkb(lkb);
4040                         found = 1;
4041                         break;
4042                 }
4043         }
4044         mutex_unlock(&ls->ls_waiters_mutex);
4045
4046         if (!found)
4047                 lkb = NULL;
4048         return lkb;
4049 }
4050
4051 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4052    master or dir-node for r.  Processing the lkb may result in it being placed
4053    back on waiters. */
4054
4055 /* We do this after normal locking has been enabled and any saved messages
4056    (in requestqueue) have been processed.  We should be confident that at
4057    this point we won't get or process a reply to any of these waiting
4058    operations.  But, new ops may be coming in on the rsbs/locks here from
4059    userspace or remotely. */
4060
4061 /* there may have been an overlap unlock/cancel prior to recovery or after
4062    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4063    overlap flag would just have been set and nothing new sent.  we can be
4064    confident here than any replies to either the initial op or overlap ops
4065    prior to recovery have been received. */
4066
4067 int dlm_recover_waiters_post(struct dlm_ls *ls)
4068 {
4069         struct dlm_lkb *lkb;
4070         struct dlm_rsb *r;
4071         int error = 0, mstype, err, oc, ou;
4072
4073         while (1) {
4074                 if (dlm_locking_stopped(ls)) {
4075                         log_debug(ls, "recover_waiters_post aborted");
4076                         error = -EINTR;
4077                         break;
4078                 }
4079
4080                 lkb = find_resend_waiter(ls);
4081                 if (!lkb)
4082                         break;
4083
4084                 r = lkb->lkb_resource;
4085                 hold_rsb(r);
4086                 lock_rsb(r);
4087
4088                 mstype = lkb->lkb_wait_type;
4089                 oc = is_overlap_cancel(lkb);
4090                 ou = is_overlap_unlock(lkb);
4091                 err = 0;
4092
4093                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4094                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4095
4096                 /* At this point we assume that we won't get a reply to any
4097                    previous op or overlap op on this lock.  First, do a big
4098                    remove_from_waiters() for all previous ops. */
4099
4100                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4101                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4102                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4103                 lkb->lkb_wait_type = 0;
4104                 lkb->lkb_wait_count = 0;
4105                 mutex_lock(&ls->ls_waiters_mutex);
4106                 list_del_init(&lkb->lkb_wait_reply);
4107                 mutex_unlock(&ls->ls_waiters_mutex);
4108                 unhold_lkb(lkb); /* for waiters list */
4109
4110                 if (oc || ou) {
4111                         /* do an unlock or cancel instead of resending */
4112                         switch (mstype) {
4113                         case DLM_MSG_LOOKUP:
4114                         case DLM_MSG_REQUEST:
4115                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4116                                                         -DLM_ECANCEL);
4117                                 unhold_lkb(lkb); /* undoes create_lkb() */
4118                                 break;
4119                         case DLM_MSG_CONVERT:
4120                                 if (oc) {
4121                                         queue_cast(r, lkb, -DLM_ECANCEL);
4122                                 } else {
4123                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4124                                         _unlock_lock(r, lkb);
4125                                 }
4126                                 break;
4127                         default:
4128                                 err = 1;
4129                         }
4130                 } else {
4131                         switch (mstype) {
4132                         case DLM_MSG_LOOKUP:
4133                         case DLM_MSG_REQUEST:
4134                                 _request_lock(r, lkb);
4135                                 if (is_master(r))
4136                                         confirm_master(r, 0);
4137                                 break;
4138                         case DLM_MSG_CONVERT:
4139                                 _convert_lock(r, lkb);
4140                                 break;
4141                         default:
4142                                 err = 1;
4143                         }
4144                 }
4145
4146                 if (err)
4147                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4148                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4149                 unlock_rsb(r);
4150                 put_rsb(r);
4151                 dlm_put_lkb(lkb);
4152         }
4153
4154         return error;
4155 }
4156
4157 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4158                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4159 {
4160         struct dlm_ls *ls = r->res_ls;
4161         struct dlm_lkb *lkb, *safe;
4162
4163         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4164                 if (test(ls, lkb)) {
4165                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4166                         del_lkb(r, lkb);
4167                         /* this put should free the lkb */
4168                         if (!dlm_put_lkb(lkb))
4169                                 log_error(ls, "purged lkb not released");
4170                 }
4171         }
4172 }
4173
4174 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4175 {
4176         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4177 }
4178
4179 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4180 {
4181         return is_master_copy(lkb);
4182 }
4183
4184 static void purge_dead_locks(struct dlm_rsb *r)
4185 {
4186         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4187         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4188         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4189 }
4190
4191 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4192 {
4193         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4194         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4195         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4196 }
4197
4198 /* Get rid of locks held by nodes that are gone. */
4199
4200 int dlm_purge_locks(struct dlm_ls *ls)
4201 {
4202         struct dlm_rsb *r;
4203
4204         log_debug(ls, "dlm_purge_locks");
4205
4206         down_write(&ls->ls_root_sem);
4207         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4208                 hold_rsb(r);
4209                 lock_rsb(r);
4210                 if (is_master(r))
4211                         purge_dead_locks(r);
4212                 unlock_rsb(r);
4213                 unhold_rsb(r);
4214
4215                 schedule();
4216         }
4217         up_write(&ls->ls_root_sem);
4218
4219         return 0;
4220 }
4221
4222 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4223 {
4224         struct dlm_rsb *r, *r_ret = NULL;
4225
4226         read_lock(&ls->ls_rsbtbl[bucket].lock);
4227         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4228                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4229                         continue;
4230                 hold_rsb(r);
4231                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4232                 r_ret = r;
4233                 break;
4234         }
4235         read_unlock(&ls->ls_rsbtbl[bucket].lock);
4236         return r_ret;
4237 }
4238
4239 void dlm_grant_after_purge(struct dlm_ls *ls)
4240 {
4241         struct dlm_rsb *r;
4242         int bucket = 0;
4243
4244         while (1) {
4245                 r = find_purged_rsb(ls, bucket);
4246                 if (!r) {
4247                         if (bucket == ls->ls_rsbtbl_size - 1)
4248                                 break;
4249                         bucket++;
4250                         continue;
4251                 }
4252                 lock_rsb(r);
4253                 if (is_master(r)) {
4254                         grant_pending_locks(r);
4255                         confirm_master(r, 0);
4256                 }
4257                 unlock_rsb(r);
4258                 put_rsb(r);
4259                 schedule();
4260         }
4261 }
4262
4263 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4264                                          uint32_t remid)
4265 {
4266         struct dlm_lkb *lkb;
4267
4268         list_for_each_entry(lkb, head, lkb_statequeue) {
4269                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4270                         return lkb;
4271         }
4272         return NULL;
4273 }
4274
4275 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4276                                     uint32_t remid)
4277 {
4278         struct dlm_lkb *lkb;
4279
4280         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4281         if (lkb)
4282                 return lkb;
4283         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4284         if (lkb)
4285                 return lkb;
4286         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4287         if (lkb)
4288                 return lkb;
4289         return NULL;
4290 }
4291
4292 /* needs at least dlm_rcom + rcom_lock */
4293 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4294                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4295 {
4296         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4297
4298         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4299         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4300         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4301         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4302         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4303         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4304         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4305         lkb->lkb_rqmode = rl->rl_rqmode;
4306         lkb->lkb_grmode = rl->rl_grmode;
4307         /* don't set lkb_status because add_lkb wants to itself */
4308
4309         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4310         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4311
4312         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4313                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4314                          sizeof(struct rcom_lock);
4315                 if (lvblen > ls->ls_lvblen)
4316                         return -EINVAL;
4317                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4318                 if (!lkb->lkb_lvbptr)
4319                         return -ENOMEM;
4320                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4321         }
4322
4323         /* Conversions between PR and CW (middle modes) need special handling.
4324            The real granted mode of these converting locks cannot be determined
4325            until all locks have been rebuilt on the rsb (recover_conversion) */
4326
4327         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4328             middle_conversion(lkb)) {
4329                 rl->rl_status = DLM_LKSTS_CONVERT;
4330                 lkb->lkb_grmode = DLM_LOCK_IV;
4331                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4332         }
4333
4334         return 0;
4335 }
4336
4337 /* This lkb may have been recovered in a previous aborted recovery so we need
4338    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4339    If so we just send back a standard reply.  If not, we create a new lkb with
4340    the given values and send back our lkid.  We send back our lkid by sending
4341    back the rcom_lock struct we got but with the remid field filled in. */
4342
4343 /* needs at least dlm_rcom + rcom_lock */
4344 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4345 {
4346         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4347         struct dlm_rsb *r;
4348         struct dlm_lkb *lkb;
4349         int error;
4350
4351         if (rl->rl_parent_lkid) {
4352                 error = -EOPNOTSUPP;
4353                 goto out;
4354         }
4355
4356         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4357                          R_MASTER, &r);
4358         if (error)
4359                 goto out;
4360
4361         lock_rsb(r);
4362
4363         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4364         if (lkb) {
4365                 error = -EEXIST;
4366                 goto out_remid;
4367         }
4368
4369         error = create_lkb(ls, &lkb);
4370         if (error)
4371                 goto out_unlock;
4372
4373         error = receive_rcom_lock_args(ls, lkb, r, rc);
4374         if (error) {
4375                 __put_lkb(ls, lkb);
4376                 goto out_unlock;
4377         }
4378
4379         attach_lkb(r, lkb);
4380         add_lkb(r, lkb, rl->rl_status);
4381         error = 0;
4382
4383  out_remid:
4384         /* this is the new value returned to the lock holder for
4385            saving in its process-copy lkb */
4386         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4387
4388  out_unlock:
4389         unlock_rsb(r);
4390         put_rsb(r);
4391  out:
4392         if (error)
4393                 log_debug(ls, "recover_master_copy %d %x", error,
4394                           le32_to_cpu(rl->rl_lkid));
4395         rl->rl_result = cpu_to_le32(error);
4396         return error;
4397 }
4398
4399 /* needs at least dlm_rcom + rcom_lock */
4400 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4401 {
4402         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4403         struct dlm_rsb *r;
4404         struct dlm_lkb *lkb;
4405         int error;
4406
4407         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4408         if (error) {
4409                 log_error(ls, "recover_process_copy no lkid %x",
4410                                 le32_to_cpu(rl->rl_lkid));
4411                 return error;
4412         }
4413
4414         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4415
4416         error = le32_to_cpu(rl->rl_result);
4417
4418         r = lkb->lkb_resource;
4419         hold_rsb(r);
4420         lock_rsb(r);
4421
4422         switch (error) {
4423         case -EBADR:
4424                 /* There's a chance the new master received our lock before
4425                    dlm_recover_master_reply(), this wouldn't happen if we did
4426                    a barrier between recover_masters and recover_locks. */
4427                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4428                           (unsigned long)r, r->res_name);
4429                 dlm_send_rcom_lock(r, lkb);
4430                 goto out;
4431         case -EEXIST:
4432                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4433                 /* fall through */
4434         case 0:
4435                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4436                 break;
4437         default:
4438                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4439                           error, lkb->lkb_id);
4440         }
4441
4442         /* an ack for dlm_recover_locks() which waits for replies from
4443            all the locks it sends to new masters */
4444         dlm_recovered_lock(r);
4445  out:
4446         unlock_rsb(r);
4447         put_rsb(r);
4448         dlm_put_lkb(lkb);
4449
4450         return 0;
4451 }
4452
4453 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4454                      int mode, uint32_t flags, void *name, unsigned int namelen,
4455                      unsigned long timeout_cs)
4456 {
4457         struct dlm_lkb *lkb;
4458         struct dlm_args args;
4459         int error;
4460
4461         dlm_lock_recovery(ls);
4462
4463         error = create_lkb(ls, &lkb);
4464         if (error) {
4465                 kfree(ua);
4466                 goto out;
4467         }
4468
4469         if (flags & DLM_LKF_VALBLK) {
4470                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4471                 if (!ua->lksb.sb_lvbptr) {
4472                         kfree(ua);
4473                         __put_lkb(ls, lkb);
4474                         error = -ENOMEM;
4475                         goto out;
4476                 }
4477         }
4478
4479         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4480            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4481            lock and that lkb_astparam is the dlm_user_args structure. */
4482
4483         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4484                               fake_astfn, ua, fake_bastfn, &args);
4485         lkb->lkb_flags |= DLM_IFL_USER;
4486         ua->old_mode = DLM_LOCK_IV;
4487
4488         if (error) {
4489                 __put_lkb(ls, lkb);
4490                 goto out;
4491         }
4492
4493         error = request_lock(ls, lkb, name, namelen, &args);
4494
4495         switch (error) {
4496         case 0:
4497                 break;
4498         case -EINPROGRESS:
4499                 error = 0;
4500                 break;
4501         case -EAGAIN:
4502                 error = 0;
4503                 /* fall through */
4504         default:
4505                 __put_lkb(ls, lkb);
4506                 goto out;
4507         }
4508
4509         /* add this new lkb to the per-process list of locks */
4510         spin_lock(&ua->proc->locks_spin);
4511         hold_lkb(lkb);
4512         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4513         spin_unlock(&ua->proc->locks_spin);
4514  out:
4515         dlm_unlock_recovery(ls);
4516         return error;
4517 }
4518
4519 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4520                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4521                      unsigned long timeout_cs)
4522 {
4523         struct dlm_lkb *lkb;
4524         struct dlm_args args;
4525         struct dlm_user_args *ua;
4526         int error;
4527
4528         dlm_lock_recovery(ls);
4529
4530         error = find_lkb(ls, lkid, &lkb);
4531         if (error)
4532                 goto out;
4533
4534         /* user can change the params on its lock when it converts it, or
4535            add an lvb that didn't exist before */
4536
4537         ua = lkb->lkb_ua;
4538
4539         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4540                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4541                 if (!ua->lksb.sb_lvbptr) {
4542                         error = -ENOMEM;
4543                         goto out_put;
4544                 }
4545         }
4546         if (lvb_in && ua->lksb.sb_lvbptr)
4547                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4548
4549         ua->xid = ua_tmp->xid;
4550         ua->castparam = ua_tmp->castparam;
4551         ua->castaddr = ua_tmp->castaddr;
4552         ua->bastparam = ua_tmp->bastparam;
4553         ua->bastaddr = ua_tmp->bastaddr;
4554         ua->user_lksb = ua_tmp->user_lksb;
4555         ua->old_mode = lkb->lkb_grmode;
4556
4557         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4558                               fake_astfn, ua, fake_bastfn, &args);
4559         if (error)
4560                 goto out_put;
4561
4562         error = convert_lock(ls, lkb, &args);
4563
4564         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4565                 error = 0;
4566  out_put:
4567         dlm_put_lkb(lkb);
4568  out:
4569         dlm_unlock_recovery(ls);
4570         kfree(ua_tmp);
4571         return error;
4572 }
4573
4574 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4575                     uint32_t flags, uint32_t lkid, char *lvb_in)
4576 {
4577         struct dlm_lkb *lkb;
4578         struct dlm_args args;
4579         struct dlm_user_args *ua;
4580         int error;
4581
4582         dlm_lock_recovery(ls);
4583
4584         error = find_lkb(ls, lkid, &lkb);
4585         if (error)
4586                 goto out;
4587
4588         ua = lkb->lkb_ua;
4589
4590         if (lvb_in && ua->lksb.sb_lvbptr)
4591                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4592         if (ua_tmp->castparam)
4593                 ua->castparam = ua_tmp->castparam;
4594         ua->user_lksb = ua_tmp->user_lksb;
4595
4596         error = set_unlock_args(flags, ua, &args);
4597         if (error)
4598                 goto out_put;
4599
4600         error = unlock_lock(ls, lkb, &args);
4601
4602         if (error == -DLM_EUNLOCK)
4603                 error = 0;
4604         /* from validate_unlock_args() */
4605         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4606                 error = 0;
4607         if (error)
4608                 goto out_put;
4609
4610         spin_lock(&ua->proc->locks_spin);
4611         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4612         if (!list_empty(&lkb->lkb_ownqueue))
4613                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4614         spin_unlock(&ua->proc->locks_spin);
4615  out_put:
4616         dlm_put_lkb(lkb);
4617  out:
4618         dlm_unlock_recovery(ls);
4619         kfree(ua_tmp);
4620         return error;
4621 }
4622
4623 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4624                     uint32_t flags, uint32_t lkid)
4625 {
4626         struct dlm_lkb *lkb;
4627         struct dlm_args args;
4628         struct dlm_user_args *ua;
4629         int error;
4630
4631         dlm_lock_recovery(ls);
4632
4633         error = find_lkb(ls, lkid, &lkb);
4634         if (error)
4635                 goto out;
4636
4637         ua = lkb->lkb_ua;
4638         if (ua_tmp->castparam)
4639                 ua->castparam = ua_tmp->castparam;
4640         ua->user_lksb = ua_tmp->user_lksb;
4641
4642         error = set_unlock_args(flags, ua, &args);
4643         if (error)
4644                 goto out_put;
4645
4646         error = cancel_lock(ls, lkb, &args);
4647
4648         if (error == -DLM_ECANCEL)
4649                 error = 0;
4650         /* from validate_unlock_args() */
4651         if (error == -EBUSY)
4652                 error = 0;
4653  out_put:
4654         dlm_put_lkb(lkb);
4655  out:
4656         dlm_unlock_recovery(ls);
4657         kfree(ua_tmp);
4658         return error;
4659 }
4660
4661 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4662 {
4663         struct dlm_lkb *lkb;
4664         struct dlm_args args;
4665         struct dlm_user_args *ua;
4666         struct dlm_rsb *r;
4667         int error;
4668
4669         dlm_lock_recovery(ls);
4670
4671         error = find_lkb(ls, lkid, &lkb);
4672         if (error)
4673                 goto out;
4674
4675         ua = lkb->lkb_ua;
4676
4677         error = set_unlock_args(flags, ua, &args);
4678         if (error)
4679                 goto out_put;
4680
4681         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4682
4683         r = lkb->lkb_resource;
4684         hold_rsb(r);
4685         lock_rsb(r);
4686
4687         error = validate_unlock_args(lkb, &args);
4688         if (error)
4689                 goto out_r;
4690         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4691
4692         error = _cancel_lock(r, lkb);
4693  out_r:
4694         unlock_rsb(r);
4695         put_rsb(r);
4696
4697         if (error == -DLM_ECANCEL)
4698                 error = 0;
4699         /* from validate_unlock_args() */
4700         if (error == -EBUSY)
4701                 error = 0;
4702  out_put:
4703         dlm_put_lkb(lkb);
4704  out:
4705         dlm_unlock_recovery(ls);
4706         return error;
4707 }
4708
4709 /* lkb's that are removed from the waiters list by revert are just left on the
4710    orphans list with the granted orphan locks, to be freed by purge */
4711
4712 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4713 {
4714         struct dlm_args args;
4715         int error;
4716
4717         hold_lkb(lkb);
4718         mutex_lock(&ls->ls_orphans_mutex);
4719         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4720         mutex_unlock(&ls->ls_orphans_mutex);
4721
4722         set_unlock_args(0, lkb->lkb_ua, &args);
4723
4724         error = cancel_lock(ls, lkb, &args);
4725         if (error == -DLM_ECANCEL)
4726                 error = 0;
4727         return error;
4728 }
4729
4730 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4731    Regardless of what rsb queue the lock is on, it's removed and freed. */
4732
4733 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4734 {
4735         struct dlm_args args;
4736         int error;
4737
4738         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4739
4740         error = unlock_lock(ls, lkb, &args);
4741         if (error == -DLM_EUNLOCK)
4742                 error = 0;
4743         return error;
4744 }
4745
4746 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4747    (which does lock_rsb) due to deadlock with receiving a message that does
4748    lock_rsb followed by dlm_user_add_ast() */
4749
4750 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4751                                      struct dlm_user_proc *proc)
4752 {
4753         struct dlm_lkb *lkb = NULL;
4754
4755         mutex_lock(&ls->ls_clear_proc_locks);
4756         if (list_empty(&proc->locks))
4757                 goto out;
4758
4759         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4760         list_del_init(&lkb->lkb_ownqueue);
4761
4762         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4763                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4764         else
4765                 lkb->lkb_flags |= DLM_IFL_DEAD;
4766  out:
4767         mutex_unlock(&ls->ls_clear_proc_locks);
4768         return lkb;
4769 }
4770
4771 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4772    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4773    which we clear here. */
4774
4775 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4776    list, and no more device_writes should add lkb's to proc->locks list; so we
4777    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4778    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4779    them ourself. */
4780
4781 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4782 {
4783         struct dlm_lkb *lkb, *safe;
4784
4785         dlm_lock_recovery(ls);
4786
4787         while (1) {
4788                 lkb = del_proc_lock(ls, proc);
4789                 if (!lkb)
4790                         break;
4791                 del_timeout(lkb);
4792                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4793                         orphan_proc_lock(ls, lkb);
4794                 else
4795                         unlock_proc_lock(ls, lkb);
4796
4797                 /* this removes the reference for the proc->locks list
4798                    added by dlm_user_request, it may result in the lkb
4799                    being freed */
4800
4801                 dlm_put_lkb(lkb);
4802         }
4803
4804         mutex_lock(&ls->ls_clear_proc_locks);
4805
4806         /* in-progress unlocks */
4807         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4808                 list_del_init(&lkb->lkb_ownqueue);
4809                 lkb->lkb_flags |= DLM_IFL_DEAD;
4810                 dlm_put_lkb(lkb);
4811         }
4812
4813         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4814                 lkb->lkb_ast_type = 0;
4815                 list_del(&lkb->lkb_astqueue);
4816                 dlm_put_lkb(lkb);
4817         }
4818
4819         mutex_unlock(&ls->ls_clear_proc_locks);
4820         dlm_unlock_recovery(ls);
4821 }
4822
4823 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4824 {
4825         struct dlm_lkb *lkb, *safe;
4826
4827         while (1) {
4828                 lkb = NULL;
4829                 spin_lock(&proc->locks_spin);
4830                 if (!list_empty(&proc->locks)) {
4831                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4832                                          lkb_ownqueue);
4833                         list_del_init(&lkb->lkb_ownqueue);
4834                 }
4835                 spin_unlock(&proc->locks_spin);
4836
4837                 if (!lkb)
4838                         break;
4839
4840                 lkb->lkb_flags |= DLM_IFL_DEAD;
4841                 unlock_proc_lock(ls, lkb);
4842                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4843         }
4844
4845         spin_lock(&proc->locks_spin);
4846         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4847                 list_del_init(&lkb->lkb_ownqueue);
4848                 lkb->lkb_flags |= DLM_IFL_DEAD;
4849                 dlm_put_lkb(lkb);
4850         }
4851         spin_unlock(&proc->locks_spin);
4852
4853         spin_lock(&proc->asts_spin);
4854         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4855                 list_del(&lkb->lkb_astqueue);
4856                 dlm_put_lkb(lkb);
4857         }
4858         spin_unlock(&proc->asts_spin);
4859 }
4860
4861 /* pid of 0 means purge all orphans */
4862
4863 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4864 {
4865         struct dlm_lkb *lkb, *safe;
4866
4867         mutex_lock(&ls->ls_orphans_mutex);
4868         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4869                 if (pid && lkb->lkb_ownpid != pid)
4870                         continue;
4871                 unlock_proc_lock(ls, lkb);
4872                 list_del_init(&lkb->lkb_ownqueue);
4873                 dlm_put_lkb(lkb);
4874         }
4875         mutex_unlock(&ls->ls_orphans_mutex);
4876 }
4877
4878 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4879 {
4880         struct dlm_message *ms;
4881         struct dlm_mhandle *mh;
4882         int error;
4883
4884         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4885                                 DLM_MSG_PURGE, &ms, &mh);
4886         if (error)
4887                 return error;
4888         ms->m_nodeid = nodeid;
4889         ms->m_pid = pid;
4890
4891         return send_message(mh, ms);
4892 }
4893
4894 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4895                    int nodeid, int pid)
4896 {
4897         int error = 0;
4898
4899         if (nodeid != dlm_our_nodeid()) {
4900                 error = send_purge(ls, nodeid, pid);
4901         } else {
4902                 dlm_lock_recovery(ls);
4903                 if (pid == current->pid)
4904                         purge_proc_locks(ls, proc);
4905                 else
4906                         do_purge(ls, nodeid, pid);
4907                 dlm_unlock_recovery(ls);
4908         }
4909         return error;
4910 }
4911