/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/* Central locking logic has four stages:

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
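
   As an illustrative walk-through (a sketch, not an exhaustive protocol
   description): a dlm_lock() request whose master is remote runs

   L: dlm_lock() -> request_lock() -> _request_lock() -> send_request()
                                   R: receive_request() -> do_request()
   L: receive_request_reply()     <-  R: send_request_reply()
*/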
59 #include "dlm_internal.h"
62 #include "requestqueue.h"
66 #include "lockspace.h"
71 #include "lvb_table.h"
74 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
75 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
76 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
80 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_remove(struct dlm_rsb *r);
82 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
84 struct dlm_message *ms);
85 static int receive_extralen(struct dlm_message *ms);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *  1 = LVB is returned to the caller
 *  0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
127 EXPORT_SYMBOL_GPL(dlm_lvb_operations);
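
/* Reading the table above: a conversion from NL (row) to EX (column) gives
   1, so the resource's LVB is returned to the caller; EX to NL gives 0, so
   the caller's LVB is written back to the resource. */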
#define modes_compat(gr, rq) \
        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
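
/* For example, dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) is 1 (concurrent
   readers are compatible), while dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX)
   is 0 (a protected-read lock blocks an exclusive request). */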
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
155 static void dlm_print_lkb(struct dlm_lkb *lkb)
157 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
158 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
159 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
160 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
161 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
164 void dlm_print_rsb(struct dlm_rsb *r)
166 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
167 r->res_nodeid, r->res_flags, r->res_first_lkid,
168 r->res_recover_locks_count, r->res_name);
171 /* Threads cannot use the lockspace while it's being recovered */
173 static inline void lock_recovery(struct dlm_ls *ls)
175 down_read(&ls->ls_in_recovery);
178 static inline void unlock_recovery(struct dlm_ls *ls)
180 up_read(&ls->ls_in_recovery);
183 static inline int lock_recovery_try(struct dlm_ls *ls)
185 return down_read_trylock(&ls->ls_in_recovery);
188 static inline int can_be_queued(struct dlm_lkb *lkb)
190 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
193 static inline int force_blocking_asts(struct dlm_lkb *lkb)
195 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
198 static inline int is_demoted(struct dlm_lkb *lkb)
200 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
203 static inline int is_remote(struct dlm_rsb *r)
205 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
206 return !!r->res_nodeid;
209 static inline int is_process_copy(struct dlm_lkb *lkb)
211 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
214 static inline int is_master_copy(struct dlm_lkb *lkb)
216 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
217 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
218 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? TRUE : FALSE;
static inline int middle_conversion(struct dlm_lkb *lkb)
{
        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
                return TRUE;
        return FALSE;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
234 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
236 if (is_master_copy(lkb))
239 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
241 lkb->lkb_lksb->sb_status = rv;
242 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
244 dlm_add_ast(lkb, AST_COMP);
247 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
249 if (is_master_copy(lkb))
250 send_bast(r, lkb, rqmode);
252 lkb->lkb_bastmode = rqmode;
253 dlm_add_ast(lkb, AST_BAST);
258 * Basic operations on rsb's and lkb's
261 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
265 r = allocate_rsb(ls, len);
271 memcpy(r->res_name, name, len);
272 init_MUTEX(&r->res_sem);
274 INIT_LIST_HEAD(&r->res_lookup);
275 INIT_LIST_HEAD(&r->res_grantqueue);
276 INIT_LIST_HEAD(&r->res_convertqueue);
277 INIT_LIST_HEAD(&r->res_waitqueue);
278 INIT_LIST_HEAD(&r->res_root_list);
279 INIT_LIST_HEAD(&r->res_recover_list);
284 static int search_rsb_list(struct list_head *head, char *name, int len,
285 unsigned int flags, struct dlm_rsb **r_ret)
290 list_for_each_entry(r, head, res_hashchain) {
291 if (len == r->res_length && !memcmp(name, r->res_name, len))
297 if (r->res_nodeid && (flags & R_MASTER))
303 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
304 unsigned int flags, struct dlm_rsb **r_ret)
309 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
311 kref_get(&r->res_ref);
314 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
318 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
320 if (dlm_no_directory(ls))
323 if (r->res_nodeid == -1) {
324 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
325 r->res_first_lkid = 0;
326 } else if (r->res_nodeid > 0) {
327 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
328 r->res_first_lkid = 0;
330 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
331 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
338 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
339 unsigned int flags, struct dlm_rsb **r_ret)
342 write_lock(&ls->ls_rsbtbl[b].lock);
343 error = _search_rsb(ls, name, len, b, flags, r_ret);
344 write_unlock(&ls->ls_rsbtbl[b].lock);
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */
362 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
363 unsigned int flags, struct dlm_rsb **r_ret)
365 struct dlm_rsb *r, *tmp;
366 uint32_t hash, bucket;
369 if (dlm_no_directory(ls))
372 hash = jhash(name, namelen, 0);
373 bucket = hash & (ls->ls_rsbtbl_size - 1);
375 error = search_rsb(ls, name, namelen, bucket, flags, &r);
379 if (error == -ENOENT && !(flags & R_CREATE))
382 /* the rsb was found but wasn't a master copy */
383 if (error == -ENOTBLK)
387 r = create_rsb(ls, name, namelen);
392 r->res_bucket = bucket;
394 kref_init(&r->res_ref);
396 /* With no directory, the master can be set immediately */
397 if (dlm_no_directory(ls)) {
398 int nodeid = dlm_dir_nodeid(r);
399 if (nodeid == dlm_our_nodeid())
401 r->res_nodeid = nodeid;
404 write_lock(&ls->ls_rsbtbl[bucket].lock);
405 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
407 write_unlock(&ls->ls_rsbtbl[bucket].lock);
412 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
413 write_unlock(&ls->ls_rsbtbl[bucket].lock);
420 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
421 unsigned int flags, struct dlm_rsb **r_ret)
423 return find_rsb(ls, name, namelen, flags, r_ret);
426 /* This is only called to add a reference when the code already holds
427 a valid reference to the rsb, so there's no need for locking. */
429 static inline void hold_rsb(struct dlm_rsb *r)
431 kref_get(&r->res_ref);
434 void dlm_hold_rsb(struct dlm_rsb *r)
439 static void toss_rsb(struct kref *kref)
441 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
442 struct dlm_ls *ls = r->res_ls;
444 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
445 kref_init(&r->res_ref);
446 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
447 r->res_toss_time = jiffies;
449 free_lvb(r->res_lvbptr);
450 r->res_lvbptr = NULL;
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */
457 static void put_rsb(struct dlm_rsb *r)
459 struct dlm_ls *ls = r->res_ls;
460 uint32_t bucket = r->res_bucket;
462 write_lock(&ls->ls_rsbtbl[bucket].lock);
463 kref_put(&r->res_ref, toss_rsb);
464 write_unlock(&ls->ls_rsbtbl[bucket].lock);
467 void dlm_put_rsb(struct dlm_rsb *r)
472 /* See comment for unhold_lkb */
474 static void unhold_rsb(struct dlm_rsb *r)
477 rv = kref_put(&r->res_ref, toss_rsb);
478 DLM_ASSERT(!rv, dlm_print_rsb(r););
481 static void kill_rsb(struct kref *kref)
483 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
485 /* All work is done after the return from kref_put() so we
486 can release the write_lock before the remove and free. */
488 DLM_ASSERT(list_empty(&r->res_lookup),);
489 DLM_ASSERT(list_empty(&r->res_grantqueue),);
490 DLM_ASSERT(list_empty(&r->res_convertqueue),);
491 DLM_ASSERT(list_empty(&r->res_waitqueue),);
492 DLM_ASSERT(list_empty(&r->res_root_list),);
493 DLM_ASSERT(list_empty(&r->res_recover_list),);
496 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
497 The rsb must exist as long as any lkb's for it do. */
499 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
502 lkb->lkb_resource = r;
505 static void detach_lkb(struct dlm_lkb *lkb)
507 if (lkb->lkb_resource) {
508 put_rsb(lkb->lkb_resource);
509 lkb->lkb_resource = NULL;
513 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
515 struct dlm_lkb *lkb, *tmp;
519 lkb = allocate_lkb(ls);
523 lkb->lkb_nodeid = -1;
524 lkb->lkb_grmode = DLM_LOCK_IV;
525 kref_init(&lkb->lkb_ref);
527 get_random_bytes(&bucket, sizeof(bucket));
528 bucket &= (ls->ls_lkbtbl_size - 1);
530 write_lock(&ls->ls_lkbtbl[bucket].lock);
532 /* counter can roll over so we must verify lkid is not in use */
535 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
537 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
539 if (tmp->lkb_id != lkid)
547 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
548 write_unlock(&ls->ls_lkbtbl[bucket].lock);
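
/* e.g. with the encoding above, lkid 0x00010003 names the lkb created with
   counter value 1 in idtbl bucket 3 (an illustrative value; any 16-bit
   bucket/counter pair decodes the same way) */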
554 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
556 uint16_t bucket = lkid & 0xFFFF;
559 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
560 if (lkb->lkb_id == lkid)
566 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
569 uint16_t bucket = lkid & 0xFFFF;
571 if (bucket >= ls->ls_lkbtbl_size)
574 read_lock(&ls->ls_lkbtbl[bucket].lock);
575 lkb = __find_lkb(ls, lkid);
577 kref_get(&lkb->lkb_ref);
578 read_unlock(&ls->ls_lkbtbl[bucket].lock);
581 return lkb ? 0 : -ENOENT;
584 static void kill_lkb(struct kref *kref)
586 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
588 /* All work is done after the return from kref_put() so we
589 can release the write_lock before the detach_lkb */
591 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
594 static int put_lkb(struct dlm_lkb *lkb)
596 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
597 uint16_t bucket = lkb->lkb_id & 0xFFFF;
599 write_lock(&ls->ls_lkbtbl[bucket].lock);
600 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
601 list_del(&lkb->lkb_idtbl_list);
602 write_unlock(&ls->ls_lkbtbl[bucket].lock);
606 /* for local/process lkbs, lvbptr points to caller's lksb */
607 if (lkb->lkb_lvbptr && is_master_copy(lkb))
608 free_lvb(lkb->lkb_lvbptr);
610 free_range(lkb->lkb_range);
614 write_unlock(&ls->ls_lkbtbl[bucket].lock);
619 int dlm_put_lkb(struct dlm_lkb *lkb)
624 /* This is only called to add a reference when the code already holds
625 a valid reference to the lkb, so there's no need for locking. */
627 static inline void hold_lkb(struct dlm_lkb *lkb)
629 kref_get(&lkb->lkb_ref);
632 /* This is called when we need to remove a reference and are certain
633 it's not the last ref. e.g. del_lkb is always called between a
634 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
635 put_lkb would work fine, but would involve unnecessary locking */
637 static inline void unhold_lkb(struct dlm_lkb *lkb)
640 rv = kref_put(&lkb->lkb_ref, kill_lkb);
641 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                            int mode)
{
        struct dlm_lkb *lkb;

        /* the queue is kept sorted with the highest mode first: insert
           before the first entry with a lower mode, else at the tail */

        list_for_each_entry(lkb, head, lkb_statequeue) {
                if (lkb->lkb_rqmode < mode) {
                        __list_add(new, lkb->lkb_statequeue.prev,
                                   &lkb->lkb_statequeue);
                        return;
                }
        }
        list_add_tail(new, head);
}
659 /* add/remove lkb to rsb's grant/convert/wait queue */
661 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
663 kref_get(&lkb->lkb_ref);
665 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
667 lkb->lkb_status = status;
670 case DLM_LKSTS_WAITING:
671 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
672 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
674 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
676 case DLM_LKSTS_GRANTED:
677 /* convention says granted locks kept in order of grmode */
678 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
681 case DLM_LKSTS_CONVERT:
682 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
683 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
685 list_add_tail(&lkb->lkb_statequeue,
686 &r->res_convertqueue);
689 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
693 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
696 list_del(&lkb->lkb_statequeue);
700 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
704 add_lkb(r, lkb, sts);
708 /* add/remove lkb from global waiters list of lkb's waiting for
709 a reply from a remote node */
711 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
713 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
715 down(&ls->ls_waiters_sem);
716 if (lkb->lkb_wait_type) {
717 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
720 lkb->lkb_wait_type = mstype;
721 kref_get(&lkb->lkb_ref);
722 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
724 up(&ls->ls_waiters_sem);
727 static int _remove_from_waiters(struct dlm_lkb *lkb)
731 if (!lkb->lkb_wait_type) {
732 log_print("remove_from_waiters error");
736 lkb->lkb_wait_type = 0;
737 list_del(&lkb->lkb_wait_reply);
743 static int remove_from_waiters(struct dlm_lkb *lkb)
745 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
748 down(&ls->ls_waiters_sem);
749 error = _remove_from_waiters(lkb);
750 up(&ls->ls_waiters_sem);
754 static void dir_remove(struct dlm_rsb *r)
758 if (dlm_no_directory(r->res_ls))
761 to_nodeid = dlm_dir_nodeid(r);
762 if (to_nodeid != dlm_our_nodeid())
765 dlm_dir_remove_entry(r->res_ls, to_nodeid,
766 r->res_name, r->res_length);
769 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
770 found since they are in order of newest to oldest? */
772 static int shrink_bucket(struct dlm_ls *ls, int b)
775 int count = 0, found;
779 write_lock(&ls->ls_rsbtbl[b].lock);
780 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
782 if (!time_after_eq(jiffies, r->res_toss_time +
783 dlm_config.toss_secs * HZ))
790 write_unlock(&ls->ls_rsbtbl[b].lock);
794 if (kref_put(&r->res_ref, kill_rsb)) {
795 list_del(&r->res_hashchain);
796 write_unlock(&ls->ls_rsbtbl[b].lock);
803 write_unlock(&ls->ls_rsbtbl[b].lock);
804 log_error(ls, "tossed rsb in use %s", r->res_name);
811 void dlm_scan_rsbs(struct dlm_ls *ls)
815 if (dlm_locking_stopped(ls))
818 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
819 shrink_bucket(ls, i);
824 /* lkb is master or local copy */
826 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
828 int b, len = r->res_ls->ls_lvblen;
830 /* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 nothing happens to the lvb */
834 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
837 if (!lkb->lkb_lvbptr)
840 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
846 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
847 lkb->lkb_lvbseq = r->res_lvbseq;
850 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
851 rsb_set_flag(r, RSB_VALNOTVALID);
855 if (!lkb->lkb_lvbptr)
858 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
862 r->res_lvbptr = allocate_lvb(r->res_ls);
867 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
869 lkb->lkb_lvbseq = r->res_lvbseq;
870 rsb_clear_flag(r, RSB_VALNOTVALID);
873 if (rsb_flag(r, RSB_VALNOTVALID))
874 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
877 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
879 if (lkb->lkb_grmode < DLM_LOCK_PW)
882 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
883 rsb_set_flag(r, RSB_VALNOTVALID);
887 if (!lkb->lkb_lvbptr)
890 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
894 r->res_lvbptr = allocate_lvb(r->res_ls);
899 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
901 rsb_clear_flag(r, RSB_VALNOTVALID);
904 /* lkb is process copy (pc) */
906 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
907 struct dlm_message *ms)
911 if (!lkb->lkb_lvbptr)
914 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
917 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
919 int len = receive_extralen(ms);
920 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
921 lkb->lkb_lvbseq = ms->m_lvbseq;
925 /* Manipulate lkb's on rsb's convert/granted/waiting queues
926 remove_lock -- used for unlock, removes lkb from granted
927 revert_lock -- used for cancel, moves lkb from convert to granted
928 grant_lock -- used for request and convert, adds lkb to granted or
929 moves lkb from convert or waiting to granted
931 Each of these is used for master or local copy lkb's. There is
932 also a _pc() variation used to make the corresponding change on
933 a process copy (pc) lkb. */
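
/* For example, a successful unlock runs remove_lock() on the master copy
   (via do_unlock() below); when the unlock reply arrives back at the
   requesting node, remove_lock_pc() is expected to make the matching change
   to the process copy. */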
935 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
938 lkb->lkb_grmode = DLM_LOCK_IV;
939 /* this unhold undoes the original ref from create_lkb()
940 so this leads to the lkb being freed */
944 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
946 set_lvb_unlock(r, lkb);
947 _remove_lock(r, lkb);
950 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
952 _remove_lock(r, lkb);
955 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
957 lkb->lkb_rqmode = DLM_LOCK_IV;
959 switch (lkb->lkb_status) {
960 case DLM_LKSTS_CONVERT:
961 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
963 case DLM_LKSTS_WAITING:
965 lkb->lkb_grmode = DLM_LOCK_IV;
966 /* this unhold undoes the original ref from create_lkb()
967 so this leads to the lkb being freed */
971 log_print("invalid status for revert %d", lkb->lkb_status);
975 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
980 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
982 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
983 lkb->lkb_grmode = lkb->lkb_rqmode;
985 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
987 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
990 lkb->lkb_rqmode = DLM_LOCK_IV;
992 if (lkb->lkb_range) {
993 lkb->lkb_range[GR_RANGE_START] = lkb->lkb_range[RQ_RANGE_START];
994 lkb->lkb_range[GR_RANGE_END] = lkb->lkb_range[RQ_RANGE_END];
998 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1000 set_lvb_lock(r, lkb);
1001 _grant_lock(r, lkb);
1002 lkb->lkb_highbast = 0;
1005 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1006 struct dlm_message *ms)
1008 set_lvb_lock_pc(r, lkb, ms);
1009 _grant_lock(r, lkb);
1012 /* called by grant_pending_locks() which means an async grant message must
1013 be sent to the requesting node in addition to granting the lock if the
1014 lkb belongs to a remote node. */
1016 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1019 if (is_master_copy(lkb))
1022 queue_cast(r, lkb, 0);
static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
                                           lkb_statequeue);
        if (lkb->lkb_id == first->lkb_id)
                return TRUE;
        return FALSE;
}
1035 /* Return 1 if the locks' ranges overlap. If the lkb has no range then it is
1036 assumed to cover 0-ffffffff.ffffffff */
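
/* (e.g. a request for range 0-4 does not overlap a grant of 5-9, while a
   request for 3-7 does) */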
1038 static inline int ranges_overlap(struct dlm_lkb *lkb1, struct dlm_lkb *lkb2)
1040 if (!lkb1->lkb_range || !lkb2->lkb_range)
1043 if (lkb1->lkb_range[RQ_RANGE_END] < lkb2->lkb_range[GR_RANGE_START] ||
1044 lkb1->lkb_range[RQ_RANGE_START] > lkb2->lkb_range[GR_RANGE_END])
1050 /* Check if the given lkb conflicts with another lkb on the queue. */
1052 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1054 struct dlm_lkb *this;
1056 list_for_each_entry(this, head, lkb_statequeue) {
1059 if (ranges_overlap(lkb, this) && !modes_compat(this, lkb))
1066 * "A conversion deadlock arises with a pair of lock requests in the converting
1067 * queue for one resource. The granted mode of each lock blocks the requested
1068 * mode of the other lock."
1070 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1071 * convert queue from being granted, then demote lkb (set grmode to NL).
1072 * This second form requires that we check for conv-deadlk even when
1073 * now == 0 in _can_be_granted().
1076 * Granted Queue: empty
1077 * Convert Queue: NL->EX (first lock)
1078 * PR->EX (second lock)
1080 * The first lock can't be granted because of the granted mode of the second
1081 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to
 * this function).
 *
1085 * After the resolution, the "grant pending" function needs to go back and try
1086 * to grant locks on the convert queue again since the first lock can now be
1090 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1092 struct dlm_lkb *this, *first = NULL, *self = NULL;
1094 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1102 if (!ranges_overlap(lkb, this))
1105 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1109 /* if lkb is on the convert queue and is preventing the first
1110 from being granted, then there's deadlock and we demote lkb.
1111 multiple converting locks may need to do this before the first
1112 converting lock can be granted. */
1114 if (self && self != first) {
1115 if (!modes_compat(lkb, first) &&
1116 !queue_conflict(&rsb->res_grantqueue, first))
1124 * Return 1 if the lock can be granted, 0 otherwise.
1125 * Also detect and resolve conversion deadlocks.
1127 * lkb is the lock to be granted
1129 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
1133 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1136 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1138 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1141 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1142 * a new request for a NL mode lock being blocked.
1144 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1145 * request, then it would be granted. In essence, the use of this flag
 * tells the Lock Manager to expedite this request by not considering
1147 * what may be in the CONVERTING or WAITING queues... As of this
1148 * writing, the EXPEDITE flag can be used only with new requests for NL
1149 * mode locks. This flag is not valid for conversion requests.
1151 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1152 * conversion or used with a non-NL requested mode. We also know an
1153 * EXPEDITE request is always granted immediately, so now must always
1154 * be 1. The full condition to grant an expedite request: (now &&
1155 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1156 * therefore be shortened to just checking the flag.
1159 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1163 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1164 * added to the remaining conditions.
1167 if (queue_conflict(&r->res_grantqueue, lkb))
1171 * 6-3: By default, a conversion request is immediately granted if the
1172 * requested mode is compatible with the modes of all other granted
1176 if (queue_conflict(&r->res_convertqueue, lkb))
1180 * 6-5: But the default algorithm for deciding whether to grant or
1181 * queue conversion requests does not by itself guarantee that such
1182 * requests are serviced on a "first come first serve" basis. This, in
 * turn, can lead to a phenomenon known as "indefinite postponement".
1185 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1186 * the system service employed to request a lock conversion. This flag
1187 * forces certain conversion requests to be queued, even if they are
1188 * compatible with the granted modes of other locks on the same
1189 * resource. Thus, the use of this flag results in conversion requests
 * being ordered on a "first come first serve" basis.
1192 * DCT: This condition is all about new conversions being able to occur
1193 * "in place" while the lock remains on the granted queue (assuming
1194 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1195 * doesn't _have_ to go onto the convert queue where it's processed in
1196 * order. The "now" variable is necessary to distinguish converts
1197 * being received and processed for the first time now, because once a
1198 * convert is moved to the conversion queue the condition below applies
1199 * requiring fifo granting.
1202 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1206 * When using range locks the NOORDER flag is set to avoid the standard
1207 * vms rules on grant order.
1210 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1214 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1215 * granted until all other conversion requests ahead of it are granted
1219 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1223 * 6-4: By default, a new request is immediately granted only if all
1224 * three of the following conditions are satisfied when the request is
1226 * - The queue of ungranted conversion requests for the resource is
1228 * - The queue of ungranted new requests for the resource is empty.
1229 * - The mode of the new request is compatible with the most
1230 * restrictive mode of all granted locks on the resource.
1233 if (now && !conv && list_empty(&r->res_convertqueue) &&
1234 list_empty(&r->res_waitqueue))
1238 * 6-4: Once a lock request is in the queue of ungranted new requests,
1239 * it cannot be granted until the queue of ungranted conversion
1240 * requests is empty, all ungranted new requests ahead of it are
1241 * granted and/or canceled, and it is compatible with the granted mode
1242 * of the most restrictive lock granted on the resource.
1245 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1246 first_in_list(lkb, &r->res_waitqueue))
1251 * The following, enabled by CONVDEADLK, departs from VMS.
1254 if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1255 conversion_deadlock_detect(r, lkb)) {
1256 lkb->lkb_grmode = DLM_LOCK_NL;
1257 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1264 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1265 * simple way to provide a big optimization to applications that can use them.
1268 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1270 uint32_t flags = lkb->lkb_exflags;
1272 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1274 rv = _can_be_granted(r, lkb, now);
1278 if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1281 if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1283 else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1287 lkb->lkb_rqmode = alt;
1288 rv = _can_be_granted(r, lkb, now);
1290 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1292 lkb->lkb_rqmode = rqmode;
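
/* e.g. a DLM_LOCK_PW request carrying DLM_LKF_ALTPR that cannot be granted
   as PW is retried above as DLM_LOCK_PR; if that succeeds, DLM_SBF_ALTMODE
   tells the caller the alternate mode was granted */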
1298 static int grant_pending_convert(struct dlm_rsb *r, int high)
1300 struct dlm_lkb *lkb, *s;
1301 int hi, demoted, quit, grant_restart, demote_restart;
1309 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1310 demoted = is_demoted(lkb);
1311 if (can_be_granted(r, lkb, FALSE)) {
1312 grant_lock_pending(r, lkb);
1315 hi = max_t(int, lkb->lkb_rqmode, hi);
1316 if (!demoted && is_demoted(lkb))
1323 if (demote_restart && !quit) {
1328 return max_t(int, high, hi);
1331 static int grant_pending_wait(struct dlm_rsb *r, int high)
1333 struct dlm_lkb *lkb, *s;
1335 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1336 if (can_be_granted(r, lkb, FALSE))
1337 grant_lock_pending(r, lkb);
1339 high = max_t(int, lkb->lkb_rqmode, high);
1345 static void grant_pending_locks(struct dlm_rsb *r)
1347 struct dlm_lkb *lkb, *s;
1348 int high = DLM_LOCK_IV;
1350 DLM_ASSERT(is_master(r), dlm_print_rsb(r););
1352 high = grant_pending_convert(r, high);
1353 high = grant_pending_wait(r, high);
1355 if (high == DLM_LOCK_IV)
1359 * If there are locks left on the wait/convert queue then send blocking
1360 * ASTs to granted locks based on the largest requested mode (high)
1361 * found above. This can generate spurious blocking ASTs for range
1362 * locks. FIXME: highbast < high comparison not valid for PR/CW.
1365 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1366 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1367 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1368 queue_bast(r, lkb, high);
1369 lkb->lkb_highbast = high;
1374 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1375 struct dlm_lkb *lkb)
1379 list_for_each_entry(gr, head, lkb_statequeue) {
1380 if (gr->lkb_bastaddr &&
1381 gr->lkb_highbast < lkb->lkb_rqmode &&
1382 ranges_overlap(lkb, gr) && !modes_compat(gr, lkb)) {
1383 queue_bast(r, gr, lkb->lkb_rqmode);
1384 gr->lkb_highbast = lkb->lkb_rqmode;
1389 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1391 send_bast_queue(r, &r->res_grantqueue, lkb);
1394 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1396 send_bast_queue(r, &r->res_grantqueue, lkb);
1397 send_bast_queue(r, &r->res_convertqueue, lkb);
1400 /* set_master(r, lkb) -- set the master nodeid of a resource
1402 The purpose of this function is to set the nodeid field in the given
1403 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1404 known, it can just be copied to the lkb and the function will return
1405 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1406 before it can be copied to the lkb.
1408 When the rsb nodeid is being looked up remotely, the initial lkb
1409 causing the lookup is kept on the ls_waiters list waiting for the
1410 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1411 on the rsb's res_lookup list until the master is verified.
1414 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/
1419 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1421 struct dlm_ls *ls = r->res_ls;
1422 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1424 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1425 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1426 r->res_first_lkid = lkb->lkb_id;
1427 lkb->lkb_nodeid = r->res_nodeid;
1431 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1432 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1436 if (r->res_nodeid == 0) {
1437 lkb->lkb_nodeid = 0;
1441 if (r->res_nodeid > 0) {
1442 lkb->lkb_nodeid = r->res_nodeid;
1446 DLM_ASSERT(r->res_nodeid == -1, dlm_print_rsb(r););
1448 dir_nodeid = dlm_dir_nodeid(r);
1450 if (dir_nodeid != our_nodeid) {
1451 r->res_first_lkid = lkb->lkb_id;
1452 send_lookup(r, lkb);
1457 /* It's possible for dlm_scand to remove an old rsb for
1458 this same resource from the toss list, us to create
1459 a new one, look up the master locally, and find it
1460 already exists just before dlm_scand does the
1461 dir_remove() on the previous rsb. */
1463 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1464 r->res_length, &ret_nodeid);
1467 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1471 if (ret_nodeid == our_nodeid) {
1472 r->res_first_lkid = 0;
1474 lkb->lkb_nodeid = 0;
1476 r->res_first_lkid = lkb->lkb_id;
1477 r->res_nodeid = ret_nodeid;
1478 lkb->lkb_nodeid = ret_nodeid;
1483 static void process_lookup_list(struct dlm_rsb *r)
1485 struct dlm_lkb *lkb, *safe;
1487 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1488 list_del(&lkb->lkb_rsb_lookup);
1489 _request_lock(r, lkb);
1494 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1496 static void confirm_master(struct dlm_rsb *r, int error)
1498 struct dlm_lkb *lkb;
1500 if (!r->res_first_lkid)
1506 r->res_first_lkid = 0;
1507 process_lookup_list(r);
1511 /* the remote master didn't queue our NOQUEUE request;
1512 make a waiting lkb the first_lkid */
1514 r->res_first_lkid = 0;
1516 if (!list_empty(&r->res_lookup)) {
1517 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1519 list_del(&lkb->lkb_rsb_lookup);
1520 r->res_first_lkid = lkb->lkb_id;
1521 _request_lock(r, lkb);
1527 log_error(r->res_ls, "confirm_master unknown error %d", error);
1531 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1532 int namelen, uint32_t parent_lkid, void *ast,
1533 void *astarg, void *bast, struct dlm_range *range,
1534 struct dlm_args *args)
1538 /* check for invalid arg usage */
1540 if (mode < 0 || mode > DLM_LOCK_EX)
1543 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1546 if (flags & DLM_LKF_CANCEL)
1549 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1552 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1555 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1558 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1561 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1564 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1567 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1573 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1576 /* parent/child locks not yet supported */
1580 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1583 /* these args will be copied to the lkb in validate_lock_args,
1584 it cannot be done now because when converting locks, fields in
1585 an active lkb cannot be modified before locking the rsb */
1587 args->flags = flags;
1588 args->astaddr = ast;
1589 args->astparam = (long) astarg;
1590 args->bastaddr = bast;
1593 args->range = range;
1599 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1601 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1602 DLM_LKF_FORCEUNLOCK))
1605 args->flags = flags;
1606 args->astparam = (long) astarg;
1610 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1611 struct dlm_args *args)
1615 if (args->flags & DLM_LKF_CONVERT) {
1616 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1619 if (args->flags & DLM_LKF_QUECVT &&
1620 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1624 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1627 if (lkb->lkb_wait_type)
1631 lkb->lkb_exflags = args->flags;
1632 lkb->lkb_sbflags = 0;
1633 lkb->lkb_astaddr = args->astaddr;
1634 lkb->lkb_astparam = args->astparam;
1635 lkb->lkb_bastaddr = args->bastaddr;
1636 lkb->lkb_rqmode = args->mode;
1637 lkb->lkb_lksb = args->lksb;
1638 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1639 lkb->lkb_ownpid = (int) current->pid;
1645 if (!lkb->lkb_range) {
1647 lkb->lkb_range = allocate_range(ls);
1648 if (!lkb->lkb_range)
1650 /* This is needed for conversions that contain ranges
1651 where the original lock didn't but it's harmless for
1653 lkb->lkb_range[GR_RANGE_START] = 0LL;
1654 lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
1657 lkb->lkb_range[RQ_RANGE_START] = args->range->ra_start;
1658 lkb->lkb_range[RQ_RANGE_END] = args->range->ra_end;
1659 lkb->lkb_flags |= DLM_IFL_RANGE;
1665 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1669 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1672 if (args->flags & DLM_LKF_FORCEUNLOCK)
1675 if (args->flags & DLM_LKF_CANCEL &&
1676 lkb->lkb_status == DLM_LKSTS_GRANTED)
1679 if (!(args->flags & DLM_LKF_CANCEL) &&
1680 lkb->lkb_status != DLM_LKSTS_GRANTED)
1684 if (lkb->lkb_wait_type)
1688 lkb->lkb_exflags = args->flags;
1689 lkb->lkb_sbflags = 0;
1690 lkb->lkb_astparam = args->astparam;
/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */
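
/* In do_request() and do_convert() below, 0 means the lock was granted
   immediately, -EINPROGRESS means it was queued and a completion callback
   will follow, and -EAGAIN means it could be neither granted nor queued;
   do_unlock() and do_cancel() report -DLM_EUNLOCK and -DLM_ECANCEL. */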
1704 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1708 if (can_be_granted(r, lkb, TRUE)) {
1710 queue_cast(r, lkb, 0);
1714 if (can_be_queued(lkb)) {
1715 error = -EINPROGRESS;
1716 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1717 send_blocking_asts(r, lkb);
1722 if (force_blocking_asts(lkb))
1723 send_blocking_asts_all(r, lkb);
1724 queue_cast(r, lkb, -EAGAIN);
1730 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1734 /* changing an existing lock may allow others to be granted */
1736 if (can_be_granted(r, lkb, TRUE)) {
1738 queue_cast(r, lkb, 0);
1739 grant_pending_locks(r);
1743 if (can_be_queued(lkb)) {
1744 if (is_demoted(lkb))
1745 grant_pending_locks(r);
1746 error = -EINPROGRESS;
1748 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1749 send_blocking_asts(r, lkb);
1754 if (force_blocking_asts(lkb))
1755 send_blocking_asts_all(r, lkb);
1756 queue_cast(r, lkb, -EAGAIN);
1762 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1764 remove_lock(r, lkb);
1765 queue_cast(r, lkb, -DLM_EUNLOCK);
1766 grant_pending_locks(r);
1767 return -DLM_EUNLOCK;
1770 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1772 revert_lock(r, lkb);
1773 queue_cast(r, lkb, -DLM_ECANCEL);
1774 grant_pending_locks(r);
1775 return -DLM_ECANCEL;
/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */
1783 /* add a new lkb to a possibly new rsb, called by requesting process */
1785 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1789 /* set_master: sets lkb nodeid from r */
1791 error = set_master(r, lkb);
1800 /* receive_request() calls do_request() on remote node */
1801 error = send_request(r, lkb);
1803 error = do_request(r, lkb);
1808 /* change some property of an existing lkb, e.g. mode, range */
1810 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1815 /* receive_convert() calls do_convert() on remote node */
1816 error = send_convert(r, lkb);
1818 error = do_convert(r, lkb);
1823 /* remove an existing lkb from the granted queue */
1825 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1830 /* receive_unlock() calls do_unlock() on remote node */
1831 error = send_unlock(r, lkb);
1833 error = do_unlock(r, lkb);
1838 /* remove an existing lkb from the convert or wait queue */
1840 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1845 /* receive_cancel() calls do_cancel() on remote node */
1846 error = send_cancel(r, lkb);
1848 error = do_cancel(r, lkb);
/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */
1858 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1859 int len, struct dlm_args *args)
1864 error = validate_lock_args(ls, lkb, args);
1868 error = find_rsb(ls, name, len, R_CREATE, &r);
1875 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1877 error = _request_lock(r, lkb);
1886 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1887 struct dlm_args *args)
1892 r = lkb->lkb_resource;
1897 error = validate_lock_args(ls, lkb, args);
1901 error = _convert_lock(r, lkb);
1908 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1909 struct dlm_args *args)
1914 r = lkb->lkb_resource;
1919 error = validate_unlock_args(lkb, args);
1923 error = _unlock_lock(r, lkb);
1930 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1931 struct dlm_args *args)
1936 r = lkb->lkb_resource;
1941 error = validate_unlock_args(lkb, args);
1945 error = _cancel_lock(r, lkb);
/*
 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
 */
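
/* An illustrative caller sketch (my_ast, my_bast and my_arg are hypothetical
   caller-supplied callbacks and argument, not part of this file):

        struct dlm_lksb lksb;
        int error;

        error = dlm_lock(ls, DLM_LOCK_PR, &lksb, 0, "example", 7, 0,
                         my_ast, my_arg, my_bast, NULL);
        (wait for my_ast; lksb.sb_lkid now identifies the lock)

        error = dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_CONVERT, NULL, 0,
                         0, my_ast, my_arg, my_bast, NULL);

        error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, my_arg);
*/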
1956 int dlm_lock(dlm_lockspace_t *lockspace,
1958 struct dlm_lksb *lksb,
1961 unsigned int namelen,
1962 uint32_t parent_lkid,
1963 void (*ast) (void *astarg),
1965 void (*bast) (void *astarg, int mode),
1966 struct dlm_range *range)
1969 struct dlm_lkb *lkb;
1970 struct dlm_args args;
1971 int error, convert = flags & DLM_LKF_CONVERT;
1973 ls = dlm_find_lockspace_local(lockspace);
1980 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1982 error = create_lkb(ls, &lkb);
1987 error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1988 astarg, bast, range, &args);
1993 error = convert_lock(ls, lkb, &args);
1995 error = request_lock(ls, lkb, name, namelen, &args);
1997 if (error == -EINPROGRESS)
2000 if (convert || error)
2002 if (error == -EAGAIN)
2005 unlock_recovery(ls);
2006 dlm_put_lockspace(ls);
2010 int dlm_unlock(dlm_lockspace_t *lockspace,
2013 struct dlm_lksb *lksb,
2017 struct dlm_lkb *lkb;
2018 struct dlm_args args;
2021 ls = dlm_find_lockspace_local(lockspace);
2027 error = find_lkb(ls, lkid, &lkb);
2031 error = set_unlock_args(flags, astarg, &args);
2035 if (flags & DLM_LKF_CANCEL)
2036 error = cancel_lock(ls, lkb, &args);
2038 error = unlock_lock(ls, lkb, &args);
2040 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2045 unlock_recovery(ls);
2046 dlm_put_lockspace(ls);
/*
 * send/receive routines for remote operations and replies
 *
 * send_request                 receive_request
 * send_convert                 receive_convert
 * send_unlock                  receive_unlock
 * send_cancel                  receive_cancel
 * send_grant                   receive_grant
 * send_bast                    receive_bast
 * send_lookup                  receive_lookup
 * send_remove                  receive_remove
 *
 * receive_request_reply        send_request_reply
 * receive_convert_reply        send_convert_reply
 * receive_unlock_reply         send_unlock_reply
 * receive_cancel_reply         send_cancel_reply
 * receive_lookup_reply         send_lookup_reply
 */
2072 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2073 int to_nodeid, int mstype,
2074 struct dlm_message **ms_ret,
2075 struct dlm_mhandle **mh_ret)
2077 struct dlm_message *ms;
2078 struct dlm_mhandle *mh;
2080 int mb_len = sizeof(struct dlm_message);
2083 case DLM_MSG_REQUEST:
2084 case DLM_MSG_LOOKUP:
2085 case DLM_MSG_REMOVE:
2086 mb_len += r->res_length;
2088 case DLM_MSG_CONVERT:
2089 case DLM_MSG_UNLOCK:
2090 case DLM_MSG_REQUEST_REPLY:
2091 case DLM_MSG_CONVERT_REPLY:
2093 if (lkb && lkb->lkb_lvbptr)
2094 mb_len += r->res_ls->ls_lvblen;
2098 /* get_buffer gives us a message handle (mh) that we need to
2099 pass into lowcomms_commit and a message buffer (mb) that we
2100 write our data into */
2102 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2106 memset(mb, 0, mb_len);
2108 ms = (struct dlm_message *) mb;
2110 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2111 ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2112 ms->m_header.h_nodeid = dlm_our_nodeid();
2113 ms->m_header.h_length = mb_len;
2114 ms->m_header.h_cmd = DLM_MSG;
2116 ms->m_type = mstype;
2123 /* further lowcomms enhancements or alternate implementations may make
2124 the return value from this function useful at some point */
2126 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2128 dlm_message_out(ms);
2129 dlm_lowcomms_commit_buffer(mh);
2133 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2134 struct dlm_message *ms)
2136 ms->m_nodeid = lkb->lkb_nodeid;
2137 ms->m_pid = lkb->lkb_ownpid;
2138 ms->m_lkid = lkb->lkb_id;
2139 ms->m_remid = lkb->lkb_remid;
2140 ms->m_exflags = lkb->lkb_exflags;
2141 ms->m_sbflags = lkb->lkb_sbflags;
2142 ms->m_flags = lkb->lkb_flags;
2143 ms->m_lvbseq = lkb->lkb_lvbseq;
2144 ms->m_status = lkb->lkb_status;
2145 ms->m_grmode = lkb->lkb_grmode;
2146 ms->m_rqmode = lkb->lkb_rqmode;
2147 ms->m_hash = r->res_hash;
2149 /* m_result and m_bastmode are set from function args,
2150 not from lkb fields */
2152 if (lkb->lkb_bastaddr)
2153 ms->m_asts |= AST_BAST;
2154 if (lkb->lkb_astaddr)
2155 ms->m_asts |= AST_COMP;
2157 if (lkb->lkb_range) {
2158 ms->m_range[0] = lkb->lkb_range[RQ_RANGE_START];
2159 ms->m_range[1] = lkb->lkb_range[RQ_RANGE_END];
2162 if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2163 memcpy(ms->m_extra, r->res_name, r->res_length);
2165 else if (lkb->lkb_lvbptr)
2166 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2170 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2172 struct dlm_message *ms;
2173 struct dlm_mhandle *mh;
2174 int to_nodeid, error;
2176 add_to_waiters(lkb, mstype);
2178 to_nodeid = r->res_nodeid;
2180 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2184 send_args(r, lkb, ms);
2186 error = send_message(mh, ms);
2192 remove_from_waiters(lkb);
2196 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2198 return send_common(r, lkb, DLM_MSG_REQUEST);
2201 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2205 error = send_common(r, lkb, DLM_MSG_CONVERT);
2207 /* down conversions go without a reply from the master */
2208 if (!error && down_conversion(lkb)) {
2209 remove_from_waiters(lkb);
2210 r->res_ls->ls_stub_ms.m_result = 0;
2211 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
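		/* (a down-conversion is always compatible with the existing
		   grants, so the master grants it in place without replying;
		   the stub reply above lets the local completion path run as
		   if a real reply had arrived) */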
2217 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2218 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2219 that the master is still correct. */
2221 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2223 return send_common(r, lkb, DLM_MSG_UNLOCK);
2226 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2228 return send_common(r, lkb, DLM_MSG_CANCEL);
2231 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2233 struct dlm_message *ms;
2234 struct dlm_mhandle *mh;
2235 int to_nodeid, error;
2237 to_nodeid = lkb->lkb_nodeid;
2239 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2243 send_args(r, lkb, ms);
2247 error = send_message(mh, ms);
2252 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2254 struct dlm_message *ms;
2255 struct dlm_mhandle *mh;
2256 int to_nodeid, error;
2258 to_nodeid = lkb->lkb_nodeid;
2260 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2264 send_args(r, lkb, ms);
2266 ms->m_bastmode = mode;
2268 error = send_message(mh, ms);
2273 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2275 struct dlm_message *ms;
2276 struct dlm_mhandle *mh;
2277 int to_nodeid, error;
2279 add_to_waiters(lkb, DLM_MSG_LOOKUP);
2281 to_nodeid = dlm_dir_nodeid(r);
2283 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2287 send_args(r, lkb, ms);
2289 error = send_message(mh, ms);
2295 remove_from_waiters(lkb);
2299 static int send_remove(struct dlm_rsb *r)
2301 struct dlm_message *ms;
2302 struct dlm_mhandle *mh;
2303 int to_nodeid, error;
2305 to_nodeid = dlm_dir_nodeid(r);
2307 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2311 memcpy(ms->m_extra, r->res_name, r->res_length);
2312 ms->m_hash = r->res_hash;
2314 error = send_message(mh, ms);
static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                             int mstype, int rv)
{
        struct dlm_message *ms;
2323 struct dlm_mhandle *mh;
2324 int to_nodeid, error;
2326 to_nodeid = lkb->lkb_nodeid;
2328 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2332 send_args(r, lkb, ms);
2336 error = send_message(mh, ms);
2341 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2343 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2346 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2348 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2351 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2353 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2356 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2358 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2361 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2362 int ret_nodeid, int rv)
2364 struct dlm_rsb *r = &ls->ls_stub_rsb;
2365 struct dlm_message *ms;
2366 struct dlm_mhandle *mh;
2367 int error, nodeid = ms_in->m_header.h_nodeid;
2369 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2373 ms->m_lkid = ms_in->m_lkid;
2375 ms->m_nodeid = ret_nodeid;
2377 error = send_message(mh, ms);
2382 /* which args we save from a received message depends heavily on the type
2383 of message, unlike the send side where we can safely send everything about
2384 the lkb for any type of message */
2386 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2388 lkb->lkb_exflags = ms->m_exflags;
2389 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2390 (ms->m_flags & 0x0000FFFF);
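
/* note the split above: the lower 16 flag bits travel in the message, while
   the upper 16 bits (internal flags such as DLM_IFL_MSTCPY) remain whatever
   the local node has set */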
2393 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2395 lkb->lkb_sbflags = ms->m_sbflags;
2396 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2397 (ms->m_flags & 0x0000FFFF);
2400 static int receive_extralen(struct dlm_message *ms)
2402 return (ms->m_header.h_length - sizeof(struct dlm_message));
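
/* the "extra" length is whatever the sender appended beyond the fixed
   message struct, e.g. the resource name for DLM_MSG_REQUEST/LOOKUP or the
   LVB for convert/unlock messages (see create_message above) */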
2405 static int receive_range(struct dlm_ls *ls, struct dlm_lkb *lkb,
2406 struct dlm_message *ms)
2408 if (lkb->lkb_flags & DLM_IFL_RANGE) {
2409 if (!lkb->lkb_range)
2410 lkb->lkb_range = allocate_range(ls);
2411 if (!lkb->lkb_range)
2413 lkb->lkb_range[RQ_RANGE_START] = ms->m_range[0];
2414 lkb->lkb_range[RQ_RANGE_END] = ms->m_range[1];
2419 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2420 struct dlm_message *ms)
2424 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2425 if (!lkb->lkb_lvbptr)
2426 lkb->lkb_lvbptr = allocate_lvb(ls);
2427 if (!lkb->lkb_lvbptr)
2429 len = receive_extralen(ms);
2430 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2435 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2436 struct dlm_message *ms)
2438 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2439 lkb->lkb_ownpid = ms->m_pid;
2440 lkb->lkb_remid = ms->m_lkid;
2441 lkb->lkb_grmode = DLM_LOCK_IV;
2442 lkb->lkb_rqmode = ms->m_rqmode;
2443 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2444 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2446 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2448 if (receive_range(ls, lkb, ms))
2451 if (receive_lvb(ls, lkb, ms))
2457 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2458 struct dlm_message *ms)
2460 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2461 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2462 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2463 lkb->lkb_id, lkb->lkb_remid);
2467 if (!is_master_copy(lkb))
2470 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2473 if (receive_range(ls, lkb, ms))
2475 if (lkb->lkb_range) {
2476 lkb->lkb_range[GR_RANGE_START] = 0LL;
2477 lkb->lkb_range[GR_RANGE_END] = 0xffffffffffffffffULL;
2480 if (receive_lvb(ls, lkb, ms))
2483 lkb->lkb_rqmode = ms->m_rqmode;
2484 lkb->lkb_lvbseq = ms->m_lvbseq;
2489 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2490 struct dlm_message *ms)
2492 if (!is_master_copy(lkb))
2494 if (receive_lvb(ls, lkb, ms))
2499 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2500 uses to send a reply and that the remote end uses to process the reply. */
2502 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2504 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2505 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2506 lkb->lkb_remid = ms->m_lkid;
2509 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2511 struct dlm_lkb *lkb;
2515 error = create_lkb(ls, &lkb);
2519 receive_flags(lkb, ms);
2520 lkb->lkb_flags |= DLM_IFL_MSTCPY;
2521 error = receive_request_args(ls, lkb, ms);
2527 namelen = receive_extralen(ms);
2529 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2538 error = do_request(r, lkb);
2539 send_request_reply(r, lkb, error);
2544 if (error == -EINPROGRESS)
2551 setup_stub_lkb(ls, ms);
2552 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2555 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2557 struct dlm_lkb *lkb;
2559 int error, reply = TRUE;
2561 error = find_lkb(ls, ms->m_remid, &lkb);
2565 r = lkb->lkb_resource;
2570 receive_flags(lkb, ms);
2571 error = receive_convert_args(ls, lkb, ms);
2574 reply = !down_conversion(lkb);
2576 error = do_convert(r, lkb);
2579 send_convert_reply(r, lkb, error);
2587 setup_stub_lkb(ls, ms);
2588 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2591 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2593 struct dlm_lkb *lkb;
2597 error = find_lkb(ls, ms->m_remid, &lkb);
2601 r = lkb->lkb_resource;
2606 receive_flags(lkb, ms);
2607 error = receive_unlock_args(ls, lkb, ms);
2611 error = do_unlock(r, lkb);
2613 send_unlock_reply(r, lkb, error);
2621 setup_stub_lkb(ls, ms);
2622 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2625 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2627 struct dlm_lkb *lkb;
2631 error = find_lkb(ls, ms->m_remid, &lkb);
2635 receive_flags(lkb, ms);
2637 r = lkb->lkb_resource;
2642 error = do_cancel(r, lkb);
2643 send_cancel_reply(r, lkb, error);
2651 setup_stub_lkb(ls, ms);
2652 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_grant no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	receive_flags_reply(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_bast no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	queue_bast(r, lkb, ms->m_bastmode);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're the master, so treat the lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}
static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, dir_nodeid, from_nodeid;

	from_nodeid = ms->m_header.h_nodeid;

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "remove dir entry dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		return;
	}

	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
}
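/*
 * Illustrative sketch (hypothetical helper, ours): both receive_lookup()
 * and receive_remove() above derive the directory node from the hash of
 * the resource name carried in the message, and reject messages that
 * arrived at the wrong node.
 */
static inline int example_msg_is_for_our_dir(struct dlm_ls *ls,
					     struct dlm_message *ms)
{
	return dlm_hash2nodeid(ls, ms->m_hash) == dlm_our_nodeid();
}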
static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_request_reply not on waiters");
		goto out;
	}

	/* this is the value returned from do_request() on the master */
	error = ms->m_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent a request reply instead of a lookup
	   reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	switch (error) {
	case -EAGAIN:
		/* request would block (be queued) on remote master;
		   the unhold undoes the original ref from create_lkb()
		   so it leads to the lkb being freed */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb);
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (error)
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
		else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, error);
		break;

	case -ENOTBLK:	/* label assumed; the original may list other codes */
		/* find_rsb failed to find rsb or rsb wasn't master */
		r->res_nodeid = -1;	/* assumed: master forgotten */
		lkb->lkb_nodeid = -1;
		_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply error %d", error);
	}

	unlock_rsb(r);
	put_rsb(r);
 out:
	dlm_put_lkb(lkb);
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	int error = ms->m_result;

	switch (error) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply error %d", error);
	}
}
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	__receive_convert_reply(r, lkb, ms);

	unlock_rsb(r);
	put_rsb(r);
}
static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_convert_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_convert_reply not on waiters");
		goto out;
	}

	_receive_convert_reply(lkb, ms);
 out:
	dlm_put_lkb(lkb);
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	/* this is the value returned from do_unlock() on the master */
	int error = ms->m_result;

	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply error %d", error);
	}

	unlock_rsb(r);
	put_rsb(r);
}
static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply not on waiters");
		goto out;
	}

	_receive_unlock_reply(lkb, ms);
 out:
	dlm_put_lkb(lkb);
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	/* this is the value returned from do_cancel() on the master */
	int error = ms->m_result;

	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply error %d", error);
	}

	unlock_rsb(r);
	put_rsb(r);
}
static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply not on waiters");
		goto out;
	}

	_receive_cancel_reply(lkb, ms);
 out:
	dlm_put_lkb(lkb);
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply not on waiters");
		goto out;
	}

	/* this is the value returned by dlm_dir_lookup() on the dir node
	   FIXME: will a non-zero error ever be returned? */
	error = ms->m_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	_request_lock(r, lkb);

	if (!ret_nodeid)
		process_lookup_list(r);

	unlock_rsb(r);
	put_rsb(r);
 out:
	dlm_put_lkb(lkb);
}
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended, leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in the requestqueue to be processed
	   after recovery.  we can't let dlm_recvd block on the recovery
	   lock.  if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (!recovery)
				dlm_add_requestqueue(ls, nodeid, hd);
			error = -EINTR;
			goto out;
		}
		if (lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;
	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;
	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;
	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to the above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;
	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;
	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;
	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (the only two async messages) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;
	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;
	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	return error;
}
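/*
 * Illustrative sketch (ours, not in the original): how a transport might
 * hand a complete message to dlm_receive_message().  The helper name and
 * the assumption that byte-order/length validation already happened in the
 * transport are ours.
 */
static inline void example_deliver(struct dlm_header *hd, int nodeid)
{
	/* recovery=0: normal delivery from the network, not dlm_recoverd
	   replaying the requestqueue */
	dlm_receive_message(hd, nodeid, 0);
}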
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		_remove_from_waiters(lkb);
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
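/*
 * Illustrative restatement (ours) of what middle_conversion() is assumed
 * to test: a conversion between the two "middle" modes PR and CW, which
 * are incompatible with each other.
 */
static inline int example_is_middle_conversion(struct dlm_lkb *lkb)
{
	return (lkb->lkb_grmode == DLM_LOCK_PR &&
		lkb->lkb_rqmode == DLM_LOCK_CW) ||
	       (lkb->lkb_rqmode == DLM_LOCK_PR &&
		lkb->lkb_grmode == DLM_LOCK_CW);
}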
/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (dlm_is_removed(ls, lkb->lkb_nodeid))
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
		return 1;

	return 0;
}
/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	down(&ls->ls_waiters_sem);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination, will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			_remove_from_waiters(lkb);
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			_remove_from_waiters(lkb);
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
	}
	up(&ls->ls_waiters_sem);
}
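/*
 * Illustrative sketch (hypothetical helper, ours): the fake-reply trick
 * used in dlm_recover_waiters_pre() above, packaged.  An in-flight
 * operation whose master died is completed by pretending the master
 * replied with the given result.
 */
static inline void example_fake_reply(struct dlm_ls *ls, struct dlm_lkb *lkb,
				      int result,
				      void (*receive_reply)(struct dlm_lkb *,
							    struct dlm_message *))
{
	ls->ls_stub_ms.m_result = result;	/* what the master would say */
	_remove_from_waiters(lkb);
	receive_reply(lkb, &ls->ls_stub_ms);
}

/* e.g. example_fake_reply(ls, lkb, -DLM_EUNLOCK, _receive_unlock_reply); */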
static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	int rv = 0;

	down(&ls->ls_waiters_sem);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			rv = lkb->lkb_wait_type;
			_remove_from_waiters(lkb);
			lkb->lkb_flags &= ~DLM_IFL_RESEND;
			*lkb_ret = lkb;
			break;
		}
	}
	up(&ls->ls_waiters_sem);

	return rv;
}
/* Deal with lookups and lkbs marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being
   placed back on the waiters list. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		mstype = remove_resend_waiter(ls, &lkb);
		if (!mstype)
			break;

		r = lkb->lkb_resource;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		switch (mstype) {

		case DLM_MSG_LOOKUP:
			hold_rsb(r);
			lock_rsb(r);
			_request_lock(r, lkb);
			if (is_master(r))
				confirm_master(r, 0);
			unlock_rsb(r);
			put_rsb(r);
			break;

		case DLM_MSG_REQUEST:
			hold_rsb(r);
			lock_rsb(r);
			_request_lock(r, lkb);
			unlock_rsb(r);
			put_rsb(r);
			break;

		case DLM_MSG_CONVERT:
			hold_rsb(r);
			lock_rsb(r);
			_convert_lock(r, lkb);
			unlock_rsb(r);
			put_rsb(r);
			break;

		default:
			log_error(ls, "recover_waiters_post type %d", mstype);
		}
	}

	return error;
}
static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}
static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
}

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}

static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}

void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}
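/*
 * Illustrative sketch (ours): purge_queue() is parameterized by a test
 * function, so a new purge policy only needs a new predicate.  A
 * hypothetical predicate matching granted master copies:
 */
static inline int example_purge_granted_test(struct dlm_ls *ls,
					     struct dlm_lkb *lkb)
{
	return is_master_copy(lkb) && lkb->lkb_status == DLM_LKSTS_GRANTED;
}

/* e.g. purge_queue(r, &r->res_grantqueue, &example_purge_granted_test); */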
/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
int dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		read_lock(&ls->ls_rsbtbl[i].lock);
		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
			hold_rsb(r);
			lock_rsb(r);
			if (is_master(r)) {
				grant_pending_locks(r);
				confirm_master(r, 0);
			}
			unlock_rsb(r);
			put_rsb(r);
		}
		read_unlock(&ls->ls_rsbtbl[i].lock);
	}

	return 0;
}
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
					 uint32_t remid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
			return lkb;
	}
	return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
				    uint32_t remid)
{
	struct dlm_lkb *lkb;

	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
	if (lkb)
		return lkb;
	return NULL;
}
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	int lvblen;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = rl->rl_ownpid;
	lkb->lkb_remid = rl->rl_lkid;
	lkb->lkb_exflags = rl->rl_exflags;
	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = rl->rl_lvbseq;
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status; add_lkb() sets it itself */

	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_flags & DLM_IFL_RANGE) {
		lkb->lkb_range = allocate_range(ls);
		if (!lkb->lkb_range)
			return -ENOMEM;
		memcpy(lkb->lkb_range, rl->rl_range, 4*sizeof(uint64_t));
	}

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
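/*
 * Worked example (illustrative, not in the original code): suppose the old
 * master died while two lkbs on the same rsb were converting between PR and
 * CW.  Since PR and CW are incompatible with each other, at most one of the
 * converts can have been granted before the failure, and a single rebuilt
 * lkb cannot show which.  Hence grmode is reset to IV above and the rsb is
 * flagged so recover_conversion() can settle the real granted modes once
 * every surviving lkb has been rebuilt.
 */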
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		/* ... lkb ref dropped ... */
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = lkb->lkb_id;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	rl->rl_result = error;
	return error;
}
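/*
 * Illustrative note (ours): the remid handed back above completes a
 * two-sided handshake -- dlm_recover_process_copy() below stores it in the
 * waiting process-copy lkb, so the pair (lkb_id, lkb_remid) once again
 * links the process copy with the rebuilt master copy.
 */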
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}