1 /* -*- mode: c; c-basic-offset: 8; -*-
 
   2  * vim: noexpandtab sw=8 ts=8 sts=0:
 
   6  * Code which implements an OCFS2 specific interface to our DLM.
 
   8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 
  10  * This program is free software; you can redistribute it and/or
 
  11  * modify it under the terms of the GNU General Public
 
  12  * License as published by the Free Software Foundation; either
 
  13  * version 2 of the License, or (at your option) any later version.
 
  15  * This program is distributed in the hope that it will be useful,
 
  16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
  18  * General Public License for more details.
 
  20  * You should have received a copy of the GNU General Public
 
  21  * License along with this program; if not, write to the
 
  22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 
  23  * Boston, MA 021110-1307, USA.
 
  26 #include <linux/types.h>
 
  27 #include <linux/slab.h>
 
  28 #include <linux/highmem.h>
 
  30 #include <linux/kthread.h>
 
  31 #include <linux/pagemap.h>
 
  32 #include <linux/debugfs.h>
 
  33 #include <linux/seq_file.h>
 
  34 #include <linux/time.h>
 
  36 #define MLOG_MASK_PREFIX ML_DLM_GLUE
 
  37 #include <cluster/masklog.h>
 
  40 #include "ocfs2_lockingver.h"
 
  45 #include "extent_map.h"
 
  47 #include "heartbeat.h"
 
  50 #include "stackglue.h"
 
  55 #include "buffer_head_io.h"
 
  57 struct ocfs2_mask_waiter {
 
  58         struct list_head        mw_item;
 
  60         struct completion       mw_complete;
 
  61         unsigned long           mw_mask;
 
  62         unsigned long           mw_goal;
 
  63 #ifdef CONFIG_OCFS2_FS_STATS
 
  64         unsigned long long      mw_lock_start;
 
  68 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
 
  69 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
 
  70 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
 
  73  * Return value from ->downconvert_worker functions.
 
  75  * These control the precise actions of ocfs2_unblock_lock()
 
  76  * and ocfs2_process_blocked_lock()
 
  79 enum ocfs2_unblock_action {
 
  80         UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
 
  81         UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
 
  82                                       * ->post_unlock callback */
 
  83         UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
 
  84                                       * ->post_unlock() callback. */
 
  87 struct ocfs2_unblock_ctl {
 
  89         enum ocfs2_unblock_action unblock_action;
 
  92 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
 
  94 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
 
  96 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
 
  99 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
 
 102 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
 
 103                                      struct ocfs2_lock_res *lockres);
 
 106 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
 
 108 /* This aids in debugging situations where a bad LVB might be involved. */
 
 109 static void ocfs2_dump_meta_lvb_info(u64 level,
 
 110                                      const char *function,
 
 112                                      struct ocfs2_lock_res *lockres)
 
 114         struct ocfs2_meta_lvb *lvb =
 
 115                 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
 
 117         mlog(level, "LVB information for %s (called from %s:%u):\n",
 
 118              lockres->l_name, function, line);
 
 119         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
 
 120              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
 
 121              be32_to_cpu(lvb->lvb_igeneration));
 
 122         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
 
 123              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
 
 124              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
 
 125              be16_to_cpu(lvb->lvb_imode));
 
 126         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
 
 127              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
 
 128              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
 
 129              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
 
 130              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
 
 131              be32_to_cpu(lvb->lvb_iattr));
 
 136  * OCFS2 Lock Resource Operations
 
 138  * These fine tune the behavior of the generic dlmglue locking infrastructure.
 
 140  * The most basic of lock types can point ->l_priv to their respective
 
 141  * struct ocfs2_super and allow the default actions to manage things.
 
 143  * Right now, each lock type also needs to implement an init function,
 
 144  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 
 145  * should be called when the lock is no longer needed (i.e., object
 
 148 struct ocfs2_lock_res_ops {
 
 150          * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
 
 151          * this callback if ->l_priv is not an ocfs2_super pointer
 
 153         struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
 
 156          * Optionally called in the downconvert thread after a
 
 157          * successful downconvert. The lockres will not be referenced
 
 158          * after this callback is called, so it is safe to free
 
 161          * The exact semantics of when this is called are controlled
 
 162          * by ->downconvert_worker()
 
 164         void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
 
 167          * Allow a lock type to add checks to determine whether it is
 
 168          * safe to downconvert a lock. Return 0 to re-queue the
 
 169          * downconvert at a later time, nonzero to continue.
 
 171          * For most locks, the default checks that there are no
 
 172          * incompatible holders are sufficient.
 
 174          * Called with the lockres spinlock held.
 
 176         int (*check_downconvert)(struct ocfs2_lock_res *, int);
 
 179          * Allows a lock type to populate the lock value block. This
 
 180          * is called on downconvert, and when we drop a lock.
 
 182          * Locks that want to use this should set LOCK_TYPE_USES_LVB
 
 183          * in the flags field.
 
 185          * Called with the lockres spinlock held.
 
 187         void (*set_lvb)(struct ocfs2_lock_res *);
 
 190          * Called from the downconvert thread when it is determined
 
 191          * that a lock will be downconverted. This is called without
 
 192          * any locks held so the function can do work that might
 
 193          * schedule (syncing out data, etc).
 
 195          * This should return any one of the ocfs2_unblock_action
 
 196          * values, depending on what it wants the thread to do.
 
 198         int (*downconvert_worker)(struct ocfs2_lock_res *, int);
 
 201          * LOCK_TYPE_* flags which describe the specific requirements
 
 202          * of a lock type. Descriptions of each individual flag follow.
 
 208  * Some locks want to "refresh" potentially stale data when a
 
 209  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 
 210  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 
 211  * individual lockres l_flags member from the ast function. It is
 
 212  * expected that the locking wrapper will clear the
 
 213  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 
 215 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
 
 218  * Indicate that a lock type makes use of the lock value block. The
 
 219  * ->set_lvb lock type callback must be defined.
 
 221 #define LOCK_TYPE_USES_LVB              0x2
 
 223 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
 
 224         .get_osb        = ocfs2_get_inode_osb,
 
 228 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
 
 229         .get_osb        = ocfs2_get_inode_osb,
 
 230         .check_downconvert = ocfs2_check_meta_downconvert,
 
 231         .set_lvb        = ocfs2_set_meta_lvb,
 
 232         .downconvert_worker = ocfs2_data_convert_worker,
 
 233         .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
 
 236 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
 
 237         .flags          = LOCK_TYPE_REQUIRES_REFRESH,
 
 240 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
 
 244 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
 
 245         .get_osb        = ocfs2_get_dentry_osb,
 
 246         .post_unlock    = ocfs2_dentry_post_unlock,
 
 247         .downconvert_worker = ocfs2_dentry_convert_worker,
 
 251 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
 
 252         .get_osb        = ocfs2_get_inode_osb,
 
 256 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
 
 257         .get_osb        = ocfs2_get_file_osb,
 
 261 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
 
 263         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
 
 264                 lockres->l_type == OCFS2_LOCK_TYPE_RW ||
 
 265                 lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
 
 268 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
 
 270         BUG_ON(!ocfs2_is_inode_lock(lockres));
 
 272         return (struct inode *) lockres->l_priv;
 
 275 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
 
 277         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
 
 279         return (struct ocfs2_dentry_lock *)lockres->l_priv;
 
 282 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
 
 284         if (lockres->l_ops->get_osb)
 
 285                 return lockres->l_ops->get_osb(lockres);
 
 287         return (struct ocfs2_super *)lockres->l_priv;
 
 290 static int ocfs2_lock_create(struct ocfs2_super *osb,
 
 291                              struct ocfs2_lock_res *lockres,
 
 294 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
 
 296 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
 
 297                                  struct ocfs2_lock_res *lockres,
 
 299 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
 
 300 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
 
 301 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
 
 302 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
 
 303 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
 
 304                                         struct ocfs2_lock_res *lockres);
 
 305 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
 
 307 #define ocfs2_log_dlm_error(_func, _err, _lockres) do {                 \
 
 308         mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
 
 309              _err, _func, _lockres->l_name);                            \
 
 311 static int ocfs2_downconvert_thread(void *arg);
 
 312 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
 
 313                                         struct ocfs2_lock_res *lockres);
 
 314 static int ocfs2_inode_lock_update(struct inode *inode,
 
 315                                   struct buffer_head **bh);
 
 316 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
 
 317 static inline int ocfs2_highest_compat_lock_level(int level);
 
 318 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
 
 320 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
 
 321                                   struct ocfs2_lock_res *lockres,
 
 324                                   unsigned int generation);
 
 325 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
 
 326                                         struct ocfs2_lock_res *lockres);
 
 327 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
 
 328                                 struct ocfs2_lock_res *lockres);
 
 331 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
 
 340         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
 
 342         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
 
 343                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
 
 344                        (long long)blkno, generation);
 
 346         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
 
 348         mlog(0, "built lock resource with name: %s\n", name);
 
 353 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
 
 355 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
 
 356                                        struct ocfs2_dlm_debug *dlm_debug)
 
 358         mlog(0, "Add tracking for lockres %s\n", res->l_name);
 
 360         spin_lock(&ocfs2_dlm_tracking_lock);
 
 361         list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
 
 362         spin_unlock(&ocfs2_dlm_tracking_lock);
 
 365 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
 
 367         spin_lock(&ocfs2_dlm_tracking_lock);
 
 368         if (!list_empty(&res->l_debug_list))
 
 369                 list_del_init(&res->l_debug_list);
 
 370         spin_unlock(&ocfs2_dlm_tracking_lock);
 
 373 #ifdef CONFIG_OCFS2_FS_STATS
 
 374 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
 
 376         res->l_lock_num_prmode = 0;
 
 377         res->l_lock_num_prmode_failed = 0;
 
 378         res->l_lock_total_prmode = 0;
 
 379         res->l_lock_max_prmode = 0;
 
 380         res->l_lock_num_exmode = 0;
 
 381         res->l_lock_num_exmode_failed = 0;
 
 382         res->l_lock_total_exmode = 0;
 
 383         res->l_lock_max_exmode = 0;
 
 384         res->l_lock_refresh = 0;
 
 387 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
 
 388                                     struct ocfs2_mask_waiter *mw, int ret)
 
 390         unsigned long long *num, *sum;
 
 391         unsigned int *max, *failed;
 
 392         struct timespec ts = current_kernel_time();
 
 393         unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;
 
 395         if (level == LKM_PRMODE) {
 
 396                 num = &res->l_lock_num_prmode;
 
 397                 sum = &res->l_lock_total_prmode;
 
 398                 max = &res->l_lock_max_prmode;
 
 399                 failed = &res->l_lock_num_prmode_failed;
 
 400         } else if (level == LKM_EXMODE) {
 
 401                 num = &res->l_lock_num_exmode;
 
 402                 sum = &res->l_lock_total_exmode;
 
 403                 max = &res->l_lock_max_exmode;
 
 404                 failed = &res->l_lock_num_exmode_failed;
 
 416 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
 
 418         lockres->l_lock_refresh++;
 
 421 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
 
 423         struct timespec ts = current_kernel_time();
 
 424         mw->mw_lock_start = timespec_to_ns(&ts);
 
 427 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
 
 430 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
 
 431                            int level, struct ocfs2_mask_waiter *mw, int ret)
 
 434 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
 
 437 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
 
 442 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
 
 443                                        struct ocfs2_lock_res *res,
 
 444                                        enum ocfs2_lock_type type,
 
 445                                        struct ocfs2_lock_res_ops *ops,
 
 452         res->l_level         = DLM_LOCK_IV;
 
 453         res->l_requested     = DLM_LOCK_IV;
 
 454         res->l_blocking      = DLM_LOCK_IV;
 
 455         res->l_action        = OCFS2_AST_INVALID;
 
 456         res->l_unlock_action = OCFS2_UNLOCK_INVALID;
 
 458         res->l_flags         = OCFS2_LOCK_INITIALIZED;
 
 460         ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
 
 462         ocfs2_init_lock_stats(res);
 
 465 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
 
 467         /* This also clears out the lock status block */
 
 468         memset(res, 0, sizeof(struct ocfs2_lock_res));
 
 469         spin_lock_init(&res->l_lock);
 
 470         init_waitqueue_head(&res->l_event);
 
 471         INIT_LIST_HEAD(&res->l_blocked_list);
 
 472         INIT_LIST_HEAD(&res->l_mask_waiters);
 
 475 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
 
 476                                enum ocfs2_lock_type type,
 
 477                                unsigned int generation,
 
 480         struct ocfs2_lock_res_ops *ops;
 
 483                 case OCFS2_LOCK_TYPE_RW:
 
 484                         ops = &ocfs2_inode_rw_lops;
 
 486                 case OCFS2_LOCK_TYPE_META:
 
 487                         ops = &ocfs2_inode_inode_lops;
 
 489                 case OCFS2_LOCK_TYPE_OPEN:
 
 490                         ops = &ocfs2_inode_open_lops;
 
 493                         mlog_bug_on_msg(1, "type: %d\n", type);
 
 494                         ops = NULL; /* thanks, gcc */
 
 498         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
 
 499                               generation, res->l_name);
 
 500         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
 
 503 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
 
 505         struct inode *inode = ocfs2_lock_res_inode(lockres);
 
 507         return OCFS2_SB(inode->i_sb);
 
 510 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
 
 512         struct ocfs2_file_private *fp = lockres->l_priv;
 
 514         return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
 
 517 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
 
 519         __be64 inode_blkno_be;
 
 521         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
 
 524         return be64_to_cpu(inode_blkno_be);
 
 527 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
 
 529         struct ocfs2_dentry_lock *dl = lockres->l_priv;
 
 531         return OCFS2_SB(dl->dl_inode->i_sb);
 
 534 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
 
 535                                 u64 parent, struct inode *inode)
 
 538         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
 
 539         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
 
 540         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
 
 542         ocfs2_lock_res_init_once(lockres);
 
 545          * Unfortunately, the standard lock naming scheme won't work
 
 546          * here because we have two 16 byte values to use. Instead,
 
 547          * we'll stuff the inode number as a binary value. We still
 
 548          * want error prints to show something without garbling the
 
 549          * display, so drop a null byte in there before the inode
 
 550          * number. A future version of OCFS2 will likely use all
 
 551          * binary lock names. The stringified names have been a
 
 552          * tremendous aid in debugging, but now that the debugfs
 
 553          * interface exists, we can mangle things there if need be.
 
 555          * NOTE: We also drop the standard "pad" value (the total lock
 
 556          * name size stays the same though - the last part is all
 
 557          * zeros due to the memset in ocfs2_lock_res_init_once()
 
 559         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
 
 561                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
 
 564         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
 
 566         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
 
 569         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
 
 570                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
 
 574 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
 
 575                                       struct ocfs2_super *osb)
 
 577         /* Superblock lockres doesn't come from a slab so we call init
 
 578          * once on it manually.  */
 
 579         ocfs2_lock_res_init_once(res);
 
 580         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
 
 582         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
 
 583                                    &ocfs2_super_lops, osb);
 
 586 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
 
 587                                        struct ocfs2_super *osb)
 
 589         /* Rename lockres doesn't come from a slab so we call init
 
 590          * once on it manually.  */
 
 591         ocfs2_lock_res_init_once(res);
 
 592         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
 
 593         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
 
 594                                    &ocfs2_rename_lops, osb);
 
 597 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
 
 598                               struct ocfs2_file_private *fp)
 
 600         struct inode *inode = fp->fp_file->f_mapping->host;
 
 601         struct ocfs2_inode_info *oi = OCFS2_I(inode);
 
 603         ocfs2_lock_res_init_once(lockres);
 
 604         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
 
 605                               inode->i_generation, lockres->l_name);
 
 606         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
 
 607                                    OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
 
 609         lockres->l_flags |= OCFS2_LOCK_NOCACHE;
 
 612 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
 
 616         if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
 
 619         ocfs2_remove_lockres_tracking(res);
 
 621         mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
 
 622                         "Lockres %s is on the blocked list\n",
 
 624         mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
 
 625                         "Lockres %s has mask waiters pending\n",
 
 627         mlog_bug_on_msg(spin_is_locked(&res->l_lock),
 
 628                         "Lockres %s is locked\n",
 
 630         mlog_bug_on_msg(res->l_ro_holders,
 
 631                         "Lockres %s has %u ro holders\n",
 
 632                         res->l_name, res->l_ro_holders);
 
 633         mlog_bug_on_msg(res->l_ex_holders,
 
 634                         "Lockres %s has %u ex holders\n",
 
 635                         res->l_name, res->l_ex_holders);
 
 637         /* Need to clear out the lock status block for the dlm */
 
 638         memset(&res->l_lksb, 0, sizeof(res->l_lksb));
 
 644 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
 
 653                 lockres->l_ex_holders++;
 
 656                 lockres->l_ro_holders++;
 
 665 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
 
 674                 BUG_ON(!lockres->l_ex_holders);
 
 675                 lockres->l_ex_holders--;
 
 678                 BUG_ON(!lockres->l_ro_holders);
 
 679                 lockres->l_ro_holders--;
 
 687 /* WARNING: This function lives in a world where the only three lock
 
 688  * levels are EX, PR, and NL. It *will* have to be adjusted when more
 
 689  * lock types are added. */
 
 690 static inline int ocfs2_highest_compat_lock_level(int level)
 
 692         int new_level = DLM_LOCK_EX;
 
 694         if (level == DLM_LOCK_EX)
 
 695                 new_level = DLM_LOCK_NL;
 
 696         else if (level == DLM_LOCK_PR)
 
 697                 new_level = DLM_LOCK_PR;
 
 701 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
 
 702                               unsigned long newflags)
 
 704         struct ocfs2_mask_waiter *mw, *tmp;
 
 706         assert_spin_locked(&lockres->l_lock);
 
 708         lockres->l_flags = newflags;
 
 710         list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
 
 711                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
 
 714                 list_del_init(&mw->mw_item);
 
 716                 complete(&mw->mw_complete);
 
 719 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
 
 721         lockres_set_flags(lockres, lockres->l_flags | or);
 
 723 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
 
 726         lockres_set_flags(lockres, lockres->l_flags & ~clear);
 
 729 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
 
 733         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
 
 734         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
 
 735         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
 
 736         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
 
 738         lockres->l_level = lockres->l_requested;
 
 739         if (lockres->l_level <=
 
 740             ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
 
 741                 lockres->l_blocking = DLM_LOCK_NL;
 
 742                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
 
 744         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
 749 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
 
 753         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
 
 754         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
 
 756         /* Convert from RO to EX doesn't really need anything as our
 
 757          * information is already up to data. Convert from NL to
 
 758          * *anything* however should mark ourselves as needing an
 
 760         if (lockres->l_level == DLM_LOCK_NL &&
 
 761             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
 
 762                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
 764         lockres->l_level = lockres->l_requested;
 
 765         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
 770 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
 
 774         BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
 
 775         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 
 777         if (lockres->l_requested > DLM_LOCK_NL &&
 
 778             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
 
 779             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
 
 780                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
 782         lockres->l_level = lockres->l_requested;
 
 783         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
 
 784         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
 789 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
 
 792         int needs_downconvert = 0;
 
 795         assert_spin_locked(&lockres->l_lock);
 
 797         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
 
 799         if (level > lockres->l_blocking) {
 
 800                 /* only schedule a downconvert if we haven't already scheduled
 
 801                  * one that goes low enough to satisfy the level we're
 
 802                  * blocking.  this also catches the case where we get
 
 804                 if (ocfs2_highest_compat_lock_level(level) <
 
 805                     ocfs2_highest_compat_lock_level(lockres->l_blocking))
 
 806                         needs_downconvert = 1;
 
 808                 lockres->l_blocking = level;
 
 811         mlog_exit(needs_downconvert);
 
 812         return needs_downconvert;
 
 816  * OCFS2_LOCK_PENDING and l_pending_gen.
 
 818  * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 
 819  * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 
 820  * for more details on the race.
 
 822  * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 
 823  * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 
 824  * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 
 825  * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 
 826  * the caller is going to try to clear PENDING again.  If nothing else is
 
 827  * happening, __lockres_clear_pending() sees PENDING is unset and does
 
 830  * But what if another path (eg downconvert thread) has just started a
 
 831  * new locking action?  The other path has re-set PENDING.  Our path
 
 832  * cannot clear PENDING, because that will re-open the original race
 
 838  *  ocfs2_cluster_lock()
 
 843  *    ocfs2_locking_ast()               ocfs2_downconvert_thread()
 
 844  *     clear PENDING                     ocfs2_unblock_lock()
 
 847  *                                        ocfs2_prepare_downconvert()
 
 857  * So as you can see, we now have a window where l_lock is not held,
 
 858  * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 
 860  * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 
 861  * set by ocfs2_prepare_downconvert().  That wasn't nice.
 
 863  * To solve this we introduce l_pending_gen.  A call to
 
 864  * lockres_clear_pending() will only do so when it is passed a generation
 
 865  * number that matches the lockres.  lockres_set_pending() will return the
 
 866  * current generation number.  When ocfs2_cluster_lock() goes to clear
 
 867  * PENDING, it passes the generation it got from set_pending().  In our
 
 868  * example above, the generation numbers will *not* match.  Thus,
 
 869  * ocfs2_cluster_lock() will not clear the PENDING set by
 
 870  * ocfs2_prepare_downconvert().
 
 873 /* Unlocked version for ocfs2_locking_ast() */
 
 874 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
 
 875                                     unsigned int generation,
 
 876                                     struct ocfs2_super *osb)
 
 878         assert_spin_locked(&lockres->l_lock);
 
 881          * The ast and locking functions can race us here.  The winner
 
 882          * will clear pending, the loser will not.
 
 884         if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
 
 885             (lockres->l_pending_gen != generation))
 
 888         lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
 
 889         lockres->l_pending_gen++;
 
 892          * The downconvert thread may have skipped us because we
 
 893          * were PENDING.  Wake it up.
 
 895         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
 
 896                 ocfs2_wake_downconvert_thread(osb);
 
 899 /* Locked version for callers of ocfs2_dlm_lock() */
 
 900 static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
 
 901                                   unsigned int generation,
 
 902                                   struct ocfs2_super *osb)
 
 906         spin_lock_irqsave(&lockres->l_lock, flags);
 
 907         __lockres_clear_pending(lockres, generation, osb);
 
 908         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
 911 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
 
 913         assert_spin_locked(&lockres->l_lock);
 
 914         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
 
 916         lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
 
 918         return lockres->l_pending_gen;
 
 922 static void ocfs2_blocking_ast(void *opaque, int level)
 
 924         struct ocfs2_lock_res *lockres = opaque;
 
 925         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
 
 926         int needs_downconvert;
 
 929         BUG_ON(level <= DLM_LOCK_NL);
 
 931         mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
 
 932              lockres->l_name, level, lockres->l_level,
 
 933              ocfs2_lock_type_string(lockres->l_type));
 
 936          * We can skip the bast for locks which don't enable caching -
 
 937          * they'll be dropped at the earliest possible time anyway.
 
 939         if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
 
 942         spin_lock_irqsave(&lockres->l_lock, flags);
 
 943         needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
 
 944         if (needs_downconvert)
 
 945                 ocfs2_schedule_blocked_lock(osb, lockres);
 
 946         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
 948         wake_up(&lockres->l_event);
 
 950         ocfs2_wake_downconvert_thread(osb);
 
 953 static void ocfs2_locking_ast(void *opaque)
 
 955         struct ocfs2_lock_res *lockres = opaque;
 
 956         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
 
 960         spin_lock_irqsave(&lockres->l_lock, flags);
 
 962         status = ocfs2_dlm_lock_status(&lockres->l_lksb);
 
 964         if (status == -EAGAIN) {
 
 965                 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
 970                 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
 
 971                      lockres->l_name, status);
 
 972                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
 976         switch(lockres->l_action) {
 
 977         case OCFS2_AST_ATTACH:
 
 978                 ocfs2_generic_handle_attach_action(lockres);
 
 979                 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
 
 981         case OCFS2_AST_CONVERT:
 
 982                 ocfs2_generic_handle_convert_action(lockres);
 
 984         case OCFS2_AST_DOWNCONVERT:
 
 985                 ocfs2_generic_handle_downconvert_action(lockres);
 
 988                 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
 
 989                      "lockres flags = 0x%lx, unlock action: %u\n",
 
 990                      lockres->l_name, lockres->l_action, lockres->l_flags,
 
 991                      lockres->l_unlock_action);
 
 995         /* set it to something invalid so if we get called again we
 
 997         lockres->l_action = OCFS2_AST_INVALID;
 
 999         /* Did we try to cancel this lock?  Clear that state */
 
