/* fs/gfs2/locking/dlm/lock.c — from linux-2.6
   (branch 'release' of git://git.kernel.org/pub/scm/linux/kernel/git/aegl/linux-2.6) */
1 /*
2  * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
3  * Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
4  *
5  * This copyrighted material is made available to anyone wishing to use,
6  * modify, copy, or redistribute it subject to the terms and conditions
7  * of the GNU General Public License version 2.
8  */
9
10 #include "lock_dlm.h"
11
12 static char junk_lvb[GDLM_LVB_SIZE];
13
14 static void queue_complete(struct gdlm_lock *lp)
15 {
16         struct gdlm_ls *ls = lp->ls;
17
18         clear_bit(LFL_ACTIVE, &lp->flags);
19
20         spin_lock(&ls->async_lock);
21         list_add_tail(&lp->clist, &ls->complete);
22         spin_unlock(&ls->async_lock);
23         wake_up(&ls->thread_wait);
24 }
25
/* dlm completion ast: hand the lock over to the lock_dlm thread */
static inline void gdlm_ast(void *astarg)
{
	struct gdlm_lock *lp = astarg;

	queue_complete(lp);
}
30
31 static inline void gdlm_bast(void *astarg, int mode)
32 {
33         struct gdlm_lock *lp = astarg;
34         struct gdlm_ls *ls = lp->ls;
35
36         if (!mode) {
37                 printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
38                         lp->lockname.ln_type,
39                         (unsigned long long)lp->lockname.ln_number);
40                 return;
41         }
42
43         spin_lock(&ls->async_lock);
44         if (!lp->bast_mode) {
45                 list_add_tail(&lp->blist, &ls->blocking);
46                 lp->bast_mode = mode;
47         } else if (lp->bast_mode < mode)
48                 lp->bast_mode = mode;
49         spin_unlock(&ls->async_lock);
50         wake_up(&ls->thread_wait);
51 }
52
53 void gdlm_queue_delayed(struct gdlm_lock *lp)
54 {
55         struct gdlm_ls *ls = lp->ls;
56
57         spin_lock(&ls->async_lock);
58         list_add_tail(&lp->delay_list, &ls->delayed);
59         spin_unlock(&ls->async_lock);
60 }
61
62 /* convert gfs lock-state to dlm lock-mode */
63
64 static s16 make_mode(s16 lmstate)
65 {
66         switch (lmstate) {
67         case LM_ST_UNLOCKED:
68                 return DLM_LOCK_NL;
69         case LM_ST_EXCLUSIVE:
70                 return DLM_LOCK_EX;
71         case LM_ST_DEFERRED:
72                 return DLM_LOCK_CW;
73         case LM_ST_SHARED:
74                 return DLM_LOCK_PR;
75         }
76         gdlm_assert(0, "unknown LM state %d", lmstate);
77         return -1;
78 }
79
80 /* convert dlm lock-mode to gfs lock-state */
81
82 s16 gdlm_make_lmstate(s16 dlmmode)
83 {
84         switch (dlmmode) {
85         case DLM_LOCK_IV:
86         case DLM_LOCK_NL:
87                 return LM_ST_UNLOCKED;
88         case DLM_LOCK_EX:
89                 return LM_ST_EXCLUSIVE;
90         case DLM_LOCK_CW:
91                 return LM_ST_DEFERRED;
92         case DLM_LOCK_PR:
93                 return LM_ST_SHARED;
94         }
95         gdlm_assert(0, "unknown DLM mode %d", dlmmode);
96         return -1;
97 }
98
99 /* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
100    DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */
101
102 static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state)
103 {
104         s16 cur = make_mode(cur_state);
105         if (lp->cur != DLM_LOCK_IV)
106                 gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur);
107 }
108
109 static inline unsigned int make_flags(struct gdlm_lock *lp,
110                                       unsigned int gfs_flags,
111                                       s16 cur, s16 req)
112 {
113         unsigned int lkf = 0;
114
115         if (gfs_flags & LM_FLAG_TRY)
116                 lkf |= DLM_LKF_NOQUEUE;
117
118         if (gfs_flags & LM_FLAG_TRY_1CB) {
119                 lkf |= DLM_LKF_NOQUEUE;
120                 lkf |= DLM_LKF_NOQUEUEBAST;
121         }
122
123         if (gfs_flags & LM_FLAG_PRIORITY) {
124                 lkf |= DLM_LKF_NOORDER;
125                 lkf |= DLM_LKF_HEADQUE;
126         }
127
128         if (gfs_flags & LM_FLAG_ANY) {
129                 if (req == DLM_LOCK_PR)
130                         lkf |= DLM_LKF_ALTCW;
131                 else if (req == DLM_LOCK_CW)
132                         lkf |= DLM_LKF_ALTPR;
133         }
134
135         if (lp->lksb.sb_lkid != 0) {
136                 lkf |= DLM_LKF_CONVERT;
137
138                 /* Conversion deadlock avoidance by DLM */
139
140                 if (!test_bit(LFL_FORCE_PROMOTE, &lp->flags) &&
141                     !(lkf & DLM_LKF_NOQUEUE) &&
142                     cur > DLM_LOCK_NL && req > DLM_LOCK_NL && cur != req)
143                         lkf |= DLM_LKF_CONVDEADLK;
144         }
145
146         if (lp->lvb)
147                 lkf |= DLM_LKF_VALBLK;
148
149         return lkf;
150 }
151
/* make_strname - convert GFS lock numbers to a string */

