1 /******************************************************************************
 
   2 *******************************************************************************
 
   4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
 
   5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
 
   7 **  This copyrighted material is made available to anyone wishing to use,
 
   8 **  modify, copy, or redistribute it subject to the terms and conditions
 
   9 **  of the GNU General Public License v.2.
 
  11 *******************************************************************************
 
  12 ******************************************************************************/
 
  14 #include "dlm_internal.h"
 
  15 #include "lockspace.h"
 
  25 #include "requestqueue.h"
 
  /* Taken around lockspace creation and release in dlm_new_lockspace() and
     dlm_release_lockspace(). */
  29 static struct mutex             ls_lock;
 
  /* Head of the list of lockspaces; entries are linked through ls_list. */
  30 static struct list_head         lslist;
 
  /* Held around every traversal or modification of lslist in this file. */
  31 static spinlock_t               lslist_lock;
 
  /* Task handed to kthread_stop() in dlm_scand_stop(). */
  32 static struct task_struct *     scand_task;
 
  /*
   * Store handler for the "control" attribute: parses buf as an integer,
   * looks the lockspace up again through its local handle and releases it
   * with dlm_put_lockspace() when done.
   * NOTE(review): what the parsed value selects, and the return value, are
   * not visible in this listing.
   */
  35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 
  38         int n = simple_strtol(buf, NULL, 0);
 
             /* paired with dlm_put_lockspace() below; presumably takes a
                reference on the lockspace -- TODO confirm */
  40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
 
  54         dlm_put_lockspace(ls);
 
  /*
   * Store handler for the "event_done" attribute: records the integer parsed
   * from buf in ls_uevent_result, sets LSFL_UEVENT_WAIT and wakes
   * ls_uevent_wait, which is the condition do_uevent() sleeps on.
   */
  58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 
  60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
 
             /* result is stored before the flag is set and the waiter woken */
  61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 
  62         wake_up(&ls->ls_uevent_wait);
 
  /* Show handler for "id": formats ls_global_id as unsigned decimal plus a
     newline into buf (bounded by PAGE_SIZE) and returns snprintf()'s result. */
  66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 
  68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 
  /* Store handler for "id": sets ls_global_id from buf, parsed with
     simple_strtoul() using base 0.  The return statement is not part of this
     listing. */
  71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 
  73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
 
  /* Show handler for "recover_status": prints the value returned by
     dlm_recover_status() in hexadecimal, followed by a newline. */
  77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 
  79         uint32_t status = dlm_recover_status(ls);
 
  80         return snprintf(buf, PAGE_SIZE, "%x\n", status);
 
  /* Show handler for "recover_nodeid": prints ls_recover_nodeid as a signed
     decimal followed by a newline. */
  83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 
  85         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 
  /* Embedded generic attribute; dlm_attr_show()/dlm_attr_store() get back to
     the enclosing dlm_attr from it with container_of(). */
  89         struct attribute attr;
 
             /* Optional read callback; dlm_attr_show() returns 0 when unset. */
  90         ssize_t (*show)(struct dlm_ls *, char *);
 
             /* Optional write callback; dlm_attr_store() returns len when unset. */
  91         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 
  /* "control" attribute: owner-writable only, writes go to
     dlm_control_store(). */
  94 static struct dlm_attr dlm_attr_control = {
 
  95         .attr  = {.name = "control", .mode = S_IWUSR},
 
  96         .store = dlm_control_store
 
  /* "event_done" attribute: owner-writable only, writes go to
     dlm_event_store(), which wakes a sleeper in do_uevent(). */
  99 static struct dlm_attr dlm_attr_event = {
 
 100         .attr  = {.name = "event_done", .mode = S_IWUSR},
 
 101         .store = dlm_event_store
 
 /* "id" attribute: readable by everyone, writable by the owner; writes go to
    dlm_id_store(). */
 104 static struct dlm_attr dlm_attr_id = {
 
 105         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
 
 107         .store = dlm_id_store
 
 /* "recover_status" attribute: read-only, served by
    dlm_recover_status_show(). */
 110 static struct dlm_attr dlm_attr_recover_status = {
 
 111         .attr  = {.name = "recover_status", .mode = S_IRUGO},
 
 112         .show  = dlm_recover_status_show
 
 /* "recover_nodeid" attribute: read-only, served by
    dlm_recover_nodeid_show(). */
 115 static struct dlm_attr dlm_attr_recover_nodeid = {
 
 116         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
 
 117         .show  = dlm_recover_nodeid_show
 
 /* Attribute table installed as dlm_ktype.default_attrs; each entry is the
    embedded struct attribute of one dlm_attr defined in this file. */
 120 static struct attribute *dlm_attrs[] = {
 
 121         &dlm_attr_control.attr,
 
 122         &dlm_attr_event.attr,
 
 124         &dlm_attr_recover_status.attr,
 
 125         &dlm_attr_recover_nodeid.attr,
 
 /*
  * sysfs read dispatcher (dlm_attr_ops.show): maps the kobject back to its
  * dlm_ls and the attribute back to its dlm_attr, then calls the attribute's
  * show callback.  Returns 0 when the attribute has no show callback.
  */
 129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
 
 132         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 
 133         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 
 134         return a->show ? a->show(ls, buf) : 0;
 
 /*
  * sysfs write dispatcher (dlm_attr_ops.store): maps the kobject back to its
  * dlm_ls and the attribute back to its dlm_attr, then calls the attribute's
  * store callback.  Returns len (the whole write) when the attribute has no
  * store callback.
  */
 137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
 
 138                               const char *buf, size_t len)
 
 140         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
 
 141         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
 
 142         return a->store ? a->store(ls, buf, len) : len;
 
 /*
  * kobject release callback (dlm_ktype.release): recovers the dlm_ls that
  * embeds the kobject.
  * NOTE(review): presumably frees that dlm_ls, as release_lockspace() says the
  * structure is freed once the kobject is done with -- the freeing statement
  * is not part of this listing.
  */
 145 static void lockspace_kobj_release(struct kobject *k)
 
 147         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
 
 /* sysfs operations shared by all lockspace attributes; both entries
    dispatch to the per-attribute callbacks in struct dlm_attr. */
 151 static struct sysfs_ops dlm_attr_ops = {
 
 152         .show  = dlm_attr_show,
 
 153         .store = dlm_attr_store,
 
 /* kobject type passed to kobject_init_and_add() for each lockspace's
    ls_kobj in new_lockspace(): default attributes, sysfs dispatchers and
    the release callback. */
 156 static struct kobj_type dlm_ktype = {
 
 157         .default_attrs = dlm_attrs,
 
 158         .sysfs_ops     = &dlm_attr_ops,
 
 159         .release       = lockspace_kobj_release,
 
 /* The "dlm" kset: created in dlm_lockspace_init(), unregistered in
    dlm_lockspace_exit(), and assigned to ls_kobj.kset in new_lockspace(). */
 162 static struct kset *dlm_kset;
 
 /*
  * Send a KOBJ_ONLINE or KOBJ_OFFLINE uevent for the lockspace kobject and
  * sleep (interruptibly) until dlm_event_store() sets LSFL_UEVENT_WAIT.  The
  * value userspace wrote, ls_uevent_result, is then taken as the error code.
  * NOTE(review): the test choosing between the two uevents is not in this
  * listing; from the log text, a non-zero `in` presumably means join/ONLINE.
  */
 164 static int do_uevent(struct dlm_ls *ls, int in)
 
 169                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
 
 171                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
 
 173         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
 
 175         /* dlm_controld will see the uevent, do the necessary group management
 
 176            and then write to sysfs to wake us */
 
             /* the flag is cleared as part of the wait condition, so each
                write to "event_done" satisfies one wait */
 178         error = wait_event_interruptible(ls->ls_uevent_wait,
 
 179                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
 
 181         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
 
 186         error = ls->ls_uevent_result;
 
 189                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
 
 190                           error, ls->ls_uevent_result);
 
 /*
  * Init-time setup of this file's global state: ls_lock, lslist and
  * lslist_lock, plus the "dlm" kset created with kernel_kobj as its parent.
  * A warning is printed when the kset cannot be created; the surrounding
  * check and the return value are not part of this listing.
  */
 195 int __init dlm_lockspace_init(void)
 
 198         mutex_init(&ls_lock);
 
 199         INIT_LIST_HEAD(&lslist);
 
 200         spin_lock_init(&lslist_lock);
 
 202         dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
 
 204                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
 
 /* Undo dlm_lockspace_init(): unregister the "dlm" kset. */
 210 void dlm_lockspace_exit(void)
 
 212         kset_unregister(dlm_kset);
 
 /*
  * Walk lslist under lslist_lock looking for a lockspace whose ls_scan_time
  * is at least ci_scan_secs seconds in the past.  The lock is released inside
  * the loop on a match and after the loop otherwise.
  * NOTE(review): the return statements are not in this listing; presumably
  * the matching lockspace, or NULL when none is due.
  */
 215 static struct dlm_ls *find_ls_to_scan(void)
 
 219         spin_lock(&lslist_lock);
 
 220         list_for_each_entry(ls, &lslist, ls_list) {
 
 221                 if (time_after_eq(jiffies, ls->ls_scan_time +
 
 222                                             dlm_config.ci_scan_secs * HZ)) {
 
 223                         spin_unlock(&lslist_lock);
 
 227         spin_unlock(&lslist_lock);
 
 /*
  * Body of the "dlm_scand" kernel thread (see dlm_scand_start()).  Until asked
  * to stop, it picks a lockspace with find_ls_to_scan(); if the recovery lock
  * can be taken it stamps ls_scan_time, runs dlm_scan_timeout() and drops the
  * lock again.  The thread sleeps ci_scan_secs seconds between passes.
  * NOTE(review): several lines (including the else branches) are missing
  * from this listing.
  */
 231 static int dlm_scand(void *data)
 
 234         int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
 
 236         while (!kthread_should_stop()) {
 
 237                 ls = find_ls_to_scan();
 
 239                         if (dlm_lock_recovery_try(ls)) {
 
 240                                 ls->ls_scan_time = jiffies;
 
 242                                 dlm_scan_timeout(ls);
 
 243                                 dlm_unlock_recovery(ls);
 
                                     /* presumably the "recovery lock busy"
                                        branch: push the scan time on by HZ
                                        jiffies (one second) -- TODO confirm */
 245                                 ls->ls_scan_time += HZ;
 
 248                         schedule_timeout_interruptible(timeout_jiffies);
 
 /* Start the scanner thread: runs dlm_scand() as a kthread named "dlm_scand".
    Error handling and the store into scand_task are not part of this
    listing. */
 254 static int dlm_scand_start(void)
 
 256         struct task_struct *p;
 
 259         p = kthread_run(dlm_scand, NULL, "dlm_scand");
 
 /* Stop the scanner thread recorded in scand_task. */
 267 static void dlm_scand_stop(void)
 
 269         kthread_stop(scand_task);
 
 /*
  * Look up a lockspace by ls_global_id, walking lslist under lslist_lock.
  * NOTE(review): what happens on a match (presumably a reference is taken
  * for dlm_put_lockspace() to drop) and the return value are not in this
  * listing.
  */
 272 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
 
 276         spin_lock(&lslist_lock);
 
 278         list_for_each_entry(ls, &lslist, ls_list) {
 
 279                 if (ls->ls_global_id == id) {
 
 286         spin_unlock(&lslist_lock);
 
 /*
  * Look up a lockspace by its local handle (ls_local_handle, which
  * new_lockspace() sets to the dlm_ls pointer itself), walking lslist under
  * lslist_lock.
  * NOTE(review): the match handling and return value are not in this
  * listing; callers here pair the lookup with dlm_put_lockspace().
  */
 290 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 
 294         spin_lock(&lslist_lock);
 
 295         list_for_each_entry(ls, &lslist, ls_list) {
 
 296                 if (ls->ls_local_handle == lockspace) {
 
 303         spin_unlock(&lslist_lock);
 
 /*
  * Look up a lockspace by the minor number of its ls_device, walking lslist
  * under lslist_lock.
  * NOTE(review): the match handling and return value are not in this
  * listing.
  */
 307 struct dlm_ls *dlm_find_lockspace_device(int minor)
 
 311         spin_lock(&lslist_lock);
 
 312         list_for_each_entry(ls, &lslist, ls_list) {
 
 313                 if (ls->ls_device.minor == minor) {
 
 320         spin_unlock(&lslist_lock);
 
 /*
  * Release a lockspace obtained from one of the dlm_find_lockspace_*()
  * lookups.  Works under lslist_lock.
  * NOTE(review): the statement between lock and unlock is missing from this
  * listing; presumably it decrements ls_count, which remove_lockspace()
  * tests under the same lock.
  */
 324 void dlm_put_lockspace(struct dlm_ls *ls)
 
 326         spin_lock(&lslist_lock);
 
 328         spin_unlock(&lslist_lock);
 
 /*
  * Take ls off lslist once nobody holds it: under lslist_lock, when ls_count
  * is 0 the entry is unlinked (warning if ls_create_count is still non-zero)
  * and the lock released.
  * NOTE(review): the indentation suggests an enclosing retry loop that is not
  * part of this listing.
  */
 331 static void remove_lockspace(struct dlm_ls *ls)
 
 334                 spin_lock(&lslist_lock);
 
 335                 if (ls->ls_count == 0) {
 
 336                         WARN_ON(ls->ls_create_count != 0);
 
 337                         list_del(&ls->ls_list);
 
 338                         spin_unlock(&lslist_lock);
 
 341                 spin_unlock(&lslist_lock);
 
 /*
  * Start the helpers shared by all lockspaces, in this order: dlm_astd,
  * dlm_scand, then lowcomms.  Each failure is logged with its error code.
  * NOTE(review): the error checks, unwinding and return value are not part of
  * this listing.
  */
 346 static int threads_start(void)
 
 350         /* Thread which processes lock requests for all lockspaces */
 
 351         error = dlm_astd_start();
 
 353                 log_print("cannot start dlm_astd thread %d", error);
 
 357         error = dlm_scand_start();
 
 359                 log_print("cannot start dlm_scand thread %d", error);
 
 363         /* Thread for sending/receiving messages for all lockspaces */
 
 364         error = dlm_lowcomms_start();
 
 366                 log_print("cannot start dlm lowcomms %d", error);
 
 /* NOTE(review): the body is not part of this listing; presumably stops what
    threads_start() started -- confirm against the full source. */
 380 static void threads_stop(void)
 
 /*
  * Create the lockspace called name (namelen bytes), or take another
  * reference on an existing lockspace of the same name.
  *
  * Visible steps: validate arguments, pin the module, search lslist for a
  * name match, otherwise allocate and initialise a dlm_ls (rsb, lkb and
  * directory tables, lists, locks, recovery state), publish it on lslist,
  * start its recovery thread, register its kobject, and wait for userspace
  * to complete the join.  The tail of the function releases everything again.
  *
  * NOTE(review): error checks, goto labels and return statements are missing
  * from this listing, so the exact error paths cannot be read off here.
  */
 387 static int new_lockspace(char *name, int namelen, void **lockspace,
 
 388                          uint32_t flags, int lvblen)
 
             /* name must fit in DLM_LOCKSPACE_LEN; lvblen must be a non-zero
                multiple of 8 */
 394         if (namelen > DLM_LOCKSPACE_LEN)
 
 397         if (!lvblen || (lvblen % 8))
 
 400         if (!try_module_get(THIS_MODULE))
 
 403         if (!dlm_user_daemon_available()) {
 
 404                 module_put(THIS_MODULE);
 
             /* look for an existing lockspace with the same name (length and
                bytes both compared) */
 410         spin_lock(&lslist_lock);
 
 411         list_for_each_entry(ls, &lslist, ls_list) {
 
 412                 WARN_ON(ls->ls_create_count <= 0);
 
 413                 if (ls->ls_namelen != namelen)
 
 415                 if (memcmp(ls->ls_name, name, namelen))
 
 417                 if (flags & DLM_LSFL_NEWEXCL) {
 
                     /* reuse: bump the create count and give back the module
                        reference taken above */
 421                 ls->ls_create_count++;
 
 422                 module_put(THIS_MODULE);
 
 423                 error = 1; /* not an error, return 0 */
 
 426         spin_unlock(&lslist_lock);
 
             /* zeroed allocation with namelen extra bytes for the name that
                is copied into ls_name */
 435         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
 
 438         memcpy(ls->ls_name, name, namelen);
 
 439         ls->ls_namelen = namelen;
 
 440         ls->ls_lvblen = lvblen;
 
 443         ls->ls_scan_time = jiffies;
 
 445         if (flags & DLM_LSFL_TIMEWARN)
 
 446                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 
             /* DLM_LSFL_FS selects GFP_NOFS; the GFP_KERNEL assignment below
                is presumably the else branch */
 448         if (flags & DLM_LSFL_FS)
 
 449                 ls->ls_allocation = GFP_NOFS;
 
 451                 ls->ls_allocation = GFP_KERNEL;
 
 453         /* ls_exflags are forced to match among nodes, and we don't
 
 454            need to require all nodes to have some flags set */
 
 455         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
 
             /* rsb table: ci_rsbtbl_size buckets, each with a list, a toss
                list and a spinlock */
 458         size = dlm_config.ci_rsbtbl_size;
 
 459         ls->ls_rsbtbl_size = size;
 
 461         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
 
 464         for (i = 0; i < size; i++) {
 
 465                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
 
 466                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
 
 467                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
 
             /* lkb table: ci_lkbtbl_size buckets, each with a list, an rwlock
                and a counter that starts at 1 */
 470         size = dlm_config.ci_lkbtbl_size;
 
 471         ls->ls_lkbtbl_size = size;
 
 473         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
 
 476         for (i = 0; i < size; i++) {
 
 477                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
 
 478                 rwlock_init(&ls->ls_lkbtbl[i].lock);
 
 479                 ls->ls_lkbtbl[i].counter = 1;
 
             /* directory table: ci_dirtbl_size buckets, each with a list and
                a spinlock */
 482         size = dlm_config.ci_dirtbl_size;
 
 483         ls->ls_dirtbl_size = size;
 
 485         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
 
 488         for (i = 0; i < size; i++) {
 
 489                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
 
 490                 spin_lock_init(&ls->ls_dirtbl[i].lock);
 
 493         INIT_LIST_HEAD(&ls->ls_waiters);
 
 494         mutex_init(&ls->ls_waiters_mutex);
 
 495         INIT_LIST_HEAD(&ls->ls_orphans);
 
 496         mutex_init(&ls->ls_orphans_mutex);
 
 497         INIT_LIST_HEAD(&ls->ls_timeout);
 
 498         mutex_init(&ls->ls_timeout_mutex);
 
 500         INIT_LIST_HEAD(&ls->ls_nodes);
 
 501         INIT_LIST_HEAD(&ls->ls_nodes_gone);
 
 502         ls->ls_num_nodes = 0;
 
 503         ls->ls_low_nodeid = 0;
 
 504         ls->ls_total_weight = 0;
 
 505         ls->ls_node_array = NULL;
 
 507         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
 
 508         ls->ls_stub_rsb.res_ls = ls;
 
 510         ls->ls_debug_rsb_dentry = NULL;
 
 511         ls->ls_debug_waiters_dentry = NULL;
 
 513         init_waitqueue_head(&ls->ls_uevent_wait);
 
 514         ls->ls_uevent_result = 0;
 
 515         init_completion(&ls->ls_members_done);
 
 516         ls->ls_members_result = -1;
 
 518         ls->ls_recoverd_task = NULL;
 
 519         mutex_init(&ls->ls_recoverd_active);
 
 520         spin_lock_init(&ls->ls_recover_lock);
 
 521         spin_lock_init(&ls->ls_rcom_spin);
 
             /* the rcom sequence number starts from a random 64-bit value */
 522         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
 
 523         ls->ls_recover_status = 0;
 
 524         ls->ls_recover_seq = 0;
 
 525         ls->ls_recover_args = NULL;
 
 526         init_rwsem(&ls->ls_in_recovery);
 
 527         init_rwsem(&ls->ls_recv_active);
 
 528         INIT_LIST_HEAD(&ls->ls_requestqueue);
 
 529         mutex_init(&ls->ls_requestqueue_mutex);
 
 530         mutex_init(&ls->ls_clear_proc_locks);
 
 532         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
 
 533         if (!ls->ls_recover_buf)
 
 536         INIT_LIST_HEAD(&ls->ls_recover_list);
 
 537         spin_lock_init(&ls->ls_recover_list_lock);
 
 538         ls->ls_recover_list_count = 0;
 
             /* the local handle is the dlm_ls pointer itself; this is what
                dlm_find_lockspace_local() compares against */
 539         ls->ls_local_handle = ls;
 
 540         init_waitqueue_head(&ls->ls_wait_general);
 
 541         INIT_LIST_HEAD(&ls->ls_root_list);
 
 542         init_rwsem(&ls->ls_root_sem);
 
             /* ls_in_recovery is write-held from creation on; no release is
                visible in this function */
 544         down_write(&ls->ls_in_recovery);
 
             /* publish on lslist with a create count of one */
 546         spin_lock(&lslist_lock);
 
 547         ls->ls_create_count = 1;
 
 548         list_add(&ls->ls_list, &lslist);
 
 549         spin_unlock(&lslist_lock);
 
 551         /* needs to find ls in lslist */
 
 552         error = dlm_recoverd_start(ls);
 
 554                 log_error(ls, "can't start dlm_recoverd %d", error);
 
 558         ls->ls_kobj.kset = dlm_kset;
 
 559         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 
 563         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
 
 565         /* let kobject handle freeing of ls if there's an error */
 
 568         /* This uevent triggers dlm_controld in userspace to add us to the
 
 569            group of nodes that are members of this lockspace (managed by the
 
 570            cluster infrastructure.)  Once it's done that, it tells us who the
 
 571            current lockspace members are (via configfs) and then tells the
 
 572            lockspace to start running (via sysfs) in dlm_ls_start(). */
 
 574         error = do_uevent(ls, 1);
 
             /* block until ls_members_done is completed, then adopt
                ls_members_result (initialised to -1 above) as the result */
 578         wait_for_completion(&ls->ls_members_done);
 
 579         error = ls->ls_members_result;
 
 583         dlm_create_debug_file(ls);
 
 585         log_debug(ls, "join complete");
 
             /* from here down: teardown of what was set up above, presumably
                reached through error labels that are not in this listing */
 592         dlm_clear_members(ls);
 
 593         kfree(ls->ls_node_array);
 
 595         dlm_recoverd_stop(ls);
 
 597         spin_lock(&lslist_lock);
 
 598         list_del(&ls->ls_list);
 
 599         spin_unlock(&lslist_lock);
 
 600         kfree(ls->ls_recover_buf);
 
 602         kfree(ls->ls_dirtbl);
 
 604         kfree(ls->ls_lkbtbl);
 
 606         kfree(ls->ls_rsbtbl);
 
 609                 kobject_put(&ls->ls_kobj);
 
 613         module_put(THIS_MODULE);
 
 /*
  * Public entry point for creating (or re-referencing) a lockspace: holds
  * ls_lock across threads_start() and new_lockspace(), passing all arguments
  * through unchanged.
  * NOTE(review): threads_start() is called conditionally (see its
  * indentation); the guard, error handling and return are not part of this
  * listing.
  */
 617 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
 
 618                       uint32_t flags, int lvblen)
 
 622         mutex_lock(&ls_lock);
 
 624                 error = threads_start();
 
 628         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
 
 634         mutex_unlock(&ls_lock);
 
 /*
  * lockspace_busy(): scans every ls_lkbtbl bucket while holding that bucket's
  * read lock.  Finding an lkb whose lkb_nodeid is 0 drops the bucket lock on
  * the spot.
  * NOTE(review): the return statements and the update of lkb_found are not
  * part of this listing; presumably lkb_nodeid == 0 is the "local locks"
  * case.  The closing lines of the two original comments below are missing
  * from this listing as well.
  */
 638 /* Return 1 if the lockspace still has active remote locks,
 
 639  *        2 if the lockspace still has active local locks.
 
 641 static int lockspace_busy(struct dlm_ls *ls)
 
 643         int i, lkb_found = 0;
 
 646         /* NOTE: We check the lockidtbl here rather than the resource table.
 
 647            This is because there may be LKBs queued as ASTs that have been
 
 648            unlinked from their RSBs and are pending deletion once the AST has
 
 651         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
 
 652                 read_lock(&ls->ls_lkbtbl[i].lock);
 
 653                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
 
 655                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
 
 657                                 if (!lkb->lkb_nodeid) {
 
 658                                         read_unlock(&ls->ls_lkbtbl[i].lock);
 
 663                 read_unlock(&ls->ls_lkbtbl[i].lock);
 
 /*
  * Drop one creation reference on ls and, when it was the last one, tear the
  * lockspace down.
  *
  * Under lslist_lock: a create count of 1 may be cleared to 0 (final
  * release), a larger count is decremented and the function logs
  * "no remove".  The final release deregisters the device, stops the
  * recovery thread, removes ls from lslist, and frees the recovery buffer,
  * the directory/lkb/rsb tables with their entries, and the member lists.
  * It ends by dropping the kobject and module references.
  *
  * NOTE(review): the use of `busy`/`force`, the early returns and several
  * free calls are missing from this listing.
  */
 668 static int release_lockspace(struct dlm_ls *ls, int force)
 
 672         struct list_head *head;
 
 675         busy = lockspace_busy(ls);
 
 677         spin_lock(&lslist_lock);
 
 678         if (ls->ls_create_count == 1) {
 
 682                         /* remove_lockspace takes ls off lslist */
 
 683                         ls->ls_create_count = 0;
 
 686         } else if (ls->ls_create_count > 1) {
 
 687                 rv = --ls->ls_create_count;
 
 691         spin_unlock(&lslist_lock);
 
 694                 log_debug(ls, "release_lockspace no remove %d", rv);
 
 698         dlm_device_deregister(ls);
 
             /* the statement guarded here is not in this listing; it only
                runs for force < 3 with the user daemon available */
 700         if (force < 3 && dlm_user_daemon_available())
 
 703         dlm_recoverd_stop(ls);
 
 705         remove_lockspace(ls);
 
 707         dlm_delete_debug_file(ls);
 
 711         kfree(ls->ls_recover_buf);
 
 714          * Free direntry structs.
 
 718         kfree(ls->ls_dirtbl);
 
 721          * Free all lkb's on lkbtbl[] lists.
 
 724         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
 
 725                 head = &ls->ls_lkbtbl[i].list;
 
 726                 while (!list_empty(head)) {
 
 727                         lkb = list_entry(head->next, struct dlm_lkb,
 
 730                         list_del(&lkb->lkb_idtbl_list);
 
                             /* the lvb is freed here only when the lkb has
                                DLM_IFL_MSTCPY set */
 734                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
 
 735                                 dlm_free_lvb(lkb->lkb_lvbptr);
 
 742         kfree(ls->ls_lkbtbl);
 
 745          * Free all rsb's on rsbtbl[] lists
 
 748         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 
                     /* both per-bucket lists are drained: first .list, then
                        .toss */
 749                 head = &ls->ls_rsbtbl[i].list;
 
 750                 while (!list_empty(head)) {
 
 751                         rsb = list_entry(head->next, struct dlm_rsb,
 
 754                         list_del(&rsb->res_hashchain);
 
 758                 head = &ls->ls_rsbtbl[i].toss;
 
 759                 while (!list_empty(head)) {
 
 760                         rsb = list_entry(head->next, struct dlm_rsb,
 
 762                         list_del(&rsb->res_hashchain);
 
 767         kfree(ls->ls_rsbtbl);
 
 770          * Free structures on any other lists
 
 773         dlm_purge_requestqueue(ls);
 
 774         kfree(ls->ls_recover_args);
 
 775         dlm_clear_free_entries(ls);
 
 776         dlm_clear_members(ls);
 
 777         dlm_clear_members_gone(ls);
 
 778         kfree(ls->ls_node_array);
 
 779         log_debug(ls, "release_lockspace final free");
 
 780         kobject_put(&ls->ls_kobj);
 
 781         /* The ls structure will be freed when the kobject is done with */
 
 783         module_put(THIS_MODULE);
 
 788  * Called when a system has released all its locks and is not going to use the
 
 789  * lockspace any longer.  We free everything we're managing for this lockspace.
 
 790  * Remaining nodes will go through the recovery process as if we'd died.  The
 
 791  * lockspace must continue to function as usual, participating in recoveries,
 
 792  * until this returns.
 
 794  * Force has 4 possible values:
 
 795  * 0 - don't destroy lockspace if it has any LKBs
 
 796  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 
 797  * 2 - destroy lockspace regardless of LKBs
 
 798  * 3 - destroy lockspace as part of a forced shutdown
 
 /*
  * Public entry point for releasing a lockspace handle: resolves the handle
  * with dlm_find_lockspace_local(), drops that lookup straight away with
  * dlm_put_lockspace(), then runs release_lockspace() with ls_lock held.
  * NOTE(review): the check for a failed lookup and the return statement are
  * not part of this listing.
  */
 801 int dlm_release_lockspace(void *lockspace, int force)
 
 806         ls = dlm_find_lockspace_local(lockspace);
 
 809         dlm_put_lockspace(ls);
 
 811         mutex_lock(&ls_lock);
 
 812         error = release_lockspace(ls, force);
 
 817         mutex_unlock(&ls_lock);
 
 /*
  * Walk lslist under lslist_lock and handle lockspaces that have
  * LSFL_RUNNING set: the lock is dropped and an error is logged saying the
  * lockspace is being stopped because there is no userland control daemon.
  * NOTE(review): the statement under the !LSFL_RUNNING test (presumably a
  * skip), the actual stop call and any restart of the walk are not part of
  * this listing.
  */
 822 void dlm_stop_lockspaces(void)
 
 827         spin_lock(&lslist_lock);
 
 828         list_for_each_entry(ls, &lslist, ls_list) {
 
 829                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
 
                     /* lslist_lock is released before logging */
 831                 spin_unlock(&lslist_lock);
 
 832                 log_error(ls, "no userland control daemon, stopping lockspace");
 
 836         spin_unlock(&lslist_lock);