1000         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
 
1001                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
 
1004          * We may have beaten the locking functions here.  We certainly
 
1005          * know that dlm_lock() has been called :-)
 
1006          * Because we can't have two lock calls in flight at once, we
 
1007          * can use lockres->l_pending_gen.
 
1009         __lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
 
1011         wake_up(&lockres->l_event);
 
1012         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1015 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
 
1018         unsigned long flags;
 
1021         spin_lock_irqsave(&lockres->l_lock, flags);
 
1022         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
1024                 lockres->l_action = OCFS2_AST_INVALID;
 
1026                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
 
1027         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1029         wake_up(&lockres->l_event);
 
1033 /* Note: If we detect another process working on the lock (i.e.,
 
1034  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 
1035  * to do the right thing in that case.
 
1037 static int ocfs2_lock_create(struct ocfs2_super *osb,
 
1038                              struct ocfs2_lock_res *lockres,
 
1043         unsigned long flags;
 
1048         mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
 
1051         spin_lock_irqsave(&lockres->l_lock, flags);
 
1052         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
 
1053             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
 
1054                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1058         lockres->l_action = OCFS2_AST_ATTACH;
 
1059         lockres->l_requested = level;
 
1060         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 
1061         gen = lockres_set_pending(lockres);
 
1062         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1064         ret = ocfs2_dlm_lock(osb->cconn,
 
1069                              OCFS2_LOCK_ID_MAX_LEN - 1,
 
1071         lockres_clear_pending(lockres, gen, osb);
 
1073                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
 
1074                 ocfs2_recover_from_dlm_error(lockres, 1);
 
1077         mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
 
1084 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
 
1087         unsigned long flags;
 
1090         spin_lock_irqsave(&lockres->l_lock, flags);
 
1091         ret = lockres->l_flags & flag;
 
1092         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1097 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
 
1100         wait_event(lockres->l_event,
 
1101                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
 
1104 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
 
1107         wait_event(lockres->l_event,
 
1108                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
 
1111 /* predict what lock level we'll be dropping down to on behalf
 
1112  * of another node, and return true if the currently wanted
 
1113  * level will be compatible with it. */
 