static inline void make_strname(const struct lm_lockname *lockname,
				struct gdlm_strname *str)
{
	/* fixed-width hex (8 + 16 chars) so every resource name has the
	   same length; namelen is the full padded width, not strlen().
	   NOTE(review): assumes str->name holds at least GDLM_STRNAME_BYTES
	   plus the terminating NUL sprintf writes — confirm in lock_dlm.h */
	sprintf(str->name, "%8x%16llx", lockname->ln_type,
		(unsigned long long)lockname->ln_number);
	str->namelen = GDLM_STRNAME_BYTES;
}
161
162 static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
163                           struct gdlm_lock **lpp)
164 {
165         struct gdlm_lock *lp;
166
167         lp = kzalloc(sizeof(struct gdlm_lock), GFP_KERNEL);
168         if (!lp)
169                 return -ENOMEM;
170
171         lp->lockname = *name;
172         make_strname(name, &lp->strname);
173         lp->ls = ls;
174         lp->cur = DLM_LOCK_IV;
175         lp->lvb = NULL;
176         lp->hold_null = NULL;
177         INIT_LIST_HEAD(&lp->clist);
178         INIT_LIST_HEAD(&lp->blist);
179         INIT_LIST_HEAD(&lp->delay_list);
180
181         spin_lock(&ls->async_lock);
182         list_add(&lp->all_list, &ls->all_locks);
183         ls->all_locks_count++;
184         spin_unlock(&ls->async_lock);
185
186         *lpp = lp;
187         return 0;
188 }
189
/* Remove a lock from any pending-work lists and from the lockspace's
   all_locks list, then free it. */

void gdlm_delete_lp(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;

	spin_lock(&ls->async_lock);
	/* drop any work still queued for the lock_dlm thread */
	if (!list_empty(&lp->clist))
		list_del_init(&lp->clist);
	if (!list_empty(&lp->blist))
		list_del_init(&lp->blist);
	if (!list_empty(&lp->delay_list))
		list_del_init(&lp->delay_list);
	/* every lock is on all_locks from creation until deletion */
	gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
		    (unsigned long long)lp->lockname.ln_number);
	list_del_init(&lp->all_list);
	ls->all_locks_count--;
	spin_unlock(&ls->async_lock);

	kfree(lp);
}
209
210 int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
211                   void **lockp)
212 {
213         struct gdlm_lock *lp;
214         int error;
215
216         error = gdlm_create_lp(lockspace, name, &lp);
217
218         *lockp = lp;
219         return error;
220 }
221
/* Release a lock object obtained from gdlm_get_lock(). */
void gdlm_put_lock(void *lock)
{
	struct gdlm_lock *lp = lock;

	gdlm_delete_lp(lp);
}
226
/* Submit a lock request to the dlm.  lp->req and lp->lkf must already
   be set up by the caller.  Returns LM_OUT_ASYNC when the result will
   arrive later via the completion ast, or LM_OUT_ERROR. */

