/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdebug.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

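/*
 * the role an mle plays while the master of a lock resource is being
 * sorted out (summarized from the way the types are used below):
 *   DLM_MLE_BLOCK:     another node is mastering this lock; block and
 *                      wait for it to assert_master (or die)
 *   DLM_MLE_MASTER:    this node is trying to master the lockres itself
 *   DLM_MLE_MIGRATION: the current master is migrating the lockres to
 *                      a new master
 */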
enum dlm_mle_type {
        DLM_MLE_BLOCK,
        DLM_MLE_MASTER,
        DLM_MLE_MIGRATION
};

struct dlm_lock_name
{
        u8 len;
        u8 name[DLM_LOCKID_NAME_MAX];
};

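/*
 * one mle tracks a single mastery negotiation.  a rough guide to the
 * four node bitmaps, inferred from dlm_init_mle() and the
 * request/response paths below:
 *   node_map:     nodes currently up in the domain, kept current by the
 *                 attached heartbeat callbacks
 *   vote_map:     nodes whose responses are needed before mastery can
 *                 be decided
 *   response_map: nodes that have already answered our master request
 *   maybe_map:    nodes that might themselves be trying to master this
 *                 lock (set on a MAYBE response)
 */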
struct dlm_master_list_entry
{
        struct list_head list;
        struct list_head hb_events;
        struct dlm_ctxt *dlm;
        spinlock_t spinlock;
        wait_queue_head_t wq;
        atomic_t woken;
        struct kref mle_refs;
        unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
        u8 master;
        u8 new_master;
        enum dlm_mle_type type;
        struct o2hb_callback_func mle_hb_up;
        struct o2hb_callback_func mle_hb_down;
        union {
                struct dlm_lock_resource *res;
                struct dlm_lock_name name;
        } u;
};

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node,
                              int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node,
                            int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
                                unsigned int namelen, void *nodemap,
                                u32 flags);

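/* an mle matches a (dlm, name) pair; BLOCK and MIGRATION mles carry the
 * lock name inline, while a MASTER mle takes it from the attached
 * lockres */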
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle,
                                const char *name,
                                unsigned int namelen)
{
        struct dlm_lock_resource *res;

        if (dlm != mle->dlm)
                return 0;

        if (mle->type == DLM_MLE_BLOCK ||
            mle->type == DLM_MLE_MIGRATION) {
                if (namelen != mle->u.name.len ||
                    memcmp(name, mle->u.name.name, namelen) != 0)
                        return 0;
        } else {
                res = mle->u.res;
                if (namelen != res->lockname.len ||
                    memcmp(res->lockname.name, name, namelen) != 0)
                        return 0;
        }
        return 1;
}

#if 0
/* this code is compiled out, but kept here because it aids debugging */

void dlm_print_one_mle(struct dlm_master_list_entry *mle)
{
        int i = 0, refs;
        char *type;
        char attached;
        u8 master;
        unsigned int namelen;
        const char *name;
        struct kref *k;

        k = &mle->mle_refs;
        if (mle->type == DLM_MLE_BLOCK)
                type = "BLK";
        else if (mle->type == DLM_MLE_MASTER)
                type = "MAS";
        else
                type = "MIG";
        refs = atomic_read(&k->refcount);
        master = mle->master;
        attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');

        if (mle->type != DLM_MLE_MASTER) {
                namelen = mle->u.name.len;
                name = mle->u.name.name;
        } else {
                namelen = mle->u.res->lockname.len;
                name = mle->u.res->lockname.name;
        }

        mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
                  i, type, refs, master, mle->new_master, attached,
                  namelen, namelen, name);
}

static void dlm_dump_mles(struct dlm_ctxt *dlm)
{
        struct dlm_master_list_entry *mle;
        struct list_head *iter;

        mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
        mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
        spin_lock(&dlm->master_lock);
        list_for_each(iter, &dlm->master_list) {
                mle = list_entry(iter, struct dlm_master_list_entry, list);
                dlm_print_one_mle(mle);
        }
        spin_unlock(&dlm->master_lock);
}

int dlm_dump_all_mles(const char __user *data, unsigned int len)
{
        struct list_head *iter;
        struct dlm_ctxt *dlm;

        spin_lock(&dlm_domain_lock);
        list_for_each(iter, &dlm_domains) {
                dlm = list_entry(iter, struct dlm_ctxt, list);
                mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
                dlm_dump_mles(dlm);
        }
        spin_unlock(&dlm_domain_lock);
        return len;
}
EXPORT_SYMBOL_GPL(dlm_dump_all_mles);

#endif  /*  0  */


static kmem_cache_t *dlm_mle_cache = NULL;


static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);


static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
                                 struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle,
                                 struct dlm_master_list_entry **oldmle,
                                 const char *name, unsigned int namelen,
                                 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res,
                                       u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res);

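/* classify an errno from the messaging layer: returns 1 for any error
 * that indicates the remote node is dead or unreachable, so that
 * callers can treat the failure as a node-down event rather than a
 * local bug */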
int dlm_is_host_down(int errno)
{
        switch (errno) {
                case -EBADF:
                case -ECONNREFUSED:
                case -ENOTCONN:
                case -ECONNRESET:
                case -EPIPE:
                case -EHOSTDOWN:
                case -EHOSTUNREACH:
                case -ETIMEDOUT:
                case -ECONNABORTED:
                case -ENETDOWN:
                case -ENETUNREACH:
                case -ENETRESET:
                case -ESHUTDOWN:
                case -ENOPROTOOPT:
                case -EINVAL:   /* if returned from our tcp code,
                                   this means there is no socket */
                        return 1;
        }
        return 0;
}


/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);

        list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        if (!list_empty(&mle->hb_events))
                list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                            struct dlm_master_list_entry *mle)
{
        spin_lock(&dlm->spinlock);
        __dlm_mle_detach_hb_events(dlm, mle);
        spin_unlock(&dlm->spinlock);
}

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        BUG_ON(!atomic_read(&mle->mle_refs.refcount));

        kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
        kref_get(&mle->mle_refs);
}

static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen)
{
        assert_spin_locked(&dlm->spinlock);

        mle->dlm = dlm;
        mle->type = type;
        INIT_LIST_HEAD(&mle->list);
        INIT_LIST_HEAD(&mle->hb_events);
        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
        spin_lock_init(&mle->spinlock);
        init_waitqueue_head(&mle->wq);
        atomic_set(&mle->woken, 0);
        kref_init(&mle->mle_refs);
        memset(mle->response_map, 0, sizeof(mle->response_map));
        mle->master = O2NM_MAX_NODES;
        mle->new_master = O2NM_MAX_NODES;

        if (mle->type == DLM_MLE_MASTER) {
                BUG_ON(!res);
                mle->u.res = res;
        } else if (mle->type == DLM_MLE_BLOCK) {
                BUG_ON(!name);
                memcpy(mle->u.name.name, name, namelen);
                mle->u.name.len = namelen;
        } else /* DLM_MLE_MIGRATION */ {
                BUG_ON(!name);
                memcpy(mle->u.name.name, name, namelen);
                mle->u.name.len = namelen;
        }

        /* copy off the node_map and register hb callbacks on our copy */
        memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
        memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
        clear_bit(dlm->node_num, mle->vote_map);
        clear_bit(dlm->node_num, mle->node_map);

        /* attach the mle to the domain node up/down events */
        __dlm_mle_attach_hb_events(dlm, mle);
}


/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen)
{
        struct dlm_master_list_entry *tmpmle;
        struct list_head *iter;

        assert_spin_locked(&dlm->master_lock);

        list_for_each(iter, &dlm->master_list) {
                tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
                if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
                        continue;
                dlm_get_mle(tmpmle);
                *mle = tmpmle;
                return 1;
        }
        return 0;
}

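/* fan a heartbeat node-up/node-down event out to every mle currently
 * attached to the domain's mle_hb_events list */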
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
        struct dlm_master_list_entry *mle;
        struct list_head *iter;

        assert_spin_locked(&dlm->spinlock);

        list_for_each(iter, &dlm->mle_hb_events) {
                mle = list_entry(iter, struct dlm_master_list_entry,
                                 hb_events);
                if (node_up)
                        dlm_mle_node_up(dlm, mle, NULL, idx);
                else
                        dlm_mle_node_down(dlm, mle, NULL, idx);
        }
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (!test_bit(idx, mle->node_map))
                mlog(0, "node %u already removed from nodemap!\n", idx);
        else
                clear_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (test_bit(idx, mle->node_map))
                mlog(0, "node %u already in node map!\n", idx);
        else
                set_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}


int dlm_init_mle_cache(void)
{
        dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
                                          sizeof(struct dlm_master_list_entry),
                                          0, SLAB_HWCACHE_ALIGN,
                                          NULL, NULL);
        if (dlm_mle_cache == NULL)
                return -ENOMEM;
        return 0;
}

void dlm_destroy_mle_cache(void)
{
        if (dlm_mle_cache)
                kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
        struct dlm_master_list_entry *mle;
        struct dlm_ctxt *dlm;

        mlog_entry_void();

        mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
        dlm = mle->dlm;

        if (mle->type != DLM_MLE_MASTER) {
                mlog(0, "calling mle_release for %.*s, type %d\n",
                     mle->u.name.len, mle->u.name.name, mle->type);
        } else {
                mlog(0, "calling mle_release for %.*s, type %d\n",
                     mle->u.res->lockname.len,
                     mle->u.res->lockname.name, mle->type);
        }
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        /* remove from list if not already */
        if (!list_empty(&mle->list))
                list_del_init(&mle->list);

        /* detach the mle from the domain node up/down events */
        __dlm_mle_detach_hb_events(dlm, mle);

        /* NOTE: kfree under spinlock here.
         * if this is bad, we can move this to a freelist. */
        kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *res,
                                  u8 owner)
{
        assert_spin_locked(&res->spinlock);

        mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

        if (owner == dlm->node_num)
                atomic_inc(&dlm->local_resources);
        else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
                atomic_inc(&dlm->unknown_resources);
        else
                atomic_inc(&dlm->remote_resources);

        res->owner = owner;
}

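/* move a lockres from one owner-accounting bucket to another: drop the
 * stat for the old owner class here, then let dlm_set_lockres_owner()
 * bump the stat for the new one */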
void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res, u8 owner)
{
        assert_spin_locked(&res->spinlock);

        if (owner == res->owner)
                return;

        if (res->owner == dlm->node_num)
                atomic_dec(&dlm->local_resources);
        else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
                atomic_dec(&dlm->unknown_resources);
        else
                atomic_dec(&dlm->remote_resources);

        dlm_set_lockres_owner(dlm, res, owner);
}


static void dlm_lockres_release(struct kref *kref)
{
        struct dlm_lock_resource *res;

        res = container_of(kref, struct dlm_lock_resource, refs);

        /* This should not happen -- all lockres' have a name
         * associated with them at init time. */
        BUG_ON(!res->lockname.name);

        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);

        /* By the time we're ready to blow this guy away, we shouldn't
         * be on any lists. */
        BUG_ON(!hlist_unhashed(&res->hash_node));
        BUG_ON(!list_empty(&res->granted));
        BUG_ON(!list_empty(&res->converting));
        BUG_ON(!list_empty(&res->blocked));
        BUG_ON(!list_empty(&res->dirty));
        BUG_ON(!list_empty(&res->recovering));
        BUG_ON(!list_empty(&res->purge));

        kfree(res->lockname.name);

        kfree(res);
}

void dlm_lockres_get(struct dlm_lock_resource *res)
{
        kref_get(&res->refs);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
        kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
                             struct dlm_lock_resource *res,
                             const char *name, unsigned int namelen)
{
        char *qname;

        /* If we memset here, we lose our reference to the kmalloc'd
         * res->lockname.name, so be sure to init every field
         * correctly! */

        qname = (char *) res->lockname.name;
        memcpy(qname, name, namelen);

        res->lockname.len = namelen;
        res->lockname.hash = full_name_hash(name, namelen);

        init_waitqueue_head(&res->wq);
        spin_lock_init(&res->spinlock);
        INIT_HLIST_NODE(&res->hash_node);
        INIT_LIST_HEAD(&res->granted);
        INIT_LIST_HEAD(&res->converting);
        INIT_LIST_HEAD(&res->blocked);
        INIT_LIST_HEAD(&res->dirty);
        INIT_LIST_HEAD(&res->recovering);
        INIT_LIST_HEAD(&res->purge);
        atomic_set(&res->asts_reserved, 0);
        res->migration_pending = 0;

        kref_init(&res->refs);

        /* just for consistency */
        spin_lock(&res->spinlock);
        dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
        spin_unlock(&res->spinlock);

        res->state = DLM_LOCK_RES_IN_PROGRESS;

        res->last_used = 0;

        memset(res->lvb, 0, DLM_LVB_LEN);
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
                                   const char *name,
                                   unsigned int namelen)
{
        struct dlm_lock_resource *res;

        res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
        if (!res)
                return NULL;

        res->lockname.name = kmalloc(namelen, GFP_KERNEL);
        if (!res->lockname.name) {
                kfree(res);
                return NULL;
        }

        dlm_init_lockres(dlm, res, name, namelen);
        return res;
}

/*
 * lookup a lock resource by name.
 * may already exist in the hashtable.
 * lockid is null terminated
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.   need to wait around for that node
 * to assert_master (or die).
 *
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
                                          const char *lockid,
                                          int flags)
{
        struct dlm_lock_resource *tmpres = NULL, *res = NULL;
        struct dlm_master_list_entry *mle = NULL;
        struct dlm_master_list_entry *alloc_mle = NULL;
        int blocked = 0;
        int ret, nodenum;
        struct dlm_node_iter iter;
        unsigned int namelen;
        int tries = 0;
        int bit, wait_on_recovery = 0;

        BUG_ON(!lockid);

        namelen = strlen(lockid);

        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
        spin_lock(&dlm->spinlock);
        tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
        if (tmpres) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "found in hash!\n");
                if (res)
                        dlm_lockres_put(res);
                res = tmpres;
                goto leave;
        }

        if (!res) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "allocating a new resource\n");
                /* nothing found and we need to allocate one. */
                alloc_mle = (struct dlm_master_list_entry *)
                        kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
                if (!alloc_mle)
                        goto leave;
                res = dlm_new_lockres(dlm, lockid, namelen);
                if (!res)
                        goto leave;
                goto lookup;
        }

        mlog(0, "no lockres found, allocated our own: %p\n", res);

        if (flags & LKM_LOCAL) {
                /* caller knows it's safe to assume it's not mastered elsewhere
                 * DONE!  return right away */
                spin_lock(&res->spinlock);
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
                __dlm_insert_lockres(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
                /* lockres still marked IN_PROGRESS */
                goto wake_waiters;
        }

        /* check master list to see if another node has started mastering it */
        spin_lock(&dlm->master_lock);

        /* if we found a block, wait for lock to be mastered by another node */
        blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
        if (blocked) {
                if (mle->type == DLM_MLE_MASTER) {
                        mlog(ML_ERROR, "master entry for nonexistent lock!\n");
                        BUG();
                } else if (mle->type == DLM_MLE_MIGRATION) {
                        /* migration is in progress! */
                        /* the good news is that we now know the
                         * "current" master (mle->master). */

                        spin_unlock(&dlm->master_lock);
                        assert_spin_locked(&dlm->spinlock);

                        /* set the lockres owner and hash it */
                        spin_lock(&res->spinlock);
                        dlm_set_lockres_owner(dlm, res, mle->master);
                        __dlm_insert_lockres(dlm, res);
                        spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);

                        /* master is known, detach */
                        dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                        mle = NULL;
                        goto wake_waiters;
                }
        } else {
                /* go ahead and try to master lock on this node */
                mle = alloc_mle;
                /* make sure this does not get freed below */
                alloc_mle = NULL;
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map);
                list_add(&mle->list, &dlm->master_list);

                /* still holding the dlm spinlock, check the recovery map
                 * to see if there are any nodes that still need to be
                 * considered.  these will not appear in the mle nodemap
                 * but they might own this lockres.  wait on them. */
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                }
        }

        /* at this point there is either a DLM_MLE_BLOCK or a
         * DLM_MLE_MASTER on the master list, so it's safe to add the
         * lockres to the hashtable.  anyone who finds the lock will
         * still have to wait on the IN_PROGRESS. */

        /* finally add the lockres to its hash bucket */
        __dlm_insert_lockres(dlm, res);
        /* get an extra ref on the mle in case this is a BLOCK
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
         * need an extra one to keep from a bad ptr deref. */
        dlm_get_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

        while (wait_on_recovery) {
                /* any cluster changes that occurred after dropping the
                 * dlm spinlock would be detectable by a change on the mle,
                 * so we only need to clear out the recovery map once. */
                if (dlm_is_recovery_lock(lockid, namelen)) {
                        mlog(ML_NOTICE, "%s: recovery map is not empty, but "
                             "must master $RECOVERY lock now\n", dlm->name);
                        if (!dlm_pre_master_reco_lockres(dlm, res))
                                wait_on_recovery = 0;
                        else {
                                mlog(0, "%s: waiting 500ms for heartbeat state "
                                    "change\n", dlm->name);
                                msleep(500);
                        }
                        continue;
                }

                dlm_kick_recovery_thread(dlm);
                msleep(100);
                dlm_wait_for_recovery(dlm);

                spin_lock(&dlm->spinlock);
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                } else
                        wait_on_recovery = 0;
                spin_unlock(&dlm->spinlock);
        }

        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;

redo_request:
        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
                ret = dlm_do_master_request(mle, nodenum);
                if (ret < 0)
                        mlog_errno(ret);
                if (mle->master != O2NM_MAX_NODES) {
                        /* found a master ! */
                        if (mle->master <= nodenum)
                                break;
                        /* if our master request has not reached the master
                         * yet, keep going until it does.  this is how the
                         * master will know that asserts are needed back to
                         * the lower nodes. */
                        mlog(0, "%s:%.*s: requests only up to %u but master "
                             "is %u, keep going\n", dlm->name, namelen,
                             lockid, nodenum, mle->master);
                }
        }

wait:
        /* keep going until the response map includes all nodes */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
                mlog(0, "%s:%.*s: node map changed, redo the "
                     "master request now, blocked=%d\n",
                     dlm->name, res->lockname.len,
                     res->lockname.name, blocked);
                if (++tries > 20) {
                        mlog(ML_ERROR, "%s:%.*s: spinning on "
                             "dlm_wait_for_lock_mastery, blocked=%d\n",
                             dlm->name, res->lockname.len,
                             res->lockname.name, blocked);
                        dlm_print_one_lock_resource(res);
                        /* dlm_print_one_mle(mle); */
                        tries = 0;
                }
                goto redo_request;
        }

        mlog(0, "lockres mastered by %u\n", res->owner);
        /* make sure we never continue without this */
        BUG_ON(res->owner == O2NM_MAX_NODES);

        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        /* put the extra ref */
        dlm_put_mle(mle);

wake_waiters:
        spin_lock(&res->spinlock);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);

leave:
        /* need to free the unused mle */
        if (alloc_mle)
                kmem_cache_free(dlm_mle_cache, alloc_mle);

        return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000

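/*
 * loop (sleeping in between) until mastery of the lockres settles.
 * roughly:
 *   - if some node already owns the lockres, re-send a master request
 *     to it so it re-asserts, then return
 *   - if the live node map changed while voting, restart the vote via
 *     dlm_restart_lock_mastery()
 *   - once every node in the vote map has responded and no node with a
 *     lower number might be mastering, assert mastery from this node
 *   - otherwise wait up to DLM_MASTERY_TIMEOUT_MS and recheck
 */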
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked)
{
        u8 m;
        int ret, bit;
        int map_changed, voting_done;
        int assert, sleep;

recheck:
        ret = 0;
        assert = 0;

        /* check if another node has already become the owner */
        spin_lock(&res->spinlock);
        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
                mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
                     res->lockname.len, res->lockname.name, res->owner);
                spin_unlock(&res->spinlock);
                /* this will cause the master to re-assert across
                 * the whole cluster, freeing up mles */
                ret = dlm_do_master_request(mle, res->owner);
                if (ret < 0) {
                        /* give recovery a chance to run */
                        mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
                        msleep(500);
                        goto recheck;
                }
                ret = 0;
                goto leave;
        }
        spin_unlock(&res->spinlock);

        spin_lock(&mle->spinlock);
        m = mle->master;
        map_changed = (memcmp(mle->vote_map, mle->node_map,
                              sizeof(mle->vote_map)) != 0);
        voting_done = (memcmp(mle->vote_map, mle->response_map,
                             sizeof(mle->vote_map)) == 0);

        /* restart if we hit any errors */
        if (map_changed) {
                int b;
                mlog(0, "%s: %.*s: node map changed, restarting\n",
                     dlm->name, res->lockname.len, res->lockname.name);
                ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
                b = (mle->type == DLM_MLE_BLOCK);
                if ((*blocked && !b) || (!*blocked && b)) {
                        mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
                             dlm->name, res->lockname.len, res->lockname.name,
                             *blocked, b);
                        *blocked = b;
                }
                spin_unlock(&mle->spinlock);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto leave;
                }
                mlog(0, "%s:%.*s: restart lock mastery succeeded, "
                     "rechecking now\n", dlm->name, res->lockname.len,
                     res->lockname.name);
                goto recheck;
        }

        if (m != O2NM_MAX_NODES) {
                /* another node has done an assert!
                 * all done! */
                sleep = 0;
        } else {
                sleep = 1;
                /* have all nodes responded? */
                if (voting_done && !*blocked) {
                        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
                        if (dlm->node_num <= bit) {
                                /* my node number is lowest.
                                 * now tell other nodes that I am
                                 * mastering this. */
                                mle->master = dlm->node_num;
                                assert = 1;
                                sleep = 0;
                        }
                        /* if voting is done, but we have not received
                         * an assert master yet, we must sleep */
                }
        }

        spin_unlock(&mle->spinlock);

        /* sleep if we haven't finished voting yet */
        if (sleep) {
                unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

                /*
                if (atomic_read(&mle->mle_refs.refcount) < 2)
                        mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
                        atomic_read(&mle->mle_refs.refcount),
                        res->lockname.len, res->lockname.name);
                */
                atomic_set(&mle->woken, 0);
                (void)wait_event_timeout(mle->wq,
                                         (atomic_read(&mle->woken) == 1),
                                         timeo);
                if (res->owner == O2NM_MAX_NODES) {
                        mlog(0, "waiting again\n");
                        goto recheck;
                }
                mlog(0, "done waiting, master is %u\n", res->owner);
                ret = 0;
                goto leave;
        }

        ret = 0;   /* done */
        if (assert) {
                m = dlm->node_num;
                mlog(0, "about to master %.*s here, this=%u\n",
                     res->lockname.len, res->lockname.name, m);
                ret = dlm_do_assert_master(dlm, res->lockname.name,
                                           res->lockname.len, mle->vote_map, 0);
                if (ret) {
                        /* This is a failure in the network path,
                         * not in the response to the assert_master
                         * (any nonzero response is a BUG on this node).
                         * Most likely a socket just got disconnected
                         * due to node death. */
                        mlog_errno(ret);
                }
                /* no longer need to restart lock mastery.
                 * all living nodes have been contacted. */
                ret = 0;
        }

        /* set the lockres owner */
        spin_lock(&res->spinlock);
        dlm_change_lockres_owner(dlm, res, m);
        spin_unlock(&res->spinlock);

leave:
        return ret;
}

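/* helper for walking the symmetric difference (XOR) of two node
 * bitmaps: every node the iterator returns changed state, and the
 * original bitmap tells which direction (down if the bit was set there,
 * up if it was not) */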
struct dlm_bitmap_diff_iter
{
        int curnode;
        unsigned long *orig_bm;
        unsigned long *cur_bm;
        unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
        NODE_DOWN = -1,
        NODE_NO_CHANGE = 0,
        NODE_UP
};

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
                                      unsigned long *orig_bm,
                                      unsigned long *cur_bm)
{
        unsigned long p1, p2;
        int i;

        iter->curnode = -1;
        iter->orig_bm = orig_bm;
        iter->cur_bm = cur_bm;

        for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
                p1 = *(iter->orig_bm + i);
                p2 = *(iter->cur_bm + i);
                iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
        }
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
                                     enum dlm_node_state_change *state)
{
        int bit;

        if (iter->curnode >= O2NM_MAX_NODES)
                return -ENOENT;

        bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
                            iter->curnode+1);
        if (bit >= O2NM_MAX_NODES) {
                iter->curnode = O2NM_MAX_NODES;
                return -ENOENT;
        }

        /* if it was there in the original then this node died */
        if (test_bit(bit, iter->orig_bm))
                *state = NODE_DOWN;
        else
                *state = NODE_UP;

        iter->curnode = bit;
        return bit;
}

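/* called with mle->spinlock held once the live node map has diverged
 * from the vote map.  patches the vote/response/maybe maps for each
 * node that came up or went down; returns -EAGAIN if the master
 * requests must be re-sent, 0 if the current vote can stand */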
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked)
{
        struct dlm_bitmap_diff_iter bdi;
        enum dlm_node_state_change sc;
        int node;
        int ret = 0;

        mlog(0, "something happened such that the "
             "master process may need to be restarted!\n");

        assert_spin_locked(&mle->spinlock);

        dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
        node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        while (node >= 0) {
                if (sc == NODE_UP) {
                        /* a node came up.  clear any old vote from
                         * the response map and set it in the vote map
                         * then restart the mastery. */
                        mlog(ML_NOTICE, "node %d up while restarting\n", node);

                        /* redo the master request, but only for the new node */
                        mlog(0, "sending request to new node\n");
                        clear_bit(node, mle->response_map);
                        set_bit(node, mle->vote_map);
                } else {
                        mlog(ML_ERROR, "node down! %d\n", node);

                        /* if the node wasn't involved in mastery skip it,
                         * but clear it out from the maps so that it will
                         * not affect mastery of this lockres */
                        clear_bit(node, mle->response_map);
                        clear_bit(node, mle->vote_map);
                        if (!test_bit(node, mle->maybe_map))
                                goto next;

                        /* if we're already blocked on lock mastery, and the
                         * dead node wasn't the expected master, or there is
                         * another node in the maybe_map, keep waiting */
                        if (blocked) {
                                int lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES, 0);

                                /* act like it was never there */
                                clear_bit(node, mle->maybe_map);

                                if (node != lowest)
                                        goto next;

                                mlog(ML_ERROR, "expected master %u died while "
                                     "this node was blocked waiting on it!\n",
                                     node);
                                lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES,
                                                       lowest+1);
                                if (lowest < O2NM_MAX_NODES) {
                                        mlog(0, "still blocked. waiting "
                                             "on %u now\n", lowest);
                                        goto next;
                                }

                                /* mle is an MLE_BLOCK, but there is now
                                 * nothing left to block on.  we need to return
                                 * all the way back out and try again with
                                 * an MLE_MASTER. dlm_do_local_recovery_cleanup
                                 * has already run, so the mle refcount is ok */
                                mlog(0, "no longer blocking. we can "
                                     "try to master this here\n");
                                mle->type = DLM_MLE_MASTER;
                                memset(mle->maybe_map, 0,
                                       sizeof(mle->maybe_map));
                                memset(mle->response_map, 0,
                                       sizeof(mle->response_map));
                                memcpy(mle->vote_map, mle->node_map,
                                       sizeof(mle->node_map));
                                mle->u.res = res;
                                set_bit(dlm->node_num, mle->maybe_map);

                                ret = -EAGAIN;
                                goto next;
                        }

                        clear_bit(node, mle->maybe_map);
                        if (node > dlm->node_num)
                                goto next;

                        mlog(0, "dead node in map!\n");
                        /* yuck. go back and re-contact all nodes
                         * in the vote_map, removing this node. */
                        memset(mle->response_map, 0,
                               sizeof(mle->response_map));
                }
                ret = -EAGAIN;
next:
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        }
        return ret;
}


/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 *
 */

static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
{
        struct dlm_ctxt *dlm = mle->dlm;
        struct dlm_master_request request;
        int ret, response = 0, resend;

        memset(&request, 0, sizeof(request));
        request.node_idx = dlm->node_num;

        BUG_ON(mle->type == DLM_MLE_MIGRATION);

        if (mle->type != DLM_MLE_MASTER) {
                request.namelen = mle->u.name.len;
                memcpy(request.name, mle->u.name.name, request.namelen);
        } else {
                request.namelen = mle->u.res->lockname.len;
                memcpy(request.name, mle->u.res->lockname.name,
                        request.namelen);
        }

again:
        ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
                                 sizeof(request), to, &response);
        if (ret < 0) {
                if (ret == -ESRCH) {
                        /* should never happen */
                        mlog(ML_ERROR, "TCP stack not ready!\n");
                        BUG();
                } else if (ret == -EINVAL) {
                        mlog(ML_ERROR, "bad args passed to o2net!\n");
                        BUG();
                } else if (ret == -ENOMEM) {
                        mlog(ML_ERROR, "out of memory while trying to send "
                             "network message!  retrying\n");
                        /* this is totally crude */
                        msleep(50);
                        goto again;
                } else if (!dlm_is_host_down(ret)) {
                        /* not a network error. bad. */
                        mlog_errno(ret);
                        mlog(ML_ERROR, "unhandled error!\n");
1256                         BUG();
1257                 }
1258                 /* all other errors should be network errors,
1259                  * and likely indicate node death */
1260                 mlog(ML_ERROR, "link to %d went down!\n", to);
1261                 goto out;
1262         }
1263
1264         ret = 0;
1265         resend = 0;
1266         spin_lock(&mle->spinlock);
1267         switch (response) {
1268                 case DLM_MASTER_RESP_YES:
1269                         set_bit(to, mle->response_map);
1270                         mlog(0, "node %u is the master, response=YES\n", to);
1271                         mle->master = to;
1272                         break;
1273                 case DLM_MASTER_RESP_NO:
1274                         mlog(0, "node %u not master, response=NO\n", to);
1275                         set_bit(to, mle->response_map);
1276                         break;
1277                 case DLM_MASTER_RESP_MAYBE:
1278                         mlog(0, "node %u not master, response=MAYBE\n", to);
1279                         set_bit(to, mle->response_map);
1280                         set_bit(to, mle->maybe_map);
1281                         break;
1282                 case DLM_MASTER_RESP_ERROR:
1283                         mlog(0, "node %u hit an error, resending\n", to);
1284                         resend = 1;
1285                         response = 0;
1286                         break;
1287                 default:
1288                         mlog(ML_ERROR, "bad response! %u\n", response);
1289                         BUG();
1290         }
1291         spin_unlock(&mle->spinlock);
1292         if (resend) {
1293                 /* this is also totally crude */
1294                 msleep(50);
1295                 goto again;
1296         }
1297
1298 out:
1299         return ret;
1300 }
1301
1302 /*
1303  * locks that can be taken here:
1304  * dlm->spinlock
1305  * res->spinlock
1306  * mle->spinlock
1307  * dlm->master_list
1308  *
1309  * if possible, TRIM THIS DOWN!!!
1310  */
1311 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1312 {
1313         u8 response = DLM_MASTER_RESP_MAYBE;
1314         struct dlm_ctxt *dlm = data;
1315         struct dlm_lock_resource *res = NULL;
1316         struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1317         struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1318         char *name;
1319         unsigned int namelen;
1320         int found, ret;
1321         int set_maybe;
1322         int dispatch_assert = 0;
1323
1324         if (!dlm_grab(dlm))
1325                 return DLM_MASTER_RESP_NO;
1326
1327         if (!dlm_domain_fully_joined(dlm)) {
1328                 response = DLM_MASTER_RESP_NO;
1329                 goto send_response;
1330         }
1331
1332         name = request->name;
1333         namelen = request->namelen;
1334
1335         if (namelen > DLM_LOCKID_NAME_MAX) {
1336                 response = DLM_IVBUFLEN;
1337                 goto send_response;
1338         }
1339
1340 way_up_top:
1341         spin_lock(&dlm->spinlock);
1342         res = __dlm_lookup_lockres(dlm, name, namelen);
1343         if (res) {
1344                 spin_unlock(&dlm->spinlock);
1345
1346                 /* take care of the easy cases up front */
1347                 spin_lock(&res->spinlock);
1348                 if (res->state & DLM_LOCK_RES_RECOVERING) {
1349                         spin_unlock(&res->spinlock);
1350                         mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1351                              "being recovered\n");
1352                         response = DLM_MASTER_RESP_ERROR;
1353                         if (mle)
1354                                 kmem_cache_free(dlm_mle_cache, mle);
1355                         goto send_response;
1356                 }
1357
1358                 if (res->owner == dlm->node_num) {
1359                         spin_unlock(&res->spinlock);
1360                         // mlog(0, "this node is the master\n");
1361                         response = DLM_MASTER_RESP_YES;
1362                         if (mle)
1363                                 kmem_cache_free(dlm_mle_cache, mle);
1364
1365                         /* this node is the owner.
1366                          * there is some extra work that needs to
1367                          * happen now.  the requesting node has
1368                          * caused all nodes up to this one to
1369                          * create mles.  this node now needs to
1370                          * go back and clean those up. */
1371                         dispatch_assert = 1;
1372                         goto send_response;
1373                 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1374                         spin_unlock(&res->spinlock);
1375                         // mlog(0, "node %u is the master\n", res->owner);
1376                         response = DLM_MASTER_RESP_NO;
1377                         if (mle)
1378                                 kmem_cache_free(dlm_mle_cache, mle);
1379                         goto send_response;
1380                 }
1381
1382                 /* ok, there is no owner.  either this node is
1383                  * being blocked, or it is actively trying to
1384                  * master this lock. */
1385                 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1386                         mlog(ML_ERROR, "lock with no owner should be "
1387                              "in-progress!\n");
1388                         BUG();
1389                 }
1390
1391                 // mlog(0, "lockres is in progress...\n");
1392                 spin_lock(&dlm->master_lock);
1393                 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1394                 if (!found) {
1395                         mlog(ML_ERROR, "no mle found for this lock!\n");
1396                         BUG();
1397                 }
1398                 set_maybe = 1;
1399                 spin_lock(&tmpmle->spinlock);
1400                 if (tmpmle->type == DLM_MLE_BLOCK) {
1401                         // mlog(0, "this node is waiting for "
1402                         // "lockres to be mastered\n");
1403                         response = DLM_MASTER_RESP_NO;
1404                 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1405                         mlog(0, "node %u is master, but trying to migrate to "
1406                              "node %u.\n", tmpmle->master, tmpmle->new_master);
1407                         if (tmpmle->master == dlm->node_num) {
1408                                 response = DLM_MASTER_RESP_YES;
1409                                 mlog(ML_ERROR, "no owner on lockres, but this "
1410                                      "node is trying to migrate it to %u?!\n",
1411                                      tmpmle->new_master);
1412                                 BUG();
1413                         } else {
1414                                 /* the real master can respond on its own */
1415                                 response = DLM_MASTER_RESP_NO;
1416                         }
1417                 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1418                         set_maybe = 0;
1419                         if (tmpmle->master == dlm->node_num) {
1420                                 response = DLM_MASTER_RESP_YES;
1421                                 /* this node will be the owner.
1422                                  * go back and clean the mles on any
1423                                  * other nodes */
1424                                 dispatch_assert = 1;
1425                         } else
1426                                 response = DLM_MASTER_RESP_NO;
1427                 } else {
1428                         // mlog(0, "this node is attempting to "
1429                         // "master lockres\n");
1430                         response = DLM_MASTER_RESP_MAYBE;
1431                 }
1432                 if (set_maybe)
1433                         set_bit(request->node_idx, tmpmle->maybe_map);
1434                 spin_unlock(&tmpmle->spinlock);
1435
1436                 spin_unlock(&dlm->master_lock);
1437                 spin_unlock(&res->spinlock);
1438
1439                 /* keep the mle attached to heartbeat events */
1440                 dlm_put_mle(tmpmle);
1441                 if (mle)
1442                         kmem_cache_free(dlm_mle_cache, mle);
1443                 goto send_response;
1444         }
1445
1446         /*
1447          * lockres doesn't exist on this node
1448          * if there is an MLE_BLOCK, return NO
1449          * if there is an MLE_MASTER, return MAYBE
1450          * otherwise, add an MLE_BLOCK, return NO
1451          */
1452         spin_lock(&dlm->master_lock);
1453         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1454         if (!found) {
1455                 /* this lockid has never been seen on this node yet */
1456                 // mlog(0, "no mle found\n");
1457                 if (!mle) {
1458                         spin_unlock(&dlm->master_lock);
1459                         spin_unlock(&dlm->spinlock);
1460
1461                         mle = kmem_cache_alloc(dlm_mle_cache,
1462                                                GFP_KERNEL);
1463                         if (!mle) {
1464                                 response = DLM_MASTER_RESP_ERROR;
1465                                 mlog_errno(-ENOMEM);
1466                                 goto send_response;
1467                         }
1468                         spin_lock(&dlm->spinlock);
1469                         dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
1470                                          name, namelen);
1471                         spin_unlock(&dlm->spinlock);
1472                         goto way_up_top;
1473                 }
1474
1475                 // mlog(0, "this is second time thru, already allocated, "
1476                 // "add the block.\n");
1477                 set_bit(request->node_idx, mle->maybe_map);
1478                 list_add(&mle->list, &dlm->master_list);
1479                 response = DLM_MASTER_RESP_NO;
1480         } else {
1481                 // mlog(0, "mle was found\n");
1482                 set_maybe = 1;
1483                 spin_lock(&tmpmle->spinlock);
1484                 if (tmpmle->master == dlm->node_num) {
1485                         mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1486                         BUG();
1487                 }
1488                 if (tmpmle->type == DLM_MLE_BLOCK)
1489                         response = DLM_MASTER_RESP_NO;
1490                 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1491                         mlog(0, "migration mle was found (%u->%u)\n",
1492                              tmpmle->master, tmpmle->new_master);
1493                         /* real master can respond on its own */
1494                         response = DLM_MASTER_RESP_NO;
1495                 } else
1496                         response = DLM_MASTER_RESP_MAYBE;
1497                 if (set_maybe)
1498                         set_bit(request->node_idx, tmpmle->maybe_map);
1499                 spin_unlock(&tmpmle->spinlock);
1500         }
1501         spin_unlock(&dlm->master_lock);
1502         spin_unlock(&dlm->spinlock);
1503
1504         if (found) {
1505                 /* keep the mle attached to heartbeat events */
1506                 dlm_put_mle(tmpmle);
1507         }
1508 send_response:
1509
1510         if (dispatch_assert) {
1511                 if (response != DLM_MASTER_RESP_YES)
1512                         mlog(ML_ERROR, "invalid response %d\n", response);
1513                 if (!res) {
1514                         mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1515                         BUG();
1516                 }
1517                 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1518                              dlm->node_num, res->lockname.len, res->lockname.name);
1519                 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 
1520                                                  DLM_ASSERT_MASTER_MLE_CLEANUP);
1521                 if (ret < 0) {
1522                         mlog(ML_ERROR, "failed to dispatch assert master work\n");
1523                         response = DLM_MASTER_RESP_ERROR;
1524                 }
1525         }
1526
1527         dlm_put(dlm);
1528         return response;
1529 }
1530
1531 /*
1532  * DLM_ASSERT_MASTER_MSG
1533  */
1534
1535
1536 /*
1537  * NOTE: this can be used for debugging
1538  * can periodically run all locks owned by this node
1539  * and re-assert across the cluster...
1540  */
1541 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1542                                 unsigned int namelen, void *nodemap,
1543                                 u32 flags)
1544 {
1545         struct dlm_assert_master assert;
1546         int to, tmpret;
1547         struct dlm_node_iter iter;
1548         int ret = 0;
1549         int reassert;
1550
1551         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1552 again:
1553         reassert = 0;
1554
1555         /* note that if this nodemap is empty, it returns 0 */
1556         dlm_node_iter_init(nodemap, &iter);
1557         while ((to = dlm_node_iter_next(&iter)) >= 0) {
1558                 int r = 0;
1559                 mlog(0, "sending assert master to %d (%.*s)\n", to,
1560                      namelen, lockname);
1561                 memset(&assert, 0, sizeof(assert));
1562                 assert.node_idx = dlm->node_num;
1563                 assert.namelen = namelen;
1564                 memcpy(assert.name, lockname, namelen);
1565                 assert.flags = cpu_to_be32(flags);
1566
1567                 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1568                                             &assert, sizeof(assert), to, &r);
1569                 if (tmpret < 0) {
1570                         mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
1571                         if (!dlm_is_host_down(tmpret)) {
1572                                 mlog(ML_ERROR, "unhandled error!\n");
1573                                 BUG();
1574                         }
1575                         /* a node died.  finish out the rest of the nodes. */
1576                         mlog(ML_ERROR, "link to %d went down!\n", to);
1577                         /* any nonzero status return will do */
1578                         ret = tmpret;
1579                 } else if (r < 0) {
1580                         /* ok, something is horribly messed up.  kill thyself. */
1581                         mlog(ML_ERROR, "during assert master of %.*s to %u, "
1582                              "got %d.\n", namelen, lockname, to, r);
1583                         dlm_dump_lock_resources(dlm);
1584                         BUG();
1585                 } else if (r == EAGAIN) {
1586                         mlog(0, "%.*s: node %u created mles on other "
1587                              "nodes and requests a re-assert\n",
1588                              namelen, lockname, to);
1589                         reassert = 1;
1590                 }
1591         }
1592
1593         if (reassert)
1594                 goto again;
1595
1596         return ret;
1597 }
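
/*
 * Editor's sketch (hypothetical, not part of the original source): the
 * debugging note above dlm_do_assert_master() suggests periodically
 * re-asserting locks owned by this node.  A minimal helper for a single
 * lockres might look like this, mirroring the nodemap setup done in
 * dlm_assert_master_worker() below.  Assumes the caller already holds a
 * reference on @res and that res->owner == dlm->node_num.
 */
static void dlm_debug_reassert_lockres(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res)
{
        unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
        int ret;

        /* snapshot the live nodes under the dlm spinlock */
        spin_lock(&dlm->spinlock);
        memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
        spin_unlock(&dlm->spinlock);

        /* never send the assert to ourselves */
        clear_bit(dlm->node_num, nodemap);

        /* flags == 0: a plain re-assert, no mle cleanup requested */
        ret = dlm_do_assert_master(dlm, res->lockname.name,
                                   res->lockname.len, nodemap, 0);
        if (ret < 0)
                mlog_errno(ret);
}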
1598
1599 /*
1600  * locks that can be taken here:
1601  * dlm->spinlock
1602  * res->spinlock
1603  * mle->spinlock
1604  * dlm->master_list
1605  *
1606  * if possible, TRIM THIS DOWN!!!
1607  */
1608 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1609 {
1610         struct dlm_ctxt *dlm = data;
1611         struct dlm_master_list_entry *mle = NULL;
1612         struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1613         struct dlm_lock_resource *res = NULL;
1614         char *name;
1615         unsigned int namelen;
1616         u32 flags;
1617         int master_request = 0;
1618         int ret = 0;
1619
1620         if (!dlm_grab(dlm))
1621                 return 0;
1622
1623         name = assert->name;
1624         namelen = assert->namelen;
1625         flags = be32_to_cpu(assert->flags);
1626
1627         if (namelen > DLM_LOCKID_NAME_MAX) {
1628                 mlog(ML_ERROR, "Invalid name length!\n");
1629                 goto done;
1630         }
1631
1632         spin_lock(&dlm->spinlock);
1633
1634         if (flags)
1635                 mlog(0, "assert_master with flags: %u\n", flags);
1636
1637         /* find the MLE */
1638         spin_lock(&dlm->master_lock);
1639         if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1640                 /* not an error, could be master just re-asserting */
1641                 mlog(0, "just got an assert_master from %u, but no "
1642                      "MLE for it! (%.*s)\n", assert->node_idx,
1643                      namelen, name);
1644         } else {
1645                 int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1646                 if (bit >= O2NM_MAX_NODES) {
1647                         /* not necessarily an error, though less likely.
1648                          * could be master just re-asserting. */
1649                         mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
1650                              "is asserting! (%.*s)\n", assert->node_idx,
1651                              namelen, name);
1652                 } else if (bit != assert->node_idx) {
1653                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1654                                 mlog(0, "master %u was found, %u should "
1655                                      "back off\n", assert->node_idx, bit);
1656                         } else {
1657                                 /* with the fix for bug 569, a higher node
1658                                  * number winning the mastery will respond
1659                                  * YES to mastery requests, but this node
1660                                  * had no way of knowing.  let it pass. */
1661                                 mlog(ML_ERROR, "%u is the lowest node, "
1662                                      "%u is asserting. (%.*s)  %u must "
1663                                      "have begun after %u won.\n", bit,
1664                                      assert->node_idx, namelen, name, bit,
1665                                      assert->node_idx);
1666                         }
1667                 }
1668         }
1669         spin_unlock(&dlm->master_lock);
1670
1671         /* ok everything checks out with the MLE
1672          * now check to see if there is a lockres */
1673         res = __dlm_lookup_lockres(dlm, name, namelen);
1674         if (res) {
1675                 spin_lock(&res->spinlock);
1676                 if (res->state & DLM_LOCK_RES_RECOVERING)  {
1677                         mlog(ML_ERROR, "%u asserting but %.*s is "
1678                              "RECOVERING!\n", assert->node_idx, namelen, name);
1679                         goto kill;
1680                 }
1681                 if (!mle) {
1682                         if (res->owner != assert->node_idx) {
1683                                 mlog(ML_ERROR, "assert_master from "
1684                                           "%u, but current owner is "
1685                                           "%u! (%.*s)\n",
1686                                        assert->node_idx, res->owner,
1687                                        namelen, name);
1688                                 goto kill;
1689                         }
1690                 } else if (mle->type != DLM_MLE_MIGRATION) {
1691                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1692                                 /* owner is just re-asserting */
1693                                 if (res->owner == assert->node_idx) {
1694                                         mlog(0, "owner %u re-asserting on "
1695                                              "lock %.*s\n", assert->node_idx,
1696                                              namelen, name);
1697                                         goto ok;
1698                                 }
1699                                 mlog(ML_ERROR, "got assert_master from "
1700                                      "node %u, but %u is the owner! "
1701                                      "(%.*s)\n", assert->node_idx,
1702                                      res->owner, namelen, name);
1703                                 goto kill;
1704                         }
1705                         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1706                                 mlog(ML_ERROR, "got assert from %u, but lock "
1707                                      "with no owner should be "
1708                                      "in-progress! (%.*s)\n",
1709                                      assert->node_idx,
1710                                      namelen, name);
1711                                 goto kill;
1712                         }
1713                 } else /* mle->type == DLM_MLE_MIGRATION */ {
1714                         /* should only be getting an assert from new master */
1715                         if (assert->node_idx != mle->new_master) {
1716                                 mlog(ML_ERROR, "got assert from %u, but "
1717                                      "new master is %u, and old master "
1718                                      "was %u (%.*s)\n",
1719                                      assert->node_idx, mle->new_master,
1720                                      mle->master, namelen, name);
1721                                 goto kill;
1722                         }
1723
1724                 }
1725 ok:
1726                 spin_unlock(&res->spinlock);
1727         }
1728         spin_unlock(&dlm->spinlock);
1729
1730         // mlog(0, "woo!  got an assert_master from node %u!\n",
1731         //           assert->node_idx);
1732         if (mle) {
1733                 int extra_ref = 0;
1734                 int nn = -1;
1735                 
1736                 spin_lock(&mle->spinlock);
1737                 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1738                         extra_ref = 1;
1739                 else {
1740                         /* MASTER mle: if any bits set in the response map
1741                          * then the calling node needs to re-assert to clear
1742                          * up nodes that this node contacted */
1743                         while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
1744                                                    nn+1)) < O2NM_MAX_NODES) {
1745                                 if (nn != dlm->node_num && nn != assert->node_idx)
1746                                         master_request = 1;
1747                         }
1748                 }
1749                 mle->master = assert->node_idx;
1750                 atomic_set(&mle->woken, 1);
1751                 wake_up(&mle->wq);
1752                 spin_unlock(&mle->spinlock);
1753
1754                 if (mle->type == DLM_MLE_MIGRATION && res) {
1755                         mlog(0, "finishing off migration of lockres %.*s, "
1756                              "from %u to %u\n",
1757                                res->lockname.len, res->lockname.name,
1758                                dlm->node_num, mle->new_master);
1759                         spin_lock(&res->spinlock);
1760                         res->state &= ~DLM_LOCK_RES_MIGRATING;
1761                         dlm_change_lockres_owner(dlm, res, mle->new_master);
1762                         BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1763                         spin_unlock(&res->spinlock);
1764                 }
1765                 /* master is known, detach if not already detached */
1766                 dlm_mle_detach_hb_events(dlm, mle);
1767                 dlm_put_mle(mle);
1768                 
1769                 if (extra_ref) {
1770                         /* the assert master message now balances the extra
1771                          * ref given by the master / migration request message.
1772                          * if this is the last put, it will be removed
1773                          * from the list. */
1774                         dlm_put_mle(mle);
1775                 }
1776         }
1777
1778 done:
1779         ret = 0;
1780         if (res)
1781                 dlm_lockres_put(res);
1782         dlm_put(dlm);
1783         if (master_request) {
1784                 mlog(0, "need to tell master to reassert\n");
1785                 ret = EAGAIN;  // positive. negative would shoot down the node.
1786         }
1787         return ret;
1788
1789 kill:
1790         /* kill the caller! */
1791         spin_unlock(&res->spinlock);
1792         spin_unlock(&dlm->spinlock);
1793         dlm_lockres_put(res);
1794         mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
1795              "and killing the other node now!  This node is OK and can continue.\n");
1796         dlm_dump_lock_resources(dlm);
1797         dlm_put(dlm);
1798         return -EINVAL;
1799 }
1800
1801 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1802                                struct dlm_lock_resource *res,
1803                                int ignore_higher, u8 request_from, u32 flags)
1804 {
1805         struct dlm_work_item *item;
1806         item = kzalloc(sizeof(*item), GFP_KERNEL);
1807         if (!item)
1808                 return -ENOMEM;
1809
1810
1811         /* queue up work for dlm_assert_master_worker */
1812         dlm_grab(dlm);  /* get an extra ref for the work item */
1813         dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
1814         item->u.am.lockres = res; /* already have a ref */
1815         /* can optionally ignore node numbers higher than this node */
1816         item->u.am.ignore_higher = ignore_higher;
1817         item->u.am.request_from = request_from;
1818         item->u.am.flags = flags;
1819
1820         if (ignore_higher)
1821                 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
1822                      res->lockname.name);
1823
1824         spin_lock(&dlm->work_lock);
1825         list_add_tail(&item->list, &dlm->work_list);
1826         spin_unlock(&dlm->work_lock);
1827
1828         schedule_work(&dlm->dispatched_work);
1829         return 0;
1830 }
1831
1832 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1833 {
1834         struct dlm_ctxt *dlm = data;
1835         int ret = 0;
1836         struct dlm_lock_resource *res;
1837         unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1838         int ignore_higher;
1839         int bit;
1840         u8 request_from;
1841         u32 flags;
1842
1843         dlm = item->dlm;
1844         res = item->u.am.lockres;
1845         ignore_higher = item->u.am.ignore_higher;
1846         request_from = item->u.am.request_from;
1847         flags = item->u.am.flags;
1848
1849         spin_lock(&dlm->spinlock);
1850         memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
1851         spin_unlock(&dlm->spinlock);
1852
1853         clear_bit(dlm->node_num, nodemap);
1854         if (ignore_higher) {
1855                 /* if this is just to clean up mles for nodes below
1856                  * this node, do not send the message to the original
1857                  * caller or any node number higher than this */
1858                 clear_bit(request_from, nodemap);
1859                 bit = dlm->node_num;
1860                 while (1) {
1861                         bit = find_next_bit(nodemap, O2NM_MAX_NODES,
1862                                             bit+1);
1863                         if (bit >= O2NM_MAX_NODES)
1864                                 break;
1865                         clear_bit(bit, nodemap);
1866                 }
1867         }
1868
1869         /* this call now finishes out the nodemap
1870          * even if one or more nodes die */
1871         mlog(0, "worker about to master %.*s here, this=%u\n",
1872                      res->lockname.len, res->lockname.name, dlm->node_num);
1873         ret = dlm_do_assert_master(dlm, res->lockname.name,
1874                                    res->lockname.len,
1875                                    nodemap, flags);
1876         if (ret < 0) {
1877                 /* no need to restart, we are done */
1878                 mlog_errno(ret);
1879         }
1880
1881         dlm_lockres_put(res);
1882
1883         mlog(0, "finished with dlm_assert_master_worker\n");
1884 }
1885
1886 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
1887  * We cannot wait for node recovery to complete to begin mastering this
1888  * lockres because this lockres is used to kick off recovery! ;-)
1889  * So, do a pre-check on all living nodes to see if any of those nodes
1890  * think that $RECOVERY is currently mastered by a dead node.  If so,
1891  * we wait a short time to allow that node to get notified by its own
1892  * heartbeat stack, then check again.  All $RECOVERY lock resources
1893  * mastered by dead nodes are purged when the heartbeat callback is
1894  * fired, so we can know for sure that it is safe to continue once
1895  * the query returns a live node or no node.  */
1896 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
1897                                        struct dlm_lock_resource *res)
1898 {
1899         struct dlm_node_iter iter;
1900         int nodenum;
1901         int ret = 0;
1902         u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
1903
1904         spin_lock(&dlm->spinlock);
1905         dlm_node_iter_init(dlm->domain_map, &iter);
1906         spin_unlock(&dlm->spinlock);
1907
1908         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
1909                 /* do not send to self */
1910                 if (nodenum == dlm->node_num)
1911                         continue;
1912                 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
1913                 if (ret < 0) {
1914                         mlog_errno(ret);
1915                         if (!dlm_is_host_down(ret))
1916                                 BUG();
1917                         /* host is down, so answer for that node would be
1918                          * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
1919                 }
1920
1921                 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1922                         /* check to see if this master is in the recovery map */
1923                         spin_lock(&dlm->spinlock);
1924                         if (test_bit(master, dlm->recovery_map)) {
1925                                 mlog(ML_NOTICE, "%s: node %u has not seen "
1926                                      "node %u go down yet, and thinks the "
1927                                      "dead node is mastering the recovery "
1928                                      "lock.  must wait.\n", dlm->name,
1929                                      nodenum, master);
1930                                 ret = -EAGAIN;
1931                         }
1932                         spin_unlock(&dlm->spinlock);
1933                         mlog(0, "%s: reco lock master is %u\n", dlm->name, 
1934                              master);
1935                         break;
1936                 }
1937         }
1938         return ret;
1939 }
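
/*
 * Editor's sketch (hypothetical, not part of the original source): how
 * a caller might drive the -EAGAIN protocol described above -- back off
 * briefly so the slow node's heartbeat stack can notice the death, then
 * re-run the pre-check.  The 100ms delay is an assumption made purely
 * for illustration.
 */
static int dlm_wait_for_reco_precheck(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res)
{
        int ret;

        do {
                ret = dlm_pre_master_reco_lockres(dlm, res);
                if (ret == -EAGAIN)
                        msleep(100);    /* let heartbeat catch up */
        } while (ret == -EAGAIN);

        return ret;
}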
1940
1941
1942 /*
1943  * DLM_MIGRATE_LOCKRES
1944  */
1945
1946
1947 int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1948                         u8 target)
1949 {
1950         struct dlm_master_list_entry *mle = NULL;
1951         struct dlm_master_list_entry *oldmle = NULL;
1952         struct dlm_migratable_lockres *mres = NULL;
1953         int ret = -EINVAL;
1954         const char *name;
1955         unsigned int namelen;
1956         int mle_added = 0;
1957         struct list_head *queue, *iter;
1958         int i;
1959         struct dlm_lock *lock;
1960         int empty = 1;
1961
1962         if (!dlm_grab(dlm))
1963                 return -EINVAL;
1964
1965         name = res->lockname.name;
1966         namelen = res->lockname.len;
1967
1968         mlog(0, "migrating %.*s to %u\n", namelen, name, target);
1969
1970         /*
1971          * ensure this lockres is a proper candidate for migration
1972          */
1973         spin_lock(&res->spinlock);
1974         if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
1975                 mlog(0, "cannot migrate lockres with unknown owner!\n");
1976                 spin_unlock(&res->spinlock);
1977                 goto leave;
1978         }
1979         if (res->owner != dlm->node_num) {
1980                 mlog(0, "cannot migrate lockres this node doesn't own!\n");
1981                 spin_unlock(&res->spinlock);
1982                 goto leave;
1983         }
1984         mlog(0, "checking queues...\n");
1985         queue = &res->granted;
1986         for (i=0; i<3; i++) {
1987                 list_for_each(iter, queue) {
1988                         lock = list_entry(iter, struct dlm_lock, list);
1989                         empty = 0;
1990                         if (lock->ml.node == dlm->node_num) {
1991                                 mlog(0, "found a lock owned by this node "
1992                                      "still on the %s queue!  will not "
1993                                      "migrate this lockres\n",
1994                                      i==0 ? "granted" :
1995                                      (i==1 ? "converting" : "blocked"));
1996                                 spin_unlock(&res->spinlock);
1997                                 ret = -ENOTEMPTY;
1998                                 goto leave;
1999                         }
2000                 }
2001                 queue++;
2002         }
2003         mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
2004         spin_unlock(&res->spinlock);
2005
2006         /* no work to do */
2007         if (empty) {
2008                 mlog(0, "no locks were found on this lockres! done!\n");
2009                 ret = 0;
2010                 goto leave;
2011         }
2012
2013         /*
2014          * preallocate up front
2015          * if this fails, abort
2016          */
2017
2018         ret = -ENOMEM;
2019         mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
2020         if (!mres) {
2021                 mlog_errno(ret);
2022                 goto leave;
2023         }
2024
2025         mle = kmem_cache_alloc(dlm_mle_cache,
2026                                GFP_KERNEL);
2027         if (!mle) {
2028                 mlog_errno(ret);
2029                 goto leave;
2030         }
2031         ret = 0;
2032
2033         /*
2034          * find a node to migrate the lockres to
2035          */
2036
2037         mlog(0, "picking a migration node\n");
2038         spin_lock(&dlm->spinlock);
2039         /* pick a new node */
2040         if (target >= O2NM_MAX_NODES ||
2041             !test_bit(target, dlm->domain_map)) {
2042                 target = dlm_pick_migration_target(dlm, res);
2043         }
2044         mlog(0, "node %u chosen for migration\n", target);
2045
2046         if (target >= O2NM_MAX_NODES ||
2047             !test_bit(target, dlm->domain_map)) {
2048                 /* target chosen is not alive */
2049                 ret = -EINVAL;
2050         }
2051
2052         if (ret) {
2053                 spin_unlock(&dlm->spinlock);
2054                 goto fail;
2055         }
2056
2057         mlog(0, "continuing with target = %u\n", target);
2058
2059         /*
2060          * clear any existing master requests and
2061          * add the migration mle to the list
2062          */
2063         spin_lock(&dlm->master_lock);
2064         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2065                                     namelen, target, dlm->node_num);
2066         spin_unlock(&dlm->master_lock);
2067         spin_unlock(&dlm->spinlock);
2068
2069         if (ret == -EEXIST) {
2070                 mlog(0, "another process is already migrating it\n");
2071                 goto fail;
2072         }
2073         mle_added = 1;
2074
2075         /*
2076          * set the MIGRATING flag and flush asts
2077          * if we fail after this we need to re-dirty the lockres
2078          */
2079         if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2080                 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2081                      "the target went down.\n", res->lockname.len,
2082                      res->lockname.name, target);
2083                 spin_lock(&res->spinlock);
2084                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2085                 spin_unlock(&res->spinlock);
2086                 ret = -EINVAL;
2087         }
2088
2089 fail:
2090         if (oldmle) {
2091                 /* master is known, detach if not already detached */
2092                 dlm_mle_detach_hb_events(dlm, oldmle);
2093                 dlm_put_mle(oldmle);
2094         }
2095
2096         if (ret < 0) {
2097                 if (mle_added) {
2098                         dlm_mle_detach_hb_events(dlm, mle);
2099                         dlm_put_mle(mle);
2100                 } else if (mle) {
2101                         kmem_cache_free(dlm_mle_cache, mle);
2102                 }
2103                 goto leave;
2104         }
2105
2106         /*
2107          * at this point, we have a migration target, an mle
2108          * in the master list, and the MIGRATING flag set on
2109          * the lockres
2110          */
2111
2112
2113         /* get an extra reference on the mle.
2114          * otherwise the assert_master from the new
2115          * master will destroy this.
2116          * also, make sure that all callers of dlm_get_mle
2117          * take both dlm->spinlock and dlm->master_lock */
2118         spin_lock(&dlm->spinlock);
2119         spin_lock(&dlm->master_lock);
2120         dlm_get_mle(mle);
2121         spin_unlock(&dlm->master_lock);
2122         spin_unlock(&dlm->spinlock);
2123
2124         /* notify new node and send all lock state */
2125         /* call send_one_lockres with migration flag.
2126          * this serves as notice to the target node that a
2127          * migration is starting. */
2128         ret = dlm_send_one_lockres(dlm, res, mres, target,
2129                                    DLM_MRES_MIGRATION);
2130
2131         if (ret < 0) {
2132                 mlog(0, "migration to node %u failed with %d\n",
2133                      target, ret);
2134                 /* migration failed, detach and clean up mle */
2135                 dlm_mle_detach_hb_events(dlm, mle);
2136                 dlm_put_mle(mle);
2137                 dlm_put_mle(mle);
2138                 goto leave;
2139         }
2140
2141         /* at this point, the target sends a message to all nodes,
2142          * (using dlm_do_migrate_request).  this node is skipped since
2143          * we had to put an mle in the list to begin the process.  this
2144          * node now waits for target to do an assert master.  this node
2145          * will be the last one notified, ensuring that the migration
2146          * is complete everywhere.  if the target dies while this is
2147          * going on, some nodes could potentially see the target as the
2148          * master, so it is important that my recovery finds the migration
2149  * mle and sets the master to UNKNOWN. */
2150
2151
2152         /* wait for new node to assert master */
2153         while (1) {
2154                 ret = wait_event_interruptible_timeout(mle->wq,
2155                                         (atomic_read(&mle->woken) == 1),
2156                                         msecs_to_jiffies(5000));
2157
2158                 if (ret >= 0) {
2159                         if (atomic_read(&mle->woken) == 1 ||
2160                             res->owner == target)
2161                                 break;
2162
2163                         mlog(0, "timed out during migration\n");
2164                         /* avoid hang during shutdown when migrating lockres 
2165                          * to a node which also goes down */
2166                         if (dlm_is_node_dead(dlm, target)) {
2167                                 mlog(0, "%s:%.*s: expected migration target %u "
2168                                      "is no longer up.  restarting.\n",
2169                                      dlm->name, res->lockname.len,
2170                                      res->lockname.name, target);
2171                                 ret = -ERESTARTSYS;
2172                         }
2173                 }
2174                 if (ret == -ERESTARTSYS) {
2175                         /* migration failed, detach and clean up mle */
2176                         dlm_mle_detach_hb_events(dlm, mle);
2177                         dlm_put_mle(mle);
2178                         dlm_put_mle(mle);
2179                         goto leave;
2180                 }
2181                 /* TODO: if node died: stop, clean up, return error */
2182         }
2183
2184         /* all done, set the owner, clear the flag */
2185         spin_lock(&res->spinlock);
2186         dlm_set_lockres_owner(dlm, res, target);
2187         res->state &= ~DLM_LOCK_RES_MIGRATING;
2188         dlm_remove_nonlocal_locks(dlm, res);
2189         spin_unlock(&res->spinlock);
2190         wake_up(&res->wq);
2191
2192         /* master is known, detach if not already detached */
2193         dlm_mle_detach_hb_events(dlm, mle);
2194         dlm_put_mle(mle);
2195         ret = 0;
2196
2197         dlm_lockres_calc_usage(dlm, res);
2198
2199 leave:
2200         /* re-dirty the lockres if we failed */
2201         if (ret < 0)
2202                 dlm_kick_thread(dlm, res);
2203
2204         /* TODO: cleanup */
2205         if (mres)
2206                 free_page((unsigned long)mres);
2207
2208         dlm_put(dlm);
2209
2210         mlog(0, "returning %d\n", ret);
2211         return ret;
2212 }
2213 EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
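
/*
 * Editor's usage sketch (hypothetical helper, not part of the original
 * source): because dlm_migrate_lockres() falls back to
 * dlm_pick_migration_target() whenever the requested target is out of
 * range or absent from the domain map, a caller with no preference can
 * simply pass an invalid node number.
 */
static int dlm_migrate_lockres_anywhere(struct dlm_ctxt *dlm,
                                        struct dlm_lock_resource *res)
{
        /* O2NM_MAX_NODES is never a valid node index, so this forces
         * the target-picking path inside dlm_migrate_lockres() */
        return dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
}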
2214
2215 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2216 {
2217         int ret;
2218         spin_lock(&dlm->ast_lock);
2219         spin_lock(&lock->spinlock);
2220         ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2221         spin_unlock(&lock->spinlock);
2222         spin_unlock(&dlm->ast_lock);
2223         return ret;
2224 }
2225
2226 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2227                                      struct dlm_lock_resource *res,
2228                                      u8 mig_target)
2229 {
2230         int can_proceed;
2231         spin_lock(&res->spinlock);
2232         can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2233         spin_unlock(&res->spinlock);
2234
2235         /* target has died, so make the caller break out of the 
2236          * wait_event, but caller must recheck the domain_map */
2237         spin_lock(&dlm->spinlock);
2238         if (!test_bit(mig_target, dlm->domain_map))
2239                 can_proceed = 1;
2240         spin_unlock(&dlm->spinlock);
2241         return can_proceed;
2242 }
2243
2244 int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2245 {
2246         int ret;
2247         spin_lock(&res->spinlock);
2248         ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2249         spin_unlock(&res->spinlock);
2250         return ret;
2251 }
2252
2253
2254 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2255                                        struct dlm_lock_resource *res,
2256                                        u8 target)
2257 {
2258         int ret = 0;
2259
2260         mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2261                res->lockname.len, res->lockname.name, dlm->node_num,
2262                target);
2263         /* need to set MIGRATING flag on lockres.  this is done by
2264          * ensuring that all asts have been flushed for this lockres. */
2265         spin_lock(&res->spinlock);
2266         BUG_ON(res->migration_pending);
2267         res->migration_pending = 1;
2268         /* strategy is to reserve an extra ast then release
2269          * it below, letting the release do all of the work */
2270         __dlm_lockres_reserve_ast(res);
2271         spin_unlock(&res->spinlock);
2272
2273         /* now flush all the pending asts.. hang out for a bit */
2274         dlm_kick_thread(dlm, res);
2275         wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2276         dlm_lockres_release_ast(dlm, res);
2277
2278         mlog(0, "about to wait on migration_wq, dirty=%s\n",
2279                res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2280         /* if the extra ref we just put was the final one, this
2281          * will pass thru immediately.  otherwise, we need to wait
2282          * for the last ast to finish. */
2283 again:
2284         ret = wait_event_interruptible_timeout(dlm->migration_wq,
2285                    dlm_migration_can_proceed(dlm, res, target),
2286                    msecs_to_jiffies(1000));
2287         if (ret < 0) {
2288                 mlog(0, "woken again: migrating? %s, dead? %s\n",
2289                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2290                        test_bit(target, dlm->domain_map) ? "no":"yes");
2291         } else {
2292                 mlog(0, "all is well: migrating? %s, dead? %s\n",
2293                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2294                        test_bit(target, dlm->domain_map) ? "no":"yes");
2295         }
2296         if (!dlm_migration_can_proceed(dlm, res, target)) {
2297                 mlog(0, "trying again...\n");
2298                 goto again;
2299         }
2300
2301         /* did the target go down or die? */
2302         spin_lock(&dlm->spinlock);
2303         if (!test_bit(target, dlm->domain_map)) {
2304                 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2305                      target);
2306                 ret = -EHOSTDOWN;
2307         }
2308         spin_unlock(&dlm->spinlock);
2309
2310         /*
2311          * at this point:
2312          *
2313          *   o the DLM_LOCK_RES_MIGRATING flag is set
2314          *   o there are no pending asts on this lockres
2315          *   o all processes trying to reserve an ast on this
2316          *     lockres must wait for the MIGRATING flag to clear
2317          */
2318         return ret;
2319 }
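
/*
 * Editor's sketch (hypothetical helper): the invariants listed above
 * say that anyone wanting to reserve an ast must wait for the
 * MIGRATING flag to clear.  Such a wait would pair with the
 * wake_up(&res->wq) issued once migration completes, e.g.:
 */
static void dlm_wait_on_migrating_flag(struct dlm_lock_resource *res)
{
        /* res->state is read unlocked here; the waker updates the
         * state under res->spinlock before calling wake_up(&res->wq) */
        wait_event(res->wq,
                   !(res->state & DLM_LOCK_RES_MIGRATING));
}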
2320
2321 /* last step in the migration process.
2322  * original master calls this to free all of the dlm_lock
2323  * structures that used to be for other nodes. */
2324 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2325                                       struct dlm_lock_resource *res)
2326 {
2327         struct list_head *iter, *iter2;
2328         struct list_head *queue = &res->granted;
2329         int i;
2330         struct dlm_lock *lock;
2331
2332         assert_spin_locked(&res->spinlock);
2333
2334         BUG_ON(res->owner == dlm->node_num);
2335
2336         for (i=0; i<3; i++) {
2337                 list_for_each_safe(iter, iter2, queue) {
2338                         lock = list_entry(iter, struct dlm_lock, list);
2339                         if (lock->ml.node != dlm->node_num) {
2340                                 mlog(0, "putting lock for node %u\n",
2341                                      lock->ml.node);
2342                                 /* be extra careful */
2343                                 BUG_ON(!list_empty(&lock->ast_list));
2344                                 BUG_ON(!list_empty(&lock->bast_list));
2345                                 BUG_ON(lock->ast_pending);
2346                                 BUG_ON(lock->bast_pending);
2347                                 list_del_init(&lock->list);
2348                                 dlm_lock_put(lock);
2349                         }
2350                 }
2351                 queue++;
2352         }
2353 }
2354
2355 /* for now this is not too intelligent.  we will
2356  * need stats to make this do the right thing.
2357  * this just finds the first lock on one of the
2358  * queues and uses that node as the target. */
2359 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2360                                     struct dlm_lock_resource *res)
2361 {
2362         int i;
2363         struct list_head *queue = &res->granted;
2364         struct list_head *iter;
2365         struct dlm_lock *lock;
2366         int nodenum;
2367
2368         assert_spin_locked(&dlm->spinlock);
2369
2370         spin_lock(&res->spinlock);
2371         for (i=0; i<3; i++) {
2372                 list_for_each(iter, queue) {
2373                         /* up to the caller to make sure this node
2374                          * is alive */
2375                         lock = list_entry(iter, struct dlm_lock, list);
2376                         if (lock->ml.node != dlm->node_num) {
2377                                 spin_unlock(&res->spinlock);
2378                                 return lock->ml.node;
2379                         }
2380                 }
2381                 queue++;
2382         }
2383         spin_unlock(&res->spinlock);
2384         mlog(0, "have not found a suitable target yet! checking domain map\n");
2385
2386         /* ok now we're getting desperate.  pick anyone alive. */
2387         nodenum = -1;
2388         while (1) {
2389                 nodenum = find_next_bit(dlm->domain_map,
2390                                         O2NM_MAX_NODES, nodenum+1);
2391                 mlog(0, "found %d in domain map\n", nodenum);
2392                 if (nodenum >= O2NM_MAX_NODES)
2393                         break;
2394                 if (nodenum != dlm->node_num) {
2395                         mlog(0, "picking %d\n", nodenum);
2396                         return nodenum;
2397                 }
2398         }
2399
2400         mlog(0, "giving up.  no master to migrate to\n");
2401         return DLM_LOCK_RES_OWNER_UNKNOWN;
2402 }
2403
2404
2405
2406 /* this is called by the new master once all lockres
2407  * data has been received */
2408 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2409                                   struct dlm_lock_resource *res,
2410                                   u8 master, u8 new_master,
2411                                   struct dlm_node_iter *iter)
2412 {
2413         struct dlm_migrate_request migrate;
2414         int ret, status = 0;
2415         int nodenum;
2416
2417         memset(&migrate, 0, sizeof(migrate));
2418         migrate.namelen = res->lockname.len;
2419         memcpy(migrate.name, res->lockname.name, migrate.namelen);
2420         migrate.new_master = new_master;
2421         migrate.master = master;
2422
2423         ret = 0;
2424
2425         /* send message to all nodes, except the master and myself */
2426         while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2427                 if (nodenum == master ||
2428                     nodenum == new_master)
2429                         continue;
2430
2431                 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2432                                          &migrate, sizeof(migrate), nodenum,
2433                                          &status);
2434                 if (ret < 0)
2435                         mlog_errno(ret);
2436                 else if (status < 0) {
2437                         mlog(0, "migrate request (node %u) returned %d!\n",
2438                              nodenum, status);
2439                         ret = status;
2440                 }
2441         }
2442
2443         if (ret < 0)
2444                 mlog_errno(ret);
2445
2446         mlog(0, "returning ret=%d\n", ret);
2447         return ret;
2448 }
2449
2450
2451 /* if there is an existing mle for this lockres, we now know who the master is.
2452  * (the one who sent us *this* message) we can clear it up right away.
2453  * since the process that put the mle on the list still has a reference to it,
2454  * we can unhash it now, set the master and wake the process.  as a result,
2455  * we will have no mle in the list to start with.  now we can add an mle for
2456  * the migration and this should be the only one found for those scanning the
2457  * list.  */
2458 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2459 {
2460         struct dlm_ctxt *dlm = data;
2461         struct dlm_lock_resource *res = NULL;
2462         struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2463         struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2464         const char *name;
2465         unsigned int namelen;
2466         int ret = 0;
2467
2468         if (!dlm_grab(dlm))
2469                 return -EINVAL;
2470
2471         name = migrate->name;
2472         namelen = migrate->namelen;
2473
2474         /* preallocate.. if this fails, abort */
2475         mle = kmem_cache_alloc(dlm_mle_cache,
2476                                GFP_KERNEL);
2477
2478         if (!mle) {
2479                 ret = -ENOMEM;
2480                 goto leave;
2481         }
2482
2483         /* check for pre-existing lock */
2484         spin_lock(&dlm->spinlock);
2485         res = __dlm_lookup_lockres(dlm, name, namelen);
2486         spin_lock(&dlm->master_lock);
2487
2488         if (res) {
2489                 spin_lock(&res->spinlock);
2490                 if (res->state & DLM_LOCK_RES_RECOVERING) {
2491                         /* if all is working ok, this can only mean that we got
2492                          * a migrate request from a node that we now see as
2493                          * dead.  what can we do here?  drop it to the floor? */
2494                         spin_unlock(&res->spinlock);
2495                         mlog(ML_ERROR, "Got a migrate request, but the "
2496                              "lockres is marked as recovering!\n");
2497                         kmem_cache_free(dlm_mle_cache, mle);
2498                         ret = -EINVAL; /* need a better solution */
2499                         goto unlock;
2500                 }
2501                 res->state |= DLM_LOCK_RES_MIGRATING;
2502                 spin_unlock(&res->spinlock);
2503         }
2504
2505         /* ignore status.  only nonzero status would BUG. */
2506         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
2507                                     name, namelen,
2508                                     migrate->new_master,
2509                                     migrate->master);
2510
2511 unlock:
2512         spin_unlock(&dlm->master_lock);
2513         spin_unlock(&dlm->spinlock);
2514
2515         if (oldmle) {
2516                 /* master is known, detach if not already detached */
2517                 dlm_mle_detach_hb_events(dlm, oldmle);
2518                 dlm_put_mle(oldmle);
2519         }
2520
2521         if (res)
2522                 dlm_lockres_put(res);
2523 leave:
2524         dlm_put(dlm);
2525         return ret;
2526 }
2527
2528 /* must be holding dlm->spinlock and dlm->master_lock
2529  * when adding a migration mle, we can clear any other mles
2530  * in the master list because we know with certainty that
2531  * the master is "master".  so we remove any old mle from
2532  * the list after setting its master field, and then add
2533  * the new migration mle.  this way we keep to the rule
2534  * of having only one mle for a given lock name at all times. */
2535 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2536                                  struct dlm_lock_resource *res,
2537                                  struct dlm_master_list_entry *mle,
2538                                  struct dlm_master_list_entry **oldmle,
2539                                  const char *name, unsigned int namelen,
2540                                  u8 new_master, u8 master)
2541 {
2542         int found;
2543         int ret = 0;
2544
2545         *oldmle = NULL;
2546
2547         mlog_entry_void();
2548
2549         assert_spin_locked(&dlm->spinlock);
2550         assert_spin_locked(&dlm->master_lock);
2551
2552         /* caller is responsible for any ref taken here on oldmle */
2553         found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
2554         if (found) {
2555                 struct dlm_master_list_entry *tmp = *oldmle;
2556                 spin_lock(&tmp->spinlock);
2557                 if (tmp->type == DLM_MLE_MIGRATION) {
2558                         if (master == dlm->node_num) {
2559                                 /* ah another process raced me to it */
2560                                 mlog(0, "tried to migrate %.*s, but some "
2561                                      "process beat me to it\n",
2562                                      namelen, name);
2563                                 ret = -EEXIST;
2564                         } else {
2565                                 /* bad.  2 NODES are trying to migrate! */
2566                                 mlog(ML_ERROR, "migration error  mle: "
2567                                      "master=%u new_master=%u // request: "
2568                                      "master=%u new_master=%u // "
2569                                      "lockres=%.*s\n",
2570                                      tmp->master, tmp->new_master,
2571                                      master, new_master,
2572                                      namelen, name);
2573                                 BUG();
2574                         }
2575                 } else {
2576                         /* this is essentially what assert_master does */
2577                         tmp->master = master;
2578                         atomic_set(&tmp->woken, 1);
2579                         wake_up(&tmp->wq);
2580                         /* remove it from the list so that only one
2581                          * mle will be found */
2582                         list_del_init(&tmp->list);
2583                 }
2584                 spin_unlock(&tmp->spinlock);
2585         }
2586
2587         /* now add a migration mle to the tail of the list */
2588         dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2589         mle->new_master = new_master;
2590         mle->master = master;
2591         /* do this for consistency with other mle types */
2592         set_bit(new_master, mle->maybe_map);
2593         list_add(&mle->list, &dlm->master_list);
2594
2595         return ret;
2596 }
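
/*
 * Editor's sketch (hypothetical call site, not part of the original
 * source): the locking discipline dlm_add_migration_mle() demands --
 * dlm->spinlock taken before dlm->master_lock, and the caller dropping
 * any ref returned in *oldmle -- follows the pattern already used by
 * dlm_migrate_lockres() and dlm_migrate_request_handler() above:
 */
static int dlm_add_migration_mle_locked(struct dlm_ctxt *dlm,
                                        struct dlm_lock_resource *res,
                                        struct dlm_master_list_entry *mle,
                                        const char *name,
                                        unsigned int namelen,
                                        u8 new_master, u8 master)
{
        struct dlm_master_list_entry *oldmle = NULL;
        int ret;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
                                    namelen, new_master, master);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

        /* the caller owns any ref taken on a pre-existing mle */
        if (oldmle) {
                dlm_mle_detach_hb_events(dlm, oldmle);
                dlm_put_mle(oldmle);
        }
        return ret;
}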
2597
2598
2599 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2600 {
2601         struct list_head *iter, *iter2;
2602         struct dlm_master_list_entry *mle;
2603         struct dlm_lock_resource *res;
2604
2605         mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2606 top:
2607         assert_spin_locked(&dlm->spinlock);
2608
2609         /* clean the master list */
2610         spin_lock(&dlm->master_lock);
2611         list_for_each_safe(iter, iter2, &dlm->master_list) {
2612                 mle = list_entry(iter, struct dlm_master_list_entry, list);
2613
2614                 BUG_ON(mle->type != DLM_MLE_BLOCK &&
2615                        mle->type != DLM_MLE_MASTER &&
2616                        mle->type != DLM_MLE_MIGRATION);
2617
2618                 /* MASTER mles are initiated locally.  the waiting
2619                  * process will notice the node map change
2620                  * shortly.  let that happen as normal. */
2621                 if (mle->type == DLM_MLE_MASTER)
2622                         continue;
2623
2624
2625                 /* BLOCK mles are initiated by other nodes.
2626                  * need to clean up if the dead node would have
2627                  * been the master. */
2628                 if (mle->type == DLM_MLE_BLOCK) {
2629                         int bit;
2630
2631                         spin_lock(&mle->spinlock);
2632                         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
2633                         if (bit != dead_node) {
2634                                 mlog(0, "mle found, but dead node %u would "
2635                                      "not have been master\n", dead_node);
2636                                 spin_unlock(&mle->spinlock);
2637                         } else {
2638                                 /* must drop the refcount by one since the
2639                                  * assert_master will never arrive.  this
2640                                  * may result in the mle being unlinked and
2641                                  * freed, but there may still be a process
2642                                  * waiting in the dlmlock path which is fine. */
2643                                 mlog(ML_ERROR, "node %u was the expected master\n",
2644                                      dead_node);
2645                                 atomic_set(&mle->woken, 1);
2646                                 spin_unlock(&mle->spinlock);
2647                                 wake_up(&mle->wq);
2648                                 /* do not need events any longer, so detach 
2649                                  * from heartbeat */
2650                                 __dlm_mle_detach_hb_events(dlm, mle);
2651                                 __dlm_put_mle(mle);
2652                         }
2653                         continue;
2654                 }
2655
2656                 /* everything else is a MIGRATION mle */
2657
2658                 /* the rule for MIGRATION mles is that the master
2659                  * becomes UNKNOWN if *either* the original or
2660                  * the new master dies.  all UNKNOWN lockreses
2661                  * are sent to whichever node becomes the recovery
2662                  * master.  the new master is responsible for
2663                  * determining if there is still a master for
2664                  * this lockres, or if he needs to take over
2665  * this lockres, or if it needs to take over
2666                  * another message to resolve this. */
2667                 if (mle->master != dead_node &&
2668                     mle->new_master != dead_node)
2669                         continue;
2670
2671                 /* if we have reached this point, this mle needs to
2672                  * be removed from the list and freed. */
2673
2674                 /* remove from the list early.  NOTE: unlinking
2675                  * list_head while in list_for_each_safe */
2676                 spin_lock(&mle->spinlock);
2677                 list_del_init(&mle->list);
2678                 atomic_set(&mle->woken, 1);
2679                 spin_unlock(&mle->spinlock);
2680                 wake_up(&mle->wq);
2681
2682                 mlog(0, "node %u died during migration from "
2683                      "%u to %u!\n", dead_node,
2684                      mle->master, mle->new_master);
2685                 /* if there is a lockres associated with this
2686                  * mle, find it and set its owner to UNKNOWN */
2687                 res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2688                                         mle->u.name.len);
2689                 if (res) {
2690                         /* unfortunately if we hit this rare case, our
2691                          * lock ordering is messed.  we need to drop
2692                          * the master lock so that we can take the
2693                          * lockres lock, meaning that we will have to
2694                          * restart from the head of list. */
2695                         spin_unlock(&dlm->master_lock);
2696
2697                         /* move lockres onto recovery list */
2698                         spin_lock(&res->spinlock);
2699                         dlm_set_lockres_owner(dlm, res,
2700                                         DLM_LOCK_RES_OWNER_UNKNOWN);
2701                         dlm_move_lockres_to_recovery_list(dlm, res);
2702                         spin_unlock(&res->spinlock);
2703                         dlm_lockres_put(res);
2704
2705                         /* about to get rid of mle, detach from heartbeat */
2706                         __dlm_mle_detach_hb_events(dlm, mle);
2707
2708                         /* dump the mle */
2709                         spin_lock(&dlm->master_lock);
2710                         __dlm_put_mle(mle);
2711                         spin_unlock(&dlm->master_lock);
2712
2713                         /* restart */
2714                         goto top;
2715                 }
2716
2717                 /* this may be the last reference */
2718                 __dlm_put_mle(mle);
2719         }
2720         spin_unlock(&dlm->master_lock);
2721 }
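
/*
 * Illustrative sketch (not part of the original file): the BLOCK-mle
 * cleanup above treats the lowest set bit in mle->maybe_map as the
 * node that was expected to assert mastery, and only tears the mle
 * down when that node is the one that died.  A hypothetical helper
 * capturing that test could look like this; dlm_mle_expected_master()
 * is an invented name used only for illustration.
 */
#if 0
static int dlm_mle_expected_master(struct dlm_master_list_entry *mle)
{
        int bit;

        assert_spin_locked(&mle->spinlock);
        /* the first node still set in maybe_map is the one whose
         * assert_master this mle is waiting on */
        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
        return (bit < O2NM_MAX_NODES) ? bit : -1;
}
#endif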


int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                         u8 old_master)
{
        struct dlm_node_iter iter;
        int ret = 0;

        spin_lock(&dlm->spinlock);
        dlm_node_iter_init(dlm->domain_map, &iter);
        clear_bit(old_master, iter.node_map);
        clear_bit(dlm->node_num, iter.node_map);
        spin_unlock(&dlm->spinlock);

        mlog(0, "now time to do a migrate request to other nodes\n");
        ret = dlm_do_migrate_request(dlm, res, old_master,
                                     dlm->node_num, &iter);
        if (ret < 0) {
                mlog_errno(ret);
                goto leave;
        }

        mlog(0, "doing assert master of %.*s to all except the original node\n",
             res->lockname.len, res->lockname.name);
        /* this call now finishes out the nodemap
         * even if one or more nodes die */
        ret = dlm_do_assert_master(dlm, res->lockname.name,
                                   res->lockname.len, iter.node_map,
                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
        if (ret < 0) {
                /* no longer need to retry.  all living nodes contacted. */
                mlog_errno(ret);
                ret = 0;
        }

        memset(iter.node_map, 0, sizeof(iter.node_map));
        set_bit(old_master, iter.node_map);
        mlog(0, "doing assert master of %.*s back to %u\n",
             res->lockname.len, res->lockname.name, old_master);
        ret = dlm_do_assert_master(dlm, res->lockname.name,
                                   res->lockname.len, iter.node_map,
                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
        if (ret < 0) {
                mlog(0, "assert master to original master failed "
                     "with %d.\n", ret);
                /* the only nonzero status here would be because of
                 * a dead original node.  we're done. */
                ret = 0;
        }

        /* all done, set the owner, clear the flag */
        spin_lock(&res->spinlock);
        dlm_set_lockres_owner(dlm, res, dlm->node_num);
        res->state &= ~DLM_LOCK_RES_MIGRATING;
        spin_unlock(&res->spinlock);
        /* re-dirty it on the new master */
        dlm_kick_thread(dlm, res);
        wake_up(&res->wq);
leave:
        return ret;
}
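
/*
 * Illustrative sketch (not part of the original file): the function
 * above retargets iter.node_map by hand between the two assert passes.
 * A hypothetical caller walking such a map directly would use the
 * dlm_node_iter_next() helper from dlmcommon.h, which hands back node
 * numbers until the map is exhausted and then returns a negative
 * value; dlm_example_walk_nodes() is an invented name used only for
 * illustration.
 */
#if 0
static void dlm_example_walk_nodes(struct dlm_ctxt *dlm, u8 skip)
{
        struct dlm_node_iter iter;
        int nodenum;

        spin_lock(&dlm->spinlock);
        dlm_node_iter_init(dlm->domain_map, &iter);
        /* drop one node, just as dlm_finish_migration drops the old
         * master and the local node before the first assert pass */
        clear_bit(skip, iter.node_map);
        spin_unlock(&dlm->spinlock);

        while ((nodenum = dlm_node_iter_next(&iter)) >= 0)
                mlog(0, "would send a message to node %d\n", nodenum);
}
#endif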

/*
 * LOCKRES AST REFCOUNT
 * this is integral to migration
 */

/* for future intent to call an ast, reserve one ahead of time.
 * this should be called only after waiting on the lockres
 * with dlm_wait_on_lockres, and while still holding the
 * spinlock after the call. */
void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
{
        assert_spin_locked(&res->spinlock);
        if (res->state & DLM_LOCK_RES_MIGRATING) {
                __dlm_print_one_lock_resource(res);
        }
        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);

        atomic_inc(&res->asts_reserved);
}

/*
 * used to drop the reserved ast, either because it went unused,
 * or because the ast/bast was actually called.
 *
 * also, if there is a pending migration on this lockres,
 * and this was the last pending ast on the lockres,
 * atomically set the MIGRATING flag before we drop the lock.
 * this is how we ensure that migration can proceed with no
 * asts in progress.  note that it is ok if the state of the
 * queues is such that a lock should be granted in the future
 * or that a bast should be fired, because the new master will
 * shuffle the lists on this lockres as soon as it is migrated.
 */
void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
                             struct dlm_lock_resource *res)
{
        if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
                return;

        if (!res->migration_pending) {
                spin_unlock(&res->spinlock);
                return;
        }

        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
        res->migration_pending = 0;
        res->state |= DLM_LOCK_RES_MIGRATING;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);
        wake_up(&dlm->migration_wq);
}
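
/*
 * Illustrative sketch (not part of the original file): how the two
 * functions above are meant to pair up around delivering an ast.  The
 * real callers live elsewhere in the dlm; dlm_example_fire_ast() is an
 * invented name used only to show the reserve-then-release discipline.
 */
#if 0
static void dlm_example_fire_ast(struct dlm_ctxt *dlm,
                                 struct dlm_lock_resource *res)
{
        spin_lock(&res->spinlock);
        /* reserve while holding the spinlock; migration cannot set
         * DLM_LOCK_RES_MIGRATING while reservations are outstanding */
        __dlm_lockres_reserve_ast(res);
        spin_unlock(&res->spinlock);

        /* ... deliver the ast/bast without holding the spinlock ... */

        /* drop the reservation; if this was the last one and a
         * migration is pending, MIGRATING is set atomically here */
        dlm_lockres_release_ast(dlm, res);
}
#endif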