SUNRPC: Give cloned RPC clients their own rpc_pipefs directory

/*
 *  linux/net/sunrpc/clnt.c
 *
 *  This file contains the high-level RPC interface.
 *  It is modeled as a finite state machine to support both synchronous
 *  and asynchronous requests.
 *
 *  -	RPC header generation and argument serialization.
 *  -	Credential refresh.
 *  -	TCP connect handling.
 *  -	Retry of operation when it is suspected the operation failed because
 *	of uid squashing on the server, or when the credentials were stale
 *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
 *
 *  NB: BSD uses a more intelligent approach to guessing when a request
 *  or reply has been lost by keeping the RTO estimate for each procedure.
 *  We currently make do with a constant timeout value.
 *
 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
 */

#include <asm/system.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/utsname.h>
#include <linux/workqueue.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/rpc_pipe_fs.h>
#include <linux/sunrpc/metrics.h>


#define RPC_SLACK_SPACE		(1024)	/* total overkill */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_CALL
#endif

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);


static void	call_start(struct rpc_task *task);
static void	call_reserve(struct rpc_task *task);
static void	call_reserveresult(struct rpc_task *task);
static void	call_allocate(struct rpc_task *task);
static void	call_encode(struct rpc_task *task);
static void	call_decode(struct rpc_task *task);
static void	call_bind(struct rpc_task *task);
static void	call_bind_status(struct rpc_task *task);
static void	call_transmit(struct rpc_task *task);
static void	call_status(struct rpc_task *task);
static void	call_transmit_status(struct rpc_task *task);
static void	call_refresh(struct rpc_task *task);
static void	call_refreshresult(struct rpc_task *task);
static void	call_timeout(struct rpc_task *task);
static void	call_connect(struct rpc_task *task);
static void	call_connect_status(struct rpc_task *task);
static __be32 *	call_header(struct rpc_task *task);
static __be32 *	call_verify(struct rpc_task *task);
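
/*
 * Rough map of the call state machine implemented by the handlers
 * below (each handler's comment carries its state number):
 *
 *	call_start -> call_reserve -> call_reserveresult ->
 *	call_allocate -> call_bind -> call_bind_status ->
 *	call_connect -> call_connect_status -> call_transmit ->
 *	call_transmit_status -> call_status -> call_decode
 *
 * call_encode runs from within call_transmit; call_refresh and
 * call_refreshresult handle credential refresh; call_timeout feeds
 * retransmits back in at call_bind.
 */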

static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
	static uint32_t clntid;
	int error;

	clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
	clnt->cl_dentry = ERR_PTR(-ENOENT);
	if (dir_name == NULL)
		return 0;

	clnt->cl_vfsmnt = rpc_get_mount();
	if (IS_ERR(clnt->cl_vfsmnt))
		return PTR_ERR(clnt->cl_vfsmnt);

	for (;;) {
		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
				"%s/clnt%x", dir_name,
				(unsigned int)clntid++);
		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
		if (!IS_ERR(clnt->cl_dentry))
			return 0;
		error = PTR_ERR(clnt->cl_dentry);
		if (error != -EEXIST) {
			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
					clnt->cl_pathname, error);
			rpc_put_mount();
			return error;
		}
	}
}

static struct rpc_clnt *
rpc_new_client(struct rpc_xprt *xprt, char *servname,
		struct rpc_program *program, u32 vers,
		rpc_authflavor_t flavor)
{
	struct rpc_version	*version;
	struct rpc_clnt		*clnt = NULL;
	struct rpc_auth		*auth;
	int err;
	int len;

	dprintk("RPC: creating %s client for %s (xprt %p)\n",
		program->name, servname, xprt);

	err = -EINVAL;
	if (!xprt)
		goto out_no_xprt;
	if (vers >= program->nrvers || !(version = program->version[vers]))
		goto out_err;

	err = -ENOMEM;
	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	atomic_set(&clnt->cl_users, 0);
	atomic_set(&clnt->cl_count, 1);
	clnt->cl_parent = clnt;

	clnt->cl_server = clnt->cl_inline_name;
	len = strlen(servname) + 1;
	if (len > sizeof(clnt->cl_inline_name)) {
		char *buf = kmalloc(len, GFP_KERNEL);
		if (buf != NULL)
			clnt->cl_server = buf;
		else
			len = sizeof(clnt->cl_inline_name);
	}
	strlcpy(clnt->cl_server, servname, len);

	clnt->cl_xprt     = xprt;
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_prog     = program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
	err = -ENOMEM;
	if (clnt->cl_metrics == NULL)
		goto out_no_stats;
	clnt->cl_program  = program;

	if (!xprt_bound(clnt->cl_xprt))
		clnt->cl_autobind = 1;

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);

	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
	if (err < 0)
		goto out_no_path;

	auth = rpcauth_create(flavor, clnt);
	if (IS_ERR(auth)) {
		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
				flavor);
		err = PTR_ERR(auth);
		goto out_no_auth;
	}

	/* save the nodename */
	clnt->cl_nodelen = strlen(utsname()->nodename);
	if (clnt->cl_nodelen > UNX_MAXNODENAME)
		clnt->cl_nodelen = UNX_MAXNODENAME;
	memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
	return clnt;

out_no_auth:
	if (!IS_ERR(clnt->cl_dentry)) {
		rpc_rmdir(clnt->cl_dentry);
		rpc_put_mount();
	}
out_no_path:
	rpc_free_iostats(clnt->cl_metrics);
out_no_stats:
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
	kfree(clnt);
out_err:
	xprt_put(xprt);
out_no_xprt:
	return ERR_PTR(err);
}

/**
 * rpc_create - create an RPC client and transport with one call
 * @args: rpc_clnt create argument structure
 *
 * Creates and initializes an RPC transport and an RPC client.
 *
 * It can ping the server in order to determine if it is up, and to see if
 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 * this behavior so asynchronous tasks can also use rpc_create.
 */
struct rpc_clnt *rpc_create(struct rpc_create_args *args)
{
	struct rpc_xprt *xprt;
	struct rpc_clnt *clnt;

	xprt = xprt_create_transport(args->protocol, args->address,
					args->addrsize, args->timeout);
	if (IS_ERR(xprt))
		return (struct rpc_clnt *)xprt;

	/*
	 * By default, kernel RPC client connects from a reserved port.
	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
	 * but it is always enabled for rpciod, which handles the connect
	 * operation.
	 */
	xprt->resvport = 1;
	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
		xprt->resvport = 0;

	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
		args->program->name, args->servername, xprt);

	clnt = rpc_new_client(xprt, args->servername, args->program,
				args->version, args->authflavor);
	if (IS_ERR(clnt))
		return clnt;

	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
		int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
		if (err != 0) {
			rpc_shutdown_client(clnt);
			return ERR_PTR(err);
		}
	}

	clnt->cl_softrtry = 1;
	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
		clnt->cl_softrtry = 0;

	if (args->flags & RPC_CLNT_CREATE_INTR)
		clnt->cl_intr = 1;
	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
		clnt->cl_autobind = 1;
	if (args->flags & RPC_CLNT_CREATE_ONESHOT)
		clnt->cl_oneshot = 1;

	return clnt;
}
EXPORT_SYMBOL_GPL(rpc_create);
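
/*
 * Example (illustrative sketch, not part of the original file): how a
 * caller might fill in struct rpc_create_args and call rpc_create().
 * The field names mirror the usage in rpc_create() above; the server
 * name, protocol, flavor, version index and NOPING flag chosen here
 * are assumptions made purely for illustration.
 */
#if 0
static struct rpc_clnt *example_create(struct sockaddr *sap, size_t salen,
				       struct rpc_timeout *timeparms,
				       struct rpc_program *program)
{
	struct rpc_create_args args = {
		.protocol	= IPPROTO_TCP,
		.address	= sap,
		.addrsize	= salen,
		.timeout	= timeparms,
		.servername	= "example-server",
		.program	= program,
		.version	= 0,
		.authflavor	= RPC_AUTH_UNIX,
		.flags		= RPC_CLNT_CREATE_NOPING,
	};

	/* returns a ready client, or an ERR_PTR() on failure */
	return rpc_create(&args);
}
#endif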

/*
 * This function clones the RPC client structure. It allows us to share the
 * same transport while varying parameters such as the authentication
 * flavour.
 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *new;
	int err = -ENOMEM;

	new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
	if (!new)
		goto out_no_clnt;
	atomic_set(&new->cl_count, 1);
	atomic_set(&new->cl_users, 0);
	new->cl_metrics = rpc_alloc_iostats(clnt);
	if (new->cl_metrics == NULL)
		goto out_no_stats;
	err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
	if (err != 0)
		goto out_no_path;
	new->cl_parent = clnt;
	atomic_inc(&clnt->cl_count);
	new->cl_xprt = xprt_get(clnt->cl_xprt);
	/* Turn off autobind on clones */
	new->cl_autobind = 0;
	new->cl_oneshot = 0;
	new->cl_dead = 0;
	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
	if (new->cl_auth)
		atomic_inc(&new->cl_auth->au_count);
	return new;
out_no_path:
	rpc_free_iostats(new->cl_metrics);
out_no_stats:
	kfree(new);
out_no_clnt:
	dprintk("RPC: %s returned error %d\n", __FUNCTION__, err);
	return ERR_PTR(err);
}
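
/*
 * Example (sketch, not part of the original source): a minimal
 * clone/release cycle.  The clone shares the parent's transport but,
 * as of this change, gets its own rpc_pipefs directory; both clients
 * must still be shut down independently.
 */
#if 0
static int example_clone_usage(struct rpc_clnt *clnt)
{
	struct rpc_clnt *clone = rpc_clone_client(clnt);

	if (IS_ERR(clone))
		return PTR_ERR(clone);
	/* ... issue calls through "clone" ... */
	rpc_shutdown_client(clone);
	return 0;
}
#endif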

/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests. Note that we must be certain that cl_oneshot and
 * cl_dead are cleared, or else the client would be destroyed
 * when the last task releases it.
 */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
	dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
			clnt->cl_protname, clnt->cl_server,
			atomic_read(&clnt->cl_users));

	while (atomic_read(&clnt->cl_users) > 0) {
		/* Don't let rpc_release_client destroy us */
		clnt->cl_oneshot = 0;
		clnt->cl_dead = 0;
		rpc_killall_tasks(clnt);
		wait_event_timeout(destroy_wait,
			!atomic_read(&clnt->cl_users), 1*HZ);
	}

	if (atomic_read(&clnt->cl_users) < 0) {
		printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
				clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
		rpc_show_tasks();
#endif
		BUG();
	}

	return rpc_destroy_client(clnt);
}

/*
 * Delete an RPC client
 */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
	if (!atomic_dec_and_test(&clnt->cl_count))
		return 1;
	BUG_ON(atomic_read(&clnt->cl_users) != 0);

	dprintk("RPC: destroying %s client for %s\n",
			clnt->cl_protname, clnt->cl_server);
	if (clnt->cl_auth) {
		rpcauth_destroy(clnt->cl_auth);
		clnt->cl_auth = NULL;
	}
	if (!IS_ERR(clnt->cl_dentry)) {
		rpc_rmdir(clnt->cl_dentry);
		rpc_put_mount();
	}
	if (clnt->cl_parent != clnt) {
		rpc_destroy_client(clnt->cl_parent);
		goto out_free;
	}
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
out_free:
	rpc_free_iostats(clnt->cl_metrics);
	clnt->cl_metrics = NULL;
	xprt_put(clnt->cl_xprt);
	kfree(clnt);
	return 0;
}

/*
 * Release an RPC client
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:      rpc_release_client(%p, %d)\n",
				clnt, atomic_read(&clnt->cl_users));

	if (!atomic_dec_and_test(&clnt->cl_users))
		return;
	wake_up(&destroy_wait);
	if (clnt->cl_oneshot || clnt->cl_dead)
		rpc_destroy_client(clnt);
}

/**
 * rpc_bind_new_program - bind a new RPC program to an existing client
 * @old: old rpc_client
 * @program: rpc program to set
 * @vers: rpc program version
 *
 * Clones the rpc client and sets up a new RPC program. This is mainly
 * of use for enabling different RPC programs to share the same transport.
 * The Sun NFSv2/v3 ACL protocol can do this.
 */
struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
				      struct rpc_program *program,
				      int vers)
{
	struct rpc_clnt *clnt;
	struct rpc_version *version;
	int err;

	BUG_ON(vers >= program->nrvers || !program->version[vers]);
	version = program->version[vers];
	clnt = rpc_clone_client(old);
	if (IS_ERR(clnt))
		goto out;
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_prog     = program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
	if (err != 0) {
		rpc_shutdown_client(clnt);
		clnt = ERR_PTR(err);
	}
out:
	return clnt;
}
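
/*
 * Example (sketch): attaching a second program, such as the NFSv3 ACL
 * sideband protocol mentioned above, to an existing client's
 * transport.  "acl_program" and the version index are caller-supplied
 * assumptions; the version slot must exist in the program's version
 * table or the BUG_ON() above fires.
 */
#if 0
static struct rpc_clnt *example_bind_acl(struct rpc_clnt *nfs_client,
					 struct rpc_program *acl_program,
					 int vers)
{
	return rpc_bind_new_program(nfs_client, acl_program, vers);
}
#endif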

/*
 * Default callback for async RPC calls
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}

static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};

/*
 *	Export the signal mask handling for synchronous code that
 *	sleeps on RPC calls
 */
#define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))

static void rpc_save_sigmask(sigset_t *oldset, int intr)
{
	unsigned long	sigallow = sigmask(SIGKILL);
	sigset_t sigmask;

	/* Block all signals except those listed in sigallow */
	if (intr)
		sigallow |= RPC_INTR_SIGNALS;
	siginitsetinv(&sigmask, sigallow);
	sigprocmask(SIG_BLOCK, &sigmask, oldset);
}

static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
{
	rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
}

static inline void rpc_restore_sigmask(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}

void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_save_sigmask(oldset, clnt->cl_intr);
}

void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_restore_sigmask(oldset);
}
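
/*
 * Example (sketch): how a caller outside this file might bracket its
 * own blocking wait with the exported helpers, so that only SIGKILL
 * (plus RPC_INTR_SIGNALS when the client was created with
 * RPC_CLNT_CREATE_INTR) can interrupt the sleep.
 */
#if 0
static void example_sigmask_bracket(struct rpc_clnt *clnt)
{
	sigset_t oldset;

	rpc_clnt_sigmask(clnt, &oldset);
	/* ... sleep on an RPC-driven event ... */
	rpc_clnt_sigunmask(clnt, &oldset);
}
#endif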

/*
 * Perform a synchronous RPC call
 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	if (clnt->cl_dead)
		return -EIO;

	BUG_ON(flags & RPC_TASK_ASYNC);

	task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
	if (task == NULL)
		return -ENOMEM;

	/* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
	rpc_task_sigmask(task, &oldset);

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	status = task->tk_status;
	if (status != 0) {
		rpc_release_task(task);
		goto out;
	}
	atomic_inc(&task->tk_count);
	status = rpc_execute(task);
	if (status == 0)
		status = task->tk_status;
	rpc_put_task(task);
out:
	rpc_restore_sigmask(&oldset);
	return status;
}
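
/*
 * Example (sketch): issuing a synchronous call.  "proc" indexes the
 * client's procinfo table; argp/resp point at the procedure-specific
 * argument and result structures consumed by the XDR routines.  All
 * names here are placeholders.
 */
#if 0
static int example_call_sync(struct rpc_clnt *clnt,
			     struct rpc_procinfo *proc,
			     void *argp, void *resp)
{
	struct rpc_message msg = {
		.rpc_proc	= proc,
		.rpc_argp	= argp,
		.rpc_resp	= resp,
	};

	return rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
}
#endif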

/*
 * Perform an asynchronous RPC call
 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
	       const struct rpc_call_ops *tk_ops, void *data)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	status = -EIO;
	if (clnt->cl_dead)
		goto out_release;

	flags |= RPC_TASK_ASYNC;

	/* Create/initialize a new RPC task */
	status = -ENOMEM;
	if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
		goto out_release;

	/* Mask signals on GSS_AUTH upcalls */
	rpc_task_sigmask(task, &oldset);

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	status = task->tk_status;
	if (status == 0)
		rpc_execute(task);
	else
		rpc_release_task(task);

	rpc_restore_sigmask(&oldset);
	return status;
out_release:
	if (tk_ops->rpc_release != NULL)
		tk_ops->rpc_release(data);
	return status;
}
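
/*
 * Example (sketch): an asynchronous call with completion and release
 * callbacks.  rpc_call_done runs from rpciod when the reply has been
 * handled; rpc_release runs exactly once, even when task creation
 * fails, which is why it is the place to free the calldata.
 */
#if 0
static void example_done(struct rpc_task *task, void *calldata)
{
	/* inspect task->tk_status, harvest results from calldata */
}

static void example_release(void *calldata)
{
	kfree(calldata);
}

static const struct rpc_call_ops example_call_ops = {
	.rpc_call_done	= example_done,
	.rpc_release	= example_release,
};

static int example_call_async(struct rpc_clnt *clnt,
			      struct rpc_message *msg, void *calldata)
{
	return rpc_call_async(clnt, msg, RPC_TASK_SOFT,
				&example_call_ops, calldata);
}
#endif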

void
rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
{
	task->tk_msg   = *msg;
	task->tk_flags |= flags;
	/* Bind the user cred */
	if (task->tk_msg.rpc_cred != NULL)
		rpcauth_holdcred(task);
	else
		rpcauth_bindcred(task);

	if (task->tk_status == 0)
		task->tk_action = call_start;
	else
		task->tk_action = rpc_exit_task;
}

/**
 * rpc_peeraddr - extract remote peer address from clnt's xprt
 * @clnt: RPC client structure
 * @buf: target buffer
 * @bufsize: length of target buffer
 *
 * Returns the number of bytes that are actually in the stored address.
 */
size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
{
	size_t bytes;
	struct rpc_xprt *xprt = clnt->cl_xprt;

	bytes = sizeof(xprt->addr);
	if (bytes > bufsize)
		bytes = bufsize;
	memcpy(buf, &xprt->addr, bytes);
	return xprt->addrlen;
}
EXPORT_SYMBOL_GPL(rpc_peeraddr);

/**
 * rpc_peeraddr2str - return remote peer address in printable format
 * @clnt: RPC client structure
 * @format: address format
 *
 */
char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
{
	struct rpc_xprt *xprt = clnt->cl_xprt;
	return xprt->ops->print_addr(xprt, format);
}
EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
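
/*
 * Example (sketch): querying the peer address both ways.  The raw form
 * copies the sockaddr out of the transport; the string form delegates
 * to the transport's print_addr() method.  RPC_DISPLAY_ADDR is assumed
 * from the transport's display-format enum.
 */
#if 0
static void example_show_peer(struct rpc_clnt *clnt)
{
	struct sockaddr_storage peer;

	if (rpc_peeraddr(clnt, (struct sockaddr *)&peer, sizeof(peer)))
		dprintk("RPC: peer is %s\n",
			rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
}
#endif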

void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
	struct rpc_xprt *xprt = clnt->cl_xprt;
	if (xprt->ops->set_buffer_size)
		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
}

/*
 * Return size of largest payload RPC client can support, in bytes
 *
 * For stream transports, this is one RPC record fragment (see RFC
 * 1831), as we don't support multi-record requests yet.  For datagram
 * transports, this is the size of an IP packet minus the IP, UDP, and
 * RPC header sizes.
 */
size_t rpc_max_payload(struct rpc_clnt *clnt)
{
	return clnt->cl_xprt->max_payload;
}
EXPORT_SYMBOL_GPL(rpc_max_payload);

/**
 * rpc_force_rebind - force transport to check that remote port is unchanged
 * @clnt: client to rebind
 *
 */
void rpc_force_rebind(struct rpc_clnt *clnt)
{
	if (clnt->cl_autobind)
		xprt_clear_bound(clnt->cl_xprt);
}
EXPORT_SYMBOL_GPL(rpc_force_rebind);

/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 */
void
rpc_restart_call(struct rpc_task *task)
{
	if (RPC_ASSASSINATED(task))
		return;

	task->tk_action = call_start;
}

/*
 * 0.  Initial state
 *
 *     Other FSM states can be visited zero or more times, but
 *     this state is visited exactly once for each RPC.
 */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
		clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
		(RPC_IS_ASYNC(task) ? "async" : "sync"));

	/* Increment call count */
	task->tk_msg.rpc_proc->p_count++;
	clnt->cl_stats->rpccnt++;
	task->tk_action = call_reserve;
}

/*
 * 1.	Reserve an RPC call slot
 */
static void
call_reserve(struct rpc_task *task)
{
	dprintk("RPC: %4d call_reserve\n", task->tk_pid);

	if (!rpcauth_uptodatecred(task)) {
		task->tk_action = call_refresh;
		return;
	}

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}

/*
 * 1b.	Grok the result of xprt_reserve()
 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprintk("RPC: %4d call_reserveresult (status %d)\n",
				task->tk_pid, task->tk_status);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_allocate;
			return;
		}

		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__FUNCTION__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__FUNCTION__, status);
		xprt_release(task);
	}

	switch (status) {
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__FUNCTION__, status);
		break;
	}
	rpc_exit(task, status);
}

/*
 * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
 *	(Note: buffer memory is freed in xprt_release).
 */
static void
call_allocate(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	unsigned int	bufsiz;

	dprintk("RPC: %4d call_allocate (status %d)\n",
				task->tk_pid, task->tk_status);
	task->tk_action = call_bind;
	if (req->rq_buffer)
		return;

	/* FIXME: compute buffer requirements more exactly using
	 * auth->au_wslack */
	bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;

	/* Double the estimate: the single allocation is split into a
	 * send half and a receive half in call_encode(). */
	if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
		return;
	printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);

	if (RPC_IS_ASYNC(task) || !signalled()) {
		xprt_release(task);
		task->tk_action = call_reserve;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}

static inline int
rpc_task_need_encode(struct rpc_task *task)
{
	return task->tk_rqstp->rq_snd_buf.len == 0;
}

static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
	task->tk_rqstp->rq_snd_buf.len = 0;
}

/*
 * 3.	Encode arguments of an RPC call
 */
static void
call_encode(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
	unsigned int	bufsiz;
	kxdrproc_t	encode;
	__be32		*p;

	dprintk("RPC: %4d call_encode (status %d)\n",
				task->tk_pid, task->tk_status);

	/* Default buffer setup */
	bufsiz = req->rq_bufsize >> 1;
	sndbuf->head[0].iov_base = (void *)req->rq_buffer;
	sndbuf->head[0].iov_len  = bufsiz;
	sndbuf->tail[0].iov_len  = 0;
	sndbuf->page_len	 = 0;
	sndbuf->len		 = 0;
	sndbuf->buflen		 = bufsiz;
	rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
	rcvbuf->head[0].iov_len  = bufsiz;
	rcvbuf->tail[0].iov_len  = 0;
	rcvbuf->page_len	 = 0;
	rcvbuf->len		 = 0;
	rcvbuf->buflen		 = bufsiz;

	/* Encode header and provided arguments */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (!(p = call_header(task))) {
		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}
	if (encode == NULL)
		return;

	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
			task->tk_msg.rpc_argp);
	if (task->tk_status == -ENOMEM) {
		/* XXX: Is this sane? */
		rpc_delay(task, 3*HZ);
		task->tk_status = -EAGAIN;
	}
}

/*
 * 4.	Get the server port number if not yet set
 */
static void
call_bind(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %4d call_bind (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_action = call_connect;
	if (!xprt_bound(xprt)) {
		task->tk_action = call_bind_status;
		task->tk_timeout = xprt->bind_timeout;
		xprt->ops->rpcbind(task);
	}
}

/*
 * 4a.	Sort out bind result
 */
static void
call_bind_status(struct rpc_task *task)
{
	int status = -EACCES;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d call_bind_status (status %d)\n",
					task->tk_pid, task->tk_status);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	switch (task->tk_status) {
	case -EACCES:
		dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n",
				task->tk_pid);
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %4d rpcbind request timed out\n",
				task->tk_pid);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		dprintk("RPC: %4d remote rpcbind service unavailable\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %4d remote rpcbind version 2 unavailable\n",
				task->tk_pid);
		break;
	default:
		dprintk("RPC: %4d unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
		status = -EIO;
	}

	rpc_exit(task, status);
	return;

retry_timeout:
	task->tk_action = call_timeout;
}

/*
 * 4b.	Connect to the RPC server
 */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %4d call_connect xprt %p %s connected\n",
			task->tk_pid, xprt,
			(xprt_connected(xprt) ? "is" : "is not"));

	task->tk_action = call_transmit;
	if (!xprt_connected(xprt)) {
		task->tk_action = call_connect_status;
		if (task->tk_status < 0)
			return;
		xprt_connect(task);
	}
}

/*
 * 4c.	Sort out connect result
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprintk("RPC: %5u call_connect_status (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_status = 0;
	if (status >= 0) {
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}

	/* Something failed: remote service port may have changed */
	rpc_force_rebind(clnt);

	switch (status) {
	case -ENOTCONN:
	case -EAGAIN:
		task->tk_action = call_bind;
		if (!RPC_IS_SOFT(task))
			return;
		/* if soft mounted, test if we've timed out */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	}
	rpc_exit(task, -EIO);
}

/*
 * 5.	Transmit the RPC request, and wait for reply
 */
static void
call_transmit(struct rpc_task *task)
{
	dprintk("RPC: %4d call_transmit (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
		call_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0)
			return;
	}
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	if (task->tk_msg.rpc_proc->p_decode != NULL)
		return;
	task->tk_action = rpc_exit_task;
	rpc_wake_up_task(task);
}

/*
 * 5a.	Handle cleanup after a transmission
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;
	/*
	 * Special case: if we've been waiting on the socket's write_space()
	 * callback, then don't call xprt_end_transmit().
	 */
	if (task->tk_status == -EAGAIN)
		return;
	xprt_end_transmit(task);
	rpc_task_force_reencode(task);
}

/*
 * 6.	Sort out the RPC call status
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprintk("RPC: %4d call_status (status %d)\n",
				task->tk_pid, task->tk_status);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch (status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		printk("%s: RPC call returned error %d\n",
				clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}

/*
 * 6a.	Handle RPC timeout
 *	We do not release the request slot, so we keep using the
 *	same XID for all retransmits.
 */
static void
call_timeout(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
		dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
		goto retry;
	}

	dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
	task->tk_timeouts++;

	if (RPC_IS_SOFT(task)) {
		printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
				clnt->cl_protname, clnt->cl_server);
		rpc_exit(task, -EIO);
		return;
	}

	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
			clnt->cl_protname, clnt->cl_server);
	}
	rpc_force_rebind(clnt);

retry:
	clnt->cl_stats->rpcretrans++;
	task->tk_action = call_bind;
	task->tk_status = 0;
}

/*
 * 7.	Decode the RPC reply
 */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	__be32		*p;

	dprintk("RPC: %4d call_decode (status %d)\n",
				task->tk_pid, task->tk_status);

	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
		printk(KERN_NOTICE "%s: server %s OK\n",
			clnt->cl_protname, clnt->cl_server);
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/* A valid reply carries at least the XID, the message direction
	 * and the accept status: 3 words, i.e. 12 bytes. */
	if (task->tk_status < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		dprintk("%s: too small RPC reply size (%d bytes)\n",
			clnt->cl_protname, task->tk_status);
		task->tk_action = call_timeout;
		goto out_retry;
	}

	/*
	 * Ensure that we see all writes made by xprt_complete_rqst()
	 * before it changed req->rq_received.
	 */
	smp_rmb();
	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* Verify the RPC header */
	p = call_verify(task);
	if (IS_ERR(p)) {
		if (p == ERR_PTR(-EAGAIN))
			goto out_retry;
		return;
	}

	task->tk_action = rpc_exit_task;

	if (decode)
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
	dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
					task->tk_status);
	return;
out_retry:
	req->rq_received = req->rq_private_buf.len = 0;
	task->tk_status = 0;
}

/*
 * 8.	Refresh the credentials if rejected by the server
 */
static void
call_refresh(struct rpc_task *task)
{
	dprintk("RPC: %4d call_refresh\n", task->tk_pid);

	xprt_release(task);	/* Must do to obtain new XID */
	task->tk_action = call_refreshresult;
	task->tk_status = 0;
	task->tk_client->cl_stats->rpcauthrefresh++;
	rpcauth_refreshcred(task);
}

/*
 * 8a.	Process the results of a credential refresh
 */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;
	dprintk("RPC: %4d call_refreshresult (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_status = 0;
	task->tk_action = call_reserve;
	if (status >= 0 && rpcauth_uptodatecred(task))
		return;
	if (status == -EACCES) {
		rpc_exit(task, -EACCES);
		return;
	}
	task->tk_action = call_refresh;
	if (status != -ETIMEDOUT)
		rpc_delay(task, 3*HZ);
	return;
}

/*
 * Call header serialization
 */
static __be32 *
call_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	__be32		*p = req->rq_svec[0].iov_base;

	/* FIXME: check buffer size? */

	p = xprt_skip_transport_header(task->tk_xprt, p);
	*p++ = req->rq_xid;		/* XID */
	*p++ = htonl(RPC_CALL);		/* CALL */
	*p++ = htonl(RPC_VERSION);	/* RPC version */
	*p++ = htonl(clnt->cl_prog);	/* program number */
	*p++ = htonl(clnt->cl_vers);	/* program version */
	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
	p = rpcauth_marshcred(task, p);
	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
	return p;
}

/*
 * Reply header verification
 */
static __be32 *
call_verify(struct rpc_task *task)
{
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
	__be32	*p = iov->iov_base;
	u32 n;
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		printk(KERN_WARNING
		       "call_verify: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	if ((len -= 3) < 0)
		goto out_overflow;
	p += 1; /* skip XID */

	if ((n = ntohl(*p++)) != RPC_REPLY) {
		printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
		goto out_garbage;
	}
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_ERROR:
			break;
		case RPC_MISMATCH:
			dprintk("%s: RPC call version mismatch!\n",
					__FUNCTION__);
			error = -EPROTONOSUPPORT;
			goto out_err;
		default:
			dprintk("%s: RPC call rejected, unknown error: %x\n",
					__FUNCTION__, n);
			goto out_eio;
		}
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %4d call_verify: retry stale creds\n",
							task->tk_pid);
			rpcauth_invalcred(task);
			task->tk_action = call_refresh;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %4d call_verify: retry garbled creds\n",
							task->tk_pid);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			printk(KERN_NOTICE "call_verify: server %s requires stronger "
			       "authentication.\n", task->tk_client->cl_server);
			break;
		default:
			printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
			error = -EIO;
		}
		dprintk("RPC: %4d call_verify: call rejected %d\n",
						task->tk_pid, n);
		goto out_err;
	}
	if (!(p = rpcauth_checkverf(task, p))) {
		printk(KERN_WARNING "call_verify: auth check failed\n");
		goto out_garbage;		/* bad verifier, retry */
	}
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		dprintk("RPC: call_verify: program %u is unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				task->tk_client->cl_server);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk("RPC: call_verify: program %u, version %u unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				(unsigned int)task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk("RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
				task->tk_msg.rpc_proc,
				task->tk_client->cl_prog,
				task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %4d %s: server saw garbage\n",
				task->tk_pid, __FUNCTION__);
		break;			/* retry */
	default:
		printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
		/* Also retry */
	}

out_garbage:
	task->tk_client->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
		task->tk_action = call_bind;
out_retry:
		return ERR_PTR(-EAGAIN);
	}
	printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	return ERR_PTR(error);
out_overflow:
	printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
	goto out_garbage;
}

static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}

static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}

static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};

int rpc_ping(struct rpc_clnt *clnt, int flags)
{
	struct rpc_message msg = {
		.rpc_proc = &rpcproc_null,
	};
	int err;
	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
	err = rpc_call_sync(clnt, &msg, flags);
	put_rpccred(msg.rpc_cred);
	return err;
}
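
/*
 * Example (sketch): probing a server with the NULL procedure, just as
 * rpc_create() does above when RPC_CLNT_CREATE_NOPING is not set.
 */
#if 0
static int example_probe_server(struct rpc_clnt *clnt)
{
	return rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
}
#endif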