RPC/RDMA: support FRMR client memory registration.
[linux-2.6] / net / sunrpc / xprtrdma / transport.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * transport.c
42  *
43  * This file contains the top-level implementation of an RPC RDMA
44  * transport.
45  *
46  * Naming convention: functions beginning with xprt_ are part of the
47  * transport switch. All others are RPC RDMA internal.
48  */
49
50 #include <linux/module.h>
51 #include <linux/init.h>
52 #include <linux/seq_file.h>
53
54 #include "xprt_rdma.h"
55
56 #ifdef RPC_DEBUG
57 # define RPCDBG_FACILITY        RPCDBG_TRANS
58 #endif
59
60 MODULE_LICENSE("Dual BSD/GPL");
61
62 MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
63 MODULE_AUTHOR("Network Appliance, Inc.");
64
65 /*
66  * tunables
67  */
68
69 static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
70 static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
71 static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
72 static unsigned int xprt_rdma_inline_write_padding;
73 static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
74
#ifdef RPC_DEBUG

/* Bounds referenced by the proc_dointvec_minmax entries below. */
static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

/* Handle returned by register_sysctl_table(); NULL while unregistered. */
static struct ctl_table_header *sunrpc_table_header;

/*
 * One entry per tunable. Entries that carry extra1/extra2 are range
 * checked by proc_dointvec_minmax; the others accept any integer.
 */
static ctl_table xr_tunables_table[] = {
	{
		/* bounded by min_slot_table_size..max_slot_table_size */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		/* no range check */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		/* no range check */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		/* bounded by zero..max_padding */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_inline_write_padding",
		.data		= &xprt_rdma_inline_write_padding,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &zero,
		.extra2		= &max_padding,
	},
	{
		/* bounded by min_memreg..max_memreg */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		/* terminator */
		.ctl_name = 0,
	},
};

/* Parent "sunrpc" directory entry holding the tunables above. */
static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{
		/* terminator */
		.ctl_name = 0,
	},
};

