1 /******************************************************************************
 
   2 *******************************************************************************
 
   4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
 
   5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
 
   7 **  This copyrighted material is made available to anyone wishing to use,
 
   8 **  modify, copy, or redistribute it subject to the terms and conditions
 
   9 **  of the GNU General Public License v.2.
 
  11 *******************************************************************************
 
  12 ******************************************************************************/
 
  14 #include "dlm_internal.h"
 
  15 #include "lockspace.h"
 
  28  * Recovery waiting routines: these functions wait for a particular reply from
 
  29  * a remote node, or for the remote node to report a certain status.  They need
 
  30  * to abort if the lockspace is stopped indicating a node has failed (perhaps
 
  31  * the one being waited for).
 
  35  * Wait until given function returns non-zero or lockspace is stopped
 
  36  * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
 
  37  * function thinks it could have completed the waited-on task, they should wake
 
  38  * up ls_wait_general to get an immediate response rather than waiting for the
 
  39  * timer to detect the result.  A timer wakes us up periodically while waiting
 
  40  * to see if we should abort due to a node failure.  This should only be called
 
  41  * by the dlm_recoverd thread.
 
  44 static void dlm_wait_timer_fn(unsigned long data)
 
  46         struct dlm_ls *ls = (struct dlm_ls *) data;
 
  47         mod_timer(&ls->ls_timer, jiffies + (dlm_config.recover_timer * HZ));
 
  48         wake_up(&ls->ls_wait_general);
 
  51 int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
 
  55         init_timer(&ls->ls_timer);
 
  56         ls->ls_timer.function = dlm_wait_timer_fn;
 
  57         ls->ls_timer.data = (long) ls;
 
  58         ls->ls_timer.expires = jiffies + (dlm_config.recover_timer * HZ);
 
  59         add_timer(&ls->ls_timer);
 
  61         wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
 
  62         del_timer_sync(&ls->ls_timer);
 
  64         if (dlm_recovery_stopped(ls)) {
 
  65                 log_debug(ls, "dlm_wait_function aborted");
 
  72  * An efficient way for all nodes to wait for all others to have a certain
 
  73  * status.  The node with the lowest nodeid polls all the others for their
 
  74  * status (wait_status_all) and all the others poll the node with the low id
 
  75  * for its accumulated result (wait_status_low).  When all nodes have set
 
  76  * status flag X, then status flag X_ALL will be set on the low nodeid.
 
  79 uint32_t dlm_recover_status(struct dlm_ls *ls)
 
  82         spin_lock(&ls->ls_recover_lock);
 
  83         status = ls->ls_recover_status;
 
  84         spin_unlock(&ls->ls_recover_lock);
 
  88 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 
  90         spin_lock(&ls->ls_recover_lock);
 
  91         ls->ls_recover_status |= status;
 
  92         spin_unlock(&ls->ls_recover_lock);
 
  95 static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
 
  97         struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
 
  98         struct dlm_member *memb;
 
 101         list_for_each_entry(memb, &ls->ls_nodes, list) {
 
 104                         if (dlm_recovery_stopped(ls)) {
 
 109                         error = dlm_rcom_status(ls, memb->nodeid);
 
 113                         if (rc->rc_result & wait_status)
 
 124 static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
 
 126         struct dlm_rcom *rc = (struct dlm_rcom *) ls->ls_recover_buf;
 
 127         int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
 
 130                 if (dlm_recovery_stopped(ls)) {
 
 135                 error = dlm_rcom_status(ls, nodeid);
 
 139                 if (rc->rc_result & wait_status)
 
 149 static int wait_status(struct dlm_ls *ls, uint32_t status)
 
 151         uint32_t status_all = status << 1;
 
 154         if (ls->ls_low_nodeid == dlm_our_nodeid()) {
 
 155                 error = wait_status_all(ls, status);
 
 157                         dlm_set_recover_status(ls, status_all);
 
 159                 error = wait_status_low(ls, status_all);
 
 164 int dlm_recover_members_wait(struct dlm_ls *ls)
 
 166         return wait_status(ls, DLM_RS_NODES);
 
 169 int dlm_recover_directory_wait(struct dlm_ls *ls)
 
 171         return wait_status(ls, DLM_RS_DIR);
 
 174 int dlm_recover_locks_wait(struct dlm_ls *ls)
 
 176         return wait_status(ls, DLM_RS_LOCKS);
 
 179 int dlm_recover_done_wait(struct dlm_ls *ls)
 
 181         return wait_status(ls, DLM_RS_DONE);
 
 185  * The recover_list contains all the rsb's for which we've requested the new
 
 186  * master nodeid.  As replies are returned from the resource directories the
 
 187  * rsb's are removed from the list.  When the list is empty we're done.
 
 189  * The recover_list is later similarly used for all rsb's for which we've sent
 
 190  * new lkb's and need to receive new corresponding lkid's.
 
 192  * We use the address of the rsb struct as a simple local identifier for the
 
 193  * rsb so we can match an rcom reply with the rsb it was sent for.
 
 196 static int recover_list_empty(struct dlm_ls *ls)
 
 200         spin_lock(&ls->ls_recover_list_lock);
 
 201         empty = list_empty(&ls->ls_recover_list);
 
 202         spin_unlock(&ls->ls_recover_list_lock);
 
 207 static void recover_list_add(struct dlm_rsb *r)
 
 209         struct dlm_ls *ls = r->res_ls;
 
 211         spin_lock(&ls->ls_recover_list_lock);
 
 212         if (list_empty(&r->res_recover_list)) {
 
 213                 list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
 
 214                 ls->ls_recover_list_count++;
 
 217         spin_unlock(&ls->ls_recover_list_lock);
 
 220 static void recover_list_del(struct dlm_rsb *r)
 
 222         struct dlm_ls *ls = r->res_ls;
 
 224         spin_lock(&ls->ls_recover_list_lock);
 
 225         list_del_init(&r->res_recover_list);
 
 226         ls->ls_recover_list_count--;
 
 227         spin_unlock(&ls->ls_recover_list_lock);
 
 232 static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
 
 234         struct dlm_rsb *r = NULL;
 
 236         spin_lock(&ls->ls_recover_list_lock);
 
 238         list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
 
 239                 if (id == (unsigned long) r)
 
 244         spin_unlock(&ls->ls_recover_list_lock);
 
 248 static void recover_list_clear(struct dlm_ls *ls)
 
 250         struct dlm_rsb *r, *s;
 
 252         spin_lock(&ls->ls_recover_list_lock);
 
 253         list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
 
 254                 list_del_init(&r->res_recover_list);
 
 256                 ls->ls_recover_list_count--;
 
 259         if (ls->ls_recover_list_count != 0) {
 
 260                 log_error(ls, "warning: recover_list_count %d",
 
 261                           ls->ls_recover_list_count);
 
 262                 ls->ls_recover_list_count = 0;
 
 264         spin_unlock(&ls->ls_recover_list_lock);
 
 268 /* Master recovery: find new master node for rsb's that were
 
 269    mastered on nodes that have been removed.
 
 273    dlm_send_rcom_lookup            ->  receive_rcom_lookup
 
 275    receive_rcom_lookup_reply       <-
 
 276    dlm_recover_master_reply
 
 283  * Set the lock master for all LKBs in a lock queue
 
 284  * If we are the new master of the rsb, we may have received new
 
 285  * MSTCPY locks from other nodes already which we need to ignore
 
 286  * when setting the new nodeid.
 
 289 static void set_lock_master(struct list_head *queue, int nodeid)
 
 293         list_for_each_entry(lkb, queue, lkb_statequeue)
 
 294                 if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
 
 295                         lkb->lkb_nodeid = nodeid;
 
 298 static void set_master_lkbs(struct dlm_rsb *r)
 
 300         set_lock_master(&r->res_grantqueue, r->res_nodeid);
 
 301         set_lock_master(&r->res_convertqueue, r->res_nodeid);
 
 302         set_lock_master(&r->res_waitqueue, r->res_nodeid);
 
 306  * Propogate the new master nodeid to locks
 
 307  * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
 
 308  * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
 
 312 static void set_new_master(struct dlm_rsb *r, int nodeid)
 
 315         r->res_nodeid = nodeid;
 
 317         rsb_set_flag(r, RSB_NEW_MASTER);
 
 318         rsb_set_flag(r, RSB_NEW_MASTER2);
 
 323  * We do async lookups on rsb's that need new masters.  The rsb's
 
 324  * waiting for a lookup reply are kept on the recover_list.
 
 327 static int recover_master(struct dlm_rsb *r)
 
 329         struct dlm_ls *ls = r->res_ls;
 
 330         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
 
 332         dir_nodeid = dlm_dir_nodeid(r);
 
 334         if (dir_nodeid == our_nodeid) {
 
 335                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
 
 336                                        r->res_length, &ret_nodeid);
 
 338                         log_error(ls, "recover dir lookup error %d", error);
 
 340                 if (ret_nodeid == our_nodeid)
 
 342                 set_new_master(r, ret_nodeid);
 
 345                 error = dlm_send_rcom_lookup(r, dir_nodeid);
 
 352  * When not using a directory, most resource names will hash to a new static
 
 353  * master nodeid and the resource will need to be remastered.
 
 356 static int recover_master_static(struct dlm_rsb *r)
 
 358         int master = dlm_dir_nodeid(r);
 
 360         if (master == dlm_our_nodeid())
 
 363         if (r->res_nodeid != master) {
 
 365                         dlm_purge_mstcpy_locks(r);
 
 366                 set_new_master(r, master);
 
 373  * Go through local root resources and for each rsb which has a master which
 
 374  * has departed, get the new master nodeid from the directory.  The dir will
 
 375  * assign mastery to the first node to look up the new master.  That means
 
 376  * we'll discover in this lookup if we're the new master of any rsb's.
 
 378  * We fire off all the dir lookup requests individually and asynchronously to
 
 379  * the correct dir node.
 
 382 int dlm_recover_masters(struct dlm_ls *ls)
 
 385         int error = 0, count = 0;
 
 387         log_debug(ls, "dlm_recover_masters");
 
 389         down_read(&ls->ls_root_sem);
 
 390         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 
 391                 if (dlm_recovery_stopped(ls)) {
 
 392                         up_read(&ls->ls_root_sem);
 
 397                 if (dlm_no_directory(ls))
 
 398                         count += recover_master_static(r);
 
 399                 else if (!is_master(r) && dlm_is_removed(ls, r->res_nodeid)) {
 
 406         up_read(&ls->ls_root_sem);
 
 408         log_debug(ls, "dlm_recover_masters %d resources", count);
 
 410         error = dlm_wait_function(ls, &recover_list_empty);
 
 413                 recover_list_clear(ls);
 
 417 int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 
 422         r = recover_list_find(ls, rc->rc_id);
 
 424                 log_error(ls, "dlm_recover_master_reply no id %llx",
 
 425                           (unsigned long long)rc->rc_id);
 
 429         nodeid = rc->rc_result;
 
 430         if (nodeid == dlm_our_nodeid())
 
 433         set_new_master(r, nodeid);
 
 436         if (recover_list_empty(ls))
 
 437                 wake_up(&ls->ls_wait_general);
 
 443 /* Lock recovery: rebuild the process-copy locks we hold on a
 
 444    remastered rsb on the new rsb master.
 
 449    dlm_send_rcom_lock              ->  receive_rcom_lock
 
 450                                        dlm_recover_master_copy
 
 451    receive_rcom_lock_reply         <-
 
 452    dlm_recover_process_copy
 
 457  * keep a count of the number of lkb's we send to the new master; when we get
 
 458  * an equal number of replies then recovery for the rsb is done
 
 461 static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
 
 466         list_for_each_entry(lkb, head, lkb_statequeue) {
 
 467                 error = dlm_send_rcom_lock(r, lkb);
 
 470                 r->res_recover_locks_count++;
 
 476 static int recover_locks(struct dlm_rsb *r)
 
 482         DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
 
 484         error = recover_locks_queue(r, &r->res_grantqueue);
 
 487         error = recover_locks_queue(r, &r->res_convertqueue);
 
 490         error = recover_locks_queue(r, &r->res_waitqueue);
 
 494         if (r->res_recover_locks_count)
 
 497                 rsb_clear_flag(r, RSB_NEW_MASTER);
 
 503 int dlm_recover_locks(struct dlm_ls *ls)
 
 506         int error, count = 0;
 
 508         log_debug(ls, "dlm_recover_locks");
 
 510         down_read(&ls->ls_root_sem);
 
 511         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 
 513                         rsb_clear_flag(r, RSB_NEW_MASTER);
 
 517                 if (!rsb_flag(r, RSB_NEW_MASTER))
 
 520                 if (dlm_recovery_stopped(ls)) {
 
 522                         up_read(&ls->ls_root_sem);
 
 526                 error = recover_locks(r);
 
 528                         up_read(&ls->ls_root_sem);
 
 532                 count += r->res_recover_locks_count;
 
 534         up_read(&ls->ls_root_sem);
 
 536         log_debug(ls, "dlm_recover_locks %d locks", count);
 
 538         error = dlm_wait_function(ls, &recover_list_empty);
 
 541                 recover_list_clear(ls);
 
 543                 dlm_set_recover_status(ls, DLM_RS_LOCKS);
 
 547 void dlm_recovered_lock(struct dlm_rsb *r)
 
 549         DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););
 
 551         r->res_recover_locks_count--;
 
 552         if (!r->res_recover_locks_count) {
 
 553                 rsb_clear_flag(r, RSB_NEW_MASTER);
 
 557         if (recover_list_empty(r->res_ls))
 
 558                 wake_up(&r->res_ls->ls_wait_general);
 
 562  * The lvb needs to be recovered on all master rsb's.  This includes setting
 
 563  * the VALNOTVALID flag if necessary, and determining the correct lvb contents
 
 564  * based on the lvb's of the locks held on the rsb.
 
 566  * RSB_VALNOTVALID is set if there are only NL/CR locks on the rsb.  If it
 
 567  * was already set prior to recovery, it's not cleared, regardless of locks.
 
 569  * The LVB contents are only considered for changing when this is a new master
 
 570  * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
 
 571  * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
 
 572  * from the lkb with the largest lvb sequence number.
 
 575 static void recover_lvb(struct dlm_rsb *r)
 
 577         struct dlm_lkb *lkb, *high_lkb = NULL;
 
 578         uint32_t high_seq = 0;
 
 579         int lock_lvb_exists = 0;
 
 580         int big_lock_exists = 0;
 
 581         int lvblen = r->res_ls->ls_lvblen;
 
 583         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
 
 584                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
 
 589                 if (lkb->lkb_grmode > DLM_LOCK_CR) {
 
 594                 if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
 
 596                         high_seq = lkb->lkb_lvbseq;
 
 600         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
 
 601                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
 
 606                 if (lkb->lkb_grmode > DLM_LOCK_CR) {
 
 611                 if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
 
 613                         high_seq = lkb->lkb_lvbseq;
 
 618         if (!lock_lvb_exists)
 
 621         if (!big_lock_exists)
 
 622                 rsb_set_flag(r, RSB_VALNOTVALID);
 
 624         /* don't mess with the lvb unless we're the new master */
 
 625         if (!rsb_flag(r, RSB_NEW_MASTER2))
 
 628         if (!r->res_lvbptr) {
 
 629                 r->res_lvbptr = allocate_lvb(r->res_ls);
 
 634         if (big_lock_exists) {
 
 635                 r->res_lvbseq = lkb->lkb_lvbseq;
 
 636                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
 
 637         } else if (high_lkb) {
 
 638                 r->res_lvbseq = high_lkb->lkb_lvbseq;
 
 639                 memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
 
 642                 memset(r->res_lvbptr, 0, lvblen);
 
 648 /* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
 
 649    converting PR->CW or CW->PR need to have their lkb_grmode set. */
 
 651 static void recover_conversion(struct dlm_rsb *r)
 
 656         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
 
 657                 if (lkb->lkb_grmode == DLM_LOCK_PR ||
 
 658                     lkb->lkb_grmode == DLM_LOCK_CW) {
 
 659                         grmode = lkb->lkb_grmode;
 
 664         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
 
 665                 if (lkb->lkb_grmode != DLM_LOCK_IV)
 
 668                         lkb->lkb_grmode = lkb->lkb_rqmode;
 
 670                         lkb->lkb_grmode = grmode;
 
 674 /* We've become the new master for this rsb and waiting/converting locks may
 
 675    need to be granted in dlm_grant_after_purge() due to locks that may have
 
 676    existed from a removed node. */
 
 678 static void set_locks_purged(struct dlm_rsb *r)
 
 680         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
 
 681                 rsb_set_flag(r, RSB_LOCKS_PURGED);
 
 684 void dlm_recover_rsbs(struct dlm_ls *ls)
 
 689         log_debug(ls, "dlm_recover_rsbs");
 
 691         down_read(&ls->ls_root_sem);
 
 692         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 
 695                         if (rsb_flag(r, RSB_RECOVER_CONVERT))
 
 696                                 recover_conversion(r);
 
 697                         if (rsb_flag(r, RSB_NEW_MASTER2))
 
 702                 rsb_clear_flag(r, RSB_RECOVER_CONVERT);
 
 703                 rsb_clear_flag(r, RSB_NEW_MASTER2);
 
 706         up_read(&ls->ls_root_sem);
 
 708         log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
 
 711 /* Create a single list of all root rsb's to be used during recovery */
 
 713 int dlm_create_root_list(struct dlm_ls *ls)
 
 718         down_write(&ls->ls_root_sem);
 
 719         if (!list_empty(&ls->ls_root_list)) {
 
 720                 log_error(ls, "root list not empty");
 
 725         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 
 726                 read_lock(&ls->ls_rsbtbl[i].lock);
 
 727                 list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
 
 728                         list_add(&r->res_root_list, &ls->ls_root_list);
 
 731                 read_unlock(&ls->ls_rsbtbl[i].lock);
 
 734         up_write(&ls->ls_root_sem);
 
 738 void dlm_release_root_list(struct dlm_ls *ls)
 
 740         struct dlm_rsb *r, *safe;
 
 742         down_write(&ls->ls_root_sem);
 
 743         list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
 
 744                 list_del_init(&r->res_root_list);
 
 747         up_write(&ls->ls_root_sem);
 
 750 void dlm_clear_toss_list(struct dlm_ls *ls)
 
 752         struct dlm_rsb *r, *safe;
 
 755         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 
 756                 write_lock(&ls->ls_rsbtbl[i].lock);
 
 757                 list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
 
 759                         list_del(&r->res_hashchain);
 
 762                 write_unlock(&ls->ls_rsbtbl[i].lock);