1114 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
 
1117         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
 
1119         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
 
1122 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
 
1124         INIT_LIST_HEAD(&mw->mw_item);
 
1125         init_completion(&mw->mw_complete);
 
1126         ocfs2_init_start_time(mw);
 
1129 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
 
1131         wait_for_completion(&mw->mw_complete);
 
1132         /* Re-arm the completion in case we want to wait on it again */
 
1133         INIT_COMPLETION(mw->mw_complete);
 
1134         return mw->mw_status;
 
1137 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
 
1138                                     struct ocfs2_mask_waiter *mw,
 
1142         BUG_ON(!list_empty(&mw->mw_item));
 
1144         assert_spin_locked(&lockres->l_lock);
 
1146         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
 
1151 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
 
1152  * if the mask still hadn't reached its goal */
 
1153 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
 
1154                                       struct ocfs2_mask_waiter *mw)
 
1156         unsigned long flags;
 
1159         spin_lock_irqsave(&lockres->l_lock, flags);
 
1160         if (!list_empty(&mw->mw_item)) {
 
1161                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
 
1164                 list_del_init(&mw->mw_item);
 
1165                 init_completion(&mw->mw_complete);
 
1167         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1173 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
 
1174                                              struct ocfs2_lock_res *lockres)
 
1178         ret = wait_for_completion_interruptible(&mw->mw_complete);
 
1180                 lockres_remove_mask_waiter(lockres, mw);
 
1182                 ret = mw->mw_status;
 
1183         /* Re-arm the completion in case we want to wait on it again */
 
1184         INIT_COMPLETION(mw->mw_complete);
 
1188 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
 
1189                               struct ocfs2_lock_res *lockres,
 
1194         struct ocfs2_mask_waiter mw;
 
1195         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
 
1196         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
 
1197         unsigned long flags;
 
1199         int noqueue_attempted = 0;
 
1203         ocfs2_init_mask_waiter(&mw);
 
1205         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
 
1206                 lkm_flags |= DLM_LKF_VALBLK;
 
1211         if (catch_signals && signal_pending(current)) {
 
1216         spin_lock_irqsave(&lockres->l_lock, flags);
 
1218         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
 
1219                         "Cluster lock called on freeing lockres %s! flags "
 
1220                         "0x%lx\n", lockres->l_name, lockres->l_flags);
 
1222         /* We only compare against the currently granted level
 
1223          * here. If the lock is blocked waiting on a downconvert,
 
1224          * we'll get caught below. */
 
1225         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
 