#endif
156
157 static struct rpc_xprt_ops xprt_rdma_procs;     /* forward reference */
158
159 static void
160 xprt_rdma_format_addresses(struct rpc_xprt *xprt)
161 {
162         struct sockaddr_in *addr = (struct sockaddr_in *)
163                                         &rpcx_to_rdmad(xprt).addr;
164         char *buf;
165
166         buf = kzalloc(20, GFP_KERNEL);
167         if (buf)
168                 snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
169         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
170
171         buf = kzalloc(8, GFP_KERNEL);
172         if (buf)
173                 snprintf(buf, 8, "%u", ntohs(addr->sin_port));
174         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
175
176         xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
177
178         buf = kzalloc(48, GFP_KERNEL);
179         if (buf)
180                 snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
181                         NIPQUAD(addr->sin_addr.s_addr),
182                         ntohs(addr->sin_port), "rdma");
183         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
184
185         buf = kzalloc(10, GFP_KERNEL);
186         if (buf)
187                 snprintf(buf, 10, "%02x%02x%02x%02x",
188                         NIPQUAD(addr->sin_addr.s_addr));
189         xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
190
191         buf = kzalloc(8, GFP_KERNEL);
192         if (buf)
193                 snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
194         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
195
196         buf = kzalloc(30, GFP_KERNEL);
197         if (buf)
198                 snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
199                         NIPQUAD(addr->sin_addr.s_addr),
200                         ntohs(addr->sin_port) >> 8,
201                         ntohs(addr->sin_port) & 0xff);
202         xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
203
204         /* netid */
205         xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
206 }
207
208 static void
209 xprt_rdma_free_addresses(struct rpc_xprt *xprt)
210 {
211         unsigned int i;
212
213         for (i = 0; i < RPC_DISPLAY_MAX; i++)
214                 switch (i) {
215                 case RPC_DISPLAY_PROTO:
216                 case RPC_DISPLAY_NETID:
217                         continue;
218                 default:
219                         kfree(xprt->address_strings[i]);
220                 }
221 }
222
223 static void
224 xprt_rdma_connect_worker(struct work_struct *work)
225 {
226         struct rpcrdma_xprt *r_xprt =
227                 container_of(work, struct rpcrdma_xprt, rdma_connect.work);
228         struct rpc_xprt *xprt = &r_xprt->xprt;
229         int rc = 0;
230
231         if (!xprt->shutdown) {
232                 xprt_clear_connected(xprt);
233
234                 dprintk("RPC:       %s: %sconnect\n", __func__,
235                                 r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
236                 rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
237                 if (rc)
238                         goto out;
239         }
240         goto out_clear;
241
242 out:
243         xprt_wake_pending_tasks(xprt, rc);
244
245 out_clear:
246         dprintk("RPC:       %s: exit\n", __func__);
247         xprt_clear_connecting(xprt);
248 }
249
250 /*
251  * xprt_rdma_destroy
252  *
253  * Destroy the xprt.
254  * Free all memory associated with the object, including its own.
255  * NOTE: none of the *destroy methods free memory for their top-level
256  * objects, even though they may have allocated it (they do free
257  * private memory). It's up to the caller to handle it. In this
258  * case (RDMA transport), all structure memory is inlined with the
259  * struct rpcrdma_xprt.
260  */
261 static void
262 xprt_rdma_destroy(struct rpc_xprt *xprt)
263 {
264         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
265         int rc;
266
267         dprintk("RPC:       %s: called\n", __func__);
268
269         cancel_delayed_work(&r_xprt->rdma_connect);
270         flush_scheduled_work();
271
272         xprt_clear_connected(xprt);
273
274         rpcrdma_buffer_destroy(&r_xprt->rx_buf);
275         rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
276         if (rc)
277                 dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
278                         __func__, rc);
279         rpcrdma_ia_close(&r_xprt->rx_ia);
280
281         xprt_rdma_free_addresses(xprt);
282
283         kfree(xprt->slot);
284         xprt->slot = NULL;
285         kfree(xprt);
286
287         dprintk("RPC:       %s: returning\n", __func__);
288
289         module_put(THIS_MODULE);
290 }
291
292 static const struct rpc_timeout xprt_rdma_default_timeout = {
293         .to_initval = 60 * HZ,
294         .to_maxval = 60 * HZ,
295 };
296
297 /**
298  * xprt_setup_rdma - Set up transport to use RDMA
299  *
300  * @args: rpc transport arguments
301  */
302 static struct rpc_xprt *
303 xprt_setup_rdma(struct xprt_create *args)
304 {
305         struct rpcrdma_create_data_internal cdata;
306         struct rpc_xprt *xprt;
307         struct rpcrdma_xprt *new_xprt;
308         struct rpcrdma_ep *new_ep;
309         struct sockaddr_in *sin;
310         int rc;
311
312         if (args->addrlen > sizeof(xprt->addr)) {
313                 dprintk("RPC:       %s: address too large\n", __func__);
314                 return ERR_PTR(-EBADF);
315         }
316
317         xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
318         if (xprt == NULL) {
319                 dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
320                         __func__);
321                 return ERR_PTR(-ENOMEM);
322         }
323
324         xprt->max_reqs = xprt_rdma_slot_table_entries;
325         xprt->slot = kcalloc(xprt->max_reqs,
326                                 sizeof(struct rpc_rqst), GFP_KERNEL);
327         if (xprt->slot == NULL) {
328                 dprintk("RPC:       %s: couldn't allocate %d slots\n",
329                         __func__, xprt->max_reqs);
330                 kfree(xprt);
331                 return ERR_PTR(-ENOMEM);
332         }
333
334         /* 60 second timeout, no retries */
335         xprt->timeout = &xprt_rdma_default_timeout;
336         xprt->bind_timeout = (60U * HZ);
337         xprt->connect_timeout = (60U * HZ);
338         xprt->reestablish_timeout = (5U * HZ);
339         xprt->idle_timeout = (5U * 60 * HZ);
340
341         xprt->resvport = 0;             /* privileged port not needed */
342         xprt->tsh_size = 0;             /* RPC-RDMA handles framing */
343         xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
344         xprt->ops = &xprt_rdma_procs;
345
346         /*
347          * Set up RDMA-specific connect data.
348          */
349
350         /* Put server RDMA address in local cdata */
351         memcpy(&cdata.addr, args->dstaddr, args->addrlen);
352
353         /* Ensure xprt->addr holds valid server TCP (not RDMA)
354          * address, for any side protocols which peek at it */
355         xprt->prot = IPPROTO_TCP;
356         xprt->addrlen = args->addrlen;
357         memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
358
359         sin = (struct sockaddr_in *)&cdata.addr;
360         if (ntohs(sin->sin_port) != 0)
361                 xprt_set_bound(xprt);
362
363         dprintk("RPC:       %s: %u.%u.%u.%u:%u\n", __func__,
364                         NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
365
366         /* Set max requests */
367         cdata.max_requests = xprt->max_reqs;
368
369         /* Set some length limits */
370         cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
371         cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
372
373         cdata.inline_wsize = xprt_rdma_max_inline_write;
374         if (cdata.inline_wsize > cdata.wsize)
375                 cdata.inline_wsize = cdata.wsize;
376
377         cdata.inline_rsize = xprt_rdma_max_inline_read;
378         if (cdata.inline_rsize > cdata.rsize)
379                 cdata.inline_rsize = cdata.rsize;
380
381         cdata.padding = xprt_rdma_inline_write_padding;
382
383         /*
384          * Create new transport instance, which includes initialized
385          *  o ia
386          *  o endpoint
387          *  o buffers
388          */
389
390         new_xprt = rpcx_to_rdmax(xprt);
391
392         rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
393                                 xprt_rdma_memreg_strategy);
394         if (rc)
395                 goto out1;
396
397         /*
398          * initialize and create ep
399          */
400         new_xprt->rx_data = cdata;
401         new_ep = &new_xprt->rx_ep;
402         new_ep->rep_remote_addr = cdata.addr;
403
404         rc = rpcrdma_ep_create(&new_xprt->rx_ep,
405                                 &new_xprt->rx_ia, &new_xprt->rx_data);
406         if (rc)
407                 goto out2;
408
409         /*
410          * Allocate pre-registered send and receive buffers for headers and
411          * any inline data. Also specify any padding which will be provided
412          * from a preregistered zero buffer.
413          */
414         rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
415                                 &new_xprt->rx_data);
416         if (rc)
417                 goto out3;
418
419         /*
420          * Register a callback for connection events. This is necessary because
421          * connection loss notification is async. We also catch connection loss
422          * when reaping receives.
423          */
424         INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
425         new_ep->rep_func = rpcrdma_conn_func;
426         new_ep->rep_xprt = xprt;
427
428         xprt_rdma_format_addresses(xprt);
429
430         if (!try_module_get(THIS_MODULE))
431                 goto out4;
432
433         return xprt;
434
435 out4:
436         xprt_rdma_free_addresses(xprt);
437         rc = -EINVAL;
438 out3:
439         (void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
440 out2:
441         rpcrdma_ia_close(&new_xprt->rx_ia);
442 out1:
443         kfree(xprt->slot);
444         kfree(xprt);
445         return ERR_PTR(rc);
446 }
447
448 /*
449  * Close a connection, during shutdown or timeout/reconnect
450  */
451 static void
452 xprt_rdma_close(struct rpc_xprt *xprt)
453 {
454         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
455
456         dprintk("RPC:       %s: closing\n", __func__);
457         xprt_disconnect_done(xprt);
458         (void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
459 }
460
461 static void
462 xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
463 {
464         struct sockaddr_in *sap;
465
466         sap = (struct sockaddr_in *)&xprt->addr;
467         sap->sin_port = htons(port);
468         sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
469         sap->sin_port = htons(port);
470         dprintk("RPC:       %s: %u\n", __func__, port);
471 }
472
473 static void
474 xprt_rdma_connect(struct rpc_task *task)
475 {
476         struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
477         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
478
479         if (!xprt_test_and_set_connecting(xprt)) {
480                 if (r_xprt->rx_ep.rep_connected != 0) {
481                         /* Reconnect */
482                         schedule_delayed_work(&r_xprt->rdma_connect,
483                                 xprt->reestablish_timeout);
484                 } else {
485                         schedule_delayed_work(&r_xprt->rdma_connect, 0);
486                         if (!RPC_IS_ASYNC(task))
487                                 flush_scheduled_work();
488                 }
489         }
490 }
491
492 static int
493 xprt_rdma_reserve_xprt(struct rpc_task *task)
494 {
495         struct rpc_xprt *xprt = task->tk_xprt;
496         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
497         int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
498
499         /* == RPC_CWNDSCALE @ init, but *after* setup */
500         if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
501                 r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
502                 dprintk("RPC:       %s: cwndscale %lu\n", __func__,
503                         r_xprt->rx_buf.rb_cwndscale);
504                 BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
505         }
506         xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
507         return xprt_reserve_xprt_cong(task);
508 }
509
510 /*
511  * The RDMA allocate/free functions need the task structure as a place
512  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
513  * sequence. For this reason, the recv buffers are attached to send
514  * buffers for portions of the RPC. Note that the RPC layer allocates
515  * both send and receive buffers in the same call. We may register
516  * the receive buffer portion when using reply chunks.
517  */
518 static void *
519 xprt_rdma_allocate(struct rpc_task *task, size_t size)
520 {
521         struct rpc_xprt *xprt = task->tk_xprt;
522         struct rpcrdma_req *req, *nreq;
523
524         req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
525         BUG_ON(NULL == req);
526
527         if (size > req->rl_size) {
528                 dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
529                         "prog %d vers %d proc %d\n",
530                         __func__, size, req->rl_size,
531                         task->tk_client->cl_prog, task->tk_client->cl_vers,
532                         task->tk_msg.rpc_proc->p_proc);
533                 /*
534                  * Outgoing length shortage. Our inline write max must have
535                  * been configured to perform direct i/o.
536                  *
537                  * This is therefore a large metadata operation, and the
538                  * allocate call was made on the maximum possible message,
539                  * e.g. containing long filename(s) or symlink data. In
540                  * fact, while these metadata operations *might* carry
541                  * large outgoing payloads, they rarely *do*. However, we
542                  * have to commit to the request here, so reallocate and
543                  * register it now. The data path will never require this
544                  * reallocation.
545                  *
546                  * If the allocation or registration fails, the RPC framework
547                  * will (doggedly) retry.
548                  */
549                 if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
550                                 RPCRDMA_BOUNCEBUFFERS) {
551                         /* forced to "pure inline" */
552                         dprintk("RPC:       %s: too much data (%zd) for inline "
553                                         "(r/w max %d/%d)\n", __func__, size,
554                                         rpcx_to_rdmad(xprt).inline_rsize,
555                                         rpcx_to_rdmad(xprt).inline_wsize);
556                         size = req->rl_size;
557                         rpc_exit(task, -EIO);           /* fail the operation */
558                         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
559                         goto out;
560                 }
561                 if (task->tk_flags & RPC_TASK_SWAPPER)
562                         nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
563                 else
564                         nreq = kmalloc(sizeof *req + size, GFP_NOFS);
565                 if (nreq == NULL)
566                         goto outfail;
567
568                 if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
569                                 nreq->rl_base, size + sizeof(struct rpcrdma_req)
570                                 - offsetof(struct rpcrdma_req, rl_base),
571                                 &nreq->rl_handle, &nreq->rl_iov)) {
572                         kfree(nreq);
573                         goto outfail;
574                 }
575                 rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
576                 nreq->rl_size = size;
577                 nreq->rl_niovs = 0;
578                 nreq->rl_nchunks = 0;
579                 nreq->rl_buffer = (struct rpcrdma_buffer *)req;
580                 nreq->rl_reply = req->rl_reply;
581                 memcpy(nreq->rl_segments,
582                         req->rl_segments, sizeof nreq->rl_segments);
583                 /* flag the swap with an unused field */
584                 nreq->rl_iov.length = 0;
585                 req->rl_reply = NULL;
586                 req = nreq;
587         }
588         dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
589 out:
590         return req->rl_xdr_buf;
591
592 outfail:
593         rpcrdma_buffer_put(req);
594         rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
595         return NULL;
596 }
597
598 /*
599  * This function returns all RDMA resources to the pool.
600  */
601 static void
602 xprt_rdma_free(void *buffer)
603 {
604         struct rpcrdma_req *req;
605         struct rpcrdma_xprt *r_xprt;
606         struct rpcrdma_rep *rep;
607         int i;
608
609         if (buffer == NULL)
610                 return;
611
612         req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
613         if (req->rl_iov.length == 0) {  /* see allocate above */
614                 r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
615                                       struct rpcrdma_xprt, rx_buf);
616         } else
617                 r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
618         rep = req->rl_reply;
619
620         dprintk("RPC:       %s: called on 0x%p%s\n",
621                 __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
622
623         /*
624          * Finish the deregistration. When using mw bind, this was
625          * begun in rpcrdma_reply_handler(). In all other modes, we
626          * do it here, in thread context. The process is considered
627          * complete when the rr_func vector becomes NULL - this
628          * was put in place during rpcrdma_reply_handler() - the wait
629          * call below will not block if the dereg is "done". If
630          * interrupted, our framework will clean up.
631          */
632         for (i = 0; req->rl_nchunks;) {
633                 --req->rl_nchunks;
634                 i += rpcrdma_deregister_external(
635                         &req->rl_segments[i], r_xprt, NULL);
636         }
637
638         if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
639                 rep->rr_func = NULL;    /* abandon the callback */
640                 req->rl_reply = NULL;
641         }
642
643         if (req->rl_iov.length == 0) {  /* see allocate above */
644                 struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
645                 oreq->rl_reply = req->rl_reply;
646                 (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
647                                                    req->rl_handle,
648                                                    &req->rl_iov);
649                 kfree(req);
650                 req = oreq;
651         }
652
653         /* Put back request+reply buffers */
654         rpcrdma_buffer_put(req);
655 }
656
657 /*
658  * send_request invokes the meat of RPC RDMA. It must do the following:
659  *  1.  Marshal the RPC request into an RPC RDMA request, which means
660  *      putting a header in front of data, and creating IOVs for RDMA
661  *      from those in the request.
662  *  2.  In marshaling, detect opportunities for RDMA, and use them.
663  *  3.  Post a recv message to set up asynch completion, then send
664  *      the request (rpcrdma_ep_post).
665  *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
666  */
667
668 static int
669 xprt_rdma_send_request(struct rpc_task *task)
670 {
671         struct rpc_rqst *rqst = task->tk_rqstp;
672         struct rpc_xprt *xprt = task->tk_xprt;
673         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
674         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
675
676         /* marshal the send itself */
677         if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
678                 r_xprt->rx_stats.failed_marshal_count++;
679                 dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
680                         __func__);
681                 return -EIO;
682         }
683
684         if (req->rl_reply == NULL)              /* e.g. reconnection */
685                 rpcrdma_recv_buffer_get(req);
686
687         if (req->rl_reply) {
688                 req->rl_reply->rr_func = rpcrdma_reply_handler;
689                 /* this need only be done once, but... */
690                 req->rl_reply->rr_xprt = xprt;
691         }
692
693         if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
694                 xprt_disconnect_done(xprt);
695                 return -ENOTCONN;       /* implies disconnect */
696         }
697
698         rqst->rq_bytes_sent = 0;
699         return 0;
700 }
701
702 static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
703 {
704         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
705         long idle_time = 0;
706
707         if (xprt_connected(xprt))
708                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
709
710         seq_printf(seq,
711           "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
712           "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
713
714            0,   /* need a local port? */
715            xprt->stat.bind_count,
716            xprt->stat.connect_count,
717            xprt->stat.connect_time,
718            idle_time,
719            xprt->stat.sends,
720            xprt->stat.recvs,
721            xprt->stat.bad_xids,
722            xprt->stat.req_u,
723            xprt->stat.bklog_u,
724
725            r_xprt->rx_stats.read_chunk_count,
726            r_xprt->rx_stats.write_chunk_count,
727            r_xprt->rx_stats.reply_chunk_count,
728            r_xprt->rx_stats.total_rdma_request,
729            r_xprt->rx_stats.total_rdma_reply,
730            r_xprt->rx_stats.pullup_copy_count,
731            r_xprt->rx_stats.fixup_copy_count,
732            r_xprt->rx_stats.hardway_register_count,
733            r_xprt->rx_stats.failed_marshal_count,
734            r_xprt->rx_stats.bad_reply_count);
735 }
736
737 /*
738  * Plumbing for rpc transport switch and kernel module
739  */
740
741 static struct rpc_xprt_ops xprt_rdma_procs = {
742         .reserve_xprt           = xprt_rdma_reserve_xprt,
743         .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
744         .release_request        = xprt_release_rqst_cong,       /* ditto */
745         .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
746         .rpcbind                = rpcb_getport_async,   /* sunrpc/rpcb_clnt.c */
747         .set_port               = xprt_rdma_set_port,
748         .connect                = xprt_rdma_connect,
749         .buf_alloc              = xprt_rdma_allocate,
750         .buf_free               = xprt_rdma_free,
751         .send_request           = xprt_rdma_send_request,
752         .close                  = xprt_rdma_close,
753         .destroy                = xprt_rdma_destroy,
754         .print_stats            = xprt_rdma_print_stats
755 };
756
757 static struct xprt_class xprt_rdma = {
758         .list                   = LIST_HEAD_INIT(xprt_rdma.list),
759         .name                   = "rdma",
760         .owner                  = THIS_MODULE,
761         .ident                  = XPRT_TRANSPORT_RDMA,
762         .setup                  = xprt_setup_rdma,
763 };
764
765 static void __exit xprt_rdma_cleanup(void)
766 {
767         int rc;
768
769         dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
770 #ifdef RPC_DEBUG
771         if (sunrpc_table_header) {
772                 unregister_sysctl_table(sunrpc_table_header);
773                 sunrpc_table_header = NULL;
774         }
775 #endif
776         rc = xprt_unregister_transport(&xprt_rdma);
777         if (rc)
778                 dprintk("RPC:       %s: xprt_unregister returned %i\n",
779                         __func__, rc);
780 }
781
782 static int __init xprt_rdma_init(void)
783 {
784         int rc;
785
786         rc = xprt_register_transport(&xprt_rdma);
787
788         if (rc)
789                 return rc;
790
791         dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
792
793         dprintk(KERN_INFO "Defaults:\n");
794         dprintk(KERN_INFO "\tSlots %d\n"
795                 "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
796                 xprt_rdma_slot_table_entries,
797                 xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
798         dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
799                 xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
800
801 #ifdef RPC_DEBUG
802         if (!sunrpc_table_header)
803                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
804 #endif
805         return 0;
806 }
807
808 module_init(xprt_rdma_init);
809 module_exit(xprt_rdma_cleanup);