fs/gfs2/locking/dlm/lock.c
/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */

#include "lock_dlm.h"

static char junk_lvb[GDLM_LVB_SIZE];

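/* Put a lock on the lockspace's "complete" list and wake the lock_dlm
   thread so the completion can be processed. */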
static void queue_complete(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;

        clear_bit(LFL_ACTIVE, &lp->flags);

        spin_lock(&ls->async_lock);
        list_add_tail(&lp->clist, &ls->complete);
        spin_unlock(&ls->async_lock);
        wake_up(&ls->thread_wait);
}

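/* Completion (ast) and blocking-callback (bast) handlers passed to
   dlm_lock(); both defer the real work to the lock_dlm thread via the
   async lists. */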
static inline void gdlm_ast(void *astarg)
{
        queue_complete(astarg);
}

static inline void gdlm_bast(void *astarg, int mode)
{
        struct gdlm_lock *lp = astarg;
        struct gdlm_ls *ls = lp->ls;

        if (!mode) {
                printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
                        lp->lockname.ln_type,
                        (unsigned long long)lp->lockname.ln_number);
                return;
        }

        spin_lock(&ls->async_lock);
        if (!lp->bast_mode) {
                list_add_tail(&lp->blist, &ls->blocking);
                lp->bast_mode = mode;
        } else if (lp->bast_mode < mode)
                lp->bast_mode = mode;
        spin_unlock(&ls->async_lock);
        wake_up(&ls->thread_wait);
}

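/* Park a lock on the lockspace's "delayed" list; gdlm_submit_delayed()
   later moves these entries to the submit list. */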
void gdlm_queue_delayed(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;

        spin_lock(&ls->async_lock);
        list_add_tail(&lp->delay_list, &ls->delayed);
        spin_unlock(&ls->async_lock);
}

/* convert gfs lock-state to dlm lock-mode */

static s16 make_mode(s16 lmstate)
{
        switch (lmstate) {
        case LM_ST_UNLOCKED:
                return DLM_LOCK_NL;
        case LM_ST_EXCLUSIVE:
                return DLM_LOCK_EX;
        case LM_ST_DEFERRED:
                return DLM_LOCK_CW;
        case LM_ST_SHARED:
                return DLM_LOCK_PR;
        }
        gdlm_assert(0, "unknown LM state %d", lmstate);
        return -1;
}

/* convert dlm lock-mode to gfs lock-state */

s16 gdlm_make_lmstate(s16 dlmmode)
{
        switch (dlmmode) {
        case DLM_LOCK_IV:
        case DLM_LOCK_NL:
                return LM_ST_UNLOCKED;
        case DLM_LOCK_EX:
                return LM_ST_EXCLUSIVE;
        case DLM_LOCK_CW:
                return LM_ST_DEFERRED;
        case DLM_LOCK_PR:
                return LM_ST_SHARED;
        }
        gdlm_assert(0, "unknown DLM mode %d", dlmmode);
        return -1;
}

/* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
   DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */

static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state)
{
        s16 cur = make_mode(cur_state);
        if (lp->cur != DLM_LOCK_IV)
                gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur);
}

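/* Translate GFS lock flags and the current/requested modes into DLM_LKF_*
   flags for dlm_lock(). */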
static inline unsigned int make_flags(struct gdlm_lock *lp,
                                      unsigned int gfs_flags,
                                      s16 cur, s16 req)
{
        unsigned int lkf = 0;

        if (gfs_flags & LM_FLAG_TRY)
                lkf |= DLM_LKF_NOQUEUE;

        if (gfs_flags & LM_FLAG_TRY_1CB) {
                lkf |= DLM_LKF_NOQUEUE;
                lkf |= DLM_LKF_NOQUEUEBAST;
        }

        if (gfs_flags & LM_FLAG_PRIORITY) {
                lkf |= DLM_LKF_NOORDER;
                lkf |= DLM_LKF_HEADQUE;
        }

        if (gfs_flags & LM_FLAG_ANY) {
                if (req == DLM_LOCK_PR)
                        lkf |= DLM_LKF_ALTCW;
                else if (req == DLM_LOCK_CW)
                        lkf |= DLM_LKF_ALTPR;
        }

        if (lp->lksb.sb_lkid != 0) {
                lkf |= DLM_LKF_CONVERT;

                /* Conversion deadlock avoidance by DLM */

                if (!(lp->ls->fsflags & LM_MFLAG_CONV_NODROP) &&
                    !test_bit(LFL_FORCE_PROMOTE, &lp->flags) &&
                    !(lkf & DLM_LKF_NOQUEUE) &&
                    cur > DLM_LOCK_NL && req > DLM_LOCK_NL && cur != req)
                        lkf |= DLM_LKF_CONVDEADLK;
        }

        if (lp->lvb)
                lkf |= DLM_LKF_VALBLK;

        return lkf;
}

/* make_strname - convert GFS lock numbers to a string */

static inline void make_strname(const struct lm_lockname *lockname,
                                struct gdlm_strname *str)
{
        sprintf(str->name, "%8x%16llx", lockname->ln_type,
                (unsigned long long)lockname->ln_number);
        str->namelen = GDLM_STRNAME_BYTES;
}

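/* Allocate and initialise a gdlm_lock for the given lock name and add it
   to the lockspace's all_locks list. */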
static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
                          struct gdlm_lock **lpp)
{
        struct gdlm_lock *lp;

        lp = kzalloc(sizeof(struct gdlm_lock), GFP_NOFS);
        if (!lp)
                return -ENOMEM;

        lp->lockname = *name;
        make_strname(name, &lp->strname);
        lp->ls = ls;
        lp->cur = DLM_LOCK_IV;
        lp->lvb = NULL;
        lp->hold_null = NULL;
        INIT_LIST_HEAD(&lp->clist);
        INIT_LIST_HEAD(&lp->blist);
        INIT_LIST_HEAD(&lp->delay_list);

        spin_lock(&ls->async_lock);
        list_add(&lp->all_list, &ls->all_locks);
        ls->all_locks_count++;
        spin_unlock(&ls->async_lock);

        *lpp = lp;
        return 0;
}

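/* Remove a gdlm_lock from any async lists and from all_locks, then free it. */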
void gdlm_delete_lp(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;

        spin_lock(&ls->async_lock);
        if (!list_empty(&lp->clist))
                list_del_init(&lp->clist);
        if (!list_empty(&lp->blist))
                list_del_init(&lp->blist);
        if (!list_empty(&lp->delay_list))
                list_del_init(&lp->delay_list);
        gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
                    (unsigned long long)lp->lockname.ln_number);
        list_del_init(&lp->all_list);
        ls->all_locks_count--;
        spin_unlock(&ls->async_lock);

        kfree(lp);
}

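/* Create and destroy the lock_dlm object backing a GFS lock. */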
int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
                  void **lockp)
{
        struct gdlm_lock *lp;
        int error;

        error = gdlm_create_lp(lockspace, name, &lp);

        *lockp = lp;
        return error;
}

void gdlm_put_lock(void *lock)
{
        gdlm_delete_lp(lock);
}

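/* Submit a lock request or conversion to the DLM, unless requests are
   currently blocked for recovery, in which case the lock is queued on the
   delayed list instead. */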
unsigned int gdlm_do_lock(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;
        int error, bast = 1;

        /*
         * When recovery is in progress, delay lock requests so they can be
         * submitted once recovery is done.  Requests for recovery (NOEXP)
         * and unlocks can pass.
         */

        if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
            !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) {
                gdlm_queue_delayed(lp);
                return LM_OUT_ASYNC;
        }

        /*
         * Submit the actual lock request.
         */

        if (test_bit(LFL_NOBAST, &lp->flags))
                bast = 0;

        set_bit(LFL_ACTIVE, &lp->flags);

        log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type,
                  (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid,
                  lp->cur, lp->req, lp->lkf);

        error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
                         lp->strname.name, lp->strname.namelen, 0, gdlm_ast,
                         lp, bast ? gdlm_bast : NULL);

        if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
                lp->lksb.sb_status = -EAGAIN;
                queue_complete(lp);
                error = 0;
        }

        if (error) {
                log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x "
                          "flags=%lx", ls->fsname, lp->lockname.ln_type,
                          (unsigned long long)lp->lockname.ln_number, error,
                          lp->cur, lp->req, lp->lkf, lp->flags);
                return LM_OUT_ERROR;
        }
        return LM_OUT_ASYNC;
}

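/* Ask the DLM to release a lock; LFL_DLM_UNLOCK marks an unlock as being
   in flight. */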
static unsigned int gdlm_do_unlock(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;
        unsigned int lkf = 0;
        int error;

        set_bit(LFL_DLM_UNLOCK, &lp->flags);
        set_bit(LFL_ACTIVE, &lp->flags);

        if (lp->lvb)
                lkf = DLM_LKF_VALBLK;

        log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type,
                  (unsigned long long)lp->lockname.ln_number,
                  lp->lksb.sb_lkid, lp->cur, lkf);

        error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp);

        if (error) {
                log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x "
                          "flags=%lx", ls->fsname, lp->lockname.ln_type,
                          (unsigned long long)lp->lockname.ln_number, error,
                          lp->cur, lp->req, lp->lkf, lp->flags);
                return LM_OUT_ERROR;
        }
        return LM_OUT_ASYNC;
}

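/* Lock and unlock entry points: translate GFS state and flags into DLM mode
   and flags, then submit via gdlm_do_lock()/gdlm_do_unlock(). */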
unsigned int gdlm_lock(void *lock, unsigned int cur_state,
                       unsigned int req_state, unsigned int flags)
{
        struct gdlm_lock *lp = lock;

        clear_bit(LFL_DLM_CANCEL, &lp->flags);
        if (flags & LM_FLAG_NOEXP)
                set_bit(LFL_NOBLOCK, &lp->flags);

        check_cur_state(lp, cur_state);
        lp->req = make_mode(req_state);
        lp->lkf = make_flags(lp, flags, lp->cur, lp->req);

        return gdlm_do_lock(lp);
}

unsigned int gdlm_unlock(void *lock, unsigned int cur_state)
{
        struct gdlm_lock *lp = lock;

        clear_bit(LFL_DLM_CANCEL, &lp->flags);
        if (lp->cur == DLM_LOCK_IV)
                return 0;
        return gdlm_do_unlock(lp);
}

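/* Cancel a pending request: if it is still parked on the delayed list,
   complete it immediately with LFL_CANCEL set; otherwise ask the DLM to
   cancel the blocked request with DLM_LKF_CANCEL. */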
void gdlm_cancel(void *lock)
{
        struct gdlm_lock *lp = lock;
        struct gdlm_ls *ls = lp->ls;
        int error, delay_list = 0;

        if (test_bit(LFL_DLM_CANCEL, &lp->flags))
                return;

        log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type,
                 (unsigned long long)lp->lockname.ln_number, lp->flags);

        spin_lock(&ls->async_lock);
        if (!list_empty(&lp->delay_list)) {
                list_del_init(&lp->delay_list);
                delay_list = 1;
        }
        spin_unlock(&ls->async_lock);

        if (delay_list) {
                set_bit(LFL_CANCEL, &lp->flags);
                set_bit(LFL_ACTIVE, &lp->flags);
                queue_complete(lp);
                return;
        }

        if (!test_bit(LFL_ACTIVE, &lp->flags) ||
            test_bit(LFL_DLM_UNLOCK, &lp->flags)) {
                log_info("gdlm_cancel skip %x,%llx flags %lx",
                         lp->lockname.ln_type,
                         (unsigned long long)lp->lockname.ln_number, lp->flags);
                return;
        }

        /* the lock is blocked in the dlm */

        set_bit(LFL_DLM_CANCEL, &lp->flags);
        set_bit(LFL_ACTIVE, &lp->flags);

        error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL,
                           NULL, lp);

        log_info("gdlm_cancel rv %d %x,%llx flags %lx", error,
                 lp->lockname.ln_type,
                 (unsigned long long)lp->lockname.ln_number, lp->flags);

        if (error == -EBUSY)
                clear_bit(LFL_DLM_CANCEL, &lp->flags);
}

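/* Allocate and free the lock value block (lvb) buffer attached to a lock. */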
static int gdlm_add_lvb(struct gdlm_lock *lp)
{
        char *lvb;

        lvb = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
        if (!lvb)
                return -ENOMEM;

        lp->lksb.sb_lvbptr = lvb;
        lp->lvb = lvb;
        return 0;
}

static void gdlm_del_lvb(struct gdlm_lock *lp)
{
        kfree(lp->lvb);
        lp->lvb = NULL;
        lp->lksb.sb_lvbptr = NULL;
}

static int gdlm_ast_wait(void *word)
{
        schedule();
        return 0;
}

/* This can do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs won't call hold_lvb() during a callback (from
   the context of a lock_dlm thread). */

static int hold_null_lock(struct gdlm_lock *lp)
{
        struct gdlm_lock *lpn = NULL;
        int error;

        if (lp->hold_null) {
                printk(KERN_INFO "lock_dlm: lvb already held\n");
                return 0;
        }

        error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn);
        if (error)
                goto out;

        lpn->lksb.sb_lvbptr = junk_lvb;
        lpn->lvb = junk_lvb;

        lpn->req = DLM_LOCK_NL;
        lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE;
        set_bit(LFL_NOBAST, &lpn->flags);
        set_bit(LFL_INLOCK, &lpn->flags);
        set_bit(LFL_AST_WAIT, &lpn->flags);

        gdlm_do_lock(lpn);
        wait_on_bit(&lpn->flags, LFL_AST_WAIT, gdlm_ast_wait,
                    TASK_UNINTERRUPTIBLE);
        error = lpn->lksb.sb_status;
        if (error) {
                printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n",
                       error);
                gdlm_delete_lp(lpn);
                lpn = NULL;
        }
out:
        lp->hold_null = lpn;
        return error;
}

/* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs may call unhold_lvb() during a callback (from
   the context of a lock_dlm thread) which could cause a deadlock since the
   other lock_dlm thread could be engaged in recovery. */

static void unhold_null_lock(struct gdlm_lock *lp)
{
        struct gdlm_lock *lpn = lp->hold_null;

        gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type,
                    (unsigned long long)lp->lockname.ln_number);
        lpn->lksb.sb_lvbptr = NULL;
        lpn->lvb = NULL;
        set_bit(LFL_UNLOCK_DELETE, &lpn->flags);
        gdlm_do_unlock(lpn);
        lp->hold_null = NULL;
}

/* Acquire a NL lock because gfs requires the value block to remain intact on
   the resource while the lvb is "held", even if gfs itself holds no locks on
   the resource. */

int gdlm_hold_lvb(void *lock, char **lvbp)
{
        struct gdlm_lock *lp = lock;
        int error;

        error = gdlm_add_lvb(lp);
        if (error)
                return error;

        *lvbp = lp->lvb;

        error = hold_null_lock(lp);
        if (error)
                gdlm_del_lvb(lp);

        return error;
}

void gdlm_unhold_lvb(void *lock, char *lvb)
{
        struct gdlm_lock *lp = lock;

        unhold_null_lock(lp);
        gdlm_del_lvb(lp);
}

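/* Move all delayed requests onto the submit list and wake the lock_dlm
   thread so it can resubmit them. */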
void gdlm_submit_delayed(struct gdlm_ls *ls)
{
        struct gdlm_lock *lp, *safe;

        spin_lock(&ls->async_lock);
        list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) {
                list_del_init(&lp->delay_list);
                list_add_tail(&lp->delay_list, &ls->submit);
        }
        spin_unlock(&ls->async_lock);
        wake_up(&ls->thread_wait);
}

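/* Free every lock still on the lockspace's all_locks list and return the
   number freed. */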
int gdlm_release_all_locks(struct gdlm_ls *ls)
{
        struct gdlm_lock *lp, *safe;
        int count = 0;

        spin_lock(&ls->async_lock);
        list_for_each_entry_safe(lp, safe, &ls->all_locks, all_list) {
                list_del_init(&lp->all_list);

                if (lp->lvb && lp->lvb != junk_lvb)
                        kfree(lp->lvb);
                kfree(lp);
                count++;
        }
        spin_unlock(&ls->async_lock);

        return count;
}
