fs/dlm/lockspace.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"

#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

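/*
 * Per-lockspace sysfs attributes, exposed under /sys/kernel/dlm/<name>/.
 * dlm_controld writes "0"/"1" to "control" to stop/start the lockspace,
 * and writes its join/leave result to "event_done" to wake do_uevent().
 */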
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

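/* Tell userspace (dlm_controld) to add us to or remove us from the
   lockspace's cluster group; 'in' selects join (1) or leave (0), and
   the result written back to "event_done" becomes our return value. */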
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}


int dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: cannot create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

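/* Periodic scanner: every ci_scan_secs it walks the lockspace list,
   ages out unused rsbs and checks lock timeouts, skipping any
   lockspace that is currently in recovery. */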
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list) {
                        if (dlm_lock_recovery_try(ls)) {
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_unlock_recovery(ls);
                        }
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

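/*
 * The dlm_find_lockspace_*() lookups below take a reference on the
 * lockspace (ls_count++) that the caller must drop with
 * dlm_put_lockspace().  The by-name lookup is the exception: it takes
 * no reference and is only used under ls_lock during creation.
 */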
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_namelen == namelen &&
                    memcmp(ls->ls_name, name, namelen) == 0)
                        goto out;
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}

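/* Unlink the lockspace from lslist, sleeping until every reference
   taken by dlm_find_lockspace_*() has been put. */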
static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        /* Thread which processes lock requests for all lockspaces */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}

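/*
 * Create a lockspace: allocate the ls and its hash tables, add it to
 * lslist, start dlm_recoverd, register the sysfs kobject, then ask
 * userspace to join the cluster group and wait for the initial
 * membership to be configured.
 */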
static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;
        int do_unreg = 0;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

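        /* the lock value block length must be a non-zero multiple of 8 */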
        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        if (flags & DLM_LSFL_FS)
                ls->ls_allocation = GFP_NOFS;
        else
                ls->ls_allocation = GFP_KERNEL;

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have TIMEWARN or FS set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));

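        /* three per-lockspace hash tables, sized from configfs tunables:
           resources (rsb), locks (lkb) and the resource directory; each
           bucket has its own rwlock */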
        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.ci_lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_delist;
        }

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_stop;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure).  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_stop;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");

        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_stop:
        dlm_recoverd_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

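/* Public entry point: the global dlm threads (astd, scand, lowcomms)
   are started when the first lockspace is created, and stopped again
   if that creation fails or when the last lockspace is released. */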
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
        else if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

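/* Tear down a lockspace: stop recovery, wait for all references to
   drop, then free every lkb, rsb and table the lockspace owns.  The
   'force' levels are described above dlm_release_lockspace() below. */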
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkbs on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsbs on rsbtbl[] lists.
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists.
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is released */

        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);
        return release_lockspace(ls, force);
}
