Merge commit 'origin' into devel
[linux-2.6] / net / sunrpc / xprtrdma / transport.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * transport.c
42  *
43  * This file contains the top-level implementation of an RPC RDMA
44  * transport.
45  *
46  * Naming convention: functions beginning with xprt_ are part of the
47  * transport switch. All others are RPC RDMA internal.
48  */
49
50 #include <linux/module.h>
51 #include <linux/init.h>
52 #include <linux/seq_file.h>
53
54 #include "xprt_rdma.h"
55
56 #ifdef RPC_DEBUG
57 # define RPCDBG_FACILITY        RPCDBG_TRANS
58 #endif
59
60 MODULE_LICENSE("Dual BSD/GPL");
61
62 MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
63 MODULE_AUTHOR("Network Appliance, Inc.");
64
65 /*
66  * tunables
67  */
68
69 static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
70 static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
71 static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
72 static unsigned int xprt_rdma_inline_write_padding;
73 #if !RPCRDMA_PERSISTENT_REGISTRATION
74 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
75 #else
76 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
77 #endif
78
79 #ifdef RPC_DEBUG
80
81 static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
82 static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
83 static unsigned int zero;
84 static unsigned int max_padding = PAGE_SIZE;
85 static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
86 static unsigned int max_memreg = RPCRDMA_LAST - 1;
87
88 static struct ctl_table_header *sunrpc_table_header;
89
90 static ctl_table xr_tunables_table[] = {
91         {
92                 .ctl_name       = CTL_UNNUMBERED,
93                 .procname       = "rdma_slot_table_entries",
94                 .data           = &xprt_rdma_slot_table_entries,
95                 .maxlen         = sizeof(unsigned int),
96                 .mode           = 0644,
97                 .proc_handler   = &proc_dointvec_minmax,
98                 .strategy       = &sysctl_intvec,
99                 .extra1         = &min_slot_table_size,
100                 .extra2         = &max_slot_table_size
101         },
102         {
103                 .ctl_name       = CTL_UNNUMBERED,
104                 .procname       = "rdma_max_inline_read",
105                 .data           = &xprt_rdma_max_inline_read,
106                 .maxlen         = sizeof(unsigned int),
107                 .mode           = 0644,
108                 .proc_handler   = &proc_dointvec,
109                 .strategy       = &sysctl_intvec,
110         },
111         {
112                 .ctl_name       = CTL_UNNUMBERED,
113                 .procname       = "rdma_max_inline_write",
114                 .data           = &xprt_rdma_max_inline_write,
115                 .maxlen         = sizeof(unsigned int),
116                 .mode           = 0644,
117                 .proc_handler   = &proc_dointvec,
118                 .strategy       = &sysctl_intvec,
119         },
120         {
121                 .ctl_name       = CTL_UNNUMBERED,
122                 .procname       = "rdma_inline_write_padding",
123                 .data           = &xprt_rdma_inline_write_padding,
124                 .maxlen         = sizeof(unsigned int),
125                 .mode           = 0644,
126                 .proc_handler   = &proc_dointvec_minmax,
127                 .strategy       = &sysctl_intvec,
128                 .extra1         = &zero,
129                 .extra2         = &max_padding,
130         },
131         {
132                 .ctl_name       = CTL_UNNUMBERED,
133                 .procname       = "rdma_memreg_strategy",
134                 .data           = &xprt_rdma_memreg_strategy,
135                 .maxlen         = sizeof(unsigned int),
136                 .mode           = 0644,
137                 .proc_handler   = &proc_dointvec_minmax,
138                 .strategy       = &sysctl_intvec,
139                 .extra1         = &min_memreg,
140                 .extra2         = &max_memreg,
141         },
142         {
143                 .ctl_name = 0,
144         },
145 };
146
147 static ctl_table sunrpc_table[] = {
148         {
149                 .ctl_name       = CTL_SUNRPC,
150                 .procname       = "sunrpc",
151                 .mode           = 0555,
152                 .child          = xr_tunables_table
153         },
154         {
155                 .ctl_name = 0,
156         },
157 };
158
159 #endif
160
161 static struct rpc_xprt_ops xprt_rdma_procs;     /* forward reference */
162
163 static void
164 xprt_rdma_format_addresses(struct rpc_xprt *xprt)
165 {
166         struct sockaddr_in *addr = (struct sockaddr_in *)
167                                         &rpcx_to_rdmad(xprt).addr;
168         char *buf;
169
170         buf = kzalloc(20, GFP_KERNEL);
171         if (buf)
172                 snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
173         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
174
175         buf = kzalloc(8, GFP_KERNEL);
176         if (buf)
177                 snprintf(buf, 8, "%u", ntohs(addr->sin_port));
178         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
179
180         xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
181
182         buf = kzalloc(48, GFP_KERNEL);
183         if (buf)
184                 snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
185                         NIPQUAD(addr->sin_addr.s_addr),
186                         ntohs(addr->sin_port), "rdma");
187         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
188
189         buf = kzalloc(10, GFP_KERNEL);
190         if (buf)
191                 snprintf(buf, 10, "%02x%02x%02x%02x",
192                         NIPQUAD(addr->sin_addr.s_addr));
193         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
194
195         buf = kzalloc(8, GFP_KERNEL);
196         if (buf)
197                 snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
198         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
199
200         buf = kzalloc(30, GFP_KERNEL);
201         if (buf)
202                 snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
203                         NIPQUAD(addr->sin_addr.s_addr),
204                         ntohs(addr->sin_port) >> 8,
205                         ntohs(addr->sin_port) & 0xff);
206         xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
207
208         /* netid */
209         xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
210 }
211
212 static void
213 xprt_rdma_free_addresses(struct rpc_xprt *xprt)
214 {
215         unsigned int i;
216
217         for (i = 0; i < RPC_DISPLAY_MAX; i++)
218                 switch (i) {
219                 case RPC_DISPLAY_PROTO:
220                 case RPC_DISPLAY_NETID:
221                         continue;
222                 default:
223                         kfree(xprt->address_strings[i]);
224                 }
225 }
226
227 static void
228 xprt_rdma_connect_worker(struct work_struct *work)
229 {
230         struct rpcrdma_xprt *r_xprt =
231                 container_of(work, struct rpcrdma_xprt, rdma_connect.work);
232         struct rpc_xprt *xprt = &r_xprt->xprt;
233         int rc = 0;
234
235         if (!xprt->shutdown) {
236                 xprt_clear_connected(xprt);
237
238                 dprintk("RPC:       %s: %sconnect\n", __func__,
239                                 r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
240                 rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
241                 if (rc)
242                         goto out;
243         }
244         goto out_clear;
245
246 out:
247         xprt_wake_pending_tasks(xprt, rc);
248
249 out_clear:
250         dprintk("RPC:       %s: exit\n", __func__);
251         xprt_clear_connecting(xprt);
252 }
253
254 /*
255  * xprt_rdma_destroy
256  *
257  * Destroy the xprt.
258  * Free all memory associated with the object, including its own.
259  * NOTE: none of the *destroy methods free memory for their top-level
260  * objects, even though they may have allocated it (they do free
261  * private memory). It's up to the caller to handle it. In this
262  * case (RDMA transport), all structure memory is inlined with the
263  * struct rpcrdma_xprt.
264  */
265 static void
266 xprt_rdma_destroy(struct rpc_xprt *xprt)
267 {
268         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
269         int rc;
270
271         dprintk("RPC:       %s: called\n", __func__);
272
273         cancel_delayed_work(&r_xprt->rdma_connect);
274         flush_scheduled_work();
275
276         xprt_clear_connected(xprt);
277
278         rpcrdma_buffer_destroy(&r_xprt->rx_buf);
279         rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
280         if (rc)
281                 dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
282                         __func__, rc);
283         rpcrdma_ia_close(&r_xprt->rx_ia);
284
285         xprt_rdma_free_addresses(xprt);
286
287         kfree(xprt->slot);
288         xprt->slot = NULL;
289         kfree(xprt);
290
291         dprintk("RPC:       %s: returning\n", __func__);
292
293         module_put(THIS_MODULE);
294 }
295
296 static const struct rpc_timeout xprt_rdma_default_timeout = {
297         .to_initval = 60 * HZ,
298         .to_maxval = 60 * HZ,
299 };
300
301 /**
302  * xprt_setup_rdma - Set up transport to use RDMA
303  *
304  * @args: rpc transport arguments
305  */
306 static struct rpc_xprt *
307 xprt_setup_rdma(struct xprt_create *args)
308 {
309         struct rpcrdma_create_data_internal cdata;
310         struct rpc_xprt *xprt;
311         struct rpcrdma_xprt *new_xprt;
312         struct rpcrdma_ep *new_ep;
313         struct sockaddr_in *sin;
314         int rc;
315
316         if (args->addrlen > sizeof(xprt->addr)) {
317                 dprintk("RPC:       %s: address too large\n", __func__);
318                 return ERR_PTR(-EBADF);
319         }
320
321         xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
322         if (xprt == NULL) {
323                 dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
324                         __func__);
325                 return ERR_PTR(-ENOMEM);
326         }
327
328         xprt->max_reqs = xprt_rdma_slot_table_entries;
329         xprt->slot = kcalloc(xprt->max_reqs,
330                                 sizeof(struct rpc_rqst), GFP_KERNEL);
331         if (xprt->slot == NULL) {
332                 dprintk("RPC:       %s: couldn't allocate %d slots\n",
333                         __func__, xprt->max_reqs);
334                 kfree(xprt);
335                 return ERR_PTR(-ENOMEM);
336         }
337
338         /* 60 second timeout, no retries */
339         xprt->timeout = &xprt_rdma_default_timeout;
340         xprt->bind_timeout = (60U * HZ);
341         xprt->connect_timeout = (60U * HZ);
342         xprt->reestablish_timeout = (5U * HZ);
343         xprt->idle_timeout = (5U * 60 * HZ);
344
345         xprt->resvport = 0;             /* privileged port not needed */
346         xprt->tsh_size = 0;             /* RPC-RDMA handles framing */
347         xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
348         xprt->ops = &xprt_rdma_procs;
349
350         /*
351          * Set up RDMA-specific connect data.
352          */
353
354         /* Put server RDMA address in local cdata */
355         memcpy(&cdata.addr, args->dstaddr, args->addrlen);
356
357         /* Ensure xprt->addr holds valid server TCP (not RDMA)
358          * address, for any side protocols which peek at it */
359         xprt->prot = IPPROTO_TCP;
360         xprt->addrlen = args->addrlen;
361         memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
362
363         sin = (struct sockaddr_in *)&cdata.addr;
364         if (ntohs(sin->sin_port) != 0)
365                 xprt_set_bound(xprt);
366
367         dprintk("RPC:       %s: %u.%u.%u.%u:%u\n", __func__,
368                         NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
369
370         /* Set max requests */
371         cdata.max_requests = xprt->max_reqs;
372
373         /* Set some length limits */
374         cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
375         cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
376
377         cdata.inline_wsize = xprt_rdma_max_inline_write;
378         if (cdata.inline_wsize > cdata.wsize)
379                 cdata.inline_wsize = cdata.wsize;
380
381         cdata.inline_rsize = xprt_rdma_max_inline_read;
382         if (cdata.inline_rsize > cdata.rsize)
383                 cdata.inline_rsize = cdata.rsize;
384
385         cdata.padding = xprt_rdma_inline_write_padding;
386
387         /*
388          * Create new transport instance, which includes initialized
389          *  o ia
390          *  o endpoint
391          *  o buffers
392          */
393
394         new_xprt = rpcx_to_rdmax(xprt);
395
396         rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
397                                 xprt_rdma_memreg_strategy);
398         if (rc)
399                 goto out1;
400
401         /*
402          * initialize and create ep
403          */
404         new_xprt->rx_data = cdata;
405         new_ep = &new_xprt->rx_ep;
406         new_ep->rep_remote_addr = cdata.addr;
407
408         rc = rpcrdma_ep_create(&new_xprt->rx_ep,
409                                 &new_xprt->rx_ia, &new_xprt->rx_data);
410         if (rc)
411                 goto out2;
412
413         /*
414          * Allocate pre-registered send and receive buffers for headers and
415          * any inline data. Also specify any padding which will be provided
416          * from a preregistered zero buffer.
417          */
418         rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
419                                 &new_xprt->rx_data);
420         if (rc)
421                 goto out3;
422
423         /*
424          * Register a callback for connection events. This is necessary because
425          * connection loss notification is async. We also catch connection loss
426          * when reaping receives.
427          */
428         INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
429         new_ep->rep_func = rpcrdma_conn_func;
430         new_ep->rep_xprt = xprt;
431
432         xprt_rdma_format_addresses(xprt);
433
434         if (!try_module_get(THIS_MODULE))
435                 goto out4;
436
437         return xprt;
438
439 out4:
440         xprt_rdma_free_addresses(xprt);
441         rc = -EINVAL;
442 out3:
443         (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
444 out2:
445         rpcrdma_ia_close(&new_xprt->rx_ia);
446 out1:
447         kfree(xprt->slot);
448         kfree(xprt);
449         return ERR_PTR(rc);
450 }
451
452 /*
453  * Close a connection, during shutdown or timeout/reconnect
454  */
455 static void
456 xprt_rdma_close(struct rpc_xprt *xprt)
457 {
458         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
459
460         dprintk("RPC:       %s: closing\n", __func__);
461         xprt_disconnect_done(xprt);
462         (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
463 }
464
465 static void
466 xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
467 {
468         struct sockaddr_in *sap;
469
470         sap = (struct sockaddr_in *)&xprt->addr;
471         sap->sin_port = htons(port);
472         sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
473         sap->sin_port = htons(port);
474         dprintk("RPC:       %s: %u\n", __func__, port);
475 }
476
477 static void
478 xprt_rdma_connect(struct rpc_task *task)
479 {
480         struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
481         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
482
483         if (!xprt_test_and_set_connecting(xprt)) {
484                 if (r_xprt->rx_ep.rep_connected != 0) {
485                         /* Reconnect */
486                         schedule_delayed_work(&r_xprt->rdma_connect,
487                                 xprt->reestablish_timeout);
488                 } else {
489                         schedule_delayed_work(&r_xprt->rdma_connect, 0);
490                         if (!RPC_IS_ASYNC(task))
491                                 flush_scheduled_work();
492                 }
493         }
494 }
495
496 static int
497 xprt_rdma_reserve_xprt(struct rpc_task *task)
498 {
499         struct rpc_xprt *xprt = task->tk_xprt;
500         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
501         int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
502
503         /* == RPC_CWNDSCALE @ init, but *after* setup */
504         if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
505                 r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
506                 dprintk("RPC:       %s: cwndscale %lu\n", __func__,
507                         r_xprt->rx_buf.rb_cwndscale);
508                 BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
509         }
510         xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
511         return xprt_reserve_xprt_cong(task);
512 }
513
514 /*
515  * The RDMA allocate/free functions need the task structure as a place
516  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
517  * sequence. For this reason, the recv buffers are attached to send
518  * buffers for portions of the RPC. Note that the RPC layer allocates
519  * both send and receive buffers in the same call. We may register
520  * the receive buffer portion when using reply chunks.
521  */
522 static void *
523 xprt_rdma_allocate(struct rpc_task *task, size_t size)
524 {
525         struct rpc_xprt *xprt = task->tk_xprt;
526         struct rpcrdma_req *req, *nreq;
527
528         req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
529         BUG_ON(NULL == req);
530
531         if (size > req->rl_size) {
532                 dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
533                         "prog %d vers %d proc %d\n",
534                         __func__, size, req->rl_size,
535                         task->tk_client->cl_prog, task->tk_client->cl_vers,
536                         task->tk_msg.rpc_proc->p_proc);
537                 /*
538                  * Outgoing length shortage. Our inline write max must have
539                  * been configured to perform direct i/o.
540                  *
541                  * This is therefore a large metadata operation, and the
542                  * allocate call was made on the maximum possible message,
543                  * e.g. containing long filename(s) or symlink data. In
544                  * fact, while these metadata operations *might* carry
545                  * large outgoing payloads, they rarely *do*. However, we
546                  * have to commit to the request here, so reallocate and
547                  * register it now. The data path will never require this
548                  * reallocation.
549                  *
550                  * If the allocation or registration fails, the RPC framework
551                  * will (doggedly) retry.
552                  */
553                 if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
554                                 RPCRDMA_BOUNCEBUFFERS) {
555                         /* forced to "pure inline" */
556                         dprintk("RPC:       %s: too much data (%zd) for inline "
557                                         "(r/w max %d/%d)\n", __func__, size,
558                                         rpcx_to_rdmad(xprt).inline_rsize,
559                                         rpcx_to_rdmad(xprt).inline_wsize);
560                         size = req->rl_size;
561                         rpc_exit(task, -EIO);           /* fail the operation */
562                         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
563                         goto out;
564                 }
565                 if (task->tk_flags & RPC_TASK_SWAPPER)
566                         nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
567                 else
568                         nreq = kmalloc(sizeof *req + size, GFP_NOFS);
569                 if (nreq == NULL)
570                         goto outfail;
571
572                 if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
573                                 nreq->rl_base, size + sizeof(struct rpcrdma_req)
574                                 - offsetof(struct rpcrdma_req, rl_base),
575                                 &nreq->rl_handle, &nreq->rl_iov)) {
576                         kfree(nreq);
577                         goto outfail;
578                 }
579                 rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
580                 nreq->rl_size = size;
581                 nreq->rl_niovs = 0;
582                 nreq->rl_nchunks = 0;
583                 nreq->rl_buffer = (struct rpcrdma_buffer *)req;
584                 nreq->rl_reply = req->rl_reply;
585                 memcpy(nreq->rl_segments,
586                         req->rl_segments, sizeof nreq->rl_segments);
587                 /* flag the swap with an unused field */
588                 nreq->rl_iov.length = 0;
589                 req->rl_reply = NULL;
590                 req = nreq;
591         }
592         dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
593 out:
594         return req->rl_xdr_buf;
595
596 outfail:
597         rpcrdma_buffer_put(req);
598         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
599         return NULL;
600 }
601
602 /*
603  * This function returns all RDMA resources to the pool.
604  */
605 static void
606 xprt_rdma_free(void *buffer)
607 {
608         struct rpcrdma_req *req;
609         struct rpcrdma_xprt *r_xprt;
610         struct rpcrdma_rep *rep;
611         int i;
612
613         if (buffer == NULL)
614                 return;
615
616         req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
617         if (req->rl_iov.length == 0) {  /* see allocate above */
618                 r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
619                                       struct rpcrdma_xprt, rx_buf);
620         } else
621                 r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
622         rep = req->rl_reply;
623
624         dprintk("RPC:       %s: called on 0x%p%s\n",
625                 __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
626
627         /*
628          * Finish the deregistration. When using mw bind, this was
629          * begun in rpcrdma_reply_handler(). In all other modes, we
630          * do it here, in thread context. The process is considered
631          * complete when the rr_func vector becomes NULL - this
632          * was put in place during rpcrdma_reply_handler() - the wait
633          * call below will not block if the dereg is "done". If
634          * interrupted, our framework will clean up.
635          */
636         for (i = 0; req->rl_nchunks;) {
637                 --req->rl_nchunks;
638                 i += rpcrdma_deregister_external(
639                         &req->rl_segments[i], r_xprt, NULL);
640         }
641
642         if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
643                 rep->rr_func = NULL;    /* abandon the callback */
644                 req->rl_reply = NULL;
645         }
646
647         if (req->rl_iov.length == 0) {  /* see allocate above */
648                 struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
649                 oreq->rl_reply = req->rl_reply;
650                 (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
651                                                    req->rl_handle,
652                                                    &req->rl_iov);
653                 kfree(req);
654                 req = oreq;
655         }
656
657         /* Put back request+reply buffers */
658         rpcrdma_buffer_put(req);
659 }
660
661 /*
662  * send_request invokes the meat of RPC RDMA. It must do the following:
663  *  1.  Marshal the RPC request into an RPC RDMA request, which means
664  *      putting a header in front of data, and creating IOVs for RDMA
665  *      from those in the request.
666  *  2.  In marshaling, detect opportunities for RDMA, and use them.
667  *  3.  Post a recv message to set up asynch completion, then send
668  *      the request (rpcrdma_ep_post).
669  *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
670  */
671
672 static int
673 xprt_rdma_send_request(struct rpc_task *task)
674 {
675         struct rpc_rqst *rqst = task->tk_rqstp;
676         struct rpc_xprt *xprt = task->tk_xprt;
677         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
678         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
679
680         /* marshal the send itself */
681         if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
682                 r_xprt->rx_stats.failed_marshal_count++;
683                 dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
684                         __func__);
685                 return -EIO;
686         }
687
688         if (req->rl_reply == NULL)              /* e.g. reconnection */
689                 rpcrdma_recv_buffer_get(req);
690
691         if (req->rl_reply) {
692                 req->rl_reply->rr_func = rpcrdma_reply_handler;
693                 /* this need only be done once, but... */
694                 req->rl_reply->rr_xprt = xprt;
695         }
696
697         if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
698                 xprt_disconnect_done(xprt);
699                 return -ENOTCONN;       /* implies disconnect */
700         }
701
702         rqst->rq_bytes_sent = 0;
703         return 0;
704 }
705
706 static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
707 {
708         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
709         long idle_time = 0;
710
711         if (xprt_connected(xprt))
712                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
713
714         seq_printf(seq,
715           "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
716           "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
717
718            0,   /* need a local port? */
719            xprt->stat.bind_count,
720            xprt->stat.connect_count,
721            xprt->stat.connect_time,
722            idle_time,
723            xprt->stat.sends,
724            xprt->stat.recvs,
725            xprt->stat.bad_xids,
726            xprt->stat.req_u,
727            xprt->stat.bklog_u,
728
729            r_xprt->rx_stats.read_chunk_count,
730            r_xprt->rx_stats.write_chunk_count,
731            r_xprt->rx_stats.reply_chunk_count,
732            r_xprt->rx_stats.total_rdma_request,
733            r_xprt->rx_stats.total_rdma_reply,
734            r_xprt->rx_stats.pullup_copy_count,
735            r_xprt->rx_stats.fixup_copy_count,
736            r_xprt->rx_stats.hardway_register_count,
737            r_xprt->rx_stats.failed_marshal_count,
738            r_xprt->rx_stats.bad_reply_count);
739 }
740
741 /*
742  * Plumbing for rpc transport switch and kernel module
743  */
744
745 static struct rpc_xprt_ops xprt_rdma_procs = {
746         .reserve_xprt           = xprt_rdma_reserve_xprt,
747         .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
748         .release_request        = xprt_release_rqst_cong,       /* ditto */
749         .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
750         .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
751         .set_port               = xprt_rdma_set_port,
752         .connect                = xprt_rdma_connect,
753         .buf_alloc              = xprt_rdma_allocate,
754         .buf_free               = xprt_rdma_free,
755         .send_request           = xprt_rdma_send_request,
756         .close                  = xprt_rdma_close,
757         .destroy                = xprt_rdma_destroy,
758         .print_stats            = xprt_rdma_print_stats
759 };
760
761 static struct xprt_class xprt_rdma = {
762         .list                   = LIST_HEAD_INIT(xprt_rdma.list),
763         .name                   = "rdma",
764         .owner                  = THIS_MODULE,
765         .ident                  = XPRT_TRANSPORT_RDMA,
766         .setup                  = xprt_setup_rdma,
767 };
768
769 static void __exit xprt_rdma_cleanup(void)
770 {
771         int rc;
772
773         dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
774 #ifdef RPC_DEBUG
775         if (sunrpc_table_header) {
776                 unregister_sysctl_table(sunrpc_table_header);
777                 sunrpc_table_header = NULL;
778         }
779 #endif
780         rc = xprt_unregister_transport(&xprt_rdma);
781         if (rc)
782                 dprintk("RPC:       %s: xprt_unregister returned %i\n",
783                         __func__, rc);
784 }
785
786 static int __init xprt_rdma_init(void)
787 {
788         int rc;
789
790         rc = xprt_register_transport(&xprt_rdma);
791
792         if (rc)
793                 return rc;
794
795         dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
796
797         dprintk(KERN_INFO "Defaults:\n");
798         dprintk(KERN_INFO "\tSlots %d\n"
799                 "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
800                 xprt_rdma_slot_table_entries,
801                 xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
802         dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
803                 xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
804
805 #ifdef RPC_DEBUG
806         if (!sunrpc_table_header)
807                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
808 #endif
809         return 0;
810 }
811
812 module_init(xprt_rdma_init);
813 module_exit(xprt_rdma_cleanup);