[PATCH] knfsd: split svc_serv into pools
[linux-2.6] / net / sunrpc / svc.c
1 /*
2  * linux/net/sunrpc/svc.c
3  *
4  * High-level RPC service routines
5  *
6  * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
7  */
8
9 #include <linux/linkage.h>
10 #include <linux/sched.h>
11 #include <linux/errno.h>
12 #include <linux/net.h>
13 #include <linux/in.h>
14 #include <linux/mm.h>
15
16 #include <linux/sunrpc/types.h>
17 #include <linux/sunrpc/xdr.h>
18 #include <linux/sunrpc/stats.h>
19 #include <linux/sunrpc/svcsock.h>
20 #include <linux/sunrpc/clnt.h>
21
22 #define RPCDBG_FACILITY RPCDBG_SVCDSP
23 #define RPC_PARANOIA 1
24
25 /*
26  * Create an RPC service
27  */
28 struct svc_serv *
29 svc_create(struct svc_program *prog, unsigned int bufsize,
30            void (*shutdown)(struct svc_serv *serv))
31 {
32         struct svc_serv *serv;
33         int vers;
34         unsigned int xdrsize;
35         unsigned int i;
36
37         if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
38                 return NULL;
39         serv->sv_name      = prog->pg_name;
40         serv->sv_program   = prog;
41         serv->sv_nrthreads = 1;
42         serv->sv_stats     = prog->pg_stats;
43         serv->sv_bufsz     = bufsize? bufsize : 4096;
44         serv->sv_shutdown  = shutdown;
45         xdrsize = 0;
46         while (prog) {
47                 prog->pg_lovers = prog->pg_nvers-1;
48                 for (vers=0; vers<prog->pg_nvers ; vers++)
49                         if (prog->pg_vers[vers]) {
50                                 prog->pg_hivers = vers;
51                                 if (prog->pg_lovers > vers)
52                                         prog->pg_lovers = vers;
53                                 if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
54                                         xdrsize = prog->pg_vers[vers]->vs_xdrsize;
55                         }
56                 prog = prog->pg_next;
57         }
58         serv->sv_xdrsize   = xdrsize;
59         INIT_LIST_HEAD(&serv->sv_tempsocks);
60         INIT_LIST_HEAD(&serv->sv_permsocks);
61         init_timer(&serv->sv_temptimer);
62         spin_lock_init(&serv->sv_lock);
63
64         serv->sv_nrpools = 1;
65         serv->sv_pools =
66                 kcalloc(sizeof(struct svc_pool), serv->sv_nrpools,
67                         GFP_KERNEL);
68         if (!serv->sv_pools) {
69                 kfree(serv);
70                 return NULL;
71         }
72
73         for (i = 0; i < serv->sv_nrpools; i++) {
74                 struct svc_pool *pool = &serv->sv_pools[i];
75
76                 dprintk("initialising pool %u for %s\n",
77                                 i, serv->sv_name);
78
79                 pool->sp_id = i;
80                 INIT_LIST_HEAD(&pool->sp_threads);
81                 INIT_LIST_HEAD(&pool->sp_sockets);
82                 spin_lock_init(&pool->sp_lock);
83         }
84
85
86         /* Remove any stale portmap registrations */
87         svc_register(serv, 0, 0);
88
89         return serv;
90 }
91
92 /*
93  * Destroy an RPC service.  Should be called with the BKL held
94  */
95 void
96 svc_destroy(struct svc_serv *serv)
97 {
98         struct svc_sock *svsk;
99
100         dprintk("RPC: svc_destroy(%s, %d)\n",
101                                 serv->sv_program->pg_name,
102                                 serv->sv_nrthreads);
103
104         if (serv->sv_nrthreads) {
105                 if (--(serv->sv_nrthreads) != 0) {
106                         svc_sock_update_bufs(serv);
107                         return;
108                 }
109         } else
110                 printk("svc_destroy: no threads for serv=%p!\n", serv);
111
112         del_timer_sync(&serv->sv_temptimer);
113
114         while (!list_empty(&serv->sv_tempsocks)) {
115                 svsk = list_entry(serv->sv_tempsocks.next,
116                                   struct svc_sock,
117                                   sk_list);
118                 svc_delete_socket(svsk);
119         }
120         if (serv->sv_shutdown)
121                 serv->sv_shutdown(serv);
122
123         while (!list_empty(&serv->sv_permsocks)) {
124                 svsk = list_entry(serv->sv_permsocks.next,
125                                   struct svc_sock,
126                                   sk_list);
127                 svc_delete_socket(svsk);
128         }
129         
130         cache_clean_deferred(serv);
131
132         /* Unregister service with the portmapper */
133         svc_register(serv, 0, 0);
134         kfree(serv->sv_pools);
135         kfree(serv);
136 }
137
138 /*
139  * Allocate an RPC server's buffer space.
140  * We allocate pages and place them in rq_argpages.
141  */
142 static int
143 svc_init_buffer(struct svc_rqst *rqstp, unsigned int size)
144 {
145         int pages;
146         int arghi;
147         
148         if (size > RPCSVC_MAXPAYLOAD)
149                 size = RPCSVC_MAXPAYLOAD;
150         pages = 2 + (size+ PAGE_SIZE -1) / PAGE_SIZE;
151         rqstp->rq_argused = 0;
152         rqstp->rq_resused = 0;
153         arghi = 0;
154         BUG_ON(pages > RPCSVC_MAXPAGES);
155         while (pages) {
156                 struct page *p = alloc_page(GFP_KERNEL);
157                 if (!p)
158                         break;
159                 rqstp->rq_argpages[arghi++] = p;
160                 pages--;
161         }
162         rqstp->rq_arghi = arghi;
163         return ! pages;
164 }
165
166 /*
167  * Release an RPC server buffer
168  */
169 static void
170 svc_release_buffer(struct svc_rqst *rqstp)
171 {
172         while (rqstp->rq_arghi)
173                 put_page(rqstp->rq_argpages[--rqstp->rq_arghi]);
174         while (rqstp->rq_resused) {
175                 if (rqstp->rq_respages[--rqstp->rq_resused] == NULL)
176                         continue;
177                 put_page(rqstp->rq_respages[rqstp->rq_resused]);
178         }
179         rqstp->rq_argused = 0;
180 }
181
182 /*
183  * Create a thread in the given pool.  Caller must hold BKL.
184  */
185 static int
186 __svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
187                     struct svc_pool *pool)
188 {
189         struct svc_rqst *rqstp;
190         int             error = -ENOMEM;
191
192         rqstp = kzalloc(sizeof(*rqstp), GFP_KERNEL);
193         if (!rqstp)
194                 goto out;
195
196         init_waitqueue_head(&rqstp->rq_wait);
197
198         if (!(rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL))
199          || !(rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL))
200          || !svc_init_buffer(rqstp, serv->sv_bufsz))
201                 goto out_thread;
202
203         serv->sv_nrthreads++;
204         spin_lock_bh(&pool->sp_lock);
205         pool->sp_nrthreads++;
206         spin_unlock_bh(&pool->sp_lock);
207         rqstp->rq_server = serv;
208         rqstp->rq_pool = pool;
209         error = kernel_thread((int (*)(void *)) func, rqstp, 0);
210         if (error < 0)
211                 goto out_thread;
212         svc_sock_update_bufs(serv);
213         error = 0;
214 out:
215         return error;
216
217 out_thread:
218         svc_exit_thread(rqstp);
219         goto out;
220 }
221
222 /*
223  * Create a thread in the default pool.  Caller must hold BKL.
224  */
225 int
226 svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
227 {
228         return __svc_create_thread(func, serv, &serv->sv_pools[0]);
229 }
230
231 /*
232  * Called from a server thread as it's exiting.  Caller must hold BKL.
233  */
234 void
235 svc_exit_thread(struct svc_rqst *rqstp)
236 {
237         struct svc_serv *serv = rqstp->rq_server;
238         struct svc_pool *pool = rqstp->rq_pool;
239
240         svc_release_buffer(rqstp);
241         kfree(rqstp->rq_resp);
242         kfree(rqstp->rq_argp);
243         kfree(rqstp->rq_auth_data);
244
245         spin_lock_bh(&pool->sp_lock);
246         pool->sp_nrthreads--;
247         spin_unlock_bh(&pool->sp_lock);
248
249         kfree(rqstp);
250
251         /* Release the server */
252         if (serv)
253                 svc_destroy(serv);
254 }
255
256 /*
257  * Register an RPC service with the local portmapper.
258  * To unregister a service, call this routine with 
259  * proto and port == 0.
260  */
261 int
262 svc_register(struct svc_serv *serv, int proto, unsigned short port)
263 {
264         struct svc_program      *progp;
265         unsigned long           flags;
266         int                     i, error = 0, dummy;
267
268         progp = serv->sv_program;
269
270         dprintk("RPC: svc_register(%s, %s, %d)\n",
271                 progp->pg_name, proto == IPPROTO_UDP? "udp" : "tcp", port);
272
273         if (!port)
274                 clear_thread_flag(TIF_SIGPENDING);
275
276         for (i = 0; i < progp->pg_nvers; i++) {
277                 if (progp->pg_vers[i] == NULL)
278                         continue;
279                 error = rpc_register(progp->pg_prog, i, proto, port, &dummy);
280                 if (error < 0)
281                         break;
282                 if (port && !dummy) {
283                         error = -EACCES;
284                         break;
285                 }
286         }
287
288         if (!port) {
289                 spin_lock_irqsave(&current->sighand->siglock, flags);
290                 recalc_sigpending();
291                 spin_unlock_irqrestore(&current->sighand->siglock, flags);
292         }
293
294         return error;
295 }
296
297 /*
298  * Process the RPC request.
299  */
300 int
301 svc_process(struct svc_rqst *rqstp)
302 {
303         struct svc_program      *progp;
304         struct svc_version      *versp = NULL;  /* compiler food */
305         struct svc_procedure    *procp = NULL;
306         struct kvec *           argv = &rqstp->rq_arg.head[0];
307         struct kvec *           resv = &rqstp->rq_res.head[0];
308         struct svc_serv         *serv = rqstp->rq_server;
309         kxdrproc_t              xdr;
310         __be32                  *statp;
311         u32                     dir, prog, vers, proc;
312         __be32                  auth_stat, rpc_stat;
313         int                     auth_res;
314         __be32                  *accept_statp;
315
316         rpc_stat = rpc_success;
317
318         if (argv->iov_len < 6*4)
319                 goto err_short_len;
320
321         /* setup response xdr_buf.
322          * Initially it has just one page 
323          */
324         svc_take_page(rqstp); /* must succeed */
325         resv->iov_base = page_address(rqstp->rq_respages[0]);
326         resv->iov_len = 0;
327         rqstp->rq_res.pages = rqstp->rq_respages+1;
328         rqstp->rq_res.len = 0;
329         rqstp->rq_res.page_base = 0;
330         rqstp->rq_res.page_len = 0;
331         rqstp->rq_res.buflen = PAGE_SIZE;
332         rqstp->rq_res.tail[0].iov_base = NULL;
333         rqstp->rq_res.tail[0].iov_len = 0;
334         /* Will be turned off only in gss privacy case: */
335         rqstp->rq_sendfile_ok = 1;
336         /* tcp needs a space for the record length... */
337         if (rqstp->rq_prot == IPPROTO_TCP)
338                 svc_putnl(resv, 0);
339
340         rqstp->rq_xid = svc_getu32(argv);
341         svc_putu32(resv, rqstp->rq_xid);
342
343         dir  = svc_getnl(argv);
344         vers = svc_getnl(argv);
345
346         /* First words of reply: */
347         svc_putnl(resv, 1);             /* REPLY */
348
349         if (dir != 0)           /* direction != CALL */
350                 goto err_bad_dir;
351         if (vers != 2)          /* RPC version number */
352                 goto err_bad_rpc;
353
354         /* Save position in case we later decide to reject: */
355         accept_statp = resv->iov_base + resv->iov_len;
356
357         svc_putnl(resv, 0);             /* ACCEPT */
358
359         rqstp->rq_prog = prog = svc_getnl(argv);        /* program number */
360         rqstp->rq_vers = vers = svc_getnl(argv);        /* version number */
361         rqstp->rq_proc = proc = svc_getnl(argv);        /* procedure number */
362
363         progp = serv->sv_program;
364
365         for (progp = serv->sv_program; progp; progp = progp->pg_next)
366                 if (prog == progp->pg_prog)
367                         break;
368
369         /*
370          * Decode auth data, and add verifier to reply buffer.
371          * We do this before anything else in order to get a decent
372          * auth verifier.
373          */
374         auth_res = svc_authenticate(rqstp, &auth_stat);
375         /* Also give the program a chance to reject this call: */
376         if (auth_res == SVC_OK && progp) {
377                 auth_stat = rpc_autherr_badcred;
378                 auth_res = progp->pg_authenticate(rqstp);
379         }
380         switch (auth_res) {
381         case SVC_OK:
382                 break;
383         case SVC_GARBAGE:
384                 rpc_stat = rpc_garbage_args;
385                 goto err_bad;
386         case SVC_SYSERR:
387                 rpc_stat = rpc_system_err;
388                 goto err_bad;
389         case SVC_DENIED:
390                 goto err_bad_auth;
391         case SVC_DROP:
392                 goto dropit;
393         case SVC_COMPLETE:
394                 goto sendit;
395         }
396
397         if (progp == NULL)
398                 goto err_bad_prog;
399
400         if (vers >= progp->pg_nvers ||
401           !(versp = progp->pg_vers[vers]))
402                 goto err_bad_vers;
403
404         procp = versp->vs_proc + proc;
405         if (proc >= versp->vs_nproc || !procp->pc_func)
406                 goto err_bad_proc;
407         rqstp->rq_server   = serv;
408         rqstp->rq_procinfo = procp;
409
410         /* Syntactic check complete */
411         serv->sv_stats->rpccnt++;
412
413         /* Build the reply header. */
414         statp = resv->iov_base +resv->iov_len;
415         svc_putnl(resv, RPC_SUCCESS);
416
417         /* Bump per-procedure stats counter */
418         procp->pc_count++;
419
420         /* Initialize storage for argp and resp */
421         memset(rqstp->rq_argp, 0, procp->pc_argsize);
422         memset(rqstp->rq_resp, 0, procp->pc_ressize);
423
424         /* un-reserve some of the out-queue now that we have a 
425          * better idea of reply size
426          */
427         if (procp->pc_xdrressize)
428                 svc_reserve(rqstp, procp->pc_xdrressize<<2);
429
430         /* Call the function that processes the request. */
431         if (!versp->vs_dispatch) {
432                 /* Decode arguments */
433                 xdr = procp->pc_decode;
434                 if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
435                         goto err_garbage;
436
437                 *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
438
439                 /* Encode reply */
440                 if (*statp == rpc_success && (xdr = procp->pc_encode)
441                  && !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
442                         dprintk("svc: failed to encode reply\n");
443                         /* serv->sv_stats->rpcsystemerr++; */
444                         *statp = rpc_system_err;
445                 }
446         } else {
447                 dprintk("svc: calling dispatcher\n");
448                 if (!versp->vs_dispatch(rqstp, statp)) {
449                         /* Release reply info */
450                         if (procp->pc_release)
451                                 procp->pc_release(rqstp, NULL, rqstp->rq_resp);
452                         goto dropit;
453                 }
454         }
455
456         /* Check RPC status result */
457         if (*statp != rpc_success)
458                 resv->iov_len = ((void*)statp)  - resv->iov_base + 4;
459
460         /* Release reply info */
461         if (procp->pc_release)
462                 procp->pc_release(rqstp, NULL, rqstp->rq_resp);
463
464         if (procp->pc_encode == NULL)
465                 goto dropit;
466
467  sendit:
468         if (svc_authorise(rqstp))
469                 goto dropit;
470         return svc_send(rqstp);
471
472  dropit:
473         svc_authorise(rqstp);   /* doesn't hurt to call this twice */
474         dprintk("svc: svc_process dropit\n");
475         svc_drop(rqstp);
476         return 0;
477
478 err_short_len:
479 #ifdef RPC_PARANOIA
480         printk("svc: short len %Zd, dropping request\n", argv->iov_len);
481 #endif
482         goto dropit;                    /* drop request */
483
484 err_bad_dir:
485 #ifdef RPC_PARANOIA
486         printk("svc: bad direction %d, dropping request\n", dir);
487 #endif
488         serv->sv_stats->rpcbadfmt++;
489         goto dropit;                    /* drop request */
490
491 err_bad_rpc:
492         serv->sv_stats->rpcbadfmt++;
493         svc_putnl(resv, 1);     /* REJECT */
494         svc_putnl(resv, 0);     /* RPC_MISMATCH */
495         svc_putnl(resv, 2);     /* Only RPCv2 supported */
496         svc_putnl(resv, 2);
497         goto sendit;
498
499 err_bad_auth:
500         dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
501         serv->sv_stats->rpcbadauth++;
502         /* Restore write pointer to location of accept status: */
503         xdr_ressize_check(rqstp, accept_statp);
504         svc_putnl(resv, 1);     /* REJECT */
505         svc_putnl(resv, 1);     /* AUTH_ERROR */
506         svc_putnl(resv, ntohl(auth_stat));      /* status */
507         goto sendit;
508
509 err_bad_prog:
510         dprintk("svc: unknown program %d\n", prog);
511         serv->sv_stats->rpcbadfmt++;
512         svc_putnl(resv, RPC_PROG_UNAVAIL);
513         goto sendit;
514
515 err_bad_vers:
516 #ifdef RPC_PARANOIA
517         printk("svc: unknown version (%d)\n", vers);
518 #endif
519         serv->sv_stats->rpcbadfmt++;
520         svc_putnl(resv, RPC_PROG_MISMATCH);
521         svc_putnl(resv, progp->pg_lovers);
522         svc_putnl(resv, progp->pg_hivers);
523         goto sendit;
524
525 err_bad_proc:
526 #ifdef RPC_PARANOIA
527         printk("svc: unknown procedure (%d)\n", proc);
528 #endif
529         serv->sv_stats->rpcbadfmt++;
530         svc_putnl(resv, RPC_PROC_UNAVAIL);
531         goto sendit;
532
533 err_garbage:
534 #ifdef RPC_PARANOIA
535         printk("svc: failed to decode args\n");
536 #endif
537         rpc_stat = rpc_garbage_args;
538 err_bad:
539         serv->sv_stats->rpcbadfmt++;
540         svc_putnl(resv, ntohl(rpc_stat));
541         goto sendit;
542 }