Linux 2.6.31-rc6
[linux-2.6] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/kthread.h>
31 #include <linux/pagemap.h>
32 #include <linux/debugfs.h>
33 #include <linux/seq_file.h>
34 #include <linux/time.h>
35 #include <linux/quotaops.h>
36
37 #define MLOG_MASK_PREFIX ML_DLM_GLUE
38 #include <cluster/masklog.h>
39
40 #include "ocfs2.h"
41 #include "ocfs2_lockingver.h"
42
43 #include "alloc.h"
44 #include "dcache.h"
45 #include "dlmglue.h"
46 #include "extent_map.h"
47 #include "file.h"
48 #include "heartbeat.h"
49 #include "inode.h"
50 #include "journal.h"
51 #include "stackglue.h"
52 #include "slot_map.h"
53 #include "super.h"
54 #include "uptodate.h"
55 #include "quota.h"
56
57 #include "buffer_head_io.h"
58
59 struct ocfs2_mask_waiter {
60         struct list_head        mw_item;
61         int                     mw_status;
62         struct completion       mw_complete;
63         unsigned long           mw_mask;
64         unsigned long           mw_goal;
65 #ifdef CONFIG_OCFS2_FS_STATS
66         unsigned long long      mw_lock_start;
67 #endif
68 };
69
70 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
71 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
72 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
73 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
74
75 /*
76  * Return value from ->downconvert_worker functions.
77  *
78  * These control the precise actions of ocfs2_unblock_lock()
79  * and ocfs2_process_blocked_lock()
80  *
81  */
82 enum ocfs2_unblock_action {
83         UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
84         UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
85                                       * ->post_unlock callback */
86         UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
87                                       * ->post_unlock() callback. */
88 };
89
90 struct ocfs2_unblock_ctl {
91         int requeue;
92         enum ocfs2_unblock_action unblock_action;
93 };
94
95 /* Lockdep class keys */
96 struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
97
98 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
99                                         int new_level);
100 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
101
102 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
103                                      int blocking);
104
105 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
106                                        int blocking);
107
108 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
109                                      struct ocfs2_lock_res *lockres);
110
111 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);
112
113 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
114
115 /* This aids in debugging situations where a bad LVB might be involved. */
116 static void ocfs2_dump_meta_lvb_info(u64 level,
117                                      const char *function,
118                                      unsigned int line,
119                                      struct ocfs2_lock_res *lockres)
120 {
121         struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
122
123         mlog(level, "LVB information for %s (called from %s:%u):\n",
124              lockres->l_name, function, line);
125         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
126              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
127              be32_to_cpu(lvb->lvb_igeneration));
128         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
129              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
130              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
131              be16_to_cpu(lvb->lvb_imode));
132         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
133              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
134              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
135              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
136              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
137              be32_to_cpu(lvb->lvb_iattr));
138 }
139
140
141 /*
142  * OCFS2 Lock Resource Operations
143  *
144  * These fine tune the behavior of the generic dlmglue locking infrastructure.
145  *
146  * The most basic of lock types can point ->l_priv to their respective
147  * struct ocfs2_super and allow the default actions to manage things.
148  *
149  * Right now, each lock type also needs to implement an init function,
150  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
151  * should be called when the lock is no longer needed (i.e., object
152  * destruction time).
153  */
154 struct ocfs2_lock_res_ops {
155         /*
156          * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
157          * this callback if ->l_priv is not an ocfs2_super pointer
158          */
159         struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
160
161         /*
162          * Optionally called in the downconvert thread after a
163          * successful downconvert. The lockres will not be referenced
164          * after this callback is called, so it is safe to free
165          * memory, etc.
166          *
167          * The exact semantics of when this is called are controlled
168          * by ->downconvert_worker()
169          */
170         void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
171
172         /*
173          * Allow a lock type to add checks to determine whether it is
174          * safe to downconvert a lock. Return 0 to re-queue the
175          * downconvert at a later time, nonzero to continue.
176          *
177          * For most locks, the default checks that there are no
178          * incompatible holders are sufficient.
179          *
180          * Called with the lockres spinlock held.
181          */
182         int (*check_downconvert)(struct ocfs2_lock_res *, int);
183
184         /*
185          * Allows a lock type to populate the lock value block. This
186          * is called on downconvert, and when we drop a lock.
187          *
188          * Locks that want to use this should set LOCK_TYPE_USES_LVB
189          * in the flags field.
190          *
191          * Called with the lockres spinlock held.
192          */
193         void (*set_lvb)(struct ocfs2_lock_res *);
194
195         /*
196          * Called from the downconvert thread when it is determined
197          * that a lock will be downconverted. This is called without
198          * any locks held so the function can do work that might
199          * schedule (syncing out data, etc).
200          *
201          * This should return any one of the ocfs2_unblock_action
202          * values, depending on what it wants the thread to do.
203          */
204         int (*downconvert_worker)(struct ocfs2_lock_res *, int);
205
206         /*
207          * LOCK_TYPE_* flags which describe the specific requirements
208          * of a lock type. Descriptions of each individual flag follow.
209          */
210         int flags;
211 };
212
213 /*
214  * Some locks want to "refresh" potentially stale data when a
215  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
216  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
217  * individual lockres l_flags member from the ast function. It is
218  * expected that the locking wrapper will clear the
219  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
220  */
221 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
222
223 /*
224  * Indicate that a lock type makes use of the lock value block. The
225  * ->set_lvb lock type callback must be defined.
226  */
227 #define LOCK_TYPE_USES_LVB              0x2
228
229 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
230         .get_osb        = ocfs2_get_inode_osb,
231         .flags          = 0,
232 };
233
234 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
235         .get_osb        = ocfs2_get_inode_osb,
236         .check_downconvert = ocfs2_check_meta_downconvert,
237         .set_lvb        = ocfs2_set_meta_lvb,
238         .downconvert_worker = ocfs2_data_convert_worker,
239         .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
240 };
241
242 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
243         .flags          = LOCK_TYPE_REQUIRES_REFRESH,
244 };
245
246 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
247         .flags          = 0,
248 };
249
250 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
251         .flags          = 0,
252 };
253
254 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
255         .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
256 };
257
258 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
259         .get_osb        = ocfs2_get_dentry_osb,
260         .post_unlock    = ocfs2_dentry_post_unlock,
261         .downconvert_worker = ocfs2_dentry_convert_worker,
262         .flags          = 0,
263 };
264
265 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
266         .get_osb        = ocfs2_get_inode_osb,
267         .flags          = 0,
268 };
269
270 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
271         .get_osb        = ocfs2_get_file_osb,
272         .flags          = 0,
273 };
274
275 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
276         .set_lvb        = ocfs2_set_qinfo_lvb,
277         .get_osb        = ocfs2_get_qinfo_osb,
278         .flags          = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
279 };
280
281 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
282 {
283         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
284                 lockres->l_type == OCFS2_LOCK_TYPE_RW ||
285                 lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
286 }
287
288 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
289 {
290         BUG_ON(!ocfs2_is_inode_lock(lockres));
291
292         return (struct inode *) lockres->l_priv;
293 }
294
295 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
296 {
297         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
298
299         return (struct ocfs2_dentry_lock *)lockres->l_priv;
300 }
301
302 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
303 {
304         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
305
306         return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
307 }
308
309 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
310 {
311         if (lockres->l_ops->get_osb)
312                 return lockres->l_ops->get_osb(lockres);
313
314         return (struct ocfs2_super *)lockres->l_priv;
315 }
316
317 static int ocfs2_lock_create(struct ocfs2_super *osb,
318                              struct ocfs2_lock_res *lockres,
319                              int level,
320                              u32 dlm_flags);
321 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
322                                                      int wanted);
323 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
324                                    struct ocfs2_lock_res *lockres,
325                                    int level, unsigned long caller_ip);
326 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
327                                         struct ocfs2_lock_res *lockres,
328                                         int level)
329 {
330         __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
331 }
332
333 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
334 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
335 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
336 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
337 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
338                                         struct ocfs2_lock_res *lockres);
339 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
340                                                 int convert);
341 #define ocfs2_log_dlm_error(_func, _err, _lockres) do {                                 \
342         if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)                               \
343                 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",        \
344                      _err, _func, _lockres->l_name);                                    \
345         else                                                                            \
346                 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",  \
347                      _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,  \
348                      (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));                \
349 } while (0)
350 static int ocfs2_downconvert_thread(void *arg);
351 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
352                                         struct ocfs2_lock_res *lockres);
353 static int ocfs2_inode_lock_update(struct inode *inode,
354                                   struct buffer_head **bh);
355 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
356 static inline int ocfs2_highest_compat_lock_level(int level);
357 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
358                                               int new_level);
359 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
360                                   struct ocfs2_lock_res *lockres,
361                                   int new_level,
362                                   int lvb,
363                                   unsigned int generation);
364 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
365                                         struct ocfs2_lock_res *lockres);
366 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
367                                 struct ocfs2_lock_res *lockres);
368
369
370 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
371                                   u64 blkno,
372                                   u32 generation,
373                                   char *name)
374 {
375         int len;
376
377         mlog_entry_void();
378
379         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
380
381         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
382                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
383                        (long long)blkno, generation);
384
385         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
386
387         mlog(0, "built lock resource with name: %s\n", name);
388
389         mlog_exit_void();
390 }
391
392 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
393
394 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
395                                        struct ocfs2_dlm_debug *dlm_debug)
396 {
397         mlog(0, "Add tracking for lockres %s\n", res->l_name);
398
399         spin_lock(&ocfs2_dlm_tracking_lock);
400         list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
401         spin_unlock(&ocfs2_dlm_tracking_lock);
402 }
403
404 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
405 {
406         spin_lock(&ocfs2_dlm_tracking_lock);
407         if (!list_empty(&res->l_debug_list))
408                 list_del_init(&res->l_debug_list);
409         spin_unlock(&ocfs2_dlm_tracking_lock);
410 }
411
412 #ifdef CONFIG_OCFS2_FS_STATS
413 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
414 {
415         res->l_lock_num_prmode = 0;
416         res->l_lock_num_prmode_failed = 0;
417         res->l_lock_total_prmode = 0;
418         res->l_lock_max_prmode = 0;
419         res->l_lock_num_exmode = 0;
420         res->l_lock_num_exmode_failed = 0;
421         res->l_lock_total_exmode = 0;
422         res->l_lock_max_exmode = 0;
423         res->l_lock_refresh = 0;
424 }
425
426 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
427                                     struct ocfs2_mask_waiter *mw, int ret)
428 {
429         unsigned long long *num, *sum;
430         unsigned int *max, *failed;
431         struct timespec ts = current_kernel_time();
432         unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;
433
434         if (level == LKM_PRMODE) {
435                 num = &res->l_lock_num_prmode;
436                 sum = &res->l_lock_total_prmode;
437                 max = &res->l_lock_max_prmode;
438                 failed = &res->l_lock_num_prmode_failed;
439         } else if (level == LKM_EXMODE) {
440                 num = &res->l_lock_num_exmode;
441                 sum = &res->l_lock_total_exmode;
442                 max = &res->l_lock_max_exmode;
443                 failed = &res->l_lock_num_exmode_failed;
444         } else
445                 return;
446
447         (*num)++;
448         (*sum) += time;
449         if (time > *max)
450                 *max = time;
451         if (ret)
452                 (*failed)++;
453 }
454
455 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
456 {
457         lockres->l_lock_refresh++;
458 }
459
460 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
461 {
462         struct timespec ts = current_kernel_time();
463         mw->mw_lock_start = timespec_to_ns(&ts);
464 }
465 #else
466 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
467 {
468 }
469 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
470                            int level, struct ocfs2_mask_waiter *mw, int ret)
471 {
472 }
473 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
474 {
475 }
476 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
477 {
478 }
479 #endif
480
481 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
482                                        struct ocfs2_lock_res *res,
483                                        enum ocfs2_lock_type type,
484                                        struct ocfs2_lock_res_ops *ops,
485                                        void *priv)
486 {
487         res->l_type          = type;
488         res->l_ops           = ops;
489         res->l_priv          = priv;
490
491         res->l_level         = DLM_LOCK_IV;
492         res->l_requested     = DLM_LOCK_IV;
493         res->l_blocking      = DLM_LOCK_IV;
494         res->l_action        = OCFS2_AST_INVALID;
495         res->l_unlock_action = OCFS2_UNLOCK_INVALID;
496
497         res->l_flags         = OCFS2_LOCK_INITIALIZED;
498
499         ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
500
501         ocfs2_init_lock_stats(res);
502 #ifdef CONFIG_DEBUG_LOCK_ALLOC
503         if (type != OCFS2_LOCK_TYPE_OPEN)
504                 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
505                                  &lockdep_keys[type], 0);
506         else
507                 res->l_lockdep_map.key = NULL;
508 #endif
509 }
510
511 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
512 {
513         /* This also clears out the lock status block */
514         memset(res, 0, sizeof(struct ocfs2_lock_res));
515         spin_lock_init(&res->l_lock);
516         init_waitqueue_head(&res->l_event);
517         INIT_LIST_HEAD(&res->l_blocked_list);
518         INIT_LIST_HEAD(&res->l_mask_waiters);
519 }
520
521 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
522                                enum ocfs2_lock_type type,
523                                unsigned int generation,
524                                struct inode *inode)
525 {
526         struct ocfs2_lock_res_ops *ops;
527
528         switch(type) {
529                 case OCFS2_LOCK_TYPE_RW:
530                         ops = &ocfs2_inode_rw_lops;
531                         break;
532                 case OCFS2_LOCK_TYPE_META:
533                         ops = &ocfs2_inode_inode_lops;
534                         break;
535                 case OCFS2_LOCK_TYPE_OPEN:
536                         ops = &ocfs2_inode_open_lops;
537                         break;
538                 default:
539                         mlog_bug_on_msg(1, "type: %d\n", type);
540                         ops = NULL; /* thanks, gcc */
541                         break;
542         };
543
544         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
545                               generation, res->l_name);
546         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
547 }
548
549 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
550 {
551         struct inode *inode = ocfs2_lock_res_inode(lockres);
552
553         return OCFS2_SB(inode->i_sb);
554 }
555
556 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
557 {
558         struct ocfs2_mem_dqinfo *info = lockres->l_priv;
559
560         return OCFS2_SB(info->dqi_gi.dqi_sb);
561 }
562
563 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
564 {
565         struct ocfs2_file_private *fp = lockres->l_priv;
566
567         return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
568 }
569
570 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
571 {
572         __be64 inode_blkno_be;
573
574         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
575                sizeof(__be64));
576
577         return be64_to_cpu(inode_blkno_be);
578 }
579
580 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
581 {
582         struct ocfs2_dentry_lock *dl = lockres->l_priv;
583
584         return OCFS2_SB(dl->dl_inode->i_sb);
585 }
586
587 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
588                                 u64 parent, struct inode *inode)
589 {
590         int len;
591         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
592         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
593         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
594
595         ocfs2_lock_res_init_once(lockres);
596
597         /*
598          * Unfortunately, the standard lock naming scheme won't work
599          * here because we have two 16 byte values to use. Instead,
600          * we'll stuff the inode number as a binary value. We still
601          * want error prints to show something without garbling the
602          * display, so drop a null byte in there before the inode
603          * number. A future version of OCFS2 will likely use all
604          * binary lock names. The stringified names have been a
605          * tremendous aid in debugging, but now that the debugfs
606          * interface exists, we can mangle things there if need be.
607          *
608          * NOTE: We also drop the standard "pad" value (the total lock
609          * name size stays the same though - the last part is all
610          * zeros due to the memset in ocfs2_lock_res_init_once()
611          */
612         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
613                        "%c%016llx",
614                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
615                        (long long)parent);
616
617         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
618
619         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
620                sizeof(__be64));
621
622         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
623                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
624                                    dl);
625 }
626
627 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
628                                       struct ocfs2_super *osb)
629 {
630         /* Superblock lockres doesn't come from a slab so we call init
631          * once on it manually.  */
632         ocfs2_lock_res_init_once(res);
633         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
634                               0, res->l_name);
635         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
636                                    &ocfs2_super_lops, osb);
637 }
638
639 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
640                                        struct ocfs2_super *osb)
641 {
642         /* Rename lockres doesn't come from a slab so we call init
643          * once on it manually.  */
644         ocfs2_lock_res_init_once(res);
645         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
646         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
647                                    &ocfs2_rename_lops, osb);
648 }
649
650 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
651                                          struct ocfs2_super *osb)
652 {
653         /* nfs_sync lockres doesn't come from a slab so we call init
654          * once on it manually.  */
655         ocfs2_lock_res_init_once(res);
656         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
657         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
658                                    &ocfs2_nfs_sync_lops, osb);
659 }
660
661 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
662                                             struct ocfs2_super *osb)
663 {
664         ocfs2_lock_res_init_once(res);
665         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
666         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
667                                    &ocfs2_orphan_scan_lops, osb);
668 }
669
670 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
671                               struct ocfs2_file_private *fp)
672 {
673         struct inode *inode = fp->fp_file->f_mapping->host;
674         struct ocfs2_inode_info *oi = OCFS2_I(inode);
675
676         ocfs2_lock_res_init_once(lockres);
677         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
678                               inode->i_generation, lockres->l_name);
679         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
680                                    OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
681                                    fp);
682         lockres->l_flags |= OCFS2_LOCK_NOCACHE;
683 }
684
685 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
686                                struct ocfs2_mem_dqinfo *info)
687 {
688         ocfs2_lock_res_init_once(lockres);
689         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
690                               0, lockres->l_name);
691         ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
692                                    OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
693                                    info);
694 }
695
696 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
697 {
698         mlog_entry_void();
699
700         if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
701                 return;
702
703         ocfs2_remove_lockres_tracking(res);
704
705         mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
706                         "Lockres %s is on the blocked list\n",
707                         res->l_name);
708         mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
709                         "Lockres %s has mask waiters pending\n",
710                         res->l_name);
711         mlog_bug_on_msg(spin_is_locked(&res->l_lock),
712                         "Lockres %s is locked\n",
713                         res->l_name);
714         mlog_bug_on_msg(res->l_ro_holders,
715                         "Lockres %s has %u ro holders\n",
716                         res->l_name, res->l_ro_holders);
717         mlog_bug_on_msg(res->l_ex_holders,
718                         "Lockres %s has %u ex holders\n",
719                         res->l_name, res->l_ex_holders);
720
721         /* Need to clear out the lock status block for the dlm */
722         memset(&res->l_lksb, 0, sizeof(res->l_lksb));
723
724         res->l_flags = 0UL;
725         mlog_exit_void();
726 }
727
728 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
729                                      int level)
730 {
731         mlog_entry_void();
732
733         BUG_ON(!lockres);
734
735         switch(level) {
736         case DLM_LOCK_EX:
737                 lockres->l_ex_holders++;
738                 break;
739         case DLM_LOCK_PR:
740                 lockres->l_ro_holders++;
741                 break;
742         default:
743                 BUG();
744         }
745
746         mlog_exit_void();
747 }
748
749 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
750                                      int level)
751 {
752         mlog_entry_void();
753
754         BUG_ON(!lockres);
755
756         switch(level) {
757         case DLM_LOCK_EX:
758                 BUG_ON(!lockres->l_ex_holders);
759                 lockres->l_ex_holders--;
760                 break;
761         case DLM_LOCK_PR:
762                 BUG_ON(!lockres->l_ro_holders);
763                 lockres->l_ro_holders--;
764                 break;
765         default:
766                 BUG();
767         }
768         mlog_exit_void();
769 }
770
771 /* WARNING: This function lives in a world where the only three lock
772  * levels are EX, PR, and NL. It *will* have to be adjusted when more
773  * lock types are added. */
774 static inline int ocfs2_highest_compat_lock_level(int level)
775 {
776         int new_level = DLM_LOCK_EX;
777
778         if (level == DLM_LOCK_EX)
779                 new_level = DLM_LOCK_NL;
780         else if (level == DLM_LOCK_PR)
781                 new_level = DLM_LOCK_PR;
782         return new_level;
783 }
784
785 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
786                               unsigned long newflags)
787 {
788         struct ocfs2_mask_waiter *mw, *tmp;
789
790         assert_spin_locked(&lockres->l_lock);
791
792         lockres->l_flags = newflags;
793
794         list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
795                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
796                         continue;
797
798                 list_del_init(&mw->mw_item);
799                 mw->mw_status = 0;
800                 complete(&mw->mw_complete);
801         }
802 }
803 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
804 {
805         lockres_set_flags(lockres, lockres->l_flags | or);
806 }
807 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
808                                 unsigned long clear)
809 {
810         lockres_set_flags(lockres, lockres->l_flags & ~clear);
811 }
812
813 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
814 {
815         mlog_entry_void();
816
817         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
818         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
819         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
820         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
821
822         lockres->l_level = lockres->l_requested;
823         if (lockres->l_level <=
824             ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
825                 lockres->l_blocking = DLM_LOCK_NL;
826                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
827         }
828         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
829
830         mlog_exit_void();
831 }
832
833 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
834 {
835         mlog_entry_void();
836
837         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
838         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
839
840         /* Convert from RO to EX doesn't really need anything as our
841          * information is already up to data. Convert from NL to
842          * *anything* however should mark ourselves as needing an
843          * update */
844         if (lockres->l_level == DLM_LOCK_NL &&
845             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
846                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
847
848         lockres->l_level = lockres->l_requested;
849         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
850
851         mlog_exit_void();
852 }
853
854 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
855 {
856         mlog_entry_void();
857
858         BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
859         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
860
861         if (lockres->l_requested > DLM_LOCK_NL &&
862             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
863             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
864                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
865
866         lockres->l_level = lockres->l_requested;
867         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
868         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
869
870         mlog_exit_void();
871 }
872
873 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
874                                      int level)
875 {
876         int needs_downconvert = 0;
877         mlog_entry_void();
878
879         assert_spin_locked(&lockres->l_lock);
880
881         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
882
883         if (level > lockres->l_blocking) {
884                 /* only schedule a downconvert if we haven't already scheduled
885                  * one that goes low enough to satisfy the level we're
886                  * blocking.  this also catches the case where we get
887                  * duplicate BASTs */
888                 if (ocfs2_highest_compat_lock_level(level) <
889                     ocfs2_highest_compat_lock_level(lockres->l_blocking))
890                         needs_downconvert = 1;
891
892                 lockres->l_blocking = level;
893         }
894
895         mlog_exit(needs_downconvert);
896         return needs_downconvert;
897 }
898
899 /*
900  * OCFS2_LOCK_PENDING and l_pending_gen.
901  *
902  * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
903  * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
904  * for more details on the race.
905  *
906  * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
907  * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
908  * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
909  * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
910  * the caller is going to try to clear PENDING again.  If nothing else is
911  * happening, __lockres_clear_pending() sees PENDING is unset and does
912  * nothing.
913  *
914  * But what if another path (eg downconvert thread) has just started a
915  * new locking action?  The other path has re-set PENDING.  Our path
916  * cannot clear PENDING, because that will re-open the original race
917  * window.
918  *
919  * [Example]
920  *
921  * ocfs2_meta_lock()
922  *  ocfs2_cluster_lock()
923  *   set BUSY
924  *   set PENDING
925  *   drop l_lock
926  *   ocfs2_dlm_lock()
927  *    ocfs2_locking_ast()               ocfs2_downconvert_thread()
928  *     clear PENDING                     ocfs2_unblock_lock()
929  *                                        take_l_lock
930  *                                        !BUSY
931  *                                        ocfs2_prepare_downconvert()
932  *                                         set BUSY
933  *                                         set PENDING
934  *                                        drop l_lock
935  *   take l_lock
936  *   clear PENDING
937  *   drop l_lock
938  *                      <window>
939  *                                        ocfs2_dlm_lock()
940  *
941  * So as you can see, we now have a window where l_lock is not held,
942  * PENDING is not set, and ocfs2_dlm_lock() has not been called.
943  *
944  * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
945  * set by ocfs2_prepare_downconvert().  That wasn't nice.
946  *
947  * To solve this we introduce l_pending_gen.  A call to
948  * lockres_clear_pending() will only do so when it is passed a generation
949  * number that matches the lockres.  lockres_set_pending() will return the
950  * current generation number.  When ocfs2_cluster_lock() goes to clear
951  * PENDING, it passes the generation it got from set_pending().  In our
952  * example above, the generation numbers will *not* match.  Thus,
953  * ocfs2_cluster_lock() will not clear the PENDING set by
954  * ocfs2_prepare_downconvert().
955  */
956
957 /* Unlocked version for ocfs2_locking_ast() */
958 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
959                                     unsigned int generation,
960                                     struct ocfs2_super *osb)
961 {
962         assert_spin_locked(&lockres->l_lock);
963
964         /*
965          * The ast and locking functions can race us here.  The winner
966          * will clear pending, the loser will not.
967          */
968         if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
969             (lockres->l_pending_gen != generation))
970                 return;
971
972         lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
973         lockres->l_pending_gen++;
974
975         /*
976          * The downconvert thread may have skipped us because we
977          * were PENDING.  Wake it up.
978          */
979         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
980                 ocfs2_wake_downconvert_thread(osb);
981 }
982
983 /* Locked version for callers of ocfs2_dlm_lock() */
984 static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
985                                   unsigned int generation,
986                                   struct ocfs2_super *osb)
987 {
988         unsigned long flags;
989
990         spin_lock_irqsave(&lockres->l_lock, flags);
991         __lockres_clear_pending(lockres, generation, osb);
992         spin_unlock_irqrestore(&lockres->l_lock, flags);
993 }
994
995 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
996 {
997         assert_spin_locked(&lockres->l_lock);
998         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
999
1000         lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
1001
1002         return lockres->l_pending_gen;
1003 }
1004
1005
1006 static void ocfs2_blocking_ast(void *opaque, int level)
1007 {
1008         struct ocfs2_lock_res *lockres = opaque;
1009         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1010         int needs_downconvert;
1011         unsigned long flags;
1012
1013         BUG_ON(level <= DLM_LOCK_NL);
1014
1015         mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
1016              lockres->l_name, level, lockres->l_level,
1017              ocfs2_lock_type_string(lockres->l_type));
1018
1019         /*
1020          * We can skip the bast for locks which don't enable caching -
1021          * they'll be dropped at the earliest possible time anyway.
1022          */
1023         if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
1024                 return;
1025
1026         spin_lock_irqsave(&lockres->l_lock, flags);
1027         needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
1028         if (needs_downconvert)
1029                 ocfs2_schedule_blocked_lock(osb, lockres);
1030         spin_unlock_irqrestore(&lockres->l_lock, flags);
1031
1032         wake_up(&lockres->l_event);
1033
1034         ocfs2_wake_downconvert_thread(osb);
1035 }
1036
1037 static void ocfs2_locking_ast(void *opaque)
1038 {
1039         struct ocfs2_lock_res *lockres = opaque;
1040         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1041         unsigned long flags;
1042         int status;
1043
1044         spin_lock_irqsave(&lockres->l_lock, flags);
1045
1046         status = ocfs2_dlm_lock_status(&lockres->l_lksb);
1047
1048         if (status == -EAGAIN) {
1049                 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1050                 goto out;
1051         }
1052
1053         if (status) {
1054                 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
1055                      lockres->l_name, status);
1056                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1057                 return;
1058         }
1059
1060         switch(lockres->l_action) {
1061         case OCFS2_AST_ATTACH:
1062                 ocfs2_generic_handle_attach_action(lockres);
1063                 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
1064                 break;
1065         case OCFS2_AST_CONVERT:
1066                 ocfs2_generic_handle_convert_action(lockres);
1067                 break;
1068         case OCFS2_AST_DOWNCONVERT:
1069                 ocfs2_generic_handle_downconvert_action(lockres);
1070                 break;
1071         default:
1072                 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
1073                      "lockres flags = 0x%lx, unlock action: %u\n",
1074                      lockres->l_name, lockres->l_action, lockres->l_flags,
1075                      lockres->l_unlock_action);
1076                 BUG();
1077         }
1078 out:
1079         /* set it to something invalid so if we get called again we
1080          * can catch it. */
1081         lockres->l_action = OCFS2_AST_INVALID;
1082
1083         /* Did we try to cancel this lock?  Clear that state */
1084         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
1085                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1086
1087         /*
1088          * We may have beaten the locking functions here.  We certainly
1089          * know that dlm_lock() has been called :-)
1090          * Because we can't have two lock calls in flight at once, we
1091          * can use lockres->l_pending_gen.
1092          */
1093         __lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
1094
1095         wake_up(&lockres->l_event);
1096         spin_unlock_irqrestore(&lockres->l_lock, flags);
1097 }
1098
1099 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1100                                                 int convert)
1101 {
1102         unsigned long flags;
1103
1104         mlog_entry_void();
1105         spin_lock_irqsave(&lockres->l_lock, flags);
1106         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1107         if (convert)
1108                 lockres->l_action = OCFS2_AST_INVALID;
1109         else
1110                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1111         spin_unlock_irqrestore(&lockres->l_lock, flags);
1112
1113         wake_up(&lockres->l_event);
1114         mlog_exit_void();
1115 }
1116
1117 /* Note: If we detect another process working on the lock (i.e.,
1118  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
1119  * to do the right thing in that case.
1120  */
1121 static int ocfs2_lock_create(struct ocfs2_super *osb,
1122                              struct ocfs2_lock_res *lockres,
1123                              int level,
1124                              u32 dlm_flags)
1125 {
1126         int ret = 0;
1127         unsigned long flags;
1128         unsigned int gen;
1129
1130         mlog_entry_void();
1131
1132         mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
1133              dlm_flags);
1134
1135         spin_lock_irqsave(&lockres->l_lock, flags);
1136         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
1137             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
1138                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1139                 goto bail;
1140         }
1141
1142         lockres->l_action = OCFS2_AST_ATTACH;
1143         lockres->l_requested = level;
1144         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1145         gen = lockres_set_pending(lockres);
1146         spin_unlock_irqrestore(&lockres->l_lock, flags);
1147
1148         ret = ocfs2_dlm_lock(osb->cconn,
1149                              level,
1150                              &lockres->l_lksb,
1151                              dlm_flags,
1152                              lockres->l_name,
1153                              OCFS2_LOCK_ID_MAX_LEN - 1,
1154                              lockres);
1155         lockres_clear_pending(lockres, gen, osb);
1156         if (ret) {
1157                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1158                 ocfs2_recover_from_dlm_error(lockres, 1);
1159         }
1160
1161         mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
1162
1163 bail:
1164         mlog_exit(ret);
1165         return ret;
1166 }
1167
1168 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
1169                                         int flag)
1170 {
1171         unsigned long flags;
1172         int ret;
1173
1174         spin_lock_irqsave(&lockres->l_lock, flags);
1175         ret = lockres->l_flags & flag;
1176         spin_unlock_irqrestore(&lockres->l_lock, flags);
1177
1178         return ret;
1179 }
1180
1181 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
1182
1183 {
1184         wait_event(lockres->l_event,
1185                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
1186 }
1187
1188 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
1189
1190 {
1191         wait_event(lockres->l_event,
1192                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
1193 }
1194
1195 /* predict what lock level we'll be dropping down to on behalf
1196  * of another node, and return true if the currently wanted
1197  * level will be compatible with it. */
1198 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
1199                                                      int wanted)
1200 {
1201         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
1202
1203         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
1204 }
1205
1206 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
1207 {
1208         INIT_LIST_HEAD(&mw->mw_item);
1209         init_completion(&mw->mw_complete);
1210         ocfs2_init_start_time(mw);
1211 }
1212
1213 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
1214 {
1215         wait_for_completion(&mw->mw_complete);
1216         /* Re-arm the completion in case we want to wait on it again */
1217         INIT_COMPLETION(mw->mw_complete);
1218         return mw->mw_status;
1219 }
1220
1221 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
1222                                     struct ocfs2_mask_waiter *mw,
1223                                     unsigned long mask,
1224                                     unsigned long goal)
1225 {
1226         BUG_ON(!list_empty(&mw->mw_item));
1227
1228         assert_spin_locked(&lockres->l_lock);
1229
1230         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
1231         mw->mw_mask = mask;
1232         mw->mw_goal = goal;
1233 }
1234
1235 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1236  * if the mask still hadn't reached its goal */
1237 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1238                                       struct ocfs2_mask_waiter *mw)
1239 {
1240         unsigned long flags;
1241         int ret = 0;
1242
1243         spin_lock_irqsave(&lockres->l_lock, flags);
1244         if (!list_empty(&mw->mw_item)) {
1245                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1246                         ret = -EBUSY;
1247
1248                 list_del_init(&mw->mw_item);
1249                 init_completion(&mw->mw_complete);
1250         }
1251         spin_unlock_irqrestore(&lockres->l_lock, flags);
1252
1253         return ret;
1254
1255 }
1256
1257 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1258                                              struct ocfs2_lock_res *lockres)
1259 {
1260         int ret;
1261
1262         ret = wait_for_completion_interruptible(&mw->mw_complete);
1263         if (ret)
1264                 lockres_remove_mask_waiter(lockres, mw);
1265         else
1266                 ret = mw->mw_status;
1267         /* Re-arm the completion in case we want to wait on it again */
1268         INIT_COMPLETION(mw->mw_complete);
1269         return ret;
1270 }
1271
1272 static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1273                                 struct ocfs2_lock_res *lockres,
1274                                 int level,
1275                                 u32 lkm_flags,
1276                                 int arg_flags,
1277                                 int l_subclass,
1278                                 unsigned long caller_ip)
1279 {
1280         struct ocfs2_mask_waiter mw;
1281         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1282         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1283         unsigned long flags;
1284         unsigned int gen;
1285         int noqueue_attempted = 0;
1286
1287         mlog_entry_void();
1288
1289         ocfs2_init_mask_waiter(&mw);
1290
1291         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1292                 lkm_flags |= DLM_LKF_VALBLK;
1293
1294 again:
1295         wait = 0;
1296
1297         if (catch_signals && signal_pending(current)) {
1298                 ret = -ERESTARTSYS;
1299                 goto out;
1300         }
1301
1302         spin_lock_irqsave(&lockres->l_lock, flags);
1303
1304         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1305                         "Cluster lock called on freeing lockres %s! flags "
1306                         "0x%lx\n", lockres->l_name, lockres->l_flags);
1307
1308         /* We only compare against the currently granted level
1309          * here. If the lock is blocked waiting on a downconvert,
1310          * we'll get caught below. */
1311         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1312             level > lockres->l_level) {
1313                 /* is someone sitting in dlm_lock? If so, wait on
1314                  * them. */
1315                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1316                 wait = 1;
1317                 goto unlock;
1318         }
1319
1320         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1321             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1322                 /* is the lock is currently blocked on behalf of
1323                  * another node */
1324                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1325                 wait = 1;
1326                 goto unlock;
1327         }
1328
1329         if (level > lockres->l_level) {
1330                 if (noqueue_attempted > 0) {
1331                         ret = -EAGAIN;
1332                         goto unlock;
1333                 }
1334                 if (lkm_flags & DLM_LKF_NOQUEUE)
1335                         noqueue_attempted = 1;
1336
1337                 if (lockres->l_action != OCFS2_AST_INVALID)
1338                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
1339                              lockres->l_name, lockres->l_action);
1340
1341                 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1342                         lockres->l_action = OCFS2_AST_ATTACH;
1343                         lkm_flags &= ~DLM_LKF_CONVERT;
1344                 } else {
1345                         lockres->l_action = OCFS2_AST_CONVERT;
1346                         lkm_flags |= DLM_LKF_CONVERT;
1347                 }
1348
1349                 lockres->l_requested = level;
1350                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1351                 gen = lockres_set_pending(lockres);
1352                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1353
1354                 BUG_ON(level == DLM_LOCK_IV);
1355                 BUG_ON(level == DLM_LOCK_NL);
1356
1357                 mlog(0, "lock %s, convert from %d to level = %d\n",
1358                      lockres->l_name, lockres->l_level, level);
1359
1360                 /* call dlm_lock to upgrade lock now */
1361                 ret = ocfs2_dlm_lock(osb->cconn,
1362                                      level,
1363                                      &lockres->l_lksb,
1364                                      lkm_flags,
1365                                      lockres->l_name,
1366                                      OCFS2_LOCK_ID_MAX_LEN - 1,
1367                                      lockres);
1368                 lockres_clear_pending(lockres, gen, osb);
1369                 if (ret) {
1370                         if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1371                             (ret != -EAGAIN)) {
1372                                 ocfs2_log_dlm_error("ocfs2_dlm_lock",
1373                                                     ret, lockres);
1374                         }
1375                         ocfs2_recover_from_dlm_error(lockres, 1);
1376                         goto out;
1377                 }
1378
1379                 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
1380                      lockres->l_name);
1381
1382                 /* At this point we've gone inside the dlm and need to
1383                  * complete our work regardless. */
1384                 catch_signals = 0;
1385
1386                 /* wait for busy to clear and carry on */
1387                 goto again;
1388         }
1389
1390         /* Ok, if we get here then we're good to go. */
1391         ocfs2_inc_holders(lockres, level);
1392
1393         ret = 0;
1394 unlock:
1395         spin_unlock_irqrestore(&lockres->l_lock, flags);
1396 out:
1397         /*
1398          * This is helping work around a lock inversion between the page lock
1399          * and dlm locks.  One path holds the page lock while calling aops
1400          * which block acquiring dlm locks.  The voting thread holds dlm
1401          * locks while acquiring page locks while down converting data locks.
1402          * This block is helping an aop path notice the inversion and back
1403          * off to unlock its page lock before trying the dlm lock again.
1404          */
1405         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1406             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1407                 wait = 0;
1408                 if (lockres_remove_mask_waiter(lockres, &mw))
1409                         ret = -EAGAIN;
1410                 else
1411                         goto again;
1412         }
1413         if (wait) {
1414                 ret = ocfs2_wait_for_mask(&mw);
1415                 if (ret == 0)
1416                         goto again;
1417                 mlog_errno(ret);
1418         }
1419         ocfs2_update_lock_stats(lockres, level, &mw, ret);
1420
1421 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1422         if (!ret && lockres->l_lockdep_map.key != NULL) {
1423                 if (level == DLM_LOCK_PR)
1424                         rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
1425                                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1426                                 caller_ip);
1427                 else
1428                         rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
1429                                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1430                                 caller_ip);
1431         }
1432 #endif
1433         mlog_exit(ret);
1434         return ret;
1435 }
1436
1437 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1438                                      struct ocfs2_lock_res *lockres,
1439                                      int level,
1440                                      u32 lkm_flags,
1441                                      int arg_flags)
1442 {
1443         return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1444                                     0, _RET_IP_);
1445 }
1446
1447
1448 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1449                                    struct ocfs2_lock_res *lockres,
1450                                    int level,
1451                                    unsigned long caller_ip)
1452 {
1453         unsigned long flags;
1454
1455         mlog_entry_void();
1456         spin_lock_irqsave(&lockres->l_lock, flags);
1457         ocfs2_dec_holders(lockres, level);
1458         ocfs2_downconvert_on_unlock(osb, lockres);
1459         spin_unlock_irqrestore(&lockres->l_lock, flags);
1460 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1461         if (lockres->l_lockdep_map.key != NULL)
1462                 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
1463 #endif
1464         mlog_exit_void();
1465 }
1466
1467 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1468                                  struct ocfs2_lock_res *lockres,
1469                                  int ex,
1470                                  int local)
1471 {
1472         int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1473         unsigned long flags;
1474         u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1475
1476         spin_lock_irqsave(&lockres->l_lock, flags);
1477         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1478         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1479         spin_unlock_irqrestore(&lockres->l_lock, flags);
1480
1481         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1482 }
1483
1484 /* Grants us an EX lock on the data and metadata resources, skipping
1485  * the normal cluster directory lookup. Use this ONLY on newly created
1486  * inodes which other nodes can't possibly see, and which haven't been
1487  * hashed in the inode hash yet. This can give us a good performance
1488  * increase as it'll skip the network broadcast normally associated
1489  * with creating a new lock resource. */
1490 int ocfs2_create_new_inode_locks(struct inode *inode)
1491 {
1492         int ret;
1493         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1494
1495         BUG_ON(!inode);
1496         BUG_ON(!ocfs2_inode_is_new(inode));
1497
1498         mlog_entry_void();
1499
1500         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1501
1502         /* NOTE: That we don't increment any of the holder counts, nor
1503          * do we add anything to a journal handle. Since this is
1504          * supposed to be a new inode which the cluster doesn't know
1505          * about yet, there is no need to.  As far as the LVB handling
1506          * is concerned, this is basically like acquiring an EX lock
1507          * on a resource which has an invalid one -- we'll set it
1508          * valid when we release the EX. */
1509
1510         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1511         if (ret) {
1512                 mlog_errno(ret);
1513                 goto bail;
1514         }
1515
1516         /*
1517          * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1518          * don't use a generation in their lock names.
1519          */
1520         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1521         if (ret) {
1522                 mlog_errno(ret);
1523                 goto bail;
1524         }
1525
1526         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1527         if (ret) {
1528                 mlog_errno(ret);
1529                 goto bail;
1530         }
1531
1532 bail:
1533         mlog_exit(ret);
1534         return ret;
1535 }
1536
1537 int ocfs2_rw_lock(struct inode *inode, int write)
1538 {
1539         int status, level;
1540         struct ocfs2_lock_res *lockres;
1541         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1542
1543         BUG_ON(!inode);
1544
1545         mlog_entry_void();
1546
1547         mlog(0, "inode %llu take %s RW lock\n",
1548              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1549              write ? "EXMODE" : "PRMODE");
1550
1551         if (ocfs2_mount_local(osb))
1552                 return 0;
1553
1554         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1555
1556         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1557
1558         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1559                                     0);
1560         if (status < 0)
1561                 mlog_errno(status);
1562
1563         mlog_exit(status);
1564         return status;
1565 }
1566
1567 void ocfs2_rw_unlock(struct inode *inode, int write)
1568 {
1569         int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1570         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1571         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1572
1573         mlog_entry_void();
1574
1575         mlog(0, "inode %llu drop %s RW lock\n",
1576              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1577              write ? "EXMODE" : "PRMODE");
1578
1579         if (!ocfs2_mount_local(osb))
1580                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1581
1582         mlog_exit_void();
1583 }
1584
1585 /*
1586  * ocfs2_open_lock always get PR mode lock.
1587  */
1588 int ocfs2_open_lock(struct inode *inode)
1589 {
1590         int status = 0;
1591         struct ocfs2_lock_res *lockres;
1592         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1593
1594         BUG_ON(!inode);
1595
1596         mlog_entry_void();
1597
1598         mlog(0, "inode %llu take PRMODE open lock\n",
1599              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1600
1601         if (ocfs2_mount_local(osb))
1602                 goto out;
1603
1604         lockres = &OCFS2_I(inode)->ip_open_lockres;
1605
1606         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1607                                     DLM_LOCK_PR, 0, 0);
1608         if (status < 0)
1609                 mlog_errno(status);
1610
1611 out:
1612         mlog_exit(status);
1613         return status;
1614 }
1615
1616 int ocfs2_try_open_lock(struct inode *inode, int write)
1617 {
1618         int status = 0, level;
1619         struct ocfs2_lock_res *lockres;
1620         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1621
1622         BUG_ON(!inode);
1623
1624         mlog_entry_void();
1625
1626         mlog(0, "inode %llu try to take %s open lock\n",
1627              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1628              write ? "EXMODE" : "PRMODE");
1629
1630         if (ocfs2_mount_local(osb))
1631                 goto out;
1632
1633         lockres = &OCFS2_I(inode)->ip_open_lockres;
1634
1635         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1636
1637         /*
1638          * The file system may already holding a PRMODE/EXMODE open lock.
1639          * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1640          * other nodes and the -EAGAIN will indicate to the caller that
1641          * this inode is still in use.
1642          */
1643         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1644                                     level, DLM_LKF_NOQUEUE, 0);
1645
1646 out:
1647         mlog_exit(status);
1648         return status;
1649 }
1650
1651 /*
1652  * ocfs2_open_unlock unlock PR and EX mode open locks.
1653  */
1654 void ocfs2_open_unlock(struct inode *inode)
1655 {
1656         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1657         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1658
1659         mlog_entry_void();
1660
1661         mlog(0, "inode %llu drop open lock\n",
1662              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1663
1664         if (ocfs2_mount_local(osb))
1665                 goto out;
1666
1667         if(lockres->l_ro_holders)
1668                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1669                                      DLM_LOCK_PR);
1670         if(lockres->l_ex_holders)
1671                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1672                                      DLM_LOCK_EX);
1673
1674 out:
1675         mlog_exit_void();
1676 }
1677
1678 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1679                                      int level)
1680 {
1681         int ret;
1682         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1683         unsigned long flags;
1684         struct ocfs2_mask_waiter mw;
1685
1686         ocfs2_init_mask_waiter(&mw);
1687
1688 retry_cancel:
1689         spin_lock_irqsave(&lockres->l_lock, flags);
1690         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1691                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
1692                 if (ret) {
1693                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1694                         ret = ocfs2_cancel_convert(osb, lockres);
1695                         if (ret < 0) {
1696                                 mlog_errno(ret);
1697                                 goto out;
1698                         }
1699                         goto retry_cancel;
1700                 }
1701                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1702                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1703
1704                 ocfs2_wait_for_mask(&mw);
1705                 goto retry_cancel;
1706         }
1707
1708         ret = -ERESTARTSYS;
1709         /*
1710          * We may still have gotten the lock, in which case there's no
1711          * point to restarting the syscall.
1712          */
1713         if (lockres->l_level == level)
1714                 ret = 0;
1715
1716         mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1717              lockres->l_flags, lockres->l_level, lockres->l_action);
1718
1719         spin_unlock_irqrestore(&lockres->l_lock, flags);
1720
1721 out:
1722         return ret;
1723 }
1724
1725 /*
1726  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1727  * flock() calls. The locking approach this requires is sufficiently
1728  * different from all other cluster lock types that we implement a
1729  * seperate path to the "low-level" dlm calls. In particular:
1730  *
1731  * - No optimization of lock levels is done - we take at exactly
1732  *   what's been requested.
1733  *
1734  * - No lock caching is employed. We immediately downconvert to
1735  *   no-lock at unlock time. This also means flock locks never go on
1736  *   the blocking list).
1737  *
1738  * - Since userspace can trivially deadlock itself with flock, we make
1739  *   sure to allow cancellation of a misbehaving applications flock()
1740  *   request.
1741  *
1742  * - Access to any flock lockres doesn't require concurrency, so we
1743  *   can simplify the code by requiring the caller to guarantee
1744  *   serialization of dlmglue flock calls.
1745  */
1746 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1747 {
1748         int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1749         unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1750         unsigned long flags;
1751         struct ocfs2_file_private *fp = file->private_data;
1752         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1753         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1754         struct ocfs2_mask_waiter mw;
1755
1756         ocfs2_init_mask_waiter(&mw);
1757
1758         if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1759             (lockres->l_level > DLM_LOCK_NL)) {
1760                 mlog(ML_ERROR,
1761                      "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1762                      "level: %u\n", lockres->l_name, lockres->l_flags,
1763                      lockres->l_level);
1764                 return -EINVAL;
1765         }
1766
1767         spin_lock_irqsave(&lockres->l_lock, flags);
1768         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1769                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1770                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1771
1772                 /*
1773                  * Get the lock at NLMODE to start - that way we
1774                  * can cancel the upconvert request if need be.
1775                  */
1776                 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1777                 if (ret < 0) {
1778                         mlog_errno(ret);
1779                         goto out;
1780                 }
1781
1782                 ret = ocfs2_wait_for_mask(&mw);
1783                 if (ret) {
1784                         mlog_errno(ret);
1785                         goto out;
1786                 }
1787                 spin_lock_irqsave(&lockres->l_lock, flags);
1788         }
1789
1790         lockres->l_action = OCFS2_AST_CONVERT;
1791         lkm_flags |= DLM_LKF_CONVERT;
1792         lockres->l_requested = level;
1793         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1794
1795         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1796         spin_unlock_irqrestore(&lockres->l_lock, flags);
1797
1798         ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1799                              lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1800                              lockres);
1801         if (ret) {
1802                 if (!trylock || (ret != -EAGAIN)) {
1803                         ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1804                         ret = -EINVAL;
1805                 }
1806
1807                 ocfs2_recover_from_dlm_error(lockres, 1);
1808                 lockres_remove_mask_waiter(lockres, &mw);
1809                 goto out;
1810         }
1811
1812         ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1813         if (ret == -ERESTARTSYS) {
1814                 /*
1815                  * Userspace can cause deadlock itself with
1816                  * flock(). Current behavior locally is to allow the
1817                  * deadlock, but abort the system call if a signal is
1818                  * received. We follow this example, otherwise a
1819                  * poorly written program could sit in kernel until
1820                  * reboot.
1821                  *
1822                  * Handling this is a bit more complicated for Ocfs2
1823                  * though. We can't exit this function with an
1824                  * outstanding lock request, so a cancel convert is
1825                  * required. We intentionally overwrite 'ret' - if the
1826                  * cancel fails and the lock was granted, it's easier
1827                  * to just bubble sucess back up to the user.
1828                  */
1829                 ret = ocfs2_flock_handle_signal(lockres, level);
1830         } else if (!ret && (level > lockres->l_level)) {
1831                 /* Trylock failed asynchronously */
1832                 BUG_ON(!trylock);
1833                 ret = -EAGAIN;
1834         }
1835
1836 out:
1837
1838         mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1839              lockres->l_name, ex, trylock, ret);
1840         return ret;
1841 }
1842
1843 void ocfs2_file_unlock(struct file *file)
1844 {
1845         int ret;
1846         unsigned int gen;
1847         unsigned long flags;
1848         struct ocfs2_file_private *fp = file->private_data;
1849         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1850         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1851         struct ocfs2_mask_waiter mw;
1852
1853         ocfs2_init_mask_waiter(&mw);
1854
1855         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1856                 return;
1857
1858         if (lockres->l_level == DLM_LOCK_NL)
1859                 return;
1860
1861         mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1862              lockres->l_name, lockres->l_flags, lockres->l_level,
1863              lockres->l_action);
1864
1865         spin_lock_irqsave(&lockres->l_lock, flags);
1866         /*
1867          * Fake a blocking ast for the downconvert code.
1868          */
1869         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1870         lockres->l_blocking = DLM_LOCK_EX;
1871
1872         gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
1873         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1874         spin_unlock_irqrestore(&lockres->l_lock, flags);
1875
1876         ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
1877         if (ret) {
1878                 mlog_errno(ret);
1879                 return;
1880         }
1881
1882         ret = ocfs2_wait_for_mask(&mw);
1883         if (ret)
1884                 mlog_errno(ret);
1885 }
1886
1887 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1888                                         struct ocfs2_lock_res *lockres)
1889 {
1890         int kick = 0;
1891
1892         mlog_entry_void();
1893
1894         /* If we know that another node is waiting on our lock, kick
1895          * the downconvert thread * pre-emptively when we reach a release
1896          * condition. */
1897         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1898                 switch(lockres->l_blocking) {
1899                 case DLM_LOCK_EX:
1900                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1901                                 kick = 1;
1902                         break;
1903                 case DLM_LOCK_PR:
1904                         if (!lockres->l_ex_holders)
1905                                 kick = 1;
1906                         break;
1907                 default:
1908                         BUG();
1909                 }
1910         }
1911
1912         if (kick)
1913                 ocfs2_wake_downconvert_thread(osb);
1914
1915         mlog_exit_void();
1916 }
1917
1918 #define OCFS2_SEC_BITS   34
1919 #define OCFS2_SEC_SHIFT  (64 - 34)
1920 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1921
1922 /* LVB only has room for 64 bits of time here so we pack it for
1923  * now. */
1924 static u64 ocfs2_pack_timespec(struct timespec *spec)
1925 {
1926         u64 res;
1927         u64 sec = spec->tv_sec;
1928         u32 nsec = spec->tv_nsec;
1929
1930         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1931
1932         return res;
1933 }
1934
1935 /* Call this with the lockres locked. I am reasonably sure we don't
1936  * need ip_lock in this function as anyone who would be changing those
1937  * values is supposed to be blocked in ocfs2_inode_lock right now. */
1938 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1939 {
1940         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1941         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1942         struct ocfs2_meta_lvb *lvb;
1943
1944         mlog_entry_void();
1945
1946         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
1947
1948         /*
1949          * Invalidate the LVB of a deleted inode - this way other
1950          * nodes are forced to go to disk and discover the new inode
1951          * status.
1952          */
1953         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1954                 lvb->lvb_version = 0;
1955                 goto out;
1956         }
1957
1958         lvb->lvb_version   = OCFS2_LVB_VERSION;
1959         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
1960         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1961         lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1962         lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1963         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1964         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1965         lvb->lvb_iatime_packed  =
1966                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1967         lvb->lvb_ictime_packed =
1968                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1969         lvb->lvb_imtime_packed =
1970                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1971         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1972         lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
1973         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1974
1975 out:
1976         mlog_meta_lvb(0, lockres);
1977
1978         mlog_exit_void();
1979 }
1980
1981 static void ocfs2_unpack_timespec(struct timespec *spec,
1982                                   u64 packed_time)
1983 {
1984         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1985         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1986 }
1987
1988 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1989 {
1990         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1991         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1992         struct ocfs2_meta_lvb *lvb;
1993
1994         mlog_entry_void();
1995
1996         mlog_meta_lvb(0, lockres);
1997
1998         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
1999
2000         /* We're safe here without the lockres lock... */
2001         spin_lock(&oi->ip_lock);
2002         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2003         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2004
2005         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2006         oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2007         ocfs2_set_inode_flags(inode);
2008
2009         /* fast-symlinks are a special case */
2010         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2011                 inode->i_blocks = 0;
2012         else
2013                 inode->i_blocks = ocfs2_inode_sector_count(inode);
2014
2015         inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
2016         inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
2017         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2018         inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
2019         ocfs2_unpack_timespec(&inode->i_atime,
2020                               be64_to_cpu(lvb->lvb_iatime_packed));
2021         ocfs2_unpack_timespec(&inode->i_mtime,
2022                               be64_to_cpu(lvb->lvb_imtime_packed));
2023         ocfs2_unpack_timespec(&inode->i_ctime,
2024                               be64_to_cpu(lvb->lvb_ictime_packed));
2025         spin_unlock(&oi->ip_lock);
2026
2027         mlog_exit_void();
2028 }
2029
2030 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2031                                               struct ocfs2_lock_res *lockres)
2032 {
2033         struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2034
2035         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2036             && lvb->lvb_version == OCFS2_LVB_VERSION
2037             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2038                 return 1;
2039         return 0;
2040 }
2041
2042 /* Determine whether a lock resource needs to be refreshed, and
2043  * arbitrate who gets to refresh it.
2044  *
2045  *   0 means no refresh needed.
2046  *
2047  *   > 0 means you need to refresh this and you MUST call
2048  *   ocfs2_complete_lock_res_refresh afterwards. */
2049 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2050 {
2051         unsigned long flags;
2052         int status = 0;
2053
2054         mlog_entry_void();
2055
2056 refresh_check:
2057         spin_lock_irqsave(&lockres->l_lock, flags);
2058         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2059                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2060                 goto bail;
2061         }
2062
2063         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2064                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2065
2066                 ocfs2_wait_on_refreshing_lock(lockres);
2067                 goto refresh_check;
2068         }
2069
2070         /* Ok, I'll be the one to refresh this lock. */
2071         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2072         spin_unlock_irqrestore(&lockres->l_lock, flags);
2073
2074         status = 1;
2075 bail:
2076         mlog_exit(status);
2077         return status;
2078 }
2079
2080 /* If status is non zero, I'll mark it as not being in refresh
2081  * anymroe, but i won't clear the needs refresh flag. */
2082 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2083                                                    int status)
2084 {
2085         unsigned long flags;
2086         mlog_entry_void();
2087
2088         spin_lock_irqsave(&lockres->l_lock, flags);
2089         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2090         if (!status)
2091                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2092         spin_unlock_irqrestore(&lockres->l_lock, flags);
2093
2094         wake_up(&lockres->l_event);
2095
2096         mlog_exit_void();
2097 }
2098
2099 /* may or may not return a bh if it went to disk. */
2100 static int ocfs2_inode_lock_update(struct inode *inode,
2101                                   struct buffer_head **bh)
2102 {
2103         int status = 0;
2104         struct ocfs2_inode_info *oi = OCFS2_I(inode);
2105         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2106         struct ocfs2_dinode *fe;
2107         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2108
2109         mlog_entry_void();
2110
2111         if (ocfs2_mount_local(osb))
2112                 goto bail;
2113
2114         spin_lock(&oi->ip_lock);
2115         if (oi->ip_flags & OCFS2_INODE_DELETED) {
2116                 mlog(0, "Orphaned inode %llu was deleted while we "
2117                      "were waiting on a lock. ip_flags = 0x%x\n",
2118                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
2119                 spin_unlock(&oi->ip_lock);
2120                 status = -ENOENT;
2121                 goto bail;
2122         }
2123         spin_unlock(&oi->ip_lock);
2124
2125         if (!ocfs2_should_refresh_lock_res(lockres))
2126                 goto bail;
2127
2128         /* This will discard any caching information we might have had
2129          * for the inode metadata. */
2130         ocfs2_metadata_cache_purge(inode);
2131
2132         ocfs2_extent_map_trunc(inode, 0);
2133
2134         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2135                 mlog(0, "Trusting LVB on inode %llu\n",
2136                      (unsigned long long)oi->ip_blkno);
2137                 ocfs2_refresh_inode_from_lvb(inode);
2138         } else {
2139                 /* Boo, we have to go to disk. */
2140                 /* read bh, cast, ocfs2_refresh_inode */
2141                 status = ocfs2_read_inode_block(inode, bh);
2142                 if (status < 0) {
2143                         mlog_errno(status);
2144                         goto bail_refresh;
2145                 }
2146                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
2147
2148                 /* This is a good chance to make sure we're not
2149                  * locking an invalid object.  ocfs2_read_inode_block()
2150                  * already checked that the inode block is sane.
2151                  *
2152                  * We bug on a stale inode here because we checked
2153                  * above whether it was wiped from disk. The wiping
2154                  * node provides a guarantee that we receive that
2155                  * message and can mark the inode before dropping any
2156                  * locks associated with it. */
2157                 mlog_bug_on_msg(inode->i_generation !=
2158                                 le32_to_cpu(fe->i_generation),
2159                                 "Invalid dinode %llu disk generation: %u "
2160                                 "inode->i_generation: %u\n",
2161                                 (unsigned long long)oi->ip_blkno,
2162                                 le32_to_cpu(fe->i_generation),
2163                                 inode->i_generation);
2164                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2165                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2166                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
2167                                 (unsigned long long)oi->ip_blkno,
2168                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
2169                                 le32_to_cpu(fe->i_flags));
2170
2171                 ocfs2_refresh_inode(inode, fe);
2172                 ocfs2_track_lock_refresh(lockres);
2173         }
2174
2175         status = 0;
2176 bail_refresh:
2177         ocfs2_complete_lock_res_refresh(lockres, status);
2178 bail:
2179         mlog_exit(status);
2180         return status;
2181 }
2182
2183 static int ocfs2_assign_bh(struct inode *inode,
2184                            struct buffer_head **ret_bh,
2185                            struct buffer_head *passed_bh)
2186 {
2187         int status;
2188
2189         if (passed_bh) {
2190                 /* Ok, the update went to disk for us, use the
2191                  * returned bh. */
2192                 *ret_bh = passed_bh;
2193                 get_bh(*ret_bh);
2194
2195                 return 0;
2196         }
2197
2198         status = ocfs2_read_inode_block(inode, ret_bh);
2199         if (status < 0)
2200                 mlog_errno(status);
2201
2202         return status;
2203 }
2204
2205 /*
2206  * returns < 0 error if the callback will never be called, otherwise
2207  * the result of the lock will be communicated via the callback.
2208  */
2209 int ocfs2_inode_lock_full_nested(struct inode *inode,
2210                                  struct buffer_head **ret_bh,
2211                                  int ex,
2212                                  int arg_flags,
2213                                  int subclass)
2214 {
2215         int status, level, acquired;
2216         u32 dlm_flags;
2217         struct ocfs2_lock_res *lockres = NULL;
2218         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2219         struct buffer_head *local_bh = NULL;
2220
2221         BUG_ON(!inode);
2222
2223         mlog_entry_void();
2224
2225         mlog(0, "inode %llu, take %s META lock\n",
2226              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2227              ex ? "EXMODE" : "PRMODE");
2228
2229         status = 0;
2230         acquired = 0;
2231         /* We'll allow faking a readonly metadata lock for
2232          * rodevices. */
2233         if (ocfs2_is_hard_readonly(osb)) {
2234                 if (ex)
2235                         status = -EROFS;
2236                 goto bail;
2237         }
2238
2239         if (ocfs2_mount_local(osb))
2240                 goto local;
2241
2242         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2243                 ocfs2_wait_for_recovery(osb);
2244
2245         lockres = &OCFS2_I(inode)->ip_inode_lockres;
2246         level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2247         dlm_flags = 0;
2248         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2249                 dlm_flags |= DLM_LKF_NOQUEUE;
2250
2251         status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2252                                       arg_flags, subclass, _RET_IP_);
2253         if (status < 0) {
2254                 if (status != -EAGAIN && status != -EIOCBRETRY)
2255                         mlog_errno(status);
2256                 goto bail;
2257         }
2258
2259         /* Notify the error cleanup path to drop the cluster lock. */
2260         acquired = 1;
2261
2262         /* We wait twice because a node may have died while we were in
2263          * the lower dlm layers. The second time though, we've
2264          * committed to owning this lock so we don't allow signals to
2265          * abort the operation. */
2266         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2267                 ocfs2_wait_for_recovery(osb);
2268
2269 local:
2270         /*
2271          * We only see this flag if we're being called from
2272          * ocfs2_read_locked_inode(). It means we're locking an inode
2273          * which hasn't been populated yet, so clear the refresh flag
2274          * and let the caller handle it.
2275          */
2276         if (inode->i_state & I_NEW) {
2277                 status = 0;
2278                 if (lockres)
2279                         ocfs2_complete_lock_res_refresh(lockres, 0);
2280                 goto bail;
2281         }
2282
2283         /* This is fun. The caller may want a bh back, or it may
2284          * not. ocfs2_inode_lock_update definitely wants one in, but
2285          * may or may not read one, depending on what's in the
2286          * LVB. The result of all of this is that we've *only* gone to
2287          * disk if we have to, so the complexity is worthwhile. */
2288         status = ocfs2_inode_lock_update(inode, &local_bh);
2289         if (status < 0) {
2290                 if (status != -ENOENT)
2291                         mlog_errno(status);
2292                 goto bail;
2293         }
2294
2295         if (ret_bh) {
2296                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2297                 if (status < 0) {
2298                         mlog_errno(status);
2299                         goto bail;
2300                 }
2301         }
2302
2303 bail:
2304         if (status < 0) {
2305                 if (ret_bh && (*ret_bh)) {
2306                         brelse(*ret_bh);
2307                         *ret_bh = NULL;
2308                 }
2309                 if (acquired)
2310                         ocfs2_inode_unlock(inode, ex);
2311         }
2312
2313         if (local_bh)
2314                 brelse(local_bh);
2315
2316         mlog_exit(status);
2317         return status;
2318 }
2319
2320 /*
2321  * This is working around a lock inversion between tasks acquiring DLM
2322  * locks while holding a page lock and the downconvert thread which
2323  * blocks dlm lock acquiry while acquiring page locks.
2324  *
2325  * ** These _with_page variantes are only intended to be called from aop
2326  * methods that hold page locks and return a very specific *positive* error
2327  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2328  *
2329  * The DLM is called such that it returns -EAGAIN if it would have
2330  * blocked waiting for the downconvert thread.  In that case we unlock
2331  * our page so the downconvert thread can make progress.  Once we've
2332  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2333  * that called us can bubble that back up into the VFS who will then
2334  * immediately retry the aop call.
2335  *
2336  * We do a blocking lock and immediate unlock before returning, though, so that
2337  * the lock has a great chance of being cached on this node by the time the VFS
2338  * calls back to retry the aop.    This has a potential to livelock as nodes
2339  * ping locks back and forth, but that's a risk we're willing to take to avoid
2340  * the lock inversion simply.
2341  */
2342 int ocfs2_inode_lock_with_page(struct inode *inode,
2343                               struct buffer_head **ret_bh,
2344                               int ex,
2345                               struct page *page)
2346 {
2347         int ret;
2348
2349         ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2350         if (ret == -EAGAIN) {
2351                 unlock_page(page);
2352                 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2353                         ocfs2_inode_unlock(inode, ex);
2354                 ret = AOP_TRUNCATED_PAGE;
2355         }
2356
2357         return ret;
2358 }
2359
2360 int ocfs2_inode_lock_atime(struct inode *inode,
2361                           struct vfsmount *vfsmnt,
2362                           int *level)
2363 {
2364         int ret;
2365
2366         mlog_entry_void();
2367         ret = ocfs2_inode_lock(inode, NULL, 0);
2368         if (ret < 0) {
2369                 mlog_errno(ret);
2370                 return ret;
2371         }
2372
2373         /*
2374          * If we should update atime, we will get EX lock,
2375          * otherwise we just get PR lock.
2376          */
2377         if (ocfs2_should_update_atime(inode, vfsmnt)) {
2378                 struct buffer_head *bh = NULL;
2379
2380                 ocfs2_inode_unlock(inode, 0);
2381                 ret = ocfs2_inode_lock(inode, &bh, 1);
2382                 if (ret < 0) {
2383                         mlog_errno(ret);
2384                         return ret;
2385                 }
2386                 *level = 1;
2387                 if (ocfs2_should_update_atime(inode, vfsmnt))
2388                         ocfs2_update_inode_atime(inode, bh);
2389                 if (bh)
2390                         brelse(bh);
2391         } else
2392                 *level = 0;
2393
2394         mlog_exit(ret);
2395         return ret;
2396 }
2397
2398 void ocfs2_inode_unlock(struct inode *inode,
2399                        int ex)
2400 {
2401         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2402         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2403         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2404
2405         mlog_entry_void();
2406
2407         mlog(0, "inode %llu drop %s META lock\n",
2408              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2409              ex ? "EXMODE" : "PRMODE");
2410
2411         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2412             !ocfs2_mount_local(osb))
2413                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2414
2415         mlog_exit_void();
2416 }
2417
2418 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2419 {
2420         struct ocfs2_lock_res *lockres;
2421         struct ocfs2_orphan_scan_lvb *lvb;
2422         int status = 0;
2423
2424         if (ocfs2_is_hard_readonly(osb))
2425                 return -EROFS;
2426
2427         if (ocfs2_mount_local(osb))
2428                 return 0;
2429
2430         lockres = &osb->osb_orphan_scan.os_lockres;
2431         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2432         if (status < 0)
2433                 return status;
2434
2435         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2436         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2437             lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2438                 *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2439         else
2440                 *seqno = osb->osb_orphan_scan.os_seqno + 1;
2441
2442         return status;
2443 }
2444
2445 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2446 {
2447         struct ocfs2_lock_res *lockres;
2448         struct ocfs2_orphan_scan_lvb *lvb;
2449
2450         if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2451                 lockres = &osb->osb_orphan_scan.os_lockres;
2452                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2453                 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2454                 lvb->lvb_os_seqno = cpu_to_be32(seqno);
2455                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2456         }
2457 }
2458
2459 int ocfs2_super_lock(struct ocfs2_super *osb,
2460                      int ex)
2461 {
2462         int status = 0;
2463         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2464         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2465
2466         mlog_entry_void();
2467
2468         if (ocfs2_is_hard_readonly(osb))
2469                 return -EROFS;
2470
2471         if (ocfs2_mount_local(osb))
2472                 goto bail;
2473
2474         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2475         if (status < 0) {
2476                 mlog_errno(status);
2477                 goto bail;
2478         }
2479
2480         /* The super block lock path is really in the best position to
2481          * know when resources covered by the lock need to be
2482          * refreshed, so we do it here. Of course, making sense of
2483          * everything is up to the caller :) */
2484         status = ocfs2_should_refresh_lock_res(lockres);
2485         if (status < 0) {
2486                 mlog_errno(status);
2487                 goto bail;
2488         }
2489         if (status) {
2490                 status = ocfs2_refresh_slot_info(osb);
2491
2492                 ocfs2_complete_lock_res_refresh(lockres, status);
2493
2494                 if (status < 0)
2495                         mlog_errno(status);
2496                 ocfs2_track_lock_refresh(lockres);
2497         }
2498 bail:
2499         mlog_exit(status);
2500         return status;
2501 }
2502
2503 void ocfs2_super_unlock(struct ocfs2_super *osb,
2504                         int ex)
2505 {
2506         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2507         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2508
2509         if (!ocfs2_mount_local(osb))
2510                 ocfs2_cluster_unlock(osb, lockres, level);
2511 }
2512
2513 int ocfs2_rename_lock(struct ocfs2_super *osb)
2514 {
2515         int status;
2516         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2517
2518         if (ocfs2_is_hard_readonly(osb))
2519                 return -EROFS;
2520
2521         if (ocfs2_mount_local(osb))
2522                 return 0;
2523
2524         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2525         if (status < 0)
2526                 mlog_errno(status);
2527
2528         return status;
2529 }
2530
2531 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2532 {
2533         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2534
2535         if (!ocfs2_mount_local(osb))
2536                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2537 }
2538
2539 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2540 {
2541         int status;
2542         struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2543
2544         if (ocfs2_is_hard_readonly(osb))
2545                 return -EROFS;
2546
2547         if (ocfs2_mount_local(osb))
2548                 return 0;
2549
2550         status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2551                                     0, 0);
2552         if (status < 0)
2553                 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2554
2555         return status;
2556 }
2557
2558 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2559 {
2560         struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2561
2562         if (!ocfs2_mount_local(osb))
2563                 ocfs2_cluster_unlock(osb, lockres,
2564                                      ex ? LKM_EXMODE : LKM_PRMODE);
2565 }
2566
2567 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2568 {
2569         int ret;
2570         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2571         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2572         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2573
2574         BUG_ON(!dl);
2575
2576         if (ocfs2_is_hard_readonly(osb))
2577                 return -EROFS;
2578
2579         if (ocfs2_mount_local(osb))
2580                 return 0;
2581
2582         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2583         if (ret < 0)
2584                 mlog_errno(ret);
2585
2586         return ret;
2587 }
2588
2589 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2590 {
2591         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2592         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2593         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2594
2595         if (!ocfs2_mount_local(osb))
2596                 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2597 }
2598
2599 /* Reference counting of the dlm debug structure. We want this because
2600  * open references on the debug inodes can live on after a mount, so
2601  * we can't rely on the ocfs2_super to always exist. */
2602 static void ocfs2_dlm_debug_free(struct kref *kref)
2603 {
2604         struct ocfs2_dlm_debug *dlm_debug;
2605
2606         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2607
2608         kfree(dlm_debug);
2609 }
2610
2611 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2612 {
2613         if (dlm_debug)
2614                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2615 }
2616
2617 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2618 {
2619         kref_get(&debug->d_refcnt);
2620 }
2621
2622 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2623 {
2624         struct ocfs2_dlm_debug *dlm_debug;
2625
2626         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2627         if (!dlm_debug) {
2628                 mlog_errno(-ENOMEM);
2629                 goto out;
2630         }
2631
2632         kref_init(&dlm_debug->d_refcnt);
2633         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2634         dlm_debug->d_locking_state = NULL;
2635 out:
2636         return dlm_debug;
2637 }
2638
2639 /* Access to this is arbitrated for us via seq_file->sem. */
2640 struct ocfs2_dlm_seq_priv {
2641         struct ocfs2_dlm_debug *p_dlm_debug;
2642         struct ocfs2_lock_res p_iter_res;
2643         struct ocfs2_lock_res p_tmp_res;
2644 };
2645
2646 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2647                                                  struct ocfs2_dlm_seq_priv *priv)
2648 {
2649         struct ocfs2_lock_res *iter, *ret = NULL;
2650         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2651
2652         assert_spin_locked(&ocfs2_dlm_tracking_lock);
2653
2654         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2655                 /* discover the head of the list */
2656                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2657                         mlog(0, "End of list found, %p\n", ret);
2658                         break;
2659                 }
2660
2661                 /* We track our "dummy" iteration lockres' by a NULL
2662                  * l_ops field. */
2663                 if (iter->l_ops != NULL) {
2664                         ret = iter;
2665                         break;
2666                 }
2667         }
2668
2669         return ret;
2670 }
2671
2672 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2673 {
2674         struct ocfs2_dlm_seq_priv *priv = m->private;
2675         struct ocfs2_lock_res *iter;
2676
2677         spin_lock(&ocfs2_dlm_tracking_lock);
2678         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2679         if (iter) {
2680                 /* Since lockres' have the lifetime of their container
2681                  * (which can be inodes, ocfs2_supers, etc) we want to
2682                  * copy this out to a temporary lockres while still
2683                  * under the spinlock. Obviously after this we can't
2684                  * trust any pointers on the copy returned, but that's
2685                  * ok as the information we want isn't typically held
2686                  * in them. */
2687                 priv->p_tmp_res = *iter;
2688                 iter = &priv->p_tmp_res;
2689         }
2690         spin_unlock(&ocfs2_dlm_tracking_lock);
2691
2692         return iter;
2693 }
2694
2695 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2696 {
2697 }
2698
2699 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2700 {
2701         struct ocfs2_dlm_seq_priv *priv = m->private;
2702         struct ocfs2_lock_res *iter = v;
2703         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2704
2705         spin_lock(&ocfs2_dlm_tracking_lock);
2706         iter = ocfs2_dlm_next_res(iter, priv);
2707         list_del_init(&dummy->l_debug_list);
2708         if (iter) {
2709                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
2710                 priv->p_tmp_res = *iter;
2711                 iter = &priv->p_tmp_res;
2712         }
2713         spin_unlock(&ocfs2_dlm_tracking_lock);
2714
2715         return iter;
2716 }
2717
2718 /* So that debugfs.ocfs2 can determine which format is being used */
2719 #define OCFS2_DLM_DEBUG_STR_VERSION 2
2720 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2721 {
2722         int i;
2723         char *lvb;
2724         struct ocfs2_lock_res *lockres = v;
2725
2726         if (!lockres)
2727                 return -EINVAL;
2728
2729         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2730
2731         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2732                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2733                            lockres->l_name,
2734                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2735         else
2736                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2737
2738         seq_printf(m, "%d\t"
2739                    "0x%lx\t"
2740                    "0x%x\t"
2741                    "0x%x\t"
2742                    "%u\t"
2743                    "%u\t"
2744                    "%d\t"
2745                    "%d\t",
2746                    lockres->l_level,
2747                    lockres->l_flags,
2748                    lockres->l_action,
2749                    lockres->l_unlock_action,
2750                    lockres->l_ro_holders,
2751                    lockres->l_ex_holders,
2752                    lockres->l_requested,
2753                    lockres->l_blocking);
2754
2755         /* Dump the raw LVB */
2756         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2757         for(i = 0; i < DLM_LVB_LEN; i++)
2758                 seq_printf(m, "0x%x\t", lvb[i]);
2759
2760 #ifdef CONFIG_OCFS2_FS_STATS
2761 # define lock_num_prmode(_l)            (_l)->l_lock_num_prmode
2762 # define lock_num_exmode(_l)            (_l)->l_lock_num_exmode
2763 # define lock_num_prmode_failed(_l)     (_l)->l_lock_num_prmode_failed
2764 # define lock_num_exmode_failed(_l)     (_l)->l_lock_num_exmode_failed
2765 # define lock_total_prmode(_l)          (_l)->l_lock_total_prmode
2766 # define lock_total_exmode(_l)          (_l)->l_lock_total_exmode
2767 # define lock_max_prmode(_l)            (_l)->l_lock_max_prmode
2768 # define lock_max_exmode(_l)            (_l)->l_lock_max_exmode
2769 # define lock_refresh(_l)               (_l)->l_lock_refresh
2770 #else
2771 # define lock_num_prmode(_l)            (0ULL)
2772 # define lock_num_exmode(_l)            (0ULL)
2773 # define lock_num_prmode_failed(_l)     (0)
2774 # define lock_num_exmode_failed(_l)     (0)
2775 # define lock_total_prmode(_l)          (0ULL)
2776 # define lock_total_exmode(_l)          (0ULL)
2777 # define lock_max_prmode(_l)            (0)
2778 # define lock_max_exmode(_l)            (0)
2779 # define lock_refresh(_l)               (0)
2780 #endif
2781         /* The following seq_print was added in version 2 of this output */
2782         seq_printf(m, "%llu\t"
2783                    "%llu\t"
2784                    "%u\t"
2785                    "%u\t"
2786                    "%llu\t"
2787                    "%llu\t"
2788                    "%u\t"
2789                    "%u\t"
2790                    "%u\t",
2791                    lock_num_prmode(lockres),
2792                    lock_num_exmode(lockres),
2793                    lock_num_prmode_failed(lockres),
2794                    lock_num_exmode_failed(lockres),
2795                    lock_total_prmode(lockres),
2796                    lock_total_exmode(lockres),
2797                    lock_max_prmode(lockres),
2798                    lock_max_exmode(lockres),
2799                    lock_refresh(lockres));
2800
2801         /* End the line */
2802         seq_printf(m, "\n");
2803         return 0;
2804 }
2805
2806 static const struct seq_operations ocfs2_dlm_seq_ops = {
2807         .start =        ocfs2_dlm_seq_start,
2808         .stop =         ocfs2_dlm_seq_stop,
2809         .next =         ocfs2_dlm_seq_next,
2810         .show =         ocfs2_dlm_seq_show,
2811 };
2812
2813 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2814 {
2815         struct seq_file *seq = (struct seq_file *) file->private_data;
2816         struct ocfs2_dlm_seq_priv *priv = seq->private;
2817         struct ocfs2_lock_res *res = &priv->p_iter_res;
2818
2819         ocfs2_remove_lockres_tracking(res);
2820         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2821         return seq_release_private(inode, file);
2822 }
2823
2824 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2825 {
2826         int ret;
2827         struct ocfs2_dlm_seq_priv *priv;
2828         struct seq_file *seq;
2829         struct ocfs2_super *osb;
2830
2831         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2832         if (!priv) {
2833                 ret = -ENOMEM;
2834                 mlog_errno(ret);
2835                 goto out;
2836         }
2837         osb = inode->i_private;
2838         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2839         priv->p_dlm_debug = osb->osb_dlm_debug;
2840         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2841
2842         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2843         if (ret) {
2844                 kfree(priv);
2845                 mlog_errno(ret);
2846                 goto out;
2847         }
2848
2849         seq = (struct seq_file *) file->private_data;
2850         seq->private = priv;
2851
2852         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2853                                    priv->p_dlm_debug);
2854
2855 out:
2856         return ret;
2857 }
2858
2859 static const struct file_operations ocfs2_dlm_debug_fops = {
2860         .open =         ocfs2_dlm_debug_open,
2861         .release =      ocfs2_dlm_debug_release,
2862         .read =         seq_read,
2863         .llseek =       seq_lseek,
2864 };
2865
2866 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2867 {
2868         int ret = 0;
2869         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2870
2871         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2872                                                          S_IFREG|S_IRUSR,
2873                                                          osb->osb_debug_root,
2874                                                          osb,
2875                                                          &ocfs2_dlm_debug_fops);
2876         if (!dlm_debug->d_locking_state) {
2877                 ret = -EINVAL;
2878                 mlog(ML_ERROR,
2879                      "Unable to create locking state debugfs file.\n");
2880                 goto out;
2881         }
2882
2883         ocfs2_get_dlm_debug(dlm_debug);
2884 out:
2885         return ret;
2886 }
2887
2888 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2889 {
2890         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2891
2892         if (dlm_debug) {
2893                 debugfs_remove(dlm_debug->d_locking_state);
2894                 ocfs2_put_dlm_debug(dlm_debug);
2895         }
2896 }
2897
2898 int ocfs2_dlm_init(struct ocfs2_super *osb)
2899 {
2900         int status = 0;
2901         struct ocfs2_cluster_connection *conn = NULL;
2902
2903         mlog_entry_void();
2904
2905         if (ocfs2_mount_local(osb)) {
2906                 osb->node_num = 0;
2907                 goto local;
2908         }
2909
2910         status = ocfs2_dlm_init_debug(osb);
2911         if (status < 0) {
2912                 mlog_errno(status);
2913                 goto bail;
2914         }
2915
2916         /* launch downconvert thread */
2917         osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2918         if (IS_ERR(osb->dc_task)) {
2919                 status = PTR_ERR(osb->dc_task);
2920                 osb->dc_task = NULL;
2921                 mlog_errno(status);
2922                 goto bail;
2923         }
2924
2925         /* for now, uuid == domain */
2926         status = ocfs2_cluster_connect(osb->osb_cluster_stack,
2927                                        osb->uuid_str,
2928                                        strlen(osb->uuid_str),
2929                                        ocfs2_do_node_down, osb,
2930                                        &conn);
2931         if (status) {
2932                 mlog_errno(status);
2933                 goto bail;
2934         }
2935
2936         status = ocfs2_cluster_this_node(&osb->node_num);
2937         if (status < 0) {
2938                 mlog_errno(status);
2939                 mlog(ML_ERROR,
2940                      "could not find this host's node number\n");
2941                 ocfs2_cluster_disconnect(conn, 0);
2942                 goto bail;
2943         }
2944
2945 local:
2946         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2947         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2948         ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
2949         ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
2950
2951         osb->cconn = conn;
2952
2953         status = 0;
2954 bail:
2955         if (status < 0) {
2956                 ocfs2_dlm_shutdown_debug(osb);
2957                 if (osb->dc_task)
2958                         kthread_stop(osb->dc_task);
2959         }
2960
2961         mlog_exit(status);
2962         return status;
2963 }
2964
2965 void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
2966                         int hangup_pending)
2967 {
2968         mlog_entry_void();
2969
2970         ocfs2_drop_osb_locks(osb);
2971
2972         /*
2973          * Now that we have dropped all locks and ocfs2_dismount_volume()
2974          * has disabled recovery, the DLM won't be talking to us.  It's
2975          * safe to tear things down before disconnecting the cluster.
2976          */
2977
2978         if (osb->dc_task) {
2979                 kthread_stop(osb->dc_task);
2980                 osb->dc_task = NULL;
2981         }
2982
2983         ocfs2_lock_res_free(&osb->osb_super_lockres);
2984         ocfs2_lock_res_free(&osb->osb_rename_lockres);
2985         ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
2986         ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
2987
2988         ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
2989         osb->cconn = NULL;
2990
2991         ocfs2_dlm_shutdown_debug(osb);
2992
2993         mlog_exit_void();
2994 }
2995
2996 static void ocfs2_unlock_ast(void *opaque, int error)
2997 {
2998         struct ocfs2_lock_res *lockres = opaque;
2999         unsigned long flags;
3000
3001         mlog_entry_void();
3002
3003         mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
3004              lockres->l_unlock_action);
3005
3006         spin_lock_irqsave(&lockres->l_lock, flags);
3007         if (error) {
3008                 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
3009                      "unlock_action %d\n", error, lockres->l_name,
3010                      lockres->l_unlock_action);
3011                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3012                 return;
3013         }
3014
3015         switch(lockres->l_unlock_action) {
3016         case OCFS2_UNLOCK_CANCEL_CONVERT:
3017                 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
3018                 lockres->l_action = OCFS2_AST_INVALID;
3019                 /* Downconvert thread may have requeued this lock, we
3020                  * need to wake it. */
3021                 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3022                         ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
3023                 break;
3024         case OCFS2_UNLOCK_DROP_LOCK:
3025                 lockres->l_level = DLM_LOCK_IV;
3026                 break;
3027         default:
3028                 BUG();
3029         }
3030
3031         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
3032         lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
3033         wake_up(&lockres->l_event);
3034         spin_unlock_irqrestore(&lockres->l_lock, flags);
3035
3036         mlog_exit_void();
3037 }
3038
3039 static int ocfs2_drop_lock(struct ocfs2_super *osb,
3040                            struct ocfs2_lock_res *lockres)
3041 {
3042         int ret;
3043         unsigned long flags;
3044         u32 lkm_flags = 0;
3045
3046         /* We didn't get anywhere near actually using this lockres. */
3047         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3048                 goto out;
3049
3050         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3051                 lkm_flags |= DLM_LKF_VALBLK;
3052
3053         spin_lock_irqsave(&lockres->l_lock, flags);
3054
3055         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3056                         "lockres %s, flags 0x%lx\n",
3057                         lockres->l_name, lockres->l_flags);
3058
3059         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3060                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3061                      "%u, unlock_action = %u\n",
3062                      lockres->l_name, lockres->l_flags, lockres->l_action,
3063                      lockres->l_unlock_action);
3064
3065                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3066
3067                 /* XXX: Today we just wait on any busy
3068                  * locks... Perhaps we need to cancel converts in the
3069                  * future? */
3070                 ocfs2_wait_on_busy_lock(lockres);
3071
3072                 spin_lock_irqsave(&lockres->l_lock, flags);
3073         }
3074
3075         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3076                 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3077                     lockres->l_level == DLM_LOCK_EX &&
3078                     !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3079                         lockres->l_ops->set_lvb(lockres);
3080         }
3081
3082         if (lockres->l_flags & OCFS2_LOCK_BUSY)
3083                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3084                      lockres->l_name);
3085         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3086                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3087
3088         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3089                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3090                 goto out;
3091         }
3092
3093         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3094
3095         /* make sure we never get here while waiting for an ast to
3096          * fire. */
3097         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3098
3099         /* is this necessary? */
3100         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3101         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3102         spin_unlock_irqrestore(&lockres->l_lock, flags);
3103
3104         mlog(0, "lock %s\n", lockres->l_name);
3105
3106         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
3107                                lockres);
3108         if (ret) {
3109                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3110                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3111                 ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3112                 BUG();
3113         }
3114         mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3115              lockres->l_name);
3116
3117         ocfs2_wait_on_busy_lock(lockres);
3118 out:
3119         mlog_exit(0);
3120         return 0;
3121 }
3122
3123 /* Mark the lockres as being dropped. It will no longer be
3124  * queued if blocking, but we still may have to wait on it
3125  * being dequeued from the downconvert thread before we can consider
3126  * it safe to drop. 
3127  *
3128  * You can *not* attempt to call cluster_lock on this lockres anymore. */
3129 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
3130 {
3131         int status;
3132         struct ocfs2_mask_waiter mw;
3133         unsigned long flags;
3134
3135         ocfs2_init_mask_waiter(&mw);
3136
3137         spin_lock_irqsave(&lockres->l_lock, flags);
3138         lockres->l_flags |= OCFS2_LOCK_FREEING;
3139         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3140                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3141                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3142
3143                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3144
3145                 status = ocfs2_wait_for_mask(&mw);
3146                 if (status)
3147                         mlog_errno(status);
3148
3149                 spin_lock_irqsave(&lockres->l_lock, flags);
3150         }
3151         spin_unlock_irqrestore(&lockres->l_lock, flags);
3152 }
3153
3154 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3155                                struct ocfs2_lock_res *lockres)
3156 {
3157         int ret;
3158
3159         ocfs2_mark_lockres_freeing(lockres);
3160         ret = ocfs2_drop_lock(osb, lockres);
3161         if (ret)
3162                 mlog_errno(ret);
3163 }
3164
3165 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3166 {
3167         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3168         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3169         ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3170         ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3171 }
3172
3173 int ocfs2_drop_inode_locks(struct inode *inode)
3174 {
3175         int status, err;
3176
3177         mlog_entry_void();
3178
3179         /* No need to call ocfs2_mark_lockres_freeing here -
3180          * ocfs2_clear_inode has done it for us. */
3181
3182         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3183                               &OCFS2_I(inode)->ip_open_lockres);
3184         if (err < 0)
3185                 mlog_errno(err);
3186
3187         status = err;
3188
3189         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3190                               &OCFS2_I(inode)->ip_inode_lockres);
3191         if (err < 0)
3192                 mlog_errno(err);
3193         if (err < 0 && !status)
3194                 status = err;
3195
3196         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3197                               &OCFS2_I(inode)->ip_rw_lockres);
3198         if (err < 0)
3199                 mlog_errno(err);
3200         if (err < 0 && !status)
3201                 status = err;
3202
3203         mlog_exit(status);
3204         return status;
3205 }
3206
3207 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3208                                               int new_level)
3209 {
3210         assert_spin_locked(&lockres->l_lock);
3211
3212         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3213
3214         if (lockres->l_level <= new_level) {
3215                 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
3216                      lockres->l_level, new_level);
3217                 BUG();
3218         }
3219
3220         mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
3221              lockres->l_name, new_level, lockres->l_blocking);
3222
3223         lockres->l_action = OCFS2_AST_DOWNCONVERT;
3224         lockres->l_requested = new_level;
3225         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3226         return lockres_set_pending(lockres);
3227 }
3228
3229 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3230                                   struct ocfs2_lock_res *lockres,
3231                                   int new_level,
3232                                   int lvb,
3233                                   unsigned int generation)
3234 {
3235         int ret;
3236         u32 dlm_flags = DLM_LKF_CONVERT;
3237
3238         mlog_entry_void();
3239
3240         if (lvb)
3241                 dlm_flags |= DLM_LKF_VALBLK;
3242
3243         ret = ocfs2_dlm_lock(osb->cconn,
3244                              new_level,
3245                              &lockres->l_lksb,
3246                              dlm_flags,
3247                              lockres->l_name,
3248                              OCFS2_LOCK_ID_MAX_LEN - 1,
3249                              lockres);
3250         lockres_clear_pending(lockres, generation, osb);
3251         if (ret) {
3252                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3253                 ocfs2_recover_from_dlm_error(lockres, 1);
3254                 goto bail;
3255         }
3256
3257         ret = 0;
3258 bail:
3259         mlog_exit(ret);
3260         return ret;
3261 }
3262
3263 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3264 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3265                                         struct ocfs2_lock_res *lockres)
3266 {
3267         assert_spin_locked(&lockres->l_lock);
3268
3269         mlog_entry_void();
3270         mlog(0, "lock %s\n", lockres->l_name);
3271
3272         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3273                 /* If we're already trying to cancel a lock conversion
3274                  * then just drop the spinlock and allow the caller to
3275                  * requeue this lock. */
3276
3277                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
3278                 return 0;
3279         }
3280
3281         /* were we in a convert when we got the bast fire? */
3282         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3283                lockres->l_action != OCFS2_AST_DOWNCONVERT);
3284         /* set things up for the unlockast to know to just
3285          * clear out the ast_action and unset busy, etc. */
3286         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3287
3288         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3289                         "lock %s, invalid flags: 0x%lx\n",
3290                         lockres->l_name, lockres->l_flags);
3291
3292         return 1;
3293 }
3294
3295 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3296                                 struct ocfs2_lock_res *lockres)
3297 {
3298         int ret;
3299
3300         mlog_entry_void();
3301         mlog(0, "lock %s\n", lockres->l_name);
3302
3303         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3304                                DLM_LKF_CANCEL, lockres);
3305         if (ret) {
3306                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3307                 ocfs2_recover_from_dlm_error(lockres, 0);
3308         }
3309
3310         mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
3311
3312         mlog_exit(ret);
3313         return ret;
3314 }
3315
3316 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3317                               struct ocfs2_lock_res *lockres,
3318                               struct ocfs2_unblock_ctl *ctl)
3319 {
3320         unsigned long flags;
3321         int blocking;
3322         int new_level;
3323         int ret = 0;
3324         int set_lvb = 0;
3325         unsigned int gen;
3326
3327         mlog_entry_void();
3328
3329         spin_lock_irqsave(&lockres->l_lock, flags);
3330
3331         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
3332
3333 recheck:
3334         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3335                 /* XXX
3336                  * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3337                  * exists entirely for one reason - another thread has set
3338                  * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3339                  *
3340                  * If we do ocfs2_cancel_convert() before the other thread
3341                  * calls dlm_lock(), our cancel will do nothing.  We will
3342                  * get no ast, and we will have no way of knowing the
3343                  * cancel failed.  Meanwhile, the other thread will call
3344                  * into dlm_lock() and wait...forever.
3345                  *
3346                  * Why forever?  Because another node has asked for the
3347                  * lock first; that's why we're here in unblock_lock().
3348                  *
3349                  * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3350                  * set, we just requeue the unblock.  Only when the other
3351                  * thread has called dlm_lock() and cleared PENDING will
3352                  * we then cancel their request.
3353                  *
3354                  * All callers of dlm_lock() must set OCFS2_DLM_PENDING
3355                  * at the same time they set OCFS2_DLM_BUSY.  They must
3356                  * clear OCFS2_DLM_PENDING after dlm_lock() returns.
3357                  */
3358                 if (lockres->l_flags & OCFS2_LOCK_PENDING)
3359                         goto leave_requeue;
3360
3361                 ctl->requeue = 1;
3362                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
3363                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3364                 if (ret) {
3365                         ret = ocfs2_cancel_convert(osb, lockres);
3366                         if (ret < 0)
3367                                 mlog_errno(ret);
3368                 }
3369                 goto leave;
3370         }
3371
3372         /* if we're blocking an exclusive and we have *any* holders,
3373          * then requeue. */
3374         if ((lockres->l_blocking == DLM_LOCK_EX)
3375             && (lockres->l_ex_holders || lockres->l_ro_holders))
3376                 goto leave_requeue;
3377
3378         /* If it's a PR we're blocking, then only
3379          * requeue if we've got any EX holders */
3380         if (lockres->l_blocking == DLM_LOCK_PR &&
3381             lockres->l_ex_holders)
3382                 goto leave_requeue;
3383
3384         /*
3385          * Can we get a lock in this state if the holder counts are
3386          * zero? The meta data unblock code used to check this.
3387          */
3388         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3389             && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
3390                 goto leave_requeue;
3391
3392         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3393
3394         if (lockres->l_ops->check_downconvert
3395             && !lockres->l_ops->check_downconvert(lockres, new_level))
3396                 goto leave_requeue;
3397
3398         /* If we get here, then we know that there are no more
3399          * incompatible holders (and anyone asking for an incompatible
3400          * lock is blocked). We can now downconvert the lock */
3401         if (!lockres->l_ops->downconvert_worker)
3402                 goto downconvert;
3403
3404         /* Some lockres types want to do a bit of work before
3405          * downconverting a lock. Allow that here. The worker function
3406          * may sleep, so we save off a copy of what we're blocking as
3407          * it may change while we're not holding the spin lock. */
3408         blocking = lockres->l_blocking;
3409         spin_unlock_irqrestore(&lockres->l_lock, flags);
3410
3411         ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3412
3413         if (ctl->unblock_action == UNBLOCK_STOP_POST)
3414                 goto leave;
3415
3416         spin_lock_irqsave(&lockres->l_lock, flags);
3417         if (blocking != lockres->l_blocking) {
3418                 /* If this changed underneath us, then we can't drop
3419                  * it just yet. */
3420                 goto recheck;
3421         }
3422
3423 downconvert:
3424         ctl->requeue = 0;
3425
3426         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3427                 if (lockres->l_level == DLM_LOCK_EX)
3428                         set_lvb = 1;
3429
3430                 /*
3431                  * We only set the lvb if the lock has been fully
3432                  * refreshed - otherwise we risk setting stale
3433                  * data. Otherwise, there's no need to actually clear
3434                  * out the lvb here as it's value is still valid.
3435                  */
3436                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3437                         lockres->l_ops->set_lvb(lockres);
3438         }
3439
3440         gen = ocfs2_prepare_downconvert(lockres, new_level);
3441         spin_unlock_irqrestore(&lockres->l_lock, flags);
3442         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3443                                      gen);
3444
3445 leave:
3446         mlog_exit(ret);
3447         return ret;
3448
3449 leave_requeue:
3450         spin_unlock_irqrestore(&lockres->l_lock, flags);
3451         ctl->requeue = 1;
3452
3453         mlog_exit(0);
3454         return 0;
3455 }
3456
3457 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3458                                      int blocking)
3459 {
3460         struct inode *inode;
3461         struct address_space *mapping;
3462
3463         inode = ocfs2_lock_res_inode(lockres);
3464         mapping = inode->i_mapping;
3465
3466         if (!S_ISREG(inode->i_mode))
3467                 goto out;
3468
3469         /*
3470          * We need this before the filemap_fdatawrite() so that it can
3471          * transfer the dirty bit from the PTE to the
3472          * page. Unfortunately this means that even for EX->PR
3473          * downconverts, we'll lose our mappings and have to build
3474          * them up again.
3475          */
3476         unmap_mapping_range(mapping, 0, 0, 0);
3477
3478         if (filemap_fdatawrite(mapping)) {
3479                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3480                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
3481         }
3482         sync_mapping_buffers(mapping);
3483         if (blocking == DLM_LOCK_EX) {
3484                 truncate_inode_pages(mapping, 0);
3485         } else {
3486                 /* We only need to wait on the I/O if we're not also
3487                  * truncating pages because truncate_inode_pages waits
3488                  * for us above. We don't truncate pages if we're
3489                  * blocking anything < EXMODE because we want to keep
3490                  * them around in that case. */
3491                 filemap_fdatawait(mapping);
3492         }
3493
3494 out:
3495         return UNBLOCK_CONTINUE;
3496 }
3497
3498 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3499                                         int new_level)
3500 {
3501         struct inode *inode = ocfs2_lock_res_inode(lockres);
3502         int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3503
3504         BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3505         BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3506
3507         if (checkpointed)
3508                 return 1;
3509
3510         ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
3511         return 0;
3512 }
3513
3514 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3515 {
3516         struct inode *inode = ocfs2_lock_res_inode(lockres);
3517
3518         __ocfs2_stuff_meta_lvb(inode);
3519 }
3520
3521 /*
3522  * Does the final reference drop on our dentry lock. Right now this
3523  * happens in the downconvert thread, but we could choose to simplify the
3524  * dlmglue API and push these off to the ocfs2_wq in the future.
3525  */
3526 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3527                                      struct ocfs2_lock_res *lockres)
3528 {
3529         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3530         ocfs2_dentry_lock_put(osb, dl);
3531 }
3532
3533 /*
3534  * d_delete() matching dentries before the lock downconvert.
3535  *
3536  * At this point, any process waiting to destroy the
3537  * dentry_lock due to last ref count is stopped by the
3538  * OCFS2_LOCK_QUEUED flag.
3539  *
3540  * We have two potential problems
3541  *
3542  * 1) If we do the last reference drop on our dentry_lock (via dput)
3543  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3544  *    the downconvert to finish. Instead we take an elevated
3545  *    reference and push the drop until after we've completed our
3546  *    unblock processing.
3547  *
3548  * 2) There might be another process with a final reference,
3549  *    waiting on us to finish processing. If this is the case, we
3550  *    detect it and exit out - there's no more dentries anyway.
3551  */
3552 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3553                                        int blocking)
3554 {
3555         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3556         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3557         struct dentry *dentry;
3558         unsigned long flags;
3559         int extra_ref = 0;
3560
3561         /*
3562          * This node is blocking another node from getting a read
3563          * lock. This happens when we've renamed within a
3564          * directory. We've forced the other nodes to d_delete(), but
3565          * we never actually dropped our lock because it's still
3566          * valid. The downconvert code will retain a PR for this node,
3567          * so there's no further work to do.
3568          */
3569         if (blocking == DLM_LOCK_PR)
3570                 return UNBLOCK_CONTINUE;
3571
3572         /*
3573          * Mark this inode as potentially orphaned. The code in
3574          * ocfs2_delete_inode() will figure out whether it actually
3575          * needs to be freed or not.
3576          */
3577         spin_lock(&oi->ip_lock);
3578         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3579         spin_unlock(&oi->ip_lock);
3580
3581         /*
3582          * Yuck. We need to make sure however that the check of
3583          * OCFS2_LOCK_FREEING and the extra reference are atomic with
3584          * respect to a reference decrement or the setting of that
3585          * flag.
3586          */
3587         spin_lock_irqsave(&lockres->l_lock, flags);
3588         spin_lock(&dentry_attach_lock);
3589         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3590             && dl->dl_count) {
3591                 dl->dl_count++;
3592                 extra_ref = 1;
3593         }
3594         spin_unlock(&dentry_attach_lock);
3595         spin_unlock_irqrestore(&lockres->l_lock, flags);
3596
3597         mlog(0, "extra_ref = %d\n", extra_ref);
3598
3599         /*
3600          * We have a process waiting on us in ocfs2_dentry_iput(),
3601          * which means we can't have any more outstanding
3602          * aliases. There's no need to do any more work.
3603          */
3604         if (!extra_ref)
3605                 return UNBLOCK_CONTINUE;
3606
3607         spin_lock(&dentry_attach_lock);
3608         while (1) {
3609                 dentry = ocfs2_find_local_alias(dl->dl_inode,
3610                                                 dl->dl_parent_blkno, 1);
3611                 if (!dentry)
3612                         break;
3613                 spin_unlock(&dentry_attach_lock);
3614
3615                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3616                      dentry->d_name.name);
3617
3618                 /*
3619                  * The following dcache calls may do an
3620                  * iput(). Normally we don't want that from the
3621                  * downconverting thread, but in this case it's ok
3622                  * because the requesting node already has an
3623                  * exclusive lock on the inode, so it can't be queued
3624                  * for a downconvert.
3625                  */
3626                 d_delete(dentry);
3627                 dput(dentry);
3628
3629                 spin_lock(&dentry_attach_lock);
3630         }
3631         spin_unlock(&dentry_attach_lock);
3632
3633         /*
3634          * If we are the last holder of this dentry lock, there is no
3635          * reason to downconvert so skip straight to the unlock.
3636          */
3637         if (dl->dl_count == 1)
3638                 return UNBLOCK_STOP_POST;
3639
3640         return UNBLOCK_CONTINUE_POST;
3641 }
3642
3643 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
3644 {
3645         struct ocfs2_qinfo_lvb *lvb;
3646         struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
3647         struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3648                                             oinfo->dqi_gi.dqi_type);
3649
3650         mlog_entry_void();
3651
3652         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3653         lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
3654         lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
3655         lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
3656         lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
3657         lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
3658         lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
3659         lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
3660
3661         mlog_exit_void();
3662 }
3663
3664 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3665 {
3666         struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3667         struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3668         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3669
3670         mlog_entry_void();
3671         if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3672                 ocfs2_cluster_unlock(osb, lockres, level);
3673         mlog_exit_void();
3674 }
3675
3676 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
3677 {
3678         struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3679                                             oinfo->dqi_gi.dqi_type);
3680         struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3681         struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3682         struct buffer_head *bh = NULL;
3683         struct ocfs2_global_disk_dqinfo *gdinfo;
3684         int status = 0;
3685
3686         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
3687             lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
3688                 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
3689                 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
3690                 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
3691                 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
3692                 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
3693                 oinfo->dqi_gi.dqi_free_entry =
3694                                         be32_to_cpu(lvb->lvb_free_entry);
3695         } else {
3696                 status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh);
3697                 if (status) {
3698                         mlog_errno(status);
3699                         goto bail;
3700                 }
3701                 gdinfo = (struct ocfs2_global_disk_dqinfo *)
3702                                         (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
3703                 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
3704                 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
3705                 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
3706                 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
3707                 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
3708                 oinfo->dqi_gi.dqi_free_entry =
3709                                         le32_to_cpu(gdinfo->dqi_free_entry);
3710                 brelse(bh);
3711                 ocfs2_track_lock_refresh(lockres);
3712         }
3713
3714 bail:
3715         return status;
3716 }
3717
3718 /* Lock quota info, this function expects at least shared lock on the quota file
3719  * so that we can safely refresh quota info from disk. */
3720 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3721 {
3722         struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3723         struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3724         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3725         int status = 0;
3726
3727         mlog_entry_void();
3728
3729         /* On RO devices, locking really isn't needed... */
3730         if (ocfs2_is_hard_readonly(osb)) {
3731                 if (ex)
3732                         status = -EROFS;
3733                 goto bail;
3734         }
3735         if (ocfs2_mount_local(osb))
3736                 goto bail;
3737
3738         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3739         if (status < 0) {
3740                 mlog_errno(status);
3741                 goto bail;
3742         }
3743         if (!ocfs2_should_refresh_lock_res(lockres))
3744                 goto bail;
3745         /* OK, we have the lock but we need to refresh the quota info */
3746         status = ocfs2_refresh_qinfo(oinfo);
3747         if (status)
3748                 ocfs2_qinfo_unlock(oinfo, ex);
3749         ocfs2_complete_lock_res_refresh(lockres, status);
3750 bail:
3751         mlog_exit(status);
3752         return status;
3753 }
3754
3755 /*
3756  * This is the filesystem locking protocol.  It provides the lock handling
3757  * hooks for the underlying DLM.  It has a maximum version number.
3758  * The version number allows interoperability with systems running at
3759  * the same major number and an equal or smaller minor number.
3760  *
3761  * Whenever the filesystem does new things with locks (adds or removes a
3762  * lock, orders them differently, does different things underneath a lock),
3763  * the version must be changed.  The protocol is negotiated when joining
3764  * the dlm domain.  A node may join the domain if its major version is
3765  * identical to all other nodes and its minor version is greater than
3766  * or equal to all other nodes.  When its minor version is greater than
3767  * the other nodes, it will run at the minor version specified by the
3768  * other nodes.
3769  *
3770  * If a locking change is made that will not be compatible with older
3771  * versions, the major number must be increased and the minor version set
3772  * to zero.  If a change merely adds a behavior that can be disabled when
3773  * speaking to older versions, the minor version must be increased.  If a
3774  * change adds a fully backwards compatible change (eg, LVB changes that
3775  * are just ignored by older versions), the version does not need to be
3776  * updated.
3777  */
3778 static struct ocfs2_locking_protocol lproto = {
3779         .lp_max_version = {
3780                 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
3781                 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
3782         },
3783         .lp_lock_ast            = ocfs2_locking_ast,
3784         .lp_blocking_ast        = ocfs2_blocking_ast,
3785         .lp_unlock_ast          = ocfs2_unlock_ast,
3786 };
3787
3788 void ocfs2_set_locking_protocol(void)
3789 {
3790         ocfs2_stack_glue_set_locking_protocol(&lproto);
3791 }
3792
3793
3794 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3795                                        struct ocfs2_lock_res *lockres)
3796 {
3797         int status;
3798         struct ocfs2_unblock_ctl ctl = {0, 0,};
3799         unsigned long flags;
3800
3801         /* Our reference to the lockres in this function can be
3802          * considered valid until we remove the OCFS2_LOCK_QUEUED
3803          * flag. */
3804
3805         mlog_entry_void();
3806
3807         BUG_ON(!lockres);
3808         BUG_ON(!lockres->l_ops);
3809
3810         mlog(0, "lockres %s blocked.\n", lockres->l_name);
3811
3812         /* Detect whether a lock has been marked as going away while
3813          * the downconvert thread was processing other things. A lock can
3814          * still be marked with OCFS2_LOCK_FREEING after this check,
3815          * but short circuiting here will still save us some
3816          * performance. */
3817         spin_lock_irqsave(&lockres->l_lock, flags);
3818         if (lockres->l_flags & OCFS2_LOCK_FREEING)
3819                 goto unqueue;
3820         spin_unlock_irqrestore(&lockres->l_lock, flags);
3821
3822         status = ocfs2_unblock_lock(osb, lockres, &ctl);
3823         if (status < 0)
3824                 mlog_errno(status);
3825
3826         spin_lock_irqsave(&lockres->l_lock, flags);
3827 unqueue:
3828         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3829                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3830         } else
3831                 ocfs2_schedule_blocked_lock(osb, lockres);
3832
3833         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3834              ctl.requeue ? "yes" : "no");
3835         spin_unlock_irqrestore(&lockres->l_lock, flags);
3836
3837         if (ctl.unblock_action != UNBLOCK_CONTINUE
3838             && lockres->l_ops->post_unlock)
3839                 lockres->l_ops->post_unlock(osb, lockres);
3840
3841         mlog_exit_void();
3842 }
3843
3844 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3845                                         struct ocfs2_lock_res *lockres)
3846 {
3847         mlog_entry_void();
3848
3849         assert_spin_locked(&lockres->l_lock);
3850
3851         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3852                 /* Do not schedule a lock for downconvert when it's on
3853                  * the way to destruction - any nodes wanting access
3854                  * to the resource will get it soon. */
3855                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3856                      lockres->l_name, lockres->l_flags);
3857                 return;
3858         }
3859
3860         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3861
3862         spin_lock(&osb->dc_task_lock);
3863         if (list_empty(&lockres->l_blocked_list)) {
3864                 list_add_tail(&lockres->l_blocked_list,
3865                               &osb->blocked_lock_list);
3866                 osb->blocked_lock_count++;
3867         }
3868         spin_unlock(&osb->dc_task_lock);
3869
3870         mlog_exit_void();
3871 }
3872
3873 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3874 {
3875         unsigned long processed;
3876         struct ocfs2_lock_res *lockres;
3877
3878         mlog_entry_void();
3879
3880         spin_lock(&osb->dc_task_lock);
3881         /* grab this early so we know to try again if a state change and
3882          * wake happens part-way through our work  */
3883         osb->dc_work_sequence = osb->dc_wake_sequence;
3884
3885         processed = osb->blocked_lock_count;
3886         while (processed) {
3887                 BUG_ON(list_empty(&osb->blocked_lock_list));
3888
3889                 lockres = list_entry(osb->blocked_lock_list.next,
3890                                      struct ocfs2_lock_res, l_blocked_list);
3891                 list_del_init(&lockres->l_blocked_list);
3892                 osb->blocked_lock_count--;
3893                 spin_unlock(&osb->dc_task_lock);
3894
3895                 BUG_ON(!processed);
3896                 processed--;
3897
3898                 ocfs2_process_blocked_lock(osb, lockres);
3899
3900                 spin_lock(&osb->dc_task_lock);
3901         }
3902         spin_unlock(&osb->dc_task_lock);
3903
3904         mlog_exit_void();
3905 }
3906
3907 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3908 {
3909         int empty = 0;
3910
3911         spin_lock(&osb->dc_task_lock);
3912         if (list_empty(&osb->blocked_lock_list))
3913                 empty = 1;
3914
3915         spin_unlock(&osb->dc_task_lock);
3916         return empty;
3917 }
3918
3919 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3920 {
3921         int should_wake = 0;
3922
3923         spin_lock(&osb->dc_task_lock);
3924         if (osb->dc_work_sequence != osb->dc_wake_sequence)
3925                 should_wake = 1;
3926         spin_unlock(&osb->dc_task_lock);
3927
3928         return should_wake;
3929 }
3930
3931 static int ocfs2_downconvert_thread(void *arg)
3932 {
3933         int status = 0;
3934         struct ocfs2_super *osb = arg;
3935
3936         /* only quit once we've been asked to stop and there is no more
3937          * work available */
3938         while (!(kthread_should_stop() &&
3939                 ocfs2_downconvert_thread_lists_empty(osb))) {
3940
3941                 wait_event_interruptible(osb->dc_event,
3942                                          ocfs2_downconvert_thread_should_wake(osb) ||
3943                                          kthread_should_stop());
3944
3945                 mlog(0, "downconvert_thread: awoken\n");
3946
3947                 ocfs2_downconvert_thread_do_work(osb);
3948         }
3949
3950         osb->dc_task = NULL;
3951         return status;
3952 }
3953
3954 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
3955 {
3956         spin_lock(&osb->dc_task_lock);
3957         /* make sure the voting thread gets a swipe at whatever changes
3958          * the caller may have made to the voting state */
3959         osb->dc_wake_sequence++;
3960         spin_unlock(&osb->dc_task_lock);
3961         wake_up(&osb->dc_event);
3962 }