Merge master.kernel.org:/home/rmk/linux-2.6-arm
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, 0);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322
323         if (is_master_copy(lkb))
324                 send_bast(r, lkb, rqmode);
325         else
326                 dlm_add_ast(lkb, AST_BAST, rqmode);
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         *r_ret = NULL;
367         return -EBADR;
368
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392
393         if (dlm_no_directory(ls))
394                 goto out;
395
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         spin_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         spin_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r, *tmp;
439         uint32_t hash, bucket;
440         int error = -EINVAL;
441
442         if (namelen > DLM_RESNAME_MAXLEN)
443                 goto out;
444
445         if (dlm_no_directory(ls))
446                 flags |= R_CREATE;
447
448         error = 0;
449         hash = jhash(name, namelen, 0);
450         bucket = hash & (ls->ls_rsbtbl_size - 1);
451
452         error = search_rsb(ls, name, namelen, bucket, flags, &r);
453         if (!error)
454                 goto out;
455
456         if (error == -EBADR && !(flags & R_CREATE))
457                 goto out;
458
459         /* the rsb was found but wasn't a master copy */
460         if (error == -ENOTBLK)
461                 goto out;
462
463         error = -ENOMEM;
464         r = create_rsb(ls, name, namelen);
465         if (!r)
466                 goto out;
467
468         r->res_hash = hash;
469         r->res_bucket = bucket;
470         r->res_nodeid = -1;
471         kref_init(&r->res_ref);
472
473         /* With no directory, the master can be set immediately */
474         if (dlm_no_directory(ls)) {
475                 int nodeid = dlm_dir_nodeid(r);
476                 if (nodeid == dlm_our_nodeid())
477                         nodeid = 0;
478                 r->res_nodeid = nodeid;
479         }
480
481         spin_lock(&ls->ls_rsbtbl[bucket].lock);
482         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483         if (!error) {
484                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
485                 dlm_free_rsb(r);
486                 r = tmp;
487                 goto out;
488         }
489         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
491         error = 0;
492  out:
493         *r_ret = r;
494         return error;
495 }
496
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499
500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502         kref_get(&r->res_ref);
503 }
504
505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507         hold_rsb(r);
508 }
509
510 static void toss_rsb(struct kref *kref)
511 {
512         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513         struct dlm_ls *ls = r->res_ls;
514
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516         kref_init(&r->res_ref);
517         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518         r->res_toss_time = jiffies;
519         if (r->res_lvbptr) {
520                 dlm_free_lvb(r->res_lvbptr);
521                 r->res_lvbptr = NULL;
522         }
523 }
524
525 /* When all references to the rsb are gone it's transfered to
526    the tossed list for later disposal. */
527
528 static void put_rsb(struct dlm_rsb *r)
529 {
530         struct dlm_ls *ls = r->res_ls;
531         uint32_t bucket = r->res_bucket;
532
533         spin_lock(&ls->ls_rsbtbl[bucket].lock);
534         kref_put(&r->res_ref, toss_rsb);
535         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537
538 void dlm_put_rsb(struct dlm_rsb *r)
539 {
540         put_rsb(r);
541 }
542
543 /* See comment for unhold_lkb */
544
545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547         int rv;
548         rv = kref_put(&r->res_ref, toss_rsb);
549         DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551
552 static void kill_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555
556         /* All work is done after the return from kref_put() so we
557            can release the write_lock before the remove and free. */
558
559         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569
570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572         hold_rsb(r);
573         lkb->lkb_resource = r;
574 }
575
576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578         if (lkb->lkb_resource) {
579                 put_rsb(lkb->lkb_resource);
580                 lkb->lkb_resource = NULL;
581         }
582 }
583
584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586         struct dlm_lkb *lkb, *tmp;
587         uint32_t lkid = 0;
588         uint16_t bucket;
589
590         lkb = dlm_allocate_lkb(ls);
591         if (!lkb)
592                 return -ENOMEM;
593
594         lkb->lkb_nodeid = -1;
595         lkb->lkb_grmode = DLM_LOCK_IV;
596         kref_init(&lkb->lkb_ref);
597         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599         INIT_LIST_HEAD(&lkb->lkb_time_list);
600
601         get_random_bytes(&bucket, sizeof(bucket));
602         bucket &= (ls->ls_lkbtbl_size - 1);
603
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605
606         /* counter can roll over so we must verify lkid is not in use */
607
608         while (lkid == 0) {
609                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610
611                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612                                     lkb_idtbl_list) {
613                         if (tmp->lkb_id != lkid)
614                                 continue;
615                         lkid = 0;
616                         break;
617                 }
618         }
619
620         lkb->lkb_id = lkid;
621         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622         write_unlock(&ls->ls_lkbtbl[bucket].lock);
623
624         *lkb_ret = lkb;
625         return 0;
626 }
627
628 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
629 {
630         struct dlm_lkb *lkb;
631         uint16_t bucket = (lkid >> 16);
632
633         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
634                 if (lkb->lkb_id == lkid)
635                         return lkb;
636         }
637         return NULL;
638 }
639
640 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
641 {
642         struct dlm_lkb *lkb;
643         uint16_t bucket = (lkid >> 16);
644
645         if (bucket >= ls->ls_lkbtbl_size)
646                 return -EBADSLT;
647
648         read_lock(&ls->ls_lkbtbl[bucket].lock);
649         lkb = __find_lkb(ls, lkid);
650         if (lkb)
651                 kref_get(&lkb->lkb_ref);
652         read_unlock(&ls->ls_lkbtbl[bucket].lock);
653
654         *lkb_ret = lkb;
655         return lkb ? 0 : -ENOENT;
656 }
657
658 static void kill_lkb(struct kref *kref)
659 {
660         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661
662         /* All work is done after the return from kref_put() so we
663            can release the write_lock before the detach_lkb */
664
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670
671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673         uint16_t bucket = (lkb->lkb_id >> 16);
674
675         write_lock(&ls->ls_lkbtbl[bucket].lock);
676         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677                 list_del(&lkb->lkb_idtbl_list);
678                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679
680                 detach_lkb(lkb);
681
682                 /* for local/process lkbs, lvbptr points to caller's lksb */
683                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
684                         dlm_free_lvb(lkb->lkb_lvbptr);
685                 dlm_free_lkb(lkb);
686                 return 1;
687         } else {
688                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
689                 return 0;
690         }
691 }
692
693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695         struct dlm_ls *ls;
696
697         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699
700         ls = lkb->lkb_resource->res_ls;
701         return __put_lkb(ls, lkb);
702 }
703
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706
707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709         kref_get(&lkb->lkb_ref);
710 }
711
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716
717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719         int rv;
720         rv = kref_put(&lkb->lkb_ref, kill_lkb);
721         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725                             int mode)
726 {
727         struct dlm_lkb *lkb = NULL;
728
729         list_for_each_entry(lkb, head, lkb_statequeue)
730                 if (lkb->lkb_rqmode < mode)
731                         break;
732
733         if (!lkb)
734                 list_add_tail(new, head);
735         else
736                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746
747         lkb->lkb_timestamp = ktime_get();
748
749         lkb->lkb_status = status;
750
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784         hold_lkb(lkb);
785         del_lkb(r, lkb);
786         add_lkb(r, lkb, sts);
787         unhold_lkb(lkb);
788 }
789
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814
815         mutex_lock(&ls->ls_waiters_mutex);
816
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837
838                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
867                                 struct dlm_message *ms)
868 {
869         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
870         int overlap_done = 0;
871
872         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
873                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
874                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
875                 overlap_done = 1;
876                 goto out_del;
877         }
878
879         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
880                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
881                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
882                 overlap_done = 1;
883                 goto out_del;
884         }
885
886         /* Cancel state was preemptively cleared by a successful convert,
887            see next comment, nothing to do. */
888
889         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
890             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
891                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
892                           lkb->lkb_id, lkb->lkb_wait_type);
893                 return -1;
894         }
895
896         /* Remove for the convert reply, and premptively remove for the
897            cancel reply.  A convert has been granted while there's still
898            an outstanding cancel on it (the cancel is moot and the result
899            in the cancel reply should be 0).  We preempt the cancel reply
900            because the app gets the convert result and then can follow up
901            with another op, like convert.  This subsequent op would see the
902            lingering state of the cancel and fail with -EBUSY. */
903
904         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
905             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
906             is_overlap_cancel(lkb) && ms && !ms->m_result) {
907                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
908                           lkb->lkb_id);
909                 lkb->lkb_wait_type = 0;
910                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
911                 lkb->lkb_wait_count--;
912                 goto out_del;
913         }
914
915         /* N.B. type of reply may not always correspond to type of original
916            msg due to lookup->request optimization, verify others? */
917
918         if (lkb->lkb_wait_type) {
919                 lkb->lkb_wait_type = 0;
920                 goto out_del;
921         }
922
923         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
924                   lkb->lkb_id, mstype, lkb->lkb_flags);
925         return -1;
926
927  out_del:
928         /* the force-unlock/cancel has completed and we haven't recvd a reply
929            to the op that was in progress prior to the unlock/cancel; we
930            give up on any reply to the earlier op.  FIXME: not sure when/how
931            this would happen */
932
933         if (overlap_done && lkb->lkb_wait_type) {
934                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
935                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
936                 lkb->lkb_wait_count--;
937                 lkb->lkb_wait_type = 0;
938         }
939
940         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
941
942         lkb->lkb_flags &= ~DLM_IFL_RESEND;
943         lkb->lkb_wait_count--;
944         if (!lkb->lkb_wait_count)
945                 list_del_init(&lkb->lkb_wait_reply);
946         unhold_lkb(lkb);
947         return 0;
948 }
949
950 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
951 {
952         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
953         int error;
954
955         mutex_lock(&ls->ls_waiters_mutex);
956         error = _remove_from_waiters(lkb, mstype, NULL);
957         mutex_unlock(&ls->ls_waiters_mutex);
958         return error;
959 }
960
961 /* Handles situations where we might be processing a "fake" or "stub" reply in
962    which we can't try to take waiters_mutex again. */
963
964 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
965 {
966         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
967         int error;
968
969         if (ms != &ls->ls_stub_ms)
970                 mutex_lock(&ls->ls_waiters_mutex);
971         error = _remove_from_waiters(lkb, ms->m_type, ms);
972         if (ms != &ls->ls_stub_ms)
973                 mutex_unlock(&ls->ls_waiters_mutex);
974         return error;
975 }
976
977 static void dir_remove(struct dlm_rsb *r)
978 {
979         int to_nodeid;
980
981         if (dlm_no_directory(r->res_ls))
982                 return;
983
984         to_nodeid = dlm_dir_nodeid(r);
985         if (to_nodeid != dlm_our_nodeid())
986                 send_remove(r);
987         else
988                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
989                                      r->res_name, r->res_length);
990 }
991
992 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
993    found since they are in order of newest to oldest? */
994
995 static int shrink_bucket(struct dlm_ls *ls, int b)
996 {
997         struct dlm_rsb *r;
998         int count = 0, found;
999
1000         for (;;) {
1001                 found = 0;
1002                 spin_lock(&ls->ls_rsbtbl[b].lock);
1003                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                             res_hashchain) {
1005                         if (!time_after_eq(jiffies, r->res_toss_time +
1006                                            dlm_config.ci_toss_secs * HZ))
1007                                 continue;
1008                         found = 1;
1009                         break;
1010                 }
1011
1012                 if (!found) {
1013                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                         break;
1015                 }
1016
1017                 if (kref_put(&r->res_ref, kill_rsb)) {
1018                         list_del(&r->res_hashchain);
1019                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                         if (is_master(r))
1022                                 dir_remove(r);
1023                         dlm_free_rsb(r);
1024                         count++;
1025                 } else {
1026                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                         log_error(ls, "tossed rsb in use %s", r->res_name);
1028                 }
1029         }
1030
1031         return count;
1032 }
1033
1034 void dlm_scan_rsbs(struct dlm_ls *ls)
1035 {
1036         int i;
1037
1038         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                 shrink_bucket(ls, i);
1040                 if (dlm_locking_stopped(ls))
1041                         break;
1042                 cond_resched();
1043         }
1044 }
1045
1046 static void add_timeout(struct dlm_lkb *lkb)
1047 {
1048         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050         if (is_master_copy(lkb))
1051                 return;
1052
1053         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                 goto add_it;
1057         }
1058         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                 goto add_it;
1060         return;
1061
1062  add_it:
1063         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064         mutex_lock(&ls->ls_timeout_mutex);
1065         hold_lkb(lkb);
1066         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067         mutex_unlock(&ls->ls_timeout_mutex);
1068 }
1069
1070 static void del_timeout(struct dlm_lkb *lkb)
1071 {
1072         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074         mutex_lock(&ls->ls_timeout_mutex);
1075         if (!list_empty(&lkb->lkb_time_list)) {
1076                 list_del_init(&lkb->lkb_time_list);
1077                 unhold_lkb(lkb);
1078         }
1079         mutex_unlock(&ls->ls_timeout_mutex);
1080 }
1081
1082 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084    and then lock rsb because of lock ordering in add_timeout.  We may need
1085    to specify some special timeout-related bits in the lkb that are just to
1086    be accessed under the timeout_mutex. */
1087
1088 void dlm_scan_timeout(struct dlm_ls *ls)
1089 {
1090         struct dlm_rsb *r;
1091         struct dlm_lkb *lkb;
1092         int do_cancel, do_warn;
1093         s64 wait_us;
1094
1095         for (;;) {
1096                 if (dlm_locking_stopped(ls))
1097                         break;
1098
1099                 do_cancel = 0;
1100                 do_warn = 0;
1101                 mutex_lock(&ls->ls_timeout_mutex);
1102                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
1104                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                         lkb->lkb_timestamp));
1106
1107                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                 do_cancel = 1;
1110
1111                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                 do_warn = 1;
1114
1115                         if (!do_cancel && !do_warn)
1116                                 continue;
1117                         hold_lkb(lkb);
1118                         break;
1119                 }
1120                 mutex_unlock(&ls->ls_timeout_mutex);
1121
1122                 if (!do_cancel && !do_warn)
1123                         break;
1124
1125                 r = lkb->lkb_resource;
1126                 hold_rsb(r);
1127                 lock_rsb(r);
1128
1129                 if (do_warn) {
1130                         /* clear flag so we only warn once */
1131                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                 del_timeout(lkb);
1134                         dlm_timeout_warn(lkb);
1135                 }
1136
1137                 if (do_cancel) {
1138                         log_debug(ls, "timeout cancel %x node %d %s",
1139                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                         del_timeout(lkb);
1143                         _cancel_lock(r, lkb);
1144                 }
1145
1146                 unlock_rsb(r);
1147                 unhold_rsb(r);
1148                 dlm_put_lkb(lkb);
1149         }
1150 }
1151
1152 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153    dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155 void dlm_adjust_timeouts(struct dlm_ls *ls)
1156 {
1157         struct dlm_lkb *lkb;
1158         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160         ls->ls_recover_begin = 0;
1161         mutex_lock(&ls->ls_timeout_mutex);
1162         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164         mutex_unlock(&ls->ls_timeout_mutex);
1165 }
1166
1167 /* lkb is master or local copy */
1168
1169 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 {
1171         int b, len = r->res_ls->ls_lvblen;
1172
1173         /* b=1 lvb returned to caller
1174            b=0 lvb written to rsb or invalidated
1175            b=-1 do nothing */
1176
1177         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179         if (b == 1) {
1180                 if (!lkb->lkb_lvbptr)
1181                         return;
1182
1183                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                         return;
1185
1186                 if (!r->res_lvbptr)
1187                         return;
1188
1189                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                 lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192         } else if (b == 0) {
1193                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                         rsb_set_flag(r, RSB_VALNOTVALID);
1195                         return;
1196                 }
1197
1198                 if (!lkb->lkb_lvbptr)
1199                         return;
1200
1201                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                         return;
1203
1204                 if (!r->res_lvbptr)
1205                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                 if (!r->res_lvbptr)
1208                         return;
1209
1210                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                 r->res_lvbseq++;
1212                 lkb->lkb_lvbseq = r->res_lvbseq;
1213                 rsb_clear_flag(r, RSB_VALNOTVALID);
1214         }
1215
1216         if (rsb_flag(r, RSB_VALNOTVALID))
1217                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218 }
1219
1220 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221 {
1222         if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                 return;
1224
1225         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                 rsb_set_flag(r, RSB_VALNOTVALID);
1227                 return;
1228         }
1229
1230         if (!lkb->lkb_lvbptr)
1231                 return;
1232
1233         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                 return;
1235
1236         if (!r->res_lvbptr)
1237                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239         if (!r->res_lvbptr)
1240                 return;
1241
1242         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243         r->res_lvbseq++;
1244         rsb_clear_flag(r, RSB_VALNOTVALID);
1245 }
1246
1247 /* lkb is process copy (pc) */
1248
1249 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                             struct dlm_message *ms)
1251 {
1252         int b;
1253
1254         if (!lkb->lkb_lvbptr)
1255                 return;
1256
1257         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                 return;
1259
1260         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261         if (b == 1) {
1262                 int len = receive_extralen(ms);
1263                 if (len > DLM_RESNAME_MAXLEN)
1264                         len = DLM_RESNAME_MAXLEN;
1265                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                 lkb->lkb_lvbseq = ms->m_lvbseq;
1267         }
1268 }
1269
1270 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1271    remove_lock -- used for unlock, removes lkb from granted
1272    revert_lock -- used for cancel, moves lkb from convert to granted
1273    grant_lock  -- used for request and convert, adds lkb to granted or
1274                   moves lkb from convert or waiting to granted
1275
1276    Each of these is used for master or local copy lkb's.  There is
1277    also a _pc() variation used to make the corresponding change on
1278    a process copy (pc) lkb. */
1279
1280 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281 {
1282         del_lkb(r, lkb);
1283         lkb->lkb_grmode = DLM_LOCK_IV;
1284         /* this unhold undoes the original ref from create_lkb()
1285            so this leads to the lkb being freed */
1286         unhold_lkb(lkb);
1287 }
1288
1289 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1290 {
1291         set_lvb_unlock(r, lkb);
1292         _remove_lock(r, lkb);
1293 }
1294
1295 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1296 {
1297         _remove_lock(r, lkb);
1298 }
1299
1300 /* returns: 0 did nothing
1301             1 moved lock to granted
1302            -1 removed lock */
1303
1304 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306         int rv = 0;
1307
1308         lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310         switch (lkb->lkb_status) {
1311         case DLM_LKSTS_GRANTED:
1312                 break;
1313         case DLM_LKSTS_CONVERT:
1314                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                 rv = 1;
1316                 break;
1317         case DLM_LKSTS_WAITING:
1318                 del_lkb(r, lkb);
1319                 lkb->lkb_grmode = DLM_LOCK_IV;
1320                 /* this unhold undoes the original ref from create_lkb()
1321                    so this leads to the lkb being freed */
1322                 unhold_lkb(lkb);
1323                 rv = -1;
1324                 break;
1325         default:
1326                 log_print("invalid status for revert %d", lkb->lkb_status);
1327         }
1328         return rv;
1329 }
1330
1331 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1332 {
1333         return revert_lock(r, lkb);
1334 }
1335
1336 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337 {
1338         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                 lkb->lkb_grmode = lkb->lkb_rqmode;
1340                 if (lkb->lkb_status)
1341                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                 else
1343                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344         }
1345
1346         lkb->lkb_rqmode = DLM_LOCK_IV;
1347 }
1348
1349 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         set_lvb_lock(r, lkb);
1352         _grant_lock(r, lkb);
1353         lkb->lkb_highbast = 0;
1354 }
1355
1356 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1357                           struct dlm_message *ms)
1358 {
1359         set_lvb_lock_pc(r, lkb, ms);
1360         _grant_lock(r, lkb);
1361 }
1362
1363 /* called by grant_pending_locks() which means an async grant message must
1364    be sent to the requesting node in addition to granting the lock if the
1365    lkb belongs to a remote node. */
1366
1367 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1368 {
1369         grant_lock(r, lkb);
1370         if (is_master_copy(lkb))
1371                 send_grant(r, lkb);
1372         else
1373                 queue_cast(r, lkb, 0);
1374 }
1375
1376 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377    change the granted/requested modes.  We're munging things accordingly in
1378    the process copy.
1379    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380    conversion deadlock
1381    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382    compatible with other granted locks */
1383
1384 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385 {
1386         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                 log_print("munge_demoted %x invalid reply type %d",
1388                           lkb->lkb_id, ms->m_type);
1389                 return;
1390         }
1391
1392         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                 return;
1396         }
1397
1398         lkb->lkb_grmode = DLM_LOCK_NL;
1399 }
1400
1401 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402 {
1403         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404             ms->m_type != DLM_MSG_GRANT) {
1405                 log_print("munge_altmode %x invalid reply type %d",
1406                           lkb->lkb_id, ms->m_type);
1407                 return;
1408         }
1409
1410         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                 lkb->lkb_rqmode = DLM_LOCK_PR;
1412         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                 lkb->lkb_rqmode = DLM_LOCK_CW;
1414         else {
1415                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                 dlm_print_lkb(lkb);
1417         }
1418 }
1419
1420 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421 {
1422         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                            lkb_statequeue);
1424         if (lkb->lkb_id == first->lkb_id)
1425                 return 1;
1426
1427         return 0;
1428 }
1429
1430 /* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433 {
1434         struct dlm_lkb *this;
1435
1436         list_for_each_entry(this, head, lkb_statequeue) {
1437                 if (this == lkb)
1438                         continue;
1439                 if (!modes_compat(this, lkb))
1440                         return 1;
1441         }
1442         return 0;
1443 }
1444
1445 /*
1446  * "A conversion deadlock arises with a pair of lock requests in the converting
1447  * queue for one resource.  The granted mode of each lock blocks the requested
1448  * mode of the other lock."
1449  *
1450  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451  * convert queue from being granted, then deadlk/demote lkb.
1452  *
1453  * Example:
1454  * Granted Queue: empty
1455  * Convert Queue: NL->EX (first lock)
1456  *                PR->EX (second lock)
1457  *
1458  * The first lock can't be granted because of the granted mode of the second
1459  * lock and the second lock can't be granted because it's not first in the
1460  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462  * flag set and return DEMOTED in the lksb flags.
1463  *
1464  * Originally, this function detected conv-deadlk in a more limited scope:
1465  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466  * - if lkb1 was the first entry in the queue (not just earlier), and was
1467  *   blocked by the granted mode of lkb2, and there was nothing on the
1468  *   granted queue preventing lkb1 from being granted immediately, i.e.
1469  *   lkb2 was the only thing preventing lkb1 from being granted.
1470  *
1471  * That second condition meant we'd only say there was conv-deadlk if
1472  * resolving it (by demotion) would lead to the first lock on the convert
1473  * queue being granted right away.  It allowed conversion deadlocks to exist
1474  * between locks on the convert queue while they couldn't be granted anyway.
1475  *
1476  * Now, we detect and take action on conversion deadlocks immediately when
1477  * they're created, even if they may not be immediately consequential.  If
1478  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479  * mode that would prevent lkb1's conversion from being granted, we do a
1480  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481  * I think this means that the lkb_is_ahead condition below should always
1482  * be zero, i.e. there will never be conv-deadlk between two locks that are
1483  * both already on the convert queue.
1484  */
1485
1486 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487 {
1488         struct dlm_lkb *lkb1;
1489         int lkb_is_ahead = 0;
1490
1491         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                 if (lkb1 == lkb2) {
1493                         lkb_is_ahead = 1;
1494                         continue;
1495                 }
1496
1497                 if (!lkb_is_ahead) {
1498                         if (!modes_compat(lkb2, lkb1))
1499                                 return 1;
1500                 } else {
1501                         if (!modes_compat(lkb2, lkb1) &&
1502                             !modes_compat(lkb1, lkb2))
1503                                 return 1;
1504                 }
1505         }
1506         return 0;
1507 }
1508
1509 /*
1510  * Return 1 if the lock can be granted, 0 otherwise.
1511  * Also detect and resolve conversion deadlocks.
1512  *
1513  * lkb is the lock to be granted
1514  *
1515  * now is 1 if the function is being called in the context of the
1516  * immediate request, it is 0 if called later, after the lock has been
1517  * queued.
1518  *
1519  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520  */
1521
1522 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523 {
1524         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526         /*
1527          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528          * a new request for a NL mode lock being blocked.
1529          *
1530          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531          * request, then it would be granted.  In essence, the use of this flag
1532          * tells the Lock Manager to expedite theis request by not considering
1533          * what may be in the CONVERTING or WAITING queues...  As of this
1534          * writing, the EXPEDITE flag can be used only with new requests for NL
1535          * mode locks.  This flag is not valid for conversion requests.
1536          *
1537          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538          * conversion or used with a non-NL requested mode.  We also know an
1539          * EXPEDITE request is always granted immediately, so now must always
1540          * be 1.  The full condition to grant an expedite request: (now &&
1541          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542          * therefore be shortened to just checking the flag.
1543          */
1544
1545         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                 return 1;
1547
1548         /*
1549          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550          * added to the remaining conditions.
1551          */
1552
1553         if (queue_conflict(&r->res_grantqueue, lkb))
1554                 goto out;
1555
1556         /*
1557          * 6-3: By default, a conversion request is immediately granted if the
1558          * requested mode is compatible with the modes of all other granted
1559          * locks
1560          */
1561
1562         if (queue_conflict(&r->res_convertqueue, lkb))
1563                 goto out;
1564
1565         /*
1566          * 6-5: But the default algorithm for deciding whether to grant or
1567          * queue conversion requests does not by itself guarantee that such
1568          * requests are serviced on a "first come first serve" basis.  This, in
1569          * turn, can lead to a phenomenon known as "indefinate postponement".
1570          *
1571          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572          * the system service employed to request a lock conversion.  This flag
1573          * forces certain conversion requests to be queued, even if they are
1574          * compatible with the granted modes of other locks on the same
1575          * resource.  Thus, the use of this flag results in conversion requests
1576          * being ordered on a "first come first servce" basis.
1577          *
1578          * DCT: This condition is all about new conversions being able to occur
1579          * "in place" while the lock remains on the granted queue (assuming
1580          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581          * doesn't _have_ to go onto the convert queue where it's processed in
1582          * order.  The "now" variable is necessary to distinguish converts
1583          * being received and processed for the first time now, because once a
1584          * convert is moved to the conversion queue the condition below applies
1585          * requiring fifo granting.
1586          */
1587
1588         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                 return 1;
1590
1591         /*
1592          * The NOORDER flag is set to avoid the standard vms rules on grant
1593          * order.
1594          */
1595
1596         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                 return 1;
1598
1599         /*
1600          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601          * granted until all other conversion requests ahead of it are granted
1602          * and/or canceled.
1603          */
1604
1605         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                 return 1;
1607
1608         /*
1609          * 6-4: By default, a new request is immediately granted only if all
1610          * three of the following conditions are satisfied when the request is
1611          * issued:
1612          * - The queue of ungranted conversion requests for the resource is
1613          *   empty.
1614          * - The queue of ungranted new requests for the resource is empty.
1615          * - The mode of the new request is compatible with the most
1616          *   restrictive mode of all granted locks on the resource.
1617          */
1618
1619         if (now && !conv && list_empty(&r->res_convertqueue) &&
1620             list_empty(&r->res_waitqueue))
1621                 return 1;
1622
1623         /*
1624          * 6-4: Once a lock request is in the queue of ungranted new requests,
1625          * it cannot be granted until the queue of ungranted conversion
1626          * requests is empty, all ungranted new requests ahead of it are
1627          * granted and/or canceled, and it is compatible with the granted mode
1628          * of the most restrictive lock granted on the resource.
1629          */
1630
1631         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632             first_in_list(lkb, &r->res_waitqueue))
1633                 return 1;
1634  out:
1635         return 0;
1636 }
1637
1638 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                           int *err)
1640 {
1641         int rv;
1642         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645         if (err)
1646                 *err = 0;
1647
1648         rv = _can_be_granted(r, lkb, now);
1649         if (rv)
1650                 goto out;
1651
1652         /*
1653          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655          * cancels one of the locks.
1656          */
1657
1658         if (is_convert && can_be_queued(lkb) &&
1659             conversion_deadlock_detect(r, lkb)) {
1660                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                         lkb->lkb_grmode = DLM_LOCK_NL;
1662                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                         if (err)
1665                                 *err = -EDEADLK;
1666                         else {
1667                                 log_print("can_be_granted deadlock %x now %d",
1668                                           lkb->lkb_id, now);
1669                                 dlm_dump_rsb(r);
1670                         }
1671                 }
1672                 goto out;
1673         }
1674
1675         /*
1676          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677          * to grant a request in a mode other than the normal rqmode.  It's a
1678          * simple way to provide a big optimization to applications that can
1679          * use them.
1680          */
1681
1682         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                 alt = DLM_LOCK_PR;
1684         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                 alt = DLM_LOCK_CW;
1686
1687         if (alt) {
1688                 lkb->lkb_rqmode = alt;
1689                 rv = _can_be_granted(r, lkb, now);
1690                 if (rv)
1691                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                 else
1693                         lkb->lkb_rqmode = rqmode;
1694         }
1695  out:
1696         return rv;
1697 }
1698
1699 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700    for locks pending on the convert list.  Once verified (watch for these
1701    log_prints), we should be able to just call _can_be_granted() and not
1702    bother with the demote/deadlk cases here (and there's no easy way to deal
1703    with a deadlk here, we'd have to generate something like grant_lock with
1704    the deadlk error.) */
1705
1706 /* Returns the highest requested mode of all blocked conversions; sets
1707    cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710 {
1711         struct dlm_lkb *lkb, *s;
1712         int hi, demoted, quit, grant_restart, demote_restart;
1713         int deadlk;
1714
1715         quit = 0;
1716  restart:
1717         grant_restart = 0;
1718         demote_restart = 0;
1719         hi = DLM_LOCK_IV;
1720
1721         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                 demoted = is_demoted(lkb);
1723                 deadlk = 0;
1724
1725                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                         grant_lock_pending(r, lkb);
1727                         grant_restart = 1;
1728                         continue;
1729                 }
1730
1731                 if (!demoted && is_demoted(lkb)) {
1732                         log_print("WARN: pending demoted %x node %d %s",
1733                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                         demote_restart = 1;
1735                         continue;
1736                 }
1737
1738                 if (deadlk) {
1739                         log_print("WARN: pending deadlock %x node %d %s",
1740                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                         dlm_dump_rsb(r);
1742                         continue;
1743                 }
1744
1745                 hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                         *cw = 1;
1749         }
1750
1751         if (grant_restart)
1752                 goto restart;
1753         if (demote_restart && !quit) {
1754                 quit = 1;
1755                 goto restart;
1756         }
1757
1758         return max_t(int, high, hi);
1759 }
1760
1761 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762 {
1763         struct dlm_lkb *lkb, *s;
1764
1765         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                 if (can_be_granted(r, lkb, 0, NULL))
1767                         grant_lock_pending(r, lkb);
1768                 else {
1769                         high = max_t(int, lkb->lkb_rqmode, high);
1770                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                 *cw = 1;
1772                 }
1773         }
1774
1775         return high;
1776 }
1777
1778 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779    on either the convert or waiting queue.
1780    high is the largest rqmode of all locks blocked on the convert or
1781    waiting queue. */
1782
1783 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784 {
1785         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                 if (gr->lkb_highbast < DLM_LOCK_EX)
1787                         return 1;
1788                 return 0;
1789         }
1790
1791         if (gr->lkb_highbast < high &&
1792             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                 return 1;
1794         return 0;
1795 }
1796
1797 static void grant_pending_locks(struct dlm_rsb *r)
1798 {
1799         struct dlm_lkb *lkb, *s;
1800         int high = DLM_LOCK_IV;
1801         int cw = 0;
1802
1803         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805         high = grant_pending_convert(r, high, &cw);
1806         high = grant_pending_wait(r, high, &cw);
1807
1808         if (high == DLM_LOCK_IV)
1809                 return;
1810
1811         /*
1812          * If there are locks left on the wait/convert queue then send blocking
1813          * ASTs to granted locks based on the largest requested mode (high)
1814          * found above.
1815          */
1816
1817         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                         if (cw && high == DLM_LOCK_PR &&
1820                             lkb->lkb_grmode == DLM_LOCK_PR)
1821                                 queue_bast(r, lkb, DLM_LOCK_CW);
1822                         else
1823                                 queue_bast(r, lkb, high);
1824                         lkb->lkb_highbast = high;
1825                 }
1826         }
1827 }
1828
1829 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830 {
1831         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                 if (gr->lkb_highbast < DLM_LOCK_EX)
1834                         return 1;
1835                 return 0;
1836         }
1837
1838         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                 return 1;
1840         return 0;
1841 }
1842
1843 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                             struct dlm_lkb *lkb)
1845 {
1846         struct dlm_lkb *gr;
1847
1848         list_for_each_entry(gr, head, lkb_statequeue) {
1849                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850                         queue_bast(r, gr, lkb->lkb_rqmode);
1851                         gr->lkb_highbast = lkb->lkb_rqmode;
1852                 }
1853         }
1854 }
1855
1856 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1857 {
1858         send_bast_queue(r, &r->res_grantqueue, lkb);
1859 }
1860
1861 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1862 {
1863         send_bast_queue(r, &r->res_grantqueue, lkb);
1864         send_bast_queue(r, &r->res_convertqueue, lkb);
1865 }
1866
1867 /* set_master(r, lkb) -- set the master nodeid of a resource
1868
1869    The purpose of this function is to set the nodeid field in the given
1870    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871    known, it can just be copied to the lkb and the function will return
1872    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873    before it can be copied to the lkb.
1874
1875    When the rsb nodeid is being looked up remotely, the initial lkb
1876    causing the lookup is kept on the ls_waiters list waiting for the
1877    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878    on the rsb's res_lookup list until the master is verified.
1879
1880    Return values:
1881    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1882    1: the rsb master is not available and the lkb has been placed on
1883       a wait queue
1884 */
1885
1886 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887 {
1888         struct dlm_ls *ls = r->res_ls;
1889         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1890
1891         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1892                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1893                 r->res_first_lkid = lkb->lkb_id;
1894                 lkb->lkb_nodeid = r->res_nodeid;
1895                 return 0;
1896         }
1897
1898         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1899                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1900                 return 1;
1901         }
1902
1903         if (r->res_nodeid == 0) {
1904                 lkb->lkb_nodeid = 0;
1905                 return 0;
1906         }
1907
1908         if (r->res_nodeid > 0) {
1909                 lkb->lkb_nodeid = r->res_nodeid;
1910                 return 0;
1911         }
1912
1913         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1914
1915         dir_nodeid = dlm_dir_nodeid(r);
1916
1917         if (dir_nodeid != our_nodeid) {
1918                 r->res_first_lkid = lkb->lkb_id;
1919                 send_lookup(r, lkb);
1920                 return 1;
1921         }
1922
1923         for (i = 0; i < 2; i++) {
1924                 /* It's possible for dlm_scand to remove an old rsb for
1925                    this same resource from the toss list, us to create
1926                    a new one, look up the master locally, and find it
1927                    already exists just before dlm_scand does the
1928                    dir_remove() on the previous rsb. */
1929
1930                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1931                                        r->res_length, &ret_nodeid);
1932                 if (!error)
1933                         break;
1934                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1935                 schedule();
1936         }
1937         if (error && error != -EEXIST)
1938                 return error;
1939
1940         if (ret_nodeid == our_nodeid) {
1941                 r->res_first_lkid = 0;
1942                 r->res_nodeid = 0;
1943                 lkb->lkb_nodeid = 0;
1944         } else {
1945                 r->res_first_lkid = lkb->lkb_id;
1946                 r->res_nodeid = ret_nodeid;
1947                 lkb->lkb_nodeid = ret_nodeid;
1948         }
1949         return 0;
1950 }
1951
1952 static void process_lookup_list(struct dlm_rsb *r)
1953 {
1954         struct dlm_lkb *lkb, *safe;
1955
1956         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957                 list_del_init(&lkb->lkb_rsb_lookup);
1958                 _request_lock(r, lkb);
1959                 schedule();
1960         }
1961 }
1962
1963 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964
1965 static void confirm_master(struct dlm_rsb *r, int error)
1966 {
1967         struct dlm_lkb *lkb;
1968
1969         if (!r->res_first_lkid)
1970                 return;
1971
1972         switch (error) {
1973         case 0:
1974         case -EINPROGRESS:
1975                 r->res_first_lkid = 0;
1976                 process_lookup_list(r);
1977                 break;
1978
1979         case -EAGAIN:
1980         case -EBADR:
1981         case -ENOTBLK:
1982                 /* the remote request failed and won't be retried (it was
1983                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1984                    lkb the first_lkid */
1985
1986                 r->res_first_lkid = 0;
1987
1988                 if (!list_empty(&r->res_lookup)) {
1989                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1990                                          lkb_rsb_lookup);
1991                         list_del_init(&lkb->lkb_rsb_lookup);
1992                         r->res_first_lkid = lkb->lkb_id;
1993                         _request_lock(r, lkb);
1994                 }
1995                 break;
1996
1997         default:
1998                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1999         }
2000 }
2001
2002 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003                          int namelen, unsigned long timeout_cs,
2004                          void (*ast) (void *astparam),
2005                          void *astparam,
2006                          void (*bast) (void *astparam, int mode),
2007                          struct dlm_args *args)
2008 {
2009         int rv = -EINVAL;
2010
2011         /* check for invalid arg usage */
2012
2013         if (mode < 0 || mode > DLM_LOCK_EX)
2014                 goto out;
2015
2016         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017                 goto out;
2018
2019         if (flags & DLM_LKF_CANCEL)
2020                 goto out;
2021
2022         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023                 goto out;
2024
2025         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026                 goto out;
2027
2028         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029                 goto out;
2030
2031         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032                 goto out;
2033
2034         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035                 goto out;
2036
2037         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038                 goto out;
2039
2040         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041                 goto out;
2042
2043         if (!ast || !lksb)
2044                 goto out;
2045
2046         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047                 goto out;
2048
2049         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050                 goto out;
2051
2052         /* these args will be copied to the lkb in validate_lock_args,
2053            it cannot be done now because when converting locks, fields in
2054            an active lkb cannot be modified before locking the rsb */
2055
2056         args->flags = flags;
2057         args->astfn = ast;
2058         args->astparam = astparam;
2059         args->bastfn = bast;
2060         args->timeout = timeout_cs;
2061         args->mode = mode;
2062         args->lksb = lksb;
2063         rv = 0;
2064  out:
2065         return rv;
2066 }
2067
2068 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069 {
2070         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071                       DLM_LKF_FORCEUNLOCK))
2072                 return -EINVAL;
2073
2074         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075                 return -EINVAL;
2076
2077         args->flags = flags;
2078         args->astparam = astarg;
2079         return 0;
2080 }
2081
2082 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2083                               struct dlm_args *args)
2084 {
2085         int rv = -EINVAL;
2086
2087         if (args->flags & DLM_LKF_CONVERT) {
2088                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2089                         goto out;
2090
2091                 if (args->flags & DLM_LKF_QUECVT &&
2092                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2093                         goto out;
2094
2095                 rv = -EBUSY;
2096                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2097                         goto out;
2098
2099                 if (lkb->lkb_wait_type)
2100                         goto out;
2101
2102                 if (is_overlap(lkb))
2103                         goto out;
2104         }
2105
2106         lkb->lkb_exflags = args->flags;
2107         lkb->lkb_sbflags = 0;
2108         lkb->lkb_astfn = args->astfn;
2109         lkb->lkb_astparam = args->astparam;
2110         lkb->lkb_bastfn = args->bastfn;
2111         lkb->lkb_rqmode = args->mode;
2112         lkb->lkb_lksb = args->lksb;
2113         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2114         lkb->lkb_ownpid = (int) current->pid;
2115         lkb->lkb_timeout_cs = args->timeout;
2116         rv = 0;
2117  out:
2118         if (rv)
2119                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2120                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2121                           lkb->lkb_status, lkb->lkb_wait_type,
2122                           lkb->lkb_resource->res_name);
2123         return rv;
2124 }
2125
2126 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127    for success */
2128
2129 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130    because there may be a lookup in progress and it's valid to do
2131    cancel/unlockf on it */
2132
2133 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2134 {
2135         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2136         int rv = -EINVAL;
2137
2138         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2139                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2140                 dlm_print_lkb(lkb);
2141                 goto out;
2142         }
2143
2144         /* an lkb may still exist even though the lock is EOL'ed due to a
2145            cancel, unlock or failed noqueue request; an app can't use these
2146            locks; return same error as if the lkid had not been found at all */
2147
2148         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2149                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2150                 rv = -ENOENT;
2151                 goto out;
2152         }
2153
2154         /* an lkb may be waiting for an rsb lookup to complete where the
2155            lookup was initiated by another lock */
2156
2157         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2158                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2159                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2160                         list_del_init(&lkb->lkb_rsb_lookup);
2161                         queue_cast(lkb->lkb_resource, lkb,
2162                                    args->flags & DLM_LKF_CANCEL ?
2163                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2164                         unhold_lkb(lkb); /* undoes create_lkb() */
2165                 }
2166                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2167                 rv = -EBUSY;
2168                 goto out;
2169         }
2170
2171         /* cancel not allowed with another cancel/unlock in progress */
2172
2173         if (args->flags & DLM_LKF_CANCEL) {
2174                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2175                         goto out;
2176
2177                 if (is_overlap(lkb))
2178                         goto out;
2179
2180                 /* don't let scand try to do a cancel */
2181                 del_timeout(lkb);
2182
2183                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2184                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2185                         rv = -EBUSY;
2186                         goto out;
2187                 }
2188
2189                 /* there's nothing to cancel */
2190                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2191                     !lkb->lkb_wait_type) {
2192                         rv = -EBUSY;
2193                         goto out;
2194                 }
2195
2196                 switch (lkb->lkb_wait_type) {
2197                 case DLM_MSG_LOOKUP:
2198                 case DLM_MSG_REQUEST:
2199                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2200                         rv = -EBUSY;
2201                         goto out;
2202                 case DLM_MSG_UNLOCK:
2203                 case DLM_MSG_CANCEL:
2204                         goto out;
2205                 }
2206                 /* add_to_waiters() will set OVERLAP_CANCEL */
2207                 goto out_ok;
2208         }
2209
2210         /* do we need to allow a force-unlock if there's a normal unlock
2211            already in progress?  in what conditions could the normal unlock
2212            fail such that we'd want to send a force-unlock to be sure? */
2213
2214         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2215                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2216                         goto out;
2217
2218                 if (is_overlap_unlock(lkb))
2219                         goto out;
2220
2221                 /* don't let scand try to do a cancel */
2222                 del_timeout(lkb);
2223
2224                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2225                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2226                         rv = -EBUSY;
2227                         goto out;
2228                 }
2229
2230                 switch (lkb->lkb_wait_type) {
2231                 case DLM_MSG_LOOKUP:
2232                 case DLM_MSG_REQUEST:
2233                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2234                         rv = -EBUSY;
2235                         goto out;
2236                 case DLM_MSG_UNLOCK:
2237                         goto out;
2238                 }
2239                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2240                 goto out_ok;
2241         }
2242
2243         /* normal unlock not allowed if there's any op in progress */
2244         rv = -EBUSY;
2245         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2246                 goto out;
2247
2248  out_ok:
2249         /* an overlapping op shouldn't blow away exflags from other op */
2250         lkb->lkb_exflags |= args->flags;
2251         lkb->lkb_sbflags = 0;
2252         lkb->lkb_astparam = args->astparam;
2253         rv = 0;
2254  out:
2255         if (rv)
2256                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2257                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2258                           args->flags, lkb->lkb_wait_type,
2259                           lkb->lkb_resource->res_name);
2260         return rv;
2261 }
2262
2263 /*
2264  * Four stage 4 varieties:
2265  * do_request(), do_convert(), do_unlock(), do_cancel()
2266  * These are called on the master node for the given lock and
2267  * from the central locking logic.
2268  */
2269
2270 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271 {
2272         int error = 0;
2273
2274         if (can_be_granted(r, lkb, 1, NULL)) {
2275                 grant_lock(r, lkb);
2276                 queue_cast(r, lkb, 0);
2277                 goto out;
2278         }
2279
2280         if (can_be_queued(lkb)) {
2281                 error = -EINPROGRESS;
2282                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2283                 send_blocking_asts(r, lkb);
2284                 add_timeout(lkb);
2285                 goto out;
2286         }
2287
2288         error = -EAGAIN;
2289         if (force_blocking_asts(lkb))
2290                 send_blocking_asts_all(r, lkb);
2291         queue_cast(r, lkb, -EAGAIN);
2292
2293  out:
2294         return error;
2295 }
2296
2297 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2298 {
2299         int error = 0;
2300         int deadlk = 0;
2301
2302         /* changing an existing lock may allow others to be granted */
2303
2304         if (can_be_granted(r, lkb, 1, &deadlk)) {
2305                 grant_lock(r, lkb);
2306                 queue_cast(r, lkb, 0);
2307                 grant_pending_locks(r);
2308                 goto out;
2309         }
2310
2311         /* can_be_granted() detected that this lock would block in a conversion
2312            deadlock, so we leave it on the granted queue and return EDEADLK in
2313            the ast for the convert. */
2314
2315         if (deadlk) {
2316                 /* it's left on the granted queue */
2317                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2318                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2319                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2320                 revert_lock(r, lkb);
2321                 queue_cast(r, lkb, -EDEADLK);
2322                 error = -EDEADLK;
2323                 goto out;
2324         }
2325
2326         /* is_demoted() means the can_be_granted() above set the grmode
2327            to NL, and left us on the granted queue.  This auto-demotion
2328            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2329            now grantable.  We have to try to grant other converting locks
2330            before we try again to grant this one. */
2331
2332         if (is_demoted(lkb)) {
2333                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2334                 if (_can_be_granted(r, lkb, 1)) {
2335                         grant_lock(r, lkb);
2336                         queue_cast(r, lkb, 0);
2337                         grant_pending_locks(r);
2338                         goto out;
2339                 }
2340                 /* else fall through and move to convert queue */
2341         }
2342
2343         if (can_be_queued(lkb)) {
2344                 error = -EINPROGRESS;
2345                 del_lkb(r, lkb);
2346                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2347                 send_blocking_asts(r, lkb);
2348                 add_timeout(lkb);
2349                 goto out;
2350         }
2351
2352         error = -EAGAIN;
2353         if (force_blocking_asts(lkb))
2354                 send_blocking_asts_all(r, lkb);
2355         queue_cast(r, lkb, -EAGAIN);
2356
2357  out:
2358         return error;
2359 }
2360
2361 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2362 {
2363         remove_lock(r, lkb);
2364         queue_cast(r, lkb, -DLM_EUNLOCK);
2365         grant_pending_locks(r);
2366         return -DLM_EUNLOCK;
2367 }
2368
2369 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2370  
2371 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2372 {
2373         int error;
2374
2375         error = revert_lock(r, lkb);
2376         if (error) {
2377                 queue_cast(r, lkb, -DLM_ECANCEL);
2378                 grant_pending_locks(r);
2379                 return -DLM_ECANCEL;
2380         }
2381         return 0;
2382 }
2383
2384 /*
2385  * Four stage 3 varieties:
2386  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2387  */
2388
2389 /* add a new lkb to a possibly new rsb, called by requesting process */
2390
2391 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2392 {
2393         int error;
2394
2395         /* set_master: sets lkb nodeid from r */
2396
2397         error = set_master(r, lkb);
2398         if (error < 0)
2399                 goto out;
2400         if (error) {
2401                 error = 0;
2402                 goto out;
2403         }
2404
2405         if (is_remote(r))
2406                 /* receive_request() calls do_request() on remote node */
2407                 error = send_request(r, lkb);
2408         else
2409                 error = do_request(r, lkb);
2410  out:
2411         return error;
2412 }
2413
2414 /* change some property of an existing lkb, e.g. mode */
2415
2416 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2417 {
2418         int error;
2419
2420         if (is_remote(r))
2421                 /* receive_convert() calls do_convert() on remote node */
2422                 error = send_convert(r, lkb);
2423         else
2424                 error = do_convert(r, lkb);
2425
2426         return error;
2427 }
2428
2429 /* remove an existing lkb from the granted queue */
2430
2431 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2432 {
2433         int error;
2434
2435         if (is_remote(r))
2436                 /* receive_unlock() calls do_unlock() on remote node */
2437                 error = send_unlock(r, lkb);
2438         else
2439                 error = do_unlock(r, lkb);
2440
2441         return error;
2442 }
2443
2444 /* remove an existing lkb from the convert or wait queue */
2445
2446 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2447 {
2448         int error;
2449
2450         if (is_remote(r))
2451                 /* receive_cancel() calls do_cancel() on remote node */
2452                 error = send_cancel(r, lkb);
2453         else
2454                 error = do_cancel(r, lkb);
2455
2456         return error;
2457 }
2458
2459 /*
2460  * Four stage 2 varieties:
2461  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2462  */
2463
2464 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2465                         int len, struct dlm_args *args)
2466 {
2467         struct dlm_rsb *r;
2468         int error;
2469
2470         error = validate_lock_args(ls, lkb, args);
2471         if (error)
2472                 goto out;
2473
2474         error = find_rsb(ls, name, len, R_CREATE, &r);
2475         if (error)
2476                 goto out;
2477
2478         lock_rsb(r);
2479
2480         attach_lkb(r, lkb);
2481         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2482
2483         error = _request_lock(r, lkb);
2484
2485         unlock_rsb(r);
2486         put_rsb(r);
2487
2488  out:
2489         return error;
2490 }
2491
2492 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2493                         struct dlm_args *args)
2494 {
2495         struct dlm_rsb *r;
2496         int error;
2497
2498         r = lkb->lkb_resource;
2499
2500         hold_rsb(r);
2501         lock_rsb(r);
2502
2503         error = validate_lock_args(ls, lkb, args);
2504         if (error)
2505                 goto out;
2506
2507         error = _convert_lock(r, lkb);
2508  out:
2509         unlock_rsb(r);
2510         put_rsb(r);
2511         return error;
2512 }
2513
2514 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2515                        struct dlm_args *args)
2516 {
2517         struct dlm_rsb *r;
2518         int error;
2519
2520         r = lkb->lkb_resource;
2521
2522         hold_rsb(r);
2523         lock_rsb(r);
2524
2525         error = validate_unlock_args(lkb, args);
2526         if (error)
2527                 goto out;
2528
2529         error = _unlock_lock(r, lkb);
2530  out:
2531         unlock_rsb(r);
2532         put_rsb(r);
2533         return error;
2534 }
2535
2536 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2537                        struct dlm_args *args)
2538 {
2539         struct dlm_rsb *r;
2540         int error;
2541
2542         r = lkb->lkb_resource;
2543
2544         hold_rsb(r);
2545         lock_rsb(r);
2546
2547         error = validate_unlock_args(lkb, args);
2548         if (error)
2549                 goto out;
2550
2551         error = _cancel_lock(r, lkb);
2552  out:
2553         unlock_rsb(r);
2554         put_rsb(r);
2555         return error;
2556 }
2557
2558 /*
2559  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2560  */
2561
2562 int dlm_lock(dlm_lockspace_t *lockspace,
2563              int mode,
2564              struct dlm_lksb *lksb,
2565              uint32_t flags,
2566              void *name,
2567              unsigned int namelen,
2568              uint32_t parent_lkid,
2569              void (*ast) (void *astarg),
2570              void *astarg,
2571              void (*bast) (void *astarg, int mode))
2572 {
2573         struct dlm_ls *ls;
2574         struct dlm_lkb *lkb;
2575         struct dlm_args args;
2576         int error, convert = flags & DLM_LKF_CONVERT;
2577
2578         ls = dlm_find_lockspace_local(lockspace);
2579         if (!ls)
2580                 return -EINVAL;
2581
2582         dlm_lock_recovery(ls);
2583
2584         if (convert)
2585                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2586         else
2587                 error = create_lkb(ls, &lkb);
2588
2589         if (error)
2590                 goto out;
2591
2592         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2593                               astarg, bast, &args);
2594         if (error)
2595                 goto out_put;
2596
2597         if (convert)
2598                 error = convert_lock(ls, lkb, &args);
2599         else
2600                 error = request_lock(ls, lkb, name, namelen, &args);
2601
2602         if (error == -EINPROGRESS)
2603                 error = 0;
2604  out_put:
2605         if (convert || error)
2606                 __put_lkb(ls, lkb);
2607         if (error == -EAGAIN || error == -EDEADLK)
2608                 error = 0;
2609  out:
2610         dlm_unlock_recovery(ls);
2611         dlm_put_lockspace(ls);
2612         return error;
2613 }
2614
2615 int dlm_unlock(dlm_lockspace_t *lockspace,
2616                uint32_t lkid,
2617                uint32_t flags,
2618                struct dlm_lksb *lksb,
2619                void *astarg)
2620 {
2621         struct dlm_ls *ls;
2622         struct dlm_lkb *lkb;
2623         struct dlm_args args;
2624         int error;
2625
2626         ls = dlm_find_lockspace_local(lockspace);
2627         if (!ls)
2628                 return -EINVAL;
2629
2630         dlm_lock_recovery(ls);
2631
2632         error = find_lkb(ls, lkid, &lkb);
2633         if (error)
2634                 goto out;
2635
2636         error = set_unlock_args(flags, astarg, &args);
2637         if (error)
2638                 goto out_put;
2639
2640         if (flags & DLM_LKF_CANCEL)
2641                 error = cancel_lock(ls, lkb, &args);
2642         else
2643                 error = unlock_lock(ls, lkb, &args);
2644
2645         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2646                 error = 0;
2647         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2648                 error = 0;
2649  out_put:
2650         dlm_put_lkb(lkb);
2651  out:
2652         dlm_unlock_recovery(ls);
2653         dlm_put_lockspace(ls);
2654         return error;
2655 }
2656
2657 /*
2658  * send/receive routines for remote operations and replies
2659  *
2660  * send_args
2661  * send_common
2662  * send_request                 receive_request
2663  * send_convert                 receive_convert
2664  * send_unlock                  receive_unlock
2665  * send_cancel                  receive_cancel
2666  * send_grant                   receive_grant
2667  * send_bast                    receive_bast
2668  * send_lookup                  receive_lookup
2669  * send_remove                  receive_remove
2670  *
2671  *                              send_common_reply
2672  * receive_request_reply        send_request_reply
2673  * receive_convert_reply        send_convert_reply
2674  * receive_unlock_reply         send_unlock_reply
2675  * receive_cancel_reply         send_cancel_reply
2676  * receive_lookup_reply         send_lookup_reply
2677  */
2678
2679 static int _create_message(struct dlm_ls *ls, int mb_len,
2680                            int to_nodeid, int mstype,
2681                            struct dlm_message **ms_ret,
2682                            struct dlm_mhandle **mh_ret)
2683 {
2684         struct dlm_message *ms;
2685         struct dlm_mhandle *mh;
2686         char *mb;
2687
2688         /* get_buffer gives us a message handle (mh) that we need to
2689            pass into lowcomms_commit and a message buffer (mb) that we
2690            write our data into */
2691
2692         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2693         if (!mh)
2694                 return -ENOBUFS;
2695
2696         memset(mb, 0, mb_len);
2697
2698         ms = (struct dlm_message *) mb;
2699
2700         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2701         ms->m_header.h_lockspace = ls->ls_global_id;
2702         ms->m_header.h_nodeid = dlm_our_nodeid();
2703         ms->m_header.h_length = mb_len;
2704         ms->m_header.h_cmd = DLM_MSG;
2705
2706         ms->m_type = mstype;
2707
2708         *mh_ret = mh;
2709         *ms_ret = ms;
2710         return 0;
2711 }
2712
2713 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2714                           int to_nodeid, int mstype,
2715                           struct dlm_message **ms_ret,
2716                           struct dlm_mhandle **mh_ret)
2717 {
2718         int mb_len = sizeof(struct dlm_message);
2719
2720         switch (mstype) {
2721         case DLM_MSG_REQUEST:
2722         case DLM_MSG_LOOKUP:
2723         case DLM_MSG_REMOVE:
2724                 mb_len += r->res_length;
2725                 break;
2726         case DLM_MSG_CONVERT:
2727         case DLM_MSG_UNLOCK:
2728         case DLM_MSG_REQUEST_REPLY:
2729         case DLM_MSG_CONVERT_REPLY:
2730         case DLM_MSG_GRANT:
2731                 if (lkb && lkb->lkb_lvbptr)
2732                         mb_len += r->res_ls->ls_lvblen;
2733                 break;
2734         }
2735
2736         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2737                                ms_ret, mh_ret);
2738 }
2739
2740 /* further lowcomms enhancements or alternate implementations may make
2741    the return value from this function useful at some point */
2742
2743 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2744 {
2745         dlm_message_out(ms);
2746         dlm_lowcomms_commit_buffer(mh);
2747         return 0;
2748 }
2749
2750 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2751                       struct dlm_message *ms)
2752 {
2753         ms->m_nodeid   = lkb->lkb_nodeid;
2754         ms->m_pid      = lkb->lkb_ownpid;
2755         ms->m_lkid     = lkb->lkb_id;
2756         ms->m_remid    = lkb->lkb_remid;
2757         ms->m_exflags  = lkb->lkb_exflags;
2758         ms->m_sbflags  = lkb->lkb_sbflags;
2759         ms->m_flags    = lkb->lkb_flags;
2760         ms->m_lvbseq   = lkb->lkb_lvbseq;
2761         ms->m_status   = lkb->lkb_status;
2762         ms->m_grmode   = lkb->lkb_grmode;
2763         ms->m_rqmode   = lkb->lkb_rqmode;
2764         ms->m_hash     = r->res_hash;
2765
2766         /* m_result and m_bastmode are set from function args,
2767            not from lkb fields */
2768
2769         if (lkb->lkb_bastfn)
2770                 ms->m_asts |= AST_BAST;
2771         if (lkb->lkb_astfn)
2772                 ms->m_asts |= AST_COMP;
2773
2774         /* compare with switch in create_message; send_remove() doesn't
2775            use send_args() */
2776
2777         switch (ms->m_type) {
2778         case DLM_MSG_REQUEST:
2779         case DLM_MSG_LOOKUP:
2780                 memcpy(ms->m_extra, r->res_name, r->res_length);
2781                 break;
2782         case DLM_MSG_CONVERT:
2783         case DLM_MSG_UNLOCK:
2784         case DLM_MSG_REQUEST_REPLY:
2785         case DLM_MSG_CONVERT_REPLY:
2786         case DLM_MSG_GRANT:
2787                 if (!lkb->lkb_lvbptr)
2788                         break;
2789                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2790                 break;
2791         }
2792 }
2793
2794 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2795 {
2796         struct dlm_message *ms;
2797         struct dlm_mhandle *mh;
2798         int to_nodeid, error;
2799
2800         error = add_to_waiters(lkb, mstype);
2801         if (error)
2802                 return error;
2803
2804         to_nodeid = r->res_nodeid;
2805
2806         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2807         if (error)
2808                 goto fail;
2809
2810         send_args(r, lkb, ms);
2811
2812         error = send_message(mh, ms);
2813         if (error)
2814                 goto fail;
2815         return 0;
2816
2817  fail:
2818         remove_from_waiters(lkb, msg_reply_type(mstype));
2819         return error;
2820 }
2821
2822 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2823 {
2824         return send_common(r, lkb, DLM_MSG_REQUEST);
2825 }
2826
2827 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2828 {
2829         int error;
2830
2831         error = send_common(r, lkb, DLM_MSG_CONVERT);
2832
2833         /* down conversions go without a reply from the master */
2834         if (!error && down_conversion(lkb)) {
2835                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2836                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2837                 r->res_ls->ls_stub_ms.m_result = 0;
2838                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2839                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2840         }
2841
2842         return error;
2843 }
2844
2845 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2846    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2847    that the master is still correct. */
2848
2849 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2850 {
2851         return send_common(r, lkb, DLM_MSG_UNLOCK);
2852 }
2853
2854 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2855 {
2856         return send_common(r, lkb, DLM_MSG_CANCEL);
2857 }
2858
2859 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2860 {
2861         struct dlm_message *ms;
2862         struct dlm_mhandle *mh;
2863         int to_nodeid, error;
2864
2865         to_nodeid = lkb->lkb_nodeid;
2866
2867         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2868         if (error)
2869                 goto out;
2870
2871         send_args(r, lkb, ms);
2872
2873         ms->m_result = 0;
2874
2875         error = send_message(mh, ms);
2876  out:
2877         return error;
2878 }
2879
2880 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2881 {
2882         struct dlm_message *ms;
2883         struct dlm_mhandle *mh;
2884         int to_nodeid, error;
2885
2886         to_nodeid = lkb->lkb_nodeid;
2887
2888         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2889         if (error)
2890                 goto out;
2891
2892         send_args(r, lkb, ms);
2893
2894         ms->m_bastmode = mode;
2895
2896         error = send_message(mh, ms);
2897  out:
2898         return error;
2899 }
2900
2901 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2902 {
2903         struct dlm_message *ms;
2904         struct dlm_mhandle *mh;
2905         int to_nodeid, error;
2906
2907         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2908         if (error)
2909                 return error;
2910
2911         to_nodeid = dlm_dir_nodeid(r);
2912
2913         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2914         if (error)
2915                 goto fail;
2916
2917         send_args(r, lkb, ms);
2918
2919         error = send_message(mh, ms);
2920         if (error)
2921                 goto fail;
2922         return 0;
2923
2924  fail:
2925         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2926         return error;
2927 }
2928
2929 static int send_remove(struct dlm_rsb *r)
2930 {
2931         struct dlm_message *ms;
2932         struct dlm_mhandle *mh;
2933         int to_nodeid, error;
2934
2935         to_nodeid = dlm_dir_nodeid(r);
2936
2937         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2938         if (error)
2939                 goto out;
2940
2941         memcpy(ms->m_extra, r->res_name, r->res_length);
2942         ms->m_hash = r->res_hash;
2943
2944         error = send_message(mh, ms);
2945  out:
2946         return error;
2947 }
2948
2949 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2950                              int mstype, int rv)
2951 {
2952         struct dlm_message *ms;
2953         struct dlm_mhandle *mh;
2954         int to_nodeid, error;
2955
2956         to_nodeid = lkb->lkb_nodeid;
2957
2958         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2959         if (error)
2960                 goto out;
2961
2962         send_args(r, lkb, ms);
2963
2964         ms->m_result = rv;
2965
2966         error = send_message(mh, ms);
2967  out:
2968         return error;
2969 }
2970
2971 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2972 {
2973         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2974 }
2975
2976 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2977 {
2978         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2979 }
2980
2981 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2982 {
2983         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2984 }
2985
2986 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2987 {
2988         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2989 }
2990
2991 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2992                              int ret_nodeid, int rv)
2993 {
2994         struct dlm_rsb *r = &ls->ls_stub_rsb;
2995         struct dlm_message *ms;
2996         struct dlm_mhandle *mh;
2997         int error, nodeid = ms_in->m_header.h_nodeid;
2998
2999         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3000         if (error)
3001                 goto out;
3002
3003         ms->m_lkid = ms_in->m_lkid;
3004         ms->m_result = rv;
3005         ms->m_nodeid = ret_nodeid;
3006
3007         error = send_message(mh, ms);
3008  out:
3009         return error;
3010 }
3011
3012 /* which args we save from a received message depends heavily on the type
3013    of message, unlike the send side where we can safely send everything about
3014    the lkb for any type of message */
3015
3016 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3017 {
3018         lkb->lkb_exflags = ms->m_exflags;
3019         lkb->lkb_sbflags = ms->m_sbflags;
3020         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3021                          (ms->m_flags & 0x0000FFFF);
3022 }
3023
3024 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3025 {
3026         lkb->lkb_sbflags = ms->m_sbflags;
3027         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3028                          (ms->m_flags & 0x0000FFFF);
3029 }
3030
3031 static int receive_extralen(struct dlm_message *ms)
3032 {
3033         return (ms->m_header.h_length - sizeof(struct dlm_message));
3034 }
3035
3036 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3037                        struct dlm_message *ms)
3038 {
3039         int len;
3040
3041         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3042                 if (!lkb->lkb_lvbptr)
3043                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3044                 if (!lkb->lkb_lvbptr)
3045                         return -ENOMEM;
3046                 len = receive_extralen(ms);
3047                 if (len > DLM_RESNAME_MAXLEN)
3048                         len = DLM_RESNAME_MAXLEN;
3049                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3050         }
3051         return 0;
3052 }
3053
3054 static void fake_bastfn(void *astparam, int mode)
3055 {
3056         log_print("fake_bastfn should not be called");
3057 }
3058
3059 static void fake_astfn(void *astparam)
3060 {
3061         log_print("fake_astfn should not be called");
3062 }
3063
3064 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3065                                 struct dlm_message *ms)
3066 {
3067         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3068         lkb->lkb_ownpid = ms->m_pid;
3069         lkb->lkb_remid = ms->m_lkid;
3070         lkb->lkb_grmode = DLM_LOCK_IV;
3071         lkb->lkb_rqmode = ms->m_rqmode;
3072
3073         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3074         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3075
3076         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3077                 /* lkb was just created so there won't be an lvb yet */
3078                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3079                 if (!lkb->lkb_lvbptr)
3080                         return -ENOMEM;
3081         }
3082
3083         return 0;
3084 }
3085
3086 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3087                                 struct dlm_message *ms)
3088 {
3089         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3090                 return -EBUSY;
3091
3092         if (receive_lvb(ls, lkb, ms))
3093                 return -ENOMEM;
3094
3095         lkb->lkb_rqmode = ms->m_rqmode;
3096         lkb->lkb_lvbseq = ms->m_lvbseq;
3097
3098         return 0;
3099 }
3100
3101 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3102                                struct dlm_message *ms)
3103 {
3104         if (receive_lvb(ls, lkb, ms))
3105                 return -ENOMEM;
3106         return 0;
3107 }
3108
3109 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3110    uses to send a reply and that the remote end uses to process the reply. */
3111
3112 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3113 {
3114         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3115         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3116         lkb->lkb_remid = ms->m_lkid;
3117 }
3118
3119 /* This is called after the rsb is locked so that we can safely inspect
3120    fields in the lkb. */
3121
3122 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3123 {
3124         int from = ms->m_header.h_nodeid;
3125         int error = 0;
3126
3127         switch (ms->m_type) {
3128         case DLM_MSG_CONVERT:
3129         case DLM_MSG_UNLOCK:
3130         case DLM_MSG_CANCEL:
3131                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3132                         error = -EINVAL;
3133                 break;
3134
3135         case DLM_MSG_CONVERT_REPLY:
3136         case DLM_MSG_UNLOCK_REPLY:
3137         case DLM_MSG_CANCEL_REPLY:
3138         case DLM_MSG_GRANT:
3139         case DLM_MSG_BAST:
3140                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3141                         error = -EINVAL;
3142                 break;
3143
3144         case DLM_MSG_REQUEST_REPLY:
3145                 if (!is_process_copy(lkb))
3146                         error = -EINVAL;
3147                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3148                         error = -EINVAL;
3149                 break;
3150
3151         default:
3152                 error = -EINVAL;
3153         }
3154
3155         if (error)
3156                 log_error(lkb->lkb_resource->res_ls,
3157                           "ignore invalid message %d from %d %x %x %x %d",
3158                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3159                           lkb->lkb_flags, lkb->lkb_nodeid);
3160         return error;
3161 }
3162
3163 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3164 {
3165         struct dlm_lkb *lkb;
3166         struct dlm_rsb *r;
3167         int error, namelen;
3168
3169         error = create_lkb(ls, &lkb);
3170         if (error)
3171                 goto fail;
3172
3173         receive_flags(lkb, ms);
3174         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3175         error = receive_request_args(ls, lkb, ms);
3176         if (error) {
3177                 __put_lkb(ls, lkb);
3178                 goto fail;
3179         }
3180
3181         namelen = receive_extralen(ms);
3182
3183         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3184         if (error) {
3185                 __put_lkb(ls, lkb);
3186                 goto fail;
3187         }
3188
3189         lock_rsb(r);
3190
3191         attach_lkb(r, lkb);
3192         error = do_request(r, lkb);
3193         send_request_reply(r, lkb, error);
3194
3195         unlock_rsb(r);
3196         put_rsb(r);
3197
3198         if (error == -EINPROGRESS)
3199                 error = 0;
3200         if (error)
3201                 dlm_put_lkb(lkb);
3202         return;
3203
3204  fail:
3205         setup_stub_lkb(ls, ms);
3206         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3207 }
3208
3209 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3210 {
3211         struct dlm_lkb *lkb;
3212         struct dlm_rsb *r;
3213         int error, reply = 1;
3214
3215         error = find_lkb(ls, ms->m_remid, &lkb);
3216         if (error)
3217                 goto fail;
3218
3219         r = lkb->lkb_resource;
3220
3221         hold_rsb(r);
3222         lock_rsb(r);
3223
3224         error = validate_message(lkb, ms);
3225         if (error)
3226                 goto out;
3227
3228         receive_flags(lkb, ms);
3229         error = receive_convert_args(ls, lkb, ms);
3230         if (error)
3231                 goto out_reply;
3232         reply = !down_conversion(lkb);
3233
3234         error = do_convert(r, lkb);
3235  out_reply:
3236         if (reply)
3237                 send_convert_reply(r, lkb, error);
3238  out:
3239         unlock_rsb(r);
3240         put_rsb(r);
3241         dlm_put_lkb(lkb);
3242         return;
3243
3244  fail:
3245         setup_stub_lkb(ls, ms);
3246         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3247 }
3248
3249 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3250 {
3251         struct dlm_lkb *lkb;
3252         struct dlm_rsb *r;
3253         int error;
3254
3255         error = find_lkb(ls, ms->m_remid, &lkb);
3256         if (error)
3257                 goto fail;
3258
3259         r = lkb->lkb_resource;
3260
3261         hold_rsb(r);
3262         lock_rsb(r);
3263
3264         error = validate_message(lkb, ms);
3265         if (error)
3266                 goto out;
3267
3268         receive_flags(lkb, ms);
3269         error = receive_unlock_args(ls, lkb, ms);
3270         if (error)
3271                 goto out_reply;
3272
3273         error = do_unlock(r, lkb);
3274  out_reply:
3275         send_unlock_reply(r, lkb, error);
3276  out:
3277         unlock_rsb(r);
3278         put_rsb(r);
3279         dlm_put_lkb(lkb);
3280         return;
3281
3282  fail:
3283         setup_stub_lkb(ls, ms);
3284         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3285 }
3286
3287 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3288 {
3289         struct dlm_lkb *lkb;
3290         struct dlm_rsb *r;
3291         int error;
3292
3293         error = find_lkb(ls, ms->m_remid, &lkb);
3294         if (error)
3295                 goto fail;
3296
3297         receive_flags(lkb, ms);
3298
3299         r = lkb->lkb_resource;
3300
3301         hold_rsb(r);
3302         lock_rsb(r);
3303
3304         error = validate_message(lkb, ms);
3305         if (error)
3306                 goto out;
3307
3308         error = do_cancel(r, lkb);
3309         send_cancel_reply(r, lkb, error);
3310  out:
3311         unlock_rsb(r);
3312         put_rsb(r);
3313         dlm_put_lkb(lkb);
3314         return;
3315
3316  fail:
3317         setup_stub_lkb(ls, ms);
3318         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3319 }
3320
3321 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3322 {
3323         struct dlm_lkb *lkb;
3324         struct dlm_rsb *r;
3325         int error;
3326
3327         error = find_lkb(ls, ms->m_remid, &lkb);
3328         if (error) {
3329                 log_debug(ls, "receive_grant from %d no lkb %x",
3330                           ms->m_header.h_nodeid, ms->m_remid);
3331                 return;
3332         }
3333
3334         r = lkb->lkb_resource;
3335
3336         hold_rsb(r);
3337         lock_rsb(r);
3338
3339         error = validate_message(lkb, ms);
3340         if (error)
3341                 goto out;
3342
3343         receive_flags_reply(lkb, ms);
3344         if (is_altmode(lkb))
3345                 munge_altmode(lkb, ms);
3346         grant_lock_pc(r, lkb, ms);
3347         queue_cast(r, lkb, 0);
3348  out:
3349         unlock_rsb(r);
3350         put_rsb(r);
3351         dlm_put_lkb(lkb);
3352 }
3353
3354 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3355 {
3356         struct dlm_lkb *lkb;
3357         struct dlm_rsb *r;
3358         int error;
3359
3360         error = find_lkb(ls, ms->m_remid, &lkb);
3361         if (error) {
3362                 log_debug(ls, "receive_bast from %d no lkb %x",
3363                           ms->m_header.h_nodeid, ms->m_remid);
3364                 return;
3365         }
3366
3367         r = lkb->lkb_resource;
3368
3369         hold_rsb(r);
3370         lock_rsb(r);
3371
3372         error = validate_message(lkb, ms);
3373         if (error)
3374                 goto out;
3375
3376         queue_bast(r, lkb, ms->m_bastmode);
3377  out:
3378         unlock_rsb(r);
3379         put_rsb(r);
3380         dlm_put_lkb(lkb);
3381 }
3382
3383 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3384 {
3385         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3386
3387         from_nodeid = ms->m_header.h_nodeid;
3388         our_nodeid = dlm_our_nodeid();
3389
3390         len = receive_extralen(ms);
3391
3392         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3393         if (dir_nodeid != our_nodeid) {
3394                 log_error(ls, "lookup dir_nodeid %d from %d",
3395                           dir_nodeid, from_nodeid);
3396                 error = -EINVAL;
3397                 ret_nodeid = -1;
3398                 goto out;
3399         }
3400
3401         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3402
3403         /* Optimization: we're master so treat lookup as a request */
3404         if (!error && ret_nodeid == our_nodeid) {
3405                 receive_request(ls, ms);
3406                 return;
3407         }
3408  out:
3409         send_lookup_reply(ls, ms, ret_nodeid, error);
3410 }
3411
3412 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3413 {
3414         int len, dir_nodeid, from_nodeid;
3415
3416         from_nodeid = ms->m_header.h_nodeid;
3417
3418         len = receive_extralen(ms);
3419
3420         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3421         if (dir_nodeid != dlm_our_nodeid()) {
3422                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3423                           dir_nodeid, from_nodeid);
3424                 return;
3425         }
3426
3427         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3428 }
3429
3430 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3431 {
3432         do_purge(ls, ms->m_nodeid, ms->m_pid);
3433 }
3434
3435 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3436 {
3437         struct dlm_lkb *lkb;
3438         struct dlm_rsb *r;
3439         int error, mstype, result;
3440
3441         error = find_lkb(ls, ms->m_remid, &lkb);
3442         if (error) {
3443                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3444                           ms->m_header.h_nodeid, ms->m_remid);
3445                 return;
3446         }
3447
3448         r = lkb->lkb_resource;
3449         hold_rsb(r);
3450         lock_rsb(r);
3451
3452         error = validate_message(lkb, ms);
3453         if (error)
3454                 goto out;
3455
3456         mstype = lkb->lkb_wait_type;
3457         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3458         if (error)
3459                 goto out;
3460
3461         /* Optimization: the dir node was also the master, so it took our
3462            lookup as a request and sent request reply instead of lookup reply */
3463         if (mstype == DLM_MSG_LOOKUP) {
3464                 r->res_nodeid = ms->m_header.h_nodeid;
3465                 lkb->lkb_nodeid = r->res_nodeid;
3466         }
3467
3468         /* this is the value returned from do_request() on the master */
3469         result = ms->m_result;
3470
3471         switch (result) {
3472         case -EAGAIN:
3473                 /* request would block (be queued) on remote master */
3474                 queue_cast(r, lkb, -EAGAIN);
3475                 confirm_master(r, -EAGAIN);
3476                 unhold_lkb(lkb); /* undoes create_lkb() */
3477                 break;
3478
3479         case -EINPROGRESS:
3480         case 0:
3481                 /* request was queued or granted on remote master */
3482                 receive_flags_reply(lkb, ms);
3483                 lkb->lkb_remid = ms->m_lkid;
3484                 if (is_altmode(lkb))
3485                         munge_altmode(lkb, ms);
3486                 if (result) {
3487                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3488                         add_timeout(lkb);
3489                 } else {
3490                         grant_lock_pc(r, lkb, ms);
3491                         queue_cast(r, lkb, 0);
3492                 }
3493                 confirm_master(r, result);
3494                 break;
3495
3496         case -EBADR:
3497         case -ENOTBLK:
3498                 /* find_rsb failed to find rsb or rsb wasn't master */
3499                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3500                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3501                 r->res_nodeid = -1;
3502                 lkb->lkb_nodeid = -1;
3503
3504                 if (is_overlap(lkb)) {
3505                         /* we'll ignore error in cancel/unlock reply */
3506                         queue_cast_overlap(r, lkb);
3507                         confirm_master(r, result);
3508                         unhold_lkb(lkb); /* undoes create_lkb() */
3509                 } else
3510                         _request_lock(r, lkb);
3511                 break;
3512
3513         default:
3514                 log_error(ls, "receive_request_reply %x error %d",
3515                           lkb->lkb_id, result);
3516         }
3517
3518         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3519                 log_debug(ls, "receive_request_reply %x result %d unlock",
3520                           lkb->lkb_id, result);
3521                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3522                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3523                 send_unlock(r, lkb);
3524         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3525                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3526                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3527                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3528                 send_cancel(r, lkb);
3529         } else {
3530                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3531                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3532         }
3533  out:
3534         unlock_rsb(r);
3535         put_rsb(r);
3536         dlm_put_lkb(lkb);
3537 }
3538
3539 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3540                                     struct dlm_message *ms)
3541 {
3542         /* this is the value returned from do_convert() on the master */
3543         switch (ms->m_result) {
3544         case -EAGAIN:
3545                 /* convert would block (be queued) on remote master */
3546                 queue_cast(r, lkb, -EAGAIN);
3547                 break;
3548
3549         case -EDEADLK:
3550                 receive_flags_reply(lkb, ms);
3551                 revert_lock_pc(r, lkb);
3552                 queue_cast(r, lkb, -EDEADLK);
3553                 break;
3554
3555         case -EINPROGRESS:
3556                 /* convert was queued on remote master */
3557                 receive_flags_reply(lkb, ms);
3558                 if (is_demoted(lkb))
3559                         munge_demoted(lkb, ms);
3560                 del_lkb(r, lkb);
3561                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3562                 add_timeout(lkb);
3563                 break;
3564
3565         case 0:
3566                 /* convert was granted on remote master */
3567                 receive_flags_reply(lkb, ms);
3568                 if (is_demoted(lkb))
3569                         munge_demoted(lkb, ms);
3570                 grant_lock_pc(r, lkb, ms);
3571                 queue_cast(r, lkb, 0);
3572                 break;
3573
3574         default:
3575                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3576                           lkb->lkb_id, ms->m_result);
3577         }
3578 }
3579
3580 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3581 {
3582         struct dlm_rsb *r = lkb->lkb_resource;
3583         int error;
3584
3585         hold_rsb(r);
3586         lock_rsb(r);
3587
3588         error = validate_message(lkb, ms);
3589         if (error)
3590                 goto out;
3591
3592         /* stub reply can happen with waiters_mutex held */
3593         error = remove_from_waiters_ms(lkb, ms);
3594         if (error)
3595                 goto out;
3596
3597         __receive_convert_reply(r, lkb, ms);
3598  out:
3599         unlock_rsb(r);
3600         put_rsb(r);
3601 }
3602
3603 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3604 {
3605         struct dlm_lkb *lkb;
3606         int error;
3607
3608         error = find_lkb(ls, ms->m_remid, &lkb);
3609         if (error) {
3610                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3611                           ms->m_header.h_nodeid, ms->m_remid);
3612                 return;
3613         }
3614
3615         _receive_convert_reply(lkb, ms);
3616         dlm_put_lkb(lkb);
3617 }
3618
3619 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3620 {
3621         struct dlm_rsb *r = lkb->lkb_resource;
3622         int error;
3623
3624         hold_rsb(r);
3625         lock_rsb(r);
3626
3627         error = validate_message(lkb, ms);
3628         if (error)
3629                 goto out;
3630
3631         /* stub reply can happen with waiters_mutex held */
3632         error = remove_from_waiters_ms(lkb, ms);
3633         if (error)
3634                 goto out;
3635
3636         /* this is the value returned from do_unlock() on the master */
3637
3638         switch (ms->m_result) {
3639         case -DLM_EUNLOCK:
3640                 receive_flags_reply(lkb, ms);
3641                 remove_lock_pc(r, lkb);
3642                 queue_cast(r, lkb, -DLM_EUNLOCK);
3643                 break;
3644         case -ENOENT:
3645                 break;
3646         default:
3647                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3648                           lkb->lkb_id, ms->m_result);
3649         }
3650  out:
3651         unlock_rsb(r);
3652         put_rsb(r);
3653 }
3654
3655 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3656 {
3657         struct dlm_lkb *lkb;
3658         int error;
3659
3660         error = find_lkb(ls, ms->m_remid, &lkb);
3661         if (error) {
3662                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3663                           ms->m_header.h_nodeid, ms->m_remid);
3664                 return;
3665         }
3666
3667         _receive_unlock_reply(lkb, ms);
3668         dlm_put_lkb(lkb);
3669 }
3670
3671 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3672 {
3673         struct dlm_rsb *r = lkb->lkb_resource;
3674         int error;
3675
3676         hold_rsb(r);
3677         lock_rsb(r);
3678
3679         error = validate_message(lkb, ms);
3680         if (error)
3681                 goto out;
3682
3683         /* stub reply can happen with waiters_mutex held */
3684         error = remove_from_waiters_ms(lkb, ms);
3685         if (error)
3686                 goto out;
3687
3688         /* this is the value returned from do_cancel() on the master */
3689
3690         switch (ms->m_result) {
3691         case -DLM_ECANCEL:
3692                 receive_flags_reply(lkb, ms);
3693                 revert_lock_pc(r, lkb);
3694                 queue_cast(r, lkb, -DLM_ECANCEL);
3695                 break;
3696         case 0:
3697                 break;
3698         default:
3699                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3700                           lkb->lkb_id, ms->m_result);
3701         }
3702  out:
3703         unlock_rsb(r);
3704         put_rsb(r);
3705 }
3706
3707 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3708 {
3709         struct dlm_lkb *lkb;
3710         int error;
3711
3712         error = find_lkb(ls, ms->m_remid, &lkb);
3713         if (error) {
3714                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3715                           ms->m_header.h_nodeid, ms->m_remid);
3716                 return;
3717         }
3718
3719         _receive_cancel_reply(lkb, ms);
3720         dlm_put_lkb(lkb);
3721 }
3722
3723 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3724 {
3725         struct dlm_lkb *lkb;
3726         struct dlm_rsb *r;
3727         int error, ret_nodeid;
3728
3729         error = find_lkb(ls, ms->m_lkid, &lkb);
3730         if (error) {
3731                 log_error(ls, "receive_lookup_reply no lkb");
3732                 return;
3733         }
3734
3735         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3736            FIXME: will a non-zero error ever be returned? */
3737
3738         r = lkb->lkb_resource;
3739         hold_rsb(r);
3740         lock_rsb(r);
3741
3742         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3743         if (error)
3744                 goto out;
3745
3746         ret_nodeid = ms->m_nodeid;
3747         if (ret_nodeid == dlm_our_nodeid()) {
3748                 r->res_nodeid = 0;
3749                 ret_nodeid = 0;
3750                 r->res_first_lkid = 0;
3751         } else {
3752                 /* set_master() will copy res_nodeid to lkb_nodeid */
3753                 r->res_nodeid = ret_nodeid;
3754         }
3755
3756         if (is_overlap(lkb)) {
3757                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3758                           lkb->lkb_id, lkb->lkb_flags);
3759                 queue_cast_overlap(r, lkb);
3760                 unhold_lkb(lkb); /* undoes create_lkb() */
3761                 goto out_list;
3762         }
3763
3764         _request_lock(r, lkb);
3765
3766  out_list:
3767         if (!ret_nodeid)
3768                 process_lookup_list(r);
3769  out:
3770         unlock_rsb(r);
3771         put_rsb(r);
3772         dlm_put_lkb(lkb);
3773 }
3774
3775 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3776 {
3777         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3778                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3779                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3780                           ms->m_remid, ms->m_result);
3781                 return;
3782         }
3783
3784         switch (ms->m_type) {
3785
3786         /* messages sent to a master node */
3787
3788         case DLM_MSG_REQUEST:
3789                 receive_request(ls, ms);
3790                 break;
3791
3792         case DLM_MSG_CONVERT:
3793                 receive_convert(ls, ms);
3794                 break;
3795
3796         case DLM_MSG_UNLOCK:
3797                 receive_unlock(ls, ms);
3798                 break;
3799
3800         case DLM_MSG_CANCEL:
3801                 receive_cancel(ls, ms);
3802                 break;
3803
3804         /* messages sent from a master node (replies to above) */
3805
3806         case DLM_MSG_REQUEST_REPLY:
3807                 receive_request_reply(ls, ms);
3808                 break;
3809
3810         case DLM_MSG_CONVERT_REPLY:
3811                 receive_convert_reply(ls, ms);
3812                 break;
3813
3814         case DLM_MSG_UNLOCK_REPLY:
3815                 receive_unlock_reply(ls, ms);
3816                 break;
3817
3818         case DLM_MSG_CANCEL_REPLY:
3819                 receive_cancel_reply(ls, ms);
3820                 break;
3821
3822         /* messages sent from a master node (only two types of async msg) */
3823
3824         case DLM_MSG_GRANT:
3825                 receive_grant(ls, ms);
3826                 break;
3827
3828         case DLM_MSG_BAST:
3829                 receive_bast(ls, ms);
3830                 break;
3831
3832         /* messages sent to a dir node */
3833
3834         case DLM_MSG_LOOKUP:
3835                 receive_lookup(ls, ms);
3836                 break;
3837
3838         case DLM_MSG_REMOVE:
3839                 receive_remove(ls, ms);
3840                 break;
3841
3842         /* messages sent from a dir node (remove has no reply) */
3843
3844         case DLM_MSG_LOOKUP_REPLY:
3845                 receive_lookup_reply(ls, ms);
3846                 break;
3847
3848         /* other messages */
3849
3850         case DLM_MSG_PURGE:
3851                 receive_purge(ls, ms);
3852                 break;
3853
3854         default:
3855                 log_error(ls, "unknown message type %d", ms->m_type);
3856         }
3857
3858         dlm_astd_wake();
3859 }
3860
3861 /* If the lockspace is in recovery mode (locking stopped), then normal
3862    messages are saved on the requestqueue for processing after recovery is
3863    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3864    messages off the requestqueue before we process new ones. This occurs right
3865    after recovery completes when we transition from saving all messages on
3866    requestqueue, to processing all the saved messages, to processing new
3867    messages as they arrive. */
3868
3869 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3870                                 int nodeid)
3871 {
3872         if (dlm_locking_stopped(ls)) {
3873                 dlm_add_requestqueue(ls, nodeid, ms);
3874         } else {
3875                 dlm_wait_requestqueue(ls);
3876                 _receive_message(ls, ms);
3877         }
3878 }
3879
3880 /* This is called by dlm_recoverd to process messages that were saved on
3881    the requestqueue. */
3882
3883 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3884 {
3885         _receive_message(ls, ms);
3886 }
3887
3888 /* This is called by the midcomms layer when something is received for
3889    the lockspace.  It could be either a MSG (normal message sent as part of
3890    standard locking activity) or an RCOM (recovery message sent as part of
3891    lockspace recovery). */
3892
3893 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3894 {
3895         struct dlm_header *hd = &p->header;
3896         struct dlm_ls *ls;
3897         int type = 0;
3898
3899         switch (hd->h_cmd) {
3900         case DLM_MSG:
3901                 dlm_message_in(&p->message);
3902                 type = p->message.m_type;
3903                 break;
3904         case DLM_RCOM:
3905                 dlm_rcom_in(&p->rcom);
3906                 type = p->rcom.rc_type;
3907                 break;
3908         default:
3909                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3910                 return;
3911         }
3912
3913         if (hd->h_nodeid != nodeid) {
3914                 log_print("invalid h_nodeid %d from %d lockspace %x",
3915                           hd->h_nodeid, nodeid, hd->h_lockspace);
3916                 return;
3917         }
3918
3919         ls = dlm_find_lockspace_global(hd->h_lockspace);
3920         if (!ls) {
3921                 if (dlm_config.ci_log_debug)
3922                         log_print("invalid lockspace %x from %d cmd %d type %d",
3923                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3924
3925                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3926                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3927                 return;
3928         }
3929
3930         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3931            be inactive (in this ls) before transitioning to recovery mode */
3932
3933         down_read(&ls->ls_recv_active);
3934         if (hd->h_cmd == DLM_MSG)
3935                 dlm_receive_message(ls, &p->message, nodeid);
3936         else
3937                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3938         up_read(&ls->ls_recv_active);
3939
3940         dlm_put_lockspace(ls);
3941 }
3942
3943 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3944 {
3945         if (middle_conversion(lkb)) {
3946                 hold_lkb(lkb);
3947                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3948                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3949                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3950                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3951                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3952
3953                 /* Same special case as in receive_rcom_lock_args() */
3954                 lkb->lkb_grmode = DLM_LOCK_IV;
3955                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3956                 unhold_lkb(lkb);
3957
3958         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3959                 lkb->lkb_flags |= DLM_IFL_RESEND;
3960         }
3961
3962         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3963            conversions are async; there's no reply from the remote master */
3964 }
3965
3966 /* A waiting lkb needs recovery if the master node has failed, or
3967    the master node is changing (only when no directory is used) */
3968
3969 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3970 {
3971         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3972                 return 1;
3973
3974         if (!dlm_no_directory(ls))
3975                 return 0;
3976
3977         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3978                 return 1;
3979
3980         return 0;
3981 }
3982
3983 /* Recovery for locks that are waiting for replies from nodes that are now
3984    gone.  We can just complete unlocks and cancels by faking a reply from the
3985    dead node.  Requests and up-conversions we flag to be resent after
3986    recovery.  Down-conversions can just be completed with a fake reply like
3987    unlocks.  Conversions between PR and CW need special attention. */
3988
3989 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3990 {
3991         struct dlm_lkb *lkb, *safe;
3992         int wait_type, stub_unlock_result, stub_cancel_result;
3993
3994         mutex_lock(&ls->ls_waiters_mutex);
3995
3996         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3997                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3998                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3999
4000                 /* all outstanding lookups, regardless of destination  will be
4001                    resent after recovery is done */
4002
4003                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4004                         lkb->lkb_flags |= DLM_IFL_RESEND;
4005                         continue;
4006                 }
4007
4008                 if (!waiter_needs_recovery(ls, lkb))
4009                         continue;
4010
4011                 wait_type = lkb->lkb_wait_type;
4012                 stub_unlock_result = -DLM_EUNLOCK;
4013                 stub_cancel_result = -DLM_ECANCEL;
4014
4015                 /* Main reply may have been received leaving a zero wait_type,
4016                    but a reply for the overlapping op may not have been
4017                    received.  In that case we need to fake the appropriate
4018                    reply for the overlap op. */
4019
4020                 if (!wait_type) {
4021                         if (is_overlap_cancel(lkb)) {
4022                                 wait_type = DLM_MSG_CANCEL;
4023                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4024                                         stub_cancel_result = 0;
4025                         }
4026                         if (is_overlap_unlock(lkb)) {
4027                                 wait_type = DLM_MSG_UNLOCK;
4028                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4029                                         stub_unlock_result = -ENOENT;
4030                         }
4031
4032                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4033                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4034                                   stub_cancel_result, stub_unlock_result);
4035                 }
4036
4037                 switch (wait_type) {
4038
4039                 case DLM_MSG_REQUEST:
4040                         lkb->lkb_flags |= DLM_IFL_RESEND;
4041                         break;
4042
4043                 case DLM_MSG_CONVERT:
4044                         recover_convert_waiter(ls, lkb);
4045                         break;
4046
4047                 case DLM_MSG_UNLOCK:
4048                         hold_lkb(lkb);
4049                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4050                         ls->ls_stub_ms.m_result = stub_unlock_result;
4051                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4052                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4053                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4054                         dlm_put_lkb(lkb);
4055                         break;
4056
4057                 case DLM_MSG_CANCEL:
4058                         hold_lkb(lkb);
4059                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4060                         ls->ls_stub_ms.m_result = stub_cancel_result;
4061                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4062                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4063                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4064                         dlm_put_lkb(lkb);
4065                         break;
4066
4067                 default:
4068                         log_error(ls, "invalid lkb wait_type %d %d",
4069                                   lkb->lkb_wait_type, wait_type);
4070                 }
4071                 schedule();
4072         }
4073         mutex_unlock(&ls->ls_waiters_mutex);
4074 }
4075
4076 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4077 {
4078         struct dlm_lkb *lkb;
4079         int found = 0;
4080
4081         mutex_lock(&ls->ls_waiters_mutex);
4082         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4083                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4084                         hold_lkb(lkb);
4085                         found = 1;
4086                         break;
4087                 }
4088         }
4089         mutex_unlock(&ls->ls_waiters_mutex);
4090
4091         if (!found)
4092                 lkb = NULL;
4093         return lkb;
4094 }
4095
4096 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4097    master or dir-node for r.  Processing the lkb may result in it being placed
4098    back on waiters. */
4099
4100 /* We do this after normal locking has been enabled and any saved messages
4101    (in requestqueue) have been processed.  We should be confident that at
4102    this point we won't get or process a reply to any of these waiting
4103    operations.  But, new ops may be coming in on the rsbs/locks here from
4104    userspace or remotely. */
4105
4106 /* there may have been an overlap unlock/cancel prior to recovery or after
4107    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4108    overlap flag would just have been set and nothing new sent.  we can be
4109    confident here than any replies to either the initial op or overlap ops
4110    prior to recovery have been received. */
4111
4112 int dlm_recover_waiters_post(struct dlm_ls *ls)
4113 {
4114         struct dlm_lkb *lkb;
4115         struct dlm_rsb *r;
4116         int error = 0, mstype, err, oc, ou;
4117
4118         while (1) {
4119                 if (dlm_locking_stopped(ls)) {
4120                         log_debug(ls, "recover_waiters_post aborted");
4121                         error = -EINTR;
4122                         break;
4123                 }
4124
4125                 lkb = find_resend_waiter(ls);
4126                 if (!lkb)
4127                         break;
4128
4129                 r = lkb->lkb_resource;
4130                 hold_rsb(r);
4131                 lock_rsb(r);
4132
4133                 mstype = lkb->lkb_wait_type;
4134                 oc = is_overlap_cancel(lkb);
4135                 ou = is_overlap_unlock(lkb);
4136                 err = 0;
4137
4138                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4139                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4140
4141                 /* At this point we assume that we won't get a reply to any
4142                    previous op or overlap op on this lock.  First, do a big
4143                    remove_from_waiters() for all previous ops. */
4144
4145                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4146                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4147                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4148                 lkb->lkb_wait_type = 0;
4149                 lkb->lkb_wait_count = 0;
4150                 mutex_lock(&ls->ls_waiters_mutex);
4151                 list_del_init(&lkb->lkb_wait_reply);
4152                 mutex_unlock(&ls->ls_waiters_mutex);
4153                 unhold_lkb(lkb); /* for waiters list */
4154
4155                 if (oc || ou) {
4156                         /* do an unlock or cancel instead of resending */
4157                         switch (mstype) {
4158                         case DLM_MSG_LOOKUP:
4159                         case DLM_MSG_REQUEST:
4160                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4161                                                         -DLM_ECANCEL);
4162                                 unhold_lkb(lkb); /* undoes create_lkb() */
4163                                 break;
4164                         case DLM_MSG_CONVERT:
4165                                 if (oc) {
4166                                         queue_cast(r, lkb, -DLM_ECANCEL);
4167                                 } else {
4168                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4169                                         _unlock_lock(r, lkb);
4170                                 }
4171                                 break;
4172                         default:
4173                                 err = 1;
4174                         }
4175                 } else {
4176                         switch (mstype) {
4177                         case DLM_MSG_LOOKUP:
4178                         case DLM_MSG_REQUEST:
4179                                 _request_lock(r, lkb);
4180                                 if (is_master(r))
4181                                         confirm_master(r, 0);
4182                                 break;
4183                         case DLM_MSG_CONVERT:
4184                                 _convert_lock(r, lkb);
4185                                 break;
4186                         default:
4187                                 err = 1;
4188                         }
4189                 }
4190
4191                 if (err)
4192                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4193                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4194                 unlock_rsb(r);
4195                 put_rsb(r);
4196                 dlm_put_lkb(lkb);
4197         }
4198
4199         return error;
4200 }
4201
4202 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4203                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4204 {
4205         struct dlm_ls *ls = r->res_ls;
4206         struct dlm_lkb *lkb, *safe;
4207
4208         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4209                 if (test(ls, lkb)) {
4210                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4211                         del_lkb(r, lkb);
4212                         /* this put should free the lkb */
4213                         if (!dlm_put_lkb(lkb))
4214                                 log_error(ls, "purged lkb not released");
4215                 }
4216         }
4217 }
4218
4219 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4220 {
4221         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4222 }
4223
4224 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4225 {
4226         return is_master_copy(lkb);
4227 }
4228
4229 static void purge_dead_locks(struct dlm_rsb *r)
4230 {
4231         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4232         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4233         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4234 }
4235
4236 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4237 {
4238         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4239         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4240         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4241 }
4242
4243 /* Get rid of locks held by nodes that are gone. */
4244
4245 int dlm_purge_locks(struct dlm_ls *ls)
4246 {
4247         struct dlm_rsb *r;
4248
4249         log_debug(ls, "dlm_purge_locks");
4250
4251         down_write(&ls->ls_root_sem);
4252         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4253                 hold_rsb(r);
4254                 lock_rsb(r);
4255                 if (is_master(r))
4256                         purge_dead_locks(r);
4257                 unlock_rsb(r);
4258                 unhold_rsb(r);
4259
4260                 schedule();
4261         }
4262         up_write(&ls->ls_root_sem);
4263
4264         return 0;
4265 }
4266
4267 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4268 {
4269         struct dlm_rsb *r, *r_ret = NULL;
4270
4271         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4272         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4273                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4274                         continue;
4275                 hold_rsb(r);
4276                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4277                 r_ret = r;
4278                 break;
4279         }
4280         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4281         return r_ret;
4282 }
4283
4284 void dlm_grant_after_purge(struct dlm_ls *ls)
4285 {
4286         struct dlm_rsb *r;
4287         int bucket = 0;
4288
4289         while (1) {
4290                 r = find_purged_rsb(ls, bucket);
4291                 if (!r) {
4292                         if (bucket == ls->ls_rsbtbl_size - 1)
4293                                 break;
4294                         bucket++;
4295                         continue;
4296                 }
4297                 lock_rsb(r);
4298                 if (is_master(r)) {
4299                         grant_pending_locks(r);
4300                         confirm_master(r, 0);
4301                 }
4302                 unlock_rsb(r);
4303                 put_rsb(r);
4304                 schedule();
4305         }
4306 }
4307
4308 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4309                                          uint32_t remid)
4310 {
4311         struct dlm_lkb *lkb;
4312
4313         list_for_each_entry(lkb, head, lkb_statequeue) {
4314                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4315                         return lkb;
4316         }
4317         return NULL;
4318 }
4319
4320 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4321                                     uint32_t remid)
4322 {
4323         struct dlm_lkb *lkb;
4324
4325         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4326         if (lkb)
4327                 return lkb;
4328         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4329         if (lkb)
4330                 return lkb;
4331         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4332         if (lkb)
4333                 return lkb;
4334         return NULL;
4335 }
4336
4337 /* needs at least dlm_rcom + rcom_lock */
4338 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4339                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4340 {
4341         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4342
4343         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4344         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4345         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4346         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4347         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4348         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4349         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4350         lkb->lkb_rqmode = rl->rl_rqmode;
4351         lkb->lkb_grmode = rl->rl_grmode;
4352         /* don't set lkb_status because add_lkb wants to itself */
4353
4354         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4355         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4356
4357         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4358                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4359                          sizeof(struct rcom_lock);
4360                 if (lvblen > ls->ls_lvblen)
4361                         return -EINVAL;
4362                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4363                 if (!lkb->lkb_lvbptr)
4364                         return -ENOMEM;
4365                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4366         }
4367
4368         /* Conversions between PR and CW (middle modes) need special handling.
4369            The real granted mode of these converting locks cannot be determined
4370            until all locks have been rebuilt on the rsb (recover_conversion) */
4371
4372         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4373             middle_conversion(lkb)) {
4374                 rl->rl_status = DLM_LKSTS_CONVERT;
4375                 lkb->lkb_grmode = DLM_LOCK_IV;
4376                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4377         }
4378
4379         return 0;
4380 }
4381
4382 /* This lkb may have been recovered in a previous aborted recovery so we need
4383    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4384    If so we just send back a standard reply.  If not, we create a new lkb with
4385    the given values and send back our lkid.  We send back our lkid by sending
4386    back the rcom_lock struct we got but with the remid field filled in. */
4387
4388 /* needs at least dlm_rcom + rcom_lock */
4389 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4390 {
4391         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4392         struct dlm_rsb *r;
4393         struct dlm_lkb *lkb;
4394         int error;
4395
4396         if (rl->rl_parent_lkid) {
4397                 error = -EOPNOTSUPP;
4398                 goto out;
4399         }
4400
4401         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4402                          R_MASTER, &r);
4403         if (error)
4404                 goto out;
4405
4406         lock_rsb(r);
4407
4408         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4409         if (lkb) {
4410                 error = -EEXIST;
4411                 goto out_remid;
4412         }
4413
4414         error = create_lkb(ls, &lkb);
4415         if (error)
4416                 goto out_unlock;
4417
4418         error = receive_rcom_lock_args(ls, lkb, r, rc);
4419         if (error) {
4420                 __put_lkb(ls, lkb);
4421                 goto out_unlock;
4422         }
4423
4424         attach_lkb(r, lkb);
4425         add_lkb(r, lkb, rl->rl_status);
4426         error = 0;
4427
4428  out_remid:
4429         /* this is the new value returned to the lock holder for
4430            saving in its process-copy lkb */
4431         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4432
4433  out_unlock:
4434         unlock_rsb(r);
4435         put_rsb(r);
4436  out:
4437         if (error)
4438                 log_debug(ls, "recover_master_copy %d %x", error,
4439                           le32_to_cpu(rl->rl_lkid));
4440         rl->rl_result = cpu_to_le32(error);
4441         return error;
4442 }
4443
4444 /* needs at least dlm_rcom + rcom_lock */
4445 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4446 {
4447         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4448         struct dlm_rsb *r;
4449         struct dlm_lkb *lkb;
4450         int error;
4451
4452         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4453         if (error) {
4454                 log_error(ls, "recover_process_copy no lkid %x",
4455                                 le32_to_cpu(rl->rl_lkid));
4456                 return error;
4457         }
4458
4459         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4460
4461         error = le32_to_cpu(rl->rl_result);
4462
4463         r = lkb->lkb_resource;
4464         hold_rsb(r);
4465         lock_rsb(r);
4466
4467         switch (error) {
4468         case -EBADR:
4469                 /* There's a chance the new master received our lock before
4470                    dlm_recover_master_reply(), this wouldn't happen if we did
4471                    a barrier between recover_masters and recover_locks. */
4472                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4473                           (unsigned long)r, r->res_name);
4474                 dlm_send_rcom_lock(r, lkb);
4475                 goto out;
4476         case -EEXIST:
4477                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4478                 /* fall through */
4479         case 0:
4480                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4481                 break;
4482         default:
4483                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4484                           error, lkb->lkb_id);
4485         }
4486
4487         /* an ack for dlm_recover_locks() which waits for replies from
4488            all the locks it sends to new masters */
4489         dlm_recovered_lock(r);
4490  out:
4491         unlock_rsb(r);
4492         put_rsb(r);
4493         dlm_put_lkb(lkb);
4494
4495         return 0;
4496 }
4497
4498 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4499                      int mode, uint32_t flags, void *name, unsigned int namelen,
4500                      unsigned long timeout_cs)
4501 {
4502         struct dlm_lkb *lkb;
4503         struct dlm_args args;
4504         int error;
4505
4506         dlm_lock_recovery(ls);
4507
4508         error = create_lkb(ls, &lkb);
4509         if (error) {
4510                 kfree(ua);
4511                 goto out;
4512         }
4513
4514         if (flags & DLM_LKF_VALBLK) {
4515                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4516                 if (!ua->lksb.sb_lvbptr) {
4517                         kfree(ua);
4518                         __put_lkb(ls, lkb);
4519                         error = -ENOMEM;
4520                         goto out;
4521                 }
4522         }
4523
4524         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4525            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4526            lock and that lkb_astparam is the dlm_user_args structure. */
4527
4528         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4529                               fake_astfn, ua, fake_bastfn, &args);
4530         lkb->lkb_flags |= DLM_IFL_USER;
4531         ua->old_mode = DLM_LOCK_IV;
4532
4533         if (error) {
4534                 __put_lkb(ls, lkb);
4535                 goto out;
4536         }
4537
4538         error = request_lock(ls, lkb, name, namelen, &args);
4539
4540         switch (error) {
4541         case 0:
4542                 break;
4543         case -EINPROGRESS:
4544                 error = 0;
4545                 break;
4546         case -EAGAIN:
4547                 error = 0;
4548                 /* fall through */
4549         default:
4550                 __put_lkb(ls, lkb);
4551                 goto out;
4552         }
4553
4554         /* add this new lkb to the per-process list of locks */
4555         spin_lock(&ua->proc->locks_spin);
4556         hold_lkb(lkb);
4557         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4558         spin_unlock(&ua->proc->locks_spin);
4559  out:
4560         dlm_unlock_recovery(ls);
4561         return error;
4562 }
4563
4564 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4565                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4566                      unsigned long timeout_cs)
4567 {
4568         struct dlm_lkb *lkb;
4569         struct dlm_args args;
4570         struct dlm_user_args *ua;
4571         int error;
4572
4573         dlm_lock_recovery(ls);
4574
4575         error = find_lkb(ls, lkid, &lkb);
4576         if (error)
4577                 goto out;
4578
4579         /* user can change the params on its lock when it converts it, or
4580            add an lvb that didn't exist before */
4581
4582         ua = lkb->lkb_ua;
4583
4584         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4585                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4586                 if (!ua->lksb.sb_lvbptr) {
4587                         error = -ENOMEM;
4588                         goto out_put;
4589                 }
4590         }
4591         if (lvb_in && ua->lksb.sb_lvbptr)
4592                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4593
4594         ua->xid = ua_tmp->xid;
4595         ua->castparam = ua_tmp->castparam;
4596         ua->castaddr = ua_tmp->castaddr;
4597         ua->bastparam = ua_tmp->bastparam;
4598         ua->bastaddr = ua_tmp->bastaddr;
4599         ua->user_lksb = ua_tmp->user_lksb;
4600         ua->old_mode = lkb->lkb_grmode;
4601
4602         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4603                               fake_astfn, ua, fake_bastfn, &args);
4604         if (error)
4605                 goto out_put;
4606
4607         error = convert_lock(ls, lkb, &args);
4608
4609         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4610                 error = 0;
4611  out_put:
4612         dlm_put_lkb(lkb);
4613  out:
4614         dlm_unlock_recovery(ls);
4615         kfree(ua_tmp);
4616         return error;
4617 }
4618
4619 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4620                     uint32_t flags, uint32_t lkid, char *lvb_in)
4621 {
4622         struct dlm_lkb *lkb;
4623         struct dlm_args args;
4624         struct dlm_user_args *ua;
4625         int error;
4626
4627         dlm_lock_recovery(ls);
4628
4629         error = find_lkb(ls, lkid, &lkb);
4630         if (error)
4631                 goto out;
4632
4633         ua = lkb->lkb_ua;
4634
4635         if (lvb_in && ua->lksb.sb_lvbptr)
4636                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4637         if (ua_tmp->castparam)
4638                 ua->castparam = ua_tmp->castparam;
4639         ua->user_lksb = ua_tmp->user_lksb;
4640
4641         error = set_unlock_args(flags, ua, &args);
4642         if (error)
4643                 goto out_put;
4644
4645         error = unlock_lock(ls, lkb, &args);
4646
4647         if (error == -DLM_EUNLOCK)
4648                 error = 0;
4649         /* from validate_unlock_args() */
4650         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4651                 error = 0;
4652         if (error)
4653                 goto out_put;
4654
4655         spin_lock(&ua->proc->locks_spin);
4656         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4657         if (!list_empty(&lkb->lkb_ownqueue))
4658                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4659         spin_unlock(&ua->proc->locks_spin);
4660  out_put:
4661         dlm_put_lkb(lkb);
4662  out:
4663         dlm_unlock_recovery(ls);
4664         kfree(ua_tmp);
4665         return error;
4666 }
4667
4668 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4669                     uint32_t flags, uint32_t lkid)
4670 {
4671         struct dlm_lkb *lkb;
4672         struct dlm_args args;
4673         struct dlm_user_args *ua;
4674         int error;
4675
4676         dlm_lock_recovery(ls);
4677
4678         error = find_lkb(ls, lkid, &lkb);
4679         if (error)
4680                 goto out;
4681
4682         ua = lkb->lkb_ua;
4683         if (ua_tmp->castparam)
4684                 ua->castparam = ua_tmp->castparam;
4685         ua->user_lksb = ua_tmp->user_lksb;
4686
4687         error = set_unlock_args(flags, ua, &args);
4688         if (error)
4689                 goto out_put;
4690
4691         error = cancel_lock(ls, lkb, &args);
4692
4693         if (error == -DLM_ECANCEL)
4694                 error = 0;
4695         /* from validate_unlock_args() */
4696         if (error == -EBUSY)
4697                 error = 0;
4698  out_put:
4699         dlm_put_lkb(lkb);
4700  out:
4701         dlm_unlock_recovery(ls);
4702         kfree(ua_tmp);
4703         return error;
4704 }
4705
4706 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4707 {
4708         struct dlm_lkb *lkb;
4709         struct dlm_args args;
4710         struct dlm_user_args *ua;
4711         struct dlm_rsb *r;
4712         int error;
4713
4714         dlm_lock_recovery(ls);
4715
4716         error = find_lkb(ls, lkid, &lkb);
4717         if (error)
4718                 goto out;
4719
4720         ua = lkb->lkb_ua;
4721
4722         error = set_unlock_args(flags, ua, &args);
4723         if (error)
4724                 goto out_put;
4725
4726         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4727
4728         r = lkb->lkb_resource;
4729         hold_rsb(r);
4730         lock_rsb(r);
4731
4732         error = validate_unlock_args(lkb, &args);
4733         if (error)
4734                 goto out_r;
4735         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4736
4737         error = _cancel_lock(r, lkb);
4738  out_r:
4739         unlock_rsb(r);
4740         put_rsb(r);
4741
4742         if (error == -DLM_ECANCEL)
4743                 error = 0;
4744         /* from validate_unlock_args() */
4745         if (error == -EBUSY)
4746                 error = 0;
4747  out_put:
4748         dlm_put_lkb(lkb);
4749  out:
4750         dlm_unlock_recovery(ls);
4751         return error;
4752 }
4753
4754 /* lkb's that are removed from the waiters list by revert are just left on the
4755    orphans list with the granted orphan locks, to be freed by purge */
4756
4757 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4758 {
4759         struct dlm_args args;
4760         int error;
4761
4762         hold_lkb(lkb);
4763         mutex_lock(&ls->ls_orphans_mutex);
4764         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4765         mutex_unlock(&ls->ls_orphans_mutex);
4766
4767         set_unlock_args(0, lkb->lkb_ua, &args);
4768
4769         error = cancel_lock(ls, lkb, &args);
4770         if (error == -DLM_ECANCEL)
4771                 error = 0;
4772         return error;
4773 }
4774
4775 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4776    Regardless of what rsb queue the lock is on, it's removed and freed. */
4777
4778 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4779 {
4780         struct dlm_args args;
4781         int error;
4782
4783         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4784
4785         error = unlock_lock(ls, lkb, &args);
4786         if (error == -DLM_EUNLOCK)
4787                 error = 0;
4788         return error;
4789 }
4790
4791 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4792    (which does lock_rsb) due to deadlock with receiving a message that does
4793    lock_rsb followed by dlm_user_add_ast() */
4794
4795 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4796                                      struct dlm_user_proc *proc)
4797 {
4798         struct dlm_lkb *lkb = NULL;
4799
4800         mutex_lock(&ls->ls_clear_proc_locks);
4801         if (list_empty(&proc->locks))
4802                 goto out;
4803
4804         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4805         list_del_init(&lkb->lkb_ownqueue);
4806
4807         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4808                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4809         else
4810                 lkb->lkb_flags |= DLM_IFL_DEAD;
4811  out:
4812         mutex_unlock(&ls->ls_clear_proc_locks);
4813         return lkb;
4814 }
4815
4816 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4817    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4818    which we clear here. */
4819
4820 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4821    list, and no more device_writes should add lkb's to proc->locks list; so we
4822    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4823    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4824    them ourself. */
4825
4826 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4827 {
4828         struct dlm_lkb *lkb, *safe;
4829
4830         dlm_lock_recovery(ls);
4831
4832         while (1) {
4833                 lkb = del_proc_lock(ls, proc);
4834                 if (!lkb)
4835                         break;
4836                 del_timeout(lkb);
4837                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4838                         orphan_proc_lock(ls, lkb);
4839                 else
4840                         unlock_proc_lock(ls, lkb);
4841
4842                 /* this removes the reference for the proc->locks list
4843                    added by dlm_user_request, it may result in the lkb
4844                    being freed */
4845
4846                 dlm_put_lkb(lkb);
4847         }
4848
4849         mutex_lock(&ls->ls_clear_proc_locks);
4850
4851         /* in-progress unlocks */
4852         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4853                 list_del_init(&lkb->lkb_ownqueue);
4854                 lkb->lkb_flags |= DLM_IFL_DEAD;
4855                 dlm_put_lkb(lkb);
4856         }
4857
4858         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4859                 lkb->lkb_ast_type = 0;
4860                 list_del(&lkb->lkb_astqueue);
4861                 dlm_put_lkb(lkb);
4862         }
4863
4864         mutex_unlock(&ls->ls_clear_proc_locks);
4865         dlm_unlock_recovery(ls);
4866 }
4867
4868 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4869 {
4870         struct dlm_lkb *lkb, *safe;
4871
4872         while (1) {
4873                 lkb = NULL;
4874                 spin_lock(&proc->locks_spin);
4875                 if (!list_empty(&proc->locks)) {
4876                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4877                                          lkb_ownqueue);
4878                         list_del_init(&lkb->lkb_ownqueue);
4879                 }
4880                 spin_unlock(&proc->locks_spin);
4881
4882                 if (!lkb)
4883                         break;
4884
4885                 lkb->lkb_flags |= DLM_IFL_DEAD;
4886                 unlock_proc_lock(ls, lkb);
4887                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4888         }
4889
4890         spin_lock(&proc->locks_spin);
4891         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4892                 list_del_init(&lkb->lkb_ownqueue);
4893                 lkb->lkb_flags |= DLM_IFL_DEAD;
4894                 dlm_put_lkb(lkb);
4895         }
4896         spin_unlock(&proc->locks_spin);
4897
4898         spin_lock(&proc->asts_spin);
4899         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4900                 list_del(&lkb->lkb_astqueue);
4901                 dlm_put_lkb(lkb);
4902         }
4903         spin_unlock(&proc->asts_spin);
4904 }
4905
4906 /* pid of 0 means purge all orphans */
4907
4908 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4909 {
4910         struct dlm_lkb *lkb, *safe;
4911
4912         mutex_lock(&ls->ls_orphans_mutex);
4913         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4914                 if (pid && lkb->lkb_ownpid != pid)
4915                         continue;
4916                 unlock_proc_lock(ls, lkb);
4917                 list_del_init(&lkb->lkb_ownqueue);
4918                 dlm_put_lkb(lkb);
4919         }
4920         mutex_unlock(&ls->ls_orphans_mutex);
4921 }
4922
4923 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4924 {
4925         struct dlm_message *ms;
4926         struct dlm_mhandle *mh;
4927         int error;
4928
4929         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4930                                 DLM_MSG_PURGE, &ms, &mh);
4931         if (error)
4932                 return error;
4933         ms->m_nodeid = nodeid;
4934         ms->m_pid = pid;
4935
4936         return send_message(mh, ms);
4937 }
4938
4939 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4940                    int nodeid, int pid)
4941 {
4942         int error = 0;
4943
4944         if (nodeid != dlm_our_nodeid()) {
4945                 error = send_purge(ls, nodeid, pid);
4946         } else {
4947                 dlm_lock_recovery(ls);
4948                 if (pid == current->pid)
4949                         purge_proc_locks(ls, proc);
4950                 else
4951                         do_purge(ls, nodeid, pid);
4952                 dlm_unlock_recovery(ls);
4953         }
4954         return error;
4955 }
4956