dlm: improve how bast mode handling
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, 0);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         if (is_master_copy(lkb))
322                 send_bast(r, lkb, rqmode);
323         else
324                 dlm_add_ast(lkb, AST_BAST, rqmode);
325 }
326
327 /*
328  * Basic operations on rsb's and lkb's
329  */
330
331 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
332 {
333         struct dlm_rsb *r;
334
335         r = dlm_allocate_rsb(ls, len);
336         if (!r)
337                 return NULL;
338
339         r->res_ls = ls;
340         r->res_length = len;
341         memcpy(r->res_name, name, len);
342         mutex_init(&r->res_mutex);
343
344         INIT_LIST_HEAD(&r->res_lookup);
345         INIT_LIST_HEAD(&r->res_grantqueue);
346         INIT_LIST_HEAD(&r->res_convertqueue);
347         INIT_LIST_HEAD(&r->res_waitqueue);
348         INIT_LIST_HEAD(&r->res_root_list);
349         INIT_LIST_HEAD(&r->res_recover_list);
350
351         return r;
352 }
353
354 static int search_rsb_list(struct list_head *head, char *name, int len,
355                            unsigned int flags, struct dlm_rsb **r_ret)
356 {
357         struct dlm_rsb *r;
358         int error = 0;
359
360         list_for_each_entry(r, head, res_hashchain) {
361                 if (len == r->res_length && !memcmp(name, r->res_name, len))
362                         goto found;
363         }
364         *r_ret = NULL;
365         return -EBADR;
366
367  found:
368         if (r->res_nodeid && (flags & R_MASTER))
369                 error = -ENOTBLK;
370         *r_ret = r;
371         return error;
372 }
373
374 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
375                        unsigned int flags, struct dlm_rsb **r_ret)
376 {
377         struct dlm_rsb *r;
378         int error;
379
380         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
381         if (!error) {
382                 kref_get(&r->res_ref);
383                 goto out;
384         }
385         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
386         if (error)
387                 goto out;
388
389         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
390
391         if (dlm_no_directory(ls))
392                 goto out;
393
394         if (r->res_nodeid == -1) {
395                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
396                 r->res_first_lkid = 0;
397         } else if (r->res_nodeid > 0) {
398                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
399                 r->res_first_lkid = 0;
400         } else {
401                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
402                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
403         }
404  out:
405         *r_ret = r;
406         return error;
407 }
408
409 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
410                       unsigned int flags, struct dlm_rsb **r_ret)
411 {
412         int error;
413         write_lock(&ls->ls_rsbtbl[b].lock);
414         error = _search_rsb(ls, name, len, b, flags, r_ret);
415         write_unlock(&ls->ls_rsbtbl[b].lock);
416         return error;
417 }
418
419 /*
420  * Find rsb in rsbtbl and potentially create/add one
421  *
422  * Delaying the release of rsb's has a similar benefit to applications keeping
423  * NL locks on an rsb, but without the guarantee that the cached master value
424  * will still be valid when the rsb is reused.  Apps aren't always smart enough
425  * to keep NL locks on an rsb that they may lock again shortly; this can lead
426  * to excessive master lookups and removals if we don't delay the release.
427  *
428  * Searching for an rsb means looking through both the normal list and toss
429  * list.  When found on the toss list the rsb is moved to the normal list with
430  * ref count of 1; when found on normal list the ref count is incremented.
431  */
432
433 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
434                     unsigned int flags, struct dlm_rsb **r_ret)
435 {
436         struct dlm_rsb *r, *tmp;
437         uint32_t hash, bucket;
438         int error = -EINVAL;
439
440         if (namelen > DLM_RESNAME_MAXLEN)
441                 goto out;
442
443         if (dlm_no_directory(ls))
444                 flags |= R_CREATE;
445
446         error = 0;
447         hash = jhash(name, namelen, 0);
448         bucket = hash & (ls->ls_rsbtbl_size - 1);
449
450         error = search_rsb(ls, name, namelen, bucket, flags, &r);
451         if (!error)
452                 goto out;
453
454         if (error == -EBADR && !(flags & R_CREATE))
455                 goto out;
456
457         /* the rsb was found but wasn't a master copy */
458         if (error == -ENOTBLK)
459                 goto out;
460
461         error = -ENOMEM;
462         r = create_rsb(ls, name, namelen);
463         if (!r)
464                 goto out;
465
466         r->res_hash = hash;
467         r->res_bucket = bucket;
468         r->res_nodeid = -1;
469         kref_init(&r->res_ref);
470
471         /* With no directory, the master can be set immediately */
472         if (dlm_no_directory(ls)) {
473                 int nodeid = dlm_dir_nodeid(r);
474                 if (nodeid == dlm_our_nodeid())
475                         nodeid = 0;
476                 r->res_nodeid = nodeid;
477         }
478
479         write_lock(&ls->ls_rsbtbl[bucket].lock);
480         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
481         if (!error) {
482                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
483                 dlm_free_rsb(r);
484                 r = tmp;
485                 goto out;
486         }
487         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
488         write_unlock(&ls->ls_rsbtbl[bucket].lock);
489         error = 0;
490  out:
491         *r_ret = r;
492         return error;
493 }
494
495 /* This is only called to add a reference when the code already holds
496    a valid reference to the rsb, so there's no need for locking. */
497
498 static inline void hold_rsb(struct dlm_rsb *r)
499 {
500         kref_get(&r->res_ref);
501 }
502
503 void dlm_hold_rsb(struct dlm_rsb *r)
504 {
505         hold_rsb(r);
506 }
507
508 static void toss_rsb(struct kref *kref)
509 {
510         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
511         struct dlm_ls *ls = r->res_ls;
512
513         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
514         kref_init(&r->res_ref);
515         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
516         r->res_toss_time = jiffies;
517         if (r->res_lvbptr) {
518                 dlm_free_lvb(r->res_lvbptr);
519                 r->res_lvbptr = NULL;
520         }
521 }
522
523 /* When all references to the rsb are gone it's transfered to
524    the tossed list for later disposal. */
525
526 static void put_rsb(struct dlm_rsb *r)
527 {
528         struct dlm_ls *ls = r->res_ls;
529         uint32_t bucket = r->res_bucket;
530
531         write_lock(&ls->ls_rsbtbl[bucket].lock);
532         kref_put(&r->res_ref, toss_rsb);
533         write_unlock(&ls->ls_rsbtbl[bucket].lock);
534 }
535
536 void dlm_put_rsb(struct dlm_rsb *r)
537 {
538         put_rsb(r);
539 }
540
541 /* See comment for unhold_lkb */
542
543 static void unhold_rsb(struct dlm_rsb *r)
544 {
545         int rv;
546         rv = kref_put(&r->res_ref, toss_rsb);
547         DLM_ASSERT(!rv, dlm_dump_rsb(r););
548 }
549
550 static void kill_rsb(struct kref *kref)
551 {
552         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
553
554         /* All work is done after the return from kref_put() so we
555            can release the write_lock before the remove and free. */
556
557         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
558         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
559         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
563 }
564
565 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
566    The rsb must exist as long as any lkb's for it do. */
567
568 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
569 {
570         hold_rsb(r);
571         lkb->lkb_resource = r;
572 }
573
574 static void detach_lkb(struct dlm_lkb *lkb)
575 {
576         if (lkb->lkb_resource) {
577                 put_rsb(lkb->lkb_resource);
578                 lkb->lkb_resource = NULL;
579         }
580 }
581
582 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
583 {
584         struct dlm_lkb *lkb, *tmp;
585         uint32_t lkid = 0;
586         uint16_t bucket;
587
588         lkb = dlm_allocate_lkb(ls);
589         if (!lkb)
590                 return -ENOMEM;
591
592         lkb->lkb_nodeid = -1;
593         lkb->lkb_grmode = DLM_LOCK_IV;
594         kref_init(&lkb->lkb_ref);
595         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
596         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
597         INIT_LIST_HEAD(&lkb->lkb_time_list);
598
599         get_random_bytes(&bucket, sizeof(bucket));
600         bucket &= (ls->ls_lkbtbl_size - 1);
601
602         write_lock(&ls->ls_lkbtbl[bucket].lock);
603
604         /* counter can roll over so we must verify lkid is not in use */
605
606         while (lkid == 0) {
607                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
608
609                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
610                                     lkb_idtbl_list) {
611                         if (tmp->lkb_id != lkid)
612                                 continue;
613                         lkid = 0;
614                         break;
615                 }
616         }
617
618         lkb->lkb_id = lkid;
619         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
620         write_unlock(&ls->ls_lkbtbl[bucket].lock);
621
622         *lkb_ret = lkb;
623         return 0;
624 }
625
626 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
627 {
628         struct dlm_lkb *lkb;
629         uint16_t bucket = (lkid >> 16);
630
631         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
632                 if (lkb->lkb_id == lkid)
633                         return lkb;
634         }
635         return NULL;
636 }
637
638 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
639 {
640         struct dlm_lkb *lkb;
641         uint16_t bucket = (lkid >> 16);
642
643         if (bucket >= ls->ls_lkbtbl_size)
644                 return -EBADSLT;
645
646         read_lock(&ls->ls_lkbtbl[bucket].lock);
647         lkb = __find_lkb(ls, lkid);
648         if (lkb)
649                 kref_get(&lkb->lkb_ref);
650         read_unlock(&ls->ls_lkbtbl[bucket].lock);
651
652         *lkb_ret = lkb;
653         return lkb ? 0 : -ENOENT;
654 }
655
656 static void kill_lkb(struct kref *kref)
657 {
658         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
659
660         /* All work is done after the return from kref_put() so we
661            can release the write_lock before the detach_lkb */
662
663         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
664 }
665
666 /* __put_lkb() is used when an lkb may not have an rsb attached to
667    it so we need to provide the lockspace explicitly */
668
669 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
670 {
671         uint16_t bucket = (lkb->lkb_id >> 16);
672
673         write_lock(&ls->ls_lkbtbl[bucket].lock);
674         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
675                 list_del(&lkb->lkb_idtbl_list);
676                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
677
678                 detach_lkb(lkb);
679
680                 /* for local/process lkbs, lvbptr points to caller's lksb */
681                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
682                         dlm_free_lvb(lkb->lkb_lvbptr);
683                 dlm_free_lkb(lkb);
684                 return 1;
685         } else {
686                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
687                 return 0;
688         }
689 }
690
691 int dlm_put_lkb(struct dlm_lkb *lkb)
692 {
693         struct dlm_ls *ls;
694
695         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
696         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
697
698         ls = lkb->lkb_resource->res_ls;
699         return __put_lkb(ls, lkb);
700 }
701
702 /* This is only called to add a reference when the code already holds
703    a valid reference to the lkb, so there's no need for locking. */
704
705 static inline void hold_lkb(struct dlm_lkb *lkb)
706 {
707         kref_get(&lkb->lkb_ref);
708 }
709
710 /* This is called when we need to remove a reference and are certain
711    it's not the last ref.  e.g. del_lkb is always called between a
712    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
713    put_lkb would work fine, but would involve unnecessary locking */
714
715 static inline void unhold_lkb(struct dlm_lkb *lkb)
716 {
717         int rv;
718         rv = kref_put(&lkb->lkb_ref, kill_lkb);
719         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
720 }
721
722 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
723                             int mode)
724 {
725         struct dlm_lkb *lkb = NULL;
726
727         list_for_each_entry(lkb, head, lkb_statequeue)
728                 if (lkb->lkb_rqmode < mode)
729                         break;
730
731         if (!lkb)
732                 list_add_tail(new, head);
733         else
734                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
735 }
736
737 /* add/remove lkb to rsb's grant/convert/wait queue */
738
739 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
740 {
741         kref_get(&lkb->lkb_ref);
742
743         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
744
745         lkb->lkb_status = status;
746
747         switch (status) {
748         case DLM_LKSTS_WAITING:
749                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
750                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
751                 else
752                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
753                 break;
754         case DLM_LKSTS_GRANTED:
755                 /* convention says granted locks kept in order of grmode */
756                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
757                                 lkb->lkb_grmode);
758                 break;
759         case DLM_LKSTS_CONVERT:
760                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
761                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
762                 else
763                         list_add_tail(&lkb->lkb_statequeue,
764                                       &r->res_convertqueue);
765                 break;
766         default:
767                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
768         }
769 }
770
771 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
772 {
773         lkb->lkb_status = 0;
774         list_del(&lkb->lkb_statequeue);
775         unhold_lkb(lkb);
776 }
777
778 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
779 {
780         hold_lkb(lkb);
781         del_lkb(r, lkb);
782         add_lkb(r, lkb, sts);
783         unhold_lkb(lkb);
784 }
785
786 static int msg_reply_type(int mstype)
787 {
788         switch (mstype) {
789         case DLM_MSG_REQUEST:
790                 return DLM_MSG_REQUEST_REPLY;
791         case DLM_MSG_CONVERT:
792                 return DLM_MSG_CONVERT_REPLY;
793         case DLM_MSG_UNLOCK:
794                 return DLM_MSG_UNLOCK_REPLY;
795         case DLM_MSG_CANCEL:
796                 return DLM_MSG_CANCEL_REPLY;
797         case DLM_MSG_LOOKUP:
798                 return DLM_MSG_LOOKUP_REPLY;
799         }
800         return -1;
801 }
802
803 /* add/remove lkb from global waiters list of lkb's waiting for
804    a reply from a remote node */
805
806 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
807 {
808         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
809         int error = 0;
810
811         mutex_lock(&ls->ls_waiters_mutex);
812
813         if (is_overlap_unlock(lkb) ||
814             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
815                 error = -EINVAL;
816                 goto out;
817         }
818
819         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
820                 switch (mstype) {
821                 case DLM_MSG_UNLOCK:
822                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
823                         break;
824                 case DLM_MSG_CANCEL:
825                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
826                         break;
827                 default:
828                         error = -EBUSY;
829                         goto out;
830                 }
831                 lkb->lkb_wait_count++;
832                 hold_lkb(lkb);
833
834                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
835                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
836                           lkb->lkb_wait_count, lkb->lkb_flags);
837                 goto out;
838         }
839
840         DLM_ASSERT(!lkb->lkb_wait_count,
841                    dlm_print_lkb(lkb);
842                    printk("wait_count %d\n", lkb->lkb_wait_count););
843
844         lkb->lkb_wait_count++;
845         lkb->lkb_wait_type = mstype;
846         hold_lkb(lkb);
847         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
848  out:
849         if (error)
850                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
851                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
852                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
853         mutex_unlock(&ls->ls_waiters_mutex);
854         return error;
855 }
856
857 /* We clear the RESEND flag because we might be taking an lkb off the waiters
858    list as part of process_requestqueue (e.g. a lookup that has an optimized
859    request reply on the requestqueue) between dlm_recover_waiters_pre() which
860    set RESEND and dlm_recover_waiters_post() */
861
862 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
863 {
864         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
865         int overlap_done = 0;
866
867         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
868                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
869                 overlap_done = 1;
870                 goto out_del;
871         }
872
873         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
874                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
875                 overlap_done = 1;
876                 goto out_del;
877         }
878
879         /* N.B. type of reply may not always correspond to type of original
880            msg due to lookup->request optimization, verify others? */
881
882         if (lkb->lkb_wait_type) {
883                 lkb->lkb_wait_type = 0;
884                 goto out_del;
885         }
886
887         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
888                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
889         return -1;
890
891  out_del:
892         /* the force-unlock/cancel has completed and we haven't recvd a reply
893            to the op that was in progress prior to the unlock/cancel; we
894            give up on any reply to the earlier op.  FIXME: not sure when/how
895            this would happen */
896
897         if (overlap_done && lkb->lkb_wait_type) {
898                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
899                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
900                 lkb->lkb_wait_count--;
901                 lkb->lkb_wait_type = 0;
902         }
903
904         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
905
906         lkb->lkb_flags &= ~DLM_IFL_RESEND;
907         lkb->lkb_wait_count--;
908         if (!lkb->lkb_wait_count)
909                 list_del_init(&lkb->lkb_wait_reply);
910         unhold_lkb(lkb);
911         return 0;
912 }
913
914 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
915 {
916         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
917         int error;
918
919         mutex_lock(&ls->ls_waiters_mutex);
920         error = _remove_from_waiters(lkb, mstype);
921         mutex_unlock(&ls->ls_waiters_mutex);
922         return error;
923 }
924
925 /* Handles situations where we might be processing a "fake" or "stub" reply in
926    which we can't try to take waiters_mutex again. */
927
928 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
929 {
930         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
931         int error;
932
933         if (ms != &ls->ls_stub_ms)
934                 mutex_lock(&ls->ls_waiters_mutex);
935         error = _remove_from_waiters(lkb, ms->m_type);
936         if (ms != &ls->ls_stub_ms)
937                 mutex_unlock(&ls->ls_waiters_mutex);
938         return error;
939 }
940
941 static void dir_remove(struct dlm_rsb *r)
942 {
943         int to_nodeid;
944
945         if (dlm_no_directory(r->res_ls))
946                 return;
947
948         to_nodeid = dlm_dir_nodeid(r);
949         if (to_nodeid != dlm_our_nodeid())
950                 send_remove(r);
951         else
952                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
953                                      r->res_name, r->res_length);
954 }
955
956 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
957    found since they are in order of newest to oldest? */
958
959 static int shrink_bucket(struct dlm_ls *ls, int b)
960 {
961         struct dlm_rsb *r;
962         int count = 0, found;
963
964         for (;;) {
965                 found = 0;
966                 write_lock(&ls->ls_rsbtbl[b].lock);
967                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
968                                             res_hashchain) {
969                         if (!time_after_eq(jiffies, r->res_toss_time +
970                                            dlm_config.ci_toss_secs * HZ))
971                                 continue;
972                         found = 1;
973                         break;
974                 }
975
976                 if (!found) {
977                         write_unlock(&ls->ls_rsbtbl[b].lock);
978                         break;
979                 }
980
981                 if (kref_put(&r->res_ref, kill_rsb)) {
982                         list_del(&r->res_hashchain);
983                         write_unlock(&ls->ls_rsbtbl[b].lock);
984
985                         if (is_master(r))
986                                 dir_remove(r);
987                         dlm_free_rsb(r);
988                         count++;
989                 } else {
990                         write_unlock(&ls->ls_rsbtbl[b].lock);
991                         log_error(ls, "tossed rsb in use %s", r->res_name);
992                 }
993         }
994
995         return count;
996 }
997
998 void dlm_scan_rsbs(struct dlm_ls *ls)
999 {
1000         int i;
1001
1002         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1003                 shrink_bucket(ls, i);
1004                 if (dlm_locking_stopped(ls))
1005                         break;
1006                 cond_resched();
1007         }
1008 }
1009
1010 static void add_timeout(struct dlm_lkb *lkb)
1011 {
1012         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1013
1014         if (is_master_copy(lkb)) {
1015                 lkb->lkb_timestamp = jiffies;
1016                 return;
1017         }
1018
1019         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1020             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1021                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1022                 goto add_it;
1023         }
1024         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1025                 goto add_it;
1026         return;
1027
1028  add_it:
1029         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1030         mutex_lock(&ls->ls_timeout_mutex);
1031         hold_lkb(lkb);
1032         lkb->lkb_timestamp = jiffies;
1033         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1034         mutex_unlock(&ls->ls_timeout_mutex);
1035 }
1036
1037 static void del_timeout(struct dlm_lkb *lkb)
1038 {
1039         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1040
1041         mutex_lock(&ls->ls_timeout_mutex);
1042         if (!list_empty(&lkb->lkb_time_list)) {
1043                 list_del_init(&lkb->lkb_time_list);
1044                 unhold_lkb(lkb);
1045         }
1046         mutex_unlock(&ls->ls_timeout_mutex);
1047 }
1048
1049 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1050    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1051    and then lock rsb because of lock ordering in add_timeout.  We may need
1052    to specify some special timeout-related bits in the lkb that are just to
1053    be accessed under the timeout_mutex. */
1054
1055 void dlm_scan_timeout(struct dlm_ls *ls)
1056 {
1057         struct dlm_rsb *r;
1058         struct dlm_lkb *lkb;
1059         int do_cancel, do_warn;
1060
1061         for (;;) {
1062                 if (dlm_locking_stopped(ls))
1063                         break;
1064
1065                 do_cancel = 0;
1066                 do_warn = 0;
1067                 mutex_lock(&ls->ls_timeout_mutex);
1068                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1069
1070                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1071                             time_after_eq(jiffies, lkb->lkb_timestamp +
1072                                           lkb->lkb_timeout_cs * HZ/100))
1073                                 do_cancel = 1;
1074
1075                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1076                             time_after_eq(jiffies, lkb->lkb_timestamp +
1077                                            dlm_config.ci_timewarn_cs * HZ/100))
1078                                 do_warn = 1;
1079
1080                         if (!do_cancel && !do_warn)
1081                                 continue;
1082                         hold_lkb(lkb);
1083                         break;
1084                 }
1085                 mutex_unlock(&ls->ls_timeout_mutex);
1086
1087                 if (!do_cancel && !do_warn)
1088                         break;
1089
1090                 r = lkb->lkb_resource;
1091                 hold_rsb(r);
1092                 lock_rsb(r);
1093
1094                 if (do_warn) {
1095                         /* clear flag so we only warn once */
1096                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1097                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1098                                 del_timeout(lkb);
1099                         dlm_timeout_warn(lkb);
1100                 }
1101
1102                 if (do_cancel) {
1103                         log_debug(ls, "timeout cancel %x node %d %s",
1104                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1105                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1106                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1107                         del_timeout(lkb);
1108                         _cancel_lock(r, lkb);
1109                 }
1110
1111                 unlock_rsb(r);
1112                 unhold_rsb(r);
1113                 dlm_put_lkb(lkb);
1114         }
1115 }
1116
1117 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1118    dlm_recoverd before checking/setting ls_recover_begin. */
1119
1120 void dlm_adjust_timeouts(struct dlm_ls *ls)
1121 {
1122         struct dlm_lkb *lkb;
1123         long adj = jiffies - ls->ls_recover_begin;
1124
1125         ls->ls_recover_begin = 0;
1126         mutex_lock(&ls->ls_timeout_mutex);
1127         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1128                 lkb->lkb_timestamp += adj;
1129         mutex_unlock(&ls->ls_timeout_mutex);
1130 }
1131
1132 /* lkb is master or local copy */
1133
1134 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1135 {
1136         int b, len = r->res_ls->ls_lvblen;
1137
1138         /* b=1 lvb returned to caller
1139            b=0 lvb written to rsb or invalidated
1140            b=-1 do nothing */
1141
1142         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1143
1144         if (b == 1) {
1145                 if (!lkb->lkb_lvbptr)
1146                         return;
1147
1148                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1149                         return;
1150
1151                 if (!r->res_lvbptr)
1152                         return;
1153
1154                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1155                 lkb->lkb_lvbseq = r->res_lvbseq;
1156
1157         } else if (b == 0) {
1158                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1159                         rsb_set_flag(r, RSB_VALNOTVALID);
1160                         return;
1161                 }
1162
1163                 if (!lkb->lkb_lvbptr)
1164                         return;
1165
1166                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1167                         return;
1168
1169                 if (!r->res_lvbptr)
1170                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1171
1172                 if (!r->res_lvbptr)
1173                         return;
1174
1175                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1176                 r->res_lvbseq++;
1177                 lkb->lkb_lvbseq = r->res_lvbseq;
1178                 rsb_clear_flag(r, RSB_VALNOTVALID);
1179         }
1180
1181         if (rsb_flag(r, RSB_VALNOTVALID))
1182                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1183 }
1184
1185 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1186 {
1187         if (lkb->lkb_grmode < DLM_LOCK_PW)
1188                 return;
1189
1190         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1191                 rsb_set_flag(r, RSB_VALNOTVALID);
1192                 return;
1193         }
1194
1195         if (!lkb->lkb_lvbptr)
1196                 return;
1197
1198         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1199                 return;
1200
1201         if (!r->res_lvbptr)
1202                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1203
1204         if (!r->res_lvbptr)
1205                 return;
1206
1207         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1208         r->res_lvbseq++;
1209         rsb_clear_flag(r, RSB_VALNOTVALID);
1210 }
1211
1212 /* lkb is process copy (pc) */
1213
1214 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1215                             struct dlm_message *ms)
1216 {
1217         int b;
1218
1219         if (!lkb->lkb_lvbptr)
1220                 return;
1221
1222         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1223                 return;
1224
1225         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1226         if (b == 1) {
1227                 int len = receive_extralen(ms);
1228                 if (len > DLM_RESNAME_MAXLEN)
1229                         len = DLM_RESNAME_MAXLEN;
1230                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1231                 lkb->lkb_lvbseq = ms->m_lvbseq;
1232         }
1233 }
1234
1235 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1236    remove_lock -- used for unlock, removes lkb from granted
1237    revert_lock -- used for cancel, moves lkb from convert to granted
1238    grant_lock  -- used for request and convert, adds lkb to granted or
1239                   moves lkb from convert or waiting to granted
1240
1241    Each of these is used for master or local copy lkb's.  There is
1242    also a _pc() variation used to make the corresponding change on
1243    a process copy (pc) lkb. */
1244
1245 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1246 {
1247         del_lkb(r, lkb);
1248         lkb->lkb_grmode = DLM_LOCK_IV;
1249         /* this unhold undoes the original ref from create_lkb()
1250            so this leads to the lkb being freed */
1251         unhold_lkb(lkb);
1252 }
1253
1254 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1255 {
1256         set_lvb_unlock(r, lkb);
1257         _remove_lock(r, lkb);
1258 }
1259
1260 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1261 {
1262         _remove_lock(r, lkb);
1263 }
1264
1265 /* returns: 0 did nothing
1266             1 moved lock to granted
1267            -1 removed lock */
1268
1269 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1270 {
1271         int rv = 0;
1272
1273         lkb->lkb_rqmode = DLM_LOCK_IV;
1274
1275         switch (lkb->lkb_status) {
1276         case DLM_LKSTS_GRANTED:
1277                 break;
1278         case DLM_LKSTS_CONVERT:
1279                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1280                 rv = 1;
1281                 break;
1282         case DLM_LKSTS_WAITING:
1283                 del_lkb(r, lkb);
1284                 lkb->lkb_grmode = DLM_LOCK_IV;
1285                 /* this unhold undoes the original ref from create_lkb()
1286                    so this leads to the lkb being freed */
1287                 unhold_lkb(lkb);
1288                 rv = -1;
1289                 break;
1290         default:
1291                 log_print("invalid status for revert %d", lkb->lkb_status);
1292         }
1293         return rv;
1294 }
1295
1296 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1297 {
1298         return revert_lock(r, lkb);
1299 }
1300
1301 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1302 {
1303         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1304                 lkb->lkb_grmode = lkb->lkb_rqmode;
1305                 if (lkb->lkb_status)
1306                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1307                 else
1308                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1309         }
1310
1311         lkb->lkb_rqmode = DLM_LOCK_IV;
1312 }
1313
1314 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1315 {
1316         set_lvb_lock(r, lkb);
1317         _grant_lock(r, lkb);
1318         lkb->lkb_highbast = 0;
1319 }
1320
1321 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1322                           struct dlm_message *ms)
1323 {
1324         set_lvb_lock_pc(r, lkb, ms);
1325         _grant_lock(r, lkb);
1326 }
1327
1328 /* called by grant_pending_locks() which means an async grant message must
1329    be sent to the requesting node in addition to granting the lock if the
1330    lkb belongs to a remote node. */
1331
1332 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1333 {
1334         grant_lock(r, lkb);
1335         if (is_master_copy(lkb))
1336                 send_grant(r, lkb);
1337         else
1338                 queue_cast(r, lkb, 0);
1339 }
1340
1341 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1342    change the granted/requested modes.  We're munging things accordingly in
1343    the process copy.
1344    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1345    conversion deadlock
1346    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1347    compatible with other granted locks */
1348
1349 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1350 {
1351         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1352                 log_print("munge_demoted %x invalid reply type %d",
1353                           lkb->lkb_id, ms->m_type);
1354                 return;
1355         }
1356
1357         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1358                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1359                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1360                 return;
1361         }
1362
1363         lkb->lkb_grmode = DLM_LOCK_NL;
1364 }
1365
1366 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1367 {
1368         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1369             ms->m_type != DLM_MSG_GRANT) {
1370                 log_print("munge_altmode %x invalid reply type %d",
1371                           lkb->lkb_id, ms->m_type);
1372                 return;
1373         }
1374
1375         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1376                 lkb->lkb_rqmode = DLM_LOCK_PR;
1377         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1378                 lkb->lkb_rqmode = DLM_LOCK_CW;
1379         else {
1380                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1381                 dlm_print_lkb(lkb);
1382         }
1383 }
1384
1385 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1386 {
1387         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1388                                            lkb_statequeue);
1389         if (lkb->lkb_id == first->lkb_id)
1390                 return 1;
1391
1392         return 0;
1393 }
1394
1395 /* Check if the given lkb conflicts with another lkb on the queue. */
1396
1397 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1398 {
1399         struct dlm_lkb *this;
1400
1401         list_for_each_entry(this, head, lkb_statequeue) {
1402                 if (this == lkb)
1403                         continue;
1404                 if (!modes_compat(this, lkb))
1405                         return 1;
1406         }
1407         return 0;
1408 }
1409
1410 /*
1411  * "A conversion deadlock arises with a pair of lock requests in the converting
1412  * queue for one resource.  The granted mode of each lock blocks the requested
1413  * mode of the other lock."
1414  *
1415  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1416  * convert queue from being granted, then deadlk/demote lkb.
1417  *
1418  * Example:
1419  * Granted Queue: empty
1420  * Convert Queue: NL->EX (first lock)
1421  *                PR->EX (second lock)
1422  *
1423  * The first lock can't be granted because of the granted mode of the second
1424  * lock and the second lock can't be granted because it's not first in the
1425  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1426  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1427  * flag set and return DEMOTED in the lksb flags.
1428  *
1429  * Originally, this function detected conv-deadlk in a more limited scope:
1430  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1431  * - if lkb1 was the first entry in the queue (not just earlier), and was
1432  *   blocked by the granted mode of lkb2, and there was nothing on the
1433  *   granted queue preventing lkb1 from being granted immediately, i.e.
1434  *   lkb2 was the only thing preventing lkb1 from being granted.
1435  *
1436  * That second condition meant we'd only say there was conv-deadlk if
1437  * resolving it (by demotion) would lead to the first lock on the convert
1438  * queue being granted right away.  It allowed conversion deadlocks to exist
1439  * between locks on the convert queue while they couldn't be granted anyway.
1440  *
1441  * Now, we detect and take action on conversion deadlocks immediately when
1442  * they're created, even if they may not be immediately consequential.  If
1443  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1444  * mode that would prevent lkb1's conversion from being granted, we do a
1445  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1446  * I think this means that the lkb_is_ahead condition below should always
1447  * be zero, i.e. there will never be conv-deadlk between two locks that are
1448  * both already on the convert queue.
1449  */
1450
1451 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1452 {
1453         struct dlm_lkb *lkb1;
1454         int lkb_is_ahead = 0;
1455
1456         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1457                 if (lkb1 == lkb2) {
1458                         lkb_is_ahead = 1;
1459                         continue;
1460                 }
1461
1462                 if (!lkb_is_ahead) {
1463                         if (!modes_compat(lkb2, lkb1))
1464                                 return 1;
1465                 } else {
1466                         if (!modes_compat(lkb2, lkb1) &&
1467                             !modes_compat(lkb1, lkb2))
1468                                 return 1;
1469                 }
1470         }
1471         return 0;
1472 }
1473
1474 /*
1475  * Return 1 if the lock can be granted, 0 otherwise.
1476  * Also detect and resolve conversion deadlocks.
1477  *
1478  * lkb is the lock to be granted
1479  *
1480  * now is 1 if the function is being called in the context of the
1481  * immediate request, it is 0 if called later, after the lock has been
1482  * queued.
1483  *
1484  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1485  */
1486
1487 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1488 {
1489         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1490
1491         /*
1492          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1493          * a new request for a NL mode lock being blocked.
1494          *
1495          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1496          * request, then it would be granted.  In essence, the use of this flag
1497          * tells the Lock Manager to expedite theis request by not considering
1498          * what may be in the CONVERTING or WAITING queues...  As of this
1499          * writing, the EXPEDITE flag can be used only with new requests for NL
1500          * mode locks.  This flag is not valid for conversion requests.
1501          *
1502          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1503          * conversion or used with a non-NL requested mode.  We also know an
1504          * EXPEDITE request is always granted immediately, so now must always
1505          * be 1.  The full condition to grant an expedite request: (now &&
1506          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1507          * therefore be shortened to just checking the flag.
1508          */
1509
1510         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1511                 return 1;
1512
1513         /*
1514          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1515          * added to the remaining conditions.
1516          */
1517
1518         if (queue_conflict(&r->res_grantqueue, lkb))
1519                 goto out;
1520
1521         /*
1522          * 6-3: By default, a conversion request is immediately granted if the
1523          * requested mode is compatible with the modes of all other granted
1524          * locks
1525          */
1526
1527         if (queue_conflict(&r->res_convertqueue, lkb))
1528                 goto out;
1529
1530         /*
1531          * 6-5: But the default algorithm for deciding whether to grant or
1532          * queue conversion requests does not by itself guarantee that such
1533          * requests are serviced on a "first come first serve" basis.  This, in
1534          * turn, can lead to a phenomenon known as "indefinate postponement".
1535          *
1536          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1537          * the system service employed to request a lock conversion.  This flag
1538          * forces certain conversion requests to be queued, even if they are
1539          * compatible with the granted modes of other locks on the same
1540          * resource.  Thus, the use of this flag results in conversion requests
1541          * being ordered on a "first come first servce" basis.
1542          *
1543          * DCT: This condition is all about new conversions being able to occur
1544          * "in place" while the lock remains on the granted queue (assuming
1545          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1546          * doesn't _have_ to go onto the convert queue where it's processed in
1547          * order.  The "now" variable is necessary to distinguish converts
1548          * being received and processed for the first time now, because once a
1549          * convert is moved to the conversion queue the condition below applies
1550          * requiring fifo granting.
1551          */
1552
1553         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1554                 return 1;
1555
1556         /*
1557          * The NOORDER flag is set to avoid the standard vms rules on grant
1558          * order.
1559          */
1560
1561         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1562                 return 1;
1563
1564         /*
1565          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1566          * granted until all other conversion requests ahead of it are granted
1567          * and/or canceled.
1568          */
1569
1570         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1571                 return 1;
1572
1573         /*
1574          * 6-4: By default, a new request is immediately granted only if all
1575          * three of the following conditions are satisfied when the request is
1576          * issued:
1577          * - The queue of ungranted conversion requests for the resource is
1578          *   empty.
1579          * - The queue of ungranted new requests for the resource is empty.
1580          * - The mode of the new request is compatible with the most
1581          *   restrictive mode of all granted locks on the resource.
1582          */
1583
1584         if (now && !conv && list_empty(&r->res_convertqueue) &&
1585             list_empty(&r->res_waitqueue))
1586                 return 1;
1587
1588         /*
1589          * 6-4: Once a lock request is in the queue of ungranted new requests,
1590          * it cannot be granted until the queue of ungranted conversion
1591          * requests is empty, all ungranted new requests ahead of it are
1592          * granted and/or canceled, and it is compatible with the granted mode
1593          * of the most restrictive lock granted on the resource.
1594          */
1595
1596         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1597             first_in_list(lkb, &r->res_waitqueue))
1598                 return 1;
1599  out:
1600         return 0;
1601 }
1602
1603 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1604                           int *err)
1605 {
1606         int rv;
1607         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1608         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1609
1610         if (err)
1611                 *err = 0;
1612
1613         rv = _can_be_granted(r, lkb, now);
1614         if (rv)
1615                 goto out;
1616
1617         /*
1618          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1619          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1620          * cancels one of the locks.
1621          */
1622
1623         if (is_convert && can_be_queued(lkb) &&
1624             conversion_deadlock_detect(r, lkb)) {
1625                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1626                         lkb->lkb_grmode = DLM_LOCK_NL;
1627                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1628                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1629                         if (err)
1630                                 *err = -EDEADLK;
1631                         else {
1632                                 log_print("can_be_granted deadlock %x now %d",
1633                                           lkb->lkb_id, now);
1634                                 dlm_dump_rsb(r);
1635                         }
1636                 }
1637                 goto out;
1638         }
1639
1640         /*
1641          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1642          * to grant a request in a mode other than the normal rqmode.  It's a
1643          * simple way to provide a big optimization to applications that can
1644          * use them.
1645          */
1646
1647         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1648                 alt = DLM_LOCK_PR;
1649         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1650                 alt = DLM_LOCK_CW;
1651
1652         if (alt) {
1653                 lkb->lkb_rqmode = alt;
1654                 rv = _can_be_granted(r, lkb, now);
1655                 if (rv)
1656                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1657                 else
1658                         lkb->lkb_rqmode = rqmode;
1659         }
1660  out:
1661         return rv;
1662 }
1663
1664 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1665    for locks pending on the convert list.  Once verified (watch for these
1666    log_prints), we should be able to just call _can_be_granted() and not
1667    bother with the demote/deadlk cases here (and there's no easy way to deal
1668    with a deadlk here, we'd have to generate something like grant_lock with
1669    the deadlk error.) */
1670
1671 /* Returns the highest requested mode of all blocked conversions; sets
1672    cw if there's a blocked conversion to DLM_LOCK_CW. */
1673
1674 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1675 {
1676         struct dlm_lkb *lkb, *s;
1677         int hi, demoted, quit, grant_restart, demote_restart;
1678         int deadlk;
1679
1680         quit = 0;
1681  restart:
1682         grant_restart = 0;
1683         demote_restart = 0;
1684         hi = DLM_LOCK_IV;
1685
1686         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1687                 demoted = is_demoted(lkb);
1688                 deadlk = 0;
1689
1690                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1691                         grant_lock_pending(r, lkb);
1692                         grant_restart = 1;
1693                         continue;
1694                 }
1695
1696                 if (!demoted && is_demoted(lkb)) {
1697                         log_print("WARN: pending demoted %x node %d %s",
1698                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1699                         demote_restart = 1;
1700                         continue;
1701                 }
1702
1703                 if (deadlk) {
1704                         log_print("WARN: pending deadlock %x node %d %s",
1705                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1706                         dlm_dump_rsb(r);
1707                         continue;
1708                 }
1709
1710                 hi = max_t(int, lkb->lkb_rqmode, hi);
1711
1712                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1713                         *cw = 1;
1714         }
1715
1716         if (grant_restart)
1717                 goto restart;
1718         if (demote_restart && !quit) {
1719                 quit = 1;
1720                 goto restart;
1721         }
1722
1723         return max_t(int, high, hi);
1724 }
1725
1726 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1727 {
1728         struct dlm_lkb *lkb, *s;
1729
1730         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1731                 if (can_be_granted(r, lkb, 0, NULL))
1732                         grant_lock_pending(r, lkb);
1733                 else {
1734                         high = max_t(int, lkb->lkb_rqmode, high);
1735                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1736                                 *cw = 1;
1737                 }
1738         }
1739
1740         return high;
1741 }
1742
1743 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1744    on either the convert or waiting queue.
1745    high is the largest rqmode of all locks blocked on the convert or
1746    waiting queue. */
1747
1748 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1749 {
1750         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1751                 if (gr->lkb_highbast < DLM_LOCK_EX)
1752                         return 1;
1753                 return 0;
1754         }
1755
1756         if (gr->lkb_highbast < high &&
1757             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1758                 return 1;
1759         return 0;
1760 }
1761
1762 static void grant_pending_locks(struct dlm_rsb *r)
1763 {
1764         struct dlm_lkb *lkb, *s;
1765         int high = DLM_LOCK_IV;
1766         int cw = 0;
1767
1768         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1769
1770         high = grant_pending_convert(r, high, &cw);
1771         high = grant_pending_wait(r, high, &cw);
1772
1773         if (high == DLM_LOCK_IV)
1774                 return;
1775
1776         /*
1777          * If there are locks left on the wait/convert queue then send blocking
1778          * ASTs to granted locks based on the largest requested mode (high)
1779          * found above.
1780          */
1781
1782         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1783                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1784                         if (cw && high == DLM_LOCK_PR &&
1785                             lkb->lkb_grmode == DLM_LOCK_PR)
1786                                 queue_bast(r, lkb, DLM_LOCK_CW);
1787                         else
1788                                 queue_bast(r, lkb, high);
1789                         lkb->lkb_highbast = high;
1790                 }
1791         }
1792 }
1793
1794 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1795 {
1796         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1797             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1798                 if (gr->lkb_highbast < DLM_LOCK_EX)
1799                         return 1;
1800                 return 0;
1801         }
1802
1803         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1804                 return 1;
1805         return 0;
1806 }
1807
1808 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1809                             struct dlm_lkb *lkb)
1810 {
1811         struct dlm_lkb *gr;
1812
1813         list_for_each_entry(gr, head, lkb_statequeue) {
1814                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1815                         queue_bast(r, gr, lkb->lkb_rqmode);
1816                         gr->lkb_highbast = lkb->lkb_rqmode;
1817                 }
1818         }
1819 }
1820
1821 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1822 {
1823         send_bast_queue(r, &r->res_grantqueue, lkb);
1824 }
1825
1826 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1827 {
1828         send_bast_queue(r, &r->res_grantqueue, lkb);
1829         send_bast_queue(r, &r->res_convertqueue, lkb);
1830 }
1831
1832 /* set_master(r, lkb) -- set the master nodeid of a resource
1833
1834    The purpose of this function is to set the nodeid field in the given
1835    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1836    known, it can just be copied to the lkb and the function will return
1837    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1838    before it can be copied to the lkb.
1839
1840    When the rsb nodeid is being looked up remotely, the initial lkb
1841    causing the lookup is kept on the ls_waiters list waiting for the
1842    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1843    on the rsb's res_lookup list until the master is verified.
1844
1845    Return values:
1846    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1847    1: the rsb master is not available and the lkb has been placed on
1848       a wait queue
1849 */
1850
1851 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1852 {
1853         struct dlm_ls *ls = r->res_ls;
1854         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1855
1856         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1857                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1858                 r->res_first_lkid = lkb->lkb_id;
1859                 lkb->lkb_nodeid = r->res_nodeid;
1860                 return 0;
1861         }
1862
1863         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1864                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1865                 return 1;
1866         }
1867
1868         if (r->res_nodeid == 0) {
1869                 lkb->lkb_nodeid = 0;
1870                 return 0;
1871         }
1872
1873         if (r->res_nodeid > 0) {
1874                 lkb->lkb_nodeid = r->res_nodeid;
1875                 return 0;
1876         }
1877
1878         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1879
1880         dir_nodeid = dlm_dir_nodeid(r);
1881
1882         if (dir_nodeid != our_nodeid) {
1883                 r->res_first_lkid = lkb->lkb_id;
1884                 send_lookup(r, lkb);
1885                 return 1;
1886         }
1887
1888         for (i = 0; i < 2; i++) {
1889                 /* It's possible for dlm_scand to remove an old rsb for
1890                    this same resource from the toss list, us to create
1891                    a new one, look up the master locally, and find it
1892                    already exists just before dlm_scand does the
1893                    dir_remove() on the previous rsb. */
1894
1895                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1896                                        r->res_length, &ret_nodeid);
1897                 if (!error)
1898                         break;
1899                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1900                 schedule();
1901         }
1902         if (error && error != -EEXIST)
1903                 return error;
1904
1905         if (ret_nodeid == our_nodeid) {
1906                 r->res_first_lkid = 0;
1907                 r->res_nodeid = 0;
1908                 lkb->lkb_nodeid = 0;
1909         } else {
1910                 r->res_first_lkid = lkb->lkb_id;
1911                 r->res_nodeid = ret_nodeid;
1912                 lkb->lkb_nodeid = ret_nodeid;
1913         }
1914         return 0;
1915 }
1916
1917 static void process_lookup_list(struct dlm_rsb *r)
1918 {
1919         struct dlm_lkb *lkb, *safe;
1920
1921         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1922                 list_del_init(&lkb->lkb_rsb_lookup);
1923                 _request_lock(r, lkb);
1924                 schedule();
1925         }
1926 }
1927
1928 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1929
1930 static void confirm_master(struct dlm_rsb *r, int error)
1931 {
1932         struct dlm_lkb *lkb;
1933
1934         if (!r->res_first_lkid)
1935                 return;
1936
1937         switch (error) {
1938         case 0:
1939         case -EINPROGRESS:
1940                 r->res_first_lkid = 0;
1941                 process_lookup_list(r);
1942                 break;
1943
1944         case -EAGAIN:
1945         case -EBADR:
1946         case -ENOTBLK:
1947                 /* the remote request failed and won't be retried (it was
1948                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1949                    lkb the first_lkid */
1950
1951                 r->res_first_lkid = 0;
1952
1953                 if (!list_empty(&r->res_lookup)) {
1954                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1955                                          lkb_rsb_lookup);
1956                         list_del_init(&lkb->lkb_rsb_lookup);
1957                         r->res_first_lkid = lkb->lkb_id;
1958                         _request_lock(r, lkb);
1959                 }
1960                 break;
1961
1962         default:
1963                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1964         }
1965 }
1966
1967 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1968                          int namelen, unsigned long timeout_cs,
1969                          void (*ast) (void *astparam),
1970                          void *astparam,
1971                          void (*bast) (void *astparam, int mode),
1972                          struct dlm_args *args)
1973 {
1974         int rv = -EINVAL;
1975
1976         /* check for invalid arg usage */
1977
1978         if (mode < 0 || mode > DLM_LOCK_EX)
1979                 goto out;
1980
1981         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1982                 goto out;
1983
1984         if (flags & DLM_LKF_CANCEL)
1985                 goto out;
1986
1987         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1988                 goto out;
1989
1990         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1991                 goto out;
1992
1993         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1994                 goto out;
1995
1996         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1997                 goto out;
1998
1999         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2000                 goto out;
2001
2002         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2003                 goto out;
2004
2005         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2006                 goto out;
2007
2008         if (!ast || !lksb)
2009                 goto out;
2010
2011         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2012                 goto out;
2013
2014         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2015                 goto out;
2016
2017         /* these args will be copied to the lkb in validate_lock_args,
2018            it cannot be done now because when converting locks, fields in
2019            an active lkb cannot be modified before locking the rsb */
2020
2021         args->flags = flags;
2022         args->astfn = ast;
2023         args->astparam = astparam;
2024         args->bastfn = bast;
2025         args->timeout = timeout_cs;
2026         args->mode = mode;
2027         args->lksb = lksb;
2028         rv = 0;
2029  out:
2030         return rv;
2031 }
2032
2033 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2034 {
2035         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2036                       DLM_LKF_FORCEUNLOCK))
2037                 return -EINVAL;
2038
2039         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2040                 return -EINVAL;
2041
2042         args->flags = flags;
2043         args->astparam = astarg;
2044         return 0;
2045 }
2046
2047 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2048                               struct dlm_args *args)
2049 {
2050         int rv = -EINVAL;
2051
2052         if (args->flags & DLM_LKF_CONVERT) {
2053                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2054                         goto out;
2055
2056                 if (args->flags & DLM_LKF_QUECVT &&
2057                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2058                         goto out;
2059
2060                 rv = -EBUSY;
2061                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2062                         goto out;
2063
2064                 if (lkb->lkb_wait_type)
2065                         goto out;
2066
2067                 if (is_overlap(lkb))
2068                         goto out;
2069         }
2070
2071         lkb->lkb_exflags = args->flags;
2072         lkb->lkb_sbflags = 0;
2073         lkb->lkb_astfn = args->astfn;
2074         lkb->lkb_astparam = args->astparam;
2075         lkb->lkb_bastfn = args->bastfn;
2076         lkb->lkb_rqmode = args->mode;
2077         lkb->lkb_lksb = args->lksb;
2078         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2079         lkb->lkb_ownpid = (int) current->pid;
2080         lkb->lkb_timeout_cs = args->timeout;
2081         rv = 0;
2082  out:
2083         return rv;
2084 }
2085
2086 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2087    for success */
2088
2089 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2090    because there may be a lookup in progress and it's valid to do
2091    cancel/unlockf on it */
2092
2093 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2094 {
2095         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2096         int rv = -EINVAL;
2097
2098         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2099                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2100                 dlm_print_lkb(lkb);
2101                 goto out;
2102         }
2103
2104         /* an lkb may still exist even though the lock is EOL'ed due to a
2105            cancel, unlock or failed noqueue request; an app can't use these
2106            locks; return same error as if the lkid had not been found at all */
2107
2108         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2109                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2110                 rv = -ENOENT;
2111                 goto out;
2112         }
2113
2114         /* an lkb may be waiting for an rsb lookup to complete where the
2115            lookup was initiated by another lock */
2116
2117         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2118                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2119                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2120                         list_del_init(&lkb->lkb_rsb_lookup);
2121                         queue_cast(lkb->lkb_resource, lkb,
2122                                    args->flags & DLM_LKF_CANCEL ?
2123                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2124                         unhold_lkb(lkb); /* undoes create_lkb() */
2125                 }
2126                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2127                 rv = -EBUSY;
2128                 goto out;
2129         }
2130
2131         /* cancel not allowed with another cancel/unlock in progress */
2132
2133         if (args->flags & DLM_LKF_CANCEL) {
2134                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2135                         goto out;
2136
2137                 if (is_overlap(lkb))
2138                         goto out;
2139
2140                 /* don't let scand try to do a cancel */
2141                 del_timeout(lkb);
2142
2143                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2144                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2145                         rv = -EBUSY;
2146                         goto out;
2147                 }
2148
2149                 switch (lkb->lkb_wait_type) {
2150                 case DLM_MSG_LOOKUP:
2151                 case DLM_MSG_REQUEST:
2152                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2153                         rv = -EBUSY;
2154                         goto out;
2155                 case DLM_MSG_UNLOCK:
2156                 case DLM_MSG_CANCEL:
2157                         goto out;
2158                 }
2159                 /* add_to_waiters() will set OVERLAP_CANCEL */
2160                 goto out_ok;
2161         }
2162
2163         /* do we need to allow a force-unlock if there's a normal unlock
2164            already in progress?  in what conditions could the normal unlock
2165            fail such that we'd want to send a force-unlock to be sure? */
2166
2167         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2168                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2169                         goto out;
2170
2171                 if (is_overlap_unlock(lkb))
2172                         goto out;
2173
2174                 /* don't let scand try to do a cancel */
2175                 del_timeout(lkb);
2176
2177                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2178                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2179                         rv = -EBUSY;
2180                         goto out;
2181                 }
2182
2183                 switch (lkb->lkb_wait_type) {
2184                 case DLM_MSG_LOOKUP:
2185                 case DLM_MSG_REQUEST:
2186                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2187                         rv = -EBUSY;
2188                         goto out;
2189                 case DLM_MSG_UNLOCK:
2190                         goto out;
2191                 }
2192                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2193                 goto out_ok;
2194         }
2195
2196         /* normal unlock not allowed if there's any op in progress */
2197         rv = -EBUSY;
2198         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2199                 goto out;
2200
2201  out_ok:
2202         /* an overlapping op shouldn't blow away exflags from other op */
2203         lkb->lkb_exflags |= args->flags;
2204         lkb->lkb_sbflags = 0;
2205         lkb->lkb_astparam = args->astparam;
2206         rv = 0;
2207  out:
2208         if (rv)
2209                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2210                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2211                           args->flags, lkb->lkb_wait_type,
2212                           lkb->lkb_resource->res_name);
2213         return rv;
2214 }
2215
2216 /*
2217  * Four stage 4 varieties:
2218  * do_request(), do_convert(), do_unlock(), do_cancel()
2219  * These are called on the master node for the given lock and
2220  * from the central locking logic.
2221  */
2222
2223 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2224 {
2225         int error = 0;
2226
2227         if (can_be_granted(r, lkb, 1, NULL)) {
2228                 grant_lock(r, lkb);
2229                 queue_cast(r, lkb, 0);
2230                 goto out;
2231         }
2232
2233         if (can_be_queued(lkb)) {
2234                 error = -EINPROGRESS;
2235                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2236                 send_blocking_asts(r, lkb);
2237                 add_timeout(lkb);
2238                 goto out;
2239         }
2240
2241         error = -EAGAIN;
2242         if (force_blocking_asts(lkb))
2243                 send_blocking_asts_all(r, lkb);
2244         queue_cast(r, lkb, -EAGAIN);
2245
2246  out:
2247         return error;
2248 }
2249
2250 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2251 {
2252         int error = 0;
2253         int deadlk = 0;
2254
2255         /* changing an existing lock may allow others to be granted */
2256
2257         if (can_be_granted(r, lkb, 1, &deadlk)) {
2258                 grant_lock(r, lkb);
2259                 queue_cast(r, lkb, 0);
2260                 grant_pending_locks(r);
2261                 goto out;
2262         }
2263
2264         /* can_be_granted() detected that this lock would block in a conversion
2265            deadlock, so we leave it on the granted queue and return EDEADLK in
2266            the ast for the convert. */
2267
2268         if (deadlk) {
2269                 /* it's left on the granted queue */
2270                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2271                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2272                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2273                 revert_lock(r, lkb);
2274                 queue_cast(r, lkb, -EDEADLK);
2275                 error = -EDEADLK;
2276                 goto out;
2277         }
2278
2279         /* is_demoted() means the can_be_granted() above set the grmode
2280            to NL, and left us on the granted queue.  This auto-demotion
2281            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2282            now grantable.  We have to try to grant other converting locks
2283            before we try again to grant this one. */
2284
2285         if (is_demoted(lkb)) {
2286                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2287                 if (_can_be_granted(r, lkb, 1)) {
2288                         grant_lock(r, lkb);
2289                         queue_cast(r, lkb, 0);
2290                         grant_pending_locks(r);
2291                         goto out;
2292                 }
2293                 /* else fall through and move to convert queue */
2294         }
2295
2296         if (can_be_queued(lkb)) {
2297                 error = -EINPROGRESS;
2298                 del_lkb(r, lkb);
2299                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2300                 send_blocking_asts(r, lkb);
2301                 add_timeout(lkb);
2302                 goto out;
2303         }
2304
2305         error = -EAGAIN;
2306         if (force_blocking_asts(lkb))
2307                 send_blocking_asts_all(r, lkb);
2308         queue_cast(r, lkb, -EAGAIN);
2309
2310  out:
2311         return error;
2312 }
2313
2314 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2315 {
2316         remove_lock(r, lkb);
2317         queue_cast(r, lkb, -DLM_EUNLOCK);
2318         grant_pending_locks(r);
2319         return -DLM_EUNLOCK;
2320 }
2321
2322 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2323  
2324 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2325 {
2326         int error;
2327
2328         error = revert_lock(r, lkb);
2329         if (error) {
2330                 queue_cast(r, lkb, -DLM_ECANCEL);
2331                 grant_pending_locks(r);
2332                 return -DLM_ECANCEL;
2333         }
2334         return 0;
2335 }
2336
2337 /*
2338  * Four stage 3 varieties:
2339  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2340  */
2341
2342 /* add a new lkb to a possibly new rsb, called by requesting process */
2343
2344 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2345 {
2346         int error;
2347
2348         /* set_master: sets lkb nodeid from r */
2349
2350         error = set_master(r, lkb);
2351         if (error < 0)
2352                 goto out;
2353         if (error) {
2354                 error = 0;
2355                 goto out;
2356         }
2357
2358         if (is_remote(r))
2359                 /* receive_request() calls do_request() on remote node */
2360                 error = send_request(r, lkb);
2361         else
2362                 error = do_request(r, lkb);
2363  out:
2364         return error;
2365 }
2366
2367 /* change some property of an existing lkb, e.g. mode */
2368
2369 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2370 {
2371         int error;
2372
2373         if (is_remote(r))
2374                 /* receive_convert() calls do_convert() on remote node */
2375                 error = send_convert(r, lkb);
2376         else
2377                 error = do_convert(r, lkb);
2378
2379         return error;
2380 }
2381
2382 /* remove an existing lkb from the granted queue */
2383
2384 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2385 {
2386         int error;
2387
2388         if (is_remote(r))
2389                 /* receive_unlock() calls do_unlock() on remote node */
2390                 error = send_unlock(r, lkb);
2391         else
2392                 error = do_unlock(r, lkb);
2393
2394         return error;
2395 }
2396
2397 /* remove an existing lkb from the convert or wait queue */
2398
2399 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2400 {
2401         int error;
2402
2403         if (is_remote(r))
2404                 /* receive_cancel() calls do_cancel() on remote node */
2405                 error = send_cancel(r, lkb);
2406         else
2407                 error = do_cancel(r, lkb);
2408
2409         return error;
2410 }
2411
2412 /*
2413  * Four stage 2 varieties:
2414  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2415  */
2416
2417 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2418                         int len, struct dlm_args *args)
2419 {
2420         struct dlm_rsb *r;
2421         int error;
2422
2423         error = validate_lock_args(ls, lkb, args);
2424         if (error)
2425                 goto out;
2426
2427         error = find_rsb(ls, name, len, R_CREATE, &r);
2428         if (error)
2429                 goto out;
2430
2431         lock_rsb(r);
2432
2433         attach_lkb(r, lkb);
2434         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2435
2436         error = _request_lock(r, lkb);
2437
2438         unlock_rsb(r);
2439         put_rsb(r);
2440
2441  out:
2442         return error;
2443 }
2444
2445 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2446                         struct dlm_args *args)
2447 {
2448         struct dlm_rsb *r;
2449         int error;
2450
2451         r = lkb->lkb_resource;
2452
2453         hold_rsb(r);
2454         lock_rsb(r);
2455
2456         error = validate_lock_args(ls, lkb, args);
2457         if (error)
2458                 goto out;
2459
2460         error = _convert_lock(r, lkb);
2461  out:
2462         unlock_rsb(r);
2463         put_rsb(r);
2464         return error;
2465 }
2466
2467 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2468                        struct dlm_args *args)
2469 {
2470         struct dlm_rsb *r;
2471         int error;
2472
2473         r = lkb->lkb_resource;
2474
2475         hold_rsb(r);
2476         lock_rsb(r);
2477
2478         error = validate_unlock_args(lkb, args);
2479         if (error)
2480                 goto out;
2481
2482         error = _unlock_lock(r, lkb);
2483  out:
2484         unlock_rsb(r);
2485         put_rsb(r);
2486         return error;
2487 }
2488
2489 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2490                        struct dlm_args *args)
2491 {
2492         struct dlm_rsb *r;
2493         int error;
2494
2495         r = lkb->lkb_resource;
2496
2497         hold_rsb(r);
2498         lock_rsb(r);
2499
2500         error = validate_unlock_args(lkb, args);
2501         if (error)
2502                 goto out;
2503
2504         error = _cancel_lock(r, lkb);
2505  out:
2506         unlock_rsb(r);
2507         put_rsb(r);
2508         return error;
2509 }
2510
2511 /*
2512  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2513  */
2514
2515 int dlm_lock(dlm_lockspace_t *lockspace,
2516              int mode,
2517              struct dlm_lksb *lksb,
2518              uint32_t flags,
2519              void *name,
2520              unsigned int namelen,
2521              uint32_t parent_lkid,
2522              void (*ast) (void *astarg),
2523              void *astarg,
2524              void (*bast) (void *astarg, int mode))
2525 {
2526         struct dlm_ls *ls;
2527         struct dlm_lkb *lkb;
2528         struct dlm_args args;
2529         int error, convert = flags & DLM_LKF_CONVERT;
2530
2531         ls = dlm_find_lockspace_local(lockspace);
2532         if (!ls)
2533                 return -EINVAL;
2534
2535         dlm_lock_recovery(ls);
2536
2537         if (convert)
2538                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2539         else
2540                 error = create_lkb(ls, &lkb);
2541
2542         if (error)
2543                 goto out;
2544
2545         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2546                               astarg, bast, &args);
2547         if (error)
2548                 goto out_put;
2549
2550         if (convert)
2551                 error = convert_lock(ls, lkb, &args);
2552         else
2553                 error = request_lock(ls, lkb, name, namelen, &args);
2554
2555         if (error == -EINPROGRESS)
2556                 error = 0;
2557  out_put:
2558         if (convert || error)
2559                 __put_lkb(ls, lkb);
2560         if (error == -EAGAIN || error == -EDEADLK)
2561                 error = 0;
2562  out:
2563         dlm_unlock_recovery(ls);
2564         dlm_put_lockspace(ls);
2565         return error;
2566 }
2567
2568 int dlm_unlock(dlm_lockspace_t *lockspace,
2569                uint32_t lkid,
2570                uint32_t flags,
2571                struct dlm_lksb *lksb,
2572                void *astarg)
2573 {
2574         struct dlm_ls *ls;
2575         struct dlm_lkb *lkb;
2576         struct dlm_args args;
2577         int error;
2578
2579         ls = dlm_find_lockspace_local(lockspace);
2580         if (!ls)
2581                 return -EINVAL;
2582
2583         dlm_lock_recovery(ls);
2584
2585         error = find_lkb(ls, lkid, &lkb);
2586         if (error)
2587                 goto out;
2588
2589         error = set_unlock_args(flags, astarg, &args);
2590         if (error)
2591                 goto out_put;
2592
2593         if (flags & DLM_LKF_CANCEL)
2594                 error = cancel_lock(ls, lkb, &args);
2595         else
2596                 error = unlock_lock(ls, lkb, &args);
2597
2598         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2599                 error = 0;
2600         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2601                 error = 0;
2602  out_put:
2603         dlm_put_lkb(lkb);
2604  out:
2605         dlm_unlock_recovery(ls);
2606         dlm_put_lockspace(ls);
2607         return error;
2608 }
2609
2610 /*
2611  * send/receive routines for remote operations and replies
2612  *
2613  * send_args
2614  * send_common
2615  * send_request                 receive_request
2616  * send_convert                 receive_convert
2617  * send_unlock                  receive_unlock
2618  * send_cancel                  receive_cancel
2619  * send_grant                   receive_grant
2620  * send_bast                    receive_bast
2621  * send_lookup                  receive_lookup
2622  * send_remove                  receive_remove
2623  *
2624  *                              send_common_reply
2625  * receive_request_reply        send_request_reply
2626  * receive_convert_reply        send_convert_reply
2627  * receive_unlock_reply         send_unlock_reply
2628  * receive_cancel_reply         send_cancel_reply
2629  * receive_lookup_reply         send_lookup_reply
2630  */
2631
2632 static int _create_message(struct dlm_ls *ls, int mb_len,
2633                            int to_nodeid, int mstype,
2634                            struct dlm_message **ms_ret,
2635                            struct dlm_mhandle **mh_ret)
2636 {
2637         struct dlm_message *ms;
2638         struct dlm_mhandle *mh;
2639         char *mb;
2640
2641         /* get_buffer gives us a message handle (mh) that we need to
2642            pass into lowcomms_commit and a message buffer (mb) that we
2643            write our data into */
2644
2645         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2646         if (!mh)
2647                 return -ENOBUFS;
2648
2649         memset(mb, 0, mb_len);
2650
2651         ms = (struct dlm_message *) mb;
2652
2653         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2654         ms->m_header.h_lockspace = ls->ls_global_id;
2655         ms->m_header.h_nodeid = dlm_our_nodeid();
2656         ms->m_header.h_length = mb_len;
2657         ms->m_header.h_cmd = DLM_MSG;
2658
2659         ms->m_type = mstype;
2660
2661         *mh_ret = mh;
2662         *ms_ret = ms;
2663         return 0;
2664 }
2665
2666 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2667                           int to_nodeid, int mstype,
2668                           struct dlm_message **ms_ret,
2669                           struct dlm_mhandle **mh_ret)
2670 {
2671         int mb_len = sizeof(struct dlm_message);
2672
2673         switch (mstype) {
2674         case DLM_MSG_REQUEST:
2675         case DLM_MSG_LOOKUP:
2676         case DLM_MSG_REMOVE:
2677                 mb_len += r->res_length;
2678                 break;
2679         case DLM_MSG_CONVERT:
2680         case DLM_MSG_UNLOCK:
2681         case DLM_MSG_REQUEST_REPLY:
2682         case DLM_MSG_CONVERT_REPLY:
2683         case DLM_MSG_GRANT:
2684                 if (lkb && lkb->lkb_lvbptr)
2685                         mb_len += r->res_ls->ls_lvblen;
2686                 break;
2687         }
2688
2689         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2690                                ms_ret, mh_ret);
2691 }
2692
2693 /* further lowcomms enhancements or alternate implementations may make
2694    the return value from this function useful at some point */
2695
2696 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2697 {
2698         dlm_message_out(ms);
2699         dlm_lowcomms_commit_buffer(mh);
2700         return 0;
2701 }
2702
2703 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2704                       struct dlm_message *ms)
2705 {
2706         ms->m_nodeid   = lkb->lkb_nodeid;
2707         ms->m_pid      = lkb->lkb_ownpid;
2708         ms->m_lkid     = lkb->lkb_id;
2709         ms->m_remid    = lkb->lkb_remid;
2710         ms->m_exflags  = lkb->lkb_exflags;
2711         ms->m_sbflags  = lkb->lkb_sbflags;
2712         ms->m_flags    = lkb->lkb_flags;
2713         ms->m_lvbseq   = lkb->lkb_lvbseq;
2714         ms->m_status   = lkb->lkb_status;
2715         ms->m_grmode   = lkb->lkb_grmode;
2716         ms->m_rqmode   = lkb->lkb_rqmode;
2717         ms->m_hash     = r->res_hash;
2718
2719         /* m_result and m_bastmode are set from function args,
2720            not from lkb fields */
2721
2722         if (lkb->lkb_bastfn)
2723                 ms->m_asts |= AST_BAST;
2724         if (lkb->lkb_astfn)
2725                 ms->m_asts |= AST_COMP;
2726
2727         /* compare with switch in create_message; send_remove() doesn't
2728            use send_args() */
2729
2730         switch (ms->m_type) {
2731         case DLM_MSG_REQUEST:
2732         case DLM_MSG_LOOKUP:
2733                 memcpy(ms->m_extra, r->res_name, r->res_length);
2734                 break;
2735         case DLM_MSG_CONVERT:
2736         case DLM_MSG_UNLOCK:
2737         case DLM_MSG_REQUEST_REPLY:
2738         case DLM_MSG_CONVERT_REPLY:
2739         case DLM_MSG_GRANT:
2740                 if (!lkb->lkb_lvbptr)
2741                         break;
2742                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2743                 break;
2744         }
2745 }
2746
2747 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2748 {
2749         struct dlm_message *ms;
2750         struct dlm_mhandle *mh;
2751         int to_nodeid, error;
2752
2753         error = add_to_waiters(lkb, mstype);
2754         if (error)
2755                 return error;
2756
2757         to_nodeid = r->res_nodeid;
2758
2759         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2760         if (error)
2761                 goto fail;
2762
2763         send_args(r, lkb, ms);
2764
2765         error = send_message(mh, ms);
2766         if (error)
2767                 goto fail;
2768         return 0;
2769
2770  fail:
2771         remove_from_waiters(lkb, msg_reply_type(mstype));
2772         return error;
2773 }
2774
2775 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2776 {
2777         return send_common(r, lkb, DLM_MSG_REQUEST);
2778 }
2779
2780 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2781 {
2782         int error;
2783
2784         error = send_common(r, lkb, DLM_MSG_CONVERT);
2785
2786         /* down conversions go without a reply from the master */
2787         if (!error && down_conversion(lkb)) {
2788                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2789                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2790                 r->res_ls->ls_stub_ms.m_result = 0;
2791                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2792                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2793         }
2794
2795         return error;
2796 }
2797
2798 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2799    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2800    that the master is still correct. */
2801
2802 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2803 {
2804         return send_common(r, lkb, DLM_MSG_UNLOCK);
2805 }
2806
2807 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2808 {
2809         return send_common(r, lkb, DLM_MSG_CANCEL);
2810 }
2811
2812 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2813 {
2814         struct dlm_message *ms;
2815         struct dlm_mhandle *mh;
2816         int to_nodeid, error;
2817
2818         to_nodeid = lkb->lkb_nodeid;
2819
2820         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2821         if (error)
2822                 goto out;
2823
2824         send_args(r, lkb, ms);
2825
2826         ms->m_result = 0;
2827
2828         error = send_message(mh, ms);
2829  out:
2830         return error;
2831 }
2832
2833 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2834 {
2835         struct dlm_message *ms;
2836         struct dlm_mhandle *mh;
2837         int to_nodeid, error;
2838
2839         to_nodeid = lkb->lkb_nodeid;
2840
2841         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2842         if (error)
2843                 goto out;
2844
2845         send_args(r, lkb, ms);
2846
2847         ms->m_bastmode = mode;
2848
2849         error = send_message(mh, ms);
2850  out:
2851         return error;
2852 }
2853
2854 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2855 {
2856         struct dlm_message *ms;
2857         struct dlm_mhandle *mh;
2858         int to_nodeid, error;
2859
2860         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2861         if (error)
2862                 return error;
2863
2864         to_nodeid = dlm_dir_nodeid(r);
2865
2866         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2867         if (error)
2868                 goto fail;
2869
2870         send_args(r, lkb, ms);
2871
2872         error = send_message(mh, ms);
2873         if (error)
2874                 goto fail;
2875         return 0;
2876
2877  fail:
2878         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2879         return error;
2880 }
2881
2882 static int send_remove(struct dlm_rsb *r)
2883 {
2884         struct dlm_message *ms;
2885         struct dlm_mhandle *mh;
2886         int to_nodeid, error;
2887
2888         to_nodeid = dlm_dir_nodeid(r);
2889
2890         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2891         if (error)
2892                 goto out;
2893
2894         memcpy(ms->m_extra, r->res_name, r->res_length);
2895         ms->m_hash = r->res_hash;
2896
2897         error = send_message(mh, ms);
2898  out:
2899         return error;
2900 }
2901
2902 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2903                              int mstype, int rv)
2904 {
2905         struct dlm_message *ms;
2906         struct dlm_mhandle *mh;
2907         int to_nodeid, error;
2908
2909         to_nodeid = lkb->lkb_nodeid;
2910
2911         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2912         if (error)
2913                 goto out;
2914
2915         send_args(r, lkb, ms);
2916
2917         ms->m_result = rv;
2918
2919         error = send_message(mh, ms);
2920  out:
2921         return error;
2922 }
2923
2924 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2925 {
2926         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2927 }
2928
2929 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2930 {
2931         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2932 }
2933
2934 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2935 {
2936         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2937 }
2938
2939 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2940 {
2941         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2942 }
2943
2944 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2945                              int ret_nodeid, int rv)
2946 {
2947         struct dlm_rsb *r = &ls->ls_stub_rsb;
2948         struct dlm_message *ms;
2949         struct dlm_mhandle *mh;
2950         int error, nodeid = ms_in->m_header.h_nodeid;
2951
2952         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2953         if (error)
2954                 goto out;
2955
2956         ms->m_lkid = ms_in->m_lkid;
2957         ms->m_result = rv;
2958         ms->m_nodeid = ret_nodeid;
2959
2960         error = send_message(mh, ms);
2961  out:
2962         return error;
2963 }
2964
2965 /* which args we save from a received message depends heavily on the type
2966    of message, unlike the send side where we can safely send everything about
2967    the lkb for any type of message */
2968
2969 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2970 {
2971         lkb->lkb_exflags = ms->m_exflags;
2972         lkb->lkb_sbflags = ms->m_sbflags;
2973         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2974                          (ms->m_flags & 0x0000FFFF);
2975 }
2976
2977 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2978 {
2979         lkb->lkb_sbflags = ms->m_sbflags;
2980         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2981                          (ms->m_flags & 0x0000FFFF);
2982 }
2983
2984 static int receive_extralen(struct dlm_message *ms)
2985 {
2986         return (ms->m_header.h_length - sizeof(struct dlm_message));
2987 }
2988
2989 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2990                        struct dlm_message *ms)
2991 {
2992         int len;
2993
2994         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2995                 if (!lkb->lkb_lvbptr)
2996                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2997                 if (!lkb->lkb_lvbptr)
2998                         return -ENOMEM;
2999                 len = receive_extralen(ms);
3000                 if (len > DLM_RESNAME_MAXLEN)
3001                         len = DLM_RESNAME_MAXLEN;
3002                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3003         }
3004         return 0;
3005 }
3006
3007 static void fake_bastfn(void *astparam, int mode)
3008 {
3009         log_print("fake_bastfn should not be called");
3010 }
3011
3012 static void fake_astfn(void *astparam)
3013 {
3014         log_print("fake_astfn should not be called");
3015 }
3016
3017 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3018                                 struct dlm_message *ms)
3019 {
3020         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3021         lkb->lkb_ownpid = ms->m_pid;
3022         lkb->lkb_remid = ms->m_lkid;
3023         lkb->lkb_grmode = DLM_LOCK_IV;
3024         lkb->lkb_rqmode = ms->m_rqmode;
3025
3026         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3027         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3028
3029         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3030                 /* lkb was just created so there won't be an lvb yet */
3031                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3032                 if (!lkb->lkb_lvbptr)
3033                         return -ENOMEM;
3034         }
3035
3036         return 0;
3037 }
3038
3039 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3040                                 struct dlm_message *ms)
3041 {
3042         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3043                 return -EBUSY;
3044
3045         if (receive_lvb(ls, lkb, ms))
3046                 return -ENOMEM;
3047
3048         lkb->lkb_rqmode = ms->m_rqmode;
3049         lkb->lkb_lvbseq = ms->m_lvbseq;
3050
3051         return 0;
3052 }
3053
3054 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3055                                struct dlm_message *ms)
3056 {
3057         if (receive_lvb(ls, lkb, ms))
3058                 return -ENOMEM;
3059         return 0;
3060 }
3061
3062 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3063    uses to send a reply and that the remote end uses to process the reply. */
3064
3065 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3066 {
3067         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3068         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3069         lkb->lkb_remid = ms->m_lkid;
3070 }
3071
3072 /* This is called after the rsb is locked so that we can safely inspect
3073    fields in the lkb. */
3074
3075 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3076 {
3077         int from = ms->m_header.h_nodeid;
3078         int error = 0;
3079
3080         switch (ms->m_type) {
3081         case DLM_MSG_CONVERT:
3082         case DLM_MSG_UNLOCK:
3083         case DLM_MSG_CANCEL:
3084                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3085                         error = -EINVAL;
3086                 break;
3087
3088         case DLM_MSG_CONVERT_REPLY:
3089         case DLM_MSG_UNLOCK_REPLY:
3090         case DLM_MSG_CANCEL_REPLY:
3091         case DLM_MSG_GRANT:
3092         case DLM_MSG_BAST:
3093                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3094                         error = -EINVAL;
3095                 break;
3096
3097         case DLM_MSG_REQUEST_REPLY:
3098                 if (!is_process_copy(lkb))
3099                         error = -EINVAL;
3100                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3101                         error = -EINVAL;
3102                 break;
3103
3104         default:
3105                 error = -EINVAL;
3106         }
3107
3108         if (error)
3109                 log_error(lkb->lkb_resource->res_ls,
3110                           "ignore invalid message %d from %d %x %x %x %d",
3111                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3112                           lkb->lkb_flags, lkb->lkb_nodeid);
3113         return error;
3114 }
3115
3116 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3117 {
3118         struct dlm_lkb *lkb;
3119         struct dlm_rsb *r;
3120         int error, namelen;
3121
3122         error = create_lkb(ls, &lkb);
3123         if (error)
3124                 goto fail;
3125
3126         receive_flags(lkb, ms);
3127         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3128         error = receive_request_args(ls, lkb, ms);
3129         if (error) {
3130                 __put_lkb(ls, lkb);
3131                 goto fail;
3132         }
3133
3134         namelen = receive_extralen(ms);
3135
3136         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3137         if (error) {
3138                 __put_lkb(ls, lkb);
3139                 goto fail;
3140         }
3141
3142         lock_rsb(r);
3143
3144         attach_lkb(r, lkb);
3145         error = do_request(r, lkb);
3146         send_request_reply(r, lkb, error);
3147
3148         unlock_rsb(r);
3149         put_rsb(r);
3150
3151         if (error == -EINPROGRESS)
3152                 error = 0;
3153         if (error)
3154                 dlm_put_lkb(lkb);
3155         return;
3156
3157  fail:
3158         setup_stub_lkb(ls, ms);
3159         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3160 }
3161
3162 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3163 {
3164         struct dlm_lkb *lkb;
3165         struct dlm_rsb *r;
3166         int error, reply = 1;
3167
3168         error = find_lkb(ls, ms->m_remid, &lkb);
3169         if (error)
3170                 goto fail;
3171
3172         r = lkb->lkb_resource;
3173
3174         hold_rsb(r);
3175         lock_rsb(r);
3176
3177         error = validate_message(lkb, ms);
3178         if (error)
3179                 goto out;
3180
3181         receive_flags(lkb, ms);
3182         error = receive_convert_args(ls, lkb, ms);
3183         if (error)
3184                 goto out_reply;
3185         reply = !down_conversion(lkb);
3186
3187         error = do_convert(r, lkb);
3188  out_reply:
3189         if (reply)
3190                 send_convert_reply(r, lkb, error);
3191  out:
3192         unlock_rsb(r);
3193         put_rsb(r);
3194         dlm_put_lkb(lkb);
3195         return;
3196
3197  fail:
3198         setup_stub_lkb(ls, ms);
3199         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3200 }
3201
3202 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3203 {
3204         struct dlm_lkb *lkb;
3205         struct dlm_rsb *r;
3206         int error;
3207
3208         error = find_lkb(ls, ms->m_remid, &lkb);
3209         if (error)
3210                 goto fail;
3211
3212         r = lkb->lkb_resource;
3213
3214         hold_rsb(r);
3215         lock_rsb(r);
3216
3217         error = validate_message(lkb, ms);
3218         if (error)
3219                 goto out;
3220
3221         receive_flags(lkb, ms);
3222         error = receive_unlock_args(ls, lkb, ms);
3223         if (error)
3224                 goto out_reply;
3225
3226         error = do_unlock(r, lkb);
3227  out_reply:
3228         send_unlock_reply(r, lkb, error);
3229  out:
3230         unlock_rsb(r);
3231         put_rsb(r);
3232         dlm_put_lkb(lkb);
3233         return;
3234
3235  fail:
3236         setup_stub_lkb(ls, ms);
3237         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3238 }
3239
3240 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3241 {
3242         struct dlm_lkb *lkb;
3243         struct dlm_rsb *r;
3244         int error;
3245
3246         error = find_lkb(ls, ms->m_remid, &lkb);
3247         if (error)
3248                 goto fail;
3249
3250         receive_flags(lkb, ms);
3251
3252         r = lkb->lkb_resource;
3253
3254         hold_rsb(r);
3255         lock_rsb(r);
3256
3257         error = validate_message(lkb, ms);
3258         if (error)
3259                 goto out;
3260
3261         error = do_cancel(r, lkb);
3262         send_cancel_reply(r, lkb, error);
3263  out:
3264         unlock_rsb(r);
3265         put_rsb(r);
3266         dlm_put_lkb(lkb);
3267         return;
3268
3269  fail:
3270         setup_stub_lkb(ls, ms);
3271         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3272 }
3273
3274 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3275 {
3276         struct dlm_lkb *lkb;
3277         struct dlm_rsb *r;
3278         int error;
3279
3280         error = find_lkb(ls, ms->m_remid, &lkb);
3281         if (error) {
3282                 log_debug(ls, "receive_grant from %d no lkb %x",
3283                           ms->m_header.h_nodeid, ms->m_remid);
3284                 return;
3285         }
3286
3287         r = lkb->lkb_resource;
3288
3289         hold_rsb(r);
3290         lock_rsb(r);
3291
3292         error = validate_message(lkb, ms);
3293         if (error)
3294                 goto out;
3295
3296         receive_flags_reply(lkb, ms);
3297         if (is_altmode(lkb))
3298                 munge_altmode(lkb, ms);
3299         grant_lock_pc(r, lkb, ms);
3300         queue_cast(r, lkb, 0);
3301  out:
3302         unlock_rsb(r);
3303         put_rsb(r);
3304         dlm_put_lkb(lkb);
3305 }
3306
3307 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3308 {
3309         struct dlm_lkb *lkb;
3310         struct dlm_rsb *r;
3311         int error;
3312
3313         error = find_lkb(ls, ms->m_remid, &lkb);
3314         if (error) {
3315                 log_debug(ls, "receive_bast from %d no lkb %x",
3316                           ms->m_header.h_nodeid, ms->m_remid);
3317                 return;
3318         }
3319
3320         r = lkb->lkb_resource;
3321
3322         hold_rsb(r);
3323         lock_rsb(r);
3324
3325         error = validate_message(lkb, ms);
3326         if (error)
3327                 goto out;
3328
3329         queue_bast(r, lkb, ms->m_bastmode);
3330  out:
3331         unlock_rsb(r);
3332         put_rsb(r);
3333         dlm_put_lkb(lkb);
3334 }
3335
3336 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3337 {
3338         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3339
3340         from_nodeid = ms->m_header.h_nodeid;
3341         our_nodeid = dlm_our_nodeid();
3342
3343         len = receive_extralen(ms);
3344
3345         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3346         if (dir_nodeid != our_nodeid) {
3347                 log_error(ls, "lookup dir_nodeid %d from %d",
3348                           dir_nodeid, from_nodeid);
3349                 error = -EINVAL;
3350                 ret_nodeid = -1;
3351                 goto out;
3352         }
3353
3354         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3355
3356         /* Optimization: we're master so treat lookup as a request */
3357         if (!error && ret_nodeid == our_nodeid) {
3358                 receive_request(ls, ms);
3359                 return;
3360         }
3361  out:
3362         send_lookup_reply(ls, ms, ret_nodeid, error);
3363 }
3364
3365 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3366 {
3367         int len, dir_nodeid, from_nodeid;
3368
3369         from_nodeid = ms->m_header.h_nodeid;
3370
3371         len = receive_extralen(ms);
3372
3373         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3374         if (dir_nodeid != dlm_our_nodeid()) {
3375                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3376                           dir_nodeid, from_nodeid);
3377                 return;
3378         }
3379
3380         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3381 }
3382
3383 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3384 {
3385         do_purge(ls, ms->m_nodeid, ms->m_pid);
3386 }
3387
3388 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3389 {
3390         struct dlm_lkb *lkb;
3391         struct dlm_rsb *r;
3392         int error, mstype, result;
3393
3394         error = find_lkb(ls, ms->m_remid, &lkb);
3395         if (error) {
3396                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3397                           ms->m_header.h_nodeid, ms->m_remid);
3398                 return;
3399         }
3400
3401         r = lkb->lkb_resource;
3402         hold_rsb(r);
3403         lock_rsb(r);
3404
3405         error = validate_message(lkb, ms);
3406         if (error)
3407                 goto out;
3408
3409         mstype = lkb->lkb_wait_type;
3410         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3411         if (error)
3412                 goto out;
3413
3414         /* Optimization: the dir node was also the master, so it took our
3415            lookup as a request and sent request reply instead of lookup reply */
3416         if (mstype == DLM_MSG_LOOKUP) {
3417                 r->res_nodeid = ms->m_header.h_nodeid;
3418                 lkb->lkb_nodeid = r->res_nodeid;
3419         }
3420
3421         /* this is the value returned from do_request() on the master */
3422         result = ms->m_result;
3423
3424         switch (result) {
3425         case -EAGAIN:
3426                 /* request would block (be queued) on remote master */
3427                 queue_cast(r, lkb, -EAGAIN);
3428                 confirm_master(r, -EAGAIN);
3429                 unhold_lkb(lkb); /* undoes create_lkb() */
3430                 break;
3431
3432         case -EINPROGRESS:
3433         case 0:
3434                 /* request was queued or granted on remote master */
3435                 receive_flags_reply(lkb, ms);
3436                 lkb->lkb_remid = ms->m_lkid;
3437                 if (is_altmode(lkb))
3438                         munge_altmode(lkb, ms);
3439                 if (result) {
3440                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3441                         add_timeout(lkb);
3442                 } else {
3443                         grant_lock_pc(r, lkb, ms);
3444                         queue_cast(r, lkb, 0);
3445                 }
3446                 confirm_master(r, result);
3447                 break;
3448
3449         case -EBADR:
3450         case -ENOTBLK:
3451                 /* find_rsb failed to find rsb or rsb wasn't master */
3452                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3453                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3454                 r->res_nodeid = -1;
3455                 lkb->lkb_nodeid = -1;
3456
3457                 if (is_overlap(lkb)) {
3458                         /* we'll ignore error in cancel/unlock reply */
3459                         queue_cast_overlap(r, lkb);
3460                         confirm_master(r, result);
3461                         unhold_lkb(lkb); /* undoes create_lkb() */
3462                 } else
3463                         _request_lock(r, lkb);
3464                 break;
3465
3466         default:
3467                 log_error(ls, "receive_request_reply %x error %d",
3468                           lkb->lkb_id, result);
3469         }
3470
3471         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3472                 log_debug(ls, "receive_request_reply %x result %d unlock",
3473                           lkb->lkb_id, result);
3474                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3475                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3476                 send_unlock(r, lkb);
3477         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3478                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3479                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3480                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3481                 send_cancel(r, lkb);
3482         } else {
3483                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3484                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3485         }
3486  out:
3487         unlock_rsb(r);
3488         put_rsb(r);
3489         dlm_put_lkb(lkb);
3490 }
3491
3492 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3493                                     struct dlm_message *ms)
3494 {
3495         /* this is the value returned from do_convert() on the master */
3496         switch (ms->m_result) {
3497         case -EAGAIN:
3498                 /* convert would block (be queued) on remote master */
3499                 queue_cast(r, lkb, -EAGAIN);
3500                 break;
3501
3502         case -EDEADLK:
3503                 receive_flags_reply(lkb, ms);
3504                 revert_lock_pc(r, lkb);
3505                 queue_cast(r, lkb, -EDEADLK);
3506                 break;
3507
3508         case -EINPROGRESS:
3509                 /* convert was queued on remote master */
3510                 receive_flags_reply(lkb, ms);
3511                 if (is_demoted(lkb))
3512                         munge_demoted(lkb, ms);
3513                 del_lkb(r, lkb);
3514                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3515                 add_timeout(lkb);
3516                 break;
3517
3518         case 0:
3519                 /* convert was granted on remote master */
3520                 receive_flags_reply(lkb, ms);
3521                 if (is_demoted(lkb))
3522                         munge_demoted(lkb, ms);
3523                 grant_lock_pc(r, lkb, ms);
3524                 queue_cast(r, lkb, 0);
3525                 break;
3526
3527         default:
3528                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3529                           lkb->lkb_id, ms->m_result);
3530         }
3531 }
3532
3533 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3534 {
3535         struct dlm_rsb *r = lkb->lkb_resource;
3536         int error;
3537
3538         hold_rsb(r);
3539         lock_rsb(r);
3540
3541         error = validate_message(lkb, ms);
3542         if (error)
3543                 goto out;
3544
3545         /* stub reply can happen with waiters_mutex held */
3546         error = remove_from_waiters_ms(lkb, ms);
3547         if (error)
3548                 goto out;
3549
3550         __receive_convert_reply(r, lkb, ms);
3551  out:
3552         unlock_rsb(r);
3553         put_rsb(r);
3554 }
3555
3556 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3557 {
3558         struct dlm_lkb *lkb;
3559         int error;
3560
3561         error = find_lkb(ls, ms->m_remid, &lkb);
3562         if (error) {
3563                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3564                           ms->m_header.h_nodeid, ms->m_remid);
3565                 return;
3566         }
3567
3568         _receive_convert_reply(lkb, ms);
3569         dlm_put_lkb(lkb);
3570 }
3571
3572 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3573 {
3574         struct dlm_rsb *r = lkb->lkb_resource;
3575         int error;
3576
3577         hold_rsb(r);
3578         lock_rsb(r);
3579
3580         error = validate_message(lkb, ms);
3581         if (error)
3582                 goto out;
3583
3584         /* stub reply can happen with waiters_mutex held */
3585         error = remove_from_waiters_ms(lkb, ms);
3586         if (error)
3587                 goto out;
3588
3589         /* this is the value returned from do_unlock() on the master */
3590
3591         switch (ms->m_result) {
3592         case -DLM_EUNLOCK:
3593                 receive_flags_reply(lkb, ms);
3594                 remove_lock_pc(r, lkb);
3595                 queue_cast(r, lkb, -DLM_EUNLOCK);
3596                 break;
3597         case -ENOENT:
3598                 break;
3599         default:
3600                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3601                           lkb->lkb_id, ms->m_result);
3602         }
3603  out:
3604         unlock_rsb(r);
3605         put_rsb(r);
3606 }
3607
3608 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3609 {
3610         struct dlm_lkb *lkb;
3611         int error;
3612
3613         error = find_lkb(ls, ms->m_remid, &lkb);
3614         if (error) {
3615                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3616                           ms->m_header.h_nodeid, ms->m_remid);
3617                 return;
3618         }
3619
3620         _receive_unlock_reply(lkb, ms);
3621         dlm_put_lkb(lkb);
3622 }
3623
3624 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3625 {
3626         struct dlm_rsb *r = lkb->lkb_resource;
3627         int error;
3628
3629         hold_rsb(r);
3630         lock_rsb(r);
3631
3632         error = validate_message(lkb, ms);
3633         if (error)
3634                 goto out;
3635
3636         /* stub reply can happen with waiters_mutex held */
3637         error = remove_from_waiters_ms(lkb, ms);
3638         if (error)
3639                 goto out;
3640
3641         /* this is the value returned from do_cancel() on the master */
3642
3643         switch (ms->m_result) {
3644         case -DLM_ECANCEL:
3645                 receive_flags_reply(lkb, ms);
3646                 revert_lock_pc(r, lkb);
3647                 queue_cast(r, lkb, -DLM_ECANCEL);
3648                 break;
3649         case 0:
3650                 break;
3651         default:
3652                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3653                           lkb->lkb_id, ms->m_result);
3654         }
3655  out:
3656         unlock_rsb(r);
3657         put_rsb(r);
3658 }
3659
3660 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3661 {
3662         struct dlm_lkb *lkb;
3663         int error;
3664
3665         error = find_lkb(ls, ms->m_remid, &lkb);
3666         if (error) {
3667                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3668                           ms->m_header.h_nodeid, ms->m_remid);
3669                 return;
3670         }
3671
3672         _receive_cancel_reply(lkb, ms);
3673         dlm_put_lkb(lkb);
3674 }
3675
3676 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3677 {
3678         struct dlm_lkb *lkb;
3679         struct dlm_rsb *r;
3680         int error, ret_nodeid;
3681
3682         error = find_lkb(ls, ms->m_lkid, &lkb);
3683         if (error) {
3684                 log_error(ls, "receive_lookup_reply no lkb");
3685                 return;
3686         }
3687
3688         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3689            FIXME: will a non-zero error ever be returned? */
3690
3691         r = lkb->lkb_resource;
3692         hold_rsb(r);
3693         lock_rsb(r);
3694
3695         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3696         if (error)
3697                 goto out;
3698
3699         ret_nodeid = ms->m_nodeid;
3700         if (ret_nodeid == dlm_our_nodeid()) {
3701                 r->res_nodeid = 0;
3702                 ret_nodeid = 0;
3703                 r->res_first_lkid = 0;
3704         } else {
3705                 /* set_master() will copy res_nodeid to lkb_nodeid */
3706                 r->res_nodeid = ret_nodeid;
3707         }
3708
3709         if (is_overlap(lkb)) {
3710                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3711                           lkb->lkb_id, lkb->lkb_flags);
3712                 queue_cast_overlap(r, lkb);
3713                 unhold_lkb(lkb); /* undoes create_lkb() */
3714                 goto out_list;
3715         }
3716
3717         _request_lock(r, lkb);
3718
3719  out_list:
3720         if (!ret_nodeid)
3721                 process_lookup_list(r);
3722  out:
3723         unlock_rsb(r);
3724         put_rsb(r);
3725         dlm_put_lkb(lkb);
3726 }
3727
3728 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3729 {
3730         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3731                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3732                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3733                           ms->m_remid, ms->m_result);
3734                 return;
3735         }
3736
3737         switch (ms->m_type) {
3738
3739         /* messages sent to a master node */
3740
3741         case DLM_MSG_REQUEST:
3742                 receive_request(ls, ms);
3743                 break;
3744
3745         case DLM_MSG_CONVERT:
3746                 receive_convert(ls, ms);
3747                 break;
3748
3749         case DLM_MSG_UNLOCK:
3750                 receive_unlock(ls, ms);
3751                 break;
3752
3753         case DLM_MSG_CANCEL:
3754                 receive_cancel(ls, ms);
3755                 break;
3756
3757         /* messages sent from a master node (replies to above) */
3758
3759         case DLM_MSG_REQUEST_REPLY:
3760                 receive_request_reply(ls, ms);
3761                 break;
3762
3763         case DLM_MSG_CONVERT_REPLY:
3764                 receive_convert_reply(ls, ms);
3765                 break;
3766
3767         case DLM_MSG_UNLOCK_REPLY:
3768                 receive_unlock_reply(ls, ms);
3769                 break;
3770
3771         case DLM_MSG_CANCEL_REPLY:
3772                 receive_cancel_reply(ls, ms);
3773                 break;
3774
3775         /* messages sent from a master node (only two types of async msg) */
3776
3777         case DLM_MSG_GRANT:
3778                 receive_grant(ls, ms);
3779                 break;
3780
3781         case DLM_MSG_BAST:
3782                 receive_bast(ls, ms);
3783                 break;
3784
3785         /* messages sent to a dir node */
3786
3787         case DLM_MSG_LOOKUP:
3788                 receive_lookup(ls, ms);
3789                 break;
3790
3791         case DLM_MSG_REMOVE:
3792                 receive_remove(ls, ms);
3793                 break;
3794
3795         /* messages sent from a dir node (remove has no reply) */
3796
3797         case DLM_MSG_LOOKUP_REPLY:
3798                 receive_lookup_reply(ls, ms);
3799                 break;
3800
3801         /* other messages */
3802
3803         case DLM_MSG_PURGE:
3804                 receive_purge(ls, ms);
3805                 break;
3806
3807         default:
3808                 log_error(ls, "unknown message type %d", ms->m_type);
3809         }
3810
3811         dlm_astd_wake();
3812 }
3813
3814 /* If the lockspace is in recovery mode (locking stopped), then normal
3815    messages are saved on the requestqueue for processing after recovery is
3816    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3817    messages off the requestqueue before we process new ones. This occurs right
3818    after recovery completes when we transition from saving all messages on
3819    requestqueue, to processing all the saved messages, to processing new
3820    messages as they arrive. */
3821
3822 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3823                                 int nodeid)
3824 {
3825         if (dlm_locking_stopped(ls)) {
3826                 dlm_add_requestqueue(ls, nodeid, ms);
3827         } else {
3828                 dlm_wait_requestqueue(ls);
3829                 _receive_message(ls, ms);
3830         }
3831 }
3832
3833 /* This is called by dlm_recoverd to process messages that were saved on
3834    the requestqueue. */
3835
3836 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3837 {
3838         _receive_message(ls, ms);
3839 }
3840
3841 /* This is called by the midcomms layer when something is received for
3842    the lockspace.  It could be either a MSG (normal message sent as part of
3843    standard locking activity) or an RCOM (recovery message sent as part of
3844    lockspace recovery). */
3845
3846 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3847 {
3848         struct dlm_header *hd = &p->header;
3849         struct dlm_ls *ls;
3850         int type = 0;
3851
3852         switch (hd->h_cmd) {
3853         case DLM_MSG:
3854                 dlm_message_in(&p->message);
3855                 type = p->message.m_type;
3856                 break;
3857         case DLM_RCOM:
3858                 dlm_rcom_in(&p->rcom);
3859                 type = p->rcom.rc_type;
3860                 break;
3861         default:
3862                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3863                 return;
3864         }
3865
3866         if (hd->h_nodeid != nodeid) {
3867                 log_print("invalid h_nodeid %d from %d lockspace %x",
3868                           hd->h_nodeid, nodeid, hd->h_lockspace);
3869                 return;
3870         }
3871
3872         ls = dlm_find_lockspace_global(hd->h_lockspace);
3873         if (!ls) {
3874                 if (dlm_config.ci_log_debug)
3875                         log_print("invalid lockspace %x from %d cmd %d type %d",
3876                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3877
3878                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3879                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3880                 return;
3881         }
3882
3883         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3884            be inactive (in this ls) before transitioning to recovery mode */
3885
3886         down_read(&ls->ls_recv_active);
3887         if (hd->h_cmd == DLM_MSG)
3888                 dlm_receive_message(ls, &p->message, nodeid);
3889         else
3890                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3891         up_read(&ls->ls_recv_active);
3892
3893         dlm_put_lockspace(ls);
3894 }
3895
3896 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3897 {
3898         if (middle_conversion(lkb)) {
3899                 hold_lkb(lkb);
3900                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3901                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3902                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3903                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3904                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3905
3906                 /* Same special case as in receive_rcom_lock_args() */
3907                 lkb->lkb_grmode = DLM_LOCK_IV;
3908                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3909                 unhold_lkb(lkb);
3910
3911         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3912                 lkb->lkb_flags |= DLM_IFL_RESEND;
3913         }
3914
3915         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3916            conversions are async; there's no reply from the remote master */
3917 }
3918
3919 /* A waiting lkb needs recovery if the master node has failed, or
3920    the master node is changing (only when no directory is used) */
3921
3922 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3923 {
3924         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3925                 return 1;
3926
3927         if (!dlm_no_directory(ls))
3928                 return 0;
3929
3930         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3931                 return 1;
3932
3933         return 0;
3934 }
3935
3936 /* Recovery for locks that are waiting for replies from nodes that are now
3937    gone.  We can just complete unlocks and cancels by faking a reply from the
3938    dead node.  Requests and up-conversions we flag to be resent after
3939    recovery.  Down-conversions can just be completed with a fake reply like
3940    unlocks.  Conversions between PR and CW need special attention. */
3941
3942 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3943 {
3944         struct dlm_lkb *lkb, *safe;
3945         int wait_type, stub_unlock_result, stub_cancel_result;
3946
3947         mutex_lock(&ls->ls_waiters_mutex);
3948
3949         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3950                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3951                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3952
3953                 /* all outstanding lookups, regardless of destination  will be
3954                    resent after recovery is done */
3955
3956                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3957                         lkb->lkb_flags |= DLM_IFL_RESEND;
3958                         continue;
3959                 }
3960
3961                 if (!waiter_needs_recovery(ls, lkb))
3962                         continue;
3963
3964                 wait_type = lkb->lkb_wait_type;
3965                 stub_unlock_result = -DLM_EUNLOCK;
3966                 stub_cancel_result = -DLM_ECANCEL;
3967
3968                 /* Main reply may have been received leaving a zero wait_type,
3969                    but a reply for the overlapping op may not have been
3970                    received.  In that case we need to fake the appropriate
3971                    reply for the overlap op. */
3972
3973                 if (!wait_type) {
3974                         if (is_overlap_cancel(lkb)) {
3975                                 wait_type = DLM_MSG_CANCEL;
3976                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3977                                         stub_cancel_result = 0;
3978                         }
3979                         if (is_overlap_unlock(lkb)) {
3980                                 wait_type = DLM_MSG_UNLOCK;
3981                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3982                                         stub_unlock_result = -ENOENT;
3983                         }
3984
3985                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
3986                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
3987                                   stub_cancel_result, stub_unlock_result);
3988                 }
3989
3990                 switch (wait_type) {
3991
3992                 case DLM_MSG_REQUEST:
3993                         lkb->lkb_flags |= DLM_IFL_RESEND;
3994                         break;
3995
3996                 case DLM_MSG_CONVERT:
3997                         recover_convert_waiter(ls, lkb);
3998                         break;
3999
4000                 case DLM_MSG_UNLOCK:
4001                         hold_lkb(lkb);
4002                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4003                         ls->ls_stub_ms.m_result = stub_unlock_result;
4004                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4005                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4006                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4007                         dlm_put_lkb(lkb);
4008                         break;
4009
4010                 case DLM_MSG_CANCEL:
4011                         hold_lkb(lkb);
4012                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4013                         ls->ls_stub_ms.m_result = stub_cancel_result;
4014                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4015                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4016                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4017                         dlm_put_lkb(lkb);
4018                         break;
4019
4020                 default:
4021                         log_error(ls, "invalid lkb wait_type %d %d",
4022                                   lkb->lkb_wait_type, wait_type);
4023                 }
4024                 schedule();
4025         }
4026         mutex_unlock(&ls->ls_waiters_mutex);
4027 }
4028
4029 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4030 {
4031         struct dlm_lkb *lkb;
4032         int found = 0;
4033
4034         mutex_lock(&ls->ls_waiters_mutex);
4035         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4036                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4037                         hold_lkb(lkb);
4038                         found = 1;
4039                         break;
4040                 }
4041         }
4042         mutex_unlock(&ls->ls_waiters_mutex);
4043
4044         if (!found)
4045                 lkb = NULL;
4046         return lkb;
4047 }
4048
4049 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4050    master or dir-node for r.  Processing the lkb may result in it being placed
4051    back on waiters. */
4052
4053 /* We do this after normal locking has been enabled and any saved messages
4054    (in requestqueue) have been processed.  We should be confident that at
4055    this point we won't get or process a reply to any of these waiting
4056    operations.  But, new ops may be coming in on the rsbs/locks here from
4057    userspace or remotely. */
4058
4059 /* there may have been an overlap unlock/cancel prior to recovery or after
4060    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4061    overlap flag would just have been set and nothing new sent.  we can be
4062    confident here than any replies to either the initial op or overlap ops
4063    prior to recovery have been received. */
4064
4065 int dlm_recover_waiters_post(struct dlm_ls *ls)
4066 {
4067         struct dlm_lkb *lkb;
4068         struct dlm_rsb *r;
4069         int error = 0, mstype, err, oc, ou;
4070
4071         while (1) {
4072                 if (dlm_locking_stopped(ls)) {
4073                         log_debug(ls, "recover_waiters_post aborted");
4074                         error = -EINTR;
4075                         break;
4076                 }
4077
4078                 lkb = find_resend_waiter(ls);
4079                 if (!lkb)
4080                         break;
4081
4082                 r = lkb->lkb_resource;
4083                 hold_rsb(r);
4084                 lock_rsb(r);
4085
4086                 mstype = lkb->lkb_wait_type;
4087                 oc = is_overlap_cancel(lkb);
4088                 ou = is_overlap_unlock(lkb);
4089                 err = 0;
4090
4091                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4092                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4093
4094                 /* At this point we assume that we won't get a reply to any
4095                    previous op or overlap op on this lock.  First, do a big
4096                    remove_from_waiters() for all previous ops. */
4097
4098                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4099                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4100                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4101                 lkb->lkb_wait_type = 0;
4102                 lkb->lkb_wait_count = 0;
4103                 mutex_lock(&ls->ls_waiters_mutex);
4104                 list_del_init(&lkb->lkb_wait_reply);
4105                 mutex_unlock(&ls->ls_waiters_mutex);
4106                 unhold_lkb(lkb); /* for waiters list */
4107
4108                 if (oc || ou) {
4109                         /* do an unlock or cancel instead of resending */
4110                         switch (mstype) {
4111                         case DLM_MSG_LOOKUP:
4112                         case DLM_MSG_REQUEST:
4113                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4114                                                         -DLM_ECANCEL);
4115                                 unhold_lkb(lkb); /* undoes create_lkb() */
4116                                 break;
4117                         case DLM_MSG_CONVERT:
4118                                 if (oc) {
4119                                         queue_cast(r, lkb, -DLM_ECANCEL);
4120                                 } else {
4121                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4122                                         _unlock_lock(r, lkb);
4123                                 }
4124                                 break;
4125                         default:
4126                                 err = 1;
4127                         }
4128                 } else {
4129                         switch (mstype) {
4130                         case DLM_MSG_LOOKUP:
4131                         case DLM_MSG_REQUEST:
4132                                 _request_lock(r, lkb);
4133                                 if (is_master(r))
4134                                         confirm_master(r, 0);
4135                                 break;
4136                         case DLM_MSG_CONVERT:
4137                                 _convert_lock(r, lkb);
4138                                 break;
4139                         default:
4140                                 err = 1;
4141                         }
4142                 }
4143
4144                 if (err)
4145                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4146                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4147                 unlock_rsb(r);
4148                 put_rsb(r);
4149                 dlm_put_lkb(lkb);
4150         }
4151
4152         return error;
4153 }
4154
4155 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4156                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4157 {
4158         struct dlm_ls *ls = r->res_ls;
4159         struct dlm_lkb *lkb, *safe;
4160
4161         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4162                 if (test(ls, lkb)) {
4163                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4164                         del_lkb(r, lkb);
4165                         /* this put should free the lkb */
4166                         if (!dlm_put_lkb(lkb))
4167                                 log_error(ls, "purged lkb not released");
4168                 }
4169         }
4170 }
4171
4172 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4173 {
4174         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4175 }
4176
4177 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4178 {
4179         return is_master_copy(lkb);
4180 }
4181
4182 static void purge_dead_locks(struct dlm_rsb *r)
4183 {
4184         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4185         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4186         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4187 }
4188
4189 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4190 {
4191         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4192         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4193         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4194 }
4195
4196 /* Get rid of locks held by nodes that are gone. */
4197
4198 int dlm_purge_locks(struct dlm_ls *ls)
4199 {
4200         struct dlm_rsb *r;
4201
4202         log_debug(ls, "dlm_purge_locks");
4203
4204         down_write(&ls->ls_root_sem);
4205         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4206                 hold_rsb(r);
4207                 lock_rsb(r);
4208                 if (is_master(r))
4209                         purge_dead_locks(r);
4210                 unlock_rsb(r);
4211                 unhold_rsb(r);
4212
4213                 schedule();
4214         }
4215         up_write(&ls->ls_root_sem);
4216
4217         return 0;
4218 }
4219
4220 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4221 {
4222         struct dlm_rsb *r, *r_ret = NULL;
4223
4224         read_lock(&ls->ls_rsbtbl[bucket].lock);
4225         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4226                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4227                         continue;
4228                 hold_rsb(r);
4229                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4230                 r_ret = r;
4231                 break;
4232         }
4233         read_unlock(&ls->ls_rsbtbl[bucket].lock);
4234         return r_ret;
4235 }
4236
4237 void dlm_grant_after_purge(struct dlm_ls *ls)
4238 {
4239         struct dlm_rsb *r;
4240         int bucket = 0;
4241
4242         while (1) {
4243                 r = find_purged_rsb(ls, bucket);
4244                 if (!r) {
4245                         if (bucket == ls->ls_rsbtbl_size - 1)
4246                                 break;
4247                         bucket++;
4248                         continue;
4249                 }
4250                 lock_rsb(r);
4251                 if (is_master(r)) {
4252                         grant_pending_locks(r);
4253                         confirm_master(r, 0);
4254                 }
4255                 unlock_rsb(r);
4256                 put_rsb(r);
4257                 schedule();
4258         }
4259 }
4260
4261 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4262                                          uint32_t remid)
4263 {
4264         struct dlm_lkb *lkb;
4265
4266         list_for_each_entry(lkb, head, lkb_statequeue) {
4267                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4268                         return lkb;
4269         }
4270         return NULL;
4271 }
4272
4273 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4274                                     uint32_t remid)
4275 {
4276         struct dlm_lkb *lkb;
4277
4278         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4279         if (lkb)
4280                 return lkb;
4281         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4282         if (lkb)
4283                 return lkb;
4284         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4285         if (lkb)
4286                 return lkb;
4287         return NULL;
4288 }
4289
4290 /* needs at least dlm_rcom + rcom_lock */
4291 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4292                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4293 {
4294         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4295
4296         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4297         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4298         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4299         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4300         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4301         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4302         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4303         lkb->lkb_rqmode = rl->rl_rqmode;
4304         lkb->lkb_grmode = rl->rl_grmode;
4305         /* don't set lkb_status because add_lkb wants to itself */
4306
4307         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4308         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4309
4310         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4311                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4312                          sizeof(struct rcom_lock);
4313                 if (lvblen > ls->ls_lvblen)
4314                         return -EINVAL;
4315                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4316                 if (!lkb->lkb_lvbptr)
4317                         return -ENOMEM;
4318                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4319         }
4320
4321         /* Conversions between PR and CW (middle modes) need special handling.
4322            The real granted mode of these converting locks cannot be determined
4323            until all locks have been rebuilt on the rsb (recover_conversion) */
4324
4325         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4326             middle_conversion(lkb)) {
4327                 rl->rl_status = DLM_LKSTS_CONVERT;
4328                 lkb->lkb_grmode = DLM_LOCK_IV;
4329                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4330         }
4331
4332         return 0;
4333 }
4334
4335 /* This lkb may have been recovered in a previous aborted recovery so we need
4336    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4337    If so we just send back a standard reply.  If not, we create a new lkb with
4338    the given values and send back our lkid.  We send back our lkid by sending
4339    back the rcom_lock struct we got but with the remid field filled in. */
4340
4341 /* needs at least dlm_rcom + rcom_lock */
4342 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4343 {
4344         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4345         struct dlm_rsb *r;
4346         struct dlm_lkb *lkb;
4347         int error;
4348
4349         if (rl->rl_parent_lkid) {
4350                 error = -EOPNOTSUPP;
4351                 goto out;
4352         }
4353
4354         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4355                          R_MASTER, &r);
4356         if (error)
4357                 goto out;
4358
4359         lock_rsb(r);
4360
4361         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4362         if (lkb) {
4363                 error = -EEXIST;
4364                 goto out_remid;
4365         }
4366
4367         error = create_lkb(ls, &lkb);
4368         if (error)
4369                 goto out_unlock;
4370
4371         error = receive_rcom_lock_args(ls, lkb, r, rc);
4372         if (error) {
4373                 __put_lkb(ls, lkb);
4374                 goto out_unlock;
4375         }
4376
4377         attach_lkb(r, lkb);
4378         add_lkb(r, lkb, rl->rl_status);
4379         error = 0;
4380
4381  out_remid:
4382         /* this is the new value returned to the lock holder for
4383            saving in its process-copy lkb */
4384         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4385
4386  out_unlock:
4387         unlock_rsb(r);
4388         put_rsb(r);
4389  out:
4390         if (error)
4391                 log_debug(ls, "recover_master_copy %d %x", error,
4392                           le32_to_cpu(rl->rl_lkid));
4393         rl->rl_result = cpu_to_le32(error);
4394         return error;
4395 }
4396
4397 /* needs at least dlm_rcom + rcom_lock */
4398 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4399 {
4400         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4401         struct dlm_rsb *r;
4402         struct dlm_lkb *lkb;
4403         int error;
4404
4405         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4406         if (error) {
4407                 log_error(ls, "recover_process_copy no lkid %x",
4408                                 le32_to_cpu(rl->rl_lkid));
4409                 return error;
4410         }
4411
4412         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4413
4414         error = le32_to_cpu(rl->rl_result);
4415
4416         r = lkb->lkb_resource;
4417         hold_rsb(r);
4418         lock_rsb(r);
4419
4420         switch (error) {
4421         case -EBADR:
4422                 /* There's a chance the new master received our lock before
4423                    dlm_recover_master_reply(), this wouldn't happen if we did
4424                    a barrier between recover_masters and recover_locks. */
4425                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4426                           (unsigned long)r, r->res_name);
4427                 dlm_send_rcom_lock(r, lkb);
4428                 goto out;
4429         case -EEXIST:
4430                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4431                 /* fall through */
4432         case 0:
4433                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4434                 break;
4435         default:
4436                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4437                           error, lkb->lkb_id);
4438         }
4439
4440         /* an ack for dlm_recover_locks() which waits for replies from
4441            all the locks it sends to new masters */
4442         dlm_recovered_lock(r);
4443  out:
4444         unlock_rsb(r);
4445         put_rsb(r);
4446         dlm_put_lkb(lkb);
4447
4448         return 0;
4449 }
4450
4451 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4452                      int mode, uint32_t flags, void *name, unsigned int namelen,
4453                      unsigned long timeout_cs)
4454 {
4455         struct dlm_lkb *lkb;
4456         struct dlm_args args;
4457         int error;
4458
4459         dlm_lock_recovery(ls);
4460
4461         error = create_lkb(ls, &lkb);
4462         if (error) {
4463                 kfree(ua);
4464                 goto out;
4465         }
4466
4467         if (flags & DLM_LKF_VALBLK) {
4468                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4469                 if (!ua->lksb.sb_lvbptr) {
4470                         kfree(ua);
4471                         __put_lkb(ls, lkb);
4472                         error = -ENOMEM;
4473                         goto out;
4474                 }
4475         }
4476
4477         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4478            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4479            lock and that lkb_astparam is the dlm_user_args structure. */
4480
4481         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4482                               fake_astfn, ua, fake_bastfn, &args);
4483         lkb->lkb_flags |= DLM_IFL_USER;
4484         ua->old_mode = DLM_LOCK_IV;
4485
4486         if (error) {
4487                 __put_lkb(ls, lkb);
4488                 goto out;
4489         }
4490
4491         error = request_lock(ls, lkb, name, namelen, &args);
4492
4493         switch (error) {
4494         case 0:
4495                 break;
4496         case -EINPROGRESS:
4497                 error = 0;
4498                 break;
4499         case -EAGAIN:
4500                 error = 0;
4501                 /* fall through */
4502         default:
4503                 __put_lkb(ls, lkb);
4504                 goto out;
4505         }
4506
4507         /* add this new lkb to the per-process list of locks */
4508         spin_lock(&ua->proc->locks_spin);
4509         hold_lkb(lkb);
4510         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4511         spin_unlock(&ua->proc->locks_spin);
4512  out:
4513         dlm_unlock_recovery(ls);
4514         return error;
4515 }
4516
4517 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4518                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4519                      unsigned long timeout_cs)
4520 {
4521         struct dlm_lkb *lkb;
4522         struct dlm_args args;
4523         struct dlm_user_args *ua;
4524         int error;
4525
4526         dlm_lock_recovery(ls);
4527
4528         error = find_lkb(ls, lkid, &lkb);
4529         if (error)
4530                 goto out;
4531
4532         /* user can change the params on its lock when it converts it, or
4533            add an lvb that didn't exist before */
4534
4535         ua = lkb->lkb_ua;
4536
4537         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4538                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4539                 if (!ua->lksb.sb_lvbptr) {
4540                         error = -ENOMEM;
4541                         goto out_put;
4542                 }
4543         }
4544         if (lvb_in && ua->lksb.sb_lvbptr)
4545                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4546
4547         ua->xid = ua_tmp->xid;
4548         ua->castparam = ua_tmp->castparam;
4549         ua->castaddr = ua_tmp->castaddr;
4550         ua->bastparam = ua_tmp->bastparam;
4551         ua->bastaddr = ua_tmp->bastaddr;
4552         ua->user_lksb = ua_tmp->user_lksb;
4553         ua->old_mode = lkb->lkb_grmode;
4554
4555         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4556                               fake_astfn, ua, fake_bastfn, &args);
4557         if (error)
4558                 goto out_put;
4559
4560         error = convert_lock(ls, lkb, &args);
4561
4562         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4563                 error = 0;
4564  out_put:
4565         dlm_put_lkb(lkb);
4566  out:
4567         dlm_unlock_recovery(ls);
4568         kfree(ua_tmp);
4569         return error;
4570 }
4571
4572 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4573                     uint32_t flags, uint32_t lkid, char *lvb_in)
4574 {
4575         struct dlm_lkb *lkb;
4576         struct dlm_args args;
4577         struct dlm_user_args *ua;
4578         int error;
4579
4580         dlm_lock_recovery(ls);
4581
4582         error = find_lkb(ls, lkid, &lkb);
4583         if (error)
4584                 goto out;
4585
4586         ua = lkb->lkb_ua;
4587
4588         if (lvb_in && ua->lksb.sb_lvbptr)
4589                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4590         if (ua_tmp->castparam)
4591                 ua->castparam = ua_tmp->castparam;
4592         ua->user_lksb = ua_tmp->user_lksb;
4593
4594         error = set_unlock_args(flags, ua, &args);
4595         if (error)
4596                 goto out_put;
4597
4598         error = unlock_lock(ls, lkb, &args);
4599
4600         if (error == -DLM_EUNLOCK)
4601                 error = 0;
4602         /* from validate_unlock_args() */
4603         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4604                 error = 0;
4605         if (error)
4606                 goto out_put;
4607
4608         spin_lock(&ua->proc->locks_spin);
4609         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4610         if (!list_empty(&lkb->lkb_ownqueue))
4611                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4612         spin_unlock(&ua->proc->locks_spin);
4613  out_put:
4614         dlm_put_lkb(lkb);
4615  out:
4616         dlm_unlock_recovery(ls);
4617         kfree(ua_tmp);
4618         return error;
4619 }
4620
4621 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4622                     uint32_t flags, uint32_t lkid)
4623 {
4624         struct dlm_lkb *lkb;
4625         struct dlm_args args;
4626         struct dlm_user_args *ua;
4627         int error;
4628
4629         dlm_lock_recovery(ls);
4630
4631         error = find_lkb(ls, lkid, &lkb);
4632         if (error)
4633                 goto out;
4634
4635         ua = lkb->lkb_ua;
4636         if (ua_tmp->castparam)
4637                 ua->castparam = ua_tmp->castparam;
4638         ua->user_lksb = ua_tmp->user_lksb;
4639
4640         error = set_unlock_args(flags, ua, &args);
4641         if (error)
4642                 goto out_put;
4643
4644         error = cancel_lock(ls, lkb, &args);
4645
4646         if (error == -DLM_ECANCEL)
4647                 error = 0;
4648         /* from validate_unlock_args() */
4649         if (error == -EBUSY)
4650                 error = 0;
4651  out_put:
4652         dlm_put_lkb(lkb);
4653  out:
4654         dlm_unlock_recovery(ls);
4655         kfree(ua_tmp);
4656         return error;
4657 }
4658
4659 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4660 {
4661         struct dlm_lkb *lkb;
4662         struct dlm_args args;
4663         struct dlm_user_args *ua;
4664         struct dlm_rsb *r;
4665         int error;
4666
4667         dlm_lock_recovery(ls);
4668
4669         error = find_lkb(ls, lkid, &lkb);
4670         if (error)
4671                 goto out;
4672
4673         ua = lkb->lkb_ua;
4674
4675         error = set_unlock_args(flags, ua, &args);
4676         if (error)
4677                 goto out_put;
4678
4679         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4680
4681         r = lkb->lkb_resource;
4682         hold_rsb(r);
4683         lock_rsb(r);
4684
4685         error = validate_unlock_args(lkb, &args);
4686         if (error)
4687                 goto out_r;
4688         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4689
4690         error = _cancel_lock(r, lkb);
4691  out_r:
4692         unlock_rsb(r);
4693         put_rsb(r);
4694
4695         if (error == -DLM_ECANCEL)
4696                 error = 0;
4697         /* from validate_unlock_args() */
4698         if (error == -EBUSY)
4699                 error = 0;
4700  out_put:
4701         dlm_put_lkb(lkb);
4702  out:
4703         dlm_unlock_recovery(ls);
4704         return error;
4705 }
4706
4707 /* lkb's that are removed from the waiters list by revert are just left on the
4708    orphans list with the granted orphan locks, to be freed by purge */
4709
4710 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4711 {
4712         struct dlm_args args;
4713         int error;
4714
4715         hold_lkb(lkb);
4716         mutex_lock(&ls->ls_orphans_mutex);
4717         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4718         mutex_unlock(&ls->ls_orphans_mutex);
4719
4720         set_unlock_args(0, lkb->lkb_ua, &args);
4721
4722         error = cancel_lock(ls, lkb, &args);
4723         if (error == -DLM_ECANCEL)
4724                 error = 0;
4725         return error;
4726 }
4727
4728 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4729    Regardless of what rsb queue the lock is on, it's removed and freed. */
4730
4731 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4732 {
4733         struct dlm_args args;
4734         int error;
4735
4736         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4737
4738         error = unlock_lock(ls, lkb, &args);
4739         if (error == -DLM_EUNLOCK)
4740                 error = 0;
4741         return error;
4742 }
4743
4744 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4745    (which does lock_rsb) due to deadlock with receiving a message that does
4746    lock_rsb followed by dlm_user_add_ast() */
4747
4748 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4749                                      struct dlm_user_proc *proc)
4750 {
4751         struct dlm_lkb *lkb = NULL;
4752
4753         mutex_lock(&ls->ls_clear_proc_locks);
4754         if (list_empty(&proc->locks))
4755                 goto out;
4756
4757         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4758         list_del_init(&lkb->lkb_ownqueue);
4759
4760         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4761                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4762         else
4763                 lkb->lkb_flags |= DLM_IFL_DEAD;
4764  out:
4765         mutex_unlock(&ls->ls_clear_proc_locks);
4766         return lkb;
4767 }
4768
4769 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4770    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4771    which we clear here. */
4772
4773 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4774    list, and no more device_writes should add lkb's to proc->locks list; so we
4775    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4776    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4777    them ourself. */
4778
4779 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4780 {
4781         struct dlm_lkb *lkb, *safe;
4782
4783         dlm_lock_recovery(ls);
4784
4785         while (1) {
4786                 lkb = del_proc_lock(ls, proc);
4787                 if (!lkb)
4788                         break;
4789                 del_timeout(lkb);
4790                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4791                         orphan_proc_lock(ls, lkb);
4792                 else
4793                         unlock_proc_lock(ls, lkb);
4794
4795                 /* this removes the reference for the proc->locks list
4796                    added by dlm_user_request, it may result in the lkb
4797                    being freed */
4798
4799                 dlm_put_lkb(lkb);
4800         }
4801
4802         mutex_lock(&ls->ls_clear_proc_locks);
4803
4804         /* in-progress unlocks */
4805         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4806                 list_del_init(&lkb->lkb_ownqueue);
4807                 lkb->lkb_flags |= DLM_IFL_DEAD;
4808                 dlm_put_lkb(lkb);
4809         }
4810
4811         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4812                 lkb->lkb_ast_type = 0;
4813                 list_del(&lkb->lkb_astqueue);
4814                 dlm_put_lkb(lkb);
4815         }
4816
4817         mutex_unlock(&ls->ls_clear_proc_locks);
4818         dlm_unlock_recovery(ls);
4819 }
4820
4821 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4822 {
4823         struct dlm_lkb *lkb, *safe;
4824
4825         while (1) {
4826                 lkb = NULL;
4827                 spin_lock(&proc->locks_spin);
4828                 if (!list_empty(&proc->locks)) {
4829                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4830                                          lkb_ownqueue);
4831                         list_del_init(&lkb->lkb_ownqueue);
4832                 }
4833                 spin_unlock(&proc->locks_spin);
4834
4835                 if (!lkb)
4836                         break;
4837
4838                 lkb->lkb_flags |= DLM_IFL_DEAD;
4839                 unlock_proc_lock(ls, lkb);
4840                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4841         }
4842
4843         spin_lock(&proc->locks_spin);
4844         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4845                 list_del_init(&lkb->lkb_ownqueue);
4846                 lkb->lkb_flags |= DLM_IFL_DEAD;
4847                 dlm_put_lkb(lkb);
4848         }
4849         spin_unlock(&proc->locks_spin);
4850
4851         spin_lock(&proc->asts_spin);
4852         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4853                 list_del(&lkb->lkb_astqueue);
4854                 dlm_put_lkb(lkb);
4855         }
4856         spin_unlock(&proc->asts_spin);
4857 }
4858
4859 /* pid of 0 means purge all orphans */
4860
4861 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4862 {
4863         struct dlm_lkb *lkb, *safe;
4864
4865         mutex_lock(&ls->ls_orphans_mutex);
4866         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4867                 if (pid && lkb->lkb_ownpid != pid)
4868                         continue;
4869                 unlock_proc_lock(ls, lkb);
4870                 list_del_init(&lkb->lkb_ownqueue);
4871                 dlm_put_lkb(lkb);
4872         }
4873         mutex_unlock(&ls->ls_orphans_mutex);
4874 }
4875
4876 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4877 {
4878         struct dlm_message *ms;
4879         struct dlm_mhandle *mh;
4880         int error;
4881
4882         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4883                                 DLM_MSG_PURGE, &ms, &mh);
4884         if (error)
4885                 return error;
4886         ms->m_nodeid = nodeid;
4887         ms->m_pid = pid;
4888
4889         return send_message(mh, ms);
4890 }
4891
4892 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4893                    int nodeid, int pid)
4894 {
4895         int error = 0;
4896
4897         if (nodeid != dlm_our_nodeid()) {
4898                 error = send_purge(ls, nodeid, pid);
4899         } else {
4900                 dlm_lock_recovery(ls);
4901                 if (pid == current->pid)
4902                         purge_proc_locks(ls, proc);
4903                 else
4904                         do_purge(ls, nodeid, pid);
4905                 dlm_unlock_recovery(ls);
4906         }
4907         return error;
4908 }
4909