1 /******************************************************************************
 
   2 *******************************************************************************
 
   4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
 
   6 **  This copyrighted material is made available to anyone wishing to use,
 
   7 **  modify, copy, or redistribute it subject to the terms and conditions
 
   8 **  of the GNU General Public License v.2.
 
  10 *******************************************************************************
 
  11 ******************************************************************************/
 
  13 #include "dlm_internal.h"
 
  18 #include "requestqueue.h"
 
  21         struct list_head list;
 
  27  * Requests received while the lockspace is in recovery get added to the
 
  28  * request queue and processed when recovery is complete.  This happens when
 
  29  * the lockspace is suspended on some nodes before it is on others, or the
 
  30  * lockspace is enabled on some while still suspended on others.
 
  33 void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
 
  36         int length = hd->h_length;
 
  38         if (dlm_is_removed(ls, nodeid))
 
  41         e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
 
  43                 log_print("dlm_add_requestqueue: out of memory\n");
 
  48         memcpy(e->request, hd, length);
 
  50         mutex_lock(&ls->ls_requestqueue_mutex);
 
  51         list_add_tail(&e->list, &ls->ls_requestqueue);
 
  52         mutex_unlock(&ls->ls_requestqueue_mutex);
 
  55 int dlm_process_requestqueue(struct dlm_ls *ls)
 
  58         struct dlm_header *hd;
 
  61         mutex_lock(&ls->ls_requestqueue_mutex);
 
  64                 if (list_empty(&ls->ls_requestqueue)) {
 
  65                         mutex_unlock(&ls->ls_requestqueue_mutex);
 
  69                 e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
 
  70                 mutex_unlock(&ls->ls_requestqueue_mutex);
 
  72                 hd = (struct dlm_header *) e->request;
 
  73                 error = dlm_receive_message(hd, e->nodeid, 1);
 
  75                 if (error == -EINTR) {
 
  76                         /* entry is left on requestqueue */
 
  77                         log_debug(ls, "process_requestqueue abort eintr");
 
  81                 mutex_lock(&ls->ls_requestqueue_mutex);
 
  85                 if (dlm_locking_stopped(ls)) {
 
  86                         log_debug(ls, "process_requestqueue abort running");
 
  87                         mutex_unlock(&ls->ls_requestqueue_mutex);
 
  98  * After recovery is done, locking is resumed and dlm_recoverd takes all the
 
  99  * saved requests and processes them as they would have been by dlm_recvd.  At
 
 100  * the same time, dlm_recvd will start receiving new requests from remote
 
 101  * nodes.  We want to delay dlm_recvd processing new requests until
 
 102  * dlm_recoverd has finished processing the old saved requests.
 
 105 void dlm_wait_requestqueue(struct dlm_ls *ls)
 
 108                 mutex_lock(&ls->ls_requestqueue_mutex);
 
 109                 if (list_empty(&ls->ls_requestqueue))
 
 111                 if (dlm_locking_stopped(ls))
 
 113                 mutex_unlock(&ls->ls_requestqueue_mutex);
 
 116         mutex_unlock(&ls->ls_requestqueue_mutex);
 
 119 static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
 
 121         uint32_t type = ms->m_type;
 
 123         if (dlm_is_removed(ls, nodeid))
 
 126         /* directory operations are always purged because the directory is
 
 127            always rebuilt during recovery and the lookups resent */
 
 129         if (type == DLM_MSG_REMOVE ||
 
 130             type == DLM_MSG_LOOKUP ||
 
 131             type == DLM_MSG_LOOKUP_REPLY)
 
 134         if (!dlm_no_directory(ls))
 
 137         /* with no directory, the master is likely to change as a part of
 
 138            recovery; requests to/from the defunct master need to be purged */
 
 141         case DLM_MSG_REQUEST:
 
 142         case DLM_MSG_CONVERT:
 
 145                 /* we're no longer the master of this resource, the sender
 
 146                    will resend to the new master (see waiter_needs_recovery) */
 
 148                 if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
 
 152         case DLM_MSG_REQUEST_REPLY:
 
 153         case DLM_MSG_CONVERT_REPLY:
 
 154         case DLM_MSG_UNLOCK_REPLY:
 
 155         case DLM_MSG_CANCEL_REPLY:
 
 157                 /* this reply is from the former master of the resource,
 
 158                    we'll resend to the new master if needed */
 
 160                 if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
 
 168 void dlm_purge_requestqueue(struct dlm_ls *ls)
 
 170         struct dlm_message *ms;
 
 171         struct rq_entry *e, *safe;
 
 173         mutex_lock(&ls->ls_requestqueue_mutex);
 
 174         list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
 
 175                 ms = (struct dlm_message *) e->request;
 
 177                 if (purge_request(ls, ms, e->nodeid)) {
 
 182         mutex_unlock(&ls->ls_requestqueue_mutex);