Merge branches 'release' and 'autoload' into release
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         if (is_master_copy(lkb))
322                 send_bast(r, lkb, rqmode);
323         else {
324                 lkb->lkb_bastmode = rqmode;
325                 dlm_add_ast(lkb, AST_BAST);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         return -EBADR;
367
368  found:
369         if (r->res_nodeid && (flags & R_MASTER))
370                 error = -ENOTBLK;
371         *r_ret = r;
372         return error;
373 }
374
375 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
376                        unsigned int flags, struct dlm_rsb **r_ret)
377 {
378         struct dlm_rsb *r;
379         int error;
380
381         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
382         if (!error) {
383                 kref_get(&r->res_ref);
384                 goto out;
385         }
386         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
387         if (error)
388                 goto out;
389
390         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
391
392         if (dlm_no_directory(ls))
393                 goto out;
394
395         if (r->res_nodeid == -1) {
396                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
397                 r->res_first_lkid = 0;
398         } else if (r->res_nodeid > 0) {
399                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
400                 r->res_first_lkid = 0;
401         } else {
402                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
403                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
404         }
405  out:
406         *r_ret = r;
407         return error;
408 }
409
410 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
411                       unsigned int flags, struct dlm_rsb **r_ret)
412 {
413         int error;
414         write_lock(&ls->ls_rsbtbl[b].lock);
415         error = _search_rsb(ls, name, len, b, flags, r_ret);
416         write_unlock(&ls->ls_rsbtbl[b].lock);
417         return error;
418 }
419
420 /*
421  * Find rsb in rsbtbl and potentially create/add one
422  *
423  * Delaying the release of rsb's has a similar benefit to applications keeping
424  * NL locks on an rsb, but without the guarantee that the cached master value
425  * will still be valid when the rsb is reused.  Apps aren't always smart enough
426  * to keep NL locks on an rsb that they may lock again shortly; this can lead
427  * to excessive master lookups and removals if we don't delay the release.
428  *
429  * Searching for an rsb means looking through both the normal list and toss
430  * list.  When found on the toss list the rsb is moved to the normal list with
431  * ref count of 1; when found on normal list the ref count is incremented.
432  */
433
434 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
435                     unsigned int flags, struct dlm_rsb **r_ret)
436 {
437         struct dlm_rsb *r, *tmp;
438         uint32_t hash, bucket;
439         int error = 0;
440
441         if (dlm_no_directory(ls))
442                 flags |= R_CREATE;
443
444         hash = jhash(name, namelen, 0);
445         bucket = hash & (ls->ls_rsbtbl_size - 1);
446
447         error = search_rsb(ls, name, namelen, bucket, flags, &r);
448         if (!error)
449                 goto out;
450
451         if (error == -EBADR && !(flags & R_CREATE))
452                 goto out;
453
454         /* the rsb was found but wasn't a master copy */
455         if (error == -ENOTBLK)
456                 goto out;
457
458         error = -ENOMEM;
459         r = create_rsb(ls, name, namelen);
460         if (!r)
461                 goto out;
462
463         r->res_hash = hash;
464         r->res_bucket = bucket;
465         r->res_nodeid = -1;
466         kref_init(&r->res_ref);
467
468         /* With no directory, the master can be set immediately */
469         if (dlm_no_directory(ls)) {
470                 int nodeid = dlm_dir_nodeid(r);
471                 if (nodeid == dlm_our_nodeid())
472                         nodeid = 0;
473                 r->res_nodeid = nodeid;
474         }
475
476         write_lock(&ls->ls_rsbtbl[bucket].lock);
477         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
478         if (!error) {
479                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
480                 dlm_free_rsb(r);
481                 r = tmp;
482                 goto out;
483         }
484         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
485         write_unlock(&ls->ls_rsbtbl[bucket].lock);
486         error = 0;
487  out:
488         *r_ret = r;
489         return error;
490 }
491
492 /* This is only called to add a reference when the code already holds
493    a valid reference to the rsb, so there's no need for locking. */
494
495 static inline void hold_rsb(struct dlm_rsb *r)
496 {
497         kref_get(&r->res_ref);
498 }
499
500 void dlm_hold_rsb(struct dlm_rsb *r)
501 {
502         hold_rsb(r);
503 }
504
505 static void toss_rsb(struct kref *kref)
506 {
507         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
508         struct dlm_ls *ls = r->res_ls;
509
510         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
511         kref_init(&r->res_ref);
512         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
513         r->res_toss_time = jiffies;
514         if (r->res_lvbptr) {
515                 dlm_free_lvb(r->res_lvbptr);
516                 r->res_lvbptr = NULL;
517         }
518 }
519
520 /* When all references to the rsb are gone it's transfered to
521    the tossed list for later disposal. */
522
523 static void put_rsb(struct dlm_rsb *r)
524 {
525         struct dlm_ls *ls = r->res_ls;
526         uint32_t bucket = r->res_bucket;
527
528         write_lock(&ls->ls_rsbtbl[bucket].lock);
529         kref_put(&r->res_ref, toss_rsb);
530         write_unlock(&ls->ls_rsbtbl[bucket].lock);
531 }
532
533 void dlm_put_rsb(struct dlm_rsb *r)
534 {
535         put_rsb(r);
536 }
537
538 /* See comment for unhold_lkb */
539
540 static void unhold_rsb(struct dlm_rsb *r)
541 {
542         int rv;
543         rv = kref_put(&r->res_ref, toss_rsb);
544         DLM_ASSERT(!rv, dlm_dump_rsb(r););
545 }
546
547 static void kill_rsb(struct kref *kref)
548 {
549         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
550
551         /* All work is done after the return from kref_put() so we
552            can release the write_lock before the remove and free. */
553
554         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
555         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
556         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
557         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
558         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
559         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
560 }
561
562 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
563    The rsb must exist as long as any lkb's for it do. */
564
565 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
566 {
567         hold_rsb(r);
568         lkb->lkb_resource = r;
569 }
570
571 static void detach_lkb(struct dlm_lkb *lkb)
572 {
573         if (lkb->lkb_resource) {
574                 put_rsb(lkb->lkb_resource);
575                 lkb->lkb_resource = NULL;
576         }
577 }
578
579 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
580 {
581         struct dlm_lkb *lkb, *tmp;
582         uint32_t lkid = 0;
583         uint16_t bucket;
584
585         lkb = dlm_allocate_lkb(ls);
586         if (!lkb)
587                 return -ENOMEM;
588
589         lkb->lkb_nodeid = -1;
590         lkb->lkb_grmode = DLM_LOCK_IV;
591         kref_init(&lkb->lkb_ref);
592         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
593         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
594         INIT_LIST_HEAD(&lkb->lkb_time_list);
595
596         get_random_bytes(&bucket, sizeof(bucket));
597         bucket &= (ls->ls_lkbtbl_size - 1);
598
599         write_lock(&ls->ls_lkbtbl[bucket].lock);
600
601         /* counter can roll over so we must verify lkid is not in use */
602
603         while (lkid == 0) {
604                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
605
606                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
607                                     lkb_idtbl_list) {
608                         if (tmp->lkb_id != lkid)
609                                 continue;
610                         lkid = 0;
611                         break;
612                 }
613         }
614
615         lkb->lkb_id = lkid;
616         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
617         write_unlock(&ls->ls_lkbtbl[bucket].lock);
618
619         *lkb_ret = lkb;
620         return 0;
621 }
622
623 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
624 {
625         struct dlm_lkb *lkb;
626         uint16_t bucket = (lkid >> 16);
627
628         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
629                 if (lkb->lkb_id == lkid)
630                         return lkb;
631         }
632         return NULL;
633 }
634
635 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
636 {
637         struct dlm_lkb *lkb;
638         uint16_t bucket = (lkid >> 16);
639
640         if (bucket >= ls->ls_lkbtbl_size)
641                 return -EBADSLT;
642
643         read_lock(&ls->ls_lkbtbl[bucket].lock);
644         lkb = __find_lkb(ls, lkid);
645         if (lkb)
646                 kref_get(&lkb->lkb_ref);
647         read_unlock(&ls->ls_lkbtbl[bucket].lock);
648
649         *lkb_ret = lkb;
650         return lkb ? 0 : -ENOENT;
651 }
652
653 static void kill_lkb(struct kref *kref)
654 {
655         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
656
657         /* All work is done after the return from kref_put() so we
658            can release the write_lock before the detach_lkb */
659
660         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
661 }
662
663 /* __put_lkb() is used when an lkb may not have an rsb attached to
664    it so we need to provide the lockspace explicitly */
665
666 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
667 {
668         uint16_t bucket = (lkb->lkb_id >> 16);
669
670         write_lock(&ls->ls_lkbtbl[bucket].lock);
671         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
672                 list_del(&lkb->lkb_idtbl_list);
673                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
674
675                 detach_lkb(lkb);
676
677                 /* for local/process lkbs, lvbptr points to caller's lksb */
678                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
679                         dlm_free_lvb(lkb->lkb_lvbptr);
680                 dlm_free_lkb(lkb);
681                 return 1;
682         } else {
683                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
684                 return 0;
685         }
686 }
687
688 int dlm_put_lkb(struct dlm_lkb *lkb)
689 {
690         struct dlm_ls *ls;
691
692         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
693         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
694
695         ls = lkb->lkb_resource->res_ls;
696         return __put_lkb(ls, lkb);
697 }
698
699 /* This is only called to add a reference when the code already holds
700    a valid reference to the lkb, so there's no need for locking. */
701
702 static inline void hold_lkb(struct dlm_lkb *lkb)
703 {
704         kref_get(&lkb->lkb_ref);
705 }
706
707 /* This is called when we need to remove a reference and are certain
708    it's not the last ref.  e.g. del_lkb is always called between a
709    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
710    put_lkb would work fine, but would involve unnecessary locking */
711
712 static inline void unhold_lkb(struct dlm_lkb *lkb)
713 {
714         int rv;
715         rv = kref_put(&lkb->lkb_ref, kill_lkb);
716         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
717 }
718
719 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
720                             int mode)
721 {
722         struct dlm_lkb *lkb = NULL;
723
724         list_for_each_entry(lkb, head, lkb_statequeue)
725                 if (lkb->lkb_rqmode < mode)
726                         break;
727
728         if (!lkb)
729                 list_add_tail(new, head);
730         else
731                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
732 }
733
734 /* add/remove lkb to rsb's grant/convert/wait queue */
735
736 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
737 {
738         kref_get(&lkb->lkb_ref);
739
740         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
741
742         lkb->lkb_status = status;
743
744         switch (status) {
745         case DLM_LKSTS_WAITING:
746                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
747                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
748                 else
749                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
750                 break;
751         case DLM_LKSTS_GRANTED:
752                 /* convention says granted locks kept in order of grmode */
753                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
754                                 lkb->lkb_grmode);
755                 break;
756         case DLM_LKSTS_CONVERT:
757                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
758                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
759                 else
760                         list_add_tail(&lkb->lkb_statequeue,
761                                       &r->res_convertqueue);
762                 break;
763         default:
764                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
765         }
766 }
767
768 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
769 {
770         lkb->lkb_status = 0;
771         list_del(&lkb->lkb_statequeue);
772         unhold_lkb(lkb);
773 }
774
775 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
776 {
777         hold_lkb(lkb);
778         del_lkb(r, lkb);
779         add_lkb(r, lkb, sts);
780         unhold_lkb(lkb);
781 }
782
783 static int msg_reply_type(int mstype)
784 {
785         switch (mstype) {
786         case DLM_MSG_REQUEST:
787                 return DLM_MSG_REQUEST_REPLY;
788         case DLM_MSG_CONVERT:
789                 return DLM_MSG_CONVERT_REPLY;
790         case DLM_MSG_UNLOCK:
791                 return DLM_MSG_UNLOCK_REPLY;
792         case DLM_MSG_CANCEL:
793                 return DLM_MSG_CANCEL_REPLY;
794         case DLM_MSG_LOOKUP:
795                 return DLM_MSG_LOOKUP_REPLY;
796         }
797         return -1;
798 }
799
800 /* add/remove lkb from global waiters list of lkb's waiting for
801    a reply from a remote node */
802
803 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
804 {
805         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
806         int error = 0;
807
808         mutex_lock(&ls->ls_waiters_mutex);
809
810         if (is_overlap_unlock(lkb) ||
811             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
812                 error = -EINVAL;
813                 goto out;
814         }
815
816         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
817                 switch (mstype) {
818                 case DLM_MSG_UNLOCK:
819                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
820                         break;
821                 case DLM_MSG_CANCEL:
822                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
823                         break;
824                 default:
825                         error = -EBUSY;
826                         goto out;
827                 }
828                 lkb->lkb_wait_count++;
829                 hold_lkb(lkb);
830
831                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
832                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
833                           lkb->lkb_wait_count, lkb->lkb_flags);
834                 goto out;
835         }
836
837         DLM_ASSERT(!lkb->lkb_wait_count,
838                    dlm_print_lkb(lkb);
839                    printk("wait_count %d\n", lkb->lkb_wait_count););
840
841         lkb->lkb_wait_count++;
842         lkb->lkb_wait_type = mstype;
843         hold_lkb(lkb);
844         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
845  out:
846         if (error)
847                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
848                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
849                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
850         mutex_unlock(&ls->ls_waiters_mutex);
851         return error;
852 }
853
854 /* We clear the RESEND flag because we might be taking an lkb off the waiters
855    list as part of process_requestqueue (e.g. a lookup that has an optimized
856    request reply on the requestqueue) between dlm_recover_waiters_pre() which
857    set RESEND and dlm_recover_waiters_post() */
858
859 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
860 {
861         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
862         int overlap_done = 0;
863
864         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
865                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
866                 overlap_done = 1;
867                 goto out_del;
868         }
869
870         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
871                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
872                 overlap_done = 1;
873                 goto out_del;
874         }
875
876         /* N.B. type of reply may not always correspond to type of original
877            msg due to lookup->request optimization, verify others? */
878
879         if (lkb->lkb_wait_type) {
880                 lkb->lkb_wait_type = 0;
881                 goto out_del;
882         }
883
884         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
885                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
886         return -1;
887
888  out_del:
889         /* the force-unlock/cancel has completed and we haven't recvd a reply
890            to the op that was in progress prior to the unlock/cancel; we
891            give up on any reply to the earlier op.  FIXME: not sure when/how
892            this would happen */
893
894         if (overlap_done && lkb->lkb_wait_type) {
895                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
896                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
897                 lkb->lkb_wait_count--;
898                 lkb->lkb_wait_type = 0;
899         }
900
901         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
902
903         lkb->lkb_flags &= ~DLM_IFL_RESEND;
904         lkb->lkb_wait_count--;
905         if (!lkb->lkb_wait_count)
906                 list_del_init(&lkb->lkb_wait_reply);
907         unhold_lkb(lkb);
908         return 0;
909 }
910
911 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
912 {
913         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
914         int error;
915
916         mutex_lock(&ls->ls_waiters_mutex);
917         error = _remove_from_waiters(lkb, mstype);
918         mutex_unlock(&ls->ls_waiters_mutex);
919         return error;
920 }
921
922 /* Handles situations where we might be processing a "fake" or "stub" reply in
923    which we can't try to take waiters_mutex again. */
924
925 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
926 {
927         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
928         int error;
929
930         if (ms != &ls->ls_stub_ms)
931                 mutex_lock(&ls->ls_waiters_mutex);
932         error = _remove_from_waiters(lkb, ms->m_type);
933         if (ms != &ls->ls_stub_ms)
934                 mutex_unlock(&ls->ls_waiters_mutex);
935         return error;
936 }
937
938 static void dir_remove(struct dlm_rsb *r)
939 {
940         int to_nodeid;
941
942         if (dlm_no_directory(r->res_ls))
943                 return;
944
945         to_nodeid = dlm_dir_nodeid(r);
946         if (to_nodeid != dlm_our_nodeid())
947                 send_remove(r);
948         else
949                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
950                                      r->res_name, r->res_length);
951 }
952
953 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
954    found since they are in order of newest to oldest? */
955
956 static int shrink_bucket(struct dlm_ls *ls, int b)
957 {
958         struct dlm_rsb *r;
959         int count = 0, found;
960
961         for (;;) {
962                 found = 0;
963                 write_lock(&ls->ls_rsbtbl[b].lock);
964                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
965                                             res_hashchain) {
966                         if (!time_after_eq(jiffies, r->res_toss_time +
967                                            dlm_config.ci_toss_secs * HZ))
968                                 continue;
969                         found = 1;
970                         break;
971                 }
972
973                 if (!found) {
974                         write_unlock(&ls->ls_rsbtbl[b].lock);
975                         break;
976                 }
977
978                 if (kref_put(&r->res_ref, kill_rsb)) {
979                         list_del(&r->res_hashchain);
980                         write_unlock(&ls->ls_rsbtbl[b].lock);
981
982                         if (is_master(r))
983                                 dir_remove(r);
984                         dlm_free_rsb(r);
985                         count++;
986                 } else {
987                         write_unlock(&ls->ls_rsbtbl[b].lock);
988                         log_error(ls, "tossed rsb in use %s", r->res_name);
989                 }
990         }
991
992         return count;
993 }
994
995 void dlm_scan_rsbs(struct dlm_ls *ls)
996 {
997         int i;
998
999         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1000                 shrink_bucket(ls, i);
1001                 if (dlm_locking_stopped(ls))
1002                         break;
1003                 cond_resched();
1004         }
1005 }
1006
1007 static void add_timeout(struct dlm_lkb *lkb)
1008 {
1009         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1010
1011         if (is_master_copy(lkb)) {
1012                 lkb->lkb_timestamp = jiffies;
1013                 return;
1014         }
1015
1016         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1017             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1018                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1019                 goto add_it;
1020         }
1021         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1022                 goto add_it;
1023         return;
1024
1025  add_it:
1026         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1027         mutex_lock(&ls->ls_timeout_mutex);
1028         hold_lkb(lkb);
1029         lkb->lkb_timestamp = jiffies;
1030         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1031         mutex_unlock(&ls->ls_timeout_mutex);
1032 }
1033
1034 static void del_timeout(struct dlm_lkb *lkb)
1035 {
1036         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1037
1038         mutex_lock(&ls->ls_timeout_mutex);
1039         if (!list_empty(&lkb->lkb_time_list)) {
1040                 list_del_init(&lkb->lkb_time_list);
1041                 unhold_lkb(lkb);
1042         }
1043         mutex_unlock(&ls->ls_timeout_mutex);
1044 }
1045
1046 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1047    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1048    and then lock rsb because of lock ordering in add_timeout.  We may need
1049    to specify some special timeout-related bits in the lkb that are just to
1050    be accessed under the timeout_mutex. */
1051
1052 void dlm_scan_timeout(struct dlm_ls *ls)
1053 {
1054         struct dlm_rsb *r;
1055         struct dlm_lkb *lkb;
1056         int do_cancel, do_warn;
1057
1058         for (;;) {
1059                 if (dlm_locking_stopped(ls))
1060                         break;
1061
1062                 do_cancel = 0;
1063                 do_warn = 0;
1064                 mutex_lock(&ls->ls_timeout_mutex);
1065                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1066
1067                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1068                             time_after_eq(jiffies, lkb->lkb_timestamp +
1069                                           lkb->lkb_timeout_cs * HZ/100))
1070                                 do_cancel = 1;
1071
1072                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1073                             time_after_eq(jiffies, lkb->lkb_timestamp +
1074                                            dlm_config.ci_timewarn_cs * HZ/100))
1075                                 do_warn = 1;
1076
1077                         if (!do_cancel && !do_warn)
1078                                 continue;
1079                         hold_lkb(lkb);
1080                         break;
1081                 }
1082                 mutex_unlock(&ls->ls_timeout_mutex);
1083
1084                 if (!do_cancel && !do_warn)
1085                         break;
1086
1087                 r = lkb->lkb_resource;
1088                 hold_rsb(r);
1089                 lock_rsb(r);
1090
1091                 if (do_warn) {
1092                         /* clear flag so we only warn once */
1093                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1094                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1095                                 del_timeout(lkb);
1096                         dlm_timeout_warn(lkb);
1097                 }
1098
1099                 if (do_cancel) {
1100                         log_debug(ls, "timeout cancel %x node %d %s",
1101                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1102                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1103                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1104                         del_timeout(lkb);
1105                         _cancel_lock(r, lkb);
1106                 }
1107
1108                 unlock_rsb(r);
1109                 unhold_rsb(r);
1110                 dlm_put_lkb(lkb);
1111         }
1112 }
1113
1114 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1115    dlm_recoverd before checking/setting ls_recover_begin. */
1116
1117 void dlm_adjust_timeouts(struct dlm_ls *ls)
1118 {
1119         struct dlm_lkb *lkb;
1120         long adj = jiffies - ls->ls_recover_begin;
1121
1122         ls->ls_recover_begin = 0;
1123         mutex_lock(&ls->ls_timeout_mutex);
1124         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1125                 lkb->lkb_timestamp += adj;
1126         mutex_unlock(&ls->ls_timeout_mutex);
1127 }
1128
1129 /* lkb is master or local copy */
1130
1131 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1132 {
1133         int b, len = r->res_ls->ls_lvblen;
1134
1135         /* b=1 lvb returned to caller
1136            b=0 lvb written to rsb or invalidated
1137            b=-1 do nothing */
1138
1139         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1140
1141         if (b == 1) {
1142                 if (!lkb->lkb_lvbptr)
1143                         return;
1144
1145                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1146                         return;
1147
1148                 if (!r->res_lvbptr)
1149                         return;
1150
1151                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1152                 lkb->lkb_lvbseq = r->res_lvbseq;
1153
1154         } else if (b == 0) {
1155                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1156                         rsb_set_flag(r, RSB_VALNOTVALID);
1157                         return;
1158                 }
1159
1160                 if (!lkb->lkb_lvbptr)
1161                         return;
1162
1163                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1164                         return;
1165
1166                 if (!r->res_lvbptr)
1167                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1168
1169                 if (!r->res_lvbptr)
1170                         return;
1171
1172                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1173                 r->res_lvbseq++;
1174                 lkb->lkb_lvbseq = r->res_lvbseq;
1175                 rsb_clear_flag(r, RSB_VALNOTVALID);
1176         }
1177
1178         if (rsb_flag(r, RSB_VALNOTVALID))
1179                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1180 }
1181
1182 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1183 {
1184         if (lkb->lkb_grmode < DLM_LOCK_PW)
1185                 return;
1186
1187         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1188                 rsb_set_flag(r, RSB_VALNOTVALID);
1189                 return;
1190         }
1191
1192         if (!lkb->lkb_lvbptr)
1193                 return;
1194
1195         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1196                 return;
1197
1198         if (!r->res_lvbptr)
1199                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1200
1201         if (!r->res_lvbptr)
1202                 return;
1203
1204         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1205         r->res_lvbseq++;
1206         rsb_clear_flag(r, RSB_VALNOTVALID);
1207 }
1208
1209 /* lkb is process copy (pc) */
1210
1211 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1212                             struct dlm_message *ms)
1213 {
1214         int b;
1215
1216         if (!lkb->lkb_lvbptr)
1217                 return;
1218
1219         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1220                 return;
1221
1222         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1223         if (b == 1) {
1224                 int len = receive_extralen(ms);
1225                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1226                 lkb->lkb_lvbseq = ms->m_lvbseq;
1227         }
1228 }
1229
1230 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1231    remove_lock -- used for unlock, removes lkb from granted
1232    revert_lock -- used for cancel, moves lkb from convert to granted
1233    grant_lock  -- used for request and convert, adds lkb to granted or
1234                   moves lkb from convert or waiting to granted
1235
1236    Each of these is used for master or local copy lkb's.  There is
1237    also a _pc() variation used to make the corresponding change on
1238    a process copy (pc) lkb. */
1239
1240 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1241 {
1242         del_lkb(r, lkb);
1243         lkb->lkb_grmode = DLM_LOCK_IV;
1244         /* this unhold undoes the original ref from create_lkb()
1245            so this leads to the lkb being freed */
1246         unhold_lkb(lkb);
1247 }
1248
1249 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1250 {
1251         set_lvb_unlock(r, lkb);
1252         _remove_lock(r, lkb);
1253 }
1254
1255 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1256 {
1257         _remove_lock(r, lkb);
1258 }
1259
1260 /* returns: 0 did nothing
1261             1 moved lock to granted
1262            -1 removed lock */
1263
1264 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1265 {
1266         int rv = 0;
1267
1268         lkb->lkb_rqmode = DLM_LOCK_IV;
1269
1270         switch (lkb->lkb_status) {
1271         case DLM_LKSTS_GRANTED:
1272                 break;
1273         case DLM_LKSTS_CONVERT:
1274                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1275                 rv = 1;
1276                 break;
1277         case DLM_LKSTS_WAITING:
1278                 del_lkb(r, lkb);
1279                 lkb->lkb_grmode = DLM_LOCK_IV;
1280                 /* this unhold undoes the original ref from create_lkb()
1281                    so this leads to the lkb being freed */
1282                 unhold_lkb(lkb);
1283                 rv = -1;
1284                 break;
1285         default:
1286                 log_print("invalid status for revert %d", lkb->lkb_status);
1287         }
1288         return rv;
1289 }
1290
1291 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1292 {
1293         return revert_lock(r, lkb);
1294 }
1295
1296 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1297 {
1298         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1299                 lkb->lkb_grmode = lkb->lkb_rqmode;
1300                 if (lkb->lkb_status)
1301                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1302                 else
1303                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1304         }
1305
1306         lkb->lkb_rqmode = DLM_LOCK_IV;
1307 }
1308
1309 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1310 {
1311         set_lvb_lock(r, lkb);
1312         _grant_lock(r, lkb);
1313         lkb->lkb_highbast = 0;
1314 }
1315
1316 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1317                           struct dlm_message *ms)
1318 {
1319         set_lvb_lock_pc(r, lkb, ms);
1320         _grant_lock(r, lkb);
1321 }
1322
1323 /* called by grant_pending_locks() which means an async grant message must
1324    be sent to the requesting node in addition to granting the lock if the
1325    lkb belongs to a remote node. */
1326
1327 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1328 {
1329         grant_lock(r, lkb);
1330         if (is_master_copy(lkb))
1331                 send_grant(r, lkb);
1332         else
1333                 queue_cast(r, lkb, 0);
1334 }
1335
1336 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1337    change the granted/requested modes.  We're munging things accordingly in
1338    the process copy.
1339    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1340    conversion deadlock
1341    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1342    compatible with other granted locks */
1343
1344 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1345 {
1346         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1347                 log_print("munge_demoted %x invalid reply type %d",
1348                           lkb->lkb_id, ms->m_type);
1349                 return;
1350         }
1351
1352         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1353                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1354                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1355                 return;
1356         }
1357
1358         lkb->lkb_grmode = DLM_LOCK_NL;
1359 }
1360
1361 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1362 {
1363         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1364             ms->m_type != DLM_MSG_GRANT) {
1365                 log_print("munge_altmode %x invalid reply type %d",
1366                           lkb->lkb_id, ms->m_type);
1367                 return;
1368         }
1369
1370         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1371                 lkb->lkb_rqmode = DLM_LOCK_PR;
1372         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1373                 lkb->lkb_rqmode = DLM_LOCK_CW;
1374         else {
1375                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1376                 dlm_print_lkb(lkb);
1377         }
1378 }
1379
1380 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1381 {
1382         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1383                                            lkb_statequeue);
1384         if (lkb->lkb_id == first->lkb_id)
1385                 return 1;
1386
1387         return 0;
1388 }
1389
1390 /* Check if the given lkb conflicts with another lkb on the queue. */
1391
1392 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1393 {
1394         struct dlm_lkb *this;
1395
1396         list_for_each_entry(this, head, lkb_statequeue) {
1397                 if (this == lkb)
1398                         continue;
1399                 if (!modes_compat(this, lkb))
1400                         return 1;
1401         }
1402         return 0;
1403 }
1404
1405 /*
1406  * "A conversion deadlock arises with a pair of lock requests in the converting
1407  * queue for one resource.  The granted mode of each lock blocks the requested
1408  * mode of the other lock."
1409  *
1410  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1411  * convert queue from being granted, then deadlk/demote lkb.
1412  *
1413  * Example:
1414  * Granted Queue: empty
1415  * Convert Queue: NL->EX (first lock)
1416  *                PR->EX (second lock)
1417  *
1418  * The first lock can't be granted because of the granted mode of the second
1419  * lock and the second lock can't be granted because it's not first in the
1420  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1421  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1422  * flag set and return DEMOTED in the lksb flags.
1423  *
1424  * Originally, this function detected conv-deadlk in a more limited scope:
1425  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1426  * - if lkb1 was the first entry in the queue (not just earlier), and was
1427  *   blocked by the granted mode of lkb2, and there was nothing on the
1428  *   granted queue preventing lkb1 from being granted immediately, i.e.
1429  *   lkb2 was the only thing preventing lkb1 from being granted.
1430  *
1431  * That second condition meant we'd only say there was conv-deadlk if
1432  * resolving it (by demotion) would lead to the first lock on the convert
1433  * queue being granted right away.  It allowed conversion deadlocks to exist
1434  * between locks on the convert queue while they couldn't be granted anyway.
1435  *
1436  * Now, we detect and take action on conversion deadlocks immediately when
1437  * they're created, even if they may not be immediately consequential.  If
1438  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1439  * mode that would prevent lkb1's conversion from being granted, we do a
1440  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1441  * I think this means that the lkb_is_ahead condition below should always
1442  * be zero, i.e. there will never be conv-deadlk between two locks that are
1443  * both already on the convert queue.
1444  */
1445
1446 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1447 {
1448         struct dlm_lkb *lkb1;
1449         int lkb_is_ahead = 0;
1450
1451         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1452                 if (lkb1 == lkb2) {
1453                         lkb_is_ahead = 1;
1454                         continue;
1455                 }
1456
1457                 if (!lkb_is_ahead) {
1458                         if (!modes_compat(lkb2, lkb1))
1459                                 return 1;
1460                 } else {
1461                         if (!modes_compat(lkb2, lkb1) &&
1462                             !modes_compat(lkb1, lkb2))
1463                                 return 1;
1464                 }
1465         }
1466         return 0;
1467 }
1468
1469 /*
1470  * Return 1 if the lock can be granted, 0 otherwise.
1471  * Also detect and resolve conversion deadlocks.
1472  *
1473  * lkb is the lock to be granted
1474  *
1475  * now is 1 if the function is being called in the context of the
1476  * immediate request, it is 0 if called later, after the lock has been
1477  * queued.
1478  *
1479  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1480  */
1481
1482 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1483 {
1484         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1485
1486         /*
1487          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1488          * a new request for a NL mode lock being blocked.
1489          *
1490          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1491          * request, then it would be granted.  In essence, the use of this flag
1492          * tells the Lock Manager to expedite theis request by not considering
1493          * what may be in the CONVERTING or WAITING queues...  As of this
1494          * writing, the EXPEDITE flag can be used only with new requests for NL
1495          * mode locks.  This flag is not valid for conversion requests.
1496          *
1497          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1498          * conversion or used with a non-NL requested mode.  We also know an
1499          * EXPEDITE request is always granted immediately, so now must always
1500          * be 1.  The full condition to grant an expedite request: (now &&
1501          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1502          * therefore be shortened to just checking the flag.
1503          */
1504
1505         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1506                 return 1;
1507
1508         /*
1509          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1510          * added to the remaining conditions.
1511          */
1512
1513         if (queue_conflict(&r->res_grantqueue, lkb))
1514                 goto out;
1515
1516         /*
1517          * 6-3: By default, a conversion request is immediately granted if the
1518          * requested mode is compatible with the modes of all other granted
1519          * locks
1520          */
1521
1522         if (queue_conflict(&r->res_convertqueue, lkb))
1523                 goto out;
1524
1525         /*
1526          * 6-5: But the default algorithm for deciding whether to grant or
1527          * queue conversion requests does not by itself guarantee that such
1528          * requests are serviced on a "first come first serve" basis.  This, in
1529          * turn, can lead to a phenomenon known as "indefinate postponement".
1530          *
1531          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1532          * the system service employed to request a lock conversion.  This flag
1533          * forces certain conversion requests to be queued, even if they are
1534          * compatible with the granted modes of other locks on the same
1535          * resource.  Thus, the use of this flag results in conversion requests
1536          * being ordered on a "first come first servce" basis.
1537          *
1538          * DCT: This condition is all about new conversions being able to occur
1539          * "in place" while the lock remains on the granted queue (assuming
1540          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1541          * doesn't _have_ to go onto the convert queue where it's processed in
1542          * order.  The "now" variable is necessary to distinguish converts
1543          * being received and processed for the first time now, because once a
1544          * convert is moved to the conversion queue the condition below applies
1545          * requiring fifo granting.
1546          */
1547
1548         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1549                 return 1;
1550
1551         /*
1552          * The NOORDER flag is set to avoid the standard vms rules on grant
1553          * order.
1554          */
1555
1556         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1557                 return 1;
1558
1559         /*
1560          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1561          * granted until all other conversion requests ahead of it are granted
1562          * and/or canceled.
1563          */
1564
1565         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1566                 return 1;
1567
1568         /*
1569          * 6-4: By default, a new request is immediately granted only if all
1570          * three of the following conditions are satisfied when the request is
1571          * issued:
1572          * - The queue of ungranted conversion requests for the resource is
1573          *   empty.
1574          * - The queue of ungranted new requests for the resource is empty.
1575          * - The mode of the new request is compatible with the most
1576          *   restrictive mode of all granted locks on the resource.
1577          */
1578
1579         if (now && !conv && list_empty(&r->res_convertqueue) &&
1580             list_empty(&r->res_waitqueue))
1581                 return 1;
1582
1583         /*
1584          * 6-4: Once a lock request is in the queue of ungranted new requests,
1585          * it cannot be granted until the queue of ungranted conversion
1586          * requests is empty, all ungranted new requests ahead of it are
1587          * granted and/or canceled, and it is compatible with the granted mode
1588          * of the most restrictive lock granted on the resource.
1589          */
1590
1591         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1592             first_in_list(lkb, &r->res_waitqueue))
1593                 return 1;
1594  out:
1595         return 0;
1596 }
1597
1598 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1599                           int *err)
1600 {
1601         int rv;
1602         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1603         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1604
1605         if (err)
1606                 *err = 0;
1607
1608         rv = _can_be_granted(r, lkb, now);
1609         if (rv)
1610                 goto out;
1611
1612         /*
1613          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1614          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1615          * cancels one of the locks.
1616          */
1617
1618         if (is_convert && can_be_queued(lkb) &&
1619             conversion_deadlock_detect(r, lkb)) {
1620                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1621                         lkb->lkb_grmode = DLM_LOCK_NL;
1622                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1623                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1624                         if (err)
1625                                 *err = -EDEADLK;
1626                         else {
1627                                 log_print("can_be_granted deadlock %x now %d",
1628                                           lkb->lkb_id, now);
1629                                 dlm_dump_rsb(r);
1630                         }
1631                 }
1632                 goto out;
1633         }
1634
1635         /*
1636          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1637          * to grant a request in a mode other than the normal rqmode.  It's a
1638          * simple way to provide a big optimization to applications that can
1639          * use them.
1640          */
1641
1642         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1643                 alt = DLM_LOCK_PR;
1644         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1645                 alt = DLM_LOCK_CW;
1646
1647         if (alt) {
1648                 lkb->lkb_rqmode = alt;
1649                 rv = _can_be_granted(r, lkb, now);
1650                 if (rv)
1651                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1652                 else
1653                         lkb->lkb_rqmode = rqmode;
1654         }
1655  out:
1656         return rv;
1657 }
1658
1659 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1660    for locks pending on the convert list.  Once verified (watch for these
1661    log_prints), we should be able to just call _can_be_granted() and not
1662    bother with the demote/deadlk cases here (and there's no easy way to deal
1663    with a deadlk here, we'd have to generate something like grant_lock with
1664    the deadlk error.) */
1665
1666 /* Returns the highest requested mode of all blocked conversions; sets
1667    cw if there's a blocked conversion to DLM_LOCK_CW. */
1668
1669 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1670 {
1671         struct dlm_lkb *lkb, *s;
1672         int hi, demoted, quit, grant_restart, demote_restart;
1673         int deadlk;
1674
1675         quit = 0;
1676  restart:
1677         grant_restart = 0;
1678         demote_restart = 0;
1679         hi = DLM_LOCK_IV;
1680
1681         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1682                 demoted = is_demoted(lkb);
1683                 deadlk = 0;
1684
1685                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1686                         grant_lock_pending(r, lkb);
1687                         grant_restart = 1;
1688                         continue;
1689                 }
1690
1691                 if (!demoted && is_demoted(lkb)) {
1692                         log_print("WARN: pending demoted %x node %d %s",
1693                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1694                         demote_restart = 1;
1695                         continue;
1696                 }
1697
1698                 if (deadlk) {
1699                         log_print("WARN: pending deadlock %x node %d %s",
1700                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1701                         dlm_dump_rsb(r);
1702                         continue;
1703                 }
1704
1705                 hi = max_t(int, lkb->lkb_rqmode, hi);
1706
1707                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1708                         *cw = 1;
1709         }
1710
1711         if (grant_restart)
1712                 goto restart;
1713         if (demote_restart && !quit) {
1714                 quit = 1;
1715                 goto restart;
1716         }
1717
1718         return max_t(int, high, hi);
1719 }
1720
1721 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1722 {
1723         struct dlm_lkb *lkb, *s;
1724
1725         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1726                 if (can_be_granted(r, lkb, 0, NULL))
1727                         grant_lock_pending(r, lkb);
1728                 else {
1729                         high = max_t(int, lkb->lkb_rqmode, high);
1730                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1731                                 *cw = 1;
1732                 }
1733         }
1734
1735         return high;
1736 }
1737
1738 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1739    on either the convert or waiting queue.
1740    high is the largest rqmode of all locks blocked on the convert or
1741    waiting queue. */
1742
1743 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1744 {
1745         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1746                 if (gr->lkb_highbast < DLM_LOCK_EX)
1747                         return 1;
1748                 return 0;
1749         }
1750
1751         if (gr->lkb_highbast < high &&
1752             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1753                 return 1;
1754         return 0;
1755 }
1756
1757 static void grant_pending_locks(struct dlm_rsb *r)
1758 {
1759         struct dlm_lkb *lkb, *s;
1760         int high = DLM_LOCK_IV;
1761         int cw = 0;
1762
1763         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1764
1765         high = grant_pending_convert(r, high, &cw);
1766         high = grant_pending_wait(r, high, &cw);
1767
1768         if (high == DLM_LOCK_IV)
1769                 return;
1770
1771         /*
1772          * If there are locks left on the wait/convert queue then send blocking
1773          * ASTs to granted locks based on the largest requested mode (high)
1774          * found above.
1775          */
1776
1777         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1778                 if (lkb->lkb_bastaddr && lock_requires_bast(lkb, high, cw)) {
1779                         if (cw && high == DLM_LOCK_PR)
1780                                 queue_bast(r, lkb, DLM_LOCK_CW);
1781                         else
1782                                 queue_bast(r, lkb, high);
1783                         lkb->lkb_highbast = high;
1784                 }
1785         }
1786 }
1787
1788 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1789 {
1790         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1791             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1792                 if (gr->lkb_highbast < DLM_LOCK_EX)
1793                         return 1;
1794                 return 0;
1795         }
1796
1797         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1798                 return 1;
1799         return 0;
1800 }
1801
1802 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1803                             struct dlm_lkb *lkb)
1804 {
1805         struct dlm_lkb *gr;
1806
1807         list_for_each_entry(gr, head, lkb_statequeue) {
1808                 if (gr->lkb_bastaddr && modes_require_bast(gr, lkb)) {
1809                         queue_bast(r, gr, lkb->lkb_rqmode);
1810                         gr->lkb_highbast = lkb->lkb_rqmode;
1811                 }
1812         }
1813 }
1814
1815 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1816 {
1817         send_bast_queue(r, &r->res_grantqueue, lkb);
1818 }
1819
1820 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1821 {
1822         send_bast_queue(r, &r->res_grantqueue, lkb);
1823         send_bast_queue(r, &r->res_convertqueue, lkb);
1824 }
1825
1826 /* set_master(r, lkb) -- set the master nodeid of a resource
1827
1828    The purpose of this function is to set the nodeid field in the given
1829    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1830    known, it can just be copied to the lkb and the function will return
1831    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1832    before it can be copied to the lkb.
1833
1834    When the rsb nodeid is being looked up remotely, the initial lkb
1835    causing the lookup is kept on the ls_waiters list waiting for the
1836    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1837    on the rsb's res_lookup list until the master is verified.
1838
1839    Return values:
1840    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1841    1: the rsb master is not available and the lkb has been placed on
1842       a wait queue
1843 */
1844
1845 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1846 {
1847         struct dlm_ls *ls = r->res_ls;
1848         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1849
1850         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1851                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1852                 r->res_first_lkid = lkb->lkb_id;
1853                 lkb->lkb_nodeid = r->res_nodeid;
1854                 return 0;
1855         }
1856
1857         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1858                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1859                 return 1;
1860         }
1861
1862         if (r->res_nodeid == 0) {
1863                 lkb->lkb_nodeid = 0;
1864                 return 0;
1865         }
1866
1867         if (r->res_nodeid > 0) {
1868                 lkb->lkb_nodeid = r->res_nodeid;
1869                 return 0;
1870         }
1871
1872         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1873
1874         dir_nodeid = dlm_dir_nodeid(r);
1875
1876         if (dir_nodeid != our_nodeid) {
1877                 r->res_first_lkid = lkb->lkb_id;
1878                 send_lookup(r, lkb);
1879                 return 1;
1880         }
1881
1882         for (i = 0; i < 2; i++) {
1883                 /* It's possible for dlm_scand to remove an old rsb for
1884                    this same resource from the toss list, us to create
1885                    a new one, look up the master locally, and find it
1886                    already exists just before dlm_scand does the
1887                    dir_remove() on the previous rsb. */
1888
1889                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1890                                        r->res_length, &ret_nodeid);
1891                 if (!error)
1892                         break;
1893                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1894                 schedule();
1895         }
1896         if (error && error != -EEXIST)
1897                 return error;
1898
1899         if (ret_nodeid == our_nodeid) {
1900                 r->res_first_lkid = 0;
1901                 r->res_nodeid = 0;
1902                 lkb->lkb_nodeid = 0;
1903         } else {
1904                 r->res_first_lkid = lkb->lkb_id;
1905                 r->res_nodeid = ret_nodeid;
1906                 lkb->lkb_nodeid = ret_nodeid;
1907         }
1908         return 0;
1909 }
1910
1911 static void process_lookup_list(struct dlm_rsb *r)
1912 {
1913         struct dlm_lkb *lkb, *safe;
1914
1915         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1916                 list_del_init(&lkb->lkb_rsb_lookup);
1917                 _request_lock(r, lkb);
1918                 schedule();
1919         }
1920 }
1921
1922 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1923
1924 static void confirm_master(struct dlm_rsb *r, int error)
1925 {
1926         struct dlm_lkb *lkb;
1927
1928         if (!r->res_first_lkid)
1929                 return;
1930
1931         switch (error) {
1932         case 0:
1933         case -EINPROGRESS:
1934                 r->res_first_lkid = 0;
1935                 process_lookup_list(r);
1936                 break;
1937
1938         case -EAGAIN:
1939         case -EBADR:
1940         case -ENOTBLK:
1941                 /* the remote request failed and won't be retried (it was
1942                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1943                    lkb the first_lkid */
1944
1945                 r->res_first_lkid = 0;
1946
1947                 if (!list_empty(&r->res_lookup)) {
1948                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1949                                          lkb_rsb_lookup);
1950                         list_del_init(&lkb->lkb_rsb_lookup);
1951                         r->res_first_lkid = lkb->lkb_id;
1952                         _request_lock(r, lkb);
1953                 } else
1954                         r->res_nodeid = -1;
1955                 break;
1956
1957         default:
1958                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1959         }
1960 }
1961
1962 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1963                          int namelen, unsigned long timeout_cs, void *ast,
1964                          void *astarg, void *bast, struct dlm_args *args)
1965 {
1966         int rv = -EINVAL;
1967
1968         /* check for invalid arg usage */
1969
1970         if (mode < 0 || mode > DLM_LOCK_EX)
1971                 goto out;
1972
1973         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1974                 goto out;
1975
1976         if (flags & DLM_LKF_CANCEL)
1977                 goto out;
1978
1979         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1980                 goto out;
1981
1982         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1983                 goto out;
1984
1985         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1986                 goto out;
1987
1988         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1989                 goto out;
1990
1991         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1992                 goto out;
1993
1994         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1995                 goto out;
1996
1997         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1998                 goto out;
1999
2000         if (!ast || !lksb)
2001                 goto out;
2002
2003         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2004                 goto out;
2005
2006         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2007                 goto out;
2008
2009         /* these args will be copied to the lkb in validate_lock_args,
2010            it cannot be done now because when converting locks, fields in
2011            an active lkb cannot be modified before locking the rsb */
2012
2013         args->flags = flags;
2014         args->astaddr = ast;
2015         args->astparam = (long) astarg;
2016         args->bastaddr = bast;
2017         args->timeout = timeout_cs;
2018         args->mode = mode;
2019         args->lksb = lksb;
2020         rv = 0;
2021  out:
2022         return rv;
2023 }
2024
2025 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2026 {
2027         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2028                       DLM_LKF_FORCEUNLOCK))
2029                 return -EINVAL;
2030
2031         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2032                 return -EINVAL;
2033
2034         args->flags = flags;
2035         args->astparam = (long) astarg;
2036         return 0;
2037 }
2038
2039 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2040                               struct dlm_args *args)
2041 {
2042         int rv = -EINVAL;
2043
2044         if (args->flags & DLM_LKF_CONVERT) {
2045                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2046                         goto out;
2047
2048                 if (args->flags & DLM_LKF_QUECVT &&
2049                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2050                         goto out;
2051
2052                 rv = -EBUSY;
2053                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2054                         goto out;
2055
2056                 if (lkb->lkb_wait_type)
2057                         goto out;
2058
2059                 if (is_overlap(lkb))
2060                         goto out;
2061         }
2062
2063         lkb->lkb_exflags = args->flags;
2064         lkb->lkb_sbflags = 0;
2065         lkb->lkb_astaddr = args->astaddr;
2066         lkb->lkb_astparam = args->astparam;
2067         lkb->lkb_bastaddr = args->bastaddr;
2068         lkb->lkb_rqmode = args->mode;
2069         lkb->lkb_lksb = args->lksb;
2070         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2071         lkb->lkb_ownpid = (int) current->pid;
2072         lkb->lkb_timeout_cs = args->timeout;
2073         rv = 0;
2074  out:
2075         return rv;
2076 }
2077
2078 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2079    for success */
2080
2081 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2082    because there may be a lookup in progress and it's valid to do
2083    cancel/unlockf on it */
2084
2085 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2086 {
2087         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2088         int rv = -EINVAL;
2089
2090         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2091                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2092                 dlm_print_lkb(lkb);
2093                 goto out;
2094         }
2095
2096         /* an lkb may still exist even though the lock is EOL'ed due to a
2097            cancel, unlock or failed noqueue request; an app can't use these
2098            locks; return same error as if the lkid had not been found at all */
2099
2100         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2101                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2102                 rv = -ENOENT;
2103                 goto out;
2104         }
2105
2106         /* an lkb may be waiting for an rsb lookup to complete where the
2107            lookup was initiated by another lock */
2108
2109         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2110                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2111                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2112                         list_del_init(&lkb->lkb_rsb_lookup);
2113                         queue_cast(lkb->lkb_resource, lkb,
2114                                    args->flags & DLM_LKF_CANCEL ?
2115                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2116                         unhold_lkb(lkb); /* undoes create_lkb() */
2117                 }
2118                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2119                 rv = -EBUSY;
2120                 goto out;
2121         }
2122
2123         /* cancel not allowed with another cancel/unlock in progress */
2124
2125         if (args->flags & DLM_LKF_CANCEL) {
2126                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2127                         goto out;
2128
2129                 if (is_overlap(lkb))
2130                         goto out;
2131
2132                 /* don't let scand try to do a cancel */
2133                 del_timeout(lkb);
2134
2135                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2136                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2137                         rv = -EBUSY;
2138                         goto out;
2139                 }
2140
2141                 switch (lkb->lkb_wait_type) {
2142                 case DLM_MSG_LOOKUP:
2143                 case DLM_MSG_REQUEST:
2144                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2145                         rv = -EBUSY;
2146                         goto out;
2147                 case DLM_MSG_UNLOCK:
2148                 case DLM_MSG_CANCEL:
2149                         goto out;
2150                 }
2151                 /* add_to_waiters() will set OVERLAP_CANCEL */
2152                 goto out_ok;
2153         }
2154
2155         /* do we need to allow a force-unlock if there's a normal unlock
2156            already in progress?  in what conditions could the normal unlock
2157            fail such that we'd want to send a force-unlock to be sure? */
2158
2159         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2160                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2161                         goto out;
2162
2163                 if (is_overlap_unlock(lkb))
2164                         goto out;
2165
2166                 /* don't let scand try to do a cancel */
2167                 del_timeout(lkb);
2168
2169                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2170                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2171                         rv = -EBUSY;
2172                         goto out;
2173                 }
2174
2175                 switch (lkb->lkb_wait_type) {
2176                 case DLM_MSG_LOOKUP:
2177                 case DLM_MSG_REQUEST:
2178                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2179                         rv = -EBUSY;
2180                         goto out;
2181                 case DLM_MSG_UNLOCK:
2182                         goto out;
2183                 }
2184                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2185                 goto out_ok;
2186         }
2187
2188         /* normal unlock not allowed if there's any op in progress */
2189         rv = -EBUSY;
2190         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2191                 goto out;
2192
2193  out_ok:
2194         /* an overlapping op shouldn't blow away exflags from other op */
2195         lkb->lkb_exflags |= args->flags;
2196         lkb->lkb_sbflags = 0;
2197         lkb->lkb_astparam = args->astparam;
2198         rv = 0;
2199  out:
2200         if (rv)
2201                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2202                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2203                           args->flags, lkb->lkb_wait_type,
2204                           lkb->lkb_resource->res_name);
2205         return rv;
2206 }
2207
2208 /*
2209  * Four stage 4 varieties:
2210  * do_request(), do_convert(), do_unlock(), do_cancel()
2211  * These are called on the master node for the given lock and
2212  * from the central locking logic.
2213  */
2214
2215 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2216 {
2217         int error = 0;
2218
2219         if (can_be_granted(r, lkb, 1, NULL)) {
2220                 grant_lock(r, lkb);
2221                 queue_cast(r, lkb, 0);
2222                 goto out;
2223         }
2224
2225         if (can_be_queued(lkb)) {
2226                 error = -EINPROGRESS;
2227                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2228                 send_blocking_asts(r, lkb);
2229                 add_timeout(lkb);
2230                 goto out;
2231         }
2232
2233         error = -EAGAIN;
2234         if (force_blocking_asts(lkb))
2235                 send_blocking_asts_all(r, lkb);
2236         queue_cast(r, lkb, -EAGAIN);
2237
2238  out:
2239         return error;
2240 }
2241
2242 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2243 {
2244         int error = 0;
2245         int deadlk = 0;
2246
2247         /* changing an existing lock may allow others to be granted */
2248
2249         if (can_be_granted(r, lkb, 1, &deadlk)) {
2250                 grant_lock(r, lkb);
2251                 queue_cast(r, lkb, 0);
2252                 grant_pending_locks(r);
2253                 goto out;
2254         }
2255
2256         /* can_be_granted() detected that this lock would block in a conversion
2257            deadlock, so we leave it on the granted queue and return EDEADLK in
2258            the ast for the convert. */
2259
2260         if (deadlk) {
2261                 /* it's left on the granted queue */
2262                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2263                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2264                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2265                 revert_lock(r, lkb);
2266                 queue_cast(r, lkb, -EDEADLK);
2267                 error = -EDEADLK;
2268                 goto out;
2269         }
2270
2271         /* is_demoted() means the can_be_granted() above set the grmode
2272            to NL, and left us on the granted queue.  This auto-demotion
2273            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2274            now grantable.  We have to try to grant other converting locks
2275            before we try again to grant this one. */
2276
2277         if (is_demoted(lkb)) {
2278                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2279                 if (_can_be_granted(r, lkb, 1)) {
2280                         grant_lock(r, lkb);
2281                         queue_cast(r, lkb, 0);
2282                         grant_pending_locks(r);
2283                         goto out;
2284                 }
2285                 /* else fall through and move to convert queue */
2286         }
2287
2288         if (can_be_queued(lkb)) {
2289                 error = -EINPROGRESS;
2290                 del_lkb(r, lkb);
2291                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2292                 send_blocking_asts(r, lkb);
2293                 add_timeout(lkb);
2294                 goto out;
2295         }
2296
2297         error = -EAGAIN;
2298         if (force_blocking_asts(lkb))
2299                 send_blocking_asts_all(r, lkb);
2300         queue_cast(r, lkb, -EAGAIN);
2301
2302  out:
2303         return error;
2304 }
2305
2306 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2307 {
2308         remove_lock(r, lkb);
2309         queue_cast(r, lkb, -DLM_EUNLOCK);
2310         grant_pending_locks(r);
2311         return -DLM_EUNLOCK;
2312 }
2313
2314 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2315  
2316 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2317 {
2318         int error;
2319
2320         error = revert_lock(r, lkb);
2321         if (error) {
2322                 queue_cast(r, lkb, -DLM_ECANCEL);
2323                 grant_pending_locks(r);
2324                 return -DLM_ECANCEL;
2325         }
2326         return 0;
2327 }
2328
2329 /*
2330  * Four stage 3 varieties:
2331  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2332  */
2333
2334 /* add a new lkb to a possibly new rsb, called by requesting process */
2335
2336 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2337 {
2338         int error;
2339
2340         /* set_master: sets lkb nodeid from r */
2341
2342         error = set_master(r, lkb);
2343         if (error < 0)
2344                 goto out;
2345         if (error) {
2346                 error = 0;
2347                 goto out;
2348         }
2349
2350         if (is_remote(r))
2351                 /* receive_request() calls do_request() on remote node */
2352                 error = send_request(r, lkb);
2353         else
2354                 error = do_request(r, lkb);
2355  out:
2356         return error;
2357 }
2358
2359 /* change some property of an existing lkb, e.g. mode */
2360
2361 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2362 {
2363         int error;
2364
2365         if (is_remote(r))
2366                 /* receive_convert() calls do_convert() on remote node */
2367                 error = send_convert(r, lkb);
2368         else
2369                 error = do_convert(r, lkb);
2370
2371         return error;
2372 }
2373
2374 /* remove an existing lkb from the granted queue */
2375
2376 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2377 {
2378         int error;
2379
2380         if (is_remote(r))
2381                 /* receive_unlock() calls do_unlock() on remote node */
2382                 error = send_unlock(r, lkb);
2383         else
2384                 error = do_unlock(r, lkb);
2385
2386         return error;
2387 }
2388
2389 /* remove an existing lkb from the convert or wait queue */
2390
2391 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2392 {
2393         int error;
2394
2395         if (is_remote(r))
2396                 /* receive_cancel() calls do_cancel() on remote node */
2397                 error = send_cancel(r, lkb);
2398         else
2399                 error = do_cancel(r, lkb);
2400
2401         return error;
2402 }
2403
2404 /*
2405  * Four stage 2 varieties:
2406  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2407  */
2408
2409 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2410                         int len, struct dlm_args *args)
2411 {
2412         struct dlm_rsb *r;
2413         int error;
2414
2415         error = validate_lock_args(ls, lkb, args);
2416         if (error)
2417                 goto out;
2418
2419         error = find_rsb(ls, name, len, R_CREATE, &r);
2420         if (error)
2421                 goto out;
2422
2423         lock_rsb(r);
2424
2425         attach_lkb(r, lkb);
2426         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2427
2428         error = _request_lock(r, lkb);
2429
2430         unlock_rsb(r);
2431         put_rsb(r);
2432
2433  out:
2434         return error;
2435 }
2436
2437 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2438                         struct dlm_args *args)
2439 {
2440         struct dlm_rsb *r;
2441         int error;
2442
2443         r = lkb->lkb_resource;
2444
2445         hold_rsb(r);
2446         lock_rsb(r);
2447
2448         error = validate_lock_args(ls, lkb, args);
2449         if (error)
2450                 goto out;
2451
2452         error = _convert_lock(r, lkb);
2453  out:
2454         unlock_rsb(r);
2455         put_rsb(r);
2456         return error;
2457 }
2458
2459 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2460                        struct dlm_args *args)
2461 {
2462         struct dlm_rsb *r;
2463         int error;
2464
2465         r = lkb->lkb_resource;
2466
2467         hold_rsb(r);
2468         lock_rsb(r);
2469
2470         error = validate_unlock_args(lkb, args);
2471         if (error)
2472                 goto out;
2473
2474         error = _unlock_lock(r, lkb);
2475  out:
2476         unlock_rsb(r);
2477         put_rsb(r);
2478         return error;
2479 }
2480
2481 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2482                        struct dlm_args *args)
2483 {
2484         struct dlm_rsb *r;
2485         int error;
2486
2487         r = lkb->lkb_resource;
2488
2489         hold_rsb(r);
2490         lock_rsb(r);
2491
2492         error = validate_unlock_args(lkb, args);
2493         if (error)
2494                 goto out;
2495
2496         error = _cancel_lock(r, lkb);
2497  out:
2498         unlock_rsb(r);
2499         put_rsb(r);
2500         return error;
2501 }
2502
2503 /*
2504  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2505  */
2506
2507 int dlm_lock(dlm_lockspace_t *lockspace,
2508              int mode,
2509              struct dlm_lksb *lksb,
2510              uint32_t flags,
2511              void *name,
2512              unsigned int namelen,
2513              uint32_t parent_lkid,
2514              void (*ast) (void *astarg),
2515              void *astarg,
2516              void (*bast) (void *astarg, int mode))
2517 {
2518         struct dlm_ls *ls;
2519         struct dlm_lkb *lkb;
2520         struct dlm_args args;
2521         int error, convert = flags & DLM_LKF_CONVERT;
2522
2523         ls = dlm_find_lockspace_local(lockspace);
2524         if (!ls)
2525                 return -EINVAL;
2526
2527         dlm_lock_recovery(ls);
2528
2529         if (convert)
2530                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2531         else
2532                 error = create_lkb(ls, &lkb);
2533
2534         if (error)
2535                 goto out;
2536
2537         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2538                               astarg, bast, &args);
2539         if (error)
2540                 goto out_put;
2541
2542         if (convert)
2543                 error = convert_lock(ls, lkb, &args);
2544         else
2545                 error = request_lock(ls, lkb, name, namelen, &args);
2546
2547         if (error == -EINPROGRESS)
2548                 error = 0;
2549  out_put:
2550         if (convert || error)
2551                 __put_lkb(ls, lkb);
2552         if (error == -EAGAIN || error == -EDEADLK)
2553                 error = 0;
2554  out:
2555         dlm_unlock_recovery(ls);
2556         dlm_put_lockspace(ls);
2557         return error;
2558 }
2559
2560 int dlm_unlock(dlm_lockspace_t *lockspace,
2561                uint32_t lkid,
2562                uint32_t flags,
2563                struct dlm_lksb *lksb,
2564                void *astarg)
2565 {
2566         struct dlm_ls *ls;
2567         struct dlm_lkb *lkb;
2568         struct dlm_args args;
2569         int error;
2570
2571         ls = dlm_find_lockspace_local(lockspace);
2572         if (!ls)
2573                 return -EINVAL;
2574
2575         dlm_lock_recovery(ls);
2576
2577         error = find_lkb(ls, lkid, &lkb);
2578         if (error)
2579                 goto out;
2580
2581         error = set_unlock_args(flags, astarg, &args);
2582         if (error)
2583                 goto out_put;
2584
2585         if (flags & DLM_LKF_CANCEL)
2586                 error = cancel_lock(ls, lkb, &args);
2587         else
2588                 error = unlock_lock(ls, lkb, &args);
2589
2590         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2591                 error = 0;
2592         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2593                 error = 0;
2594  out_put:
2595         dlm_put_lkb(lkb);
2596  out:
2597         dlm_unlock_recovery(ls);
2598         dlm_put_lockspace(ls);
2599         return error;
2600 }
2601
2602 /*
2603  * send/receive routines for remote operations and replies
2604  *
2605  * send_args
2606  * send_common
2607  * send_request                 receive_request
2608  * send_convert                 receive_convert
2609  * send_unlock                  receive_unlock
2610  * send_cancel                  receive_cancel
2611  * send_grant                   receive_grant
2612  * send_bast                    receive_bast
2613  * send_lookup                  receive_lookup
2614  * send_remove                  receive_remove
2615  *
2616  *                              send_common_reply
2617  * receive_request_reply        send_request_reply
2618  * receive_convert_reply        send_convert_reply
2619  * receive_unlock_reply         send_unlock_reply
2620  * receive_cancel_reply         send_cancel_reply
2621  * receive_lookup_reply         send_lookup_reply
2622  */
2623
2624 static int _create_message(struct dlm_ls *ls, int mb_len,
2625                            int to_nodeid, int mstype,
2626                            struct dlm_message **ms_ret,
2627                            struct dlm_mhandle **mh_ret)
2628 {
2629         struct dlm_message *ms;
2630         struct dlm_mhandle *mh;
2631         char *mb;
2632
2633         /* get_buffer gives us a message handle (mh) that we need to
2634            pass into lowcomms_commit and a message buffer (mb) that we
2635            write our data into */
2636
2637         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2638         if (!mh)
2639                 return -ENOBUFS;
2640
2641         memset(mb, 0, mb_len);
2642
2643         ms = (struct dlm_message *) mb;
2644
2645         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2646         ms->m_header.h_lockspace = ls->ls_global_id;
2647         ms->m_header.h_nodeid = dlm_our_nodeid();
2648         ms->m_header.h_length = mb_len;
2649         ms->m_header.h_cmd = DLM_MSG;
2650
2651         ms->m_type = mstype;
2652
2653         *mh_ret = mh;
2654         *ms_ret = ms;
2655         return 0;
2656 }
2657
2658 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2659                           int to_nodeid, int mstype,
2660                           struct dlm_message **ms_ret,
2661                           struct dlm_mhandle **mh_ret)
2662 {
2663         int mb_len = sizeof(struct dlm_message);
2664
2665         switch (mstype) {
2666         case DLM_MSG_REQUEST:
2667         case DLM_MSG_LOOKUP:
2668         case DLM_MSG_REMOVE:
2669                 mb_len += r->res_length;
2670                 break;
2671         case DLM_MSG_CONVERT:
2672         case DLM_MSG_UNLOCK:
2673         case DLM_MSG_REQUEST_REPLY:
2674         case DLM_MSG_CONVERT_REPLY:
2675         case DLM_MSG_GRANT:
2676                 if (lkb && lkb->lkb_lvbptr)
2677                         mb_len += r->res_ls->ls_lvblen;
2678                 break;
2679         }
2680
2681         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2682                                ms_ret, mh_ret);
2683 }
2684
2685 /* further lowcomms enhancements or alternate implementations may make
2686    the return value from this function useful at some point */
2687
2688 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2689 {
2690         dlm_message_out(ms);
2691         dlm_lowcomms_commit_buffer(mh);
2692         return 0;
2693 }
2694
2695 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2696                       struct dlm_message *ms)
2697 {
2698         ms->m_nodeid   = lkb->lkb_nodeid;
2699         ms->m_pid      = lkb->lkb_ownpid;
2700         ms->m_lkid     = lkb->lkb_id;
2701         ms->m_remid    = lkb->lkb_remid;
2702         ms->m_exflags  = lkb->lkb_exflags;
2703         ms->m_sbflags  = lkb->lkb_sbflags;
2704         ms->m_flags    = lkb->lkb_flags;
2705         ms->m_lvbseq   = lkb->lkb_lvbseq;
2706         ms->m_status   = lkb->lkb_status;
2707         ms->m_grmode   = lkb->lkb_grmode;
2708         ms->m_rqmode   = lkb->lkb_rqmode;
2709         ms->m_hash     = r->res_hash;
2710
2711         /* m_result and m_bastmode are set from function args,
2712            not from lkb fields */
2713
2714         if (lkb->lkb_bastaddr)
2715                 ms->m_asts |= AST_BAST;
2716         if (lkb->lkb_astaddr)
2717                 ms->m_asts |= AST_COMP;
2718
2719         /* compare with switch in create_message; send_remove() doesn't
2720            use send_args() */
2721
2722         switch (ms->m_type) {
2723         case DLM_MSG_REQUEST:
2724         case DLM_MSG_LOOKUP:
2725                 memcpy(ms->m_extra, r->res_name, r->res_length);
2726                 break;
2727         case DLM_MSG_CONVERT:
2728         case DLM_MSG_UNLOCK:
2729         case DLM_MSG_REQUEST_REPLY:
2730         case DLM_MSG_CONVERT_REPLY:
2731         case DLM_MSG_GRANT:
2732                 if (!lkb->lkb_lvbptr)
2733                         break;
2734                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2735                 break;
2736         }
2737 }
2738
2739 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2740 {
2741         struct dlm_message *ms;
2742         struct dlm_mhandle *mh;
2743         int to_nodeid, error;
2744
2745         error = add_to_waiters(lkb, mstype);
2746         if (error)
2747                 return error;
2748
2749         to_nodeid = r->res_nodeid;
2750
2751         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2752         if (error)
2753                 goto fail;
2754
2755         send_args(r, lkb, ms);
2756
2757         error = send_message(mh, ms);
2758         if (error)
2759                 goto fail;
2760         return 0;
2761
2762  fail:
2763         remove_from_waiters(lkb, msg_reply_type(mstype));
2764         return error;
2765 }
2766
2767 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2768 {
2769         return send_common(r, lkb, DLM_MSG_REQUEST);
2770 }
2771
2772 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2773 {
2774         int error;
2775
2776         error = send_common(r, lkb, DLM_MSG_CONVERT);
2777
2778         /* down conversions go without a reply from the master */
2779         if (!error && down_conversion(lkb)) {
2780                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2781                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2782                 r->res_ls->ls_stub_ms.m_result = 0;
2783                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2784                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2785         }
2786
2787         return error;
2788 }
2789
2790 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2791    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2792    that the master is still correct. */
2793
2794 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2795 {
2796         return send_common(r, lkb, DLM_MSG_UNLOCK);
2797 }
2798
2799 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2800 {
2801         return send_common(r, lkb, DLM_MSG_CANCEL);
2802 }
2803
2804 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2805 {
2806         struct dlm_message *ms;
2807         struct dlm_mhandle *mh;
2808         int to_nodeid, error;
2809
2810         to_nodeid = lkb->lkb_nodeid;
2811
2812         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2813         if (error)
2814                 goto out;
2815
2816         send_args(r, lkb, ms);
2817
2818         ms->m_result = 0;
2819
2820         error = send_message(mh, ms);
2821  out:
2822         return error;
2823 }
2824
2825 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2826 {
2827         struct dlm_message *ms;
2828         struct dlm_mhandle *mh;
2829         int to_nodeid, error;
2830
2831         to_nodeid = lkb->lkb_nodeid;
2832
2833         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2834         if (error)
2835                 goto out;
2836
2837         send_args(r, lkb, ms);
2838
2839         ms->m_bastmode = mode;
2840
2841         error = send_message(mh, ms);
2842  out:
2843         return error;
2844 }
2845
2846 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2847 {
2848         struct dlm_message *ms;
2849         struct dlm_mhandle *mh;
2850         int to_nodeid, error;
2851
2852         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2853         if (error)
2854                 return error;
2855
2856         to_nodeid = dlm_dir_nodeid(r);
2857
2858         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2859         if (error)
2860                 goto fail;
2861
2862         send_args(r, lkb, ms);
2863
2864         error = send_message(mh, ms);
2865         if (error)
2866                 goto fail;
2867         return 0;
2868
2869  fail:
2870         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2871         return error;
2872 }
2873
2874 static int send_remove(struct dlm_rsb *r)
2875 {
2876         struct dlm_message *ms;
2877         struct dlm_mhandle *mh;
2878         int to_nodeid, error;
2879
2880         to_nodeid = dlm_dir_nodeid(r);
2881
2882         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2883         if (error)
2884                 goto out;
2885
2886         memcpy(ms->m_extra, r->res_name, r->res_length);
2887         ms->m_hash = r->res_hash;
2888
2889         error = send_message(mh, ms);
2890  out:
2891         return error;
2892 }
2893
2894 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2895                              int mstype, int rv)
2896 {
2897         struct dlm_message *ms;
2898         struct dlm_mhandle *mh;
2899         int to_nodeid, error;
2900
2901         to_nodeid = lkb->lkb_nodeid;
2902
2903         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2904         if (error)
2905                 goto out;
2906
2907         send_args(r, lkb, ms);
2908
2909         ms->m_result = rv;
2910
2911         error = send_message(mh, ms);
2912  out:
2913         return error;
2914 }
2915
2916 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2917 {
2918         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2919 }
2920
2921 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2922 {
2923         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2924 }
2925
2926 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2927 {
2928         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2929 }
2930
2931 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2932 {
2933         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2934 }
2935
2936 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2937                              int ret_nodeid, int rv)
2938 {
2939         struct dlm_rsb *r = &ls->ls_stub_rsb;
2940         struct dlm_message *ms;
2941         struct dlm_mhandle *mh;
2942         int error, nodeid = ms_in->m_header.h_nodeid;
2943
2944         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2945         if (error)
2946                 goto out;
2947
2948         ms->m_lkid = ms_in->m_lkid;
2949         ms->m_result = rv;
2950         ms->m_nodeid = ret_nodeid;
2951
2952         error = send_message(mh, ms);
2953  out:
2954         return error;
2955 }
2956
2957 /* which args we save from a received message depends heavily on the type
2958    of message, unlike the send side where we can safely send everything about
2959    the lkb for any type of message */
2960
2961 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2962 {
2963         lkb->lkb_exflags = ms->m_exflags;
2964         lkb->lkb_sbflags = ms->m_sbflags;
2965         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2966                          (ms->m_flags & 0x0000FFFF);
2967 }
2968
2969 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2970 {
2971         lkb->lkb_sbflags = ms->m_sbflags;
2972         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2973                          (ms->m_flags & 0x0000FFFF);
2974 }
2975
2976 static int receive_extralen(struct dlm_message *ms)
2977 {
2978         return (ms->m_header.h_length - sizeof(struct dlm_message));
2979 }
2980
2981 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2982                        struct dlm_message *ms)
2983 {
2984         int len;
2985
2986         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2987                 if (!lkb->lkb_lvbptr)
2988                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2989                 if (!lkb->lkb_lvbptr)
2990                         return -ENOMEM;
2991                 len = receive_extralen(ms);
2992                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2993         }
2994         return 0;
2995 }
2996
2997 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2998                                 struct dlm_message *ms)
2999 {
3000         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3001         lkb->lkb_ownpid = ms->m_pid;
3002         lkb->lkb_remid = ms->m_lkid;
3003         lkb->lkb_grmode = DLM_LOCK_IV;
3004         lkb->lkb_rqmode = ms->m_rqmode;
3005         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
3006         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
3007
3008         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3009                 /* lkb was just created so there won't be an lvb yet */
3010                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3011                 if (!lkb->lkb_lvbptr)
3012                         return -ENOMEM;
3013         }
3014
3015         return 0;
3016 }
3017
3018 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3019                                 struct dlm_message *ms)
3020 {
3021         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3022                 return -EBUSY;
3023
3024         if (receive_lvb(ls, lkb, ms))
3025                 return -ENOMEM;
3026
3027         lkb->lkb_rqmode = ms->m_rqmode;
3028         lkb->lkb_lvbseq = ms->m_lvbseq;
3029
3030         return 0;
3031 }
3032
3033 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3034                                struct dlm_message *ms)
3035 {
3036         if (receive_lvb(ls, lkb, ms))
3037                 return -ENOMEM;
3038         return 0;
3039 }
3040
3041 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3042    uses to send a reply and that the remote end uses to process the reply. */
3043
3044 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3045 {
3046         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3047         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3048         lkb->lkb_remid = ms->m_lkid;
3049 }
3050
3051 /* This is called after the rsb is locked so that we can safely inspect
3052    fields in the lkb. */
3053
3054 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3055 {
3056         int from = ms->m_header.h_nodeid;
3057         int error = 0;
3058
3059         switch (ms->m_type) {
3060         case DLM_MSG_CONVERT:
3061         case DLM_MSG_UNLOCK:
3062         case DLM_MSG_CANCEL:
3063                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3064                         error = -EINVAL;
3065                 break;
3066
3067         case DLM_MSG_CONVERT_REPLY:
3068         case DLM_MSG_UNLOCK_REPLY:
3069         case DLM_MSG_CANCEL_REPLY:
3070         case DLM_MSG_GRANT:
3071         case DLM_MSG_BAST:
3072                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3073                         error = -EINVAL;
3074                 break;
3075
3076         case DLM_MSG_REQUEST_REPLY:
3077                 if (!is_process_copy(lkb))
3078                         error = -EINVAL;
3079                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3080                         error = -EINVAL;
3081                 break;
3082
3083         default:
3084                 error = -EINVAL;
3085         }
3086
3087         if (error)
3088                 log_error(lkb->lkb_resource->res_ls,
3089                           "ignore invalid message %d from %d %x %x %x %d",
3090                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3091                           lkb->lkb_flags, lkb->lkb_nodeid);
3092         return error;
3093 }
3094
3095 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3096 {
3097         struct dlm_lkb *lkb;
3098         struct dlm_rsb *r;
3099         int error, namelen;
3100
3101         error = create_lkb(ls, &lkb);
3102         if (error)
3103                 goto fail;
3104
3105         receive_flags(lkb, ms);
3106         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3107         error = receive_request_args(ls, lkb, ms);
3108         if (error) {
3109                 __put_lkb(ls, lkb);
3110                 goto fail;
3111         }
3112
3113         namelen = receive_extralen(ms);
3114
3115         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3116         if (error) {
3117                 __put_lkb(ls, lkb);
3118                 goto fail;
3119         }
3120
3121         lock_rsb(r);
3122
3123         attach_lkb(r, lkb);
3124         error = do_request(r, lkb);
3125         send_request_reply(r, lkb, error);
3126
3127         unlock_rsb(r);
3128         put_rsb(r);
3129
3130         if (error == -EINPROGRESS)
3131                 error = 0;
3132         if (error)
3133                 dlm_put_lkb(lkb);
3134         return;
3135
3136  fail:
3137         setup_stub_lkb(ls, ms);
3138         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3139 }
3140
3141 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3142 {
3143         struct dlm_lkb *lkb;
3144         struct dlm_rsb *r;
3145         int error, reply = 1;
3146
3147         error = find_lkb(ls, ms->m_remid, &lkb);
3148         if (error)
3149                 goto fail;
3150
3151         r = lkb->lkb_resource;
3152
3153         hold_rsb(r);
3154         lock_rsb(r);
3155
3156         error = validate_message(lkb, ms);
3157         if (error)
3158                 goto out;
3159
3160         receive_flags(lkb, ms);
3161         error = receive_convert_args(ls, lkb, ms);
3162         if (error)
3163                 goto out_reply;
3164         reply = !down_conversion(lkb);
3165
3166         error = do_convert(r, lkb);
3167  out_reply:
3168         if (reply)
3169                 send_convert_reply(r, lkb, error);
3170  out:
3171         unlock_rsb(r);
3172         put_rsb(r);
3173         dlm_put_lkb(lkb);
3174         return;
3175
3176  fail:
3177         setup_stub_lkb(ls, ms);
3178         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3179 }
3180
3181 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3182 {
3183         struct dlm_lkb *lkb;
3184         struct dlm_rsb *r;
3185         int error;
3186
3187         error = find_lkb(ls, ms->m_remid, &lkb);
3188         if (error)
3189                 goto fail;
3190
3191         r = lkb->lkb_resource;
3192
3193         hold_rsb(r);
3194         lock_rsb(r);
3195
3196         error = validate_message(lkb, ms);
3197         if (error)
3198                 goto out;
3199
3200         receive_flags(lkb, ms);
3201         error = receive_unlock_args(ls, lkb, ms);
3202         if (error)
3203                 goto out_reply;
3204
3205         error = do_unlock(r, lkb);
3206  out_reply:
3207         send_unlock_reply(r, lkb, error);
3208  out:
3209         unlock_rsb(r);
3210         put_rsb(r);
3211         dlm_put_lkb(lkb);
3212         return;
3213
3214  fail:
3215         setup_stub_lkb(ls, ms);
3216         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3217 }
3218
3219 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3220 {
3221         struct dlm_lkb *lkb;
3222         struct dlm_rsb *r;
3223         int error;
3224
3225         error = find_lkb(ls, ms->m_remid, &lkb);
3226         if (error)
3227                 goto fail;
3228
3229         receive_flags(lkb, ms);
3230
3231         r = lkb->lkb_resource;
3232
3233         hold_rsb(r);
3234         lock_rsb(r);
3235
3236         error = validate_message(lkb, ms);
3237         if (error)
3238                 goto out;
3239
3240         error = do_cancel(r, lkb);
3241         send_cancel_reply(r, lkb, error);
3242  out:
3243         unlock_rsb(r);
3244         put_rsb(r);
3245         dlm_put_lkb(lkb);
3246         return;
3247
3248  fail:
3249         setup_stub_lkb(ls, ms);
3250         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3251 }
3252
3253 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3254 {
3255         struct dlm_lkb *lkb;
3256         struct dlm_rsb *r;
3257         int error;
3258
3259         error = find_lkb(ls, ms->m_remid, &lkb);
3260         if (error) {
3261                 log_debug(ls, "receive_grant from %d no lkb %x",
3262                           ms->m_header.h_nodeid, ms->m_remid);
3263                 return;
3264         }
3265
3266         r = lkb->lkb_resource;
3267
3268         hold_rsb(r);
3269         lock_rsb(r);
3270
3271         error = validate_message(lkb, ms);
3272         if (error)
3273                 goto out;
3274
3275         receive_flags_reply(lkb, ms);
3276         if (is_altmode(lkb))
3277                 munge_altmode(lkb, ms);
3278         grant_lock_pc(r, lkb, ms);
3279         queue_cast(r, lkb, 0);
3280  out:
3281         unlock_rsb(r);
3282         put_rsb(r);
3283         dlm_put_lkb(lkb);
3284 }
3285
3286 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3287 {
3288         struct dlm_lkb *lkb;
3289         struct dlm_rsb *r;
3290         int error;
3291
3292         error = find_lkb(ls, ms->m_remid, &lkb);
3293         if (error) {
3294                 log_debug(ls, "receive_bast from %d no lkb %x",
3295                           ms->m_header.h_nodeid, ms->m_remid);
3296                 return;
3297         }
3298
3299         r = lkb->lkb_resource;
3300
3301         hold_rsb(r);
3302         lock_rsb(r);
3303
3304         error = validate_message(lkb, ms);
3305         if (error)
3306                 goto out;
3307
3308         queue_bast(r, lkb, ms->m_bastmode);
3309  out:
3310         unlock_rsb(r);
3311         put_rsb(r);
3312         dlm_put_lkb(lkb);
3313 }
3314
3315 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3316 {
3317         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3318
3319         from_nodeid = ms->m_header.h_nodeid;
3320         our_nodeid = dlm_our_nodeid();
3321
3322         len = receive_extralen(ms);
3323
3324         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3325         if (dir_nodeid != our_nodeid) {
3326                 log_error(ls, "lookup dir_nodeid %d from %d",
3327                           dir_nodeid, from_nodeid);
3328                 error = -EINVAL;
3329                 ret_nodeid = -1;
3330                 goto out;
3331         }
3332
3333         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3334
3335         /* Optimization: we're master so treat lookup as a request */
3336         if (!error && ret_nodeid == our_nodeid) {
3337                 receive_request(ls, ms);
3338                 return;
3339         }
3340  out:
3341         send_lookup_reply(ls, ms, ret_nodeid, error);
3342 }
3343
3344 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3345 {
3346         int len, dir_nodeid, from_nodeid;
3347
3348         from_nodeid = ms->m_header.h_nodeid;
3349
3350         len = receive_extralen(ms);
3351
3352         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3353         if (dir_nodeid != dlm_our_nodeid()) {
3354                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3355                           dir_nodeid, from_nodeid);
3356                 return;
3357         }
3358
3359         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3360 }
3361
3362 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3363 {
3364         do_purge(ls, ms->m_nodeid, ms->m_pid);
3365 }
3366
3367 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3368 {
3369         struct dlm_lkb *lkb;
3370         struct dlm_rsb *r;
3371         int error, mstype, result;
3372
3373         error = find_lkb(ls, ms->m_remid, &lkb);
3374         if (error) {
3375                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3376                           ms->m_header.h_nodeid, ms->m_remid);
3377                 return;
3378         }
3379
3380         r = lkb->lkb_resource;
3381         hold_rsb(r);
3382         lock_rsb(r);
3383
3384         error = validate_message(lkb, ms);
3385         if (error)
3386                 goto out;
3387
3388         mstype = lkb->lkb_wait_type;
3389         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3390         if (error)
3391                 goto out;
3392
3393         /* Optimization: the dir node was also the master, so it took our
3394            lookup as a request and sent request reply instead of lookup reply */
3395         if (mstype == DLM_MSG_LOOKUP) {
3396                 r->res_nodeid = ms->m_header.h_nodeid;
3397                 lkb->lkb_nodeid = r->res_nodeid;
3398         }
3399
3400         /* this is the value returned from do_request() on the master */
3401         result = ms->m_result;
3402
3403         switch (result) {
3404         case -EAGAIN:
3405                 /* request would block (be queued) on remote master */
3406                 queue_cast(r, lkb, -EAGAIN);
3407                 confirm_master(r, -EAGAIN);
3408                 unhold_lkb(lkb); /* undoes create_lkb() */
3409                 break;
3410
3411         case -EINPROGRESS:
3412         case 0:
3413                 /* request was queued or granted on remote master */
3414                 receive_flags_reply(lkb, ms);
3415                 lkb->lkb_remid = ms->m_lkid;
3416                 if (is_altmode(lkb))
3417                         munge_altmode(lkb, ms);
3418                 if (result) {
3419                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3420                         add_timeout(lkb);
3421                 } else {
3422                         grant_lock_pc(r, lkb, ms);
3423                         queue_cast(r, lkb, 0);
3424                 }
3425                 confirm_master(r, result);
3426                 break;
3427
3428         case -EBADR:
3429         case -ENOTBLK:
3430                 /* find_rsb failed to find rsb or rsb wasn't master */
3431                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3432                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3433                 r->res_nodeid = -1;
3434                 lkb->lkb_nodeid = -1;
3435
3436                 if (is_overlap(lkb)) {
3437                         /* we'll ignore error in cancel/unlock reply */
3438                         queue_cast_overlap(r, lkb);
3439                         confirm_master(r, result);
3440                         unhold_lkb(lkb); /* undoes create_lkb() */
3441                 } else
3442                         _request_lock(r, lkb);
3443                 break;
3444
3445         default:
3446                 log_error(ls, "receive_request_reply %x error %d",
3447                           lkb->lkb_id, result);
3448         }
3449
3450         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3451                 log_debug(ls, "receive_request_reply %x result %d unlock",
3452                           lkb->lkb_id, result);
3453                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3454                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3455                 send_unlock(r, lkb);
3456         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3457                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3458                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3459                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3460                 send_cancel(r, lkb);
3461         } else {
3462                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3463                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3464         }
3465  out:
3466         unlock_rsb(r);
3467         put_rsb(r);
3468         dlm_put_lkb(lkb);
3469 }
3470
3471 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3472                                     struct dlm_message *ms)
3473 {
3474         /* this is the value returned from do_convert() on the master */
3475         switch (ms->m_result) {
3476         case -EAGAIN:
3477                 /* convert would block (be queued) on remote master */
3478                 queue_cast(r, lkb, -EAGAIN);
3479                 break;
3480
3481         case -EDEADLK:
3482                 receive_flags_reply(lkb, ms);
3483                 revert_lock_pc(r, lkb);
3484                 queue_cast(r, lkb, -EDEADLK);
3485                 break;
3486
3487         case -EINPROGRESS:
3488                 /* convert was queued on remote master */
3489                 receive_flags_reply(lkb, ms);
3490                 if (is_demoted(lkb))
3491                         munge_demoted(lkb, ms);
3492                 del_lkb(r, lkb);
3493                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3494                 add_timeout(lkb);
3495                 break;
3496
3497         case 0:
3498                 /* convert was granted on remote master */
3499                 receive_flags_reply(lkb, ms);
3500                 if (is_demoted(lkb))
3501                         munge_demoted(lkb, ms);
3502                 grant_lock_pc(r, lkb, ms);
3503                 queue_cast(r, lkb, 0);
3504                 break;
3505
3506         default:
3507                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3508                           lkb->lkb_id, ms->m_result);
3509         }
3510 }
3511
3512 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3513 {
3514         struct dlm_rsb *r = lkb->lkb_resource;
3515         int error;
3516
3517         hold_rsb(r);
3518         lock_rsb(r);
3519
3520         error = validate_message(lkb, ms);
3521         if (error)
3522                 goto out;
3523
3524         /* stub reply can happen with waiters_mutex held */
3525         error = remove_from_waiters_ms(lkb, ms);
3526         if (error)
3527                 goto out;
3528
3529         __receive_convert_reply(r, lkb, ms);
3530  out:
3531         unlock_rsb(r);
3532         put_rsb(r);
3533 }
3534
3535 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3536 {
3537         struct dlm_lkb *lkb;
3538         int error;
3539
3540         error = find_lkb(ls, ms->m_remid, &lkb);
3541         if (error) {
3542                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3543                           ms->m_header.h_nodeid, ms->m_remid);
3544                 return;
3545         }
3546
3547         _receive_convert_reply(lkb, ms);
3548         dlm_put_lkb(lkb);
3549 }
3550
3551 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3552 {
3553         struct dlm_rsb *r = lkb->lkb_resource;
3554         int error;
3555
3556         hold_rsb(r);
3557         lock_rsb(r);
3558
3559         error = validate_message(lkb, ms);
3560         if (error)
3561                 goto out;
3562
3563         /* stub reply can happen with waiters_mutex held */
3564         error = remove_from_waiters_ms(lkb, ms);
3565         if (error)
3566                 goto out;
3567
3568         /* this is the value returned from do_unlock() on the master */
3569
3570         switch (ms->m_result) {
3571         case -DLM_EUNLOCK:
3572                 receive_flags_reply(lkb, ms);
3573                 remove_lock_pc(r, lkb);
3574                 queue_cast(r, lkb, -DLM_EUNLOCK);
3575                 break;
3576         case -ENOENT:
3577                 break;
3578         default:
3579                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3580                           lkb->lkb_id, ms->m_result);
3581         }
3582  out:
3583         unlock_rsb(r);
3584         put_rsb(r);
3585 }
3586
3587 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3588 {
3589         struct dlm_lkb *lkb;
3590         int error;
3591
3592         error = find_lkb(ls, ms->m_remid, &lkb);
3593         if (error) {
3594                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3595                           ms->m_header.h_nodeid, ms->m_remid);
3596                 return;
3597         }
3598
3599         _receive_unlock_reply(lkb, ms);
3600         dlm_put_lkb(lkb);
3601 }
3602
3603 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3604 {
3605         struct dlm_rsb *r = lkb->lkb_resource;
3606         int error;
3607
3608         hold_rsb(r);
3609         lock_rsb(r);
3610
3611         error = validate_message(lkb, ms);
3612         if (error)
3613                 goto out;
3614
3615         /* stub reply can happen with waiters_mutex held */
3616         error = remove_from_waiters_ms(lkb, ms);
3617         if (error)
3618                 goto out;
3619
3620         /* this is the value returned from do_cancel() on the master */
3621
3622         switch (ms->m_result) {
3623         case -DLM_ECANCEL:
3624                 receive_flags_reply(lkb, ms);
3625                 revert_lock_pc(r, lkb);
3626                 queue_cast(r, lkb, -DLM_ECANCEL);
3627                 break;
3628         case 0:
3629                 break;
3630         default:
3631                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3632                           lkb->lkb_id, ms->m_result);
3633         }
3634  out:
3635         unlock_rsb(r);
3636         put_rsb(r);
3637 }
3638
3639 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3640 {
3641         struct dlm_lkb *lkb;
3642         int error;
3643
3644         error = find_lkb(ls, ms->m_remid, &lkb);
3645         if (error) {
3646                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3647                           ms->m_header.h_nodeid, ms->m_remid);
3648                 return;
3649         }
3650
3651         _receive_cancel_reply(lkb, ms);
3652         dlm_put_lkb(lkb);
3653 }
3654
3655 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3656 {
3657         struct dlm_lkb *lkb;
3658         struct dlm_rsb *r;
3659         int error, ret_nodeid;
3660
3661         error = find_lkb(ls, ms->m_lkid, &lkb);
3662         if (error) {
3663                 log_error(ls, "receive_lookup_reply no lkb");
3664                 return;
3665         }
3666
3667         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3668            FIXME: will a non-zero error ever be returned? */
3669
3670         r = lkb->lkb_resource;
3671         hold_rsb(r);
3672         lock_rsb(r);
3673
3674         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3675         if (error)
3676                 goto out;
3677
3678         ret_nodeid = ms->m_nodeid;
3679         if (ret_nodeid == dlm_our_nodeid()) {
3680                 r->res_nodeid = 0;
3681                 ret_nodeid = 0;
3682                 r->res_first_lkid = 0;
3683         } else {
3684                 /* set_master() will copy res_nodeid to lkb_nodeid */
3685                 r->res_nodeid = ret_nodeid;
3686         }
3687
3688         if (is_overlap(lkb)) {
3689                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3690                           lkb->lkb_id, lkb->lkb_flags);
3691                 queue_cast_overlap(r, lkb);
3692                 unhold_lkb(lkb); /* undoes create_lkb() */
3693                 goto out_list;
3694         }
3695
3696         _request_lock(r, lkb);
3697
3698  out_list:
3699         if (!ret_nodeid)
3700                 process_lookup_list(r);
3701  out:
3702         unlock_rsb(r);
3703         put_rsb(r);
3704         dlm_put_lkb(lkb);
3705 }
3706
3707 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3708 {
3709         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3710                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3711                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3712                           ms->m_remid, ms->m_result);
3713                 return;
3714         }
3715
3716         switch (ms->m_type) {
3717
3718         /* messages sent to a master node */
3719
3720         case DLM_MSG_REQUEST:
3721                 receive_request(ls, ms);
3722                 break;
3723
3724         case DLM_MSG_CONVERT:
3725                 receive_convert(ls, ms);
3726                 break;
3727
3728         case DLM_MSG_UNLOCK:
3729                 receive_unlock(ls, ms);
3730                 break;
3731
3732         case DLM_MSG_CANCEL:
3733                 receive_cancel(ls, ms);
3734                 break;
3735
3736         /* messages sent from a master node (replies to above) */
3737
3738         case DLM_MSG_REQUEST_REPLY:
3739                 receive_request_reply(ls, ms);
3740                 break;
3741
3742         case DLM_MSG_CONVERT_REPLY:
3743                 receive_convert_reply(ls, ms);
3744                 break;
3745
3746         case DLM_MSG_UNLOCK_REPLY:
3747                 receive_unlock_reply(ls, ms);
3748                 break;
3749
3750         case DLM_MSG_CANCEL_REPLY:
3751                 receive_cancel_reply(ls, ms);
3752                 break;
3753
3754         /* messages sent from a master node (only two types of async msg) */
3755
3756         case DLM_MSG_GRANT:
3757                 receive_grant(ls, ms);
3758                 break;
3759
3760         case DLM_MSG_BAST:
3761                 receive_bast(ls, ms);
3762                 break;
3763
3764         /* messages sent to a dir node */
3765
3766         case DLM_MSG_LOOKUP:
3767                 receive_lookup(ls, ms);
3768                 break;
3769
3770         case DLM_MSG_REMOVE:
3771                 receive_remove(ls, ms);
3772                 break;
3773
3774         /* messages sent from a dir node (remove has no reply) */
3775
3776         case DLM_MSG_LOOKUP_REPLY:
3777                 receive_lookup_reply(ls, ms);
3778                 break;
3779
3780         /* other messages */
3781
3782         case DLM_MSG_PURGE:
3783                 receive_purge(ls, ms);
3784                 break;
3785
3786         default:
3787                 log_error(ls, "unknown message type %d", ms->m_type);
3788         }
3789
3790         dlm_astd_wake();
3791 }
3792
3793 /* If the lockspace is in recovery mode (locking stopped), then normal
3794    messages are saved on the requestqueue for processing after recovery is
3795    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3796    messages off the requestqueue before we process new ones. This occurs right
3797    after recovery completes when we transition from saving all messages on
3798    requestqueue, to processing all the saved messages, to processing new
3799    messages as they arrive. */
3800
3801 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3802                                 int nodeid)
3803 {
3804         if (dlm_locking_stopped(ls)) {
3805                 dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
3806         } else {
3807                 dlm_wait_requestqueue(ls);
3808                 _receive_message(ls, ms);
3809         }
3810 }
3811
3812 /* This is called by dlm_recoverd to process messages that were saved on
3813    the requestqueue. */
3814
3815 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3816 {
3817         _receive_message(ls, ms);
3818 }
3819
3820 /* This is called by the midcomms layer when something is received for
3821    the lockspace.  It could be either a MSG (normal message sent as part of
3822    standard locking activity) or an RCOM (recovery message sent as part of
3823    lockspace recovery). */
3824
3825 void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
3826 {
3827         struct dlm_message *ms = (struct dlm_message *) hd;
3828         struct dlm_rcom *rc = (struct dlm_rcom *) hd;
3829         struct dlm_ls *ls;
3830         int type = 0;
3831
3832         switch (hd->h_cmd) {
3833         case DLM_MSG:
3834                 dlm_message_in(ms);
3835                 type = ms->m_type;
3836                 break;
3837         case DLM_RCOM:
3838                 dlm_rcom_in(rc);
3839                 type = rc->rc_type;
3840                 break;
3841         default:
3842                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3843                 return;
3844         }
3845
3846         if (hd->h_nodeid != nodeid) {
3847                 log_print("invalid h_nodeid %d from %d lockspace %x",
3848                           hd->h_nodeid, nodeid, hd->h_lockspace);
3849                 return;
3850         }
3851
3852         ls = dlm_find_lockspace_global(hd->h_lockspace);
3853         if (!ls) {
3854                 if (dlm_config.ci_log_debug)
3855                         log_print("invalid lockspace %x from %d cmd %d type %d",
3856                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3857
3858                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3859                         dlm_send_ls_not_ready(nodeid, rc);
3860                 return;
3861         }
3862
3863         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3864            be inactive (in this ls) before transitioning to recovery mode */
3865
3866         down_read(&ls->ls_recv_active);
3867         if (hd->h_cmd == DLM_MSG)
3868                 dlm_receive_message(ls, ms, nodeid);
3869         else
3870                 dlm_receive_rcom(ls, rc, nodeid);
3871         up_read(&ls->ls_recv_active);
3872
3873         dlm_put_lockspace(ls);
3874 }
3875
3876 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3877 {
3878         if (middle_conversion(lkb)) {
3879                 hold_lkb(lkb);
3880                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3881                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3882                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3883                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3884                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3885
3886                 /* Same special case as in receive_rcom_lock_args() */
3887                 lkb->lkb_grmode = DLM_LOCK_IV;
3888                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3889                 unhold_lkb(lkb);
3890
3891         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3892                 lkb->lkb_flags |= DLM_IFL_RESEND;
3893         }
3894
3895         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3896            conversions are async; there's no reply from the remote master */
3897 }
3898
3899 /* A waiting lkb needs recovery if the master node has failed, or
3900    the master node is changing (only when no directory is used) */
3901
3902 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3903 {
3904         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3905                 return 1;
3906
3907         if (!dlm_no_directory(ls))
3908                 return 0;
3909
3910         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3911                 return 1;
3912
3913         return 0;
3914 }
3915
3916 /* Recovery for locks that are waiting for replies from nodes that are now
3917    gone.  We can just complete unlocks and cancels by faking a reply from the
3918    dead node.  Requests and up-conversions we flag to be resent after
3919    recovery.  Down-conversions can just be completed with a fake reply like
3920    unlocks.  Conversions between PR and CW need special attention. */
3921
3922 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3923 {
3924         struct dlm_lkb *lkb, *safe;
3925         int wait_type, stub_unlock_result, stub_cancel_result;
3926
3927         mutex_lock(&ls->ls_waiters_mutex);
3928
3929         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3930                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3931                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3932
3933                 /* all outstanding lookups, regardless of destination  will be
3934                    resent after recovery is done */
3935
3936                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3937                         lkb->lkb_flags |= DLM_IFL_RESEND;
3938                         continue;
3939                 }
3940
3941                 if (!waiter_needs_recovery(ls, lkb))
3942                         continue;
3943
3944                 wait_type = lkb->lkb_wait_type;
3945                 stub_unlock_result = -DLM_EUNLOCK;
3946                 stub_cancel_result = -DLM_ECANCEL;
3947
3948                 /* Main reply may have been received leaving a zero wait_type,
3949                    but a reply for the overlapping op may not have been
3950                    received.  In that case we need to fake the appropriate
3951                    reply for the overlap op. */
3952
3953                 if (!wait_type) {
3954                         if (is_overlap_cancel(lkb)) {
3955                                 wait_type = DLM_MSG_CANCEL;
3956                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3957                                         stub_cancel_result = 0;
3958                         }
3959                         if (is_overlap_unlock(lkb)) {
3960                                 wait_type = DLM_MSG_UNLOCK;
3961                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3962                                         stub_unlock_result = -ENOENT;
3963                         }
3964
3965                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
3966                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
3967                                   stub_cancel_result, stub_unlock_result);
3968                 }
3969
3970                 switch (wait_type) {
3971
3972                 case DLM_MSG_REQUEST:
3973                         lkb->lkb_flags |= DLM_IFL_RESEND;
3974                         break;
3975
3976                 case DLM_MSG_CONVERT:
3977                         recover_convert_waiter(ls, lkb);
3978                         break;
3979
3980                 case DLM_MSG_UNLOCK:
3981                         hold_lkb(lkb);
3982                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3983                         ls->ls_stub_ms.m_result = stub_unlock_result;
3984                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3985                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3986                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3987                         dlm_put_lkb(lkb);
3988                         break;
3989
3990                 case DLM_MSG_CANCEL:
3991                         hold_lkb(lkb);
3992                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3993                         ls->ls_stub_ms.m_result = stub_cancel_result;
3994                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3995                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3996                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3997                         dlm_put_lkb(lkb);
3998                         break;
3999
4000                 default:
4001                         log_error(ls, "invalid lkb wait_type %d %d",
4002                                   lkb->lkb_wait_type, wait_type);
4003                 }
4004                 schedule();
4005         }
4006         mutex_unlock(&ls->ls_waiters_mutex);
4007 }
4008
4009 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4010 {
4011         struct dlm_lkb *lkb;
4012         int found = 0;
4013
4014         mutex_lock(&ls->ls_waiters_mutex);
4015         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4016                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4017                         hold_lkb(lkb);
4018                         found = 1;
4019                         break;
4020                 }
4021         }
4022         mutex_unlock(&ls->ls_waiters_mutex);
4023
4024         if (!found)
4025                 lkb = NULL;
4026         return lkb;
4027 }
4028
4029 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4030    master or dir-node for r.  Processing the lkb may result in it being placed
4031    back on waiters. */
4032
4033 /* We do this after normal locking has been enabled and any saved messages
4034    (in requestqueue) have been processed.  We should be confident that at
4035    this point we won't get or process a reply to any of these waiting
4036    operations.  But, new ops may be coming in on the rsbs/locks here from
4037    userspace or remotely. */
4038
4039 /* there may have been an overlap unlock/cancel prior to recovery or after
4040    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4041    overlap flag would just have been set and nothing new sent.  we can be
4042    confident here than any replies to either the initial op or overlap ops
4043    prior to recovery have been received. */
4044
4045 int dlm_recover_waiters_post(struct dlm_ls *ls)
4046 {
4047         struct dlm_lkb *lkb;
4048         struct dlm_rsb *r;
4049         int error = 0, mstype, err, oc, ou;
4050
4051         while (1) {
4052                 if (dlm_locking_stopped(ls)) {
4053                         log_debug(ls, "recover_waiters_post aborted");
4054                         error = -EINTR;
4055                         break;
4056                 }
4057
4058                 lkb = find_resend_waiter(ls);
4059                 if (!lkb)
4060                         break;
4061
4062                 r = lkb->lkb_resource;
4063                 hold_rsb(r);
4064                 lock_rsb(r);
4065
4066                 mstype = lkb->lkb_wait_type;
4067                 oc = is_overlap_cancel(lkb);
4068                 ou = is_overlap_unlock(lkb);
4069                 err = 0;
4070
4071                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4072                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4073
4074                 /* At this point we assume that we won't get a reply to any
4075                    previous op or overlap op on this lock.  First, do a big
4076                    remove_from_waiters() for all previous ops. */
4077
4078                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4079                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4080                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4081                 lkb->lkb_wait_type = 0;
4082                 lkb->lkb_wait_count = 0;
4083                 mutex_lock(&ls->ls_waiters_mutex);
4084                 list_del_init(&lkb->lkb_wait_reply);
4085                 mutex_unlock(&ls->ls_waiters_mutex);
4086                 unhold_lkb(lkb); /* for waiters list */
4087
4088                 if (oc || ou) {
4089                         /* do an unlock or cancel instead of resending */
4090                         switch (mstype) {
4091                         case DLM_MSG_LOOKUP:
4092                         case DLM_MSG_REQUEST:
4093                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4094                                                         -DLM_ECANCEL);
4095                                 unhold_lkb(lkb); /* undoes create_lkb() */
4096                                 break;
4097                         case DLM_MSG_CONVERT:
4098                                 if (oc) {
4099                                         queue_cast(r, lkb, -DLM_ECANCEL);
4100                                 } else {
4101                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4102                                         _unlock_lock(r, lkb);
4103                                 }
4104                                 break;
4105                         default:
4106                                 err = 1;
4107                         }
4108                 } else {
4109                         switch (mstype) {
4110                         case DLM_MSG_LOOKUP:
4111                         case DLM_MSG_REQUEST:
4112                                 _request_lock(r, lkb);
4113                                 if (is_master(r))
4114                                         confirm_master(r, 0);
4115                                 break;
4116                         case DLM_MSG_CONVERT:
4117                                 _convert_lock(r, lkb);
4118                                 break;
4119                         default:
4120                                 err = 1;
4121                         }
4122                 }
4123
4124                 if (err)
4125                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4126                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4127                 unlock_rsb(r);
4128                 put_rsb(r);
4129                 dlm_put_lkb(lkb);
4130         }
4131
4132         return error;
4133 }
4134
4135 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4136                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4137 {
4138         struct dlm_ls *ls = r->res_ls;
4139         struct dlm_lkb *lkb, *safe;
4140
4141         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4142                 if (test(ls, lkb)) {
4143                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4144                         del_lkb(r, lkb);
4145                         /* this put should free the lkb */
4146                         if (!dlm_put_lkb(lkb))
4147                                 log_error(ls, "purged lkb not released");
4148                 }
4149         }
4150 }
4151
4152 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4153 {
4154         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4155 }
4156
4157 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4158 {
4159         return is_master_copy(lkb);
4160 }
4161
4162 static void purge_dead_locks(struct dlm_rsb *r)
4163 {
4164         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4165         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4166         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4167 }
4168
4169 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4170 {
4171         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4172         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4173         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4174 }
4175
4176 /* Get rid of locks held by nodes that are gone. */
4177
4178 int dlm_purge_locks(struct dlm_ls *ls)
4179 {
4180         struct dlm_rsb *r;
4181
4182         log_debug(ls, "dlm_purge_locks");
4183
4184         down_write(&ls->ls_root_sem);
4185         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4186                 hold_rsb(r);
4187                 lock_rsb(r);
4188                 if (is_master(r))
4189                         purge_dead_locks(r);
4190                 unlock_rsb(r);
4191                 unhold_rsb(r);
4192
4193                 schedule();
4194         }
4195         up_write(&ls->ls_root_sem);
4196
4197         return 0;
4198 }
4199
4200 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4201 {
4202         struct dlm_rsb *r, *r_ret = NULL;
4203
4204         read_lock(&ls->ls_rsbtbl[bucket].lock);
4205         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4206                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4207                         continue;
4208                 hold_rsb(r);
4209                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4210                 r_ret = r;
4211                 break;
4212         }
4213         read_unlock(&ls->ls_rsbtbl[bucket].lock);
4214         return r_ret;
4215 }
4216
4217 void dlm_grant_after_purge(struct dlm_ls *ls)
4218 {
4219         struct dlm_rsb *r;
4220         int bucket = 0;
4221
4222         while (1) {
4223                 r = find_purged_rsb(ls, bucket);
4224                 if (!r) {
4225                         if (bucket == ls->ls_rsbtbl_size - 1)
4226                                 break;
4227                         bucket++;
4228                         continue;
4229                 }
4230                 lock_rsb(r);
4231                 if (is_master(r)) {
4232                         grant_pending_locks(r);
4233                         confirm_master(r, 0);
4234                 }
4235                 unlock_rsb(r);
4236                 put_rsb(r);
4237                 schedule();
4238         }
4239 }
4240
4241 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4242                                          uint32_t remid)
4243 {
4244         struct dlm_lkb *lkb;
4245
4246         list_for_each_entry(lkb, head, lkb_statequeue) {
4247                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4248                         return lkb;
4249         }
4250         return NULL;
4251 }
4252
4253 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4254                                     uint32_t remid)
4255 {
4256         struct dlm_lkb *lkb;
4257
4258         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4259         if (lkb)
4260                 return lkb;
4261         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4262         if (lkb)
4263                 return lkb;
4264         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4265         if (lkb)
4266                 return lkb;
4267         return NULL;
4268 }
4269
4270 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4271                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4272 {
4273         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4274         int lvblen;
4275
4276         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4277         lkb->lkb_ownpid = rl->rl_ownpid;
4278         lkb->lkb_remid = rl->rl_lkid;
4279         lkb->lkb_exflags = rl->rl_exflags;
4280         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4281         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4282         lkb->lkb_lvbseq = rl->rl_lvbseq;
4283         lkb->lkb_rqmode = rl->rl_rqmode;
4284         lkb->lkb_grmode = rl->rl_grmode;
4285         /* don't set lkb_status because add_lkb wants to itself */
4286
4287         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4288         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4289
4290         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4291                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4292                 if (!lkb->lkb_lvbptr)
4293                         return -ENOMEM;
4294                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4295                          sizeof(struct rcom_lock);
4296                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4297         }
4298
4299         /* Conversions between PR and CW (middle modes) need special handling.
4300            The real granted mode of these converting locks cannot be determined
4301            until all locks have been rebuilt on the rsb (recover_conversion) */
4302
4303         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4304                 rl->rl_status = DLM_LKSTS_CONVERT;
4305                 lkb->lkb_grmode = DLM_LOCK_IV;
4306                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4307         }
4308
4309         return 0;
4310 }
4311
4312 /* This lkb may have been recovered in a previous aborted recovery so we need
4313    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4314    If so we just send back a standard reply.  If not, we create a new lkb with
4315    the given values and send back our lkid.  We send back our lkid by sending
4316    back the rcom_lock struct we got but with the remid field filled in. */
4317
4318 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4319 {
4320         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4321         struct dlm_rsb *r;
4322         struct dlm_lkb *lkb;
4323         int error;
4324
4325         if (rl->rl_parent_lkid) {
4326                 error = -EOPNOTSUPP;
4327                 goto out;
4328         }
4329
4330         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
4331         if (error)
4332                 goto out;
4333
4334         lock_rsb(r);
4335
4336         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
4337         if (lkb) {
4338                 error = -EEXIST;
4339                 goto out_remid;
4340         }
4341
4342         error = create_lkb(ls, &lkb);
4343         if (error)
4344                 goto out_unlock;
4345
4346         error = receive_rcom_lock_args(ls, lkb, r, rc);
4347         if (error) {
4348                 __put_lkb(ls, lkb);
4349                 goto out_unlock;
4350         }
4351
4352         attach_lkb(r, lkb);
4353         add_lkb(r, lkb, rl->rl_status);
4354         error = 0;
4355
4356  out_remid:
4357         /* this is the new value returned to the lock holder for
4358            saving in its process-copy lkb */
4359         rl->rl_remid = lkb->lkb_id;
4360
4361  out_unlock:
4362         unlock_rsb(r);
4363         put_rsb(r);
4364  out:
4365         if (error)
4366                 log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
4367         rl->rl_result = error;
4368         return error;
4369 }
4370
4371 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4372 {
4373         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4374         struct dlm_rsb *r;
4375         struct dlm_lkb *lkb;
4376         int error;
4377
4378         error = find_lkb(ls, rl->rl_lkid, &lkb);
4379         if (error) {
4380                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
4381                 return error;
4382         }
4383
4384         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4385
4386         error = rl->rl_result;
4387
4388         r = lkb->lkb_resource;
4389         hold_rsb(r);
4390         lock_rsb(r);
4391
4392         switch (error) {
4393         case -EBADR:
4394                 /* There's a chance the new master received our lock before
4395                    dlm_recover_master_reply(), this wouldn't happen if we did
4396                    a barrier between recover_masters and recover_locks. */
4397                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4398                           (unsigned long)r, r->res_name);
4399                 dlm_send_rcom_lock(r, lkb);
4400                 goto out;
4401         case -EEXIST:
4402                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4403                 /* fall through */
4404         case 0:
4405                 lkb->lkb_remid = rl->rl_remid;
4406                 break;
4407         default:
4408                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4409                           error, lkb->lkb_id);
4410         }
4411
4412         /* an ack for dlm_recover_locks() which waits for replies from
4413            all the locks it sends to new masters */
4414         dlm_recovered_lock(r);
4415  out:
4416         unlock_rsb(r);
4417         put_rsb(r);
4418         dlm_put_lkb(lkb);
4419
4420         return 0;
4421 }
4422
4423 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4424                      int mode, uint32_t flags, void *name, unsigned int namelen,
4425                      unsigned long timeout_cs)
4426 {
4427         struct dlm_lkb *lkb;
4428         struct dlm_args args;
4429         int error;
4430
4431         dlm_lock_recovery(ls);
4432
4433         error = create_lkb(ls, &lkb);
4434         if (error) {
4435                 kfree(ua);
4436                 goto out;
4437         }
4438
4439         if (flags & DLM_LKF_VALBLK) {
4440                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4441                 if (!ua->lksb.sb_lvbptr) {
4442                         kfree(ua);
4443                         __put_lkb(ls, lkb);
4444                         error = -ENOMEM;
4445                         goto out;
4446                 }
4447         }
4448
4449         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4450            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4451            lock and that lkb_astparam is the dlm_user_args structure. */
4452
4453         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4454                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4455         lkb->lkb_flags |= DLM_IFL_USER;
4456         ua->old_mode = DLM_LOCK_IV;
4457
4458         if (error) {
4459                 __put_lkb(ls, lkb);
4460                 goto out;
4461         }
4462
4463         error = request_lock(ls, lkb, name, namelen, &args);
4464
4465         switch (error) {
4466         case 0:
4467                 break;
4468         case -EINPROGRESS:
4469                 error = 0;
4470                 break;
4471         case -EAGAIN:
4472                 error = 0;
4473                 /* fall through */
4474         default:
4475                 __put_lkb(ls, lkb);
4476                 goto out;
4477         }
4478
4479         /* add this new lkb to the per-process list of locks */
4480         spin_lock(&ua->proc->locks_spin);
4481         hold_lkb(lkb);
4482         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4483         spin_unlock(&ua->proc->locks_spin);
4484  out:
4485         dlm_unlock_recovery(ls);
4486         return error;
4487 }
4488
4489 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4490                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4491                      unsigned long timeout_cs)
4492 {
4493         struct dlm_lkb *lkb;
4494         struct dlm_args args;
4495         struct dlm_user_args *ua;
4496         int error;
4497
4498         dlm_lock_recovery(ls);
4499
4500         error = find_lkb(ls, lkid, &lkb);
4501         if (error)
4502                 goto out;
4503
4504         /* user can change the params on its lock when it converts it, or
4505            add an lvb that didn't exist before */
4506
4507         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4508
4509         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4510                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4511                 if (!ua->lksb.sb_lvbptr) {
4512                         error = -ENOMEM;
4513                         goto out_put;
4514                 }
4515         }
4516         if (lvb_in && ua->lksb.sb_lvbptr)
4517                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4518
4519         ua->xid = ua_tmp->xid;
4520         ua->castparam = ua_tmp->castparam;
4521         ua->castaddr = ua_tmp->castaddr;
4522         ua->bastparam = ua_tmp->bastparam;
4523         ua->bastaddr = ua_tmp->bastaddr;
4524         ua->user_lksb = ua_tmp->user_lksb;
4525         ua->old_mode = lkb->lkb_grmode;
4526
4527         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4528                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4529         if (error)
4530                 goto out_put;
4531
4532         error = convert_lock(ls, lkb, &args);
4533
4534         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4535                 error = 0;
4536  out_put:
4537         dlm_put_lkb(lkb);
4538  out:
4539         dlm_unlock_recovery(ls);
4540         kfree(ua_tmp);
4541         return error;
4542 }
4543
4544 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4545                     uint32_t flags, uint32_t lkid, char *lvb_in)
4546 {
4547         struct dlm_lkb *lkb;
4548         struct dlm_args args;
4549         struct dlm_user_args *ua;
4550         int error;
4551
4552         dlm_lock_recovery(ls);
4553
4554         error = find_lkb(ls, lkid, &lkb);
4555         if (error)
4556                 goto out;
4557
4558         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4559
4560         if (lvb_in && ua->lksb.sb_lvbptr)
4561                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4562         if (ua_tmp->castparam)
4563                 ua->castparam = ua_tmp->castparam;
4564         ua->user_lksb = ua_tmp->user_lksb;
4565
4566         error = set_unlock_args(flags, ua, &args);
4567         if (error)
4568                 goto out_put;
4569
4570         error = unlock_lock(ls, lkb, &args);
4571
4572         if (error == -DLM_EUNLOCK)
4573                 error = 0;
4574         /* from validate_unlock_args() */
4575         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4576                 error = 0;
4577         if (error)
4578                 goto out_put;
4579
4580         spin_lock(&ua->proc->locks_spin);
4581         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4582         if (!list_empty(&lkb->lkb_ownqueue))
4583                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4584         spin_unlock(&ua->proc->locks_spin);
4585  out_put:
4586         dlm_put_lkb(lkb);
4587  out:
4588         dlm_unlock_recovery(ls);
4589         kfree(ua_tmp);
4590         return error;
4591 }
4592
4593 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4594                     uint32_t flags, uint32_t lkid)
4595 {
4596         struct dlm_lkb *lkb;
4597         struct dlm_args args;
4598         struct dlm_user_args *ua;
4599         int error;
4600
4601         dlm_lock_recovery(ls);
4602
4603         error = find_lkb(ls, lkid, &lkb);
4604         if (error)
4605                 goto out;
4606
4607         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4608         if (ua_tmp->castparam)
4609                 ua->castparam = ua_tmp->castparam;
4610         ua->user_lksb = ua_tmp->user_lksb;
4611
4612         error = set_unlock_args(flags, ua, &args);
4613         if (error)
4614                 goto out_put;
4615
4616         error = cancel_lock(ls, lkb, &args);
4617
4618         if (error == -DLM_ECANCEL)
4619                 error = 0;
4620         /* from validate_unlock_args() */
4621         if (error == -EBUSY)
4622                 error = 0;
4623  out_put:
4624         dlm_put_lkb(lkb);
4625  out:
4626         dlm_unlock_recovery(ls);
4627         kfree(ua_tmp);
4628         return error;
4629 }
4630
4631 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4632 {
4633         struct dlm_lkb *lkb;
4634         struct dlm_args args;
4635         struct dlm_user_args *ua;
4636         struct dlm_rsb *r;
4637         int error;
4638
4639         dlm_lock_recovery(ls);
4640
4641         error = find_lkb(ls, lkid, &lkb);
4642         if (error)
4643                 goto out;
4644
4645         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4646
4647         error = set_unlock_args(flags, ua, &args);
4648         if (error)
4649                 goto out_put;
4650
4651         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4652
4653         r = lkb->lkb_resource;
4654         hold_rsb(r);
4655         lock_rsb(r);
4656
4657         error = validate_unlock_args(lkb, &args);
4658         if (error)
4659                 goto out_r;
4660         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4661
4662         error = _cancel_lock(r, lkb);
4663  out_r:
4664         unlock_rsb(r);
4665         put_rsb(r);
4666
4667         if (error == -DLM_ECANCEL)
4668                 error = 0;
4669         /* from validate_unlock_args() */
4670         if (error == -EBUSY)
4671                 error = 0;
4672  out_put:
4673         dlm_put_lkb(lkb);
4674  out:
4675         dlm_unlock_recovery(ls);
4676         return error;
4677 }
4678
4679 /* lkb's that are removed from the waiters list by revert are just left on the
4680    orphans list with the granted orphan locks, to be freed by purge */
4681
4682 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4683 {
4684         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4685         struct dlm_args args;
4686         int error;
4687
4688         hold_lkb(lkb);
4689         mutex_lock(&ls->ls_orphans_mutex);
4690         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4691         mutex_unlock(&ls->ls_orphans_mutex);
4692
4693         set_unlock_args(0, ua, &args);
4694
4695         error = cancel_lock(ls, lkb, &args);
4696         if (error == -DLM_ECANCEL)
4697                 error = 0;
4698         return error;
4699 }
4700
4701 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4702    Regardless of what rsb queue the lock is on, it's removed and freed. */
4703
4704 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4705 {
4706         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4707         struct dlm_args args;
4708         int error;
4709
4710         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4711
4712         error = unlock_lock(ls, lkb, &args);
4713         if (error == -DLM_EUNLOCK)
4714                 error = 0;
4715         return error;
4716 }
4717
4718 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4719    (which does lock_rsb) due to deadlock with receiving a message that does
4720    lock_rsb followed by dlm_user_add_ast() */
4721
4722 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4723                                      struct dlm_user_proc *proc)
4724 {
4725         struct dlm_lkb *lkb = NULL;
4726
4727         mutex_lock(&ls->ls_clear_proc_locks);
4728         if (list_empty(&proc->locks))
4729                 goto out;
4730
4731         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4732         list_del_init(&lkb->lkb_ownqueue);
4733
4734         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4735                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4736         else
4737                 lkb->lkb_flags |= DLM_IFL_DEAD;
4738  out:
4739         mutex_unlock(&ls->ls_clear_proc_locks);
4740         return lkb;
4741 }
4742
4743 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4744    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4745    which we clear here. */
4746
4747 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4748    list, and no more device_writes should add lkb's to proc->locks list; so we
4749    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4750    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4751    them ourself. */
4752
4753 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4754 {
4755         struct dlm_lkb *lkb, *safe;
4756
4757         dlm_lock_recovery(ls);
4758
4759         while (1) {
4760                 lkb = del_proc_lock(ls, proc);
4761                 if (!lkb)
4762                         break;
4763                 del_timeout(lkb);
4764                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4765                         orphan_proc_lock(ls, lkb);
4766                 else
4767                         unlock_proc_lock(ls, lkb);
4768
4769                 /* this removes the reference for the proc->locks list
4770                    added by dlm_user_request, it may result in the lkb
4771                    being freed */
4772
4773                 dlm_put_lkb(lkb);
4774         }
4775
4776         mutex_lock(&ls->ls_clear_proc_locks);
4777
4778         /* in-progress unlocks */
4779         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4780                 list_del_init(&lkb->lkb_ownqueue);
4781                 lkb->lkb_flags |= DLM_IFL_DEAD;
4782                 dlm_put_lkb(lkb);
4783         }
4784
4785         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4786                 lkb->lkb_ast_type = 0;
4787                 list_del(&lkb->lkb_astqueue);
4788                 dlm_put_lkb(lkb);
4789         }
4790
4791         mutex_unlock(&ls->ls_clear_proc_locks);
4792         dlm_unlock_recovery(ls);
4793 }
4794
4795 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4796 {
4797         struct dlm_lkb *lkb, *safe;
4798
4799         while (1) {
4800                 lkb = NULL;
4801                 spin_lock(&proc->locks_spin);
4802                 if (!list_empty(&proc->locks)) {
4803                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4804                                          lkb_ownqueue);
4805                         list_del_init(&lkb->lkb_ownqueue);
4806                 }
4807                 spin_unlock(&proc->locks_spin);
4808
4809                 if (!lkb)
4810                         break;
4811
4812                 lkb->lkb_flags |= DLM_IFL_DEAD;
4813                 unlock_proc_lock(ls, lkb);
4814                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4815         }
4816
4817         spin_lock(&proc->locks_spin);
4818         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4819                 list_del_init(&lkb->lkb_ownqueue);
4820                 lkb->lkb_flags |= DLM_IFL_DEAD;
4821                 dlm_put_lkb(lkb);
4822         }
4823         spin_unlock(&proc->locks_spin);
4824
4825         spin_lock(&proc->asts_spin);
4826         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4827                 list_del(&lkb->lkb_astqueue);
4828                 dlm_put_lkb(lkb);
4829         }
4830         spin_unlock(&proc->asts_spin);
4831 }
4832
4833 /* pid of 0 means purge all orphans */
4834
4835 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4836 {
4837         struct dlm_lkb *lkb, *safe;
4838
4839         mutex_lock(&ls->ls_orphans_mutex);
4840         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4841                 if (pid && lkb->lkb_ownpid != pid)
4842                         continue;
4843                 unlock_proc_lock(ls, lkb);
4844                 list_del_init(&lkb->lkb_ownqueue);
4845                 dlm_put_lkb(lkb);
4846         }
4847         mutex_unlock(&ls->ls_orphans_mutex);
4848 }
4849
4850 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4851 {
4852         struct dlm_message *ms;
4853         struct dlm_mhandle *mh;
4854         int error;
4855
4856         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4857                                 DLM_MSG_PURGE, &ms, &mh);
4858         if (error)
4859                 return error;
4860         ms->m_nodeid = nodeid;
4861         ms->m_pid = pid;
4862
4863         return send_message(mh, ms);
4864 }
4865
4866 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4867                    int nodeid, int pid)
4868 {
4869         int error = 0;
4870
4871         if (nodeid != dlm_our_nodeid()) {
4872                 error = send_purge(ls, nodeid, pid);
4873         } else {
4874                 dlm_lock_recovery(ls);
4875                 if (pid == current->pid)
4876                         purge_proc_locks(ls, proc);
4877                 else
4878                         do_purge(ls, nodeid, pid);
4879                 dlm_unlock_recovery(ls);
4880         }
4881         return error;
4882 }
4883