fs/gfs2/locking/dlm/lock.c
/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License v.2.
 */

#include "lock_dlm.h"

static char junk_lvb[GDLM_LVB_SIZE];

static void queue_complete(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;

        clear_bit(LFL_ACTIVE, &lp->flags);

        spin_lock(&ls->async_lock);
        list_add_tail(&lp->clist, &ls->complete);
        spin_unlock(&ls->async_lock);
        wake_up(&ls->thread_wait);
}

static inline void gdlm_ast(void *astarg)
{
        queue_complete(astarg);
}

static inline void gdlm_bast(void *astarg, int mode)
{
        struct gdlm_lock *lp = astarg;
        struct gdlm_ls *ls = lp->ls;

        if (!mode) {
                printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
                        lp->lockname.ln_type,
                        (unsigned long long)lp->lockname.ln_number);
                return;
        }

        spin_lock(&ls->async_lock);
        if (!lp->bast_mode) {
                list_add_tail(&lp->blist, &ls->blocking);
                lp->bast_mode = mode;
        } else if (lp->bast_mode < mode)
                lp->bast_mode = mode;
        spin_unlock(&ls->async_lock);
        wake_up(&ls->thread_wait);
}
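
/*
 * Sketch of the callback contract, grounded in gdlm_do_lock() below:
 * gdlm_ast and gdlm_bast are registered with the DLM as the completion
 * and (optional) blocking callbacks for every request, i.e.
 *
 *      dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
 *               str.name, str.namelen, 0, gdlm_ast, lp,
 *               bast ? gdlm_bast : NULL);
 *
 * The DLM invokes gdlm_ast(lp) when the request completes, and
 * gdlm_bast(lp, mode) when this lock blocks another node's request for
 * "mode".  Neither does any real work itself; both just queue the lock
 * and wake the lock_dlm threads.
 */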

void gdlm_queue_delayed(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;

        spin_lock(&ls->async_lock);
        list_add_tail(&lp->delay_list, &ls->delayed);
        spin_unlock(&ls->async_lock);
}

/* convert gfs lock-state to dlm lock-mode */

static int16_t make_mode(int16_t lmstate)
{
        switch (lmstate) {
        case LM_ST_UNLOCKED:
                return DLM_LOCK_NL;
        case LM_ST_EXCLUSIVE:
                return DLM_LOCK_EX;
        case LM_ST_DEFERRED:
                return DLM_LOCK_CW;
        case LM_ST_SHARED:
                return DLM_LOCK_PR;
        }
        gdlm_assert(0, "unknown LM state %d", lmstate);
        return -1;
}

/* convert dlm lock-mode to gfs lock-state */

int16_t gdlm_make_lmstate(int16_t dlmmode)
{
        switch (dlmmode) {
        case DLM_LOCK_IV:
        case DLM_LOCK_NL:
                return LM_ST_UNLOCKED;
        case DLM_LOCK_EX:
                return LM_ST_EXCLUSIVE;
        case DLM_LOCK_CW:
                return LM_ST_DEFERRED;
        case DLM_LOCK_PR:
                return LM_ST_SHARED;
        }
        gdlm_assert(0, "unknown DLM mode %d", dlmmode);
        return -1;
}
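
/*
 * Example of the mapping round trip through the two functions above:
 *
 *      make_mode(LM_ST_SHARED) == DLM_LOCK_PR
 *      gdlm_make_lmstate(DLM_LOCK_PR) == LM_ST_SHARED
 *
 * The mapping is one-to-one except that DLM_LOCK_IV (no lock exists yet)
 * and DLM_LOCK_NL both map back to LM_ST_UNLOCKED.
 */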

/* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
   DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */

static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state)
{
        int16_t cur = make_mode(cur_state);
        if (lp->cur != DLM_LOCK_IV)
                gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur);
}

static inline unsigned int make_flags(struct gdlm_lock *lp,
                                      unsigned int gfs_flags,
                                      int16_t cur, int16_t req)
{
        unsigned int lkf = 0;

        if (gfs_flags & LM_FLAG_TRY)
                lkf |= DLM_LKF_NOQUEUE;

        if (gfs_flags & LM_FLAG_TRY_1CB) {
                lkf |= DLM_LKF_NOQUEUE;
                lkf |= DLM_LKF_NOQUEUEBAST;
        }

        if (gfs_flags & LM_FLAG_PRIORITY) {
                lkf |= DLM_LKF_NOORDER;
                lkf |= DLM_LKF_HEADQUE;
        }

        if (gfs_flags & LM_FLAG_ANY) {
                if (req == DLM_LOCK_PR)
                        lkf |= DLM_LKF_ALTCW;
                else if (req == DLM_LOCK_CW)
                        lkf |= DLM_LKF_ALTPR;
        }

        if (lp->lksb.sb_lkid != 0) {
                lkf |= DLM_LKF_CONVERT;

                /* Conversion deadlock avoidance by DLM */

                if (!test_bit(LFL_FORCE_PROMOTE, &lp->flags) &&
                    !(lkf & DLM_LKF_NOQUEUE) &&
                    cur > DLM_LOCK_NL && req > DLM_LOCK_NL && cur != req)
                        lkf |= DLM_LKF_CONVDEADLK;
        }

        if (lp->lvb)
                lkf |= DLM_LKF_VALBLK;

        return lkf;
}
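
/*
 * Worked example for make_flags(): converting an existing lock
 * (sb_lkid != 0, no lvb attached, LFL_FORCE_PROMOTE clear) from
 * DLM_LOCK_PR to DLM_LOCK_EX with no GFS flags set yields
 *
 *      lkf == DLM_LKF_CONVERT | DLM_LKF_CONVDEADLK
 *
 * since the request may queue (no DLM_LKF_NOQUEUE), both modes are above
 * DLM_LOCK_NL, and the modes differ.  Adding LM_FLAG_TRY would set
 * DLM_LKF_NOQUEUE, which in turn suppresses DLM_LKF_CONVDEADLK.
 */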

/* make_strname - convert GFS lock numbers to a string */

static inline void make_strname(struct lm_lockname *lockname,
                                struct gdlm_strname *str)
{
        sprintf(str->name, "%8x%16llx", lockname->ln_type,
                (unsigned long long)lockname->ln_number);
        str->namelen = GDLM_STRNAME_BYTES;
}
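
/*
 * The resource name is fixed-width, space-padded hex: "%8x%16llx" emits
 * 8 + 16 = 24 characters, so the printed length is constant and namelen
 * can simply be set to GDLM_STRNAME_BYTES.  For example, ln_type 0x2 and
 * ln_number 0x1a produce the name "       2              1a".
 */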

static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
                          struct gdlm_lock **lpp)
{
        struct gdlm_lock *lp;

        lp = kzalloc(sizeof(struct gdlm_lock), GFP_KERNEL);
        if (!lp)
                return -ENOMEM;

        lp->lockname = *name;
        lp->ls = ls;
        lp->cur = DLM_LOCK_IV;
        lp->lvb = NULL;
        lp->hold_null = NULL;
        init_completion(&lp->ast_wait);
        INIT_LIST_HEAD(&lp->clist);
        INIT_LIST_HEAD(&lp->blist);
        INIT_LIST_HEAD(&lp->delay_list);

        spin_lock(&ls->async_lock);
        list_add(&lp->all_list, &ls->all_locks);
        ls->all_locks_count++;
        spin_unlock(&ls->async_lock);

        *lpp = lp;
        return 0;
}

void gdlm_delete_lp(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;

        spin_lock(&ls->async_lock);
        if (!list_empty(&lp->clist))
                list_del_init(&lp->clist);
        if (!list_empty(&lp->blist))
                list_del_init(&lp->blist);
        if (!list_empty(&lp->delay_list))
                list_del_init(&lp->delay_list);
        gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
                    (unsigned long long)lp->lockname.ln_number);
        list_del_init(&lp->all_list);
        ls->all_locks_count--;
        spin_unlock(&ls->async_lock);

        kfree(lp);
}

int gdlm_get_lock(lm_lockspace_t *lockspace, struct lm_lockname *name,
                  lm_lock_t **lockp)
{
        struct gdlm_lock *lp;
        int error;

        error = gdlm_create_lp((struct gdlm_ls *) lockspace, name, &lp);

        *lockp = (lm_lock_t *) lp;
        return error;
}

void gdlm_put_lock(lm_lock_t *lock)
{
        gdlm_delete_lp((struct gdlm_lock *) lock);
}

unsigned int gdlm_do_lock(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;
        struct gdlm_strname str;
        int error, bast = 1;

        /*
         * When recovery is in progress, delay lock requests; they are
         * submitted once recovery is done.  Requests for recovery (NOEXP)
         * and unlocks can pass.
         */

        if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
            !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) {
                gdlm_queue_delayed(lp);
                return LM_OUT_ASYNC;
        }

        /*
         * Submit the actual lock request.
         */

        if (test_bit(LFL_NOBAST, &lp->flags))
                bast = 0;

        make_strname(&lp->lockname, &str);

        set_bit(LFL_ACTIVE, &lp->flags);

        log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type,
                  (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid,
                  lp->cur, lp->req, lp->lkf);

        error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
                         str.name, str.namelen, 0, gdlm_ast, lp,
                         bast ? gdlm_bast : NULL);

        if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
                lp->lksb.sb_status = -EAGAIN;
                queue_complete(lp);
                error = 0;
        }

        if (error) {
                log_debug("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x "
                          "flags=%lx", ls->fsname, lp->lockname.ln_type,
                          (unsigned long long)lp->lockname.ln_number, error,
                          lp->cur, lp->req, lp->lkf, lp->flags);
                return LM_OUT_ERROR;
        }
        return LM_OUT_ASYNC;
}
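
/*
 * Note on the -EAGAIN case above: a failed "try" request (DLM_LKF_NOQUEUE)
 * is not reported synchronously.  The -EAGAIN status is stored in the lksb
 * and the lock is queued for completion, so the caller always sees
 * LM_OUT_ASYNC followed by a normal completion carrying the failure.
 */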

static unsigned int gdlm_do_unlock(struct gdlm_lock *lp)
{
        struct gdlm_ls *ls = lp->ls;
        unsigned int lkf = 0;
        int error;

        set_bit(LFL_DLM_UNLOCK, &lp->flags);
        set_bit(LFL_ACTIVE, &lp->flags);

        if (lp->lvb)
                lkf = DLM_LKF_VALBLK;

        log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type,
                  (unsigned long long)lp->lockname.ln_number,
                  lp->lksb.sb_lkid, lp->cur, lkf);

        error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp);

        if (error) {
                log_debug("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x "
                          "flags=%lx", ls->fsname, lp->lockname.ln_type,
                          (unsigned long long)lp->lockname.ln_number, error,
                          lp->cur, lp->req, lp->lkf, lp->flags);
                return LM_OUT_ERROR;
        }
        return LM_OUT_ASYNC;
}

unsigned int gdlm_lock(lm_lock_t *lock, unsigned int cur_state,
                       unsigned int req_state, unsigned int flags)
{
        struct gdlm_lock *lp = (struct gdlm_lock *) lock;

        clear_bit(LFL_DLM_CANCEL, &lp->flags);
        if (flags & LM_FLAG_NOEXP)
                set_bit(LFL_NOBLOCK, &lp->flags);

        check_cur_state(lp, cur_state);
        lp->req = make_mode(req_state);
        lp->lkf = make_flags(lp, flags, lp->cur, lp->req);

        return gdlm_do_lock(lp);
}

unsigned int gdlm_unlock(lm_lock_t *lock, unsigned int cur_state)
{
        struct gdlm_lock *lp = (struct gdlm_lock *) lock;

        clear_bit(LFL_DLM_CANCEL, &lp->flags);
        if (lp->cur == DLM_LOCK_IV)
                return 0;
        return gdlm_do_unlock(lp);
}
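
/*
 * A minimal caller sketch (hypothetical; the real callers live in the GFS
 * glock code): take a shared lock asynchronously, then drop it.
 *
 *      lm_lock_t *lock;
 *      int ret;
 *
 *      ret = gdlm_get_lock(lockspace, &name, &lock);
 *      if (ret)
 *              return ret;
 *      gdlm_lock(lock, LM_ST_UNLOCKED, LM_ST_SHARED, 0);
 *      ... wait for the completion delivered by a lock_dlm thread ...
 *      gdlm_unlock(lock, LM_ST_SHARED);
 *      ... wait for the unlock completion ...
 *      gdlm_put_lock(lock);
 */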

void gdlm_cancel(lm_lock_t *lock)
{
        struct gdlm_lock *lp = (struct gdlm_lock *) lock;
        struct gdlm_ls *ls = lp->ls;
        int error, delay_list = 0;

        if (test_bit(LFL_DLM_CANCEL, &lp->flags))
                return;

        log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type,
                 (unsigned long long)lp->lockname.ln_number, lp->flags);

        spin_lock(&ls->async_lock);
        if (!list_empty(&lp->delay_list)) {
                list_del_init(&lp->delay_list);
                delay_list = 1;
        }
        spin_unlock(&ls->async_lock);

        if (delay_list) {
                set_bit(LFL_CANCEL, &lp->flags);
                set_bit(LFL_ACTIVE, &lp->flags);
                queue_complete(lp);
                return;
        }

        if (!test_bit(LFL_ACTIVE, &lp->flags) ||
            test_bit(LFL_DLM_UNLOCK, &lp->flags)) {
                log_info("gdlm_cancel skip %x,%llx flags %lx",
                         lp->lockname.ln_type,
                         (unsigned long long)lp->lockname.ln_number, lp->flags);
                return;
        }

        /* the lock is blocked in the dlm */

        set_bit(LFL_DLM_CANCEL, &lp->flags);
        set_bit(LFL_ACTIVE, &lp->flags);

        error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL,
                           NULL, lp);

        log_info("gdlm_cancel rv %d %x,%llx flags %lx", error,
                 lp->lockname.ln_type,
                 (unsigned long long)lp->lockname.ln_number, lp->flags);

        if (error == -EBUSY)
                clear_bit(LFL_DLM_CANCEL, &lp->flags);
}
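
/*
 * To summarize the three cancel cases above: a request still parked on
 * the delayed list is completed locally with LFL_CANCEL set; a request
 * that is not active in the dlm (or is already being unlocked) is
 * skipped; and a request genuinely blocked in the dlm is cancelled with
 * dlm_unlock(..., DLM_LKF_CANCEL, ...), backing off if the dlm returns
 * -EBUSY.
 */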

static int gdlm_add_lvb(struct gdlm_lock *lp)
{
        char *lvb;

        lvb = kzalloc(GDLM_LVB_SIZE, GFP_KERNEL);
        if (!lvb)
                return -ENOMEM;

        lp->lksb.sb_lvbptr = lvb;
        lp->lvb = lvb;
        return 0;
}

static void gdlm_del_lvb(struct gdlm_lock *lp)
{
        kfree(lp->lvb);
        lp->lvb = NULL;
        lp->lksb.sb_lvbptr = NULL;
}

/* This can do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs won't call hold_lvb() during a callback (from
   the context of a lock_dlm thread). */

static int hold_null_lock(struct gdlm_lock *lp)
{
        struct gdlm_lock *lpn = NULL;
        int error;

        if (lp->hold_null) {
                printk(KERN_INFO "lock_dlm: lvb already held\n");
                return 0;
        }

        error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn);
        if (error)
                goto out;

        lpn->lksb.sb_lvbptr = junk_lvb;
        lpn->lvb = junk_lvb;

        lpn->req = DLM_LOCK_NL;
        lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE;
        set_bit(LFL_NOBAST, &lpn->flags);
        set_bit(LFL_INLOCK, &lpn->flags);

        init_completion(&lpn->ast_wait);
        gdlm_do_lock(lpn);
        wait_for_completion(&lpn->ast_wait);
        error = lpn->lksb.sb_status;
        if (error) {
                printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n",
                       error);
                gdlm_delete_lp(lpn);
                lpn = NULL;
        }
 out:
        lp->hold_null = lpn;
        return error;
}

/* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs may call unhold_lvb() during a callback (from
   the context of a lock_dlm thread) which could cause a deadlock since the
   other lock_dlm thread could be engaged in recovery. */

static void unhold_null_lock(struct gdlm_lock *lp)
{
        struct gdlm_lock *lpn = lp->hold_null;

        gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type,
                    (unsigned long long)lp->lockname.ln_number);
        lpn->lksb.sb_lvbptr = NULL;
        lpn->lvb = NULL;
        set_bit(LFL_UNLOCK_DELETE, &lpn->flags);
        gdlm_do_unlock(lpn);
        lp->hold_null = NULL;
}

/* Acquire a NL lock because gfs requires the value block to remain
   intact on the resource while the lvb is "held", even if the node
   holds no actual locks on the resource. */

int gdlm_hold_lvb(lm_lock_t *lock, char **lvbp)
{
        struct gdlm_lock *lp = (struct gdlm_lock *) lock;
        int error;

        error = gdlm_add_lvb(lp);
        if (error)
                return error;

        *lvbp = lp->lvb;

        error = hold_null_lock(lp);
        if (error)
                gdlm_del_lvb(lp);

        return error;
}

void gdlm_unhold_lvb(lm_lock_t *lock, char *lvb)
{
        struct gdlm_lock *lp = (struct gdlm_lock *) lock;

        unhold_null_lock(lp);
        gdlm_del_lvb(lp);
}

void gdlm_sync_lvb(lm_lock_t *lock, char *lvb)
{
        struct gdlm_lock *lp = (struct gdlm_lock *) lock;

        if (lp->cur != DLM_LOCK_EX)
                return;

        init_completion(&lp->ast_wait);
        set_bit(LFL_SYNC_LVB, &lp->flags);

        lp->req = DLM_LOCK_EX;
        lp->lkf = make_flags(lp, 0, lp->cur, lp->req);

        gdlm_do_lock(lp);
        wait_for_completion(&lp->ast_wait);
}
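
/*
 * Sketch of the lvb lifecycle as a (hypothetical) caller would drive it,
 * where "lock" is an lm_lock_t obtained from gdlm_get_lock():
 *
 *      char *lvb;
 *
 *      if (!gdlm_hold_lvb(lock, &lvb)) {
 *              ... modify *lvb while holding the lock in EX ...
 *              gdlm_sync_lvb(lock, lvb);       (EX holders only)
 *              gdlm_unhold_lvb(lock, lvb);
 *      }
 *
 * The hidden NL lock taken by hold_null_lock() keeps the value block
 * alive on the resource between the caller's own lock holds.
 */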

void gdlm_submit_delayed(struct gdlm_ls *ls)
{
        struct gdlm_lock *lp, *safe;

        spin_lock(&ls->async_lock);
        list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) {
                list_del_init(&lp->delay_list);
                list_add_tail(&lp->delay_list, &ls->submit);
        }
        spin_unlock(&ls->async_lock);
        wake_up(&ls->thread_wait);
}

int gdlm_release_all_locks(struct gdlm_ls *ls)
{
        struct gdlm_lock *lp, *safe;
        int count = 0;

        spin_lock(&ls->async_lock);
        list_for_each_entry_safe(lp, safe, &ls->all_locks, all_list) {
                list_del_init(&lp->all_list);

                if (lp->lvb && lp->lvb != junk_lvb)
                        kfree(lp->lvb);
                kfree(lp);
                count++;
        }
        spin_unlock(&ls->async_lock);

        return count;
}