unsigned int gdlm_do_lock(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	int error, bast = 1;

	/*
	 * When recovery is in progress, delay lock requests for submission
	 * once recovery is done.  Requests for recovery (NOEXP) and unlocks
	 * can pass.
	 */

	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
	    !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) {
		gdlm_queue_delayed(lp);
		return LM_OUT_ASYNC;
	}

	/*
	 * Submit the actual lock request.
	 */

	/* some requests (e.g. internal NL holds) don't want blocking asts */
	if (test_bit(LFL_NOBAST, &lp->flags))
		bast = 0;

	set_bit(LFL_ACTIVE, &lp->flags);

	log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type,
		  (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid,
		  lp->cur, lp->req, lp->lkf);

	error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
			 lp->strname.name, lp->strname.namelen, 0, gdlm_ast,
			 lp, bast ? gdlm_bast : NULL);

	/* a NOQUEUE request that can't be granted immediately returns
	   -EAGAIN; report it to gfs as a normal async completion */
	if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
		lp->lksb.sb_status = -EAGAIN;
		queue_complete(lp);
		error = 0;
	}

	if (error) {
		log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x "
			  "flags=%lx", ls->fsname, lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number, error,
			  lp->cur, lp->req, lp->lkf, lp->flags);
		return LM_OUT_ERROR;
	}
	return LM_OUT_ASYNC;
}
276
/* Submit an unlock to the dlm.  The result arrives asynchronously;
   LFL_DLM_UNLOCK marks the lock so that, among other things,
   gdlm_cancel() will not try to cancel it while the unlock is in
   flight.  Returns LM_OUT_ASYNC or LM_OUT_ERROR. */

static unsigned int gdlm_do_unlock(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	unsigned int lkf = 0;
	int error;

	set_bit(LFL_DLM_UNLOCK, &lp->flags);
	set_bit(LFL_ACTIVE, &lp->flags);

	/* return the value block contents along with the unlock */
	if (lp->lvb)
		lkf = DLM_LKF_VALBLK;

	log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type,
		  (unsigned long long)lp->lockname.ln_number,
		  lp->lksb.sb_lkid, lp->cur, lkf);

	error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp);

	if (error) {
		log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x "
			  "flags=%lx", ls->fsname, lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number, error,
			  lp->cur, lp->req, lp->lkf, lp->flags);
		return LM_OUT_ERROR;
	}
	return LM_OUT_ASYNC;
}
304
305 unsigned int gdlm_lock(void *lock, unsigned int cur_state,
306                        unsigned int req_state, unsigned int flags)
307 {
308         struct gdlm_lock *lp = lock;
309
310         clear_bit(LFL_DLM_CANCEL, &lp->flags);
311         if (flags & LM_FLAG_NOEXP)
312                 set_bit(LFL_NOBLOCK, &lp->flags);
313
314         check_cur_state(lp, cur_state);
315         lp->req = make_mode(req_state);
316         lp->lkf = make_flags(lp, flags, lp->cur, lp->req);
317
318         return gdlm_do_lock(lp);
319 }
320
321 unsigned int gdlm_unlock(void *lock, unsigned int cur_state)
322 {
323         struct gdlm_lock *lp = lock;
324
325         clear_bit(LFL_DLM_CANCEL, &lp->flags);
326         if (lp->cur == DLM_LOCK_IV)
327                 return 0;
328         return gdlm_do_unlock(lp);
329 }
330
/* Cancel an in-progress request.  Three cases:
   1) the request is still parked on the delayed list: pull it off and
      complete it ourselves with LFL_CANCEL set;
   2) the request is not active in the dlm, or an unlock is already in
      flight: nothing to cancel;
   3) the request is blocked inside the dlm: issue DLM_LKF_CANCEL. */

