dlm: add time stamp of blocking callback
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, 0);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322
323         if (is_master_copy(lkb))
324                 send_bast(r, lkb, rqmode);
325         else
326                 dlm_add_ast(lkb, AST_BAST, rqmode);
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         *r_ret = NULL;
367         return -EBADR;
368
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392
393         if (dlm_no_directory(ls))
394                 goto out;
395
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         write_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         write_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r, *tmp;
439         uint32_t hash, bucket;
440         int error = -EINVAL;
441
442         if (namelen > DLM_RESNAME_MAXLEN)
443                 goto out;
444
445         if (dlm_no_directory(ls))
446                 flags |= R_CREATE;
447
448         error = 0;
449         hash = jhash(name, namelen, 0);
450         bucket = hash & (ls->ls_rsbtbl_size - 1);
451
452         error = search_rsb(ls, name, namelen, bucket, flags, &r);
453         if (!error)
454                 goto out;
455
456         if (error == -EBADR && !(flags & R_CREATE))
457                 goto out;
458
459         /* the rsb was found but wasn't a master copy */
460         if (error == -ENOTBLK)
461                 goto out;
462
463         error = -ENOMEM;
464         r = create_rsb(ls, name, namelen);
465         if (!r)
466                 goto out;
467
468         r->res_hash = hash;
469         r->res_bucket = bucket;
470         r->res_nodeid = -1;
471         kref_init(&r->res_ref);
472
473         /* With no directory, the master can be set immediately */
474         if (dlm_no_directory(ls)) {
475                 int nodeid = dlm_dir_nodeid(r);
476                 if (nodeid == dlm_our_nodeid())
477                         nodeid = 0;
478                 r->res_nodeid = nodeid;
479         }
480
481         write_lock(&ls->ls_rsbtbl[bucket].lock);
482         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483         if (!error) {
484                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
485                 dlm_free_rsb(r);
486                 r = tmp;
487                 goto out;
488         }
489         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490         write_unlock(&ls->ls_rsbtbl[bucket].lock);
491         error = 0;
492  out:
493         *r_ret = r;
494         return error;
495 }
496
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499
500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502         kref_get(&r->res_ref);
503 }
504
505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507         hold_rsb(r);
508 }
509
510 static void toss_rsb(struct kref *kref)
511 {
512         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513         struct dlm_ls *ls = r->res_ls;
514
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516         kref_init(&r->res_ref);
517         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518         r->res_toss_time = jiffies;
519         if (r->res_lvbptr) {
520                 dlm_free_lvb(r->res_lvbptr);
521                 r->res_lvbptr = NULL;
522         }
523 }
524
525 /* When all references to the rsb are gone it's transfered to
526    the tossed list for later disposal. */
527
528 static void put_rsb(struct dlm_rsb *r)
529 {
530         struct dlm_ls *ls = r->res_ls;
531         uint32_t bucket = r->res_bucket;
532
533         write_lock(&ls->ls_rsbtbl[bucket].lock);
534         kref_put(&r->res_ref, toss_rsb);
535         write_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537
538 void dlm_put_rsb(struct dlm_rsb *r)
539 {
540         put_rsb(r);
541 }
542
543 /* See comment for unhold_lkb */
544
545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547         int rv;
548         rv = kref_put(&r->res_ref, toss_rsb);
549         DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551
552 static void kill_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555
556         /* All work is done after the return from kref_put() so we
557            can release the write_lock before the remove and free. */
558
559         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569
570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572         hold_rsb(r);
573         lkb->lkb_resource = r;
574 }
575
576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578         if (lkb->lkb_resource) {
579                 put_rsb(lkb->lkb_resource);
580                 lkb->lkb_resource = NULL;
581         }
582 }
583
584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586         struct dlm_lkb *lkb, *tmp;
587         uint32_t lkid = 0;
588         uint16_t bucket;
589
590         lkb = dlm_allocate_lkb(ls);
591         if (!lkb)
592                 return -ENOMEM;
593
594         lkb->lkb_nodeid = -1;
595         lkb->lkb_grmode = DLM_LOCK_IV;
596         kref_init(&lkb->lkb_ref);
597         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599         INIT_LIST_HEAD(&lkb->lkb_time_list);
600
601         get_random_bytes(&bucket, sizeof(bucket));
602         bucket &= (ls->ls_lkbtbl_size - 1);
603
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605
606         /* counter can roll over so we must verify lkid is not in use */
607
608         while (lkid == 0) {
609                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610
611                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612                                     lkb_idtbl_list) {
613                         if (tmp->lkb_id != lkid)
614                                 continue;
615                         lkid = 0;
616                         break;
617                 }
618         }
619
620         lkb->lkb_id = lkid;
621         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622         write_unlock(&ls->ls_lkbtbl[bucket].lock);
623
624         *lkb_ret = lkb;
625         return 0;
626 }
627
628 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
629 {
630         struct dlm_lkb *lkb;
631         uint16_t bucket = (lkid >> 16);
632
633         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
634                 if (lkb->lkb_id == lkid)
635                         return lkb;
636         }
637         return NULL;
638 }
639
640 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
641 {
642         struct dlm_lkb *lkb;
643         uint16_t bucket = (lkid >> 16);
644
645         if (bucket >= ls->ls_lkbtbl_size)
646                 return -EBADSLT;
647
648         read_lock(&ls->ls_lkbtbl[bucket].lock);
649         lkb = __find_lkb(ls, lkid);
650         if (lkb)
651                 kref_get(&lkb->lkb_ref);
652         read_unlock(&ls->ls_lkbtbl[bucket].lock);
653
654         *lkb_ret = lkb;
655         return lkb ? 0 : -ENOENT;
656 }
657
658 static void kill_lkb(struct kref *kref)
659 {
660         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661
662         /* All work is done after the return from kref_put() so we
663            can release the write_lock before the detach_lkb */
664
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670
671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673         uint16_t bucket = (lkb->lkb_id >> 16);
674
675         write_lock(&ls->ls_lkbtbl[bucket].lock);
676         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677                 list_del(&lkb->lkb_idtbl_list);
678                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679
680                 detach_lkb(lkb);
681
682                 /* for local/process lkbs, lvbptr points to caller's lksb */
683                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
684                         dlm_free_lvb(lkb->lkb_lvbptr);
685                 dlm_free_lkb(lkb);
686                 return 1;
687         } else {
688                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
689                 return 0;
690         }
691 }
692
693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695         struct dlm_ls *ls;
696
697         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699
700         ls = lkb->lkb_resource->res_ls;
701         return __put_lkb(ls, lkb);
702 }
703
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706
707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709         kref_get(&lkb->lkb_ref);
710 }
711
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716
717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719         int rv;
720         rv = kref_put(&lkb->lkb_ref, kill_lkb);
721         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725                             int mode)
726 {
727         struct dlm_lkb *lkb = NULL;
728
729         list_for_each_entry(lkb, head, lkb_statequeue)
730                 if (lkb->lkb_rqmode < mode)
731                         break;
732
733         if (!lkb)
734                 list_add_tail(new, head);
735         else
736                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746
747         lkb->lkb_timestamp = ktime_get();
748
749         lkb->lkb_status = status;
750
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784         hold_lkb(lkb);
785         del_lkb(r, lkb);
786         add_lkb(r, lkb, sts);
787         unhold_lkb(lkb);
788 }
789
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814
815         mutex_lock(&ls->ls_waiters_mutex);
816
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837
838                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
867 {
868         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
869         int overlap_done = 0;
870
871         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
872                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
873                 overlap_done = 1;
874                 goto out_del;
875         }
876
877         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
878                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
879                 overlap_done = 1;
880                 goto out_del;
881         }
882
883         /* N.B. type of reply may not always correspond to type of original
884            msg due to lookup->request optimization, verify others? */
885
886         if (lkb->lkb_wait_type) {
887                 lkb->lkb_wait_type = 0;
888                 goto out_del;
889         }
890
891         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
892                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
893         return -1;
894
895  out_del:
896         /* the force-unlock/cancel has completed and we haven't recvd a reply
897            to the op that was in progress prior to the unlock/cancel; we
898            give up on any reply to the earlier op.  FIXME: not sure when/how
899            this would happen */
900
901         if (overlap_done && lkb->lkb_wait_type) {
902                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
903                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
904                 lkb->lkb_wait_count--;
905                 lkb->lkb_wait_type = 0;
906         }
907
908         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
909
910         lkb->lkb_flags &= ~DLM_IFL_RESEND;
911         lkb->lkb_wait_count--;
912         if (!lkb->lkb_wait_count)
913                 list_del_init(&lkb->lkb_wait_reply);
914         unhold_lkb(lkb);
915         return 0;
916 }
917
918 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
919 {
920         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
921         int error;
922
923         mutex_lock(&ls->ls_waiters_mutex);
924         error = _remove_from_waiters(lkb, mstype);
925         mutex_unlock(&ls->ls_waiters_mutex);
926         return error;
927 }
928
929 /* Handles situations where we might be processing a "fake" or "stub" reply in
930    which we can't try to take waiters_mutex again. */
931
932 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
933 {
934         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
935         int error;
936
937         if (ms != &ls->ls_stub_ms)
938                 mutex_lock(&ls->ls_waiters_mutex);
939         error = _remove_from_waiters(lkb, ms->m_type);
940         if (ms != &ls->ls_stub_ms)
941                 mutex_unlock(&ls->ls_waiters_mutex);
942         return error;
943 }
944
945 static void dir_remove(struct dlm_rsb *r)
946 {
947         int to_nodeid;
948
949         if (dlm_no_directory(r->res_ls))
950                 return;
951
952         to_nodeid = dlm_dir_nodeid(r);
953         if (to_nodeid != dlm_our_nodeid())
954                 send_remove(r);
955         else
956                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
957                                      r->res_name, r->res_length);
958 }
959
960 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
961    found since they are in order of newest to oldest? */
962
963 static int shrink_bucket(struct dlm_ls *ls, int b)
964 {
965         struct dlm_rsb *r;
966         int count = 0, found;
967
968         for (;;) {
969                 found = 0;
970                 write_lock(&ls->ls_rsbtbl[b].lock);
971                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
972                                             res_hashchain) {
973                         if (!time_after_eq(jiffies, r->res_toss_time +
974                                            dlm_config.ci_toss_secs * HZ))
975                                 continue;
976                         found = 1;
977                         break;
978                 }
979
980                 if (!found) {
981                         write_unlock(&ls->ls_rsbtbl[b].lock);
982                         break;
983                 }
984
985                 if (kref_put(&r->res_ref, kill_rsb)) {
986                         list_del(&r->res_hashchain);
987                         write_unlock(&ls->ls_rsbtbl[b].lock);
988
989                         if (is_master(r))
990                                 dir_remove(r);
991                         dlm_free_rsb(r);
992                         count++;
993                 } else {
994                         write_unlock(&ls->ls_rsbtbl[b].lock);
995                         log_error(ls, "tossed rsb in use %s", r->res_name);
996                 }
997         }
998
999         return count;
1000 }
1001
1002 void dlm_scan_rsbs(struct dlm_ls *ls)
1003 {
1004         int i;
1005
1006         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1007                 shrink_bucket(ls, i);
1008                 if (dlm_locking_stopped(ls))
1009                         break;
1010                 cond_resched();
1011         }
1012 }
1013
1014 static void add_timeout(struct dlm_lkb *lkb)
1015 {
1016         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1017
1018         if (is_master_copy(lkb))
1019                 return;
1020
1021         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1022             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1023                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1024                 goto add_it;
1025         }
1026         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1027                 goto add_it;
1028         return;
1029
1030  add_it:
1031         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1032         mutex_lock(&ls->ls_timeout_mutex);
1033         hold_lkb(lkb);
1034         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1035         mutex_unlock(&ls->ls_timeout_mutex);
1036 }
1037
1038 static void del_timeout(struct dlm_lkb *lkb)
1039 {
1040         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1041
1042         mutex_lock(&ls->ls_timeout_mutex);
1043         if (!list_empty(&lkb->lkb_time_list)) {
1044                 list_del_init(&lkb->lkb_time_list);
1045                 unhold_lkb(lkb);
1046         }
1047         mutex_unlock(&ls->ls_timeout_mutex);
1048 }
1049
1050 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1051    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1052    and then lock rsb because of lock ordering in add_timeout.  We may need
1053    to specify some special timeout-related bits in the lkb that are just to
1054    be accessed under the timeout_mutex. */
1055
1056 void dlm_scan_timeout(struct dlm_ls *ls)
1057 {
1058         struct dlm_rsb *r;
1059         struct dlm_lkb *lkb;
1060         int do_cancel, do_warn;
1061         s64 wait_us;
1062
1063         for (;;) {
1064                 if (dlm_locking_stopped(ls))
1065                         break;
1066
1067                 do_cancel = 0;
1068                 do_warn = 0;
1069                 mutex_lock(&ls->ls_timeout_mutex);
1070                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1071
1072                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1073                                                         lkb->lkb_timestamp));
1074
1075                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1076                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1077                                 do_cancel = 1;
1078
1079                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1080                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1081                                 do_warn = 1;
1082
1083                         if (!do_cancel && !do_warn)
1084                                 continue;
1085                         hold_lkb(lkb);
1086                         break;
1087                 }
1088                 mutex_unlock(&ls->ls_timeout_mutex);
1089
1090                 if (!do_cancel && !do_warn)
1091                         break;
1092
1093                 r = lkb->lkb_resource;
1094                 hold_rsb(r);
1095                 lock_rsb(r);
1096
1097                 if (do_warn) {
1098                         /* clear flag so we only warn once */
1099                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1100                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1101                                 del_timeout(lkb);
1102                         dlm_timeout_warn(lkb);
1103                 }
1104
1105                 if (do_cancel) {
1106                         log_debug(ls, "timeout cancel %x node %d %s",
1107                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1108                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1109                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1110                         del_timeout(lkb);
1111                         _cancel_lock(r, lkb);
1112                 }
1113
1114                 unlock_rsb(r);
1115                 unhold_rsb(r);
1116                 dlm_put_lkb(lkb);
1117         }
1118 }
1119
1120 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1121    dlm_recoverd before checking/setting ls_recover_begin. */
1122
1123 void dlm_adjust_timeouts(struct dlm_ls *ls)
1124 {
1125         struct dlm_lkb *lkb;
1126         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1127
1128         ls->ls_recover_begin = 0;
1129         mutex_lock(&ls->ls_timeout_mutex);
1130         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1131                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1132         mutex_unlock(&ls->ls_timeout_mutex);
1133 }
1134
1135 /* lkb is master or local copy */
1136
1137 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1138 {
1139         int b, len = r->res_ls->ls_lvblen;
1140
1141         /* b=1 lvb returned to caller
1142            b=0 lvb written to rsb or invalidated
1143            b=-1 do nothing */
1144
1145         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1146
1147         if (b == 1) {
1148                 if (!lkb->lkb_lvbptr)
1149                         return;
1150
1151                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1152                         return;
1153
1154                 if (!r->res_lvbptr)
1155                         return;
1156
1157                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1158                 lkb->lkb_lvbseq = r->res_lvbseq;
1159
1160         } else if (b == 0) {
1161                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1162                         rsb_set_flag(r, RSB_VALNOTVALID);
1163                         return;
1164                 }
1165
1166                 if (!lkb->lkb_lvbptr)
1167                         return;
1168
1169                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1170                         return;
1171
1172                 if (!r->res_lvbptr)
1173                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1174
1175                 if (!r->res_lvbptr)
1176                         return;
1177
1178                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1179                 r->res_lvbseq++;
1180                 lkb->lkb_lvbseq = r->res_lvbseq;
1181                 rsb_clear_flag(r, RSB_VALNOTVALID);
1182         }
1183
1184         if (rsb_flag(r, RSB_VALNOTVALID))
1185                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1186 }
1187
1188 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1189 {
1190         if (lkb->lkb_grmode < DLM_LOCK_PW)
1191                 return;
1192
1193         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                 rsb_set_flag(r, RSB_VALNOTVALID);
1195                 return;
1196         }
1197
1198         if (!lkb->lkb_lvbptr)
1199                 return;
1200
1201         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                 return;
1203
1204         if (!r->res_lvbptr)
1205                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207         if (!r->res_lvbptr)
1208                 return;
1209
1210         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1211         r->res_lvbseq++;
1212         rsb_clear_flag(r, RSB_VALNOTVALID);
1213 }
1214
1215 /* lkb is process copy (pc) */
1216
1217 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1218                             struct dlm_message *ms)
1219 {
1220         int b;
1221
1222         if (!lkb->lkb_lvbptr)
1223                 return;
1224
1225         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1226                 return;
1227
1228         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1229         if (b == 1) {
1230                 int len = receive_extralen(ms);
1231                 if (len > DLM_RESNAME_MAXLEN)
1232                         len = DLM_RESNAME_MAXLEN;
1233                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1234                 lkb->lkb_lvbseq = ms->m_lvbseq;
1235         }
1236 }
1237
1238 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1239    remove_lock -- used for unlock, removes lkb from granted
1240    revert_lock -- used for cancel, moves lkb from convert to granted
1241    grant_lock  -- used for request and convert, adds lkb to granted or
1242                   moves lkb from convert or waiting to granted
1243
1244    Each of these is used for master or local copy lkb's.  There is
1245    also a _pc() variation used to make the corresponding change on
1246    a process copy (pc) lkb. */
1247
1248 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1249 {
1250         del_lkb(r, lkb);
1251         lkb->lkb_grmode = DLM_LOCK_IV;
1252         /* this unhold undoes the original ref from create_lkb()
1253            so this leads to the lkb being freed */
1254         unhold_lkb(lkb);
1255 }
1256
1257 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1258 {
1259         set_lvb_unlock(r, lkb);
1260         _remove_lock(r, lkb);
1261 }
1262
1263 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1264 {
1265         _remove_lock(r, lkb);
1266 }
1267
1268 /* returns: 0 did nothing
1269             1 moved lock to granted
1270            -1 removed lock */
1271
1272 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1273 {
1274         int rv = 0;
1275
1276         lkb->lkb_rqmode = DLM_LOCK_IV;
1277
1278         switch (lkb->lkb_status) {
1279         case DLM_LKSTS_GRANTED:
1280                 break;
1281         case DLM_LKSTS_CONVERT:
1282                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1283                 rv = 1;
1284                 break;
1285         case DLM_LKSTS_WAITING:
1286                 del_lkb(r, lkb);
1287                 lkb->lkb_grmode = DLM_LOCK_IV;
1288                 /* this unhold undoes the original ref from create_lkb()
1289                    so this leads to the lkb being freed */
1290                 unhold_lkb(lkb);
1291                 rv = -1;
1292                 break;
1293         default:
1294                 log_print("invalid status for revert %d", lkb->lkb_status);
1295         }
1296         return rv;
1297 }
1298
1299 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1300 {
1301         return revert_lock(r, lkb);
1302 }
1303
1304 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1307                 lkb->lkb_grmode = lkb->lkb_rqmode;
1308                 if (lkb->lkb_status)
1309                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1310                 else
1311                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1312         }
1313
1314         lkb->lkb_rqmode = DLM_LOCK_IV;
1315 }
1316
1317 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1318 {
1319         set_lvb_lock(r, lkb);
1320         _grant_lock(r, lkb);
1321         lkb->lkb_highbast = 0;
1322 }
1323
1324 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1325                           struct dlm_message *ms)
1326 {
1327         set_lvb_lock_pc(r, lkb, ms);
1328         _grant_lock(r, lkb);
1329 }
1330
1331 /* called by grant_pending_locks() which means an async grant message must
1332    be sent to the requesting node in addition to granting the lock if the
1333    lkb belongs to a remote node. */
1334
1335 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1336 {
1337         grant_lock(r, lkb);
1338         if (is_master_copy(lkb))
1339                 send_grant(r, lkb);
1340         else
1341                 queue_cast(r, lkb, 0);
1342 }
1343
1344 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1345    change the granted/requested modes.  We're munging things accordingly in
1346    the process copy.
1347    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1348    conversion deadlock
1349    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1350    compatible with other granted locks */
1351
1352 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1353 {
1354         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1355                 log_print("munge_demoted %x invalid reply type %d",
1356                           lkb->lkb_id, ms->m_type);
1357                 return;
1358         }
1359
1360         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1361                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1362                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1363                 return;
1364         }
1365
1366         lkb->lkb_grmode = DLM_LOCK_NL;
1367 }
1368
1369 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1370 {
1371         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1372             ms->m_type != DLM_MSG_GRANT) {
1373                 log_print("munge_altmode %x invalid reply type %d",
1374                           lkb->lkb_id, ms->m_type);
1375                 return;
1376         }
1377
1378         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1379                 lkb->lkb_rqmode = DLM_LOCK_PR;
1380         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1381                 lkb->lkb_rqmode = DLM_LOCK_CW;
1382         else {
1383                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1384                 dlm_print_lkb(lkb);
1385         }
1386 }
1387
1388 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1389 {
1390         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1391                                            lkb_statequeue);
1392         if (lkb->lkb_id == first->lkb_id)
1393                 return 1;
1394
1395         return 0;
1396 }
1397
1398 /* Check if the given lkb conflicts with another lkb on the queue. */
1399
1400 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1401 {
1402         struct dlm_lkb *this;
1403
1404         list_for_each_entry(this, head, lkb_statequeue) {
1405                 if (this == lkb)
1406                         continue;
1407                 if (!modes_compat(this, lkb))
1408                         return 1;
1409         }
1410         return 0;
1411 }
1412
1413 /*
1414  * "A conversion deadlock arises with a pair of lock requests in the converting
1415  * queue for one resource.  The granted mode of each lock blocks the requested
1416  * mode of the other lock."
1417  *
1418  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1419  * convert queue from being granted, then deadlk/demote lkb.
1420  *
1421  * Example:
1422  * Granted Queue: empty
1423  * Convert Queue: NL->EX (first lock)
1424  *                PR->EX (second lock)
1425  *
1426  * The first lock can't be granted because of the granted mode of the second
1427  * lock and the second lock can't be granted because it's not first in the
1428  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1429  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1430  * flag set and return DEMOTED in the lksb flags.
1431  *
1432  * Originally, this function detected conv-deadlk in a more limited scope:
1433  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1434  * - if lkb1 was the first entry in the queue (not just earlier), and was
1435  *   blocked by the granted mode of lkb2, and there was nothing on the
1436  *   granted queue preventing lkb1 from being granted immediately, i.e.
1437  *   lkb2 was the only thing preventing lkb1 from being granted.
1438  *
1439  * That second condition meant we'd only say there was conv-deadlk if
1440  * resolving it (by demotion) would lead to the first lock on the convert
1441  * queue being granted right away.  It allowed conversion deadlocks to exist
1442  * between locks on the convert queue while they couldn't be granted anyway.
1443  *
1444  * Now, we detect and take action on conversion deadlocks immediately when
1445  * they're created, even if they may not be immediately consequential.  If
1446  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1447  * mode that would prevent lkb1's conversion from being granted, we do a
1448  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1449  * I think this means that the lkb_is_ahead condition below should always
1450  * be zero, i.e. there will never be conv-deadlk between two locks that are
1451  * both already on the convert queue.
1452  */
1453
1454 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1455 {
1456         struct dlm_lkb *lkb1;
1457         int lkb_is_ahead = 0;
1458
1459         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1460                 if (lkb1 == lkb2) {
1461                         lkb_is_ahead = 1;
1462                         continue;
1463                 }
1464
1465                 if (!lkb_is_ahead) {
1466                         if (!modes_compat(lkb2, lkb1))
1467                                 return 1;
1468                 } else {
1469                         if (!modes_compat(lkb2, lkb1) &&
1470                             !modes_compat(lkb1, lkb2))
1471                                 return 1;
1472                 }
1473         }
1474         return 0;
1475 }
1476
1477 /*
1478  * Return 1 if the lock can be granted, 0 otherwise.
1479  * Also detect and resolve conversion deadlocks.
1480  *
1481  * lkb is the lock to be granted
1482  *
1483  * now is 1 if the function is being called in the context of the
1484  * immediate request, it is 0 if called later, after the lock has been
1485  * queued.
1486  *
1487  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1488  */
1489
1490 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1491 {
1492         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1493
1494         /*
1495          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1496          * a new request for a NL mode lock being blocked.
1497          *
1498          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1499          * request, then it would be granted.  In essence, the use of this flag
1500          * tells the Lock Manager to expedite theis request by not considering
1501          * what may be in the CONVERTING or WAITING queues...  As of this
1502          * writing, the EXPEDITE flag can be used only with new requests for NL
1503          * mode locks.  This flag is not valid for conversion requests.
1504          *
1505          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1506          * conversion or used with a non-NL requested mode.  We also know an
1507          * EXPEDITE request is always granted immediately, so now must always
1508          * be 1.  The full condition to grant an expedite request: (now &&
1509          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1510          * therefore be shortened to just checking the flag.
1511          */
1512
1513         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1514                 return 1;
1515
1516         /*
1517          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1518          * added to the remaining conditions.
1519          */
1520
1521         if (queue_conflict(&r->res_grantqueue, lkb))
1522                 goto out;
1523
1524         /*
1525          * 6-3: By default, a conversion request is immediately granted if the
1526          * requested mode is compatible with the modes of all other granted
1527          * locks
1528          */
1529
1530         if (queue_conflict(&r->res_convertqueue, lkb))
1531                 goto out;
1532
1533         /*
1534          * 6-5: But the default algorithm for deciding whether to grant or
1535          * queue conversion requests does not by itself guarantee that such
1536          * requests are serviced on a "first come first serve" basis.  This, in
1537          * turn, can lead to a phenomenon known as "indefinate postponement".
1538          *
1539          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1540          * the system service employed to request a lock conversion.  This flag
1541          * forces certain conversion requests to be queued, even if they are
1542          * compatible with the granted modes of other locks on the same
1543          * resource.  Thus, the use of this flag results in conversion requests
1544          * being ordered on a "first come first servce" basis.
1545          *
1546          * DCT: This condition is all about new conversions being able to occur
1547          * "in place" while the lock remains on the granted queue (assuming
1548          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1549          * doesn't _have_ to go onto the convert queue where it's processed in
1550          * order.  The "now" variable is necessary to distinguish converts
1551          * being received and processed for the first time now, because once a
1552          * convert is moved to the conversion queue the condition below applies
1553          * requiring fifo granting.
1554          */
1555
1556         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1557                 return 1;
1558
1559         /*
1560          * The NOORDER flag is set to avoid the standard vms rules on grant
1561          * order.
1562          */
1563
1564         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1565                 return 1;
1566
1567         /*
1568          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1569          * granted until all other conversion requests ahead of it are granted
1570          * and/or canceled.
1571          */
1572
1573         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1574                 return 1;
1575
1576         /*
1577          * 6-4: By default, a new request is immediately granted only if all
1578          * three of the following conditions are satisfied when the request is
1579          * issued:
1580          * - The queue of ungranted conversion requests for the resource is
1581          *   empty.
1582          * - The queue of ungranted new requests for the resource is empty.
1583          * - The mode of the new request is compatible with the most
1584          *   restrictive mode of all granted locks on the resource.
1585          */
1586
1587         if (now && !conv && list_empty(&r->res_convertqueue) &&
1588             list_empty(&r->res_waitqueue))
1589                 return 1;
1590
1591         /*
1592          * 6-4: Once a lock request is in the queue of ungranted new requests,
1593          * it cannot be granted until the queue of ungranted conversion
1594          * requests is empty, all ungranted new requests ahead of it are
1595          * granted and/or canceled, and it is compatible with the granted mode
1596          * of the most restrictive lock granted on the resource.
1597          */
1598
1599         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1600             first_in_list(lkb, &r->res_waitqueue))
1601                 return 1;
1602  out:
1603         return 0;
1604 }
1605
1606 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1607                           int *err)
1608 {
1609         int rv;
1610         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1611         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1612
1613         if (err)
1614                 *err = 0;
1615
1616         rv = _can_be_granted(r, lkb, now);
1617         if (rv)
1618                 goto out;
1619
1620         /*
1621          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1622          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1623          * cancels one of the locks.
1624          */
1625
1626         if (is_convert && can_be_queued(lkb) &&
1627             conversion_deadlock_detect(r, lkb)) {
1628                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1629                         lkb->lkb_grmode = DLM_LOCK_NL;
1630                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1631                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1632                         if (err)
1633                                 *err = -EDEADLK;
1634                         else {
1635                                 log_print("can_be_granted deadlock %x now %d",
1636                                           lkb->lkb_id, now);
1637                                 dlm_dump_rsb(r);
1638                         }
1639                 }
1640                 goto out;
1641         }
1642
1643         /*
1644          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1645          * to grant a request in a mode other than the normal rqmode.  It's a
1646          * simple way to provide a big optimization to applications that can
1647          * use them.
1648          */
1649
1650         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1651                 alt = DLM_LOCK_PR;
1652         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1653                 alt = DLM_LOCK_CW;
1654
1655         if (alt) {
1656                 lkb->lkb_rqmode = alt;
1657                 rv = _can_be_granted(r, lkb, now);
1658                 if (rv)
1659                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1660                 else
1661                         lkb->lkb_rqmode = rqmode;
1662         }
1663  out:
1664         return rv;
1665 }
1666
1667 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1668    for locks pending on the convert list.  Once verified (watch for these
1669    log_prints), we should be able to just call _can_be_granted() and not
1670    bother with the demote/deadlk cases here (and there's no easy way to deal
1671    with a deadlk here, we'd have to generate something like grant_lock with
1672    the deadlk error.) */
1673
1674 /* Returns the highest requested mode of all blocked conversions; sets
1675    cw if there's a blocked conversion to DLM_LOCK_CW. */
1676
1677 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1678 {
1679         struct dlm_lkb *lkb, *s;
1680         int hi, demoted, quit, grant_restart, demote_restart;
1681         int deadlk;
1682
1683         quit = 0;
1684  restart:
1685         grant_restart = 0;
1686         demote_restart = 0;
1687         hi = DLM_LOCK_IV;
1688
1689         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1690                 demoted = is_demoted(lkb);
1691                 deadlk = 0;
1692
1693                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1694                         grant_lock_pending(r, lkb);
1695                         grant_restart = 1;
1696                         continue;
1697                 }
1698
1699                 if (!demoted && is_demoted(lkb)) {
1700                         log_print("WARN: pending demoted %x node %d %s",
1701                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1702                         demote_restart = 1;
1703                         continue;
1704                 }
1705
1706                 if (deadlk) {
1707                         log_print("WARN: pending deadlock %x node %d %s",
1708                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1709                         dlm_dump_rsb(r);
1710                         continue;
1711                 }
1712
1713                 hi = max_t(int, lkb->lkb_rqmode, hi);
1714
1715                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1716                         *cw = 1;
1717         }
1718
1719         if (grant_restart)
1720                 goto restart;
1721         if (demote_restart && !quit) {
1722                 quit = 1;
1723                 goto restart;
1724         }
1725
1726         return max_t(int, high, hi);
1727 }
1728
1729 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1730 {
1731         struct dlm_lkb *lkb, *s;
1732
1733         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1734                 if (can_be_granted(r, lkb, 0, NULL))
1735                         grant_lock_pending(r, lkb);
1736                 else {
1737                         high = max_t(int, lkb->lkb_rqmode, high);
1738                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1739                                 *cw = 1;
1740                 }
1741         }
1742
1743         return high;
1744 }
1745
1746 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1747    on either the convert or waiting queue.
1748    high is the largest rqmode of all locks blocked on the convert or
1749    waiting queue. */
1750
1751 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1752 {
1753         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1754                 if (gr->lkb_highbast < DLM_LOCK_EX)
1755                         return 1;
1756                 return 0;
1757         }
1758
1759         if (gr->lkb_highbast < high &&
1760             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1761                 return 1;
1762         return 0;
1763 }
1764
1765 static void grant_pending_locks(struct dlm_rsb *r)
1766 {
1767         struct dlm_lkb *lkb, *s;
1768         int high = DLM_LOCK_IV;
1769         int cw = 0;
1770
1771         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1772
1773         high = grant_pending_convert(r, high, &cw);
1774         high = grant_pending_wait(r, high, &cw);
1775
1776         if (high == DLM_LOCK_IV)
1777                 return;
1778
1779         /*
1780          * If there are locks left on the wait/convert queue then send blocking
1781          * ASTs to granted locks based on the largest requested mode (high)
1782          * found above.
1783          */
1784
1785         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1786                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1787                         if (cw && high == DLM_LOCK_PR &&
1788                             lkb->lkb_grmode == DLM_LOCK_PR)
1789                                 queue_bast(r, lkb, DLM_LOCK_CW);
1790                         else
1791                                 queue_bast(r, lkb, high);
1792                         lkb->lkb_highbast = high;
1793                 }
1794         }
1795 }
1796
1797 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1798 {
1799         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1800             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1801                 if (gr->lkb_highbast < DLM_LOCK_EX)
1802                         return 1;
1803                 return 0;
1804         }
1805
1806         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1807                 return 1;
1808         return 0;
1809 }
1810
1811 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1812                             struct dlm_lkb *lkb)
1813 {
1814         struct dlm_lkb *gr;
1815
1816         list_for_each_entry(gr, head, lkb_statequeue) {
1817                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1818                         queue_bast(r, gr, lkb->lkb_rqmode);
1819                         gr->lkb_highbast = lkb->lkb_rqmode;
1820                 }
1821         }
1822 }
1823
1824 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1825 {
1826         send_bast_queue(r, &r->res_grantqueue, lkb);
1827 }
1828
1829 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1830 {
1831         send_bast_queue(r, &r->res_grantqueue, lkb);
1832         send_bast_queue(r, &r->res_convertqueue, lkb);
1833 }
1834
1835 /* set_master(r, lkb) -- set the master nodeid of a resource
1836
1837    The purpose of this function is to set the nodeid field in the given
1838    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1839    known, it can just be copied to the lkb and the function will return
1840    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1841    before it can be copied to the lkb.
1842
1843    When the rsb nodeid is being looked up remotely, the initial lkb
1844    causing the lookup is kept on the ls_waiters list waiting for the
1845    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1846    on the rsb's res_lookup list until the master is verified.
1847
1848    Return values:
1849    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1850    1: the rsb master is not available and the lkb has been placed on
1851       a wait queue
1852 */
1853
1854 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1855 {
1856         struct dlm_ls *ls = r->res_ls;
1857         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1858
1859         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1860                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1861                 r->res_first_lkid = lkb->lkb_id;
1862                 lkb->lkb_nodeid = r->res_nodeid;
1863                 return 0;
1864         }
1865
1866         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1867                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1868                 return 1;
1869         }
1870
1871         if (r->res_nodeid == 0) {
1872                 lkb->lkb_nodeid = 0;
1873                 return 0;
1874         }
1875
1876         if (r->res_nodeid > 0) {
1877                 lkb->lkb_nodeid = r->res_nodeid;
1878                 return 0;
1879         }
1880
1881         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1882
1883         dir_nodeid = dlm_dir_nodeid(r);
1884
1885         if (dir_nodeid != our_nodeid) {
1886                 r->res_first_lkid = lkb->lkb_id;
1887                 send_lookup(r, lkb);
1888                 return 1;
1889         }
1890
1891         for (i = 0; i < 2; i++) {
1892                 /* It's possible for dlm_scand to remove an old rsb for
1893                    this same resource from the toss list, us to create
1894                    a new one, look up the master locally, and find it
1895                    already exists just before dlm_scand does the
1896                    dir_remove() on the previous rsb. */
1897
1898                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1899                                        r->res_length, &ret_nodeid);
1900                 if (!error)
1901                         break;
1902                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1903                 schedule();
1904         }
1905         if (error && error != -EEXIST)
1906                 return error;
1907
1908         if (ret_nodeid == our_nodeid) {
1909                 r->res_first_lkid = 0;
1910                 r->res_nodeid = 0;
1911                 lkb->lkb_nodeid = 0;
1912         } else {
1913                 r->res_first_lkid = lkb->lkb_id;
1914                 r->res_nodeid = ret_nodeid;
1915                 lkb->lkb_nodeid = ret_nodeid;
1916         }
1917         return 0;
1918 }
1919
1920 static void process_lookup_list(struct dlm_rsb *r)
1921 {
1922         struct dlm_lkb *lkb, *safe;
1923
1924         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1925                 list_del_init(&lkb->lkb_rsb_lookup);
1926                 _request_lock(r, lkb);
1927                 schedule();
1928         }
1929 }
1930
1931 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1932
1933 static void confirm_master(struct dlm_rsb *r, int error)
1934 {
1935         struct dlm_lkb *lkb;
1936
1937         if (!r->res_first_lkid)
1938                 return;
1939
1940         switch (error) {
1941         case 0:
1942         case -EINPROGRESS:
1943                 r->res_first_lkid = 0;
1944                 process_lookup_list(r);
1945                 break;
1946
1947         case -EAGAIN:
1948         case -EBADR:
1949         case -ENOTBLK:
1950                 /* the remote request failed and won't be retried (it was
1951                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1952                    lkb the first_lkid */
1953
1954                 r->res_first_lkid = 0;
1955
1956                 if (!list_empty(&r->res_lookup)) {
1957                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1958                                          lkb_rsb_lookup);
1959                         list_del_init(&lkb->lkb_rsb_lookup);
1960                         r->res_first_lkid = lkb->lkb_id;
1961                         _request_lock(r, lkb);
1962                 }
1963                 break;
1964
1965         default:
1966                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1967         }
1968 }
1969
1970 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1971                          int namelen, unsigned long timeout_cs,
1972                          void (*ast) (void *astparam),
1973                          void *astparam,
1974                          void (*bast) (void *astparam, int mode),
1975                          struct dlm_args *args)
1976 {
1977         int rv = -EINVAL;
1978
1979         /* check for invalid arg usage */
1980
1981         if (mode < 0 || mode > DLM_LOCK_EX)
1982                 goto out;
1983
1984         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1985                 goto out;
1986
1987         if (flags & DLM_LKF_CANCEL)
1988                 goto out;
1989
1990         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1991                 goto out;
1992
1993         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1994                 goto out;
1995
1996         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1997                 goto out;
1998
1999         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2000                 goto out;
2001
2002         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2003                 goto out;
2004
2005         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2006                 goto out;
2007
2008         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2009                 goto out;
2010
2011         if (!ast || !lksb)
2012                 goto out;
2013
2014         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2015                 goto out;
2016
2017         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2018                 goto out;
2019
2020         /* these args will be copied to the lkb in validate_lock_args,
2021            it cannot be done now because when converting locks, fields in
2022            an active lkb cannot be modified before locking the rsb */
2023
2024         args->flags = flags;
2025         args->astfn = ast;
2026         args->astparam = astparam;
2027         args->bastfn = bast;
2028         args->timeout = timeout_cs;
2029         args->mode = mode;
2030         args->lksb = lksb;
2031         rv = 0;
2032  out:
2033         return rv;
2034 }
2035
2036 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2037 {
2038         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2039                       DLM_LKF_FORCEUNLOCK))
2040                 return -EINVAL;
2041
2042         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2043                 return -EINVAL;
2044
2045         args->flags = flags;
2046         args->astparam = astarg;
2047         return 0;
2048 }
2049
2050 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2051                               struct dlm_args *args)
2052 {
2053         int rv = -EINVAL;
2054
2055         if (args->flags & DLM_LKF_CONVERT) {
2056                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2057                         goto out;
2058
2059                 if (args->flags & DLM_LKF_QUECVT &&
2060                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2061                         goto out;
2062
2063                 rv = -EBUSY;
2064                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2065                         goto out;
2066
2067                 if (lkb->lkb_wait_type)
2068                         goto out;
2069
2070                 if (is_overlap(lkb))
2071                         goto out;
2072         }
2073
2074         lkb->lkb_exflags = args->flags;
2075         lkb->lkb_sbflags = 0;
2076         lkb->lkb_astfn = args->astfn;
2077         lkb->lkb_astparam = args->astparam;
2078         lkb->lkb_bastfn = args->bastfn;
2079         lkb->lkb_rqmode = args->mode;
2080         lkb->lkb_lksb = args->lksb;
2081         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2082         lkb->lkb_ownpid = (int) current->pid;
2083         lkb->lkb_timeout_cs = args->timeout;
2084         rv = 0;
2085  out:
2086         return rv;
2087 }
2088
2089 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2090    for success */
2091
2092 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2093    because there may be a lookup in progress and it's valid to do
2094    cancel/unlockf on it */
2095
2096 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2097 {
2098         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2099         int rv = -EINVAL;
2100
2101         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2102                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2103                 dlm_print_lkb(lkb);
2104                 goto out;
2105         }
2106
2107         /* an lkb may still exist even though the lock is EOL'ed due to a
2108            cancel, unlock or failed noqueue request; an app can't use these
2109            locks; return same error as if the lkid had not been found at all */
2110
2111         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2112                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2113                 rv = -ENOENT;
2114                 goto out;
2115         }
2116
2117         /* an lkb may be waiting for an rsb lookup to complete where the
2118            lookup was initiated by another lock */
2119
2120         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2121                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2122                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2123                         list_del_init(&lkb->lkb_rsb_lookup);
2124                         queue_cast(lkb->lkb_resource, lkb,
2125                                    args->flags & DLM_LKF_CANCEL ?
2126                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2127                         unhold_lkb(lkb); /* undoes create_lkb() */
2128                 }
2129                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2130                 rv = -EBUSY;
2131                 goto out;
2132         }
2133
2134         /* cancel not allowed with another cancel/unlock in progress */
2135
2136         if (args->flags & DLM_LKF_CANCEL) {
2137                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2138                         goto out;
2139
2140                 if (is_overlap(lkb))
2141                         goto out;
2142
2143                 /* don't let scand try to do a cancel */
2144                 del_timeout(lkb);
2145
2146                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2147                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2148                         rv = -EBUSY;
2149                         goto out;
2150                 }
2151
2152                 switch (lkb->lkb_wait_type) {
2153                 case DLM_MSG_LOOKUP:
2154                 case DLM_MSG_REQUEST:
2155                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2156                         rv = -EBUSY;
2157                         goto out;
2158                 case DLM_MSG_UNLOCK:
2159                 case DLM_MSG_CANCEL:
2160                         goto out;
2161                 }
2162                 /* add_to_waiters() will set OVERLAP_CANCEL */
2163                 goto out_ok;
2164         }
2165
2166         /* do we need to allow a force-unlock if there's a normal unlock
2167            already in progress?  in what conditions could the normal unlock
2168            fail such that we'd want to send a force-unlock to be sure? */
2169
2170         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2171                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2172                         goto out;
2173
2174                 if (is_overlap_unlock(lkb))
2175                         goto out;
2176
2177                 /* don't let scand try to do a cancel */
2178                 del_timeout(lkb);
2179
2180                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2181                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2182                         rv = -EBUSY;
2183                         goto out;
2184                 }
2185
2186                 switch (lkb->lkb_wait_type) {
2187                 case DLM_MSG_LOOKUP:
2188                 case DLM_MSG_REQUEST:
2189                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2190                         rv = -EBUSY;
2191                         goto out;
2192                 case DLM_MSG_UNLOCK:
2193                         goto out;
2194                 }
2195                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2196                 goto out_ok;
2197         }
2198
2199         /* normal unlock not allowed if there's any op in progress */
2200         rv = -EBUSY;
2201         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2202                 goto out;
2203
2204  out_ok:
2205         /* an overlapping op shouldn't blow away exflags from other op */
2206         lkb->lkb_exflags |= args->flags;
2207         lkb->lkb_sbflags = 0;
2208         lkb->lkb_astparam = args->astparam;
2209         rv = 0;
2210  out:
2211         if (rv)
2212                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2213                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2214                           args->flags, lkb->lkb_wait_type,
2215                           lkb->lkb_resource->res_name);
2216         return rv;
2217 }
2218
2219 /*
2220  * Four stage 4 varieties:
2221  * do_request(), do_convert(), do_unlock(), do_cancel()
2222  * These are called on the master node for the given lock and
2223  * from the central locking logic.
2224  */
2225
2226 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2227 {
2228         int error = 0;
2229
2230         if (can_be_granted(r, lkb, 1, NULL)) {
2231                 grant_lock(r, lkb);
2232                 queue_cast(r, lkb, 0);
2233                 goto out;
2234         }
2235
2236         if (can_be_queued(lkb)) {
2237                 error = -EINPROGRESS;
2238                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2239                 send_blocking_asts(r, lkb);
2240                 add_timeout(lkb);
2241                 goto out;
2242         }
2243
2244         error = -EAGAIN;
2245         if (force_blocking_asts(lkb))
2246                 send_blocking_asts_all(r, lkb);
2247         queue_cast(r, lkb, -EAGAIN);
2248
2249  out:
2250         return error;
2251 }
2252
2253 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2254 {
2255         int error = 0;
2256         int deadlk = 0;
2257
2258         /* changing an existing lock may allow others to be granted */
2259
2260         if (can_be_granted(r, lkb, 1, &deadlk)) {
2261                 grant_lock(r, lkb);
2262                 queue_cast(r, lkb, 0);
2263                 grant_pending_locks(r);
2264                 goto out;
2265         }
2266
2267         /* can_be_granted() detected that this lock would block in a conversion
2268            deadlock, so we leave it on the granted queue and return EDEADLK in
2269            the ast for the convert. */
2270
2271         if (deadlk) {
2272                 /* it's left on the granted queue */
2273                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2274                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2275                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2276                 revert_lock(r, lkb);
2277                 queue_cast(r, lkb, -EDEADLK);
2278                 error = -EDEADLK;
2279                 goto out;
2280         }
2281
2282         /* is_demoted() means the can_be_granted() above set the grmode
2283            to NL, and left us on the granted queue.  This auto-demotion
2284            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2285            now grantable.  We have to try to grant other converting locks
2286            before we try again to grant this one. */
2287
2288         if (is_demoted(lkb)) {
2289                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2290                 if (_can_be_granted(r, lkb, 1)) {
2291                         grant_lock(r, lkb);
2292                         queue_cast(r, lkb, 0);
2293                         grant_pending_locks(r);
2294                         goto out;
2295                 }
2296                 /* else fall through and move to convert queue */
2297         }
2298
2299         if (can_be_queued(lkb)) {
2300                 error = -EINPROGRESS;
2301                 del_lkb(r, lkb);
2302                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2303                 send_blocking_asts(r, lkb);
2304                 add_timeout(lkb);
2305                 goto out;
2306         }
2307
2308         error = -EAGAIN;
2309         if (force_blocking_asts(lkb))
2310                 send_blocking_asts_all(r, lkb);
2311         queue_cast(r, lkb, -EAGAIN);
2312
2313  out:
2314         return error;
2315 }
2316
2317 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2318 {
2319         remove_lock(r, lkb);
2320         queue_cast(r, lkb, -DLM_EUNLOCK);
2321         grant_pending_locks(r);
2322         return -DLM_EUNLOCK;
2323 }
2324
2325 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2326  
2327 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2328 {
2329         int error;
2330
2331         error = revert_lock(r, lkb);
2332         if (error) {
2333                 queue_cast(r, lkb, -DLM_ECANCEL);
2334                 grant_pending_locks(r);
2335                 return -DLM_ECANCEL;
2336         }
2337         return 0;
2338 }
2339
2340 /*
2341  * Four stage 3 varieties:
2342  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2343  */
2344
2345 /* add a new lkb to a possibly new rsb, called by requesting process */
2346
2347 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2348 {
2349         int error;
2350
2351         /* set_master: sets lkb nodeid from r */
2352
2353         error = set_master(r, lkb);
2354         if (error < 0)
2355                 goto out;
2356         if (error) {
2357                 error = 0;
2358                 goto out;
2359         }
2360
2361         if (is_remote(r))
2362                 /* receive_request() calls do_request() on remote node */
2363                 error = send_request(r, lkb);
2364         else
2365                 error = do_request(r, lkb);
2366  out:
2367         return error;
2368 }
2369
2370 /* change some property of an existing lkb, e.g. mode */
2371
2372 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2373 {
2374         int error;
2375
2376         if (is_remote(r))
2377                 /* receive_convert() calls do_convert() on remote node */
2378                 error = send_convert(r, lkb);
2379         else
2380                 error = do_convert(r, lkb);
2381
2382         return error;
2383 }
2384
2385 /* remove an existing lkb from the granted queue */
2386
2387 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2388 {
2389         int error;
2390
2391         if (is_remote(r))
2392                 /* receive_unlock() calls do_unlock() on remote node */
2393                 error = send_unlock(r, lkb);
2394         else
2395                 error = do_unlock(r, lkb);
2396
2397         return error;
2398 }
2399
2400 /* remove an existing lkb from the convert or wait queue */
2401
2402 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2403 {
2404         int error;
2405
2406         if (is_remote(r))
2407                 /* receive_cancel() calls do_cancel() on remote node */
2408                 error = send_cancel(r, lkb);
2409         else
2410                 error = do_cancel(r, lkb);
2411
2412         return error;
2413 }
2414
2415 /*
2416  * Four stage 2 varieties:
2417  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2418  */
2419
2420 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2421                         int len, struct dlm_args *args)
2422 {
2423         struct dlm_rsb *r;
2424         int error;
2425
2426         error = validate_lock_args(ls, lkb, args);
2427         if (error)
2428                 goto out;
2429
2430         error = find_rsb(ls, name, len, R_CREATE, &r);
2431         if (error)
2432                 goto out;
2433
2434         lock_rsb(r);
2435
2436         attach_lkb(r, lkb);
2437         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2438
2439         error = _request_lock(r, lkb);
2440
2441         unlock_rsb(r);
2442         put_rsb(r);
2443
2444  out:
2445         return error;
2446 }
2447
2448 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2449                         struct dlm_args *args)
2450 {
2451         struct dlm_rsb *r;
2452         int error;
2453
2454         r = lkb->lkb_resource;
2455
2456         hold_rsb(r);
2457         lock_rsb(r);
2458
2459         error = validate_lock_args(ls, lkb, args);
2460         if (error)
2461                 goto out;
2462
2463         error = _convert_lock(r, lkb);
2464  out:
2465         unlock_rsb(r);
2466         put_rsb(r);
2467         return error;
2468 }
2469
2470 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2471                        struct dlm_args *args)
2472 {
2473         struct dlm_rsb *r;
2474         int error;
2475
2476         r = lkb->lkb_resource;
2477
2478         hold_rsb(r);
2479         lock_rsb(r);
2480
2481         error = validate_unlock_args(lkb, args);
2482         if (error)
2483                 goto out;
2484
2485         error = _unlock_lock(r, lkb);
2486  out:
2487         unlock_rsb(r);
2488         put_rsb(r);
2489         return error;
2490 }
2491
2492 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2493                        struct dlm_args *args)
2494 {
2495         struct dlm_rsb *r;
2496         int error;
2497
2498         r = lkb->lkb_resource;
2499
2500         hold_rsb(r);
2501         lock_rsb(r);
2502
2503         error = validate_unlock_args(lkb, args);
2504         if (error)
2505                 goto out;
2506
2507         error = _cancel_lock(r, lkb);
2508  out:
2509         unlock_rsb(r);
2510         put_rsb(r);
2511         return error;
2512 }
2513
2514 /*
2515  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2516  */
2517
2518 int dlm_lock(dlm_lockspace_t *lockspace,
2519              int mode,
2520              struct dlm_lksb *lksb,
2521              uint32_t flags,
2522              void *name,
2523              unsigned int namelen,
2524              uint32_t parent_lkid,
2525              void (*ast) (void *astarg),
2526              void *astarg,
2527              void (*bast) (void *astarg, int mode))
2528 {
2529         struct dlm_ls *ls;
2530         struct dlm_lkb *lkb;
2531         struct dlm_args args;
2532         int error, convert = flags & DLM_LKF_CONVERT;
2533
2534         ls = dlm_find_lockspace_local(lockspace);
2535         if (!ls)
2536                 return -EINVAL;
2537
2538         dlm_lock_recovery(ls);
2539
2540         if (convert)
2541                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2542         else
2543                 error = create_lkb(ls, &lkb);
2544
2545         if (error)
2546                 goto out;
2547
2548         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2549                               astarg, bast, &args);
2550         if (error)
2551                 goto out_put;
2552
2553         if (convert)
2554                 error = convert_lock(ls, lkb, &args);
2555         else
2556                 error = request_lock(ls, lkb, name, namelen, &args);
2557
2558         if (error == -EINPROGRESS)
2559                 error = 0;
2560  out_put:
2561         if (convert || error)
2562                 __put_lkb(ls, lkb);
2563         if (error == -EAGAIN || error == -EDEADLK)
2564                 error = 0;
2565  out:
2566         dlm_unlock_recovery(ls);
2567         dlm_put_lockspace(ls);
2568         return error;
2569 }
2570
2571 int dlm_unlock(dlm_lockspace_t *lockspace,
2572                uint32_t lkid,
2573                uint32_t flags,
2574                struct dlm_lksb *lksb,
2575                void *astarg)
2576 {
2577         struct dlm_ls *ls;
2578         struct dlm_lkb *lkb;
2579         struct dlm_args args;
2580         int error;
2581
2582         ls = dlm_find_lockspace_local(lockspace);
2583         if (!ls)
2584                 return -EINVAL;
2585
2586         dlm_lock_recovery(ls);
2587
2588         error = find_lkb(ls, lkid, &lkb);
2589         if (error)
2590                 goto out;
2591
2592         error = set_unlock_args(flags, astarg, &args);
2593         if (error)
2594                 goto out_put;
2595
2596         if (flags & DLM_LKF_CANCEL)
2597                 error = cancel_lock(ls, lkb, &args);
2598         else
2599                 error = unlock_lock(ls, lkb, &args);
2600
2601         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2602                 error = 0;
2603         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2604                 error = 0;
2605  out_put:
2606         dlm_put_lkb(lkb);
2607  out:
2608         dlm_unlock_recovery(ls);
2609         dlm_put_lockspace(ls);
2610         return error;
2611 }
2612
2613 /*
2614  * send/receive routines for remote operations and replies
2615  *
2616  * send_args
2617  * send_common
2618  * send_request                 receive_request
2619  * send_convert                 receive_convert
2620  * send_unlock                  receive_unlock
2621  * send_cancel                  receive_cancel
2622  * send_grant                   receive_grant
2623  * send_bast                    receive_bast
2624  * send_lookup                  receive_lookup
2625  * send_remove                  receive_remove
2626  *
2627  *                              send_common_reply
2628  * receive_request_reply        send_request_reply
2629  * receive_convert_reply        send_convert_reply
2630  * receive_unlock_reply         send_unlock_reply
2631  * receive_cancel_reply         send_cancel_reply
2632  * receive_lookup_reply         send_lookup_reply
2633  */
2634
2635 static int _create_message(struct dlm_ls *ls, int mb_len,
2636                            int to_nodeid, int mstype,
2637                            struct dlm_message **ms_ret,
2638                            struct dlm_mhandle **mh_ret)
2639 {
2640         struct dlm_message *ms;
2641         struct dlm_mhandle *mh;
2642         char *mb;
2643
2644         /* get_buffer gives us a message handle (mh) that we need to
2645            pass into lowcomms_commit and a message buffer (mb) that we
2646            write our data into */
2647
2648         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2649         if (!mh)
2650                 return -ENOBUFS;
2651
2652         memset(mb, 0, mb_len);
2653
2654         ms = (struct dlm_message *) mb;
2655
2656         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2657         ms->m_header.h_lockspace = ls->ls_global_id;
2658         ms->m_header.h_nodeid = dlm_our_nodeid();
2659         ms->m_header.h_length = mb_len;
2660         ms->m_header.h_cmd = DLM_MSG;
2661
2662         ms->m_type = mstype;
2663
2664         *mh_ret = mh;
2665         *ms_ret = ms;
2666         return 0;
2667 }
2668
2669 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2670                           int to_nodeid, int mstype,
2671                           struct dlm_message **ms_ret,
2672                           struct dlm_mhandle **mh_ret)
2673 {
2674         int mb_len = sizeof(struct dlm_message);
2675
2676         switch (mstype) {
2677         case DLM_MSG_REQUEST:
2678         case DLM_MSG_LOOKUP:
2679         case DLM_MSG_REMOVE:
2680                 mb_len += r->res_length;
2681                 break;
2682         case DLM_MSG_CONVERT:
2683         case DLM_MSG_UNLOCK:
2684         case DLM_MSG_REQUEST_REPLY:
2685         case DLM_MSG_CONVERT_REPLY:
2686         case DLM_MSG_GRANT:
2687                 if (lkb && lkb->lkb_lvbptr)
2688                         mb_len += r->res_ls->ls_lvblen;
2689                 break;
2690         }
2691
2692         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2693                                ms_ret, mh_ret);
2694 }
2695
2696 /* further lowcomms enhancements or alternate implementations may make
2697    the return value from this function useful at some point */
2698
2699 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2700 {
2701         dlm_message_out(ms);
2702         dlm_lowcomms_commit_buffer(mh);
2703         return 0;
2704 }
2705
2706 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2707                       struct dlm_message *ms)
2708 {
2709         ms->m_nodeid   = lkb->lkb_nodeid;
2710         ms->m_pid      = lkb->lkb_ownpid;
2711         ms->m_lkid     = lkb->lkb_id;
2712         ms->m_remid    = lkb->lkb_remid;
2713         ms->m_exflags  = lkb->lkb_exflags;
2714         ms->m_sbflags  = lkb->lkb_sbflags;
2715         ms->m_flags    = lkb->lkb_flags;
2716         ms->m_lvbseq   = lkb->lkb_lvbseq;
2717         ms->m_status   = lkb->lkb_status;
2718         ms->m_grmode   = lkb->lkb_grmode;
2719         ms->m_rqmode   = lkb->lkb_rqmode;
2720         ms->m_hash     = r->res_hash;
2721
2722         /* m_result and m_bastmode are set from function args,
2723            not from lkb fields */
2724
2725         if (lkb->lkb_bastfn)
2726                 ms->m_asts |= AST_BAST;
2727         if (lkb->lkb_astfn)
2728                 ms->m_asts |= AST_COMP;
2729
2730         /* compare with switch in create_message; send_remove() doesn't
2731            use send_args() */
2732
2733         switch (ms->m_type) {
2734         case DLM_MSG_REQUEST:
2735         case DLM_MSG_LOOKUP:
2736                 memcpy(ms->m_extra, r->res_name, r->res_length);
2737                 break;
2738         case DLM_MSG_CONVERT:
2739         case DLM_MSG_UNLOCK:
2740         case DLM_MSG_REQUEST_REPLY:
2741         case DLM_MSG_CONVERT_REPLY:
2742         case DLM_MSG_GRANT:
2743                 if (!lkb->lkb_lvbptr)
2744                         break;
2745                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2746                 break;
2747         }
2748 }
2749
2750 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2751 {
2752         struct dlm_message *ms;
2753         struct dlm_mhandle *mh;
2754         int to_nodeid, error;
2755
2756         error = add_to_waiters(lkb, mstype);
2757         if (error)
2758                 return error;
2759
2760         to_nodeid = r->res_nodeid;
2761
2762         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2763         if (error)
2764                 goto fail;
2765
2766         send_args(r, lkb, ms);
2767
2768         error = send_message(mh, ms);
2769         if (error)
2770                 goto fail;
2771         return 0;
2772
2773  fail:
2774         remove_from_waiters(lkb, msg_reply_type(mstype));
2775         return error;
2776 }
2777
2778 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2779 {
2780         return send_common(r, lkb, DLM_MSG_REQUEST);
2781 }
2782
2783 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2784 {
2785         int error;
2786
2787         error = send_common(r, lkb, DLM_MSG_CONVERT);
2788
2789         /* down conversions go without a reply from the master */
2790         if (!error && down_conversion(lkb)) {
2791                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2792                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2793                 r->res_ls->ls_stub_ms.m_result = 0;
2794                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2795                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2796         }
2797
2798         return error;
2799 }
2800
2801 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2802    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2803    that the master is still correct. */
2804
2805 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2806 {
2807         return send_common(r, lkb, DLM_MSG_UNLOCK);
2808 }
2809
2810 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2811 {
2812         return send_common(r, lkb, DLM_MSG_CANCEL);
2813 }
2814
2815 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2816 {
2817         struct dlm_message *ms;
2818         struct dlm_mhandle *mh;
2819         int to_nodeid, error;
2820
2821         to_nodeid = lkb->lkb_nodeid;
2822
2823         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2824         if (error)
2825                 goto out;
2826
2827         send_args(r, lkb, ms);
2828
2829         ms->m_result = 0;
2830
2831         error = send_message(mh, ms);
2832  out:
2833         return error;
2834 }
2835
2836 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2837 {
2838         struct dlm_message *ms;
2839         struct dlm_mhandle *mh;
2840         int to_nodeid, error;
2841
2842         to_nodeid = lkb->lkb_nodeid;
2843
2844         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2845         if (error)
2846                 goto out;
2847
2848         send_args(r, lkb, ms);
2849
2850         ms->m_bastmode = mode;
2851
2852         error = send_message(mh, ms);
2853  out:
2854         return error;
2855 }
2856
2857 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2858 {
2859         struct dlm_message *ms;
2860         struct dlm_mhandle *mh;
2861         int to_nodeid, error;
2862
2863         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2864         if (error)
2865                 return error;
2866
2867         to_nodeid = dlm_dir_nodeid(r);
2868
2869         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2870         if (error)
2871                 goto fail;
2872
2873         send_args(r, lkb, ms);
2874
2875         error = send_message(mh, ms);
2876         if (error)
2877                 goto fail;
2878         return 0;
2879
2880  fail:
2881         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2882         return error;
2883 }
2884
2885 static int send_remove(struct dlm_rsb *r)
2886 {
2887         struct dlm_message *ms;
2888         struct dlm_mhandle *mh;
2889         int to_nodeid, error;
2890
2891         to_nodeid = dlm_dir_nodeid(r);
2892
2893         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2894         if (error)
2895                 goto out;
2896
2897         memcpy(ms->m_extra, r->res_name, r->res_length);
2898         ms->m_hash = r->res_hash;
2899
2900         error = send_message(mh, ms);
2901  out:
2902         return error;
2903 }
2904
2905 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2906                              int mstype, int rv)
2907 {
2908         struct dlm_message *ms;
2909         struct dlm_mhandle *mh;
2910         int to_nodeid, error;
2911
2912         to_nodeid = lkb->lkb_nodeid;
2913
2914         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2915         if (error)
2916                 goto out;
2917
2918         send_args(r, lkb, ms);
2919
2920         ms->m_result = rv;
2921
2922         error = send_message(mh, ms);
2923  out:
2924         return error;
2925 }
2926
2927 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2928 {
2929         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2930 }
2931
2932 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2933 {
2934         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2935 }
2936
2937 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2938 {
2939         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2940 }
2941
2942 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2943 {
2944         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2945 }
2946
2947 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2948                              int ret_nodeid, int rv)
2949 {
2950         struct dlm_rsb *r = &ls->ls_stub_rsb;
2951         struct dlm_message *ms;
2952         struct dlm_mhandle *mh;
2953         int error, nodeid = ms_in->m_header.h_nodeid;
2954
2955         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2956         if (error)
2957                 goto out;
2958
2959         ms->m_lkid = ms_in->m_lkid;
2960         ms->m_result = rv;
2961         ms->m_nodeid = ret_nodeid;
2962
2963         error = send_message(mh, ms);
2964  out:
2965         return error;
2966 }
2967
2968 /* which args we save from a received message depends heavily on the type
2969    of message, unlike the send side where we can safely send everything about
2970    the lkb for any type of message */
2971
2972 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2973 {
2974         lkb->lkb_exflags = ms->m_exflags;
2975         lkb->lkb_sbflags = ms->m_sbflags;
2976         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2977                          (ms->m_flags & 0x0000FFFF);
2978 }
2979
2980 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2981 {
2982         lkb->lkb_sbflags = ms->m_sbflags;
2983         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2984                          (ms->m_flags & 0x0000FFFF);
2985 }
2986
2987 static int receive_extralen(struct dlm_message *ms)
2988 {
2989         return (ms->m_header.h_length - sizeof(struct dlm_message));
2990 }
2991
2992 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2993                        struct dlm_message *ms)
2994 {
2995         int len;
2996
2997         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2998                 if (!lkb->lkb_lvbptr)
2999                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3000                 if (!lkb->lkb_lvbptr)
3001                         return -ENOMEM;
3002                 len = receive_extralen(ms);
3003                 if (len > DLM_RESNAME_MAXLEN)
3004                         len = DLM_RESNAME_MAXLEN;
3005                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3006         }
3007         return 0;
3008 }
3009
3010 static void fake_bastfn(void *astparam, int mode)
3011 {
3012         log_print("fake_bastfn should not be called");
3013 }
3014
3015 static void fake_astfn(void *astparam)
3016 {
3017         log_print("fake_astfn should not be called");
3018 }
3019
3020 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3021                                 struct dlm_message *ms)
3022 {
3023         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3024         lkb->lkb_ownpid = ms->m_pid;
3025         lkb->lkb_remid = ms->m_lkid;
3026         lkb->lkb_grmode = DLM_LOCK_IV;
3027         lkb->lkb_rqmode = ms->m_rqmode;
3028
3029         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3030         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3031
3032         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3033                 /* lkb was just created so there won't be an lvb yet */
3034                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3035                 if (!lkb->lkb_lvbptr)
3036                         return -ENOMEM;
3037         }
3038
3039         return 0;
3040 }
3041
3042 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3043                                 struct dlm_message *ms)
3044 {
3045         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3046                 return -EBUSY;
3047
3048         if (receive_lvb(ls, lkb, ms))
3049                 return -ENOMEM;
3050
3051         lkb->lkb_rqmode = ms->m_rqmode;
3052         lkb->lkb_lvbseq = ms->m_lvbseq;
3053
3054         return 0;
3055 }
3056
3057 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3058                                struct dlm_message *ms)
3059 {
3060         if (receive_lvb(ls, lkb, ms))
3061                 return -ENOMEM;
3062         return 0;
3063 }
3064
3065 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3066    uses to send a reply and that the remote end uses to process the reply. */
3067
3068 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3069 {
3070         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3071         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3072         lkb->lkb_remid = ms->m_lkid;
3073 }
3074
3075 /* This is called after the rsb is locked so that we can safely inspect
3076    fields in the lkb. */
3077
3078 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3079 {
3080         int from = ms->m_header.h_nodeid;
3081         int error = 0;
3082
3083         switch (ms->m_type) {
3084         case DLM_MSG_CONVERT:
3085         case DLM_MSG_UNLOCK:
3086         case DLM_MSG_CANCEL:
3087                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3088                         error = -EINVAL;
3089                 break;
3090
3091         case DLM_MSG_CONVERT_REPLY:
3092         case DLM_MSG_UNLOCK_REPLY:
3093         case DLM_MSG_CANCEL_REPLY:
3094         case DLM_MSG_GRANT:
3095         case DLM_MSG_BAST:
3096                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3097                         error = -EINVAL;
3098                 break;
3099
3100         case DLM_MSG_REQUEST_REPLY:
3101                 if (!is_process_copy(lkb))
3102                         error = -EINVAL;
3103                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3104                         error = -EINVAL;
3105                 break;
3106
3107         default:
3108                 error = -EINVAL;
3109         }
3110
3111         if (error)
3112                 log_error(lkb->lkb_resource->res_ls,
3113                           "ignore invalid message %d from %d %x %x %x %d",
3114                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3115                           lkb->lkb_flags, lkb->lkb_nodeid);
3116         return error;
3117 }
3118
3119 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3120 {
3121         struct dlm_lkb *lkb;
3122         struct dlm_rsb *r;
3123         int error, namelen;
3124
3125         error = create_lkb(ls, &lkb);
3126         if (error)
3127                 goto fail;
3128
3129         receive_flags(lkb, ms);
3130         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3131         error = receive_request_args(ls, lkb, ms);
3132         if (error) {
3133                 __put_lkb(ls, lkb);
3134                 goto fail;
3135         }
3136
3137         namelen = receive_extralen(ms);
3138
3139         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3140         if (error) {
3141                 __put_lkb(ls, lkb);
3142                 goto fail;
3143         }
3144
3145         lock_rsb(r);
3146
3147         attach_lkb(r, lkb);
3148         error = do_request(r, lkb);
3149         send_request_reply(r, lkb, error);
3150
3151         unlock_rsb(r);
3152         put_rsb(r);
3153
3154         if (error == -EINPROGRESS)
3155                 error = 0;
3156         if (error)
3157                 dlm_put_lkb(lkb);
3158         return;
3159
3160  fail:
3161         setup_stub_lkb(ls, ms);
3162         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3163 }
3164
3165 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3166 {
3167         struct dlm_lkb *lkb;
3168         struct dlm_rsb *r;
3169         int error, reply = 1;
3170
3171         error = find_lkb(ls, ms->m_remid, &lkb);
3172         if (error)
3173                 goto fail;
3174
3175         r = lkb->lkb_resource;
3176
3177         hold_rsb(r);
3178         lock_rsb(r);
3179
3180         error = validate_message(lkb, ms);
3181         if (error)
3182                 goto out;
3183
3184         receive_flags(lkb, ms);
3185         error = receive_convert_args(ls, lkb, ms);
3186         if (error)
3187                 goto out_reply;
3188         reply = !down_conversion(lkb);
3189
3190         error = do_convert(r, lkb);
3191  out_reply:
3192         if (reply)
3193                 send_convert_reply(r, lkb, error);
3194  out:
3195         unlock_rsb(r);
3196         put_rsb(r);
3197         dlm_put_lkb(lkb);
3198         return;
3199
3200  fail:
3201         setup_stub_lkb(ls, ms);
3202         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3203 }
3204
3205 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3206 {
3207         struct dlm_lkb *lkb;
3208         struct dlm_rsb *r;
3209         int error;
3210
3211         error = find_lkb(ls, ms->m_remid, &lkb);
3212         if (error)
3213                 goto fail;
3214
3215         r = lkb->lkb_resource;
3216
3217         hold_rsb(r);
3218         lock_rsb(r);
3219
3220         error = validate_message(lkb, ms);
3221         if (error)
3222                 goto out;
3223
3224         receive_flags(lkb, ms);
3225         error = receive_unlock_args(ls, lkb, ms);
3226         if (error)
3227                 goto out_reply;
3228
3229         error = do_unlock(r, lkb);
3230  out_reply:
3231         send_unlock_reply(r, lkb, error);
3232  out:
3233         unlock_rsb(r);
3234         put_rsb(r);
3235         dlm_put_lkb(lkb);
3236         return;
3237
3238  fail:
3239         setup_stub_lkb(ls, ms);
3240         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3241 }
3242
3243 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3244 {
3245         struct dlm_lkb *lkb;
3246         struct dlm_rsb *r;
3247         int error;
3248
3249         error = find_lkb(ls, ms->m_remid, &lkb);
3250         if (error)
3251                 goto fail;
3252
3253         receive_flags(lkb, ms);
3254
3255         r = lkb->lkb_resource;
3256
3257         hold_rsb(r);
3258         lock_rsb(r);
3259
3260         error = validate_message(lkb, ms);
3261         if (error)
3262                 goto out;
3263
3264         error = do_cancel(r, lkb);
3265         send_cancel_reply(r, lkb, error);
3266  out:
3267         unlock_rsb(r);
3268         put_rsb(r);
3269         dlm_put_lkb(lkb);
3270         return;
3271
3272  fail:
3273         setup_stub_lkb(ls, ms);
3274         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3275 }
3276
3277 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3278 {
3279         struct dlm_lkb *lkb;
3280         struct dlm_rsb *r;
3281         int error;
3282
3283         error = find_lkb(ls, ms->m_remid, &lkb);
3284         if (error) {
3285                 log_debug(ls, "receive_grant from %d no lkb %x",
3286                           ms->m_header.h_nodeid, ms->m_remid);
3287                 return;
3288         }
3289
3290         r = lkb->lkb_resource;
3291
3292         hold_rsb(r);
3293         lock_rsb(r);
3294
3295         error = validate_message(lkb, ms);
3296         if (error)
3297                 goto out;
3298
3299         receive_flags_reply(lkb, ms);
3300         if (is_altmode(lkb))
3301                 munge_altmode(lkb, ms);
3302         grant_lock_pc(r, lkb, ms);
3303         queue_cast(r, lkb, 0);
3304  out:
3305         unlock_rsb(r);
3306         put_rsb(r);
3307         dlm_put_lkb(lkb);
3308 }
3309
3310 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3311 {
3312         struct dlm_lkb *lkb;
3313         struct dlm_rsb *r;
3314         int error;
3315
3316         error = find_lkb(ls, ms->m_remid, &lkb);
3317         if (error) {
3318                 log_debug(ls, "receive_bast from %d no lkb %x",
3319                           ms->m_header.h_nodeid, ms->m_remid);
3320                 return;
3321         }
3322
3323         r = lkb->lkb_resource;
3324
3325         hold_rsb(r);
3326         lock_rsb(r);
3327
3328         error = validate_message(lkb, ms);
3329         if (error)
3330                 goto out;
3331
3332         queue_bast(r, lkb, ms->m_bastmode);
3333  out:
3334         unlock_rsb(r);
3335         put_rsb(r);
3336         dlm_put_lkb(lkb);
3337 }
3338
3339 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3340 {
3341         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3342
3343         from_nodeid = ms->m_header.h_nodeid;
3344         our_nodeid = dlm_our_nodeid();
3345
3346         len = receive_extralen(ms);
3347
3348         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3349         if (dir_nodeid != our_nodeid) {
3350                 log_error(ls, "lookup dir_nodeid %d from %d",
3351                           dir_nodeid, from_nodeid);
3352                 error = -EINVAL;
3353                 ret_nodeid = -1;
3354                 goto out;
3355         }
3356
3357         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3358
3359         /* Optimization: we're master so treat lookup as a request */
3360         if (!error && ret_nodeid == our_nodeid) {
3361                 receive_request(ls, ms);
3362                 return;
3363         }
3364  out:
3365         send_lookup_reply(ls, ms, ret_nodeid, error);
3366 }
3367
3368 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3369 {
3370         int len, dir_nodeid, from_nodeid;
3371
3372         from_nodeid = ms->m_header.h_nodeid;
3373
3374         len = receive_extralen(ms);
3375
3376         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3377         if (dir_nodeid != dlm_our_nodeid()) {
3378                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3379                           dir_nodeid, from_nodeid);
3380                 return;
3381         }
3382
3383         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3384 }
3385
3386 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3387 {
3388         do_purge(ls, ms->m_nodeid, ms->m_pid);
3389 }
3390
3391 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3392 {
3393         struct dlm_lkb *lkb;
3394         struct dlm_rsb *r;
3395         int error, mstype, result;
3396
3397         error = find_lkb(ls, ms->m_remid, &lkb);
3398         if (error) {
3399                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3400                           ms->m_header.h_nodeid, ms->m_remid);
3401                 return;
3402         }
3403
3404         r = lkb->lkb_resource;
3405         hold_rsb(r);
3406         lock_rsb(r);
3407
3408         error = validate_message(lkb, ms);
3409         if (error)
3410                 goto out;
3411
3412         mstype = lkb->lkb_wait_type;
3413         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3414         if (error)
3415                 goto out;
3416
3417         /* Optimization: the dir node was also the master, so it took our
3418            lookup as a request and sent request reply instead of lookup reply */
3419         if (mstype == DLM_MSG_LOOKUP) {
3420                 r->res_nodeid = ms->m_header.h_nodeid;
3421                 lkb->lkb_nodeid = r->res_nodeid;
3422         }
3423
3424         /* this is the value returned from do_request() on the master */
3425         result = ms->m_result;
3426
3427         switch (result) {
3428         case -EAGAIN:
3429                 /* request would block (be queued) on remote master */
3430                 queue_cast(r, lkb, -EAGAIN);
3431                 confirm_master(r, -EAGAIN);
3432                 unhold_lkb(lkb); /* undoes create_lkb() */
3433                 break;
3434
3435         case -EINPROGRESS:
3436         case 0:
3437                 /* request was queued or granted on remote master */
3438                 receive_flags_reply(lkb, ms);
3439                 lkb->lkb_remid = ms->m_lkid;
3440                 if (is_altmode(lkb))
3441                         munge_altmode(lkb, ms);
3442                 if (result) {
3443                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3444                         add_timeout(lkb);
3445                 } else {
3446                         grant_lock_pc(r, lkb, ms);
3447                         queue_cast(r, lkb, 0);
3448                 }
3449                 confirm_master(r, result);
3450                 break;
3451
3452         case -EBADR:
3453         case -ENOTBLK:
3454                 /* find_rsb failed to find rsb or rsb wasn't master */
3455                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3456                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3457                 r->res_nodeid = -1;
3458                 lkb->lkb_nodeid = -1;
3459
3460                 if (is_overlap(lkb)) {
3461                         /* we'll ignore error in cancel/unlock reply */
3462                         queue_cast_overlap(r, lkb);
3463                         confirm_master(r, result);
3464                         unhold_lkb(lkb); /* undoes create_lkb() */
3465                 } else
3466                         _request_lock(r, lkb);
3467                 break;
3468
3469         default:
3470                 log_error(ls, "receive_request_reply %x error %d",
3471                           lkb->lkb_id, result);
3472         }
3473
3474         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3475                 log_debug(ls, "receive_request_reply %x result %d unlock",
3476                           lkb->lkb_id, result);
3477                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3478                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3479                 send_unlock(r, lkb);
3480         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3481                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3482                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3483                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3484                 send_cancel(r, lkb);
3485         } else {
3486                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3487                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3488         }
3489  out:
3490         unlock_rsb(r);
3491         put_rsb(r);
3492         dlm_put_lkb(lkb);
3493 }
3494
3495 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3496                                     struct dlm_message *ms)
3497 {
3498         /* this is the value returned from do_convert() on the master */
3499         switch (ms->m_result) {
3500         case -EAGAIN:
3501                 /* convert would block (be queued) on remote master */
3502                 queue_cast(r, lkb, -EAGAIN);
3503                 break;
3504
3505         case -EDEADLK:
3506                 receive_flags_reply(lkb, ms);
3507                 revert_lock_pc(r, lkb);
3508                 queue_cast(r, lkb, -EDEADLK);
3509                 break;
3510
3511         case -EINPROGRESS:
3512                 /* convert was queued on remote master */
3513                 receive_flags_reply(lkb, ms);
3514                 if (is_demoted(lkb))
3515                         munge_demoted(lkb, ms);
3516                 del_lkb(r, lkb);
3517                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3518                 add_timeout(lkb);
3519                 break;
3520
3521         case 0:
3522                 /* convert was granted on remote master */
3523                 receive_flags_reply(lkb, ms);
3524                 if (is_demoted(lkb))
3525                         munge_demoted(lkb, ms);
3526                 grant_lock_pc(r, lkb, ms);
3527                 queue_cast(r, lkb, 0);
3528                 break;
3529
3530         default:
3531                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3532                           lkb->lkb_id, ms->m_result);
3533         }
3534 }
3535
3536 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3537 {
3538         struct dlm_rsb *r = lkb->lkb_resource;
3539         int error;
3540
3541         hold_rsb(r);
3542         lock_rsb(r);
3543
3544         error = validate_message(lkb, ms);
3545         if (error)
3546                 goto out;
3547
3548         /* stub reply can happen with waiters_mutex held */
3549         error = remove_from_waiters_ms(lkb, ms);
3550         if (error)
3551                 goto out;
3552
3553         __receive_convert_reply(r, lkb, ms);
3554  out:
3555         unlock_rsb(r);
3556         put_rsb(r);
3557 }
3558
3559 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3560 {
3561         struct dlm_lkb *lkb;
3562         int error;
3563
3564         error = find_lkb(ls, ms->m_remid, &lkb);
3565         if (error) {
3566                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3567                           ms->m_header.h_nodeid, ms->m_remid);
3568                 return;
3569         }
3570
3571         _receive_convert_reply(lkb, ms);
3572         dlm_put_lkb(lkb);
3573 }
3574
3575 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3576 {
3577         struct dlm_rsb *r = lkb->lkb_resource;
3578         int error;
3579
3580         hold_rsb(r);
3581         lock_rsb(r);
3582
3583         error = validate_message(lkb, ms);
3584         if (error)
3585                 goto out;
3586
3587         /* stub reply can happen with waiters_mutex held */
3588         error = remove_from_waiters_ms(lkb, ms);
3589         if (error)
3590                 goto out;
3591
3592         /* this is the value returned from do_unlock() on the master */
3593
3594         switch (ms->m_result) {
3595         case -DLM_EUNLOCK:
3596                 receive_flags_reply(lkb, ms);
3597                 remove_lock_pc(r, lkb);
3598                 queue_cast(r, lkb, -DLM_EUNLOCK);
3599                 break;
3600         case -ENOENT:
3601                 break;
3602         default:
3603                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3604                           lkb->lkb_id, ms->m_result);
3605         }
3606  out:
3607         unlock_rsb(r);
3608         put_rsb(r);
3609 }
3610
3611 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3612 {
3613         struct dlm_lkb *lkb;
3614         int error;
3615
3616         error = find_lkb(ls, ms->m_remid, &lkb);
3617         if (error) {
3618                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3619                           ms->m_header.h_nodeid, ms->m_remid);
3620                 return;
3621         }
3622
3623         _receive_unlock_reply(lkb, ms);
3624         dlm_put_lkb(lkb);
3625 }
3626
3627 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3628 {
3629         struct dlm_rsb *r = lkb->lkb_resource;
3630         int error;
3631
3632         hold_rsb(r);
3633         lock_rsb(r);
3634
3635         error = validate_message(lkb, ms);
3636         if (error)
3637                 goto out;
3638
3639         /* stub reply can happen with waiters_mutex held */
3640         error = remove_from_waiters_ms(lkb, ms);
3641         if (error)
3642                 goto out;
3643
3644         /* this is the value returned from do_cancel() on the master */
3645
3646         switch (ms->m_result) {
3647         case -DLM_ECANCEL:
3648                 receive_flags_reply(lkb, ms);
3649                 revert_lock_pc(r, lkb);
3650                 queue_cast(r, lkb, -DLM_ECANCEL);
3651                 break;
3652         case 0:
3653                 break;
3654         default:
3655                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3656                           lkb->lkb_id, ms->m_result);
3657         }
3658  out:
3659         unlock_rsb(r);
3660         put_rsb(r);
3661 }
3662
3663 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3664 {
3665         struct dlm_lkb *lkb;
3666         int error;
3667
3668         error = find_lkb(ls, ms->m_remid, &lkb);
3669         if (error) {
3670                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3671                           ms->m_header.h_nodeid, ms->m_remid);
3672                 return;
3673         }
3674
3675         _receive_cancel_reply(lkb, ms);
3676         dlm_put_lkb(lkb);
3677 }
3678
3679 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3680 {
3681         struct dlm_lkb *lkb;
3682         struct dlm_rsb *r;
3683         int error, ret_nodeid;
3684
3685         error = find_lkb(ls, ms->m_lkid, &lkb);
3686         if (error) {
3687                 log_error(ls, "receive_lookup_reply no lkb");
3688                 return;
3689         }
3690
3691         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3692            FIXME: will a non-zero error ever be returned? */
3693
3694         r = lkb->lkb_resource;
3695         hold_rsb(r);
3696         lock_rsb(r);
3697
3698         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3699         if (error)
3700                 goto out;
3701
3702         ret_nodeid = ms->m_nodeid;
3703         if (ret_nodeid == dlm_our_nodeid()) {
3704                 r->res_nodeid = 0;
3705                 ret_nodeid = 0;
3706                 r->res_first_lkid = 0;
3707         } else {
3708                 /* set_master() will copy res_nodeid to lkb_nodeid */
3709                 r->res_nodeid = ret_nodeid;
3710         }
3711
3712         if (is_overlap(lkb)) {
3713                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3714                           lkb->lkb_id, lkb->lkb_flags);
3715                 queue_cast_overlap(r, lkb);
3716                 unhold_lkb(lkb); /* undoes create_lkb() */
3717                 goto out_list;
3718         }
3719
3720         _request_lock(r, lkb);
3721
3722  out_list:
3723         if (!ret_nodeid)
3724                 process_lookup_list(r);
3725  out:
3726         unlock_rsb(r);
3727         put_rsb(r);
3728         dlm_put_lkb(lkb);
3729 }
3730
3731 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3732 {
3733         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3734                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3735                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3736                           ms->m_remid, ms->m_result);
3737                 return;
3738         }
3739
3740         switch (ms->m_type) {
3741
3742         /* messages sent to a master node */
3743
3744         case DLM_MSG_REQUEST:
3745                 receive_request(ls, ms);
3746                 break;
3747
3748         case DLM_MSG_CONVERT:
3749                 receive_convert(ls, ms);
3750                 break;
3751
3752         case DLM_MSG_UNLOCK:
3753                 receive_unlock(ls, ms);
3754                 break;
3755
3756         case DLM_MSG_CANCEL:
3757                 receive_cancel(ls, ms);
3758                 break;
3759
3760         /* messages sent from a master node (replies to above) */
3761
3762         case DLM_MSG_REQUEST_REPLY:
3763                 receive_request_reply(ls, ms);
3764                 break;
3765
3766         case DLM_MSG_CONVERT_REPLY:
3767                 receive_convert_reply(ls, ms);
3768                 break;
3769
3770         case DLM_MSG_UNLOCK_REPLY:
3771                 receive_unlock_reply(ls, ms);
3772                 break;
3773
3774         case DLM_MSG_CANCEL_REPLY:
3775                 receive_cancel_reply(ls, ms);
3776                 break;
3777
3778         /* messages sent from a master node (only two types of async msg) */
3779
3780         case DLM_MSG_GRANT:
3781                 receive_grant(ls, ms);
3782                 break;
3783
3784         case DLM_MSG_BAST:
3785                 receive_bast(ls, ms);
3786                 break;
3787
3788         /* messages sent to a dir node */
3789
3790         case DLM_MSG_LOOKUP:
3791                 receive_lookup(ls, ms);
3792                 break;
3793
3794         case DLM_MSG_REMOVE:
3795                 receive_remove(ls, ms);
3796                 break;
3797
3798         /* messages sent from a dir node (remove has no reply) */
3799
3800         case DLM_MSG_LOOKUP_REPLY:
3801                 receive_lookup_reply(ls, ms);
3802                 break;
3803
3804         /* other messages */
3805
3806         case DLM_MSG_PURGE:
3807                 receive_purge(ls, ms);
3808                 break;
3809
3810         default:
3811                 log_error(ls, "unknown message type %d", ms->m_type);
3812         }
3813
3814         dlm_astd_wake();
3815 }
3816
3817 /* If the lockspace is in recovery mode (locking stopped), then normal
3818    messages are saved on the requestqueue for processing after recovery is
3819    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3820    messages off the requestqueue before we process new ones. This occurs right
3821    after recovery completes when we transition from saving all messages on
3822    requestqueue, to processing all the saved messages, to processing new
3823    messages as they arrive. */
3824
3825 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3826                                 int nodeid)
3827 {
3828         if (dlm_locking_stopped(ls)) {
3829                 dlm_add_requestqueue(ls, nodeid, ms);
3830         } else {
3831                 dlm_wait_requestqueue(ls);
3832                 _receive_message(ls, ms);
3833         }
3834 }
3835
3836 /* This is called by dlm_recoverd to process messages that were saved on
3837    the requestqueue. */
3838
3839 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3840 {
3841         _receive_message(ls, ms);
3842 }
3843
3844 /* This is called by the midcomms layer when something is received for
3845    the lockspace.  It could be either a MSG (normal message sent as part of
3846    standard locking activity) or an RCOM (recovery message sent as part of
3847    lockspace recovery). */
3848
3849 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3850 {
3851         struct dlm_header *hd = &p->header;
3852         struct dlm_ls *ls;
3853         int type = 0;
3854
3855         switch (hd->h_cmd) {
3856         case DLM_MSG:
3857                 dlm_message_in(&p->message);
3858                 type = p->message.m_type;
3859                 break;
3860         case DLM_RCOM:
3861                 dlm_rcom_in(&p->rcom);
3862                 type = p->rcom.rc_type;
3863                 break;
3864         default:
3865                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3866                 return;
3867         }
3868
3869         if (hd->h_nodeid != nodeid) {
3870                 log_print("invalid h_nodeid %d from %d lockspace %x",
3871                           hd->h_nodeid, nodeid, hd->h_lockspace);
3872                 return;
3873         }
3874
3875         ls = dlm_find_lockspace_global(hd->h_lockspace);
3876         if (!ls) {
3877                 if (dlm_config.ci_log_debug)
3878                         log_print("invalid lockspace %x from %d cmd %d type %d",
3879                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3880
3881                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3882                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3883                 return;
3884         }
3885
3886         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3887            be inactive (in this ls) before transitioning to recovery mode */
3888
3889         down_read(&ls->ls_recv_active);
3890         if (hd->h_cmd == DLM_MSG)
3891                 dlm_receive_message(ls, &p->message, nodeid);
3892         else
3893                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3894         up_read(&ls->ls_recv_active);
3895
3896         dlm_put_lockspace(ls);
3897 }
3898
3899 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3900 {
3901         if (middle_conversion(lkb)) {
3902                 hold_lkb(lkb);
3903                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3904                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3905                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3906                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3907                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3908
3909                 /* Same special case as in receive_rcom_lock_args() */
3910                 lkb->lkb_grmode = DLM_LOCK_IV;
3911                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3912                 unhold_lkb(lkb);
3913
3914         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3915                 lkb->lkb_flags |= DLM_IFL_RESEND;
3916         }
3917
3918         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3919            conversions are async; there's no reply from the remote master */
3920 }
3921
3922 /* A waiting lkb needs recovery if the master node has failed, or
3923    the master node is changing (only when no directory is used) */
3924
3925 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3926 {
3927         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3928                 return 1;
3929
3930         if (!dlm_no_directory(ls))
3931                 return 0;
3932
3933         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3934                 return 1;
3935
3936         return 0;
3937 }
3938
3939 /* Recovery for locks that are waiting for replies from nodes that are now
3940    gone.  We can just complete unlocks and cancels by faking a reply from the
3941    dead node.  Requests and up-conversions we flag to be resent after
3942    recovery.  Down-conversions can just be completed with a fake reply like
3943    unlocks.  Conversions between PR and CW need special attention. */
3944
3945 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3946 {
3947         struct dlm_lkb *lkb, *safe;
3948         int wait_type, stub_unlock_result, stub_cancel_result;
3949
3950         mutex_lock(&ls->ls_waiters_mutex);
3951
3952         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3953                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3954                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3955
3956                 /* all outstanding lookups, regardless of destination  will be
3957                    resent after recovery is done */
3958
3959                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3960                         lkb->lkb_flags |= DLM_IFL_RESEND;
3961                         continue;
3962                 }
3963
3964                 if (!waiter_needs_recovery(ls, lkb))
3965                         continue;
3966
3967                 wait_type = lkb->lkb_wait_type;
3968                 stub_unlock_result = -DLM_EUNLOCK;
3969                 stub_cancel_result = -DLM_ECANCEL;
3970
3971                 /* Main reply may have been received leaving a zero wait_type,
3972                    but a reply for the overlapping op may not have been
3973                    received.  In that case we need to fake the appropriate
3974                    reply for the overlap op. */
3975
3976                 if (!wait_type) {
3977                         if (is_overlap_cancel(lkb)) {
3978                                 wait_type = DLM_MSG_CANCEL;
3979                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3980                                         stub_cancel_result = 0;
3981                         }
3982                         if (is_overlap_unlock(lkb)) {
3983                                 wait_type = DLM_MSG_UNLOCK;
3984                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3985                                         stub_unlock_result = -ENOENT;
3986                         }
3987
3988                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
3989                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
3990                                   stub_cancel_result, stub_unlock_result);
3991                 }
3992
3993                 switch (wait_type) {
3994
3995                 case DLM_MSG_REQUEST:
3996                         lkb->lkb_flags |= DLM_IFL_RESEND;
3997                         break;
3998
3999                 case DLM_MSG_CONVERT:
4000                         recover_convert_waiter(ls, lkb);
4001                         break;
4002
4003                 case DLM_MSG_UNLOCK:
4004                         hold_lkb(lkb);
4005                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4006                         ls->ls_stub_ms.m_result = stub_unlock_result;
4007                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4008                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4009                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4010                         dlm_put_lkb(lkb);
4011                         break;
4012
4013                 case DLM_MSG_CANCEL:
4014                         hold_lkb(lkb);
4015                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4016                         ls->ls_stub_ms.m_result = stub_cancel_result;
4017                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4018                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4019                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4020                         dlm_put_lkb(lkb);
4021                         break;
4022
4023                 default:
4024                         log_error(ls, "invalid lkb wait_type %d %d",
4025                                   lkb->lkb_wait_type, wait_type);
4026                 }
4027                 schedule();
4028         }
4029         mutex_unlock(&ls->ls_waiters_mutex);
4030 }
4031
4032 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4033 {
4034         struct dlm_lkb *lkb;
4035         int found = 0;
4036
4037         mutex_lock(&ls->ls_waiters_mutex);
4038         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4039                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4040                         hold_lkb(lkb);
4041                         found = 1;
4042                         break;
4043                 }
4044         }
4045         mutex_unlock(&ls->ls_waiters_mutex);
4046
4047         if (!found)
4048                 lkb = NULL;
4049         return lkb;
4050 }
4051
4052 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4053    master or dir-node for r.  Processing the lkb may result in it being placed
4054    back on waiters. */
4055
4056 /* We do this after normal locking has been enabled and any saved messages
4057    (in requestqueue) have been processed.  We should be confident that at
4058    this point we won't get or process a reply to any of these waiting
4059    operations.  But, new ops may be coming in on the rsbs/locks here from
4060    userspace or remotely. */
4061
4062 /* there may have been an overlap unlock/cancel prior to recovery or after
4063    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4064    overlap flag would just have been set and nothing new sent.  we can be
4065    confident here than any replies to either the initial op or overlap ops
4066    prior to recovery have been received. */
4067
4068 int dlm_recover_waiters_post(struct dlm_ls *ls)
4069 {
4070         struct dlm_lkb *lkb;
4071         struct dlm_rsb *r;
4072         int error = 0, mstype, err, oc, ou;
4073
4074         while (1) {
4075                 if (dlm_locking_stopped(ls)) {
4076                         log_debug(ls, "recover_waiters_post aborted");
4077                         error = -EINTR;
4078                         break;
4079                 }
4080
4081                 lkb = find_resend_waiter(ls);
4082                 if (!lkb)
4083                         break;
4084
4085                 r = lkb->lkb_resource;
4086                 hold_rsb(r);
4087                 lock_rsb(r);
4088
4089                 mstype = lkb->lkb_wait_type;
4090                 oc = is_overlap_cancel(lkb);
4091                 ou = is_overlap_unlock(lkb);
4092                 err = 0;
4093
4094                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4095                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4096
4097                 /* At this point we assume that we won't get a reply to any
4098                    previous op or overlap op on this lock.  First, do a big
4099                    remove_from_waiters() for all previous ops. */
4100
4101                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4102                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4103                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4104                 lkb->lkb_wait_type = 0;
4105                 lkb->lkb_wait_count = 0;
4106                 mutex_lock(&ls->ls_waiters_mutex);
4107                 list_del_init(&lkb->lkb_wait_reply);
4108                 mutex_unlock(&ls->ls_waiters_mutex);
4109                 unhold_lkb(lkb); /* for waiters list */
4110
4111                 if (oc || ou) {
4112                         /* do an unlock or cancel instead of resending */
4113                         switch (mstype) {
4114                         case DLM_MSG_LOOKUP:
4115                         case DLM_MSG_REQUEST:
4116                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4117                                                         -DLM_ECANCEL);
4118                                 unhold_lkb(lkb); /* undoes create_lkb() */
4119                                 break;
4120                         case DLM_MSG_CONVERT:
4121                                 if (oc) {
4122                                         queue_cast(r, lkb, -DLM_ECANCEL);
4123                                 } else {
4124                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4125                                         _unlock_lock(r, lkb);
4126                                 }
4127                                 break;
4128                         default:
4129                                 err = 1;
4130                         }
4131                 } else {
4132                         switch (mstype) {
4133                         case DLM_MSG_LOOKUP:
4134                         case DLM_MSG_REQUEST:
4135                                 _request_lock(r, lkb);
4136                                 if (is_master(r))
4137                                         confirm_master(r, 0);
4138                                 break;
4139                         case DLM_MSG_CONVERT:
4140                                 _convert_lock(r, lkb);
4141                                 break;
4142                         default:
4143                                 err = 1;
4144                         }
4145                 }
4146
4147                 if (err)
4148                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4149                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4150                 unlock_rsb(r);
4151                 put_rsb(r);
4152                 dlm_put_lkb(lkb);
4153         }
4154
4155         return error;
4156 }
4157
4158 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4159                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4160 {
4161         struct dlm_ls *ls = r->res_ls;
4162         struct dlm_lkb *lkb, *safe;
4163
4164         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4165                 if (test(ls, lkb)) {
4166                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4167                         del_lkb(r, lkb);
4168                         /* this put should free the lkb */
4169                         if (!dlm_put_lkb(lkb))
4170                                 log_error(ls, "purged lkb not released");
4171                 }
4172         }
4173 }
4174
4175 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4176 {
4177         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4178 }
4179
4180 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4181 {
4182         return is_master_copy(lkb);
4183 }
4184
4185 static void purge_dead_locks(struct dlm_rsb *r)
4186 {
4187         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4188         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4189         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4190 }
4191
4192 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4193 {
4194         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4195         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4196         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4197 }
4198
4199 /* Get rid of locks held by nodes that are gone. */
4200
4201 int dlm_purge_locks(struct dlm_ls *ls)
4202 {
4203         struct dlm_rsb *r;
4204
4205         log_debug(ls, "dlm_purge_locks");
4206
4207         down_write(&ls->ls_root_sem);
4208         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4209                 hold_rsb(r);
4210                 lock_rsb(r);
4211                 if (is_master(r))
4212                         purge_dead_locks(r);
4213                 unlock_rsb(r);
4214                 unhold_rsb(r);
4215
4216                 schedule();
4217         }
4218         up_write(&ls->ls_root_sem);
4219
4220         return 0;
4221 }
4222
4223 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4224 {
4225         struct dlm_rsb *r, *r_ret = NULL;
4226
4227         read_lock(&ls->ls_rsbtbl[bucket].lock);
4228         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4229                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4230                         continue;
4231                 hold_rsb(r);
4232                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4233                 r_ret = r;
4234                 break;
4235         }
4236         read_unlock(&ls->ls_rsbtbl[bucket].lock);
4237         return r_ret;
4238 }
4239
4240 void dlm_grant_after_purge(struct dlm_ls *ls)
4241 {
4242         struct dlm_rsb *r;
4243         int bucket = 0;
4244
4245         while (1) {
4246                 r = find_purged_rsb(ls, bucket);
4247                 if (!r) {
4248                         if (bucket == ls->ls_rsbtbl_size - 1)
4249                                 break;
4250                         bucket++;
4251                         continue;
4252                 }
4253                 lock_rsb(r);
4254                 if (is_master(r)) {
4255                         grant_pending_locks(r);
4256                         confirm_master(r, 0);
4257                 }
4258                 unlock_rsb(r);
4259                 put_rsb(r);
4260                 schedule();
4261         }
4262 }
4263
4264 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4265                                          uint32_t remid)
4266 {
4267         struct dlm_lkb *lkb;
4268
4269         list_for_each_entry(lkb, head, lkb_statequeue) {
4270                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4271                         return lkb;
4272         }
4273         return NULL;
4274 }
4275
4276 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4277                                     uint32_t remid)
4278 {
4279         struct dlm_lkb *lkb;
4280
4281         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4282         if (lkb)
4283                 return lkb;
4284         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4285         if (lkb)
4286                 return lkb;
4287         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4288         if (lkb)
4289                 return lkb;
4290         return NULL;
4291 }
4292
4293 /* needs at least dlm_rcom + rcom_lock */
4294 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4295                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4296 {
4297         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4298
4299         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4300         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4301         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4302         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4303         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4304         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4305         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4306         lkb->lkb_rqmode = rl->rl_rqmode;
4307         lkb->lkb_grmode = rl->rl_grmode;
4308         /* don't set lkb_status because add_lkb wants to itself */
4309
4310         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4311         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4312
4313         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4314                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4315                          sizeof(struct rcom_lock);
4316                 if (lvblen > ls->ls_lvblen)
4317                         return -EINVAL;
4318                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4319                 if (!lkb->lkb_lvbptr)
4320                         return -ENOMEM;
4321                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4322         }
4323
4324         /* Conversions between PR and CW (middle modes) need special handling.
4325            The real granted mode of these converting locks cannot be determined
4326            until all locks have been rebuilt on the rsb (recover_conversion) */
4327
4328         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4329             middle_conversion(lkb)) {
4330                 rl->rl_status = DLM_LKSTS_CONVERT;
4331                 lkb->lkb_grmode = DLM_LOCK_IV;
4332                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4333         }
4334
4335         return 0;
4336 }
4337
4338 /* This lkb may have been recovered in a previous aborted recovery so we need
4339    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4340    If so we just send back a standard reply.  If not, we create a new lkb with
4341    the given values and send back our lkid.  We send back our lkid by sending
4342    back the rcom_lock struct we got but with the remid field filled in. */
4343
4344 /* needs at least dlm_rcom + rcom_lock */
4345 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4346 {
4347         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4348         struct dlm_rsb *r;
4349         struct dlm_lkb *lkb;
4350         int error;
4351
4352         if (rl->rl_parent_lkid) {
4353                 error = -EOPNOTSUPP;
4354                 goto out;
4355         }
4356
4357         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4358                          R_MASTER, &r);
4359         if (error)
4360                 goto out;
4361
4362         lock_rsb(r);
4363
4364         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4365         if (lkb) {
4366                 error = -EEXIST;
4367                 goto out_remid;
4368         }
4369
4370         error = create_lkb(ls, &lkb);
4371         if (error)
4372                 goto out_unlock;
4373
4374         error = receive_rcom_lock_args(ls, lkb, r, rc);
4375         if (error) {
4376                 __put_lkb(ls, lkb);
4377                 goto out_unlock;
4378         }
4379
4380         attach_lkb(r, lkb);
4381         add_lkb(r, lkb, rl->rl_status);
4382         error = 0;
4383
4384  out_remid:
4385         /* this is the new value returned to the lock holder for
4386            saving in its process-copy lkb */
4387         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4388
4389  out_unlock:
4390         unlock_rsb(r);
4391         put_rsb(r);
4392  out:
4393         if (error)
4394                 log_debug(ls, "recover_master_copy %d %x", error,
4395                           le32_to_cpu(rl->rl_lkid));
4396         rl->rl_result = cpu_to_le32(error);
4397         return error;
4398 }
4399
4400 /* needs at least dlm_rcom + rcom_lock */
4401 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4402 {
4403         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4404         struct dlm_rsb *r;
4405         struct dlm_lkb *lkb;
4406         int error;
4407
4408         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4409         if (error) {
4410                 log_error(ls, "recover_process_copy no lkid %x",
4411                                 le32_to_cpu(rl->rl_lkid));
4412                 return error;
4413         }
4414
4415         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4416
4417         error = le32_to_cpu(rl->rl_result);
4418
4419         r = lkb->lkb_resource;
4420         hold_rsb(r);
4421         lock_rsb(r);
4422
4423         switch (error) {
4424         case -EBADR:
4425                 /* There's a chance the new master received our lock before
4426                    dlm_recover_master_reply(), this wouldn't happen if we did
4427                    a barrier between recover_masters and recover_locks. */
4428                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4429                           (unsigned long)r, r->res_name);
4430                 dlm_send_rcom_lock(r, lkb);
4431                 goto out;
4432         case -EEXIST:
4433                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4434                 /* fall through */
4435         case 0:
4436                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4437                 break;
4438         default:
4439                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4440                           error, lkb->lkb_id);
4441         }
4442
4443         /* an ack for dlm_recover_locks() which waits for replies from
4444            all the locks it sends to new masters */
4445         dlm_recovered_lock(r);
4446  out:
4447         unlock_rsb(r);
4448         put_rsb(r);
4449         dlm_put_lkb(lkb);
4450
4451         return 0;
4452 }
4453
4454 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4455                      int mode, uint32_t flags, void *name, unsigned int namelen,
4456                      unsigned long timeout_cs)
4457 {
4458         struct dlm_lkb *lkb;
4459         struct dlm_args args;
4460         int error;
4461
4462         dlm_lock_recovery(ls);
4463
4464         error = create_lkb(ls, &lkb);
4465         if (error) {
4466                 kfree(ua);
4467                 goto out;
4468         }
4469
4470         if (flags & DLM_LKF_VALBLK) {
4471                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4472                 if (!ua->lksb.sb_lvbptr) {
4473                         kfree(ua);
4474                         __put_lkb(ls, lkb);
4475                         error = -ENOMEM;
4476                         goto out;
4477                 }
4478         }
4479
4480         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4481            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4482            lock and that lkb_astparam is the dlm_user_args structure. */
4483
4484         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4485                               fake_astfn, ua, fake_bastfn, &args);
4486         lkb->lkb_flags |= DLM_IFL_USER;
4487         ua->old_mode = DLM_LOCK_IV;
4488
4489         if (error) {
4490                 __put_lkb(ls, lkb);
4491                 goto out;
4492         }
4493
4494         error = request_lock(ls, lkb, name, namelen, &args);
4495
4496         switch (error) {
4497         case 0:
4498                 break;
4499         case -EINPROGRESS:
4500                 error = 0;
4501                 break;
4502         case -EAGAIN:
4503                 error = 0;
4504                 /* fall through */
4505         default:
4506                 __put_lkb(ls, lkb);
4507                 goto out;
4508         }
4509
4510         /* add this new lkb to the per-process list of locks */
4511         spin_lock(&ua->proc->locks_spin);
4512         hold_lkb(lkb);
4513         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4514         spin_unlock(&ua->proc->locks_spin);
4515  out:
4516         dlm_unlock_recovery(ls);
4517         return error;
4518 }
4519
4520 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4521                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4522                      unsigned long timeout_cs)
4523 {
4524         struct dlm_lkb *lkb;
4525         struct dlm_args args;
4526         struct dlm_user_args *ua;
4527         int error;
4528
4529         dlm_lock_recovery(ls);
4530
4531         error = find_lkb(ls, lkid, &lkb);
4532         if (error)
4533                 goto out;
4534
4535         /* user can change the params on its lock when it converts it, or
4536            add an lvb that didn't exist before */
4537
4538         ua = lkb->lkb_ua;
4539
4540         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4541                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4542                 if (!ua->lksb.sb_lvbptr) {
4543                         error = -ENOMEM;
4544                         goto out_put;
4545                 }
4546         }
4547         if (lvb_in && ua->lksb.sb_lvbptr)
4548                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4549
4550         ua->xid = ua_tmp->xid;
4551         ua->castparam = ua_tmp->castparam;
4552         ua->castaddr = ua_tmp->castaddr;
4553         ua->bastparam = ua_tmp->bastparam;
4554         ua->bastaddr = ua_tmp->bastaddr;
4555         ua->user_lksb = ua_tmp->user_lksb;
4556         ua->old_mode = lkb->lkb_grmode;
4557
4558         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4559                               fake_astfn, ua, fake_bastfn, &args);
4560         if (error)
4561                 goto out_put;
4562
4563         error = convert_lock(ls, lkb, &args);
4564
4565         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4566                 error = 0;
4567  out_put:
4568         dlm_put_lkb(lkb);
4569  out:
4570         dlm_unlock_recovery(ls);
4571         kfree(ua_tmp);
4572         return error;
4573 }
4574
4575 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4576                     uint32_t flags, uint32_t lkid, char *lvb_in)
4577 {
4578         struct dlm_lkb *lkb;
4579         struct dlm_args args;
4580         struct dlm_user_args *ua;
4581         int error;
4582
4583         dlm_lock_recovery(ls);
4584
4585         error = find_lkb(ls, lkid, &lkb);
4586         if (error)
4587                 goto out;
4588
4589         ua = lkb->lkb_ua;
4590
4591         if (lvb_in && ua->lksb.sb_lvbptr)
4592                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4593         if (ua_tmp->castparam)
4594                 ua->castparam = ua_tmp->castparam;
4595         ua->user_lksb = ua_tmp->user_lksb;
4596
4597         error = set_unlock_args(flags, ua, &args);
4598         if (error)
4599                 goto out_put;
4600
4601         error = unlock_lock(ls, lkb, &args);
4602
4603         if (error == -DLM_EUNLOCK)
4604                 error = 0;
4605         /* from validate_unlock_args() */
4606         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4607                 error = 0;
4608         if (error)
4609                 goto out_put;
4610
4611         spin_lock(&ua->proc->locks_spin);
4612         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4613         if (!list_empty(&lkb->lkb_ownqueue))
4614                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4615         spin_unlock(&ua->proc->locks_spin);
4616  out_put:
4617         dlm_put_lkb(lkb);
4618  out:
4619         dlm_unlock_recovery(ls);
4620         kfree(ua_tmp);
4621         return error;
4622 }
4623
4624 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4625                     uint32_t flags, uint32_t lkid)
4626 {
4627         struct dlm_lkb *lkb;
4628         struct dlm_args args;
4629         struct dlm_user_args *ua;
4630         int error;
4631
4632         dlm_lock_recovery(ls);
4633
4634         error = find_lkb(ls, lkid, &lkb);
4635         if (error)
4636                 goto out;
4637
4638         ua = lkb->lkb_ua;
4639         if (ua_tmp->castparam)
4640                 ua->castparam = ua_tmp->castparam;
4641         ua->user_lksb = ua_tmp->user_lksb;
4642
4643         error = set_unlock_args(flags, ua, &args);
4644         if (error)
4645                 goto out_put;
4646
4647         error = cancel_lock(ls, lkb, &args);
4648
4649         if (error == -DLM_ECANCEL)
4650                 error = 0;
4651         /* from validate_unlock_args() */
4652         if (error == -EBUSY)
4653                 error = 0;
4654  out_put:
4655         dlm_put_lkb(lkb);
4656  out:
4657         dlm_unlock_recovery(ls);
4658         kfree(ua_tmp);
4659         return error;
4660 }
4661
4662 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4663 {
4664         struct dlm_lkb *lkb;
4665         struct dlm_args args;
4666         struct dlm_user_args *ua;
4667         struct dlm_rsb *r;
4668         int error;
4669
4670         dlm_lock_recovery(ls);
4671
4672         error = find_lkb(ls, lkid, &lkb);
4673         if (error)
4674                 goto out;
4675
4676         ua = lkb->lkb_ua;
4677
4678         error = set_unlock_args(flags, ua, &args);
4679         if (error)
4680                 goto out_put;
4681
4682         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4683
4684         r = lkb->lkb_resource;
4685         hold_rsb(r);
4686         lock_rsb(r);
4687
4688         error = validate_unlock_args(lkb, &args);
4689         if (error)
4690                 goto out_r;
4691         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4692
4693         error = _cancel_lock(r, lkb);
4694  out_r:
4695         unlock_rsb(r);
4696         put_rsb(r);
4697
4698         if (error == -DLM_ECANCEL)
4699                 error = 0;
4700         /* from validate_unlock_args() */
4701         if (error == -EBUSY)
4702                 error = 0;
4703  out_put:
4704         dlm_put_lkb(lkb);
4705  out:
4706         dlm_unlock_recovery(ls);
4707         return error;
4708 }
4709
4710 /* lkb's that are removed from the waiters list by revert are just left on the
4711    orphans list with the granted orphan locks, to be freed by purge */
4712
4713 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4714 {
4715         struct dlm_args args;
4716         int error;
4717
4718         hold_lkb(lkb);
4719         mutex_lock(&ls->ls_orphans_mutex);
4720         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4721         mutex_unlock(&ls->ls_orphans_mutex);
4722
4723         set_unlock_args(0, lkb->lkb_ua, &args);
4724
4725         error = cancel_lock(ls, lkb, &args);
4726         if (error == -DLM_ECANCEL)
4727                 error = 0;
4728         return error;
4729 }
4730
4731 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4732    Regardless of what rsb queue the lock is on, it's removed and freed. */
4733
4734 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4735 {
4736         struct dlm_args args;
4737         int error;
4738
4739         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4740
4741         error = unlock_lock(ls, lkb, &args);
4742         if (error == -DLM_EUNLOCK)
4743                 error = 0;
4744         return error;
4745 }
4746
4747 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4748    (which does lock_rsb) due to deadlock with receiving a message that does
4749    lock_rsb followed by dlm_user_add_ast() */
4750
4751 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4752                                      struct dlm_user_proc *proc)
4753 {
4754         struct dlm_lkb *lkb = NULL;
4755
4756         mutex_lock(&ls->ls_clear_proc_locks);
4757         if (list_empty(&proc->locks))
4758                 goto out;
4759
4760         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4761         list_del_init(&lkb->lkb_ownqueue);
4762
4763         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4764                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4765         else
4766                 lkb->lkb_flags |= DLM_IFL_DEAD;
4767  out:
4768         mutex_unlock(&ls->ls_clear_proc_locks);
4769         return lkb;
4770 }
4771
4772 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4773    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4774    which we clear here. */
4775
4776 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4777    list, and no more device_writes should add lkb's to proc->locks list; so we
4778    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4779    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4780    them ourself. */
4781
4782 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4783 {
4784         struct dlm_lkb *lkb, *safe;
4785
4786         dlm_lock_recovery(ls);
4787
4788         while (1) {
4789                 lkb = del_proc_lock(ls, proc);
4790                 if (!lkb)
4791                         break;
4792                 del_timeout(lkb);
4793                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4794                         orphan_proc_lock(ls, lkb);
4795                 else
4796                         unlock_proc_lock(ls, lkb);
4797
4798                 /* this removes the reference for the proc->locks list
4799                    added by dlm_user_request, it may result in the lkb
4800                    being freed */
4801
4802                 dlm_put_lkb(lkb);
4803         }
4804
4805         mutex_lock(&ls->ls_clear_proc_locks);
4806
4807         /* in-progress unlocks */
4808         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4809                 list_del_init(&lkb->lkb_ownqueue);
4810                 lkb->lkb_flags |= DLM_IFL_DEAD;
4811                 dlm_put_lkb(lkb);
4812         }
4813
4814         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4815                 lkb->lkb_ast_type = 0;
4816                 list_del(&lkb->lkb_astqueue);
4817                 dlm_put_lkb(lkb);
4818         }
4819
4820         mutex_unlock(&ls->ls_clear_proc_locks);
4821         dlm_unlock_recovery(ls);
4822 }
4823
4824 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4825 {
4826         struct dlm_lkb *lkb, *safe;
4827
4828         while (1) {
4829                 lkb = NULL;
4830                 spin_lock(&proc->locks_spin);
4831                 if (!list_empty(&proc->locks)) {
4832                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4833                                          lkb_ownqueue);
4834                         list_del_init(&lkb->lkb_ownqueue);
4835                 }
4836                 spin_unlock(&proc->locks_spin);
4837
4838                 if (!lkb)
4839                         break;
4840
4841                 lkb->lkb_flags |= DLM_IFL_DEAD;
4842                 unlock_proc_lock(ls, lkb);
4843                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4844         }
4845
4846         spin_lock(&proc->locks_spin);
4847         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4848                 list_del_init(&lkb->lkb_ownqueue);
4849                 lkb->lkb_flags |= DLM_IFL_DEAD;
4850                 dlm_put_lkb(lkb);
4851         }
4852         spin_unlock(&proc->locks_spin);
4853
4854         spin_lock(&proc->asts_spin);
4855         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4856                 list_del(&lkb->lkb_astqueue);
4857                 dlm_put_lkb(lkb);
4858         }
4859         spin_unlock(&proc->asts_spin);
4860 }
4861
4862 /* pid of 0 means purge all orphans */
4863
4864 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4865 {
4866         struct dlm_lkb *lkb, *safe;
4867
4868         mutex_lock(&ls->ls_orphans_mutex);
4869         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4870                 if (pid && lkb->lkb_ownpid != pid)
4871                         continue;
4872                 unlock_proc_lock(ls, lkb);
4873                 list_del_init(&lkb->lkb_ownqueue);
4874                 dlm_put_lkb(lkb);
4875         }
4876         mutex_unlock(&ls->ls_orphans_mutex);
4877 }
4878
4879 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4880 {
4881         struct dlm_message *ms;
4882         struct dlm_mhandle *mh;
4883         int error;
4884
4885         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4886                                 DLM_MSG_PURGE, &ms, &mh);
4887         if (error)
4888                 return error;
4889         ms->m_nodeid = nodeid;
4890         ms->m_pid = pid;
4891
4892         return send_message(mh, ms);
4893 }
4894
4895 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4896                    int nodeid, int pid)
4897 {
4898         int error = 0;
4899
4900         if (nodeid != dlm_our_nodeid()) {
4901                 error = send_purge(ls, nodeid, pid);
4902         } else {
4903                 dlm_lock_recovery(ls);
4904                 if (pid == current->pid)
4905                         purge_proc_locks(ls, proc);
4906                 else
4907                         do_purge(ls, nodeid, pid);
4908                 dlm_unlock_recovery(ls);
4909         }
4910         return error;
4911 }
4912