Merge git://git.kernel.org/pub/scm/linux/kernel/git/herbert/crypto-2.6
[linux-2.6] / net / sunrpc / xprtrdma / transport.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * transport.c
42  *
43  * This file contains the top-level implementation of an RPC RDMA
44  * transport.
45  *
46  * Naming convention: functions beginning with xprt_ are part of the
47  * transport switch. All others are RPC RDMA internal.
48  */
49
50 #include <linux/module.h>
51 #include <linux/init.h>
52 #include <linux/seq_file.h>
53
54 #include "xprt_rdma.h"
55
56 #ifdef RPC_DEBUG
57 # define RPCDBG_FACILITY        RPCDBG_TRANS
58 #endif
59
60 MODULE_LICENSE("Dual BSD/GPL");
61
62 MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
63 MODULE_AUTHOR("Network Appliance, Inc.");
64
65 /*
66  * tunables
67  */
68
69 static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
70 static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
71 static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
72 static unsigned int xprt_rdma_inline_write_padding;
73 #if !RPCRDMA_PERSISTENT_REGISTRATION
74 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
75 #else
76 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
77 #endif
78
#ifdef RPC_DEBUG

/* Bounds handed to proc_dointvec_minmax() through .extra1/.extra2. */
static unsigned int zero;
static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

/* Handle returned by register_sysctl_table(); NULL while unregistered. */
static struct ctl_table_header *sunrpc_table_header;

/* One entry per tunable; the zeroed entry terminates the table. */
static ctl_table xr_tunables_table[] = {
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.strategy	= sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
		.strategy	= sysctl_intvec,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
		.strategy	= sysctl_intvec,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_inline_write_padding",
		.data		= &xprt_rdma_inline_write_padding,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.strategy	= sysctl_intvec,
		.extra1		= &zero,
		.extra2		= &max_padding,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.strategy	= sysctl_intvec,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{ .ctl_name = 0 },
};

/* Parent "sunrpc" directory whose child is the tunables table above. */
static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table,
	},
	{ .ctl_name = 0 },
};

