/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
static int ls_count;
static struct mutex ls_lock;
static struct list_head lslist;
static spinlock_t lslist_lock;
static struct task_struct *scand_task;
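
/*
 * Sysfs interface.  Each lockspace gets a kobject in the "dlm" kset
 * (/sys/kernel/dlm/<name>) with the attribute files defined below;
 * dlm_controld in userspace drives lockspace stop/start, join/leave
 * results and recovery state through them.
 */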
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n = simple_strtol(buf, NULL, 0);

	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}
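
/* "event_done": dlm_controld writes the result of a join or leave here,
   which records it and wakes the waiter in do_uevent() below. */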
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}
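
/* Glue object tying a generic sysfs attribute to the lockspace-specific
   show/store handlers above; dlm_attr_show()/dlm_attr_store() dispatch
   through it. */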
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}
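
/* Called when the last reference to the lockspace kobject is dropped;
   this is where the dlm_ls structure itself is finally freed. */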
static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}
static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops = &dlm_attr_ops,
	.release = lockspace_kobj_release,
};
static struct kset *dlm_kset;
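
/*
 * Announce a lockspace join (in=1) or leave (in=0) to userspace as a
 * kobject uevent, then sleep until dlm_controld reports the result by
 * writing to the "event_done" sysfs file (dlm_event_store above).
 */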
static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

	if (error)
		goto out;

	error = ls->ls_uevent_result;
 out:
	if (error)
		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
			  error, ls->ls_uevent_result);
	return error;
}
int dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __FUNCTION__);
		return -ENOMEM;
	}
	return 0;
}
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
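
/*
 * Scanner kthread: every ci_scan_secs seconds, walk the lockspace list
 * and scan any lockspace that is not busy with recovery for unused
 * rsb's and expired lock timeouts.
 */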
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		list_for_each_entry(ls, &lslist, ls_list) {
			if (dlm_lock_recovery_try(ls)) {
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_unlock_recovery(ls);
			}
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}
static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
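
/*
 * Lockspace lookup helpers.  All of them walk lslist under lslist_lock;
 * the global/local/device variants also take a reference (ls_count++)
 * that the caller must drop with dlm_put_lockspace().
 */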
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_namelen == namelen &&
		    memcmp(ls->ls_name, name, namelen) == 0)
			goto out;
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
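
/* Unlink the lockspace from lslist once the last reference is gone,
   sleeping and retrying while ls_count is still above zero. */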
static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}
static int threads_start(void)
{
	int error;

	/* Thread which processes lock requests for all lockspaces */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto astd_fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 astd_fail:
	dlm_astd_stop();
 fail:
	return error;
}
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
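
/*
 * Create and register a new lockspace: allocate the dlm_ls and its hash
 * tables, add it to lslist, start dlm_recoverd, register the kobject,
 * then wait for dlm_controld to finish the join (do_uevent) and for the
 * first recovery to fill in the member list.
 */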
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error = -ENOMEM;
	int do_unreg = 0;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	ls = dlm_find_lockspace_name(name, namelen);
	if (ls) {
		*lockspace = ls;
		module_put(THIS_MODULE);
		return -EEXIST;
	}
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	if (flags & DLM_LSFL_FS)
		ls->ls_allocation = GFP_NOFS;
	else
		ls->ls_allocation = GFP_KERNEL;

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have TIMEWARN or FS set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));
	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

	size = dlm_config.ci_lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}
	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);
	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_delist;
	}

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_stop;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_stop;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");

	*lockspace = ls;
	return 0;
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_stop:
	dlm_recoverd_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
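
/*
 * dlm_new_lockspace() is the entry point used by lock clients; the
 * first lockspace also starts the global dlm threads.  A minimal
 * sketch of a hypothetical caller (the lockspace name and the lvblen
 * of 32 are illustrative, not taken from this file):
 *
 *	dlm_lockspace_t *ls;
 *	int error = dlm_new_lockspace("example", strlen("example"), &ls,
 *				      DLM_LSFL_FS, 32);
 *	if (!error)
 *		error = dlm_release_lockspace(ls, 0);
 */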
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
		      uint32_t flags, int lvblen)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
	if (!error)
		ls_count++;
 out:
	mutex_unlock(&ls_lock);
	return error;
}
/* Return 1 if the lockspace still has active remote locks,
 * 2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
	int i, lkb_found = 0;
	struct dlm_lkb *lkb;

	/* NOTE: We check the lockidtbl here rather than the resource table.
	   This is because there may be LKBs queued as ASTs that have been
	   unlinked from their RSBs and are pending deletion once the AST has
	   been delivered */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		read_lock(&ls->ls_lkbtbl[i].lock);
		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
			lkb_found = 1;
			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
					    lkb_idtbl_list) {
				if (!lkb->lkb_nodeid) {
					read_unlock(&ls->ls_lkbtbl[i].lock);
					return 2;
				}
			}
		}
		read_unlock(&ls->ls_lkbtbl[i].lock);
	}
	return lkb_found;
}
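
/*
 * Tear the lockspace down in roughly the reverse order of
 * new_lockspace(): notify userspace (unless force == 3), stop recoverd,
 * unlink from lslist, then free every lkb, rsb and table before
 * dropping the final kobject reference.
 */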
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	if (busy > force)
		return -EBUSY;

	if (force < 3)
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);
	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);

			free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);
	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}

		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);
	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	mutex_lock(&ls_lock);
	ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);
	return 0;
}
/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */
int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);
	return release_lockspace(ls, force);
}