Merge branch 'sched/new-API-sched_setscheduler' of git://git.kernel.org/pub/scm/linux...
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         if (is_master_copy(lkb))
322                 send_bast(r, lkb, rqmode);
323         else {
324                 lkb->lkb_bastmode = rqmode;
325                 dlm_add_ast(lkb, AST_BAST);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335         struct dlm_rsb *r;
336
337         r = dlm_allocate_rsb(ls, len);
338         if (!r)
339                 return NULL;
340
341         r->res_ls = ls;
342         r->res_length = len;
343         memcpy(r->res_name, name, len);
344         mutex_init(&r->res_mutex);
345
346         INIT_LIST_HEAD(&r->res_lookup);
347         INIT_LIST_HEAD(&r->res_grantqueue);
348         INIT_LIST_HEAD(&r->res_convertqueue);
349         INIT_LIST_HEAD(&r->res_waitqueue);
350         INIT_LIST_HEAD(&r->res_root_list);
351         INIT_LIST_HEAD(&r->res_recover_list);
352
353         return r;
354 }
355
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357                            unsigned int flags, struct dlm_rsb **r_ret)
358 {
359         struct dlm_rsb *r;
360         int error = 0;
361
362         list_for_each_entry(r, head, res_hashchain) {
363                 if (len == r->res_length && !memcmp(name, r->res_name, len))
364                         goto found;
365         }
366         return -EBADR;
367
368  found:
369         if (r->res_nodeid && (flags & R_MASTER))
370                 error = -ENOTBLK;
371         *r_ret = r;
372         return error;
373 }
374
375 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
376                        unsigned int flags, struct dlm_rsb **r_ret)
377 {
378         struct dlm_rsb *r;
379         int error;
380
381         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
382         if (!error) {
383                 kref_get(&r->res_ref);
384                 goto out;
385         }
386         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
387         if (error)
388                 goto out;
389
390         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
391
392         if (dlm_no_directory(ls))
393                 goto out;
394
395         if (r->res_nodeid == -1) {
396                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
397                 r->res_first_lkid = 0;
398         } else if (r->res_nodeid > 0) {
399                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
400                 r->res_first_lkid = 0;
401         } else {
402                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
403                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
404         }
405  out:
406         *r_ret = r;
407         return error;
408 }
409
410 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
411                       unsigned int flags, struct dlm_rsb **r_ret)
412 {
413         int error;
414         write_lock(&ls->ls_rsbtbl[b].lock);
415         error = _search_rsb(ls, name, len, b, flags, r_ret);
416         write_unlock(&ls->ls_rsbtbl[b].lock);
417         return error;
418 }
419
420 /*
421  * Find rsb in rsbtbl and potentially create/add one
422  *
423  * Delaying the release of rsb's has a similar benefit to applications keeping
424  * NL locks on an rsb, but without the guarantee that the cached master value
425  * will still be valid when the rsb is reused.  Apps aren't always smart enough
426  * to keep NL locks on an rsb that they may lock again shortly; this can lead
427  * to excessive master lookups and removals if we don't delay the release.
428  *
429  * Searching for an rsb means looking through both the normal list and toss
430  * list.  When found on the toss list the rsb is moved to the normal list with
431  * ref count of 1; when found on normal list the ref count is incremented.
432  */
433
434 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
435                     unsigned int flags, struct dlm_rsb **r_ret)
436 {
437         struct dlm_rsb *r, *tmp;
438         uint32_t hash, bucket;
439         int error = -EINVAL;
440
441         if (namelen > DLM_RESNAME_MAXLEN)
442                 goto out;
443
444         if (dlm_no_directory(ls))
445                 flags |= R_CREATE;
446
447         error = 0;
448         hash = jhash(name, namelen, 0);
449         bucket = hash & (ls->ls_rsbtbl_size - 1);
450
451         error = search_rsb(ls, name, namelen, bucket, flags, &r);
452         if (!error)
453                 goto out;
454
455         if (error == -EBADR && !(flags & R_CREATE))
456                 goto out;
457
458         /* the rsb was found but wasn't a master copy */
459         if (error == -ENOTBLK)
460                 goto out;
461
462         error = -ENOMEM;
463         r = create_rsb(ls, name, namelen);
464         if (!r)
465                 goto out;
466
467         r->res_hash = hash;
468         r->res_bucket = bucket;
469         r->res_nodeid = -1;
470         kref_init(&r->res_ref);
471
472         /* With no directory, the master can be set immediately */
473         if (dlm_no_directory(ls)) {
474                 int nodeid = dlm_dir_nodeid(r);
475                 if (nodeid == dlm_our_nodeid())
476                         nodeid = 0;
477                 r->res_nodeid = nodeid;
478         }
479
480         write_lock(&ls->ls_rsbtbl[bucket].lock);
481         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
482         if (!error) {
483                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
484                 dlm_free_rsb(r);
485                 r = tmp;
486                 goto out;
487         }
488         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
489         write_unlock(&ls->ls_rsbtbl[bucket].lock);
490         error = 0;
491  out:
492         *r_ret = r;
493         return error;
494 }
495
496 /* This is only called to add a reference when the code already holds
497    a valid reference to the rsb, so there's no need for locking. */
498
499 static inline void hold_rsb(struct dlm_rsb *r)
500 {
501         kref_get(&r->res_ref);
502 }
503
504 void dlm_hold_rsb(struct dlm_rsb *r)
505 {
506         hold_rsb(r);
507 }
508
509 static void toss_rsb(struct kref *kref)
510 {
511         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
512         struct dlm_ls *ls = r->res_ls;
513
514         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
515         kref_init(&r->res_ref);
516         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
517         r->res_toss_time = jiffies;
518         if (r->res_lvbptr) {
519                 dlm_free_lvb(r->res_lvbptr);
520                 r->res_lvbptr = NULL;
521         }
522 }
523
524 /* When all references to the rsb are gone it's transfered to
525    the tossed list for later disposal. */
526
527 static void put_rsb(struct dlm_rsb *r)
528 {
529         struct dlm_ls *ls = r->res_ls;
530         uint32_t bucket = r->res_bucket;
531
532         write_lock(&ls->ls_rsbtbl[bucket].lock);
533         kref_put(&r->res_ref, toss_rsb);
534         write_unlock(&ls->ls_rsbtbl[bucket].lock);
535 }
536
537 void dlm_put_rsb(struct dlm_rsb *r)
538 {
539         put_rsb(r);
540 }
541
542 /* See comment for unhold_lkb */
543
544 static void unhold_rsb(struct dlm_rsb *r)
545 {
546         int rv;
547         rv = kref_put(&r->res_ref, toss_rsb);
548         DLM_ASSERT(!rv, dlm_dump_rsb(r););
549 }
550
551 static void kill_rsb(struct kref *kref)
552 {
553         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
554
555         /* All work is done after the return from kref_put() so we
556            can release the write_lock before the remove and free. */
557
558         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
559         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
564 }
565
566 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
567    The rsb must exist as long as any lkb's for it do. */
568
569 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
570 {
571         hold_rsb(r);
572         lkb->lkb_resource = r;
573 }
574
575 static void detach_lkb(struct dlm_lkb *lkb)
576 {
577         if (lkb->lkb_resource) {
578                 put_rsb(lkb->lkb_resource);
579                 lkb->lkb_resource = NULL;
580         }
581 }
582
583 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
584 {
585         struct dlm_lkb *lkb, *tmp;
586         uint32_t lkid = 0;
587         uint16_t bucket;
588
589         lkb = dlm_allocate_lkb(ls);
590         if (!lkb)
591                 return -ENOMEM;
592
593         lkb->lkb_nodeid = -1;
594         lkb->lkb_grmode = DLM_LOCK_IV;
595         kref_init(&lkb->lkb_ref);
596         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
597         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
598         INIT_LIST_HEAD(&lkb->lkb_time_list);
599
600         get_random_bytes(&bucket, sizeof(bucket));
601         bucket &= (ls->ls_lkbtbl_size - 1);
602
603         write_lock(&ls->ls_lkbtbl[bucket].lock);
604
605         /* counter can roll over so we must verify lkid is not in use */
606
607         while (lkid == 0) {
608                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
609
610                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
611                                     lkb_idtbl_list) {
612                         if (tmp->lkb_id != lkid)
613                                 continue;
614                         lkid = 0;
615                         break;
616                 }
617         }
618
619         lkb->lkb_id = lkid;
620         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
621         write_unlock(&ls->ls_lkbtbl[bucket].lock);
622
623         *lkb_ret = lkb;
624         return 0;
625 }
626
627 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
628 {
629         struct dlm_lkb *lkb;
630         uint16_t bucket = (lkid >> 16);
631
632         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
633                 if (lkb->lkb_id == lkid)
634                         return lkb;
635         }
636         return NULL;
637 }
638
639 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
640 {
641         struct dlm_lkb *lkb;
642         uint16_t bucket = (lkid >> 16);
643
644         if (bucket >= ls->ls_lkbtbl_size)
645                 return -EBADSLT;
646
647         read_lock(&ls->ls_lkbtbl[bucket].lock);
648         lkb = __find_lkb(ls, lkid);
649         if (lkb)
650                 kref_get(&lkb->lkb_ref);
651         read_unlock(&ls->ls_lkbtbl[bucket].lock);
652
653         *lkb_ret = lkb;
654         return lkb ? 0 : -ENOENT;
655 }
656
657 static void kill_lkb(struct kref *kref)
658 {
659         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
660
661         /* All work is done after the return from kref_put() so we
662            can release the write_lock before the detach_lkb */
663
664         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
665 }
666
667 /* __put_lkb() is used when an lkb may not have an rsb attached to
668    it so we need to provide the lockspace explicitly */
669
670 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
671 {
672         uint16_t bucket = (lkb->lkb_id >> 16);
673
674         write_lock(&ls->ls_lkbtbl[bucket].lock);
675         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
676                 list_del(&lkb->lkb_idtbl_list);
677                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
678
679                 detach_lkb(lkb);
680
681                 /* for local/process lkbs, lvbptr points to caller's lksb */
682                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
683                         dlm_free_lvb(lkb->lkb_lvbptr);
684                 dlm_free_lkb(lkb);
685                 return 1;
686         } else {
687                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
688                 return 0;
689         }
690 }
691
692 int dlm_put_lkb(struct dlm_lkb *lkb)
693 {
694         struct dlm_ls *ls;
695
696         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
697         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
698
699         ls = lkb->lkb_resource->res_ls;
700         return __put_lkb(ls, lkb);
701 }
702
703 /* This is only called to add a reference when the code already holds
704    a valid reference to the lkb, so there's no need for locking. */
705
706 static inline void hold_lkb(struct dlm_lkb *lkb)
707 {
708         kref_get(&lkb->lkb_ref);
709 }
710
711 /* This is called when we need to remove a reference and are certain
712    it's not the last ref.  e.g. del_lkb is always called between a
713    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
714    put_lkb would work fine, but would involve unnecessary locking */
715
716 static inline void unhold_lkb(struct dlm_lkb *lkb)
717 {
718         int rv;
719         rv = kref_put(&lkb->lkb_ref, kill_lkb);
720         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
721 }
722
723 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
724                             int mode)
725 {
726         struct dlm_lkb *lkb = NULL;
727
728         list_for_each_entry(lkb, head, lkb_statequeue)
729                 if (lkb->lkb_rqmode < mode)
730                         break;
731
732         if (!lkb)
733                 list_add_tail(new, head);
734         else
735                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
736 }
737
738 /* add/remove lkb to rsb's grant/convert/wait queue */
739
740 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
741 {
742         kref_get(&lkb->lkb_ref);
743
744         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
745
746         lkb->lkb_status = status;
747
748         switch (status) {
749         case DLM_LKSTS_WAITING:
750                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
751                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
752                 else
753                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
754                 break;
755         case DLM_LKSTS_GRANTED:
756                 /* convention says granted locks kept in order of grmode */
757                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
758                                 lkb->lkb_grmode);
759                 break;
760         case DLM_LKSTS_CONVERT:
761                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
762                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
763                 else
764                         list_add_tail(&lkb->lkb_statequeue,
765                                       &r->res_convertqueue);
766                 break;
767         default:
768                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
769         }
770 }
771
772 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
773 {
774         lkb->lkb_status = 0;
775         list_del(&lkb->lkb_statequeue);
776         unhold_lkb(lkb);
777 }
778
779 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
780 {
781         hold_lkb(lkb);
782         del_lkb(r, lkb);
783         add_lkb(r, lkb, sts);
784         unhold_lkb(lkb);
785 }
786
787 static int msg_reply_type(int mstype)
788 {
789         switch (mstype) {
790         case DLM_MSG_REQUEST:
791                 return DLM_MSG_REQUEST_REPLY;
792         case DLM_MSG_CONVERT:
793                 return DLM_MSG_CONVERT_REPLY;
794         case DLM_MSG_UNLOCK:
795                 return DLM_MSG_UNLOCK_REPLY;
796         case DLM_MSG_CANCEL:
797                 return DLM_MSG_CANCEL_REPLY;
798         case DLM_MSG_LOOKUP:
799                 return DLM_MSG_LOOKUP_REPLY;
800         }
801         return -1;
802 }
803
804 /* add/remove lkb from global waiters list of lkb's waiting for
805    a reply from a remote node */
806
807 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
808 {
809         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
810         int error = 0;
811
812         mutex_lock(&ls->ls_waiters_mutex);
813
814         if (is_overlap_unlock(lkb) ||
815             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
816                 error = -EINVAL;
817                 goto out;
818         }
819
820         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
821                 switch (mstype) {
822                 case DLM_MSG_UNLOCK:
823                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
824                         break;
825                 case DLM_MSG_CANCEL:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
827                         break;
828                 default:
829                         error = -EBUSY;
830                         goto out;
831                 }
832                 lkb->lkb_wait_count++;
833                 hold_lkb(lkb);
834
835                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
836                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
837                           lkb->lkb_wait_count, lkb->lkb_flags);
838                 goto out;
839         }
840
841         DLM_ASSERT(!lkb->lkb_wait_count,
842                    dlm_print_lkb(lkb);
843                    printk("wait_count %d\n", lkb->lkb_wait_count););
844
845         lkb->lkb_wait_count++;
846         lkb->lkb_wait_type = mstype;
847         hold_lkb(lkb);
848         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
849  out:
850         if (error)
851                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
852                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
853                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
854         mutex_unlock(&ls->ls_waiters_mutex);
855         return error;
856 }
857
858 /* We clear the RESEND flag because we might be taking an lkb off the waiters
859    list as part of process_requestqueue (e.g. a lookup that has an optimized
860    request reply on the requestqueue) between dlm_recover_waiters_pre() which
861    set RESEND and dlm_recover_waiters_post() */
862
863 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
864 {
865         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
866         int overlap_done = 0;
867
868         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
869                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
870                 overlap_done = 1;
871                 goto out_del;
872         }
873
874         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
875                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
876                 overlap_done = 1;
877                 goto out_del;
878         }
879
880         /* N.B. type of reply may not always correspond to type of original
881            msg due to lookup->request optimization, verify others? */
882
883         if (lkb->lkb_wait_type) {
884                 lkb->lkb_wait_type = 0;
885                 goto out_del;
886         }
887
888         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
889                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
890         return -1;
891
892  out_del:
893         /* the force-unlock/cancel has completed and we haven't recvd a reply
894            to the op that was in progress prior to the unlock/cancel; we
895            give up on any reply to the earlier op.  FIXME: not sure when/how
896            this would happen */
897
898         if (overlap_done && lkb->lkb_wait_type) {
899                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
900                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
901                 lkb->lkb_wait_count--;
902                 lkb->lkb_wait_type = 0;
903         }
904
905         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
906
907         lkb->lkb_flags &= ~DLM_IFL_RESEND;
908         lkb->lkb_wait_count--;
909         if (!lkb->lkb_wait_count)
910                 list_del_init(&lkb->lkb_wait_reply);
911         unhold_lkb(lkb);
912         return 0;
913 }
914
915 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
916 {
917         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
918         int error;
919
920         mutex_lock(&ls->ls_waiters_mutex);
921         error = _remove_from_waiters(lkb, mstype);
922         mutex_unlock(&ls->ls_waiters_mutex);
923         return error;
924 }
925
926 /* Handles situations where we might be processing a "fake" or "stub" reply in
927    which we can't try to take waiters_mutex again. */
928
929 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
930 {
931         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
932         int error;
933
934         if (ms != &ls->ls_stub_ms)
935                 mutex_lock(&ls->ls_waiters_mutex);
936         error = _remove_from_waiters(lkb, ms->m_type);
937         if (ms != &ls->ls_stub_ms)
938                 mutex_unlock(&ls->ls_waiters_mutex);
939         return error;
940 }
941
942 static void dir_remove(struct dlm_rsb *r)
943 {
944         int to_nodeid;
945
946         if (dlm_no_directory(r->res_ls))
947                 return;
948
949         to_nodeid = dlm_dir_nodeid(r);
950         if (to_nodeid != dlm_our_nodeid())
951                 send_remove(r);
952         else
953                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
954                                      r->res_name, r->res_length);
955 }
956
957 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
958    found since they are in order of newest to oldest? */
959
960 static int shrink_bucket(struct dlm_ls *ls, int b)
961 {
962         struct dlm_rsb *r;
963         int count = 0, found;
964
965         for (;;) {
966                 found = 0;
967                 write_lock(&ls->ls_rsbtbl[b].lock);
968                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
969                                             res_hashchain) {
970                         if (!time_after_eq(jiffies, r->res_toss_time +
971                                            dlm_config.ci_toss_secs * HZ))
972                                 continue;
973                         found = 1;
974                         break;
975                 }
976
977                 if (!found) {
978                         write_unlock(&ls->ls_rsbtbl[b].lock);
979                         break;
980                 }
981
982                 if (kref_put(&r->res_ref, kill_rsb)) {
983                         list_del(&r->res_hashchain);
984                         write_unlock(&ls->ls_rsbtbl[b].lock);
985
986                         if (is_master(r))
987                                 dir_remove(r);
988                         dlm_free_rsb(r);
989                         count++;
990                 } else {
991                         write_unlock(&ls->ls_rsbtbl[b].lock);
992                         log_error(ls, "tossed rsb in use %s", r->res_name);
993                 }
994         }
995
996         return count;
997 }
998
999 void dlm_scan_rsbs(struct dlm_ls *ls)
1000 {
1001         int i;
1002
1003         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1004                 shrink_bucket(ls, i);
1005                 if (dlm_locking_stopped(ls))
1006                         break;
1007                 cond_resched();
1008         }
1009 }
1010
1011 static void add_timeout(struct dlm_lkb *lkb)
1012 {
1013         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1014
1015         if (is_master_copy(lkb)) {
1016                 lkb->lkb_timestamp = jiffies;
1017                 return;
1018         }
1019
1020         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1021             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1022                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1023                 goto add_it;
1024         }
1025         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1026                 goto add_it;
1027         return;
1028
1029  add_it:
1030         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1031         mutex_lock(&ls->ls_timeout_mutex);
1032         hold_lkb(lkb);
1033         lkb->lkb_timestamp = jiffies;
1034         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1035         mutex_unlock(&ls->ls_timeout_mutex);
1036 }
1037
1038 static void del_timeout(struct dlm_lkb *lkb)
1039 {
1040         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1041
1042         mutex_lock(&ls->ls_timeout_mutex);
1043         if (!list_empty(&lkb->lkb_time_list)) {
1044                 list_del_init(&lkb->lkb_time_list);
1045                 unhold_lkb(lkb);
1046         }
1047         mutex_unlock(&ls->ls_timeout_mutex);
1048 }
1049
1050 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1051    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1052    and then lock rsb because of lock ordering in add_timeout.  We may need
1053    to specify some special timeout-related bits in the lkb that are just to
1054    be accessed under the timeout_mutex. */
1055
1056 void dlm_scan_timeout(struct dlm_ls *ls)
1057 {
1058         struct dlm_rsb *r;
1059         struct dlm_lkb *lkb;
1060         int do_cancel, do_warn;
1061
1062         for (;;) {
1063                 if (dlm_locking_stopped(ls))
1064                         break;
1065
1066                 do_cancel = 0;
1067                 do_warn = 0;
1068                 mutex_lock(&ls->ls_timeout_mutex);
1069                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1070
1071                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1072                             time_after_eq(jiffies, lkb->lkb_timestamp +
1073                                           lkb->lkb_timeout_cs * HZ/100))
1074                                 do_cancel = 1;
1075
1076                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1077                             time_after_eq(jiffies, lkb->lkb_timestamp +
1078                                            dlm_config.ci_timewarn_cs * HZ/100))
1079                                 do_warn = 1;
1080
1081                         if (!do_cancel && !do_warn)
1082                                 continue;
1083                         hold_lkb(lkb);
1084                         break;
1085                 }
1086                 mutex_unlock(&ls->ls_timeout_mutex);
1087
1088                 if (!do_cancel && !do_warn)
1089                         break;
1090
1091                 r = lkb->lkb_resource;
1092                 hold_rsb(r);
1093                 lock_rsb(r);
1094
1095                 if (do_warn) {
1096                         /* clear flag so we only warn once */
1097                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1098                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1099                                 del_timeout(lkb);
1100                         dlm_timeout_warn(lkb);
1101                 }
1102
1103                 if (do_cancel) {
1104                         log_debug(ls, "timeout cancel %x node %d %s",
1105                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1106                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1107                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1108                         del_timeout(lkb);
1109                         _cancel_lock(r, lkb);
1110                 }
1111
1112                 unlock_rsb(r);
1113                 unhold_rsb(r);
1114                 dlm_put_lkb(lkb);
1115         }
1116 }
1117
1118 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1119    dlm_recoverd before checking/setting ls_recover_begin. */
1120
1121 void dlm_adjust_timeouts(struct dlm_ls *ls)
1122 {
1123         struct dlm_lkb *lkb;
1124         long adj = jiffies - ls->ls_recover_begin;
1125
1126         ls->ls_recover_begin = 0;
1127         mutex_lock(&ls->ls_timeout_mutex);
1128         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1129                 lkb->lkb_timestamp += adj;
1130         mutex_unlock(&ls->ls_timeout_mutex);
1131 }
1132
1133 /* lkb is master or local copy */
1134
1135 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1136 {
1137         int b, len = r->res_ls->ls_lvblen;
1138
1139         /* b=1 lvb returned to caller
1140            b=0 lvb written to rsb or invalidated
1141            b=-1 do nothing */
1142
1143         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1144
1145         if (b == 1) {
1146                 if (!lkb->lkb_lvbptr)
1147                         return;
1148
1149                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1150                         return;
1151
1152                 if (!r->res_lvbptr)
1153                         return;
1154
1155                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1156                 lkb->lkb_lvbseq = r->res_lvbseq;
1157
1158         } else if (b == 0) {
1159                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1160                         rsb_set_flag(r, RSB_VALNOTVALID);
1161                         return;
1162                 }
1163
1164                 if (!lkb->lkb_lvbptr)
1165                         return;
1166
1167                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1168                         return;
1169
1170                 if (!r->res_lvbptr)
1171                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1172
1173                 if (!r->res_lvbptr)
1174                         return;
1175
1176                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1177                 r->res_lvbseq++;
1178                 lkb->lkb_lvbseq = r->res_lvbseq;
1179                 rsb_clear_flag(r, RSB_VALNOTVALID);
1180         }
1181
1182         if (rsb_flag(r, RSB_VALNOTVALID))
1183                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1184 }
1185
1186 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1187 {
1188         if (lkb->lkb_grmode < DLM_LOCK_PW)
1189                 return;
1190
1191         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1192                 rsb_set_flag(r, RSB_VALNOTVALID);
1193                 return;
1194         }
1195
1196         if (!lkb->lkb_lvbptr)
1197                 return;
1198
1199         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1200                 return;
1201
1202         if (!r->res_lvbptr)
1203                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1204
1205         if (!r->res_lvbptr)
1206                 return;
1207
1208         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1209         r->res_lvbseq++;
1210         rsb_clear_flag(r, RSB_VALNOTVALID);
1211 }
1212
1213 /* lkb is process copy (pc) */
1214
1215 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1216                             struct dlm_message *ms)
1217 {
1218         int b;
1219
1220         if (!lkb->lkb_lvbptr)
1221                 return;
1222
1223         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1224                 return;
1225
1226         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1227         if (b == 1) {
1228                 int len = receive_extralen(ms);
1229                 if (len > DLM_RESNAME_MAXLEN)
1230                         len = DLM_RESNAME_MAXLEN;
1231                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1232                 lkb->lkb_lvbseq = ms->m_lvbseq;
1233         }
1234 }
1235
1236 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1237    remove_lock -- used for unlock, removes lkb from granted
1238    revert_lock -- used for cancel, moves lkb from convert to granted
1239    grant_lock  -- used for request and convert, adds lkb to granted or
1240                   moves lkb from convert or waiting to granted
1241
1242    Each of these is used for master or local copy lkb's.  There is
1243    also a _pc() variation used to make the corresponding change on
1244    a process copy (pc) lkb. */
1245
1246 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1247 {
1248         del_lkb(r, lkb);
1249         lkb->lkb_grmode = DLM_LOCK_IV;
1250         /* this unhold undoes the original ref from create_lkb()
1251            so this leads to the lkb being freed */
1252         unhold_lkb(lkb);
1253 }
1254
1255 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1256 {
1257         set_lvb_unlock(r, lkb);
1258         _remove_lock(r, lkb);
1259 }
1260
1261 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1262 {
1263         _remove_lock(r, lkb);
1264 }
1265
1266 /* returns: 0 did nothing
1267             1 moved lock to granted
1268            -1 removed lock */
1269
1270 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1271 {
1272         int rv = 0;
1273
1274         lkb->lkb_rqmode = DLM_LOCK_IV;
1275
1276         switch (lkb->lkb_status) {
1277         case DLM_LKSTS_GRANTED:
1278                 break;
1279         case DLM_LKSTS_CONVERT:
1280                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1281                 rv = 1;
1282                 break;
1283         case DLM_LKSTS_WAITING:
1284                 del_lkb(r, lkb);
1285                 lkb->lkb_grmode = DLM_LOCK_IV;
1286                 /* this unhold undoes the original ref from create_lkb()
1287                    so this leads to the lkb being freed */
1288                 unhold_lkb(lkb);
1289                 rv = -1;
1290                 break;
1291         default:
1292                 log_print("invalid status for revert %d", lkb->lkb_status);
1293         }
1294         return rv;
1295 }
1296
1297 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1298 {
1299         return revert_lock(r, lkb);
1300 }
1301
1302 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1303 {
1304         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1305                 lkb->lkb_grmode = lkb->lkb_rqmode;
1306                 if (lkb->lkb_status)
1307                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1308                 else
1309                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1310         }
1311
1312         lkb->lkb_rqmode = DLM_LOCK_IV;
1313 }
1314
1315 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1316 {
1317         set_lvb_lock(r, lkb);
1318         _grant_lock(r, lkb);
1319         lkb->lkb_highbast = 0;
1320 }
1321
1322 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1323                           struct dlm_message *ms)
1324 {
1325         set_lvb_lock_pc(r, lkb, ms);
1326         _grant_lock(r, lkb);
1327 }
1328
1329 /* called by grant_pending_locks() which means an async grant message must
1330    be sent to the requesting node in addition to granting the lock if the
1331    lkb belongs to a remote node. */
1332
1333 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1334 {
1335         grant_lock(r, lkb);
1336         if (is_master_copy(lkb))
1337                 send_grant(r, lkb);
1338         else
1339                 queue_cast(r, lkb, 0);
1340 }
1341
1342 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1343    change the granted/requested modes.  We're munging things accordingly in
1344    the process copy.
1345    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1346    conversion deadlock
1347    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1348    compatible with other granted locks */
1349
1350 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1351 {
1352         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1353                 log_print("munge_demoted %x invalid reply type %d",
1354                           lkb->lkb_id, ms->m_type);
1355                 return;
1356         }
1357
1358         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1359                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1360                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1361                 return;
1362         }
1363
1364         lkb->lkb_grmode = DLM_LOCK_NL;
1365 }
1366
1367 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1368 {
1369         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1370             ms->m_type != DLM_MSG_GRANT) {
1371                 log_print("munge_altmode %x invalid reply type %d",
1372                           lkb->lkb_id, ms->m_type);
1373                 return;
1374         }
1375
1376         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1377                 lkb->lkb_rqmode = DLM_LOCK_PR;
1378         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1379                 lkb->lkb_rqmode = DLM_LOCK_CW;
1380         else {
1381                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1382                 dlm_print_lkb(lkb);
1383         }
1384 }
1385
1386 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1387 {
1388         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1389                                            lkb_statequeue);
1390         if (lkb->lkb_id == first->lkb_id)
1391                 return 1;
1392
1393         return 0;
1394 }
1395
1396 /* Check if the given lkb conflicts with another lkb on the queue. */
1397
1398 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1399 {
1400         struct dlm_lkb *this;
1401
1402         list_for_each_entry(this, head, lkb_statequeue) {
1403                 if (this == lkb)
1404                         continue;
1405                 if (!modes_compat(this, lkb))
1406                         return 1;
1407         }
1408         return 0;
1409 }
1410
1411 /*
1412  * "A conversion deadlock arises with a pair of lock requests in the converting
1413  * queue for one resource.  The granted mode of each lock blocks the requested
1414  * mode of the other lock."
1415  *
1416  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1417  * convert queue from being granted, then deadlk/demote lkb.
1418  *
1419  * Example:
1420  * Granted Queue: empty
1421  * Convert Queue: NL->EX (first lock)
1422  *                PR->EX (second lock)
1423  *
1424  * The first lock can't be granted because of the granted mode of the second
1425  * lock and the second lock can't be granted because it's not first in the
1426  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1427  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1428  * flag set and return DEMOTED in the lksb flags.
1429  *
1430  * Originally, this function detected conv-deadlk in a more limited scope:
1431  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1432  * - if lkb1 was the first entry in the queue (not just earlier), and was
1433  *   blocked by the granted mode of lkb2, and there was nothing on the
1434  *   granted queue preventing lkb1 from being granted immediately, i.e.
1435  *   lkb2 was the only thing preventing lkb1 from being granted.
1436  *
1437  * That second condition meant we'd only say there was conv-deadlk if
1438  * resolving it (by demotion) would lead to the first lock on the convert
1439  * queue being granted right away.  It allowed conversion deadlocks to exist
1440  * between locks on the convert queue while they couldn't be granted anyway.
1441  *
1442  * Now, we detect and take action on conversion deadlocks immediately when
1443  * they're created, even if they may not be immediately consequential.  If
1444  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1445  * mode that would prevent lkb1's conversion from being granted, we do a
1446  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1447  * I think this means that the lkb_is_ahead condition below should always
1448  * be zero, i.e. there will never be conv-deadlk between two locks that are
1449  * both already on the convert queue.
1450  */
1451
1452 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1453 {
1454         struct dlm_lkb *lkb1;
1455         int lkb_is_ahead = 0;
1456
1457         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1458                 if (lkb1 == lkb2) {
1459                         lkb_is_ahead = 1;
1460                         continue;
1461                 }
1462
1463                 if (!lkb_is_ahead) {
1464                         if (!modes_compat(lkb2, lkb1))
1465                                 return 1;
1466                 } else {
1467                         if (!modes_compat(lkb2, lkb1) &&
1468                             !modes_compat(lkb1, lkb2))
1469                                 return 1;
1470                 }
1471         }
1472         return 0;
1473 }
1474
1475 /*
1476  * Return 1 if the lock can be granted, 0 otherwise.
1477  * Also detect and resolve conversion deadlocks.
1478  *
1479  * lkb is the lock to be granted
1480  *
1481  * now is 1 if the function is being called in the context of the
1482  * immediate request, it is 0 if called later, after the lock has been
1483  * queued.
1484  *
1485  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1486  */
1487
1488 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1489 {
1490         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1491
1492         /*
1493          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1494          * a new request for a NL mode lock being blocked.
1495          *
1496          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1497          * request, then it would be granted.  In essence, the use of this flag
1498          * tells the Lock Manager to expedite theis request by not considering
1499          * what may be in the CONVERTING or WAITING queues...  As of this
1500          * writing, the EXPEDITE flag can be used only with new requests for NL
1501          * mode locks.  This flag is not valid for conversion requests.
1502          *
1503          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1504          * conversion or used with a non-NL requested mode.  We also know an
1505          * EXPEDITE request is always granted immediately, so now must always
1506          * be 1.  The full condition to grant an expedite request: (now &&
1507          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1508          * therefore be shortened to just checking the flag.
1509          */
1510
1511         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1512                 return 1;
1513
1514         /*
1515          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1516          * added to the remaining conditions.
1517          */
1518
1519         if (queue_conflict(&r->res_grantqueue, lkb))
1520                 goto out;
1521
1522         /*
1523          * 6-3: By default, a conversion request is immediately granted if the
1524          * requested mode is compatible with the modes of all other granted
1525          * locks
1526          */
1527
1528         if (queue_conflict(&r->res_convertqueue, lkb))
1529                 goto out;
1530
1531         /*
1532          * 6-5: But the default algorithm for deciding whether to grant or
1533          * queue conversion requests does not by itself guarantee that such
1534          * requests are serviced on a "first come first serve" basis.  This, in
1535          * turn, can lead to a phenomenon known as "indefinate postponement".
1536          *
1537          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1538          * the system service employed to request a lock conversion.  This flag
1539          * forces certain conversion requests to be queued, even if they are
1540          * compatible with the granted modes of other locks on the same
1541          * resource.  Thus, the use of this flag results in conversion requests
1542          * being ordered on a "first come first servce" basis.
1543          *
1544          * DCT: This condition is all about new conversions being able to occur
1545          * "in place" while the lock remains on the granted queue (assuming
1546          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1547          * doesn't _have_ to go onto the convert queue where it's processed in
1548          * order.  The "now" variable is necessary to distinguish converts
1549          * being received and processed for the first time now, because once a
1550          * convert is moved to the conversion queue the condition below applies
1551          * requiring fifo granting.
1552          */
1553
1554         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1555                 return 1;
1556
1557         /*
1558          * The NOORDER flag is set to avoid the standard vms rules on grant
1559          * order.
1560          */
1561
1562         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1563                 return 1;
1564
1565         /*
1566          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1567          * granted until all other conversion requests ahead of it are granted
1568          * and/or canceled.
1569          */
1570
1571         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1572                 return 1;
1573
1574         /*
1575          * 6-4: By default, a new request is immediately granted only if all
1576          * three of the following conditions are satisfied when the request is
1577          * issued:
1578          * - The queue of ungranted conversion requests for the resource is
1579          *   empty.
1580          * - The queue of ungranted new requests for the resource is empty.
1581          * - The mode of the new request is compatible with the most
1582          *   restrictive mode of all granted locks on the resource.
1583          */
1584
1585         if (now && !conv && list_empty(&r->res_convertqueue) &&
1586             list_empty(&r->res_waitqueue))
1587                 return 1;
1588
1589         /*
1590          * 6-4: Once a lock request is in the queue of ungranted new requests,
1591          * it cannot be granted until the queue of ungranted conversion
1592          * requests is empty, all ungranted new requests ahead of it are
1593          * granted and/or canceled, and it is compatible with the granted mode
1594          * of the most restrictive lock granted on the resource.
1595          */
1596
1597         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1598             first_in_list(lkb, &r->res_waitqueue))
1599                 return 1;
1600  out:
1601         return 0;
1602 }
1603
1604 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1605                           int *err)
1606 {
1607         int rv;
1608         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1609         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1610
1611         if (err)
1612                 *err = 0;
1613
1614         rv = _can_be_granted(r, lkb, now);
1615         if (rv)
1616                 goto out;
1617
1618         /*
1619          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1620          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1621          * cancels one of the locks.
1622          */
1623
1624         if (is_convert && can_be_queued(lkb) &&
1625             conversion_deadlock_detect(r, lkb)) {
1626                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1627                         lkb->lkb_grmode = DLM_LOCK_NL;
1628                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1629                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1630                         if (err)
1631                                 *err = -EDEADLK;
1632                         else {
1633                                 log_print("can_be_granted deadlock %x now %d",
1634                                           lkb->lkb_id, now);
1635                                 dlm_dump_rsb(r);
1636                         }
1637                 }
1638                 goto out;
1639         }
1640
1641         /*
1642          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1643          * to grant a request in a mode other than the normal rqmode.  It's a
1644          * simple way to provide a big optimization to applications that can
1645          * use them.
1646          */
1647
1648         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1649                 alt = DLM_LOCK_PR;
1650         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1651                 alt = DLM_LOCK_CW;
1652
1653         if (alt) {
1654                 lkb->lkb_rqmode = alt;
1655                 rv = _can_be_granted(r, lkb, now);
1656                 if (rv)
1657                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1658                 else
1659                         lkb->lkb_rqmode = rqmode;
1660         }
1661  out:
1662         return rv;
1663 }
1664
1665 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1666    for locks pending on the convert list.  Once verified (watch for these
1667    log_prints), we should be able to just call _can_be_granted() and not
1668    bother with the demote/deadlk cases here (and there's no easy way to deal
1669    with a deadlk here, we'd have to generate something like grant_lock with
1670    the deadlk error.) */
1671
1672 /* Returns the highest requested mode of all blocked conversions; sets
1673    cw if there's a blocked conversion to DLM_LOCK_CW. */
1674
1675 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1676 {
1677         struct dlm_lkb *lkb, *s;
1678         int hi, demoted, quit, grant_restart, demote_restart;
1679         int deadlk;
1680
1681         quit = 0;
1682  restart:
1683         grant_restart = 0;
1684         demote_restart = 0;
1685         hi = DLM_LOCK_IV;
1686
1687         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1688                 demoted = is_demoted(lkb);
1689                 deadlk = 0;
1690
1691                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1692                         grant_lock_pending(r, lkb);
1693                         grant_restart = 1;
1694                         continue;
1695                 }
1696
1697                 if (!demoted && is_demoted(lkb)) {
1698                         log_print("WARN: pending demoted %x node %d %s",
1699                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1700                         demote_restart = 1;
1701                         continue;
1702                 }
1703
1704                 if (deadlk) {
1705                         log_print("WARN: pending deadlock %x node %d %s",
1706                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1707                         dlm_dump_rsb(r);
1708                         continue;
1709                 }
1710
1711                 hi = max_t(int, lkb->lkb_rqmode, hi);
1712
1713                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1714                         *cw = 1;
1715         }
1716
1717         if (grant_restart)
1718                 goto restart;
1719         if (demote_restart && !quit) {
1720                 quit = 1;
1721                 goto restart;
1722         }
1723
1724         return max_t(int, high, hi);
1725 }
1726
1727 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1728 {
1729         struct dlm_lkb *lkb, *s;
1730
1731         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1732                 if (can_be_granted(r, lkb, 0, NULL))
1733                         grant_lock_pending(r, lkb);
1734                 else {
1735                         high = max_t(int, lkb->lkb_rqmode, high);
1736                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1737                                 *cw = 1;
1738                 }
1739         }
1740
1741         return high;
1742 }
1743
1744 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1745    on either the convert or waiting queue.
1746    high is the largest rqmode of all locks blocked on the convert or
1747    waiting queue. */
1748
1749 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1750 {
1751         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1752                 if (gr->lkb_highbast < DLM_LOCK_EX)
1753                         return 1;
1754                 return 0;
1755         }
1756
1757         if (gr->lkb_highbast < high &&
1758             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1759                 return 1;
1760         return 0;
1761 }
1762
1763 static void grant_pending_locks(struct dlm_rsb *r)
1764 {
1765         struct dlm_lkb *lkb, *s;
1766         int high = DLM_LOCK_IV;
1767         int cw = 0;
1768
1769         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1770
1771         high = grant_pending_convert(r, high, &cw);
1772         high = grant_pending_wait(r, high, &cw);
1773
1774         if (high == DLM_LOCK_IV)
1775                 return;
1776
1777         /*
1778          * If there are locks left on the wait/convert queue then send blocking
1779          * ASTs to granted locks based on the largest requested mode (high)
1780          * found above.
1781          */
1782
1783         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1784                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1785                         if (cw && high == DLM_LOCK_PR)
1786                                 queue_bast(r, lkb, DLM_LOCK_CW);
1787                         else
1788                                 queue_bast(r, lkb, high);
1789                         lkb->lkb_highbast = high;
1790                 }
1791         }
1792 }
1793
1794 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1795 {
1796         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1797             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1798                 if (gr->lkb_highbast < DLM_LOCK_EX)
1799                         return 1;
1800                 return 0;
1801         }
1802
1803         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1804                 return 1;
1805         return 0;
1806 }
1807
1808 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1809                             struct dlm_lkb *lkb)
1810 {
1811         struct dlm_lkb *gr;
1812
1813         list_for_each_entry(gr, head, lkb_statequeue) {
1814                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1815                         queue_bast(r, gr, lkb->lkb_rqmode);
1816                         gr->lkb_highbast = lkb->lkb_rqmode;
1817                 }
1818         }
1819 }
1820
1821 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1822 {
1823         send_bast_queue(r, &r->res_grantqueue, lkb);
1824 }
1825
1826 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1827 {
1828         send_bast_queue(r, &r->res_grantqueue, lkb);
1829         send_bast_queue(r, &r->res_convertqueue, lkb);
1830 }
1831
1832 /* set_master(r, lkb) -- set the master nodeid of a resource
1833
1834    The purpose of this function is to set the nodeid field in the given
1835    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1836    known, it can just be copied to the lkb and the function will return
1837    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1838    before it can be copied to the lkb.
1839
1840    When the rsb nodeid is being looked up remotely, the initial lkb
1841    causing the lookup is kept on the ls_waiters list waiting for the
1842    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1843    on the rsb's res_lookup list until the master is verified.
1844
1845    Return values:
1846    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1847    1: the rsb master is not available and the lkb has been placed on
1848       a wait queue
1849 */
1850
1851 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1852 {
1853         struct dlm_ls *ls = r->res_ls;
1854         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1855
1856         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1857                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1858                 r->res_first_lkid = lkb->lkb_id;
1859                 lkb->lkb_nodeid = r->res_nodeid;
1860                 return 0;
1861         }
1862
1863         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1864                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1865                 return 1;
1866         }
1867
1868         if (r->res_nodeid == 0) {
1869                 lkb->lkb_nodeid = 0;
1870                 return 0;
1871         }
1872
1873         if (r->res_nodeid > 0) {
1874                 lkb->lkb_nodeid = r->res_nodeid;
1875                 return 0;
1876         }
1877
1878         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1879
1880         dir_nodeid = dlm_dir_nodeid(r);
1881
1882         if (dir_nodeid != our_nodeid) {
1883                 r->res_first_lkid = lkb->lkb_id;
1884                 send_lookup(r, lkb);
1885                 return 1;
1886         }
1887
1888         for (i = 0; i < 2; i++) {
1889                 /* It's possible for dlm_scand to remove an old rsb for
1890                    this same resource from the toss list, us to create
1891                    a new one, look up the master locally, and find it
1892                    already exists just before dlm_scand does the
1893                    dir_remove() on the previous rsb. */
1894
1895                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1896                                        r->res_length, &ret_nodeid);
1897                 if (!error)
1898                         break;
1899                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1900                 schedule();
1901         }
1902         if (error && error != -EEXIST)
1903                 return error;
1904
1905         if (ret_nodeid == our_nodeid) {
1906                 r->res_first_lkid = 0;
1907                 r->res_nodeid = 0;
1908                 lkb->lkb_nodeid = 0;
1909         } else {
1910                 r->res_first_lkid = lkb->lkb_id;
1911                 r->res_nodeid = ret_nodeid;
1912                 lkb->lkb_nodeid = ret_nodeid;
1913         }
1914         return 0;
1915 }
1916
1917 static void process_lookup_list(struct dlm_rsb *r)
1918 {
1919         struct dlm_lkb *lkb, *safe;
1920
1921         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1922                 list_del_init(&lkb->lkb_rsb_lookup);
1923                 _request_lock(r, lkb);
1924                 schedule();
1925         }
1926 }
1927
1928 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1929
1930 static void confirm_master(struct dlm_rsb *r, int error)
1931 {
1932         struct dlm_lkb *lkb;
1933
1934         if (!r->res_first_lkid)
1935                 return;
1936
1937         switch (error) {
1938         case 0:
1939         case -EINPROGRESS:
1940                 r->res_first_lkid = 0;
1941                 process_lookup_list(r);
1942                 break;
1943
1944         case -EAGAIN:
1945         case -EBADR:
1946         case -ENOTBLK:
1947                 /* the remote request failed and won't be retried (it was
1948                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1949                    lkb the first_lkid */
1950
1951                 r->res_first_lkid = 0;
1952
1953                 if (!list_empty(&r->res_lookup)) {
1954                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1955                                          lkb_rsb_lookup);
1956                         list_del_init(&lkb->lkb_rsb_lookup);
1957                         r->res_first_lkid = lkb->lkb_id;
1958                         _request_lock(r, lkb);
1959                 }
1960                 break;
1961
1962         default:
1963                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1964         }
1965 }
1966
1967 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1968                          int namelen, unsigned long timeout_cs,
1969                          void (*ast) (void *astparam),
1970                          void *astparam,
1971                          void (*bast) (void *astparam, int mode),
1972                          struct dlm_args *args)
1973 {
1974         int rv = -EINVAL;
1975
1976         /* check for invalid arg usage */
1977
1978         if (mode < 0 || mode > DLM_LOCK_EX)
1979                 goto out;
1980
1981         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1982                 goto out;
1983
1984         if (flags & DLM_LKF_CANCEL)
1985                 goto out;
1986
1987         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1988                 goto out;
1989
1990         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1991                 goto out;
1992
1993         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1994                 goto out;
1995
1996         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1997                 goto out;
1998
1999         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2000                 goto out;
2001
2002         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2003                 goto out;
2004
2005         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2006                 goto out;
2007
2008         if (!ast || !lksb)
2009                 goto out;
2010
2011         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2012                 goto out;
2013
2014         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2015                 goto out;
2016
2017         /* these args will be copied to the lkb in validate_lock_args,
2018            it cannot be done now because when converting locks, fields in
2019            an active lkb cannot be modified before locking the rsb */
2020
2021         args->flags = flags;
2022         args->astfn = ast;
2023         args->astparam = astparam;
2024         args->bastfn = bast;
2025         args->timeout = timeout_cs;
2026         args->mode = mode;
2027         args->lksb = lksb;
2028         rv = 0;
2029  out:
2030         return rv;
2031 }
2032
2033 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2034 {
2035         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2036                       DLM_LKF_FORCEUNLOCK))
2037                 return -EINVAL;
2038
2039         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2040                 return -EINVAL;
2041
2042         args->flags = flags;
2043         args->astparam = astarg;
2044         return 0;
2045 }
2046
2047 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2048                               struct dlm_args *args)
2049 {
2050         int rv = -EINVAL;
2051
2052         if (args->flags & DLM_LKF_CONVERT) {
2053                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2054                         goto out;
2055
2056                 if (args->flags & DLM_LKF_QUECVT &&
2057                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2058                         goto out;
2059
2060                 rv = -EBUSY;
2061                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2062                         goto out;
2063
2064                 if (lkb->lkb_wait_type)
2065                         goto out;
2066
2067                 if (is_overlap(lkb))
2068                         goto out;
2069         }
2070
2071         lkb->lkb_exflags = args->flags;
2072         lkb->lkb_sbflags = 0;
2073         lkb->lkb_astfn = args->astfn;
2074         lkb->lkb_astparam = args->astparam;
2075         lkb->lkb_bastfn = args->bastfn;
2076         lkb->lkb_rqmode = args->mode;
2077         lkb->lkb_lksb = args->lksb;
2078         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2079         lkb->lkb_ownpid = (int) current->pid;
2080         lkb->lkb_timeout_cs = args->timeout;
2081         rv = 0;
2082  out:
2083         return rv;
2084 }
2085
2086 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2087    for success */
2088
2089 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2090    because there may be a lookup in progress and it's valid to do
2091    cancel/unlockf on it */
2092
2093 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2094 {
2095         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2096         int rv = -EINVAL;
2097
2098         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2099                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2100                 dlm_print_lkb(lkb);
2101                 goto out;
2102         }
2103
2104         /* an lkb may still exist even though the lock is EOL'ed due to a
2105            cancel, unlock or failed noqueue request; an app can't use these
2106            locks; return same error as if the lkid had not been found at all */
2107
2108         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2109                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2110                 rv = -ENOENT;
2111                 goto out;
2112         }
2113
2114         /* an lkb may be waiting for an rsb lookup to complete where the
2115            lookup was initiated by another lock */
2116
2117         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2118                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2119                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2120                         list_del_init(&lkb->lkb_rsb_lookup);
2121                         queue_cast(lkb->lkb_resource, lkb,
2122                                    args->flags & DLM_LKF_CANCEL ?
2123                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2124                         unhold_lkb(lkb); /* undoes create_lkb() */
2125                 }
2126                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2127                 rv = -EBUSY;
2128                 goto out;
2129         }
2130
2131         /* cancel not allowed with another cancel/unlock in progress */
2132
2133         if (args->flags & DLM_LKF_CANCEL) {
2134                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2135                         goto out;
2136
2137                 if (is_overlap(lkb))
2138                         goto out;
2139
2140                 /* don't let scand try to do a cancel */
2141                 del_timeout(lkb);
2142
2143                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2144                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2145                         rv = -EBUSY;
2146                         goto out;
2147                 }
2148
2149                 switch (lkb->lkb_wait_type) {
2150                 case DLM_MSG_LOOKUP:
2151                 case DLM_MSG_REQUEST:
2152                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2153                         rv = -EBUSY;
2154                         goto out;
2155                 case DLM_MSG_UNLOCK:
2156                 case DLM_MSG_CANCEL:
2157                         goto out;
2158                 }
2159                 /* add_to_waiters() will set OVERLAP_CANCEL */
2160                 goto out_ok;
2161         }
2162
2163         /* do we need to allow a force-unlock if there's a normal unlock
2164            already in progress?  in what conditions could the normal unlock
2165            fail such that we'd want to send a force-unlock to be sure? */
2166
2167         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2168                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2169                         goto out;
2170
2171                 if (is_overlap_unlock(lkb))
2172                         goto out;
2173
2174                 /* don't let scand try to do a cancel */
2175                 del_timeout(lkb);
2176
2177                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2178                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2179                         rv = -EBUSY;
2180                         goto out;
2181                 }
2182
2183                 switch (lkb->lkb_wait_type) {
2184                 case DLM_MSG_LOOKUP:
2185                 case DLM_MSG_REQUEST:
2186                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2187                         rv = -EBUSY;
2188                         goto out;
2189                 case DLM_MSG_UNLOCK:
2190                         goto out;
2191                 }
2192                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2193                 goto out_ok;
2194         }
2195
2196         /* normal unlock not allowed if there's any op in progress */
2197         rv = -EBUSY;
2198         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2199                 goto out;
2200
2201  out_ok:
2202         /* an overlapping op shouldn't blow away exflags from other op */
2203         lkb->lkb_exflags |= args->flags;
2204         lkb->lkb_sbflags = 0;
2205         lkb->lkb_astparam = args->astparam;
2206         rv = 0;
2207  out:
2208         if (rv)
2209                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2210                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2211                           args->flags, lkb->lkb_wait_type,
2212                           lkb->lkb_resource->res_name);
2213         return rv;
2214 }
2215
2216 /*
2217  * Four stage 4 varieties:
2218  * do_request(), do_convert(), do_unlock(), do_cancel()
2219  * These are called on the master node for the given lock and
2220  * from the central locking logic.
2221  */
2222
2223 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2224 {
2225         int error = 0;
2226
2227         if (can_be_granted(r, lkb, 1, NULL)) {
2228                 grant_lock(r, lkb);
2229                 queue_cast(r, lkb, 0);
2230                 goto out;
2231         }
2232
2233         if (can_be_queued(lkb)) {
2234                 error = -EINPROGRESS;
2235                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2236                 send_blocking_asts(r, lkb);
2237                 add_timeout(lkb);
2238                 goto out;
2239         }
2240
2241         error = -EAGAIN;
2242         if (force_blocking_asts(lkb))
2243                 send_blocking_asts_all(r, lkb);
2244         queue_cast(r, lkb, -EAGAIN);
2245
2246  out:
2247         return error;
2248 }
2249
2250 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2251 {
2252         int error = 0;
2253         int deadlk = 0;
2254
2255         /* changing an existing lock may allow others to be granted */
2256
2257         if (can_be_granted(r, lkb, 1, &deadlk)) {
2258                 grant_lock(r, lkb);
2259                 queue_cast(r, lkb, 0);
2260                 grant_pending_locks(r);
2261                 goto out;
2262         }
2263
2264         /* can_be_granted() detected that this lock would block in a conversion
2265            deadlock, so we leave it on the granted queue and return EDEADLK in
2266            the ast for the convert. */
2267
2268         if (deadlk) {
2269                 /* it's left on the granted queue */
2270                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2271                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2272                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2273                 revert_lock(r, lkb);
2274                 queue_cast(r, lkb, -EDEADLK);
2275                 error = -EDEADLK;
2276                 goto out;
2277         }
2278
2279         /* is_demoted() means the can_be_granted() above set the grmode
2280            to NL, and left us on the granted queue.  This auto-demotion
2281            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2282            now grantable.  We have to try to grant other converting locks
2283            before we try again to grant this one. */
2284
2285         if (is_demoted(lkb)) {
2286                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2287                 if (_can_be_granted(r, lkb, 1)) {
2288                         grant_lock(r, lkb);
2289                         queue_cast(r, lkb, 0);
2290                         grant_pending_locks(r);
2291                         goto out;
2292                 }
2293                 /* else fall through and move to convert queue */
2294         }
2295
2296         if (can_be_queued(lkb)) {
2297                 error = -EINPROGRESS;
2298                 del_lkb(r, lkb);
2299                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2300                 send_blocking_asts(r, lkb);
2301                 add_timeout(lkb);
2302                 goto out;
2303         }
2304
2305         error = -EAGAIN;
2306         if (force_blocking_asts(lkb))
2307                 send_blocking_asts_all(r, lkb);
2308         queue_cast(r, lkb, -EAGAIN);
2309
2310  out:
2311         return error;
2312 }
2313
2314 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2315 {
2316         remove_lock(r, lkb);
2317         queue_cast(r, lkb, -DLM_EUNLOCK);
2318         grant_pending_locks(r);
2319         return -DLM_EUNLOCK;
2320 }
2321
2322 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2323  
2324 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2325 {
2326         int error;
2327
2328         error = revert_lock(r, lkb);
2329         if (error) {
2330                 queue_cast(r, lkb, -DLM_ECANCEL);
2331                 grant_pending_locks(r);
2332                 return -DLM_ECANCEL;
2333         }
2334         return 0;
2335 }
2336
2337 /*
2338  * Four stage 3 varieties:
2339  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2340  */
2341
2342 /* add a new lkb to a possibly new rsb, called by requesting process */
2343
2344 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2345 {
2346         int error;
2347
2348         /* set_master: sets lkb nodeid from r */
2349
2350         error = set_master(r, lkb);
2351         if (error < 0)
2352                 goto out;
2353         if (error) {
2354                 error = 0;
2355                 goto out;
2356         }
2357
2358         if (is_remote(r))
2359                 /* receive_request() calls do_request() on remote node */
2360                 error = send_request(r, lkb);
2361         else
2362                 error = do_request(r, lkb);
2363  out:
2364         return error;
2365 }
2366
2367 /* change some property of an existing lkb, e.g. mode */
2368
2369 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2370 {
2371         int error;
2372
2373         if (is_remote(r))
2374                 /* receive_convert() calls do_convert() on remote node */
2375                 error = send_convert(r, lkb);
2376         else
2377                 error = do_convert(r, lkb);
2378
2379         return error;
2380 }
2381
2382 /* remove an existing lkb from the granted queue */
2383
2384 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2385 {
2386         int error;
2387
2388         if (is_remote(r))
2389                 /* receive_unlock() calls do_unlock() on remote node */
2390                 error = send_unlock(r, lkb);
2391         else
2392                 error = do_unlock(r, lkb);
2393
2394         return error;
2395 }
2396
2397 /* remove an existing lkb from the convert or wait queue */
2398
2399 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2400 {
2401         int error;
2402
2403         if (is_remote(r))
2404                 /* receive_cancel() calls do_cancel() on remote node */
2405                 error = send_cancel(r, lkb);
2406         else
2407                 error = do_cancel(r, lkb);
2408
2409         return error;
2410 }
2411
2412 /*
2413  * Four stage 2 varieties:
2414  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2415  */
2416
2417 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2418                         int len, struct dlm_args *args)
2419 {
2420         struct dlm_rsb *r;
2421         int error;
2422
2423         error = validate_lock_args(ls, lkb, args);
2424         if (error)
2425                 goto out;
2426
2427         error = find_rsb(ls, name, len, R_CREATE, &r);
2428         if (error)
2429                 goto out;
2430
2431         lock_rsb(r);
2432
2433         attach_lkb(r, lkb);
2434         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2435
2436         error = _request_lock(r, lkb);
2437
2438         unlock_rsb(r);
2439         put_rsb(r);
2440
2441  out:
2442         return error;
2443 }
2444
2445 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2446                         struct dlm_args *args)
2447 {
2448         struct dlm_rsb *r;
2449         int error;
2450
2451         r = lkb->lkb_resource;
2452
2453         hold_rsb(r);
2454         lock_rsb(r);
2455
2456         error = validate_lock_args(ls, lkb, args);
2457         if (error)
2458                 goto out;
2459
2460         error = _convert_lock(r, lkb);
2461  out:
2462         unlock_rsb(r);
2463         put_rsb(r);
2464         return error;
2465 }
2466
2467 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2468                        struct dlm_args *args)
2469 {
2470         struct dlm_rsb *r;
2471         int error;
2472
2473         r = lkb->lkb_resource;
2474
2475         hold_rsb(r);
2476         lock_rsb(r);
2477
2478         error = validate_unlock_args(lkb, args);
2479         if (error)
2480                 goto out;
2481
2482         error = _unlock_lock(r, lkb);
2483  out:
2484         unlock_rsb(r);
2485         put_rsb(r);
2486         return error;
2487 }
2488
2489 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2490                        struct dlm_args *args)
2491 {
2492         struct dlm_rsb *r;
2493         int error;
2494
2495         r = lkb->lkb_resource;
2496
2497         hold_rsb(r);
2498         lock_rsb(r);
2499
2500         error = validate_unlock_args(lkb, args);
2501         if (error)
2502                 goto out;
2503
2504         error = _cancel_lock(r, lkb);
2505  out:
2506         unlock_rsb(r);
2507         put_rsb(r);
2508         return error;
2509 }
2510
2511 /*
2512  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2513  */
2514
2515 int dlm_lock(dlm_lockspace_t *lockspace,
2516              int mode,
2517              struct dlm_lksb *lksb,
2518              uint32_t flags,
2519              void *name,
2520              unsigned int namelen,
2521              uint32_t parent_lkid,
2522              void (*ast) (void *astarg),
2523              void *astarg,
2524              void (*bast) (void *astarg, int mode))
2525 {
2526         struct dlm_ls *ls;
2527         struct dlm_lkb *lkb;
2528         struct dlm_args args;
2529         int error, convert = flags & DLM_LKF_CONVERT;
2530
2531         ls = dlm_find_lockspace_local(lockspace);
2532         if (!ls)
2533                 return -EINVAL;
2534
2535         dlm_lock_recovery(ls);
2536
2537         if (convert)
2538                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2539         else
2540                 error = create_lkb(ls, &lkb);
2541
2542         if (error)
2543                 goto out;
2544
2545         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2546                               astarg, bast, &args);
2547         if (error)
2548                 goto out_put;
2549
2550         if (convert)
2551                 error = convert_lock(ls, lkb, &args);
2552         else
2553                 error = request_lock(ls, lkb, name, namelen, &args);
2554
2555         if (error == -EINPROGRESS)
2556                 error = 0;
2557  out_put:
2558         if (convert || error)
2559                 __put_lkb(ls, lkb);
2560         if (error == -EAGAIN || error == -EDEADLK)
2561                 error = 0;
2562  out:
2563         dlm_unlock_recovery(ls);
2564         dlm_put_lockspace(ls);
2565         return error;
2566 }
2567
2568 int dlm_unlock(dlm_lockspace_t *lockspace,
2569                uint32_t lkid,
2570                uint32_t flags,
2571                struct dlm_lksb *lksb,
2572                void *astarg)
2573 {
2574         struct dlm_ls *ls;
2575         struct dlm_lkb *lkb;
2576         struct dlm_args args;
2577         int error;
2578
2579         ls = dlm_find_lockspace_local(lockspace);
2580         if (!ls)
2581                 return -EINVAL;
2582
2583         dlm_lock_recovery(ls);
2584
2585         error = find_lkb(ls, lkid, &lkb);
2586         if (error)
2587                 goto out;
2588
2589         error = set_unlock_args(flags, astarg, &args);
2590         if (error)
2591                 goto out_put;
2592
2593         if (flags & DLM_LKF_CANCEL)
2594                 error = cancel_lock(ls, lkb, &args);
2595         else
2596                 error = unlock_lock(ls, lkb, &args);
2597
2598         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2599                 error = 0;
2600         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2601                 error = 0;
2602  out_put:
2603         dlm_put_lkb(lkb);
2604  out:
2605         dlm_unlock_recovery(ls);
2606         dlm_put_lockspace(ls);
2607         return error;
2608 }
2609
2610 /*
2611  * send/receive routines for remote operations and replies
2612  *
2613  * send_args
2614  * send_common
2615  * send_request                 receive_request
2616  * send_convert                 receive_convert
2617  * send_unlock                  receive_unlock
2618  * send_cancel                  receive_cancel
2619  * send_grant                   receive_grant
2620  * send_bast                    receive_bast
2621  * send_lookup                  receive_lookup
2622  * send_remove                  receive_remove
2623  *
2624  *                              send_common_reply
2625  * receive_request_reply        send_request_reply
2626  * receive_convert_reply        send_convert_reply
2627  * receive_unlock_reply         send_unlock_reply
2628  * receive_cancel_reply         send_cancel_reply
2629  * receive_lookup_reply         send_lookup_reply
2630  */
2631
2632 static int _create_message(struct dlm_ls *ls, int mb_len,
2633                            int to_nodeid, int mstype,
2634                            struct dlm_message **ms_ret,
2635                            struct dlm_mhandle **mh_ret)
2636 {
2637         struct dlm_message *ms;
2638         struct dlm_mhandle *mh;
2639         char *mb;
2640
2641         /* get_buffer gives us a message handle (mh) that we need to
2642            pass into lowcomms_commit and a message buffer (mb) that we
2643            write our data into */
2644
2645         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2646         if (!mh)
2647                 return -ENOBUFS;
2648
2649         memset(mb, 0, mb_len);
2650
2651         ms = (struct dlm_message *) mb;
2652
2653         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2654         ms->m_header.h_lockspace = ls->ls_global_id;
2655         ms->m_header.h_nodeid = dlm_our_nodeid();
2656         ms->m_header.h_length = mb_len;
2657         ms->m_header.h_cmd = DLM_MSG;
2658
2659         ms->m_type = mstype;
2660
2661         *mh_ret = mh;
2662         *ms_ret = ms;
2663         return 0;
2664 }
2665
2666 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2667                           int to_nodeid, int mstype,
2668                           struct dlm_message **ms_ret,
2669                           struct dlm_mhandle **mh_ret)
2670 {
2671         int mb_len = sizeof(struct dlm_message);
2672
2673         switch (mstype) {
2674         case DLM_MSG_REQUEST:
2675         case DLM_MSG_LOOKUP:
2676         case DLM_MSG_REMOVE:
2677                 mb_len += r->res_length;
2678                 break;
2679         case DLM_MSG_CONVERT:
2680         case DLM_MSG_UNLOCK:
2681         case DLM_MSG_REQUEST_REPLY:
2682         case DLM_MSG_CONVERT_REPLY:
2683         case DLM_MSG_GRANT:
2684                 if (lkb && lkb->lkb_lvbptr)
2685                         mb_len += r->res_ls->ls_lvblen;
2686                 break;
2687         }
2688
2689         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2690                                ms_ret, mh_ret);
2691 }
2692
2693 /* further lowcomms enhancements or alternate implementations may make
2694    the return value from this function useful at some point */
2695
2696 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2697 {
2698         dlm_message_out(ms);
2699         dlm_lowcomms_commit_buffer(mh);
2700         return 0;
2701 }
2702
2703 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2704                       struct dlm_message *ms)
2705 {
2706         ms->m_nodeid   = lkb->lkb_nodeid;
2707         ms->m_pid      = lkb->lkb_ownpid;
2708         ms->m_lkid     = lkb->lkb_id;
2709         ms->m_remid    = lkb->lkb_remid;
2710         ms->m_exflags  = lkb->lkb_exflags;
2711         ms->m_sbflags  = lkb->lkb_sbflags;
2712         ms->m_flags    = lkb->lkb_flags;
2713         ms->m_lvbseq   = lkb->lkb_lvbseq;
2714         ms->m_status   = lkb->lkb_status;
2715         ms->m_grmode   = lkb->lkb_grmode;
2716         ms->m_rqmode   = lkb->lkb_rqmode;
2717         ms->m_hash     = r->res_hash;
2718
2719         /* m_result and m_bastmode are set from function args,
2720            not from lkb fields */
2721
2722         if (lkb->lkb_bastfn)
2723                 ms->m_asts |= AST_BAST;
2724         if (lkb->lkb_astfn)
2725                 ms->m_asts |= AST_COMP;
2726
2727         /* compare with switch in create_message; send_remove() doesn't
2728            use send_args() */
2729
2730         switch (ms->m_type) {
2731         case DLM_MSG_REQUEST:
2732         case DLM_MSG_LOOKUP:
2733                 memcpy(ms->m_extra, r->res_name, r->res_length);
2734                 break;
2735         case DLM_MSG_CONVERT:
2736         case DLM_MSG_UNLOCK:
2737         case DLM_MSG_REQUEST_REPLY:
2738         case DLM_MSG_CONVERT_REPLY:
2739         case DLM_MSG_GRANT:
2740                 if (!lkb->lkb_lvbptr)
2741                         break;
2742                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2743                 break;
2744         }
2745 }
2746
2747 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2748 {
2749         struct dlm_message *ms;
2750         struct dlm_mhandle *mh;
2751         int to_nodeid, error;
2752
2753         error = add_to_waiters(lkb, mstype);
2754         if (error)
2755                 return error;
2756
2757         to_nodeid = r->res_nodeid;
2758
2759         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2760         if (error)
2761                 goto fail;
2762
2763         send_args(r, lkb, ms);
2764
2765         error = send_message(mh, ms);
2766         if (error)
2767                 goto fail;
2768         return 0;
2769
2770  fail:
2771         remove_from_waiters(lkb, msg_reply_type(mstype));
2772         return error;
2773 }
2774
2775 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2776 {
2777         return send_common(r, lkb, DLM_MSG_REQUEST);
2778 }
2779
2780 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2781 {
2782         int error;
2783
2784         error = send_common(r, lkb, DLM_MSG_CONVERT);
2785
2786         /* down conversions go without a reply from the master */
2787         if (!error && down_conversion(lkb)) {
2788                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2789                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2790                 r->res_ls->ls_stub_ms.m_result = 0;
2791                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2792                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2793         }
2794
2795         return error;
2796 }
2797
2798 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2799    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2800    that the master is still correct. */
2801
2802 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2803 {
2804         return send_common(r, lkb, DLM_MSG_UNLOCK);
2805 }
2806
2807 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2808 {
2809         return send_common(r, lkb, DLM_MSG_CANCEL);
2810 }
2811
2812 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2813 {
2814         struct dlm_message *ms;
2815         struct dlm_mhandle *mh;
2816         int to_nodeid, error;
2817
2818         to_nodeid = lkb->lkb_nodeid;
2819
2820         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2821         if (error)
2822                 goto out;
2823
2824         send_args(r, lkb, ms);
2825
2826         ms->m_result = 0;
2827
2828         error = send_message(mh, ms);
2829  out:
2830         return error;
2831 }
2832
2833 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2834 {
2835         struct dlm_message *ms;
2836         struct dlm_mhandle *mh;
2837         int to_nodeid, error;
2838
2839         to_nodeid = lkb->lkb_nodeid;
2840
2841         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2842         if (error)
2843                 goto out;
2844
2845         send_args(r, lkb, ms);
2846
2847         ms->m_bastmode = mode;
2848
2849         error = send_message(mh, ms);
2850  out:
2851         return error;
2852 }
2853
2854 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2855 {
2856         struct dlm_message *ms;
2857         struct dlm_mhandle *mh;
2858         int to_nodeid, error;
2859
2860         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2861         if (error)
2862                 return error;
2863
2864         to_nodeid = dlm_dir_nodeid(r);
2865
2866         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2867         if (error)
2868                 goto fail;
2869
2870         send_args(r, lkb, ms);
2871
2872         error = send_message(mh, ms);
2873         if (error)
2874                 goto fail;
2875         return 0;
2876
2877  fail:
2878         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2879         return error;
2880 }
2881
2882 static int send_remove(struct dlm_rsb *r)
2883 {
2884         struct dlm_message *ms;
2885         struct dlm_mhandle *mh;
2886         int to_nodeid, error;
2887
2888         to_nodeid = dlm_dir_nodeid(r);
2889
2890         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2891         if (error)
2892                 goto out;
2893
2894         memcpy(ms->m_extra, r->res_name, r->res_length);
2895         ms->m_hash = r->res_hash;
2896
2897         error = send_message(mh, ms);
2898  out:
2899         return error;
2900 }
2901
2902 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2903                              int mstype, int rv)
2904 {
2905         struct dlm_message *ms;
2906         struct dlm_mhandle *mh;
2907         int to_nodeid, error;
2908
2909         to_nodeid = lkb->lkb_nodeid;
2910
2911         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2912         if (error)
2913                 goto out;
2914
2915         send_args(r, lkb, ms);
2916
2917         ms->m_result = rv;
2918
2919         error = send_message(mh, ms);
2920  out:
2921         return error;
2922 }
2923
2924 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2925 {
2926         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2927 }
2928
2929 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2930 {
2931         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2932 }
2933
2934 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2935 {
2936         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2937 }
2938
2939 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2940 {
2941         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2942 }
2943
2944 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2945                              int ret_nodeid, int rv)
2946 {
2947         struct dlm_rsb *r = &ls->ls_stub_rsb;
2948         struct dlm_message *ms;
2949         struct dlm_mhandle *mh;
2950         int error, nodeid = ms_in->m_header.h_nodeid;
2951
2952         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2953         if (error)
2954                 goto out;
2955
2956         ms->m_lkid = ms_in->m_lkid;
2957         ms->m_result = rv;
2958         ms->m_nodeid = ret_nodeid;
2959
2960         error = send_message(mh, ms);
2961  out:
2962         return error;
2963 }
2964
2965 /* which args we save from a received message depends heavily on the type
2966    of message, unlike the send side where we can safely send everything about
2967    the lkb for any type of message */
2968
2969 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2970 {
2971         lkb->lkb_exflags = ms->m_exflags;
2972         lkb->lkb_sbflags = ms->m_sbflags;
2973         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2974                          (ms->m_flags & 0x0000FFFF);
2975 }
2976
2977 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2978 {
2979         lkb->lkb_sbflags = ms->m_sbflags;
2980         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2981                          (ms->m_flags & 0x0000FFFF);
2982 }
2983
2984 static int receive_extralen(struct dlm_message *ms)
2985 {
2986         return (ms->m_header.h_length - sizeof(struct dlm_message));
2987 }
2988
2989 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2990                        struct dlm_message *ms)
2991 {
2992         int len;
2993
2994         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2995                 if (!lkb->lkb_lvbptr)
2996                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2997                 if (!lkb->lkb_lvbptr)
2998                         return -ENOMEM;
2999                 len = receive_extralen(ms);
3000                 if (len > DLM_RESNAME_MAXLEN)
3001                         len = DLM_RESNAME_MAXLEN;
3002                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3003         }
3004         return 0;
3005 }
3006
3007 static void fake_bastfn(void *astparam, int mode)
3008 {
3009         log_print("fake_bastfn should not be called");
3010 }
3011
3012 static void fake_astfn(void *astparam)
3013 {
3014         log_print("fake_astfn should not be called");
3015 }
3016
3017 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3018                                 struct dlm_message *ms)
3019 {
3020         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3021         lkb->lkb_ownpid = ms->m_pid;
3022         lkb->lkb_remid = ms->m_lkid;
3023         lkb->lkb_grmode = DLM_LOCK_IV;
3024         lkb->lkb_rqmode = ms->m_rqmode;
3025
3026         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3027         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3028
3029         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3030                 /* lkb was just created so there won't be an lvb yet */
3031                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3032                 if (!lkb->lkb_lvbptr)
3033                         return -ENOMEM;
3034         }
3035
3036         return 0;
3037 }
3038
3039 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3040                                 struct dlm_message *ms)
3041 {
3042         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3043                 return -EBUSY;
3044
3045         if (receive_lvb(ls, lkb, ms))
3046                 return -ENOMEM;
3047
3048         lkb->lkb_rqmode = ms->m_rqmode;
3049         lkb->lkb_lvbseq = ms->m_lvbseq;
3050
3051         return 0;
3052 }
3053
3054 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3055                                struct dlm_message *ms)
3056 {
3057         if (receive_lvb(ls, lkb, ms))
3058                 return -ENOMEM;
3059         return 0;
3060 }
3061
3062 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3063    uses to send a reply and that the remote end uses to process the reply. */
3064
3065 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3066 {
3067         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3068         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3069         lkb->lkb_remid = ms->m_lkid;
3070 }
3071
3072 /* This is called after the rsb is locked so that we can safely inspect
3073    fields in the lkb. */
3074
3075 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3076 {
3077         int from = ms->m_header.h_nodeid;
3078         int error = 0;
3079
3080         switch (ms->m_type) {
3081         case DLM_MSG_CONVERT:
3082         case DLM_MSG_UNLOCK:
3083         case DLM_MSG_CANCEL:
3084                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3085                         error = -EINVAL;
3086                 break;
3087
3088         case DLM_MSG_CONVERT_REPLY:
3089         case DLM_MSG_UNLOCK_REPLY:
3090         case DLM_MSG_CANCEL_REPLY:
3091         case DLM_MSG_GRANT:
3092         case DLM_MSG_BAST:
3093                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3094                         error = -EINVAL;
3095                 break;
3096
3097         case DLM_MSG_REQUEST_REPLY:
3098                 if (!is_process_copy(lkb))
3099                         error = -EINVAL;
3100                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3101                         error = -EINVAL;
3102                 break;
3103
3104         default:
3105                 error = -EINVAL;
3106         }
3107
3108         if (error)
3109                 log_error(lkb->lkb_resource->res_ls,
3110                           "ignore invalid message %d from %d %x %x %x %d",
3111                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3112                           lkb->lkb_flags, lkb->lkb_nodeid);
3113         return error;
3114 }
3115
3116 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3117 {
3118         struct dlm_lkb *lkb;
3119         struct dlm_rsb *r;
3120         int error, namelen;
3121
3122         error = create_lkb(ls, &lkb);
3123         if (error)
3124                 goto fail;
3125
3126         receive_flags(lkb, ms);
3127         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3128         error = receive_request_args(ls, lkb, ms);
3129         if (error) {
3130                 __put_lkb(ls, lkb);
3131                 goto fail;
3132         }
3133
3134         namelen = receive_extralen(ms);
3135
3136         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3137         if (error) {
3138                 __put_lkb(ls, lkb);
3139                 goto fail;
3140         }
3141
3142         lock_rsb(r);
3143
3144         attach_lkb(r, lkb);
3145         error = do_request(r, lkb);
3146         send_request_reply(r, lkb, error);
3147
3148         unlock_rsb(r);
3149         put_rsb(r);
3150
3151         if (error == -EINPROGRESS)
3152                 error = 0;
3153         if (error)
3154                 dlm_put_lkb(lkb);
3155         return;
3156
3157  fail:
3158         setup_stub_lkb(ls, ms);
3159         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3160 }
3161
3162 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3163 {
3164         struct dlm_lkb *lkb;
3165         struct dlm_rsb *r;
3166         int error, reply = 1;
3167
3168         error = find_lkb(ls, ms->m_remid, &lkb);
3169         if (error)
3170                 goto fail;
3171
3172         r = lkb->lkb_resource;
3173
3174         hold_rsb(r);
3175         lock_rsb(r);
3176
3177         error = validate_message(lkb, ms);
3178         if (error)
3179                 goto out;
3180
3181         receive_flags(lkb, ms);
3182         error = receive_convert_args(ls, lkb, ms);
3183         if (error)
3184                 goto out_reply;
3185         reply = !down_conversion(lkb);
3186
3187         error = do_convert(r, lkb);
3188  out_reply:
3189         if (reply)
3190                 send_convert_reply(r, lkb, error);
3191  out:
3192         unlock_rsb(r);
3193         put_rsb(r);
3194         dlm_put_lkb(lkb);
3195         return;
3196
3197  fail:
3198         setup_stub_lkb(ls, ms);
3199         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3200 }
3201
3202 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3203 {
3204         struct dlm_lkb *lkb;
3205         struct dlm_rsb *r;
3206         int error;
3207
3208         error = find_lkb(ls, ms->m_remid, &lkb);
3209         if (error)
3210                 goto fail;
3211
3212         r = lkb->lkb_resource;
3213
3214         hold_rsb(r);
3215         lock_rsb(r);
3216
3217         error = validate_message(lkb, ms);
3218         if (error)
3219                 goto out;
3220
3221         receive_flags(lkb, ms);
3222         error = receive_unlock_args(ls, lkb, ms);
3223         if (error)
3224                 goto out_reply;
3225
3226         error = do_unlock(r, lkb);
3227  out_reply:
3228         send_unlock_reply(r, lkb, error);
3229  out:
3230         unlock_rsb(r);
3231         put_rsb(r);
3232         dlm_put_lkb(lkb);
3233         return;
3234
3235  fail:
3236         setup_stub_lkb(ls, ms);
3237         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3238 }
3239
3240 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3241 {
3242         struct dlm_lkb *lkb;
3243         struct dlm_rsb *r;
3244         int error;
3245
3246         error = find_lkb(ls, ms->m_remid, &lkb);
3247         if (error)
3248                 goto fail;
3249
3250         receive_flags(lkb, ms);
3251
3252         r = lkb->lkb_resource;
3253
3254         hold_rsb(r);
3255         lock_rsb(r);
3256
3257         error = validate_message(lkb, ms);
3258         if (error)
3259                 goto out;
3260
3261         error = do_cancel(r, lkb);
3262         send_cancel_reply(r, lkb, error);
3263  out:
3264         unlock_rsb(r);
3265         put_rsb(r);
3266         dlm_put_lkb(lkb);
3267         return;
3268
3269  fail:
3270         setup_stub_lkb(ls, ms);
3271         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3272 }
3273
3274 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3275 {
3276         struct dlm_lkb *lkb;
3277         struct dlm_rsb *r;
3278         int error;
3279
3280         error = find_lkb(ls, ms->m_remid, &lkb);
3281         if (error) {
3282                 log_debug(ls, "receive_grant from %d no lkb %x",
3283                           ms->m_header.h_nodeid, ms->m_remid);
3284                 return;
3285         }
3286
3287         r = lkb->lkb_resource;
3288
3289         hold_rsb(r);
3290         lock_rsb(r);
3291
3292         error = validate_message(lkb, ms);
3293         if (error)
3294                 goto out;
3295
3296         receive_flags_reply(lkb, ms);
3297         if (is_altmode(lkb))
3298                 munge_altmode(lkb, ms);
3299         grant_lock_pc(r, lkb, ms);
3300         queue_cast(r, lkb, 0);
3301  out:
3302         unlock_rsb(r);
3303         put_rsb(r);
3304         dlm_put_lkb(lkb);
3305 }
3306
3307 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3308 {
3309         struct dlm_lkb *lkb;
3310         struct dlm_rsb *r;
3311         int error;
3312
3313         error = find_lkb(ls, ms->m_remid, &lkb);
3314         if (error) {
3315                 log_debug(ls, "receive_bast from %d no lkb %x",
3316                           ms->m_header.h_nodeid, ms->m_remid);
3317                 return;
3318         }
3319
3320         r = lkb->lkb_resource;
3321
3322         hold_rsb(r);
3323         lock_rsb(r);
3324
3325         error = validate_message(lkb, ms);
3326         if (error)
3327                 goto out;
3328
3329         queue_bast(r, lkb, ms->m_bastmode);
3330  out:
3331         unlock_rsb(r);
3332         put_rsb(r);
3333         dlm_put_lkb(lkb);
3334 }
3335
3336 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3337 {
3338         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3339
3340         from_nodeid = ms->m_header.h_nodeid;
3341         our_nodeid = dlm_our_nodeid();
3342
3343         len = receive_extralen(ms);
3344
3345         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3346         if (dir_nodeid != our_nodeid) {
3347                 log_error(ls, "lookup dir_nodeid %d from %d",
3348                           dir_nodeid, from_nodeid);
3349                 error = -EINVAL;
3350                 ret_nodeid = -1;
3351                 goto out;
3352         }
3353
3354         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3355
3356         /* Optimization: we're master so treat lookup as a request */
3357         if (!error && ret_nodeid == our_nodeid) {
3358                 receive_request(ls, ms);
3359                 return;
3360         }
3361  out:
3362         send_lookup_reply(ls, ms, ret_nodeid, error);
3363 }
3364
3365 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3366 {
3367         int len, dir_nodeid, from_nodeid;
3368
3369         from_nodeid = ms->m_header.h_nodeid;
3370
3371         len = receive_extralen(ms);
3372
3373         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3374         if (dir_nodeid != dlm_our_nodeid()) {
3375                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3376                           dir_nodeid, from_nodeid);
3377                 return;
3378         }
3379
3380         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3381 }
3382
3383 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3384 {
3385         do_purge(ls, ms->m_nodeid, ms->m_pid);
3386 }
3387
3388 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3389 {
3390         struct dlm_lkb *lkb;
3391         struct dlm_rsb *r;
3392         int error, mstype, result;
3393
3394         error = find_lkb(ls, ms->m_remid, &lkb);
3395         if (error) {
3396                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3397                           ms->m_header.h_nodeid, ms->m_remid);
3398                 return;
3399         }
3400
3401         r = lkb->lkb_resource;
3402         hold_rsb(r);
3403         lock_rsb(r);
3404
3405         error = validate_message(lkb, ms);
3406         if (error)
3407                 goto out;
3408
3409         mstype = lkb->lkb_wait_type;
3410         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3411         if (error)
3412                 goto out;
3413
3414         /* Optimization: the dir node was also the master, so it took our
3415            lookup as a request and sent request reply instead of lookup reply */
3416         if (mstype == DLM_MSG_LOOKUP) {
3417                 r->res_nodeid = ms->m_header.h_nodeid;
3418                 lkb->lkb_nodeid = r->res_nodeid;
3419         }
3420
3421         /* this is the value returned from do_request() on the master */
3422         result = ms->m_result;
3423
3424         switch (result) {
3425         case -EAGAIN:
3426                 /* request would block (be queued) on remote master */
3427                 queue_cast(r, lkb, -EAGAIN);
3428                 confirm_master(r, -EAGAIN);
3429                 unhold_lkb(lkb); /* undoes create_lkb() */
3430                 break;
3431
3432         case -EINPROGRESS:
3433         case 0:
3434                 /* request was queued or granted on remote master */
3435                 receive_flags_reply(lkb, ms);
3436                 lkb->lkb_remid = ms->m_lkid;
3437                 if (is_altmode(lkb))
3438                         munge_altmode(lkb, ms);
3439                 if (result) {
3440                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3441                         add_timeout(lkb);
3442                 } else {
3443                         grant_lock_pc(r, lkb, ms);
3444                         queue_cast(r, lkb, 0);
3445                 }
3446                 confirm_master(r, result);
3447                 break;
3448
3449         case -EBADR:
3450         case -ENOTBLK:
3451                 /* find_rsb failed to find rsb or rsb wasn't master */
3452                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3453                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3454                 r->res_nodeid = -1;
3455                 lkb->lkb_nodeid = -1;
3456
3457                 if (is_overlap(lkb)) {
3458                         /* we'll ignore error in cancel/unlock reply */
3459                         queue_cast_overlap(r, lkb);
3460                         confirm_master(r, result);
3461                         unhold_lkb(lkb); /* undoes create_lkb() */
3462                 } else
3463                         _request_lock(r, lkb);
3464                 break;
3465
3466         default:
3467                 log_error(ls, "receive_request_reply %x error %d",
3468                           lkb->lkb_id, result);
3469         }
3470
3471         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3472                 log_debug(ls, "receive_request_reply %x result %d unlock",
3473                           lkb->lkb_id, result);
3474                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3475                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3476                 send_unlock(r, lkb);
3477         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3478                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3479                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3480                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3481                 send_cancel(r, lkb);
3482         } else {
3483                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3484                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3485         }
3486  out:
3487         unlock_rsb(r);
3488         put_rsb(r);
3489         dlm_put_lkb(lkb);
3490 }
3491
3492 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3493                                     struct dlm_message *ms)
3494 {
3495         /* this is the value returned from do_convert() on the master */
3496         switch (ms->m_result) {
3497         case -EAGAIN:
3498                 /* convert would block (be queued) on remote master */
3499                 queue_cast(r, lkb, -EAGAIN);
3500                 break;
3501
3502         case -EDEADLK:
3503                 receive_flags_reply(lkb, ms);
3504                 revert_lock_pc(r, lkb);
3505                 queue_cast(r, lkb, -EDEADLK);
3506                 break;
3507
3508         case -EINPROGRESS:
3509                 /* convert was queued on remote master */
3510                 receive_flags_reply(lkb, ms);
3511                 if (is_demoted(lkb))
3512                         munge_demoted(lkb, ms);
3513                 del_lkb(r, lkb);
3514                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3515                 add_timeout(lkb);
3516                 break;
3517
3518         case 0:
3519                 /* convert was granted on remote master */
3520                 receive_flags_reply(lkb, ms);
3521                 if (is_demoted(lkb))
3522                         munge_demoted(lkb, ms);
3523                 grant_lock_pc(r, lkb, ms);
3524                 queue_cast(r, lkb, 0);
3525                 break;
3526
3527         default:
3528                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3529                           lkb->lkb_id, ms->m_result);
3530         }
3531 }
3532
3533 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3534 {
3535         struct dlm_rsb *r = lkb->lkb_resource;
3536         int error;
3537
3538         hold_rsb(r);
3539         lock_rsb(r);
3540
3541         error = validate_message(lkb, ms);
3542         if (error)
3543                 goto out;
3544
3545         /* stub reply can happen with waiters_mutex held */
3546         error = remove_from_waiters_ms(lkb, ms);
3547         if (error)
3548                 goto out;
3549
3550         __receive_convert_reply(r, lkb, ms);
3551  out:
3552         unlock_rsb(r);
3553         put_rsb(r);
3554 }
3555
3556 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3557 {
3558         struct dlm_lkb *lkb;
3559         int error;
3560
3561         error = find_lkb(ls, ms->m_remid, &lkb);
3562         if (error) {
3563                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3564                           ms->m_header.h_nodeid, ms->m_remid);
3565                 return;
3566         }
3567
3568         _receive_convert_reply(lkb, ms);
3569         dlm_put_lkb(lkb);
3570 }
3571
3572 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3573 {
3574         struct dlm_rsb *r = lkb->lkb_resource;
3575         int error;
3576
3577         hold_rsb(r);
3578         lock_rsb(r);
3579
3580         error = validate_message(lkb, ms);
3581         if (error)
3582                 goto out;
3583
3584         /* stub reply can happen with waiters_mutex held */
3585         error = remove_from_waiters_ms(lkb, ms);
3586         if (error)
3587                 goto out;
3588
3589         /* this is the value returned from do_unlock() on the master */
3590
3591         switch (ms->m_result) {
3592         case -DLM_EUNLOCK:
3593                 receive_flags_reply(lkb, ms);
3594                 remove_lock_pc(r, lkb);
3595                 queue_cast(r, lkb, -DLM_EUNLOCK);
3596                 break;
3597         case -ENOENT:
3598                 break;
3599         default:
3600                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3601                           lkb->lkb_id, ms->m_result);
3602         }
3603  out:
3604         unlock_rsb(r);
3605         put_rsb(r);
3606 }
3607
3608 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3609 {
3610         struct dlm_lkb *lkb;
3611         int error;
3612
3613         error = find_lkb(ls, ms->m_remid, &lkb);
3614         if (error) {
3615                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3616                           ms->m_header.h_nodeid, ms->m_remid);
3617                 return;
3618         }
3619
3620         _receive_unlock_reply(lkb, ms);
3621         dlm_put_lkb(lkb);
3622 }
3623
3624 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3625 {
3626         struct dlm_rsb *r = lkb->lkb_resource;
3627         int error;
3628
3629         hold_rsb(r);
3630         lock_rsb(r);
3631
3632         error = validate_message(lkb, ms);
3633         if (error)
3634                 goto out;
3635
3636         /* stub reply can happen with waiters_mutex held */
3637         error = remove_from_waiters_ms(lkb, ms);
3638         if (error)
3639                 goto out;
3640
3641         /* this is the value returned from do_cancel() on the master */
3642
3643         switch (ms->m_result) {
3644         case -DLM_ECANCEL:
3645                 receive_flags_reply(lkb, ms);
3646                 revert_lock_pc(r, lkb);
3647                 queue_cast(r, lkb, -DLM_ECANCEL);
3648                 break;
3649         case 0:
3650                 break;
3651         default:
3652                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3653                           lkb->lkb_id, ms->m_result);
3654         }
3655  out:
3656         unlock_rsb(r);
3657         put_rsb(r);
3658 }
3659
3660 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3661 {
3662         struct dlm_lkb *lkb;
3663         int error;
3664
3665         error = find_lkb(ls, ms->m_remid, &lkb);
3666         if (error) {
3667                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3668                           ms->m_header.h_nodeid, ms->m_remid);
3669                 return;
3670         }
3671
3672         _receive_cancel_reply(lkb, ms);
3673         dlm_put_lkb(lkb);
3674 }
3675
3676 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3677 {
3678         struct dlm_lkb *lkb;
3679         struct dlm_rsb *r;
3680         int error, ret_nodeid;
3681
3682         error = find_lkb(ls, ms->m_lkid, &lkb);
3683         if (error) {
3684                 log_error(ls, "receive_lookup_reply no lkb");
3685                 return;
3686         }
3687
3688         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3689            FIXME: will a non-zero error ever be returned? */
3690
3691         r = lkb->lkb_resource;
3692         hold_rsb(r);
3693         lock_rsb(r);
3694
3695         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3696         if (error)
3697                 goto out;
3698
3699         ret_nodeid = ms->m_nodeid;
3700         if (ret_nodeid == dlm_our_nodeid()) {
3701                 r->res_nodeid = 0;
3702                 ret_nodeid = 0;
3703                 r->res_first_lkid = 0;
3704         } else {
3705                 /* set_master() will copy res_nodeid to lkb_nodeid */
3706                 r->res_nodeid = ret_nodeid;
3707         }
3708
3709         if (is_overlap(lkb)) {
3710                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3711                           lkb->lkb_id, lkb->lkb_flags);
3712                 queue_cast_overlap(r, lkb);
3713                 unhold_lkb(lkb); /* undoes create_lkb() */
3714                 goto out_list;
3715         }
3716
3717         _request_lock(r, lkb);
3718
3719  out_list:
3720         if (!ret_nodeid)
3721                 process_lookup_list(r);
3722  out:
3723         unlock_rsb(r);
3724         put_rsb(r);
3725         dlm_put_lkb(lkb);
3726 }
3727
3728 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3729 {
3730         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3731                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3732                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3733                           ms->m_remid, ms->m_result);
3734                 return;
3735         }
3736
3737         switch (ms->m_type) {
3738
3739         /* messages sent to a master node */
3740
3741         case DLM_MSG_REQUEST:
3742                 receive_request(ls, ms);
3743                 break;
3744
3745         case DLM_MSG_CONVERT:
3746                 receive_convert(ls, ms);
3747                 break;
3748
3749         case DLM_MSG_UNLOCK:
3750                 receive_unlock(ls, ms);
3751                 break;
3752
3753         case DLM_MSG_CANCEL:
3754                 receive_cancel(ls, ms);
3755                 break;
3756
3757         /* messages sent from a master node (replies to above) */
3758
3759         case DLM_MSG_REQUEST_REPLY:
3760                 receive_request_reply(ls, ms);
3761                 break;
3762
3763         case DLM_MSG_CONVERT_REPLY:
3764                 receive_convert_reply(ls, ms);
3765                 break;
3766
3767         case DLM_MSG_UNLOCK_REPLY:
3768                 receive_unlock_reply(ls, ms);
3769                 break;
3770
3771         case DLM_MSG_CANCEL_REPLY:
3772                 receive_cancel_reply(ls, ms);
3773                 break;
3774
3775         /* messages sent from a master node (only two types of async msg) */
3776
3777         case DLM_MSG_GRANT:
3778                 receive_grant(ls, ms);
3779                 break;
3780
3781         case DLM_MSG_BAST:
3782                 receive_bast(ls, ms);
3783                 break;
3784
3785         /* messages sent to a dir node */
3786
3787         case DLM_MSG_LOOKUP:
3788                 receive_lookup(ls, ms);
3789                 break;
3790
3791         case DLM_MSG_REMOVE:
3792                 receive_remove(ls, ms);
3793                 break;
3794
3795         /* messages sent from a dir node (remove has no reply) */
3796
3797         case DLM_MSG_LOOKUP_REPLY:
3798                 receive_lookup_reply(ls, ms);
3799                 break;
3800
3801         /* other messages */
3802
3803         case DLM_MSG_PURGE:
3804                 receive_purge(ls, ms);
3805                 break;
3806
3807         default:
3808                 log_error(ls, "unknown message type %d", ms->m_type);
3809         }
3810
3811         dlm_astd_wake();
3812 }
3813
3814 /* If the lockspace is in recovery mode (locking stopped), then normal
3815    messages are saved on the requestqueue for processing after recovery is
3816    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3817    messages off the requestqueue before we process new ones. This occurs right
3818    after recovery completes when we transition from saving all messages on
3819    requestqueue, to processing all the saved messages, to processing new
3820    messages as they arrive. */
3821
3822 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3823                                 int nodeid)
3824 {
3825         if (dlm_locking_stopped(ls)) {
3826                 dlm_add_requestqueue(ls, nodeid, ms);
3827         } else {
3828                 dlm_wait_requestqueue(ls);
3829                 _receive_message(ls, ms);
3830         }
3831 }
3832
3833 /* This is called by dlm_recoverd to process messages that were saved on
3834    the requestqueue. */
3835
3836 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3837 {
3838         _receive_message(ls, ms);
3839 }
3840
3841 /* This is called by the midcomms layer when something is received for
3842    the lockspace.  It could be either a MSG (normal message sent as part of
3843    standard locking activity) or an RCOM (recovery message sent as part of
3844    lockspace recovery). */
3845
3846 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3847 {
3848         struct dlm_header *hd = &p->header;
3849         struct dlm_ls *ls;
3850         int type = 0;
3851
3852         switch (hd->h_cmd) {
3853         case DLM_MSG:
3854                 dlm_message_in(&p->message);
3855                 type = p->message.m_type;
3856                 break;
3857         case DLM_RCOM:
3858                 dlm_rcom_in(&p->rcom);
3859                 type = p->rcom.rc_type;
3860                 break;
3861         default:
3862                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3863                 return;
3864         }
3865
3866         if (hd->h_nodeid != nodeid) {
3867                 log_print("invalid h_nodeid %d from %d lockspace %x",
3868                           hd->h_nodeid, nodeid, hd->h_lockspace);
3869                 return;
3870         }
3871
3872         ls = dlm_find_lockspace_global(hd->h_lockspace);
3873         if (!ls) {
3874                 if (dlm_config.ci_log_debug)
3875                         log_print("invalid lockspace %x from %d cmd %d type %d",
3876                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3877
3878                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3879                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3880                 return;
3881         }
3882
3883         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3884            be inactive (in this ls) before transitioning to recovery mode */
3885
3886         down_read(&ls->ls_recv_active);
3887         if (hd->h_cmd == DLM_MSG)
3888                 dlm_receive_message(ls, &p->message, nodeid);
3889         else
3890                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3891         up_read(&ls->ls_recv_active);
3892
3893         dlm_put_lockspace(ls);
3894 }
3895
3896 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3897 {
3898         if (middle_conversion(lkb)) {
3899                 hold_lkb(lkb);
3900                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3901                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3902                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3903                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3904                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3905
3906                 /* Same special case as in receive_rcom_lock_args() */
3907                 lkb->lkb_grmode = DLM_LOCK_IV;
3908                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3909                 unhold_lkb(lkb);
3910
3911         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3912                 lkb->lkb_flags |= DLM_IFL_RESEND;
3913         }
3914
3915         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3916            conversions are async; there's no reply from the remote master */
3917 }
3918
3919 /* A waiting lkb needs recovery if the master node has failed, or
3920    the master node is changing (only when no directory is used) */
3921
3922 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3923 {
3924         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3925                 return 1;
3926
3927         if (!dlm_no_directory(ls))
3928                 return 0;
3929
3930         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3931                 return 1;
3932
3933         return 0;
3934 }
3935
3936 /* Recovery for locks that are waiting for replies from nodes that are now
3937    gone.  We can just complete unlocks and cancels by faking a reply from the
3938    dead node.  Requests and up-conversions we flag to be resent after
3939    recovery.  Down-conversions can just be completed with a fake reply like
3940    unlocks.  Conversions between PR and CW need special attention. */
3941
3942 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3943 {
3944         struct dlm_lkb *lkb, *safe;
3945         int wait_type, stub_unlock_result, stub_cancel_result;
3946
3947         mutex_lock(&ls->ls_waiters_mutex);
3948
3949         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3950                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3951                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3952
3953                 /* all outstanding lookups, regardless of destination  will be
3954                    resent after recovery is done */
3955
3956                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3957                         lkb->lkb_flags |= DLM_IFL_RESEND;
3958                         continue;
3959                 }
3960
3961                 if (!waiter_needs_recovery(ls, lkb))
3962                         continue;
3963
3964                 wait_type = lkb->lkb_wait_type;
3965                 stub_unlock_result = -DLM_EUNLOCK;
3966                 stub_cancel_result = -DLM_ECANCEL;
3967
3968                 /* Main reply may have been received leaving a zero wait_type,
3969                    but a reply for the overlapping op may not have been
3970                    received.  In that case we need to fake the appropriate
3971                    reply for the overlap op. */
3972
3973                 if (!wait_type) {
3974                         if (is_overlap_cancel(lkb)) {
3975                                 wait_type = DLM_MSG_CANCEL;
3976                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3977                                         stub_cancel_result = 0;
3978                         }
3979                         if (is_overlap_unlock(lkb)) {
3980                                 wait_type = DLM_MSG_UNLOCK;
3981                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
3982                                         stub_unlock_result = -ENOENT;
3983                         }
3984
3985                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
3986                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
3987                                   stub_cancel_result, stub_unlock_result);
3988                 }
3989
3990                 switch (wait_type) {
3991
3992                 case DLM_MSG_REQUEST:
3993                         lkb->lkb_flags |= DLM_IFL_RESEND;
3994                         break;
3995
3996                 case DLM_MSG_CONVERT:
3997                         recover_convert_waiter(ls, lkb);
3998                         break;
3999
4000                 case DLM_MSG_UNLOCK:
4001                         hold_lkb(lkb);
4002                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4003                         ls->ls_stub_ms.m_result = stub_unlock_result;
4004                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4005                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4006                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4007                         dlm_put_lkb(lkb);
4008                         break;
4009
4010                 case DLM_MSG_CANCEL:
4011                         hold_lkb(lkb);
4012                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4013                         ls->ls_stub_ms.m_result = stub_cancel_result;
4014                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4015                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4016                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4017                         dlm_put_lkb(lkb);
4018                         break;
4019
4020                 default:
4021                         log_error(ls, "invalid lkb wait_type %d %d",
4022                                   lkb->lkb_wait_type, wait_type);
4023                 }
4024                 schedule();
4025         }
4026         mutex_unlock(&ls->ls_waiters_mutex);
4027 }
4028
4029 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4030 {
4031         struct dlm_lkb *lkb;
4032         int found = 0;
4033
4034         mutex_lock(&ls->ls_waiters_mutex);
4035         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4036                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4037                         hold_lkb(lkb);
4038                         found = 1;
4039                         break;
4040                 }
4041         }
4042         mutex_unlock(&ls->ls_waiters_mutex);
4043
4044         if (!found)
4045                 lkb = NULL;
4046         return lkb;
4047 }
4048
4049 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4050    master or dir-node for r.  Processing the lkb may result in it being placed
4051    back on waiters. */
4052
4053 /* We do this after normal locking has been enabled and any saved messages
4054    (in requestqueue) have been processed.  We should be confident that at
4055    this point we won't get or process a reply to any of these waiting
4056    operations.  But, new ops may be coming in on the rsbs/locks here from
4057    userspace or remotely. */
4058
4059 /* there may have been an overlap unlock/cancel prior to recovery or after
4060    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4061    overlap flag would just have been set and nothing new sent.  we can be
4062    confident here than any replies to either the initial op or overlap ops
4063    prior to recovery have been received. */
4064
4065 int dlm_recover_waiters_post(struct dlm_ls *ls)
4066 {
4067         struct dlm_lkb *lkb;
4068         struct dlm_rsb *r;
4069         int error = 0, mstype, err, oc, ou;
4070
4071         while (1) {
4072                 if (dlm_locking_stopped(ls)) {
4073                         log_debug(ls, "recover_waiters_post aborted");
4074                         error = -EINTR;
4075                         break;
4076                 }
4077
4078                 lkb = find_resend_waiter(ls);
4079                 if (!lkb)
4080                         break;
4081
4082                 r = lkb->lkb_resource;
4083                 hold_rsb(r);
4084                 lock_rsb(r);
4085
4086                 mstype = lkb->lkb_wait_type;
4087                 oc = is_overlap_cancel(lkb);
4088                 ou = is_overlap_unlock(lkb);
4089                 err = 0;
4090
4091                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4092                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4093
4094                 /* At this point we assume that we won't get a reply to any
4095                    previous op or overlap op on this lock.  First, do a big
4096                    remove_from_waiters() for all previous ops. */
4097
4098                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4099                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4100                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4101                 lkb->lkb_wait_type = 0;
4102                 lkb->lkb_wait_count = 0;
4103                 mutex_lock(&ls->ls_waiters_mutex);
4104                 list_del_init(&lkb->lkb_wait_reply);
4105                 mutex_unlock(&ls->ls_waiters_mutex);
4106                 unhold_lkb(lkb); /* for waiters list */
4107
4108                 if (oc || ou) {
4109                         /* do an unlock or cancel instead of resending */
4110                         switch (mstype) {
4111                         case DLM_MSG_LOOKUP:
4112                         case DLM_MSG_REQUEST:
4113                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4114                                                         -DLM_ECANCEL);
4115                                 unhold_lkb(lkb); /* undoes create_lkb() */
4116                                 break;
4117                         case DLM_MSG_CONVERT:
4118                                 if (oc) {
4119                                         queue_cast(r, lkb, -DLM_ECANCEL);
4120                                 } else {
4121                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4122                                         _unlock_lock(r, lkb);
4123                                 }
4124                                 break;
4125                         default:
4126                                 err = 1;
4127                         }
4128                 } else {
4129                         switch (mstype) {
4130                         case DLM_MSG_LOOKUP:
4131                         case DLM_MSG_REQUEST:
4132                                 _request_lock(r, lkb);
4133                                 if (is_master(r))
4134                                         confirm_master(r, 0);
4135                                 break;
4136                         case DLM_MSG_CONVERT:
4137                                 _convert_lock(r, lkb);
4138                                 break;
4139                         default:
4140                                 err = 1;
4141                         }
4142                 }
4143
4144                 if (err)
4145                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4146                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4147                 unlock_rsb(r);
4148                 put_rsb(r);
4149                 dlm_put_lkb(lkb);
4150         }
4151
4152         return error;
4153 }
4154
4155 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4156                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4157 {
4158         struct dlm_ls *ls = r->res_ls;
4159         struct dlm_lkb *lkb, *safe;
4160
4161         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4162                 if (test(ls, lkb)) {
4163                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4164                         del_lkb(r, lkb);
4165                         /* this put should free the lkb */
4166                         if (!dlm_put_lkb(lkb))
4167                                 log_error(ls, "purged lkb not released");
4168                 }
4169         }
4170 }
4171
4172 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4173 {
4174         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4175 }
4176
4177 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4178 {
4179         return is_master_copy(lkb);
4180 }
4181
4182 static void purge_dead_locks(struct dlm_rsb *r)
4183 {
4184         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4185         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4186         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4187 }
4188
4189 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4190 {
4191         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4192         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4193         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4194 }
4195
4196 /* Get rid of locks held by nodes that are gone. */
4197
4198 int dlm_purge_locks(struct dlm_ls *ls)
4199 {
4200         struct dlm_rsb *r;
4201
4202         log_debug(ls, "dlm_purge_locks");
4203
4204         down_write(&ls->ls_root_sem);
4205         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4206                 hold_rsb(r);
4207                 lock_rsb(r);
4208                 if (is_master(r))
4209                         purge_dead_locks(r);
4210                 unlock_rsb(r);
4211                 unhold_rsb(r);
4212
4213                 schedule();
4214         }
4215         up_write(&ls->ls_root_sem);
4216
4217         return 0;
4218 }
4219
4220 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4221 {
4222         struct dlm_rsb *r, *r_ret = NULL;
4223
4224         read_lock(&ls->ls_rsbtbl[bucket].lock);
4225         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4226                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4227                         continue;
4228                 hold_rsb(r);
4229                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4230                 r_ret = r;
4231                 break;
4232         }
4233         read_unlock(&ls->ls_rsbtbl[bucket].lock);
4234         return r_ret;
4235 }
4236
4237 void dlm_grant_after_purge(struct dlm_ls *ls)
4238 {
4239         struct dlm_rsb *r;
4240         int bucket = 0;
4241
4242         while (1) {
4243                 r = find_purged_rsb(ls, bucket);
4244                 if (!r) {
4245                         if (bucket == ls->ls_rsbtbl_size - 1)
4246                                 break;
4247                         bucket++;
4248                         continue;
4249                 }
4250                 lock_rsb(r);
4251                 if (is_master(r)) {
4252                         grant_pending_locks(r);
4253                         confirm_master(r, 0);
4254                 }
4255                 unlock_rsb(r);
4256                 put_rsb(r);
4257                 schedule();
4258         }
4259 }
4260
4261 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4262                                          uint32_t remid)
4263 {
4264         struct dlm_lkb *lkb;
4265
4266         list_for_each_entry(lkb, head, lkb_statequeue) {
4267                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4268                         return lkb;
4269         }
4270         return NULL;
4271 }
4272
4273 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4274                                     uint32_t remid)
4275 {
4276         struct dlm_lkb *lkb;
4277
4278         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4279         if (lkb)
4280                 return lkb;
4281         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4282         if (lkb)
4283                 return lkb;
4284         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4285         if (lkb)
4286                 return lkb;
4287         return NULL;
4288 }
4289
4290 /* needs at least dlm_rcom + rcom_lock */
4291 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4292                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4293 {
4294         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4295
4296         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4297         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4298         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4299         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4300         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4301         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4302         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4303         lkb->lkb_rqmode = rl->rl_rqmode;
4304         lkb->lkb_grmode = rl->rl_grmode;
4305         /* don't set lkb_status because add_lkb wants to itself */
4306
4307         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4308         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4309
4310         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4311                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4312                          sizeof(struct rcom_lock);
4313                 if (lvblen > ls->ls_lvblen)
4314                         return -EINVAL;
4315                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4316                 if (!lkb->lkb_lvbptr)
4317                         return -ENOMEM;
4318                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4319         }
4320
4321         /* Conversions between PR and CW (middle modes) need special handling.
4322            The real granted mode of these converting locks cannot be determined
4323            until all locks have been rebuilt on the rsb (recover_conversion) */
4324
4325         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4326             middle_conversion(lkb)) {
4327                 rl->rl_status = DLM_LKSTS_CONVERT;
4328                 lkb->lkb_grmode = DLM_LOCK_IV;
4329                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4330         }
4331
4332         return 0;
4333 }
4334
4335 /* This lkb may have been recovered in a previous aborted recovery so we need
4336    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4337    If so we just send back a standard reply.  If not, we create a new lkb with
4338    the given values and send back our lkid.  We send back our lkid by sending
4339    back the rcom_lock struct we got but with the remid field filled in. */
4340
4341 /* needs at least dlm_rcom + rcom_lock */
4342 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4343 {
4344         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4345         struct dlm_rsb *r;
4346         struct dlm_lkb *lkb;
4347         int error;
4348
4349         if (rl->rl_parent_lkid) {
4350                 error = -EOPNOTSUPP;
4351                 goto out;
4352         }
4353
4354         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4355                          R_MASTER, &r);
4356         if (error)
4357                 goto out;
4358
4359         lock_rsb(r);
4360
4361         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4362         if (lkb) {
4363                 error = -EEXIST;
4364                 goto out_remid;
4365         }
4366
4367         error = create_lkb(ls, &lkb);
4368         if (error)
4369                 goto out_unlock;
4370
4371         error = receive_rcom_lock_args(ls, lkb, r, rc);
4372         if (error) {
4373                 __put_lkb(ls, lkb);
4374                 goto out_unlock;
4375         }
4376
4377         attach_lkb(r, lkb);
4378         add_lkb(r, lkb, rl->rl_status);
4379         error = 0;
4380
4381  out_remid:
4382         /* this is the new value returned to the lock holder for
4383            saving in its process-copy lkb */
4384         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4385
4386  out_unlock:
4387         unlock_rsb(r);
4388         put_rsb(r);
4389  out:
4390         if (error)
4391                 log_debug(ls, "recover_master_copy %d %x", error,
4392                           le32_to_cpu(rl->rl_lkid));
4393         rl->rl_result = cpu_to_le32(error);
4394         return error;
4395 }
4396
4397 /* needs at least dlm_rcom + rcom_lock */
4398 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4399 {
4400         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4401         struct dlm_rsb *r;
4402         struct dlm_lkb *lkb;
4403         int error;
4404
4405         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4406         if (error) {
4407                 log_error(ls, "recover_process_copy no lkid %x",
4408                                 le32_to_cpu(rl->rl_lkid));
4409                 return error;
4410         }
4411
4412         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4413
4414         error = le32_to_cpu(rl->rl_result);
4415
4416         r = lkb->lkb_resource;
4417         hold_rsb(r);
4418         lock_rsb(r);
4419
4420         switch (error) {
4421         case -EBADR:
4422                 /* There's a chance the new master received our lock before
4423                    dlm_recover_master_reply(), this wouldn't happen if we did
4424                    a barrier between recover_masters and recover_locks. */
4425                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4426                           (unsigned long)r, r->res_name);
4427                 dlm_send_rcom_lock(r, lkb);
4428                 goto out;
4429         case -EEXIST:
4430                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4431                 /* fall through */
4432         case 0:
4433                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4434                 break;
4435         default:
4436                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4437                           error, lkb->lkb_id);
4438         }
4439
4440         /* an ack for dlm_recover_locks() which waits for replies from
4441            all the locks it sends to new masters */
4442         dlm_recovered_lock(r);
4443  out:
4444         unlock_rsb(r);
4445         put_rsb(r);
4446         dlm_put_lkb(lkb);
4447
4448         return 0;
4449 }
4450
4451 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4452                      int mode, uint32_t flags, void *name, unsigned int namelen,
4453                      unsigned long timeout_cs)
4454 {
4455         struct dlm_lkb *lkb;
4456         struct dlm_args args;
4457         int error;
4458
4459         dlm_lock_recovery(ls);
4460
4461         error = create_lkb(ls, &lkb);
4462         if (error) {
4463                 kfree(ua);
4464                 goto out;
4465         }
4466
4467         if (flags & DLM_LKF_VALBLK) {
4468                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4469                 if (!ua->lksb.sb_lvbptr) {
4470                         kfree(ua);
4471                         __put_lkb(ls, lkb);
4472                         error = -ENOMEM;
4473                         goto out;
4474                 }
4475         }
4476
4477         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4478            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4479            lock and that lkb_astparam is the dlm_user_args structure. */
4480
4481         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4482                               fake_astfn, ua, fake_bastfn, &args);
4483         lkb->lkb_flags |= DLM_IFL_USER;
4484         ua->old_mode = DLM_LOCK_IV;
4485
4486         if (error) {
4487                 __put_lkb(ls, lkb);
4488                 goto out;
4489         }
4490
4491         error = request_lock(ls, lkb, name, namelen, &args);
4492
4493         switch (error) {
4494         case 0:
4495                 break;
4496         case -EINPROGRESS:
4497                 error = 0;
4498                 break;
4499         case -EAGAIN:
4500                 error = 0;
4501                 /* fall through */
4502         default:
4503                 __put_lkb(ls, lkb);
4504                 goto out;
4505         }
4506
4507         /* add this new lkb to the per-process list of locks */
4508         spin_lock(&ua->proc->locks_spin);
4509         hold_lkb(lkb);
4510         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4511         spin_unlock(&ua->proc->locks_spin);
4512  out:
4513         dlm_unlock_recovery(ls);
4514         return error;
4515 }
4516
4517 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4518                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4519                      unsigned long timeout_cs)
4520 {
4521         struct dlm_lkb *lkb;
4522         struct dlm_args args;
4523         struct dlm_user_args *ua;
4524         int error;
4525
4526         dlm_lock_recovery(ls);
4527
4528         error = find_lkb(ls, lkid, &lkb);
4529         if (error)
4530                 goto out;
4531
4532         /* user can change the params on its lock when it converts it, or
4533            add an lvb that didn't exist before */
4534
4535         ua = lkb->lkb_ua;
4536
4537         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4538                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4539                 if (!ua->lksb.sb_lvbptr) {
4540                         error = -ENOMEM;
4541                         goto out_put;
4542                 }
4543         }
4544         if (lvb_in && ua->lksb.sb_lvbptr)
4545                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4546
4547         ua->xid = ua_tmp->xid;
4548         ua->castparam = ua_tmp->castparam;
4549         ua->castaddr = ua_tmp->castaddr;
4550         ua->bastparam = ua_tmp->bastparam;
4551         ua->bastaddr = ua_tmp->bastaddr;
4552         ua->user_lksb = ua_tmp->user_lksb;
4553         ua->old_mode = lkb->lkb_grmode;
4554
4555         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4556                               fake_astfn, ua, fake_bastfn, &args);
4557         if (error)
4558                 goto out_put;
4559
4560         error = convert_lock(ls, lkb, &args);
4561
4562         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4563                 error = 0;
4564  out_put:
4565         dlm_put_lkb(lkb);
4566  out:
4567         dlm_unlock_recovery(ls);
4568         kfree(ua_tmp);
4569         return error;
4570 }
4571
4572 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4573                     uint32_t flags, uint32_t lkid, char *lvb_in)
4574 {
4575         struct dlm_lkb *lkb;
4576         struct dlm_args args;
4577         struct dlm_user_args *ua;
4578         int error;
4579
4580         dlm_lock_recovery(ls);
4581
4582         error = find_lkb(ls, lkid, &lkb);
4583         if (error)
4584                 goto out;
4585
4586         ua = lkb->lkb_ua;
4587
4588         if (lvb_in && ua->lksb.sb_lvbptr)
4589                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4590         if (ua_tmp->castparam)
4591                 ua->castparam = ua_tmp->castparam;
4592         ua->user_lksb = ua_tmp->user_lksb;
4593
4594         error = set_unlock_args(flags, ua, &args);
4595         if (error)
4596                 goto out_put;
4597
4598         error = unlock_lock(ls, lkb, &args);
4599
4600         if (error == -DLM_EUNLOCK)
4601                 error = 0;
4602         /* from validate_unlock_args() */
4603         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4604                 error = 0;
4605         if (error)
4606                 goto out_put;
4607
4608         spin_lock(&ua->proc->locks_spin);
4609         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4610         if (!list_empty(&lkb->lkb_ownqueue))
4611                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4612         spin_unlock(&ua->proc->locks_spin);
4613  out_put:
4614         dlm_put_lkb(lkb);
4615  out:
4616         dlm_unlock_recovery(ls);
4617         kfree(ua_tmp);
4618         return error;
4619 }
4620
4621 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4622                     uint32_t flags, uint32_t lkid)
4623 {
4624         struct dlm_lkb *lkb;
4625         struct dlm_args args;
4626         struct dlm_user_args *ua;
4627         int error;
4628
4629         dlm_lock_recovery(ls);
4630
4631         error = find_lkb(ls, lkid, &lkb);
4632         if (error)
4633                 goto out;
4634
4635         ua = lkb->lkb_ua;
4636         if (ua_tmp->castparam)
4637                 ua->castparam = ua_tmp->castparam;
4638         ua->user_lksb = ua_tmp->user_lksb;
4639
4640         error = set_unlock_args(flags, ua, &args);
4641         if (error)
4642                 goto out_put;
4643
4644         error = cancel_lock(ls, lkb, &args);
4645
4646         if (error == -DLM_ECANCEL)
4647                 error = 0;
4648         /* from validate_unlock_args() */
4649         if (error == -EBUSY)
4650                 error = 0;
4651  out_put:
4652         dlm_put_lkb(lkb);
4653  out:
4654         dlm_unlock_recovery(ls);
4655         kfree(ua_tmp);
4656         return error;
4657 }
4658
4659 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4660 {
4661         struct dlm_lkb *lkb;
4662         struct dlm_args args;
4663         struct dlm_user_args *ua;
4664         struct dlm_rsb *r;
4665         int error;
4666
4667         dlm_lock_recovery(ls);
4668
4669         error = find_lkb(ls, lkid, &lkb);
4670         if (error)
4671                 goto out;
4672
4673         ua = lkb->lkb_ua;
4674
4675         error = set_unlock_args(flags, ua, &args);
4676         if (error)
4677                 goto out_put;
4678
4679         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4680
4681         r = lkb->lkb_resource;
4682         hold_rsb(r);
4683         lock_rsb(r);
4684
4685         error = validate_unlock_args(lkb, &args);
4686         if (error)
4687                 goto out_r;
4688         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4689
4690         error = _cancel_lock(r, lkb);
4691  out_r:
4692         unlock_rsb(r);
4693         put_rsb(r);
4694
4695         if (error == -DLM_ECANCEL)
4696                 error = 0;
4697         /* from validate_unlock_args() */
4698         if (error == -EBUSY)
4699                 error = 0;
4700  out_put:
4701         dlm_put_lkb(lkb);
4702  out:
4703         dlm_unlock_recovery(ls);
4704         return error;
4705 }
4706
4707 /* lkb's that are removed from the waiters list by revert are just left on the
4708    orphans list with the granted orphan locks, to be freed by purge */
4709
4710 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4711 {
4712         struct dlm_args args;
4713         int error;
4714
4715         hold_lkb(lkb);
4716         mutex_lock(&ls->ls_orphans_mutex);
4717         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4718         mutex_unlock(&ls->ls_orphans_mutex);
4719
4720         set_unlock_args(0, lkb->lkb_ua, &args);
4721
4722         error = cancel_lock(ls, lkb, &args);
4723         if (error == -DLM_ECANCEL)
4724                 error = 0;
4725         return error;
4726 }
4727
4728 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4729    Regardless of what rsb queue the lock is on, it's removed and freed. */
4730
4731 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4732 {
4733         struct dlm_args args;
4734         int error;
4735
4736         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4737
4738         error = unlock_lock(ls, lkb, &args);
4739         if (error == -DLM_EUNLOCK)
4740                 error = 0;
4741         return error;
4742 }
4743
4744 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4745    (which does lock_rsb) due to deadlock with receiving a message that does
4746    lock_rsb followed by dlm_user_add_ast() */
4747
4748 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4749                                      struct dlm_user_proc *proc)
4750 {
4751         struct dlm_lkb *lkb = NULL;
4752
4753         mutex_lock(&ls->ls_clear_proc_locks);
4754         if (list_empty(&proc->locks))
4755                 goto out;
4756
4757         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4758         list_del_init(&lkb->lkb_ownqueue);
4759
4760         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4761                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4762         else
4763                 lkb->lkb_flags |= DLM_IFL_DEAD;
4764  out:
4765         mutex_unlock(&ls->ls_clear_proc_locks);
4766         return lkb;
4767 }
4768
4769 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4770    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4771    which we clear here. */
4772
4773 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4774    list, and no more device_writes should add lkb's to proc->locks list; so we
4775    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4776    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4777    them ourself. */
4778
4779 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4780 {
4781         struct dlm_lkb *lkb, *safe;
4782
4783         dlm_lock_recovery(ls);
4784
4785         while (1) {
4786                 lkb = del_proc_lock(ls, proc);
4787                 if (!lkb)
4788                         break;
4789                 del_timeout(lkb);
4790                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4791                         orphan_proc_lock(ls, lkb);
4792                 else
4793                         unlock_proc_lock(ls, lkb);
4794
4795                 /* this removes the reference for the proc->locks list
4796                    added by dlm_user_request, it may result in the lkb
4797                    being freed */
4798
4799                 dlm_put_lkb(lkb);
4800         }
4801
4802         mutex_lock(&ls->ls_clear_proc_locks);
4803
4804         /* in-progress unlocks */
4805         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4806                 list_del_init(&lkb->lkb_ownqueue);
4807                 lkb->lkb_flags |= DLM_IFL_DEAD;
4808                 dlm_put_lkb(lkb);
4809         }
4810
4811         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4812                 lkb->lkb_ast_type = 0;
4813                 list_del(&lkb->lkb_astqueue);
4814                 dlm_put_lkb(lkb);
4815         }
4816
4817         mutex_unlock(&ls->ls_clear_proc_locks);
4818         dlm_unlock_recovery(ls);
4819 }
4820
4821 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4822 {
4823         struct dlm_lkb *lkb, *safe;
4824
4825         while (1) {
4826                 lkb = NULL;
4827                 spin_lock(&proc->locks_spin);
4828                 if (!list_empty(&proc->locks)) {
4829                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4830                                          lkb_ownqueue);
4831                         list_del_init(&lkb->lkb_ownqueue);
4832                 }
4833                 spin_unlock(&proc->locks_spin);
4834
4835                 if (!lkb)
4836                         break;
4837
4838                 lkb->lkb_flags |= DLM_IFL_DEAD;
4839                 unlock_proc_lock(ls, lkb);
4840                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4841         }
4842
4843         spin_lock(&proc->locks_spin);
4844         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4845                 list_del_init(&lkb->lkb_ownqueue);
4846                 lkb->lkb_flags |= DLM_IFL_DEAD;
4847                 dlm_put_lkb(lkb);
4848         }
4849         spin_unlock(&proc->locks_spin);
4850
4851         spin_lock(&proc->asts_spin);
4852         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4853                 list_del(&lkb->lkb_astqueue);
4854                 dlm_put_lkb(lkb);
4855         }
4856         spin_unlock(&proc->asts_spin);
4857 }
4858
4859 /* pid of 0 means purge all orphans */
4860
4861 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4862 {
4863         struct dlm_lkb *lkb, *safe;
4864
4865         mutex_lock(&ls->ls_orphans_mutex);
4866         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4867                 if (pid && lkb->lkb_ownpid != pid)
4868                         continue;
4869                 unlock_proc_lock(ls, lkb);
4870                 list_del_init(&lkb->lkb_ownqueue);
4871                 dlm_put_lkb(lkb);
4872         }
4873         mutex_unlock(&ls->ls_orphans_mutex);
4874 }
4875
4876 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4877 {
4878         struct dlm_message *ms;
4879         struct dlm_mhandle *mh;
4880         int error;
4881
4882         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4883                                 DLM_MSG_PURGE, &ms, &mh);
4884         if (error)
4885                 return error;
4886         ms->m_nodeid = nodeid;
4887         ms->m_pid = pid;
4888
4889         return send_message(mh, ms);
4890 }
4891
4892 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4893                    int nodeid, int pid)
4894 {
4895         int error = 0;
4896
4897         if (nodeid != dlm_our_nodeid()) {
4898                 error = send_purge(ls, nodeid, pid);
4899         } else {
4900                 dlm_lock_recovery(ls);
4901                 if (pid == current->pid)
4902                         purge_proc_locks(ls, proc);
4903                 else
4904                         do_purge(ls, nodeid, pid);
4905                 dlm_unlock_recovery(ls);
4906         }
4907         return error;
4908 }
4909