#endif
160
161 static struct rpc_xprt_ops xprt_rdma_procs;     /* forward reference */
162
163 static void
164 xprt_rdma_format_addresses(struct rpc_xprt *xprt)
165 {
166         struct sockaddr_in *addr = (struct sockaddr_in *)
167                                         &rpcx_to_rdmad(xprt).addr;
168         char *buf;
169
170         buf = kzalloc(20, GFP_KERNEL);
171         if (buf)
172                 snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
173         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
174
175         buf = kzalloc(8, GFP_KERNEL);
176         if (buf)
177                 snprintf(buf, 8, "%u", ntohs(addr->sin_port));
178         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
179
180         xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
181
182         buf = kzalloc(48, GFP_KERNEL);
183         if (buf)
184                 snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
185                         NIPQUAD(addr->sin_addr.s_addr),
186                         ntohs(addr->sin_port), "rdma");
187         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
188
189         buf = kzalloc(10, GFP_KERNEL);
190         if (buf)
191                 snprintf(buf, 10, "%02x%02x%02x%02x",
192                         NIPQUAD(addr->sin_addr.s_addr));
193         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
194
195         buf = kzalloc(8, GFP_KERNEL);
196         if (buf)
197                 snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
198         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
199
200         buf = kzalloc(30, GFP_KERNEL);
201         if (buf)
202                 snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
203                         NIPQUAD(addr->sin_addr.s_addr),
204                         ntohs(addr->sin_port) >> 8,
205                         ntohs(addr->sin_port) & 0xff);
206         xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
207
208         /* netid */
209         xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
210 }
211
212 static void
213 xprt_rdma_free_addresses(struct rpc_xprt *xprt)
214 {
215         kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
216         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
217         kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
218         kfree(xprt->address_strings[RPC_DISPLAY_HEX_ADDR]);
219         kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
220         kfree(xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR]);
221 }
222
223 static void
224 xprt_rdma_connect_worker(struct work_struct *work)
225 {
226         struct rpcrdma_xprt *r_xprt =
227                 container_of(work, struct rpcrdma_xprt, rdma_connect.work);
228         struct rpc_xprt *xprt = &r_xprt->xprt;
229         int rc = 0;
230
231         if (!xprt->shutdown) {
232                 xprt_clear_connected(xprt);
233
234                 dprintk("RPC:       %s: %sconnect\n", __func__,
235                                 r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
236                 rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
237                 if (rc)
238                         goto out;
239         }
240         goto out_clear;
241
242 out:
243         xprt_wake_pending_tasks(xprt, rc);
244
245 out_clear:
246         dprintk("RPC:       %s: exit\n", __func__);
247         xprt_clear_connecting(xprt);
248 }
249
250 /*
251  * xprt_rdma_destroy
252  *
253  * Destroy the xprt.
254  * Free all memory associated with the object, including its own.
255  * NOTE: none of the *destroy methods free memory for their top-level
256  * objects, even though they may have allocated it (they do free
257  * private memory). It's up to the caller to handle it. In this
258  * case (RDMA transport), all structure memory is inlined with the
259  * struct rpcrdma_xprt.
260  */
261 static void
262 xprt_rdma_destroy(struct rpc_xprt *xprt)
263 {
264         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
265         int rc;
266
267         dprintk("RPC:       %s: called\n", __func__);
268
269         cancel_delayed_work(&r_xprt->rdma_connect);
270         flush_scheduled_work();
271
272         xprt_clear_connected(xprt);
273
274         rpcrdma_buffer_destroy(&r_xprt->rx_buf);
275         rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
276         if (rc)
277                 dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
278                         __func__, rc);
279         rpcrdma_ia_close(&r_xprt->rx_ia);
280
281         xprt_rdma_free_addresses(xprt);
282
283         kfree(xprt->slot);
284         xprt->slot = NULL;
285         kfree(xprt);
286
287         dprintk("RPC:       %s: returning\n", __func__);
288
289         module_put(THIS_MODULE);
290 }
291
292 /**
293  * xprt_setup_rdma - Set up transport to use RDMA
294  *
295  * @args: rpc transport arguments
296  */
297 static struct rpc_xprt *
298 xprt_setup_rdma(struct xprt_create *args)
299 {
300         struct rpcrdma_create_data_internal cdata;
301         struct rpc_xprt *xprt;
302         struct rpcrdma_xprt *new_xprt;
303         struct rpcrdma_ep *new_ep;
304         struct sockaddr_in *sin;
305         int rc;
306
307         if (args->addrlen > sizeof(xprt->addr)) {
308                 dprintk("RPC:       %s: address too large\n", __func__);
309                 return ERR_PTR(-EBADF);
310         }
311
312         xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
313         if (xprt == NULL) {
314                 dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
315                         __func__);
316                 return ERR_PTR(-ENOMEM);
317         }
318
319         xprt->max_reqs = xprt_rdma_slot_table_entries;
320         xprt->slot = kcalloc(xprt->max_reqs,
321                                 sizeof(struct rpc_rqst), GFP_KERNEL);
322         if (xprt->slot == NULL) {
323                 dprintk("RPC:       %s: couldn't allocate %d slots\n",
324                         __func__, xprt->max_reqs);
325                 kfree(xprt);
326                 return ERR_PTR(-ENOMEM);
327         }
328
329         /* 60 second timeout, no retries */
330         xprt_set_timeout(&xprt->timeout, 0, 60UL * HZ);
331         xprt->bind_timeout = (60U * HZ);
332         xprt->connect_timeout = (60U * HZ);
333         xprt->reestablish_timeout = (5U * HZ);
334         xprt->idle_timeout = (5U * 60 * HZ);
335
336         xprt->resvport = 0;             /* privileged port not needed */
337         xprt->tsh_size = 0;             /* RPC-RDMA handles framing */
338         xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
339         xprt->ops = &xprt_rdma_procs;
340
341         /*
342          * Set up RDMA-specific connect data.
343          */
344
345         /* Put server RDMA address in local cdata */
346         memcpy(&cdata.addr, args->dstaddr, args->addrlen);
347
348         /* Ensure xprt->addr holds valid server TCP (not RDMA)
349          * address, for any side protocols which peek at it */
350         xprt->prot = IPPROTO_TCP;
351         xprt->addrlen = args->addrlen;
352         memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
353
354         sin = (struct sockaddr_in *)&cdata.addr;
355         if (ntohs(sin->sin_port) != 0)
356                 xprt_set_bound(xprt);
357
358         dprintk("RPC:       %s: %u.%u.%u.%u:%u\n", __func__,
359                         NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
360
361         /* Set max requests */
362         cdata.max_requests = xprt->max_reqs;
363
364         /* Set some length limits */
365         cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
366         cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
367
368         cdata.inline_wsize = xprt_rdma_max_inline_write;
369         if (cdata.inline_wsize > cdata.wsize)
370                 cdata.inline_wsize = cdata.wsize;
371
372         cdata.inline_rsize = xprt_rdma_max_inline_read;
373         if (cdata.inline_rsize > cdata.rsize)
374                 cdata.inline_rsize = cdata.rsize;
375
376         cdata.padding = xprt_rdma_inline_write_padding;
377
378         /*
379          * Create new transport instance, which includes initialized
380          *  o ia
381          *  o endpoint
382          *  o buffers
383          */
384
385         new_xprt = rpcx_to_rdmax(xprt);
386
387         rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
388                                 xprt_rdma_memreg_strategy);
389         if (rc)
390                 goto out1;
391
392         /*
393          * initialize and create ep
394          */
395         new_xprt->rx_data = cdata;
396         new_ep = &new_xprt->rx_ep;
397         new_ep->rep_remote_addr = cdata.addr;
398
399         rc = rpcrdma_ep_create(&new_xprt->rx_ep,
400                                 &new_xprt->rx_ia, &new_xprt->rx_data);
401         if (rc)
402                 goto out2;
403
404         /*
405          * Allocate pre-registered send and receive buffers for headers and
406          * any inline data. Also specify any padding which will be provided
407          * from a preregistered zero buffer.
408          */
409         rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
410                                 &new_xprt->rx_data);
411         if (rc)
412                 goto out3;
413
414         /*
415          * Register a callback for connection events. This is necessary because
416          * connection loss notification is async. We also catch connection loss
417          * when reaping receives.
418          */
419         INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
420         new_ep->rep_func = rpcrdma_conn_func;
421         new_ep->rep_xprt = xprt;
422
423         xprt_rdma_format_addresses(xprt);
424
425         if (!try_module_get(THIS_MODULE))
426                 goto out4;
427
428         return xprt;
429
430 out4:
431         xprt_rdma_free_addresses(xprt);
432         rc = -EINVAL;
433 out3:
434         (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
435 out2:
436         rpcrdma_ia_close(&new_xprt->rx_ia);
437 out1:
438         kfree(xprt->slot);
439         kfree(xprt);
440         return ERR_PTR(rc);
441 }
442
443 /*
444  * Close a connection, during shutdown or timeout/reconnect
445  */
446 static void
447 xprt_rdma_close(struct rpc_xprt *xprt)
448 {
449         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
450
451         dprintk("RPC:       %s: closing\n", __func__);
452         xprt_disconnect(xprt);
453         (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
454 }
455
456 static void
457 xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
458 {
459         struct sockaddr_in *sap;
460
461         sap = (struct sockaddr_in *)&xprt->addr;
462         sap->sin_port = htons(port);
463         sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
464         sap->sin_port = htons(port);
465         dprintk("RPC:       %s: %u\n", __func__, port);
466 }
467
468 static void
469 xprt_rdma_connect(struct rpc_task *task)
470 {
471         struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
472         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
473
474         if (!xprt_test_and_set_connecting(xprt)) {
475                 if (r_xprt->rx_ep.rep_connected != 0) {
476                         /* Reconnect */
477                         schedule_delayed_work(&r_xprt->rdma_connect,
478                                 xprt->reestablish_timeout);
479                 } else {
480                         schedule_delayed_work(&r_xprt->rdma_connect, 0);
481                         if (!RPC_IS_ASYNC(task))
482                                 flush_scheduled_work();
483                 }
484         }
485 }
486
487 static int
488 xprt_rdma_reserve_xprt(struct rpc_task *task)
489 {
490         struct rpc_xprt *xprt = task->tk_xprt;
491         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
492         int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
493
494         /* == RPC_CWNDSCALE @ init, but *after* setup */
495         if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
496                 r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
497                 dprintk("RPC:       %s: cwndscale %lu\n", __func__,
498                         r_xprt->rx_buf.rb_cwndscale);
499                 BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
500         }
501         xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
502         return xprt_reserve_xprt_cong(task);
503 }
504
505 /*
506  * The RDMA allocate/free functions need the task structure as a place
507  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
508  * sequence. For this reason, the recv buffers are attached to send
509  * buffers for portions of the RPC. Note that the RPC layer allocates
510  * both send and receive buffers in the same call. We may register
511  * the receive buffer portion when using reply chunks.
512  */
513 static void *
514 xprt_rdma_allocate(struct rpc_task *task, size_t size)
515 {
516         struct rpc_xprt *xprt = task->tk_xprt;
517         struct rpcrdma_req *req, *nreq;
518
519         req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
520         BUG_ON(NULL == req);
521
522         if (size > req->rl_size) {
523                 dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
524                         "prog %d vers %d proc %d\n",
525                         __func__, size, req->rl_size,
526                         task->tk_client->cl_prog, task->tk_client->cl_vers,
527                         task->tk_msg.rpc_proc->p_proc);
528                 /*
529                  * Outgoing length shortage. Our inline write max must have
530                  * been configured to perform direct i/o.
531                  *
532                  * This is therefore a large metadata operation, and the
533                  * allocate call was made on the maximum possible message,
534                  * e.g. containing long filename(s) or symlink data. In
535                  * fact, while these metadata operations *might* carry
536                  * large outgoing payloads, they rarely *do*. However, we
537                  * have to commit to the request here, so reallocate and
538                  * register it now. The data path will never require this
539                  * reallocation.
540                  *
541                  * If the allocation or registration fails, the RPC framework
542                  * will (doggedly) retry.
543                  */
544                 if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
545                                 RPCRDMA_BOUNCEBUFFERS) {
546                         /* forced to "pure inline" */
547                         dprintk("RPC:       %s: too much data (%zd) for inline "
548                                         "(r/w max %d/%d)\n", __func__, size,
549                                         rpcx_to_rdmad(xprt).inline_rsize,
550                                         rpcx_to_rdmad(xprt).inline_wsize);
551                         size = req->rl_size;
552                         rpc_exit(task, -EIO);           /* fail the operation */
553                         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
554                         goto out;
555                 }
556                 if (task->tk_flags & RPC_TASK_SWAPPER)
557                         nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
558                 else
559                         nreq = kmalloc(sizeof *req + size, GFP_NOFS);
560                 if (nreq == NULL)
561                         goto outfail;
562
563                 if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
564                                 nreq->rl_base, size + sizeof(struct rpcrdma_req)
565                                 - offsetof(struct rpcrdma_req, rl_base),
566                                 &nreq->rl_handle, &nreq->rl_iov)) {
567                         kfree(nreq);
568                         goto outfail;
569                 }
570                 rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
571                 nreq->rl_size = size;
572                 nreq->rl_niovs = 0;
573                 nreq->rl_nchunks = 0;
574                 nreq->rl_buffer = (struct rpcrdma_buffer *)req;
575                 nreq->rl_reply = req->rl_reply;
576                 memcpy(nreq->rl_segments,
577                         req->rl_segments, sizeof nreq->rl_segments);
578                 /* flag the swap with an unused field */
579                 nreq->rl_iov.length = 0;
580                 req->rl_reply = NULL;
581                 req = nreq;
582         }
583         dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
584 out:
585         return req->rl_xdr_buf;
586
587 outfail:
588         rpcrdma_buffer_put(req);
589         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
590         return NULL;
591 }
592
593 /*
594  * This function returns all RDMA resources to the pool.
595  */
596 static void
597 xprt_rdma_free(void *buffer)
598 {
599         struct rpcrdma_req *req;
600         struct rpcrdma_xprt *r_xprt;
601         struct rpcrdma_rep *rep;
602         int i;
603
604         if (buffer == NULL)
605                 return;
606
607         req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
608         r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
609         rep = req->rl_reply;
610
611         dprintk("RPC:       %s: called on 0x%p%s\n",
612                 __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
613
614         /*
615          * Finish the deregistration. When using mw bind, this was
616          * begun in rpcrdma_reply_handler(). In all other modes, we
617          * do it here, in thread context. The process is considered
618          * complete when the rr_func vector becomes NULL - this
619          * was put in place during rpcrdma_reply_handler() - the wait
620          * call below will not block if the dereg is "done". If
621          * interrupted, our framework will clean up.
622          */
623         for (i = 0; req->rl_nchunks;) {
624                 --req->rl_nchunks;
625                 i += rpcrdma_deregister_external(
626                         &req->rl_segments[i], r_xprt, NULL);
627         }
628
629         if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
630                 rep->rr_func = NULL;    /* abandon the callback */
631                 req->rl_reply = NULL;
632         }
633
634         if (req->rl_iov.length == 0) {  /* see allocate above */
635                 struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
636                 oreq->rl_reply = req->rl_reply;
637                 (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
638                                                    req->rl_handle,
639                                                    &req->rl_iov);
640                 kfree(req);
641                 req = oreq;
642         }
643
644         /* Put back request+reply buffers */
645         rpcrdma_buffer_put(req);
646 }
647
648 /*
649  * send_request invokes the meat of RPC RDMA. It must do the following:
650  *  1.  Marshal the RPC request into an RPC RDMA request, which means
651  *      putting a header in front of data, and creating IOVs for RDMA
652  *      from those in the request.
653  *  2.  In marshaling, detect opportunities for RDMA, and use them.
654  *  3.  Post a recv message to set up asynch completion, then send
655  *      the request (rpcrdma_ep_post).
656  *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
657  */
658
659 static int
660 xprt_rdma_send_request(struct rpc_task *task)
661 {
662         struct rpc_rqst *rqst = task->tk_rqstp;
663         struct rpc_xprt *xprt = task->tk_xprt;
664         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
665         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
666
667         /* marshal the send itself */
668         if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
669                 r_xprt->rx_stats.failed_marshal_count++;
670                 dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
671                         __func__);
672                 return -EIO;
673         }
674
675         if (req->rl_reply == NULL)              /* e.g. reconnection */
676                 rpcrdma_recv_buffer_get(req);
677
678         if (req->rl_reply) {
679                 req->rl_reply->rr_func = rpcrdma_reply_handler;
680                 /* this need only be done once, but... */
681                 req->rl_reply->rr_xprt = xprt;
682         }
683
684         if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
685                 xprt_disconnect(xprt);
686                 return -ENOTCONN;       /* implies disconnect */
687         }
688
689         rqst->rq_bytes_sent = 0;
690         return 0;
691 }
692
693 static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
694 {
695         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
696         long idle_time = 0;
697
698         if (xprt_connected(xprt))
699                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
700
701         seq_printf(seq,
702           "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
703           "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
704
705            0,   /* need a local port? */
706            xprt->stat.bind_count,
707            xprt->stat.connect_count,
708            xprt->stat.connect_time,
709            idle_time,
710            xprt->stat.sends,
711            xprt->stat.recvs,
712            xprt->stat.bad_xids,
713            xprt->stat.req_u,
714            xprt->stat.bklog_u,
715
716            r_xprt->rx_stats.read_chunk_count,
717            r_xprt->rx_stats.write_chunk_count,
718            r_xprt->rx_stats.reply_chunk_count,
719            r_xprt->rx_stats.total_rdma_request,
720            r_xprt->rx_stats.total_rdma_reply,
721            r_xprt->rx_stats.pullup_copy_count,
722            r_xprt->rx_stats.fixup_copy_count,
723            r_xprt->rx_stats.hardway_register_count,
724            r_xprt->rx_stats.failed_marshal_count,
725            r_xprt->rx_stats.bad_reply_count);
726 }
727
728 /*
729  * Plumbing for rpc transport switch and kernel module
730  */
731
732 static struct rpc_xprt_ops xprt_rdma_procs = {
733         .reserve_xprt           = xprt_rdma_reserve_xprt,
734         .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
735         .release_request        = xprt_release_rqst_cong,       /* ditto */
736         .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
737         .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
738         .set_port               = xprt_rdma_set_port,
739         .connect                = xprt_rdma_connect,
740         .buf_alloc              = xprt_rdma_allocate,
741         .buf_free               = xprt_rdma_free,
742         .send_request           = xprt_rdma_send_request,
743         .close                  = xprt_rdma_close,
744         .destroy                = xprt_rdma_destroy,
745         .print_stats            = xprt_rdma_print_stats
746 };
747
748 static struct xprt_class xprt_rdma = {
749         .list                   = LIST_HEAD_INIT(xprt_rdma.list),
750         .name                   = "rdma",
751         .owner                  = THIS_MODULE,
752         .ident                  = XPRT_TRANSPORT_RDMA,
753         .setup                  = xprt_setup_rdma,
754 };
755
756 static void __exit xprt_rdma_cleanup(void)
757 {
758         int rc;
759
760         dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
761 #ifdef RPC_DEBUG
762         if (sunrpc_table_header) {
763                 unregister_sysctl_table(sunrpc_table_header);
764                 sunrpc_table_header = NULL;
765         }
766 #endif
767         rc = xprt_unregister_transport(&xprt_rdma);
768         if (rc)
769                 dprintk("RPC:       %s: xprt_unregister returned %i\n",
770                         __func__, rc);
771 }
772
773 static int __init xprt_rdma_init(void)
774 {
775         int rc;
776
777         rc = xprt_register_transport(&xprt_rdma);
778
779         if (rc)
780                 return rc;
781
782         dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
783
784         dprintk(KERN_INFO "Defaults:\n");
785         dprintk(KERN_INFO "\tSlots %d\n"
786                 "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
787                 xprt_rdma_slot_table_entries,
788                 xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
789         dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
790                 xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
791
792 #ifdef RPC_DEBUG
793         if (!sunrpc_table_header)
794                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
795 #endif
796         return 0;
797 }
798
799 module_init(xprt_rdma_init);
800 module_exit(xprt_rdma_cleanup);