1 /* -*- mode: c; c-basic-offset: 8; -*-
 
   2  * vim: noexpandtab sw=8 ts=8 sts=0:
 
   6  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
 
   8  * Copyright (C) 2007 Oracle.  All rights reserved.
 
  10  * This program is free software; you can redistribute it and/or
 
  11  * modify it under the terms of the GNU General Public
 
  12  * License as published by the Free Software Foundation, version 2.
 
  14  * This program is distributed in the hope that it will be useful,
 
  15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
  17  * General Public License for more details.
 
  20 #include <linux/module.h>
 
  22 #include <linux/miscdevice.h>
 
  23 #include <linux/mutex.h>
 
  24 #include <linux/smp_lock.h>
 
  25 #include <linux/reboot.h>
 
  26 #include <asm/uaccess.h>
 
  28 #include "ocfs2.h"  /* For struct ocfs2_lock_res */
 
  29 #include "stackglue.h"
 
  31 #include <linux/dlm_plock.h>
 
  34  * The control protocol starts with a handshake.  Until the handshake
 
  35  * is complete, the control device will fail all write(2)s.
 
  37  * The handshake is simple.  First, the client reads until EOF.  Each line
 
  38  * of output is a supported protocol tag.  All protocol tags are a single
 
  39  * character followed by a two hex digit version number.  Currently the
 
  40  * only things supported is T01, for "Text-base version 0x01".  Next, the
 
  41  * client writes the version they would like to use, including the newline.
 
  42  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
 
  43  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
 
  44  * client can start sending messages.
 
  46  * The T01 protocol has three messages.  First is the "SETN" message.
 
  47  * It has the following syntax:
 
  49  *  SETN<space><8-char-hex-nodenum><newline>
 
  51  * This is 14 characters.
 
  53  * The "SETN" message must be the first message following the protocol.
 
  54  * It tells ocfs2_control the local node number.
 
  56  * Next comes the "SETV" message.  It has the following syntax:
 
  58  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 
  60  * This is 11 characters.
 
  62  * The "SETV" message sets the filesystem locking protocol version as
 
  63  * negotiated by the client.  The client negotiates based on the maximum
 
  64  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
 
  65  * number from the "SETV" message must match
 
  66  * ocfs2_user_plugin.sp_proto->lp_max_version.pv_major, and the minor number
 
  67  * must be less than or equal to ...->lp_max_version.pv_minor.
 
  69  * Once this information has been set, mounts will be allowed.  From this
 
  70  * point on, the "DOWN" message can be sent for node down notification.
 
  71  * It has the following syntax:
 
  73  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 
  77  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
 
  79  * This is 47 characters.
 
  83  * Whether or not the client has done the handshake.
 
  84  * For now, we have just one protocol version.
 
  86 #define OCFS2_CONTROL_PROTO                     "T01\n"
 
  87 #define OCFS2_CONTROL_PROTO_LEN                 4
 
  89 /* Handshake states */
 
  90 #define OCFS2_CONTROL_HANDSHAKE_INVALID         (0)
 
  91 #define OCFS2_CONTROL_HANDSHAKE_READ            (1)
 
  92 #define OCFS2_CONTROL_HANDSHAKE_PROTOCOL        (2)
 
  93 #define OCFS2_CONTROL_HANDSHAKE_VALID           (3)
 
  96 #define OCFS2_CONTROL_MESSAGE_OP_LEN            4
 
  97 #define OCFS2_CONTROL_MESSAGE_SETNODE_OP        "SETN"
 
  98 #define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14
 
  99 #define OCFS2_CONTROL_MESSAGE_SETVERSION_OP     "SETV"
 
 100 #define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN      11
 
 101 #define OCFS2_CONTROL_MESSAGE_DOWN_OP           "DOWN"
 
 102 #define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN    47
 
 103 #define OCFS2_TEXT_UUID_LEN                     32
 
 104 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN        2
 
 105 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN       8
 
 108  * ocfs2_live_connection is refcounted because the filesystem and
 
 109  * miscdevice sides can detach in different order.  Let's just be safe.
 
 111 struct ocfs2_live_connection {
 
 112         struct list_head                oc_list;
 
 113         struct ocfs2_cluster_connection *oc_conn;
 
 116 struct ocfs2_control_private {
 
 117         struct list_head op_list;
 
 120         struct ocfs2_protocol_version op_proto;
 
 123 /* SETN<space><8-char-hex-nodenum><newline> */
 
 124 struct ocfs2_control_message_setn {
 
 125         char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 
 127         char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
 
 131 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
 
 132 struct ocfs2_control_message_setv {
 
 133         char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 
 135         char    major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
 
 137         char    minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
 
 141 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
 
 142 struct ocfs2_control_message_down {
 
 143         char    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 
 145         char    uuid[OCFS2_TEXT_UUID_LEN];
 
 147         char    nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
 
 151 union ocfs2_control_message {
 
 152         char                                    tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
 
 153         struct ocfs2_control_message_setn       u_setn;
 
 154         struct ocfs2_control_message_setv       u_setv;
 
 155         struct ocfs2_control_message_down       u_down;
 
 158 static struct ocfs2_stack_plugin ocfs2_user_plugin;
 
 160 static atomic_t ocfs2_control_opened;
 
 161 static int ocfs2_control_this_node = -1;
 
 162 static struct ocfs2_protocol_version running_proto;
 
 164 static LIST_HEAD(ocfs2_live_connection_list);
 
 165 static LIST_HEAD(ocfs2_control_private_list);
 
 166 static DEFINE_MUTEX(ocfs2_control_lock);
 
 168 static inline void ocfs2_control_set_handshake_state(struct file *file,
 
 171         struct ocfs2_control_private *p = file->private_data;
 
 175 static inline int ocfs2_control_get_handshake_state(struct file *file)
 
 177         struct ocfs2_control_private *p = file->private_data;
 
 181 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
 
 183         size_t len = strlen(name);
 
 184         struct ocfs2_live_connection *c;
 
 186         BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
 
 188         list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
 
 189                 if ((c->oc_conn->cc_namelen == len) &&
 
 190                     !strncmp(c->oc_conn->cc_name, name, len))
 
 198  * ocfs2_live_connection structures are created underneath the ocfs2
 
 199  * mount path.  Since the VFS prevents multiple calls to
 
 200  * fill_super(), we can't get dupes here.
 
 202 static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
 
 203                                      struct ocfs2_live_connection **c_ret)
 
 206         struct ocfs2_live_connection *c;
 
 208         c = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
 
 212         mutex_lock(&ocfs2_control_lock);
 
 215         if (atomic_read(&ocfs2_control_opened))
 
 216                 list_add(&c->oc_list, &ocfs2_live_connection_list);
 
 219                        "ocfs2: Userspace control daemon is not present\n");
 
 223         mutex_unlock(&ocfs2_control_lock);
 
 234  * This function disconnects the cluster connection from ocfs2_control.
 
 235  * Afterwards, userspace can't affect the cluster connection.
 
 237 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
 
 239         mutex_lock(&ocfs2_control_lock);
 
 240         list_del_init(&c->oc_list);
 
 242         mutex_unlock(&ocfs2_control_lock);
 
 247 static int ocfs2_control_cfu(void *target, size_t target_len,
 
 248                              const char __user *buf, size_t count)
 
 250         /* The T01 expects write(2) calls to have exactly one command */
 
 251         if ((count != target_len) ||
 
 252             (count > sizeof(union ocfs2_control_message)))
 
 255         if (copy_from_user(target, buf, target_len))
 
 261 static ssize_t ocfs2_control_validate_protocol(struct file *file,
 
 262                                                const char __user *buf,
 
 266         char kbuf[OCFS2_CONTROL_PROTO_LEN];
 
 268         ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
 
 273         if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
 
 276         ocfs2_control_set_handshake_state(file,
 
 277                                           OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
 
 282 static void ocfs2_control_send_down(const char *uuid,
 
 285         struct ocfs2_live_connection *c;
 
 287         mutex_lock(&ocfs2_control_lock);
 
 289         c = ocfs2_connection_find(uuid);
 
 291                 BUG_ON(c->oc_conn == NULL);
 
 292                 c->oc_conn->cc_recovery_handler(nodenum,
 
 293                                                 c->oc_conn->cc_recovery_data);
 
 296         mutex_unlock(&ocfs2_control_lock);
 
 300  * Called whenever configuration elements are sent to /dev/ocfs2_control.
 
 301  * If all configuration elements are present, try to set the global
 
 302  * values.  If there is a problem, return an error.  Skip any missing
 
 303  * elements, and only bump ocfs2_control_opened when we have all elements
 
 304  * and are successful.
 
 306 static int ocfs2_control_install_private(struct file *file)
 
 310         struct ocfs2_control_private *p = file->private_data;
 
 312         BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
 
 314         mutex_lock(&ocfs2_control_lock);
 
 316         if (p->op_this_node < 0) {
 
 318         } else if ((ocfs2_control_this_node >= 0) &&
 
 319                    (ocfs2_control_this_node != p->op_this_node)) {
 
 324         if (!p->op_proto.pv_major) {
 
 326         } else if (!list_empty(&ocfs2_live_connection_list) &&
 
 327                    ((running_proto.pv_major != p->op_proto.pv_major) ||
 
 328                     (running_proto.pv_minor != p->op_proto.pv_minor))) {
 
 334                 ocfs2_control_this_node = p->op_this_node;
 
 335                 running_proto.pv_major = p->op_proto.pv_major;
 
 336                 running_proto.pv_minor = p->op_proto.pv_minor;
 
 340         mutex_unlock(&ocfs2_control_lock);
 
 343                 /* We set the global values successfully */
 
 344                 atomic_inc(&ocfs2_control_opened);
 
 345                 ocfs2_control_set_handshake_state(file,
 
 346                                         OCFS2_CONTROL_HANDSHAKE_VALID);
 
 352 static int ocfs2_control_get_this_node(void)
 
 356         mutex_lock(&ocfs2_control_lock);
 
 357         if (ocfs2_control_this_node < 0)
 
 360                 rc = ocfs2_control_this_node;
 
 361         mutex_unlock(&ocfs2_control_lock);
 
 366 static int ocfs2_control_do_setnode_msg(struct file *file,
 
 367                                         struct ocfs2_control_message_setn *msg)
 
 371         struct ocfs2_control_private *p = file->private_data;
 
 373         if (ocfs2_control_get_handshake_state(file) !=
 
 374             OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
 
 377         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
 
 378                     OCFS2_CONTROL_MESSAGE_OP_LEN))
 
 381         if ((msg->space != ' ') || (msg->newline != '\n'))
 
 383         msg->space = msg->newline = '\0';
 
 385         nodenum = simple_strtol(msg->nodestr, &ptr, 16);
 
 389         if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
 
 390             (nodenum > INT_MAX) || (nodenum < 0))
 
 392         p->op_this_node = nodenum;
 
 394         return ocfs2_control_install_private(file);
 
 397 static int ocfs2_control_do_setversion_msg(struct file *file,
 
 398                                            struct ocfs2_control_message_setv *msg)
 
 402         struct ocfs2_control_private *p = file->private_data;
 
 403         struct ocfs2_protocol_version *max =
 
 404                 &ocfs2_user_plugin.sp_proto->lp_max_version;
 
 406         if (ocfs2_control_get_handshake_state(file) !=
 
 407             OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
 
 410         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
 
 411                     OCFS2_CONTROL_MESSAGE_OP_LEN))
 
 414         if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
 
 415             (msg->newline != '\n'))
 
 417         msg->space1 = msg->space2 = msg->newline = '\0';
 
 419         major = simple_strtol(msg->major, &ptr, 16);
 
 422         minor = simple_strtol(msg->minor, &ptr, 16);
 
 427          * The major must be between 1 and 255, inclusive.  The minor
 
 428          * must be between 0 and 255, inclusive.  The version passed in
 
 429          * must be within the maximum version supported by the filesystem.
 
 431         if ((major == LONG_MIN) || (major == LONG_MAX) ||
 
 432             (major > (u8)-1) || (major < 1))
 
 434         if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
 
 435             (minor > (u8)-1) || (minor < 0))
 
 437         if ((major != max->pv_major) ||
 
 438             (minor > max->pv_minor))
 
 441         p->op_proto.pv_major = major;
 
 442         p->op_proto.pv_minor = minor;
 
 444         return ocfs2_control_install_private(file);
 
 447 static int ocfs2_control_do_down_msg(struct file *file,
 
 448                                      struct ocfs2_control_message_down *msg)
 
 453         if (ocfs2_control_get_handshake_state(file) !=
 
 454             OCFS2_CONTROL_HANDSHAKE_VALID)
 
 457         if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
 
 458                     OCFS2_CONTROL_MESSAGE_OP_LEN))
 
 461         if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
 
 462             (msg->newline != '\n'))
 
 464         msg->space1 = msg->space2 = msg->newline = '\0';
 
 466         nodenum = simple_strtol(msg->nodestr, &p, 16);
 
 470         if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
 
 471             (nodenum > INT_MAX) || (nodenum < 0))
 
 474         ocfs2_control_send_down(msg->uuid, nodenum);
 
 479 static ssize_t ocfs2_control_message(struct file *file,
 
 480                                      const char __user *buf,
 
 484         union ocfs2_control_message msg;
 
 486         /* Try to catch padding issues */
 
 487         WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
 
 488                 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
 
 490         memset(&msg, 0, sizeof(union ocfs2_control_message));
 
 491         ret = ocfs2_control_cfu(&msg, count, buf, count);
 
 495         if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
 
 496             !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
 
 497                      OCFS2_CONTROL_MESSAGE_OP_LEN))
 
 498                 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
 
 499         else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
 
 500                  !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
 
 501                           OCFS2_CONTROL_MESSAGE_OP_LEN))
 
 502                 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
 
 503         else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
 
 504                  !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
 
 505                           OCFS2_CONTROL_MESSAGE_OP_LEN))
 
 506                 ret = ocfs2_control_do_down_msg(file, &msg.u_down);
 
 511         return ret ? ret : count;
 
 514 static ssize_t ocfs2_control_write(struct file *file,
 
 515                                    const char __user *buf,
 
 521         switch (ocfs2_control_get_handshake_state(file)) {
 
 522                 case OCFS2_CONTROL_HANDSHAKE_INVALID:
 
 526                 case OCFS2_CONTROL_HANDSHAKE_READ:
 
 527                         ret = ocfs2_control_validate_protocol(file, buf,
 
 531                 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
 
 532                 case OCFS2_CONTROL_HANDSHAKE_VALID:
 
 533                         ret = ocfs2_control_message(file, buf, count);
 
 546  * This is a naive version.  If we ever have a new protocol, we'll expand
 
 547  * it.  Probably using seq_file.
 
 549 static ssize_t ocfs2_control_read(struct file *file,
 
 556         ret = simple_read_from_buffer(buf, count, ppos,
 
 557                         OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
 
 559         /* Have we read the whole protocol list? */
 
 560         if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
 
 561                 ocfs2_control_set_handshake_state(file,
 
 562                                                   OCFS2_CONTROL_HANDSHAKE_READ);
 
 567 static int ocfs2_control_release(struct inode *inode, struct file *file)
 
 569         struct ocfs2_control_private *p = file->private_data;
 
 571         mutex_lock(&ocfs2_control_lock);
 
 573         if (ocfs2_control_get_handshake_state(file) !=
 
 574             OCFS2_CONTROL_HANDSHAKE_VALID)
 
 577         if (atomic_dec_and_test(&ocfs2_control_opened)) {
 
 578                 if (!list_empty(&ocfs2_live_connection_list)) {
 
 579                         /* XXX: Do bad things! */
 
 581                                "ocfs2: Unexpected release of ocfs2_control!\n"
 
 582                                "       Loss of cluster connection requires "
 
 583                                "an emergency restart!\n");
 
 587                  * Last valid close clears the node number and resets
 
 588                  * the locking protocol version
 
 590                 ocfs2_control_this_node = -1;
 
 591                 running_proto.pv_major = 0;
 
 592                 running_proto.pv_major = 0;
 
 596         list_del_init(&p->op_list);
 
 597         file->private_data = NULL;
 
 599         mutex_unlock(&ocfs2_control_lock);
 
 606 static int ocfs2_control_open(struct inode *inode, struct file *file)
 
 608         struct ocfs2_control_private *p;
 
 610         p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
 
 613         p->op_this_node = -1;
 
 616         mutex_lock(&ocfs2_control_lock);
 
 617         file->private_data = p;
 
 618         list_add(&p->op_list, &ocfs2_control_private_list);
 
 619         mutex_unlock(&ocfs2_control_lock);
 
 625 static const struct file_operations ocfs2_control_fops = {
 
 626         .open    = ocfs2_control_open,
 
 627         .release = ocfs2_control_release,
 
 628         .read    = ocfs2_control_read,
 
 629         .write   = ocfs2_control_write,
 
 630         .owner   = THIS_MODULE,
 
 633 static struct miscdevice ocfs2_control_device = {
 
 634         .minor          = MISC_DYNAMIC_MINOR,
 
 635         .name           = "ocfs2_control",
 
 636         .fops           = &ocfs2_control_fops,
 
 639 static int ocfs2_control_init(void)
 
 643         atomic_set(&ocfs2_control_opened, 0);
 
 645         rc = misc_register(&ocfs2_control_device);
 
 648                        "ocfs2: Unable to register ocfs2_control device "
 
 655 static void ocfs2_control_exit(void)
 
 659         rc = misc_deregister(&ocfs2_control_device);
 
 662                        "ocfs2: Unable to deregister ocfs2_control device "
 
 667 static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg)
 
 669         struct ocfs2_lock_res *res = astarg;
 
 670         return &res->l_lksb.lksb_fsdlm;
 
 673 static void fsdlm_lock_ast_wrapper(void *astarg)
 
 675         struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg);
 
 676         int status = lksb->sb_status;
 
 678         BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
 
 681          * For now we're punting on the issue of other non-standard errors
 
 682          * where we can't tell if the unlock_ast or lock_ast should be called.
 
 683          * The main "other error" that's possible is EINVAL which means the
 
 684          * function was called with invalid args, which shouldn't be possible
 
 685          * since the caller here is under our control.  Other non-standard
 
 686          * errors probably fall into the same category, or otherwise are fatal
 
 687          * which means we can't carry on anyway.
 
 690         if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
 
 691                 ocfs2_user_plugin.sp_proto->lp_unlock_ast(astarg, 0);
 
 693                 ocfs2_user_plugin.sp_proto->lp_lock_ast(astarg);
 
 696 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
 
 698         BUG_ON(ocfs2_user_plugin.sp_proto == NULL);
 
 700         ocfs2_user_plugin.sp_proto->lp_blocking_ast(astarg, level);
 
 703 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
 
 705                          union ocfs2_dlm_lksb *lksb,
 
 708                          unsigned int namelen,
 
 713         if (!lksb->lksb_fsdlm.sb_lvbptr)
 
 714                 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
 
 715                                              sizeof(struct dlm_lksb);
 
 717         ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
 
 718                        flags|DLM_LKF_NODLCKWT, name, namelen, 0,
 
 719                        fsdlm_lock_ast_wrapper, astarg,
 
 720                        fsdlm_blocking_ast_wrapper);
 
 724 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
 
 725                            union ocfs2_dlm_lksb *lksb,
 
 731         ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
 
 732                          flags, &lksb->lksb_fsdlm, astarg);
 
 736 static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
 
 738         return lksb->lksb_fsdlm.sb_status;
 
 741 static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
 
 743         if (!lksb->lksb_fsdlm.sb_lvbptr)
 
 744                 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
 
 745                                              sizeof(struct dlm_lksb);
 
 746         return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
 
 749 static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
 
 753 static int user_plock(struct ocfs2_cluster_connection *conn,
 
 757                       struct file_lock *fl)
 
 760          * This more or less just demuxes the plock request into any
 
 761          * one of three dlm calls.
 
 763          * Internally, fs/dlm will pass these to a misc device, which
 
 764          * a userspace daemon will read and write to.
 
 766          * For now, cancel requests (which happen internally only),
 
 767          * are turned into unlocks. Most of this function taken from
 
 771         if (cmd == F_CANCELLK) {
 
 773                 fl->fl_type = F_UNLCK;
 
 777                 return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
 
 778         else if (fl->fl_type == F_UNLCK)
 
 779                 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
 
 781                 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
 
 785  * Compare a requested locking protocol version against the current one.
 
 787  * If the major numbers are different, they are incompatible.
 
 788  * If the current minor is greater than the request, they are incompatible.
 
 789  * If the current minor is less than or equal to the request, they are
 
 790  * compatible, and the requester should run at the current minor version.
 
 792 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
 
 793                                struct ocfs2_protocol_version *request)
 
 795         if (existing->pv_major != request->pv_major)
 
 798         if (existing->pv_minor > request->pv_minor)
 
 801         if (existing->pv_minor < request->pv_minor)
 
 802                 request->pv_minor = existing->pv_minor;
 
 807 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
 
 809         dlm_lockspace_t *fsdlm;
 
 810         struct ocfs2_live_connection *control;
 
 813         BUG_ON(conn == NULL);
 
 815         rc = ocfs2_live_connection_new(conn, &control);
 
 820          * running_proto must have been set before we allowed any mounts
 
 823         if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
 
 825                        "Unable to mount with fs locking protocol version "
 
 826                        "%u.%u because the userspace control daemon has "
 
 827                        "negotiated %u.%u\n",
 
 828                        conn->cc_version.pv_major, conn->cc_version.pv_minor,
 
 829                        running_proto.pv_major, running_proto.pv_minor);
 
 831                 ocfs2_live_connection_drop(control);
 
 835         rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name),
 
 836                                &fsdlm, DLM_LSFL_FS, DLM_LVB_LEN);
 
 838                 ocfs2_live_connection_drop(control);
 
 842         conn->cc_private = control;
 
 843         conn->cc_lockspace = fsdlm;
 
 848 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
 
 850         dlm_release_lockspace(conn->cc_lockspace, 2);
 
 851         conn->cc_lockspace = NULL;
 
 852         ocfs2_live_connection_drop(conn->cc_private);
 
 853         conn->cc_private = NULL;
 
 857 static int user_cluster_this_node(unsigned int *this_node)
 
 861         rc = ocfs2_control_get_this_node();
 
 869 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
 
 870         .connect        = user_cluster_connect,
 
 871         .disconnect     = user_cluster_disconnect,
 
 872         .this_node      = user_cluster_this_node,
 
 873         .dlm_lock       = user_dlm_lock,
 
 874         .dlm_unlock     = user_dlm_unlock,
 
 875         .lock_status    = user_dlm_lock_status,
 
 876         .lock_lvb       = user_dlm_lvb,
 
 878         .dump_lksb      = user_dlm_dump_lksb,
 
 881 static struct ocfs2_stack_plugin ocfs2_user_plugin = {
 
 883         .sp_ops         = &ocfs2_user_plugin_ops,
 
 884         .sp_owner       = THIS_MODULE,
 
 888 static int __init ocfs2_user_plugin_init(void)
 
 892         rc = ocfs2_control_init();
 
 894                 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
 
 896                         ocfs2_control_exit();
 
 902 static void __exit ocfs2_user_plugin_exit(void)
 
 904         ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
 
 905         ocfs2_control_exit();
 
 908 MODULE_AUTHOR("Oracle");
 
 909 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
 
 910 MODULE_LICENSE("GPL");
 
 911 module_init(ocfs2_user_plugin_init);
 
 912 module_exit(ocfs2_user_plugin_exit);