Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/roland...
[linux-2.6] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79         uint32_t status = dlm_recover_status(ls);
80         return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87
88 struct dlm_attr {
89         struct attribute attr;
90         ssize_t (*show)(struct dlm_ls *, char *);
91         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93
94 static struct dlm_attr dlm_attr_control = {
95         .attr  = {.name = "control", .mode = S_IWUSR},
96         .store = dlm_control_store
97 };
98
99 static struct dlm_attr dlm_attr_event = {
100         .attr  = {.name = "event_done", .mode = S_IWUSR},
101         .store = dlm_event_store
102 };
103
104 static struct dlm_attr dlm_attr_id = {
105         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106         .show  = dlm_id_show,
107         .store = dlm_id_store
108 };
109
110 static struct dlm_attr dlm_attr_recover_status = {
111         .attr  = {.name = "recover_status", .mode = S_IRUGO},
112         .show  = dlm_recover_status_show
113 };
114
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117         .show  = dlm_recover_nodeid_show
118 };
119
120 static struct attribute *dlm_attrs[] = {
121         &dlm_attr_control.attr,
122         &dlm_attr_event.attr,
123         &dlm_attr_id.attr,
124         &dlm_attr_recover_status.attr,
125         &dlm_attr_recover_nodeid.attr,
126         NULL,
127 };
128
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130                              char *buf)
131 {
132         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134         return a->show ? a->show(ls, buf) : 0;
135 }
136
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138                               const char *buf, size_t len)
139 {
140         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142         return a->store ? a->store(ls, buf, len) : len;
143 }
144
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148         kfree(ls);
149 }
150
151 static struct sysfs_ops dlm_attr_ops = {
152         .show  = dlm_attr_show,
153         .store = dlm_attr_store,
154 };
155
156 static struct kobj_type dlm_ktype = {
157         .default_attrs = dlm_attrs,
158         .sysfs_ops     = &dlm_attr_ops,
159         .release       = lockspace_kobj_release,
160 };
161
162 static struct kset *dlm_kset;
163
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166         int error;
167
168         if (in)
169                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170         else
171                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172
173         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174
175         /* dlm_controld will see the uevent, do the necessary group management
176            and then write to sysfs to wake us */
177
178         error = wait_event_interruptible(ls->ls_uevent_wait,
179                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180
181         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182
183         if (error)
184                 goto out;
185
186         error = ls->ls_uevent_result;
187  out:
188         if (error)
189                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190                           error, ls->ls_uevent_result);
191         return error;
192 }
193
194
195 int __init dlm_lockspace_init(void)
196 {
197         ls_count = 0;
198         mutex_init(&ls_lock);
199         INIT_LIST_HEAD(&lslist);
200         spin_lock_init(&lslist_lock);
201
202         dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203         if (!dlm_kset) {
204                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
205                 return -ENOMEM;
206         }
207         return 0;
208 }
209
210 void dlm_lockspace_exit(void)
211 {
212         kset_unregister(dlm_kset);
213 }
214
215 static struct dlm_ls *find_ls_to_scan(void)
216 {
217         struct dlm_ls *ls;
218
219         spin_lock(&lslist_lock);
220         list_for_each_entry(ls, &lslist, ls_list) {
221                 if (time_after_eq(jiffies, ls->ls_scan_time +
222                                             dlm_config.ci_scan_secs * HZ)) {
223                         spin_unlock(&lslist_lock);
224                         return ls;
225                 }
226         }
227         spin_unlock(&lslist_lock);
228         return NULL;
229 }
230
231 static int dlm_scand(void *data)
232 {
233         struct dlm_ls *ls;
234         int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
235
236         while (!kthread_should_stop()) {
237                 ls = find_ls_to_scan();
238                 if (ls) {
239                         if (dlm_lock_recovery_try(ls)) {
240                                 ls->ls_scan_time = jiffies;
241                                 dlm_scan_rsbs(ls);
242                                 dlm_scan_timeout(ls);
243                                 dlm_unlock_recovery(ls);
244                         } else {
245                                 ls->ls_scan_time += HZ;
246                         }
247                 } else {
248                         schedule_timeout_interruptible(timeout_jiffies);
249                 }
250         }
251         return 0;
252 }
253
254 static int dlm_scand_start(void)
255 {
256         struct task_struct *p;
257         int error = 0;
258
259         p = kthread_run(dlm_scand, NULL, "dlm_scand");
260         if (IS_ERR(p))
261                 error = PTR_ERR(p);
262         else
263                 scand_task = p;
264         return error;
265 }
266
267 static void dlm_scand_stop(void)
268 {
269         kthread_stop(scand_task);
270 }
271
272 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
273 {
274         struct dlm_ls *ls;
275
276         spin_lock(&lslist_lock);
277
278         list_for_each_entry(ls, &lslist, ls_list) {
279                 if (ls->ls_global_id == id) {
280                         ls->ls_count++;
281                         goto out;
282                 }
283         }
284         ls = NULL;
285  out:
286         spin_unlock(&lslist_lock);
287         return ls;
288 }
289
290 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
291 {
292         struct dlm_ls *ls;
293
294         spin_lock(&lslist_lock);
295         list_for_each_entry(ls, &lslist, ls_list) {
296                 if (ls->ls_local_handle == lockspace) {
297                         ls->ls_count++;
298                         goto out;
299                 }
300         }
301         ls = NULL;
302  out:
303         spin_unlock(&lslist_lock);
304         return ls;
305 }
306
307 struct dlm_ls *dlm_find_lockspace_device(int minor)
308 {
309         struct dlm_ls *ls;
310
311         spin_lock(&lslist_lock);
312         list_for_each_entry(ls, &lslist, ls_list) {
313                 if (ls->ls_device.minor == minor) {
314                         ls->ls_count++;
315                         goto out;
316                 }
317         }
318         ls = NULL;
319  out:
320         spin_unlock(&lslist_lock);
321         return ls;
322 }
323
324 void dlm_put_lockspace(struct dlm_ls *ls)
325 {
326         spin_lock(&lslist_lock);
327         ls->ls_count--;
328         spin_unlock(&lslist_lock);
329 }
330
331 static void remove_lockspace(struct dlm_ls *ls)
332 {
333         for (;;) {
334                 spin_lock(&lslist_lock);
335                 if (ls->ls_count == 0) {
336                         WARN_ON(ls->ls_create_count != 0);
337                         list_del(&ls->ls_list);
338                         spin_unlock(&lslist_lock);
339                         return;
340                 }
341                 spin_unlock(&lslist_lock);
342                 ssleep(1);
343         }
344 }
345
/*
 * Start the services shared by all lockspaces, in order: dlm_astd, dlm_scand,
 * lowcomms.  If a later one fails, the ones already started are stopped
 * again.  Returns 0 or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	/* Thread which process lock requests for all lockspace's */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		return error;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto stop_astd;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto stop_scand;
	}

	return 0;

 stop_scand:
	dlm_scand_stop();
 stop_astd:
	dlm_astd_stop();
	return error;
}
379
/* Stop everything threads_start() started: scand, then lowcomms, then astd. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
386
387 static int new_lockspace(char *name, int namelen, void **lockspace,
388                          uint32_t flags, int lvblen)
389 {
390         struct dlm_ls *ls;
391         int i, size, error;
392         int do_unreg = 0;
393
394         if (namelen > DLM_LOCKSPACE_LEN)
395                 return -EINVAL;
396
397         if (!lvblen || (lvblen % 8))
398                 return -EINVAL;
399
400         if (!try_module_get(THIS_MODULE))
401                 return -EINVAL;
402
403         if (!dlm_user_daemon_available()) {
404                 module_put(THIS_MODULE);
405                 return -EUNATCH;
406         }
407
408         error = 0;
409
410         spin_lock(&lslist_lock);
411         list_for_each_entry(ls, &lslist, ls_list) {
412                 WARN_ON(ls->ls_create_count <= 0);
413                 if (ls->ls_namelen != namelen)
414                         continue;
415                 if (memcmp(ls->ls_name, name, namelen))
416                         continue;
417                 if (flags & DLM_LSFL_NEWEXCL) {
418                         error = -EEXIST;
419                         break;
420                 }
421                 ls->ls_create_count++;
422                 module_put(THIS_MODULE);
423                 error = 1; /* not an error, return 0 */
424                 break;
425         }
426         spin_unlock(&lslist_lock);
427
428         if (error < 0)
429                 goto out;
430         if (error)
431                 goto ret_zero;
432
433         error = -ENOMEM;
434
435         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
436         if (!ls)
437                 goto out;
438         memcpy(ls->ls_name, name, namelen);
439         ls->ls_namelen = namelen;
440         ls->ls_lvblen = lvblen;
441         ls->ls_count = 0;
442         ls->ls_flags = 0;
443         ls->ls_scan_time = jiffies;
444
445         if (flags & DLM_LSFL_TIMEWARN)
446                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
447
448         if (flags & DLM_LSFL_FS)
449                 ls->ls_allocation = GFP_NOFS;
450         else
451                 ls->ls_allocation = GFP_KERNEL;
452
453         /* ls_exflags are forced to match among nodes, and we don't
454            need to require all nodes to have some flags set */
455         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
456                                     DLM_LSFL_NEWEXCL));
457
458         size = dlm_config.ci_rsbtbl_size;
459         ls->ls_rsbtbl_size = size;
460
461         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
462         if (!ls->ls_rsbtbl)
463                 goto out_lsfree;
464         for (i = 0; i < size; i++) {
465                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
466                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
467                 rwlock_init(&ls->ls_rsbtbl[i].lock);
468         }
469
470         size = dlm_config.ci_lkbtbl_size;
471         ls->ls_lkbtbl_size = size;
472
473         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
474         if (!ls->ls_lkbtbl)
475                 goto out_rsbfree;
476         for (i = 0; i < size; i++) {
477                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
478                 rwlock_init(&ls->ls_lkbtbl[i].lock);
479                 ls->ls_lkbtbl[i].counter = 1;
480         }
481
482         size = dlm_config.ci_dirtbl_size;
483         ls->ls_dirtbl_size = size;
484
485         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
486         if (!ls->ls_dirtbl)
487                 goto out_lkbfree;
488         for (i = 0; i < size; i++) {
489                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
490                 rwlock_init(&ls->ls_dirtbl[i].lock);
491         }
492
493         INIT_LIST_HEAD(&ls->ls_waiters);
494         mutex_init(&ls->ls_waiters_mutex);
495         INIT_LIST_HEAD(&ls->ls_orphans);
496         mutex_init(&ls->ls_orphans_mutex);
497         INIT_LIST_HEAD(&ls->ls_timeout);
498         mutex_init(&ls->ls_timeout_mutex);
499
500         INIT_LIST_HEAD(&ls->ls_nodes);
501         INIT_LIST_HEAD(&ls->ls_nodes_gone);
502         ls->ls_num_nodes = 0;
503         ls->ls_low_nodeid = 0;
504         ls->ls_total_weight = 0;
505         ls->ls_node_array = NULL;
506
507         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
508         ls->ls_stub_rsb.res_ls = ls;
509
510         ls->ls_debug_rsb_dentry = NULL;
511         ls->ls_debug_waiters_dentry = NULL;
512
513         init_waitqueue_head(&ls->ls_uevent_wait);
514         ls->ls_uevent_result = 0;
515         init_completion(&ls->ls_members_done);
516         ls->ls_members_result = -1;
517
518         ls->ls_recoverd_task = NULL;
519         mutex_init(&ls->ls_recoverd_active);
520         spin_lock_init(&ls->ls_recover_lock);
521         spin_lock_init(&ls->ls_rcom_spin);
522         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
523         ls->ls_recover_status = 0;
524         ls->ls_recover_seq = 0;
525         ls->ls_recover_args = NULL;
526         init_rwsem(&ls->ls_in_recovery);
527         init_rwsem(&ls->ls_recv_active);
528         INIT_LIST_HEAD(&ls->ls_requestqueue);
529         mutex_init(&ls->ls_requestqueue_mutex);
530         mutex_init(&ls->ls_clear_proc_locks);
531
532         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
533         if (!ls->ls_recover_buf)
534                 goto out_dirfree;
535
536         INIT_LIST_HEAD(&ls->ls_recover_list);
537         spin_lock_init(&ls->ls_recover_list_lock);
538         ls->ls_recover_list_count = 0;
539         ls->ls_local_handle = ls;
540         init_waitqueue_head(&ls->ls_wait_general);
541         INIT_LIST_HEAD(&ls->ls_root_list);
542         init_rwsem(&ls->ls_root_sem);
543
544         down_write(&ls->ls_in_recovery);
545
546         spin_lock(&lslist_lock);
547         ls->ls_create_count = 1;
548         list_add(&ls->ls_list, &lslist);
549         spin_unlock(&lslist_lock);
550
551         /* needs to find ls in lslist */
552         error = dlm_recoverd_start(ls);
553         if (error) {
554                 log_error(ls, "can't start dlm_recoverd %d", error);
555                 goto out_delist;
556         }
557
558         ls->ls_kobj.kset = dlm_kset;
559         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
560                                      "%s", ls->ls_name);
561         if (error)
562                 goto out_stop;
563         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
564
565         /* let kobject handle freeing of ls if there's an error */
566         do_unreg = 1;
567
568         /* This uevent triggers dlm_controld in userspace to add us to the
569            group of nodes that are members of this lockspace (managed by the
570            cluster infrastructure.)  Once it's done that, it tells us who the
571            current lockspace members are (via configfs) and then tells the
572            lockspace to start running (via sysfs) in dlm_ls_start(). */
573
574         error = do_uevent(ls, 1);
575         if (error)
576                 goto out_stop;
577
578         wait_for_completion(&ls->ls_members_done);
579         error = ls->ls_members_result;
580         if (error)
581                 goto out_members;
582
583         dlm_create_debug_file(ls);
584
585         log_debug(ls, "join complete");
586  ret_zero:
587         *lockspace = ls;
588         return 0;
589
590  out_members:
591         do_uevent(ls, 0);
592         dlm_clear_members(ls);
593         kfree(ls->ls_node_array);
594  out_stop:
595         dlm_recoverd_stop(ls);
596  out_delist:
597         spin_lock(&lslist_lock);
598         list_del(&ls->ls_list);
599         spin_unlock(&lslist_lock);
600         kfree(ls->ls_recover_buf);
601  out_dirfree:
602         kfree(ls->ls_dirtbl);
603  out_lkbfree:
604         kfree(ls->ls_lkbtbl);
605  out_rsbfree:
606         kfree(ls->ls_rsbtbl);
607  out_lsfree:
608         if (do_unreg)
609                 kobject_put(&ls->ls_kobj);
610         else
611                 kfree(ls);
612  out:
613         module_put(THIS_MODULE);
614         return error;
615 }
616
617 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
618                       uint32_t flags, int lvblen)
619 {
620         int error = 0;
621
622         mutex_lock(&ls_lock);
623         if (!ls_count)
624                 error = threads_start();
625         if (error)
626                 goto out;
627
628         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
629         if (!error)
630                 ls_count++;
631         else if (!ls_count)
632                 threads_stop();
633  out:
634         mutex_unlock(&ls_lock);
635         return error;
636 }
637
638 /* Return 1 if the lockspace still has active remote locks,
639  *        2 if the lockspace still has active local locks.
640  */
641 static int lockspace_busy(struct dlm_ls *ls)
642 {
643         int i, lkb_found = 0;
644         struct dlm_lkb *lkb;
645
646         /* NOTE: We check the lockidtbl here rather than the resource table.
647            This is because there may be LKBs queued as ASTs that have been
648            unlinked from their RSBs and are pending deletion once the AST has
649            been delivered */
650
651         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
652                 read_lock(&ls->ls_lkbtbl[i].lock);
653                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
654                         lkb_found = 1;
655                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
656                                             lkb_idtbl_list) {
657                                 if (!lkb->lkb_nodeid) {
658                                         read_unlock(&ls->ls_lkbtbl[i].lock);
659                                         return 2;
660                                 }
661                         }
662                 }
663                 read_unlock(&ls->ls_lkbtbl[i].lock);
664         }
665         return lkb_found;
666 }
667
668 static int release_lockspace(struct dlm_ls *ls, int force)
669 {
670         struct dlm_lkb *lkb;
671         struct dlm_rsb *rsb;
672         struct list_head *head;
673         int i, busy, rv;
674
675         busy = lockspace_busy(ls);
676
677         spin_lock(&lslist_lock);
678         if (ls->ls_create_count == 1) {
679                 if (busy > force)
680                         rv = -EBUSY;
681                 else {
682                         /* remove_lockspace takes ls off lslist */
683                         ls->ls_create_count = 0;
684                         rv = 0;
685                 }
686         } else if (ls->ls_create_count > 1) {
687                 rv = --ls->ls_create_count;
688         } else {
689                 rv = -EINVAL;
690         }
691         spin_unlock(&lslist_lock);
692
693         if (rv) {
694                 log_debug(ls, "release_lockspace no remove %d", rv);
695                 return rv;
696         }
697
698         dlm_device_deregister(ls);
699
700         if (force < 3 && dlm_user_daemon_available())
701                 do_uevent(ls, 0);
702
703         dlm_recoverd_stop(ls);
704
705         remove_lockspace(ls);
706
707         dlm_delete_debug_file(ls);
708
709         dlm_astd_suspend();
710
711         kfree(ls->ls_recover_buf);
712
713         /*
714          * Free direntry structs.
715          */
716
717         dlm_dir_clear(ls);
718         kfree(ls->ls_dirtbl);
719
720         /*
721          * Free all lkb's on lkbtbl[] lists.
722          */
723
724         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
725                 head = &ls->ls_lkbtbl[i].list;
726                 while (!list_empty(head)) {
727                         lkb = list_entry(head->next, struct dlm_lkb,
728                                          lkb_idtbl_list);
729
730                         list_del(&lkb->lkb_idtbl_list);
731
732                         dlm_del_ast(lkb);
733
734                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735                                 dlm_free_lvb(lkb->lkb_lvbptr);
736
737                         dlm_free_lkb(lkb);
738                 }
739         }
740         dlm_astd_resume();
741
742         kfree(ls->ls_lkbtbl);
743
744         /*
745          * Free all rsb's on rsbtbl[] lists
746          */
747
748         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
749                 head = &ls->ls_rsbtbl[i].list;
750                 while (!list_empty(head)) {
751                         rsb = list_entry(head->next, struct dlm_rsb,
752                                          res_hashchain);
753
754                         list_del(&rsb->res_hashchain);
755                         dlm_free_rsb(rsb);
756                 }
757
758                 head = &ls->ls_rsbtbl[i].toss;
759                 while (!list_empty(head)) {
760                         rsb = list_entry(head->next, struct dlm_rsb,
761                                          res_hashchain);
762                         list_del(&rsb->res_hashchain);
763                         dlm_free_rsb(rsb);
764                 }
765         }
766
767         kfree(ls->ls_rsbtbl);
768
769         /*
770          * Free structures on any other lists
771          */
772
773         dlm_purge_requestqueue(ls);
774         kfree(ls->ls_recover_args);
775         dlm_clear_free_entries(ls);
776         dlm_clear_members(ls);
777         dlm_clear_members_gone(ls);
778         kfree(ls->ls_node_array);
779         log_debug(ls, "release_lockspace final free");
780         kobject_put(&ls->ls_kobj);
781         /* The ls structure will be freed when the kobject is done with */
782
783         module_put(THIS_MODULE);
784         return 0;
785 }
786
787 /*
788  * Called when a system has released all its locks and is not going to use the
789  * lockspace any longer.  We free everything we're managing for this lockspace.
790  * Remaining nodes will go through the recovery process as if we'd died.  The
791  * lockspace must continue to function as usual, participating in recoveries,
792  * until this returns.
793  *
794  * Force has 4 possible values:
795  * 0 - don't destroy locksapce if it has any LKBs
796  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
797  * 2 - destroy lockspace regardless of LKBs
798  * 3 - destroy lockspace as part of a forced shutdown
799  */
800
801 int dlm_release_lockspace(void *lockspace, int force)
802 {
803         struct dlm_ls *ls;
804         int error;
805
806         ls = dlm_find_lockspace_local(lockspace);
807         if (!ls)
808                 return -EINVAL;
809         dlm_put_lockspace(ls);
810
811         mutex_lock(&ls_lock);
812         error = release_lockspace(ls, force);
813         if (!error)
814                 ls_count--;
815         else if (!ls_count)
816                 threads_stop();
817         mutex_unlock(&ls_lock);
818
819         return error;
820 }
821
822 void dlm_stop_lockspaces(void)
823 {
824         struct dlm_ls *ls;
825
826  restart:
827         spin_lock(&lslist_lock);
828         list_for_each_entry(ls, &lslist, ls_list) {
829                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
830                         continue;
831                 spin_unlock(&lslist_lock);
832                 log_error(ls, "no userland control daemon, stopping lockspace");
833                 dlm_ls_stop(ls);
834                 goto restart;
835         }
836         spin_unlock(&lslist_lock);
837 }
838