[TCP]: Increase the max_burst threshold from 3 to tp->reordering.
[linux-2.6] / fs / dlm / dir.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25
26
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29         spin_lock(&ls->ls_recover_list_lock);
30         list_add(&de->list, &ls->ls_recover_list);
31         spin_unlock(&ls->ls_recover_list_lock);
32 }
33
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36         int found = 0;
37         struct dlm_direntry *de;
38
39         spin_lock(&ls->ls_recover_list_lock);
40         list_for_each_entry(de, &ls->ls_recover_list, list) {
41                 if (de->length == len) {
42                         list_del(&de->list);
43                         de->master_nodeid = 0;
44                         memset(de->name, 0, len);
45                         found = 1;
46                         break;
47                 }
48         }
49         spin_unlock(&ls->ls_recover_list_lock);
50
51         if (!found)
52                 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_KERNEL);
53         return de;
54 }
55
56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58         struct dlm_direntry *de;
59
60         spin_lock(&ls->ls_recover_list_lock);
61         while (!list_empty(&ls->ls_recover_list)) {
62                 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63                                 list);
64                 list_del(&de->list);
65                 kfree(de);
66         }
67         spin_unlock(&ls->ls_recover_list_lock);
68 }
69
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81         struct list_head *tmp;
82         struct dlm_member *memb = NULL;
83         uint32_t node, n = 0;
84         int nodeid;
85
86         if (ls->ls_num_nodes == 1) {
87                 nodeid = dlm_our_nodeid();
88                 goto out;
89         }
90
91         if (ls->ls_node_array) {
92                 node = (hash >> 16) % ls->ls_total_weight;
93                 nodeid = ls->ls_node_array[node];
94                 goto out;
95         }
96
97         /* make_member_array() failed to kmalloc ls_node_array... */
98
99         node = (hash >> 16) % ls->ls_num_nodes;
100
101         list_for_each(tmp, &ls->ls_nodes) {
102                 if (n++ != node)
103                         continue;
104                 memb = list_entry(tmp, struct dlm_member, list);
105                 break;
106         }
107
108         DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109                                  ls->ls_num_nodes, n, node););
110         nodeid = memb->nodeid;
111  out:
112         return nodeid;
113 }
114
115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117         return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122         uint32_t val;
123
124         val = jhash(name, len, 0);
125         val &= (ls->ls_dirtbl_size - 1);
126
127         return val;
128 }
129
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132         uint32_t bucket;
133
134         bucket = dir_hash(ls, de->name, de->length);
135         list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139                                           int namelen, uint32_t bucket)
140 {
141         struct dlm_direntry *de;
142
143         list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144                 if (de->length == namelen && !memcmp(name, de->name, namelen))
145                         goto out;
146         }
147         de = NULL;
148  out:
149         return de;
150 }
151
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154         struct dlm_direntry *de;
155         uint32_t bucket;
156
157         bucket = dir_hash(ls, name, namelen);
158
159         write_lock(&ls->ls_dirtbl[bucket].lock);
160
161         de = search_bucket(ls, name, namelen, bucket);
162
163         if (!de) {
164                 log_error(ls, "remove fr %u none", nodeid);
165                 goto out;
166         }
167
168         if (de->master_nodeid != nodeid) {
169                 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170                 goto out;
171         }
172
173         list_del(&de->list);
174         kfree(de);
175  out:
176         write_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178
179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181         struct list_head *head;
182         struct dlm_direntry *de;
183         int i;
184
185         DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187         for (i = 0; i < ls->ls_dirtbl_size; i++) {
188                 write_lock(&ls->ls_dirtbl[i].lock);
189                 head = &ls->ls_dirtbl[i].list;
190                 while (!list_empty(head)) {
191                         de = list_entry(head->next, struct dlm_direntry, list);
192                         list_del(&de->list);
193                         put_free_de(ls, de);
194                 }
195                 write_unlock(&ls->ls_dirtbl[i].lock);
196         }
197 }
198
199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201         struct dlm_member *memb;
202         struct dlm_direntry *de;
203         char *b, *last_name = NULL;
204         int error = -ENOMEM, last_len, count = 0;
205         uint16_t namelen;
206
207         log_debug(ls, "dlm_recover_directory");
208
209         if (dlm_no_directory(ls))
210                 goto out_status;
211
212         dlm_dir_clear(ls);
213
214         last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
215         if (!last_name)
216                 goto out;
217
218         list_for_each_entry(memb, &ls->ls_nodes, list) {
219                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220                 last_len = 0;
221
222                 for (;;) {
223                         int left;
224                         error = dlm_recovery_stopped(ls);
225                         if (error)
226                                 goto out_free;
227
228                         error = dlm_rcom_names(ls, memb->nodeid,
229                                                last_name, last_len);
230                         if (error)
231                                 goto out_free;
232
233                         schedule();
234
235                         /*
236                          * pick namelen/name pairs out of received buffer
237                          */
238
239                         b = ls->ls_recover_buf->rc_buf;
240                         left = ls->ls_recover_buf->rc_header.h_length;
241                         left -= sizeof(struct dlm_rcom);
242
243                         for (;;) {
244                                 __be16 v;
245
246                                 error = -EINVAL;
247                                 if (left < sizeof(__be16))
248                                         goto out_free;
249
250                                 memcpy(&v, b, sizeof(__be16));
251                                 namelen = be16_to_cpu(v);
252                                 b += sizeof(__be16);
253                                 left -= sizeof(__be16);
254
255                                 /* namelen of 0xFFFFF marks end of names for
256                                    this node; namelen of 0 marks end of the
257                                    buffer */
258
259                                 if (namelen == 0xFFFF)
260                                         goto done;
261                                 if (!namelen)
262                                         break;
263
264                                 if (namelen > left)
265                                         goto out_free;
266
267                                 if (namelen > DLM_RESNAME_MAXLEN)
268                                         goto out_free;
269
270                                 error = -ENOMEM;
271                                 de = get_free_de(ls, namelen);
272                                 if (!de)
273                                         goto out_free;
274
275                                 de->master_nodeid = memb->nodeid;
276                                 de->length = namelen;
277                                 last_len = namelen;
278                                 memcpy(de->name, b, namelen);
279                                 memcpy(last_name, b, namelen);
280                                 b += namelen;
281                                 left -= namelen;
282
283                                 add_entry_to_hash(ls, de);
284                                 count++;
285                         }
286                 }
287          done:
288                 ;
289         }
290
291  out_status:
292         error = 0;
293         dlm_set_recover_status(ls, DLM_RS_DIR);
294         log_debug(ls, "dlm_recover_directory %d entries", count);
295  out_free:
296         kfree(last_name);
297  out:
298         dlm_clear_free_entries(ls);
299         return error;
300 }
301
302 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
303                      int namelen, int *r_nodeid)
304 {
305         struct dlm_direntry *de, *tmp;
306         uint32_t bucket;
307
308         bucket = dir_hash(ls, name, namelen);
309
310         write_lock(&ls->ls_dirtbl[bucket].lock);
311         de = search_bucket(ls, name, namelen, bucket);
312         if (de) {
313                 *r_nodeid = de->master_nodeid;
314                 write_unlock(&ls->ls_dirtbl[bucket].lock);
315                 if (*r_nodeid == nodeid)
316                         return -EEXIST;
317                 return 0;
318         }
319
320         write_unlock(&ls->ls_dirtbl[bucket].lock);
321
322         if (namelen > DLM_RESNAME_MAXLEN)
323                 return -EINVAL;
324
325         de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_KERNEL);
326         if (!de)
327                 return -ENOMEM;
328
329         de->master_nodeid = nodeid;
330         de->length = namelen;
331         memcpy(de->name, name, namelen);
332
333         write_lock(&ls->ls_dirtbl[bucket].lock);
334         tmp = search_bucket(ls, name, namelen, bucket);
335         if (tmp) {
336                 kfree(de);
337                 de = tmp;
338         } else {
339                 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
340         }
341         *r_nodeid = de->master_nodeid;
342         write_unlock(&ls->ls_dirtbl[bucket].lock);
343         return 0;
344 }
345
/*
 * Public entry point for a directory lookup: forwards to get_entry() and
 * returns its result unchanged (0, -EEXIST, -EINVAL or -ENOMEM), with the
 * master nodeid stored through r_nodeid.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
                   int *r_nodeid)
{
        int rv;

        rv = get_entry(ls, nodeid, name, namelen, r_nodeid);
        return rv;
}
351
352 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
353 {
354         struct dlm_rsb *r;
355
356         down_read(&ls->ls_root_sem);
357         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
358                 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
359                         up_read(&ls->ls_root_sem);
360                         return r;
361                 }
362         }
363         up_read(&ls->ls_root_sem);
364         return NULL;
365 }
366
367 /* Find the rsb where we left off (or start again), then send rsb names
368    for rsb's we're master of and whose directory node matches the requesting
369    node.  inbuf is the rsb name last sent, inlen is the name's length */
370
371 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
372                            char *outbuf, int outlen, int nodeid)
373 {
374         struct list_head *list;
375         struct dlm_rsb *r;
376         int offset = 0, dir_nodeid;
377         uint16_t be_namelen;
378
379         down_read(&ls->ls_root_sem);
380
381         if (inlen > 1) {
382                 r = find_rsb_root(ls, inbuf, inlen);
383                 if (!r) {
384                         inbuf[inlen - 1] = '\0';
385                         log_error(ls, "copy_master_names from %d start %d %s",
386                                   nodeid, inlen, inbuf);
387                         goto out;
388                 }
389                 list = r->res_root_list.next;
390         } else {
391                 list = ls->ls_root_list.next;
392         }
393
394         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
395                 r = list_entry(list, struct dlm_rsb, res_root_list);
396                 if (r->res_nodeid)
397                         continue;
398
399                 dir_nodeid = dlm_dir_nodeid(r);
400                 if (dir_nodeid != nodeid)
401                         continue;
402
403                 /*
404                  * The block ends when we can't fit the following in the
405                  * remaining buffer space:
406                  * namelen (uint16_t) +
407                  * name (r->res_length) +
408                  * end-of-block record 0x0000 (uint16_t)
409                  */
410
411                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
412                         /* Write end-of-block record */
413                         be_namelen = 0;
414                         memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
415                         offset += sizeof(uint16_t);
416                         goto out;
417                 }
418
419                 be_namelen = cpu_to_be16(r->res_length);
420                 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
421                 offset += sizeof(uint16_t);
422                 memcpy(outbuf + offset, r->res_name, r->res_length);
423                 offset += r->res_length;
424         }
425
426         /*
427          * If we've reached the end of the list (and there's room) write a
428          * terminating record.
429          */
430
431         if ((list == &ls->ls_root_list) &&
432             (offset + sizeof(uint16_t) <= outlen)) {
433                 be_namelen = 0xFFFF;
434                 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
435                 offset += sizeof(uint16_t);
436         }
437
438  out:
439         up_read(&ls->ls_root_sem);
440 }
441