dlm: clear defunct cancel state
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, 0);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322
323         if (is_master_copy(lkb))
324                 send_bast(r, lkb, rqmode);
325         else
326                 dlm_add_ast(lkb, AST_BAST, rqmode);
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         *r_ret = NULL;
367         return -EBADR;
368
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392
393         if (dlm_no_directory(ls))
394                 goto out;
395
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         spin_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         spin_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r, *tmp;
439         uint32_t hash, bucket;
440         int error = -EINVAL;
441
442         if (namelen > DLM_RESNAME_MAXLEN)
443                 goto out;
444
445         if (dlm_no_directory(ls))
446                 flags |= R_CREATE;
447
448         error = 0;
449         hash = jhash(name, namelen, 0);
450         bucket = hash & (ls->ls_rsbtbl_size - 1);
451
452         error = search_rsb(ls, name, namelen, bucket, flags, &r);
453         if (!error)
454                 goto out;
455
456         if (error == -EBADR && !(flags & R_CREATE))
457                 goto out;
458
459         /* the rsb was found but wasn't a master copy */
460         if (error == -ENOTBLK)
461                 goto out;
462
463         error = -ENOMEM;
464         r = create_rsb(ls, name, namelen);
465         if (!r)
466                 goto out;
467
468         r->res_hash = hash;
469         r->res_bucket = bucket;
470         r->res_nodeid = -1;
471         kref_init(&r->res_ref);
472
473         /* With no directory, the master can be set immediately */
474         if (dlm_no_directory(ls)) {
475                 int nodeid = dlm_dir_nodeid(r);
476                 if (nodeid == dlm_our_nodeid())
477                         nodeid = 0;
478                 r->res_nodeid = nodeid;
479         }
480
481         spin_lock(&ls->ls_rsbtbl[bucket].lock);
482         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
483         if (!error) {
484                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
485                 dlm_free_rsb(r);
486                 r = tmp;
487                 goto out;
488         }
489         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
490         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
491         error = 0;
492  out:
493         *r_ret = r;
494         return error;
495 }
496
497 /* This is only called to add a reference when the code already holds
498    a valid reference to the rsb, so there's no need for locking. */
499
500 static inline void hold_rsb(struct dlm_rsb *r)
501 {
502         kref_get(&r->res_ref);
503 }
504
505 void dlm_hold_rsb(struct dlm_rsb *r)
506 {
507         hold_rsb(r);
508 }
509
510 static void toss_rsb(struct kref *kref)
511 {
512         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
513         struct dlm_ls *ls = r->res_ls;
514
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
516         kref_init(&r->res_ref);
517         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
518         r->res_toss_time = jiffies;
519         if (r->res_lvbptr) {
520                 dlm_free_lvb(r->res_lvbptr);
521                 r->res_lvbptr = NULL;
522         }
523 }
524
525 /* When all references to the rsb are gone it's transfered to
526    the tossed list for later disposal. */
527
528 static void put_rsb(struct dlm_rsb *r)
529 {
530         struct dlm_ls *ls = r->res_ls;
531         uint32_t bucket = r->res_bucket;
532
533         spin_lock(&ls->ls_rsbtbl[bucket].lock);
534         kref_put(&r->res_ref, toss_rsb);
535         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
536 }
537
538 void dlm_put_rsb(struct dlm_rsb *r)
539 {
540         put_rsb(r);
541 }
542
543 /* See comment for unhold_lkb */
544
545 static void unhold_rsb(struct dlm_rsb *r)
546 {
547         int rv;
548         rv = kref_put(&r->res_ref, toss_rsb);
549         DLM_ASSERT(!rv, dlm_dump_rsb(r););
550 }
551
552 static void kill_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555
556         /* All work is done after the return from kref_put() so we
557            can release the write_lock before the remove and free. */
558
559         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
565 }
566
567 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
568    The rsb must exist as long as any lkb's for it do. */
569
570 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
571 {
572         hold_rsb(r);
573         lkb->lkb_resource = r;
574 }
575
576 static void detach_lkb(struct dlm_lkb *lkb)
577 {
578         if (lkb->lkb_resource) {
579                 put_rsb(lkb->lkb_resource);
580                 lkb->lkb_resource = NULL;
581         }
582 }
583
584 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
585 {
586         struct dlm_lkb *lkb, *tmp;
587         uint32_t lkid = 0;
588         uint16_t bucket;
589
590         lkb = dlm_allocate_lkb(ls);
591         if (!lkb)
592                 return -ENOMEM;
593
594         lkb->lkb_nodeid = -1;
595         lkb->lkb_grmode = DLM_LOCK_IV;
596         kref_init(&lkb->lkb_ref);
597         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
598         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
599         INIT_LIST_HEAD(&lkb->lkb_time_list);
600
601         get_random_bytes(&bucket, sizeof(bucket));
602         bucket &= (ls->ls_lkbtbl_size - 1);
603
604         write_lock(&ls->ls_lkbtbl[bucket].lock);
605
606         /* counter can roll over so we must verify lkid is not in use */
607
608         while (lkid == 0) {
609                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
610
611                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
612                                     lkb_idtbl_list) {
613                         if (tmp->lkb_id != lkid)
614                                 continue;
615                         lkid = 0;
616                         break;
617                 }
618         }
619
620         lkb->lkb_id = lkid;
621         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
622         write_unlock(&ls->ls_lkbtbl[bucket].lock);
623
624         *lkb_ret = lkb;
625         return 0;
626 }
627
628 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
629 {
630         struct dlm_lkb *lkb;
631         uint16_t bucket = (lkid >> 16);
632
633         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
634                 if (lkb->lkb_id == lkid)
635                         return lkb;
636         }
637         return NULL;
638 }
639
640 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
641 {
642         struct dlm_lkb *lkb;
643         uint16_t bucket = (lkid >> 16);
644
645         if (bucket >= ls->ls_lkbtbl_size)
646                 return -EBADSLT;
647
648         read_lock(&ls->ls_lkbtbl[bucket].lock);
649         lkb = __find_lkb(ls, lkid);
650         if (lkb)
651                 kref_get(&lkb->lkb_ref);
652         read_unlock(&ls->ls_lkbtbl[bucket].lock);
653
654         *lkb_ret = lkb;
655         return lkb ? 0 : -ENOENT;
656 }
657
658 static void kill_lkb(struct kref *kref)
659 {
660         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
661
662         /* All work is done after the return from kref_put() so we
663            can release the write_lock before the detach_lkb */
664
665         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
666 }
667
668 /* __put_lkb() is used when an lkb may not have an rsb attached to
669    it so we need to provide the lockspace explicitly */
670
671 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
672 {
673         uint16_t bucket = (lkb->lkb_id >> 16);
674
675         write_lock(&ls->ls_lkbtbl[bucket].lock);
676         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
677                 list_del(&lkb->lkb_idtbl_list);
678                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
679
680                 detach_lkb(lkb);
681
682                 /* for local/process lkbs, lvbptr points to caller's lksb */
683                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
684                         dlm_free_lvb(lkb->lkb_lvbptr);
685                 dlm_free_lkb(lkb);
686                 return 1;
687         } else {
688                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
689                 return 0;
690         }
691 }
692
693 int dlm_put_lkb(struct dlm_lkb *lkb)
694 {
695         struct dlm_ls *ls;
696
697         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
698         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
699
700         ls = lkb->lkb_resource->res_ls;
701         return __put_lkb(ls, lkb);
702 }
703
704 /* This is only called to add a reference when the code already holds
705    a valid reference to the lkb, so there's no need for locking. */
706
707 static inline void hold_lkb(struct dlm_lkb *lkb)
708 {
709         kref_get(&lkb->lkb_ref);
710 }
711
712 /* This is called when we need to remove a reference and are certain
713    it's not the last ref.  e.g. del_lkb is always called between a
714    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
715    put_lkb would work fine, but would involve unnecessary locking */
716
717 static inline void unhold_lkb(struct dlm_lkb *lkb)
718 {
719         int rv;
720         rv = kref_put(&lkb->lkb_ref, kill_lkb);
721         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
722 }
723
724 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
725                             int mode)
726 {
727         struct dlm_lkb *lkb = NULL;
728
729         list_for_each_entry(lkb, head, lkb_statequeue)
730                 if (lkb->lkb_rqmode < mode)
731                         break;
732
733         if (!lkb)
734                 list_add_tail(new, head);
735         else
736                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
737 }
738
739 /* add/remove lkb to rsb's grant/convert/wait queue */
740
741 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
742 {
743         kref_get(&lkb->lkb_ref);
744
745         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
746
747         lkb->lkb_timestamp = ktime_get();
748
749         lkb->lkb_status = status;
750
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784         hold_lkb(lkb);
785         del_lkb(r, lkb);
786         add_lkb(r, lkb, sts);
787         unhold_lkb(lkb);
788 }
789
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814
815         mutex_lock(&ls->ls_waiters_mutex);
816
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837
838                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
867                                 struct dlm_message *ms)
868 {
869         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
870         int overlap_done = 0;
871
872         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
873                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
874                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
875                 overlap_done = 1;
876                 goto out_del;
877         }
878
879         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
880                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
881                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
882                 overlap_done = 1;
883                 goto out_del;
884         }
885
886         /* Cancel state was preemptively cleared by a successful convert,
887            see next comment, nothing to do. */
888
889         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
890             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
891                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
892                           lkb->lkb_id, lkb->lkb_wait_type);
893                 return -1;
894         }
895
896         /* Remove for the convert reply, and premptively remove for the
897            cancel reply.  A convert has been granted while there's still
898            an outstanding cancel on it (the cancel is moot and the result
899            in the cancel reply should be 0).  We preempt the cancel reply
900            because the app gets the convert result and then can follow up
901            with another op, like convert.  This subsequent op would see the
902            lingering state of the cancel and fail with -EBUSY. */
903
904         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
905             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
906             is_overlap_cancel(lkb) && ms && !ms->m_result) {
907                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
908                           lkb->lkb_id);
909                 lkb->lkb_wait_type = 0;
910                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
911                 lkb->lkb_wait_count--;
912                 goto out_del;
913         }
914
915         /* N.B. type of reply may not always correspond to type of original
916            msg due to lookup->request optimization, verify others? */
917
918         if (lkb->lkb_wait_type) {
919                 lkb->lkb_wait_type = 0;
920                 goto out_del;
921         }
922
923         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
924                   lkb->lkb_id, mstype, lkb->lkb_flags);
925         return -1;
926
927  out_del:
928         /* the force-unlock/cancel has completed and we haven't recvd a reply
929            to the op that was in progress prior to the unlock/cancel; we
930            give up on any reply to the earlier op.  FIXME: not sure when/how
931            this would happen */
932
933         if (overlap_done && lkb->lkb_wait_type) {
934                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
935                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
936                 lkb->lkb_wait_count--;
937                 lkb->lkb_wait_type = 0;
938         }
939
940         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
941
942         lkb->lkb_flags &= ~DLM_IFL_RESEND;
943         lkb->lkb_wait_count--;
944         if (!lkb->lkb_wait_count)
945                 list_del_init(&lkb->lkb_wait_reply);
946         unhold_lkb(lkb);
947         return 0;
948 }
949
950 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
951 {
952         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
953         int error;
954
955         mutex_lock(&ls->ls_waiters_mutex);
956         error = _remove_from_waiters(lkb, mstype, NULL);
957         mutex_unlock(&ls->ls_waiters_mutex);
958         return error;
959 }
960
961 /* Handles situations where we might be processing a "fake" or "stub" reply in
962    which we can't try to take waiters_mutex again. */
963
964 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
965 {
966         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
967         int error;
968
969         if (ms != &ls->ls_stub_ms)
970                 mutex_lock(&ls->ls_waiters_mutex);
971         error = _remove_from_waiters(lkb, ms->m_type, ms);
972         if (ms != &ls->ls_stub_ms)
973                 mutex_unlock(&ls->ls_waiters_mutex);
974         return error;
975 }
976
977 static void dir_remove(struct dlm_rsb *r)
978 {
979         int to_nodeid;
980
981         if (dlm_no_directory(r->res_ls))
982                 return;
983
984         to_nodeid = dlm_dir_nodeid(r);
985         if (to_nodeid != dlm_our_nodeid())
986                 send_remove(r);
987         else
988                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
989                                      r->res_name, r->res_length);
990 }
991
992 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
993    found since they are in order of newest to oldest? */
994
995 static int shrink_bucket(struct dlm_ls *ls, int b)
996 {
997         struct dlm_rsb *r;
998         int count = 0, found;
999
1000         for (;;) {
1001                 found = 0;
1002                 spin_lock(&ls->ls_rsbtbl[b].lock);
1003                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1004                                             res_hashchain) {
1005                         if (!time_after_eq(jiffies, r->res_toss_time +
1006                                            dlm_config.ci_toss_secs * HZ))
1007                                 continue;
1008                         found = 1;
1009                         break;
1010                 }
1011
1012                 if (!found) {
1013                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1014                         break;
1015                 }
1016
1017                 if (kref_put(&r->res_ref, kill_rsb)) {
1018                         list_del(&r->res_hashchain);
1019                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1020
1021                         if (is_master(r))
1022                                 dir_remove(r);
1023                         dlm_free_rsb(r);
1024                         count++;
1025                 } else {
1026                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1027                         log_error(ls, "tossed rsb in use %s", r->res_name);
1028                 }
1029         }
1030
1031         return count;
1032 }
1033
1034 void dlm_scan_rsbs(struct dlm_ls *ls)
1035 {
1036         int i;
1037
1038         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1039                 shrink_bucket(ls, i);
1040                 if (dlm_locking_stopped(ls))
1041                         break;
1042                 cond_resched();
1043         }
1044 }
1045
1046 static void add_timeout(struct dlm_lkb *lkb)
1047 {
1048         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1049
1050         if (is_master_copy(lkb))
1051                 return;
1052
1053         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1054             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1055                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1056                 goto add_it;
1057         }
1058         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1059                 goto add_it;
1060         return;
1061
1062  add_it:
1063         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1064         mutex_lock(&ls->ls_timeout_mutex);
1065         hold_lkb(lkb);
1066         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1067         mutex_unlock(&ls->ls_timeout_mutex);
1068 }
1069
1070 static void del_timeout(struct dlm_lkb *lkb)
1071 {
1072         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1073
1074         mutex_lock(&ls->ls_timeout_mutex);
1075         if (!list_empty(&lkb->lkb_time_list)) {
1076                 list_del_init(&lkb->lkb_time_list);
1077                 unhold_lkb(lkb);
1078         }
1079         mutex_unlock(&ls->ls_timeout_mutex);
1080 }
1081
1082 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1083    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1084    and then lock rsb because of lock ordering in add_timeout.  We may need
1085    to specify some special timeout-related bits in the lkb that are just to
1086    be accessed under the timeout_mutex. */
1087
1088 void dlm_scan_timeout(struct dlm_ls *ls)
1089 {
1090         struct dlm_rsb *r;
1091         struct dlm_lkb *lkb;
1092         int do_cancel, do_warn;
1093         s64 wait_us;
1094
1095         for (;;) {
1096                 if (dlm_locking_stopped(ls))
1097                         break;
1098
1099                 do_cancel = 0;
1100                 do_warn = 0;
1101                 mutex_lock(&ls->ls_timeout_mutex);
1102                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1103
1104                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1105                                                         lkb->lkb_timestamp));
1106
1107                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1108                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1109                                 do_cancel = 1;
1110
1111                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1112                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1113                                 do_warn = 1;
1114
1115                         if (!do_cancel && !do_warn)
1116                                 continue;
1117                         hold_lkb(lkb);
1118                         break;
1119                 }
1120                 mutex_unlock(&ls->ls_timeout_mutex);
1121
1122                 if (!do_cancel && !do_warn)
1123                         break;
1124
1125                 r = lkb->lkb_resource;
1126                 hold_rsb(r);
1127                 lock_rsb(r);
1128
1129                 if (do_warn) {
1130                         /* clear flag so we only warn once */
1131                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1132                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1133                                 del_timeout(lkb);
1134                         dlm_timeout_warn(lkb);
1135                 }
1136
1137                 if (do_cancel) {
1138                         log_debug(ls, "timeout cancel %x node %d %s",
1139                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1140                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1141                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1142                         del_timeout(lkb);
1143                         _cancel_lock(r, lkb);
1144                 }
1145
1146                 unlock_rsb(r);
1147                 unhold_rsb(r);
1148                 dlm_put_lkb(lkb);
1149         }
1150 }
1151
1152 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1153    dlm_recoverd before checking/setting ls_recover_begin. */
1154
1155 void dlm_adjust_timeouts(struct dlm_ls *ls)
1156 {
1157         struct dlm_lkb *lkb;
1158         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1159
1160         ls->ls_recover_begin = 0;
1161         mutex_lock(&ls->ls_timeout_mutex);
1162         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1163                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1164         mutex_unlock(&ls->ls_timeout_mutex);
1165 }
1166
1167 /* lkb is master or local copy */
1168
1169 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 {
1171         int b, len = r->res_ls->ls_lvblen;
1172
1173         /* b=1 lvb returned to caller
1174            b=0 lvb written to rsb or invalidated
1175            b=-1 do nothing */
1176
1177         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1178
1179         if (b == 1) {
1180                 if (!lkb->lkb_lvbptr)
1181                         return;
1182
1183                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1184                         return;
1185
1186                 if (!r->res_lvbptr)
1187                         return;
1188
1189                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1190                 lkb->lkb_lvbseq = r->res_lvbseq;
1191
1192         } else if (b == 0) {
1193                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1194                         rsb_set_flag(r, RSB_VALNOTVALID);
1195                         return;
1196                 }
1197
1198                 if (!lkb->lkb_lvbptr)
1199                         return;
1200
1201                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1202                         return;
1203
1204                 if (!r->res_lvbptr)
1205                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1206
1207                 if (!r->res_lvbptr)
1208                         return;
1209
1210                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1211                 r->res_lvbseq++;
1212                 lkb->lkb_lvbseq = r->res_lvbseq;
1213                 rsb_clear_flag(r, RSB_VALNOTVALID);
1214         }
1215
1216         if (rsb_flag(r, RSB_VALNOTVALID))
1217                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1218 }
1219
1220 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1221 {
1222         if (lkb->lkb_grmode < DLM_LOCK_PW)
1223                 return;
1224
1225         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1226                 rsb_set_flag(r, RSB_VALNOTVALID);
1227                 return;
1228         }
1229
1230         if (!lkb->lkb_lvbptr)
1231                 return;
1232
1233         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1234                 return;
1235
1236         if (!r->res_lvbptr)
1237                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1238
1239         if (!r->res_lvbptr)
1240                 return;
1241
1242         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1243         r->res_lvbseq++;
1244         rsb_clear_flag(r, RSB_VALNOTVALID);
1245 }
1246
1247 /* lkb is process copy (pc) */
1248
1249 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1250                             struct dlm_message *ms)
1251 {
1252         int b;
1253
1254         if (!lkb->lkb_lvbptr)
1255                 return;
1256
1257         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1258                 return;
1259
1260         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1261         if (b == 1) {
1262                 int len = receive_extralen(ms);
1263                 if (len > DLM_RESNAME_MAXLEN)
1264                         len = DLM_RESNAME_MAXLEN;
1265                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1266                 lkb->lkb_lvbseq = ms->m_lvbseq;
1267         }
1268 }
1269
1270 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1271    remove_lock -- used for unlock, removes lkb from granted
1272    revert_lock -- used for cancel, moves lkb from convert to granted
1273    grant_lock  -- used for request and convert, adds lkb to granted or
1274                   moves lkb from convert or waiting to granted
1275
1276    Each of these is used for master or local copy lkb's.  There is
1277    also a _pc() variation used to make the corresponding change on
1278    a process copy (pc) lkb. */
1279
1280 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1281 {
1282         del_lkb(r, lkb);
1283         lkb->lkb_grmode = DLM_LOCK_IV;
1284         /* this unhold undoes the original ref from create_lkb()
1285            so this leads to the lkb being freed */
1286         unhold_lkb(lkb);
1287 }
1288
1289 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1290 {
1291         set_lvb_unlock(r, lkb);
1292         _remove_lock(r, lkb);
1293 }
1294
1295 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1296 {
1297         _remove_lock(r, lkb);
1298 }
1299
1300 /* returns: 0 did nothing
1301             1 moved lock to granted
1302            -1 removed lock */
1303
1304 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1305 {
1306         int rv = 0;
1307
1308         lkb->lkb_rqmode = DLM_LOCK_IV;
1309
1310         switch (lkb->lkb_status) {
1311         case DLM_LKSTS_GRANTED:
1312                 break;
1313         case DLM_LKSTS_CONVERT:
1314                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1315                 rv = 1;
1316                 break;
1317         case DLM_LKSTS_WAITING:
1318                 del_lkb(r, lkb);
1319                 lkb->lkb_grmode = DLM_LOCK_IV;
1320                 /* this unhold undoes the original ref from create_lkb()
1321                    so this leads to the lkb being freed */
1322                 unhold_lkb(lkb);
1323                 rv = -1;
1324                 break;
1325         default:
1326                 log_print("invalid status for revert %d", lkb->lkb_status);
1327         }
1328         return rv;
1329 }
1330
1331 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1332 {
1333         return revert_lock(r, lkb);
1334 }
1335
1336 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1337 {
1338         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1339                 lkb->lkb_grmode = lkb->lkb_rqmode;
1340                 if (lkb->lkb_status)
1341                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1342                 else
1343                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344         }
1345
1346         lkb->lkb_rqmode = DLM_LOCK_IV;
1347 }
1348
1349 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         set_lvb_lock(r, lkb);
1352         _grant_lock(r, lkb);
1353         lkb->lkb_highbast = 0;
1354 }
1355
1356 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1357                           struct dlm_message *ms)
1358 {
1359         set_lvb_lock_pc(r, lkb, ms);
1360         _grant_lock(r, lkb);
1361 }
1362
1363 /* called by grant_pending_locks() which means an async grant message must
1364    be sent to the requesting node in addition to granting the lock if the
1365    lkb belongs to a remote node. */
1366
1367 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1368 {
1369         grant_lock(r, lkb);
1370         if (is_master_copy(lkb))
1371                 send_grant(r, lkb);
1372         else
1373                 queue_cast(r, lkb, 0);
1374 }
1375
1376 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1377    change the granted/requested modes.  We're munging things accordingly in
1378    the process copy.
1379    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1380    conversion deadlock
1381    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1382    compatible with other granted locks */
1383
1384 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1385 {
1386         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1387                 log_print("munge_demoted %x invalid reply type %d",
1388                           lkb->lkb_id, ms->m_type);
1389                 return;
1390         }
1391
1392         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1393                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1394                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1395                 return;
1396         }
1397
1398         lkb->lkb_grmode = DLM_LOCK_NL;
1399 }
1400
1401 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1402 {
1403         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1404             ms->m_type != DLM_MSG_GRANT) {
1405                 log_print("munge_altmode %x invalid reply type %d",
1406                           lkb->lkb_id, ms->m_type);
1407                 return;
1408         }
1409
1410         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1411                 lkb->lkb_rqmode = DLM_LOCK_PR;
1412         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1413                 lkb->lkb_rqmode = DLM_LOCK_CW;
1414         else {
1415                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1416                 dlm_print_lkb(lkb);
1417         }
1418 }
1419
1420 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1421 {
1422         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1423                                            lkb_statequeue);
1424         if (lkb->lkb_id == first->lkb_id)
1425                 return 1;
1426
1427         return 0;
1428 }
1429
1430 /* Check if the given lkb conflicts with another lkb on the queue. */
1431
1432 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1433 {
1434         struct dlm_lkb *this;
1435
1436         list_for_each_entry(this, head, lkb_statequeue) {
1437                 if (this == lkb)
1438                         continue;
1439                 if (!modes_compat(this, lkb))
1440                         return 1;
1441         }
1442         return 0;
1443 }
1444
1445 /*
1446  * "A conversion deadlock arises with a pair of lock requests in the converting
1447  * queue for one resource.  The granted mode of each lock blocks the requested
1448  * mode of the other lock."
1449  *
1450  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1451  * convert queue from being granted, then deadlk/demote lkb.
1452  *
1453  * Example:
1454  * Granted Queue: empty
1455  * Convert Queue: NL->EX (first lock)
1456  *                PR->EX (second lock)
1457  *
1458  * The first lock can't be granted because of the granted mode of the second
1459  * lock and the second lock can't be granted because it's not first in the
1460  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1461  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1462  * flag set and return DEMOTED in the lksb flags.
1463  *
1464  * Originally, this function detected conv-deadlk in a more limited scope:
1465  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1466  * - if lkb1 was the first entry in the queue (not just earlier), and was
1467  *   blocked by the granted mode of lkb2, and there was nothing on the
1468  *   granted queue preventing lkb1 from being granted immediately, i.e.
1469  *   lkb2 was the only thing preventing lkb1 from being granted.
1470  *
1471  * That second condition meant we'd only say there was conv-deadlk if
1472  * resolving it (by demotion) would lead to the first lock on the convert
1473  * queue being granted right away.  It allowed conversion deadlocks to exist
1474  * between locks on the convert queue while they couldn't be granted anyway.
1475  *
1476  * Now, we detect and take action on conversion deadlocks immediately when
1477  * they're created, even if they may not be immediately consequential.  If
1478  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1479  * mode that would prevent lkb1's conversion from being granted, we do a
1480  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1481  * I think this means that the lkb_is_ahead condition below should always
1482  * be zero, i.e. there will never be conv-deadlk between two locks that are
1483  * both already on the convert queue.
1484  */
1485
1486 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1487 {
1488         struct dlm_lkb *lkb1;
1489         int lkb_is_ahead = 0;
1490
1491         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1492                 if (lkb1 == lkb2) {
1493                         lkb_is_ahead = 1;
1494                         continue;
1495                 }
1496
1497                 if (!lkb_is_ahead) {
1498                         if (!modes_compat(lkb2, lkb1))
1499                                 return 1;
1500                 } else {
1501                         if (!modes_compat(lkb2, lkb1) &&
1502                             !modes_compat(lkb1, lkb2))
1503                                 return 1;
1504                 }
1505         }
1506         return 0;
1507 }
1508
1509 /*
1510  * Return 1 if the lock can be granted, 0 otherwise.
1511  * Also detect and resolve conversion deadlocks.
1512  *
1513  * lkb is the lock to be granted
1514  *
1515  * now is 1 if the function is being called in the context of the
1516  * immediate request, it is 0 if called later, after the lock has been
1517  * queued.
1518  *
1519  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1520  */
1521
1522 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1523 {
1524         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1525
1526         /*
1527          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1528          * a new request for a NL mode lock being blocked.
1529          *
1530          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1531          * request, then it would be granted.  In essence, the use of this flag
1532          * tells the Lock Manager to expedite theis request by not considering
1533          * what may be in the CONVERTING or WAITING queues...  As of this
1534          * writing, the EXPEDITE flag can be used only with new requests for NL
1535          * mode locks.  This flag is not valid for conversion requests.
1536          *
1537          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1538          * conversion or used with a non-NL requested mode.  We also know an
1539          * EXPEDITE request is always granted immediately, so now must always
1540          * be 1.  The full condition to grant an expedite request: (now &&
1541          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1542          * therefore be shortened to just checking the flag.
1543          */
1544
1545         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1546                 return 1;
1547
1548         /*
1549          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1550          * added to the remaining conditions.
1551          */
1552
1553         if (queue_conflict(&r->res_grantqueue, lkb))
1554                 goto out;
1555
1556         /*
1557          * 6-3: By default, a conversion request is immediately granted if the
1558          * requested mode is compatible with the modes of all other granted
1559          * locks
1560          */
1561
1562         if (queue_conflict(&r->res_convertqueue, lkb))
1563                 goto out;
1564
1565         /*
1566          * 6-5: But the default algorithm for deciding whether to grant or
1567          * queue conversion requests does not by itself guarantee that such
1568          * requests are serviced on a "first come first serve" basis.  This, in
1569          * turn, can lead to a phenomenon known as "indefinate postponement".
1570          *
1571          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1572          * the system service employed to request a lock conversion.  This flag
1573          * forces certain conversion requests to be queued, even if they are
1574          * compatible with the granted modes of other locks on the same
1575          * resource.  Thus, the use of this flag results in conversion requests
1576          * being ordered on a "first come first servce" basis.
1577          *
1578          * DCT: This condition is all about new conversions being able to occur
1579          * "in place" while the lock remains on the granted queue (assuming
1580          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1581          * doesn't _have_ to go onto the convert queue where it's processed in
1582          * order.  The "now" variable is necessary to distinguish converts
1583          * being received and processed for the first time now, because once a
1584          * convert is moved to the conversion queue the condition below applies
1585          * requiring fifo granting.
1586          */
1587
1588         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1589                 return 1;
1590
1591         /*
1592          * The NOORDER flag is set to avoid the standard vms rules on grant
1593          * order.
1594          */
1595
1596         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1597                 return 1;
1598
1599         /*
1600          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1601          * granted until all other conversion requests ahead of it are granted
1602          * and/or canceled.
1603          */
1604
1605         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1606                 return 1;
1607
1608         /*
1609          * 6-4: By default, a new request is immediately granted only if all
1610          * three of the following conditions are satisfied when the request is
1611          * issued:
1612          * - The queue of ungranted conversion requests for the resource is
1613          *   empty.
1614          * - The queue of ungranted new requests for the resource is empty.
1615          * - The mode of the new request is compatible with the most
1616          *   restrictive mode of all granted locks on the resource.
1617          */
1618
1619         if (now && !conv && list_empty(&r->res_convertqueue) &&
1620             list_empty(&r->res_waitqueue))
1621                 return 1;
1622
1623         /*
1624          * 6-4: Once a lock request is in the queue of ungranted new requests,
1625          * it cannot be granted until the queue of ungranted conversion
1626          * requests is empty, all ungranted new requests ahead of it are
1627          * granted and/or canceled, and it is compatible with the granted mode
1628          * of the most restrictive lock granted on the resource.
1629          */
1630
1631         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1632             first_in_list(lkb, &r->res_waitqueue))
1633                 return 1;
1634  out:
1635         return 0;
1636 }
1637
1638 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1639                           int *err)
1640 {
1641         int rv;
1642         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1643         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1644
1645         if (err)
1646                 *err = 0;
1647
1648         rv = _can_be_granted(r, lkb, now);
1649         if (rv)
1650                 goto out;
1651
1652         /*
1653          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1654          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1655          * cancels one of the locks.
1656          */
1657
1658         if (is_convert && can_be_queued(lkb) &&
1659             conversion_deadlock_detect(r, lkb)) {
1660                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1661                         lkb->lkb_grmode = DLM_LOCK_NL;
1662                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1663                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1664                         if (err)
1665                                 *err = -EDEADLK;
1666                         else {
1667                                 log_print("can_be_granted deadlock %x now %d",
1668                                           lkb->lkb_id, now);
1669                                 dlm_dump_rsb(r);
1670                         }
1671                 }
1672                 goto out;
1673         }
1674
1675         /*
1676          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1677          * to grant a request in a mode other than the normal rqmode.  It's a
1678          * simple way to provide a big optimization to applications that can
1679          * use them.
1680          */
1681
1682         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1683                 alt = DLM_LOCK_PR;
1684         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1685                 alt = DLM_LOCK_CW;
1686
1687         if (alt) {
1688                 lkb->lkb_rqmode = alt;
1689                 rv = _can_be_granted(r, lkb, now);
1690                 if (rv)
1691                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1692                 else
1693                         lkb->lkb_rqmode = rqmode;
1694         }
1695  out:
1696         return rv;
1697 }
1698
1699 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1700    for locks pending on the convert list.  Once verified (watch for these
1701    log_prints), we should be able to just call _can_be_granted() and not
1702    bother with the demote/deadlk cases here (and there's no easy way to deal
1703    with a deadlk here, we'd have to generate something like grant_lock with
1704    the deadlk error.) */
1705
1706 /* Returns the highest requested mode of all blocked conversions; sets
1707    cw if there's a blocked conversion to DLM_LOCK_CW. */
1708
1709 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1710 {
1711         struct dlm_lkb *lkb, *s;
1712         int hi, demoted, quit, grant_restart, demote_restart;
1713         int deadlk;
1714
1715         quit = 0;
1716  restart:
1717         grant_restart = 0;
1718         demote_restart = 0;
1719         hi = DLM_LOCK_IV;
1720
1721         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1722                 demoted = is_demoted(lkb);
1723                 deadlk = 0;
1724
1725                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1726                         grant_lock_pending(r, lkb);
1727                         grant_restart = 1;
1728                         continue;
1729                 }
1730
1731                 if (!demoted && is_demoted(lkb)) {
1732                         log_print("WARN: pending demoted %x node %d %s",
1733                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1734                         demote_restart = 1;
1735                         continue;
1736                 }
1737
1738                 if (deadlk) {
1739                         log_print("WARN: pending deadlock %x node %d %s",
1740                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1741                         dlm_dump_rsb(r);
1742                         continue;
1743                 }
1744
1745                 hi = max_t(int, lkb->lkb_rqmode, hi);
1746
1747                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1748                         *cw = 1;
1749         }
1750
1751         if (grant_restart)
1752                 goto restart;
1753         if (demote_restart && !quit) {
1754                 quit = 1;
1755                 goto restart;
1756         }
1757
1758         return max_t(int, high, hi);
1759 }
1760
1761 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1762 {
1763         struct dlm_lkb *lkb, *s;
1764
1765         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1766                 if (can_be_granted(r, lkb, 0, NULL))
1767                         grant_lock_pending(r, lkb);
1768                 else {
1769                         high = max_t(int, lkb->lkb_rqmode, high);
1770                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1771                                 *cw = 1;
1772                 }
1773         }
1774
1775         return high;
1776 }
1777
1778 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1779    on either the convert or waiting queue.
1780    high is the largest rqmode of all locks blocked on the convert or
1781    waiting queue. */
1782
1783 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1784 {
1785         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1786                 if (gr->lkb_highbast < DLM_LOCK_EX)
1787                         return 1;
1788                 return 0;
1789         }
1790
1791         if (gr->lkb_highbast < high &&
1792             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1793                 return 1;
1794         return 0;
1795 }
1796
1797 static void grant_pending_locks(struct dlm_rsb *r)
1798 {
1799         struct dlm_lkb *lkb, *s;
1800         int high = DLM_LOCK_IV;
1801         int cw = 0;
1802
1803         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1804
1805         high = grant_pending_convert(r, high, &cw);
1806         high = grant_pending_wait(r, high, &cw);
1807
1808         if (high == DLM_LOCK_IV)
1809                 return;
1810
1811         /*
1812          * If there are locks left on the wait/convert queue then send blocking
1813          * ASTs to granted locks based on the largest requested mode (high)
1814          * found above.
1815          */
1816
1817         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1818                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1819                         if (cw && high == DLM_LOCK_PR &&
1820                             lkb->lkb_grmode == DLM_LOCK_PR)
1821                                 queue_bast(r, lkb, DLM_LOCK_CW);
1822                         else
1823                                 queue_bast(r, lkb, high);
1824                         lkb->lkb_highbast = high;
1825                 }
1826         }
1827 }
1828
1829 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1830 {
1831         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1832             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1833                 if (gr->lkb_highbast < DLM_LOCK_EX)
1834                         return 1;
1835                 return 0;
1836         }
1837
1838         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1839                 return 1;
1840         return 0;
1841 }
1842
1843 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1844                             struct dlm_lkb *lkb)
1845 {
1846         struct dlm_lkb *gr;
1847
1848         list_for_each_entry(gr, head, lkb_statequeue) {
1849                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1850                         queue_bast(r, gr, lkb->lkb_rqmode);
1851                         gr->lkb_highbast = lkb->lkb_rqmode;
1852                 }
1853         }
1854 }
1855
1856 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1857 {
1858         send_bast_queue(r, &r->res_grantqueue, lkb);
1859 }
1860
1861 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1862 {
1863         send_bast_queue(r, &r->res_grantqueue, lkb);
1864         send_bast_queue(r, &r->res_convertqueue, lkb);
1865 }
1866
1867 /* set_master(r, lkb) -- set the master nodeid of a resource
1868
1869    The purpose of this function is to set the nodeid field in the given
1870    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1871    known, it can just be copied to the lkb and the function will return
1872    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1873    before it can be copied to the lkb.
1874
1875    When the rsb nodeid is being looked up remotely, the initial lkb
1876    causing the lookup is kept on the ls_waiters list waiting for the
1877    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1878    on the rsb's res_lookup list until the master is verified.
1879
1880    Return values:
1881    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1882    1: the rsb master is not available and the lkb has been placed on
1883       a wait queue
1884 */
1885
1886 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887 {
1888         struct dlm_ls *ls = r->res_ls;
1889         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1890
1891         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1892                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1893                 r->res_first_lkid = lkb->lkb_id;
1894                 lkb->lkb_nodeid = r->res_nodeid;
1895                 return 0;
1896         }
1897
1898         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1899                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1900                 return 1;
1901         }
1902
1903         if (r->res_nodeid == 0) {
1904                 lkb->lkb_nodeid = 0;
1905                 return 0;
1906         }
1907
1908         if (r->res_nodeid > 0) {
1909                 lkb->lkb_nodeid = r->res_nodeid;
1910                 return 0;
1911         }
1912
1913         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1914
1915         dir_nodeid = dlm_dir_nodeid(r);
1916
1917         if (dir_nodeid != our_nodeid) {
1918                 r->res_first_lkid = lkb->lkb_id;
1919                 send_lookup(r, lkb);
1920                 return 1;
1921         }
1922
1923         for (i = 0; i < 2; i++) {
1924                 /* It's possible for dlm_scand to remove an old rsb for
1925                    this same resource from the toss list, us to create
1926                    a new one, look up the master locally, and find it
1927                    already exists just before dlm_scand does the
1928                    dir_remove() on the previous rsb. */
1929
1930                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1931                                        r->res_length, &ret_nodeid);
1932                 if (!error)
1933                         break;
1934                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1935                 schedule();
1936         }
1937         if (error && error != -EEXIST)
1938                 return error;
1939
1940         if (ret_nodeid == our_nodeid) {
1941                 r->res_first_lkid = 0;
1942                 r->res_nodeid = 0;
1943                 lkb->lkb_nodeid = 0;
1944         } else {
1945                 r->res_first_lkid = lkb->lkb_id;
1946                 r->res_nodeid = ret_nodeid;
1947                 lkb->lkb_nodeid = ret_nodeid;
1948         }
1949         return 0;
1950 }
1951
1952 static void process_lookup_list(struct dlm_rsb *r)
1953 {
1954         struct dlm_lkb *lkb, *safe;
1955
1956         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1957                 list_del_init(&lkb->lkb_rsb_lookup);
1958                 _request_lock(r, lkb);
1959                 schedule();
1960         }
1961 }
1962
1963 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1964
1965 static void confirm_master(struct dlm_rsb *r, int error)
1966 {
1967         struct dlm_lkb *lkb;
1968
1969         if (!r->res_first_lkid)
1970                 return;
1971
1972         switch (error) {
1973         case 0:
1974         case -EINPROGRESS:
1975                 r->res_first_lkid = 0;
1976                 process_lookup_list(r);
1977                 break;
1978
1979         case -EAGAIN:
1980         case -EBADR:
1981         case -ENOTBLK:
1982                 /* the remote request failed and won't be retried (it was
1983                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1984                    lkb the first_lkid */
1985
1986                 r->res_first_lkid = 0;
1987
1988                 if (!list_empty(&r->res_lookup)) {
1989                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1990                                          lkb_rsb_lookup);
1991                         list_del_init(&lkb->lkb_rsb_lookup);
1992                         r->res_first_lkid = lkb->lkb_id;
1993                         _request_lock(r, lkb);
1994                 }
1995                 break;
1996
1997         default:
1998                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1999         }
2000 }
2001
2002 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2003                          int namelen, unsigned long timeout_cs,
2004                          void (*ast) (void *astparam),
2005                          void *astparam,
2006                          void (*bast) (void *astparam, int mode),
2007                          struct dlm_args *args)
2008 {
2009         int rv = -EINVAL;
2010
2011         /* check for invalid arg usage */
2012
2013         if (mode < 0 || mode > DLM_LOCK_EX)
2014                 goto out;
2015
2016         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2017                 goto out;
2018
2019         if (flags & DLM_LKF_CANCEL)
2020                 goto out;
2021
2022         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2023                 goto out;
2024
2025         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2026                 goto out;
2027
2028         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2029                 goto out;
2030
2031         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2032                 goto out;
2033
2034         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2035                 goto out;
2036
2037         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2038                 goto out;
2039
2040         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2041                 goto out;
2042
2043         if (!ast || !lksb)
2044                 goto out;
2045
2046         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2047                 goto out;
2048
2049         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2050                 goto out;
2051
2052         /* these args will be copied to the lkb in validate_lock_args,
2053            it cannot be done now because when converting locks, fields in
2054            an active lkb cannot be modified before locking the rsb */
2055
2056         args->flags = flags;
2057         args->astfn = ast;
2058         args->astparam = astparam;
2059         args->bastfn = bast;
2060         args->timeout = timeout_cs;
2061         args->mode = mode;
2062         args->lksb = lksb;
2063         rv = 0;
2064  out:
2065         return rv;
2066 }
2067
2068 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2069 {
2070         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2071                       DLM_LKF_FORCEUNLOCK))
2072                 return -EINVAL;
2073
2074         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2075                 return -EINVAL;
2076
2077         args->flags = flags;
2078         args->astparam = astarg;
2079         return 0;
2080 }
2081
2082 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2083                               struct dlm_args *args)
2084 {
2085         int rv = -EINVAL;
2086
2087         if (args->flags & DLM_LKF_CONVERT) {
2088                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2089                         goto out;
2090
2091                 if (args->flags & DLM_LKF_QUECVT &&
2092                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2093                         goto out;
2094
2095                 rv = -EBUSY;
2096                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2097                         goto out;
2098
2099                 if (lkb->lkb_wait_type)
2100                         goto out;
2101
2102                 if (is_overlap(lkb))
2103                         goto out;
2104         }
2105
2106         lkb->lkb_exflags = args->flags;
2107         lkb->lkb_sbflags = 0;
2108         lkb->lkb_astfn = args->astfn;
2109         lkb->lkb_astparam = args->astparam;
2110         lkb->lkb_bastfn = args->bastfn;
2111         lkb->lkb_rqmode = args->mode;
2112         lkb->lkb_lksb = args->lksb;
2113         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2114         lkb->lkb_ownpid = (int) current->pid;
2115         lkb->lkb_timeout_cs = args->timeout;
2116         rv = 0;
2117  out:
2118         if (rv)
2119                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2120                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2121                           lkb->lkb_status, lkb->lkb_wait_type,
2122                           lkb->lkb_resource->res_name);
2123         return rv;
2124 }
2125
2126 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2127    for success */
2128
2129 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2130    because there may be a lookup in progress and it's valid to do
2131    cancel/unlockf on it */
2132
2133 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2134 {
2135         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2136         int rv = -EINVAL;
2137
2138         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2139                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2140                 dlm_print_lkb(lkb);
2141                 goto out;
2142         }
2143
2144         /* an lkb may still exist even though the lock is EOL'ed due to a
2145            cancel, unlock or failed noqueue request; an app can't use these
2146            locks; return same error as if the lkid had not been found at all */
2147
2148         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2149                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2150                 rv = -ENOENT;
2151                 goto out;
2152         }
2153
2154         /* an lkb may be waiting for an rsb lookup to complete where the
2155            lookup was initiated by another lock */
2156
2157         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2158                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2159                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2160                         list_del_init(&lkb->lkb_rsb_lookup);
2161                         queue_cast(lkb->lkb_resource, lkb,
2162                                    args->flags & DLM_LKF_CANCEL ?
2163                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2164                         unhold_lkb(lkb); /* undoes create_lkb() */
2165                 }
2166                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2167                 rv = -EBUSY;
2168                 goto out;
2169         }
2170
2171         /* cancel not allowed with another cancel/unlock in progress */
2172
2173         if (args->flags & DLM_LKF_CANCEL) {
2174                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2175                         goto out;
2176
2177                 if (is_overlap(lkb))
2178                         goto out;
2179
2180                 /* don't let scand try to do a cancel */
2181                 del_timeout(lkb);
2182
2183                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2184                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2185                         rv = -EBUSY;
2186                         goto out;
2187                 }
2188
2189                 switch (lkb->lkb_wait_type) {
2190                 case DLM_MSG_LOOKUP:
2191                 case DLM_MSG_REQUEST:
2192                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2193                         rv = -EBUSY;
2194                         goto out;
2195                 case DLM_MSG_UNLOCK:
2196                 case DLM_MSG_CANCEL:
2197                         goto out;
2198                 }
2199                 /* add_to_waiters() will set OVERLAP_CANCEL */
2200                 goto out_ok;
2201         }
2202
2203         /* do we need to allow a force-unlock if there's a normal unlock
2204            already in progress?  in what conditions could the normal unlock
2205            fail such that we'd want to send a force-unlock to be sure? */
2206
2207         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2208                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2209                         goto out;
2210
2211                 if (is_overlap_unlock(lkb))
2212                         goto out;
2213
2214                 /* don't let scand try to do a cancel */
2215                 del_timeout(lkb);
2216
2217                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2218                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2219                         rv = -EBUSY;
2220                         goto out;
2221                 }
2222
2223                 switch (lkb->lkb_wait_type) {
2224                 case DLM_MSG_LOOKUP:
2225                 case DLM_MSG_REQUEST:
2226                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2227                         rv = -EBUSY;
2228                         goto out;
2229                 case DLM_MSG_UNLOCK:
2230                         goto out;
2231                 }
2232                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2233                 goto out_ok;
2234         }
2235
2236         /* normal unlock not allowed if there's any op in progress */
2237         rv = -EBUSY;
2238         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2239                 goto out;
2240
2241  out_ok:
2242         /* an overlapping op shouldn't blow away exflags from other op */
2243         lkb->lkb_exflags |= args->flags;
2244         lkb->lkb_sbflags = 0;
2245         lkb->lkb_astparam = args->astparam;
2246         rv = 0;
2247  out:
2248         if (rv)
2249                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2250                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2251                           args->flags, lkb->lkb_wait_type,
2252                           lkb->lkb_resource->res_name);
2253         return rv;
2254 }
2255
2256 /*
2257  * Four stage 4 varieties:
2258  * do_request(), do_convert(), do_unlock(), do_cancel()
2259  * These are called on the master node for the given lock and
2260  * from the central locking logic.
2261  */
2262
2263 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2264 {
2265         int error = 0;
2266
2267         if (can_be_granted(r, lkb, 1, NULL)) {
2268                 grant_lock(r, lkb);
2269                 queue_cast(r, lkb, 0);
2270                 goto out;
2271         }
2272
2273         if (can_be_queued(lkb)) {
2274                 error = -EINPROGRESS;
2275                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2276                 send_blocking_asts(r, lkb);
2277                 add_timeout(lkb);
2278                 goto out;
2279         }
2280
2281         error = -EAGAIN;
2282         if (force_blocking_asts(lkb))
2283                 send_blocking_asts_all(r, lkb);
2284         queue_cast(r, lkb, -EAGAIN);
2285
2286  out:
2287         return error;
2288 }
2289
2290 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2291 {
2292         int error = 0;
2293         int deadlk = 0;
2294
2295         /* changing an existing lock may allow others to be granted */
2296
2297         if (can_be_granted(r, lkb, 1, &deadlk)) {
2298                 grant_lock(r, lkb);
2299                 queue_cast(r, lkb, 0);
2300                 grant_pending_locks(r);
2301                 goto out;
2302         }
2303
2304         /* can_be_granted() detected that this lock would block in a conversion
2305            deadlock, so we leave it on the granted queue and return EDEADLK in
2306            the ast for the convert. */
2307
2308         if (deadlk) {
2309                 /* it's left on the granted queue */
2310                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2311                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2312                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2313                 revert_lock(r, lkb);
2314                 queue_cast(r, lkb, -EDEADLK);
2315                 error = -EDEADLK;
2316                 goto out;
2317         }
2318
2319         /* is_demoted() means the can_be_granted() above set the grmode
2320            to NL, and left us on the granted queue.  This auto-demotion
2321            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2322            now grantable.  We have to try to grant other converting locks
2323            before we try again to grant this one. */
2324
2325         if (is_demoted(lkb)) {
2326                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2327                 if (_can_be_granted(r, lkb, 1)) {
2328                         grant_lock(r, lkb);
2329                         queue_cast(r, lkb, 0);
2330                         grant_pending_locks(r);
2331                         goto out;
2332                 }
2333                 /* else fall through and move to convert queue */
2334         }
2335
2336         if (can_be_queued(lkb)) {
2337                 error = -EINPROGRESS;
2338                 del_lkb(r, lkb);
2339                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2340                 send_blocking_asts(r, lkb);
2341                 add_timeout(lkb);
2342                 goto out;
2343         }
2344
2345         error = -EAGAIN;
2346         if (force_blocking_asts(lkb))
2347                 send_blocking_asts_all(r, lkb);
2348         queue_cast(r, lkb, -EAGAIN);
2349
2350  out:
2351         return error;
2352 }
2353
2354 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2355 {
2356         remove_lock(r, lkb);
2357         queue_cast(r, lkb, -DLM_EUNLOCK);
2358         grant_pending_locks(r);
2359         return -DLM_EUNLOCK;
2360 }
2361
2362 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2363  
2364 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2365 {
2366         int error;
2367
2368         error = revert_lock(r, lkb);
2369         if (error) {
2370                 queue_cast(r, lkb, -DLM_ECANCEL);
2371                 grant_pending_locks(r);
2372                 return -DLM_ECANCEL;
2373         }
2374         return 0;
2375 }
2376
2377 /*
2378  * Four stage 3 varieties:
2379  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2380  */
2381
2382 /* add a new lkb to a possibly new rsb, called by requesting process */
2383
2384 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2385 {
2386         int error;
2387
2388         /* set_master: sets lkb nodeid from r */
2389
2390         error = set_master(r, lkb);
2391         if (error < 0)
2392                 goto out;
2393         if (error) {
2394                 error = 0;
2395                 goto out;
2396         }
2397
2398         if (is_remote(r))
2399                 /* receive_request() calls do_request() on remote node */
2400                 error = send_request(r, lkb);
2401         else
2402                 error = do_request(r, lkb);
2403  out:
2404         return error;
2405 }
2406
2407 /* change some property of an existing lkb, e.g. mode */
2408
2409 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2410 {
2411         int error;
2412
2413         if (is_remote(r))
2414                 /* receive_convert() calls do_convert() on remote node */
2415                 error = send_convert(r, lkb);
2416         else
2417                 error = do_convert(r, lkb);
2418
2419         return error;
2420 }
2421
2422 /* remove an existing lkb from the granted queue */
2423
2424 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2425 {
2426         int error;
2427
2428         if (is_remote(r))
2429                 /* receive_unlock() calls do_unlock() on remote node */
2430                 error = send_unlock(r, lkb);
2431         else
2432                 error = do_unlock(r, lkb);
2433
2434         return error;
2435 }
2436
2437 /* remove an existing lkb from the convert or wait queue */
2438
2439 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2440 {
2441         int error;
2442
2443         if (is_remote(r))
2444                 /* receive_cancel() calls do_cancel() on remote node */
2445                 error = send_cancel(r, lkb);
2446         else
2447                 error = do_cancel(r, lkb);
2448
2449         return error;
2450 }
2451
2452 /*
2453  * Four stage 2 varieties:
2454  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2455  */
2456
2457 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2458                         int len, struct dlm_args *args)
2459 {
2460         struct dlm_rsb *r;
2461         int error;
2462
2463         error = validate_lock_args(ls, lkb, args);
2464         if (error)
2465                 goto out;
2466
2467         error = find_rsb(ls, name, len, R_CREATE, &r);
2468         if (error)
2469                 goto out;
2470
2471         lock_rsb(r);
2472
2473         attach_lkb(r, lkb);
2474         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2475
2476         error = _request_lock(r, lkb);
2477
2478         unlock_rsb(r);
2479         put_rsb(r);
2480
2481  out:
2482         return error;
2483 }
2484
2485 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2486                         struct dlm_args *args)
2487 {
2488         struct dlm_rsb *r;
2489         int error;
2490
2491         r = lkb->lkb_resource;
2492
2493         hold_rsb(r);
2494         lock_rsb(r);
2495
2496         error = validate_lock_args(ls, lkb, args);
2497         if (error)
2498                 goto out;
2499
2500         error = _convert_lock(r, lkb);
2501  out:
2502         unlock_rsb(r);
2503         put_rsb(r);
2504         return error;
2505 }
2506
2507 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2508                        struct dlm_args *args)
2509 {
2510         struct dlm_rsb *r;
2511         int error;
2512
2513         r = lkb->lkb_resource;
2514
2515         hold_rsb(r);
2516         lock_rsb(r);
2517
2518         error = validate_unlock_args(lkb, args);
2519         if (error)
2520                 goto out;
2521
2522         error = _unlock_lock(r, lkb);
2523  out:
2524         unlock_rsb(r);
2525         put_rsb(r);
2526         return error;
2527 }
2528
2529 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2530                        struct dlm_args *args)
2531 {
2532         struct dlm_rsb *r;
2533         int error;
2534
2535         r = lkb->lkb_resource;
2536
2537         hold_rsb(r);
2538         lock_rsb(r);
2539
2540         error = validate_unlock_args(lkb, args);
2541         if (error)
2542                 goto out;
2543
2544         error = _cancel_lock(r, lkb);
2545  out:
2546         unlock_rsb(r);
2547         put_rsb(r);
2548         return error;
2549 }
2550
2551 /*
2552  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2553  */
2554
2555 int dlm_lock(dlm_lockspace_t *lockspace,
2556              int mode,
2557              struct dlm_lksb *lksb,
2558              uint32_t flags,
2559              void *name,
2560              unsigned int namelen,
2561              uint32_t parent_lkid,
2562              void (*ast) (void *astarg),
2563              void *astarg,
2564              void (*bast) (void *astarg, int mode))
2565 {
2566         struct dlm_ls *ls;
2567         struct dlm_lkb *lkb;
2568         struct dlm_args args;
2569         int error, convert = flags & DLM_LKF_CONVERT;
2570
2571         ls = dlm_find_lockspace_local(lockspace);
2572         if (!ls)
2573                 return -EINVAL;
2574
2575         dlm_lock_recovery(ls);
2576
2577         if (convert)
2578                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2579         else
2580                 error = create_lkb(ls, &lkb);
2581
2582         if (error)
2583                 goto out;
2584
2585         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2586                               astarg, bast, &args);
2587         if (error)
2588                 goto out_put;
2589
2590         if (convert)
2591                 error = convert_lock(ls, lkb, &args);
2592         else
2593                 error = request_lock(ls, lkb, name, namelen, &args);
2594
2595         if (error == -EINPROGRESS)
2596                 error = 0;
2597  out_put:
2598         if (convert || error)
2599                 __put_lkb(ls, lkb);
2600         if (error == -EAGAIN || error == -EDEADLK)
2601                 error = 0;
2602  out:
2603         dlm_unlock_recovery(ls);
2604         dlm_put_lockspace(ls);
2605         return error;
2606 }
2607
2608 int dlm_unlock(dlm_lockspace_t *lockspace,
2609                uint32_t lkid,
2610                uint32_t flags,
2611                struct dlm_lksb *lksb,
2612                void *astarg)
2613 {
2614         struct dlm_ls *ls;
2615         struct dlm_lkb *lkb;
2616         struct dlm_args args;
2617         int error;
2618
2619         ls = dlm_find_lockspace_local(lockspace);
2620         if (!ls)
2621                 return -EINVAL;
2622
2623         dlm_lock_recovery(ls);
2624
2625         error = find_lkb(ls, lkid, &lkb);
2626         if (error)
2627                 goto out;
2628
2629         error = set_unlock_args(flags, astarg, &args);
2630         if (error)
2631                 goto out_put;
2632
2633         if (flags & DLM_LKF_CANCEL)
2634                 error = cancel_lock(ls, lkb, &args);
2635         else
2636                 error = unlock_lock(ls, lkb, &args);
2637
2638         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2639                 error = 0;
2640         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2641                 error = 0;
2642  out_put:
2643         dlm_put_lkb(lkb);
2644  out:
2645         dlm_unlock_recovery(ls);
2646         dlm_put_lockspace(ls);
2647         return error;
2648 }
2649
2650 /*
2651  * send/receive routines for remote operations and replies
2652  *
2653  * send_args
2654  * send_common
2655  * send_request                 receive_request
2656  * send_convert                 receive_convert
2657  * send_unlock                  receive_unlock
2658  * send_cancel                  receive_cancel
2659  * send_grant                   receive_grant
2660  * send_bast                    receive_bast
2661  * send_lookup                  receive_lookup
2662  * send_remove                  receive_remove
2663  *
2664  *                              send_common_reply
2665  * receive_request_reply        send_request_reply
2666  * receive_convert_reply        send_convert_reply
2667  * receive_unlock_reply         send_unlock_reply
2668  * receive_cancel_reply         send_cancel_reply
2669  * receive_lookup_reply         send_lookup_reply
2670  */
2671
2672 static int _create_message(struct dlm_ls *ls, int mb_len,
2673                            int to_nodeid, int mstype,
2674                            struct dlm_message **ms_ret,
2675                            struct dlm_mhandle **mh_ret)
2676 {
2677         struct dlm_message *ms;
2678         struct dlm_mhandle *mh;
2679         char *mb;
2680
2681         /* get_buffer gives us a message handle (mh) that we need to
2682            pass into lowcomms_commit and a message buffer (mb) that we
2683            write our data into */
2684
2685         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2686         if (!mh)
2687                 return -ENOBUFS;
2688
2689         memset(mb, 0, mb_len);
2690
2691         ms = (struct dlm_message *) mb;
2692
2693         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2694         ms->m_header.h_lockspace = ls->ls_global_id;
2695         ms->m_header.h_nodeid = dlm_our_nodeid();
2696         ms->m_header.h_length = mb_len;
2697         ms->m_header.h_cmd = DLM_MSG;
2698
2699         ms->m_type = mstype;
2700
2701         *mh_ret = mh;
2702         *ms_ret = ms;
2703         return 0;
2704 }
2705
2706 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2707                           int to_nodeid, int mstype,
2708                           struct dlm_message **ms_ret,
2709                           struct dlm_mhandle **mh_ret)
2710 {
2711         int mb_len = sizeof(struct dlm_message);
2712
2713         switch (mstype) {
2714         case DLM_MSG_REQUEST:
2715         case DLM_MSG_LOOKUP:
2716         case DLM_MSG_REMOVE:
2717                 mb_len += r->res_length;
2718                 break;
2719         case DLM_MSG_CONVERT:
2720         case DLM_MSG_UNLOCK:
2721         case DLM_MSG_REQUEST_REPLY:
2722         case DLM_MSG_CONVERT_REPLY:
2723         case DLM_MSG_GRANT:
2724                 if (lkb && lkb->lkb_lvbptr)
2725                         mb_len += r->res_ls->ls_lvblen;
2726                 break;
2727         }
2728
2729         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2730                                ms_ret, mh_ret);
2731 }
2732
2733 /* further lowcomms enhancements or alternate implementations may make
2734    the return value from this function useful at some point */
2735
2736 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2737 {
2738         dlm_message_out(ms);
2739         dlm_lowcomms_commit_buffer(mh);
2740         return 0;
2741 }
2742
2743 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2744                       struct dlm_message *ms)
2745 {
2746         ms->m_nodeid   = lkb->lkb_nodeid;
2747         ms->m_pid      = lkb->lkb_ownpid;
2748         ms->m_lkid     = lkb->lkb_id;
2749         ms->m_remid    = lkb->lkb_remid;
2750         ms->m_exflags  = lkb->lkb_exflags;
2751         ms->m_sbflags  = lkb->lkb_sbflags;
2752         ms->m_flags    = lkb->lkb_flags;
2753         ms->m_lvbseq   = lkb->lkb_lvbseq;
2754         ms->m_status   = lkb->lkb_status;
2755         ms->m_grmode   = lkb->lkb_grmode;
2756         ms->m_rqmode   = lkb->lkb_rqmode;
2757         ms->m_hash     = r->res_hash;
2758
2759         /* m_result and m_bastmode are set from function args,
2760            not from lkb fields */
2761
2762         if (lkb->lkb_bastfn)
2763                 ms->m_asts |= AST_BAST;
2764         if (lkb->lkb_astfn)
2765                 ms->m_asts |= AST_COMP;
2766
2767         /* compare with switch in create_message; send_remove() doesn't
2768            use send_args() */
2769
2770         switch (ms->m_type) {
2771         case DLM_MSG_REQUEST:
2772         case DLM_MSG_LOOKUP:
2773                 memcpy(ms->m_extra, r->res_name, r->res_length);
2774                 break;
2775         case DLM_MSG_CONVERT:
2776         case DLM_MSG_UNLOCK:
2777         case DLM_MSG_REQUEST_REPLY:
2778         case DLM_MSG_CONVERT_REPLY:
2779         case DLM_MSG_GRANT:
2780                 if (!lkb->lkb_lvbptr)
2781                         break;
2782                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2783                 break;
2784         }
2785 }
2786
2787 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2788 {
2789         struct dlm_message *ms;
2790         struct dlm_mhandle *mh;
2791         int to_nodeid, error;
2792
2793         error = add_to_waiters(lkb, mstype);
2794         if (error)
2795                 return error;
2796
2797         to_nodeid = r->res_nodeid;
2798
2799         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2800         if (error)
2801                 goto fail;
2802
2803         send_args(r, lkb, ms);
2804
2805         error = send_message(mh, ms);
2806         if (error)
2807                 goto fail;
2808         return 0;
2809
2810  fail:
2811         remove_from_waiters(lkb, msg_reply_type(mstype));
2812         return error;
2813 }
2814
2815 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2816 {
2817         return send_common(r, lkb, DLM_MSG_REQUEST);
2818 }
2819
2820 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2821 {
2822         int error;
2823
2824         error = send_common(r, lkb, DLM_MSG_CONVERT);
2825
2826         /* down conversions go without a reply from the master */
2827         if (!error && down_conversion(lkb)) {
2828                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2829                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2830                 r->res_ls->ls_stub_ms.m_result = 0;
2831                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2832                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2833         }
2834
2835         return error;
2836 }
2837
2838 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2839    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2840    that the master is still correct. */
2841
2842 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2843 {
2844         return send_common(r, lkb, DLM_MSG_UNLOCK);
2845 }
2846
2847 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2848 {
2849         return send_common(r, lkb, DLM_MSG_CANCEL);
2850 }
2851
2852 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2853 {
2854         struct dlm_message *ms;
2855         struct dlm_mhandle *mh;
2856         int to_nodeid, error;
2857
2858         to_nodeid = lkb->lkb_nodeid;
2859
2860         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2861         if (error)
2862                 goto out;
2863
2864         send_args(r, lkb, ms);
2865
2866         ms->m_result = 0;
2867
2868         error = send_message(mh, ms);
2869  out:
2870         return error;
2871 }
2872
2873 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2874 {
2875         struct dlm_message *ms;
2876         struct dlm_mhandle *mh;
2877         int to_nodeid, error;
2878
2879         to_nodeid = lkb->lkb_nodeid;
2880
2881         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2882         if (error)
2883                 goto out;
2884
2885         send_args(r, lkb, ms);
2886
2887         ms->m_bastmode = mode;
2888
2889         error = send_message(mh, ms);
2890  out:
2891         return error;
2892 }
2893
2894 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2895 {
2896         struct dlm_message *ms;
2897         struct dlm_mhandle *mh;
2898         int to_nodeid, error;
2899
2900         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2901         if (error)
2902                 return error;
2903
2904         to_nodeid = dlm_dir_nodeid(r);
2905
2906         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2907         if (error)
2908                 goto fail;
2909
2910         send_args(r, lkb, ms);
2911
2912         error = send_message(mh, ms);
2913         if (error)
2914                 goto fail;
2915         return 0;
2916
2917  fail:
2918         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2919         return error;
2920 }
2921
2922 static int send_remove(struct dlm_rsb *r)
2923 {
2924         struct dlm_message *ms;
2925         struct dlm_mhandle *mh;
2926         int to_nodeid, error;
2927
2928         to_nodeid = dlm_dir_nodeid(r);
2929
2930         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2931         if (error)
2932                 goto out;
2933
2934         memcpy(ms->m_extra, r->res_name, r->res_length);
2935         ms->m_hash = r->res_hash;
2936
2937         error = send_message(mh, ms);
2938  out:
2939         return error;
2940 }
2941
2942 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2943                              int mstype, int rv)
2944 {
2945         struct dlm_message *ms;
2946         struct dlm_mhandle *mh;
2947         int to_nodeid, error;
2948
2949         to_nodeid = lkb->lkb_nodeid;
2950
2951         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2952         if (error)
2953                 goto out;
2954
2955         send_args(r, lkb, ms);
2956
2957         ms->m_result = rv;
2958
2959         error = send_message(mh, ms);
2960  out:
2961         return error;
2962 }
2963
2964 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2965 {
2966         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2967 }
2968
2969 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2970 {
2971         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2972 }
2973
2974 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2975 {
2976         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2977 }
2978
2979 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2980 {
2981         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2982 }
2983
2984 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2985                              int ret_nodeid, int rv)
2986 {
2987         struct dlm_rsb *r = &ls->ls_stub_rsb;
2988         struct dlm_message *ms;
2989         struct dlm_mhandle *mh;
2990         int error, nodeid = ms_in->m_header.h_nodeid;
2991
2992         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2993         if (error)
2994                 goto out;
2995
2996         ms->m_lkid = ms_in->m_lkid;
2997         ms->m_result = rv;
2998         ms->m_nodeid = ret_nodeid;
2999
3000         error = send_message(mh, ms);
3001  out:
3002         return error;
3003 }
3004
3005 /* which args we save from a received message depends heavily on the type
3006    of message, unlike the send side where we can safely send everything about
3007    the lkb for any type of message */
3008
3009 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3010 {
3011         lkb->lkb_exflags = ms->m_exflags;
3012         lkb->lkb_sbflags = ms->m_sbflags;
3013         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3014                          (ms->m_flags & 0x0000FFFF);
3015 }
3016
3017 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3018 {
3019         lkb->lkb_sbflags = ms->m_sbflags;
3020         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3021                          (ms->m_flags & 0x0000FFFF);
3022 }
3023
3024 static int receive_extralen(struct dlm_message *ms)
3025 {
3026         return (ms->m_header.h_length - sizeof(struct dlm_message));
3027 }
3028
3029 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3030                        struct dlm_message *ms)
3031 {
3032         int len;
3033
3034         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3035                 if (!lkb->lkb_lvbptr)
3036                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3037                 if (!lkb->lkb_lvbptr)
3038                         return -ENOMEM;
3039                 len = receive_extralen(ms);
3040                 if (len > DLM_RESNAME_MAXLEN)
3041                         len = DLM_RESNAME_MAXLEN;
3042                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3043         }
3044         return 0;
3045 }
3046
3047 static void fake_bastfn(void *astparam, int mode)
3048 {
3049         log_print("fake_bastfn should not be called");
3050 }
3051
3052 static void fake_astfn(void *astparam)
3053 {
3054         log_print("fake_astfn should not be called");
3055 }
3056
3057 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3058                                 struct dlm_message *ms)
3059 {
3060         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3061         lkb->lkb_ownpid = ms->m_pid;
3062         lkb->lkb_remid = ms->m_lkid;
3063         lkb->lkb_grmode = DLM_LOCK_IV;
3064         lkb->lkb_rqmode = ms->m_rqmode;
3065
3066         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3067         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3068
3069         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3070                 /* lkb was just created so there won't be an lvb yet */
3071                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3072                 if (!lkb->lkb_lvbptr)
3073                         return -ENOMEM;
3074         }
3075
3076         return 0;
3077 }
3078
3079 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3080                                 struct dlm_message *ms)
3081 {
3082         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3083                 return -EBUSY;
3084
3085         if (receive_lvb(ls, lkb, ms))
3086                 return -ENOMEM;
3087
3088         lkb->lkb_rqmode = ms->m_rqmode;
3089         lkb->lkb_lvbseq = ms->m_lvbseq;
3090
3091         return 0;
3092 }
3093
3094 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3095                                struct dlm_message *ms)
3096 {
3097         if (receive_lvb(ls, lkb, ms))
3098                 return -ENOMEM;
3099         return 0;
3100 }
3101
3102 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3103    uses to send a reply and that the remote end uses to process the reply. */
3104
3105 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3106 {
3107         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3108         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3109         lkb->lkb_remid = ms->m_lkid;
3110 }
3111
3112 /* This is called after the rsb is locked so that we can safely inspect
3113    fields in the lkb. */
3114
3115 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3116 {
3117         int from = ms->m_header.h_nodeid;
3118         int error = 0;
3119
3120         switch (ms->m_type) {
3121         case DLM_MSG_CONVERT:
3122         case DLM_MSG_UNLOCK:
3123         case DLM_MSG_CANCEL:
3124                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3125                         error = -EINVAL;
3126                 break;
3127
3128         case DLM_MSG_CONVERT_REPLY:
3129         case DLM_MSG_UNLOCK_REPLY:
3130         case DLM_MSG_CANCEL_REPLY:
3131         case DLM_MSG_GRANT:
3132         case DLM_MSG_BAST:
3133                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3134                         error = -EINVAL;
3135                 break;
3136
3137         case DLM_MSG_REQUEST_REPLY:
3138                 if (!is_process_copy(lkb))
3139                         error = -EINVAL;
3140                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3141                         error = -EINVAL;
3142                 break;
3143
3144         default:
3145                 error = -EINVAL;
3146         }
3147
3148         if (error)
3149                 log_error(lkb->lkb_resource->res_ls,
3150                           "ignore invalid message %d from %d %x %x %x %d",
3151                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3152                           lkb->lkb_flags, lkb->lkb_nodeid);
3153         return error;
3154 }
3155
3156 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3157 {
3158         struct dlm_lkb *lkb;
3159         struct dlm_rsb *r;
3160         int error, namelen;
3161
3162         error = create_lkb(ls, &lkb);
3163         if (error)
3164                 goto fail;
3165
3166         receive_flags(lkb, ms);
3167         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3168         error = receive_request_args(ls, lkb, ms);
3169         if (error) {
3170                 __put_lkb(ls, lkb);
3171                 goto fail;
3172         }
3173
3174         namelen = receive_extralen(ms);
3175
3176         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3177         if (error) {
3178                 __put_lkb(ls, lkb);
3179                 goto fail;
3180         }
3181
3182         lock_rsb(r);
3183
3184         attach_lkb(r, lkb);
3185         error = do_request(r, lkb);
3186         send_request_reply(r, lkb, error);
3187
3188         unlock_rsb(r);
3189         put_rsb(r);
3190
3191         if (error == -EINPROGRESS)
3192                 error = 0;
3193         if (error)
3194                 dlm_put_lkb(lkb);
3195         return;
3196
3197  fail:
3198         setup_stub_lkb(ls, ms);
3199         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3200 }
3201
3202 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3203 {
3204         struct dlm_lkb *lkb;
3205         struct dlm_rsb *r;
3206         int error, reply = 1;
3207
3208         error = find_lkb(ls, ms->m_remid, &lkb);
3209         if (error)
3210                 goto fail;
3211
3212         r = lkb->lkb_resource;
3213
3214         hold_rsb(r);
3215         lock_rsb(r);
3216
3217         error = validate_message(lkb, ms);
3218         if (error)
3219                 goto out;
3220
3221         receive_flags(lkb, ms);
3222         error = receive_convert_args(ls, lkb, ms);
3223         if (error)
3224                 goto out_reply;
3225         reply = !down_conversion(lkb);
3226
3227         error = do_convert(r, lkb);
3228  out_reply:
3229         if (reply)
3230                 send_convert_reply(r, lkb, error);
3231  out:
3232         unlock_rsb(r);
3233         put_rsb(r);
3234         dlm_put_lkb(lkb);
3235         return;
3236
3237  fail:
3238         setup_stub_lkb(ls, ms);
3239         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3240 }
3241
3242 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3243 {
3244         struct dlm_lkb *lkb;
3245         struct dlm_rsb *r;
3246         int error;
3247
3248         error = find_lkb(ls, ms->m_remid, &lkb);
3249         if (error)
3250                 goto fail;
3251
3252         r = lkb->lkb_resource;
3253
3254         hold_rsb(r);
3255         lock_rsb(r);
3256
3257         error = validate_message(lkb, ms);
3258         if (error)
3259                 goto out;
3260
3261         receive_flags(lkb, ms);
3262         error = receive_unlock_args(ls, lkb, ms);
3263         if (error)
3264                 goto out_reply;
3265
3266         error = do_unlock(r, lkb);
3267  out_reply:
3268         send_unlock_reply(r, lkb, error);
3269  out:
3270         unlock_rsb(r);
3271         put_rsb(r);
3272         dlm_put_lkb(lkb);
3273         return;
3274
3275  fail:
3276         setup_stub_lkb(ls, ms);
3277         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3278 }
3279
3280 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3281 {
3282         struct dlm_lkb *lkb;
3283         struct dlm_rsb *r;
3284         int error;
3285
3286         error = find_lkb(ls, ms->m_remid, &lkb);
3287         if (error)
3288                 goto fail;
3289
3290         receive_flags(lkb, ms);
3291
3292         r = lkb->lkb_resource;
3293
3294         hold_rsb(r);
3295         lock_rsb(r);
3296
3297         error = validate_message(lkb, ms);
3298         if (error)
3299                 goto out;
3300
3301         error = do_cancel(r, lkb);
3302         send_cancel_reply(r, lkb, error);
3303  out:
3304         unlock_rsb(r);
3305         put_rsb(r);
3306         dlm_put_lkb(lkb);
3307         return;
3308
3309  fail:
3310         setup_stub_lkb(ls, ms);
3311         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3312 }
3313
3314 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3315 {
3316         struct dlm_lkb *lkb;
3317         struct dlm_rsb *r;
3318         int error;
3319
3320         error = find_lkb(ls, ms->m_remid, &lkb);
3321         if (error) {
3322                 log_debug(ls, "receive_grant from %d no lkb %x",
3323                           ms->m_header.h_nodeid, ms->m_remid);
3324                 return;
3325         }
3326
3327         r = lkb->lkb_resource;
3328
3329         hold_rsb(r);
3330         lock_rsb(r);
3331
3332         error = validate_message(lkb, ms);
3333         if (error)
3334                 goto out;
3335
3336         receive_flags_reply(lkb, ms);
3337         if (is_altmode(lkb))
3338                 munge_altmode(lkb, ms);
3339         grant_lock_pc(r, lkb, ms);
3340         queue_cast(r, lkb, 0);
3341  out:
3342         unlock_rsb(r);
3343         put_rsb(r);
3344         dlm_put_lkb(lkb);
3345 }
3346
3347 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3348 {
3349         struct dlm_lkb *lkb;
3350         struct dlm_rsb *r;
3351         int error;
3352
3353         error = find_lkb(ls, ms->m_remid, &lkb);
3354         if (error) {
3355                 log_debug(ls, "receive_bast from %d no lkb %x",
3356                           ms->m_header.h_nodeid, ms->m_remid);
3357                 return;
3358         }
3359
3360         r = lkb->lkb_resource;
3361
3362         hold_rsb(r);
3363         lock_rsb(r);
3364
3365         error = validate_message(lkb, ms);
3366         if (error)
3367                 goto out;
3368
3369         queue_bast(r, lkb, ms->m_bastmode);
3370  out:
3371         unlock_rsb(r);
3372         put_rsb(r);
3373         dlm_put_lkb(lkb);
3374 }
3375
3376 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3377 {
3378         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3379
3380         from_nodeid = ms->m_header.h_nodeid;
3381         our_nodeid = dlm_our_nodeid();
3382
3383         len = receive_extralen(ms);
3384
3385         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3386         if (dir_nodeid != our_nodeid) {
3387                 log_error(ls, "lookup dir_nodeid %d from %d",
3388                           dir_nodeid, from_nodeid);
3389                 error = -EINVAL;
3390                 ret_nodeid = -1;
3391                 goto out;
3392         }
3393
3394         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3395
3396         /* Optimization: we're master so treat lookup as a request */
3397         if (!error && ret_nodeid == our_nodeid) {
3398                 receive_request(ls, ms);
3399                 return;
3400         }
3401  out:
3402         send_lookup_reply(ls, ms, ret_nodeid, error);
3403 }
3404
3405 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3406 {
3407         int len, dir_nodeid, from_nodeid;
3408
3409         from_nodeid = ms->m_header.h_nodeid;
3410
3411         len = receive_extralen(ms);
3412
3413         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3414         if (dir_nodeid != dlm_our_nodeid()) {
3415                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3416                           dir_nodeid, from_nodeid);
3417                 return;
3418         }
3419
3420         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3421 }
3422
3423 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3424 {
3425         do_purge(ls, ms->m_nodeid, ms->m_pid);
3426 }
3427
3428 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3429 {
3430         struct dlm_lkb *lkb;
3431         struct dlm_rsb *r;
3432         int error, mstype, result;
3433
3434         error = find_lkb(ls, ms->m_remid, &lkb);
3435         if (error) {
3436                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3437                           ms->m_header.h_nodeid, ms->m_remid);
3438                 return;
3439         }
3440
3441         r = lkb->lkb_resource;
3442         hold_rsb(r);
3443         lock_rsb(r);
3444
3445         error = validate_message(lkb, ms);
3446         if (error)
3447                 goto out;
3448
3449         mstype = lkb->lkb_wait_type;
3450         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3451         if (error)
3452                 goto out;
3453
3454         /* Optimization: the dir node was also the master, so it took our
3455            lookup as a request and sent request reply instead of lookup reply */
3456         if (mstype == DLM_MSG_LOOKUP) {
3457                 r->res_nodeid = ms->m_header.h_nodeid;
3458                 lkb->lkb_nodeid = r->res_nodeid;
3459         }
3460
3461         /* this is the value returned from do_request() on the master */
3462         result = ms->m_result;
3463
3464         switch (result) {
3465         case -EAGAIN:
3466                 /* request would block (be queued) on remote master */
3467                 queue_cast(r, lkb, -EAGAIN);
3468                 confirm_master(r, -EAGAIN);
3469                 unhold_lkb(lkb); /* undoes create_lkb() */
3470                 break;
3471
3472         case -EINPROGRESS:
3473         case 0:
3474                 /* request was queued or granted on remote master */
3475                 receive_flags_reply(lkb, ms);
3476                 lkb->lkb_remid = ms->m_lkid;
3477                 if (is_altmode(lkb))
3478                         munge_altmode(lkb, ms);
3479                 if (result) {
3480                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3481                         add_timeout(lkb);
3482                 } else {
3483                         grant_lock_pc(r, lkb, ms);
3484                         queue_cast(r, lkb, 0);
3485                 }
3486                 confirm_master(r, result);
3487                 break;
3488
3489         case -EBADR:
3490         case -ENOTBLK:
3491                 /* find_rsb failed to find rsb or rsb wasn't master */
3492                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3493                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3494                 r->res_nodeid = -1;
3495                 lkb->lkb_nodeid = -1;
3496
3497                 if (is_overlap(lkb)) {
3498                         /* we'll ignore error in cancel/unlock reply */
3499                         queue_cast_overlap(r, lkb);
3500                         confirm_master(r, result);
3501                         unhold_lkb(lkb); /* undoes create_lkb() */
3502                 } else
3503                         _request_lock(r, lkb);
3504                 break;
3505
3506         default:
3507                 log_error(ls, "receive_request_reply %x error %d",
3508                           lkb->lkb_id, result);
3509         }
3510
3511         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3512                 log_debug(ls, "receive_request_reply %x result %d unlock",
3513                           lkb->lkb_id, result);
3514                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3515                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3516                 send_unlock(r, lkb);
3517         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3518                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3519                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3520                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3521                 send_cancel(r, lkb);
3522         } else {
3523                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3524                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3525         }
3526  out:
3527         unlock_rsb(r);
3528         put_rsb(r);
3529         dlm_put_lkb(lkb);
3530 }
3531
3532 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3533                                     struct dlm_message *ms)
3534 {
3535         /* this is the value returned from do_convert() on the master */
3536         switch (ms->m_result) {
3537         case -EAGAIN:
3538                 /* convert would block (be queued) on remote master */
3539                 queue_cast(r, lkb, -EAGAIN);
3540                 break;
3541
3542         case -EDEADLK:
3543                 receive_flags_reply(lkb, ms);
3544                 revert_lock_pc(r, lkb);
3545                 queue_cast(r, lkb, -EDEADLK);
3546                 break;
3547
3548         case -EINPROGRESS:
3549                 /* convert was queued on remote master */
3550                 receive_flags_reply(lkb, ms);
3551                 if (is_demoted(lkb))
3552                         munge_demoted(lkb, ms);
3553                 del_lkb(r, lkb);
3554                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3555                 add_timeout(lkb);
3556                 break;
3557
3558         case 0:
3559                 /* convert was granted on remote master */
3560                 receive_flags_reply(lkb, ms);
3561                 if (is_demoted(lkb))
3562                         munge_demoted(lkb, ms);
3563                 grant_lock_pc(r, lkb, ms);
3564                 queue_cast(r, lkb, 0);
3565                 break;
3566
3567         default:
3568                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3569                           lkb->lkb_id, ms->m_result);
3570         }
3571 }
3572
3573 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3574 {
3575         struct dlm_rsb *r = lkb->lkb_resource;
3576         int error;
3577
3578         hold_rsb(r);
3579         lock_rsb(r);
3580
3581         error = validate_message(lkb, ms);
3582         if (error)
3583                 goto out;
3584
3585         /* stub reply can happen with waiters_mutex held */
3586         error = remove_from_waiters_ms(lkb, ms);
3587         if (error)
3588                 goto out;
3589
3590         __receive_convert_reply(r, lkb, ms);
3591  out:
3592         unlock_rsb(r);
3593         put_rsb(r);
3594 }
3595
3596 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3597 {
3598         struct dlm_lkb *lkb;
3599         int error;
3600
3601         error = find_lkb(ls, ms->m_remid, &lkb);
3602         if (error) {
3603                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3604                           ms->m_header.h_nodeid, ms->m_remid);
3605                 return;
3606         }
3607
3608         _receive_convert_reply(lkb, ms);
3609         dlm_put_lkb(lkb);
3610 }
3611
3612 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3613 {
3614         struct dlm_rsb *r = lkb->lkb_resource;
3615         int error;
3616
3617         hold_rsb(r);
3618         lock_rsb(r);
3619
3620         error = validate_message(lkb, ms);
3621         if (error)
3622                 goto out;
3623
3624         /* stub reply can happen with waiters_mutex held */
3625         error = remove_from_waiters_ms(lkb, ms);
3626         if (error)
3627                 goto out;
3628
3629         /* this is the value returned from do_unlock() on the master */
3630
3631         switch (ms->m_result) {
3632         case -DLM_EUNLOCK:
3633                 receive_flags_reply(lkb, ms);
3634                 remove_lock_pc(r, lkb);
3635                 queue_cast(r, lkb, -DLM_EUNLOCK);
3636                 break;
3637         case -ENOENT:
3638                 break;
3639         default:
3640                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3641                           lkb->lkb_id, ms->m_result);
3642         }
3643  out:
3644         unlock_rsb(r);
3645         put_rsb(r);
3646 }
3647
3648 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3649 {
3650         struct dlm_lkb *lkb;
3651         int error;
3652
3653         error = find_lkb(ls, ms->m_remid, &lkb);
3654         if (error) {
3655                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3656                           ms->m_header.h_nodeid, ms->m_remid);
3657                 return;
3658         }
3659
3660         _receive_unlock_reply(lkb, ms);
3661         dlm_put_lkb(lkb);
3662 }
3663
3664 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3665 {
3666         struct dlm_rsb *r = lkb->lkb_resource;
3667         int error;
3668
3669         hold_rsb(r);
3670         lock_rsb(r);
3671
3672         error = validate_message(lkb, ms);
3673         if (error)
3674                 goto out;
3675
3676         /* stub reply can happen with waiters_mutex held */
3677         error = remove_from_waiters_ms(lkb, ms);
3678         if (error)
3679                 goto out;
3680
3681         /* this is the value returned from do_cancel() on the master */
3682
3683         switch (ms->m_result) {
3684         case -DLM_ECANCEL:
3685                 receive_flags_reply(lkb, ms);
3686                 revert_lock_pc(r, lkb);
3687                 queue_cast(r, lkb, -DLM_ECANCEL);
3688                 break;
3689         case 0:
3690                 break;
3691         default:
3692                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3693                           lkb->lkb_id, ms->m_result);
3694         }
3695  out:
3696         unlock_rsb(r);
3697         put_rsb(r);
3698 }
3699
3700 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3701 {
3702         struct dlm_lkb *lkb;
3703         int error;
3704
3705         error = find_lkb(ls, ms->m_remid, &lkb);
3706         if (error) {
3707                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3708                           ms->m_header.h_nodeid, ms->m_remid);
3709                 return;
3710         }
3711
3712         _receive_cancel_reply(lkb, ms);
3713         dlm_put_lkb(lkb);
3714 }
3715
3716 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3717 {
3718         struct dlm_lkb *lkb;
3719         struct dlm_rsb *r;
3720         int error, ret_nodeid;
3721
3722         error = find_lkb(ls, ms->m_lkid, &lkb);
3723         if (error) {
3724                 log_error(ls, "receive_lookup_reply no lkb");
3725                 return;
3726         }
3727
3728         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3729            FIXME: will a non-zero error ever be returned? */
3730
3731         r = lkb->lkb_resource;
3732         hold_rsb(r);
3733         lock_rsb(r);
3734
3735         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3736         if (error)
3737                 goto out;
3738
3739         ret_nodeid = ms->m_nodeid;
3740         if (ret_nodeid == dlm_our_nodeid()) {
3741                 r->res_nodeid = 0;
3742                 ret_nodeid = 0;
3743                 r->res_first_lkid = 0;
3744         } else {
3745                 /* set_master() will copy res_nodeid to lkb_nodeid */
3746                 r->res_nodeid = ret_nodeid;
3747         }
3748
3749         if (is_overlap(lkb)) {
3750                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3751                           lkb->lkb_id, lkb->lkb_flags);
3752                 queue_cast_overlap(r, lkb);
3753                 unhold_lkb(lkb); /* undoes create_lkb() */
3754                 goto out_list;
3755         }
3756
3757         _request_lock(r, lkb);
3758
3759  out_list:
3760         if (!ret_nodeid)
3761                 process_lookup_list(r);
3762  out:
3763         unlock_rsb(r);
3764         put_rsb(r);
3765         dlm_put_lkb(lkb);
3766 }
3767
3768 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3769 {
3770         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3771                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3772                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3773                           ms->m_remid, ms->m_result);
3774                 return;
3775         }
3776
3777         switch (ms->m_type) {
3778
3779         /* messages sent to a master node */
3780
3781         case DLM_MSG_REQUEST:
3782                 receive_request(ls, ms);
3783                 break;
3784
3785         case DLM_MSG_CONVERT:
3786                 receive_convert(ls, ms);
3787                 break;
3788
3789         case DLM_MSG_UNLOCK:
3790                 receive_unlock(ls, ms);
3791                 break;
3792
3793         case DLM_MSG_CANCEL:
3794                 receive_cancel(ls, ms);
3795                 break;
3796
3797         /* messages sent from a master node (replies to above) */
3798
3799         case DLM_MSG_REQUEST_REPLY:
3800                 receive_request_reply(ls, ms);
3801                 break;
3802
3803         case DLM_MSG_CONVERT_REPLY:
3804                 receive_convert_reply(ls, ms);
3805                 break;
3806
3807         case DLM_MSG_UNLOCK_REPLY:
3808                 receive_unlock_reply(ls, ms);
3809                 break;
3810
3811         case DLM_MSG_CANCEL_REPLY:
3812                 receive_cancel_reply(ls, ms);
3813                 break;
3814
3815         /* messages sent from a master node (only two types of async msg) */
3816
3817         case DLM_MSG_GRANT:
3818                 receive_grant(ls, ms);
3819                 break;
3820
3821         case DLM_MSG_BAST:
3822                 receive_bast(ls, ms);
3823                 break;
3824
3825         /* messages sent to a dir node */
3826
3827         case DLM_MSG_LOOKUP:
3828                 receive_lookup(ls, ms);
3829                 break;
3830
3831         case DLM_MSG_REMOVE:
3832                 receive_remove(ls, ms);
3833                 break;
3834
3835         /* messages sent from a dir node (remove has no reply) */
3836
3837         case DLM_MSG_LOOKUP_REPLY:
3838                 receive_lookup_reply(ls, ms);
3839                 break;
3840
3841         /* other messages */
3842
3843         case DLM_MSG_PURGE:
3844                 receive_purge(ls, ms);
3845                 break;
3846
3847         default:
3848                 log_error(ls, "unknown message type %d", ms->m_type);
3849         }
3850
3851         dlm_astd_wake();
3852 }
3853
3854 /* If the lockspace is in recovery mode (locking stopped), then normal
3855    messages are saved on the requestqueue for processing after recovery is
3856    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3857    messages off the requestqueue before we process new ones. This occurs right
3858    after recovery completes when we transition from saving all messages on
3859    requestqueue, to processing all the saved messages, to processing new
3860    messages as they arrive. */
3861
3862 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3863                                 int nodeid)
3864 {
3865         if (dlm_locking_stopped(ls)) {
3866                 dlm_add_requestqueue(ls, nodeid, ms);
3867         } else {
3868                 dlm_wait_requestqueue(ls);
3869                 _receive_message(ls, ms);
3870         }
3871 }
3872
3873 /* This is called by dlm_recoverd to process messages that were saved on
3874    the requestqueue. */
3875
3876 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3877 {
3878         _receive_message(ls, ms);
3879 }
3880
3881 /* This is called by the midcomms layer when something is received for
3882    the lockspace.  It could be either a MSG (normal message sent as part of
3883    standard locking activity) or an RCOM (recovery message sent as part of
3884    lockspace recovery). */
3885
3886 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3887 {
3888         struct dlm_header *hd = &p->header;
3889         struct dlm_ls *ls;
3890         int type = 0;
3891
3892         switch (hd->h_cmd) {
3893         case DLM_MSG:
3894                 dlm_message_in(&p->message);
3895                 type = p->message.m_type;
3896                 break;
3897         case DLM_RCOM:
3898                 dlm_rcom_in(&p->rcom);
3899                 type = p->rcom.rc_type;
3900                 break;
3901         default:
3902                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3903                 return;
3904         }
3905
3906         if (hd->h_nodeid != nodeid) {
3907                 log_print("invalid h_nodeid %d from %d lockspace %x",
3908                           hd->h_nodeid, nodeid, hd->h_lockspace);
3909                 return;
3910         }
3911
3912         ls = dlm_find_lockspace_global(hd->h_lockspace);
3913         if (!ls) {
3914                 if (dlm_config.ci_log_debug)
3915                         log_print("invalid lockspace %x from %d cmd %d type %d",
3916                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3917
3918                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3919                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3920                 return;
3921         }
3922
3923         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3924            be inactive (in this ls) before transitioning to recovery mode */
3925
3926         down_read(&ls->ls_recv_active);
3927         if (hd->h_cmd == DLM_MSG)
3928                 dlm_receive_message(ls, &p->message, nodeid);
3929         else
3930                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3931         up_read(&ls->ls_recv_active);
3932
3933         dlm_put_lockspace(ls);
3934 }
3935
3936 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3937 {
3938         if (middle_conversion(lkb)) {
3939                 hold_lkb(lkb);
3940                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3941                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3942                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3943                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3944                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3945
3946                 /* Same special case as in receive_rcom_lock_args() */
3947                 lkb->lkb_grmode = DLM_LOCK_IV;
3948                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3949                 unhold_lkb(lkb);
3950
3951         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3952                 lkb->lkb_flags |= DLM_IFL_RESEND;
3953         }
3954
3955         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3956            conversions are async; there's no reply from the remote master */
3957 }
3958
3959 /* A waiting lkb needs recovery if the master node has failed, or
3960    the master node is changing (only when no directory is used) */
3961
3962 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3963 {
3964         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3965                 return 1;
3966
3967         if (!dlm_no_directory(ls))
3968                 return 0;
3969
3970         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3971                 return 1;
3972
3973         return 0;
3974 }
3975
3976 /* Recovery for locks that are waiting for replies from nodes that are now
3977    gone.  We can just complete unlocks and cancels by faking a reply from the
3978    dead node.  Requests and up-conversions we flag to be resent after
3979    recovery.  Down-conversions can just be completed with a fake reply like
3980    unlocks.  Conversions between PR and CW need special attention. */
3981
3982 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3983 {
3984         struct dlm_lkb *lkb, *safe;
3985         int wait_type, stub_unlock_result, stub_cancel_result;
3986
3987         mutex_lock(&ls->ls_waiters_mutex);
3988
3989         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3990                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3991                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3992
3993                 /* all outstanding lookups, regardless of destination  will be
3994                    resent after recovery is done */
3995
3996                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3997                         lkb->lkb_flags |= DLM_IFL_RESEND;
3998                         continue;
3999                 }
4000
4001                 if (!waiter_needs_recovery(ls, lkb))
4002                         continue;
4003
4004                 wait_type = lkb->lkb_wait_type;
4005                 stub_unlock_result = -DLM_EUNLOCK;
4006                 stub_cancel_result = -DLM_ECANCEL;
4007
4008                 /* Main reply may have been received leaving a zero wait_type,
4009                    but a reply for the overlapping op may not have been
4010                    received.  In that case we need to fake the appropriate
4011                    reply for the overlap op. */
4012
4013                 if (!wait_type) {
4014                         if (is_overlap_cancel(lkb)) {
4015                                 wait_type = DLM_MSG_CANCEL;
4016                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4017                                         stub_cancel_result = 0;
4018                         }
4019                         if (is_overlap_unlock(lkb)) {
4020                                 wait_type = DLM_MSG_UNLOCK;
4021                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4022                                         stub_unlock_result = -ENOENT;
4023                         }
4024
4025                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4026                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4027                                   stub_cancel_result, stub_unlock_result);
4028                 }
4029
4030                 switch (wait_type) {
4031
4032                 case DLM_MSG_REQUEST:
4033                         lkb->lkb_flags |= DLM_IFL_RESEND;
4034                         break;
4035
4036                 case DLM_MSG_CONVERT:
4037                         recover_convert_waiter(ls, lkb);
4038                         break;
4039
4040                 case DLM_MSG_UNLOCK:
4041                         hold_lkb(lkb);
4042                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4043                         ls->ls_stub_ms.m_result = stub_unlock_result;
4044                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4045                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4046                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4047                         dlm_put_lkb(lkb);
4048                         break;
4049
4050                 case DLM_MSG_CANCEL:
4051                         hold_lkb(lkb);
4052                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4053                         ls->ls_stub_ms.m_result = stub_cancel_result;
4054                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4055                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4056                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4057                         dlm_put_lkb(lkb);
4058                         break;
4059
4060                 default:
4061                         log_error(ls, "invalid lkb wait_type %d %d",
4062                                   lkb->lkb_wait_type, wait_type);
4063                 }
4064                 schedule();
4065         }
4066         mutex_unlock(&ls->ls_waiters_mutex);
4067 }
4068
4069 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4070 {
4071         struct dlm_lkb *lkb;
4072         int found = 0;
4073
4074         mutex_lock(&ls->ls_waiters_mutex);
4075         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4076                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4077                         hold_lkb(lkb);
4078                         found = 1;
4079                         break;
4080                 }
4081         }
4082         mutex_unlock(&ls->ls_waiters_mutex);
4083
4084         if (!found)
4085                 lkb = NULL;
4086         return lkb;
4087 }
4088
4089 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4090    master or dir-node for r.  Processing the lkb may result in it being placed
4091    back on waiters. */
4092
4093 /* We do this after normal locking has been enabled and any saved messages
4094    (in requestqueue) have been processed.  We should be confident that at
4095    this point we won't get or process a reply to any of these waiting
4096    operations.  But, new ops may be coming in on the rsbs/locks here from
4097    userspace or remotely. */
4098
4099 /* there may have been an overlap unlock/cancel prior to recovery or after
4100    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4101    overlap flag would just have been set and nothing new sent.  we can be
4102    confident here than any replies to either the initial op or overlap ops
4103    prior to recovery have been received. */
4104
4105 int dlm_recover_waiters_post(struct dlm_ls *ls)
4106 {
4107         struct dlm_lkb *lkb;
4108         struct dlm_rsb *r;
4109         int error = 0, mstype, err, oc, ou;
4110
4111         while (1) {
4112                 if (dlm_locking_stopped(ls)) {
4113                         log_debug(ls, "recover_waiters_post aborted");
4114                         error = -EINTR;
4115                         break;
4116                 }
4117
4118                 lkb = find_resend_waiter(ls);
4119                 if (!lkb)
4120                         break;
4121
4122                 r = lkb->lkb_resource;
4123                 hold_rsb(r);
4124                 lock_rsb(r);
4125
4126                 mstype = lkb->lkb_wait_type;
4127                 oc = is_overlap_cancel(lkb);
4128                 ou = is_overlap_unlock(lkb);
4129                 err = 0;
4130
4131                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4132                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4133
4134                 /* At this point we assume that we won't get a reply to any
4135                    previous op or overlap op on this lock.  First, do a big
4136                    remove_from_waiters() for all previous ops. */
4137
4138                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4139                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4140                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4141                 lkb->lkb_wait_type = 0;
4142                 lkb->lkb_wait_count = 0;
4143                 mutex_lock(&ls->ls_waiters_mutex);
4144                 list_del_init(&lkb->lkb_wait_reply);
4145                 mutex_unlock(&ls->ls_waiters_mutex);
4146                 unhold_lkb(lkb); /* for waiters list */
4147
4148                 if (oc || ou) {
4149                         /* do an unlock or cancel instead of resending */
4150                         switch (mstype) {
4151                         case DLM_MSG_LOOKUP:
4152                         case DLM_MSG_REQUEST:
4153                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4154                                                         -DLM_ECANCEL);
4155                                 unhold_lkb(lkb); /* undoes create_lkb() */
4156                                 break;
4157                         case DLM_MSG_CONVERT:
4158                                 if (oc) {
4159                                         queue_cast(r, lkb, -DLM_ECANCEL);
4160                                 } else {
4161                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4162                                         _unlock_lock(r, lkb);
4163                                 }
4164                                 break;
4165                         default:
4166                                 err = 1;
4167                         }
4168                 } else {
4169                         switch (mstype) {
4170                         case DLM_MSG_LOOKUP:
4171                         case DLM_MSG_REQUEST:
4172                                 _request_lock(r, lkb);
4173                                 if (is_master(r))
4174                                         confirm_master(r, 0);
4175                                 break;
4176                         case DLM_MSG_CONVERT:
4177                                 _convert_lock(r, lkb);
4178                                 break;
4179                         default:
4180                                 err = 1;
4181                         }
4182                 }
4183
4184                 if (err)
4185                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4186                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4187                 unlock_rsb(r);
4188                 put_rsb(r);
4189                 dlm_put_lkb(lkb);
4190         }
4191
4192         return error;
4193 }
4194
4195 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4196                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4197 {
4198         struct dlm_ls *ls = r->res_ls;
4199         struct dlm_lkb *lkb, *safe;
4200
4201         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4202                 if (test(ls, lkb)) {
4203                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4204                         del_lkb(r, lkb);
4205                         /* this put should free the lkb */
4206                         if (!dlm_put_lkb(lkb))
4207                                 log_error(ls, "purged lkb not released");
4208                 }
4209         }
4210 }
4211
4212 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4213 {
4214         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4215 }
4216
4217 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4218 {
4219         return is_master_copy(lkb);
4220 }
4221
4222 static void purge_dead_locks(struct dlm_rsb *r)
4223 {
4224         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4225         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4226         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4227 }
4228
4229 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4230 {
4231         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4232         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4233         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4234 }
4235
4236 /* Get rid of locks held by nodes that are gone. */
4237
4238 int dlm_purge_locks(struct dlm_ls *ls)
4239 {
4240         struct dlm_rsb *r;
4241
4242         log_debug(ls, "dlm_purge_locks");
4243
4244         down_write(&ls->ls_root_sem);
4245         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4246                 hold_rsb(r);
4247                 lock_rsb(r);
4248                 if (is_master(r))
4249                         purge_dead_locks(r);
4250                 unlock_rsb(r);
4251                 unhold_rsb(r);
4252
4253                 schedule();
4254         }
4255         up_write(&ls->ls_root_sem);
4256
4257         return 0;
4258 }
4259
4260 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4261 {
4262         struct dlm_rsb *r, *r_ret = NULL;
4263
4264         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4265         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4266                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4267                         continue;
4268                 hold_rsb(r);
4269                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4270                 r_ret = r;
4271                 break;
4272         }
4273         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4274         return r_ret;
4275 }
4276
4277 void dlm_grant_after_purge(struct dlm_ls *ls)
4278 {
4279         struct dlm_rsb *r;
4280         int bucket = 0;
4281
4282         while (1) {
4283                 r = find_purged_rsb(ls, bucket);
4284                 if (!r) {
4285                         if (bucket == ls->ls_rsbtbl_size - 1)
4286                                 break;
4287                         bucket++;
4288                         continue;
4289                 }
4290                 lock_rsb(r);
4291                 if (is_master(r)) {
4292                         grant_pending_locks(r);
4293                         confirm_master(r, 0);
4294                 }
4295                 unlock_rsb(r);
4296                 put_rsb(r);
4297                 schedule();
4298         }
4299 }
4300
4301 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4302                                          uint32_t remid)
4303 {
4304         struct dlm_lkb *lkb;
4305
4306         list_for_each_entry(lkb, head, lkb_statequeue) {
4307                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4308                         return lkb;
4309         }
4310         return NULL;
4311 }
4312
4313 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4314                                     uint32_t remid)
4315 {
4316         struct dlm_lkb *lkb;
4317
4318         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4319         if (lkb)
4320                 return lkb;
4321         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4322         if (lkb)
4323                 return lkb;
4324         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4325         if (lkb)
4326                 return lkb;
4327         return NULL;
4328 }
4329
4330 /* needs at least dlm_rcom + rcom_lock */
4331 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4332                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4333 {
4334         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4335
4336         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4337         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4338         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4339         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4340         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4341         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4342         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4343         lkb->lkb_rqmode = rl->rl_rqmode;
4344         lkb->lkb_grmode = rl->rl_grmode;
4345         /* don't set lkb_status because add_lkb wants to itself */
4346
4347         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4348         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4349
4350         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4351                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4352                          sizeof(struct rcom_lock);
4353                 if (lvblen > ls->ls_lvblen)
4354                         return -EINVAL;
4355                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4356                 if (!lkb->lkb_lvbptr)
4357                         return -ENOMEM;
4358                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4359         }
4360
4361         /* Conversions between PR and CW (middle modes) need special handling.
4362            The real granted mode of these converting locks cannot be determined
4363            until all locks have been rebuilt on the rsb (recover_conversion) */
4364
4365         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4366             middle_conversion(lkb)) {
4367                 rl->rl_status = DLM_LKSTS_CONVERT;
4368                 lkb->lkb_grmode = DLM_LOCK_IV;
4369                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4370         }
4371
4372         return 0;
4373 }
4374
4375 /* This lkb may have been recovered in a previous aborted recovery so we need
4376    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4377    If so we just send back a standard reply.  If not, we create a new lkb with
4378    the given values and send back our lkid.  We send back our lkid by sending
4379    back the rcom_lock struct we got but with the remid field filled in. */
4380
4381 /* needs at least dlm_rcom + rcom_lock */
4382 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4383 {
4384         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4385         struct dlm_rsb *r;
4386         struct dlm_lkb *lkb;
4387         int error;
4388
4389         if (rl->rl_parent_lkid) {
4390                 error = -EOPNOTSUPP;
4391                 goto out;
4392         }
4393
4394         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4395                          R_MASTER, &r);
4396         if (error)
4397                 goto out;
4398
4399         lock_rsb(r);
4400
4401         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4402         if (lkb) {
4403                 error = -EEXIST;
4404                 goto out_remid;
4405         }
4406
4407         error = create_lkb(ls, &lkb);
4408         if (error)
4409                 goto out_unlock;
4410
4411         error = receive_rcom_lock_args(ls, lkb, r, rc);
4412         if (error) {
4413                 __put_lkb(ls, lkb);
4414                 goto out_unlock;
4415         }
4416
4417         attach_lkb(r, lkb);
4418         add_lkb(r, lkb, rl->rl_status);
4419         error = 0;
4420
4421  out_remid:
4422         /* this is the new value returned to the lock holder for
4423            saving in its process-copy lkb */
4424         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4425
4426  out_unlock:
4427         unlock_rsb(r);
4428         put_rsb(r);
4429  out:
4430         if (error)
4431                 log_debug(ls, "recover_master_copy %d %x", error,
4432                           le32_to_cpu(rl->rl_lkid));
4433         rl->rl_result = cpu_to_le32(error);
4434         return error;
4435 }
4436
4437 /* needs at least dlm_rcom + rcom_lock */
4438 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4439 {
4440         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4441         struct dlm_rsb *r;
4442         struct dlm_lkb *lkb;
4443         int error;
4444
4445         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4446         if (error) {
4447                 log_error(ls, "recover_process_copy no lkid %x",
4448                                 le32_to_cpu(rl->rl_lkid));
4449                 return error;
4450         }
4451
4452         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4453
4454         error = le32_to_cpu(rl->rl_result);
4455
4456         r = lkb->lkb_resource;
4457         hold_rsb(r);
4458         lock_rsb(r);
4459
4460         switch (error) {
4461         case -EBADR:
4462                 /* There's a chance the new master received our lock before
4463                    dlm_recover_master_reply(), this wouldn't happen if we did
4464                    a barrier between recover_masters and recover_locks. */
4465                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4466                           (unsigned long)r, r->res_name);
4467                 dlm_send_rcom_lock(r, lkb);
4468                 goto out;
4469         case -EEXIST:
4470                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4471                 /* fall through */
4472         case 0:
4473                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4474                 break;
4475         default:
4476                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4477                           error, lkb->lkb_id);
4478         }
4479
4480         /* an ack for dlm_recover_locks() which waits for replies from
4481            all the locks it sends to new masters */
4482         dlm_recovered_lock(r);
4483  out:
4484         unlock_rsb(r);
4485         put_rsb(r);
4486         dlm_put_lkb(lkb);
4487
4488         return 0;
4489 }
4490
4491 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4492                      int mode, uint32_t flags, void *name, unsigned int namelen,
4493                      unsigned long timeout_cs)
4494 {
4495         struct dlm_lkb *lkb;
4496         struct dlm_args args;
4497         int error;
4498
4499         dlm_lock_recovery(ls);
4500
4501         error = create_lkb(ls, &lkb);
4502         if (error) {
4503                 kfree(ua);
4504                 goto out;
4505         }
4506
4507         if (flags & DLM_LKF_VALBLK) {
4508                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4509                 if (!ua->lksb.sb_lvbptr) {
4510                         kfree(ua);
4511                         __put_lkb(ls, lkb);
4512                         error = -ENOMEM;
4513                         goto out;
4514                 }
4515         }
4516
4517         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4518            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4519            lock and that lkb_astparam is the dlm_user_args structure. */
4520
4521         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4522                               fake_astfn, ua, fake_bastfn, &args);
4523         lkb->lkb_flags |= DLM_IFL_USER;
4524         ua->old_mode = DLM_LOCK_IV;
4525
4526         if (error) {
4527                 __put_lkb(ls, lkb);
4528                 goto out;
4529         }
4530
4531         error = request_lock(ls, lkb, name, namelen, &args);
4532
4533         switch (error) {
4534         case 0:
4535                 break;
4536         case -EINPROGRESS:
4537                 error = 0;
4538                 break;
4539         case -EAGAIN:
4540                 error = 0;
4541                 /* fall through */
4542         default:
4543                 __put_lkb(ls, lkb);
4544                 goto out;
4545         }
4546
4547         /* add this new lkb to the per-process list of locks */
4548         spin_lock(&ua->proc->locks_spin);
4549         hold_lkb(lkb);
4550         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4551         spin_unlock(&ua->proc->locks_spin);
4552  out:
4553         dlm_unlock_recovery(ls);
4554         return error;
4555 }
4556
4557 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4558                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4559                      unsigned long timeout_cs)
4560 {
4561         struct dlm_lkb *lkb;
4562         struct dlm_args args;
4563         struct dlm_user_args *ua;
4564         int error;
4565
4566         dlm_lock_recovery(ls);
4567
4568         error = find_lkb(ls, lkid, &lkb);
4569         if (error)
4570                 goto out;
4571
4572         /* user can change the params on its lock when it converts it, or
4573            add an lvb that didn't exist before */
4574
4575         ua = lkb->lkb_ua;
4576
4577         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4578                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4579                 if (!ua->lksb.sb_lvbptr) {
4580                         error = -ENOMEM;
4581                         goto out_put;
4582                 }
4583         }
4584         if (lvb_in && ua->lksb.sb_lvbptr)
4585                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4586
4587         ua->xid = ua_tmp->xid;
4588         ua->castparam = ua_tmp->castparam;
4589         ua->castaddr = ua_tmp->castaddr;
4590         ua->bastparam = ua_tmp->bastparam;
4591         ua->bastaddr = ua_tmp->bastaddr;
4592         ua->user_lksb = ua_tmp->user_lksb;
4593         ua->old_mode = lkb->lkb_grmode;
4594
4595         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4596                               fake_astfn, ua, fake_bastfn, &args);
4597         if (error)
4598                 goto out_put;
4599
4600         error = convert_lock(ls, lkb, &args);
4601
4602         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4603                 error = 0;
4604  out_put:
4605         dlm_put_lkb(lkb);
4606  out:
4607         dlm_unlock_recovery(ls);
4608         kfree(ua_tmp);
4609         return error;
4610 }
4611
4612 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4613                     uint32_t flags, uint32_t lkid, char *lvb_in)
4614 {
4615         struct dlm_lkb *lkb;
4616         struct dlm_args args;
4617         struct dlm_user_args *ua;
4618         int error;
4619
4620         dlm_lock_recovery(ls);
4621
4622         error = find_lkb(ls, lkid, &lkb);
4623         if (error)
4624                 goto out;
4625
4626         ua = lkb->lkb_ua;
4627
4628         if (lvb_in && ua->lksb.sb_lvbptr)
4629                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4630         if (ua_tmp->castparam)
4631                 ua->castparam = ua_tmp->castparam;
4632         ua->user_lksb = ua_tmp->user_lksb;
4633
4634         error = set_unlock_args(flags, ua, &args);
4635         if (error)
4636                 goto out_put;
4637
4638         error = unlock_lock(ls, lkb, &args);
4639
4640         if (error == -DLM_EUNLOCK)
4641                 error = 0;
4642         /* from validate_unlock_args() */
4643         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4644                 error = 0;
4645         if (error)
4646                 goto out_put;
4647
4648         spin_lock(&ua->proc->locks_spin);
4649         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4650         if (!list_empty(&lkb->lkb_ownqueue))
4651                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4652         spin_unlock(&ua->proc->locks_spin);
4653  out_put:
4654         dlm_put_lkb(lkb);
4655  out:
4656         dlm_unlock_recovery(ls);
4657         kfree(ua_tmp);
4658         return error;
4659 }
4660
4661 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4662                     uint32_t flags, uint32_t lkid)
4663 {
4664         struct dlm_lkb *lkb;
4665         struct dlm_args args;
4666         struct dlm_user_args *ua;
4667         int error;
4668
4669         dlm_lock_recovery(ls);
4670
4671         error = find_lkb(ls, lkid, &lkb);
4672         if (error)
4673                 goto out;
4674
4675         ua = lkb->lkb_ua;
4676         if (ua_tmp->castparam)
4677                 ua->castparam = ua_tmp->castparam;
4678         ua->user_lksb = ua_tmp->user_lksb;
4679
4680         error = set_unlock_args(flags, ua, &args);
4681         if (error)
4682                 goto out_put;
4683
4684         error = cancel_lock(ls, lkb, &args);
4685
4686         if (error == -DLM_ECANCEL)
4687                 error = 0;
4688         /* from validate_unlock_args() */
4689         if (error == -EBUSY)
4690                 error = 0;
4691  out_put:
4692         dlm_put_lkb(lkb);
4693  out:
4694         dlm_unlock_recovery(ls);
4695         kfree(ua_tmp);
4696         return error;
4697 }
4698
4699 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4700 {
4701         struct dlm_lkb *lkb;
4702         struct dlm_args args;
4703         struct dlm_user_args *ua;
4704         struct dlm_rsb *r;
4705         int error;
4706
4707         dlm_lock_recovery(ls);
4708
4709         error = find_lkb(ls, lkid, &lkb);
4710         if (error)
4711                 goto out;
4712
4713         ua = lkb->lkb_ua;
4714
4715         error = set_unlock_args(flags, ua, &args);
4716         if (error)
4717                 goto out_put;
4718
4719         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4720
4721         r = lkb->lkb_resource;
4722         hold_rsb(r);
4723         lock_rsb(r);
4724
4725         error = validate_unlock_args(lkb, &args);
4726         if (error)
4727                 goto out_r;
4728         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4729
4730         error = _cancel_lock(r, lkb);
4731  out_r:
4732         unlock_rsb(r);
4733         put_rsb(r);
4734
4735         if (error == -DLM_ECANCEL)
4736                 error = 0;
4737         /* from validate_unlock_args() */
4738         if (error == -EBUSY)
4739                 error = 0;
4740  out_put:
4741         dlm_put_lkb(lkb);
4742  out:
4743         dlm_unlock_recovery(ls);
4744         return error;
4745 }
4746
4747 /* lkb's that are removed from the waiters list by revert are just left on the
4748    orphans list with the granted orphan locks, to be freed by purge */
4749
4750 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4751 {
4752         struct dlm_args args;
4753         int error;
4754
4755         hold_lkb(lkb);
4756         mutex_lock(&ls->ls_orphans_mutex);
4757         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4758         mutex_unlock(&ls->ls_orphans_mutex);
4759
4760         set_unlock_args(0, lkb->lkb_ua, &args);
4761
4762         error = cancel_lock(ls, lkb, &args);
4763         if (error == -DLM_ECANCEL)
4764                 error = 0;
4765         return error;
4766 }
4767
4768 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4769    Regardless of what rsb queue the lock is on, it's removed and freed. */
4770
4771 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4772 {
4773         struct dlm_args args;
4774         int error;
4775
4776         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4777
4778         error = unlock_lock(ls, lkb, &args);
4779         if (error == -DLM_EUNLOCK)
4780                 error = 0;
4781         return error;
4782 }
4783
4784 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4785    (which does lock_rsb) due to deadlock with receiving a message that does
4786    lock_rsb followed by dlm_user_add_ast() */
4787
4788 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4789                                      struct dlm_user_proc *proc)
4790 {
4791         struct dlm_lkb *lkb = NULL;
4792
4793         mutex_lock(&ls->ls_clear_proc_locks);
4794         if (list_empty(&proc->locks))
4795                 goto out;
4796
4797         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4798         list_del_init(&lkb->lkb_ownqueue);
4799
4800         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4801                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4802         else
4803                 lkb->lkb_flags |= DLM_IFL_DEAD;
4804  out:
4805         mutex_unlock(&ls->ls_clear_proc_locks);
4806         return lkb;
4807 }
4808
4809 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4810    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4811    which we clear here. */
4812
4813 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4814    list, and no more device_writes should add lkb's to proc->locks list; so we
4815    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4816    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4817    them ourself. */
4818
4819 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4820 {
4821         struct dlm_lkb *lkb, *safe;
4822
4823         dlm_lock_recovery(ls);
4824
4825         while (1) {
4826                 lkb = del_proc_lock(ls, proc);
4827                 if (!lkb)
4828                         break;
4829                 del_timeout(lkb);
4830                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4831                         orphan_proc_lock(ls, lkb);
4832                 else
4833                         unlock_proc_lock(ls, lkb);
4834
4835                 /* this removes the reference for the proc->locks list
4836                    added by dlm_user_request, it may result in the lkb
4837                    being freed */
4838
4839                 dlm_put_lkb(lkb);
4840         }
4841
4842         mutex_lock(&ls->ls_clear_proc_locks);
4843
4844         /* in-progress unlocks */
4845         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4846                 list_del_init(&lkb->lkb_ownqueue);
4847                 lkb->lkb_flags |= DLM_IFL_DEAD;
4848                 dlm_put_lkb(lkb);
4849         }
4850
4851         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4852                 lkb->lkb_ast_type = 0;
4853                 list_del(&lkb->lkb_astqueue);
4854                 dlm_put_lkb(lkb);
4855         }
4856
4857         mutex_unlock(&ls->ls_clear_proc_locks);
4858         dlm_unlock_recovery(ls);
4859 }
4860
4861 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4862 {
4863         struct dlm_lkb *lkb, *safe;
4864
4865         while (1) {
4866                 lkb = NULL;
4867                 spin_lock(&proc->locks_spin);
4868                 if (!list_empty(&proc->locks)) {
4869                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4870                                          lkb_ownqueue);
4871                         list_del_init(&lkb->lkb_ownqueue);
4872                 }
4873                 spin_unlock(&proc->locks_spin);
4874
4875                 if (!lkb)
4876                         break;
4877
4878                 lkb->lkb_flags |= DLM_IFL_DEAD;
4879                 unlock_proc_lock(ls, lkb);
4880                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4881         }
4882
4883         spin_lock(&proc->locks_spin);
4884         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4885                 list_del_init(&lkb->lkb_ownqueue);
4886                 lkb->lkb_flags |= DLM_IFL_DEAD;
4887                 dlm_put_lkb(lkb);
4888         }
4889         spin_unlock(&proc->locks_spin);
4890
4891         spin_lock(&proc->asts_spin);
4892         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4893                 list_del(&lkb->lkb_astqueue);
4894                 dlm_put_lkb(lkb);
4895         }
4896         spin_unlock(&proc->asts_spin);
4897 }
4898
4899 /* pid of 0 means purge all orphans */
4900
4901 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4902 {
4903         struct dlm_lkb *lkb, *safe;
4904
4905         mutex_lock(&ls->ls_orphans_mutex);
4906         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4907                 if (pid && lkb->lkb_ownpid != pid)
4908                         continue;
4909                 unlock_proc_lock(ls, lkb);
4910                 list_del_init(&lkb->lkb_ownqueue);
4911                 dlm_put_lkb(lkb);
4912         }
4913         mutex_unlock(&ls->ls_orphans_mutex);
4914 }
4915
4916 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4917 {
4918         struct dlm_message *ms;
4919         struct dlm_mhandle *mh;
4920         int error;
4921
4922         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4923                                 DLM_MSG_PURGE, &ms, &mh);
4924         if (error)
4925                 return error;
4926         ms->m_nodeid = nodeid;
4927         ms->m_pid = pid;
4928
4929         return send_message(mh, ms);
4930 }
4931
4932 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4933                    int nodeid, int pid)
4934 {
4935         int error = 0;
4936
4937         if (nodeid != dlm_our_nodeid()) {
4938                 error = send_purge(ls, nodeid, pid);
4939         } else {
4940                 dlm_lock_recovery(ls);
4941                 if (pid == current->pid)
4942                         purge_proc_locks(ls, proc);
4943                 else
4944                         do_purge(ls, nodeid, pid);
4945                 dlm_unlock_recovery(ls);
4946         }
4947         return error;
4948 }
4949