1226             level > lockres->l_level) {
 
1227                 /* is someone sitting in dlm_lock? If so, wait on
 
1229                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 
1234         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
 
1235             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
 
1236                 /* is the lock is currently blocked on behalf of
 
1238                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
 
1243         if (level > lockres->l_level) {
 
1244                 if (noqueue_attempted > 0) {
 
1248                 if (lkm_flags & DLM_LKF_NOQUEUE)
 
1249                         noqueue_attempted = 1;
 
1251                 if (lockres->l_action != OCFS2_AST_INVALID)
 
1252                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
 
1253                              lockres->l_name, lockres->l_action);
 
1255                 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
 
1256                         lockres->l_action = OCFS2_AST_ATTACH;
 
1257                         lkm_flags &= ~DLM_LKF_CONVERT;
 
1259                         lockres->l_action = OCFS2_AST_CONVERT;
 
1260                         lkm_flags |= DLM_LKF_CONVERT;
 
1263                 lockres->l_requested = level;
 
1264                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 
1265                 gen = lockres_set_pending(lockres);
 
1266                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1268                 BUG_ON(level == DLM_LOCK_IV);
 
1269                 BUG_ON(level == DLM_LOCK_NL);
 
1271                 mlog(0, "lock %s, convert from %d to level = %d\n",
 
1272                      lockres->l_name, lockres->l_level, level);
 
1274                 /* call dlm_lock to upgrade lock now */
 
1275                 ret = ocfs2_dlm_lock(osb->cconn,
 
1280                                      OCFS2_LOCK_ID_MAX_LEN - 1,
 
1282                 lockres_clear_pending(lockres, gen, osb);
 
1284                         if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
 
1286                                 ocfs2_log_dlm_error("ocfs2_dlm_lock",
 
1289                         ocfs2_recover_from_dlm_error(lockres, 1);
 
1293                 mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n",
 
1296                 /* At this point we've gone inside the dlm and need to
 
1297                  * complete our work regardless. */
 
1300                 /* wait for busy to clear and carry on */
 
1304         /* Ok, if we get here then we're good to go. */
 
1305         ocfs2_inc_holders(lockres, level);
 
1309         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1312          * This is helping work around a lock inversion between the page lock
 
1313          * and dlm locks.  One path holds the page lock while calling aops
 
1314          * which block acquiring dlm locks.  The voting thread holds dlm
 
1315          * locks while acquiring page locks while down converting data locks.
 
1316          * This block is helping an aop path notice the inversion and back
 
1317          * off to unlock its page lock before trying the dlm lock again.
 
1319         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
 
1320             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
 
1322                 if (lockres_remove_mask_waiter(lockres, &mw))
 
1328                 ret = ocfs2_wait_for_mask(&mw);
 
1333         ocfs2_update_lock_stats(lockres, level, &mw, ret);
 
1339 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
 
1340                                  struct ocfs2_lock_res *lockres,
 
1343         unsigned long flags;
 
1346         spin_lock_irqsave(&lockres->l_lock, flags);
 
1347         ocfs2_dec_holders(lockres, level);
 
1348         ocfs2_downconvert_on_unlock(osb, lockres);
 
1349         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1353 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
 
1354                                  struct ocfs2_lock_res *lockres,
 
1358         int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
1359         unsigned long flags;
 
1360         u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
 
1362         spin_lock_irqsave(&lockres->l_lock, flags);
 
1363         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 
1364         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
 
1365         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1367         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
 
1370 /* Grants us an EX lock on the data and metadata resources, skipping
 
1371  * the normal cluster directory lookup. Use this ONLY on newly created
 
1372  * inodes which other nodes can't possibly see, and which haven't been
 
1373  * hashed in the inode hash yet. This can give us a good performance
 
1374  * increase as it'll skip the network broadcast normally associated
 
1375  * with creating a new lock resource. */
 
1376 int ocfs2_create_new_inode_locks(struct inode *inode)
 
1379         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
1382         BUG_ON(!ocfs2_inode_is_new(inode));
 
1386         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
 
1388         /* NOTE: That we don't increment any of the holder counts, nor
 
1389          * do we add anything to a journal handle. Since this is
 
1390          * supposed to be a new inode which the cluster doesn't know
 
1391          * about yet, there is no need to.  As far as the LVB handling
 
1392          * is concerned, this is basically like acquiring an EX lock
 
1393          * on a resource which has an invalid one -- we'll set it
 
1394          * valid when we release the EX. */
 
1396         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
 
1403          * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
 
1404          * don't use a generation in their lock names.
 
1406         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
 
1412         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
 
1423 int ocfs2_rw_lock(struct inode *inode, int write)
 
1426         struct ocfs2_lock_res *lockres;
 
1427         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
1433         mlog(0, "inode %llu take %s RW lock\n",
 
1434              (unsigned long long)OCFS2_I(inode)->ip_blkno,
 
1435              write ? "EXMODE" : "PRMODE");
 
1437         if (ocfs2_mount_local(osb))
 
1440         lockres = &OCFS2_I(inode)->ip_rw_lockres;
 
1442         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
 
1444         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
 
1453 void ocfs2_rw_unlock(struct inode *inode, int write)
 
1455         int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
 
1456         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
 
1457         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
1461         mlog(0, "inode %llu drop %s RW lock\n",
 
1462              (unsigned long long)OCFS2_I(inode)->ip_blkno,
 
1463              write ? "EXMODE" : "PRMODE");
 
1465         if (!ocfs2_mount_local(osb))
 
1466                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
1472  * ocfs2_open_lock always get PR mode lock.
 
1474 int ocfs2_open_lock(struct inode *inode)
 
1477         struct ocfs2_lock_res *lockres;
 
1478         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
1484         mlog(0, "inode %llu take PRMODE open lock\n",
 
1485              (unsigned long long)OCFS2_I(inode)->ip_blkno);
 
1487         if (ocfs2_mount_local(osb))
 
1490         lockres = &OCFS2_I(inode)->ip_open_lockres;
 
1492         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
 
1502 int ocfs2_try_open_lock(struct inode *inode, int write)
 
1504         int status = 0, level;
 
1505         struct ocfs2_lock_res *lockres;
 
1506         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
1512         mlog(0, "inode %llu try to take %s open lock\n",
 
1513              (unsigned long long)OCFS2_I(inode)->ip_blkno,
 
1514              write ? "EXMODE" : "PRMODE");
 
1516         if (ocfs2_mount_local(osb))
 
1519         lockres = &OCFS2_I(inode)->ip_open_lockres;
 
1521         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
 
1524          * The file system may already holding a PRMODE/EXMODE open lock.
 
1525          * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
 
1526          * other nodes and the -EAGAIN will indicate to the caller that
 
1527          * this inode is still in use.
 
1529         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
 
1530                                     level, DLM_LKF_NOQUEUE, 0);
 
1538  * ocfs2_open_unlock unlock PR and EX mode open locks.
 
1540 void ocfs2_open_unlock(struct inode *inode)
 
1542         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
 
1543         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
1547         mlog(0, "inode %llu drop open lock\n",
 
1548              (unsigned long long)OCFS2_I(inode)->ip_blkno);
 
1550         if (ocfs2_mount_local(osb))
 
1553         if(lockres->l_ro_holders)
 
1554                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
 
1556         if(lockres->l_ex_holders)
 
1557                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
 
1564 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
 
1568         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
 
1569         unsigned long flags;
 
1570         struct ocfs2_mask_waiter mw;
 
1572         ocfs2_init_mask_waiter(&mw);
 
1575         spin_lock_irqsave(&lockres->l_lock, flags);
 
1576         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
 
1577                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
 
1579                         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1580                         ret = ocfs2_cancel_convert(osb, lockres);
 
1587                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 
1588                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1590                 ocfs2_wait_for_mask(&mw);
 
1596          * We may still have gotten the lock, in which case there's no
 
1597          * point to restarting the syscall.
 
1599         if (lockres->l_level == level)
 
1602         mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
 
1603              lockres->l_flags, lockres->l_level, lockres->l_action);
 
1605         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1612  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
 
1613  * flock() calls. The locking approach this requires is sufficiently
 
1614  * different from all other cluster lock types that we implement a
 
1615  * seperate path to the "low-level" dlm calls. In particular:
 
1617  * - No optimization of lock levels is done - we take at exactly
 
1618  *   what's been requested.
 
1620  * - No lock caching is employed. We immediately downconvert to
 
1621  *   no-lock at unlock time. This also means flock locks never go on
 
1622  *   the blocking list).
 
1624  * - Since userspace can trivially deadlock itself with flock, we make
 
1625  *   sure to allow cancellation of a misbehaving applications flock()
 
1628  * - Access to any flock lockres doesn't require concurrency, so we
 
1629  *   can simplify the code by requiring the caller to guarantee
 
1630  *   serialization of dlmglue flock calls.
 
1632 int ocfs2_file_lock(struct file *file, int ex, int trylock)
 
1634         int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
1635         unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
 
1636         unsigned long flags;
 
1637         struct ocfs2_file_private *fp = file->private_data;
 
1638         struct ocfs2_lock_res *lockres = &fp->fp_flock;
 
1639         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
 
1640         struct ocfs2_mask_waiter mw;
 
1642         ocfs2_init_mask_waiter(&mw);
 
1644         if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
 
