Merge branch 'for-2.6.31' of git://git.kernel.org/pub/scm/linux/kernel/git/broonie...
[linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79         uint32_t status = dlm_recover_status(ls);
80         return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87
88 struct dlm_attr {
89         struct attribute attr;
90         ssize_t (*show)(struct dlm_ls *, char *);
91         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93
94 static struct dlm_attr dlm_attr_control = {
95         .attr  = {.name = "control", .mode = S_IWUSR},
96         .store = dlm_control_store
97 };
98
99 static struct dlm_attr dlm_attr_event = {
100         .attr  = {.name = "event_done", .mode = S_IWUSR},
101         .store = dlm_event_store
102 };
103
104 static struct dlm_attr dlm_attr_id = {
105         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106         .show  = dlm_id_show,
107         .store = dlm_id_store
108 };
109
110 static struct dlm_attr dlm_attr_recover_status = {
111         .attr  = {.name = "recover_status", .mode = S_IRUGO},
112         .show  = dlm_recover_status_show
113 };
114
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117         .show  = dlm_recover_nodeid_show
118 };
119
120 static struct attribute *dlm_attrs[] = {
121         &dlm_attr_control.attr,
122         &dlm_attr_event.attr,
123         &dlm_attr_id.attr,
124         &dlm_attr_recover_status.attr,
125         &dlm_attr_recover_nodeid.attr,
126         NULL,
127 };
128
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130                              char *buf)
131 {
132         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134         return a->show ? a->show(ls, buf) : 0;
135 }
136
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138                               const char *buf, size_t len)
139 {
140         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142         return a->store ? a->store(ls, buf, len) : len;
143 }
144
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148         kfree(ls);
149 }
150
151 static struct sysfs_ops dlm_attr_ops = {
152         .show  = dlm_attr_show,
153         .store = dlm_attr_store,
154 };
155
156 static struct kobj_type dlm_ktype = {
157         .default_attrs = dlm_attrs,
158         .sysfs_ops     = &dlm_attr_ops,
159         .release       = lockspace_kobj_release,
160 };
161
162 static struct kset *dlm_kset;
163
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166         int error;
167
168         if (in)
169                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170         else
171                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172
173         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174
175         /* dlm_controld will see the uevent, do the necessary group management
176            and then write to sysfs to wake us */
177
178         error = wait_event_interruptible(ls->ls_uevent_wait,
179                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180
181         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182
183         if (error)
184                 goto out;
185
186         error = ls->ls_uevent_result;
187  out:
188         if (error)
189                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190                           error, ls->ls_uevent_result);
191         return error;
192 }
193
194
195 int __init dlm_lockspace_init(void)
196 {
197         ls_count = 0;
198         mutex_init(&ls_lock);
199         INIT_LIST_HEAD(&lslist);
200         spin_lock_init(&lslist_lock);
201
202         dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203         if (!dlm_kset) {
204                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
205                 return -ENOMEM;
206         }
207         return 0;
208 }
209
210 void dlm_lockspace_exit(void)
211 {
212         kset_unregister(dlm_kset);
213 }
214
215 static struct dlm_ls *find_ls_to_scan(void)
216 {
217         struct dlm_ls *ls;
218
219         spin_lock(&lslist_lock);
220         list_for_each_entry(ls, &lslist, ls_list) {
221                 if (time_after_eq(jiffies, ls->ls_scan_time +
222                                             dlm_config.ci_scan_secs * HZ)) {
223                         spin_unlock(&lslist_lock);
224                         return ls;
225                 }
226         }
227         spin_unlock(&lslist_lock);
228         return NULL;
229 }
230
231 static int dlm_scand(void *data)
232 {
233         struct dlm_ls *ls;
234         int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
235
236         while (!kthread_should_stop()) {
237                 ls = find_ls_to_scan();
238                 if (ls) {
239                         if (dlm_lock_recovery_try(ls)) {
240                                 ls->ls_scan_time = jiffies;
241                                 dlm_scan_rsbs(ls);
242                                 dlm_scan_timeout(ls);
243                                 dlm_unlock_recovery(ls);
244                         } else {
245                                 ls->ls_scan_time += HZ;
246                         }
247                 } else {
248                         schedule_timeout_interruptible(timeout_jiffies);
249                 }
250         }
251         return 0;
252 }
253
254 static int dlm_scand_start(void)
255 {
256         struct task_struct *p;
257         int error = 0;
258
259         p = kthread_run(dlm_scand, NULL, "dlm_scand");
260         if (IS_ERR(p))
261                 error = PTR_ERR(p);
262         else
263                 scand_task = p;
264         return error;
265 }
266
267 static void dlm_scand_stop(void)
268 {
269         kthread_stop(scand_task);
270 }
271
272 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
273 {
274         struct dlm_ls *ls;
275
276         spin_lock(&lslist_lock);
277
278         list_for_each_entry(ls, &lslist, ls_list) {
279                 if (ls->ls_global_id == id) {
280                         ls->ls_count++;
281                         goto out;
282                 }
283         }
284         ls = NULL;
285  out:
286         spin_unlock(&lslist_lock);
287         return ls;
288 }
289
290 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
291 {
292         struct dlm_ls *ls;
293
294         spin_lock(&lslist_lock);
295         list_for_each_entry(ls, &lslist, ls_list) {
296                 if (ls->ls_local_handle == lockspace) {
297                         ls->ls_count++;
298                         goto out;
299                 }
300         }
301         ls = NULL;
302  out:
303         spin_unlock(&lslist_lock);
304         return ls;
305 }
306
307 struct dlm_ls *dlm_find_lockspace_device(int minor)
308 {
309         struct dlm_ls *ls;
310
311         spin_lock(&lslist_lock);
312         list_for_each_entry(ls, &lslist, ls_list) {
313                 if (ls->ls_device.minor == minor) {
314                         ls->ls_count++;
315                         goto out;
316                 }
317         }
318         ls = NULL;
319  out:
320         spin_unlock(&lslist_lock);
321         return ls;
322 }
323
324 void dlm_put_lockspace(struct dlm_ls *ls)
325 {
326         spin_lock(&lslist_lock);
327         ls->ls_count--;
328         spin_unlock(&lslist_lock);
329 }
330
331 static void remove_lockspace(struct dlm_ls *ls)
332 {
333         for (;;) {
334                 spin_lock(&lslist_lock);
335                 if (ls->ls_count == 0) {
336                         WARN_ON(ls->ls_create_count != 0);
337                         list_del(&ls->ls_list);
338                         spin_unlock(&lslist_lock);
339                         return;
340                 }
341                 spin_unlock(&lslist_lock);
342                 ssleep(1);
343         }
344 }
345
346 static int threads_start(void)
347 {
348         int error;
349
350         /* Thread which process lock requests for all lockspace's */
351         error = dlm_astd_start();
352         if (error) {
353                 log_print("cannot start dlm_astd thread %d", error);
354                 goto fail;
355         }
356
357         error = dlm_scand_start();
358         if (error) {
359                 log_print("cannot start dlm_scand thread %d", error);
360                 goto astd_fail;
361         }
362
363         /* Thread for sending/receiving messages for all lockspace's */
364         error = dlm_lowcomms_start();
365         if (error) {
366                 log_print("cannot start dlm lowcomms %d", error);
367                 goto scand_fail;
368         }
369
370         return 0;
371
372  scand_fail:
373         dlm_scand_stop();
374  astd_fail:
375         dlm_astd_stop();
376  fail:
377         return error;
378 }
379
380 static void threads_stop(void)
381 {
382         dlm_scand_stop();
383         dlm_lowcomms_stop();
384         dlm_astd_stop();
385 }
386
387 static int new_lockspace(const char *name, int namelen, void **lockspace,
388                          uint32_t flags, int lvblen)
389 {
390         struct dlm_ls *ls;
391         int i, size, error;
392         int do_unreg = 0;
393
394         if (namelen > DLM_LOCKSPACE_LEN)
395                 return -EINVAL;
396
397         if (!lvblen || (lvblen % 8))
398                 return -EINVAL;
399
400         if (!try_module_get(THIS_MODULE))
401                 return -EINVAL;
402
403         if (!dlm_user_daemon_available()) {
404                 module_put(THIS_MODULE);
405                 return -EUNATCH;
406         }
407
408         error = 0;
409
410         spin_lock(&lslist_lock);
411         list_for_each_entry(ls, &lslist, ls_list) {
412                 WARN_ON(ls->ls_create_count <= 0);
413                 if (ls->ls_namelen != namelen)
414                         continue;
415                 if (memcmp(ls->ls_name, name, namelen))
416                         continue;
417                 if (flags & DLM_LSFL_NEWEXCL) {
418                         error = -EEXIST;
419                         break;
420                 }
421                 ls->ls_create_count++;
422                 *lockspace = ls;
423                 error = 1;
424                 break;
425         }
426         spin_unlock(&lslist_lock);
427
428         if (error)
429                 goto out;
430
431         error = -ENOMEM;
432
433         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
434         if (!ls)
435                 goto out;
436         memcpy(ls->ls_name, name, namelen);
437         ls->ls_namelen = namelen;
438         ls->ls_lvblen = lvblen;
439         ls->ls_count = 0;
440         ls->ls_flags = 0;
441         ls->ls_scan_time = jiffies;
442
443         if (flags & DLM_LSFL_TIMEWARN)
444                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
445
446         if (flags & DLM_LSFL_FS)
447                 ls->ls_allocation = GFP_NOFS;
448         else
449                 ls->ls_allocation = GFP_KERNEL;
450
451         /* ls_exflags are forced to match among nodes, and we don't
452            need to require all nodes to have some flags set */
453         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
454                                     DLM_LSFL_NEWEXCL));
455
456         size = dlm_config.ci_rsbtbl_size;
457         ls->ls_rsbtbl_size = size;
458
459         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
460         if (!ls->ls_rsbtbl)
461                 goto out_lsfree;
462         for (i = 0; i < size; i++) {
463                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
464                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
465                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
466         }
467
468         size = dlm_config.ci_lkbtbl_size;
469         ls->ls_lkbtbl_size = size;
470
471         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
472         if (!ls->ls_lkbtbl)
473                 goto out_rsbfree;
474         for (i = 0; i < size; i++) {
475                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
476                 rwlock_init(&ls->ls_lkbtbl[i].lock);
477                 ls->ls_lkbtbl[i].counter = 1;
478         }
479
480         size = dlm_config.ci_dirtbl_size;
481         ls->ls_dirtbl_size = size;
482
483         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
484         if (!ls->ls_dirtbl)
485                 goto out_lkbfree;
486         for (i = 0; i < size; i++) {
487                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
488                 spin_lock_init(&ls->ls_dirtbl[i].lock);
489         }
490
491         INIT_LIST_HEAD(&ls->ls_waiters);
492         mutex_init(&ls->ls_waiters_mutex);
493         INIT_LIST_HEAD(&ls->ls_orphans);
494         mutex_init(&ls->ls_orphans_mutex);
495         INIT_LIST_HEAD(&ls->ls_timeout);
496         mutex_init(&ls->ls_timeout_mutex);
497
498         INIT_LIST_HEAD(&ls->ls_nodes);
499         INIT_LIST_HEAD(&ls->ls_nodes_gone);
500         ls->ls_num_nodes = 0;
501         ls->ls_low_nodeid = 0;
502         ls->ls_total_weight = 0;
503         ls->ls_node_array = NULL;
504
505         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
506         ls->ls_stub_rsb.res_ls = ls;
507
508         ls->ls_debug_rsb_dentry = NULL;
509         ls->ls_debug_waiters_dentry = NULL;
510
511         init_waitqueue_head(&ls->ls_uevent_wait);
512         ls->ls_uevent_result = 0;
513         init_completion(&ls->ls_members_done);
514         ls->ls_members_result = -1;
515
516         ls->ls_recoverd_task = NULL;
517         mutex_init(&ls->ls_recoverd_active);
518         spin_lock_init(&ls->ls_recover_lock);
519         spin_lock_init(&ls->ls_rcom_spin);
520         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
521         ls->ls_recover_status = 0;
522         ls->ls_recover_seq = 0;
523         ls->ls_recover_args = NULL;
524         init_rwsem(&ls->ls_in_recovery);
525         init_rwsem(&ls->ls_recv_active);
526         INIT_LIST_HEAD(&ls->ls_requestqueue);
527         mutex_init(&ls->ls_requestqueue_mutex);
528         mutex_init(&ls->ls_clear_proc_locks);
529
530         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
531         if (!ls->ls_recover_buf)
532                 goto out_dirfree;
533
534         INIT_LIST_HEAD(&ls->ls_recover_list);
535         spin_lock_init(&ls->ls_recover_list_lock);
536         ls->ls_recover_list_count = 0;
537         ls->ls_local_handle = ls;
538         init_waitqueue_head(&ls->ls_wait_general);
539         INIT_LIST_HEAD(&ls->ls_root_list);
540         init_rwsem(&ls->ls_root_sem);
541
542         down_write(&ls->ls_in_recovery);
543
544         spin_lock(&lslist_lock);
545         ls->ls_create_count = 1;
546         list_add(&ls->ls_list, &lslist);
547         spin_unlock(&lslist_lock);
548
549         /* needs to find ls in lslist */
550         error = dlm_recoverd_start(ls);
551         if (error) {
552                 log_error(ls, "can't start dlm_recoverd %d", error);
553                 goto out_delist;
554         }
555
556         ls->ls_kobj.kset = dlm_kset;
557         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
558                                      "%s", ls->ls_name);
559         if (error)
560                 goto out_stop;
561         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
562
563         /* let kobject handle freeing of ls if there's an error */
564         do_unreg = 1;
565
566         /* This uevent triggers dlm_controld in userspace to add us to the
567            group of nodes that are members of this lockspace (managed by the
568            cluster infrastructure.)  Once it's done that, it tells us who the
569            current lockspace members are (via configfs) and then tells the
570            lockspace to start running (via sysfs) in dlm_ls_start(). */
571
572         error = do_uevent(ls, 1);
573         if (error)
574                 goto out_stop;
575
576         wait_for_completion(&ls->ls_members_done);
577         error = ls->ls_members_result;
578         if (error)
579                 goto out_members;
580
581         dlm_create_debug_file(ls);
582
583         log_debug(ls, "join complete");
584         *lockspace = ls;
585         return 0;
586
587  out_members:
588         do_uevent(ls, 0);
589         dlm_clear_members(ls);
590         kfree(ls->ls_node_array);
591  out_stop:
592         dlm_recoverd_stop(ls);
593  out_delist:
594         spin_lock(&lslist_lock);
595         list_del(&ls->ls_list);
596         spin_unlock(&lslist_lock);
597         kfree(ls->ls_recover_buf);
598  out_dirfree:
599         kfree(ls->ls_dirtbl);
600  out_lkbfree:
601         kfree(ls->ls_lkbtbl);
602  out_rsbfree:
603         kfree(ls->ls_rsbtbl);
604  out_lsfree:
605         if (do_unreg)
606                 kobject_put(&ls->ls_kobj);
607         else
608                 kfree(ls);
609  out:
610         module_put(THIS_MODULE);
611         return error;
612 }
613
614 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
615                       uint32_t flags, int lvblen)
616 {
617         int error = 0;
618
619         mutex_lock(&ls_lock);
620         if (!ls_count)
621                 error = threads_start();
622         if (error)
623                 goto out;
624
625         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
626         if (!error)
627                 ls_count++;
628         if (error > 0)
629                 error = 0;
630         if (!ls_count)
631                 threads_stop();
632  out:
633         mutex_unlock(&ls_lock);
634         return error;
635 }
636
637 /* Return 1 if the lockspace still has active remote locks,
638  *        2 if the lockspace still has active local locks.
639  */
640 static int lockspace_busy(struct dlm_ls *ls)
641 {
642         int i, lkb_found = 0;
643         struct dlm_lkb *lkb;
644
645         /* NOTE: We check the lockidtbl here rather than the resource table.
646            This is because there may be LKBs queued as ASTs that have been
647            unlinked from their RSBs and are pending deletion once the AST has
648            been delivered */
649
650         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
651                 read_lock(&ls->ls_lkbtbl[i].lock);
652                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
653                         lkb_found = 1;
654                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
655                                             lkb_idtbl_list) {
656                                 if (!lkb->lkb_nodeid) {
657                                         read_unlock(&ls->ls_lkbtbl[i].lock);
658                                         return 2;
659                                 }
660                         }
661                 }
662                 read_unlock(&ls->ls_lkbtbl[i].lock);
663         }
664         return lkb_found;
665 }
666
667 static int release_lockspace(struct dlm_ls *ls, int force)
668 {
669         struct dlm_lkb *lkb;
670         struct dlm_rsb *rsb;
671         struct list_head *head;
672         int i, busy, rv;
673
674         busy = lockspace_busy(ls);
675
676         spin_lock(&lslist_lock);
677         if (ls->ls_create_count == 1) {
678                 if (busy > force)
679                         rv = -EBUSY;
680                 else {
681                         /* remove_lockspace takes ls off lslist */
682                         ls->ls_create_count = 0;
683                         rv = 0;
684                 }
685         } else if (ls->ls_create_count > 1) {
686                 rv = --ls->ls_create_count;
687         } else {
688                 rv = -EINVAL;
689         }
690         spin_unlock(&lslist_lock);
691
692         if (rv) {
693                 log_debug(ls, "release_lockspace no remove %d", rv);
694                 return rv;
695         }
696
697         dlm_device_deregister(ls);
698
699         if (force < 3 && dlm_user_daemon_available())
700                 do_uevent(ls, 0);
701
702         dlm_recoverd_stop(ls);
703
704         remove_lockspace(ls);
705
706         dlm_delete_debug_file(ls);
707
708         dlm_astd_suspend();
709
710         kfree(ls->ls_recover_buf);
711
712         /*
713          * Free direntry structs.
714          */
715
716         dlm_dir_clear(ls);
717         kfree(ls->ls_dirtbl);
718
719         /*
720          * Free all lkb's on lkbtbl[] lists.
721          */
722
723         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
724                 head = &ls->ls_lkbtbl[i].list;
725                 while (!list_empty(head)) {
726                         lkb = list_entry(head->next, struct dlm_lkb,
727                                          lkb_idtbl_list);
728
729                         list_del(&lkb->lkb_idtbl_list);
730
731                         dlm_del_ast(lkb);
732
733                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
734                                 dlm_free_lvb(lkb->lkb_lvbptr);
735
736                         dlm_free_lkb(lkb);
737                 }
738         }
739         dlm_astd_resume();
740
741         kfree(ls->ls_lkbtbl);
742
743         /*
744          * Free all rsb's on rsbtbl[] lists
745          */
746
747         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
748                 head = &ls->ls_rsbtbl[i].list;
749                 while (!list_empty(head)) {
750                         rsb = list_entry(head->next, struct dlm_rsb,
751                                          res_hashchain);
752
753                         list_del(&rsb->res_hashchain);
754                         dlm_free_rsb(rsb);
755                 }
756
757                 head = &ls->ls_rsbtbl[i].toss;
758                 while (!list_empty(head)) {
759                         rsb = list_entry(head->next, struct dlm_rsb,
760                                          res_hashchain);
761                         list_del(&rsb->res_hashchain);
762                         dlm_free_rsb(rsb);
763                 }
764         }
765
766         kfree(ls->ls_rsbtbl);
767
768         /*
769          * Free structures on any other lists
770          */
771
772         dlm_purge_requestqueue(ls);
773         kfree(ls->ls_recover_args);
774         dlm_clear_free_entries(ls);
775         dlm_clear_members(ls);
776         dlm_clear_members_gone(ls);
777         kfree(ls->ls_node_array);
778         log_debug(ls, "release_lockspace final free");
779         kobject_put(&ls->ls_kobj);
780         /* The ls structure will be freed when the kobject is done with */
781
782         module_put(THIS_MODULE);
783         return 0;
784 }
785
786 /*
787  * Called when a system has released all its locks and is not going to use the
788  * lockspace any longer.  We free everything we're managing for this lockspace.
789  * Remaining nodes will go through the recovery process as if we'd died.  The
790  * lockspace must continue to function as usual, participating in recoveries,
791  * until this returns.
792  *
793  * Force has 4 possible values:
794  * 0 - don't destroy locksapce if it has any LKBs
795  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
796  * 2 - destroy lockspace regardless of LKBs
797  * 3 - destroy lockspace as part of a forced shutdown
798  */
799
800 int dlm_release_lockspace(void *lockspace, int force)
801 {
802         struct dlm_ls *ls;
803         int error;
804
805         ls = dlm_find_lockspace_local(lockspace);
806         if (!ls)
807                 return -EINVAL;
808         dlm_put_lockspace(ls);
809
810         mutex_lock(&ls_lock);
811         error = release_lockspace(ls, force);
812         if (!error)
813                 ls_count--;
814         if (!ls_count)
815                 threads_stop();
816         mutex_unlock(&ls_lock);
817
818         return error;
819 }
820
821 void dlm_stop_lockspaces(void)
822 {
823         struct dlm_ls *ls;
824
825  restart:
826         spin_lock(&lslist_lock);
827         list_for_each_entry(ls, &lslist, ls_list) {
828                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
829                         continue;
830                 spin_unlock(&lslist_lock);
831                 log_error(ls, "no userland control daemon, stopping lockspace");
832                 dlm_ls_stop(ls);
833                 goto restart;
834         }
835         spin_unlock(&lslist_lock);
836 }
837