void gdlm_cancel(void *lock)
{
	struct gdlm_lock *lp = lock;
	struct gdlm_ls *ls = lp->ls;
	int error, delay_list = 0;

	/* a cancel is already outstanding for this lock */
	if (test_bit(LFL_DLM_CANCEL, &lp->flags))
		return;

	log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type,
		 (unsigned long long)lp->lockname.ln_number, lp->flags);

	/* case 1: the request never reached the dlm */
	spin_lock(&ls->async_lock);
	if (!list_empty(&lp->delay_list)) {
		list_del_init(&lp->delay_list);
		delay_list = 1;
	}
	spin_unlock(&ls->async_lock);

	if (delay_list) {
		set_bit(LFL_CANCEL, &lp->flags);
		set_bit(LFL_ACTIVE, &lp->flags);
		/* fake a completion so gfs sees the request finish */
		queue_complete(lp);
		return;
	}

	/* case 2: nothing outstanding in the dlm to cancel */
	if (!test_bit(LFL_ACTIVE, &lp->flags) ||
	    test_bit(LFL_DLM_UNLOCK, &lp->flags)) {
		log_info("gdlm_cancel skip %x,%llx flags %lx",
			 lp->lockname.ln_type,
			 (unsigned long long)lp->lockname.ln_number, lp->flags);
		return;
	}

	/* the lock is blocked in the dlm */

	set_bit(LFL_DLM_CANCEL, &lp->flags);
	set_bit(LFL_ACTIVE, &lp->flags);

	error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL,
			   NULL, lp);

	log_info("gdlm_cancel rv %d %x,%llx flags %lx", error,
		 lp->lockname.ln_type,
		 (unsigned long long)lp->lockname.ln_number, lp->flags);

	/* -EBUSY: the dlm couldn't cancel (request already completing),
	   so clear the flag again -- no cancel is actually in flight */
	if (error == -EBUSY)
		clear_bit(LFL_DLM_CANCEL, &lp->flags);
}
380
381 static int gdlm_add_lvb(struct gdlm_lock *lp)
382 {
383         char *lvb;
384
385         lvb = kzalloc(GDLM_LVB_SIZE, GFP_KERNEL);
386         if (!lvb)
387                 return -ENOMEM;
388
389         lp->lksb.sb_lvbptr = lvb;
390         lp->lvb = lvb;
391         return 0;
392 }
393
394 static void gdlm_del_lvb(struct gdlm_lock *lp)
395 {
396         kfree(lp->lvb);
397         lp->lvb = NULL;
398         lp->lksb.sb_lvbptr = NULL;
399 }
400
/* Action function for wait_on_bit(): invoked while LFL_AST_WAIT is
   still set; yield the cpu and retry.  Returning 0 means "keep
   waiting" (the wait is not interruptible). */

static int gdlm_ast_wait(void *word)
{
	schedule();
	return 0;
}
406
/* This can do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs won't call hold_lvb() during a callback (from
   the context of a lock_dlm thread). */

