[XFRM]: beet: fix IP option encapsulation
[linux-2.6] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23
24 #include <asm/system.h>
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/smp_lock.h>
31 #include <linux/utsname.h>
32 #include <linux/workqueue.h>
33
34 #include <linux/sunrpc/clnt.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36 #include <linux/sunrpc/metrics.h>
37
38
#define RPC_SLACK_SPACE		(1024)	/* total overkill */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_CALL
#endif

/*
 * Trace helper: print the pid and current status of task @t together with
 * the name of the calling function.
 *
 * @t is parenthesized at each use so that any pointer-valued expression
 * (not just a plain identifier) can be passed safely.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__FUNCTION__, (t)->tk_status)
48
49 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
50
51
52 static void     call_start(struct rpc_task *task);
53 static void     call_reserve(struct rpc_task *task);
54 static void     call_reserveresult(struct rpc_task *task);
55 static void     call_allocate(struct rpc_task *task);
56 static void     call_encode(struct rpc_task *task);
57 static void     call_decode(struct rpc_task *task);
58 static void     call_bind(struct rpc_task *task);
59 static void     call_bind_status(struct rpc_task *task);
60 static void     call_transmit(struct rpc_task *task);
61 static void     call_status(struct rpc_task *task);
62 static void     call_transmit_status(struct rpc_task *task);
63 static void     call_refresh(struct rpc_task *task);
64 static void     call_refreshresult(struct rpc_task *task);
65 static void     call_timeout(struct rpc_task *task);
66 static void     call_connect(struct rpc_task *task);
67 static void     call_connect_status(struct rpc_task *task);
68 static __be32 * call_header(struct rpc_task *task);
69 static __be32 * call_verify(struct rpc_task *task);
70
71
72 static int
73 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
74 {
75         static uint32_t clntid;
76         int error;
77
78         clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
79         clnt->cl_dentry = ERR_PTR(-ENOENT);
80         if (dir_name == NULL)
81                 return 0;
82
83         clnt->cl_vfsmnt = rpc_get_mount();
84         if (IS_ERR(clnt->cl_vfsmnt))
85                 return PTR_ERR(clnt->cl_vfsmnt);
86
87         for (;;) {
88                 snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
89                                 "%s/clnt%x", dir_name,
90                                 (unsigned int)clntid++);
91                 clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
92                 clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
93                 if (!IS_ERR(clnt->cl_dentry))
94                         return 0;
95                 error = PTR_ERR(clnt->cl_dentry);
96                 if (error != -EEXIST) {
97                         printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
98                                         clnt->cl_pathname, error);
99                         rpc_put_mount();
100                         return error;
101                 }
102         }
103 }
104
105 static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
106 {
107         struct rpc_version      *version;
108         struct rpc_clnt         *clnt = NULL;
109         struct rpc_auth         *auth;
110         int err;
111         int len;
112
113         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
114                         program->name, servname, xprt);
115
116         err = -EINVAL;
117         if (!xprt)
118                 goto out_no_xprt;
119         if (vers >= program->nrvers || !(version = program->version[vers]))
120                 goto out_err;
121
122         err = -ENOMEM;
123         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
124         if (!clnt)
125                 goto out_err;
126         atomic_set(&clnt->cl_users, 0);
127         atomic_set(&clnt->cl_count, 1);
128         clnt->cl_parent = clnt;
129
130         clnt->cl_server = clnt->cl_inline_name;
131         len = strlen(servname) + 1;
132         if (len > sizeof(clnt->cl_inline_name)) {
133                 char *buf = kmalloc(len, GFP_KERNEL);
134                 if (buf != 0)
135                         clnt->cl_server = buf;
136                 else
137                         len = sizeof(clnt->cl_inline_name);
138         }
139         strlcpy(clnt->cl_server, servname, len);
140
141         clnt->cl_xprt     = xprt;
142         clnt->cl_procinfo = version->procs;
143         clnt->cl_maxproc  = version->nrprocs;
144         clnt->cl_protname = program->name;
145         clnt->cl_prog     = program->number;
146         clnt->cl_vers     = version->number;
147         clnt->cl_stats    = program->stats;
148         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
149         err = -ENOMEM;
150         if (clnt->cl_metrics == NULL)
151                 goto out_no_stats;
152         clnt->cl_program  = program;
153
154         if (!xprt_bound(clnt->cl_xprt))
155                 clnt->cl_autobind = 1;
156
157         clnt->cl_rtt = &clnt->cl_rtt_default;
158         rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
159
160         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
161         if (err < 0)
162                 goto out_no_path;
163
164         auth = rpcauth_create(flavor, clnt);
165         if (IS_ERR(auth)) {
166                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
167                                 flavor);
168                 err = PTR_ERR(auth);
169                 goto out_no_auth;
170         }
171
172         /* save the nodename */
173         clnt->cl_nodelen = strlen(utsname()->nodename);
174         if (clnt->cl_nodelen > UNX_MAXNODENAME)
175                 clnt->cl_nodelen = UNX_MAXNODENAME;
176         memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
177         return clnt;
178
179 out_no_auth:
180         if (!IS_ERR(clnt->cl_dentry)) {
181                 rpc_rmdir(clnt->cl_dentry);
182                 rpc_put_mount();
183         }
184 out_no_path:
185         rpc_free_iostats(clnt->cl_metrics);
186 out_no_stats:
187         if (clnt->cl_server != clnt->cl_inline_name)
188                 kfree(clnt->cl_server);
189         kfree(clnt);
190 out_err:
191         xprt_put(xprt);
192 out_no_xprt:
193         return ERR_PTR(err);
194 }
195
196 /*
197  * rpc_create - create an RPC client and transport with one call
198  * @args: rpc_clnt create argument structure
199  *
200  * Creates and initializes an RPC transport and an RPC client.
201  *
202  * It can ping the server in order to determine if it is up, and to see if
203  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
204  * this behavior so asynchronous tasks can also use rpc_create.
205  */
206 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
207 {
208         struct rpc_xprt *xprt;
209         struct rpc_clnt *clnt;
210
211         xprt = xprt_create_transport(args->protocol, args->address,
212                                         args->addrsize, args->timeout);
213         if (IS_ERR(xprt))
214                 return (struct rpc_clnt *)xprt;
215
216         /*
217          * By default, kernel RPC client connects from a reserved port.
218          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
219          * but it is always enabled for rpciod, which handles the connect
220          * operation.
221          */
222         xprt->resvport = 1;
223         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
224                 xprt->resvport = 0;
225
226         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
227                         args->program->name, args->servername, xprt);
228
229         clnt = rpc_new_client(xprt, args->servername, args->program,
230                                 args->version, args->authflavor);
231         if (IS_ERR(clnt))
232                 return clnt;
233
234         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
235                 int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
236                 if (err != 0) {
237                         rpc_shutdown_client(clnt);
238                         return ERR_PTR(err);
239                 }
240         }
241
242         clnt->cl_softrtry = 1;
243         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
244                 clnt->cl_softrtry = 0;
245
246         if (args->flags & RPC_CLNT_CREATE_INTR)
247                 clnt->cl_intr = 1;
248         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
249                 clnt->cl_autobind = 1;
250         if (args->flags & RPC_CLNT_CREATE_ONESHOT)
251                 clnt->cl_oneshot = 1;
252         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
253                 clnt->cl_discrtry = 1;
254
255         return clnt;
256 }
257 EXPORT_SYMBOL_GPL(rpc_create);
258
259 /*
260  * This function clones the RPC client structure. It allows us to share the
261  * same transport while varying parameters such as the authentication
262  * flavour.
263  */
264 struct rpc_clnt *
265 rpc_clone_client(struct rpc_clnt *clnt)
266 {
267         struct rpc_clnt *new;
268         int err = -ENOMEM;
269
270         new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
271         if (!new)
272                 goto out_no_clnt;
273         atomic_set(&new->cl_count, 1);
274         atomic_set(&new->cl_users, 0);
275         new->cl_metrics = rpc_alloc_iostats(clnt);
276         if (new->cl_metrics == NULL)
277                 goto out_no_stats;
278         err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
279         if (err != 0)
280                 goto out_no_path;
281         new->cl_parent = clnt;
282         atomic_inc(&clnt->cl_count);
283         new->cl_xprt = xprt_get(clnt->cl_xprt);
284         /* Turn off autobind on clones */
285         new->cl_autobind = 0;
286         new->cl_oneshot = 0;
287         new->cl_dead = 0;
288         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
289         if (new->cl_auth)
290                 atomic_inc(&new->cl_auth->au_count);
291         return new;
292 out_no_path:
293         rpc_free_iostats(new->cl_metrics);
294 out_no_stats:
295         kfree(new);
296 out_no_clnt:
297         dprintk("RPC:       %s: returned error %d\n", __FUNCTION__, err);
298         return ERR_PTR(err);
299 }
300
301 /*
302  * Properly shut down an RPC client, terminating all outstanding
303  * requests. Note that we must be certain that cl_oneshot and
304  * cl_dead are cleared, or else the client would be destroyed
305  * when the last task releases it.
306  */
307 int
308 rpc_shutdown_client(struct rpc_clnt *clnt)
309 {
310         dprintk("RPC:       shutting down %s client for %s, tasks=%d\n",
311                         clnt->cl_protname, clnt->cl_server,
312                         atomic_read(&clnt->cl_users));
313
314         while (atomic_read(&clnt->cl_users) > 0) {
315                 /* Don't let rpc_release_client destroy us */
316                 clnt->cl_oneshot = 0;
317                 clnt->cl_dead = 0;
318                 rpc_killall_tasks(clnt);
319                 wait_event_timeout(destroy_wait,
320                         !atomic_read(&clnt->cl_users), 1*HZ);
321         }
322
323         if (atomic_read(&clnt->cl_users) < 0) {
324                 printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
325                                 clnt, atomic_read(&clnt->cl_users));
326 #ifdef RPC_DEBUG
327                 rpc_show_tasks();
328 #endif
329                 BUG();
330         }
331
332         return rpc_destroy_client(clnt);
333 }
334
335 /*
336  * Delete an RPC client
337  */
338 int
339 rpc_destroy_client(struct rpc_clnt *clnt)
340 {
341         if (!atomic_dec_and_test(&clnt->cl_count))
342                 return 1;
343         BUG_ON(atomic_read(&clnt->cl_users) != 0);
344
345         dprintk("RPC:       destroying %s client for %s\n",
346                         clnt->cl_protname, clnt->cl_server);
347         if (clnt->cl_auth) {
348                 rpcauth_destroy(clnt->cl_auth);
349                 clnt->cl_auth = NULL;
350         }
351         if (!IS_ERR(clnt->cl_dentry)) {
352                 rpc_rmdir(clnt->cl_dentry);
353                 rpc_put_mount();
354         }
355         if (clnt->cl_parent != clnt) {
356                 rpc_destroy_client(clnt->cl_parent);
357                 goto out_free;
358         }
359         if (clnt->cl_server != clnt->cl_inline_name)
360                 kfree(clnt->cl_server);
361 out_free:
362         rpc_free_iostats(clnt->cl_metrics);
363         clnt->cl_metrics = NULL;
364         xprt_put(clnt->cl_xprt);
365         kfree(clnt);
366         return 0;
367 }
368
369 /*
370  * Release an RPC client
371  */
372 void
373 rpc_release_client(struct rpc_clnt *clnt)
374 {
375         dprintk("RPC:       rpc_release_client(%p, %d)\n",
376                         clnt, atomic_read(&clnt->cl_users));
377
378         if (!atomic_dec_and_test(&clnt->cl_users))
379                 return;
380         wake_up(&destroy_wait);
381         if (clnt->cl_oneshot || clnt->cl_dead)
382                 rpc_destroy_client(clnt);
383 }
384
385 /**
386  * rpc_bind_new_program - bind a new RPC program to an existing client
387  * @old - old rpc_client
388  * @program - rpc program to set
389  * @vers - rpc program version
390  *
391  * Clones the rpc client and sets up a new RPC program. This is mainly
392  * of use for enabling different RPC programs to share the same transport.
393  * The Sun NFSv2/v3 ACL protocol can do this.
394  */
395 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
396                                       struct rpc_program *program,
397                                       int vers)
398 {
399         struct rpc_clnt *clnt;
400         struct rpc_version *version;
401         int err;
402
403         BUG_ON(vers >= program->nrvers || !program->version[vers]);
404         version = program->version[vers];
405         clnt = rpc_clone_client(old);
406         if (IS_ERR(clnt))
407                 goto out;
408         clnt->cl_procinfo = version->procs;
409         clnt->cl_maxproc  = version->nrprocs;
410         clnt->cl_protname = program->name;
411         clnt->cl_prog     = program->number;
412         clnt->cl_vers     = version->number;
413         clnt->cl_stats    = program->stats;
414         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
415         if (err != 0) {
416                 rpc_shutdown_client(clnt);
417                 clnt = ERR_PTR(err);
418         }
419 out:
420         return clnt;
421 }
422
423 /*
424  * Default callback for async RPC calls
425  */
426 static void
427 rpc_default_callback(struct rpc_task *task, void *data)
428 {
429 }
430
431 static const struct rpc_call_ops rpc_default_ops = {
432         .rpc_call_done = rpc_default_callback,
433 };
434
435 /*
436  *      Export the signal mask handling for synchronous code that
437  *      sleeps on RPC calls
438  */
439 #define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
440
441 static void rpc_save_sigmask(sigset_t *oldset, int intr)
442 {
443         unsigned long   sigallow = sigmask(SIGKILL);
444         sigset_t sigmask;
445
446         /* Block all signals except those listed in sigallow */
447         if (intr)
448                 sigallow |= RPC_INTR_SIGNALS;
449         siginitsetinv(&sigmask, sigallow);
450         sigprocmask(SIG_BLOCK, &sigmask, oldset);
451 }
452
453 static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
454 {
455         rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
456 }
457
458 static inline void rpc_restore_sigmask(sigset_t *oldset)
459 {
460         sigprocmask(SIG_SETMASK, oldset, NULL);
461 }
462
463 void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
464 {
465         rpc_save_sigmask(oldset, clnt->cl_intr);
466 }
467
468 void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
469 {
470         rpc_restore_sigmask(oldset);
471 }
472
473 /*
474  * New rpc_call implementation
475  */
476 int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
477 {
478         struct rpc_task *task;
479         sigset_t        oldset;
480         int             status;
481
482         /* If this client is slain all further I/O fails */
483         if (clnt->cl_dead)
484                 return -EIO;
485
486         BUG_ON(flags & RPC_TASK_ASYNC);
487
488         task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
489         if (task == NULL)
490                 return -ENOMEM;
491
492         /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
493         rpc_task_sigmask(task, &oldset);
494
495         /* Set up the call info struct and execute the task */
496         rpc_call_setup(task, msg, 0);
497         if (task->tk_status == 0) {
498                 atomic_inc(&task->tk_count);
499                 rpc_execute(task);
500         }
501         status = task->tk_status;
502         rpc_put_task(task);
503         rpc_restore_sigmask(&oldset);
504         return status;
505 }
506
507 /*
508  * New rpc_call implementation
509  */
510 int
511 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
512                const struct rpc_call_ops *tk_ops, void *data)
513 {
514         struct rpc_task *task;
515         sigset_t        oldset;
516         int             status;
517
518         /* If this client is slain all further I/O fails */
519         status = -EIO;
520         if (clnt->cl_dead)
521                 goto out_release;
522
523         flags |= RPC_TASK_ASYNC;
524
525         /* Create/initialize a new RPC task */
526         status = -ENOMEM;
527         if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
528                 goto out_release;
529
530         /* Mask signals on GSS_AUTH upcalls */
531         rpc_task_sigmask(task, &oldset);
532
533         rpc_call_setup(task, msg, 0);
534
535         /* Set up the call info struct and execute the task */
536         status = task->tk_status;
537         if (status == 0)
538                 rpc_execute(task);
539         else
540                 rpc_put_task(task);
541
542         rpc_restore_sigmask(&oldset);
543         return status;
544 out_release:
545         rpc_release_calldata(tk_ops, data);
546         return status;
547 }
548
549
550 void
551 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
552 {
553         task->tk_msg   = *msg;
554         task->tk_flags |= flags;
555         /* Bind the user cred */
556         if (task->tk_msg.rpc_cred != NULL)
557                 rpcauth_holdcred(task);
558         else
559                 rpcauth_bindcred(task);
560
561         if (task->tk_status == 0)
562                 task->tk_action = call_start;
563         else
564                 task->tk_action = rpc_exit_task;
565 }
566
567 /**
568  * rpc_peeraddr - extract remote peer address from clnt's xprt
569  * @clnt: RPC client structure
570  * @buf: target buffer
571  * @size: length of target buffer
572  *
573  * Returns the number of bytes that are actually in the stored address.
574  */
575 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
576 {
577         size_t bytes;
578         struct rpc_xprt *xprt = clnt->cl_xprt;
579
580         bytes = sizeof(xprt->addr);
581         if (bytes > bufsize)
582                 bytes = bufsize;
583         memcpy(buf, &clnt->cl_xprt->addr, bytes);
584         return xprt->addrlen;
585 }
586 EXPORT_SYMBOL_GPL(rpc_peeraddr);
587
588 /**
589  * rpc_peeraddr2str - return remote peer address in printable format
590  * @clnt: RPC client structure
591  * @format: address format
592  *
593  */
594 char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
595 {
596         struct rpc_xprt *xprt = clnt->cl_xprt;
597
598         if (xprt->address_strings[format] != NULL)
599                 return xprt->address_strings[format];
600         else
601                 return "unprintable";
602 }
603 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
604
605 void
606 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
607 {
608         struct rpc_xprt *xprt = clnt->cl_xprt;
609         if (xprt->ops->set_buffer_size)
610                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
611 }
612
613 /*
614  * Return size of largest payload RPC client can support, in bytes
615  *
616  * For stream transports, this is one RPC record fragment (see RFC
617  * 1831), as we don't support multi-record requests yet.  For datagram
618  * transports, this is the size of an IP packet minus the IP, UDP, and
619  * RPC header sizes.
620  */
621 size_t rpc_max_payload(struct rpc_clnt *clnt)
622 {
623         return clnt->cl_xprt->max_payload;
624 }
625 EXPORT_SYMBOL_GPL(rpc_max_payload);
626
627 /**
628  * rpc_force_rebind - force transport to check that remote port is unchanged
629  * @clnt: client to rebind
630  *
631  */
632 void rpc_force_rebind(struct rpc_clnt *clnt)
633 {
634         if (clnt->cl_autobind)
635                 xprt_clear_bound(clnt->cl_xprt);
636 }
637 EXPORT_SYMBOL_GPL(rpc_force_rebind);
638
639 /*
640  * Restart an (async) RPC call. Usually called from within the
641  * exit handler.
642  */
643 void
644 rpc_restart_call(struct rpc_task *task)
645 {
646         if (RPC_ASSASSINATED(task))
647                 return;
648
649         task->tk_action = call_start;
650 }
651
652 /*
653  * 0.  Initial state
654  *
655  *     Other FSM states can be visited zero or more times, but
656  *     this state is visited exactly once for each RPC.
657  */
658 static void
659 call_start(struct rpc_task *task)
660 {
661         struct rpc_clnt *clnt = task->tk_client;
662
663         dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid,
664                         clnt->cl_protname, clnt->cl_vers,
665                         task->tk_msg.rpc_proc->p_proc,
666                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
667
668         /* Increment call count */
669         task->tk_msg.rpc_proc->p_count++;
670         clnt->cl_stats->rpccnt++;
671         task->tk_action = call_reserve;
672 }
673
674 /*
675  * 1.   Reserve an RPC call slot
676  */
677 static void
678 call_reserve(struct rpc_task *task)
679 {
680         dprint_status(task);
681
682         if (!rpcauth_uptodatecred(task)) {
683                 task->tk_action = call_refresh;
684                 return;
685         }
686
687         task->tk_status  = 0;
688         task->tk_action  = call_reserveresult;
689         xprt_reserve(task);
690 }
691
692 /*
693  * 1b.  Grok the result of xprt_reserve()
694  */
695 static void
696 call_reserveresult(struct rpc_task *task)
697 {
698         int status = task->tk_status;
699
700         dprint_status(task);
701
702         /*
703          * After a call to xprt_reserve(), we must have either
704          * a request slot or else an error status.
705          */
706         task->tk_status = 0;
707         if (status >= 0) {
708                 if (task->tk_rqstp) {
709                         task->tk_action = call_allocate;
710                         return;
711                 }
712
713                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
714                                 __FUNCTION__, status);
715                 rpc_exit(task, -EIO);
716                 return;
717         }
718
719         /*
720          * Even though there was an error, we may have acquired
721          * a request slot somehow.  Make sure not to leak it.
722          */
723         if (task->tk_rqstp) {
724                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
725                                 __FUNCTION__, status);
726                 xprt_release(task);
727         }
728
729         switch (status) {
730         case -EAGAIN:   /* woken up; retry */
731                 task->tk_action = call_reserve;
732                 return;
733         case -EIO:      /* probably a shutdown */
734                 break;
735         default:
736                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
737                                 __FUNCTION__, status);
738                 break;
739         }
740         rpc_exit(task, status);
741 }
742
743 /*
744  * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
745  *      (Note: buffer memory is freed in xprt_release).
746  */
747 static void
748 call_allocate(struct rpc_task *task)
749 {
750         struct rpc_rqst *req = task->tk_rqstp;
751         struct rpc_xprt *xprt = task->tk_xprt;
752         unsigned int    bufsiz;
753
754         dprint_status(task);
755
756         task->tk_action = call_bind;
757         if (req->rq_buffer)
758                 return;
759
760         /* FIXME: compute buffer requirements more exactly using
761          * auth->au_wslack */
762         bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
763
764         if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
765                 return;
766
767         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
768
769         if (RPC_IS_ASYNC(task) || !signalled()) {
770                 xprt_release(task);
771                 task->tk_action = call_reserve;
772                 rpc_delay(task, HZ>>4);
773                 return;
774         }
775
776         rpc_exit(task, -ERESTARTSYS);
777 }
778
779 static inline int
780 rpc_task_need_encode(struct rpc_task *task)
781 {
782         return task->tk_rqstp->rq_snd_buf.len == 0;
783 }
784
785 static inline void
786 rpc_task_force_reencode(struct rpc_task *task)
787 {
788         task->tk_rqstp->rq_snd_buf.len = 0;
789 }
790
791 /*
792  * 3.   Encode arguments of an RPC call
793  */
794 static void
795 call_encode(struct rpc_task *task)
796 {
797         struct rpc_rqst *req = task->tk_rqstp;
798         struct xdr_buf *sndbuf = &req->rq_snd_buf;
799         struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
800         unsigned int    bufsiz;
801         kxdrproc_t      encode;
802         __be32          *p;
803
804         dprint_status(task);
805
806         /* Default buffer setup */
807         bufsiz = req->rq_bufsize >> 1;
808         sndbuf->head[0].iov_base = (void *)req->rq_buffer;
809         sndbuf->head[0].iov_len  = bufsiz;
810         sndbuf->tail[0].iov_len  = 0;
811         sndbuf->page_len         = 0;
812         sndbuf->len              = 0;
813         sndbuf->buflen           = bufsiz;
814         rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
815         rcvbuf->head[0].iov_len  = bufsiz;
816         rcvbuf->tail[0].iov_len  = 0;
817         rcvbuf->page_len         = 0;
818         rcvbuf->len              = 0;
819         rcvbuf->buflen           = bufsiz;
820
821         /* Encode header and provided arguments */
822         encode = task->tk_msg.rpc_proc->p_encode;
823         if (!(p = call_header(task))) {
824                 printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
825                 rpc_exit(task, -EIO);
826                 return;
827         }
828         if (encode == NULL)
829                 return;
830
831         lock_kernel();
832         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
833                         task->tk_msg.rpc_argp);
834         unlock_kernel();
835         if (task->tk_status == -ENOMEM) {
836                 /* XXX: Is this sane? */
837                 rpc_delay(task, 3*HZ);
838                 task->tk_status = -EAGAIN;
839         }
840 }
841
842 /*
843  * 4.   Get the server port number if not yet set
844  */
845 static void
846 call_bind(struct rpc_task *task)
847 {
848         struct rpc_xprt *xprt = task->tk_xprt;
849
850         dprint_status(task);
851
852         task->tk_action = call_connect;
853         if (!xprt_bound(xprt)) {
854                 task->tk_action = call_bind_status;
855                 task->tk_timeout = xprt->bind_timeout;
856                 xprt->ops->rpcbind(task);
857         }
858 }
859
860 /*
861  * 4a.  Sort out bind result
862  */
863 static void
864 call_bind_status(struct rpc_task *task)
865 {
866         int status = -EACCES;
867
868         if (task->tk_status >= 0) {
869                 dprint_status(task);
870                 task->tk_status = 0;
871                 task->tk_action = call_connect;
872                 return;
873         }
874
875         switch (task->tk_status) {
876         case -EACCES:
877                 dprintk("RPC: %5u remote rpcbind: RPC program/version "
878                                 "unavailable\n", task->tk_pid);
879                 rpc_delay(task, 3*HZ);
880                 goto retry_timeout;
881         case -ETIMEDOUT:
882                 dprintk("RPC: %5u rpcbind request timed out\n",
883                                 task->tk_pid);
884                 goto retry_timeout;
885         case -EPFNOSUPPORT:
886                 dprintk("RPC: %5u remote rpcbind service unavailable\n",
887                                 task->tk_pid);
888                 break;
889         case -EPROTONOSUPPORT:
890                 dprintk("RPC: %5u remote rpcbind version 2 unavailable\n",
891                                 task->tk_pid);
892                 break;
893         default:
894                 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
895                                 task->tk_pid, -task->tk_status);
896                 status = -EIO;
897         }
898
899         rpc_exit(task, status);
900         return;
901
902 retry_timeout:
903         task->tk_action = call_timeout;
904 }
905
906 /*
907  * 4b.  Connect to the RPC server
908  */
909 static void
910 call_connect(struct rpc_task *task)
911 {
912         struct rpc_xprt *xprt = task->tk_xprt;
913
914         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
915                         task->tk_pid, xprt,
916                         (xprt_connected(xprt) ? "is" : "is not"));
917
918         task->tk_action = call_transmit;
919         if (!xprt_connected(xprt)) {
920                 task->tk_action = call_connect_status;
921                 if (task->tk_status < 0)
922                         return;
923                 xprt_connect(task);
924         }
925 }
926
927 /*
928  * 4c.  Sort out connect result
929  */
930 static void
931 call_connect_status(struct rpc_task *task)
932 {
933         struct rpc_clnt *clnt = task->tk_client;
934         int status = task->tk_status;
935
936         dprint_status(task);
937
938         task->tk_status = 0;
939         if (status >= 0) {
940                 clnt->cl_stats->netreconn++;
941                 task->tk_action = call_transmit;
942                 return;
943         }
944
945         /* Something failed: remote service port may have changed */
946         rpc_force_rebind(clnt);
947
948         switch (status) {
949         case -ENOTCONN:
950         case -EAGAIN:
951                 task->tk_action = call_bind;
952                 if (!RPC_IS_SOFT(task))
953                         return;
954                 /* if soft mounted, test if we've timed out */
955         case -ETIMEDOUT:
956                 task->tk_action = call_timeout;
957                 return;
958         }
959         rpc_exit(task, -EIO);
960 }
961
962 /*
963  * 5.   Transmit the RPC request, and wait for reply
964  */
965 static void
966 call_transmit(struct rpc_task *task)
967 {
968         dprint_status(task);
969
970         task->tk_action = call_status;
971         if (task->tk_status < 0)
972                 return;
973         task->tk_status = xprt_prepare_transmit(task);
974         if (task->tk_status != 0)
975                 return;
976         task->tk_action = call_transmit_status;
977         /* Encode here so that rpcsec_gss can use correct sequence number. */
978         if (rpc_task_need_encode(task)) {
979                 BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
980                 call_encode(task);
981                 /* Did the encode result in an error condition? */
982                 if (task->tk_status != 0)
983                         return;
984         }
985         xprt_transmit(task);
986         if (task->tk_status < 0)
987                 return;
988         /*
989          * On success, ensure that we call xprt_end_transmit() before sleeping
990          * in order to allow access to the socket to other RPC requests.
991          */
992         call_transmit_status(task);
993         if (task->tk_msg.rpc_proc->p_decode != NULL)
994                 return;
995         task->tk_action = rpc_exit_task;
996         rpc_wake_up_task(task);
997 }
998
999 /*
1000  * 5a.  Handle cleanup after a transmission
1001  */
1002 static void
1003 call_transmit_status(struct rpc_task *task)
1004 {
1005         task->tk_action = call_status;
1006         /*
1007          * Special case: if we've been waiting on the socket's write_space()
1008          * callback, then don't call xprt_end_transmit().
1009          */
1010         if (task->tk_status == -EAGAIN)
1011                 return;
1012         xprt_end_transmit(task);
1013         rpc_task_force_reencode(task);
1014 }
1015
/*
 * 6.	Sort out the RPC call status
 *
 * If the request has rq_received > 0 and no bytes of a partially sent
 * request are outstanding, rq_received becomes the task status.  A
 * non-negative status sends the task on to call_decode.
 *
 * For a negative status, tk_status is cleared and the error selects the
 * next step:
 *   -EHOSTDOWN, -EHOSTUNREACH, -ENETUNREACH:
 *			delay 3 seconds, then treat as a timeout
 *   -ETIMEDOUT:	call_timeout
 *   -ECONNREFUSED, -ENOTCONN:
 *			force a rebind and go back to call_bind
 *   -EAGAIN:		retry via call_transmit
 *   -EIO:		exit the task with that status
 *   anything else:	log the error and exit the task with that status
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch (status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		/* Give the message an explicit log level. */
		printk(KERN_WARNING "%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
		break;
	}
}
1068
1069 /*
1070  * 6a.  Handle RPC timeout
1071  *      We do not release the request slot, so we keep using the
1072  *      same XID for all retransmits.
1073  */
1074 static void
1075 call_timeout(struct rpc_task *task)
1076 {
1077         struct rpc_clnt *clnt = task->tk_client;
1078
1079         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1080                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1081                 goto retry;
1082         }
1083
1084         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1085         task->tk_timeouts++;
1086
1087         if (RPC_IS_SOFT(task)) {
1088                 printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1089                                 clnt->cl_protname, clnt->cl_server);
1090                 rpc_exit(task, -EIO);
1091                 return;
1092         }
1093
1094         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1095                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1096                 printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1097                         clnt->cl_protname, clnt->cl_server);
1098         }
1099         rpc_force_rebind(clnt);
1100
1101 retry:
1102         clnt->cl_stats->rpcretrans++;
1103         task->tk_action = call_bind;
1104         task->tk_status = 0;
1105 }
1106
1107 /*
1108  * 7.   Decode the RPC reply
1109  */
1110 static void
1111 call_decode(struct rpc_task *task)
1112 {
1113         struct rpc_clnt *clnt = task->tk_client;
1114         struct rpc_rqst *req = task->tk_rqstp;
1115         kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
1116         __be32          *p;
1117
1118         dprintk("RPC: %5u call_decode (status %d)\n",
1119                         task->tk_pid, task->tk_status);
1120
1121         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1122                 printk(KERN_NOTICE "%s: server %s OK\n",
1123                         clnt->cl_protname, clnt->cl_server);
1124                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1125         }
1126
1127         if (task->tk_status < 12) {
1128                 if (!RPC_IS_SOFT(task)) {
1129                         task->tk_action = call_bind;
1130                         clnt->cl_stats->rpcretrans++;
1131                         goto out_retry;
1132                 }
1133                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1134                                 clnt->cl_protname, task->tk_status);
1135                 task->tk_action = call_timeout;
1136                 goto out_retry;
1137         }
1138
1139         /*
1140          * Ensure that we see all writes made by xprt_complete_rqst()
1141          * before it changed req->rq_received.
1142          */
1143         smp_rmb();
1144         req->rq_rcv_buf.len = req->rq_private_buf.len;
1145
1146         /* Check that the softirq receive buffer is valid */
1147         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1148                                 sizeof(req->rq_rcv_buf)) != 0);
1149
1150         /* Verify the RPC header */
1151         p = call_verify(task);
1152         if (IS_ERR(p)) {
1153                 if (p == ERR_PTR(-EAGAIN))
1154                         goto out_retry;
1155                 return;
1156         }
1157
1158         task->tk_action = rpc_exit_task;
1159
1160         if (decode) {
1161                 lock_kernel();
1162                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1163                                                       task->tk_msg.rpc_resp);
1164                 unlock_kernel();
1165         }
1166         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1167                         task->tk_status);
1168         return;
1169 out_retry:
1170         req->rq_received = req->rq_private_buf.len = 0;
1171         task->tk_status = 0;
1172 }
1173
1174 /*
1175  * 8.   Refresh the credentials if rejected by the server
1176  */
1177 static void
1178 call_refresh(struct rpc_task *task)
1179 {
1180         dprint_status(task);
1181
1182         xprt_release(task);     /* Must do to obtain new XID */
1183         task->tk_action = call_refreshresult;
1184         task->tk_status = 0;
1185         task->tk_client->cl_stats->rpcauthrefresh++;
1186         rpcauth_refreshcred(task);
1187 }
1188
1189 /*
1190  * 8a.  Process the results of a credential refresh
1191  */
1192 static void
1193 call_refreshresult(struct rpc_task *task)
1194 {
1195         int status = task->tk_status;
1196
1197         dprint_status(task);
1198
1199         task->tk_status = 0;
1200         task->tk_action = call_reserve;
1201         if (status >= 0 && rpcauth_uptodatecred(task))
1202                 return;
1203         if (status == -EACCES) {
1204                 rpc_exit(task, -EACCES);
1205                 return;
1206         }
1207         task->tk_action = call_refresh;
1208         if (status != -ETIMEDOUT)
1209                 rpc_delay(task, 3*HZ);
1210         return;
1211 }
1212
1213 /*
1214  * Call header serialization
1215  */
1216 static __be32 *
1217 call_header(struct rpc_task *task)
1218 {
1219         struct rpc_clnt *clnt = task->tk_client;
1220         struct rpc_rqst *req = task->tk_rqstp;
1221         __be32          *p = req->rq_svec[0].iov_base;
1222
1223         /* FIXME: check buffer size? */
1224
1225         p = xprt_skip_transport_header(task->tk_xprt, p);
1226         *p++ = req->rq_xid;             /* XID */
1227         *p++ = htonl(RPC_CALL);         /* CALL */
1228         *p++ = htonl(RPC_VERSION);      /* RPC version */
1229         *p++ = htonl(clnt->cl_prog);    /* program number */
1230         *p++ = htonl(clnt->cl_vers);    /* program version */
1231         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1232         p = rpcauth_marshcred(task, p);
1233         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1234         return p;
1235 }
1236
1237 /*
1238  * Reply header verification
1239  */
1240 static __be32 *
1241 call_verify(struct rpc_task *task)
1242 {
1243         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1244         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1245         __be32  *p = iov->iov_base;
1246         u32 n;
1247         int error = -EACCES;
1248
1249         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
1250                 /* RFC-1014 says that the representation of XDR data must be a
1251                  * multiple of four bytes
1252                  * - if it isn't pointer subtraction in the NFS client may give
1253                  *   undefined results
1254                  */
1255                 printk(KERN_WARNING
1256                        "call_verify: XDR representation not a multiple of"
1257                        " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
1258                 goto out_eio;
1259         }
1260         if ((len -= 3) < 0)
1261                 goto out_overflow;
1262         p += 1; /* skip XID */
1263
1264         if ((n = ntohl(*p++)) != RPC_REPLY) {
1265                 printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
1266                 goto out_garbage;
1267         }
1268         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1269                 if (--len < 0)
1270                         goto out_overflow;
1271                 switch ((n = ntohl(*p++))) {
1272                         case RPC_AUTH_ERROR:
1273                                 break;
1274                         case RPC_MISMATCH:
1275                                 dprintk("RPC: %5u %s: RPC call version "
1276                                                 "mismatch!\n",
1277                                                 task->tk_pid, __FUNCTION__);
1278                                 error = -EPROTONOSUPPORT;
1279                                 goto out_err;
1280                         default:
1281                                 dprintk("RPC: %5u %s: RPC call rejected, "
1282                                                 "unknown error: %x\n",
1283                                                 task->tk_pid, __FUNCTION__, n);
1284                                 goto out_eio;
1285                 }
1286                 if (--len < 0)
1287                         goto out_overflow;
1288                 switch ((n = ntohl(*p++))) {
1289                 case RPC_AUTH_REJECTEDCRED:
1290                 case RPC_AUTH_REJECTEDVERF:
1291                 case RPCSEC_GSS_CREDPROBLEM:
1292                 case RPCSEC_GSS_CTXPROBLEM:
1293                         if (!task->tk_cred_retry)
1294                                 break;
1295                         task->tk_cred_retry--;
1296                         dprintk("RPC: %5u %s: retry stale creds\n",
1297                                         task->tk_pid, __FUNCTION__);
1298                         rpcauth_invalcred(task);
1299                         task->tk_action = call_refresh;
1300                         goto out_retry;
1301                 case RPC_AUTH_BADCRED:
1302                 case RPC_AUTH_BADVERF:
1303                         /* possibly garbled cred/verf? */
1304                         if (!task->tk_garb_retry)
1305                                 break;
1306                         task->tk_garb_retry--;
1307                         dprintk("RPC: %5u %s: retry garbled creds\n",
1308                                         task->tk_pid, __FUNCTION__);
1309                         task->tk_action = call_bind;
1310                         goto out_retry;
1311                 case RPC_AUTH_TOOWEAK:
1312                         printk(KERN_NOTICE "call_verify: server %s requires stronger "
1313                                "authentication.\n", task->tk_client->cl_server);
1314                         break;
1315                 default:
1316                         printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
1317                         error = -EIO;
1318                 }
1319                 dprintk("RPC: %5u %s: call rejected %d\n",
1320                                 task->tk_pid, __FUNCTION__, n);
1321                 goto out_err;
1322         }
1323         if (!(p = rpcauth_checkverf(task, p))) {
1324                 printk(KERN_WARNING "call_verify: auth check failed\n");
1325                 goto out_garbage;               /* bad verifier, retry */
1326         }
1327         len = p - (__be32 *)iov->iov_base - 1;
1328         if (len < 0)
1329                 goto out_overflow;
1330         switch ((n = ntohl(*p++))) {
1331         case RPC_SUCCESS:
1332                 return p;
1333         case RPC_PROG_UNAVAIL:
1334                 dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
1335                                 task->tk_pid, __FUNCTION__,
1336                                 (unsigned int)task->tk_client->cl_prog,
1337                                 task->tk_client->cl_server);
1338                 error = -EPFNOSUPPORT;
1339                 goto out_err;
1340         case RPC_PROG_MISMATCH:
1341                 dprintk("RPC: %5u %s: program %u, version %u unsupported by "
1342                                 "server %s\n", task->tk_pid, __FUNCTION__,
1343                                 (unsigned int)task->tk_client->cl_prog,
1344                                 (unsigned int)task->tk_client->cl_vers,
1345                                 task->tk_client->cl_server);
1346                 error = -EPROTONOSUPPORT;
1347                 goto out_err;
1348         case RPC_PROC_UNAVAIL:
1349                 dprintk("RPC: %5u %s: proc %p unsupported by program %u, "
1350                                 "version %u on server %s\n",
1351                                 task->tk_pid, __FUNCTION__,
1352                                 task->tk_msg.rpc_proc,
1353                                 task->tk_client->cl_prog,
1354                                 task->tk_client->cl_vers,
1355                                 task->tk_client->cl_server);
1356                 error = -EOPNOTSUPP;
1357                 goto out_err;
1358         case RPC_GARBAGE_ARGS:
1359                 dprintk("RPC: %5u %s: server saw garbage\n",
1360                                 task->tk_pid, __FUNCTION__);
1361                 break;                  /* retry */
1362         default:
1363                 printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
1364                 /* Also retry */
1365         }
1366
1367 out_garbage:
1368         task->tk_client->cl_stats->rpcgarbage++;
1369         if (task->tk_garb_retry) {
1370                 task->tk_garb_retry--;
1371                 dprintk("RPC: %5u %s: retrying\n",
1372                                 task->tk_pid, __FUNCTION__);
1373                 task->tk_action = call_bind;
1374 out_retry:
1375                 return ERR_PTR(-EAGAIN);
1376         }
1377         printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
1378 out_eio:
1379         error = -EIO;
1380 out_err:
1381         rpc_exit(task, error);
1382         return ERR_PTR(error);
1383 out_overflow:
1384         printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
1385         goto out_garbage;
1386 }
1387
1388 static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
1389 {
1390         return 0;
1391 }
1392
1393 static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
1394 {
1395         return 0;
1396 }
1397
1398 static struct rpc_procinfo rpcproc_null = {
1399         .p_encode = rpcproc_encode_null,
1400         .p_decode = rpcproc_decode_null,
1401 };
1402
1403 int rpc_ping(struct rpc_clnt *clnt, int flags)
1404 {
1405         struct rpc_message msg = {
1406                 .rpc_proc = &rpcproc_null,
1407         };
1408         int err;
1409         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1410         err = rpc_call_sync(clnt, &msg, flags);
1411         put_rpccred(msg.rpc_cred);
1412         return err;
1413 }