2  *  linux/net/sunrpc/clnt.c
 
   4  *  This file contains the high-level RPC interface.
 
   5  *  It is modeled as a finite state machine to support both synchronous
 
   6  *  and asynchronous requests.
 
   8  *  -   RPC header generation and argument serialization.
 
   9  *  -   Credential refresh.
 
  10  *  -   TCP connect handling.
 
  11  *  -   Retry of operation when it is suspected the operation failed because
 
  12  *      of uid squashing on the server, or when the credentials were stale
 
  13  *      and need to be refreshed, or when a packet was damaged in transit.
 
  14  *      This may have to be moved to the VFS layer.
 
  16  *  NB: BSD uses a more intelligent approach to guessing when a request
 
  17  *  or reply has been lost by keeping the RTO estimate for each procedure.
 
  18  *  We currently make do with a constant timeout value.
 
  20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
 
  21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
 
  24 #include <asm/system.h>
 
  26 #include <linux/module.h>
 
  27 #include <linux/types.h>
 
  28 #include <linux/kallsyms.h>
 
  30 #include <linux/slab.h>
 
  31 #include <linux/smp_lock.h>
 
  32 #include <linux/utsname.h>
 
  33 #include <linux/workqueue.h>
 
  34 #include <linux/in6.h>
 
  36 #include <linux/sunrpc/clnt.h>
 
  37 #include <linux/sunrpc/rpc_pipe_fs.h>
 
  38 #include <linux/sunrpc/metrics.h>
 
  42 # define RPCDBG_FACILITY        RPCDBG_CALL
 
  45 #define dprint_status(t)                                        \
 
  46         dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
 
  47                         __func__, t->tk_status)
 
  50  * All RPC clients are linked into this list
 
  52 static LIST_HEAD(all_clients);
 
  53 static DEFINE_SPINLOCK(rpc_client_lock);
 
  55 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
 
  58 static void     call_start(struct rpc_task *task);
 
  59 static void     call_reserve(struct rpc_task *task);
 
  60 static void     call_reserveresult(struct rpc_task *task);
 
  61 static void     call_allocate(struct rpc_task *task);
 
  62 static void     call_decode(struct rpc_task *task);
 
  63 static void     call_bind(struct rpc_task *task);
 
  64 static void     call_bind_status(struct rpc_task *task);
 
  65 static void     call_transmit(struct rpc_task *task);
 
  66 static void     call_status(struct rpc_task *task);
 
  67 static void     call_transmit_status(struct rpc_task *task);
 
  68 static void     call_refresh(struct rpc_task *task);
 
  69 static void     call_refreshresult(struct rpc_task *task);
 
  70 static void     call_timeout(struct rpc_task *task);
 
  71 static void     call_connect(struct rpc_task *task);
 
  72 static void     call_connect_status(struct rpc_task *task);
 
  74 static __be32   *rpc_encode_header(struct rpc_task *task);
 
  75 static __be32   *rpc_verify_header(struct rpc_task *task);
 
  76 static int      rpc_ping(struct rpc_clnt *clnt, int flags);
 
  78 static void rpc_register_client(struct rpc_clnt *clnt)
 
  80         spin_lock(&rpc_client_lock);
 
  81         list_add(&clnt->cl_clients, &all_clients);
 
  82         spin_unlock(&rpc_client_lock);
 
  85 static void rpc_unregister_client(struct rpc_clnt *clnt)
 
  87         spin_lock(&rpc_client_lock);
 
  88         list_del(&clnt->cl_clients);
 
  89         spin_unlock(&rpc_client_lock);
 
  93 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
 
  95         static uint32_t clntid;
 
  98         clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
 
  99         clnt->cl_dentry = ERR_PTR(-ENOENT);
 
 100         if (dir_name == NULL)
 
 103         clnt->cl_vfsmnt = rpc_get_mount();
 
 104         if (IS_ERR(clnt->cl_vfsmnt))
 
 105                 return PTR_ERR(clnt->cl_vfsmnt);
 
 108                 snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
 
 109                                 "%s/clnt%x", dir_name,
 
 110                                 (unsigned int)clntid++);
 
 111                 clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
 
 112                 clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
 
 113                 if (!IS_ERR(clnt->cl_dentry))
 
 115                 error = PTR_ERR(clnt->cl_dentry);
 
 116                 if (error != -EEXIST) {
 
 117                         printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
 
 118                                         clnt->cl_pathname, error);
 
 125 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
 
 127         struct rpc_program      *program = args->program;
 
 128         struct rpc_version      *version;
 
 129         struct rpc_clnt         *clnt = NULL;
 
 130         struct rpc_auth         *auth;
 
 134         /* sanity check the name before trying to print it */
 
 136         len = strlen(args->servername);
 
 137         if (len > RPC_MAXNETNAMELEN)
 
 141         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 
 142                         program->name, args->servername, xprt);
 
 151         if (args->version >= program->nrvers)
 
 153         version = program->version[args->version];
 
 158         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
 
 161         clnt->cl_parent = clnt;
 
 163         clnt->cl_server = clnt->cl_inline_name;
 
 164         if (len > sizeof(clnt->cl_inline_name)) {
 
 165                 char *buf = kmalloc(len, GFP_KERNEL);
 
 167                         clnt->cl_server = buf;
 
 169                         len = sizeof(clnt->cl_inline_name);
 
 171         strlcpy(clnt->cl_server, args->servername, len);
 
 173         clnt->cl_xprt     = xprt;
 
 174         clnt->cl_procinfo = version->procs;
 
 175         clnt->cl_maxproc  = version->nrprocs;
 
 176         clnt->cl_protname = program->name;
 
 177         clnt->cl_prog     = program->number;
 
 178         clnt->cl_vers     = version->number;
 
 179         clnt->cl_stats    = program->stats;
 
 180         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
 
 182         if (clnt->cl_metrics == NULL)
 
 184         clnt->cl_program  = program;
 
 185         INIT_LIST_HEAD(&clnt->cl_tasks);
 
 186         spin_lock_init(&clnt->cl_lock);
 
 188         if (!xprt_bound(clnt->cl_xprt))
 
 189                 clnt->cl_autobind = 1;
 
 191         clnt->cl_timeout = xprt->timeout;
 
 192         if (args->timeout != NULL) {
 
 193                 memcpy(&clnt->cl_timeout_default, args->timeout,
 
 194                                 sizeof(clnt->cl_timeout_default));
 
 195                 clnt->cl_timeout = &clnt->cl_timeout_default;
 
 198         clnt->cl_rtt = &clnt->cl_rtt_default;
 
 199         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
 
 201         kref_init(&clnt->cl_kref);
 
 203         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
 
 207         auth = rpcauth_create(args->authflavor, clnt);
 
 209                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
 
 215         /* save the nodename */
 
 216         clnt->cl_nodelen = strlen(utsname()->nodename);
 
 217         if (clnt->cl_nodelen > UNX_MAXNODENAME)
 
 218                 clnt->cl_nodelen = UNX_MAXNODENAME;
 
 219         memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
 
 220         rpc_register_client(clnt);
 
 224         if (!IS_ERR(clnt->cl_dentry)) {
 
 225                 rpc_rmdir(clnt->cl_dentry);
 
 229         rpc_free_iostats(clnt->cl_metrics);
 
 231         if (clnt->cl_server != clnt->cl_inline_name)
 
 232                 kfree(clnt->cl_server);
 
 243  * rpc_create - create an RPC client and transport with one call
 
 244  * @args: rpc_clnt create argument structure
 
 246  * Creates and initializes an RPC transport and an RPC client.
 
 248  * It can ping the server in order to determine if it is up, and to see if
 
 249  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
 
 250  * this behavior so asynchronous tasks can also use rpc_create.
 
 252 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 
 254         struct rpc_xprt *xprt;
 
 255         struct rpc_clnt *clnt;
 
 256         struct xprt_create xprtargs = {
 
 257                 .ident = args->protocol,
 
 258                 .srcaddr = args->saddress,
 
 259                 .dstaddr = args->address,
 
 260                 .addrlen = args->addrsize,
 
 265          * If the caller chooses not to specify a hostname, whip
 
 266          * up a string representation of the passed-in address.
 
 268         if (args->servername == NULL) {
 
 269                 servername[0] = '\0';
 
 270                 switch (args->address->sa_family) {
 
 272                         struct sockaddr_in *sin =
 
 273                                         (struct sockaddr_in *)args->address;
 
 274                         snprintf(servername, sizeof(servername), NIPQUAD_FMT,
 
 275                                  NIPQUAD(sin->sin_addr.s_addr));
 
 279                         struct sockaddr_in6 *sin =
 
 280                                         (struct sockaddr_in6 *)args->address;
 
 281                         snprintf(servername, sizeof(servername), NIP6_FMT,
 
 282                                  NIP6(sin->sin6_addr));
 
 286                         /* caller wants default server name, but
 
 287                          * address family isn't recognized. */
 
 288                         return ERR_PTR(-EINVAL);
 
 290                 args->servername = servername;
 
 293         xprt = xprt_create_transport(&xprtargs);
 
 295                 return (struct rpc_clnt *)xprt;
 
 298          * By default, the kernel RPC client connects from a reserved port.
 
 299          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
 
 300          * but it is always enabled for rpciod, which handles the connect
 
 304         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 
 307         clnt = rpc_new_client(args, xprt);
 
 311         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
 
 312                 int err = rpc_ping(clnt, RPC_TASK_SOFT);
 
 314                         rpc_shutdown_client(clnt);
 
 319         clnt->cl_softrtry = 1;
 
 320         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
 
 321                 clnt->cl_softrtry = 0;
 
 323         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
 
 324                 clnt->cl_autobind = 1;
 
 325         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
 
 326                 clnt->cl_discrtry = 1;
 
 327         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
 
 332 EXPORT_SYMBOL_GPL(rpc_create);
 
 335  * This function clones the RPC client structure. It allows us to share the
 
 336  * same transport while varying parameters such as the authentication
 
 340 rpc_clone_client(struct rpc_clnt *clnt)
 
 342         struct rpc_clnt *new;
 
 345         new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
 
 348         new->cl_parent = clnt;
 
 349         /* Turn off autobind on clones */
 
 350         new->cl_autobind = 0;
 
 351         INIT_LIST_HEAD(&new->cl_tasks);
 
 352         spin_lock_init(&new->cl_lock);
 
 353         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_timeout->to_initval);
 
 354         new->cl_metrics = rpc_alloc_iostats(clnt);
 
 355         if (new->cl_metrics == NULL)
 
 357         kref_init(&new->cl_kref);
 
 358         err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
 
 362                 atomic_inc(&new->cl_auth->au_count);
 
 363         xprt_get(clnt->cl_xprt);
 
 364         kref_get(&clnt->cl_kref);
 
 365         rpc_register_client(new);
 
 369         rpc_free_iostats(new->cl_metrics);
 
 373         dprintk("RPC:       %s: returned error %d\n", __func__, err);
 
 376 EXPORT_SYMBOL_GPL(rpc_clone_client);
 
 379  * Properly shut down an RPC client, terminating all outstanding
 
 382 void rpc_shutdown_client(struct rpc_clnt *clnt)
 
 384         dprintk("RPC:       shutting down %s client for %s\n",
 
 385                         clnt->cl_protname, clnt->cl_server);
 
 387         while (!list_empty(&clnt->cl_tasks)) {
 
 388                 rpc_killall_tasks(clnt);
 
 389                 wait_event_timeout(destroy_wait,
 
 390                         list_empty(&clnt->cl_tasks), 1*HZ);
 
 393         rpc_release_client(clnt);
 
 395 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 
 401 rpc_free_client(struct kref *kref)
 
 403         struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
 
 405         dprintk("RPC:       destroying %s client for %s\n",
 
 406                         clnt->cl_protname, clnt->cl_server);
 
 407         if (!IS_ERR(clnt->cl_dentry)) {
 
 408                 rpc_rmdir(clnt->cl_dentry);
 
 411         if (clnt->cl_parent != clnt) {
 
 412                 rpc_release_client(clnt->cl_parent);
 
 415         if (clnt->cl_server != clnt->cl_inline_name)
 
 416                 kfree(clnt->cl_server);
 
 418         rpc_unregister_client(clnt);
 
 419         rpc_free_iostats(clnt->cl_metrics);
 
 420         clnt->cl_metrics = NULL;
 
 421         xprt_put(clnt->cl_xprt);
 
 430 rpc_free_auth(struct kref *kref)
 
 432         struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
 
 434         if (clnt->cl_auth == NULL) {
 
 435                 rpc_free_client(kref);
 
 440          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
 
 441          *       release remaining GSS contexts. This mechanism ensures
 
 442          *       that it can do so safely.
 
 445         rpcauth_release(clnt->cl_auth);
 
 446         clnt->cl_auth = NULL;
 
 447         kref_put(kref, rpc_free_client);
 
 451  * Release reference to the RPC client
 
 454 rpc_release_client(struct rpc_clnt *clnt)
 
 456         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
 
 458         if (list_empty(&clnt->cl_tasks))
 
 459                 wake_up(&destroy_wait);
 
 460         kref_put(&clnt->cl_kref, rpc_free_auth);
 
 464  * rpc_bind_new_program - bind a new RPC program to an existing client
 
 465  * @old: old rpc_client
 
 466  * @program: rpc program to set
 
 467  * @vers: rpc program version
 
 469  * Clones the rpc client and sets up a new RPC program. This is mainly
 
 470  * of use for enabling different RPC programs to share the same transport.
 
 471  * The Sun NFSv2/v3 ACL protocol can do this.
 
 473 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 
 474                                       struct rpc_program *program,
 
 477         struct rpc_clnt *clnt;
 
 478         struct rpc_version *version;
 
 481         BUG_ON(vers >= program->nrvers || !program->version[vers]);
 
 482         version = program->version[vers];
 
 483         clnt = rpc_clone_client(old);
 
 486         clnt->cl_procinfo = version->procs;
 
 487         clnt->cl_maxproc  = version->nrprocs;
 
 488         clnt->cl_protname = program->name;
 
 489         clnt->cl_prog     = program->number;
 
 490         clnt->cl_vers     = version->number;
 
 491         clnt->cl_stats    = program->stats;
 
 492         err = rpc_ping(clnt, RPC_TASK_SOFT);
 
 494                 rpc_shutdown_client(clnt);
 
 500 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 
 503  * Default callback for async RPC calls
 
 506 rpc_default_callback(struct rpc_task *task, void *data)
 
 510 static const struct rpc_call_ops rpc_default_ops = {
 
 511         .rpc_call_done = rpc_default_callback,
 
 515  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 
 516  * @task_setup_data: pointer to task initialisation data
 
 518 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
 
 520         struct rpc_task *task, *ret;
 
 522         task = rpc_new_task(task_setup_data);
 
 524                 rpc_release_calldata(task_setup_data->callback_ops,
 
 525                                 task_setup_data->callback_data);
 
 526                 ret = ERR_PTR(-ENOMEM);
 
 530         if (task->tk_status != 0) {
 
 531                 ret = ERR_PTR(task->tk_status);
 
 535         atomic_inc(&task->tk_count);
 
 541 EXPORT_SYMBOL_GPL(rpc_run_task);
 
 544  * rpc_call_sync - Perform a synchronous RPC call
 
 545  * @clnt: pointer to RPC client
 
 546  * @msg: RPC call parameters
 
 547  * @flags: RPC call flags
 
 549 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
 
 551         struct rpc_task *task;
 
 552         struct rpc_task_setup task_setup_data = {
 
 555                 .callback_ops = &rpc_default_ops,
 
 560         BUG_ON(flags & RPC_TASK_ASYNC);
 
 562         task = rpc_run_task(&task_setup_data);
 
 564                 return PTR_ERR(task);
 
 565         status = task->tk_status;
 
 569 EXPORT_SYMBOL_GPL(rpc_call_sync);
 
 572  * rpc_call_async - Perform an asynchronous RPC call
 
 573  * @clnt: pointer to RPC client
 
 574  * @msg: RPC call parameters
 
 575  * @flags: RPC call flags
 
 576  * @tk_ops: RPC call ops
 
 577  * @data: user call data
 
 580 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 
 581                const struct rpc_call_ops *tk_ops, void *data)
 
 583         struct rpc_task *task;
 
 584         struct rpc_task_setup task_setup_data = {
 
 587                 .callback_ops = tk_ops,
 
 588                 .callback_data = data,
 
 589                 .flags = flags|RPC_TASK_ASYNC,
 
 592         task = rpc_run_task(&task_setup_data);
 
 594                 return PTR_ERR(task);
 
 598 EXPORT_SYMBOL_GPL(rpc_call_async);
 
 601 rpc_call_start(struct rpc_task *task)
 
 603         task->tk_action = call_start;
 
 605 EXPORT_SYMBOL_GPL(rpc_call_start);
 
 608  * rpc_peeraddr - extract remote peer address from clnt's xprt
 
 609  * @clnt: RPC client structure
 
 610  * @buf: target buffer
 
 611  * @bufsize: length of target buffer
 
 613  * Returns the number of bytes that are actually in the stored address.
 
 615 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
 
 618         struct rpc_xprt *xprt = clnt->cl_xprt;
 
 620         bytes = sizeof(xprt->addr);
 
 623         memcpy(buf, &clnt->cl_xprt->addr, bytes);
 
 624         return xprt->addrlen;
 
 626 EXPORT_SYMBOL_GPL(rpc_peeraddr);
 
 629  * rpc_peeraddr2str - return remote peer address in printable format
 
 630  * @clnt: RPC client structure
 
 631  * @format: address format
 
 634 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
 
 635                              enum rpc_display_format_t format)
 
 637         struct rpc_xprt *xprt = clnt->cl_xprt;
 
 639         if (xprt->address_strings[format] != NULL)
 
 640                 return xprt->address_strings[format];
 
 642                 return "unprintable";
 
 644 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
 
 647 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
 
 649         struct rpc_xprt *xprt = clnt->cl_xprt;
 
 650         if (xprt->ops->set_buffer_size)
 
 651                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
 
 653 EXPORT_SYMBOL_GPL(rpc_setbufsize);
 
 656  * Return size of largest payload RPC client can support, in bytes
 
 658  * For stream transports, this is one RPC record fragment (see RFC
 
 659  * 1831), as we don't support multi-record requests yet.  For datagram
 
 660  * transports, this is the size of an IP packet minus the IP, UDP, and
 
 663 size_t rpc_max_payload(struct rpc_clnt *clnt)
 
 665         return clnt->cl_xprt->max_payload;
 
 667 EXPORT_SYMBOL_GPL(rpc_max_payload);
 
 670  * rpc_force_rebind - force transport to check that remote port is unchanged
 
 671  * @clnt: client to rebind
 
 674 void rpc_force_rebind(struct rpc_clnt *clnt)
 
 676         if (clnt->cl_autobind)
 
 677                 xprt_clear_bound(clnt->cl_xprt);
 
 679 EXPORT_SYMBOL_GPL(rpc_force_rebind);
 
 682  * Restart an (async) RPC call. Usually called from within the
 
 686 rpc_restart_call(struct rpc_task *task)
 
 688         if (RPC_ASSASSINATED(task))
 
 691         task->tk_action = call_start;
 
 693 EXPORT_SYMBOL_GPL(rpc_restart_call);
 
 696 static const char *rpc_proc_name(const struct rpc_task *task)
 
 698         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
 
 713  *     Other FSM states can be visited zero or more times, but
 
 714  *     this state is visited exactly once for each RPC.
 
 717 call_start(struct rpc_task *task)
 
 719         struct rpc_clnt *clnt = task->tk_client;
 
 721         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
 
 722                         clnt->cl_protname, clnt->cl_vers,
 
 724                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
 
 726         /* Increment call count */
 
 727         task->tk_msg.rpc_proc->p_count++;
 
 728         clnt->cl_stats->rpccnt++;
 
 729         task->tk_action = call_reserve;
 
 733  * 1.   Reserve an RPC call slot
 
 736 call_reserve(struct rpc_task *task)
 
 740         if (!rpcauth_uptodatecred(task)) {
 
 741                 task->tk_action = call_refresh;
 
 746         task->tk_action  = call_reserveresult;
 
 751  * 1b.  Grok the result of xprt_reserve()
 
 754 call_reserveresult(struct rpc_task *task)
 
 756         int status = task->tk_status;
 
 761          * After a call to xprt_reserve(), we must have either
 
 762          * a request slot or else an error status.
 
 766                 if (task->tk_rqstp) {
 
 767                         task->tk_action = call_allocate;
 
 771                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
 
 773                 rpc_exit(task, -EIO);
 
 778          * Even though there was an error, we may have acquired
 
 779          * a request slot somehow.  Make sure not to leak it.
 
 781         if (task->tk_rqstp) {
 
 782                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
 
 788         case -EAGAIN:   /* woken up; retry */
 
 789                 task->tk_action = call_reserve;
 
 791         case -EIO:      /* probably a shutdown */
 
 794                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
 
 798         rpc_exit(task, status);
 
 802  * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
 
 803  *      (Note: buffer memory is freed in xprt_release).
 
 806 call_allocate(struct rpc_task *task)
 
 808         unsigned int slack = task->tk_msg.rpc_cred->cr_auth->au_cslack;
 
 809         struct rpc_rqst *req = task->tk_rqstp;
 
 810         struct rpc_xprt *xprt = task->tk_xprt;
 
 811         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
 
 816         task->tk_action = call_bind;
 
 821         if (proc->p_proc != 0) {
 
 822                 BUG_ON(proc->p_arglen == 0);
 
 823                 if (proc->p_decode != NULL)
 
 824                         BUG_ON(proc->p_replen == 0);
 
 828          * Calculate the size (in quads) of the RPC call
 
 829          * and reply headers, and convert both values
 
 832         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
 
 833         req->rq_callsize <<= 2;
 
 834         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
 
 835         req->rq_rcvsize <<= 2;
 
 837         req->rq_buffer = xprt->ops->buf_alloc(task,
 
 838                                         req->rq_callsize + req->rq_rcvsize);
 
 839         if (req->rq_buffer != NULL)
 
 842         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
 
 844         if (RPC_IS_ASYNC(task) || !signalled()) {
 
 845                 task->tk_action = call_allocate;
 
 846                 rpc_delay(task, HZ>>4);
 
 850         rpc_exit(task, -ERESTARTSYS);
 
 854 rpc_task_need_encode(struct rpc_task *task)
 
 856         return task->tk_rqstp->rq_snd_buf.len == 0;
 
 860 rpc_task_force_reencode(struct rpc_task *task)
 
 862         task->tk_rqstp->rq_snd_buf.len = 0;
 
 866 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
 
 868         buf->head[0].iov_base = start;
 
 869         buf->head[0].iov_len = len;
 
 870         buf->tail[0].iov_len = 0;
 
 878  * 3.   Encode arguments of an RPC call
 
 881 rpc_xdr_encode(struct rpc_task *task)
 
 883         struct rpc_rqst *req = task->tk_rqstp;
 
 889         rpc_xdr_buf_init(&req->rq_snd_buf,
 
 892         rpc_xdr_buf_init(&req->rq_rcv_buf,
 
 893                          (char *)req->rq_buffer + req->rq_callsize,
 
 896         p = rpc_encode_header(task);
 
 898                 printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
 
 899                 rpc_exit(task, -EIO);
 
 903         encode = task->tk_msg.rpc_proc->p_encode;
 
 907         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
 
 908                         task->tk_msg.rpc_argp);
 
 912  * 4.   Get the server port number if not yet set
 
 915 call_bind(struct rpc_task *task)
 
 917         struct rpc_xprt *xprt = task->tk_xprt;
 
 921         task->tk_action = call_connect;
 
 922         if (!xprt_bound(xprt)) {
 
 923                 task->tk_action = call_bind_status;
 
 924                 task->tk_timeout = xprt->bind_timeout;
 
 925                 xprt->ops->rpcbind(task);
 
 930  * 4a.  Sort out bind result
 
 933 call_bind_status(struct rpc_task *task)
 
 937         if (task->tk_status >= 0) {
 
 940                 task->tk_action = call_connect;
 
 944         switch (task->tk_status) {
 
 946                 dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
 
 947                 rpc_delay(task, HZ >> 2);
 
 950                 dprintk("RPC: %5u remote rpcbind: RPC program/version "
 
 951                                 "unavailable\n", task->tk_pid);
 
 952                 /* fail immediately if this is an RPC ping */
 
 953                 if (task->tk_msg.rpc_proc->p_proc == 0) {
 
 954                         status = -EOPNOTSUPP;
 
 957                 rpc_delay(task, 3*HZ);
 
 960                 dprintk("RPC: %5u rpcbind request timed out\n",
 
 964                 /* server doesn't support any rpcbind version we know of */
 
 965                 dprintk("RPC: %5u remote rpcbind service unavailable\n",
 
 968         case -EPROTONOSUPPORT:
 
 969                 dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
 
 972                 task->tk_action = call_bind;
 
 975                 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
 
 976                                 task->tk_pid, -task->tk_status);
 
 979         rpc_exit(task, status);
 
 983         task->tk_action = call_timeout;
 
 987  * 4b.  Connect to the RPC server
 
 990 call_connect(struct rpc_task *task)
 
 992         struct rpc_xprt *xprt = task->tk_xprt;
 
 994         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
 
 996                         (xprt_connected(xprt) ? "is" : "is not"));
 
 998         task->tk_action = call_transmit;
 
 999         if (!xprt_connected(xprt)) {
 
1000                 task->tk_action = call_connect_status;
 
1001                 if (task->tk_status < 0)
 
1008  * 4c.  Sort out connect result
 
1011 call_connect_status(struct rpc_task *task)
 
1013         struct rpc_clnt *clnt = task->tk_client;
 
1014         int status = task->tk_status;
 
1016         dprint_status(task);
 
1018         task->tk_status = 0;
 
1020                 clnt->cl_stats->netreconn++;
 
1021                 task->tk_action = call_transmit;
 
1025         /* Something failed: remote service port may have changed */
 
1026         rpc_force_rebind(clnt);
 
1031                 task->tk_action = call_bind;
 
1032                 if (!RPC_IS_SOFT(task))
 
1034                 /* if soft mounted, test if we've timed out */
 
1036                 task->tk_action = call_timeout;
 
1039         rpc_exit(task, -EIO);
 
1043  * 5.   Transmit the RPC request, and wait for reply
 
1046 call_transmit(struct rpc_task *task)
 
1048         dprint_status(task);
 
1050         task->tk_action = call_status;
 
1051         if (task->tk_status < 0)
 
1053         task->tk_status = xprt_prepare_transmit(task);
 
1054         if (task->tk_status != 0)
 
1056         task->tk_action = call_transmit_status;
 
1057         /* Encode here so that rpcsec_gss can use correct sequence number. */
 
1058         if (rpc_task_need_encode(task)) {
 
1059                 BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
 
1060                 rpc_xdr_encode(task);
 
1061                 /* Did the encode result in an error condition? */
 
1062                 if (task->tk_status != 0) {
 
1063                         /* Was the error nonfatal? */
 
1064                         if (task->tk_status == -EAGAIN)
 
1065                                 rpc_delay(task, HZ >> 4);
 
1067                                 rpc_exit(task, task->tk_status);
 
1071         xprt_transmit(task);
 
1072         if (task->tk_status < 0)
 
1075          * On success, ensure that we call xprt_end_transmit() before sleeping
 
1076          * in order to allow access to the socket to other RPC requests.
 
1078         call_transmit_status(task);
 
1079         if (task->tk_msg.rpc_proc->p_decode != NULL)
 
1081         task->tk_action = rpc_exit_task;
 
1082         rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
 
1086  * 5a.  Handle cleanup after a transmission
 
1089 call_transmit_status(struct rpc_task *task)
 
1091         task->tk_action = call_status;
 
1093          * Special case: if we've been waiting on the socket's write_space()
 
1094          * callback, then don't call xprt_end_transmit().
 
1096         if (task->tk_status == -EAGAIN)
 
1098         xprt_end_transmit(task);
 
1099         rpc_task_force_reencode(task);
 
1103  * 6.   Sort out the RPC call status
 
1106 call_status(struct rpc_task *task)
 
1108         struct rpc_clnt *clnt = task->tk_client;
 
1109         struct rpc_rqst *req = task->tk_rqstp;
 
1112         if (req->rq_received > 0 && !req->rq_bytes_sent)
 
1113                 task->tk_status = req->rq_received;
 
1115         dprint_status(task);
 
1117         status = task->tk_status;
 
1119                 task->tk_action = call_decode;
 
1123         task->tk_status = 0;
 
1129                  * Delay any retries for 3 seconds, then handle as if it
 
1132                 rpc_delay(task, 3*HZ);
 
1134                 task->tk_action = call_timeout;
 
1135                 if (task->tk_client->cl_discrtry)
 
1136                         xprt_conditional_disconnect(task->tk_xprt,
 
1137                                         req->rq_connect_cookie);
 
1141                 rpc_force_rebind(clnt);
 
1142                 task->tk_action = call_bind;
 
1145                 task->tk_action = call_transmit;
 
1148                 /* shutdown or soft timeout */
 
1149                 rpc_exit(task, status);
 
1152                 if (clnt->cl_chatty)
 
1153                         printk("%s: RPC call returned error %d\n",
 
1154                                clnt->cl_protname, -status);
 
1155                 rpc_exit(task, status);
 
1160  * 6a.  Handle RPC timeout
 
1161  *      We do not release the request slot, so we keep using the
 
1162  *      same XID for all retransmits.
 
/*
 * call_timeout - state-machine step that reacts to a timed-out request.
 *
 * @task: the RPC task whose request timed out.
 *
 * xprt_adjust_timeout() is consulted first; a return value of 0 is
 * logged as a minor timeout, anything else as a major one.  On a major
 * timeout the function bumps task->tk_timeouts, terminates soft tasks
 * with -EIO, prints a one-time notice guarded by RPC_CALL_MAJORSEEN,
 * forces a rebind of the client and invalidates the task credential.
 * The visible tail counts a retransmission in cl_stats and restarts the
 * machine at call_bind with tk_status cleared.
 *
 * NOTE(review): this listing omits several original lines (return type,
 * braces, labels, goto/return statements), so the exact control flow
 * between the statements below must be confirmed against the full file.
 */
1165 call_timeout(struct rpc_task *task)
 
1167         struct rpc_clnt *clnt = task->tk_client;
 
1169         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
 
1170                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
 
1174         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
 
1175         task->tk_timeouts++;
 
             /* Soft tasks give up here and report -EIO to the caller. */
1177         if (RPC_IS_SOFT(task)) {
 
1178                 if (clnt->cl_chatty)
 
1179                         printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
 
1180                                 clnt->cl_protname, clnt->cl_server);
 
1181                 rpc_exit(task, -EIO);
 
             /*
              * Print the still-trying notice only once per outage: the flag
              * set here is tested and cleared again in call_decode.
              */
1185         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
 
1186                 task->tk_flags |= RPC_CALL_MAJORSEEN;
 
1187                 if (clnt->cl_chatty)
 
1188                         printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
 
1189                         clnt->cl_protname, clnt->cl_server);
 
1191         rpc_force_rebind(clnt);
 
1193          * Did our request time out due to an RPCSEC_GSS out-of-sequence
 
1194          * event? RFC2203 requires the server to drop all such requests.
 
1196         rpcauth_invalcred(task);
 
             /* Count the retransmission and re-enter the machine at call_bind. */
1199         clnt->cl_stats->rpcretrans++;
 
1200         task->tk_action = call_bind;
 
1201         task->tk_status = 0;
 
1205  * 7.   Decode the RPC reply
 
/*
 * call_decode - state-machine step that decodes a received RPC reply.
 *
 * @task: the RPC task whose request (task->tk_rqstp) has a reply.
 *
 * Visible steps, in order:
 *  - if RPC_CALL_MAJORSEEN is set, optionally print a server-OK notice
 *    and clear the flag;
 *  - copy rq_private_buf.len into rq_rcv_buf.len and WARN if the two
 *    xdr buffers then differ;
 *  - treat a reply shorter than 12 bytes as unusable;
 *  - validate the reply header with rpc_verify_header();
 *  - select rpc_exit_task as the next action and run the procedure
 *    decoder (p_decode) through rpcauth_unwrap_resp(), storing its
 *    result in task->tk_status;
 *  - a final section clears tk_status and, if the task still owns the
 *    same request slot, zeroes rq_received and rq_rcv_buf.len and may
 *    disconnect the transport when cl_discrtry is set.
 *
 * NOTE(review): the listing omits braces, labels and goto/return lines,
 * so which of these sections are reached on which path has to be
 * confirmed against the full file.
 */
1208 call_decode(struct rpc_task *task)
 
1210         struct rpc_clnt *clnt = task->tk_client;
 
1211         struct rpc_rqst *req = task->tk_rqstp;
 
1212         kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
 
1215         dprintk("RPC: %5u call_decode (status %d)\n",
 
1216                         task->tk_pid, task->tk_status);
 
             /* Counterpart of the one-time notice printed by call_timeout. */
1218         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
 
1219                 if (clnt->cl_chatty)
 
1220                         printk(KERN_NOTICE "%s: server %s OK\n",
 
1221                                 clnt->cl_protname, clnt->cl_server);
 
1222                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
 
1226          * Ensure that we see all writes made by xprt_complete_rqst()
 
1227          * before it changed req->rq_received.
 
1230         req->rq_rcv_buf.len = req->rq_private_buf.len;
 
1232         /* Check that the softirq receive buffer is valid */
 
1233         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
 
1234                                 sizeof(req->rq_rcv_buf)) != 0);
 
             /*
              * 12 bytes is three 32-bit words; presumably the XID, message
              * type and reply status that rpc_verify_header() reads first.
              * Non-soft tasks are pointed at call_bind and counted as a
              * retransmission; the statements after that log the short reply
              * and select call_timeout.  The lines deciding between the two
              * are not part of this listing.
              */
1236         if (req->rq_rcv_buf.len < 12) {
 
1237                 if (!RPC_IS_SOFT(task)) {
 
1238                         task->tk_action = call_bind;
 
1239                         clnt->cl_stats->rpcretrans++;
 
1242                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
 
1243                                 clnt->cl_protname, task->tk_status);
 
1244                 task->tk_action = call_timeout;
 
1248         p = rpc_verify_header(task);
 
             /* ERR_PTR(-EAGAIN) is what rpc_verify_header() returns after scheduling a retry. */
1250                 if (p == ERR_PTR(-EAGAIN))
 
1255         task->tk_action = rpc_exit_task;
 
1258                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
 
1259                                                       task->tk_msg.rpc_resp);
 
1261         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
 
1265         task->tk_status = 0;
 
1266         /* Note: rpc_verify_header() may have freed the RPC slot */
 
1267         if (task->tk_rqstp == req) {
 
1268                 req->rq_received = req->rq_rcv_buf.len = 0;
 
1269                 if (task->tk_client->cl_discrtry)
 
1270                         xprt_conditional_disconnect(task->tk_xprt,
 
1271                                         req->rq_connect_cookie);
 
1276  * 8.   Refresh the credentials if rejected by the server
 
/*
 * call_refresh - state-machine step that starts a credential refresh.
 *
 * @task: the RPC task whose credential is to be refreshed.
 *
 * Selects call_refreshresult as the next action, clears tk_status,
 * bumps the rpcauthrefresh counter in the client statistics and then
 * calls rpcauth_refreshcred().  The outcome is examined by
 * call_refreshresult, which reads task->tk_status.
 */
1279 call_refresh(struct rpc_task *task)
 
1281         dprint_status(task);
 
1283         task->tk_action = call_refreshresult;
 
1284         task->tk_status = 0;
 
1285         task->tk_client->cl_stats->rpcauthrefresh++;
 
1286         rpcauth_refreshcred(task);
 
1290  * 8a.  Process the results of a credential refresh
 
/*
 * call_refreshresult - state-machine step run after a credential refresh.
 *
 * @task: the RPC task; task->tk_status holds the refresh outcome.
 *
 * The status is saved locally, then tk_status is cleared and call_reserve
 * is selected as the default next action.  Visible outcomes:
 *  - status >= 0 and rpcauth_uptodatecred() true: the statement taken
 *    in this case is not part of this listing (presumably the default
 *    call_reserve action is kept - TODO confirm);
 *  - status == -EACCES: the task is terminated with rpc_exit(-EACCES);
 *  - otherwise: call_refresh is selected again, and rpc_delay() of
 *    3*HZ is applied unless the status was -ETIMEDOUT.
 */
1293 call_refreshresult(struct rpc_task *task)
 
1295         int status = task->tk_status;
 
1297         dprint_status(task);
 
1299         task->tk_status = 0;
 
1300         task->tk_action = call_reserve;
 
1301         if (status >= 0 && rpcauth_uptodatecred(task))
 
1303         if (status == -EACCES) {
 
1304                 rpc_exit(task, -EACCES);
 
             /* Retry the refresh; back off for 3*HZ unless the failure was a timeout. */
1307         task->tk_action = call_refresh;
 
1308         if (status != -ETIMEDOUT)
 
1309                 rpc_delay(task, 3*HZ);
 
/*
 * rpc_encode_header - write the RPC call header into the send buffer.
 *
 * @task: the RPC task; its request (task->tk_rqstp) supplies the send
 *        iovec rq_svec[0] and the XID.
 *
 * Starting at rq_svec[0].iov_base and after any transport header
 * skipped by xprt_skip_transport_header(), six 32-bit words are stored:
 * XID (copied as is), RPC_CALL, RPC_VERSION, program number, program
 * version and procedure number (each converted with htonl).  The
 * credential is appended by rpcauth_marshcred(), and rq_slen is set
 * from xdr_adjust_iovec().
 *
 * NOTE(review): the return type and return statement are not part of
 * this listing.  No bounds check on the buffer is visible (see the
 * FIXME below).
 */
1314 rpc_encode_header(struct rpc_task *task)
 
1316         struct rpc_clnt *clnt = task->tk_client;
 
1317         struct rpc_rqst *req = task->tk_rqstp;
 
1318         __be32          *p = req->rq_svec[0].iov_base;
 
1320         /* FIXME: check buffer size? */
 
1322         p = xprt_skip_transport_header(task->tk_xprt, p);
 
1323         *p++ = req->rq_xid;             /* XID */
 
1324         *p++ = htonl(RPC_CALL);         /* CALL */
 
1325         *p++ = htonl(RPC_VERSION);      /* RPC version */
 
1326         *p++ = htonl(clnt->cl_prog);    /* program number */
 
1327         *p++ = htonl(clnt->cl_vers);    /* program version */
 
1328         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
 
1329         p = rpcauth_marshcred(task, p);
 
1330         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
 
/*
 * rpc_verify_header - validate the header of a received RPC reply.
 *
 * @task: the RPC task; the reply is read from
 *        task->tk_rqstp->rq_rcv_buf.head[0].
 *
 * The reply is walked one 32-bit word at a time (ntohl on each word):
 *  - a reply length that is not a multiple of 4 is reported;
 *  - the XID is skipped and the message type must be RPC_REPLY;
 *  - if the reply status is not RPC_MSG_ACCEPTED, the reject status and
 *    then the auth status are examined:
 *      REJECTEDCRED, REJECTEDVERF, RPCSEC_GSS_CREDPROBLEM and
 *      RPCSEC_GSS_CTXPROBLEM consume tk_cred_retry, invalidate the
 *      credential and select call_refresh;
 *      BADCRED and BADVERF consume tk_garb_retry and select call_bind;
 *      TOOWEAK prints a notice naming the server;
 *  - for accepted replies the verifier is checked with
 *    rpcauth_checkverf() and the accept status is mapped to an errno:
 *      RPC_PROG_UNAVAIL  -> -EPFNOSUPPORT
 *      RPC_PROG_MISMATCH -> -EPROTONOSUPPORT
 *      RPC_PROC_UNAVAIL  -> -EOPNOTSUPP
 *    RPC_GARBAGE_ARGS is only logged in the visible lines.
 * The local error code starts out as -EACCES.
 *
 * Visible returns: ERR_PTR(-EAGAIN) once a garbage retry has been
 * scheduled through call_bind, and ERR_PTR(error) after rpc_exit().
 * call_decode stores the result in a pointer and compares it with
 * ERR_PTR(-EAGAIN).
 *
 * NOTE(review): the listing omits the return type, braces, labels,
 * several case/default lines and all goto/break statements, as well as
 * the success return.  The mapping of statements to paths described
 * above follows the visible order only and must be confirmed against
 * the full file.
 */
1335 rpc_verify_header(struct rpc_task *task)
 
1337         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
 
1338         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
 
1339         __be32  *p = iov->iov_base;
 
1341         int error = -EACCES;
 
1343         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
 
1344                 /* RFC-1014 says that the representation of XDR data must be a
 
1345                  * multiple of four bytes
 
1346                  * - if it isn't pointer subtraction in the NFS client may give
 
1349                 dprintk("RPC: %5u %s: XDR representation not a multiple of"
 
1350                        " 4 bytes: 0x%x\n", task->tk_pid, __func__,
 
1351                        task->tk_rqstp->rq_rcv_buf.len);
 
1356         p += 1; /* skip XID */
 
1358         if ((n = ntohl(*p++)) != RPC_REPLY) {
 
1359                 dprintk("RPC: %5u %s: not an RPC reply: %x\n",
 
1360                                 task->tk_pid, __func__, n);
 
             /* Anything other than RPC_MSG_ACCEPTED is handled as a rejected call. */
1363         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
 
1366                 switch ((n = ntohl(*p++))) {
 
1367                         case RPC_AUTH_ERROR:
 
1370                                 dprintk("RPC: %5u %s: RPC call version "
 
1372                                                 task->tk_pid, __func__);
 
1373                                 error = -EPROTONOSUPPORT;
 
1376                                 dprintk("RPC: %5u %s: RPC call rejected, "
 
1377                                                 "unknown error: %x\n",
 
1378                                                 task->tk_pid, __func__, n);
 
                     /* Next word: the auth status that explains the rejection. */
1383                 switch ((n = ntohl(*p++))) {
 
1384                 case RPC_AUTH_REJECTEDCRED:
 
1385                 case RPC_AUTH_REJECTEDVERF:
 
1386                 case RPCSEC_GSS_CREDPROBLEM:
 
1387                 case RPCSEC_GSS_CTXPROBLEM:
 
                             /* Stale credential: spend one tk_cred_retry and refresh. */
1388                         if (!task->tk_cred_retry)
 
1390                         task->tk_cred_retry--;
 
1391                         dprintk("RPC: %5u %s: retry stale creds\n",
 
1392                                         task->tk_pid, __func__);
 
1393                         rpcauth_invalcred(task);
 
1394                         /* Ensure we obtain a new XID! */
 
1396                         task->tk_action = call_refresh;
 
1398                 case RPC_AUTH_BADCRED:
 
1399                 case RPC_AUTH_BADVERF:
 
1400                         /* possibly garbled cred/verf? */
 
1401                         if (!task->tk_garb_retry)
 
1403                         task->tk_garb_retry--;
 
1404                         dprintk("RPC: %5u %s: retry garbled creds\n",
 
1405                                         task->tk_pid, __func__);
 
1406                         task->tk_action = call_bind;
 
1408                 case RPC_AUTH_TOOWEAK:
 
1409                         printk(KERN_NOTICE "RPC: server %s requires stronger "
 
1410                                "authentication.\n", task->tk_client->cl_server);
 
1413                         dprintk("RPC: %5u %s: unknown auth error: %x\n",
 
1414                                         task->tk_pid, __func__, n);
 
1417                 dprintk("RPC: %5u %s: call rejected %d\n",
 
1418                                 task->tk_pid, __func__, n);
 
             /* rpcauth_checkverf() returns the advanced pointer, or NULL on failure. */
1421         if (!(p = rpcauth_checkverf(task, p))) {
 
1422                 dprintk("RPC: %5u %s: auth check failed\n",
 
1423                                 task->tk_pid, __func__);
 
1424                 goto out_garbage;               /* bad verifier, retry */
 
1426         len = p - (__be32 *)iov->iov_base - 1;
 
             /* Accept status: map the server verdict onto an errno. */
1429         switch ((n = ntohl(*p++))) {
 
1432         case RPC_PROG_UNAVAIL:
 
1433                 dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
 
1434                                 task->tk_pid, __func__,
 
1435                                 (unsigned int)task->tk_client->cl_prog,
 
1436                                 task->tk_client->cl_server);
 
1437                 error = -EPFNOSUPPORT;
 
1439         case RPC_PROG_MISMATCH:
 
1440                 dprintk("RPC: %5u %s: program %u, version %u unsupported by "
 
1441                                 "server %s\n", task->tk_pid, __func__,
 
1442                                 (unsigned int)task->tk_client->cl_prog,
 
1443                                 (unsigned int)task->tk_client->cl_vers,
 
1444                                 task->tk_client->cl_server);
 
1445                 error = -EPROTONOSUPPORT;
 
1447         case RPC_PROC_UNAVAIL:
 
1448                 dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
 
1449                                 "version %u on server %s\n",
 
1450                                 task->tk_pid, __func__,
 
1451                                 rpc_proc_name(task),
 
1452                                 task->tk_client->cl_prog,
 
1453                                 task->tk_client->cl_vers,
 
1454                                 task->tk_client->cl_server);
 
1455                 error = -EOPNOTSUPP;
 
1457         case RPC_GARBAGE_ARGS:
 
1458                 dprintk("RPC: %5u %s: server saw garbage\n",
 
1459                                 task->tk_pid, __func__);
 
1462                 dprintk("RPC: %5u %s: server accept status: %x\n",
 
1463                                 task->tk_pid, __func__, n);
 
             /*
              * Garbage handling (presumably the out_garbage target used above;
              * the label line is not part of this listing): count the event
              * and, while tk_garb_retry lasts, retry through call_bind.
              */
1468         task->tk_client->cl_stats->rpcgarbage++;
 
1469         if (task->tk_garb_retry) {
 
1470                 task->tk_garb_retry--;
 
1471                 dprintk("RPC: %5u %s: retrying\n",
 
1472                                 task->tk_pid, __func__);
 
1473                 task->tk_action = call_bind;
 
1475                 return ERR_PTR(-EAGAIN);
 
             /* Final failure: terminate the task with the mapped errno. */
1480         rpc_exit(task, error);
 
1481         dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
 
1483         return ERR_PTR(error);
 
1485         dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
 
/*
 * rpcproc_encode_null - argument encoder installed as rpcproc_null.p_encode.
 *
 * NOTE(review): the body is not part of this listing; presumably it
 * encodes nothing and reports success - confirm against the full file.
 */
1490 static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
 
/*
 * rpcproc_decode_null - result decoder installed as rpcproc_null.p_decode.
 *
 * NOTE(review): the body is not part of this listing; presumably it
 * decodes nothing and reports success - confirm against the full file.
 */
1495 static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
 
/*
 * Procedure descriptor wiring up rpcproc_encode_null and
 * rpcproc_decode_null.  It is used as the .rpc_proc of the messages
 * built by rpc_ping() and rpc_call_null().
 */
1500 static struct rpc_procinfo rpcproc_null = {
 
1501         .p_encode = rpcproc_encode_null,
 
1502         .p_decode = rpcproc_decode_null,
 
/*
 * rpc_ping - issue a synchronous call of the rpcproc_null procedure.
 *
 * @clnt:  client to send the call through.
 * @flags: passed unchanged to rpc_call_sync().
 *
 * A credential is obtained from authnull_ops.lookup_cred() for the
 * duration of the call and released again with put_rpccred().
 *
 * NOTE(review): the return statement is not part of this listing;
 * presumably the result of rpc_call_sync() stored in err is returned.
 */
1505 static int rpc_ping(struct rpc_clnt *clnt, int flags)
 
1507         struct rpc_message msg = {
 
1508                 .rpc_proc = &rpcproc_null,
 
1511         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
 
1512         err = rpc_call_sync(clnt, &msg, flags);
 
1513         put_rpccred(msg.rpc_cred);
 
/*
 * rpc_call_null - start a task that calls the rpcproc_null procedure.
 *
 * @clnt:  client to run the task on.
 * @cred:  credential for the call.
 * @flags: task flags.
 *
 * Builds an rpc_message for rpcproc_null and an rpc_task_setup that
 * references it together with rpc_default_ops, then hands the setup to
 * rpc_run_task() and returns whatever that returns.  Exported with
 * EXPORT_SYMBOL_GPL.
 *
 * NOTE(review): the initializer lines that consume clnt, cred and
 * flags are not part of this listing.
 */
1517 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
 
1519         struct rpc_message msg = {
 
1520                 .rpc_proc = &rpcproc_null,
 
1523         struct rpc_task_setup task_setup_data = {
 
1525                 .rpc_message = &msg,
 
1526                 .callback_ops = &rpc_default_ops,
 
1529         return rpc_run_task(&task_setup_data);
 
1531 EXPORT_SYMBOL_GPL(rpc_call_null);
 
/*
 * rpc_show_header - print the column header for the task listing.
 *
 * Emits a single KERN_INFO line naming the leading columns (pid, flags,
 * status, client, request, timeout, ops) of the rows printed by
 * rpc_show_task().
 */
1534 static void rpc_show_header(void)
 
1536         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
 
1537                 "-timeout ---ops--\n");
 
/*
 * rpc_show_task - print one KERN_INFO line describing an RPC task.
 *
 * @clnt: client the task belongs to; supplies protocol name and version.
 * @task: task to describe.
 *
 * The line contains pid, flags, status, the client and request
 * pointers, timeout, ops pointer, protocol name and version, procedure
 * name, the current action and the wait queue.  The wait queue name is
 * the literal none unless RPC_IS_QUEUED() holds, in which case
 * rpc_qname() of tk_waitqueue is used.  The action name is obtained by
 * resolving the tk_action function pointer with sprint_symbol() into a
 * KSYM_SYMBOL_LEN buffer.
 *
 * NOTE(review): the lines that use the strchr() result and the final
 * printk arguments are not part of this listing.
 */
1540 static void rpc_show_task(const struct rpc_clnt *clnt,
 
1541                           const struct rpc_task *task)
 
1543         const char *rpc_waitq = "none";
 
1544         char *p, action[KSYM_SYMBOL_LEN];
 
1546         if (RPC_IS_QUEUED(task))
 
1547                 rpc_waitq = rpc_qname(task->tk_waitqueue);
 
1549         /* map tk_action pointer to a function name; then trim off
 
1550          * the "+0x0 [sunrpc]" */
 
1551         sprint_symbol(action, (unsigned long)task->tk_action);
 
1552         p = strchr(action, '+');
 
1556         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%s q:%s\n",
 
1557                 task->tk_pid, task->tk_flags, task->tk_status,
 
1558                 clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
 
1559                 clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
 
/*
 * rpc_show_tasks - print every task of every RPC client.
 *
 * Walks the all_clients list while holding rpc_client_lock and, for
 * each client, walks its cl_tasks list while holding that client's
 * cl_lock, calling rpc_show_task() for each task found.
 *
 * NOTE(review): the lines between the inner loop header and the
 * rpc_show_task() call are not part of this listing; presumably they
 * print the column header once via rpc_show_header() - confirm against
 * the full file.
 */
1563 void rpc_show_tasks(void)
 
1565         struct rpc_clnt *clnt;
 
1566         struct rpc_task *task;
 
             /* Lock order: the global client list lock first, then the per-client lock. */
1569         spin_lock(&rpc_client_lock);
 
1570         list_for_each_entry(clnt, &all_clients, cl_clients) {
 
1571                 spin_lock(&clnt->cl_lock);
 
1572                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
 
1577                         rpc_show_task(clnt, task);
 
1579                 spin_unlock(&clnt->cl_lock);
 
1581         spin_unlock(&rpc_client_lock);