RPC/RDMA: support FRMR client memory registration.
net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/pci.h>  /* for Tavor hack below */
51
52 #include "xprt_rdma.h"
53
54 /*
55  * Globals/Macros
56  */
57
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY        RPCDBG_TRANS
60 #endif
61
62 /*
63  * internal functions
64  */
65
66 /*
67  * handle replies in tasklet context, using a single, global list
68  * rdma tasklet function -- just turn around and call the func
69  * for all replies on the list
70  */
71
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78         struct rpcrdma_rep *rep;
79         void (*func)(struct rpcrdma_rep *);
80         unsigned long flags;
81
82         data = data;    /* tasklet argument is unused; self-assignment quiets compiler warnings */
83         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84         while (!list_empty(&rpcrdma_tasklets_g)) {
85                 rep = list_entry(rpcrdma_tasklets_g.next,
86                                  struct rpcrdma_rep, rr_list);
87                 list_del(&rep->rr_list);
88                 func = rep->rr_func;
89                 rep->rr_func = NULL;
90                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91
92                 if (func)
93                         func(rep);
94                 else
95                         rpcrdma_recv_buffer_put(rep);
96
97                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98         }
99         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107         unsigned long flags;
108
109         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112         tasklet_schedule(&rpcrdma_tasklet_g);
113 }
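
Editorial note: replies are not handled in the completion upcall itself. The completion path records a handler in rr_func and queues the rpcrdma_rep on the global list; the tasklet above then invokes that handler in softirq context, or recycles the buffer if no handler was set. A minimal sketch of that contract follows; the function names are hypothetical and not part of this file.

static void example_reply_handler(struct rpcrdma_rep *rep)
{
	/* Runs in tasklet (softirq) context, after rpcrdma_run_tasklet
	 * has unlinked rep from rpcrdma_tasklets_g and cleared rr_func. */
	pr_debug("RPC reply of %u bytes\n", rep->rr_len);
	rpcrdma_recv_buffer_put(rep);	/* return the receive buffer when done */
}

static void example_defer_reply(struct rpcrdma_rep *rep)
{
	rep->rr_func = example_reply_handler;	/* consumed once, then cleared */
	rpcrdma_schedule_tasklet(rep);		/* handler runs later, in softirq */
}
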
114
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118         struct rpcrdma_ep *ep = context;
119
120         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121                 __func__, event->event, event->device->name, context);
122         if (ep->rep_connected == 1) {
123                 ep->rep_connected = -EIO;
124                 ep->rep_func(ep);
125                 wake_up_all(&ep->rep_connect_wait);
126         }
127 }
128
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132         struct rpcrdma_ep *ep = context;
133
134         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135                 __func__, event->event, event->device->name, context);
136         if (ep->rep_connected == 1) {
137                 ep->rep_connected = -EIO;
138                 ep->rep_func(ep);
139                 wake_up_all(&ep->rep_connect_wait);
140         }
141 }
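
Editorial note: both async error upcalls above report a fatal QP/CQ error by marking the endpoint disconnected and calling ep->rep_func, the connection-state callback installed by the transport (the CM event handler below does the same). A hypothetical sketch of such a callback; the real one lives in the transport code that owns the endpoint.

/* Illustration only: a possible ep->rep_func implementation. */
static void example_connect_worker(struct rpcrdma_ep *ep)
{
	if (ep->rep_connected <= 0) {
		/* Connection lost or failed: the owner would typically
		 * schedule a reconnect and wake any waiting RPC tasks. */
		pr_debug("RPC:       %s: ep %p state %d\n",
			 __func__, ep, ep->rep_connected);
	}
	/* rep_connected == 1 means the endpoint is usable again. */
}
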
142
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146         struct rpcrdma_rep *rep =
147                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148
149         dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
151
152         if (!rep) /* send or bind completion that we don't care about */
153                 return;
154
155         if (IB_WC_SUCCESS != wc->status) {
156                 dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157                         __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158                          wc->status);
159                 rep->rr_len = ~0U;
160                 rpcrdma_schedule_tasklet(rep);
161                 return;
162         }
163
164         switch (wc->opcode) {
165         case IB_WC_RECV:
166                 rep->rr_len = wc->byte_len;
167                 ib_dma_sync_single_for_cpu(
168                         rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170                 /* Keep (only) the most recent credits, after checking validity */
171                 if (rep->rr_len >= 16) {
172                         struct rpcrdma_msg *p =
173                                         (struct rpcrdma_msg *) rep->rr_base;
174                         unsigned int credits = ntohl(p->rm_credit);
175                         if (credits == 0) {
176                                 dprintk("RPC:       %s: server"
177                                         " dropped credits to 0!\n", __func__);
178                                 /* don't deadlock */
179                                 credits = 1;
180                         } else if (credits > rep->rr_buffer->rb_max_requests) {
181                                 dprintk("RPC:       %s: server"
182                                         " over-crediting: %d (%d)\n",
183                                         __func__, credits,
184                                         rep->rr_buffer->rb_max_requests);
185                                 credits = rep->rr_buffer->rb_max_requests;
186                         }
187                         atomic_set(&rep->rr_buffer->rb_credits, credits);
188                 }
189                 /* fall through */
190         case IB_WC_BIND_MW:
191                 rpcrdma_schedule_tasklet(rep);
192                 break;
193         default:
194                 dprintk("RPC:       %s: unexpected WC event %X\n",
195                         __func__, wc->opcode);
196                 break;
197         }
198 }
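
Editorial note: the credit handling above only trusts the server within limits. A worked example of the clamp, assuming a hypothetical rb_max_requests of 32:

/*
 *   rm_credit in reply    value stored in rb_credits
 *   ------------------    --------------------------
 *            0                  1   (never zero -- that would deadlock the client)
 *           17                 17
 *          100                 32   (clamped to rb_max_requests)
 */
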
199
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203         struct ib_wc wc;
204         int rc;
205
206         for (;;) {
207                 rc = ib_poll_cq(cq, 1, &wc);
208                 if (rc < 0) {
209                         dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210                                 __func__, rc);
211                         return rc;
212                 }
213                 if (rc == 0)
214                         break;
215
216                 rpcrdma_event_process(&wc);
217         }
218
219         return 0;
220 }
221
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant, but processes events one at a time in order to
227  * maintain the ordering of receives and thus the server credit count.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240         int rc;
241
242         rc = rpcrdma_cq_poll(cq);
243         if (rc)
244                 return;
245
246         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247         if (rc) {
248                 dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249                         __func__, rc);
250                 return;
251         }
252
253         rpcrdma_cq_poll(cq);
254 }
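
Editorial note: rpcrdma_cq_event_upcall() follows the standard verbs idiom for interrupt-driven CQs: drain, re-arm, then drain again, because a completion arriving between the final poll and ib_req_notify_cq() would otherwise never raise another upcall. A generic sketch of the idiom (the handler name is hypothetical):

static void example_cq_handler(struct ib_cq *cq, void *context)
{
	struct ib_wc wc;

	/* 1. Drain whatever has already completed. */
	while (ib_poll_cq(cq, 1, &wc) == 1)
		;	/* process &wc here */

	/* 2. Ask for an interrupt on the next completion. */
	if (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP))
		return;

	/* 3. Poll once more: a completion that slipped in between
	 *    steps 1 and 2 will not generate a new upcall. */
	while (ib_poll_cq(cq, 1, &wc) == 1)
		;	/* process &wc here */
}
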
255
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258         "address resolved",
259         "address error",
260         "route resolved",
261         "route error",
262         "connect request",
263         "connect response",
264         "connect error",
265         "unreachable",
266         "rejected",
267         "established",
268         "disconnected",
269         "device removal"
270 };
271 #endif
272
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276         struct rpcrdma_xprt *xprt = id->context;
277         struct rpcrdma_ia *ia = &xprt->rx_ia;
278         struct rpcrdma_ep *ep = &xprt->rx_ep;
279         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280         struct ib_qp_attr attr;
281         struct ib_qp_init_attr iattr;
282         int connstate = 0;
283
284         switch (event->event) {
285         case RDMA_CM_EVENT_ADDR_RESOLVED:
286         case RDMA_CM_EVENT_ROUTE_RESOLVED:
287                 complete(&ia->ri_done);
288                 break;
289         case RDMA_CM_EVENT_ADDR_ERROR:
290                 ia->ri_async_rc = -EHOSTUNREACH;
291                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
292                         __func__, ep);
293                 complete(&ia->ri_done);
294                 break;
295         case RDMA_CM_EVENT_ROUTE_ERROR:
296                 ia->ri_async_rc = -ENETUNREACH;
297                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
298                         __func__, ep);
299                 complete(&ia->ri_done);
300                 break;
301         case RDMA_CM_EVENT_ESTABLISHED:
302                 connstate = 1;
303                 ib_query_qp(ia->ri_id->qp, &attr,
304                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305                         &iattr);
306                 dprintk("RPC:       %s: %d responder resources"
307                         " (%d initiator)\n",
308                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309                 goto connected;
310         case RDMA_CM_EVENT_CONNECT_ERROR:
311                 connstate = -ENOTCONN;
312                 goto connected;
313         case RDMA_CM_EVENT_UNREACHABLE:
314                 connstate = -ENETDOWN;
315                 goto connected;
316         case RDMA_CM_EVENT_REJECTED:
317                 connstate = -ECONNREFUSED;
318                 goto connected;
319         case RDMA_CM_EVENT_DISCONNECTED:
320                 connstate = -ECONNABORTED;
321                 goto connected;
322         case RDMA_CM_EVENT_DEVICE_REMOVAL:
323                 connstate = -ENODEV;
324 connected:
325                 dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
326                         " (ep 0x%p event 0x%x)\n",
327                         __func__,
328                         (event->event <= 11) ? conn[event->event] :
329                                                 "unknown connection error",
330                         NIPQUAD(addr->sin_addr.s_addr),
331                         ntohs(addr->sin_port),
332                         ep, event->event);
333                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334                 dprintk("RPC:       %s: %sconnected\n",
335                                         __func__, connstate > 0 ? "" : "dis");
336                 ep->rep_connected = connstate;
337                 ep->rep_func(ep);
338                 wake_up_all(&ep->rep_connect_wait);
339                 break;
340         default:
341                 ia->ri_async_rc = -EINVAL;
342                 dprintk("RPC:       %s: unexpected CM event %X\n",
343                         __func__, event->event);
344                 complete(&ia->ri_done);
345                 break;
346         }
347
348         return 0;
349 }
350
351 static struct rdma_cm_id *
352 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
353                         struct rpcrdma_ia *ia, struct sockaddr *addr)
354 {
355         struct rdma_cm_id *id;
356         int rc;
357
358         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359         if (IS_ERR(id)) {
360                 rc = PTR_ERR(id);
361                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
362                         __func__, rc);
363                 return id;
364         }
365
366         ia->ri_async_rc = 0;
367         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368         if (rc) {
369                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
370                         __func__, rc);
371                 goto out;
372         }
373         wait_for_completion(&ia->ri_done);
374         rc = ia->ri_async_rc;
375         if (rc)
376                 goto out;
377
378         ia->ri_async_rc = 0;
379         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380         if (rc) {
381                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
382                         __func__, rc);
383                 goto out;
384         }
385         wait_for_completion(&ia->ri_done);
386         rc = ia->ri_async_rc;
387         if (rc)
388                 goto out;
389
390         return id;
391
392 out:
393         rdma_destroy_id(id);
394         return ERR_PTR(rc);
395 }
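
Editorial note: rpcrdma_create_id() and rpcrdma_conn_upcall() synchronize through ri_async_rc and the ri_done completion: the CM callback records a status and completes, while the creator sleeps in wait_for_completion(). The same handshake, reduced to its core with hypothetical names:

struct example_waiter {
	struct completion done;
	int rc;
};

/* Runs in callback context (cf. rpcrdma_conn_upcall above). */
static void example_async_done(struct example_waiter *w, int status)
{
	w->rc = status;		/* e.g. -EHOSTUNREACH on RDMA_CM_EVENT_ADDR_ERROR */
	complete(&w->done);	/* wake the sleeping initiator */
}

/* Runs in process context (cf. rpcrdma_create_id above). */
static int example_start_and_wait(struct example_waiter *w)
{
	init_completion(&w->done);
	w->rc = 0;
	/* ... start the asynchronous operation here ... */
	wait_for_completion(&w->done);
	return w->rc;
}
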
396
397 /*
398  * Drain any cq, prior to teardown.
399  */
400 static void
401 rpcrdma_clean_cq(struct ib_cq *cq)
402 {
403         struct ib_wc wc;
404         int count = 0;
405
406         while (1 == ib_poll_cq(cq, 1, &wc))
407                 ++count;
408
409         if (count)
410                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
411                         __func__, count, wc.opcode);
412 }
413
414 /*
415  * Exported functions.
416  */
417
418 /*
419  * Open and initialize an Interface Adapter.
420  *  o initializes fields of struct rpcrdma_ia, including
421  *    interface and provider attributes and protection zone.
422  */
423 int
424 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425 {
426         int rc, mem_priv;
427         struct ib_device_attr devattr;
428         struct rpcrdma_ia *ia = &xprt->rx_ia;
429
430         init_completion(&ia->ri_done);
431
432         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
433         if (IS_ERR(ia->ri_id)) {
434                 rc = PTR_ERR(ia->ri_id);
435                 goto out1;
436         }
437
438         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
439         if (IS_ERR(ia->ri_pd)) {
440                 rc = PTR_ERR(ia->ri_pd);
441                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
442                         __func__, rc);
443                 goto out2;
444         }
445
446         /*
447          * Query the device to determine if the requested memory
448          * registration strategy is supported. If it isn't, set the
449          * strategy to a globally supported model.
450          */
451         rc = ib_query_device(ia->ri_id->device, &devattr);
452         if (rc) {
453                 dprintk("RPC:       %s: ib_query_device failed %d\n",
454                         __func__, rc);
455                 goto out2;
456         }
457
458         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
459                 ia->ri_have_dma_lkey = 1;
460                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
461         }
462
463         switch (memreg) {
464         case RPCRDMA_MEMWINDOWS:
465         case RPCRDMA_MEMWINDOWS_ASYNC:
466                 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
467                         dprintk("RPC:       %s: MEMWINDOWS registration "
468                                 "specified but not supported by adapter, "
469                                 "using slower RPCRDMA_REGISTER\n",
470                                 __func__);
471                         memreg = RPCRDMA_REGISTER;
472                 }
473                 break;
474         case RPCRDMA_MTHCAFMR:
475                 if (!ia->ri_id->device->alloc_fmr) {
476 #if RPCRDMA_PERSISTENT_REGISTRATION
477                         dprintk("RPC:       %s: MTHCAFMR registration "
478                                 "specified but not supported by adapter, "
479                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
480                                 __func__);
481                         memreg = RPCRDMA_ALLPHYSICAL;
482 #else
483                         dprintk("RPC:       %s: MTHCAFMR registration "
484                                 "specified but not supported by adapter, "
485                                 "using slower RPCRDMA_REGISTER\n",
486                                 __func__);
487                         memreg = RPCRDMA_REGISTER;
488 #endif
489                 }
490                 break;
491         case RPCRDMA_FRMR:
492                 /* Requires both frmr reg and local dma lkey */
493                 if ((devattr.device_cap_flags &
494                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
495                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
496 #if RPCRDMA_PERSISTENT_REGISTRATION
497                         dprintk("RPC:       %s: FRMR registration "
498                                 "specified but not supported by adapter, "
499                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
500                                 __func__);
501                         memreg = RPCRDMA_ALLPHYSICAL;
502 #else
503                         dprintk("RPC:       %s: FRMR registration "
504                                 "specified but not supported by adapter, "
505                                 "using slower RPCRDMA_REGISTER\n",
506                                 __func__);
507                         memreg = RPCRDMA_REGISTER;
508 #endif
509                 }
510                 break;
511         }
512
513         /*
514          * Optionally obtain an underlying physical identity mapping in
515          * order to do a memory window-based bind. This base registration
516          * is protected from remote access - that is enabled only by binding
517          * for the specific bytes targeted during each RPC operation, and
518          * revoked after the corresponding completion similar to a storage
519          * adapter.
520          */
521         switch (memreg) {
522         case RPCRDMA_BOUNCEBUFFERS:
523         case RPCRDMA_REGISTER:
524         case RPCRDMA_FRMR:
525                 break;
526 #if RPCRDMA_PERSISTENT_REGISTRATION
527         case RPCRDMA_ALLPHYSICAL:
528                 mem_priv = IB_ACCESS_LOCAL_WRITE |
529                                 IB_ACCESS_REMOTE_WRITE |
530                                 IB_ACCESS_REMOTE_READ;
531                 goto register_setup;
532 #endif
533         case RPCRDMA_MEMWINDOWS_ASYNC:
534         case RPCRDMA_MEMWINDOWS:
535                 mem_priv = IB_ACCESS_LOCAL_WRITE |
536                                 IB_ACCESS_MW_BIND;
537                 goto register_setup;
538         case RPCRDMA_MTHCAFMR:
539                 if (ia->ri_have_dma_lkey)
540                         break;
541                 mem_priv = IB_ACCESS_LOCAL_WRITE;
542         register_setup:
543                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
544                 if (IS_ERR(ia->ri_bind_mem)) {
545                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
546                                 "phys register failed with %lX\n\t"
547                                 "Will continue with degraded performance\n",
548                                 __func__, PTR_ERR(ia->ri_bind_mem));
549                         memreg = RPCRDMA_REGISTER;
550                         ia->ri_bind_mem = NULL;
551                 }
552                 break;
553         default:
554                 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
555                                 __func__, memreg);
556                 rc = -EINVAL;
557                 goto out2;
558         }
559         dprintk("RPC:       %s: memory registration strategy is %d\n",
560                 __func__, memreg);
561
562         /* Else will do memory reg/dereg for each chunk */
563         ia->ri_memreg_strategy = memreg;
564
565         return 0;
566 out2:
567         rdma_destroy_id(ia->ri_id);
568 out1:
569         return rc;
570 }
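
Editorial note: the exported functions in this file are intended to be called by the transport in a fixed order, and torn down in reverse, with buffers destroyed before the endpoint their registrations reference (see the comment above rpcrdma_buffer_destroy below). A hypothetical caller, sketched for illustration; error handling is reduced to simple unwinding:

static int example_transport_setup(struct rpcrdma_xprt *xprt,
				   struct sockaddr *addr,
				   struct rpcrdma_create_data_internal *cdata,
				   int memreg)
{
	int rc;

	rc = rpcrdma_ia_open(xprt, addr, memreg);
	if (rc)
		return rc;
	rc = rpcrdma_ep_create(&xprt->rx_ep, &xprt->rx_ia, cdata);
	if (rc)
		goto out_ia;
	rc = rpcrdma_buffer_create(&xprt->rx_buf, &xprt->rx_ep,
				   &xprt->rx_ia, cdata);
	if (rc)
		goto out_ep;
	rc = rpcrdma_ep_connect(&xprt->rx_ep, &xprt->rx_ia);
	if (rc)
		goto out_buf;
	return 0;

out_buf:
	rpcrdma_buffer_destroy(&xprt->rx_buf);
out_ep:
	rpcrdma_ep_destroy(&xprt->rx_ep, &xprt->rx_ia);
out_ia:
	rpcrdma_ia_close(&xprt->rx_ia);
	return rc;
}
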
571
572 /*
573  * Clean up/close an IA.
574  *   o if event handles and PD have been initialized, free them.
575  *   o close the IA
576  */
577 void
578 rpcrdma_ia_close(struct rpcrdma_ia *ia)
579 {
580         int rc;
581
582         dprintk("RPC:       %s: entering\n", __func__);
583         if (ia->ri_bind_mem != NULL) {
584                 rc = ib_dereg_mr(ia->ri_bind_mem);
585                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
586                         __func__, rc);
587         }
588         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
589                 rdma_destroy_qp(ia->ri_id);
590         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
591                 rc = ib_dealloc_pd(ia->ri_pd);
592                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
593                         __func__, rc);
594         }
595         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
596                 rdma_destroy_id(ia->ri_id);
597 }
598
599 /*
600  * Create unconnected endpoint.
601  */
602 int
603 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
604                                 struct rpcrdma_create_data_internal *cdata)
605 {
606         struct ib_device_attr devattr;
607         int rc, err;
608
609         rc = ib_query_device(ia->ri_id->device, &devattr);
610         if (rc) {
611                 dprintk("RPC:       %s: ib_query_device failed %d\n",
612                         __func__, rc);
613                 return rc;
614         }
615
616         /* check provider's send/recv wr limits */
617         if (cdata->max_requests > devattr.max_qp_wr)
618                 cdata->max_requests = devattr.max_qp_wr;
619
620         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
621         ep->rep_attr.qp_context = ep;
622         /* send_cq and recv_cq initialized below */
623         ep->rep_attr.srq = NULL;
624         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
625         switch (ia->ri_memreg_strategy) {
626         case RPCRDMA_FRMR:
627                 /* Add room for frmr register and invalidate WRs */
628                 ep->rep_attr.cap.max_send_wr *= 3;
629                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
630                         return -EINVAL;
631                 break;
632         case RPCRDMA_MEMWINDOWS_ASYNC:
633         case RPCRDMA_MEMWINDOWS:
634                 /* Add room for mw_binds+unbinds - overkill! */
635                 ep->rep_attr.cap.max_send_wr++;
636                 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
637                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
638                         return -EINVAL;
639                 break;
640         default:
641                 break;
642         }
643         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
644         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
645         ep->rep_attr.cap.max_recv_sge = 1;
646         ep->rep_attr.cap.max_inline_data = 0;
647         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
648         ep->rep_attr.qp_type = IB_QPT_RC;
649         ep->rep_attr.port_num = ~0;
650
651         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
652                 "iovs: send %d recv %d\n",
653                 __func__,
654                 ep->rep_attr.cap.max_send_wr,
655                 ep->rep_attr.cap.max_recv_wr,
656                 ep->rep_attr.cap.max_send_sge,
657                 ep->rep_attr.cap.max_recv_sge);
658
659         /* set trigger for requesting send completion */
660         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
661         switch (ia->ri_memreg_strategy) {
662         case RPCRDMA_MEMWINDOWS_ASYNC:
663         case RPCRDMA_MEMWINDOWS:
664                 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
665                 break;
666         default:
667                 break;
668         }
669         if (ep->rep_cqinit <= 2)
670                 ep->rep_cqinit = 0;
671         INIT_CQCOUNT(ep);
672         ep->rep_ia = ia;
673         init_waitqueue_head(&ep->rep_connect_wait);
674
675         /*
676          * Create a single cq for receive dto and mw_bind (only ever
677          * care about unbind, really). Send completions are suppressed.
678          * Use single threaded tasklet upcalls to maintain ordering.
679          */
680         ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
681                                   rpcrdma_cq_async_error_upcall, NULL,
682                                   ep->rep_attr.cap.max_recv_wr +
683                                   ep->rep_attr.cap.max_send_wr + 1, 0);
684         if (IS_ERR(ep->rep_cq)) {
685                 rc = PTR_ERR(ep->rep_cq);
686                 dprintk("RPC:       %s: ib_create_cq failed: %i\n",
687                         __func__, rc);
688                 goto out1;
689         }
690
691         rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
692         if (rc) {
693                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
694                         __func__, rc);
695                 goto out2;
696         }
697
698         ep->rep_attr.send_cq = ep->rep_cq;
699         ep->rep_attr.recv_cq = ep->rep_cq;
700
701         /* Initialize cma parameters */
702
703         /* RPC/RDMA does not use private data */
704         ep->rep_remote_cma.private_data = NULL;
705         ep->rep_remote_cma.private_data_len = 0;
706
707         /* Client offers RDMA Read but does not initiate */
708         switch (ia->ri_memreg_strategy) {
709         case RPCRDMA_BOUNCEBUFFERS:
710                 ep->rep_remote_cma.responder_resources = 0;
711                 break;
712         case RPCRDMA_MTHCAFMR:
713         case RPCRDMA_REGISTER:
714         case RPCRDMA_FRMR:
715                 ep->rep_remote_cma.responder_resources = cdata->max_requests *
716                                 (RPCRDMA_MAX_DATA_SEGS / 8);
717                 break;
718         case RPCRDMA_MEMWINDOWS:
719         case RPCRDMA_MEMWINDOWS_ASYNC:
720 #if RPCRDMA_PERSISTENT_REGISTRATION
721         case RPCRDMA_ALLPHYSICAL:
722 #endif
723                 ep->rep_remote_cma.responder_resources = cdata->max_requests *
724                                 (RPCRDMA_MAX_DATA_SEGS / 2);
725                 break;
726         default:
727                 break;
728         }
729         if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
730                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
731         ep->rep_remote_cma.initiator_depth = 0;
732
733         ep->rep_remote_cma.retry_count = 7;
734         ep->rep_remote_cma.flow_control = 0;
735         ep->rep_remote_cma.rnr_retry_count = 0;
736
737         return 0;
738
739 out2:
740         err = ib_destroy_cq(ep->rep_cq);
741         if (err)
742                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
743                         __func__, err);
744 out1:
745         return rc;
746 }
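
Editorial note: the send-queue sizing in rpcrdma_ep_create() scales with the registration mode, since FRMR brackets each send with fast-register and invalidate work requests, while memory windows need a bind and unbind per segment. A worked illustration with example values (the real numbers come from cdata and the constants in xprt_rdma.h):

/*
 * Assuming cdata->max_requests = 32 and RPCRDMA_MAX_SEGS = 17
 * (example values only):
 *
 *   RPCRDMA_FRMR:               32 * 3            =   96 send WRs
 *                               (roughly SEND + fast-register + invalidate
 *                                per request)
 *   RPCRDMA_MEMWINDOWS[_ASYNC]: (32 + 1) * 2 * 17 = 1122 send WRs
 *                               (a bind and an unbind per segment)
 *   all other strategies:       32                =   32 send WRs
 *
 * The scaled totals must still fit in devattr.max_qp_wr, or
 * rpcrdma_ep_create() returns -EINVAL; the unscaled value was already
 * clamped to that limit at the top of the function.
 */
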
747
748 /*
749  * rpcrdma_ep_destroy
750  *
751  * Disconnect and destroy endpoint. After this, the only
752  * valid operations on the ep are to free it (if dynamically
753  * allocated) or re-create it.
754  *
755  * The caller's error handling must be sure to not leak the endpoint
756  * if this function fails.
757  */
758 int
759 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
760 {
761         int rc;
762
763         dprintk("RPC:       %s: entering, connected is %d\n",
764                 __func__, ep->rep_connected);
765
766         if (ia->ri_id->qp) {
767                 rc = rpcrdma_ep_disconnect(ep, ia);
768                 if (rc)
769                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
770                                 " returned %i\n", __func__, rc);
771         }
772
773         ep->rep_func = NULL;
774
775         /* padding - could be done in rpcrdma_buffer_destroy... */
776         if (ep->rep_pad_mr) {
777                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
778                 ep->rep_pad_mr = NULL;
779         }
780
781         if (ia->ri_id->qp) {
782                 rdma_destroy_qp(ia->ri_id);
783                 ia->ri_id->qp = NULL;
784         }
785
786         rpcrdma_clean_cq(ep->rep_cq);
787         rc = ib_destroy_cq(ep->rep_cq);
788         if (rc)
789                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
790                         __func__, rc);
791
792         return rc;
793 }
794
795 /*
796  * Connect unconnected endpoint.
797  */
798 int
799 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
800 {
801         struct rdma_cm_id *id;
802         int rc = 0;
803         int retry_count = 0;
804         int reconnect = (ep->rep_connected != 0);
805
806         if (reconnect) {
807                 struct rpcrdma_xprt *xprt;
808 retry:
809                 rc = rpcrdma_ep_disconnect(ep, ia);
810                 if (rc && rc != -ENOTCONN)
811                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
812                                 " status %i\n", __func__, rc);
813                 rpcrdma_clean_cq(ep->rep_cq);
814
815                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
816                 id = rpcrdma_create_id(xprt, ia,
817                                 (struct sockaddr *)&xprt->rx_data.addr);
818                 if (IS_ERR(id)) {
819                         rc = PTR_ERR(id);
820                         goto out;
821                 }
822                 /* TEMP TEMP TEMP - fail if new device:
823                  * Deregister/remarshal *all* requests!
824                  * Close and recreate adapter, pd, etc!
825                  * Re-determine all attributes still sane!
826                  * More stuff I haven't thought of!
827                  * Rrrgh!
828                  */
829                 if (ia->ri_id->device != id->device) {
830                         printk("RPC:       %s: can't reconnect on "
831                                 "different device!\n", __func__);
832                         rdma_destroy_id(id);
833                         rc = -ENETDOWN;
834                         goto out;
835                 }
836                 /* END TEMP */
837                 rdma_destroy_id(ia->ri_id);
838                 ia->ri_id = id;
839         }
840
841         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
842         if (rc) {
843                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
844                         __func__, rc);
845                 goto out;
846         }
847
848 /* XXX Tavor device performs badly with 2K MTU! */
849 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
850         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
851         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
852             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
853              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
854                 struct ib_qp_attr attr = {
855                         .path_mtu = IB_MTU_1024
856                 };
857                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
858         }
859 }
860
861         /* Theoretically a client initiator_depth > 0 is not needed,
862          * but many peers fail to complete the connection unless
863          * initiator_depth == responder_resources! */
864         if (ep->rep_remote_cma.initiator_depth !=
865                                 ep->rep_remote_cma.responder_resources)
866                 ep->rep_remote_cma.initiator_depth =
867                         ep->rep_remote_cma.responder_resources;
868
869         ep->rep_connected = 0;
870
871         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
872         if (rc) {
873                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
874                                 __func__, rc);
875                 goto out;
876         }
877
878         if (reconnect)
879                 return 0;
880
881         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
882
883         /*
884          * Check state. A non-peer reject indicates no listener
885          * (ECONNREFUSED), which may be a transient state. All
886          * others indicate a transport condition for which a
887          * best-effort recovery has already been attempted.
888          */
889         if (ep->rep_connected == -ECONNREFUSED
890             && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
891                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
892                 goto retry;
893         }
894         if (ep->rep_connected <= 0) {
895                 /* Sometimes, the only way to reliably connect to remote
896                  * CMs is to use the same nonzero values for ORD and IRD. */
897                 ep->rep_remote_cma.initiator_depth =
898                                         ep->rep_remote_cma.responder_resources;
899                 if (ep->rep_remote_cma.initiator_depth == 0)
900                         ++ep->rep_remote_cma.initiator_depth;
901                 if (ep->rep_remote_cma.responder_resources == 0)
902                         ++ep->rep_remote_cma.responder_resources;
903                 if (retry_count++ == 0)
904                         goto retry;
905                 rc = ep->rep_connected;
906         } else {
907                 dprintk("RPC:       %s: connected\n", __func__);
908         }
909
910 out:
911         if (rc)
912                 ep->rep_connected = rc;
913         return rc;
914 }
915
916 /*
917  * rpcrdma_ep_disconnect
918  *
919  * This is separate from destroy to facilitate the ability
920  * to reconnect without recreating the endpoint.
921  *
922  * This call is not reentrant, and must not be made in parallel
923  * on the same endpoint.
924  */
925 int
926 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
927 {
928         int rc;
929
930         rpcrdma_clean_cq(ep->rep_cq);
931         rc = rdma_disconnect(ia->ri_id);
932         if (!rc) {
933                 /* returns without wait if not connected */
934                 wait_event_interruptible(ep->rep_connect_wait,
935                                                         ep->rep_connected != 1);
936                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
937                         (ep->rep_connected == 1) ? "still " : "dis");
938         } else {
939                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
940                 ep->rep_connected = rc;
941         }
942         return rc;
943 }
944
945 /*
946  * Initialize buffer memory
947  */
948 int
949 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
950         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
951 {
952         char *p;
953         size_t len;
954         int i, rc;
955         struct rpcrdma_mw *r;
956
957         buf->rb_max_requests = cdata->max_requests;
958         spin_lock_init(&buf->rb_lock);
959         atomic_set(&buf->rb_credits, 1);
960
961         /* Need to allocate:
962          *   1.  arrays for send and recv pointers
963          *   2.  arrays of struct rpcrdma_req to fill in pointers
964          *   3.  array of struct rpcrdma_rep for replies
965          *   4.  padding, if any
966          *   5.  mw's, fmr's or frmr's, if any
967          * Send/recv buffers in req/rep need to be registered
968          */
969
970         len = buf->rb_max_requests *
971                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
972         len += cdata->padding;
973         switch (ia->ri_memreg_strategy) {
974         case RPCRDMA_FRMR:
975                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
976                                 sizeof(struct rpcrdma_mw);
977                 break;
978         case RPCRDMA_MTHCAFMR:
979                 /* TBD we are perhaps overallocating here */
980                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
981                                 sizeof(struct rpcrdma_mw);
982                 break;
983         case RPCRDMA_MEMWINDOWS_ASYNC:
984         case RPCRDMA_MEMWINDOWS:
985                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
986                                 sizeof(struct rpcrdma_mw);
987                 break;
988         default:
989                 break;
990         }
991
992         /* allocate 1, 4 and 5 in one shot */
993         p = kzalloc(len, GFP_KERNEL);
994         if (p == NULL) {
995                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
996                         __func__, len);
997                 rc = -ENOMEM;
998                 goto out;
999         }
1000         buf->rb_pool = p;       /* for freeing it later */
1001
1002         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1003         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1004         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1005         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1006
1007         /*
1008          * Register the zeroed pad buffer, if any.
1009          */
1010         if (cdata->padding) {
1011                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1012                                             &ep->rep_pad_mr, &ep->rep_pad);
1013                 if (rc)
1014                         goto out;
1015         }
1016         p += cdata->padding;
1017
1018         /*
1019          * Allocate the frmr's, fmr's, or mw's used for chunk registration.
1020          * We "cycle" them in order to minimize rkey reuse,
1021          * and also to reduce unbind-to-bind collisions.
1022          */
1023         INIT_LIST_HEAD(&buf->rb_mws);
1024         r = (struct rpcrdma_mw *)p;
1025         switch (ia->ri_memreg_strategy) {
1026         case RPCRDMA_FRMR:
1027                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1028                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1029                                                          RPCRDMA_MAX_SEGS);
1030                         if (IS_ERR(r->r.frmr.fr_mr)) {
1031                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1032                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1033                                         " failed %i\n", __func__, rc);
1034                                 goto out;
1035                         }
1036                         r->r.frmr.fr_pgl =
1037                                 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1038                                                             RPCRDMA_MAX_SEGS);
1039                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1040                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1041                                 dprintk("RPC:       %s: "
1042                                         "ib_alloc_fast_reg_page_list "
1043                                         "failed %i\n", __func__, rc);
1044                                 goto out;
1045                         }
1046                         list_add(&r->mw_list, &buf->rb_mws);
1047                         ++r;
1048                 }
1049                 break;
1050         case RPCRDMA_MTHCAFMR:
1051                 /* TBD we are perhaps overallocating here */
1052                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1053                         static struct ib_fmr_attr fa =
1054                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1055                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1056                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1057                                 &fa);
1058                         if (IS_ERR(r->r.fmr)) {
1059                                 rc = PTR_ERR(r->r.fmr);
1060                                 dprintk("RPC:       %s: ib_alloc_fmr"
1061                                         " failed %i\n", __func__, rc);
1062                                 goto out;
1063                         }
1064                         list_add(&r->mw_list, &buf->rb_mws);
1065                         ++r;
1066                 }
1067                 break;
1068         case RPCRDMA_MEMWINDOWS_ASYNC:
1069         case RPCRDMA_MEMWINDOWS:
1070                 /* Allocate one extra request's worth, for full cycling */
1071                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1072                         r->r.mw = ib_alloc_mw(ia->ri_pd);
1073                         if (IS_ERR(r->r.mw)) {
1074                                 rc = PTR_ERR(r->r.mw);
1075                                 dprintk("RPC:       %s: ib_alloc_mw"
1076                                         " failed %i\n", __func__, rc);
1077                                 goto out;
1078                         }
1079                         list_add(&r->mw_list, &buf->rb_mws);
1080                         ++r;
1081                 }
1082                 break;
1083         default:
1084                 break;
1085         }
1086
1087         /*
1088          * Allocate/init the request/reply buffers. Doing this
1089          * using kmalloc for now -- one for each buf.
1090          */
1091         for (i = 0; i < buf->rb_max_requests; i++) {
1092                 struct rpcrdma_req *req;
1093                 struct rpcrdma_rep *rep;
1094
1095                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1096                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1097                 /* Typically ~2400b, so rounding up saves work later */
1098                 if (len < 4096)
1099                         len = 4096;
1100                 req = kmalloc(len, GFP_KERNEL);
1101                 if (req == NULL) {
1102                         dprintk("RPC:       %s: request buffer %d alloc"
1103                                 " failed\n", __func__, i);
1104                         rc = -ENOMEM;
1105                         goto out;
1106                 }
1107                 memset(req, 0, sizeof(struct rpcrdma_req));
1108                 buf->rb_send_bufs[i] = req;
1109                 buf->rb_send_bufs[i]->rl_buffer = buf;
1110
1111                 rc = rpcrdma_register_internal(ia, req->rl_base,
1112                                 len - offsetof(struct rpcrdma_req, rl_base),
1113                                 &buf->rb_send_bufs[i]->rl_handle,
1114                                 &buf->rb_send_bufs[i]->rl_iov);
1115                 if (rc)
1116                         goto out;
1117
1118                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1119
1120                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1121                 rep = kmalloc(len, GFP_KERNEL);
1122                 if (rep == NULL) {
1123                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1124                                 __func__, i);
1125                         rc = -ENOMEM;
1126                         goto out;
1127                 }
1128                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1129                 buf->rb_recv_bufs[i] = rep;
1130                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1131                 init_waitqueue_head(&rep->rr_unbind);
1132
1133                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1134                                 len - offsetof(struct rpcrdma_rep, rr_base),
1135                                 &buf->rb_recv_bufs[i]->rr_handle,
1136                                 &buf->rb_recv_bufs[i]->rr_iov);
1137                 if (rc)
1138                         goto out;
1139
1140         }
1141         dprintk("RPC:       %s: max_requests %d\n",
1142                 __func__, buf->rb_max_requests);
1143         /* done */
1144         return 0;
1145 out:
1146         rpcrdma_buffer_destroy(buf);
1147         return rc;
1148 }
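
Editorial note: the fr_mr/fr_pgl pairs preallocated above are consumed when a chunk is registered: the page list is filled with DMA addresses and posted as an IB_WR_FAST_REG_MR work request (the start of that path, rpcrdma_register_frmr_external(), appears at the end of this file). A sketch of the work-request fields involved; the helper name and its parameters are hypothetical placeholders for values computed by the real registration path:

static void example_build_fastreg_wr(struct ib_send_wr *wr,
				     struct rpcrdma_mw *mw,
				     u64 iova, int npages, u32 len)
{
	memset(wr, 0, sizeof(*wr));
	wr->opcode = IB_WR_FAST_REG_MR;
	wr->send_flags = 0;			/* completions largely suppressed */
	wr->wr.fast_reg.iova_start = iova;	/* DMA address of first byte */
	wr->wr.fast_reg.page_list = mw->r.frmr.fr_pgl;	/* page DMA addresses */
	wr->wr.fast_reg.page_list_len = npages;
	wr->wr.fast_reg.page_shift = PAGE_SHIFT;
	wr->wr.fast_reg.length = len;
	wr->wr.fast_reg.access_flags = IB_ACCESS_REMOTE_READ;	/* or REMOTE_WRITE for writes */
	wr->wr.fast_reg.rkey = mw->r.frmr.fr_mr->rkey;
}
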
1149
1150 /*
1151  * Unregister and destroy buffer memory. Need to deal with
1152  * partial initialization, so it's callable from failed create.
1153  * Must be called before destroying endpoint, as registrations
1154  * reference it.
1155  */
1156 void
1157 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1158 {
1159         int rc, i;
1160         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1161         struct rpcrdma_mw *r;
1162
1163         /* clean up in reverse order from create
1164          *   1.  recv mr memory (mr free, then kfree)
1165          *   1a. bind mw memory
1166          *   2.  send mr memory (mr free, then kfree)
1167          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1168          *   4.  arrays
1169          */
1170         dprintk("RPC:       %s: entering\n", __func__);
1171
1172         for (i = 0; i < buf->rb_max_requests; i++) {
1173                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1174                         rpcrdma_deregister_internal(ia,
1175                                         buf->rb_recv_bufs[i]->rr_handle,
1176                                         &buf->rb_recv_bufs[i]->rr_iov);
1177                         kfree(buf->rb_recv_bufs[i]);
1178                 }
1179                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1180                         while (!list_empty(&buf->rb_mws)) {
1181                                 r = list_entry(buf->rb_mws.next,
1182                                         struct rpcrdma_mw, mw_list);
1183                                 list_del(&r->mw_list);
1184                                 switch (ia->ri_memreg_strategy) {
1185                                 case RPCRDMA_FRMR:
1186                                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1187                                         if (rc)
1188                                                 dprintk("RPC:       %s:"
1189                                                         " ib_dereg_mr"
1190                                                         " failed %i\n",
1191                                                         __func__, rc);
1192                                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1193                                         break;
1194                                 case RPCRDMA_MTHCAFMR:
1195                                         rc = ib_dealloc_fmr(r->r.fmr);
1196                                         if (rc)
1197                                                 dprintk("RPC:       %s:"
1198                                                         " ib_dealloc_fmr"
1199                                                         " failed %i\n",
1200                                                         __func__, rc);
1201                                         break;
1202                                 case RPCRDMA_MEMWINDOWS_ASYNC:
1203                                 case RPCRDMA_MEMWINDOWS:
1204                                         rc = ib_dealloc_mw(r->r.mw);
1205                                         if (rc)
1206                                                 dprintk("RPC:       %s:"
1207                                                         " ib_dealloc_mw"
1208                                                         " failed %i\n",
1209                                                         __func__, rc);
1210                                         break;
1211                                 default:
1212                                         break;
1213                                 }
1214                         }
1215                         rpcrdma_deregister_internal(ia,
1216                                         buf->rb_send_bufs[i]->rl_handle,
1217                                         &buf->rb_send_bufs[i]->rl_iov);
1218                         kfree(buf->rb_send_bufs[i]);
1219                 }
1220         }
1221
1222         kfree(buf->rb_pool);
1223 }
1224
1225 /*
1226  * Get a set of request/reply buffers.
1227  *
1228  * Reply buffer (if needed) is attached to send buffer upon return.
1229  * Rule:
1230  *    rb_send_index and rb_recv_index MUST always be pointing to the
1231  *    *next* available buffer (non-NULL). They are incremented after
1232  *    removing buffers, and decremented *before* returning them.
1233  */
1234 struct rpcrdma_req *
1235 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1236 {
1237         struct rpcrdma_req *req;
1238         unsigned long flags;
1239         int i;
1240         struct rpcrdma_mw *r;
1241
1242         spin_lock_irqsave(&buffers->rb_lock, flags);
1243         if (buffers->rb_send_index == buffers->rb_max_requests) {
1244                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1245                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1246                 return ((struct rpcrdma_req *)NULL);
1247         }
1248
1249         req = buffers->rb_send_bufs[buffers->rb_send_index];
1250         if (buffers->rb_send_index < buffers->rb_recv_index) {
1251                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1252                         __func__,
1253                         buffers->rb_recv_index - buffers->rb_send_index);
1254                 req->rl_reply = NULL;
1255         } else {
1256                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1257                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1258         }
1259         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1260         if (!list_empty(&buffers->rb_mws)) {
1261                 i = RPCRDMA_MAX_SEGS - 1;
1262                 do {
1263                         r = list_entry(buffers->rb_mws.next,
1264                                         struct rpcrdma_mw, mw_list);
1265                         list_del(&r->mw_list);
1266                         req->rl_segments[i].mr_chunk.rl_mw = r;
1267                 } while (--i >= 0);
1268         }
1269         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1270         return req;
1271 }
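
Editorial note: a short worked example of the index rule documented above rpcrdma_buffer_get() (rb_max_requests = 2 is an example value):

/*
 *   start:           rb_send_index = 0, rb_recv_index = 0
 *   buffer_get()  -> rb_send_index = 1, rb_recv_index = 1  (req0 + rep0 handed out)
 *   buffer_get()  -> rb_send_index = 2, rb_recv_index = 2  (req1 + rep1 handed out)
 *   buffer_get()  -> fails: rb_send_index == rb_max_requests
 *   buffer_put()  -> rb_send_index = 1, rb_recv_index = 1  (pre-decrement on return)
 */
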
1272
1273 /*
1274  * Put request/reply buffers back into pool.
1275  * Pre-decrement counter/array index.
1276  */
1277 void
1278 rpcrdma_buffer_put(struct rpcrdma_req *req)
1279 {
1280         struct rpcrdma_buffer *buffers = req->rl_buffer;
1281         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1282         int i;
1283         unsigned long flags;
1284
1285         BUG_ON(req->rl_nchunks != 0);
1286         spin_lock_irqsave(&buffers->rb_lock, flags);
1287         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1288         req->rl_niovs = 0;
1289         if (req->rl_reply) {
1290                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1291                 init_waitqueue_head(&req->rl_reply->rr_unbind);
1292                 req->rl_reply->rr_func = NULL;
1293                 req->rl_reply = NULL;
1294         }
1295         switch (ia->ri_memreg_strategy) {
1296         case RPCRDMA_FRMR:
1297         case RPCRDMA_MTHCAFMR:
1298         case RPCRDMA_MEMWINDOWS_ASYNC:
1299         case RPCRDMA_MEMWINDOWS:
1300                 /*
1301                  * Cycle mw's back in reverse order, and "spin" them.
1302                  * This delays and scrambles reuse as much as possible.
1303                  */
1304                 i = 1;
1305                 do {
1306                         struct rpcrdma_mw **mw;
1307                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1308                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1309                         *mw = NULL;
1310                 } while (++i < RPCRDMA_MAX_SEGS);
1311                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1312                                         &buffers->rb_mws);
1313                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1314                 break;
1315         default:
1316                 break;
1317         }
1318         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1319 }
1320
1321 /*
1322  * Recover reply buffers from pool.
1323  * This happens when recovering from error conditions.
1324  * Post-increment counter/array index.
1325  */
1326 void
1327 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1328 {
1329         struct rpcrdma_buffer *buffers = req->rl_buffer;
1330         unsigned long flags;
1331
1332         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1333                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1334         spin_lock_irqsave(&buffers->rb_lock, flags);
1335         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1336                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1337                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1338         }
1339         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1340 }
1341
1342 /*
1343  * Put reply buffers back into pool when not attached to
1344  * request. This happens in error conditions, and when
1345  * aborting unbinds. Pre-decrement counter/array index.
1346  */
1347 void
1348 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1349 {
1350         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1351         unsigned long flags;
1352
1353         rep->rr_func = NULL;
1354         spin_lock_irqsave(&buffers->rb_lock, flags);
1355         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1356         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1357 }
1358
1359 /*
1360  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1361  */
1362
1363 int
1364 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1365                                 struct ib_mr **mrp, struct ib_sge *iov)
1366 {
1367         struct ib_phys_buf ipb;
1368         struct ib_mr *mr;
1369         int rc;
1370
1371         /*
1372          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1373          */
1374         iov->addr = ib_dma_map_single(ia->ri_id->device,
1375                         va, len, DMA_BIDIRECTIONAL);
1376         iov->length = len;
1377
1378         if (ia->ri_have_dma_lkey) {
1379                 *mrp = NULL;
1380                 iov->lkey = ia->ri_dma_lkey;
1381                 return 0;
1382         } else if (ia->ri_bind_mem != NULL) {
1383                 *mrp = NULL;
1384                 iov->lkey = ia->ri_bind_mem->lkey;
1385                 return 0;
1386         }
1387
1388         ipb.addr = iov->addr;
1389         ipb.size = iov->length;
1390         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1391                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1392
1393         dprintk("RPC:       %s: phys convert: 0x%llx "
1394                         "registered 0x%llx length %d\n",
1395                         __func__, (unsigned long long)ipb.addr,
1396                         (unsigned long long)iov->addr, len);
1397
1398         if (IS_ERR(mr)) {
1399                 *mrp = NULL;
1400                 rc = PTR_ERR(mr);
1401                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1402         } else {
1403                 *mrp = mr;
1404                 iov->lkey = mr->lkey;
1405                 rc = 0;
1406         }
1407
1408         return rc;
1409 }
1410
1411 int
1412 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1413                                 struct ib_mr *mr, struct ib_sge *iov)
1414 {
1415         int rc;
1416
1417         ib_dma_unmap_single(ia->ri_id->device,
1418                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1419
1420         if (mr == NULL)
1421                 return 0;
1422
1423         rc = ib_dereg_mr(mr);
1424         if (rc)
1425                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1426         return rc;
1427 }
1428
1429 /*
1430  * Wrappers for chunk registration, shared by read/write chunk code.
1431  */
1432
1433 static void
1434 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1435 {
1436         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1437         seg->mr_dmalen = seg->mr_len;
1438         if (seg->mr_page)
1439                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1440                                 seg->mr_page, offset_in_page(seg->mr_offset),
1441                                 seg->mr_dmalen, seg->mr_dir);
1442         else
1443                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1444                                 seg->mr_offset,
1445                                 seg->mr_dmalen, seg->mr_dir);
1446 }
1447
1448 static void
1449 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1450 {
1451         if (seg->mr_page)
1452                 ib_dma_unmap_page(ia->ri_id->device,
1453                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1454         else
1455                 ib_dma_unmap_single(ia->ri_id->device,
1456                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1457 }
1458
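/*
 * Register an external chunk using a fast-register (FRMR) work request.
 * Gathers up to RPCRDMA_MAX_DATA_SEGS segments into the mw's page list,
 * stopping at the first page hole, bumps the fast_reg key, and posts an
 * unsignaled IB_WR_FAST_REG_MR on the connection's QP. On error, the
 * mapped segments are unmapped; *nsegs returns the number of segments
 * consumed.
 */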
1459 static int
1460 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1461                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1462                         struct rpcrdma_xprt *r_xprt)
1463 {
1464         struct rpcrdma_mr_seg *seg1 = seg;
1465         struct ib_send_wr frmr_wr, *bad_wr;
1466         u8 key;
1467         int len, pageoff;
1468         int i, rc;
1469
1470         pageoff = offset_in_page(seg1->mr_offset);
1471         seg1->mr_offset -= pageoff;     /* start of page */
1472         seg1->mr_len += pageoff;
1473         len = -pageoff;
1474         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1475                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1476         for (i = 0; i < *nsegs;) {
1477                 rpcrdma_map_one(ia, seg, writing);
1478                 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1479                 len += seg->mr_len;
1480                 ++seg;
1481                 ++i;
1482                 /* Check for holes */
1483                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1484                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1485                         break;
1486         }
1487         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1488                 __func__, seg1->mr_chunk.rl_mw, i);
1489
1490         /* Bump the key */
1491         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1492         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1493
1494         /* Prepare FRMR WR */
1495         memset(&frmr_wr, 0, sizeof frmr_wr);
1496         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1497         frmr_wr.send_flags = 0;                 /* unsignaled */
1498         frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1499         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1500         frmr_wr.wr.fast_reg.page_list_len = i;
1501         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1502         frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1503         frmr_wr.wr.fast_reg.access_flags = (writing ?
1504                                 IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
1505         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1506         DECR_CQCOUNT(&r_xprt->rx_ep);
1507
1508         rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1509
1510         if (rc) {
1511                 dprintk("RPC:       %s: failed ib_post_send for register,"
1512                         " status %i\n", __func__, rc);
1513                 while (i--)
1514                         rpcrdma_unmap_one(ia, --seg);
1515         } else {
1516                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1517                 seg1->mr_base = seg1->mr_dma + pageoff;
1518                 seg1->mr_nsegs = i;
1519                 seg1->mr_len = len;
1520         }
1521         *nsegs = i;
1522         return rc;
1523 }
1524
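/*
 * Invalidate an FRMR registration: unmap each segment, then post an
 * unsignaled IB_WR_LOCAL_INV work request carrying the mr's rkey.
 */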
1525 static int
1526 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1527                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1528 {
1529         struct rpcrdma_mr_seg *seg1 = seg;
1530         struct ib_send_wr invalidate_wr, *bad_wr;
1531         int rc;
1532
1533         while (seg1->mr_nsegs--)
1534                 rpcrdma_unmap_one(ia, seg++);
1535
1536         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1537         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1538         invalidate_wr.send_flags = 0;                   /* unsignaled */
1539         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1540         DECR_CQCOUNT(&r_xprt->rx_ep);
1541
1542         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1543         if (rc)
1544                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1545                         " status %i\n", __func__, rc);
1546         return rc;
1547 }
1548
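/*
 * Register an external chunk with ib_map_phys_fmr(). Collects the DMA
 * addresses of up to RPCRDMA_MAX_DATA_SEGS segments (stopping at the
 * first page hole) and maps them through the mw's FMR; unmaps the
 * segments again if the FMR mapping fails.
 */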
1549 static int
1550 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1551                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1552 {
1553         struct rpcrdma_mr_seg *seg1 = seg;
1554         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1555         int len, pageoff, i, rc;
1556
1557         pageoff = offset_in_page(seg1->mr_offset);
1558         seg1->mr_offset -= pageoff;     /* start of page */
1559         seg1->mr_len += pageoff;
1560         len = -pageoff;
1561         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1562                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1563         for (i = 0; i < *nsegs;) {
1564                 rpcrdma_map_one(ia, seg, writing);
1565                 physaddrs[i] = seg->mr_dma;
1566                 len += seg->mr_len;
1567                 ++seg;
1568                 ++i;
1569                 /* Check for holes */
1570                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1571                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1572                         break;
1573         }
1574         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1575                                 physaddrs, i, seg1->mr_dma);
1576         if (rc) {
1577                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1578                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1579                         len, (unsigned long long)seg1->mr_dma,
1580                         pageoff, i, rc);
1581                 while (i--)
1582                         rpcrdma_unmap_one(ia, --seg);
1583         } else {
1584                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1585                 seg1->mr_base = seg1->mr_dma + pageoff;
1586                 seg1->mr_nsegs = i;
1587                 seg1->mr_len = len;
1588         }
1589         *nsegs = i;
1590         return rc;
1591 }
1592
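/*
 * Undo an FMR registration: unmap the FMR with ib_unmap_fmr(), then
 * DMA-unmap each of the underlying segments.
 */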
1593 static int
1594 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1595                         struct rpcrdma_ia *ia)
1596 {
1597         struct rpcrdma_mr_seg *seg1 = seg;
1598         LIST_HEAD(l);
1599         int rc;
1600
1601         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1602         rc = ib_unmap_fmr(&l);
1603         while (seg1->mr_nsegs--)
1604                 rpcrdma_unmap_one(ia, seg++);
1605         if (rc)
1606                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1607                         " status %i\n", __func__, rc);
1608         return rc;
1609 }
1610
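/*
 * Register a single segment by binding a memory window to it with
 * ib_bind_mw(). A window covers exactly one segment, so *nsegs is
 * forced to 1.
 */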
1611 static int
1612 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1613                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1614                         struct rpcrdma_xprt *r_xprt)
1615 {
1616         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1617                                   IB_ACCESS_REMOTE_READ);
1618         struct ib_mw_bind param;
1619         int rc;
1620
1621         *nsegs = 1;
1622         rpcrdma_map_one(ia, seg, writing);
1623         param.mr = ia->ri_bind_mem;
1624         param.wr_id = 0ULL;     /* no send cookie */
1625         param.addr = seg->mr_dma;
1626         param.length = seg->mr_len;
1627         param.send_flags = 0;
1628         param.mw_access_flags = mem_priv;
1629
1630         DECR_CQCOUNT(&r_xprt->rx_ep);
1631         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1632         if (rc) {
1633                 dprintk("RPC:       %s: failed ib_bind_mw "
1634                         "%u@0x%llx status %i\n",
1635                         __func__, seg->mr_len,
1636                         (unsigned long long)seg->mr_dma, rc);
1637                 rpcrdma_unmap_one(ia, seg);
1638         } else {
1639                 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1640                 seg->mr_base = param.addr;
1641                 seg->mr_nsegs = 1;
1642         }
1643         return rc;
1644 }
1645
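/*
 * Unbind a memory window by rebinding it with zero length. If a reply
 * context is supplied via *r, the unbind is posted signaled and *r is
 * cleared on success, so the reply callback runs from the completion
 * upcall rather than from the caller.
 */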
1646 static int
1647 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1648                         struct rpcrdma_ia *ia,
1649                         struct rpcrdma_xprt *r_xprt, void **r)
1650 {
1651         struct ib_mw_bind param;
1652         LIST_HEAD(l);
1653         int rc;
1654
1655         BUG_ON(seg->mr_nsegs != 1);
1656         param.mr = ia->ri_bind_mem;
1657         param.addr = 0ULL;      /* unbind */
1658         param.length = 0;
1659         param.mw_access_flags = 0;
1660         if (*r) {
1661                 param.wr_id = (u64) (unsigned long) *r;
1662                 param.send_flags = IB_SEND_SIGNALED;
1663                 INIT_CQCOUNT(&r_xprt->rx_ep);
1664         } else {
1665                 param.wr_id = 0ULL;
1666                 param.send_flags = 0;
1667                 DECR_CQCOUNT(&r_xprt->rx_ep);
1668         }
1669         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1670         rpcrdma_unmap_one(ia, seg);
1671         if (rc)
1672                 dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1673                         " status %i\n", __func__, rc);
1674         else
1675                 *r = NULL;      /* will upcall on completion */
1676         return rc;
1677 }
1678
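/*
 * Default registration: register the segments' physical addresses with
 * ib_reg_phys_mr() on every call (and deregister on every release).
 */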
1679 static int
1680 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1681                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1682 {
1683         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1684                                   IB_ACCESS_REMOTE_READ);
1685         struct rpcrdma_mr_seg *seg1 = seg;
1686         struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1687         int len, i, rc = 0;
1688
1689         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1690                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1691         for (len = 0, i = 0; i < *nsegs;) {
1692                 rpcrdma_map_one(ia, seg, writing);
1693                 ipb[i].addr = seg->mr_dma;
1694                 ipb[i].size = seg->mr_len;
1695                 len += seg->mr_len;
1696                 ++seg;
1697                 ++i;
1698                 /* Check for holes */
1699                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1700                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1701                         break;
1702         }
1703         seg1->mr_base = seg1->mr_dma;
1704         seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1705                                 ipb, i, mem_priv, &seg1->mr_base);
1706         if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1707                 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1708                 dprintk("RPC:       %s: failed ib_reg_phys_mr "
1709                         "%u@0x%llx (%d)... status %i\n",
1710                         __func__, len,
1711                         (unsigned long long)seg1->mr_dma, i, rc);
1712                 while (i--)
1713                         rpcrdma_unmap_one(ia, --seg);
1714         } else {
1715                 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1716                 seg1->mr_nsegs = i;
1717                 seg1->mr_len = len;
1718         }
1719         *nsegs = i;
1720         return rc;
1721 }
1722
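/*
 * Undo a default registration: deregister the mr, then DMA-unmap each
 * segment.
 */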
1723 static int
1724 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1725                         struct rpcrdma_ia *ia)
1726 {
1727         struct rpcrdma_mr_seg *seg1 = seg;
1728         int rc;
1729
1730         rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1731         seg1->mr_chunk.rl_mr = NULL;
1732         while (seg1->mr_nsegs--)
1733                 rpcrdma_unmap_one(ia, seg++);
1734         if (rc)
1735                 dprintk("RPC:       %s: failed ib_dereg_mr,"
1736                         " status %i\n", __func__, rc);
1737         return rc;
1738 }
1739
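/*
 * Register external memory for RDMA, dispatching on the interface's
 * memory registration strategy. Returns the number of segments
 * registered, or -1 on error.
 */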
1740 int
1741 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1742                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1743 {
1744         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1745         int rc = 0;
1746
1747         switch (ia->ri_memreg_strategy) {
1748
1749 #if RPCRDMA_PERSISTENT_REGISTRATION
1750         case RPCRDMA_ALLPHYSICAL:
1751                 rpcrdma_map_one(ia, seg, writing);
1752                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1753                 seg->mr_base = seg->mr_dma;
1754                 seg->mr_nsegs = 1;
1755                 nsegs = 1;
1756                 break;
1757 #endif
1758
1759         /* Registration using fast registration memory regions (FRMR) */
1760         case RPCRDMA_FRMR:
1761                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1762                 break;
1763
1764         /* Registration using fast memory regions (FMR) */
1765         case RPCRDMA_MTHCAFMR:
1766                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1767                 break;
1768
1769         /* Registration using memory windows */
1770         case RPCRDMA_MEMWINDOWS_ASYNC:
1771         case RPCRDMA_MEMWINDOWS:
1772                 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1773                 break;
1774
1775         /* Default registration each time */
1776         default:
1777                 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1778                 break;
1779         }
1780         if (rc)
1781                 return -1;
1782
1783         return nsegs;
1784 }
1785
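/*
 * Deregister external memory, dispatching on the memory registration
 * strategy. If the reply context "r" is still set afterwards (i.e. the
 * deregistration completed synchronously), its rr_func callback is
 * invoked here; otherwise the completion upcall will do it.
 */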
1786 int
1787 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1788                 struct rpcrdma_xprt *r_xprt, void *r)
1789 {
1790         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1791         int nsegs = seg->mr_nsegs, rc;
1792
1793         switch (ia->ri_memreg_strategy) {
1794
1795 #if RPCRDMA_PERSISTENT_REGISTRATION
1796         case RPCRDMA_ALLPHYSICAL:
1797                 BUG_ON(nsegs != 1);
1798                 rpcrdma_unmap_one(ia, seg);
1799                 rc = 0;
1800                 break;
1801 #endif
1802
1803         case RPCRDMA_FRMR:
1804                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1805                 break;
1806
1807         case RPCRDMA_MTHCAFMR:
1808                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1809                 break;
1810
1811         case RPCRDMA_MEMWINDOWS_ASYNC:
1812         case RPCRDMA_MEMWINDOWS:
1813                 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1814                 break;
1815
1816         default:
1817                 rc = rpcrdma_deregister_default_external(seg, ia);
1818                 break;
1819         }
1820         if (r) {
1821                 struct rpcrdma_rep *rep = r;
1822                 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1823                 rep->rr_func = NULL;
1824                 func(rep);      /* dereg done, callback now */
1825         }
1826         return nsegs;
1827 }
1828
1829 /*
1830  * Prepost any receive buffer, then post send.
1831  *
1832  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1833  */
1834 int
1835 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1836                 struct rpcrdma_ep *ep,
1837                 struct rpcrdma_req *req)
1838 {
1839         struct ib_send_wr send_wr, *send_wr_fail;
1840         struct rpcrdma_rep *rep = req->rl_reply;
1841         int rc;
1842
1843         if (rep) {
1844                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1845                 if (rc)
1846                         goto out;
1847                 req->rl_reply = NULL;
1848         }
1849
1850         send_wr.next = NULL;
1851         send_wr.wr_id = 0ULL;   /* no send cookie */
1852         send_wr.sg_list = req->rl_send_iov;
1853         send_wr.num_sge = req->rl_niovs;
1854         send_wr.opcode = IB_WR_SEND;
1855         if (send_wr.num_sge == 4)       /* iov[2] is the constant zero pad; no sync needed */
1856                 ib_dma_sync_single_for_device(ia->ri_id->device,
1857                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1858                         DMA_TO_DEVICE);
1859         ib_dma_sync_single_for_device(ia->ri_id->device,
1860                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1861                 DMA_TO_DEVICE);
1862         ib_dma_sync_single_for_device(ia->ri_id->device,
1863                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1864                 DMA_TO_DEVICE);
1865
1866         if (DECR_CQCOUNT(ep) > 0)
1867                 send_wr.send_flags = 0;
1868         else { /* Provider must take a send completion every now and then */
1869                 INIT_CQCOUNT(ep);
1870                 send_wr.send_flags = IB_SEND_SIGNALED;
1871         }
1872
1873         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1874         if (rc)
1875                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1876                         rc);
1877 out:
1878         return rc;
1879 }
1880
1881 /*
1882  * (Re)post a receive buffer.
1883  */
1884 int
1885 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1886                      struct rpcrdma_ep *ep,
1887                      struct rpcrdma_rep *rep)
1888 {
1889         struct ib_recv_wr recv_wr, *recv_wr_fail;
1890         int rc;
1891
1892         recv_wr.next = NULL;
1893         recv_wr.wr_id = (u64) (unsigned long) rep;
1894         recv_wr.sg_list = &rep->rr_iov;
1895         recv_wr.num_sge = 1;
1896
1897         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1898                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1899
1900         DECR_CQCOUNT(ep);
1901         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1902
1903         if (rc)
1904                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1905                         rc);
1906         return rc;
1907 }