1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
29 spin_lock(&ls->ls_recover_list_lock);
30 list_add(&de->list, &ls->ls_recover_list);
31 spin_unlock(&ls->ls_recover_list_lock);
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
37 struct dlm_direntry *de;
39 spin_lock(&ls->ls_recover_list_lock);
40 list_for_each_entry(de, &ls->ls_recover_list, list) {
41 if (de->length == len) {
43 de->master_nodeid = 0;
44 memset(de->name, 0, len);
49 spin_unlock(&ls->ls_recover_list_lock);
52 de = kzalloc(sizeof(struct dlm_direntry) + len,
57 void dlm_clear_free_entries(struct dlm_ls *ls)
59 struct dlm_direntry *de;
61 spin_lock(&ls->ls_recover_list_lock);
62 while (!list_empty(&ls->ls_recover_list)) {
63 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
68 spin_unlock(&ls->ls_recover_list_lock);
72 * We use the upper 16 bits of the hash value to select the directory node.
73 * Low bits are used for distribution of rsb's among hash buckets on each node.
75 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
76 * num_nodes to the hash value. This value in the desired range is used as an
77 * offset into the sorted list of nodeid's to give the particular nodeid.
80 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
82 struct list_head *tmp;
83 struct dlm_member *memb = NULL;
87 if (ls->ls_num_nodes == 1) {
88 nodeid = dlm_our_nodeid();
92 if (ls->ls_node_array) {
93 node = (hash >> 16) % ls->ls_total_weight;
94 nodeid = ls->ls_node_array[node];
98 /* make_member_array() failed to kmalloc ls_node_array... */
100 node = (hash >> 16) % ls->ls_num_nodes;
102 list_for_each(tmp, &ls->ls_nodes) {
105 memb = list_entry(tmp, struct dlm_member, list);
109 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
110 ls->ls_num_nodes, n, node););
111 nodeid = memb->nodeid;
116 int dlm_dir_nodeid(struct dlm_rsb *r)
118 return dlm_hash2nodeid(r->res_ls, r->res_hash);
121 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
125 val = jhash(name, len, 0);
126 val &= (ls->ls_dirtbl_size - 1);
131 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
135 bucket = dir_hash(ls, de->name, de->length);
136 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
139 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
140 int namelen, uint32_t bucket)
142 struct dlm_direntry *de;
144 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
145 if (de->length == namelen && !memcmp(name, de->name, namelen))
153 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
155 struct dlm_direntry *de;
158 bucket = dir_hash(ls, name, namelen);
160 spin_lock(&ls->ls_dirtbl[bucket].lock);
162 de = search_bucket(ls, name, namelen, bucket);
165 log_error(ls, "remove fr %u none", nodeid);
169 if (de->master_nodeid != nodeid) {
170 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
177 spin_unlock(&ls->ls_dirtbl[bucket].lock);
180 void dlm_dir_clear(struct dlm_ls *ls)
182 struct list_head *head;
183 struct dlm_direntry *de;
186 DLM_ASSERT(list_empty(&ls->ls_recover_list), );
188 for (i = 0; i < ls->ls_dirtbl_size; i++) {
189 spin_lock(&ls->ls_dirtbl[i].lock);
190 head = &ls->ls_dirtbl[i].list;
191 while (!list_empty(head)) {
192 de = list_entry(head->next, struct dlm_direntry, list);
196 spin_unlock(&ls->ls_dirtbl[i].lock);
200 int dlm_recover_directory(struct dlm_ls *ls)
202 struct dlm_member *memb;
203 struct dlm_direntry *de;
204 char *b, *last_name = NULL;
205 int error = -ENOMEM, last_len, count = 0;
208 log_debug(ls, "dlm_recover_directory");
210 if (dlm_no_directory(ls))
215 last_name = kmalloc(DLM_RESNAME_MAXLEN, ls->ls_allocation);
219 list_for_each_entry(memb, &ls->ls_nodes, list) {
220 memset(last_name, 0, DLM_RESNAME_MAXLEN);
225 error = dlm_recovery_stopped(ls);
229 error = dlm_rcom_names(ls, memb->nodeid,
230 last_name, last_len);
237 * pick namelen/name pairs out of received buffer
240 b = ls->ls_recover_buf->rc_buf;
241 left = ls->ls_recover_buf->rc_header.h_length;
242 left -= sizeof(struct dlm_rcom);
248 if (left < sizeof(__be16))
251 memcpy(&v, b, sizeof(__be16));
252 namelen = be16_to_cpu(v);
254 left -= sizeof(__be16);
256 /* namelen of 0xFFFFF marks end of names for
257 this node; namelen of 0 marks end of the
260 if (namelen == 0xFFFF)
268 if (namelen > DLM_RESNAME_MAXLEN)
272 de = get_free_de(ls, namelen);
276 de->master_nodeid = memb->nodeid;
277 de->length = namelen;
279 memcpy(de->name, b, namelen);
280 memcpy(last_name, b, namelen);
284 add_entry_to_hash(ls, de);
294 dlm_set_recover_status(ls, DLM_RS_DIR);
295 log_debug(ls, "dlm_recover_directory %d entries", count);
299 dlm_clear_free_entries(ls);
303 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
304 int namelen, int *r_nodeid)
306 struct dlm_direntry *de, *tmp;
309 bucket = dir_hash(ls, name, namelen);
311 spin_lock(&ls->ls_dirtbl[bucket].lock);
312 de = search_bucket(ls, name, namelen, bucket);
314 *r_nodeid = de->master_nodeid;
315 spin_unlock(&ls->ls_dirtbl[bucket].lock);
316 if (*r_nodeid == nodeid)
321 spin_unlock(&ls->ls_dirtbl[bucket].lock);
323 if (namelen > DLM_RESNAME_MAXLEN)
326 de = kzalloc(sizeof(struct dlm_direntry) + namelen, ls->ls_allocation);
330 de->master_nodeid = nodeid;
331 de->length = namelen;
332 memcpy(de->name, name, namelen);
334 spin_lock(&ls->ls_dirtbl[bucket].lock);
335 tmp = search_bucket(ls, name, namelen, bucket);
340 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
342 *r_nodeid = de->master_nodeid;
343 spin_unlock(&ls->ls_dirtbl[bucket].lock);
347 int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
350 return get_entry(ls, nodeid, name, namelen, r_nodeid);
353 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
357 down_read(&ls->ls_root_sem);
358 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
359 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
360 up_read(&ls->ls_root_sem);
364 up_read(&ls->ls_root_sem);
368 /* Find the rsb where we left off (or start again), then send rsb names
369 for rsb's we're master of and whose directory node matches the requesting
370 node. inbuf is the rsb name last sent, inlen is the name's length */
372 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
373 char *outbuf, int outlen, int nodeid)
375 struct list_head *list;
377 int offset = 0, dir_nodeid;
380 down_read(&ls->ls_root_sem);
383 r = find_rsb_root(ls, inbuf, inlen);
385 inbuf[inlen - 1] = '\0';
386 log_error(ls, "copy_master_names from %d start %d %s",
387 nodeid, inlen, inbuf);
390 list = r->res_root_list.next;
392 list = ls->ls_root_list.next;
395 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
396 r = list_entry(list, struct dlm_rsb, res_root_list);
400 dir_nodeid = dlm_dir_nodeid(r);
401 if (dir_nodeid != nodeid)
405 * The block ends when we can't fit the following in the
406 * remaining buffer space:
407 * namelen (uint16_t) +
408 * name (r->res_length) +
409 * end-of-block record 0x0000 (uint16_t)
412 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
413 /* Write end-of-block record */
414 be_namelen = cpu_to_be16(0);
415 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
416 offset += sizeof(__be16);
420 be_namelen = cpu_to_be16(r->res_length);
421 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
422 offset += sizeof(__be16);
423 memcpy(outbuf + offset, r->res_name, r->res_length);
424 offset += r->res_length;
428 * If we've reached the end of the list (and there's room) write a
429 * terminating record.
432 if ((list == &ls->ls_root_list) &&
433 (offset + sizeof(uint16_t) <= outlen)) {
434 be_namelen = cpu_to_be16(0xFFFF);
435 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
436 offset += sizeof(__be16);
440 up_read(&ls->ls_root_sem);