1645             (lockres->l_level > DLM_LOCK_NL)) {
 
1647                      "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
 
1648                      "level: %u\n", lockres->l_name, lockres->l_flags,
 
1653         spin_lock_irqsave(&lockres->l_lock, flags);
 
1654         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
 
1655                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 
1656                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1659                  * Get the lock at NLMODE to start - that way we
 
1660                  * can cancel the upconvert request if need be.
 
1662                 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
 
1668                 ret = ocfs2_wait_for_mask(&mw);
 
1673                 spin_lock_irqsave(&lockres->l_lock, flags);
 
1676         lockres->l_action = OCFS2_AST_CONVERT;
 
1677         lkm_flags |= DLM_LKF_CONVERT;
 
1678         lockres->l_requested = level;
 
1679         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 
1681         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 
1682         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1684         ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
 
1685                              lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
 
1688                 if (!trylock || (ret != -EAGAIN)) {
 
1689                         ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
 
1693                 ocfs2_recover_from_dlm_error(lockres, 1);
 
1694                 lockres_remove_mask_waiter(lockres, &mw);
 
1698         ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
 
1699         if (ret == -ERESTARTSYS) {
 
1701                  * Userspace can cause deadlock itself with
 
1702                  * flock(). Current behavior locally is to allow the
 
1703                  * deadlock, but abort the system call if a signal is
 
1704                  * received. We follow this example, otherwise a
 
1705                  * poorly written program could sit in kernel until
 
1708                  * Handling this is a bit more complicated for Ocfs2
 
1709                  * though. We can't exit this function with an
 
1710                  * outstanding lock request, so a cancel convert is
 
1711                  * required. We intentionally overwrite 'ret' - if the
 
1712                  * cancel fails and the lock was granted, it's easier
 
1713                  * to just bubble sucess back up to the user.
 
1715                 ret = ocfs2_flock_handle_signal(lockres, level);
 
1716         } else if (!ret && (level > lockres->l_level)) {
 
1717                 /* Trylock failed asynchronously */
 
1724         mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
 
1725              lockres->l_name, ex, trylock, ret);
 
1729 void ocfs2_file_unlock(struct file *file)
 
1733         unsigned long flags;
 
1734         struct ocfs2_file_private *fp = file->private_data;
 
1735         struct ocfs2_lock_res *lockres = &fp->fp_flock;
 
1736         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
 
1737         struct ocfs2_mask_waiter mw;
 
1739         ocfs2_init_mask_waiter(&mw);
 
1741         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
 
1744         if (lockres->l_level == DLM_LOCK_NL)
 
1747         mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
 
1748              lockres->l_name, lockres->l_flags, lockres->l_level,
 
1751         spin_lock_irqsave(&lockres->l_lock, flags);
 
1753          * Fake a blocking ast for the downconvert code.
 
1755         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
 
1756         lockres->l_blocking = DLM_LOCK_EX;
 
1758         gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
 
1759         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 
1760         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1762         ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
 
1768         ret = ocfs2_wait_for_mask(&mw);
 
1773 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
 
1774                                         struct ocfs2_lock_res *lockres)
 
1780         /* If we know that another node is waiting on our lock, kick
 
1781          * the downconvert thread * pre-emptively when we reach a release
 
1783         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
 
1784                 switch(lockres->l_blocking) {
 
1786                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
 
1790                         if (!lockres->l_ex_holders)
 
1799                 ocfs2_wake_downconvert_thread(osb);
 
1804 #define OCFS2_SEC_BITS   34
 
1805 #define OCFS2_SEC_SHIFT  (64 - 34)
 
1806 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
 
1808 /* LVB only has room for 64 bits of time here so we pack it for
 
1810 static u64 ocfs2_pack_timespec(struct timespec *spec)
 
1813         u64 sec = spec->tv_sec;
 
1814         u32 nsec = spec->tv_nsec;
 
1816         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
 
1821 /* Call this with the lockres locked. I am reasonably sure we don't
 
1822  * need ip_lock in this function as anyone who would be changing those
 
1823  * values is supposed to be blocked in ocfs2_inode_lock right now. */
 
1824 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
 
1826         struct ocfs2_inode_info *oi = OCFS2_I(inode);
 
1827         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
 
1828         struct ocfs2_meta_lvb *lvb;
 
1832         lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
 
1835          * Invalidate the LVB of a deleted inode - this way other
 
1836          * nodes are forced to go to disk and discover the new inode
 
1839         if (oi->ip_flags & OCFS2_INODE_DELETED) {
 
1840                 lvb->lvb_version = 0;
 
1844         lvb->lvb_version   = OCFS2_LVB_VERSION;
 
1845         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
 
1846         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
 
1847         lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
 
1848         lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
 
1849         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
 
1850         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
 
1851         lvb->lvb_iatime_packed  =
 
1852                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
 
1853         lvb->lvb_ictime_packed =
 
1854                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
 
1855         lvb->lvb_imtime_packed =
 
1856                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
 
1857         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
 
1858         lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
 
1859         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
 
1862         mlog_meta_lvb(0, lockres);
 
1867 static void ocfs2_unpack_timespec(struct timespec *spec,
 
1870         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
 
1871         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
 
1874 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
 
1876         struct ocfs2_inode_info *oi = OCFS2_I(inode);
 
1877         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
 
1878         struct ocfs2_meta_lvb *lvb;
 
1882         mlog_meta_lvb(0, lockres);
 
1884         lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
 
1886         /* We're safe here without the lockres lock... */
 
1887         spin_lock(&oi->ip_lock);
 
1888         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
 
1889         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
 
1891         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
 
1892         oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
 
1893         ocfs2_set_inode_flags(inode);
 
1895         /* fast-symlinks are a special case */
 
1896         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
 
1897                 inode->i_blocks = 0;
 
1899                 inode->i_blocks = ocfs2_inode_sector_count(inode);
 
1901         inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
 
1902         inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
 
1903         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
 
1904         inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
 
1905         ocfs2_unpack_timespec(&inode->i_atime,
 
1906                               be64_to_cpu(lvb->lvb_iatime_packed));
 
1907         ocfs2_unpack_timespec(&inode->i_mtime,
 
1908                               be64_to_cpu(lvb->lvb_imtime_packed));
 
1909         ocfs2_unpack_timespec(&inode->i_ctime,
 
1910                               be64_to_cpu(lvb->lvb_ictime_packed));
 
1911         spin_unlock(&oi->ip_lock);
 
1916 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
 
1917                                               struct ocfs2_lock_res *lockres)
 
1919         struct ocfs2_meta_lvb *lvb =
 
1920                 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
 
1922         if (lvb->lvb_version == OCFS2_LVB_VERSION
 
1923             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
 
1928 /* Determine whether a lock resource needs to be refreshed, and
 
1929  * arbitrate who gets to refresh it.
 
1931  *   0 means no refresh needed.
 
1933  *   > 0 means you need to refresh this and you MUST call
 
1934  *   ocfs2_complete_lock_res_refresh afterwards. */
 
1935 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
 
1937         unsigned long flags;
 
1943         spin_lock_irqsave(&lockres->l_lock, flags);
 
1944         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
 
1945                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1949         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
 
1950                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1952                 ocfs2_wait_on_refreshing_lock(lockres);
 
1956         /* Ok, I'll be the one to refresh this lock. */
 
1957         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
 
1958         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1966 /* If status is non zero, I'll mark it as not being in refresh
 
1967  * anymroe, but i won't clear the needs refresh flag. */
 
1968 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
 
1971         unsigned long flags;
 
1974         spin_lock_irqsave(&lockres->l_lock, flags);
 
1975         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
 
1977                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
1978         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
1980         wake_up(&lockres->l_event);
 
1985 /* may or may not return a bh if it went to disk. */
 
1986 static int ocfs2_inode_lock_update(struct inode *inode,
 
1987                                   struct buffer_head **bh)
 
1990         struct ocfs2_inode_info *oi = OCFS2_I(inode);
 
1991         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
 
1992         struct ocfs2_dinode *fe;
 
1993         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
1997         if (ocfs2_mount_local(osb))
 
2000         spin_lock(&oi->ip_lock);
 
2001         if (oi->ip_flags & OCFS2_INODE_DELETED) {
 
2002                 mlog(0, "Orphaned inode %llu was deleted while we "
 
2003                      "were waiting on a lock. ip_flags = 0x%x\n",
 
2004                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
 
2005                 spin_unlock(&oi->ip_lock);
 
2009         spin_unlock(&oi->ip_lock);
 
2011         if (!ocfs2_should_refresh_lock_res(lockres))
 
2014         /* This will discard any caching information we might have had
 
2015          * for the inode metadata. */
 
2016         ocfs2_metadata_cache_purge(inode);
 
2018         ocfs2_extent_map_trunc(inode, 0);
 
2020         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
 
2021                 mlog(0, "Trusting LVB on inode %llu\n",
 
2022                      (unsigned long long)oi->ip_blkno);
 
2023                 ocfs2_refresh_inode_from_lvb(inode);
 
2025                 /* Boo, we have to go to disk. */
 
2026                 /* read bh, cast, ocfs2_refresh_inode */
 
2027                 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
 
2028                                           bh, OCFS2_BH_CACHED, inode);
 
2033                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
 
2035                 /* This is a good chance to make sure we're not
 
2036                  * locking an invalid object.
 
2038                  * We bug on a stale inode here because we checked
 
2039                  * above whether it was wiped from disk. The wiping
 
2040                  * node provides a guarantee that we receive that
 
2041                  * message and can mark the inode before dropping any
 
2042                  * locks associated with it. */
 
2043                 if (!OCFS2_IS_VALID_DINODE(fe)) {
 
2044                         OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
 
2048                 mlog_bug_on_msg(inode->i_generation !=
 
2049                                 le32_to_cpu(fe->i_generation),
 
2050                                 "Invalid dinode %llu disk generation: %u "
 
2051                                 "inode->i_generation: %u\n",
 
2052                                 (unsigned long long)oi->ip_blkno,
 
2053                                 le32_to_cpu(fe->i_generation),
 
2054                                 inode->i_generation);
 
2055                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
 
2056                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
 
2057                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
 
2058                                 (unsigned long long)oi->ip_blkno,
 
2059                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
 
2060                                 le32_to_cpu(fe->i_flags));
 
2062                 ocfs2_refresh_inode(inode, fe);
 
2063                 ocfs2_track_lock_refresh(lockres);
 
2068         ocfs2_complete_lock_res_refresh(lockres, status);
 
2074 static int ocfs2_assign_bh(struct inode *inode,
 
2075                            struct buffer_head **ret_bh,
 
2076                            struct buffer_head *passed_bh)
 
2081                 /* Ok, the update went to disk for us, use the
 
2083                 *ret_bh = passed_bh;
 
2089         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
 
2090                                   OCFS2_I(inode)->ip_blkno,
 
2101  * returns < 0 error if the callback will never be called, otherwise
 
2102  * the result of the lock will be communicated via the callback.
 
2104 int ocfs2_inode_lock_full(struct inode *inode,
 
2105                          struct buffer_head **ret_bh,
 
2109         int status, level, acquired;
 
2111         struct ocfs2_lock_res *lockres = NULL;
 
2112         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
2113         struct buffer_head *local_bh = NULL;
 
2119         mlog(0, "inode %llu, take %s META lock\n",
 
2120              (unsigned long long)OCFS2_I(inode)->ip_blkno,
 
2121              ex ? "EXMODE" : "PRMODE");
 
2125         /* We'll allow faking a readonly metadata lock for
 
2127         if (ocfs2_is_hard_readonly(osb)) {
 
2133         if (ocfs2_mount_local(osb))
 
2136         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
 
2137                 ocfs2_wait_for_recovery(osb);
 
2139         lockres = &OCFS2_I(inode)->ip_inode_lockres;
 
2140         level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
2142         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
 
2143                 dlm_flags |= DLM_LKF_NOQUEUE;
 
2145         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
 
2147                 if (status != -EAGAIN && status != -EIOCBRETRY)
 
2152         /* Notify the error cleanup path to drop the cluster lock. */
 
2155         /* We wait twice because a node may have died while we were in
 
2156          * the lower dlm layers. The second time though, we've
 
2157          * committed to owning this lock so we don't allow signals to
 
2158          * abort the operation. */
 
2159         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
 
2160                 ocfs2_wait_for_recovery(osb);
 
2164          * We only see this flag if we're being called from
 
2165          * ocfs2_read_locked_inode(). It means we're locking an inode
 
2166          * which hasn't been populated yet, so clear the refresh flag
 
2167          * and let the caller handle it.
 
2169         if (inode->i_state & I_NEW) {
 
2172                         ocfs2_complete_lock_res_refresh(lockres, 0);
 
2176         /* This is fun. The caller may want a bh back, or it may
 
2177          * not. ocfs2_inode_lock_update definitely wants one in, but
 
2178          * may or may not read one, depending on what's in the
 
2179          * LVB. The result of all of this is that we've *only* gone to
 
2180          * disk if we have to, so the complexity is worthwhile. */
 
2181         status = ocfs2_inode_lock_update(inode, &local_bh);
 
2183                 if (status != -ENOENT)
 
2189                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
 
2198                 if (ret_bh && (*ret_bh)) {
 
2203                         ocfs2_inode_unlock(inode, ex);
 
2214  * This is working around a lock inversion between tasks acquiring DLM
 
2215  * locks while holding a page lock and the downconvert thread which
 
2216  * blocks dlm lock acquiry while acquiring page locks.
 
2218  * ** These _with_page variantes are only intended to be called from aop
 
2219  * methods that hold page locks and return a very specific *positive* error
 
2220  * code that aop methods pass up to the VFS -- test for errors with != 0. **
 
2222  * The DLM is called such that it returns -EAGAIN if it would have
 
2223  * blocked waiting for the downconvert thread.  In that case we unlock
 
2224  * our page so the downconvert thread can make progress.  Once we've
 
2225  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
 
2226  * that called us can bubble that back up into the VFS who will then
 
2227  * immediately retry the aop call.
 
2229  * We do a blocking lock and immediate unlock before returning, though, so that
 
2230  * the lock has a great chance of being cached on this node by the time the VFS
 
2231  * calls back to retry the aop.    This has a potential to livelock as nodes
 
2232  * ping locks back and forth, but that's a risk we're willing to take to avoid
 
2233  * the lock inversion simply.
 
2235 int ocfs2_inode_lock_with_page(struct inode *inode,
 
2236                               struct buffer_head **ret_bh,
 
2242         ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
 
2243         if (ret == -EAGAIN) {
 
2245                 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
 
2246                         ocfs2_inode_unlock(inode, ex);
 
2247                 ret = AOP_TRUNCATED_PAGE;
 
2253 int ocfs2_inode_lock_atime(struct inode *inode,
 
2254                           struct vfsmount *vfsmnt,
 
2260         ret = ocfs2_inode_lock(inode, NULL, 0);
 
2267          * If we should update atime, we will get EX lock,
 
2268          * otherwise we just get PR lock.
 
2270         if (ocfs2_should_update_atime(inode, vfsmnt)) {
 
2271                 struct buffer_head *bh = NULL;
 
2273                 ocfs2_inode_unlock(inode, 0);
 
2274                 ret = ocfs2_inode_lock(inode, &bh, 1);
 
2280                 if (ocfs2_should_update_atime(inode, vfsmnt))
 
2281                         ocfs2_update_inode_atime(inode, bh);
 
2291 void ocfs2_inode_unlock(struct inode *inode,
 
2294         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
2295         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
 
2296         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
2300         mlog(0, "inode %llu drop %s META lock\n",
 
2301              (unsigned long long)OCFS2_I(inode)->ip_blkno,
 
2302              ex ? "EXMODE" : "PRMODE");
 
2304         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
 
2305             !ocfs2_mount_local(osb))
 
2306                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
2311 int ocfs2_super_lock(struct ocfs2_super *osb,
 
2315         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
2316         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
 
2320         if (ocfs2_is_hard_readonly(osb))
 
2323         if (ocfs2_mount_local(osb))
 
2326         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
 
2332         /* The super block lock path is really in the best position to
 
2333          * know when resources covered by the lock need to be
 
2334          * refreshed, so we do it here. Of course, making sense of
 
2335          * everything is up to the caller :) */
 
2336         status = ocfs2_should_refresh_lock_res(lockres);
 
2342                 status = ocfs2_refresh_slot_info(osb);
 
2344                 ocfs2_complete_lock_res_refresh(lockres, status);
 
2348                 ocfs2_track_lock_refresh(lockres);
 
2355 void ocfs2_super_unlock(struct ocfs2_super *osb,
 
2358         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
2359         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
 
2361         if (!ocfs2_mount_local(osb))
 
2362                 ocfs2_cluster_unlock(osb, lockres, level);
 
2365 int ocfs2_rename_lock(struct ocfs2_super *osb)
 
2368         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
 
2370         if (ocfs2_is_hard_readonly(osb))
 
2373         if (ocfs2_mount_local(osb))
 
2376         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
 
2383 void ocfs2_rename_unlock(struct ocfs2_super *osb)
 
2385         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
 
2387         if (!ocfs2_mount_local(osb))
 
2388                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
 
2391 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
 
2394         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
2395         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
 
2396         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
2400         if (ocfs2_is_hard_readonly(osb))
 
2403         if (ocfs2_mount_local(osb))
 
2406         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
 
2413 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
 
2415         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
 
2416         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
 
2417         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
2419         if (!ocfs2_mount_local(osb))
 
2420                 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
 
2423 /* Reference counting of the dlm debug structure. We want this because
 
2424  * open references on the debug inodes can live on after a mount, so
 
2425  * we can't rely on the ocfs2_super to always exist. */
 
2426 static void ocfs2_dlm_debug_free(struct kref *kref)
 
2428         struct ocfs2_dlm_debug *dlm_debug;
 
2430         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
 
2435 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
 
2438                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
 
2441 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
 
2443         kref_get(&debug->d_refcnt);
 
2446 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
 
2448         struct ocfs2_dlm_debug *dlm_debug;
 
2450         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
 
2452                 mlog_errno(-ENOMEM);
 
2456         kref_init(&dlm_debug->d_refcnt);
 
2457         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
 
2458         dlm_debug->d_locking_state = NULL;
 
2463 /* Access to this is arbitrated for us via seq_file->sem. */
 
2464 struct ocfs2_dlm_seq_priv {
 
2465         struct ocfs2_dlm_debug *p_dlm_debug;
 
2466         struct ocfs2_lock_res p_iter_res;
 
2467         struct ocfs2_lock_res p_tmp_res;
 
2470 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
 
2471                                                  struct ocfs2_dlm_seq_priv *priv)
 
2473         struct ocfs2_lock_res *iter, *ret = NULL;
 
2474         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
 
2476         assert_spin_locked(&ocfs2_dlm_tracking_lock);
 
2478         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
 
2479                 /* discover the head of the list */
 
2480                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
 
2481                         mlog(0, "End of list found, %p\n", ret);
 
2485                 /* We track our "dummy" iteration lockres' by a NULL
 
2487                 if (iter->l_ops != NULL) {
 
2496 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
 
2498         struct ocfs2_dlm_seq_priv *priv = m->private;
 
2499         struct ocfs2_lock_res *iter;
 
2501         spin_lock(&ocfs2_dlm_tracking_lock);
 
2502         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
 
2504                 /* Since lockres' have the lifetime of their container
 
2505                  * (which can be inodes, ocfs2_supers, etc) we want to
 
2506                  * copy this out to a temporary lockres while still
 
2507                  * under the spinlock. Obviously after this we can't
 
2508                  * trust any pointers on the copy returned, but that's
 
2509                  * ok as the information we want isn't typically held
 
2511                 priv->p_tmp_res = *iter;
 
2512                 iter = &priv->p_tmp_res;
 
2514         spin_unlock(&ocfs2_dlm_tracking_lock);
 
2519 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
 
2523 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
 
2525         struct ocfs2_dlm_seq_priv *priv = m->private;
 
2526         struct ocfs2_lock_res *iter = v;
 
2527         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
 
2529         spin_lock(&ocfs2_dlm_tracking_lock);
 
2530         iter = ocfs2_dlm_next_res(iter, priv);
 
2531         list_del_init(&dummy->l_debug_list);
 
2533                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
 
2534                 priv->p_tmp_res = *iter;
 
2535                 iter = &priv->p_tmp_res;
 
2537         spin_unlock(&ocfs2_dlm_tracking_lock);
 
2542 /* So that debugfs.ocfs2 can determine which format is being used */
 
2543 #define OCFS2_DLM_DEBUG_STR_VERSION 2
 
2544 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
 
2548         struct ocfs2_lock_res *lockres = v;
 
2553         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
 
2555         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
 
2556                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
 
2558                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
 
2560                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
 
2562         seq_printf(m, "%d\t"
 
2573                    lockres->l_unlock_action,
 
2574                    lockres->l_ro_holders,
 
2575                    lockres->l_ex_holders,
 
2576                    lockres->l_requested,
 
2577                    lockres->l_blocking);
 
2579         /* Dump the raw LVB */
 
2580         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
 
2581         for(i = 0; i < DLM_LVB_LEN; i++)
 
2582                 seq_printf(m, "0x%x\t", lvb[i]);
 
2584 #ifdef CONFIG_OCFS2_FS_STATS
 
2585 # define lock_num_prmode(_l)            (_l)->l_lock_num_prmode
 
2586 # define lock_num_exmode(_l)            (_l)->l_lock_num_exmode
 
2587 # define lock_num_prmode_failed(_l)     (_l)->l_lock_num_prmode_failed
 
2588 # define lock_num_exmode_failed(_l)     (_l)->l_lock_num_exmode_failed
 
2589 # define lock_total_prmode(_l)          (_l)->l_lock_total_prmode
 
2590 # define lock_total_exmode(_l)          (_l)->l_lock_total_exmode
 
2591 # define lock_max_prmode(_l)            (_l)->l_lock_max_prmode
 
2592 # define lock_max_exmode(_l)            (_l)->l_lock_max_exmode
 
2593 # define lock_refresh(_l)               (_l)->l_lock_refresh
 
2595 # define lock_num_prmode(_l)            (0ULL)
 
2596 # define lock_num_exmode(_l)            (0ULL)
 
2597 # define lock_num_prmode_failed(_l)     (0)
 
2598 # define lock_num_exmode_failed(_l)     (0)
 
2599 # define lock_total_prmode(_l)          (0ULL)
 
2600 # define lock_total_exmode(_l)          (0ULL)
 
2601 # define lock_max_prmode(_l)            (0)
 
2602 # define lock_max_exmode(_l)            (0)
 
2603 # define lock_refresh(_l)               (0)
 
2605         /* The following seq_print was added in version 2 of this output */
 
2606         seq_printf(m, "%llu\t"
 
2615                    lock_num_prmode(lockres),
 
2616                    lock_num_exmode(lockres),
 
2617                    lock_num_prmode_failed(lockres),
 
2618                    lock_num_exmode_failed(lockres),
 
2619                    lock_total_prmode(lockres),
 
2620                    lock_total_exmode(lockres),
 
2621                    lock_max_prmode(lockres),
 
2622                    lock_max_exmode(lockres),
 
2623                    lock_refresh(lockres));
 
2626         seq_printf(m, "\n");
 
2630 static const struct seq_operations ocfs2_dlm_seq_ops = {
 
2631         .start =        ocfs2_dlm_seq_start,
 
2632         .stop =         ocfs2_dlm_seq_stop,
 
2633         .next =         ocfs2_dlm_seq_next,
 
2634         .show =         ocfs2_dlm_seq_show,
 
2637 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
 
2639         struct seq_file *seq = (struct seq_file *) file->private_data;
 
2640         struct ocfs2_dlm_seq_priv *priv = seq->private;
 
2641         struct ocfs2_lock_res *res = &priv->p_iter_res;
 
2643         ocfs2_remove_lockres_tracking(res);
 
2644         ocfs2_put_dlm_debug(priv->p_dlm_debug);
 
2645         return seq_release_private(inode, file);
 
2648 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
 
2651         struct ocfs2_dlm_seq_priv *priv;
 
2652         struct seq_file *seq;
 
2653         struct ocfs2_super *osb;
 
2655         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
 
2661         osb = inode->i_private;
 
2662         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
 
2663         priv->p_dlm_debug = osb->osb_dlm_debug;
 
2664         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
 
2666         ret = seq_open(file, &ocfs2_dlm_seq_ops);
 
2673         seq = (struct seq_file *) file->private_data;
 
2674         seq->private = priv;
 
2676         ocfs2_add_lockres_tracking(&priv->p_iter_res,
 
2683 static const struct file_operations ocfs2_dlm_debug_fops = {
 
2684         .open =         ocfs2_dlm_debug_open,
 
2685         .release =      ocfs2_dlm_debug_release,
 
2687         .llseek =       seq_lseek,
 
2690 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
 
2693         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
 
2695         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
 
2697                                                          osb->osb_debug_root,
 
2699                                                          &ocfs2_dlm_debug_fops);
 
2700         if (!dlm_debug->d_locking_state) {
 
2703                      "Unable to create locking state debugfs file.\n");
 
2707         ocfs2_get_dlm_debug(dlm_debug);
 
2712 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
 
2714         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
 
2717                 debugfs_remove(dlm_debug->d_locking_state);
 
2718                 ocfs2_put_dlm_debug(dlm_debug);
 
2722 int ocfs2_dlm_init(struct ocfs2_super *osb)
 
2725         struct ocfs2_cluster_connection *conn = NULL;
 
2729         if (ocfs2_mount_local(osb)) {
 
2734         status = ocfs2_dlm_init_debug(osb);
 
2740         /* launch downconvert thread */
 
2741         osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
 
2742         if (IS_ERR(osb->dc_task)) {
 
2743                 status = PTR_ERR(osb->dc_task);
 
2744                 osb->dc_task = NULL;
 
2749         /* for now, uuid == domain */
 
2750         status = ocfs2_cluster_connect(osb->osb_cluster_stack,
 
2752                                        strlen(osb->uuid_str),
 
2753                                        ocfs2_do_node_down, osb,
 
2760         status = ocfs2_cluster_this_node(&osb->node_num);
 
2764                      "could not find this host's node number\n");
 
2765                 ocfs2_cluster_disconnect(conn, 0);
 
2770         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
 
2771         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
 
2778                 ocfs2_dlm_shutdown_debug(osb);
 
2780                         kthread_stop(osb->dc_task);
 
2787 void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
 
2792         ocfs2_drop_osb_locks(osb);
 
2795          * Now that we have dropped all locks and ocfs2_dismount_volume()
 
2796          * has disabled recovery, the DLM won't be talking to us.  It's
 
2797          * safe to tear things down before disconnecting the cluster.
 
2801                 kthread_stop(osb->dc_task);
 
2802                 osb->dc_task = NULL;
 
2805         ocfs2_lock_res_free(&osb->osb_super_lockres);
 
2806         ocfs2_lock_res_free(&osb->osb_rename_lockres);
 
2808         ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
 
2811         ocfs2_dlm_shutdown_debug(osb);
 
2816 static void ocfs2_unlock_ast(void *opaque, int error)
 
2818         struct ocfs2_lock_res *lockres = opaque;
 
2819         unsigned long flags;
 
2823         mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
 
2824              lockres->l_unlock_action);
 
2826         spin_lock_irqsave(&lockres->l_lock, flags);
 
2828                 mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
 
2829                      "unlock_action %d\n", error, lockres->l_name,
 
2830                      lockres->l_unlock_action);
 
2831                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
2835         switch(lockres->l_unlock_action) {
 
2836         case OCFS2_UNLOCK_CANCEL_CONVERT:
 
2837                 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
 
2838                 lockres->l_action = OCFS2_AST_INVALID;
 
2840         case OCFS2_UNLOCK_DROP_LOCK:
 
2841                 lockres->l_level = DLM_LOCK_IV;
 
2847         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
 
2848         lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
 
2849         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
2851         wake_up(&lockres->l_event);
 
2856 static int ocfs2_drop_lock(struct ocfs2_super *osb,
 
2857                            struct ocfs2_lock_res *lockres)
 
2860         unsigned long flags;
 
2863         /* We didn't get anywhere near actually using this lockres. */
 
2864         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
 
2867         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
 
2868                 lkm_flags |= DLM_LKF_VALBLK;
 
2870         spin_lock_irqsave(&lockres->l_lock, flags);
 
2872         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
 
2873                         "lockres %s, flags 0x%lx\n",
 
2874                         lockres->l_name, lockres->l_flags);
 
2876         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
 
2877                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
 
2878                      "%u, unlock_action = %u\n",
 
2879                      lockres->l_name, lockres->l_flags, lockres->l_action,
 
2880                      lockres->l_unlock_action);
 
2882                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
2884                 /* XXX: Today we just wait on any busy
 
2885                  * locks... Perhaps we need to cancel converts in the
 
2887                 ocfs2_wait_on_busy_lock(lockres);
 
2889                 spin_lock_irqsave(&lockres->l_lock, flags);
 
2892         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
 
2893                 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
 
2894                     lockres->l_level == DLM_LOCK_EX &&
 
2895                     !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
 
2896                         lockres->l_ops->set_lvb(lockres);
 
2899         if (lockres->l_flags & OCFS2_LOCK_BUSY)
 
2900                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
 
2902         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
 
2903                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
 
2905         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
 
2906                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
2910         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
 
2912         /* make sure we never get here while waiting for an ast to
 
2914         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
 
2916         /* is this necessary? */
 
2917         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 
2918         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
 
2919         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
2921         mlog(0, "lock %s\n", lockres->l_name);
 
2923         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
 
2926                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
 
2927                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
 
2928                 ocfs2_dlm_dump_lksb(&lockres->l_lksb);
 
2931         mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n",
 
2934         ocfs2_wait_on_busy_lock(lockres);
 
2940 /* Mark the lockres as being dropped. It will no longer be
 
2941  * queued if blocking, but we still may have to wait on it
 
2942  * being dequeued from the downconvert thread before we can consider
 
2945  * You can *not* attempt to call cluster_lock on this lockres anymore. */
 
2946 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
 
2949         struct ocfs2_mask_waiter mw;
 
2950         unsigned long flags;
 
2952         ocfs2_init_mask_waiter(&mw);
 
2954         spin_lock_irqsave(&lockres->l_lock, flags);
 
2955         lockres->l_flags |= OCFS2_LOCK_FREEING;
 
2956         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
 
2957                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
 
2958                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
2960                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
 
2962                 status = ocfs2_wait_for_mask(&mw);
 
2966                 spin_lock_irqsave(&lockres->l_lock, flags);
 
2968         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
2971 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
 
2972                                struct ocfs2_lock_res *lockres)
 
2976         ocfs2_mark_lockres_freeing(lockres);
 
2977         ret = ocfs2_drop_lock(osb, lockres);
 
2982 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
 
2984         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
 
2985         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
 
2988 int ocfs2_drop_inode_locks(struct inode *inode)
 
2994         /* No need to call ocfs2_mark_lockres_freeing here -
 
2995          * ocfs2_clear_inode has done it for us. */
 
2997         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
 
2998                               &OCFS2_I(inode)->ip_open_lockres);
 
3004         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
 
3005                               &OCFS2_I(inode)->ip_inode_lockres);
 
3008         if (err < 0 && !status)
 
3011         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
 
3012                               &OCFS2_I(inode)->ip_rw_lockres);
 
3015         if (err < 0 && !status)
 
3022 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
 
3025         assert_spin_locked(&lockres->l_lock);
 
3027         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
 
3029         if (lockres->l_level <= new_level) {
 
3030                 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
 
3031                      lockres->l_level, new_level);
 
3035         mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
 
3036              lockres->l_name, new_level, lockres->l_blocking);
 
3038         lockres->l_action = OCFS2_AST_DOWNCONVERT;
 
3039         lockres->l_requested = new_level;
 
3040         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 
3041         return lockres_set_pending(lockres);
 
3044 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
 
3045                                   struct ocfs2_lock_res *lockres,
 
3048                                   unsigned int generation)
 
3051         u32 dlm_flags = DLM_LKF_CONVERT;
 
3056                 dlm_flags |= DLM_LKF_VALBLK;
 
3058         ret = ocfs2_dlm_lock(osb->cconn,
 
3063                              OCFS2_LOCK_ID_MAX_LEN - 1,
 
3065         lockres_clear_pending(lockres, generation, osb);
 
3067                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
 
3068                 ocfs2_recover_from_dlm_error(lockres, 1);
 
3078 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
 
3079 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
 
3080                                         struct ocfs2_lock_res *lockres)
 
3082         assert_spin_locked(&lockres->l_lock);
 
3085         mlog(0, "lock %s\n", lockres->l_name);
 
3087         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
 
3088                 /* If we're already trying to cancel a lock conversion
 
3089                  * then just drop the spinlock and allow the caller to
 
3090                  * requeue this lock. */
 
3092                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
 
3096         /* were we in a convert when we got the bast fire? */
 
3097         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
 
3098                lockres->l_action != OCFS2_AST_DOWNCONVERT);
 
3099         /* set things up for the unlockast to know to just
 
3100          * clear out the ast_action and unset busy, etc. */
 
3101         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
 
3103         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
 
3104                         "lock %s, invalid flags: 0x%lx\n",
 
3105                         lockres->l_name, lockres->l_flags);
 
3110 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
 
3111                                 struct ocfs2_lock_res *lockres)
 
3116         mlog(0, "lock %s\n", lockres->l_name);
 
3118         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
 
3119                                DLM_LKF_CANCEL, lockres);
 
3121                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
 
3122                 ocfs2_recover_from_dlm_error(lockres, 0);
 
3125         mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
 
3131 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
 
3132                               struct ocfs2_lock_res *lockres,
 
3133                               struct ocfs2_unblock_ctl *ctl)
 
3135         unsigned long flags;
 
3144         spin_lock_irqsave(&lockres->l_lock, flags);
 
3146         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
 
3149         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
 
3151                  * This is a *big* race.  The OCFS2_LOCK_PENDING flag
 
3152                  * exists entirely for one reason - another thread has set
 
3153                  * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
 
3155                  * If we do ocfs2_cancel_convert() before the other thread
 
3156                  * calls dlm_lock(), our cancel will do nothing.  We will
 
3157                  * get no ast, and we will have no way of knowing the
 
3158                  * cancel failed.  Meanwhile, the other thread will call
 
3159                  * into dlm_lock() and wait...forever.
 
3161                  * Why forever?  Because another node has asked for the
 
3162                  * lock first; that's why we're here in unblock_lock().
 
3164                  * The solution is OCFS2_LOCK_PENDING.  When PENDING is
 
3165                  * set, we just requeue the unblock.  Only when the other
 
3166                  * thread has called dlm_lock() and cleared PENDING will
 
3167                  * we then cancel their request.
 
3169                  * All callers of dlm_lock() must set OCFS2_DLM_PENDING
 
3170                  * at the same time they set OCFS2_DLM_BUSY.  They must
 
3171                  * clear OCFS2_DLM_PENDING after dlm_lock() returns.
 
3173                 if (lockres->l_flags & OCFS2_LOCK_PENDING)
 
3177                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
 
3178                 spin_unlock_irqrestore(&lockres->l_lock, flags);
 
3180                         ret = ocfs2_cancel_convert(osb, lockres);
 
3187         /* if we're blocking an exclusive and we have *any* holders,
 
3189         if ((lockres->l_blocking == DLM_LOCK_EX)
 
3190             && (lockres->l_ex_holders || lockres->l_ro_holders))
 
3193         /* If it's a PR we're blocking, then only
 
3194          * requeue if we've got any EX holders */
 
3195         if (lockres->l_blocking == DLM_LOCK_PR &&
 
3196             lockres->l_ex_holders)
 
3200          * Can we get a lock in this state if the holder counts are
 
3201          * zero? The meta data unblock code used to check this.
 
3203         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
 
3204             && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
 
3207         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
 
3209         if (lockres->l_ops->check_downconvert
 
3210             && !lockres->l_ops->check_downconvert(lockres, new_level))
 
3213         /* If we get here, then we know that there are no more
 
3214          * incompatible holders (and anyone asking for an incompatible
 
3215          * lock is blocked). We can now downconvert the lock */
 
3216         if (!lockres->l_ops->downconvert_worker)
 
3219         /* Some lockres types want to do a bit of work before
 
3220          * downconverting a lock. Allow that here. The worker function
 
3221          * may sleep, so we save off a copy of what we're blocking as
 
3222          * it may change while we're not holding the spin lock. */
 
3223         blocking = lockres->l_blocking;
 
3224         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
3226         ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
 
3228         if (ctl->unblock_action == UNBLOCK_STOP_POST)
 
3231         spin_lock_irqsave(&lockres->l_lock, flags);
 
3232         if (blocking != lockres->l_blocking) {
 
3233                 /* If this changed underneath us, then we can't drop
 
3241         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
 
3242                 if (lockres->l_level == DLM_LOCK_EX)
 
3246                  * We only set the lvb if the lock has been fully
 
3247                  * refreshed - otherwise we risk setting stale
 
3248                  * data. Otherwise, there's no need to actually clear
 
3249                  * out the lvb here as it's value is still valid.
 
3251                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
 
3252                         lockres->l_ops->set_lvb(lockres);
 
3255         gen = ocfs2_prepare_downconvert(lockres, new_level);
 
3256         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
3257         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
 
3265         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
3272 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
 
3275         struct inode *inode;
 
3276         struct address_space *mapping;
 
3278         inode = ocfs2_lock_res_inode(lockres);
 
3279         mapping = inode->i_mapping;
 
3281         if (!S_ISREG(inode->i_mode))
 
3285          * We need this before the filemap_fdatawrite() so that it can
 
3286          * transfer the dirty bit from the PTE to the
 
3287          * page. Unfortunately this means that even for EX->PR
 
3288          * downconverts, we'll lose our mappings and have to build
 
3291         unmap_mapping_range(mapping, 0, 0, 0);
 
3293         if (filemap_fdatawrite(mapping)) {
 
3294                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
 
3295                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
 
3297         sync_mapping_buffers(mapping);
 
3298         if (blocking == DLM_LOCK_EX) {
 
3299                 truncate_inode_pages(mapping, 0);
 
3301                 /* We only need to wait on the I/O if we're not also
 
3302                  * truncating pages because truncate_inode_pages waits
 
3303                  * for us above. We don't truncate pages if we're
 
3304                  * blocking anything < EXMODE because we want to keep
 
3305                  * them around in that case. */
 
3306                 filemap_fdatawait(mapping);
 
3310         return UNBLOCK_CONTINUE;
 
3313 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
 
3316         struct inode *inode = ocfs2_lock_res_inode(lockres);
 
3317         int checkpointed = ocfs2_inode_fully_checkpointed(inode);
 
3319         BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
 
3320         BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
 
3325         ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
 
3329 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
 
3331         struct inode *inode = ocfs2_lock_res_inode(lockres);
 
3333         __ocfs2_stuff_meta_lvb(inode);
 
3337  * Does the final reference drop on our dentry lock. Right now this
 
3338  * happens in the downconvert thread, but we could choose to simplify the
 
3339  * dlmglue API and push these off to the ocfs2_wq in the future.
 
3341 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
 
3342                                      struct ocfs2_lock_res *lockres)
 
3344         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
 
3345         ocfs2_dentry_lock_put(osb, dl);
 
3349  * d_delete() matching dentries before the lock downconvert.
 
3351  * At this point, any process waiting to destroy the
 
3352  * dentry_lock due to last ref count is stopped by the
 
3353  * OCFS2_LOCK_QUEUED flag.
 
3355  * We have two potential problems
 
3357  * 1) If we do the last reference drop on our dentry_lock (via dput)
 
3358  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 
3359  *    the downconvert to finish. Instead we take an elevated
 
3360  *    reference and push the drop until after we've completed our
 
3361  *    unblock processing.
 
3363  * 2) There might be another process with a final reference,
 
