[GFS2] use log_error before LM_OUT_ERROR
[linux-2.6] / fs / gfs2 / locking / dlm / lock.c
1 /*
2  * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
3  * Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
4  *
5  * This copyrighted material is made available to anyone wishing to use,
6  * modify, copy, or redistribute it subject to the terms and conditions
7  * of the GNU General Public License version 2.
8  */
9
10 #include "lock_dlm.h"
11
/* Shared dummy value block: hold_null_lock() points the lvb of every
   hold_null lock at this buffer, and gdlm_release_all_locks() takes care
   not to kfree() it.  Nothing in this file reads its contents. */
static char junk_lvb[GDLM_LVB_SIZE];
13
14 static void queue_complete(struct gdlm_lock *lp)
15 {
16         struct gdlm_ls *ls = lp->ls;
17
18         clear_bit(LFL_ACTIVE, &lp->flags);
19
20         spin_lock(&ls->async_lock);
21         list_add_tail(&lp->clist, &ls->complete);
22         spin_unlock(&ls->async_lock);
23         wake_up(&ls->thread_wait);
24 }
25
/* Completion callback: @astarg is the struct gdlm_lock whose request
   finished; it is simply queued for completion processing. */
static inline void gdlm_ast(void *astarg)
{
        struct gdlm_lock *lp = astarg;

        queue_complete(lp);
}
30
31 static inline void gdlm_bast(void *astarg, int mode)
32 {
33         struct gdlm_lock *lp = astarg;
34         struct gdlm_ls *ls = lp->ls;
35
36         if (!mode) {
37                 printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
38                         lp->lockname.ln_type,
39                         (unsigned long long)lp->lockname.ln_number);
40                 return;
41         }
42
43         spin_lock(&ls->async_lock);
44         if (!lp->bast_mode) {
45                 list_add_tail(&lp->blist, &ls->blocking);
46                 lp->bast_mode = mode;
47         } else if (lp->bast_mode < mode)
48                 lp->bast_mode = mode;
49         spin_unlock(&ls->async_lock);
50         wake_up(&ls->thread_wait);
51 }
52
53 void gdlm_queue_delayed(struct gdlm_lock *lp)
54 {
55         struct gdlm_ls *ls = lp->ls;
56
57         spin_lock(&ls->async_lock);
58         list_add_tail(&lp->delay_list, &ls->delayed);
59         spin_unlock(&ls->async_lock);
60 }
61
62 /* convert gfs lock-state to dlm lock-mode */
63
64 static s16 make_mode(s16 lmstate)
65 {
66         switch (lmstate) {
67         case LM_ST_UNLOCKED:
68                 return DLM_LOCK_NL;
69         case LM_ST_EXCLUSIVE:
70                 return DLM_LOCK_EX;
71         case LM_ST_DEFERRED:
72                 return DLM_LOCK_CW;
73         case LM_ST_SHARED:
74                 return DLM_LOCK_PR;
75         }
76         gdlm_assert(0, "unknown LM state %d", lmstate);
77         return -1;
78 }
79
80 /* convert dlm lock-mode to gfs lock-state */
81
82 s16 gdlm_make_lmstate(s16 dlmmode)
83 {
84         switch (dlmmode) {
85         case DLM_LOCK_IV:
86         case DLM_LOCK_NL:
87                 return LM_ST_UNLOCKED;
88         case DLM_LOCK_EX:
89                 return LM_ST_EXCLUSIVE;
90         case DLM_LOCK_CW:
91                 return LM_ST_DEFERRED;
92         case DLM_LOCK_PR:
93                 return LM_ST_SHARED;
94         }
95         gdlm_assert(0, "unknown DLM mode %d", dlmmode);
96         return -1;
97 }
98
99 /* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and
100    DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */
101
102 static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state)
103 {
104         s16 cur = make_mode(cur_state);
105         if (lp->cur != DLM_LOCK_IV)
106                 gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur);
107 }
108
109 static inline unsigned int make_flags(struct gdlm_lock *lp,
110                                       unsigned int gfs_flags,
111                                       s16 cur, s16 req)
112 {
113         unsigned int lkf = 0;
114
115         if (gfs_flags & LM_FLAG_TRY)
116                 lkf |= DLM_LKF_NOQUEUE;
117
118         if (gfs_flags & LM_FLAG_TRY_1CB) {
119                 lkf |= DLM_LKF_NOQUEUE;
120                 lkf |= DLM_LKF_NOQUEUEBAST;
121         }
122
123         if (gfs_flags & LM_FLAG_PRIORITY) {
124                 lkf |= DLM_LKF_NOORDER;
125                 lkf |= DLM_LKF_HEADQUE;
126         }
127
128         if (gfs_flags & LM_FLAG_ANY) {
129                 if (req == DLM_LOCK_PR)
130                         lkf |= DLM_LKF_ALTCW;
131                 else if (req == DLM_LOCK_CW)
132                         lkf |= DLM_LKF_ALTPR;
133         }
134
135         if (lp->lksb.sb_lkid != 0) {
136                 lkf |= DLM_LKF_CONVERT;
137
138                 /* Conversion deadlock avoidance by DLM */
139
140                 if (!test_bit(LFL_FORCE_PROMOTE, &lp->flags) &&
141                     !(lkf & DLM_LKF_NOQUEUE) &&
142                     cur > DLM_LOCK_NL && req > DLM_LOCK_NL && cur != req)
143                         lkf |= DLM_LKF_CONVDEADLK;
144         }
145
146         if (lp->lvb)
147                 lkf |= DLM_LKF_VALBLK;
148
149         return lkf;
150 }
151
152 /* make_strname - convert GFS lock numbers to a string */
153
154 static inline void make_strname(const struct lm_lockname *lockname,
155                                 struct gdlm_strname *str)
156 {
157         sprintf(str->name, "%8x%16llx", lockname->ln_type,
158                 (unsigned long long)lockname->ln_number);
159         str->namelen = GDLM_STRNAME_BYTES;
160 }
161
162 static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
163                           struct gdlm_lock **lpp)
164 {
165         struct gdlm_lock *lp;
166
167         lp = kzalloc(sizeof(struct gdlm_lock), GFP_KERNEL);
168         if (!lp)
169                 return -ENOMEM;
170
171         lp->lockname = *name;
172         make_strname(name, &lp->strname);
173         lp->ls = ls;
174         lp->cur = DLM_LOCK_IV;
175         lp->lvb = NULL;
176         lp->hold_null = NULL;
177         init_completion(&lp->ast_wait);
178         INIT_LIST_HEAD(&lp->clist);
179         INIT_LIST_HEAD(&lp->blist);
180         INIT_LIST_HEAD(&lp->delay_list);
181
182         spin_lock(&ls->async_lock);
183         list_add(&lp->all_list, &ls->all_locks);
184         ls->all_locks_count++;
185         spin_unlock(&ls->async_lock);
186
187         *lpp = lp;
188         return 0;
189 }
190
191 void gdlm_delete_lp(struct gdlm_lock *lp)
192 {
193         struct gdlm_ls *ls = lp->ls;
194
195         spin_lock(&ls->async_lock);
196         if (!list_empty(&lp->clist))
197                 list_del_init(&lp->clist);
198         if (!list_empty(&lp->blist))
199                 list_del_init(&lp->blist);
200         if (!list_empty(&lp->delay_list))
201                 list_del_init(&lp->delay_list);
202         gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
203                     (unsigned long long)lp->lockname.ln_number);
204         list_del_init(&lp->all_list);
205         ls->all_locks_count--;
206         spin_unlock(&ls->async_lock);
207
208         kfree(lp);
209 }
210
211 int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
212                   void **lockp)
213 {
214         struct gdlm_lock *lp;
215         int error;
216
217         error = gdlm_create_lp(lockspace, name, &lp);
218
219         *lockp = lp;
220         return error;
221 }
222
/* gdlm_put_lock - destroy a lock obtained from gdlm_get_lock();
   @lock is the struct gdlm_lock to unlink and free. */
