Merge branches 'release', 'cpuidle-2.6.25' and 'idle' into release
[linux-2.6] / net / sunrpc / xprtrdma / svc_rdma_transport.c
1 /*
2  * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  *
39  * Author: Tom Tucker <tom@opengridcomputing.com>
40  */
41
42 #include <linux/sunrpc/svc_xprt.h>
43 #include <linux/sunrpc/debug.h>
44 #include <linux/sunrpc/rpc_rdma.h>
45 #include <linux/spinlock.h>
46 #include <rdma/ib_verbs.h>
47 #include <rdma/rdma_cm.h>
48 #include <linux/sunrpc/svc_rdma.h>
49
50 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
51
/* Transport operations referenced by svc_rdma_ops below. */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *rqstp);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);

/* Internal helpers that are used before they are defined. */
static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
static void dto_tasklet_func(unsigned long data);
static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);
64
65 DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
66 static DEFINE_SPINLOCK(dto_lock);
67 static LIST_HEAD(dto_xprt_q);
68
69 static struct svc_xprt_ops svc_rdma_ops = {
70         .xpo_create = svc_rdma_create,
71         .xpo_recvfrom = svc_rdma_recvfrom,
72         .xpo_sendto = svc_rdma_sendto,
73         .xpo_release_rqst = svc_rdma_release_rqst,
74         .xpo_detach = svc_rdma_detach,
75         .xpo_free = svc_rdma_free,
76         .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
77         .xpo_has_wspace = svc_rdma_has_wspace,
78         .xpo_accept = svc_rdma_accept,
79 };
80
81 struct svc_xprt_class svc_rdma_class = {
82         .xcl_name = "rdma",
83         .xcl_owner = THIS_MODULE,
84         .xcl_ops = &svc_rdma_ops,
85         .xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
86 };
87
88 static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
89 {
90         int target;
91         int at_least_one = 0;
92         struct svc_rdma_op_ctxt *ctxt;
93
94         target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
95                      xprt->sc_ctxt_max);
96
97         spin_lock_bh(&xprt->sc_ctxt_lock);
98         while (xprt->sc_ctxt_cnt < target) {
99                 xprt->sc_ctxt_cnt++;
100                 spin_unlock_bh(&xprt->sc_ctxt_lock);
101
102                 ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
103
104                 spin_lock_bh(&xprt->sc_ctxt_lock);
105                 if (ctxt) {
106                         at_least_one = 1;
107                         ctxt->next = xprt->sc_ctxt_head;
108                         xprt->sc_ctxt_head = ctxt;
109                 } else {
110                         /* kmalloc failed...give up for now */
111                         xprt->sc_ctxt_cnt--;
112                         break;
113                 }
114         }
115         spin_unlock_bh(&xprt->sc_ctxt_lock);
116         dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
117                 xprt->sc_ctxt_max, xprt->sc_ctxt_cnt);
118         return at_least_one;
119 }
120
121 struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
122 {
123         struct svc_rdma_op_ctxt *ctxt;
124
125         while (1) {
126                 spin_lock_bh(&xprt->sc_ctxt_lock);
127                 if (unlikely(xprt->sc_ctxt_head == NULL)) {
128                         /* Try to bump my cache. */
129                         spin_unlock_bh(&xprt->sc_ctxt_lock);
130
131                         if (rdma_bump_context_cache(xprt))
132                                 continue;
133
134                         printk(KERN_INFO "svcrdma: sleeping waiting for "
135                                "context memory on xprt=%p\n",
136                                xprt);
137                         schedule_timeout_uninterruptible(msecs_to_jiffies(500));
138                         continue;
139                 }
140                 ctxt = xprt->sc_ctxt_head;
141                 xprt->sc_ctxt_head = ctxt->next;
142                 spin_unlock_bh(&xprt->sc_ctxt_lock);
143                 ctxt->xprt = xprt;
144                 INIT_LIST_HEAD(&ctxt->dto_q);
145                 ctxt->count = 0;
146                 break;
147         }
148         return ctxt;
149 }
150
151 void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
152 {
153         struct svcxprt_rdma *xprt;
154         int i;
155
156         BUG_ON(!ctxt);
157         xprt = ctxt->xprt;
158         if (free_pages)
159                 for (i = 0; i < ctxt->count; i++)
160                         put_page(ctxt->pages[i]);
161
162         for (i = 0; i < ctxt->count; i++)
163                 dma_unmap_single(xprt->sc_cm_id->device->dma_device,
164                                  ctxt->sge[i].addr,
165                                  ctxt->sge[i].length,
166                                  ctxt->direction);
167         spin_lock_bh(&xprt->sc_ctxt_lock);
168         ctxt->next = xprt->sc_ctxt_head;
169         xprt->sc_ctxt_head = ctxt;
170         spin_unlock_bh(&xprt->sc_ctxt_lock);
171 }
172
173 /* ib_cq event handler */
174 static void cq_event_handler(struct ib_event *event, void *context)
175 {
176         struct svc_xprt *xprt = context;
177         dprintk("svcrdma: received CQ event id=%d, context=%p\n",
178                 event->event, context);
179         set_bit(XPT_CLOSE, &xprt->xpt_flags);
180 }
181
182 /* QP event handler */
183 static void qp_event_handler(struct ib_event *event, void *context)
184 {
185         struct svc_xprt *xprt = context;
186
187         switch (event->event) {
188         /* These are considered benign events */
189         case IB_EVENT_PATH_MIG:
190         case IB_EVENT_COMM_EST:
191         case IB_EVENT_SQ_DRAINED:
192         case IB_EVENT_QP_LAST_WQE_REACHED:
193                 dprintk("svcrdma: QP event %d received for QP=%p\n",
194                         event->event, event->element.qp);
195                 break;
196         /* These are considered fatal events */
197         case IB_EVENT_PATH_MIG_ERR:
198         case IB_EVENT_QP_FATAL:
199         case IB_EVENT_QP_REQ_ERR:
200         case IB_EVENT_QP_ACCESS_ERR:
201         case IB_EVENT_DEVICE_FATAL:
202         default:
203                 dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
204                         "closing transport\n",
205                         event->event, event->element.qp);
206                 set_bit(XPT_CLOSE, &xprt->xpt_flags);
207                 break;
208         }
209 }
210
211 /*
212  * Data Transfer Operation Tasklet
213  *
214  * Walks a list of transports with I/O pending, removing entries as
215  * they are added to the server's I/O pending list. Two bits indicate
216  * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
217  * spinlock that serializes access to the transport list with the RQ
218  * and SQ interrupt handlers.
219  */
220 static void dto_tasklet_func(unsigned long data)
221 {
222         struct svcxprt_rdma *xprt;
223         unsigned long flags;
224
225         spin_lock_irqsave(&dto_lock, flags);
226         while (!list_empty(&dto_xprt_q)) {
227                 xprt = list_entry(dto_xprt_q.next,
228                                   struct svcxprt_rdma, sc_dto_q);
229                 list_del_init(&xprt->sc_dto_q);
230                 spin_unlock_irqrestore(&dto_lock, flags);
231
232                 if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
233                         ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
234                         rq_cq_reap(xprt);
235                         set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
236                         /*
237                          * If data arrived before established event,
238                          * don't enqueue. This defers RPC I/O until the
239                          * RDMA connection is complete.
240                          */
241                         if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
242                                 svc_xprt_enqueue(&xprt->sc_xprt);
243                 }
244
245                 if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
246                         ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
247                         sq_cq_reap(xprt);
248                 }
249
250                 spin_lock_irqsave(&dto_lock, flags);
251         }
252         spin_unlock_irqrestore(&dto_lock, flags);
253 }
254
255 /*
256  * Receive Queue Completion Handler
257  *
258  * Since an RQ completion handler is called on interrupt context, we
259  * need to defer the handling of the I/O to a tasklet
260  */
261 static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
262 {
263         struct svcxprt_rdma *xprt = cq_context;
264         unsigned long flags;
265
266         /*
267          * Set the bit regardless of whether or not it's on the list
268          * because it may be on the list already due to an SQ
269          * completion.
270         */
271         set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
272
273         /*
274          * If this transport is not already on the DTO transport queue,
275          * add it
276          */
277         spin_lock_irqsave(&dto_lock, flags);
278         if (list_empty(&xprt->sc_dto_q))
279                 list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
280         spin_unlock_irqrestore(&dto_lock, flags);
281
282         /* Tasklet does all the work to avoid irqsave locks. */
283         tasklet_schedule(&dto_tasklet);
284 }
285
286 /*
287  * rq_cq_reap - Process the RQ CQ.
288  *
289  * Take all completing WC off the CQE and enqueue the associated DTO
290  * context on the dto_q for the transport.
291  */
292 static void rq_cq_reap(struct svcxprt_rdma *xprt)
293 {
294         int ret;
295         struct ib_wc wc;
296         struct svc_rdma_op_ctxt *ctxt = NULL;
297
298         atomic_inc(&rdma_stat_rq_poll);
299
300         spin_lock_bh(&xprt->sc_rq_dto_lock);
301         while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
302                 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
303                 ctxt->wc_status = wc.status;
304                 ctxt->byte_len = wc.byte_len;
305                 if (wc.status != IB_WC_SUCCESS) {
306                         /* Close the transport */
307                         set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
308                         svc_rdma_put_context(ctxt, 1);
309                         continue;
310                 }
311                 list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
312         }
313         spin_unlock_bh(&xprt->sc_rq_dto_lock);
314
315         if (ctxt)
316                 atomic_inc(&rdma_stat_rq_prod);
317 }
318
319 /*
320  * Send Queue Completion Handler - potentially called on interrupt context.
321  */
322 static void sq_cq_reap(struct svcxprt_rdma *xprt)
323 {
324         struct svc_rdma_op_ctxt *ctxt = NULL;
325         struct ib_wc wc;
326         struct ib_cq *cq = xprt->sc_sq_cq;
327         int ret;
328
329         atomic_inc(&rdma_stat_sq_poll);
330         while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
331                 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
332                 xprt = ctxt->xprt;
333
334                 if (wc.status != IB_WC_SUCCESS)
335                         /* Close the transport */
336                         set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
337
338                 /* Decrement used SQ WR count */
339                 atomic_dec(&xprt->sc_sq_count);
340                 wake_up(&xprt->sc_send_wait);
341
342                 switch (ctxt->wr_op) {
343                 case IB_WR_SEND:
344                 case IB_WR_RDMA_WRITE:
345                         svc_rdma_put_context(ctxt, 1);
346                         break;
347
348                 case IB_WR_RDMA_READ:
349                         if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
350                                 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
351                                 set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
352                                 spin_lock_bh(&xprt->sc_read_complete_lock);
353                                 list_add_tail(&ctxt->dto_q,
354                                               &xprt->sc_read_complete_q);
355                                 spin_unlock_bh(&xprt->sc_read_complete_lock);
356                                 svc_xprt_enqueue(&xprt->sc_xprt);
357                         }
358                         break;
359
360                 default:
361                         printk(KERN_ERR "svcrdma: unexpected completion type, "
362                                "opcode=%d, status=%d\n",
363                                wc.opcode, wc.status);
364                         break;
365                 }
366         }
367
368         if (ctxt)
369                 atomic_inc(&rdma_stat_sq_prod);
370 }
371
372 static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
373 {
374         struct svcxprt_rdma *xprt = cq_context;
375         unsigned long flags;
376
377         /*
378          * Set the bit regardless of whether or not it's on the list
379          * because it may be on the list already due to an RQ
380          * completion.
381         */
382         set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
383
384         /*
385          * If this transport is not already on the DTO transport queue,
386          * add it
387          */
388         spin_lock_irqsave(&dto_lock, flags);
389         if (list_empty(&xprt->sc_dto_q))
390                 list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
391         spin_unlock_irqrestore(&dto_lock, flags);
392
393         /* Tasklet does all the work to avoid irqsave locks. */
394         tasklet_schedule(&dto_tasklet);
395 }
396
397 static void create_context_cache(struct svcxprt_rdma *xprt,
398                                  int ctxt_count, int ctxt_bump, int ctxt_max)
399 {
400         struct svc_rdma_op_ctxt *ctxt;
401         int i;
402
403         xprt->sc_ctxt_max = ctxt_max;
404         xprt->sc_ctxt_bump = ctxt_bump;
405         xprt->sc_ctxt_cnt = 0;
406         xprt->sc_ctxt_head = NULL;
407         for (i = 0; i < ctxt_count; i++) {
408                 ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
409                 if (ctxt) {
410                         ctxt->next = xprt->sc_ctxt_head;
411                         xprt->sc_ctxt_head = ctxt;
412                         xprt->sc_ctxt_cnt++;
413                 }
414         }
415 }
416
417 static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
418 {
419         struct svc_rdma_op_ctxt *next;
420         if (!ctxt)
421                 return;
422
423         do {
424                 next = ctxt->next;
425                 kfree(ctxt);
426                 ctxt = next;
427         } while (next);
428 }
429
430 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
431                                              int listener)
432 {
433         struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
434
435         if (!cma_xprt)
436                 return NULL;
437         svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
438         INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
439         INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
440         INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
441         INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
442         init_waitqueue_head(&cma_xprt->sc_send_wait);
443
444         spin_lock_init(&cma_xprt->sc_lock);
445         spin_lock_init(&cma_xprt->sc_read_complete_lock);
446         spin_lock_init(&cma_xprt->sc_ctxt_lock);
447         spin_lock_init(&cma_xprt->sc_rq_dto_lock);
448
449         cma_xprt->sc_ord = svcrdma_ord;
450
451         cma_xprt->sc_max_req_size = svcrdma_max_req_size;
452         cma_xprt->sc_max_requests = svcrdma_max_requests;
453         cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
454         atomic_set(&cma_xprt->sc_sq_count, 0);
455
456         if (!listener) {
457                 int reqs = cma_xprt->sc_max_requests;
458                 create_context_cache(cma_xprt,
459                                      reqs << 1, /* starting size */
460                                      reqs,      /* bump amount */
461                                      reqs +
462                                      cma_xprt->sc_sq_depth +
463                                      RPCRDMA_MAX_THREADS + 1); /* max */
464                 if (!cma_xprt->sc_ctxt_head) {
465                         kfree(cma_xprt);
466                         return NULL;
467                 }
468                 clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
469         } else
470                 set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
471
472         return cma_xprt;
473 }
474
475 struct page *svc_rdma_get_page(void)
476 {
477         struct page *page;
478
479         while ((page = alloc_page(GFP_KERNEL)) == NULL) {
480                 /* If we can't get memory, wait a bit and try again */
481                 printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 "
482                        "jiffies.\n");
483                 schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
484         }
485         return page;
486 }
487
488 int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
489 {
490         struct ib_recv_wr recv_wr, *bad_recv_wr;
491         struct svc_rdma_op_ctxt *ctxt;
492         struct page *page;
493         unsigned long pa;
494         int sge_no;
495         int buflen;
496         int ret;
497
498         ctxt = svc_rdma_get_context(xprt);
499         buflen = 0;
500         ctxt->direction = DMA_FROM_DEVICE;
501         for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
502                 BUG_ON(sge_no >= xprt->sc_max_sge);
503                 page = svc_rdma_get_page();
504                 ctxt->pages[sge_no] = page;
505                 pa = ib_dma_map_page(xprt->sc_cm_id->device,
506                                      page, 0, PAGE_SIZE,
507                                      DMA_FROM_DEVICE);
508                 ctxt->sge[sge_no].addr = pa;
509                 ctxt->sge[sge_no].length = PAGE_SIZE;
510                 ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
511                 buflen += PAGE_SIZE;
512         }
513         ctxt->count = sge_no;
514         recv_wr.next = NULL;
515         recv_wr.sg_list = &ctxt->sge[0];
516         recv_wr.num_sge = ctxt->count;
517         recv_wr.wr_id = (u64)(unsigned long)ctxt;
518
519         ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
520         return ret;
521 }
522
523 /*
524  * This function handles the CONNECT_REQUEST event on a listening
525  * endpoint. It is passed the cma_id for the _new_ connection. The context in
526  * this cma_id is inherited from the listening cma_id and is the svc_xprt
527  * structure for the listening endpoint.
528  *
529  * This function creates a new xprt for the new connection and enqueues it on
530  * the accept queue for the listent xprt. When the listen thread is kicked, it
531  * will call the recvfrom method on the listen xprt which will accept the new
532  * connection.
533  */
534 static void handle_connect_req(struct rdma_cm_id *new_cma_id)
535 {
536         struct svcxprt_rdma *listen_xprt = new_cma_id->context;
537         struct svcxprt_rdma *newxprt;
538
539         /* Create a new transport */
540         newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
541         if (!newxprt) {
542                 dprintk("svcrdma: failed to create new transport\n");
543                 return;
544         }
545         newxprt->sc_cm_id = new_cma_id;
546         new_cma_id->context = newxprt;
547         dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
548                 newxprt, newxprt->sc_cm_id, listen_xprt);
549
550         /*
551          * Enqueue the new transport on the accept queue of the listening
552          * transport
553          */
554         spin_lock_bh(&listen_xprt->sc_lock);
555         list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
556         spin_unlock_bh(&listen_xprt->sc_lock);
557
558         /*
559          * Can't use svc_xprt_received here because we are not on a
560          * rqstp thread
561         */
562         set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
563         svc_xprt_enqueue(&listen_xprt->sc_xprt);
564 }
565
566 /*
567  * Handles events generated on the listening endpoint. These events will be
568  * either be incoming connect requests or adapter removal  events.
569  */
570 static int rdma_listen_handler(struct rdma_cm_id *cma_id,
571                                struct rdma_cm_event *event)
572 {
573         struct svcxprt_rdma *xprt = cma_id->context;
574         int ret = 0;
575
576         switch (event->event) {
577         case RDMA_CM_EVENT_CONNECT_REQUEST:
578                 dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
579                         "event=%d\n", cma_id, cma_id->context, event->event);
580                 handle_connect_req(cma_id);
581                 break;
582
583         case RDMA_CM_EVENT_ESTABLISHED:
584                 /* Accept complete */
585                 dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
586                         "cm_id=%p\n", xprt, cma_id);
587                 break;
588
589         case RDMA_CM_EVENT_DEVICE_REMOVAL:
590                 dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
591                         xprt, cma_id);
592                 if (xprt)
593                         set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
594                 break;
595
596         default:
597                 dprintk("svcrdma: Unexpected event on listening endpoint %p, "
598                         "event=%d\n", cma_id, event->event);
599                 break;
600         }
601
602         return ret;
603 }
604
605 static int rdma_cma_handler(struct rdma_cm_id *cma_id,
606                             struct rdma_cm_event *event)
607 {
608         struct svc_xprt *xprt = cma_id->context;
609         struct svcxprt_rdma *rdma =
610                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
611         switch (event->event) {
612         case RDMA_CM_EVENT_ESTABLISHED:
613                 /* Accept complete */
614                 dprintk("svcrdma: Connection completed on DTO xprt=%p, "
615                         "cm_id=%p\n", xprt, cma_id);
616                 clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
617                 svc_xprt_enqueue(xprt);
618                 break;
619         case RDMA_CM_EVENT_DISCONNECTED:
620                 dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
621                         xprt, cma_id);
622                 if (xprt) {
623                         set_bit(XPT_CLOSE, &xprt->xpt_flags);
624                         svc_xprt_enqueue(xprt);
625                 }
626                 break;
627         case RDMA_CM_EVENT_DEVICE_REMOVAL:
628                 dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
629                         "event=%d\n", cma_id, xprt, event->event);
630                 if (xprt) {
631                         set_bit(XPT_CLOSE, &xprt->xpt_flags);
632                         svc_xprt_enqueue(xprt);
633                 }
634                 break;
635         default:
636                 dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
637                         "event=%d\n", cma_id, event->event);
638                 break;
639         }
640         return 0;
641 }
642
643 /*
644  * Create a listening RDMA service endpoint.
645  */
646 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
647                                         struct sockaddr *sa, int salen,
648                                         int flags)
649 {
650         struct rdma_cm_id *listen_id;
651         struct svcxprt_rdma *cma_xprt;
652         struct svc_xprt *xprt;
653         int ret;
654
655         dprintk("svcrdma: Creating RDMA socket\n");
656
657         cma_xprt = rdma_create_xprt(serv, 1);
658         if (!cma_xprt)
659                 return ERR_PTR(ENOMEM);
660         xprt = &cma_xprt->sc_xprt;
661
662         listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
663         if (IS_ERR(listen_id)) {
664                 rdma_destroy_xprt(cma_xprt);
665                 dprintk("svcrdma: rdma_create_id failed = %ld\n",
666                         PTR_ERR(listen_id));
667                 return (void *)listen_id;
668         }
669         ret = rdma_bind_addr(listen_id, sa);
670         if (ret) {
671                 rdma_destroy_xprt(cma_xprt);
672                 rdma_destroy_id(listen_id);
673                 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
674                 return ERR_PTR(ret);
675         }
676         cma_xprt->sc_cm_id = listen_id;
677
678         ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
679         if (ret) {
680                 rdma_destroy_id(listen_id);
681                 rdma_destroy_xprt(cma_xprt);
682                 dprintk("svcrdma: rdma_listen failed = %d\n", ret);
683         }
684
685         /*
686          * We need to use the address from the cm_id in case the
687          * caller specified 0 for the port number.
688          */
689         sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
690         svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
691
692         return &cma_xprt->sc_xprt;
693 }
694
695 /*
696  * This is the xpo_recvfrom function for listening endpoints. Its
697  * purpose is to accept incoming connections. The CMA callback handler
698  * has already created a new transport and attached it to the new CMA
699  * ID.
700  *
701  * There is a queue of pending connections hung on the listening
702  * transport. This queue contains the new svc_xprt structure. This
703  * function takes svc_xprt structures off the accept_q and completes
704  * the connection.
705  */
706 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
707 {
708         struct svcxprt_rdma *listen_rdma;
709         struct svcxprt_rdma *newxprt = NULL;
710         struct rdma_conn_param conn_param;
711         struct ib_qp_init_attr qp_attr;
712         struct ib_device_attr devattr;
713         struct sockaddr *sa;
714         int ret;
715         int i;
716
717         listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
718         clear_bit(XPT_CONN, &xprt->xpt_flags);
719         /* Get the next entry off the accept list */
720         spin_lock_bh(&listen_rdma->sc_lock);
721         if (!list_empty(&listen_rdma->sc_accept_q)) {
722                 newxprt = list_entry(listen_rdma->sc_accept_q.next,
723                                      struct svcxprt_rdma, sc_accept_q);
724                 list_del_init(&newxprt->sc_accept_q);
725         }
726         if (!list_empty(&listen_rdma->sc_accept_q))
727                 set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
728         spin_unlock_bh(&listen_rdma->sc_lock);
729         if (!newxprt)
730                 return NULL;
731
732         dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
733                 newxprt, newxprt->sc_cm_id);
734
735         ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
736         if (ret) {
737                 dprintk("svcrdma: could not query device attributes on "
738                         "device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
739                 goto errout;
740         }
741
742         /* Qualify the transport resource defaults with the
743          * capabilities of this particular device */
744         newxprt->sc_max_sge = min((size_t)devattr.max_sge,
745                                   (size_t)RPCSVC_MAXPAGES);
746         newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
747                                    (size_t)svcrdma_max_requests);
748         newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
749
750         newxprt->sc_ord =  min((size_t)devattr.max_qp_rd_atom,
751                                (size_t)svcrdma_ord);
752
753         newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
754         if (IS_ERR(newxprt->sc_pd)) {
755                 dprintk("svcrdma: error creating PD for connect request\n");
756                 goto errout;
757         }
758         newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
759                                          sq_comp_handler,
760                                          cq_event_handler,
761                                          newxprt,
762                                          newxprt->sc_sq_depth,
763                                          0);
764         if (IS_ERR(newxprt->sc_sq_cq)) {
765                 dprintk("svcrdma: error creating SQ CQ for connect request\n");
766                 goto errout;
767         }
768         newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
769                                          rq_comp_handler,
770                                          cq_event_handler,
771                                          newxprt,
772                                          newxprt->sc_max_requests,
773                                          0);
774         if (IS_ERR(newxprt->sc_rq_cq)) {
775                 dprintk("svcrdma: error creating RQ CQ for connect request\n");
776                 goto errout;
777         }
778
779         memset(&qp_attr, 0, sizeof qp_attr);
780         qp_attr.event_handler = qp_event_handler;
781         qp_attr.qp_context = &newxprt->sc_xprt;
782         qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
783         qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
784         qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
785         qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
786         qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
787         qp_attr.qp_type = IB_QPT_RC;
788         qp_attr.send_cq = newxprt->sc_sq_cq;
789         qp_attr.recv_cq = newxprt->sc_rq_cq;
790         dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
791                 "    cm_id->device=%p, sc_pd->device=%p\n"
792                 "    cap.max_send_wr = %d\n"
793                 "    cap.max_recv_wr = %d\n"
794                 "    cap.max_send_sge = %d\n"
795                 "    cap.max_recv_sge = %d\n",
796                 newxprt->sc_cm_id, newxprt->sc_pd,
797                 newxprt->sc_cm_id->device, newxprt->sc_pd->device,
798                 qp_attr.cap.max_send_wr,
799                 qp_attr.cap.max_recv_wr,
800                 qp_attr.cap.max_send_sge,
801                 qp_attr.cap.max_recv_sge);
802
803         ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
804         if (ret) {
805                 /*
806                  * XXX: This is a hack. We need a xx_request_qp interface
807                  * that will adjust the qp_attr's with a best-effort
808                  * number
809                  */
810                 qp_attr.cap.max_send_sge -= 2;
811                 qp_attr.cap.max_recv_sge -= 2;
812                 ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
813                                      &qp_attr);
814                 if (ret) {
815                         dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
816                         goto errout;
817                 }
818                 newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
819                 newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
820                 newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
821                 newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
822         }
823         newxprt->sc_qp = newxprt->sc_cm_id->qp;
824
825         /* Register all of physical memory */
826         newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
827                                             IB_ACCESS_LOCAL_WRITE |
828                                             IB_ACCESS_REMOTE_WRITE);
829         if (IS_ERR(newxprt->sc_phys_mr)) {
830                 dprintk("svcrdma: Failed to create DMA MR ret=%d\n", ret);
831                 goto errout;
832         }
833
834         /* Post receive buffers */
835         for (i = 0; i < newxprt->sc_max_requests; i++) {
836                 ret = svc_rdma_post_recv(newxprt);
837                 if (ret) {
838                         dprintk("svcrdma: failure posting receive buffers\n");
839                         goto errout;
840                 }
841         }
842
843         /* Swap out the handler */
844         newxprt->sc_cm_id->event_handler = rdma_cma_handler;
845
846         /* Accept Connection */
847         set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
848         memset(&conn_param, 0, sizeof conn_param);
849         conn_param.responder_resources = 0;
850         conn_param.initiator_depth = newxprt->sc_ord;
851         ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
852         if (ret) {
853                 dprintk("svcrdma: failed to accept new connection, ret=%d\n",
854                        ret);
855                 goto errout;
856         }
857
858         dprintk("svcrdma: new connection %p accepted with the following "
859                 "attributes:\n"
860                 "    local_ip        : %d.%d.%d.%d\n"
861                 "    local_port      : %d\n"
862                 "    remote_ip       : %d.%d.%d.%d\n"
863                 "    remote_port     : %d\n"
864                 "    max_sge         : %d\n"
865                 "    sq_depth        : %d\n"
866                 "    max_requests    : %d\n"
867                 "    ord             : %d\n",
868                 newxprt,
869                 NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
870                          route.addr.src_addr)->sin_addr.s_addr),
871                 ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
872                        route.addr.src_addr)->sin_port),
873                 NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
874                          route.addr.dst_addr)->sin_addr.s_addr),
875                 ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
876                        route.addr.dst_addr)->sin_port),
877                 newxprt->sc_max_sge,
878                 newxprt->sc_sq_depth,
879                 newxprt->sc_max_requests,
880                 newxprt->sc_ord);
881
882         /* Set the local and remote addresses in the transport */
883         sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
884         svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
885         sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
886         svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
887
888         ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
889         ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
890         return &newxprt->sc_xprt;
891
892  errout:
893         dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
894         rdma_destroy_id(newxprt->sc_cm_id);
895         rdma_destroy_xprt(newxprt);
896         return NULL;
897 }
898
899 /*
900  * Post an RQ WQE to the RQ when the rqst is being released. This
901  * effectively returns an RQ credit to the client. The rq_xprt_ctxt
902  * will be null if the request is deferred due to an RDMA_READ or the
903  * transport had no data ready (EAGAIN). Note that an RPC deferred in
904  * svc_process will still return the credit, this is because the data
905  * is copied and no longer consume a WQE/WC.
906  */
907 static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
908 {
909         int err;
910         struct svcxprt_rdma *rdma =
911                 container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
912         if (rqstp->rq_xprt_ctxt) {
913                 BUG_ON(rqstp->rq_xprt_ctxt != rdma);
914                 err = svc_rdma_post_recv(rdma);
915                 if (err)
916                         dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
917                                 err);
918         }
919         rqstp->rq_xprt_ctxt = NULL;
920 }
921
922 /* Disable data ready events for this connection */
923 static void svc_rdma_detach(struct svc_xprt *xprt)
924 {
925         struct svcxprt_rdma *rdma =
926                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
927         unsigned long flags;
928
929         dprintk("svc: svc_rdma_detach(%p)\n", xprt);
930         /*
931          * Shutdown the connection. This will ensure we don't get any
932          * more events from the provider.
933          */
934         rdma_disconnect(rdma->sc_cm_id);
935         rdma_destroy_id(rdma->sc_cm_id);
936
937         /* We may already be on the DTO list */
938         spin_lock_irqsave(&dto_lock, flags);
939         if (!list_empty(&rdma->sc_dto_q))
940                 list_del_init(&rdma->sc_dto_q);
941         spin_unlock_irqrestore(&dto_lock, flags);
942 }
943
944 static void svc_rdma_free(struct svc_xprt *xprt)
945 {
946         struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
947         dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
948         rdma_destroy_xprt(rdma);
949         kfree(rdma);
950 }
951
952 static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
953 {
954         if (xprt->sc_qp && !IS_ERR(xprt->sc_qp))
955                 ib_destroy_qp(xprt->sc_qp);
956
957         if (xprt->sc_sq_cq && !IS_ERR(xprt->sc_sq_cq))
958                 ib_destroy_cq(xprt->sc_sq_cq);
959
960         if (xprt->sc_rq_cq && !IS_ERR(xprt->sc_rq_cq))
961                 ib_destroy_cq(xprt->sc_rq_cq);
962
963         if (xprt->sc_phys_mr && !IS_ERR(xprt->sc_phys_mr))
964                 ib_dereg_mr(xprt->sc_phys_mr);
965
966         if (xprt->sc_pd && !IS_ERR(xprt->sc_pd))
967                 ib_dealloc_pd(xprt->sc_pd);
968
969         destroy_context_cache(xprt->sc_ctxt_head);
970 }
971
972 static int svc_rdma_has_wspace(struct svc_xprt *xprt)
973 {
974         struct svcxprt_rdma *rdma =
975                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
976
977         /*
978          * If there are fewer SQ WR available than required to send a
979          * simple response, return false.
980          */
981         if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
982                 return 0;
983
984         /*
985          * ...or there are already waiters on the SQ,
986          * return false.
987          */
988         if (waitqueue_active(&rdma->sc_send_wait))
989                 return 0;
990
991         /* Otherwise return true. */
992         return 1;
993 }
994
995 int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
996 {
997         struct ib_send_wr *bad_wr;
998         int ret;
999
1000         if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1001                 return 0;
1002
1003         BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
1004         BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
1005                 wr->opcode);
1006         /* If the SQ is full, wait until an SQ entry is available */
1007         while (1) {
1008                 spin_lock_bh(&xprt->sc_lock);
1009                 if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
1010                         spin_unlock_bh(&xprt->sc_lock);
1011                         atomic_inc(&rdma_stat_sq_starve);
1012                         /* See if we can reap some SQ WR */
1013                         sq_cq_reap(xprt);
1014
1015                         /* Wait until SQ WR available if SQ still full */
1016                         wait_event(xprt->sc_send_wait,
1017                                    atomic_read(&xprt->sc_sq_count) <
1018                                    xprt->sc_sq_depth);
1019                         continue;
1020                 }
1021                 /* Bumped used SQ WR count and post */
1022                 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
1023                 if (!ret)
1024                         atomic_inc(&xprt->sc_sq_count);
1025                 else
1026                         dprintk("svcrdma: failed to post SQ WR rc=%d, "
1027                                "sc_sq_count=%d, sc_sq_depth=%d\n",
1028                                ret, atomic_read(&xprt->sc_sq_count),
1029                                xprt->sc_sq_depth);
1030                 spin_unlock_bh(&xprt->sc_lock);
1031                 break;
1032         }
1033         return ret;
1034 }
1035
1036 int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
1037                         enum rpcrdma_errcode err)
1038 {
1039         struct ib_send_wr err_wr;
1040         struct ib_sge sge;
1041         struct page *p;
1042         struct svc_rdma_op_ctxt *ctxt;
1043         u32 *va;
1044         int length;
1045         int ret;
1046
1047         p = svc_rdma_get_page();
1048         va = page_address(p);
1049
1050         /* XDR encode error */
1051         length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
1052
1053         /* Prepare SGE for local address */
1054         sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
1055                                    p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
1056         sge.lkey = xprt->sc_phys_mr->lkey;
1057         sge.length = length;
1058
1059         ctxt = svc_rdma_get_context(xprt);
1060         ctxt->count = 1;
1061         ctxt->pages[0] = p;
1062
1063         /* Prepare SEND WR */
1064         memset(&err_wr, 0, sizeof err_wr);
1065         ctxt->wr_op = IB_WR_SEND;
1066         err_wr.wr_id = (unsigned long)ctxt;
1067         err_wr.sg_list = &sge;
1068         err_wr.num_sge = 1;
1069         err_wr.opcode = IB_WR_SEND;
1070         err_wr.send_flags = IB_SEND_SIGNALED;
1071
1072         /* Post It */
1073         ret = svc_rdma_send(xprt, &err_wr);
1074         if (ret) {
1075                 dprintk("svcrdma: Error posting send = %d\n", ret);
1076                 svc_rdma_put_context(ctxt, 1);
1077         }
1078
1079         return ret;
1080 }