3364  *    waiting on us to finish processing. If this is the case, we
 
3365  *    detect it and exit out - there's no more dentries anyway.
 
3367 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
 
3370         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
 
3371         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
 
3372         struct dentry *dentry;
 
3373         unsigned long flags;
 
3377          * This node is blocking another node from getting a read
 
3378          * lock. This happens when we've renamed within a
 
3379          * directory. We've forced the other nodes to d_delete(), but
 
3380          * we never actually dropped our lock because it's still
 
3381          * valid. The downconvert code will retain a PR for this node,
 
3382          * so there's no further work to do.
 
3384         if (blocking == DLM_LOCK_PR)
 
3385                 return UNBLOCK_CONTINUE;
 
3388          * Mark this inode as potentially orphaned. The code in
 
3389          * ocfs2_delete_inode() will figure out whether it actually
 
3390          * needs to be freed or not.
 
3392         spin_lock(&oi->ip_lock);
 
3393         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
 
3394         spin_unlock(&oi->ip_lock);
 
3397          * Yuck. We need to make sure however that the check of
 
3398          * OCFS2_LOCK_FREEING and the extra reference are atomic with
 
3399          * respect to a reference decrement or the setting of that
 
3402         spin_lock_irqsave(&lockres->l_lock, flags);
 
3403         spin_lock(&dentry_attach_lock);
 