void gdlm_put_lock(void *lock)
{
        struct gdlm_lock *lp = lock;

        gdlm_delete_lp(lp);
}
227
228 unsigned int gdlm_do_lock(struct gdlm_lock *lp)
229 {
230         struct gdlm_ls *ls = lp->ls;
231         int error, bast = 1;
232
233         /*
234          * When recovery is in progress, delay lock requests for submission
235          * once recovery is done.  Requests for recovery (NOEXP) and unlocks
236          * can pass.
237          */
238
239         if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
240             !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) {
241                 gdlm_queue_delayed(lp);
242                 return LM_OUT_ASYNC;
243         }
244
245         /*
246          * Submit the actual lock request.
247          */
248
249         if (test_bit(LFL_NOBAST, &lp->flags))
250                 bast = 0;
251
252         set_bit(LFL_ACTIVE, &lp->flags);
253
254         log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type,
255                   (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid,
256                   lp->cur, lp->req, lp->lkf);
257
258         error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
259                          lp->strname.name, lp->strname.namelen, 0, gdlm_ast,
260                          lp, bast ? gdlm_bast : NULL);
261
262         if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
263                 lp->lksb.sb_status = -EAGAIN;
264                 queue_complete(lp);
265                 error = 0;
266         }
267
268         if (error) {
269                 log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x "
270                           "flags=%lx", ls->fsname, lp->lockname.ln_type,
271                           (unsigned long long)lp->lockname.ln_number, error,
272                           lp->cur, lp->req, lp->lkf, lp->flags);
273                 return LM_OUT_ERROR;
274         }
275         return LM_OUT_ASYNC;
276 }
277
278 static unsigned int gdlm_do_unlock(struct gdlm_lock *lp)
279 {
280         struct gdlm_ls *ls = lp->ls;
281         unsigned int lkf = 0;
282         int error;
283
284         set_bit(LFL_DLM_UNLOCK, &lp->flags);
285         set_bit(LFL_ACTIVE, &lp->flags);
286
287         if (lp->lvb)
288                 lkf = DLM_LKF_VALBLK;
289
290         log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type,
291                   (unsigned long long)lp->lockname.ln_number,
292                   lp->lksb.sb_lkid, lp->cur, lkf);
293
294         error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp);
295
296         if (error) {
297                 log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x "
298                           "flags=%lx", ls->fsname, lp->lockname.ln_type,
299                           (unsigned long long)lp->lockname.ln_number, error,
300                           lp->cur, lp->req, lp->lkf, lp->flags);
301                 return LM_OUT_ERROR;
302         }
303         return LM_OUT_ASYNC;
304 }
305
306 unsigned int gdlm_lock(void *lock, unsigned int cur_state,
307                        unsigned int req_state, unsigned int flags)
308 {
309         struct gdlm_lock *lp = lock;
310
311         clear_bit(LFL_DLM_CANCEL, &lp->flags);
312         if (flags & LM_FLAG_NOEXP)
313                 set_bit(LFL_NOBLOCK, &lp->flags);
314
315         check_cur_state(lp, cur_state);
316         lp->req = make_mode(req_state);
317         lp->lkf = make_flags(lp, flags, lp->cur, lp->req);
318
319         return gdlm_do_lock(lp);
320 }
321
322 unsigned int gdlm_unlock(void *lock, unsigned int cur_state)
323 {
324         struct gdlm_lock *lp = lock;
325
326         clear_bit(LFL_DLM_CANCEL, &lp->flags);
327         if (lp->cur == DLM_LOCK_IV)
328                 return 0;
329         return gdlm_do_unlock(lp);
330 }
331
332 void gdlm_cancel(void *lock)
333 {
334         struct gdlm_lock *lp = lock;
335         struct gdlm_ls *ls = lp->ls;
336         int error, delay_list = 0;
337
338         if (test_bit(LFL_DLM_CANCEL, &lp->flags))
339                 return;
340
341         log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type,
342                  (unsigned long long)lp->lockname.ln_number, lp->flags);
343
344         spin_lock(&ls->async_lock);
345         if (!list_empty(&lp->delay_list)) {
346                 list_del_init(&lp->delay_list);
347                 delay_list = 1;
348         }
349         spin_unlock(&ls->async_lock);
350
351         if (delay_list) {
352                 set_bit(LFL_CANCEL, &lp->flags);
353                 set_bit(LFL_ACTIVE, &lp->flags);
354                 queue_complete(lp);
355                 return;
356         }
357
358         if (!test_bit(LFL_ACTIVE, &lp->flags) ||
359             test_bit(LFL_DLM_UNLOCK, &lp->flags)) {
360                 log_info("gdlm_cancel skip %x,%llx flags %lx",
361                          lp->lockname.ln_type,
362                          (unsigned long long)lp->lockname.ln_number, lp->flags);
363                 return;
364         }
365
366         /* the lock is blocked in the dlm */
367
368         set_bit(LFL_DLM_CANCEL, &lp->flags);
369         set_bit(LFL_ACTIVE, &lp->flags);
370
371         error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL,
372                            NULL, lp);
373
374         log_info("gdlm_cancel rv %d %x,%llx flags %lx", error,
375                  lp->lockname.ln_type,
376                  (unsigned long long)lp->lockname.ln_number, lp->flags);
377
378         if (error == -EBUSY)
379                 clear_bit(LFL_DLM_CANCEL, &lp->flags);
380 }
381
382 static int gdlm_add_lvb(struct gdlm_lock *lp)
383 {
384         char *lvb;
385
386         lvb = kzalloc(GDLM_LVB_SIZE, GFP_KERNEL);
387         if (!lvb)
388                 return -ENOMEM;
389
390         lp->lksb.sb_lvbptr = lvb;
391         lp->lvb = lvb;
392         return 0;
393 }
394
395 static void gdlm_del_lvb(struct gdlm_lock *lp)
396 {
397         kfree(lp->lvb);
398         lp->lvb = NULL;
399         lp->lksb.sb_lvbptr = NULL;
400 }
401
402 /* This can do a synchronous dlm request (requiring a lock_dlm thread to get
403    the completion) because gfs won't call hold_lvb() during a callback (from
404    the context of a lock_dlm thread). */
405
406 static int hold_null_lock(struct gdlm_lock *lp)
407 {
408         struct gdlm_lock *lpn = NULL;
409         int error;
410
411         if (lp->hold_null) {
412                 printk(KERN_INFO "lock_dlm: lvb already held\n");
413                 return 0;
414         }
415
416         error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn);
417         if (error)
418                 goto out;
419
420         lpn->lksb.sb_lvbptr = junk_lvb;
421         lpn->lvb = junk_lvb;
422
423         lpn->req = DLM_LOCK_NL;
424         lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE;
425         set_bit(LFL_NOBAST, &lpn->flags);
426         set_bit(LFL_INLOCK, &lpn->flags);
427
428         init_completion(&lpn->ast_wait);
429         gdlm_do_lock(lpn);
430         wait_for_completion(&lpn->ast_wait);
431         error = lpn->lksb.sb_status;
432         if (error) {
433                 printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n",
434                        error);
435                 gdlm_delete_lp(lpn);
436                 lpn = NULL;
437         }
438 out:
439         lp->hold_null = lpn;
440         return error;
441 }
442
443 /* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get
444    the completion) because gfs may call unhold_lvb() during a callback (from
445    the context of a lock_dlm thread) which could cause a deadlock since the
446    other lock_dlm thread could be engaged in recovery. */
447
448 static void unhold_null_lock(struct gdlm_lock *lp)
449 {
450         struct gdlm_lock *lpn = lp->hold_null;
451
452         gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type,
453                     (unsigned long long)lp->lockname.ln_number);
454         lpn->lksb.sb_lvbptr = NULL;
455         lpn->lvb = NULL;
456         set_bit(LFL_UNLOCK_DELETE, &lpn->flags);
457         gdlm_do_unlock(lpn);
458         lp->hold_null = NULL;
459 }
460
461 /* Acquire a NL lock because gfs requires the value block to remain
462    intact on the resource while the lvb is "held" even if it's holding no locks
463    on the resource. */
464
465 int gdlm_hold_lvb(void *lock, char **lvbp)
466 {
467         struct gdlm_lock *lp = lock;
468         int error;
469
470         error = gdlm_add_lvb(lp);
471         if (error)
472                 return error;
473
474         *lvbp = lp->lvb;
475
476         error = hold_null_lock(lp);
477         if (error)
478                 gdlm_del_lvb(lp);
479
480         return error;
481 }
482
/*
 * gdlm_unhold_lvb - undo gdlm_hold_lvb()
 * @lock: the struct gdlm_lock
 * @lvb: not used by this function
 *
 * Releases the hold_null lock first, then frees the lock's value block.
 */
void gdlm_unhold_lvb(void *lock, char *lvb)
{
        struct gdlm_lock *held = lock;

        unhold_null_lock(held);
        gdlm_del_lvb(held);
}
490
491 void gdlm_submit_delayed(struct gdlm_ls *ls)
492 {
493         struct gdlm_lock *lp, *safe;
494
495         spin_lock(&ls->async_lock);
496         list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) {
497                 list_del_init(&lp->delay_list);
498                 list_add_tail(&lp->delay_list, &ls->submit);
499         }
500         spin_unlock(&ls->async_lock);
501         wake_up(&ls->thread_wait);
502 }
503
504 int gdlm_release_all_locks(struct gdlm_ls *ls)
505 {
506         struct gdlm_lock *lp, *safe;
507         int count = 0;
508
509         spin_lock(&ls->async_lock);
510         list_for_each_entry_safe(lp, safe, &ls->all_locks, all_list) {
511                 list_del_init(&lp->all_list);
512
513                 if (lp->lvb && lp->lvb != junk_lvb)
514                         kfree(lp->lvb);
515                 kfree(lp);
516                 count++;
517         }
518         spin_unlock(&ls->async_lock);
519
520         return count;
521 }
522