hugetlbfs: use lib/parser, fix docs
[linux-2.6] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 void dlm_timeout_warn(struct dlm_lkb *lkb);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
167 }
168
169 void dlm_print_rsb(struct dlm_rsb *r)
170 {
171         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172                r->res_nodeid, r->res_flags, r->res_first_lkid,
173                r->res_recover_locks_count, r->res_name);
174 }
175
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178         struct dlm_lkb *lkb;
179
180         dlm_print_rsb(r);
181
182         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184         printk(KERN_ERR "rsb lookup list\n");
185         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb grant queue:\n");
188         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb convert queue:\n");
191         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb wait queue:\n");
194         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196 }
197
198 /* Threads cannot use the lockspace while it's being recovered */
199
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202         down_read(&ls->ls_in_recovery);
203 }
204
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207         up_read(&ls->ls_in_recovery);
208 }
209
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212         return down_read_trylock(&ls->ls_in_recovery);
213 }
214
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243         return !!r->res_nodeid;
244 }
245
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291
292         del_timeout(lkb);
293
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307
308         lkb->lkb_lksb->sb_status = rv;
309         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
310
311         dlm_add_ast(lkb, AST_COMP);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb))
323                 send_bast(r, lkb, rqmode);
324         else {
325                 lkb->lkb_bastmode = rqmode;
326                 dlm_add_ast(lkb, AST_BAST);
327         }
328 }
329
330 /*
331  * Basic operations on rsb's and lkb's
332  */
333
334 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
335 {
336         struct dlm_rsb *r;
337
338         r = allocate_rsb(ls, len);
339         if (!r)
340                 return NULL;
341
342         r->res_ls = ls;
343         r->res_length = len;
344         memcpy(r->res_name, name, len);
345         mutex_init(&r->res_mutex);
346
347         INIT_LIST_HEAD(&r->res_lookup);
348         INIT_LIST_HEAD(&r->res_grantqueue);
349         INIT_LIST_HEAD(&r->res_convertqueue);
350         INIT_LIST_HEAD(&r->res_waitqueue);
351         INIT_LIST_HEAD(&r->res_root_list);
352         INIT_LIST_HEAD(&r->res_recover_list);
353
354         return r;
355 }
356
357 static int search_rsb_list(struct list_head *head, char *name, int len,
358                            unsigned int flags, struct dlm_rsb **r_ret)
359 {
360         struct dlm_rsb *r;
361         int error = 0;
362
363         list_for_each_entry(r, head, res_hashchain) {
364                 if (len == r->res_length && !memcmp(name, r->res_name, len))
365                         goto found;
366         }
367         return -EBADR;
368
369  found:
370         if (r->res_nodeid && (flags & R_MASTER))
371                 error = -ENOTBLK;
372         *r_ret = r;
373         return error;
374 }
375
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377                        unsigned int flags, struct dlm_rsb **r_ret)
378 {
379         struct dlm_rsb *r;
380         int error;
381
382         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383         if (!error) {
384                 kref_get(&r->res_ref);
385                 goto out;
386         }
387         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388         if (error)
389                 goto out;
390
391         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392
393         if (dlm_no_directory(ls))
394                 goto out;
395
396         if (r->res_nodeid == -1) {
397                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else if (r->res_nodeid > 0) {
400                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401                 r->res_first_lkid = 0;
402         } else {
403                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405         }
406  out:
407         *r_ret = r;
408         return error;
409 }
410
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412                       unsigned int flags, struct dlm_rsb **r_ret)
413 {
414         int error;
415         write_lock(&ls->ls_rsbtbl[b].lock);
416         error = _search_rsb(ls, name, len, b, flags, r_ret);
417         write_unlock(&ls->ls_rsbtbl[b].lock);
418         return error;
419 }
420
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436                     unsigned int flags, struct dlm_rsb **r_ret)
437 {
438         struct dlm_rsb *r, *tmp;
439         uint32_t hash, bucket;
440         int error = 0;
441
442         if (dlm_no_directory(ls))
443                 flags |= R_CREATE;
444
445         hash = jhash(name, namelen, 0);
446         bucket = hash & (ls->ls_rsbtbl_size - 1);
447
448         error = search_rsb(ls, name, namelen, bucket, flags, &r);
449         if (!error)
450                 goto out;
451
452         if (error == -EBADR && !(flags & R_CREATE))
453                 goto out;
454
455         /* the rsb was found but wasn't a master copy */
456         if (error == -ENOTBLK)
457                 goto out;
458
459         error = -ENOMEM;
460         r = create_rsb(ls, name, namelen);
461         if (!r)
462                 goto out;
463
464         r->res_hash = hash;
465         r->res_bucket = bucket;
466         r->res_nodeid = -1;
467         kref_init(&r->res_ref);
468
469         /* With no directory, the master can be set immediately */
470         if (dlm_no_directory(ls)) {
471                 int nodeid = dlm_dir_nodeid(r);
472                 if (nodeid == dlm_our_nodeid())
473                         nodeid = 0;
474                 r->res_nodeid = nodeid;
475         }
476
477         write_lock(&ls->ls_rsbtbl[bucket].lock);
478         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
479         if (!error) {
480                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
481                 free_rsb(r);
482                 r = tmp;
483                 goto out;
484         }
485         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
486         write_unlock(&ls->ls_rsbtbl[bucket].lock);
487         error = 0;
488  out:
489         *r_ret = r;
490         return error;
491 }
492
493 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
494                  unsigned int flags, struct dlm_rsb **r_ret)
495 {
496         return find_rsb(ls, name, namelen, flags, r_ret);
497 }
498
499 /* This is only called to add a reference when the code already holds
500    a valid reference to the rsb, so there's no need for locking. */
501
502 static inline void hold_rsb(struct dlm_rsb *r)
503 {
504         kref_get(&r->res_ref);
505 }
506
507 void dlm_hold_rsb(struct dlm_rsb *r)
508 {
509         hold_rsb(r);
510 }
511
512 static void toss_rsb(struct kref *kref)
513 {
514         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
515         struct dlm_ls *ls = r->res_ls;
516
517         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
518         kref_init(&r->res_ref);
519         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
520         r->res_toss_time = jiffies;
521         if (r->res_lvbptr) {
522                 free_lvb(r->res_lvbptr);
523                 r->res_lvbptr = NULL;
524         }
525 }
526
527 /* When all references to the rsb are gone it's transfered to
528    the tossed list for later disposal. */
529
530 static void put_rsb(struct dlm_rsb *r)
531 {
532         struct dlm_ls *ls = r->res_ls;
533         uint32_t bucket = r->res_bucket;
534
535         write_lock(&ls->ls_rsbtbl[bucket].lock);
536         kref_put(&r->res_ref, toss_rsb);
537         write_unlock(&ls->ls_rsbtbl[bucket].lock);
538 }
539
540 void dlm_put_rsb(struct dlm_rsb *r)
541 {
542         put_rsb(r);
543 }
544
545 /* See comment for unhold_lkb */
546
547 static void unhold_rsb(struct dlm_rsb *r)
548 {
549         int rv;
550         rv = kref_put(&r->res_ref, toss_rsb);
551         DLM_ASSERT(!rv, dlm_dump_rsb(r););
552 }
553
554 static void kill_rsb(struct kref *kref)
555 {
556         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
557
558         /* All work is done after the return from kref_put() so we
559            can release the write_lock before the remove and free. */
560
561         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
565         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
566         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
567 }
568
569 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
570    The rsb must exist as long as any lkb's for it do. */
571
572 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
573 {
574         hold_rsb(r);
575         lkb->lkb_resource = r;
576 }
577
578 static void detach_lkb(struct dlm_lkb *lkb)
579 {
580         if (lkb->lkb_resource) {
581                 put_rsb(lkb->lkb_resource);
582                 lkb->lkb_resource = NULL;
583         }
584 }
585
586 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
587 {
588         struct dlm_lkb *lkb, *tmp;
589         uint32_t lkid = 0;
590         uint16_t bucket;
591
592         lkb = allocate_lkb(ls);
593         if (!lkb)
594                 return -ENOMEM;
595
596         lkb->lkb_nodeid = -1;
597         lkb->lkb_grmode = DLM_LOCK_IV;
598         kref_init(&lkb->lkb_ref);
599         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
600         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
601         INIT_LIST_HEAD(&lkb->lkb_time_list);
602
603         get_random_bytes(&bucket, sizeof(bucket));
604         bucket &= (ls->ls_lkbtbl_size - 1);
605
606         write_lock(&ls->ls_lkbtbl[bucket].lock);
607
608         /* counter can roll over so we must verify lkid is not in use */
609
610         while (lkid == 0) {
611                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
612
613                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
614                                     lkb_idtbl_list) {
615                         if (tmp->lkb_id != lkid)
616                                 continue;
617                         lkid = 0;
618                         break;
619                 }
620         }
621
622         lkb->lkb_id = lkid;
623         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
624         write_unlock(&ls->ls_lkbtbl[bucket].lock);
625
626         *lkb_ret = lkb;
627         return 0;
628 }
629
630 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
631 {
632         struct dlm_lkb *lkb;
633         uint16_t bucket = (lkid >> 16);
634
635         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
636                 if (lkb->lkb_id == lkid)
637                         return lkb;
638         }
639         return NULL;
640 }
641
642 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
643 {
644         struct dlm_lkb *lkb;
645         uint16_t bucket = (lkid >> 16);
646
647         if (bucket >= ls->ls_lkbtbl_size)
648                 return -EBADSLT;
649
650         read_lock(&ls->ls_lkbtbl[bucket].lock);
651         lkb = __find_lkb(ls, lkid);
652         if (lkb)
653                 kref_get(&lkb->lkb_ref);
654         read_unlock(&ls->ls_lkbtbl[bucket].lock);
655
656         *lkb_ret = lkb;
657         return lkb ? 0 : -ENOENT;
658 }
659
660 static void kill_lkb(struct kref *kref)
661 {
662         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
663
664         /* All work is done after the return from kref_put() so we
665            can release the write_lock before the detach_lkb */
666
667         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
668 }
669
670 /* __put_lkb() is used when an lkb may not have an rsb attached to
671    it so we need to provide the lockspace explicitly */
672
673 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
674 {
675         uint16_t bucket = (lkb->lkb_id >> 16);
676
677         write_lock(&ls->ls_lkbtbl[bucket].lock);
678         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
679                 list_del(&lkb->lkb_idtbl_list);
680                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
681
682                 detach_lkb(lkb);
683
684                 /* for local/process lkbs, lvbptr points to caller's lksb */
685                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
686                         free_lvb(lkb->lkb_lvbptr);
687                 free_lkb(lkb);
688                 return 1;
689         } else {
690                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
691                 return 0;
692         }
693 }
694
695 int dlm_put_lkb(struct dlm_lkb *lkb)
696 {
697         struct dlm_ls *ls;
698
699         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
700         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
701
702         ls = lkb->lkb_resource->res_ls;
703         return __put_lkb(ls, lkb);
704 }
705
706 /* This is only called to add a reference when the code already holds
707    a valid reference to the lkb, so there's no need for locking. */
708
709 static inline void hold_lkb(struct dlm_lkb *lkb)
710 {
711         kref_get(&lkb->lkb_ref);
712 }
713
714 /* This is called when we need to remove a reference and are certain
715    it's not the last ref.  e.g. del_lkb is always called between a
716    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
717    put_lkb would work fine, but would involve unnecessary locking */
718
719 static inline void unhold_lkb(struct dlm_lkb *lkb)
720 {
721         int rv;
722         rv = kref_put(&lkb->lkb_ref, kill_lkb);
723         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
724 }
725
726 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
727                             int mode)
728 {
729         struct dlm_lkb *lkb = NULL;
730
731         list_for_each_entry(lkb, head, lkb_statequeue)
732                 if (lkb->lkb_rqmode < mode)
733                         break;
734
735         if (!lkb)
736                 list_add_tail(new, head);
737         else
738                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
739 }
740
741 /* add/remove lkb to rsb's grant/convert/wait queue */
742
743 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
744 {
745         kref_get(&lkb->lkb_ref);
746
747         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
748
749         lkb->lkb_status = status;
750
751         switch (status) {
752         case DLM_LKSTS_WAITING:
753                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755                 else
756                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 break;
758         case DLM_LKSTS_GRANTED:
759                 /* convention says granted locks kept in order of grmode */
760                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761                                 lkb->lkb_grmode);
762                 break;
763         case DLM_LKSTS_CONVERT:
764                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766                 else
767                         list_add_tail(&lkb->lkb_statequeue,
768                                       &r->res_convertqueue);
769                 break;
770         default:
771                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772         }
773 }
774
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777         lkb->lkb_status = 0;
778         list_del(&lkb->lkb_statequeue);
779         unhold_lkb(lkb);
780 }
781
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784         hold_lkb(lkb);
785         del_lkb(r, lkb);
786         add_lkb(r, lkb, sts);
787         unhold_lkb(lkb);
788 }
789
790 static int msg_reply_type(int mstype)
791 {
792         switch (mstype) {
793         case DLM_MSG_REQUEST:
794                 return DLM_MSG_REQUEST_REPLY;
795         case DLM_MSG_CONVERT:
796                 return DLM_MSG_CONVERT_REPLY;
797         case DLM_MSG_UNLOCK:
798                 return DLM_MSG_UNLOCK_REPLY;
799         case DLM_MSG_CANCEL:
800                 return DLM_MSG_CANCEL_REPLY;
801         case DLM_MSG_LOOKUP:
802                 return DLM_MSG_LOOKUP_REPLY;
803         }
804         return -1;
805 }
806
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813         int error = 0;
814
815         mutex_lock(&ls->ls_waiters_mutex);
816
817         if (is_overlap_unlock(lkb) ||
818             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819                 error = -EINVAL;
820                 goto out;
821         }
822
823         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824                 switch (mstype) {
825                 case DLM_MSG_UNLOCK:
826                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827                         break;
828                 case DLM_MSG_CANCEL:
829                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830                         break;
831                 default:
832                         error = -EBUSY;
833                         goto out;
834                 }
835                 lkb->lkb_wait_count++;
836                 hold_lkb(lkb);
837
838                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
839                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
840                           lkb->lkb_wait_count, lkb->lkb_flags);
841                 goto out;
842         }
843
844         DLM_ASSERT(!lkb->lkb_wait_count,
845                    dlm_print_lkb(lkb);
846                    printk("wait_count %d\n", lkb->lkb_wait_count););
847
848         lkb->lkb_wait_count++;
849         lkb->lkb_wait_type = mstype;
850         hold_lkb(lkb);
851         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853         if (error)
854                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
855                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
856                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857         mutex_unlock(&ls->ls_waiters_mutex);
858         return error;
859 }
860
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
867 {
868         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
869         int overlap_done = 0;
870
871         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
872                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
873                 overlap_done = 1;
874                 goto out_del;
875         }
876
877         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
878                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
879                 overlap_done = 1;
880                 goto out_del;
881         }
882
883         /* N.B. type of reply may not always correspond to type of original
884            msg due to lookup->request optimization, verify others? */
885
886         if (lkb->lkb_wait_type) {
887                 lkb->lkb_wait_type = 0;
888                 goto out_del;
889         }
890
891         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
892                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
893         return -1;
894
895  out_del:
896         /* the force-unlock/cancel has completed and we haven't recvd a reply
897            to the op that was in progress prior to the unlock/cancel; we
898            give up on any reply to the earlier op.  FIXME: not sure when/how
899            this would happen */
900
901         if (overlap_done && lkb->lkb_wait_type) {
902                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
903                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
904                 lkb->lkb_wait_count--;
905                 lkb->lkb_wait_type = 0;
906         }
907
908         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
909
910         lkb->lkb_flags &= ~DLM_IFL_RESEND;
911         lkb->lkb_wait_count--;
912         if (!lkb->lkb_wait_count)
913                 list_del_init(&lkb->lkb_wait_reply);
914         unhold_lkb(lkb);
915         return 0;
916 }
917
918 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
919 {
920         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
921         int error;
922
923         mutex_lock(&ls->ls_waiters_mutex);
924         error = _remove_from_waiters(lkb, mstype);
925         mutex_unlock(&ls->ls_waiters_mutex);
926         return error;
927 }
928
929 /* Handles situations where we might be processing a "fake" or "stub" reply in
930    which we can't try to take waiters_mutex again. */
931
932 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
933 {
934         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
935         int error;
936
937         if (ms != &ls->ls_stub_ms)
938                 mutex_lock(&ls->ls_waiters_mutex);
939         error = _remove_from_waiters(lkb, ms->m_type);
940         if (ms != &ls->ls_stub_ms)
941                 mutex_unlock(&ls->ls_waiters_mutex);
942         return error;
943 }
944
945 static void dir_remove(struct dlm_rsb *r)
946 {
947         int to_nodeid;
948
949         if (dlm_no_directory(r->res_ls))
950                 return;
951
952         to_nodeid = dlm_dir_nodeid(r);
953         if (to_nodeid != dlm_our_nodeid())
954                 send_remove(r);
955         else
956                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
957                                      r->res_name, r->res_length);
958 }
959
960 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
961    found since they are in order of newest to oldest? */
962
963 static int shrink_bucket(struct dlm_ls *ls, int b)
964 {
965         struct dlm_rsb *r;
966         int count = 0, found;
967
968         for (;;) {
969                 found = 0;
970                 write_lock(&ls->ls_rsbtbl[b].lock);
971                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
972                                             res_hashchain) {
973                         if (!time_after_eq(jiffies, r->res_toss_time +
974                                            dlm_config.ci_toss_secs * HZ))
975                                 continue;
976                         found = 1;
977                         break;
978                 }
979
980                 if (!found) {
981                         write_unlock(&ls->ls_rsbtbl[b].lock);
982                         break;
983                 }
984
985                 if (kref_put(&r->res_ref, kill_rsb)) {
986                         list_del(&r->res_hashchain);
987                         write_unlock(&ls->ls_rsbtbl[b].lock);
988
989                         if (is_master(r))
990                                 dir_remove(r);
991                         free_rsb(r);
992                         count++;
993                 } else {
994                         write_unlock(&ls->ls_rsbtbl[b].lock);
995                         log_error(ls, "tossed rsb in use %s", r->res_name);
996                 }
997         }
998
999         return count;
1000 }
1001
1002 void dlm_scan_rsbs(struct dlm_ls *ls)
1003 {
1004         int i;
1005
1006         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1007                 shrink_bucket(ls, i);
1008                 if (dlm_locking_stopped(ls))
1009                         break;
1010                 cond_resched();
1011         }
1012 }
1013
1014 static void add_timeout(struct dlm_lkb *lkb)
1015 {
1016         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1017
1018         if (is_master_copy(lkb)) {
1019                 lkb->lkb_timestamp = jiffies;
1020                 return;
1021         }
1022
1023         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1024             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1025                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1026                 goto add_it;
1027         }
1028         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1029                 goto add_it;
1030         return;
1031
1032  add_it:
1033         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1034         mutex_lock(&ls->ls_timeout_mutex);
1035         hold_lkb(lkb);
1036         lkb->lkb_timestamp = jiffies;
1037         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1038         mutex_unlock(&ls->ls_timeout_mutex);
1039 }
1040
1041 static void del_timeout(struct dlm_lkb *lkb)
1042 {
1043         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1044
1045         mutex_lock(&ls->ls_timeout_mutex);
1046         if (!list_empty(&lkb->lkb_time_list)) {
1047                 list_del_init(&lkb->lkb_time_list);
1048                 unhold_lkb(lkb);
1049         }
1050         mutex_unlock(&ls->ls_timeout_mutex);
1051 }
1052
1053 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1054    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1055    and then lock rsb because of lock ordering in add_timeout.  We may need
1056    to specify some special timeout-related bits in the lkb that are just to
1057    be accessed under the timeout_mutex. */
1058
1059 void dlm_scan_timeout(struct dlm_ls *ls)
1060 {
1061         struct dlm_rsb *r;
1062         struct dlm_lkb *lkb;
1063         int do_cancel, do_warn;
1064
1065         for (;;) {
1066                 if (dlm_locking_stopped(ls))
1067                         break;
1068
1069                 do_cancel = 0;
1070                 do_warn = 0;
1071                 mutex_lock(&ls->ls_timeout_mutex);
1072                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1073
1074                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1075                             time_after_eq(jiffies, lkb->lkb_timestamp +
1076                                           lkb->lkb_timeout_cs * HZ/100))
1077                                 do_cancel = 1;
1078
1079                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1080                             time_after_eq(jiffies, lkb->lkb_timestamp +
1081                                            dlm_config.ci_timewarn_cs * HZ/100))
1082                                 do_warn = 1;
1083
1084                         if (!do_cancel && !do_warn)
1085                                 continue;
1086                         hold_lkb(lkb);
1087                         break;
1088                 }
1089                 mutex_unlock(&ls->ls_timeout_mutex);
1090
1091                 if (!do_cancel && !do_warn)
1092                         break;
1093
1094                 r = lkb->lkb_resource;
1095                 hold_rsb(r);
1096                 lock_rsb(r);
1097
1098                 if (do_warn) {
1099                         /* clear flag so we only warn once */
1100                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1101                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1102                                 del_timeout(lkb);
1103                         dlm_timeout_warn(lkb);
1104                 }
1105
1106                 if (do_cancel) {
1107                         log_debug(ls, "timeout cancel %x node %d %s",
1108                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1109                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1110                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1111                         del_timeout(lkb);
1112                         _cancel_lock(r, lkb);
1113                 }
1114
1115                 unlock_rsb(r);
1116                 unhold_rsb(r);
1117                 dlm_put_lkb(lkb);
1118         }
1119 }
1120
1121 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1122    dlm_recoverd before checking/setting ls_recover_begin. */
1123
1124 void dlm_adjust_timeouts(struct dlm_ls *ls)
1125 {
1126         struct dlm_lkb *lkb;
1127         long adj = jiffies - ls->ls_recover_begin;
1128
1129         ls->ls_recover_begin = 0;
1130         mutex_lock(&ls->ls_timeout_mutex);
1131         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1132                 lkb->lkb_timestamp += adj;
1133         mutex_unlock(&ls->ls_timeout_mutex);
1134 }
1135
1136 /* lkb is master or local copy */
1137
1138 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1139 {
1140         int b, len = r->res_ls->ls_lvblen;
1141
1142         /* b=1 lvb returned to caller
1143            b=0 lvb written to rsb or invalidated
1144            b=-1 do nothing */
1145
1146         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1147
1148         if (b == 1) {
1149                 if (!lkb->lkb_lvbptr)
1150                         return;
1151
1152                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1153                         return;
1154
1155                 if (!r->res_lvbptr)
1156                         return;
1157
1158                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1159                 lkb->lkb_lvbseq = r->res_lvbseq;
1160
1161         } else if (b == 0) {
1162                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1163                         rsb_set_flag(r, RSB_VALNOTVALID);
1164                         return;
1165                 }
1166
1167                 if (!lkb->lkb_lvbptr)
1168                         return;
1169
1170                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1171                         return;
1172
1173                 if (!r->res_lvbptr)
1174                         r->res_lvbptr = allocate_lvb(r->res_ls);
1175
1176                 if (!r->res_lvbptr)
1177                         return;
1178
1179                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1180                 r->res_lvbseq++;
1181                 lkb->lkb_lvbseq = r->res_lvbseq;
1182                 rsb_clear_flag(r, RSB_VALNOTVALID);
1183         }
1184
1185         if (rsb_flag(r, RSB_VALNOTVALID))
1186                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1187 }
1188
1189 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1190 {
1191         if (lkb->lkb_grmode < DLM_LOCK_PW)
1192                 return;
1193
1194         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1195                 rsb_set_flag(r, RSB_VALNOTVALID);
1196                 return;
1197         }
1198
1199         if (!lkb->lkb_lvbptr)
1200                 return;
1201
1202         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1203                 return;
1204
1205         if (!r->res_lvbptr)
1206                 r->res_lvbptr = allocate_lvb(r->res_ls);
1207
1208         if (!r->res_lvbptr)
1209                 return;
1210
1211         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1212         r->res_lvbseq++;
1213         rsb_clear_flag(r, RSB_VALNOTVALID);
1214 }
1215
1216 /* lkb is process copy (pc) */
1217
1218 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1219                             struct dlm_message *ms)
1220 {
1221         int b;
1222
1223         if (!lkb->lkb_lvbptr)
1224                 return;
1225
1226         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1227                 return;
1228
1229         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1230         if (b == 1) {
1231                 int len = receive_extralen(ms);
1232                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1233                 lkb->lkb_lvbseq = ms->m_lvbseq;
1234         }
1235 }
1236
1237 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1238    remove_lock -- used for unlock, removes lkb from granted
1239    revert_lock -- used for cancel, moves lkb from convert to granted
1240    grant_lock  -- used for request and convert, adds lkb to granted or
1241                   moves lkb from convert or waiting to granted
1242
1243    Each of these is used for master or local copy lkb's.  There is
1244    also a _pc() variation used to make the corresponding change on
1245    a process copy (pc) lkb. */
1246
1247 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1248 {
1249         del_lkb(r, lkb);
1250         lkb->lkb_grmode = DLM_LOCK_IV;
1251         /* this unhold undoes the original ref from create_lkb()
1252            so this leads to the lkb being freed */
1253         unhold_lkb(lkb);
1254 }
1255
1256 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1257 {
1258         set_lvb_unlock(r, lkb);
1259         _remove_lock(r, lkb);
1260 }
1261
1262 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1263 {
1264         _remove_lock(r, lkb);
1265 }
1266
1267 /* returns: 0 did nothing
1268             1 moved lock to granted
1269            -1 removed lock */
1270
1271 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1272 {
1273         int rv = 0;
1274
1275         lkb->lkb_rqmode = DLM_LOCK_IV;
1276
1277         switch (lkb->lkb_status) {
1278         case DLM_LKSTS_GRANTED:
1279                 break;
1280         case DLM_LKSTS_CONVERT:
1281                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1282                 rv = 1;
1283                 break;
1284         case DLM_LKSTS_WAITING:
1285                 del_lkb(r, lkb);
1286                 lkb->lkb_grmode = DLM_LOCK_IV;
1287                 /* this unhold undoes the original ref from create_lkb()
1288                    so this leads to the lkb being freed */
1289                 unhold_lkb(lkb);
1290                 rv = -1;
1291                 break;
1292         default:
1293                 log_print("invalid status for revert %d", lkb->lkb_status);
1294         }
1295         return rv;
1296 }
1297
1298 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1299 {
1300         return revert_lock(r, lkb);
1301 }
1302
1303 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1304 {
1305         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1306                 lkb->lkb_grmode = lkb->lkb_rqmode;
1307                 if (lkb->lkb_status)
1308                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1309                 else
1310                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1311         }
1312
1313         lkb->lkb_rqmode = DLM_LOCK_IV;
1314 }
1315
1316 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1317 {
1318         set_lvb_lock(r, lkb);
1319         _grant_lock(r, lkb);
1320         lkb->lkb_highbast = 0;
1321 }
1322
1323 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1324                           struct dlm_message *ms)
1325 {
1326         set_lvb_lock_pc(r, lkb, ms);
1327         _grant_lock(r, lkb);
1328 }
1329
1330 /* called by grant_pending_locks() which means an async grant message must
1331    be sent to the requesting node in addition to granting the lock if the
1332    lkb belongs to a remote node. */
1333
1334 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1335 {
1336         grant_lock(r, lkb);
1337         if (is_master_copy(lkb))
1338                 send_grant(r, lkb);
1339         else
1340                 queue_cast(r, lkb, 0);
1341 }
1342
1343 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1344    change the granted/requested modes.  We're munging things accordingly in
1345    the process copy.
1346    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1347    conversion deadlock
1348    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1349    compatible with other granted locks */
1350
1351 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1352 {
1353         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1354                 log_print("munge_demoted %x invalid reply type %d",
1355                           lkb->lkb_id, ms->m_type);
1356                 return;
1357         }
1358
1359         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1360                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1361                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1362                 return;
1363         }
1364
1365         lkb->lkb_grmode = DLM_LOCK_NL;
1366 }
1367
1368 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1369 {
1370         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1371             ms->m_type != DLM_MSG_GRANT) {
1372                 log_print("munge_altmode %x invalid reply type %d",
1373                           lkb->lkb_id, ms->m_type);
1374                 return;
1375         }
1376
1377         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1378                 lkb->lkb_rqmode = DLM_LOCK_PR;
1379         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1380                 lkb->lkb_rqmode = DLM_LOCK_CW;
1381         else {
1382                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1383                 dlm_print_lkb(lkb);
1384         }
1385 }
1386
1387 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1388 {
1389         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1390                                            lkb_statequeue);
1391         if (lkb->lkb_id == first->lkb_id)
1392                 return 1;
1393
1394         return 0;
1395 }
1396
1397 /* Check if the given lkb conflicts with another lkb on the queue. */
1398
1399 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1400 {
1401         struct dlm_lkb *this;
1402
1403         list_for_each_entry(this, head, lkb_statequeue) {
1404                 if (this == lkb)
1405                         continue;
1406                 if (!modes_compat(this, lkb))
1407                         return 1;
1408         }
1409         return 0;
1410 }
1411
1412 /*
1413  * "A conversion deadlock arises with a pair of lock requests in the converting
1414  * queue for one resource.  The granted mode of each lock blocks the requested
1415  * mode of the other lock."
1416  *
1417  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1418  * convert queue from being granted, then deadlk/demote lkb.
1419  *
1420  * Example:
1421  * Granted Queue: empty
1422  * Convert Queue: NL->EX (first lock)
1423  *                PR->EX (second lock)
1424  *
1425  * The first lock can't be granted because of the granted mode of the second
1426  * lock and the second lock can't be granted because it's not first in the
1427  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1428  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1429  * flag set and return DEMOTED in the lksb flags.
1430  *
1431  * Originally, this function detected conv-deadlk in a more limited scope:
1432  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1433  * - if lkb1 was the first entry in the queue (not just earlier), and was
1434  *   blocked by the granted mode of lkb2, and there was nothing on the
1435  *   granted queue preventing lkb1 from being granted immediately, i.e.
1436  *   lkb2 was the only thing preventing lkb1 from being granted.
1437  *
1438  * That second condition meant we'd only say there was conv-deadlk if
1439  * resolving it (by demotion) would lead to the first lock on the convert
1440  * queue being granted right away.  It allowed conversion deadlocks to exist
1441  * between locks on the convert queue while they couldn't be granted anyway.
1442  *
1443  * Now, we detect and take action on conversion deadlocks immediately when
1444  * they're created, even if they may not be immediately consequential.  If
1445  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1446  * mode that would prevent lkb1's conversion from being granted, we do a
1447  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1448  * I think this means that the lkb_is_ahead condition below should always
1449  * be zero, i.e. there will never be conv-deadlk between two locks that are
1450  * both already on the convert queue.
1451  */
1452
1453 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1454 {
1455         struct dlm_lkb *lkb1;
1456         int lkb_is_ahead = 0;
1457
1458         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1459                 if (lkb1 == lkb2) {
1460                         lkb_is_ahead = 1;
1461                         continue;
1462                 }
1463
1464                 if (!lkb_is_ahead) {
1465                         if (!modes_compat(lkb2, lkb1))
1466                                 return 1;
1467                 } else {
1468                         if (!modes_compat(lkb2, lkb1) &&
1469                             !modes_compat(lkb1, lkb2))
1470                                 return 1;
1471                 }
1472         }
1473         return 0;
1474 }
1475
1476 /*
1477  * Return 1 if the lock can be granted, 0 otherwise.
1478  * Also detect and resolve conversion deadlocks.
1479  *
1480  * lkb is the lock to be granted
1481  *
1482  * now is 1 if the function is being called in the context of the
1483  * immediate request, it is 0 if called later, after the lock has been
1484  * queued.
1485  *
1486  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1487  */
1488
1489 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1490 {
1491         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1492
1493         /*
1494          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1495          * a new request for a NL mode lock being blocked.
1496          *
1497          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1498          * request, then it would be granted.  In essence, the use of this flag
1499          * tells the Lock Manager to expedite theis request by not considering
1500          * what may be in the CONVERTING or WAITING queues...  As of this
1501          * writing, the EXPEDITE flag can be used only with new requests for NL
1502          * mode locks.  This flag is not valid for conversion requests.
1503          *
1504          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1505          * conversion or used with a non-NL requested mode.  We also know an
1506          * EXPEDITE request is always granted immediately, so now must always
1507          * be 1.  The full condition to grant an expedite request: (now &&
1508          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1509          * therefore be shortened to just checking the flag.
1510          */
1511
1512         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1513                 return 1;
1514
1515         /*
1516          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1517          * added to the remaining conditions.
1518          */
1519
1520         if (queue_conflict(&r->res_grantqueue, lkb))
1521                 goto out;
1522
1523         /*
1524          * 6-3: By default, a conversion request is immediately granted if the
1525          * requested mode is compatible with the modes of all other granted
1526          * locks
1527          */
1528
1529         if (queue_conflict(&r->res_convertqueue, lkb))
1530                 goto out;
1531
1532         /*
1533          * 6-5: But the default algorithm for deciding whether to grant or
1534          * queue conversion requests does not by itself guarantee that such
1535          * requests are serviced on a "first come first serve" basis.  This, in
1536          * turn, can lead to a phenomenon known as "indefinate postponement".
1537          *
1538          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1539          * the system service employed to request a lock conversion.  This flag
1540          * forces certain conversion requests to be queued, even if they are
1541          * compatible with the granted modes of other locks on the same
1542          * resource.  Thus, the use of this flag results in conversion requests
1543          * being ordered on a "first come first servce" basis.
1544          *
1545          * DCT: This condition is all about new conversions being able to occur
1546          * "in place" while the lock remains on the granted queue (assuming
1547          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1548          * doesn't _have_ to go onto the convert queue where it's processed in
1549          * order.  The "now" variable is necessary to distinguish converts
1550          * being received and processed for the first time now, because once a
1551          * convert is moved to the conversion queue the condition below applies
1552          * requiring fifo granting.
1553          */
1554
1555         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1556                 return 1;
1557
1558         /*
1559          * The NOORDER flag is set to avoid the standard vms rules on grant
1560          * order.
1561          */
1562
1563         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1564                 return 1;
1565
1566         /*
1567          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1568          * granted until all other conversion requests ahead of it are granted
1569          * and/or canceled.
1570          */
1571
1572         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1573                 return 1;
1574
1575         /*
1576          * 6-4: By default, a new request is immediately granted only if all
1577          * three of the following conditions are satisfied when the request is
1578          * issued:
1579          * - The queue of ungranted conversion requests for the resource is
1580          *   empty.
1581          * - The queue of ungranted new requests for the resource is empty.
1582          * - The mode of the new request is compatible with the most
1583          *   restrictive mode of all granted locks on the resource.
1584          */
1585
1586         if (now && !conv && list_empty(&r->res_convertqueue) &&
1587             list_empty(&r->res_waitqueue))
1588                 return 1;
1589
1590         /*
1591          * 6-4: Once a lock request is in the queue of ungranted new requests,
1592          * it cannot be granted until the queue of ungranted conversion
1593          * requests is empty, all ungranted new requests ahead of it are
1594          * granted and/or canceled, and it is compatible with the granted mode
1595          * of the most restrictive lock granted on the resource.
1596          */
1597
1598         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1599             first_in_list(lkb, &r->res_waitqueue))
1600                 return 1;
1601  out:
1602         return 0;
1603 }
1604
1605 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1606                           int *err)
1607 {
1608         int rv;
1609         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1610         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1611
1612         if (err)
1613                 *err = 0;
1614
1615         rv = _can_be_granted(r, lkb, now);
1616         if (rv)
1617                 goto out;
1618
1619         /*
1620          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1621          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1622          * cancels one of the locks.
1623          */
1624
1625         if (is_convert && can_be_queued(lkb) &&
1626             conversion_deadlock_detect(r, lkb)) {
1627                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1628                         lkb->lkb_grmode = DLM_LOCK_NL;
1629                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1630                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1631                         if (err)
1632                                 *err = -EDEADLK;
1633                         else {
1634                                 log_print("can_be_granted deadlock %x now %d",
1635                                           lkb->lkb_id, now);
1636                                 dlm_dump_rsb(r);
1637                         }
1638                 }
1639                 goto out;
1640         }
1641
1642         /*
1643          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1644          * to grant a request in a mode other than the normal rqmode.  It's a
1645          * simple way to provide a big optimization to applications that can
1646          * use them.
1647          */
1648
1649         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1650                 alt = DLM_LOCK_PR;
1651         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1652                 alt = DLM_LOCK_CW;
1653
1654         if (alt) {
1655                 lkb->lkb_rqmode = alt;
1656                 rv = _can_be_granted(r, lkb, now);
1657                 if (rv)
1658                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1659                 else
1660                         lkb->lkb_rqmode = rqmode;
1661         }
1662  out:
1663         return rv;
1664 }
1665
1666 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1667    for locks pending on the convert list.  Once verified (watch for these
1668    log_prints), we should be able to just call _can_be_granted() and not
1669    bother with the demote/deadlk cases here (and there's no easy way to deal
1670    with a deadlk here, we'd have to generate something like grant_lock with
1671    the deadlk error.) */
1672
1673 /* returns the highest requested mode of all blocked conversions */
1674
1675 static int grant_pending_convert(struct dlm_rsb *r, int high)
1676 {
1677         struct dlm_lkb *lkb, *s;
1678         int hi, demoted, quit, grant_restart, demote_restart;
1679         int deadlk;
1680
1681         quit = 0;
1682  restart:
1683         grant_restart = 0;
1684         demote_restart = 0;
1685         hi = DLM_LOCK_IV;
1686
1687         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1688                 demoted = is_demoted(lkb);
1689                 deadlk = 0;
1690
1691                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1692                         grant_lock_pending(r, lkb);
1693                         grant_restart = 1;
1694                         continue;
1695                 }
1696
1697                 if (!demoted && is_demoted(lkb)) {
1698                         log_print("WARN: pending demoted %x node %d %s",
1699                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1700                         demote_restart = 1;
1701                         continue;
1702                 }
1703
1704                 if (deadlk) {
1705                         log_print("WARN: pending deadlock %x node %d %s",
1706                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1707                         dlm_dump_rsb(r);
1708                         continue;
1709                 }
1710
1711                 hi = max_t(int, lkb->lkb_rqmode, hi);
1712         }
1713
1714         if (grant_restart)
1715                 goto restart;
1716         if (demote_restart && !quit) {
1717                 quit = 1;
1718                 goto restart;
1719         }
1720
1721         return max_t(int, high, hi);
1722 }
1723
1724 static int grant_pending_wait(struct dlm_rsb *r, int high)
1725 {
1726         struct dlm_lkb *lkb, *s;
1727
1728         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1729                 if (can_be_granted(r, lkb, 0, NULL))
1730                         grant_lock_pending(r, lkb);
1731                 else
1732                         high = max_t(int, lkb->lkb_rqmode, high);
1733         }
1734
1735         return high;
1736 }
1737
1738 static void grant_pending_locks(struct dlm_rsb *r)
1739 {
1740         struct dlm_lkb *lkb, *s;
1741         int high = DLM_LOCK_IV;
1742
1743         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1744
1745         high = grant_pending_convert(r, high);
1746         high = grant_pending_wait(r, high);
1747
1748         if (high == DLM_LOCK_IV)
1749                 return;
1750
1751         /*
1752          * If there are locks left on the wait/convert queue then send blocking
1753          * ASTs to granted locks based on the largest requested mode (high)
1754          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1755          */
1756
1757         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1758                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1759                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1760                         queue_bast(r, lkb, high);
1761                         lkb->lkb_highbast = high;
1762                 }
1763         }
1764 }
1765
1766 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1767                             struct dlm_lkb *lkb)
1768 {
1769         struct dlm_lkb *gr;
1770
1771         list_for_each_entry(gr, head, lkb_statequeue) {
1772                 if (gr->lkb_bastaddr &&
1773                     gr->lkb_highbast < lkb->lkb_rqmode &&
1774                     !modes_compat(gr, lkb)) {
1775                         queue_bast(r, gr, lkb->lkb_rqmode);
1776                         gr->lkb_highbast = lkb->lkb_rqmode;
1777                 }
1778         }
1779 }
1780
1781 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1782 {
1783         send_bast_queue(r, &r->res_grantqueue, lkb);
1784 }
1785
1786 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1787 {
1788         send_bast_queue(r, &r->res_grantqueue, lkb);
1789         send_bast_queue(r, &r->res_convertqueue, lkb);
1790 }
1791
1792 /* set_master(r, lkb) -- set the master nodeid of a resource
1793
1794    The purpose of this function is to set the nodeid field in the given
1795    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1796    known, it can just be copied to the lkb and the function will return
1797    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1798    before it can be copied to the lkb.
1799
1800    When the rsb nodeid is being looked up remotely, the initial lkb
1801    causing the lookup is kept on the ls_waiters list waiting for the
1802    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1803    on the rsb's res_lookup list until the master is verified.
1804
1805    Return values:
1806    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1807    1: the rsb master is not available and the lkb has been placed on
1808       a wait queue
1809 */
1810
1811 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1812 {
1813         struct dlm_ls *ls = r->res_ls;
1814         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1815
1816         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1817                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1818                 r->res_first_lkid = lkb->lkb_id;
1819                 lkb->lkb_nodeid = r->res_nodeid;
1820                 return 0;
1821         }
1822
1823         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1824                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1825                 return 1;
1826         }
1827
1828         if (r->res_nodeid == 0) {
1829                 lkb->lkb_nodeid = 0;
1830                 return 0;
1831         }
1832
1833         if (r->res_nodeid > 0) {
1834                 lkb->lkb_nodeid = r->res_nodeid;
1835                 return 0;
1836         }
1837
1838         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1839
1840         dir_nodeid = dlm_dir_nodeid(r);
1841
1842         if (dir_nodeid != our_nodeid) {
1843                 r->res_first_lkid = lkb->lkb_id;
1844                 send_lookup(r, lkb);
1845                 return 1;
1846         }
1847
1848         for (;;) {
1849                 /* It's possible for dlm_scand to remove an old rsb for
1850                    this same resource from the toss list, us to create
1851                    a new one, look up the master locally, and find it
1852                    already exists just before dlm_scand does the
1853                    dir_remove() on the previous rsb. */
1854
1855                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1856                                        r->res_length, &ret_nodeid);
1857                 if (!error)
1858                         break;
1859                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1860                 schedule();
1861         }
1862
1863         if (ret_nodeid == our_nodeid) {
1864                 r->res_first_lkid = 0;
1865                 r->res_nodeid = 0;
1866                 lkb->lkb_nodeid = 0;
1867         } else {
1868                 r->res_first_lkid = lkb->lkb_id;
1869                 r->res_nodeid = ret_nodeid;
1870                 lkb->lkb_nodeid = ret_nodeid;
1871         }
1872         return 0;
1873 }
1874
1875 static void process_lookup_list(struct dlm_rsb *r)
1876 {
1877         struct dlm_lkb *lkb, *safe;
1878
1879         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1880                 list_del_init(&lkb->lkb_rsb_lookup);
1881                 _request_lock(r, lkb);
1882                 schedule();
1883         }
1884 }
1885
1886 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1887
1888 static void confirm_master(struct dlm_rsb *r, int error)
1889 {
1890         struct dlm_lkb *lkb;
1891
1892         if (!r->res_first_lkid)
1893                 return;
1894
1895         switch (error) {
1896         case 0:
1897         case -EINPROGRESS:
1898                 r->res_first_lkid = 0;
1899                 process_lookup_list(r);
1900                 break;
1901
1902         case -EAGAIN:
1903                 /* the remote master didn't queue our NOQUEUE request;
1904                    make a waiting lkb the first_lkid */
1905
1906                 r->res_first_lkid = 0;
1907
1908                 if (!list_empty(&r->res_lookup)) {
1909                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1910                                          lkb_rsb_lookup);
1911                         list_del_init(&lkb->lkb_rsb_lookup);
1912                         r->res_first_lkid = lkb->lkb_id;
1913                         _request_lock(r, lkb);
1914                 } else
1915                         r->res_nodeid = -1;
1916                 break;
1917
1918         default:
1919                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1920         }
1921 }
1922
1923 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1924                          int namelen, unsigned long timeout_cs, void *ast,
1925                          void *astarg, void *bast, struct dlm_args *args)
1926 {
1927         int rv = -EINVAL;
1928
1929         /* check for invalid arg usage */
1930
1931         if (mode < 0 || mode > DLM_LOCK_EX)
1932                 goto out;
1933
1934         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1935                 goto out;
1936
1937         if (flags & DLM_LKF_CANCEL)
1938                 goto out;
1939
1940         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1941                 goto out;
1942
1943         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1944                 goto out;
1945
1946         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1947                 goto out;
1948
1949         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1950                 goto out;
1951
1952         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1953                 goto out;
1954
1955         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1956                 goto out;
1957
1958         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1959                 goto out;
1960
1961         if (!ast || !lksb)
1962                 goto out;
1963
1964         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1965                 goto out;
1966
1967         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1968                 goto out;
1969
1970         /* these args will be copied to the lkb in validate_lock_args,
1971            it cannot be done now because when converting locks, fields in
1972            an active lkb cannot be modified before locking the rsb */
1973
1974         args->flags = flags;
1975         args->astaddr = ast;
1976         args->astparam = (long) astarg;
1977         args->bastaddr = bast;
1978         args->timeout = timeout_cs;
1979         args->mode = mode;
1980         args->lksb = lksb;
1981         rv = 0;
1982  out:
1983         return rv;
1984 }
1985
1986 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1987 {
1988         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1989                       DLM_LKF_FORCEUNLOCK))
1990                 return -EINVAL;
1991
1992         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1993                 return -EINVAL;
1994
1995         args->flags = flags;
1996         args->astparam = (long) astarg;
1997         return 0;
1998 }
1999
2000 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2001                               struct dlm_args *args)
2002 {
2003         int rv = -EINVAL;
2004
2005         if (args->flags & DLM_LKF_CONVERT) {
2006                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2007                         goto out;
2008
2009                 if (args->flags & DLM_LKF_QUECVT &&
2010                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2011                         goto out;
2012
2013                 rv = -EBUSY;
2014                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2015                         goto out;
2016
2017                 if (lkb->lkb_wait_type)
2018                         goto out;
2019
2020                 if (is_overlap(lkb))
2021                         goto out;
2022         }
2023
2024         lkb->lkb_exflags = args->flags;
2025         lkb->lkb_sbflags = 0;
2026         lkb->lkb_astaddr = args->astaddr;
2027         lkb->lkb_astparam = args->astparam;
2028         lkb->lkb_bastaddr = args->bastaddr;
2029         lkb->lkb_rqmode = args->mode;
2030         lkb->lkb_lksb = args->lksb;
2031         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2032         lkb->lkb_ownpid = (int) current->pid;
2033         lkb->lkb_timeout_cs = args->timeout;
2034         rv = 0;
2035  out:
2036         return rv;
2037 }
2038
2039 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2040    for success */
2041
2042 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2043    because there may be a lookup in progress and it's valid to do
2044    cancel/unlockf on it */
2045
2046 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2047 {
2048         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2049         int rv = -EINVAL;
2050
2051         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2052                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2053                 dlm_print_lkb(lkb);
2054                 goto out;
2055         }
2056
2057         /* an lkb may still exist even though the lock is EOL'ed due to a
2058            cancel, unlock or failed noqueue request; an app can't use these
2059            locks; return same error as if the lkid had not been found at all */
2060
2061         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2062                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2063                 rv = -ENOENT;
2064                 goto out;
2065         }
2066
2067         /* an lkb may be waiting for an rsb lookup to complete where the
2068            lookup was initiated by another lock */
2069
2070         if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2071                 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2072                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2073                         list_del_init(&lkb->lkb_rsb_lookup);
2074                         queue_cast(lkb->lkb_resource, lkb,
2075                                    args->flags & DLM_LKF_CANCEL ?
2076                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2077                         unhold_lkb(lkb); /* undoes create_lkb() */
2078                         rv = -EBUSY;
2079                         goto out;
2080                 }
2081         }
2082
2083         /* cancel not allowed with another cancel/unlock in progress */
2084
2085         if (args->flags & DLM_LKF_CANCEL) {
2086                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2087                         goto out;
2088
2089                 if (is_overlap(lkb))
2090                         goto out;
2091
2092                 /* don't let scand try to do a cancel */
2093                 del_timeout(lkb);
2094
2095                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2096                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2097                         rv = -EBUSY;
2098                         goto out;
2099                 }
2100
2101                 switch (lkb->lkb_wait_type) {
2102                 case DLM_MSG_LOOKUP:
2103                 case DLM_MSG_REQUEST:
2104                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2105                         rv = -EBUSY;
2106                         goto out;
2107                 case DLM_MSG_UNLOCK:
2108                 case DLM_MSG_CANCEL:
2109                         goto out;
2110                 }
2111                 /* add_to_waiters() will set OVERLAP_CANCEL */
2112                 goto out_ok;
2113         }
2114
2115         /* do we need to allow a force-unlock if there's a normal unlock
2116            already in progress?  in what conditions could the normal unlock
2117            fail such that we'd want to send a force-unlock to be sure? */
2118
2119         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2120                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2121                         goto out;
2122
2123                 if (is_overlap_unlock(lkb))
2124                         goto out;
2125
2126                 /* don't let scand try to do a cancel */
2127                 del_timeout(lkb);
2128
2129                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2130                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2131                         rv = -EBUSY;
2132                         goto out;
2133                 }
2134
2135                 switch (lkb->lkb_wait_type) {
2136                 case DLM_MSG_LOOKUP:
2137                 case DLM_MSG_REQUEST:
2138                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2139                         rv = -EBUSY;
2140                         goto out;
2141                 case DLM_MSG_UNLOCK:
2142                         goto out;
2143                 }
2144                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2145                 goto out_ok;
2146         }
2147
2148         /* normal unlock not allowed if there's any op in progress */
2149         rv = -EBUSY;
2150         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2151                 goto out;
2152
2153  out_ok:
2154         /* an overlapping op shouldn't blow away exflags from other op */
2155         lkb->lkb_exflags |= args->flags;
2156         lkb->lkb_sbflags = 0;
2157         lkb->lkb_astparam = args->astparam;
2158         rv = 0;
2159  out:
2160         if (rv)
2161                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2162                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2163                           args->flags, lkb->lkb_wait_type,
2164                           lkb->lkb_resource->res_name);
2165         return rv;
2166 }
2167
2168 /*
2169  * Four stage 4 varieties:
2170  * do_request(), do_convert(), do_unlock(), do_cancel()
2171  * These are called on the master node for the given lock and
2172  * from the central locking logic.
2173  */
2174
2175 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2176 {
2177         int error = 0;
2178
2179         if (can_be_granted(r, lkb, 1, NULL)) {
2180                 grant_lock(r, lkb);
2181                 queue_cast(r, lkb, 0);
2182                 goto out;
2183         }
2184
2185         if (can_be_queued(lkb)) {
2186                 error = -EINPROGRESS;
2187                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2188                 send_blocking_asts(r, lkb);
2189                 add_timeout(lkb);
2190                 goto out;
2191         }
2192
2193         error = -EAGAIN;
2194         if (force_blocking_asts(lkb))
2195                 send_blocking_asts_all(r, lkb);
2196         queue_cast(r, lkb, -EAGAIN);
2197
2198  out:
2199         return error;
2200 }
2201
2202 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2203 {
2204         int error = 0;
2205         int deadlk = 0;
2206
2207         /* changing an existing lock may allow others to be granted */
2208
2209         if (can_be_granted(r, lkb, 1, &deadlk)) {
2210                 grant_lock(r, lkb);
2211                 queue_cast(r, lkb, 0);
2212                 grant_pending_locks(r);
2213                 goto out;
2214         }
2215
2216         /* can_be_granted() detected that this lock would block in a conversion
2217            deadlock, so we leave it on the granted queue and return EDEADLK in
2218            the ast for the convert. */
2219
2220         if (deadlk) {
2221                 /* it's left on the granted queue */
2222                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2223                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2224                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2225                 revert_lock(r, lkb);
2226                 queue_cast(r, lkb, -EDEADLK);
2227                 error = -EDEADLK;
2228                 goto out;
2229         }
2230
2231         /* is_demoted() means the can_be_granted() above set the grmode
2232            to NL, and left us on the granted queue.  This auto-demotion
2233            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2234            now grantable.  We have to try to grant other converting locks
2235            before we try again to grant this one. */
2236
2237         if (is_demoted(lkb)) {
2238                 grant_pending_convert(r, DLM_LOCK_IV);
2239                 if (_can_be_granted(r, lkb, 1)) {
2240                         grant_lock(r, lkb);
2241                         queue_cast(r, lkb, 0);
2242                         grant_pending_locks(r);
2243                         goto out;
2244                 }
2245                 /* else fall through and move to convert queue */
2246         }
2247
2248         if (can_be_queued(lkb)) {
2249                 error = -EINPROGRESS;
2250                 del_lkb(r, lkb);
2251                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2252                 send_blocking_asts(r, lkb);
2253                 add_timeout(lkb);
2254                 goto out;
2255         }
2256
2257         error = -EAGAIN;
2258         if (force_blocking_asts(lkb))
2259                 send_blocking_asts_all(r, lkb);
2260         queue_cast(r, lkb, -EAGAIN);
2261
2262  out:
2263         return error;
2264 }
2265
2266 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2267 {
2268         remove_lock(r, lkb);
2269         queue_cast(r, lkb, -DLM_EUNLOCK);
2270         grant_pending_locks(r);
2271         return -DLM_EUNLOCK;
2272 }
2273
2274 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2275  
2276 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2277 {
2278         int error;
2279
2280         error = revert_lock(r, lkb);
2281         if (error) {
2282                 queue_cast(r, lkb, -DLM_ECANCEL);
2283                 grant_pending_locks(r);
2284                 return -DLM_ECANCEL;
2285         }
2286         return 0;
2287 }
2288
2289 /*
2290  * Four stage 3 varieties:
2291  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2292  */
2293
2294 /* add a new lkb to a possibly new rsb, called by requesting process */
2295
2296 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2297 {
2298         int error;
2299
2300         /* set_master: sets lkb nodeid from r */
2301
2302         error = set_master(r, lkb);
2303         if (error < 0)
2304                 goto out;
2305         if (error) {
2306                 error = 0;
2307                 goto out;
2308         }
2309
2310         if (is_remote(r))
2311                 /* receive_request() calls do_request() on remote node */
2312                 error = send_request(r, lkb);
2313         else
2314                 error = do_request(r, lkb);
2315  out:
2316         return error;
2317 }
2318
2319 /* change some property of an existing lkb, e.g. mode */
2320
2321 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2322 {
2323         int error;
2324
2325         if (is_remote(r))
2326                 /* receive_convert() calls do_convert() on remote node */
2327                 error = send_convert(r, lkb);
2328         else
2329                 error = do_convert(r, lkb);
2330
2331         return error;
2332 }
2333
2334 /* remove an existing lkb from the granted queue */
2335
2336 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2337 {
2338         int error;
2339
2340         if (is_remote(r))
2341                 /* receive_unlock() calls do_unlock() on remote node */
2342                 error = send_unlock(r, lkb);
2343         else
2344                 error = do_unlock(r, lkb);
2345
2346         return error;
2347 }
2348
2349 /* remove an existing lkb from the convert or wait queue */
2350
2351 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2352 {
2353         int error;
2354
2355         if (is_remote(r))
2356                 /* receive_cancel() calls do_cancel() on remote node */
2357                 error = send_cancel(r, lkb);
2358         else
2359                 error = do_cancel(r, lkb);
2360
2361         return error;
2362 }
2363
2364 /*
2365  * Four stage 2 varieties:
2366  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2367  */
2368
2369 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2370                         int len, struct dlm_args *args)
2371 {
2372         struct dlm_rsb *r;
2373         int error;
2374
2375         error = validate_lock_args(ls, lkb, args);
2376         if (error)
2377                 goto out;
2378
2379         error = find_rsb(ls, name, len, R_CREATE, &r);
2380         if (error)
2381                 goto out;
2382
2383         lock_rsb(r);
2384
2385         attach_lkb(r, lkb);
2386         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2387
2388         error = _request_lock(r, lkb);
2389
2390         unlock_rsb(r);
2391         put_rsb(r);
2392
2393  out:
2394         return error;
2395 }
2396
2397 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2398                         struct dlm_args *args)
2399 {
2400         struct dlm_rsb *r;
2401         int error;
2402
2403         r = lkb->lkb_resource;
2404
2405         hold_rsb(r);
2406         lock_rsb(r);
2407
2408         error = validate_lock_args(ls, lkb, args);
2409         if (error)
2410                 goto out;
2411
2412         error = _convert_lock(r, lkb);
2413  out:
2414         unlock_rsb(r);
2415         put_rsb(r);
2416         return error;
2417 }
2418
2419 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2420                        struct dlm_args *args)
2421 {
2422         struct dlm_rsb *r;
2423         int error;
2424
2425         r = lkb->lkb_resource;
2426
2427         hold_rsb(r);
2428         lock_rsb(r);
2429
2430         error = validate_unlock_args(lkb, args);
2431         if (error)
2432                 goto out;
2433
2434         error = _unlock_lock(r, lkb);
2435  out:
2436         unlock_rsb(r);
2437         put_rsb(r);
2438         return error;
2439 }
2440
2441 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2442                        struct dlm_args *args)
2443 {
2444         struct dlm_rsb *r;
2445         int error;
2446
2447         r = lkb->lkb_resource;
2448
2449         hold_rsb(r);
2450         lock_rsb(r);
2451
2452         error = validate_unlock_args(lkb, args);
2453         if (error)
2454                 goto out;
2455
2456         error = _cancel_lock(r, lkb);
2457  out:
2458         unlock_rsb(r);
2459         put_rsb(r);
2460         return error;
2461 }
2462
2463 /*
2464  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2465  */
2466
2467 int dlm_lock(dlm_lockspace_t *lockspace,
2468              int mode,
2469              struct dlm_lksb *lksb,
2470              uint32_t flags,
2471              void *name,
2472              unsigned int namelen,
2473              uint32_t parent_lkid,
2474              void (*ast) (void *astarg),
2475              void *astarg,
2476              void (*bast) (void *astarg, int mode))
2477 {
2478         struct dlm_ls *ls;
2479         struct dlm_lkb *lkb;
2480         struct dlm_args args;
2481         int error, convert = flags & DLM_LKF_CONVERT;
2482
2483         ls = dlm_find_lockspace_local(lockspace);
2484         if (!ls)
2485                 return -EINVAL;
2486
2487         dlm_lock_recovery(ls);
2488
2489         if (convert)
2490                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2491         else
2492                 error = create_lkb(ls, &lkb);
2493
2494         if (error)
2495                 goto out;
2496
2497         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2498                               astarg, bast, &args);
2499         if (error)
2500                 goto out_put;
2501
2502         if (convert)
2503                 error = convert_lock(ls, lkb, &args);
2504         else
2505                 error = request_lock(ls, lkb, name, namelen, &args);
2506
2507         if (error == -EINPROGRESS)
2508                 error = 0;
2509  out_put:
2510         if (convert || error)
2511                 __put_lkb(ls, lkb);
2512         if (error == -EAGAIN || error == -EDEADLK)
2513                 error = 0;
2514  out:
2515         dlm_unlock_recovery(ls);
2516         dlm_put_lockspace(ls);
2517         return error;
2518 }
2519
2520 int dlm_unlock(dlm_lockspace_t *lockspace,
2521                uint32_t lkid,
2522                uint32_t flags,
2523                struct dlm_lksb *lksb,
2524                void *astarg)
2525 {
2526         struct dlm_ls *ls;
2527         struct dlm_lkb *lkb;
2528         struct dlm_args args;
2529         int error;
2530
2531         ls = dlm_find_lockspace_local(lockspace);
2532         if (!ls)
2533                 return -EINVAL;
2534
2535         dlm_lock_recovery(ls);
2536
2537         error = find_lkb(ls, lkid, &lkb);
2538         if (error)
2539                 goto out;
2540
2541         error = set_unlock_args(flags, astarg, &args);
2542         if (error)
2543                 goto out_put;
2544
2545         if (flags & DLM_LKF_CANCEL)
2546                 error = cancel_lock(ls, lkb, &args);
2547         else
2548                 error = unlock_lock(ls, lkb, &args);
2549
2550         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2551                 error = 0;
2552         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2553                 error = 0;
2554  out_put:
2555         dlm_put_lkb(lkb);
2556  out:
2557         dlm_unlock_recovery(ls);
2558         dlm_put_lockspace(ls);
2559         return error;
2560 }
2561
2562 /*
2563  * send/receive routines for remote operations and replies
2564  *
2565  * send_args
2566  * send_common
2567  * send_request                 receive_request
2568  * send_convert                 receive_convert
2569  * send_unlock                  receive_unlock
2570  * send_cancel                  receive_cancel
2571  * send_grant                   receive_grant
2572  * send_bast                    receive_bast
2573  * send_lookup                  receive_lookup
2574  * send_remove                  receive_remove
2575  *
2576  *                              send_common_reply
2577  * receive_request_reply        send_request_reply
2578  * receive_convert_reply        send_convert_reply
2579  * receive_unlock_reply         send_unlock_reply
2580  * receive_cancel_reply         send_cancel_reply
2581  * receive_lookup_reply         send_lookup_reply
2582  */
2583
2584 static int _create_message(struct dlm_ls *ls, int mb_len,
2585                            int to_nodeid, int mstype,
2586                            struct dlm_message **ms_ret,
2587                            struct dlm_mhandle **mh_ret)
2588 {
2589         struct dlm_message *ms;
2590         struct dlm_mhandle *mh;
2591         char *mb;
2592
2593         /* get_buffer gives us a message handle (mh) that we need to
2594            pass into lowcomms_commit and a message buffer (mb) that we
2595            write our data into */
2596
2597         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2598         if (!mh)
2599                 return -ENOBUFS;
2600
2601         memset(mb, 0, mb_len);
2602
2603         ms = (struct dlm_message *) mb;
2604
2605         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2606         ms->m_header.h_lockspace = ls->ls_global_id;
2607         ms->m_header.h_nodeid = dlm_our_nodeid();
2608         ms->m_header.h_length = mb_len;
2609         ms->m_header.h_cmd = DLM_MSG;
2610
2611         ms->m_type = mstype;
2612
2613         *mh_ret = mh;
2614         *ms_ret = ms;
2615         return 0;
2616 }
2617
2618 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2619                           int to_nodeid, int mstype,
2620                           struct dlm_message **ms_ret,
2621                           struct dlm_mhandle **mh_ret)
2622 {
2623         int mb_len = sizeof(struct dlm_message);
2624
2625         switch (mstype) {
2626         case DLM_MSG_REQUEST:
2627         case DLM_MSG_LOOKUP:
2628         case DLM_MSG_REMOVE:
2629                 mb_len += r->res_length;
2630                 break;
2631         case DLM_MSG_CONVERT:
2632         case DLM_MSG_UNLOCK:
2633         case DLM_MSG_REQUEST_REPLY:
2634         case DLM_MSG_CONVERT_REPLY:
2635         case DLM_MSG_GRANT:
2636                 if (lkb && lkb->lkb_lvbptr)
2637                         mb_len += r->res_ls->ls_lvblen;
2638                 break;
2639         }
2640
2641         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2642                                ms_ret, mh_ret);
2643 }
2644
2645 /* further lowcomms enhancements or alternate implementations may make
2646    the return value from this function useful at some point */
2647
2648 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2649 {
2650         dlm_message_out(ms);
2651         dlm_lowcomms_commit_buffer(mh);
2652         return 0;
2653 }
2654
2655 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2656                       struct dlm_message *ms)
2657 {
2658         ms->m_nodeid   = lkb->lkb_nodeid;
2659         ms->m_pid      = lkb->lkb_ownpid;
2660         ms->m_lkid     = lkb->lkb_id;
2661         ms->m_remid    = lkb->lkb_remid;
2662         ms->m_exflags  = lkb->lkb_exflags;
2663         ms->m_sbflags  = lkb->lkb_sbflags;
2664         ms->m_flags    = lkb->lkb_flags;
2665         ms->m_lvbseq   = lkb->lkb_lvbseq;
2666         ms->m_status   = lkb->lkb_status;
2667         ms->m_grmode   = lkb->lkb_grmode;
2668         ms->m_rqmode   = lkb->lkb_rqmode;
2669         ms->m_hash     = r->res_hash;
2670
2671         /* m_result and m_bastmode are set from function args,
2672            not from lkb fields */
2673
2674         if (lkb->lkb_bastaddr)
2675                 ms->m_asts |= AST_BAST;
2676         if (lkb->lkb_astaddr)
2677                 ms->m_asts |= AST_COMP;
2678
2679         /* compare with switch in create_message; send_remove() doesn't
2680            use send_args() */
2681
2682         switch (ms->m_type) {
2683         case DLM_MSG_REQUEST:
2684         case DLM_MSG_LOOKUP:
2685                 memcpy(ms->m_extra, r->res_name, r->res_length);
2686                 break;
2687         case DLM_MSG_CONVERT:
2688         case DLM_MSG_UNLOCK:
2689         case DLM_MSG_REQUEST_REPLY:
2690         case DLM_MSG_CONVERT_REPLY:
2691         case DLM_MSG_GRANT:
2692                 if (!lkb->lkb_lvbptr)
2693                         break;
2694                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2695                 break;
2696         }
2697 }
2698
2699 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2700 {
2701         struct dlm_message *ms;
2702         struct dlm_mhandle *mh;
2703         int to_nodeid, error;
2704
2705         error = add_to_waiters(lkb, mstype);
2706         if (error)
2707                 return error;
2708
2709         to_nodeid = r->res_nodeid;
2710
2711         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2712         if (error)
2713                 goto fail;
2714
2715         send_args(r, lkb, ms);
2716
2717         error = send_message(mh, ms);
2718         if (error)
2719                 goto fail;
2720         return 0;
2721
2722  fail:
2723         remove_from_waiters(lkb, msg_reply_type(mstype));
2724         return error;
2725 }
2726
2727 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2728 {
2729         return send_common(r, lkb, DLM_MSG_REQUEST);
2730 }
2731
2732 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2733 {
2734         int error;
2735
2736         error = send_common(r, lkb, DLM_MSG_CONVERT);
2737
2738         /* down conversions go without a reply from the master */
2739         if (!error && down_conversion(lkb)) {
2740                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2741                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2742                 r->res_ls->ls_stub_ms.m_result = 0;
2743                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2744                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2745         }
2746
2747         return error;
2748 }
2749
2750 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2751    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2752    that the master is still correct. */
2753
2754 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2755 {
2756         return send_common(r, lkb, DLM_MSG_UNLOCK);
2757 }
2758
2759 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2760 {
2761         return send_common(r, lkb, DLM_MSG_CANCEL);
2762 }
2763
2764 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2765 {
2766         struct dlm_message *ms;
2767         struct dlm_mhandle *mh;
2768         int to_nodeid, error;
2769
2770         to_nodeid = lkb->lkb_nodeid;
2771
2772         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2773         if (error)
2774                 goto out;
2775
2776         send_args(r, lkb, ms);
2777
2778         ms->m_result = 0;
2779
2780         error = send_message(mh, ms);
2781  out:
2782         return error;
2783 }
2784
2785 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2786 {
2787         struct dlm_message *ms;
2788         struct dlm_mhandle *mh;
2789         int to_nodeid, error;
2790
2791         to_nodeid = lkb->lkb_nodeid;
2792
2793         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2794         if (error)
2795                 goto out;
2796
2797         send_args(r, lkb, ms);
2798
2799         ms->m_bastmode = mode;
2800
2801         error = send_message(mh, ms);
2802  out:
2803         return error;
2804 }
2805
2806 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2807 {
2808         struct dlm_message *ms;
2809         struct dlm_mhandle *mh;
2810         int to_nodeid, error;
2811
2812         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2813         if (error)
2814                 return error;
2815
2816         to_nodeid = dlm_dir_nodeid(r);
2817
2818         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2819         if (error)
2820                 goto fail;
2821
2822         send_args(r, lkb, ms);
2823
2824         error = send_message(mh, ms);
2825         if (error)
2826                 goto fail;
2827         return 0;
2828
2829  fail:
2830         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2831         return error;
2832 }
2833
2834 static int send_remove(struct dlm_rsb *r)
2835 {
2836         struct dlm_message *ms;
2837         struct dlm_mhandle *mh;
2838         int to_nodeid, error;
2839
2840         to_nodeid = dlm_dir_nodeid(r);
2841
2842         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2843         if (error)
2844                 goto out;
2845
2846         memcpy(ms->m_extra, r->res_name, r->res_length);
2847         ms->m_hash = r->res_hash;
2848
2849         error = send_message(mh, ms);
2850  out:
2851         return error;
2852 }
2853
2854 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2855                              int mstype, int rv)
2856 {
2857         struct dlm_message *ms;
2858         struct dlm_mhandle *mh;
2859         int to_nodeid, error;
2860
2861         to_nodeid = lkb->lkb_nodeid;
2862
2863         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2864         if (error)
2865                 goto out;
2866
2867         send_args(r, lkb, ms);
2868
2869         ms->m_result = rv;
2870
2871         error = send_message(mh, ms);
2872  out:
2873         return error;
2874 }
2875
2876 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2877 {
2878         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2879 }
2880
2881 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2882 {
2883         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2884 }
2885
2886 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2887 {
2888         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2889 }
2890
2891 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2892 {
2893         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2894 }
2895
2896 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2897                              int ret_nodeid, int rv)
2898 {
2899         struct dlm_rsb *r = &ls->ls_stub_rsb;
2900         struct dlm_message *ms;
2901         struct dlm_mhandle *mh;
2902         int error, nodeid = ms_in->m_header.h_nodeid;
2903
2904         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2905         if (error)
2906                 goto out;
2907
2908         ms->m_lkid = ms_in->m_lkid;
2909         ms->m_result = rv;
2910         ms->m_nodeid = ret_nodeid;
2911
2912         error = send_message(mh, ms);
2913  out:
2914         return error;
2915 }
2916
2917 /* which args we save from a received message depends heavily on the type
2918    of message, unlike the send side where we can safely send everything about
2919    the lkb for any type of message */
2920
2921 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2922 {
2923         lkb->lkb_exflags = ms->m_exflags;
2924         lkb->lkb_sbflags = ms->m_sbflags;
2925         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2926                          (ms->m_flags & 0x0000FFFF);
2927 }
2928
2929 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2930 {
2931         lkb->lkb_sbflags = ms->m_sbflags;
2932         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2933                          (ms->m_flags & 0x0000FFFF);
2934 }
2935
2936 static int receive_extralen(struct dlm_message *ms)
2937 {
2938         return (ms->m_header.h_length - sizeof(struct dlm_message));
2939 }
2940
2941 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2942                        struct dlm_message *ms)
2943 {
2944         int len;
2945
2946         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2947                 if (!lkb->lkb_lvbptr)
2948                         lkb->lkb_lvbptr = allocate_lvb(ls);
2949                 if (!lkb->lkb_lvbptr)
2950                         return -ENOMEM;
2951                 len = receive_extralen(ms);
2952                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2953         }
2954         return 0;
2955 }
2956
2957 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2958                                 struct dlm_message *ms)
2959 {
2960         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2961         lkb->lkb_ownpid = ms->m_pid;
2962         lkb->lkb_remid = ms->m_lkid;
2963         lkb->lkb_grmode = DLM_LOCK_IV;
2964         lkb->lkb_rqmode = ms->m_rqmode;
2965         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2966         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2967
2968         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2969
2970         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2971                 /* lkb was just created so there won't be an lvb yet */
2972                 lkb->lkb_lvbptr = allocate_lvb(ls);
2973                 if (!lkb->lkb_lvbptr)
2974                         return -ENOMEM;
2975         }
2976
2977         return 0;
2978 }
2979
2980 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2981                                 struct dlm_message *ms)
2982 {
2983         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2984                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2985                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2986                           lkb->lkb_id, lkb->lkb_remid);
2987                 return -EINVAL;
2988         }
2989
2990         if (!is_master_copy(lkb))
2991                 return -EINVAL;
2992
2993         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2994                 return -EBUSY;
2995
2996         if (receive_lvb(ls, lkb, ms))
2997                 return -ENOMEM;
2998
2999         lkb->lkb_rqmode = ms->m_rqmode;
3000         lkb->lkb_lvbseq = ms->m_lvbseq;
3001
3002         return 0;
3003 }
3004
3005 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3006                                struct dlm_message *ms)
3007 {
3008         if (!is_master_copy(lkb))
3009                 return -EINVAL;
3010         if (receive_lvb(ls, lkb, ms))
3011                 return -ENOMEM;
3012         return 0;
3013 }
3014
3015 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3016    uses to send a reply and that the remote end uses to process the reply. */
3017
3018 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3019 {
3020         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3021         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3022         lkb->lkb_remid = ms->m_lkid;
3023 }
3024
3025 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3026 {
3027         struct dlm_lkb *lkb;
3028         struct dlm_rsb *r;
3029         int error, namelen;
3030
3031         error = create_lkb(ls, &lkb);
3032         if (error)
3033                 goto fail;
3034
3035         receive_flags(lkb, ms);
3036         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3037         error = receive_request_args(ls, lkb, ms);
3038         if (error) {
3039                 __put_lkb(ls, lkb);
3040                 goto fail;
3041         }
3042
3043         namelen = receive_extralen(ms);
3044
3045         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3046         if (error) {
3047                 __put_lkb(ls, lkb);
3048                 goto fail;
3049         }
3050
3051         lock_rsb(r);
3052
3053         attach_lkb(r, lkb);
3054         error = do_request(r, lkb);
3055         send_request_reply(r, lkb, error);
3056
3057         unlock_rsb(r);
3058         put_rsb(r);
3059
3060         if (error == -EINPROGRESS)
3061                 error = 0;
3062         if (error)
3063                 dlm_put_lkb(lkb);
3064         return;
3065
3066  fail:
3067         setup_stub_lkb(ls, ms);
3068         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3069 }
3070
3071 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3072 {
3073         struct dlm_lkb *lkb;
3074         struct dlm_rsb *r;
3075         int error, reply = 1;
3076
3077         error = find_lkb(ls, ms->m_remid, &lkb);
3078         if (error)
3079                 goto fail;
3080
3081         r = lkb->lkb_resource;
3082
3083         hold_rsb(r);
3084         lock_rsb(r);
3085
3086         receive_flags(lkb, ms);
3087         error = receive_convert_args(ls, lkb, ms);
3088         if (error)
3089                 goto out;
3090         reply = !down_conversion(lkb);
3091
3092         error = do_convert(r, lkb);
3093  out:
3094         if (reply)
3095                 send_convert_reply(r, lkb, error);
3096
3097         unlock_rsb(r);
3098         put_rsb(r);
3099         dlm_put_lkb(lkb);
3100         return;
3101
3102  fail:
3103         setup_stub_lkb(ls, ms);
3104         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3105 }
3106
3107 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3108 {
3109         struct dlm_lkb *lkb;
3110         struct dlm_rsb *r;
3111         int error;
3112
3113         error = find_lkb(ls, ms->m_remid, &lkb);
3114         if (error)
3115                 goto fail;
3116
3117         r = lkb->lkb_resource;
3118
3119         hold_rsb(r);
3120         lock_rsb(r);
3121
3122         receive_flags(lkb, ms);
3123         error = receive_unlock_args(ls, lkb, ms);
3124         if (error)
3125                 goto out;
3126
3127         error = do_unlock(r, lkb);
3128  out:
3129         send_unlock_reply(r, lkb, error);
3130
3131         unlock_rsb(r);
3132         put_rsb(r);
3133         dlm_put_lkb(lkb);
3134         return;
3135
3136  fail:
3137         setup_stub_lkb(ls, ms);
3138         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3139 }
3140
3141 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3142 {
3143         struct dlm_lkb *lkb;
3144         struct dlm_rsb *r;
3145         int error;
3146
3147         error = find_lkb(ls, ms->m_remid, &lkb);
3148         if (error)
3149                 goto fail;
3150
3151         receive_flags(lkb, ms);
3152
3153         r = lkb->lkb_resource;
3154
3155         hold_rsb(r);
3156         lock_rsb(r);
3157
3158         error = do_cancel(r, lkb);
3159         send_cancel_reply(r, lkb, error);
3160
3161         unlock_rsb(r);
3162         put_rsb(r);
3163         dlm_put_lkb(lkb);
3164         return;
3165
3166  fail:
3167         setup_stub_lkb(ls, ms);
3168         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3169 }
3170
3171 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3172 {
3173         struct dlm_lkb *lkb;
3174         struct dlm_rsb *r;
3175         int error;
3176
3177         error = find_lkb(ls, ms->m_remid, &lkb);
3178         if (error) {
3179                 log_error(ls, "receive_grant no lkb");
3180                 return;
3181         }
3182         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3183
3184         r = lkb->lkb_resource;
3185
3186         hold_rsb(r);
3187         lock_rsb(r);
3188
3189         receive_flags_reply(lkb, ms);
3190         if (is_altmode(lkb))
3191                 munge_altmode(lkb, ms);
3192         grant_lock_pc(r, lkb, ms);
3193         queue_cast(r, lkb, 0);
3194
3195         unlock_rsb(r);
3196         put_rsb(r);
3197         dlm_put_lkb(lkb);
3198 }
3199
3200 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3201 {
3202         struct dlm_lkb *lkb;
3203         struct dlm_rsb *r;
3204         int error;
3205
3206         error = find_lkb(ls, ms->m_remid, &lkb);
3207         if (error) {
3208                 log_error(ls, "receive_bast no lkb");
3209                 return;
3210         }
3211         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3212
3213         r = lkb->lkb_resource;
3214
3215         hold_rsb(r);
3216         lock_rsb(r);
3217
3218         queue_bast(r, lkb, ms->m_bastmode);
3219
3220         unlock_rsb(r);
3221         put_rsb(r);
3222         dlm_put_lkb(lkb);
3223 }
3224
3225 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3226 {
3227         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3228
3229         from_nodeid = ms->m_header.h_nodeid;
3230         our_nodeid = dlm_our_nodeid();
3231
3232         len = receive_extralen(ms);
3233
3234         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3235         if (dir_nodeid != our_nodeid) {
3236                 log_error(ls, "lookup dir_nodeid %d from %d",
3237                           dir_nodeid, from_nodeid);
3238                 error = -EINVAL;
3239                 ret_nodeid = -1;
3240                 goto out;
3241         }
3242
3243         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3244
3245         /* Optimization: we're master so treat lookup as a request */
3246         if (!error && ret_nodeid == our_nodeid) {
3247                 receive_request(ls, ms);
3248                 return;
3249         }
3250  out:
3251         send_lookup_reply(ls, ms, ret_nodeid, error);
3252 }
3253
3254 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3255 {
3256         int len, dir_nodeid, from_nodeid;
3257
3258         from_nodeid = ms->m_header.h_nodeid;
3259
3260         len = receive_extralen(ms);
3261
3262         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3263         if (dir_nodeid != dlm_our_nodeid()) {
3264                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3265                           dir_nodeid, from_nodeid);
3266                 return;
3267         }
3268
3269         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3270 }
3271
3272 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3273 {
3274         do_purge(ls, ms->m_nodeid, ms->m_pid);
3275 }
3276
3277 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3278 {
3279         struct dlm_lkb *lkb;
3280         struct dlm_rsb *r;
3281         int error, mstype, result;
3282
3283         error = find_lkb(ls, ms->m_remid, &lkb);
3284         if (error) {
3285                 log_error(ls, "receive_request_reply no lkb");
3286                 return;
3287         }
3288         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3289
3290         r = lkb->lkb_resource;
3291         hold_rsb(r);
3292         lock_rsb(r);
3293
3294         mstype = lkb->lkb_wait_type;
3295         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3296         if (error)
3297                 goto out;
3298
3299         /* Optimization: the dir node was also the master, so it took our
3300            lookup as a request and sent request reply instead of lookup reply */
3301         if (mstype == DLM_MSG_LOOKUP) {
3302                 r->res_nodeid = ms->m_header.h_nodeid;
3303                 lkb->lkb_nodeid = r->res_nodeid;
3304         }
3305
3306         /* this is the value returned from do_request() on the master */
3307         result = ms->m_result;
3308
3309         switch (result) {
3310         case -EAGAIN:
3311                 /* request would block (be queued) on remote master */
3312                 queue_cast(r, lkb, -EAGAIN);
3313                 confirm_master(r, -EAGAIN);
3314                 unhold_lkb(lkb); /* undoes create_lkb() */
3315                 break;
3316
3317         case -EINPROGRESS:
3318         case 0:
3319                 /* request was queued or granted on remote master */
3320                 receive_flags_reply(lkb, ms);
3321                 lkb->lkb_remid = ms->m_lkid;
3322                 if (is_altmode(lkb))
3323                         munge_altmode(lkb, ms);
3324                 if (result) {
3325                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3326                         add_timeout(lkb);
3327                 } else {
3328                         grant_lock_pc(r, lkb, ms);
3329                         queue_cast(r, lkb, 0);
3330                 }
3331                 confirm_master(r, result);
3332                 break;
3333
3334         case -EBADR:
3335         case -ENOTBLK:
3336                 /* find_rsb failed to find rsb or rsb wasn't master */
3337                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3338                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3339                 r->res_nodeid = -1;
3340                 lkb->lkb_nodeid = -1;
3341
3342                 if (is_overlap(lkb)) {
3343                         /* we'll ignore error in cancel/unlock reply */
3344                         queue_cast_overlap(r, lkb);
3345                         unhold_lkb(lkb); /* undoes create_lkb() */
3346                 } else
3347                         _request_lock(r, lkb);
3348                 break;
3349
3350         default:
3351                 log_error(ls, "receive_request_reply %x error %d",
3352                           lkb->lkb_id, result);
3353         }
3354
3355         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3356                 log_debug(ls, "receive_request_reply %x result %d unlock",
3357                           lkb->lkb_id, result);
3358                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3359                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3360                 send_unlock(r, lkb);
3361         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3362                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3363                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3364                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3365                 send_cancel(r, lkb);
3366         } else {
3367                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3368                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3369         }
3370  out:
3371         unlock_rsb(r);
3372         put_rsb(r);
3373         dlm_put_lkb(lkb);
3374 }
3375
3376 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3377                                     struct dlm_message *ms)
3378 {
3379         /* this is the value returned from do_convert() on the master */
3380         switch (ms->m_result) {
3381         case -EAGAIN:
3382                 /* convert would block (be queued) on remote master */
3383                 queue_cast(r, lkb, -EAGAIN);
3384                 break;
3385
3386         case -EDEADLK:
3387                 receive_flags_reply(lkb, ms);
3388                 revert_lock_pc(r, lkb);
3389                 queue_cast(r, lkb, -EDEADLK);
3390                 break;
3391
3392         case -EINPROGRESS:
3393                 /* convert was queued on remote master */
3394                 receive_flags_reply(lkb, ms);
3395                 if (is_demoted(lkb))
3396                         munge_demoted(lkb, ms);
3397                 del_lkb(r, lkb);
3398                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3399                 add_timeout(lkb);
3400                 break;
3401
3402         case 0:
3403                 /* convert was granted on remote master */
3404                 receive_flags_reply(lkb, ms);
3405                 if (is_demoted(lkb))
3406                         munge_demoted(lkb, ms);
3407                 grant_lock_pc(r, lkb, ms);
3408                 queue_cast(r, lkb, 0);
3409                 break;
3410
3411         default:
3412                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3413                           lkb->lkb_id, ms->m_result);
3414         }
3415 }
3416
3417 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3418 {
3419         struct dlm_rsb *r = lkb->lkb_resource;
3420         int error;
3421
3422         hold_rsb(r);
3423         lock_rsb(r);
3424
3425         /* stub reply can happen with waiters_mutex held */
3426         error = remove_from_waiters_ms(lkb, ms);
3427         if (error)
3428                 goto out;
3429
3430         __receive_convert_reply(r, lkb, ms);
3431  out:
3432         unlock_rsb(r);
3433         put_rsb(r);
3434 }
3435
3436 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3437 {
3438         struct dlm_lkb *lkb;
3439         int error;
3440
3441         error = find_lkb(ls, ms->m_remid, &lkb);
3442         if (error) {
3443                 log_error(ls, "receive_convert_reply no lkb");
3444                 return;
3445         }
3446         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3447
3448         _receive_convert_reply(lkb, ms);
3449         dlm_put_lkb(lkb);
3450 }
3451
3452 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3453 {
3454         struct dlm_rsb *r = lkb->lkb_resource;
3455         int error;
3456
3457         hold_rsb(r);
3458         lock_rsb(r);
3459
3460         /* stub reply can happen with waiters_mutex held */
3461         error = remove_from_waiters_ms(lkb, ms);
3462         if (error)
3463                 goto out;
3464
3465         /* this is the value returned from do_unlock() on the master */
3466
3467         switch (ms->m_result) {
3468         case -DLM_EUNLOCK:
3469                 receive_flags_reply(lkb, ms);
3470                 remove_lock_pc(r, lkb);
3471                 queue_cast(r, lkb, -DLM_EUNLOCK);
3472                 break;
3473         case -ENOENT:
3474                 break;
3475         default:
3476                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3477                           lkb->lkb_id, ms->m_result);
3478         }
3479  out:
3480         unlock_rsb(r);
3481         put_rsb(r);
3482 }
3483
3484 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3485 {
3486         struct dlm_lkb *lkb;
3487         int error;
3488
3489         error = find_lkb(ls, ms->m_remid, &lkb);
3490         if (error) {
3491                 log_error(ls, "receive_unlock_reply no lkb");
3492                 return;
3493         }
3494         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3495
3496         _receive_unlock_reply(lkb, ms);
3497         dlm_put_lkb(lkb);
3498 }
3499
3500 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3501 {
3502         struct dlm_rsb *r = lkb->lkb_resource;
3503         int error;
3504
3505         hold_rsb(r);
3506         lock_rsb(r);
3507
3508         /* stub reply can happen with waiters_mutex held */
3509         error = remove_from_waiters_ms(lkb, ms);
3510         if (error)
3511                 goto out;
3512
3513         /* this is the value returned from do_cancel() on the master */
3514
3515         switch (ms->m_result) {
3516         case -DLM_ECANCEL:
3517                 receive_flags_reply(lkb, ms);
3518                 revert_lock_pc(r, lkb);
3519                 queue_cast(r, lkb, -DLM_ECANCEL);
3520                 break;
3521         case 0:
3522                 break;
3523         default:
3524                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3525                           lkb->lkb_id, ms->m_result);
3526         }
3527  out:
3528         unlock_rsb(r);
3529         put_rsb(r);
3530 }
3531
3532 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3533 {
3534         struct dlm_lkb *lkb;
3535         int error;
3536
3537         error = find_lkb(ls, ms->m_remid, &lkb);
3538         if (error) {
3539                 log_error(ls, "receive_cancel_reply no lkb");
3540                 return;
3541         }
3542         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3543
3544         _receive_cancel_reply(lkb, ms);
3545         dlm_put_lkb(lkb);
3546 }
3547
3548 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3549 {
3550         struct dlm_lkb *lkb;
3551         struct dlm_rsb *r;
3552         int error, ret_nodeid;
3553
3554         error = find_lkb(ls, ms->m_lkid, &lkb);
3555         if (error) {
3556                 log_error(ls, "receive_lookup_reply no lkb");
3557                 return;
3558         }
3559
3560         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3561            FIXME: will a non-zero error ever be returned? */
3562
3563         r = lkb->lkb_resource;
3564         hold_rsb(r);
3565         lock_rsb(r);
3566
3567         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3568         if (error)
3569                 goto out;
3570
3571         ret_nodeid = ms->m_nodeid;
3572         if (ret_nodeid == dlm_our_nodeid()) {
3573                 r->res_nodeid = 0;
3574                 ret_nodeid = 0;
3575                 r->res_first_lkid = 0;
3576         } else {
3577                 /* set_master() will copy res_nodeid to lkb_nodeid */
3578                 r->res_nodeid = ret_nodeid;
3579         }
3580
3581         if (is_overlap(lkb)) {
3582                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3583                           lkb->lkb_id, lkb->lkb_flags);
3584                 queue_cast_overlap(r, lkb);
3585                 unhold_lkb(lkb); /* undoes create_lkb() */
3586                 goto out_list;
3587         }
3588
3589         _request_lock(r, lkb);
3590
3591  out_list:
3592         if (!ret_nodeid)
3593                 process_lookup_list(r);
3594  out:
3595         unlock_rsb(r);
3596         put_rsb(r);
3597         dlm_put_lkb(lkb);
3598 }
3599
3600 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3601 {
3602         struct dlm_message *ms = (struct dlm_message *) hd;
3603         struct dlm_ls *ls;
3604         int error = 0;
3605
3606         if (!recovery)
3607                 dlm_message_in(ms);
3608
3609         ls = dlm_find_lockspace_global(hd->h_lockspace);
3610         if (!ls) {
3611                 log_print("drop message %d from %d for unknown lockspace %d",
3612                           ms->m_type, nodeid, hd->h_lockspace);
3613                 return -EINVAL;
3614         }
3615
3616         /* recovery may have just ended leaving a bunch of backed-up requests
3617            in the requestqueue; wait while dlm_recoverd clears them */
3618
3619         if (!recovery)
3620                 dlm_wait_requestqueue(ls);
3621
3622         /* recovery may have just started while there were a bunch of
3623            in-flight requests -- save them in requestqueue to be processed
3624            after recovery.  we can't let dlm_recvd block on the recovery
3625            lock.  if dlm_recoverd is calling this function to clear the
3626            requestqueue, it needs to be interrupted (-EINTR) if another
3627            recovery operation is starting. */
3628
3629         while (1) {
3630                 if (dlm_locking_stopped(ls)) {
3631                         if (recovery) {
3632                                 error = -EINTR;
3633                                 goto out;
3634                         }
3635                         error = dlm_add_requestqueue(ls, nodeid, hd);
3636                         if (error == -EAGAIN)
3637                                 continue;
3638                         else {
3639                                 error = -EINTR;
3640                                 goto out;
3641                         }
3642                 }
3643
3644                 if (dlm_lock_recovery_try(ls))
3645                         break;
3646                 schedule();
3647         }
3648
3649         switch (ms->m_type) {
3650
3651         /* messages sent to a master node */
3652
3653         case DLM_MSG_REQUEST:
3654                 receive_request(ls, ms);
3655                 break;
3656
3657         case DLM_MSG_CONVERT:
3658                 receive_convert(ls, ms);
3659                 break;
3660
3661         case DLM_MSG_UNLOCK:
3662                 receive_unlock(ls, ms);
3663                 break;
3664
3665         case DLM_MSG_CANCEL:
3666                 receive_cancel(ls, ms);
3667                 break;
3668
3669         /* messages sent from a master node (replies to above) */
3670
3671         case DLM_MSG_REQUEST_REPLY:
3672                 receive_request_reply(ls, ms);
3673                 break;
3674
3675         case DLM_MSG_CONVERT_REPLY:
3676                 receive_convert_reply(ls, ms);
3677                 break;
3678
3679         case DLM_MSG_UNLOCK_REPLY:
3680                 receive_unlock_reply(ls, ms);
3681                 break;
3682
3683         case DLM_MSG_CANCEL_REPLY:
3684                 receive_cancel_reply(ls, ms);
3685                 break;
3686
3687         /* messages sent from a master node (only two types of async msg) */
3688
3689         case DLM_MSG_GRANT:
3690                 receive_grant(ls, ms);
3691                 break;
3692
3693         case DLM_MSG_BAST:
3694                 receive_bast(ls, ms);
3695                 break;
3696
3697         /* messages sent to a dir node */
3698
3699         case DLM_MSG_LOOKUP:
3700                 receive_lookup(ls, ms);
3701                 break;
3702
3703         case DLM_MSG_REMOVE:
3704                 receive_remove(ls, ms);
3705                 break;
3706
3707         /* messages sent from a dir node (remove has no reply) */
3708
3709         case DLM_MSG_LOOKUP_REPLY:
3710                 receive_lookup_reply(ls, ms);
3711                 break;
3712
3713         /* other messages */
3714
3715         case DLM_MSG_PURGE:
3716                 receive_purge(ls, ms);
3717                 break;
3718
3719         default:
3720                 log_error(ls, "unknown message type %d", ms->m_type);
3721         }
3722
3723         dlm_unlock_recovery(ls);
3724  out:
3725         dlm_put_lockspace(ls);
3726         dlm_astd_wake();
3727         return error;
3728 }
3729
3730
3731 /*
3732  * Recovery related
3733  */
3734
3735 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3736 {
3737         if (middle_conversion(lkb)) {
3738                 hold_lkb(lkb);
3739                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3740                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3741                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3742                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3743
3744                 /* Same special case as in receive_rcom_lock_args() */
3745                 lkb->lkb_grmode = DLM_LOCK_IV;
3746                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3747                 unhold_lkb(lkb);
3748
3749         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3750                 lkb->lkb_flags |= DLM_IFL_RESEND;
3751         }
3752
3753         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3754            conversions are async; there's no reply from the remote master */
3755 }
3756
3757 /* A waiting lkb needs recovery if the master node has failed, or
3758    the master node is changing (only when no directory is used) */
3759
3760 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3761 {
3762         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3763                 return 1;
3764
3765         if (!dlm_no_directory(ls))
3766                 return 0;
3767
3768         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3769                 return 1;
3770
3771         return 0;
3772 }
3773
3774 /* Recovery for locks that are waiting for replies from nodes that are now
3775    gone.  We can just complete unlocks and cancels by faking a reply from the
3776    dead node.  Requests and up-conversions we flag to be resent after
3777    recovery.  Down-conversions can just be completed with a fake reply like
3778    unlocks.  Conversions between PR and CW need special attention. */
3779
3780 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3781 {
3782         struct dlm_lkb *lkb, *safe;
3783
3784         mutex_lock(&ls->ls_waiters_mutex);
3785
3786         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3787                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3788                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3789
3790                 /* all outstanding lookups, regardless of destination  will be
3791                    resent after recovery is done */
3792
3793                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3794                         lkb->lkb_flags |= DLM_IFL_RESEND;
3795                         continue;
3796                 }
3797
3798                 if (!waiter_needs_recovery(ls, lkb))
3799                         continue;
3800
3801                 switch (lkb->lkb_wait_type) {
3802
3803                 case DLM_MSG_REQUEST:
3804                         lkb->lkb_flags |= DLM_IFL_RESEND;
3805                         break;
3806
3807                 case DLM_MSG_CONVERT:
3808                         recover_convert_waiter(ls, lkb);
3809                         break;
3810
3811                 case DLM_MSG_UNLOCK:
3812                         hold_lkb(lkb);
3813                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3814                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3815                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3816                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3817                         dlm_put_lkb(lkb);
3818                         break;
3819
3820                 case DLM_MSG_CANCEL:
3821                         hold_lkb(lkb);
3822                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3823                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3824                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3825                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3826                         dlm_put_lkb(lkb);
3827                         break;
3828
3829                 default:
3830                         log_error(ls, "invalid lkb wait_type %d",
3831                                   lkb->lkb_wait_type);
3832                 }
3833                 schedule();
3834         }
3835         mutex_unlock(&ls->ls_waiters_mutex);
3836 }
3837
3838 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3839 {
3840         struct dlm_lkb *lkb;
3841         int found = 0;
3842
3843         mutex_lock(&ls->ls_waiters_mutex);
3844         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3845                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3846                         hold_lkb(lkb);
3847                         found = 1;
3848                         break;
3849                 }
3850         }
3851         mutex_unlock(&ls->ls_waiters_mutex);
3852
3853         if (!found)
3854                 lkb = NULL;
3855         return lkb;
3856 }
3857
3858 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3859    master or dir-node for r.  Processing the lkb may result in it being placed
3860    back on waiters. */
3861
3862 /* We do this after normal locking has been enabled and any saved messages
3863    (in requestqueue) have been processed.  We should be confident that at
3864    this point we won't get or process a reply to any of these waiting
3865    operations.  But, new ops may be coming in on the rsbs/locks here from
3866    userspace or remotely. */
3867
3868 /* there may have been an overlap unlock/cancel prior to recovery or after
3869    recovery.  if before, the lkb may still have a pos wait_count; if after, the
3870    overlap flag would just have been set and nothing new sent.  we can be
3871    confident here than any replies to either the initial op or overlap ops
3872    prior to recovery have been received. */
3873
3874 int dlm_recover_waiters_post(struct dlm_ls *ls)
3875 {
3876         struct dlm_lkb *lkb;
3877         struct dlm_rsb *r;
3878         int error = 0, mstype, err, oc, ou;
3879
3880         while (1) {
3881                 if (dlm_locking_stopped(ls)) {
3882                         log_debug(ls, "recover_waiters_post aborted");
3883                         error = -EINTR;
3884                         break;
3885                 }
3886
3887                 lkb = find_resend_waiter(ls);
3888                 if (!lkb)
3889                         break;
3890
3891                 r = lkb->lkb_resource;
3892                 hold_rsb(r);
3893                 lock_rsb(r);
3894
3895                 mstype = lkb->lkb_wait_type;
3896                 oc = is_overlap_cancel(lkb);
3897                 ou = is_overlap_unlock(lkb);
3898                 err = 0;
3899
3900                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3901                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3902
3903                 /* At this point we assume that we won't get a reply to any
3904                    previous op or overlap op on this lock.  First, do a big
3905                    remove_from_waiters() for all previous ops. */
3906
3907                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3908                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3909                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3910                 lkb->lkb_wait_type = 0;
3911                 lkb->lkb_wait_count = 0;
3912                 mutex_lock(&ls->ls_waiters_mutex);
3913                 list_del_init(&lkb->lkb_wait_reply);
3914                 mutex_unlock(&ls->ls_waiters_mutex);
3915                 unhold_lkb(lkb); /* for waiters list */
3916
3917                 if (oc || ou) {
3918                         /* do an unlock or cancel instead of resending */
3919                         switch (mstype) {
3920                         case DLM_MSG_LOOKUP:
3921                         case DLM_MSG_REQUEST:
3922                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3923                                                         -DLM_ECANCEL);
3924                                 unhold_lkb(lkb); /* undoes create_lkb() */
3925                                 break;
3926                         case DLM_MSG_CONVERT:
3927                                 if (oc) {
3928                                         queue_cast(r, lkb, -DLM_ECANCEL);
3929                                 } else {
3930                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3931                                         _unlock_lock(r, lkb);
3932                                 }
3933                                 break;
3934                         default:
3935                                 err = 1;
3936                         }
3937                 } else {
3938                         switch (mstype) {
3939                         case DLM_MSG_LOOKUP:
3940                         case DLM_MSG_REQUEST:
3941                                 _request_lock(r, lkb);
3942                                 if (is_master(r))
3943                                         confirm_master(r, 0);
3944                                 break;
3945                         case DLM_MSG_CONVERT:
3946                                 _convert_lock(r, lkb);
3947                                 break;
3948                         default:
3949                                 err = 1;
3950                         }
3951                 }
3952
3953                 if (err)
3954                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
3955                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3956                 unlock_rsb(r);
3957                 put_rsb(r);
3958                 dlm_put_lkb(lkb);
3959         }
3960
3961         return error;
3962 }
3963
3964 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3965                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3966 {
3967         struct dlm_ls *ls = r->res_ls;
3968         struct dlm_lkb *lkb, *safe;
3969
3970         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3971                 if (test(ls, lkb)) {
3972                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3973                         del_lkb(r, lkb);
3974                         /* this put should free the lkb */
3975                         if (!dlm_put_lkb(lkb))
3976                                 log_error(ls, "purged lkb not released");
3977                 }
3978         }
3979 }
3980
3981 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3982 {
3983         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3984 }
3985
3986 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3987 {
3988         return is_master_copy(lkb);
3989 }
3990
3991 static void purge_dead_locks(struct dlm_rsb *r)
3992 {
3993         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3994         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3995         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3996 }
3997
3998 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3999 {
4000         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4001         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4002         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4003 }
4004
4005 /* Get rid of locks held by nodes that are gone. */
4006
4007 int dlm_purge_locks(struct dlm_ls *ls)
4008 {
4009         struct dlm_rsb *r;
4010
4011         log_debug(ls, "dlm_purge_locks");
4012
4013         down_write(&ls->ls_root_sem);
4014         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4015                 hold_rsb(r);
4016                 lock_rsb(r);
4017                 if (is_master(r))
4018                         purge_dead_locks(r);
4019                 unlock_rsb(r);
4020                 unhold_rsb(r);
4021
4022                 schedule();
4023         }
4024         up_write(&ls->ls_root_sem);
4025
4026         return 0;
4027 }
4028
4029 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4030 {
4031         struct dlm_rsb *r, *r_ret = NULL;
4032
4033         read_lock(&ls->ls_rsbtbl[bucket].lock);
4034         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4035                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4036                         continue;
4037                 hold_rsb(r);
4038                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4039                 r_ret = r;
4040                 break;
4041         }
4042         read_unlock(&ls->ls_rsbtbl[bucket].lock);
4043         return r_ret;
4044 }
4045
4046 void dlm_grant_after_purge(struct dlm_ls *ls)
4047 {
4048         struct dlm_rsb *r;
4049         int bucket = 0;
4050
4051         while (1) {
4052                 r = find_purged_rsb(ls, bucket);
4053                 if (!r) {
4054                         if (bucket == ls->ls_rsbtbl_size - 1)
4055                                 break;
4056                         bucket++;
4057                         continue;
4058                 }
4059                 lock_rsb(r);
4060                 if (is_master(r)) {
4061                         grant_pending_locks(r);
4062                         confirm_master(r, 0);
4063                 }
4064                 unlock_rsb(r);
4065                 put_rsb(r);
4066                 schedule();
4067         }
4068 }
4069
4070 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4071                                          uint32_t remid)
4072 {
4073         struct dlm_lkb *lkb;
4074
4075         list_for_each_entry(lkb, head, lkb_statequeue) {
4076                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4077                         return lkb;
4078         }
4079         return NULL;
4080 }
4081
4082 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4083                                     uint32_t remid)
4084 {
4085         struct dlm_lkb *lkb;
4086
4087         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4088         if (lkb)
4089                 return lkb;
4090         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4091         if (lkb)
4092                 return lkb;
4093         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4094         if (lkb)
4095                 return lkb;
4096         return NULL;
4097 }
4098
4099 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4100                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4101 {
4102         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4103         int lvblen;
4104
4105         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4106         lkb->lkb_ownpid = rl->rl_ownpid;
4107         lkb->lkb_remid = rl->rl_lkid;
4108         lkb->lkb_exflags = rl->rl_exflags;
4109         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4110         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4111         lkb->lkb_lvbseq = rl->rl_lvbseq;
4112         lkb->lkb_rqmode = rl->rl_rqmode;
4113         lkb->lkb_grmode = rl->rl_grmode;
4114         /* don't set lkb_status because add_lkb wants to itself */
4115
4116         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4117         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4118
4119         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4120                 lkb->lkb_lvbptr = allocate_lvb(ls);
4121                 if (!lkb->lkb_lvbptr)
4122                         return -ENOMEM;
4123                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4124                          sizeof(struct rcom_lock);
4125                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4126         }
4127
4128         /* Conversions between PR and CW (middle modes) need special handling.
4129            The real granted mode of these converting locks cannot be determined
4130            until all locks have been rebuilt on the rsb (recover_conversion) */
4131
4132         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4133                 rl->rl_status = DLM_LKSTS_CONVERT;
4134                 lkb->lkb_grmode = DLM_LOCK_IV;
4135                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4136         }
4137
4138         return 0;
4139 }
4140
4141 /* This lkb may have been recovered in a previous aborted recovery so we need
4142    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4143    If so we just send back a standard reply.  If not, we create a new lkb with
4144    the given values and send back our lkid.  We send back our lkid by sending
4145    back the rcom_lock struct we got but with the remid field filled in. */
4146
4147 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4148 {
4149         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4150         struct dlm_rsb *r;
4151         struct dlm_lkb *lkb;
4152         int error;
4153
4154         if (rl->rl_parent_lkid) {
4155                 error = -EOPNOTSUPP;
4156                 goto out;
4157         }
4158
4159         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
4160         if (error)
4161                 goto out;
4162
4163         lock_rsb(r);
4164
4165         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
4166         if (lkb) {
4167                 error = -EEXIST;
4168                 goto out_remid;
4169         }
4170
4171         error = create_lkb(ls, &lkb);
4172         if (error)
4173                 goto out_unlock;
4174
4175         error = receive_rcom_lock_args(ls, lkb, r, rc);
4176         if (error) {
4177                 __put_lkb(ls, lkb);
4178                 goto out_unlock;
4179         }
4180
4181         attach_lkb(r, lkb);
4182         add_lkb(r, lkb, rl->rl_status);
4183         error = 0;
4184
4185  out_remid:
4186         /* this is the new value returned to the lock holder for
4187            saving in its process-copy lkb */
4188         rl->rl_remid = lkb->lkb_id;
4189
4190  out_unlock:
4191         unlock_rsb(r);
4192         put_rsb(r);
4193  out:
4194         if (error)
4195                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
4196         rl->rl_result = error;
4197         return error;
4198 }
4199
4200 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4201 {
4202         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4203         struct dlm_rsb *r;
4204         struct dlm_lkb *lkb;
4205         int error;
4206
4207         error = find_lkb(ls, rl->rl_lkid, &lkb);
4208         if (error) {
4209                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
4210                 return error;
4211         }
4212
4213         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4214
4215         error = rl->rl_result;
4216
4217         r = lkb->lkb_resource;
4218         hold_rsb(r);
4219         lock_rsb(r);
4220
4221         switch (error) {
4222         case -EBADR:
4223                 /* There's a chance the new master received our lock before
4224                    dlm_recover_master_reply(), this wouldn't happen if we did
4225                    a barrier between recover_masters and recover_locks. */
4226                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4227                           (unsigned long)r, r->res_name);
4228                 dlm_send_rcom_lock(r, lkb);
4229                 goto out;
4230         case -EEXIST:
4231                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4232                 /* fall through */
4233         case 0:
4234                 lkb->lkb_remid = rl->rl_remid;
4235                 break;
4236         default:
4237                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4238                           error, lkb->lkb_id);
4239         }
4240
4241         /* an ack for dlm_recover_locks() which waits for replies from
4242            all the locks it sends to new masters */
4243         dlm_recovered_lock(r);
4244  out:
4245         unlock_rsb(r);
4246         put_rsb(r);
4247         dlm_put_lkb(lkb);
4248
4249         return 0;
4250 }
4251
4252 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4253                      int mode, uint32_t flags, void *name, unsigned int namelen,
4254                      unsigned long timeout_cs)
4255 {
4256         struct dlm_lkb *lkb;
4257         struct dlm_args args;
4258         int error;
4259
4260         dlm_lock_recovery(ls);
4261
4262         error = create_lkb(ls, &lkb);
4263         if (error) {
4264                 kfree(ua);
4265                 goto out;
4266         }
4267
4268         if (flags & DLM_LKF_VALBLK) {
4269                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4270                 if (!ua->lksb.sb_lvbptr) {
4271                         kfree(ua);
4272                         __put_lkb(ls, lkb);
4273                         error = -ENOMEM;
4274                         goto out;
4275                 }
4276         }
4277
4278         /* After ua is attached to lkb it will be freed by free_lkb().
4279            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4280            lock and that lkb_astparam is the dlm_user_args structure. */
4281
4282         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4283                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4284         lkb->lkb_flags |= DLM_IFL_USER;
4285         ua->old_mode = DLM_LOCK_IV;
4286
4287         if (error) {
4288                 __put_lkb(ls, lkb);
4289                 goto out;
4290         }
4291
4292         error = request_lock(ls, lkb, name, namelen, &args);
4293
4294         switch (error) {
4295         case 0:
4296                 break;
4297         case -EINPROGRESS:
4298                 error = 0;
4299                 break;
4300         case -EAGAIN:
4301                 error = 0;
4302                 /* fall through */
4303         default:
4304                 __put_lkb(ls, lkb);
4305                 goto out;
4306         }
4307
4308         /* add this new lkb to the per-process list of locks */
4309         spin_lock(&ua->proc->locks_spin);
4310         hold_lkb(lkb);
4311         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4312         spin_unlock(&ua->proc->locks_spin);
4313  out:
4314         dlm_unlock_recovery(ls);
4315         return error;
4316 }
4317
4318 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4319                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4320                      unsigned long timeout_cs)
4321 {
4322         struct dlm_lkb *lkb;
4323         struct dlm_args args;
4324         struct dlm_user_args *ua;
4325         int error;
4326
4327         dlm_lock_recovery(ls);
4328
4329         error = find_lkb(ls, lkid, &lkb);
4330         if (error)
4331                 goto out;
4332
4333         /* user can change the params on its lock when it converts it, or
4334            add an lvb that didn't exist before */
4335
4336         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4337
4338         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4339                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4340                 if (!ua->lksb.sb_lvbptr) {
4341                         error = -ENOMEM;
4342                         goto out_put;
4343                 }
4344         }
4345         if (lvb_in && ua->lksb.sb_lvbptr)
4346                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4347
4348         ua->xid = ua_tmp->xid;
4349         ua->castparam = ua_tmp->castparam;
4350         ua->castaddr = ua_tmp->castaddr;
4351         ua->bastparam = ua_tmp->bastparam;
4352         ua->bastaddr = ua_tmp->bastaddr;
4353         ua->user_lksb = ua_tmp->user_lksb;
4354         ua->old_mode = lkb->lkb_grmode;
4355
4356         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4357                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4358         if (error)
4359                 goto out_put;
4360
4361         error = convert_lock(ls, lkb, &args);
4362
4363         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4364                 error = 0;
4365  out_put:
4366         dlm_put_lkb(lkb);
4367  out:
4368         dlm_unlock_recovery(ls);
4369         kfree(ua_tmp);
4370         return error;
4371 }
4372
4373 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4374                     uint32_t flags, uint32_t lkid, char *lvb_in)
4375 {
4376         struct dlm_lkb *lkb;
4377         struct dlm_args args;
4378         struct dlm_user_args *ua;
4379         int error;
4380
4381         dlm_lock_recovery(ls);
4382
4383         error = find_lkb(ls, lkid, &lkb);
4384         if (error)
4385                 goto out;
4386
4387         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4388
4389         if (lvb_in && ua->lksb.sb_lvbptr)
4390                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4391         ua->castparam = ua_tmp->castparam;
4392         ua->user_lksb = ua_tmp->user_lksb;
4393
4394         error = set_unlock_args(flags, ua, &args);
4395         if (error)
4396                 goto out_put;
4397
4398         error = unlock_lock(ls, lkb, &args);
4399
4400         if (error == -DLM_EUNLOCK)
4401                 error = 0;
4402         /* from validate_unlock_args() */
4403         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4404                 error = 0;
4405         if (error)
4406                 goto out_put;
4407
4408         spin_lock(&ua->proc->locks_spin);
4409         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4410         if (!list_empty(&lkb->lkb_ownqueue))
4411                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4412         spin_unlock(&ua->proc->locks_spin);
4413  out_put:
4414         dlm_put_lkb(lkb);
4415  out:
4416         dlm_unlock_recovery(ls);
4417         kfree(ua_tmp);
4418         return error;
4419 }
4420
4421 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4422                     uint32_t flags, uint32_t lkid)
4423 {
4424         struct dlm_lkb *lkb;
4425         struct dlm_args args;
4426         struct dlm_user_args *ua;
4427         int error;
4428
4429         dlm_lock_recovery(ls);
4430
4431         error = find_lkb(ls, lkid, &lkb);
4432         if (error)
4433                 goto out;
4434
4435         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4436         ua->castparam = ua_tmp->castparam;
4437         ua->user_lksb = ua_tmp->user_lksb;
4438
4439         error = set_unlock_args(flags, ua, &args);
4440         if (error)
4441                 goto out_put;
4442
4443         error = cancel_lock(ls, lkb, &args);
4444
4445         if (error == -DLM_ECANCEL)
4446                 error = 0;
4447         /* from validate_unlock_args() */
4448         if (error == -EBUSY)
4449                 error = 0;
4450  out_put:
4451         dlm_put_lkb(lkb);
4452  out:
4453         dlm_unlock_recovery(ls);
4454         kfree(ua_tmp);
4455         return error;
4456 }
4457
4458 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4459 {
4460         struct dlm_lkb *lkb;
4461         struct dlm_args args;
4462         struct dlm_user_args *ua;
4463         struct dlm_rsb *r;
4464         int error;
4465
4466         dlm_lock_recovery(ls);
4467
4468         error = find_lkb(ls, lkid, &lkb);
4469         if (error)
4470                 goto out;
4471
4472         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4473
4474         error = set_unlock_args(flags, ua, &args);
4475         if (error)
4476                 goto out_put;
4477
4478         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4479
4480         r = lkb->lkb_resource;
4481         hold_rsb(r);
4482         lock_rsb(r);
4483
4484         error = validate_unlock_args(lkb, &args);
4485         if (error)
4486                 goto out_r;
4487         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4488
4489         error = _cancel_lock(r, lkb);
4490  out_r:
4491         unlock_rsb(r);
4492         put_rsb(r);
4493
4494         if (error == -DLM_ECANCEL)
4495                 error = 0;
4496         /* from validate_unlock_args() */
4497         if (error == -EBUSY)
4498                 error = 0;
4499  out_put:
4500         dlm_put_lkb(lkb);
4501  out:
4502         dlm_unlock_recovery(ls);
4503         return error;
4504 }
4505
4506 /* lkb's that are removed from the waiters list by revert are just left on the
4507    orphans list with the granted orphan locks, to be freed by purge */
4508
4509 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4510 {
4511         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4512         struct dlm_args args;
4513         int error;
4514
4515         hold_lkb(lkb);
4516         mutex_lock(&ls->ls_orphans_mutex);
4517         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4518         mutex_unlock(&ls->ls_orphans_mutex);
4519
4520         set_unlock_args(0, ua, &args);
4521
4522         error = cancel_lock(ls, lkb, &args);
4523         if (error == -DLM_ECANCEL)
4524                 error = 0;
4525         return error;
4526 }
4527
4528 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4529    Regardless of what rsb queue the lock is on, it's removed and freed. */
4530
4531 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4532 {
4533         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4534         struct dlm_args args;
4535         int error;
4536
4537         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4538
4539         error = unlock_lock(ls, lkb, &args);
4540         if (error == -DLM_EUNLOCK)
4541                 error = 0;
4542         return error;
4543 }
4544
4545 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4546    (which does lock_rsb) due to deadlock with receiving a message that does
4547    lock_rsb followed by dlm_user_add_ast() */
4548
4549 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4550                                      struct dlm_user_proc *proc)
4551 {
4552         struct dlm_lkb *lkb = NULL;
4553
4554         mutex_lock(&ls->ls_clear_proc_locks);
4555         if (list_empty(&proc->locks))
4556                 goto out;
4557
4558         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4559         list_del_init(&lkb->lkb_ownqueue);
4560
4561         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4562                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4563         else
4564                 lkb->lkb_flags |= DLM_IFL_DEAD;
4565  out:
4566         mutex_unlock(&ls->ls_clear_proc_locks);
4567         return lkb;
4568 }
4569
4570 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4571    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4572    which we clear here. */
4573
4574 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4575    list, and no more device_writes should add lkb's to proc->locks list; so we
4576    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4577    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4578    them ourself. */
4579
4580 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4581 {
4582         struct dlm_lkb *lkb, *safe;
4583
4584         dlm_lock_recovery(ls);
4585
4586         while (1) {
4587                 lkb = del_proc_lock(ls, proc);
4588                 if (!lkb)
4589                         break;
4590                 del_timeout(lkb);
4591                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4592                         orphan_proc_lock(ls, lkb);
4593                 else
4594                         unlock_proc_lock(ls, lkb);
4595
4596                 /* this removes the reference for the proc->locks list
4597                    added by dlm_user_request, it may result in the lkb
4598                    being freed */
4599
4600                 dlm_put_lkb(lkb);
4601         }
4602
4603         mutex_lock(&ls->ls_clear_proc_locks);
4604
4605         /* in-progress unlocks */
4606         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4607                 list_del_init(&lkb->lkb_ownqueue);
4608                 lkb->lkb_flags |= DLM_IFL_DEAD;
4609                 dlm_put_lkb(lkb);
4610         }
4611
4612         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4613                 list_del(&lkb->lkb_astqueue);
4614                 dlm_put_lkb(lkb);
4615         }
4616
4617         mutex_unlock(&ls->ls_clear_proc_locks);
4618         dlm_unlock_recovery(ls);
4619 }
4620
4621 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4622 {
4623         struct dlm_lkb *lkb, *safe;
4624
4625         while (1) {
4626                 lkb = NULL;
4627                 spin_lock(&proc->locks_spin);
4628                 if (!list_empty(&proc->locks)) {
4629                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4630                                          lkb_ownqueue);
4631                         list_del_init(&lkb->lkb_ownqueue);
4632                 }
4633                 spin_unlock(&proc->locks_spin);
4634
4635                 if (!lkb)
4636                         break;
4637
4638                 lkb->lkb_flags |= DLM_IFL_DEAD;
4639                 unlock_proc_lock(ls, lkb);
4640                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4641         }
4642
4643         spin_lock(&proc->locks_spin);
4644         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4645                 list_del_init(&lkb->lkb_ownqueue);
4646                 lkb->lkb_flags |= DLM_IFL_DEAD;
4647                 dlm_put_lkb(lkb);
4648         }
4649         spin_unlock(&proc->locks_spin);
4650
4651         spin_lock(&proc->asts_spin);
4652         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4653                 list_del(&lkb->lkb_astqueue);
4654                 dlm_put_lkb(lkb);
4655         }
4656         spin_unlock(&proc->asts_spin);
4657 }
4658
4659 /* pid of 0 means purge all orphans */
4660
4661 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4662 {
4663         struct dlm_lkb *lkb, *safe;
4664
4665         mutex_lock(&ls->ls_orphans_mutex);
4666         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4667                 if (pid && lkb->lkb_ownpid != pid)
4668                         continue;
4669                 unlock_proc_lock(ls, lkb);
4670                 list_del_init(&lkb->lkb_ownqueue);
4671                 dlm_put_lkb(lkb);
4672         }
4673         mutex_unlock(&ls->ls_orphans_mutex);
4674 }
4675
4676 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4677 {
4678         struct dlm_message *ms;
4679         struct dlm_mhandle *mh;
4680         int error;
4681
4682         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4683                                 DLM_MSG_PURGE, &ms, &mh);
4684         if (error)
4685                 return error;
4686         ms->m_nodeid = nodeid;
4687         ms->m_pid = pid;
4688
4689         return send_message(mh, ms);
4690 }
4691
4692 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4693                    int nodeid, int pid)
4694 {
4695         int error = 0;
4696
4697         if (nodeid != dlm_our_nodeid()) {
4698                 error = send_purge(ls, nodeid, pid);
4699         } else {
4700                 dlm_lock_recovery(ls);
4701                 if (pid == current->pid)
4702                         purge_proc_locks(ls, proc);
4703                 else
4704                         do_purge(ls, nodeid, pid);
4705                 dlm_unlock_recovery(ls);
4706         }
4707         return error;
4708 }
4709