3404         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
 
3409         spin_unlock(&dentry_attach_lock);
 
3410         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
3412         mlog(0, "extra_ref = %d\n", extra_ref);
 
3415          * We have a process waiting on us in ocfs2_dentry_iput(),
 
3416          * which means we can't have any more outstanding
 
3417          * aliases. There's no need to do any more work.
 
3420                 return UNBLOCK_CONTINUE;
 
3422         spin_lock(&dentry_attach_lock);
 
3424                 dentry = ocfs2_find_local_alias(dl->dl_inode,
 
3425                                                 dl->dl_parent_blkno, 1);
 
3428                 spin_unlock(&dentry_attach_lock);
 
3430                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
 
3431                      dentry->d_name.name);
 
3434                  * The following dcache calls may do an
 
3435                  * iput(). Normally we don't want that from the
 
3436                  * downconverting thread, but in this case it's ok
 
3437                  * because the requesting node already has an
 
3438                  * exclusive lock on the inode, so it can't be queued
 
3439                  * for a downconvert.
 
3444                 spin_lock(&dentry_attach_lock);
 
3446         spin_unlock(&dentry_attach_lock);
 
3449          * If we are the last holder of this dentry lock, there is no
 
3450          * reason to downconvert so skip straight to the unlock.
 
3452         if (dl->dl_count == 1)
 