static int hold_null_lock(struct gdlm_lock *lp)
{
	struct gdlm_lock *lpn = NULL;
	int error;

	if (lp->hold_null) {
		printk(KERN_INFO "lock_dlm: lvb already held\n");
		return 0;
	}

	/* take a second, NL-mode lock on the same resource; holding it
	   keeps the resource (and hence its lvb) alive in the dlm */
	error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn);
	if (error)
		goto out;

	/* we don't care what this lock reads into its lvb, so point it
	   at the shared throwaway buffer */
	lpn->lksb.sb_lvbptr = junk_lvb;
	lpn->lvb = junk_lvb;

	lpn->req = DLM_LOCK_NL;
	lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE;
	set_bit(LFL_NOBAST, &lpn->flags);
	set_bit(LFL_INLOCK, &lpn->flags);
	set_bit(LFL_AST_WAIT, &lpn->flags);

	/* wait until the completion handler clears LFL_AST_WAIT --
	   NOTE(review): the clearing happens outside this file; confirm
	   in the ast-processing code */
	gdlm_do_lock(lpn);
	wait_on_bit(&lpn->flags, LFL_AST_WAIT, gdlm_ast_wait, TASK_UNINTERRUPTIBLE);
	error = lpn->lksb.sb_status;
	if (error) {
		printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n",
		       error);
		gdlm_delete_lp(lpn);
		lpn = NULL;
	}
out:
	lp->hold_null = lpn;
	return error;
}
447
/* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs may call unhold_lvb() during a callback (from
   the context of a lock_dlm thread) which could cause a deadlock since the
   other lock_dlm thread could be engaged in recovery. */

static void unhold_null_lock(struct gdlm_lock *lp)
{
	struct gdlm_lock *lpn = lp->hold_null;

	/* hold_null_lock() must have succeeded earlier */
	gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type,
		    (unsigned long long)lp->lockname.ln_number);
	/* clearing lvb keeps gdlm_do_unlock() from setting DLM_LKF_VALBLK */
	lpn->lksb.sb_lvbptr = NULL;
	lpn->lvb = NULL;
	/* NOTE(review): flag name suggests the completion handler deletes
	   lpn after the unlock finishes -- confirm in the ast code */
	set_bit(LFL_UNLOCK_DELETE, &lpn->flags);
	gdlm_do_unlock(lpn);
	lp->hold_null = NULL;
}
465
466 /* Acquire a NL lock because gfs requires the value block to remain
467    intact on the resource while the lvb is "held" even if it's holding no locks
468    on the resource. */
469
470 int gdlm_hold_lvb(void *lock, char **lvbp)
471 {
472         struct gdlm_lock *lp = lock;
473         int error;
474
475         error = gdlm_add_lvb(lp);
476         if (error)
477                 return error;
478
479         *lvbp = lp->lvb;
480
481         error = hold_null_lock(lp);
482         if (error)
483                 gdlm_del_lvb(lp);
484
485         return error;
486 }
487
/* Release a held lvb: drop the pinning NL lock first, then free the
 * value block itself. */
void gdlm_unhold_lvb(void *lock, char *lvb)
{
	struct gdlm_lock *lp = lock;

	unhold_null_lock(lp);
	gdlm_del_lvb(lp);
}
495
/* Move every lock parked on the delayed list (see gdlm_queue_delayed)
   onto the submit list, preserving order, and wake the lock_dlm thread
   to send them to the dlm. */

void gdlm_submit_delayed(struct gdlm_ls *ls)
{
	struct gdlm_lock *lp, *safe;

	spin_lock(&ls->async_lock);
	list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) {
		list_del_init(&lp->delay_list);
		list_add_tail(&lp->delay_list, &ls->submit);
	}
	spin_unlock(&ls->async_lock);
	wake_up(&ls->thread_wait);
}
508
/* Forcibly free every lock remaining on the lockspace's all_locks
   list, without issuing any dlm unlocks.  Returns the number of locks
   freed (a nonzero count means locks were still held at teardown). */

int gdlm_release_all_locks(struct gdlm_ls *ls)
{
	struct gdlm_lock *lp, *safe;
	int count = 0;

	spin_lock(&ls->async_lock);
	list_for_each_entry_safe(lp, safe, &ls->all_locks, all_list) {
		list_del_init(&lp->all_list);

		/* junk_lvb is the shared static buffer, not kmalloc'ed */
		if (lp->lvb && lp->lvb != junk_lvb)
			kfree(lp->lvb);
		kfree(lp);
		count++;
	}
	spin_unlock(&ls->async_lock);

	return count;
}
527