1 /*
2  * net/sunrpc/cache.c
3  *
4  * Generic code for various authentication-related caches
5  * used by sunrpc clients and servers.
6  *
7  * Copyright (C) 2002 Neil Brown <neilb@cse.unsw.edu.au>
8  *
9  * Released under terms in GPL version 2.  See COPYING.
10  *
11  */
12
13 #include <linux/types.h>
14 #include <linux/fs.h>
15 #include <linux/file.h>
16 #include <linux/slab.h>
17 #include <linux/signal.h>
18 #include <linux/sched.h>
19 #include <linux/kmod.h>
20 #include <linux/list.h>
21 #include <linux/module.h>
22 #include <linux/ctype.h>
23 #include <asm/uaccess.h>
24 #include <linux/poll.h>
25 #include <linux/seq_file.h>
26 #include <linux/proc_fs.h>
27 #include <linux/net.h>
28 #include <linux/workqueue.h>
29 #include <linux/mutex.h>
30 #include <asm/ioctls.h>
31 #include <linux/sunrpc/types.h>
32 #include <linux/sunrpc/cache.h>
33 #include <linux/sunrpc/stats.h>
34
35 #define  RPCDBG_FACILITY RPCDBG_CACHE
36
37 static int cache_defer_req(struct cache_req *req, struct cache_head *item);
38 static void cache_revisit_request(struct cache_head *item);
39
40 static void cache_init(struct cache_head *h)
41 {
42         time_t now = get_seconds();
43         h->next = NULL;
44         h->flags = 0;
45         kref_init(&h->ref);
46         h->expiry_time = now + CACHE_NEW_EXPIRY;
47         h->last_refresh = now;
48 }
49
50 struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
51                                        struct cache_head *key, int hash)
52 {
53         struct cache_head **head,  **hp;
54         struct cache_head *new = NULL;
55
56         head = &detail->hash_table[hash];
57
58         read_lock(&detail->hash_lock);
59
60         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
61                 struct cache_head *tmp = *hp;
62                 if (detail->match(tmp, key)) {
63                         cache_get(tmp);
64                         read_unlock(&detail->hash_lock);
65                         return tmp;
66                 }
67         }
68         read_unlock(&detail->hash_lock);
69         /* Didn't find anything, insert an empty entry */
70
71         new = detail->alloc();
72         if (!new)
73                 return NULL;
74         /* must fully initialise 'new', else
75          * we could get into trouble if we need to
76          * cache_put it soon.
77          */
78         cache_init(new);
79         detail->init(new, key);
80
81         write_lock(&detail->hash_lock);
82
83         /* check if entry appeared while we slept */
84         for (hp=head; *hp != NULL ; hp = &(*hp)->next) {
85                 struct cache_head *tmp = *hp;
86                 if (detail->match(tmp, key)) {
87                         cache_get(tmp);
88                         write_unlock(&detail->hash_lock);
89                         cache_put(new, detail);
90                         return tmp;
91                 }
92         }
93         new->next = *head;
94         *head = new;
95         detail->entries++;
96         cache_get(new);
97         write_unlock(&detail->hash_lock);
98
99         return new;
100 }
101 EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
102
103
104 static void queue_loose(struct cache_detail *detail, struct cache_head *ch);
105
106 static int cache_fresh_locked(struct cache_head *head, time_t expiry)
107 {
108         head->expiry_time = expiry;
109         head->last_refresh = get_seconds();
110         return !test_and_set_bit(CACHE_VALID, &head->flags);
111 }
112
113 static void cache_fresh_unlocked(struct cache_head *head,
114                         struct cache_detail *detail, int new)
115 {
116         if (new)
117                 cache_revisit_request(head);
118         if (test_and_clear_bit(CACHE_PENDING, &head->flags)) {
119                 cache_revisit_request(head);
120                 queue_loose(detail, head);
121         }
122 }
123
124 struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
125                                        struct cache_head *new, struct cache_head *old, int hash)
126 {
127         /* The 'old' entry is to be replaced by 'new'.
128          * If 'old' is not VALID, we update it directly,
129          * otherwise we need to replace it
130          */
131         struct cache_head **head;
132         struct cache_head *tmp;
133         int is_new;
134
135         if (!test_bit(CACHE_VALID, &old->flags)) {
136                 write_lock(&detail->hash_lock);
137                 if (!test_bit(CACHE_VALID, &old->flags)) {
138                         if (test_bit(CACHE_NEGATIVE, &new->flags))
139                                 set_bit(CACHE_NEGATIVE, &old->flags);
140                         else
141                                 detail->update(old, new);
142                         is_new = cache_fresh_locked(old, new->expiry_time);
143                         write_unlock(&detail->hash_lock);
144                         cache_fresh_unlocked(old, detail, is_new);
145                         return old;
146                 }
147                 write_unlock(&detail->hash_lock);
148         }
149         /* We need to insert a new entry */
150         tmp = detail->alloc();
151         if (!tmp) {
152                 cache_put(old, detail);
153                 return NULL;
154         }
155         cache_init(tmp);
156         detail->init(tmp, old);
157         head = &detail->hash_table[hash];
158
159         write_lock(&detail->hash_lock);
160         if (test_bit(CACHE_NEGATIVE, &new->flags))
161                 set_bit(CACHE_NEGATIVE, &tmp->flags);
162         else
163                 detail->update(tmp, new);
164         tmp->next = *head;
165         *head = tmp;
166         detail->entries++;
167         cache_get(tmp);
168         is_new = cache_fresh_locked(tmp, new->expiry_time);
169         cache_fresh_locked(old, 0);
170         write_unlock(&detail->hash_lock);
171         cache_fresh_unlocked(tmp, detail, is_new);
172         cache_fresh_unlocked(old, detail, 0);
173         cache_put(old, detail);
174         return tmp;
175 }
176 EXPORT_SYMBOL_GPL(sunrpc_cache_update);
177
178 static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h);
179 /*
180  * This is the generic cache management routine for all
181  * the authentication caches.
182  * It checks the currency of a cache item and will
183  * initiate an upcall to fill it if needed.
184  *
185  *
186  * Returns 0 if the cache_head can be used; otherwise it cache_puts it and
187  * returns -EAGAIN if an upcall is pending,
188  * -ETIMEDOUT if the upcall failed and the request should be retried, or
189  * -ENOENT if the cache entry was negative.
190  */
191 int cache_check(struct cache_detail *detail,
192                     struct cache_head *h, struct cache_req *rqstp)
193 {
194         int rv;
195         long refresh_age, age;
196
197         /* First decide return status as best we can */
198         if (!test_bit(CACHE_VALID, &h->flags) ||
199             h->expiry_time < get_seconds())
200                 rv = -EAGAIN;
201         else if (detail->flush_time > h->last_refresh)
202                 rv = -EAGAIN;
203         else {
204                 /* entry is valid */
205                 if (test_bit(CACHE_NEGATIVE, &h->flags))
206                         rv = -ENOENT;
207                 else rv = 0;
208         }
209
210         /* now see if we want to start an upcall */
211         refresh_age = (h->expiry_time - h->last_refresh);
212         age = get_seconds() - h->last_refresh;
213
214         if (rqstp == NULL) {
215                 if (rv == -EAGAIN)
216                         rv = -ENOENT;
217         } else if (rv == -EAGAIN || age > refresh_age/2) {
218                 dprintk("RPC:       Want update, refage=%ld, age=%ld\n",
219                                 refresh_age, age);
220                 if (!test_and_set_bit(CACHE_PENDING, &h->flags)) {
221                         switch (cache_make_upcall(detail, h)) {
222                         case -EINVAL:
223                                 clear_bit(CACHE_PENDING, &h->flags);
224                                 if (rv == -EAGAIN) {
225                                         set_bit(CACHE_NEGATIVE, &h->flags);
226                                         cache_fresh_unlocked(h, detail,
227                                              cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY));
228                                         rv = -ENOENT;
229                                 }
230                                 break;
231
232                         case -EAGAIN:
233                                 clear_bit(CACHE_PENDING, &h->flags);
234                                 cache_revisit_request(h);
235                                 break;
236                         }
237                 }
238         }
239
240         if (rv == -EAGAIN)
241                 if (cache_defer_req(rqstp, h) != 0)
242                         rv = -ETIMEDOUT;
243
244         if (rv)
245                 cache_put(h, detail);
246         return rv;
247 }
248 EXPORT_SYMBOL_GPL(cache_check);
249
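/*
 * An illustrative, hedged sketch of how a typical cache user drives the
 * lookup/check pair above; my_detail, my_key and my_hash() are made-up
 * names, not from any in-tree cache.
 *
 *	struct cache_head *h;
 *
 *	h = sunrpc_cache_lookup(&my_detail, &my_key.h, my_hash(&my_key));
 *	if (h == NULL)
 *		return -ENOMEM;
 *	switch (cache_check(&my_detail, h, &rqstp->rq_chandle)) {
 *	case 0:
 *		... entry is valid: use it, then cache_put(h, &my_detail) ...
 *		break;
 *	default:
 *		... -EAGAIN/-ETIMEDOUT/-ENOENT: cache_check has already
 *		    dropped our reference, so just fail or defer ...
 *		break;
 *	}
 */
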
250 /*
251  * caches need to be periodically cleaned.
252  * For this we maintain a list of cache_detail and
253  * a current pointer into that list and into the table
254  * for that entry.
255  *
256  * Each time clean_cache is called it finds the next non-empty entry
257  * in the current table and walks the list in that entry
258  * looking for entries that can be removed.
259  *
260  * An entry gets removed if:
261  * - The expiry is before current time
262  * - The last_refresh time is before the flush_time for that cache
263  *
264  * later we might drop old entries with non-NEVER expiry if that table
265  * is getting 'full' for some definition of 'full'
266  *
267  * The question of "how often to scan a table" is an interesting one
268  * and is answered in part by the use of the "nextcheck" field in the
269  * cache_detail.
270  * When a scan of a table begins, the nextcheck field is set to a time
271  * that is well into the future.
272  * While scanning, if an expiry time is found that is earlier than the
273  * current nextcheck time, nextcheck is set to that expiry time.
274  * If the flush_time is ever set to a time earlier than the nextcheck
275  * time, the nextcheck time is then set to that flush_time.
276  *
277  * A table is then only scanned if the current time is at least
278  * the nextcheck time.
279  *
280  */
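
/*
 * A minimal sketch (hypothetical helper, for illustration only) of the two
 * removal rules listed above, expressed as a predicate over a cache_head:
 *
 *	static int my_entry_is_expired(struct cache_detail *cd,
 *				       struct cache_head *ch)
 *	{
 *		return ch->expiry_time < get_seconds() ||
 *		       ch->last_refresh < cd->flush_time;
 *	}
 *
 * cache_clean() below keeps an entry only when neither condition holds.
 */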
281
282 static LIST_HEAD(cache_list);
283 static DEFINE_SPINLOCK(cache_list_lock);
284 static struct cache_detail *current_detail;
285 static int current_index;
286
287 static const struct file_operations cache_file_operations;
288 static const struct file_operations content_file_operations;
289 static const struct file_operations cache_flush_operations;
290
291 static void do_cache_clean(struct work_struct *work);
292 static DECLARE_DELAYED_WORK(cache_cleaner, do_cache_clean);
293
294 static void remove_cache_proc_entries(struct cache_detail *cd)
295 {
296         if (cd->proc_ent == NULL)
297                 return;
298         if (cd->flush_ent)
299                 remove_proc_entry("flush", cd->proc_ent);
300         if (cd->channel_ent)
301                 remove_proc_entry("channel", cd->proc_ent);
302         if (cd->content_ent)
303                 remove_proc_entry("content", cd->proc_ent);
304         cd->proc_ent = NULL;
305         remove_proc_entry(cd->name, proc_net_rpc);
306 }
307
308 #ifdef CONFIG_PROC_FS
309 static int create_cache_proc_entries(struct cache_detail *cd)
310 {
311         struct proc_dir_entry *p;
312
313         cd->proc_ent = proc_mkdir(cd->name, proc_net_rpc);
314         if (cd->proc_ent == NULL)
315                 goto out_nomem;
316         cd->channel_ent = cd->content_ent = NULL;
317
318         p = proc_create_data("flush", S_IFREG|S_IRUSR|S_IWUSR,
319                              cd->proc_ent, &cache_flush_operations, cd);
320         cd->flush_ent = p;
321         if (p == NULL)
322                 goto out_nomem;
323
324         if (cd->cache_request || cd->cache_parse) {
325                 p = proc_create_data("channel", S_IFREG|S_IRUSR|S_IWUSR,
326                                      cd->proc_ent, &cache_file_operations, cd);
327                 cd->channel_ent = p;
328                 if (p == NULL)
329                         goto out_nomem;
330         }
331         if (cd->cache_show) {
332                 p = proc_create_data("content", S_IFREG|S_IRUSR|S_IWUSR,
333                                 cd->proc_ent, &content_file_operations, cd);
334                 cd->content_ent = p;
335                 if (p == NULL)
336                         goto out_nomem;
337         }
338         return 0;
339 out_nomem:
340         remove_cache_proc_entries(cd);
341         return -ENOMEM;
342 }
343 #else /* CONFIG_PROC_FS */
344 static int create_cache_proc_entries(struct cache_detail *cd)
345 {
346         return 0;
347 }
348 #endif
349
350 int cache_register(struct cache_detail *cd)
351 {
352         int ret;
353
354         ret = create_cache_proc_entries(cd);
355         if (ret)
356                 return ret;
357         rwlock_init(&cd->hash_lock);
358         INIT_LIST_HEAD(&cd->queue);
359         spin_lock(&cache_list_lock);
360         cd->nextcheck = 0;
361         cd->entries = 0;
362         atomic_set(&cd->readers, 0);
363         cd->last_close = 0;
364         cd->last_warn = -1;
365         list_add(&cd->others, &cache_list);
366         spin_unlock(&cache_list_lock);
367
368         /* start the cleaning process */
369         schedule_delayed_work(&cache_cleaner, 0);
370         return 0;
371 }
372 EXPORT_SYMBOL_GPL(cache_register);
373
374 void cache_unregister(struct cache_detail *cd)
375 {
376         cache_purge(cd);
377         spin_lock(&cache_list_lock);
378         write_lock(&cd->hash_lock);
379         if (cd->entries || atomic_read(&cd->inuse)) {
380                 write_unlock(&cd->hash_lock);
381                 spin_unlock(&cache_list_lock);
382                 goto out;
383         }
384         if (current_detail == cd)
385                 current_detail = NULL;
386         list_del_init(&cd->others);
387         write_unlock(&cd->hash_lock);
388         spin_unlock(&cache_list_lock);
389         remove_cache_proc_entries(cd);
390         if (list_empty(&cache_list)) {
391                 /* module must be being unloaded so it's safe to kill the worker */
392                 cancel_delayed_work_sync(&cache_cleaner);
393         }
394         return;
395 out:
396         printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
397 }
398 EXPORT_SYMBOL_GPL(cache_unregister);
399
400 /* clean cache tries to find something to clean
401  * and cleans it.
402  * It returns 1 if it cleaned something,
403  *            0 if it didn't find anything this time
404  *           -1 if it fell off the end of the list.
405  */
406 static int cache_clean(void)
407 {
408         int rv = 0;
409         struct list_head *next;
410
411         spin_lock(&cache_list_lock);
412
413         /* find a suitable table if we don't already have one */
414         while (current_detail == NULL ||
415             current_index >= current_detail->hash_size) {
416                 if (current_detail)
417                         next = current_detail->others.next;
418                 else
419                         next = cache_list.next;
420                 if (next == &cache_list) {
421                         current_detail = NULL;
422                         spin_unlock(&cache_list_lock);
423                         return -1;
424                 }
425                 current_detail = list_entry(next, struct cache_detail, others);
426                 if (current_detail->nextcheck > get_seconds())
427                         current_index = current_detail->hash_size;
428                 else {
429                         current_index = 0;
430                         current_detail->nextcheck = get_seconds()+30*60;
431                 }
432         }
433
434         /* find a non-empty bucket in the table */
435         while (current_detail &&
436                current_index < current_detail->hash_size &&
437                current_detail->hash_table[current_index] == NULL)
438                 current_index++;
439
440         /* find a cleanable entry in the bucket and clean it, or set to next bucket */
441
442         if (current_detail && current_index < current_detail->hash_size) {
443                 struct cache_head *ch, **cp;
444                 struct cache_detail *d;
445
446                 write_lock(&current_detail->hash_lock);
447
448                 /* Ok, now to clean this strand */
449
450                 cp = & current_detail->hash_table[current_index];
451                 ch = *cp;
452                 for (; ch; cp= & ch->next, ch= *cp) {
453                         if (current_detail->nextcheck > ch->expiry_time)
454                                 current_detail->nextcheck = ch->expiry_time+1;
455                         if (ch->expiry_time >= get_seconds()
456                             && ch->last_refresh >= current_detail->flush_time
457                                 )
458                                 continue;
459                         if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
460                                 queue_loose(current_detail, ch);
461
462                         if (atomic_read(&ch->ref.refcount) == 1)
463                                 break;
464                 }
465                 if (ch) {
466                         *cp = ch->next;
467                         ch->next = NULL;
468                         current_detail->entries--;
469                         rv = 1;
470                 }
471                 write_unlock(&current_detail->hash_lock);
472                 d = current_detail;
473                 if (!ch)
474                         current_index ++;
475                 spin_unlock(&cache_list_lock);
476                 if (ch)
477                         cache_put(ch, d);
478         } else
479                 spin_unlock(&cache_list_lock);
480
481         return rv;
482 }
483
484 /*
485  * We want to regularly clean the cache, so we need to schedule some work ...
486  */
487 static void do_cache_clean(struct work_struct *work)
488 {
489         int delay = 5;
490         if (cache_clean() == -1)
491                 delay = 30*HZ;
492
493         if (list_empty(&cache_list))
494                 delay = 0;
495
496         if (delay)
497                 schedule_delayed_work(&cache_cleaner, delay);
498 }
499
500
501 /*
502  * Clean all caches promptly.  This just calls cache_clean
503  * repeatedly until we are sure that every cache has had a chance to
504  * be fully cleaned
505  */
506 void cache_flush(void)
507 {
508         while (cache_clean() != -1)
509                 cond_resched();
510         while (cache_clean() != -1)
511                 cond_resched();
512 }
513 EXPORT_SYMBOL_GPL(cache_flush);
514
515 void cache_purge(struct cache_detail *detail)
516 {
517         detail->flush_time = LONG_MAX;
518         detail->nextcheck = get_seconds();
519         cache_flush();
520         detail->flush_time = 1;
521 }
522 EXPORT_SYMBOL_GPL(cache_purge);
523
524
525 /*
526  * Deferral and Revisiting of Requests.
527  *
528  * If a cache lookup finds a pending entry, we
529  * need to defer the request and revisit it later.
530  * All deferred requests are stored in a hash table,
531  * indexed by "struct cache_head *".
532  * As it may be wasteful to store a whole request
533  * structure, we allow the request to provide a
534  * deferred form, which must contain a
535  * 'struct cache_deferred_req'
536  * This cache_deferred_req contains a method to allow
537  * it to be revisited when cache info is available
538  */
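
/*
 * An illustrative sketch of how a caller's defer() method usually looks:
 * it wraps its request in a structure embedding a cache_deferred_req,
 * which is what cache_defer_req() below queues.  The my_* names are
 * hypothetical, not from any in-tree service.
 *
 *	struct my_deferred {
 *		struct cache_deferred_req handle;
 *		... enough state to replay the request later ...
 *	};
 *
 *	static void my_revisit(struct cache_deferred_req *dreq, int too_many)
 *	{
 *		struct my_deferred *md =
 *			container_of(dreq, struct my_deferred, handle);
 *		... requeue the saved request, or drop it if too_many ...
 *	}
 *
 *	static struct cache_deferred_req *my_defer(struct cache_req *req)
 *	{
 *		struct my_deferred *md = kmalloc(sizeof(*md), GFP_ATOMIC);
 *
 *		if (!md)
 *			return NULL;
 *		md->handle.owner = my_owning_service;
 *		md->handle.revisit = my_revisit;
 *		... save whatever is needed to replay the request ...
 *		return &md->handle;
 *	}
 */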
539
540 #define DFR_HASHSIZE    (PAGE_SIZE/sizeof(struct list_head))
541 #define DFR_HASH(item)  ((((long)(item))>>4 ^ (((long)(item))>>13)) % DFR_HASHSIZE)
542
543 #define DFR_MAX 300     /* ??? */
544
545 static DEFINE_SPINLOCK(cache_defer_lock);
546 static LIST_HEAD(cache_defer_list);
547 static struct list_head cache_defer_hash[DFR_HASHSIZE];
548 static int cache_defer_cnt;
549
550 static int cache_defer_req(struct cache_req *req, struct cache_head *item)
551 {
552         struct cache_deferred_req *dreq;
553         int hash = DFR_HASH(item);
554
555         if (cache_defer_cnt >= DFR_MAX) {
556                 /* too much in the cache, randomly drop this one,
557                  * or continue and drop the oldest below
558                  */
559                 if (net_random()&1)
560                         return -ETIMEDOUT;
561         }
562         dreq = req->defer(req);
563         if (dreq == NULL)
564                 return -ETIMEDOUT;
565
566         dreq->item = item;
567
568         spin_lock(&cache_defer_lock);
569
570         list_add(&dreq->recent, &cache_defer_list);
571
572         if (cache_defer_hash[hash].next == NULL)
573                 INIT_LIST_HEAD(&cache_defer_hash[hash]);
574         list_add(&dreq->hash, &cache_defer_hash[hash]);
575
576         /* it is in, now maybe clean up */
577         dreq = NULL;
578         if (++cache_defer_cnt > DFR_MAX) {
579                 dreq = list_entry(cache_defer_list.prev,
580                                   struct cache_deferred_req, recent);
581                 list_del(&dreq->recent);
582                 list_del(&dreq->hash);
583                 cache_defer_cnt--;
584         }
585         spin_unlock(&cache_defer_lock);
586
587         if (dreq) {
588                 /* there was one too many */
589                 dreq->revisit(dreq, 1);
590         }
591         if (!test_bit(CACHE_PENDING, &item->flags)) {
592                 /* must have just been validated... */
593                 cache_revisit_request(item);
594         }
595         return 0;
596 }
597
598 static void cache_revisit_request(struct cache_head *item)
599 {
600         struct cache_deferred_req *dreq;
601         struct list_head pending;
602
603         struct list_head *lp;
604         int hash = DFR_HASH(item);
605
606         INIT_LIST_HEAD(&pending);
607         spin_lock(&cache_defer_lock);
608
609         lp = cache_defer_hash[hash].next;
610         if (lp) {
611                 while (lp != &cache_defer_hash[hash]) {
612                         dreq = list_entry(lp, struct cache_deferred_req, hash);
613                         lp = lp->next;
614                         if (dreq->item == item) {
615                                 list_del(&dreq->hash);
616                                 list_move(&dreq->recent, &pending);
617                                 cache_defer_cnt--;
618                         }
619                 }
620         }
621         spin_unlock(&cache_defer_lock);
622
623         while (!list_empty(&pending)) {
624                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
625                 list_del_init(&dreq->recent);
626                 dreq->revisit(dreq, 0);
627         }
628 }
629
630 void cache_clean_deferred(void *owner)
631 {
632         struct cache_deferred_req *dreq, *tmp;
633         struct list_head pending;
634
635
636         INIT_LIST_HEAD(&pending);
637         spin_lock(&cache_defer_lock);
638
639         list_for_each_entry_safe(dreq, tmp, &cache_defer_list, recent) {
640                 if (dreq->owner == owner) {
641                         list_del(&dreq->hash);
642                         list_move(&dreq->recent, &pending);
643                         cache_defer_cnt--;
644                 }
645         }
646         spin_unlock(&cache_defer_lock);
647
648         while (!list_empty(&pending)) {
649                 dreq = list_entry(pending.next, struct cache_deferred_req, recent);
650                 list_del_init(&dreq->recent);
651                 dreq->revisit(dreq, 1);
652         }
653 }
654
655 /*
656  * communicate with user-space
657  *
658  * We have a magic /proc file - /proc/sunrpc/<cachename>/channel.
659  * On read, you get a full request, or block.
660  * On write, an update request is processed.
661  * Poll reports readable if there is anything to read, and always allows write.
662  *
663  * Implemented as a linked list of requests.  Each open file has
664  * a ->private that also exists in this list.  New requests are added
665  * to the end and may wake up any preceding readers.
666  * New readers are added to the head.  If, on read, an item is found with
667  * CACHE_UPCALLING clear, we free it from the list.
668  *
669  */
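
/*
 * An illustrative sketch of the user-space side of this channel, in plain
 * libc C.  Each read() returns one complete request line; the daemon
 * resolves it and writes one complete reply line back.  The cache name
 * and my_resolve() are hypothetical.
 *
 *	int fd = open("/proc/net/rpc/mycache/channel", O_RDWR);
 *	char buf[8192];
 *	ssize_t n;
 *
 *	while ((n = read(fd, buf, sizeof(buf))) > 0) {
 *		char reply[8192];
 *		int rlen = my_resolve(buf, n, reply, sizeof(reply));
 *
 *		if (rlen > 0)
 *			write(fd, reply, rlen);
 *	}
 */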
670
671 static DEFINE_SPINLOCK(queue_lock);
672 static DEFINE_MUTEX(queue_io_mutex);
673
674 struct cache_queue {
675         struct list_head        list;
676         int                     reader; /* if 0, then request */
677 };
678 struct cache_request {
679         struct cache_queue      q;
680         struct cache_head       *item;
681         char                    * buf;
682         int                     len;
683         int                     readers;
684 };
685 struct cache_reader {
686         struct cache_queue      q;
687         int                     offset; /* if non-0, we have a refcnt on next request */
688 };
689
690 static ssize_t
691 cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
692 {
693         struct cache_reader *rp = filp->private_data;
694         struct cache_request *rq;
695         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
696         int err;
697
698         if (count == 0)
699                 return 0;
700
701         mutex_lock(&queue_io_mutex); /* protect against multiple concurrent
702                               * readers on this file */
703  again:
704         spin_lock(&queue_lock);
705         /* need to find next request */
706         while (rp->q.list.next != &cd->queue &&
707                list_entry(rp->q.list.next, struct cache_queue, list)
708                ->reader) {
709                 struct list_head *next = rp->q.list.next;
710                 list_move(&rp->q.list, next);
711         }
712         if (rp->q.list.next == &cd->queue) {
713                 spin_unlock(&queue_lock);
714                 mutex_unlock(&queue_io_mutex);
715                 BUG_ON(rp->offset);
716                 return 0;
717         }
718         rq = container_of(rp->q.list.next, struct cache_request, q.list);
719         BUG_ON(rq->q.reader);
720         if (rp->offset == 0)
721                 rq->readers++;
722         spin_unlock(&queue_lock);
723
724         if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
725                 err = -EAGAIN;
726                 spin_lock(&queue_lock);
727                 list_move(&rp->q.list, &rq->q.list);
728                 spin_unlock(&queue_lock);
729         } else {
730                 if (rp->offset + count > rq->len)
731                         count = rq->len - rp->offset;
732                 err = -EFAULT;
733                 if (copy_to_user(buf, rq->buf + rp->offset, count))
734                         goto out;
735                 rp->offset += count;
736                 if (rp->offset >= rq->len) {
737                         rp->offset = 0;
738                         spin_lock(&queue_lock);
739                         list_move(&rp->q.list, &rq->q.list);
740                         spin_unlock(&queue_lock);
741                 }
742                 err = 0;
743         }
744  out:
745         if (rp->offset == 0) {
746                 /* need to release rq */
747                 spin_lock(&queue_lock);
748                 rq->readers--;
749                 if (rq->readers == 0 &&
750                     !test_bit(CACHE_PENDING, &rq->item->flags)) {
751                         list_del(&rq->q.list);
752                         spin_unlock(&queue_lock);
753                         cache_put(rq->item, cd);
754                         kfree(rq->buf);
755                         kfree(rq);
756                 } else
757                         spin_unlock(&queue_lock);
758         }
759         if (err == -EAGAIN)
760                 goto again;
761         mutex_unlock(&queue_io_mutex);
762         return err ? err :  count;
763 }
764
765 static char write_buf[8192]; /* protected by queue_io_mutex */
766
767 static ssize_t
768 cache_write(struct file *filp, const char __user *buf, size_t count,
769             loff_t *ppos)
770 {
771         int err;
772         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
773
774         if (count == 0)
775                 return 0;
776         if (count >= sizeof(write_buf))
777                 return -EINVAL;
778
779         mutex_lock(&queue_io_mutex);
780
781         if (copy_from_user(write_buf, buf, count)) {
782                 mutex_unlock(&queue_io_mutex);
783                 return -EFAULT;
784         }
785         write_buf[count] = '\0';
786         if (cd->cache_parse)
787                 err = cd->cache_parse(cd, write_buf, count);
788         else
789                 err = -EINVAL;
790
791         mutex_unlock(&queue_io_mutex);
792         return err ? err : count;
793 }
794
795 static DECLARE_WAIT_QUEUE_HEAD(queue_wait);
796
797 static unsigned int
798 cache_poll(struct file *filp, poll_table *wait)
799 {
800         unsigned int mask;
801         struct cache_reader *rp = filp->private_data;
802         struct cache_queue *cq;
803         struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
804
805         poll_wait(filp, &queue_wait, wait);
806
807         /* always allow write */
808         mask = POLLOUT | POLLWRNORM;
809
810         if (!rp)
811                 return mask;
812
813         spin_lock(&queue_lock);
814
815         for (cq= &rp->q; &cq->list != &cd->queue;
816              cq = list_entry(cq->list.next, struct cache_queue, list))
817                 if (!cq->reader) {
818                         mask |= POLLIN | POLLRDNORM;
819                         break;
820                 }
821         spin_unlock(&queue_lock);
822         return mask;
823 }
824
825 static int
826 cache_ioctl(struct inode *ino, struct file *filp,
827             unsigned int cmd, unsigned long arg)
828 {
829         int len = 0;
830         struct cache_reader *rp = filp->private_data;
831         struct cache_queue *cq;
832         struct cache_detail *cd = PDE(ino)->data;
833
834         if (cmd != FIONREAD || !rp)
835                 return -EINVAL;
836
837         spin_lock(&queue_lock);
838
839         /* only find the length remaining in current request,
840          * or the length of the next request
841          */
842         for (cq= &rp->q; &cq->list != &cd->queue;
843              cq = list_entry(cq->list.next, struct cache_queue, list))
844                 if (!cq->reader) {
845                         struct cache_request *cr =
846                                 container_of(cq, struct cache_request, q);
847                         len = cr->len - rp->offset;
848                         break;
849                 }
850         spin_unlock(&queue_lock);
851
852         return put_user(len, (int __user *)arg);
853 }
854
855 static int
856 cache_open(struct inode *inode, struct file *filp)
857 {
858         struct cache_reader *rp = NULL;
859
860         nonseekable_open(inode, filp);
861         if (filp->f_mode & FMODE_READ) {
862                 struct cache_detail *cd = PDE(inode)->data;
863
864                 rp = kmalloc(sizeof(*rp), GFP_KERNEL);
865                 if (!rp)
866                         return -ENOMEM;
867                 rp->offset = 0;
868                 rp->q.reader = 1;
869                 atomic_inc(&cd->readers);
870                 spin_lock(&queue_lock);
871                 list_add(&rp->q.list, &cd->queue);
872                 spin_unlock(&queue_lock);
873         }
874         filp->private_data = rp;
875         return 0;
876 }
877
878 static int
879 cache_release(struct inode *inode, struct file *filp)
880 {
881         struct cache_reader *rp = filp->private_data;
882         struct cache_detail *cd = PDE(inode)->data;
883
884         if (rp) {
885                 spin_lock(&queue_lock);
886                 if (rp->offset) {
887                         struct cache_queue *cq;
888                         for (cq= &rp->q; &cq->list != &cd->queue;
889                              cq = list_entry(cq->list.next, struct cache_queue, list))
890                                 if (!cq->reader) {
891                                         container_of(cq, struct cache_request, q)
892                                                 ->readers--;
893                                         break;
894                                 }
895                         rp->offset = 0;
896                 }
897                 list_del(&rp->q.list);
898                 spin_unlock(&queue_lock);
899
900                 filp->private_data = NULL;
901                 kfree(rp);
902
903                 cd->last_close = get_seconds();
904                 atomic_dec(&cd->readers);
905         }
906         return 0;
907 }
908
909
910
911 static const struct file_operations cache_file_operations = {
912         .owner          = THIS_MODULE,
913         .llseek         = no_llseek,
914         .read           = cache_read,
915         .write          = cache_write,
916         .poll           = cache_poll,
917         .ioctl          = cache_ioctl, /* for FIONREAD */
918         .open           = cache_open,
919         .release        = cache_release,
920 };
921
922
923 static void queue_loose(struct cache_detail *detail, struct cache_head *ch)
924 {
925         struct cache_queue *cq;
926         spin_lock(&queue_lock);
927         list_for_each_entry(cq, &detail->queue, list)
928                 if (!cq->reader) {
929                         struct cache_request *cr = container_of(cq, struct cache_request, q);
930                         if (cr->item != ch)
931                                 continue;
932                         if (cr->readers != 0)
933                                 continue;
934                         list_del(&cr->q.list);
935                         spin_unlock(&queue_lock);
936                         cache_put(cr->item, detail);
937                         kfree(cr->buf);
938                         kfree(cr);
939                         return;
940                 }
941         spin_unlock(&queue_lock);
942 }
943
944 /*
945  * Support routines for text-based upcalls.
946  * Fields are separated by spaces.
947  * Fields are either mangled to quote space, tab, newline and slosh
948  * with a slosh, or hexified with a leading \x.
949  * Record is terminated with a newline.
950  *
951  */
952
953 void qword_add(char **bpp, int *lp, char *str)
954 {
955         char *bp = *bpp;
956         int len = *lp;
957         char c;
958
959         if (len < 0) return;
960
961         while ((c=*str++) && len)
962                 switch(c) {
963                 case ' ':
964                 case '\t':
965                 case '\n':
966                 case '\\':
967                         if (len >= 4) {
968                                 *bp++ = '\\';
969                                 *bp++ = '0' + ((c & 0300)>>6);
970                                 *bp++ = '0' + ((c & 0070)>>3);
971                                 *bp++ = '0' + ((c & 0007)>>0);
972                         }
973                         len -= 4;
974                         break;
975                 default:
976                         *bp++ = c;
977                         len--;
978                 }
979         if (c || len <1) len = -1;
980         else {
981                 *bp++ = ' ';
982                 len--;
983         }
984         *bpp = bp;
985         *lp = len;
986 }
987 EXPORT_SYMBOL_GPL(qword_add);
988
989 void qword_addhex(char **bpp, int *lp, char *buf, int blen)
990 {
991         char *bp = *bpp;
992         int len = *lp;
993
994         if (len < 0) return;
995
996         if (len > 2) {
997                 *bp++ = '\\';
998                 *bp++ = 'x';
999                 len -= 2;
1000                 while (blen && len >= 2) {
1001                         unsigned char c = *buf++;
1002                         *bp++ = '0' + ((c&0xf0)>>4) + (c>=0xa0)*('a'-'9'-1);
1003                         *bp++ = '0' + (c&0x0f) + ((c&0x0f)>=0x0a)*('a'-'9'-1);
1004                         len -= 2;
1005                         blen--;
1006                 }
1007         }
1008         if (blen || len<1) len = -1;
1009         else {
1010                 *bp++ = ' ';
1011                 len--;
1012         }
1013         *bpp = bp;
1014         *lp = len;
1015 }
1016 EXPORT_SYMBOL_GPL(qword_addhex);
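
/*
 * An illustrative sketch of a cache_request() callback built on the two
 * helpers above.  struct my_entry and its fields are hypothetical.  Both
 * helpers leave a trailing space, so the last one is overwritten with the
 * record-terminating newline.
 *
 *	static void my_cache_request(struct cache_detail *cd,
 *				     struct cache_head *h,
 *				     char **bpp, int *blen)
 *	{
 *		struct my_entry *e = container_of(h, struct my_entry, h);
 *
 *		qword_add(bpp, blen, e->name);
 *		qword_addhex(bpp, blen, e->raw, e->rawlen);
 *		(*bpp)[-1] = '\n';
 *	}
 */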
1017
1018 static void warn_no_listener(struct cache_detail *detail)
1019 {
1020         if (detail->last_warn != detail->last_close) {
1021                 detail->last_warn = detail->last_close;
1022                 if (detail->warn_no_listener)
1023                         detail->warn_no_listener(detail);
1024         }
1025 }
1026
1027 /*
1028  * register an upcall request to user-space.
1029  * Each request is at most one page long.
1030  */
1031 static int cache_make_upcall(struct cache_detail *detail, struct cache_head *h)
1032 {
1033
1034         char *buf;
1035         struct cache_request *crq;
1036         char *bp;
1037         int len;
1038
1039         if (detail->cache_request == NULL)
1040                 return -EINVAL;
1041
1042         if (atomic_read(&detail->readers) == 0 &&
1043             detail->last_close < get_seconds() - 30) {
1044                         warn_no_listener(detail);
1045                         return -EINVAL;
1046         }
1047
1048         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1049         if (!buf)
1050                 return -EAGAIN;
1051
1052         crq = kmalloc(sizeof (*crq), GFP_KERNEL);
1053         if (!crq) {
1054                 kfree(buf);
1055                 return -EAGAIN;
1056         }
1057
1058         bp = buf; len = PAGE_SIZE;
1059
1060         detail->cache_request(detail, h, &bp, &len);
1061
1062         if (len < 0) {
1063                 kfree(buf);
1064                 kfree(crq);
1065                 return -EAGAIN;
1066         }
1067         crq->q.reader = 0;
1068         crq->item = cache_get(h);
1069         crq->buf = buf;
1070         crq->len = PAGE_SIZE - len;
1071         crq->readers = 0;
1072         spin_lock(&queue_lock);
1073         list_add_tail(&crq->q.list, &detail->queue);
1074         spin_unlock(&queue_lock);
1075         wake_up(&queue_wait);
1076         return 0;
1077 }
1078
1079 /*
1080  * parse a message from user-space and pass it
1081  * to an appropriate cache
1082  * Messages are, like requests, separated into fields by
1083  * spaces and dequoted as \xHEXSTRING or embedded \nnn octal
1084  *
1085  * Message is
1086  *   reply cachename expiry key ... content....
1087  *
1088  * key and content are both parsed by cache
1089  */
1090
1091 #define isodigit(c) (isdigit(c) && c <= '7')
1092 int qword_get(char **bpp, char *dest, int bufsize)
1093 {
1094         /* return bytes copied, or -1 on error */
1095         char *bp = *bpp;
1096         int len = 0;
1097
1098         while (*bp == ' ') bp++;
1099
1100         if (bp[0] == '\\' && bp[1] == 'x') {
1101                 /* HEX STRING */
1102                 bp += 2;
1103                 while (isxdigit(bp[0]) && isxdigit(bp[1]) && len < bufsize) {
1104                         int byte = isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1105                         bp++;
1106                         byte <<= 4;
1107                         byte |= isdigit(*bp) ? *bp-'0' : toupper(*bp)-'A'+10;
1108                         *dest++ = byte;
1109                         bp++;
1110                         len++;
1111                 }
1112         } else {
1113                 /* text with \nnn octal quoting */
1114                 while (*bp != ' ' && *bp != '\n' && *bp && len < bufsize-1) {
1115                         if (*bp == '\\' &&
1116                             isodigit(bp[1]) && (bp[1] <= '3') &&
1117                             isodigit(bp[2]) &&
1118                             isodigit(bp[3])) {
1119                                 int byte = (*++bp -'0');
1120                                 bp++;
1121                                 byte = (byte << 3) | (*bp++ - '0');
1122                                 byte = (byte << 3) | (*bp++ - '0');
1123                                 *dest++ = byte;
1124                                 len++;
1125                         } else {
1126                                 *dest++ = *bp++;
1127                                 len++;
1128                         }
1129                 }
1130         }
1131
1132         if (*bp != ' ' && *bp != '\n' && *bp != '\0')
1133                 return -1;
1134         while (*bp == ' ') bp++;
1135         *bpp = bp;
1136         *dest = '\0';
1137         return len;
1138 }
1139 EXPORT_SYMBOL_GPL(qword_get);
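
/*
 * An illustrative sketch of a cache_parse() callback walking the reply
 * line with qword_get(), one dequoted field at a time.  The my_* names
 * are hypothetical.
 *
 *	static int my_cache_parse(struct cache_detail *cd, char *mesg, int mlen)
 *	{
 *		char word[64];
 *
 *		if (qword_get(&mesg, word, sizeof(word)) <= 0)
 *			return -EINVAL;
 *		... word now holds the first field; repeat for the expiry
 *		    and content fields, then apply the result with
 *		    sunrpc_cache_update() ...
 *		return 0;
 *	}
 */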
1140
1141
1142 /*
1143  * support /proc/sunrpc/cache/$CACHENAME/content
1144  * as a seqfile.
1145  * We call ->cache_show passing NULL for the item to
1146  * get a header, then pass each real item in the cache
1147  */
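
/*
 * An illustrative sketch of a cache_show() method following the
 * convention above: a header line for the NULL item, one line per real
 * entry.  struct my_entry and its fields are hypothetical.
 *
 *	static int my_cache_show(struct seq_file *m, struct cache_detail *cd,
 *				 struct cache_head *h)
 *	{
 *		struct my_entry *e;
 *
 *		if (h == NULL) {
 *			seq_puts(m, "#name value\n");
 *			return 0;
 *		}
 *		e = container_of(h, struct my_entry, h);
 *		seq_printf(m, "%s %d\n", e->name, e->value);
 *		return 0;
 *	}
 */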
1148
1149 struct handle {
1150         struct cache_detail *cd;
1151 };
1152
1153 static void *c_start(struct seq_file *m, loff_t *pos)
1154         __acquires(cd->hash_lock)
1155 {
1156         loff_t n = *pos;
1157         unsigned hash, entry;
1158         struct cache_head *ch;
1159         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1160
1161
1162         read_lock(&cd->hash_lock);
1163         if (!n--)
1164                 return SEQ_START_TOKEN;
1165         hash = n >> 32;
1166         entry = n & ((1LL<<32) - 1);
1167
1168         for (ch=cd->hash_table[hash]; ch; ch=ch->next)
1169                 if (!entry--)
1170                         return ch;
1171         n &= ~((1LL<<32) - 1);
1172         do {
1173                 hash++;
1174                 n += 1LL<<32;
1175         } while(hash < cd->hash_size &&
1176                 cd->hash_table[hash]==NULL);
1177         if (hash >= cd->hash_size)
1178                 return NULL;
1179         *pos = n+1;
1180         return cd->hash_table[hash];
1181 }
1182
1183 static void *c_next(struct seq_file *m, void *p, loff_t *pos)
1184 {
1185         struct cache_head *ch = p;
1186         int hash = (*pos >> 32);
1187         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1188
1189         if (p == SEQ_START_TOKEN)
1190                 hash = 0;
1191         else if (ch->next == NULL) {
1192                 hash++;
1193                 *pos += 1LL<<32;
1194         } else {
1195                 ++*pos;
1196                 return ch->next;
1197         }
1198         *pos &= ~((1LL<<32) - 1);
1199         while (hash < cd->hash_size &&
1200                cd->hash_table[hash] == NULL) {
1201                 hash++;
1202                 *pos += 1LL<<32;
1203         }
1204         if (hash >= cd->hash_size)
1205                 return NULL;
1206         ++*pos;
1207         return cd->hash_table[hash];
1208 }
1209
1210 static void c_stop(struct seq_file *m, void *p)
1211         __releases(cd->hash_lock)
1212 {
1213         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1214         read_unlock(&cd->hash_lock);
1215 }
1216
1217 static int c_show(struct seq_file *m, void *p)
1218 {
1219         struct cache_head *cp = p;
1220         struct cache_detail *cd = ((struct handle*)m->private)->cd;
1221
1222         if (p == SEQ_START_TOKEN)
1223                 return cd->cache_show(m, cd, NULL);
1224
1225         ifdebug(CACHE)
1226                 seq_printf(m, "# expiry=%ld refcnt=%d flags=%lx\n",
1227                            cp->expiry_time, atomic_read(&cp->ref.refcount), cp->flags);
1228         cache_get(cp);
1229         if (cache_check(cd, cp, NULL))
1230                 /* cache_check does a cache_put on failure */
1231                 seq_printf(m, "# ");
1232         else
1233                 cache_put(cp, cd);
1234
1235         return cd->cache_show(m, cd, cp);
1236 }
1237
1238 static const struct seq_operations cache_content_op = {
1239         .start  = c_start,
1240         .next   = c_next,
1241         .stop   = c_stop,
1242         .show   = c_show,
1243 };
1244
1245 static int content_open(struct inode *inode, struct file *file)
1246 {
1247         struct handle *han;
1248         struct cache_detail *cd = PDE(inode)->data;
1249
1250         han = __seq_open_private(file, &cache_content_op, sizeof(*han));
1251         if (han == NULL)
1252                 return -ENOMEM;
1253
1254         han->cd = cd;
1255         return 0;
1256 }
1257
1258 static const struct file_operations content_file_operations = {
1259         .open           = content_open,
1260         .read           = seq_read,
1261         .llseek         = seq_lseek,
1262         .release        = seq_release_private,
1263 };
1264
1265 static ssize_t read_flush(struct file *file, char __user *buf,
1266                             size_t count, loff_t *ppos)
1267 {
1268         struct cache_detail *cd = PDE(file->f_path.dentry->d_inode)->data;
1269         char tbuf[20];
1270         unsigned long p = *ppos;
1271         size_t len;
1272
1273         sprintf(tbuf, "%lu\n", cd->flush_time);
1274         len = strlen(tbuf);
1275         if (p >= len)
1276                 return 0;
1277         len -= p;
1278         if (len > count)
1279                 len = count;
1280         if (copy_to_user(buf, (void*)(tbuf+p), len))
1281                 return -EFAULT;
1282         *ppos += len;
1283         return len;
1284 }
1285
1286 static ssize_t write_flush(struct file * file, const char __user * buf,
1287                              size_t count, loff_t *ppos)
1288 {
1289         struct cache_detail *cd = PDE(file->f_path.dentry->d_inode)->data;
1290         char tbuf[20];
1291         char *ep;
1292         long flushtime;
1293         if (*ppos || count > sizeof(tbuf)-1)
1294                 return -EINVAL;
1295         if (copy_from_user(tbuf, buf, count))
1296                 return -EFAULT;
1297         tbuf[count] = 0;
1298         flushtime = simple_strtoul(tbuf, &ep, 0);
1299         if (*ep && *ep != '\n')
1300                 return -EINVAL;
1301
1302         cd->flush_time = flushtime;
1303         cd->nextcheck = get_seconds();
1304         cache_flush();
1305
1306         *ppos += count;
1307         return count;
1308 }
1309
1310 static const struct file_operations cache_flush_operations = {
1311         .open           = nonseekable_open,
1312         .read           = read_flush,
1313         .write          = write_flush,
1314 };
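
/*
 * An illustrative sketch of how user space drives the flush file above:
 * writing a time in seconds sets flush_time, and entries last refreshed
 * before that time are then discarded.  The cache name is hypothetical.
 *
 *	int fd = open("/proc/net/rpc/mycache/flush", O_WRONLY);
 *	char stamp[32];
 *	int len = snprintf(stamp, sizeof(stamp), "%ld\n", (long)time(NULL));
 *
 *	write(fd, stamp, len);
 *	close(fd);
 */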