Merge tag 'r8169-upstream-20061204-00' of git://electric-eye.fr.zoreil.com/home/romie...
[linux-2.6] / fs / ocfs2 / cluster / heartbeat.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA.
20  */
21
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36
37 #include "heartbeat.h"
38 #include "tcp.h"
39 #include "nodemanager.h"
40 #include "quorum.h"
41
42 #include "masklog.h"
43
44
45 /*
46  * The first heartbeat pass had one global thread that would serialize all hb
47  * callback calls.  This global serializing sem should only be removed once
48  * we've made sure that all callees can deal with being called concurrently
49  * from multiple hb region threads.
50  */
51 static DECLARE_RWSEM(o2hb_callback_sem);
52
53 /*
54  * multiple hb threads are watching multiple regions.  A node is live
55  * whenever any of the threads sees activity from the node in its region.
56  */
57 static DEFINE_SPINLOCK(o2hb_live_lock);
58 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
59 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
60 static LIST_HEAD(o2hb_node_events);
61 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
62
63 static LIST_HEAD(o2hb_all_regions);
64
65 static struct o2hb_callback {
66         struct list_head list;
67 } o2hb_callbacks[O2HB_NUM_CB];
68
69 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
70
71 #define O2HB_DEFAULT_BLOCK_BITS       9
72
73 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
74
75 /* Only sets a new threshold if there are no active regions. 
76  *
77  * No locking or otherwise interesting code is required for reading
78  * o2hb_dead_threshold as it can't change once regions are active and
79  * it's not interesting to anyone until then anyway. */
80 static void o2hb_dead_threshold_set(unsigned int threshold)
81 {
82         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
83                 spin_lock(&o2hb_live_lock);
84                 if (list_empty(&o2hb_all_regions))
85                         o2hb_dead_threshold = threshold;
86                 spin_unlock(&o2hb_live_lock);
87         }
88 }
89
90 struct o2hb_node_event {
91         struct list_head        hn_item;
92         enum o2hb_callback_type hn_event_type;
93         struct o2nm_node        *hn_node;
94         int                     hn_node_num;
95 };
96
97 struct o2hb_disk_slot {
98         struct o2hb_disk_heartbeat_block *ds_raw_block;
99         u8                      ds_node_num;
100         u64                     ds_last_time;
101         u64                     ds_last_generation;
102         u16                     ds_equal_samples;
103         u16                     ds_changed_samples;
104         struct list_head        ds_live_item;
105 };
106
107 /* each thread owns a region.. when we're asked to tear down the region
108  * we ask the thread to stop, who cleans up the region */
109 struct o2hb_region {
110         struct config_item      hr_item;
111
112         struct list_head        hr_all_item;
113         unsigned                hr_unclean_stop:1;
114
115         /* protected by the hr_callback_sem */
116         struct task_struct      *hr_task;
117
118         unsigned int            hr_blocks;
119         unsigned long long      hr_start_block;
120
121         unsigned int            hr_block_bits;
122         unsigned int            hr_block_bytes;
123
124         unsigned int            hr_slots_per_page;
125         unsigned int            hr_num_pages;
126
127         struct page             **hr_slot_data;
128         struct block_device     *hr_bdev;
129         struct o2hb_disk_slot   *hr_slots;
130
131         /* let the person setting up hb wait for it to return until it
132          * has reached a 'steady' state.  This will be fixed when we have
133          * a more complete api that doesn't lead to this sort of fragility. */
134         atomic_t                hr_steady_iterations;
135
136         char                    hr_dev_name[BDEVNAME_SIZE];
137
138         unsigned int            hr_timeout_ms;
139
140         /* randomized as the region goes up and down so that a node
141          * recognizes a node going up and down in one iteration */
142         u64                     hr_generation;
143
144         struct delayed_work     hr_write_timeout_work;
145         unsigned long           hr_last_timeout_start;
146
147         /* Used during o2hb_check_slot to hold a copy of the block
148          * being checked because we temporarily have to zero out the
149          * crc field. */
150         struct o2hb_disk_heartbeat_block *hr_tmp_block;
151 };
152
153 struct o2hb_bio_wait_ctxt {
154         atomic_t          wc_num_reqs;
155         struct completion wc_io_complete;
156         int               wc_error;
157 };
158
159 static void o2hb_write_timeout(struct work_struct *work)
160 {
161         struct o2hb_region *reg =
162                 container_of(work, struct o2hb_region,
163                              hr_write_timeout_work.work);
164
165         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
166              "milliseconds\n", reg->hr_dev_name,
167              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); 
168         o2quo_disk_timeout();
169 }
170
171 static void o2hb_arm_write_timeout(struct o2hb_region *reg)
172 {
173         mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);
174
175         cancel_delayed_work(&reg->hr_write_timeout_work);
176         reg->hr_last_timeout_start = jiffies;
177         schedule_delayed_work(&reg->hr_write_timeout_work,
178                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
179 }
180
181 static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
182 {
183         cancel_delayed_work(&reg->hr_write_timeout_work);
184         flush_scheduled_work();
185 }
186
187 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
188                                       unsigned int num_ios)
189 {
190         atomic_set(&wc->wc_num_reqs, num_ios);
191         init_completion(&wc->wc_io_complete);
192         wc->wc_error = 0;
193 }
194
195 /* Used in error paths too */
196 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
197                                      unsigned int num)
198 {
199         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
200          * good news is that the fast path only completes one at a time */
201         while(num--) {
202                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
203                         BUG_ON(num > 0);
204                         complete(&wc->wc_io_complete);
205                 }
206         }
207 }
208
209 static void o2hb_wait_on_io(struct o2hb_region *reg,
210                             struct o2hb_bio_wait_ctxt *wc)
211 {
212         struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
213
214         blk_run_address_space(mapping);
215
216         wait_for_completion(&wc->wc_io_complete);
217 }
218
219 static int o2hb_bio_end_io(struct bio *bio,
220                            unsigned int bytes_done,
221                            int error)
222 {
223         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
224
225         if (error) {
226                 mlog(ML_ERROR, "IO Error %d\n", error);
227                 wc->wc_error = error;
228         }
229
230         if (bio->bi_size)
231                 return 1;
232
233         o2hb_bio_wait_dec(wc, 1);
234         return 0;
235 }
236
237 /* Setup a Bio to cover I/O against num_slots slots starting at
238  * start_slot. */
239 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
240                                       struct o2hb_bio_wait_ctxt *wc,
241                                       unsigned int start_slot,
242                                       unsigned int num_slots)
243 {
244         int i, nr_vecs, len, first_page, last_page;
245         unsigned int vec_len, vec_start;
246         unsigned int bits = reg->hr_block_bits;
247         unsigned int spp = reg->hr_slots_per_page;
248         struct bio *bio;
249         struct page *page;
250
251         nr_vecs = (num_slots + spp - 1) / spp;
252
253         /* Testing has shown this allocation to take long enough under
254          * GFP_KERNEL that the local node can get fenced. It would be
255          * nicest if we could pre-allocate these bios and avoid this
256          * all together. */
257         bio = bio_alloc(GFP_ATOMIC, nr_vecs);
258         if (!bio) {
259                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
260                 bio = ERR_PTR(-ENOMEM);
261                 goto bail;
262         }
263
264         /* Must put everything in 512 byte sectors for the bio... */
265         bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
266         bio->bi_bdev = reg->hr_bdev;
267         bio->bi_private = wc;
268         bio->bi_end_io = o2hb_bio_end_io;
269
270         first_page = start_slot / spp;
271         last_page = first_page + nr_vecs;
272         vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
273         for(i = first_page; i < last_page; i++) {
274                 page = reg->hr_slot_data[i];
275
276                 vec_len = PAGE_CACHE_SIZE;
277                 /* last page might be short */
278                 if (((i + 1) * spp) > (start_slot + num_slots))
279                         vec_len = ((num_slots + start_slot) % spp) << bits;
280                 vec_len -=  vec_start;
281
282                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
283                      i, vec_len, vec_start);
284
285                 len = bio_add_page(bio, page, vec_len, vec_start);
286                 if (len != vec_len) {
287                         bio_put(bio);
288                         bio = ERR_PTR(-EIO);
289
290                         mlog(ML_ERROR, "Error adding page to bio i = %d, "
291                              "vec_len = %u, len = %d\n, start = %u\n",
292                              i, vec_len, len, vec_start);
293                         goto bail;
294                 }
295
296                 vec_start = 0;
297         }
298
299 bail:
300         return bio;
301 }
302
303 /*
304  * Compute the maximum number of sectors the bdev can handle in one bio,
305  * as a power of two.
306  *
307  * Stolen from oracleasm, thanks Joel!
308  */
309 static int compute_max_sectors(struct block_device *bdev)
310 {
311         int max_pages, max_sectors, pow_two_sectors;
312
313         struct request_queue *q;
314
315         q = bdev_get_queue(bdev);
316         max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
317         if (max_pages > BIO_MAX_PAGES)
318                 max_pages = BIO_MAX_PAGES;
319         if (max_pages > q->max_phys_segments)
320                 max_pages = q->max_phys_segments;
321         if (max_pages > q->max_hw_segments)
322                 max_pages = q->max_hw_segments;
323         max_pages--; /* Handle I/Os that straddle a page */
324
325         if (max_pages) {
326                 max_sectors = max_pages << (PAGE_SHIFT - 9);
327         } else {
328                 /* If BIO contains 1 or less than 1 page. */
329                 max_sectors = q->max_sectors;
330         }
331         /* Why is fls() 1-based???? */
332         pow_two_sectors = 1 << (fls(max_sectors) - 1);
333
334         return pow_two_sectors;
335 }
336
337 static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
338                                                unsigned int num_slots,
339                                                unsigned int *num_bios,
340                                                unsigned int *slots_per_bio)
341 {
342         unsigned int max_sectors, io_sectors;
343
344         max_sectors = compute_max_sectors(reg->hr_bdev);
345
346         io_sectors = num_slots << (reg->hr_block_bits - 9);
347
348         *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
349         *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
350
351         mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
352              "device can handle %u sectors of I/O\n", io_sectors, num_slots,
353              max_sectors);
354         mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
355              *num_bios, *slots_per_bio);
356 }
357
358 static int o2hb_read_slots(struct o2hb_region *reg,
359                            unsigned int max_slots)
360 {
361         unsigned int num_bios, slots_per_bio, start_slot, num_slots;
362         int i, status;
363         struct o2hb_bio_wait_ctxt wc;
364         struct bio **bios;
365         struct bio *bio;
366
367         o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);
368
369         bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
370         if (!bios) {
371                 status = -ENOMEM;
372                 mlog_errno(status);
373                 return status;
374         }
375
376         o2hb_bio_wait_init(&wc, num_bios);
377
378         num_slots = slots_per_bio;
379         for(i = 0; i < num_bios; i++) {
380                 start_slot = i * slots_per_bio;
381
382                 /* adjust num_slots at last bio */
383                 if (max_slots < (start_slot + num_slots))
384                         num_slots = max_slots - start_slot;
385
386                 bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
387                 if (IS_ERR(bio)) {
388                         o2hb_bio_wait_dec(&wc, num_bios - i);
389
390                         status = PTR_ERR(bio);
391                         mlog_errno(status);
392                         goto bail_and_wait;
393                 }
394                 bios[i] = bio;
395
396                 submit_bio(READ, bio);
397         }
398
399         status = 0;
400
401 bail_and_wait:
402         o2hb_wait_on_io(reg, &wc);
403         if (wc.wc_error && !status)
404                 status = wc.wc_error;
405
406         if (bios) {
407                 for(i = 0; i < num_bios; i++)
408                         if (bios[i])
409                                 bio_put(bios[i]);
410                 kfree(bios);
411         }
412
413         return status;
414 }
415
416 static int o2hb_issue_node_write(struct o2hb_region *reg,
417                                  struct bio **write_bio,
418                                  struct o2hb_bio_wait_ctxt *write_wc)
419 {
420         int status;
421         unsigned int slot;
422         struct bio *bio;
423
424         o2hb_bio_wait_init(write_wc, 1);
425
426         slot = o2nm_this_node();
427
428         bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
429         if (IS_ERR(bio)) {
430                 status = PTR_ERR(bio);
431                 mlog_errno(status);
432                 goto bail;
433         }
434
435         submit_bio(WRITE, bio);
436
437         *write_bio = bio;
438         status = 0;
439 bail:
440         return status;
441 }
442
443 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
444                                      struct o2hb_disk_heartbeat_block *hb_block)
445 {
446         __le32 old_cksum;
447         u32 ret;
448
449         /* We want to compute the block crc with a 0 value in the
450          * hb_cksum field. Save it off here and replace after the
451          * crc. */
452         old_cksum = hb_block->hb_cksum;
453         hb_block->hb_cksum = 0;
454
455         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
456
457         hb_block->hb_cksum = old_cksum;
458
459         return ret;
460 }
461
462 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
463 {
464         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
465              "cksum = 0x%x, generation 0x%llx\n",
466              (long long)le64_to_cpu(hb_block->hb_seq),
467              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
468              (long long)le64_to_cpu(hb_block->hb_generation));
469 }
470
471 static int o2hb_verify_crc(struct o2hb_region *reg,
472                            struct o2hb_disk_heartbeat_block *hb_block)
473 {
474         u32 read, computed;
475
476         read = le32_to_cpu(hb_block->hb_cksum);
477         computed = o2hb_compute_block_crc_le(reg, hb_block);
478
479         return read == computed;
480 }
481
482 /* We want to make sure that nobody is heartbeating on top of us --
483  * this will help detect an invalid configuration. */
484 static int o2hb_check_last_timestamp(struct o2hb_region *reg)
485 {
486         int node_num, ret;
487         struct o2hb_disk_slot *slot;
488         struct o2hb_disk_heartbeat_block *hb_block;
489
490         node_num = o2nm_this_node();
491
492         ret = 1;
493         slot = &reg->hr_slots[node_num];
494         /* Don't check on our 1st timestamp */
495         if (slot->ds_last_time) {
496                 hb_block = slot->ds_raw_block;
497
498                 if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
499                         ret = 0;
500         }
501
502         return ret;
503 }
504
505 static inline void o2hb_prepare_block(struct o2hb_region *reg,
506                                       u64 generation)
507 {
508         int node_num;
509         u64 cputime;
510         struct o2hb_disk_slot *slot;
511         struct o2hb_disk_heartbeat_block *hb_block;
512
513         node_num = o2nm_this_node();
514         slot = &reg->hr_slots[node_num];
515
516         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
517         memset(hb_block, 0, reg->hr_block_bytes);
518         /* TODO: time stuff */
519         cputime = CURRENT_TIME.tv_sec;
520         if (!cputime)
521                 cputime = 1;
522
523         hb_block->hb_seq = cpu_to_le64(cputime);
524         hb_block->hb_node = node_num;
525         hb_block->hb_generation = cpu_to_le64(generation);
526         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
527
528         /* This step must always happen last! */
529         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
530                                                                    hb_block));
531
532         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
533              (long long)cpu_to_le64(generation),
534              le32_to_cpu(hb_block->hb_cksum));
535 }
536
537 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
538                                 struct o2nm_node *node,
539                                 int idx)
540 {
541         struct list_head *iter;
542         struct o2hb_callback_func *f;
543
544         list_for_each(iter, &hbcall->list) {
545                 f = list_entry(iter, struct o2hb_callback_func, hc_item);
546                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
547                 (f->hc_func)(node, idx, f->hc_data);
548         }
549 }
550
551 /* Will run the list in order until we process the passed event */
552 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
553 {
554         int empty;
555         struct o2hb_callback *hbcall;
556         struct o2hb_node_event *event;
557
558         spin_lock(&o2hb_live_lock);
559         empty = list_empty(&queued_event->hn_item);
560         spin_unlock(&o2hb_live_lock);
561         if (empty)
562                 return;
563
564         /* Holding callback sem assures we don't alter the callback
565          * lists when doing this, and serializes ourselves with other
566          * processes wanting callbacks. */
567         down_write(&o2hb_callback_sem);
568
569         spin_lock(&o2hb_live_lock);
570         while (!list_empty(&o2hb_node_events)
571                && !list_empty(&queued_event->hn_item)) {
572                 event = list_entry(o2hb_node_events.next,
573                                    struct o2hb_node_event,
574                                    hn_item);
575                 list_del_init(&event->hn_item);
576                 spin_unlock(&o2hb_live_lock);
577
578                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
579                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
580                      event->hn_node_num);
581
582                 hbcall = hbcall_from_type(event->hn_event_type);
583
584                 /* We should *never* have gotten on to the list with a
585                  * bad type... This isn't something that we should try
586                  * to recover from. */
587                 BUG_ON(IS_ERR(hbcall));
588
589                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
590
591                 spin_lock(&o2hb_live_lock);
592         }
593         spin_unlock(&o2hb_live_lock);
594
595         up_write(&o2hb_callback_sem);
596 }
597
598 static void o2hb_queue_node_event(struct o2hb_node_event *event,
599                                   enum o2hb_callback_type type,
600                                   struct o2nm_node *node,
601                                   int node_num)
602 {
603         assert_spin_locked(&o2hb_live_lock);
604
605         event->hn_event_type = type;
606         event->hn_node = node;
607         event->hn_node_num = node_num;
608
609         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
610              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
611
612         list_add_tail(&event->hn_item, &o2hb_node_events);
613 }
614
615 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
616 {
617         struct o2hb_node_event event =
618                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
619         struct o2nm_node *node;
620
621         node = o2nm_get_node_by_num(slot->ds_node_num);
622         if (!node)
623                 return;
624
625         spin_lock(&o2hb_live_lock);
626         if (!list_empty(&slot->ds_live_item)) {
627                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
628                      slot->ds_node_num);
629
630                 list_del_init(&slot->ds_live_item);
631
632                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
633                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
634
635                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
636                                               slot->ds_node_num);
637                 }
638         }
639         spin_unlock(&o2hb_live_lock);
640
641         o2hb_run_event_list(&event);
642
643         o2nm_node_put(node);
644 }
645
646 static int o2hb_check_slot(struct o2hb_region *reg,
647                            struct o2hb_disk_slot *slot)
648 {
649         int changed = 0, gen_changed = 0;
650         struct o2hb_node_event event =
651                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
652         struct o2nm_node *node;
653         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
654         u64 cputime;
655         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
656         unsigned int slot_dead_ms;
657
658         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
659
660         /* Is this correct? Do we assume that the node doesn't exist
661          * if we're not configured for him? */
662         node = o2nm_get_node_by_num(slot->ds_node_num);
663         if (!node)
664                 return 0;
665
666         if (!o2hb_verify_crc(reg, hb_block)) {
667                 /* all paths from here will drop o2hb_live_lock for
668                  * us. */
669                 spin_lock(&o2hb_live_lock);
670
671                 /* Don't print an error on the console in this case -
672                  * a freshly formatted heartbeat area will not have a
673                  * crc set on it. */
674                 if (list_empty(&slot->ds_live_item))
675                         goto out;
676
677                 /* The node is live but pushed out a bad crc. We
678                  * consider it a transient miss but don't populate any
679                  * other values as they may be junk. */
680                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
681                      slot->ds_node_num, reg->hr_dev_name);
682                 o2hb_dump_slot(hb_block);
683
684                 slot->ds_equal_samples++;
685                 goto fire_callbacks;
686         }
687
688         /* we don't care if these wrap.. the state transitions below
689          * clear at the right places */
690         cputime = le64_to_cpu(hb_block->hb_seq);
691         if (slot->ds_last_time != cputime)
692                 slot->ds_changed_samples++;
693         else
694                 slot->ds_equal_samples++;
695         slot->ds_last_time = cputime;
696
697         /* The node changed heartbeat generations. We assume this to
698          * mean it dropped off but came back before we timed out. We
699          * want to consider it down for the time being but don't want
700          * to lose any changed_samples state we might build up to
701          * considering it live again. */
702         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
703                 gen_changed = 1;
704                 slot->ds_equal_samples = 0;
705                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
706                      "to 0x%llx)\n", slot->ds_node_num,
707                      (long long)slot->ds_last_generation,
708                      (long long)le64_to_cpu(hb_block->hb_generation));
709         }
710
711         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
712
713         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
714              "seq %llu last %llu changed %u equal %u\n",
715              slot->ds_node_num, (long long)slot->ds_last_generation,
716              le32_to_cpu(hb_block->hb_cksum),
717              (unsigned long long)le64_to_cpu(hb_block->hb_seq), 
718              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
719              slot->ds_equal_samples);
720
721         spin_lock(&o2hb_live_lock);
722
723 fire_callbacks:
724         /* dead nodes only come to life after some number of
725          * changes at any time during their dead time */
726         if (list_empty(&slot->ds_live_item) &&
727             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
728                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
729                      slot->ds_node_num, (long long)slot->ds_last_generation);
730
731                 /* first on the list generates a callback */
732                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
733                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
734
735                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
736                                               slot->ds_node_num);
737
738                         changed = 1;
739                 }
740
741                 list_add_tail(&slot->ds_live_item,
742                               &o2hb_live_slots[slot->ds_node_num]);
743
744                 slot->ds_equal_samples = 0;
745
746                 /* We want to be sure that all nodes agree on the
747                  * number of milliseconds before a node will be
748                  * considered dead. The self-fencing timeout is
749                  * computed from this value, and a discrepancy might
750                  * result in heartbeat calling a node dead when it
751                  * hasn't self-fenced yet. */
752                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
753                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
754                         /* TODO: Perhaps we can fail the region here. */
755                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
756                              "of %u ms, but our count is %u ms.\n"
757                              "Please double check your configuration values "
758                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
759                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
760                              dead_ms);
761                 }
762                 goto out;
763         }
764
765         /* if the list is dead, we're done.. */
766         if (list_empty(&slot->ds_live_item))
767                 goto out;
768
769         /* live nodes only go dead after enough consequtive missed
770          * samples..  reset the missed counter whenever we see
771          * activity */
772         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
773                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
774                      slot->ds_node_num);
775
776                 /* last off the live_slot generates a callback */
777                 list_del_init(&slot->ds_live_item);
778                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
779                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
780
781                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
782                                               slot->ds_node_num);
783
784                         changed = 1;
785                 }
786
787                 /* We don't clear this because the node is still
788                  * actually writing new blocks. */
789                 if (!gen_changed)
790                         slot->ds_changed_samples = 0;
791                 goto out;
792         }
793         if (slot->ds_changed_samples) {
794                 slot->ds_changed_samples = 0;
795                 slot->ds_equal_samples = 0;
796         }
797 out:
798         spin_unlock(&o2hb_live_lock);
799
800         o2hb_run_event_list(&event);
801
802         o2nm_node_put(node);
803         return changed;
804 }
805
806 /* This could be faster if we just implmented a find_last_bit, but I
807  * don't think the circumstances warrant it. */
808 static int o2hb_highest_node(unsigned long *nodes,
809                              int numbits)
810 {
811         int highest, node;
812
813         highest = numbits;
814         node = -1;
815         while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
816                 if (node >= numbits)
817                         break;
818
819                 highest = node;
820         }
821
822         return highest;
823 }
824
825 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
826 {
827         int i, ret, highest_node, change = 0;
828         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
829         struct bio *write_bio;
830         struct o2hb_bio_wait_ctxt write_wc;
831
832         ret = o2nm_configured_node_map(configured_nodes,
833                                        sizeof(configured_nodes));
834         if (ret) {
835                 mlog_errno(ret);
836                 return ret;
837         }
838
839         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
840         if (highest_node >= O2NM_MAX_NODES) {
841                 mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
842                 return -EINVAL;
843         }
844
845         /* No sense in reading the slots of nodes that don't exist
846          * yet. Of course, if the node definitions have holes in them
847          * then we're reading an empty slot anyway... Consider this
848          * best-effort. */
849         ret = o2hb_read_slots(reg, highest_node + 1);
850         if (ret < 0) {
851                 mlog_errno(ret);
852                 return ret;
853         }
854
855         /* With an up to date view of the slots, we can check that no
856          * other node has been improperly configured to heartbeat in
857          * our slot. */
858         if (!o2hb_check_last_timestamp(reg))
859                 mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
860                      "in our slot!\n", reg->hr_dev_name);
861
862         /* fill in the proper info for our next heartbeat */
863         o2hb_prepare_block(reg, reg->hr_generation);
864
865         /* And fire off the write. Note that we don't wait on this I/O
866          * until later. */
867         ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
868         if (ret < 0) {
869                 mlog_errno(ret);
870                 return ret;
871         }
872
873         i = -1;
874         while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
875
876                 change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
877         }
878
879         /*
880          * We have to be sure we've advertised ourselves on disk
881          * before we can go to steady state.  This ensures that
882          * people we find in our steady state have seen us.
883          */
884         o2hb_wait_on_io(reg, &write_wc);
885         bio_put(write_bio);
886         if (write_wc.wc_error) {
887                 /* Do not re-arm the write timeout on I/O error - we
888                  * can't be sure that the new block ever made it to
889                  * disk */
890                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
891                      write_wc.wc_error, reg->hr_dev_name);
892                 return write_wc.wc_error;
893         }
894
895         o2hb_arm_write_timeout(reg);
896
897         /* let the person who launched us know when things are steady */
898         if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
899                 if (atomic_dec_and_test(&reg->hr_steady_iterations))
900                         wake_up(&o2hb_steady_queue);
901         }
902
903         return 0;
904 }
905
906 /* Subtract b from a, storing the result in a. a *must* have a larger
907  * value than b. */
908 static void o2hb_tv_subtract(struct timeval *a,
909                              struct timeval *b)
910 {
911         /* just return 0 when a is after b */
912         if (a->tv_sec < b->tv_sec ||
913             (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
914                 a->tv_sec = 0;
915                 a->tv_usec = 0;
916                 return;
917         }
918
919         a->tv_sec -= b->tv_sec;
920         a->tv_usec -= b->tv_usec;
921         while ( a->tv_usec < 0 ) {
922                 a->tv_sec--;
923                 a->tv_usec += 1000000;
924         }
925 }
926
927 static unsigned int o2hb_elapsed_msecs(struct timeval *start,
928                                        struct timeval *end)
929 {
930         struct timeval res = *end;
931
932         o2hb_tv_subtract(&res, start);
933
934         return res.tv_sec * 1000 + res.tv_usec / 1000;
935 }
936
937 /*
938  * we ride the region ref that the region dir holds.  before the region
939  * dir is removed and drops it ref it will wait to tear down this
940  * thread.
941  */
942 static int o2hb_thread(void *data)
943 {
944         int i, ret;
945         struct o2hb_region *reg = data;
946         struct bio *write_bio;
947         struct o2hb_bio_wait_ctxt write_wc;
948         struct timeval before_hb, after_hb;
949         unsigned int elapsed_msec;
950
951         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
952
953         set_user_nice(current, -20);
954
955         while (!kthread_should_stop() && !reg->hr_unclean_stop) {
956                 /* We track the time spent inside
957                  * o2hb_do_disk_heartbeat so that we avoid more then
958                  * hr_timeout_ms between disk writes. On busy systems
959                  * this should result in a heartbeat which is less
960                  * likely to time itself out. */
961                 do_gettimeofday(&before_hb);
962
963                 i = 0;
964                 do {
965                         ret = o2hb_do_disk_heartbeat(reg);
966                 } while (ret && ++i < 2);
967
968                 do_gettimeofday(&after_hb);
969                 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
970
971                 mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
972                      before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
973                      after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
974                      elapsed_msec);
975
976                 if (elapsed_msec < reg->hr_timeout_ms) {
977                         /* the kthread api has blocked signals for us so no
978                          * need to record the return value. */
979                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
980                 }
981         }
982
983         o2hb_disarm_write_timeout(reg);
984
985         /* unclean stop is only used in very bad situation */
986         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
987                 o2hb_shutdown_slot(&reg->hr_slots[i]);
988
989         /* Explicit down notification - avoid forcing the other nodes
990          * to timeout on this region when we could just as easily
991          * write a clear generation - thus indicating to them that
992          * this node has left this region.
993          *
994          * XXX: Should we skip this on unclean_stop? */
995         o2hb_prepare_block(reg, 0);
996         ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
997         if (ret == 0) {
998                 o2hb_wait_on_io(reg, &write_wc);
999                 bio_put(write_bio);
1000         } else {
1001                 mlog_errno(ret);
1002         }
1003
1004         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
1005
1006         return 0;
1007 }
1008
1009 void o2hb_init(void)
1010 {
1011         int i;
1012
1013         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1014                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1015
1016         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1017                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1018
1019         INIT_LIST_HEAD(&o2hb_node_events);
1020
1021         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1022 }
1023
1024 /* if we're already in a callback then we're already serialized by the sem */
1025 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1026                                              unsigned bytes)
1027 {
1028         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1029
1030         memcpy(map, &o2hb_live_node_bitmap, bytes);
1031 }
1032
1033 /*
1034  * get a map of all nodes that are heartbeating in any regions
1035  */
1036 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1037 {
1038         /* callers want to serialize this map and callbacks so that they
1039          * can trust that they don't miss nodes coming to the party */
1040         down_read(&o2hb_callback_sem);
1041         spin_lock(&o2hb_live_lock);
1042         o2hb_fill_node_map_from_callback(map, bytes);
1043         spin_unlock(&o2hb_live_lock);
1044         up_read(&o2hb_callback_sem);
1045 }
1046 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1047
1048 /*
1049  * heartbeat configfs bits.  The heartbeat set is a default set under
1050  * the cluster set in nodemanager.c.
1051  */
1052
1053 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1054 {
1055         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1056 }
1057
1058 /* drop_item only drops its ref after killing the thread, nothing should
1059  * be using the region anymore.  this has to clean up any state that
1060  * attributes might have built up. */
1061 static void o2hb_region_release(struct config_item *item)
1062 {
1063         int i;
1064         struct page *page;
1065         struct o2hb_region *reg = to_o2hb_region(item);
1066
1067         if (reg->hr_tmp_block)
1068                 kfree(reg->hr_tmp_block);
1069
1070         if (reg->hr_slot_data) {
1071                 for (i = 0; i < reg->hr_num_pages; i++) {
1072                         page = reg->hr_slot_data[i];
1073                         if (page)
1074                                 __free_page(page);
1075                 }
1076                 kfree(reg->hr_slot_data);
1077         }
1078
1079         if (reg->hr_bdev)
1080                 blkdev_put(reg->hr_bdev);
1081
1082         if (reg->hr_slots)
1083                 kfree(reg->hr_slots);
1084
1085         spin_lock(&o2hb_live_lock);
1086         list_del(&reg->hr_all_item);
1087         spin_unlock(&o2hb_live_lock);
1088
1089         kfree(reg);
1090 }
1091
1092 static int o2hb_read_block_input(struct o2hb_region *reg,
1093                                  const char *page,
1094                                  size_t count,
1095                                  unsigned long *ret_bytes,
1096                                  unsigned int *ret_bits)
1097 {
1098         unsigned long bytes;
1099         char *p = (char *)page;
1100
1101         bytes = simple_strtoul(p, &p, 0);
1102         if (!p || (*p && (*p != '\n')))
1103                 return -EINVAL;
1104
1105         /* Heartbeat and fs min / max block sizes are the same. */
1106         if (bytes > 4096 || bytes < 512)
1107                 return -ERANGE;
1108         if (hweight16(bytes) != 1)
1109                 return -EINVAL;
1110
1111         if (ret_bytes)
1112                 *ret_bytes = bytes;
1113         if (ret_bits)
1114                 *ret_bits = ffs(bytes) - 1;
1115
1116         return 0;
1117 }
1118
1119 static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1120                                             char *page)
1121 {
1122         return sprintf(page, "%u\n", reg->hr_block_bytes);
1123 }
1124
1125 static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1126                                              const char *page,
1127                                              size_t count)
1128 {
1129         int status;
1130         unsigned long block_bytes;
1131         unsigned int block_bits;
1132
1133         if (reg->hr_bdev)
1134                 return -EINVAL;
1135
1136         status = o2hb_read_block_input(reg, page, count,
1137                                        &block_bytes, &block_bits);
1138         if (status)
1139                 return status;
1140
1141         reg->hr_block_bytes = (unsigned int)block_bytes;
1142         reg->hr_block_bits = block_bits;
1143
1144         return count;
1145 }
1146
1147 static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1148                                             char *page)
1149 {
1150         return sprintf(page, "%llu\n", reg->hr_start_block);
1151 }
1152
1153 static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1154                                              const char *page,
1155                                              size_t count)
1156 {
1157         unsigned long long tmp;
1158         char *p = (char *)page;
1159
1160         if (reg->hr_bdev)
1161                 return -EINVAL;
1162
1163         tmp = simple_strtoull(p, &p, 0);
1164         if (!p || (*p && (*p != '\n')))
1165                 return -EINVAL;
1166
1167         reg->hr_start_block = tmp;
1168
1169         return count;
1170 }
1171
1172 static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1173                                        char *page)
1174 {
1175         return sprintf(page, "%d\n", reg->hr_blocks);
1176 }
1177
1178 static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1179                                         const char *page,
1180                                         size_t count)
1181 {
1182         unsigned long tmp;
1183         char *p = (char *)page;
1184
1185         if (reg->hr_bdev)
1186                 return -EINVAL;
1187
1188         tmp = simple_strtoul(p, &p, 0);
1189         if (!p || (*p && (*p != '\n')))
1190                 return -EINVAL;
1191
1192         if (tmp > O2NM_MAX_NODES || tmp == 0)
1193                 return -ERANGE;
1194
1195         reg->hr_blocks = (unsigned int)tmp;
1196
1197         return count;
1198 }
1199
1200 static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1201                                     char *page)
1202 {
1203         unsigned int ret = 0;
1204
1205         if (reg->hr_bdev)
1206                 ret = sprintf(page, "%s\n", reg->hr_dev_name);
1207
1208         return ret;
1209 }
1210
1211 static void o2hb_init_region_params(struct o2hb_region *reg)
1212 {
1213         reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1214         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1215
1216         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1217              reg->hr_start_block, reg->hr_blocks);
1218         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1219              reg->hr_block_bytes, reg->hr_block_bits);
1220         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1221         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1222 }
1223
1224 static int o2hb_map_slot_data(struct o2hb_region *reg)
1225 {
1226         int i, j;
1227         unsigned int last_slot;
1228         unsigned int spp = reg->hr_slots_per_page;
1229         struct page *page;
1230         char *raw;
1231         struct o2hb_disk_slot *slot;
1232
1233         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1234         if (reg->hr_tmp_block == NULL) {
1235                 mlog_errno(-ENOMEM);
1236                 return -ENOMEM;
1237         }
1238
1239         reg->hr_slots = kcalloc(reg->hr_blocks,
1240                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1241         if (reg->hr_slots == NULL) {
1242                 mlog_errno(-ENOMEM);
1243                 return -ENOMEM;
1244         }
1245
1246         for(i = 0; i < reg->hr_blocks; i++) {
1247                 slot = &reg->hr_slots[i];
1248                 slot->ds_node_num = i;
1249                 INIT_LIST_HEAD(&slot->ds_live_item);
1250                 slot->ds_raw_block = NULL;
1251         }
1252
1253         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1254         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1255                            "at %u blocks per page\n",
1256              reg->hr_num_pages, reg->hr_blocks, spp);
1257
1258         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1259                                     GFP_KERNEL);
1260         if (!reg->hr_slot_data) {
1261                 mlog_errno(-ENOMEM);
1262                 return -ENOMEM;
1263         }
1264
1265         for(i = 0; i < reg->hr_num_pages; i++) {
1266                 page = alloc_page(GFP_KERNEL);
1267                 if (!page) {
1268                         mlog_errno(-ENOMEM);
1269                         return -ENOMEM;
1270                 }
1271
1272                 reg->hr_slot_data[i] = page;
1273
1274                 last_slot = i * spp;
1275                 raw = page_address(page);
1276                 for (j = 0;
1277                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1278                      j++) {
1279                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1280
1281                         slot = &reg->hr_slots[j + last_slot];
1282                         slot->ds_raw_block =
1283                                 (struct o2hb_disk_heartbeat_block *) raw;
1284
1285                         raw += reg->hr_block_bytes;
1286                 }
1287         }
1288
1289         return 0;
1290 }
1291
1292 /* Read in all the slots available and populate the tracking
1293  * structures so that we can start with a baseline idea of what's
1294  * there. */
1295 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1296 {
1297         int ret, i;
1298         struct o2hb_disk_slot *slot;
1299         struct o2hb_disk_heartbeat_block *hb_block;
1300
1301         mlog_entry_void();
1302
1303         ret = o2hb_read_slots(reg, reg->hr_blocks);
1304         if (ret) {
1305                 mlog_errno(ret);
1306                 goto out;
1307         }
1308
1309         /* We only want to get an idea of the values initially in each
1310          * slot, so we do no verification - o2hb_check_slot will
1311          * actually determine if each configured slot is valid and
1312          * whether any values have changed. */
1313         for(i = 0; i < reg->hr_blocks; i++) {
1314                 slot = &reg->hr_slots[i];
1315                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1316
1317                 /* Only fill the values that o2hb_check_slot uses to
1318                  * determine changing slots */
1319                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1320                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1321         }
1322
1323 out:
1324         mlog_exit(ret);
1325         return ret;
1326 }
1327
1328 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1329 static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1330                                      const char *page,
1331                                      size_t count)
1332 {
1333         long fd;
1334         int sectsize;
1335         char *p = (char *)page;
1336         struct file *filp = NULL;
1337         struct inode *inode = NULL;
1338         ssize_t ret = -EINVAL;
1339
1340         if (reg->hr_bdev)
1341                 goto out;
1342
1343         /* We can't heartbeat without having had our node number
1344          * configured yet. */
1345         if (o2nm_this_node() == O2NM_MAX_NODES)
1346                 goto out;
1347
1348         fd = simple_strtol(p, &p, 0);
1349         if (!p || (*p && (*p != '\n')))
1350                 goto out;
1351
1352         if (fd < 0 || fd >= INT_MAX)
1353                 goto out;
1354
1355         filp = fget(fd);
1356         if (filp == NULL)
1357                 goto out;
1358
1359         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1360             reg->hr_block_bytes == 0)
1361                 goto out;
1362
1363         inode = igrab(filp->f_mapping->host);
1364         if (inode == NULL)
1365                 goto out;
1366
1367         if (!S_ISBLK(inode->i_mode))
1368                 goto out;
1369
1370         reg->hr_bdev = I_BDEV(filp->f_mapping->host);
1371         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
1372         if (ret) {
1373                 reg->hr_bdev = NULL;
1374                 goto out;
1375         }
1376         inode = NULL;
1377
1378         bdevname(reg->hr_bdev, reg->hr_dev_name);
1379
1380         sectsize = bdev_hardsect_size(reg->hr_bdev);
1381         if (sectsize != reg->hr_block_bytes) {
1382                 mlog(ML_ERROR,
1383                      "blocksize %u incorrect for device, expected %d",
1384                      reg->hr_block_bytes, sectsize);
1385                 ret = -EINVAL;
1386                 goto out;
1387         }
1388
1389         o2hb_init_region_params(reg);
1390
1391         /* Generation of zero is invalid */
1392         do {
1393                 get_random_bytes(&reg->hr_generation,
1394                                  sizeof(reg->hr_generation));
1395         } while (reg->hr_generation == 0);
1396
1397         ret = o2hb_map_slot_data(reg);
1398         if (ret) {
1399                 mlog_errno(ret);
1400                 goto out;
1401         }
1402
1403         ret = o2hb_populate_slot_data(reg);
1404         if (ret) {
1405                 mlog_errno(ret);
1406                 goto out;
1407         }
1408
1409         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1410
1411         /*
1412          * A node is considered live after it has beat LIVE_THRESHOLD
1413          * times.  We're not steady until we've given them a chance
1414          * _after_ our first read.
1415          */
1416         atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);
1417
1418         reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1419                                    reg->hr_item.ci_name);
1420         if (IS_ERR(reg->hr_task)) {
1421                 ret = PTR_ERR(reg->hr_task);
1422                 mlog_errno(ret);
1423                 reg->hr_task = NULL;
1424                 goto out;
1425         }
1426
1427         ret = wait_event_interruptible(o2hb_steady_queue,
1428                                 atomic_read(&reg->hr_steady_iterations) == 0);
1429         if (ret) {
1430                 kthread_stop(reg->hr_task);
1431                 reg->hr_task = NULL;
1432                 goto out;
1433         }
1434
1435         ret = count;
1436 out:
1437         if (filp)
1438                 fput(filp);
1439         if (inode)
1440                 iput(inode);
1441         if (ret < 0) {
1442                 if (reg->hr_bdev) {
1443                         blkdev_put(reg->hr_bdev);
1444                         reg->hr_bdev = NULL;
1445                 }
1446         }
1447         return ret;
1448 }
1449
1450 struct o2hb_region_attribute {
1451         struct configfs_attribute attr;
1452         ssize_t (*show)(struct o2hb_region *, char *);
1453         ssize_t (*store)(struct o2hb_region *, const char *, size_t);
1454 };
1455
1456 static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
1457         .attr   = { .ca_owner = THIS_MODULE,
1458                     .ca_name = "block_bytes",
1459                     .ca_mode = S_IRUGO | S_IWUSR },
1460         .show   = o2hb_region_block_bytes_read,
1461         .store  = o2hb_region_block_bytes_write,
1462 };
1463
1464 static struct o2hb_region_attribute o2hb_region_attr_start_block = {
1465         .attr   = { .ca_owner = THIS_MODULE,
1466                     .ca_name = "start_block",
1467                     .ca_mode = S_IRUGO | S_IWUSR },
1468         .show   = o2hb_region_start_block_read,
1469         .store  = o2hb_region_start_block_write,
1470 };
1471
1472 static struct o2hb_region_attribute o2hb_region_attr_blocks = {
1473         .attr   = { .ca_owner = THIS_MODULE,
1474                     .ca_name = "blocks",
1475                     .ca_mode = S_IRUGO | S_IWUSR },
1476         .show   = o2hb_region_blocks_read,
1477         .store  = o2hb_region_blocks_write,
1478 };
1479
1480 static struct o2hb_region_attribute o2hb_region_attr_dev = {
1481         .attr   = { .ca_owner = THIS_MODULE,
1482                     .ca_name = "dev",
1483                     .ca_mode = S_IRUGO | S_IWUSR },
1484         .show   = o2hb_region_dev_read,
1485         .store  = o2hb_region_dev_write,
1486 };
1487
1488 static struct configfs_attribute *o2hb_region_attrs[] = {
1489         &o2hb_region_attr_block_bytes.attr,
1490         &o2hb_region_attr_start_block.attr,
1491         &o2hb_region_attr_blocks.attr,
1492         &o2hb_region_attr_dev.attr,
1493         NULL,
1494 };
1495
1496 static ssize_t o2hb_region_show(struct config_item *item,
1497                                 struct configfs_attribute *attr,
1498                                 char *page)
1499 {
1500         struct o2hb_region *reg = to_o2hb_region(item);
1501         struct o2hb_region_attribute *o2hb_region_attr =
1502                 container_of(attr, struct o2hb_region_attribute, attr);
1503         ssize_t ret = 0;
1504
1505         if (o2hb_region_attr->show)
1506                 ret = o2hb_region_attr->show(reg, page);
1507         return ret;
1508 }
1509
1510 static ssize_t o2hb_region_store(struct config_item *item,
1511                                  struct configfs_attribute *attr,
1512                                  const char *page, size_t count)
1513 {
1514         struct o2hb_region *reg = to_o2hb_region(item);
1515         struct o2hb_region_attribute *o2hb_region_attr =
1516                 container_of(attr, struct o2hb_region_attribute, attr);
1517         ssize_t ret = -EINVAL;
1518
1519         if (o2hb_region_attr->store)
1520                 ret = o2hb_region_attr->store(reg, page, count);
1521         return ret;
1522 }
1523
1524 static struct configfs_item_operations o2hb_region_item_ops = {
1525         .release                = o2hb_region_release,
1526         .show_attribute         = o2hb_region_show,
1527         .store_attribute        = o2hb_region_store,
1528 };
1529
1530 static struct config_item_type o2hb_region_type = {
1531         .ct_item_ops    = &o2hb_region_item_ops,
1532         .ct_attrs       = o2hb_region_attrs,
1533         .ct_owner       = THIS_MODULE,
1534 };
1535
1536 /* heartbeat set */
1537
1538 struct o2hb_heartbeat_group {
1539         struct config_group hs_group;
1540         /* some stuff? */
1541 };
1542
1543 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1544 {
1545         return group ?
1546                 container_of(group, struct o2hb_heartbeat_group, hs_group)
1547                 : NULL;
1548 }
1549
1550 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
1551                                                           const char *name)
1552 {
1553         struct o2hb_region *reg = NULL;
1554         struct config_item *ret = NULL;
1555
1556         reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL);
1557         if (reg == NULL)
1558                 goto out; /* ENOMEM */
1559
1560         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
1561
1562         ret = &reg->hr_item;
1563
1564         spin_lock(&o2hb_live_lock);
1565         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
1566         spin_unlock(&o2hb_live_lock);
1567 out:
1568         if (ret == NULL)
1569                 kfree(reg);
1570
1571         return ret;
1572 }
1573
1574 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
1575                                            struct config_item *item)
1576 {
1577         struct o2hb_region *reg = to_o2hb_region(item);
1578
1579         /* stop the thread when the user removes the region dir */
1580         if (reg->hr_task) {
1581                 kthread_stop(reg->hr_task);
1582                 reg->hr_task = NULL;
1583         }
1584
1585         config_item_put(item);
1586 }
1587
1588 struct o2hb_heartbeat_group_attribute {
1589         struct configfs_attribute attr;
1590         ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
1591         ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
1592 };
1593
1594 static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
1595                                          struct configfs_attribute *attr,
1596                                          char *page)
1597 {
1598         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1599         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1600                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1601         ssize_t ret = 0;
1602
1603         if (o2hb_heartbeat_group_attr->show)
1604                 ret = o2hb_heartbeat_group_attr->show(reg, page);
1605         return ret;
1606 }
1607
1608 static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
1609                                           struct configfs_attribute *attr,
1610                                           const char *page, size_t count)
1611 {
1612         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1613         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1614                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1615         ssize_t ret = -EINVAL;
1616
1617         if (o2hb_heartbeat_group_attr->store)
1618                 ret = o2hb_heartbeat_group_attr->store(reg, page, count);
1619         return ret;
1620 }
1621
1622 static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
1623                                                      char *page)
1624 {
1625         return sprintf(page, "%u\n", o2hb_dead_threshold);
1626 }
1627
1628 static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
1629                                                     const char *page,
1630                                                     size_t count)
1631 {
1632         unsigned long tmp;
1633         char *p = (char *)page;
1634
1635         tmp = simple_strtoul(p, &p, 10);
1636         if (!p || (*p && (*p != '\n')))
1637                 return -EINVAL;
1638
1639         /* this will validate ranges for us. */
1640         o2hb_dead_threshold_set((unsigned int) tmp);
1641
1642         return count;
1643 }
1644
1645 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
1646         .attr   = { .ca_owner = THIS_MODULE,
1647                     .ca_name = "dead_threshold",
1648                     .ca_mode = S_IRUGO | S_IWUSR },
1649         .show   = o2hb_heartbeat_group_threshold_show,
1650         .store  = o2hb_heartbeat_group_threshold_store,
1651 };
1652
1653 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
1654         &o2hb_heartbeat_group_attr_threshold.attr,
1655         NULL,
1656 };
1657
1658 static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
1659         .show_attribute         = o2hb_heartbeat_group_show,
1660         .store_attribute        = o2hb_heartbeat_group_store,
1661 };
1662
1663 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
1664         .make_item      = o2hb_heartbeat_group_make_item,
1665         .drop_item      = o2hb_heartbeat_group_drop_item,
1666 };
1667
1668 static struct config_item_type o2hb_heartbeat_group_type = {
1669         .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
1670         .ct_item_ops    = &o2hb_hearbeat_group_item_ops,
1671         .ct_attrs       = o2hb_heartbeat_group_attrs,
1672         .ct_owner       = THIS_MODULE,
1673 };
1674
1675 /* this is just here to avoid touching group in heartbeat.h which the
1676  * entire damn world #includes */
1677 struct config_group *o2hb_alloc_hb_set(void)
1678 {
1679         struct o2hb_heartbeat_group *hs = NULL;
1680         struct config_group *ret = NULL;
1681
1682         hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1683         if (hs == NULL)
1684                 goto out;
1685
1686         config_group_init_type_name(&hs->hs_group, "heartbeat",
1687                                     &o2hb_heartbeat_group_type);
1688
1689         ret = &hs->hs_group;
1690 out:
1691         if (ret == NULL)
1692                 kfree(hs);
1693         return ret;
1694 }
1695
1696 void o2hb_free_hb_set(struct config_group *group)
1697 {
1698         struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
1699         kfree(hs);
1700 }
1701
1702 /* hb callback registration and issueing */
1703
1704 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
1705 {
1706         if (type == O2HB_NUM_CB)
1707                 return ERR_PTR(-EINVAL);
1708
1709         return &o2hb_callbacks[type];
1710 }
1711
1712 void o2hb_setup_callback(struct o2hb_callback_func *hc,
1713                          enum o2hb_callback_type type,
1714                          o2hb_cb_func *func,
1715                          void *data,
1716                          int priority)
1717 {
1718         INIT_LIST_HEAD(&hc->hc_item);
1719         hc->hc_func = func;
1720         hc->hc_data = data;
1721         hc->hc_priority = priority;
1722         hc->hc_type = type;
1723         hc->hc_magic = O2HB_CB_MAGIC;
1724 }
1725 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
1726
1727 int o2hb_register_callback(struct o2hb_callback_func *hc)
1728 {
1729         struct o2hb_callback_func *tmp;
1730         struct list_head *iter;
1731         struct o2hb_callback *hbcall;
1732         int ret;
1733
1734         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1735         BUG_ON(!list_empty(&hc->hc_item));
1736
1737         hbcall = hbcall_from_type(hc->hc_type);
1738         if (IS_ERR(hbcall)) {
1739                 ret = PTR_ERR(hbcall);
1740                 goto out;
1741         }
1742
1743         down_write(&o2hb_callback_sem);
1744
1745         list_for_each(iter, &hbcall->list) {
1746                 tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
1747                 if (hc->hc_priority < tmp->hc_priority) {
1748                         list_add_tail(&hc->hc_item, iter);
1749                         break;
1750                 }
1751         }
1752         if (list_empty(&hc->hc_item))
1753                 list_add_tail(&hc->hc_item, &hbcall->list);
1754
1755         up_write(&o2hb_callback_sem);
1756         ret = 0;
1757 out:
1758         mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
1759              ret, __builtin_return_address(0), hc);
1760         return ret;
1761 }
1762 EXPORT_SYMBOL_GPL(o2hb_register_callback);
1763
1764 int o2hb_unregister_callback(struct o2hb_callback_func *hc)
1765 {
1766         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1767
1768         mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
1769              __builtin_return_address(0), hc);
1770
1771         if (list_empty(&hc->hc_item))
1772                 return 0;
1773
1774         down_write(&o2hb_callback_sem);
1775
1776         list_del_init(&hc->hc_item);
1777
1778         up_write(&o2hb_callback_sem);
1779
1780         return 0;
1781 }
1782 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
1783
1784 int o2hb_check_node_heartbeating(u8 node_num)
1785 {
1786         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1787
1788         o2hb_fill_node_map(testing_map, sizeof(testing_map));
1789         if (!test_bit(node_num, testing_map)) {
1790                 mlog(ML_HEARTBEAT,
1791                      "node (%u) does not have heartbeating enabled.\n",
1792                      node_num);
1793                 return 0;
1794         }
1795
1796         return 1;
1797 }
1798 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
1799
1800 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
1801 {
1802         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1803
1804         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
1805         if (!test_bit(node_num, testing_map)) {
1806                 mlog(ML_HEARTBEAT,
1807                      "node (%u) does not have heartbeating enabled.\n",
1808                      node_num);
1809                 return 0;
1810         }
1811
1812         return 1;
1813 }
1814 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
1815
1816 /* Makes sure our local node is configured with a node number, and is
1817  * heartbeating. */
1818 int o2hb_check_local_node_heartbeating(void)
1819 {
1820         u8 node_num;
1821
1822         /* if this node was set then we have networking */
1823         node_num = o2nm_this_node();
1824         if (node_num == O2NM_MAX_NODES) {
1825                 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
1826                 return 0;
1827         }
1828
1829         return o2hb_check_node_heartbeating(node_num);
1830 }
1831 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
1832
1833 /*
1834  * this is just a hack until we get the plumbing which flips file systems
1835  * read only and drops the hb ref instead of killing the node dead.
1836  */
1837 void o2hb_stop_all_regions(void)
1838 {
1839         struct o2hb_region *reg;
1840
1841         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
1842
1843         spin_lock(&o2hb_live_lock);
1844
1845         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
1846                 reg->hr_unclean_stop = 1;
1847
1848         spin_unlock(&o2hb_live_lock);
1849 }
1850 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);