3453                 return UNBLOCK_STOP_POST;
 
3455         return UNBLOCK_CONTINUE_POST;
 
3459  * This is the filesystem locking protocol.  It provides the lock handling
 
3460  * hooks for the underlying DLM.  It has a maximum version number.
 
3461  * The version number allows interoperability with systems running at
 
3462  * the same major number and an equal or smaller minor number.
 
3464  * Whenever the filesystem does new things with locks (adds or removes a
 
3465  * lock, orders them differently, does different things underneath a lock),
 
3466  * the version must be changed.  The protocol is negotiated when joining
 
3467  * the dlm domain.  A node may join the domain if its major version is
 
3468  * identical to all other nodes and its minor version is greater than
 
3469  * or equal to all other nodes.  When its minor version is greater than
 
3470  * the other nodes, it will run at the minor version specified by the
 
3473  * If a locking change is made that will not be compatible with older
 
3474  * versions, the major number must be increased and the minor version set
 
3475  * to zero.  If a change merely adds a behavior that can be disabled when
 
3476  * speaking to older versions, the minor version must be increased.  If a
 
3477  * change adds a fully backwards compatible change (eg, LVB changes that
 
3478  * are just ignored by older versions), the version does not need to be
 
3481 static struct ocfs2_locking_protocol lproto = {
 
3483                 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
 
3484                 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
 
3486         .lp_lock_ast            = ocfs2_locking_ast,
 
3487         .lp_blocking_ast        = ocfs2_blocking_ast,
 
3488         .lp_unlock_ast          = ocfs2_unlock_ast,
 
3491 void ocfs2_set_locking_protocol(void)
 
3493         ocfs2_stack_glue_set_locking_protocol(&lproto);
 
3497 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
 
3498                                        struct ocfs2_lock_res *lockres)
 
3501         struct ocfs2_unblock_ctl ctl = {0, 0,};
 
3502         unsigned long flags;
 
3504         /* Our reference to the lockres in this function can be
 
3505          * considered valid until we remove the OCFS2_LOCK_QUEUED
 
3511         BUG_ON(!lockres->l_ops);
 
3513         mlog(0, "lockres %s blocked.\n", lockres->l_name);
 
3515         /* Detect whether a lock has been marked as going away while
 
3516          * the downconvert thread was processing other things. A lock can
 
3517          * still be marked with OCFS2_LOCK_FREEING after this check,
 
3518          * but short circuiting here will still save us some
 
3520         spin_lock_irqsave(&lockres->l_lock, flags);
 
3521         if (lockres->l_flags & OCFS2_LOCK_FREEING)
 
3523         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
3525         status = ocfs2_unblock_lock(osb, lockres, &ctl);
 
3529         spin_lock_irqsave(&lockres->l_lock, flags);
 
3531         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
 
3532                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
 
3534                 ocfs2_schedule_blocked_lock(osb, lockres);
 
3536         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
 
3537              ctl.requeue ? "yes" : "no");
 
3538         spin_unlock_irqrestore(&lockres->l_lock, flags);
 
3540         if (ctl.unblock_action != UNBLOCK_CONTINUE
 
3541             && lockres->l_ops->post_unlock)
 
3542                 lockres->l_ops->post_unlock(osb, lockres);
 
3547 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
 
3548                                         struct ocfs2_lock_res *lockres)
 
3552         assert_spin_locked(&lockres->l_lock);
 
3554         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
 
3555                 /* Do not schedule a lock for downconvert when it's on
 
3556                  * the way to destruction - any nodes wanting access
 
3557                  * to the resource will get it soon. */
 
3558                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
 
3559                      lockres->l_name, lockres->l_flags);
 
3563         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
 
3565         spin_lock(&osb->dc_task_lock);
 
3566         if (list_empty(&lockres->l_blocked_list)) {
 
3567                 list_add_tail(&lockres->l_blocked_list,
 
3568                               &osb->blocked_lock_list);
 
3569                 osb->blocked_lock_count++;
 
3571         spin_unlock(&osb->dc_task_lock);
 
3576 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
 
3578         unsigned long processed;
 
3579         struct ocfs2_lock_res *lockres;
 
3583         spin_lock(&osb->dc_task_lock);
 
3584         /* grab this early so we know to try again if a state change and
 
3585          * wake happens part-way through our work  */
 
3586         osb->dc_work_sequence = osb->dc_wake_sequence;
 
3588         processed = osb->blocked_lock_count;
 
3590                 BUG_ON(list_empty(&osb->blocked_lock_list));
 
3592                 lockres = list_entry(osb->blocked_lock_list.next,
 
3593                                      struct ocfs2_lock_res, l_blocked_list);
 
3594                 list_del_init(&lockres->l_blocked_list);
 
3595                 osb->blocked_lock_count--;
 
3596                 spin_unlock(&osb->dc_task_lock);
 
3601                 ocfs2_process_blocked_lock(osb, lockres);
 
3603                 spin_lock(&osb->dc_task_lock);
 
3605         spin_unlock(&osb->dc_task_lock);
 
3610 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
 
3614         spin_lock(&osb->dc_task_lock);
 
3615         if (list_empty(&osb->blocked_lock_list))
 
3618         spin_unlock(&osb->dc_task_lock);
 
3622 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
 
3624         int should_wake = 0;
 
3626         spin_lock(&osb->dc_task_lock);
 
3627         if (osb->dc_work_sequence != osb->dc_wake_sequence)
 
3629         spin_unlock(&osb->dc_task_lock);
 
3634 static int ocfs2_downconvert_thread(void *arg)
 
3637         struct ocfs2_super *osb = arg;
 
3639         /* only quit once we've been asked to stop and there is no more
 
3641         while (!(kthread_should_stop() &&
 
3642                 ocfs2_downconvert_thread_lists_empty(osb))) {
 
3644                 wait_event_interruptible(osb->dc_event,
 
3645                                          ocfs2_downconvert_thread_should_wake(osb) ||
 
3646                                          kthread_should_stop());
 
3648                 mlog(0, "downconvert_thread: awoken\n");
 
3650                 ocfs2_downconvert_thread_do_work(osb);
 
3653         osb->dc_task = NULL;
 
3657 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
 
3659         spin_lock(&osb->dc_task_lock);
 
3660         /* make sure the voting thread gets a swipe at whatever changes
 
3661          * the caller may have made to the voting state */
 
3662         osb->dc_wake_sequence++;
 
3663         spin_unlock(&osb->dc_task_lock);
 
3664         wake_up(&osb->dc_event);