SUNRPC: Move the UDP socket bufsize parameters to a private data structure
net/sunrpc/xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
7  * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP behaviour.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  */
17
18 #include <linux/types.h>
19 #include <linux/slab.h>
20 #include <linux/capability.h>
21 #include <linux/sched.h>
22 #include <linux/pagemap.h>
23 #include <linux/errno.h>
24 #include <linux/socket.h>
25 #include <linux/in.h>
26 #include <linux/net.h>
27 #include <linux/mm.h>
28 #include <linux/udp.h>
29 #include <linux/tcp.h>
30 #include <linux/sunrpc/clnt.h>
31 #include <linux/sunrpc/sched.h>
32 #include <linux/file.h>
33
34 #include <net/sock.h>
35 #include <net/checksum.h>
36 #include <net/udp.h>
37 #include <net/tcp.h>
38
39 /*
40  * xprtsock tunables
41  */
42 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
43 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
44
45 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
46 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
47
48 /*
49  * How many times to try sending a request on a socket before waiting
50  * for the socket buffer to clear.
51  */
52 #define XS_SENDMSG_RETRY        (10U)
53
54 /*
55  * Timeout for an RPC UDP socket connect.  UDP socket connects are
56  * synchronous, but we set a timeout anyway in case of resource
57  * exhaustion on the local host.
58  */
59 #define XS_UDP_CONN_TO          (5U * HZ)
60
61 /*
62  * Wait duration for an RPC TCP connection to be established.  Solaris
63  * NFS over TCP uses 60 seconds, for example, which is in line with how
64  * long a server takes to reboot.
65  */
66 #define XS_TCP_CONN_TO          (60U * HZ)
67
68 /*
69  * Wait duration for a reply from the RPC portmapper.
70  */
71 #define XS_BIND_TO              (60U * HZ)
72
73 /*
74  * Delay if a UDP socket connect error occurs.  This is most likely some
75  * kind of resource problem on the local host.
76  */
77 #define XS_UDP_REEST_TO         (2U * HZ)
78
79 /*
80  * The reestablish timeout allows clients to delay for a bit before attempting
81  * to reconnect to a server that just dropped our connection.
82  *
83  * We implement an exponential backoff when trying to reestablish a TCP
84  * transport connection with the server.  Some servers like to drop a TCP
85  * connection when they are overworked, so we start with a short timeout and
86  * increase over time if the server is down or not responding.
87  */
88 #define XS_TCP_INIT_REEST_TO    (3U * HZ)
89 #define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)
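/*
 * An illustrative sketch of the backoff behaviour (implemented in
 * xs_connect() below): the delay doubles on each reconnect attempt
 * and is clamped at the maximum, giving the schedule
 *
 *      3s, 6s, 12s, 24s, ..., 192s, 300s, 300s, ...
 *
 *      xprt->reestablish_timeout <<= 1;
 *      if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
 *              xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
 */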
90
91 /*
92  * TCP idle timeout; client drops the transport socket if it is idle
93  * for this long.  Note that we also time out UDP sockets to prevent
94  * holding port numbers when there is no RPC traffic.
95  */
96 #define XS_IDLE_DISC_TO         (5U * 60 * HZ)
97
98 #ifdef RPC_DEBUG
99 # undef  RPC_DEBUG_DATA
100 # define RPCDBG_FACILITY        RPCDBG_TRANS
101 #endif
102
103 #ifdef RPC_DEBUG_DATA
104 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
105 {
106         u8 *buf = (u8 *) packet;
107         int j;
108
109         dprintk("RPC:      %s\n", msg);
110         for (j = 0; j < count && j < 128; j += 4) {
111                 if (!(j & 31)) {
112                         if (j)
113                                 dprintk("\n");
114                         dprintk("0x%04x ", j);
115                 }
116                 dprintk("%02x%02x%02x%02x ",
117                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
118         }
119         dprintk("\n");
120 }
121 #else
122 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
123 {
124         /* NOP */
125 }
126 #endif
127
128 struct sock_xprt {
129         struct rpc_xprt         xprt;
130
131         /*
132          * Network layer
133          */
134         struct socket *         sock;
135         struct sock *           inet;
136
137         /*
138          * State of TCP reply receive
139          */
140         __be32                  tcp_fraghdr,
141                                 tcp_xid;
142
143         u32                     tcp_offset,
144                                 tcp_reclen;
145
146         unsigned long           tcp_copied,
147                                 tcp_flags;
148
149         /*
150          * Connection establishment
151          */
152         struct work_struct      connect_worker;
153         unsigned short          port;
154
155         /*
156          * UDP socket buffer size parameters
157          */
158         size_t                  rcvsize,
159                                 sndsize;
160 };
161
162 /*
163  * TCP receive state flags
164  */
165 #define TCP_RCV_LAST_FRAG       (1UL << 0)
166 #define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
167 #define TCP_RCV_COPY_XID        (1UL << 2)
168 #define TCP_RCV_COPY_DATA       (1UL << 3)
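/*
 * A sketch of what these flags track: the TCP stream carries RPC
 * records back to back, and a single-fragment record (the common
 * case) looks like
 *
 *      +--------------------+----------+---------------------------+
 *      | 4-byte frag header |  4-byte  |  (reclen - 4) bytes of    |
 *      | (last bit, length) |   XID    |  reply data               |
 *      +--------------------+----------+---------------------------+
 *
 * so receive processing cycles COPY_FRAGHDR -> COPY_XID -> COPY_DATA
 * and back again, with LAST_FRAG set while the final fragment of a
 * record is being read.
 */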
169
170 static void xs_format_peer_addresses(struct rpc_xprt *xprt)
171 {
172         struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
173         char *buf;
174
175         buf = kzalloc(20, GFP_KERNEL);
176         if (buf) {
177                 snprintf(buf, 20, "%u.%u.%u.%u",
178                                 NIPQUAD(addr->sin_addr.s_addr));
179         }
180         xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
181
182         buf = kzalloc(8, GFP_KERNEL);
183         if (buf) {
184                 snprintf(buf, 8, "%u",
185                                 ntohs(addr->sin_port));
186         }
187         xprt->address_strings[RPC_DISPLAY_PORT] = buf;
188
189         if (xprt->prot == IPPROTO_UDP)
190                 xprt->address_strings[RPC_DISPLAY_PROTO] = "udp";
191         else
192                 xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp";
193
194         buf = kzalloc(48, GFP_KERNEL);
195         if (buf) {
196                 snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s",
197                         NIPQUAD(addr->sin_addr.s_addr),
198                         ntohs(addr->sin_port),
199                         xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
200         }
201         xprt->address_strings[RPC_DISPLAY_ALL] = buf;
202 }
203
204 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
205 {
206         kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
207         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
208         kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
209 }
210
211 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
212
213 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
214 {
215         struct msghdr msg = {
216                 .msg_name       = addr,
217                 .msg_namelen    = addrlen,
218                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
219         };
220         struct kvec iov = {
221                 .iov_base       = vec->iov_base + base,
222                 .iov_len        = vec->iov_len - base,
223         };
224
225         if (iov.iov_len != 0)
226                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
227         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
228 }
229
230 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
231 {
232         struct page **ppage;
233         unsigned int remainder;
234         int err, sent = 0;
235
236         remainder = xdr->page_len - base;
237         base += xdr->page_base;
238         ppage = xdr->pages + (base >> PAGE_SHIFT);
239         base &= ~PAGE_MASK;
240         for(;;) {
241                 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
242                 int flags = XS_SENDMSG_FLAGS;
243
244                 remainder -= len;
245                 if (remainder != 0 || more)
246                         flags |= MSG_MORE;
247                 err = sock->ops->sendpage(sock, *ppage, base, len, flags);
248                 if (remainder == 0 || err != len)
249                         break;
250                 sent += err;
251                 ppage++;
252                 base = 0;
253         }
254         if (sent == 0)
255                 return err;
256         if (err > 0)
257                 sent += err;
258         return sent;
259 }
260
261 /**
262  * xs_sendpages - write pages directly to a socket
263  * @sock: socket to send on
264  * @addr: UDP only -- address of destination
265  * @addrlen: UDP only -- length of destination address
266  * @xdr: buffer containing this request
267  * @base: starting position in the buffer
268  *
269  */
270 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
271 {
272         unsigned int remainder = xdr->len - base;
273         int err, sent = 0;
274
275         if (unlikely(!sock))
276                 return -ENOTCONN;
277
278         clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
279         if (base != 0) {
280                 addr = NULL;
281                 addrlen = 0;
282         }
283
284         if (base < xdr->head[0].iov_len || addr != NULL) {
285                 unsigned int len = xdr->head[0].iov_len - base;
286                 remainder -= len;
287                 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
288                 if (remainder == 0 || err != len)
289                         goto out;
290                 sent += err;
291                 base = 0;
292         } else
293                 base -= xdr->head[0].iov_len;
294
295         if (base < xdr->page_len) {
296                 unsigned int len = xdr->page_len - base;
297                 remainder -= len;
298                 err = xs_send_pagedata(sock, xdr, base, remainder != 0);
299                 if (remainder == 0 || err != len)
300                         goto out;
301                 sent += err;
302                 base = 0;
303         } else
304                 base -= xdr->page_len;
305
306         if (base >= xdr->tail[0].iov_len)
307                 return sent;
308         err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
309 out:
310         if (sent == 0)
311                 return err;
312         if (err > 0)
313                 sent += err;
314         return sent;
315 }
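/*
 * For reference: the xdr_buf walked above carries an RPC message in
 * up to three pieces, sent in this order:
 *
 *      head[0]   kvec       RPC header (and TCP record marker)
 *      pages[]   page list  bulk payload, starting at page_base
 *      tail[0]   kvec       XDR padding and trailing data
 *
 * @base indexes into the concatenation of all three, which is how a
 * partially-sent request resumes from wherever the last send stopped.
 */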
316
317 /**
318  * xs_nospace - place task on wait queue if transmit was incomplete
319  * @task: task to put to sleep
320  *
321  */
322 static void xs_nospace(struct rpc_task *task)
323 {
324         struct rpc_rqst *req = task->tk_rqstp;
325         struct rpc_xprt *xprt = req->rq_xprt;
326         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
327
328         dprintk("RPC: %4d xmit incomplete (%u left of %u)\n",
329                         task->tk_pid, req->rq_slen - req->rq_bytes_sent,
330                         req->rq_slen);
331
332         if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
333                 /* Protect against races with write_space */
334                 spin_lock_bh(&xprt->transport_lock);
335
336                 /* Don't race with disconnect */
337                 if (!xprt_connected(xprt))
338                         task->tk_status = -ENOTCONN;
339                 else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
340                         xprt_wait_for_buffer_space(task);
341
342                 spin_unlock_bh(&xprt->transport_lock);
343         } else
344                 /* Keep holding the socket if it is blocked */
345                 rpc_delay(task, HZ>>4);
346 }
347
348 /**
349  * xs_udp_send_request - write an RPC request to a UDP socket
350  * @task: address of RPC task that manages the state of an RPC request
351  *
352  * Return values:
353  *        0:    The request has been sent
354  *   EAGAIN:    The socket was blocked, please call again later to
355  *              complete the request
356  * ENOTCONN:    Caller needs to invoke connect logic then call again
357  *    other:    Some other error occurred, the request was not sent
358  */
359 static int xs_udp_send_request(struct rpc_task *task)
360 {
361         struct rpc_rqst *req = task->tk_rqstp;
362         struct rpc_xprt *xprt = req->rq_xprt;
363         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
364         struct xdr_buf *xdr = &req->rq_snd_buf;
365         int status;
366
367         xs_pktdump("packet data:",
368                                 req->rq_svec->iov_base,
369                                 req->rq_svec->iov_len);
370
371         req->rq_xtime = jiffies;
372         status = xs_sendpages(transport->sock,
373                               (struct sockaddr *) &xprt->addr,
374                               xprt->addrlen, xdr,
375                               req->rq_bytes_sent);
376
377         dprintk("RPC:      xs_udp_send_request(%u) = %d\n",
378                         xdr->len - req->rq_bytes_sent, status);
379
380         if (likely(status >= (int) req->rq_slen))
381                 return 0;
382
383         /* Still some bytes left; set up for a retry later. */
384         if (status > 0)
385                 status = -EAGAIN;
386
387         switch (status) {
388         case -ENETUNREACH:
389         case -EPIPE:
390         case -ECONNREFUSED:
391                 /* When the server has died, an ICMP port unreachable message
392                  * prompts ECONNREFUSED. */
393                 break;
394         case -EAGAIN:
395                 xs_nospace(task);
396                 break;
397         default:
398                 dprintk("RPC:      sendmsg returned unrecognized error %d\n",
399                         -status);
400                 break;
401         }
402
403         return status;
404 }
405
406 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
407 {
408         u32 reclen = buf->len - sizeof(rpc_fraghdr);
409         rpc_fraghdr *base = buf->head[0].iov_base;
410         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
411 }
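/*
 * A worked example with assumed numbers: for a 132-byte send buffer
 * (record marker included), reclen is 132 - 4 = 128, and the marker
 * written at the head of the buffer is
 *
 *      htonl(RPC_LAST_STREAM_FRAGMENT | 128) == htonl(0x80000080)
 *
 * The top bit flags the final (here, only) fragment; the low 31 bits
 * carry the fragment length.
 */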
412
413 /**
414  * xs_tcp_send_request - write an RPC request to a TCP socket
415  * @task: address of RPC task that manages the state of an RPC request
416  *
417  * Return values:
418  *        0:    The request has been sent
419  *   EAGAIN:    The socket was blocked, please call again later to
420  *              complete the request
421  * ENOTCONN:    Caller needs to invoke connect logic then call again
422  *    other:    Some other error occurred, the request was not sent
423  *
424  * XXX: In the case of soft timeouts, should we eventually give up
425  *      if sendmsg is not able to make progress?
426  */
427 static int xs_tcp_send_request(struct rpc_task *task)
428 {
429         struct rpc_rqst *req = task->tk_rqstp;
430         struct rpc_xprt *xprt = req->rq_xprt;
431         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
432         struct xdr_buf *xdr = &req->rq_snd_buf;
433         int status, retry = 0;
434
435         xs_encode_tcp_record_marker(&req->rq_snd_buf);
436
437         xs_pktdump("packet data:",
438                                 req->rq_svec->iov_base,
439                                 req->rq_svec->iov_len);
440
441         /* Continue transmitting the packet/record. We must be careful
442          * to cope with writespace callbacks arriving _after_ we have
443          * called sendmsg(). */
444         while (1) {
445                 req->rq_xtime = jiffies;
446                 status = xs_sendpages(transport->sock,
447                                         NULL, 0, xdr, req->rq_bytes_sent);
448
449                 dprintk("RPC:      xs_tcp_send_request(%u) = %d\n",
450                                 xdr->len - req->rq_bytes_sent, status);
451
452                 if (unlikely(status < 0))
453                         break;
454
455                 /* If we've sent the entire packet, immediately
456                  * reset the count of bytes sent. */
457                 req->rq_bytes_sent += status;
458                 task->tk_bytes_sent += status;
459                 if (likely(req->rq_bytes_sent >= req->rq_slen)) {
460                         req->rq_bytes_sent = 0;
461                         return 0;
462                 }
463
464                 status = -EAGAIN;
465                 if (retry++ > XS_SENDMSG_RETRY)
466                         break;
467         }
468
469         switch (status) {
470         case -EAGAIN:
471                 xs_nospace(task);
472                 break;
473         case -ECONNREFUSED:
474         case -ECONNRESET:
475         case -ENOTCONN:
476         case -EPIPE:
477                 status = -ENOTCONN;
478                 break;
479         default:
480                 dprintk("RPC:      sendmsg returned unrecognized error %d\n",
481                         -status);
482                 xprt_disconnect(xprt);
483                 break;
484         }
485
486         return status;
487 }
488
489 /**
490  * xs_tcp_release_xprt - clean up after a tcp transmission
491  * @xprt: transport
492  * @task: rpc task
493  *
494  * This cleans up if an error causes us to abort the transmission of a request.
495  * In this case, the socket may need to be reset in order to avoid confusing
496  * the server.
497  */
498 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
499 {
500         struct rpc_rqst *req;
501
502         if (task != xprt->snd_task)
503                 return;
504         if (task == NULL)
505                 goto out_release;
506         req = task->tk_rqstp;
507         if (req->rq_bytes_sent == 0)
508                 goto out_release;
509         if (req->rq_bytes_sent == req->rq_snd_buf.len)
510                 goto out_release;
511         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
512 out_release:
513         xprt_release_xprt(xprt, task);
514 }
515
516 /**
517  * xs_close - close a socket
518  * @xprt: transport
519  *
520  * This is used when all requests are complete; i.e., no duplicate reply
521  * cache (DRC) state that we want to save remains on the server.
522  */
523 static void xs_close(struct rpc_xprt *xprt)
524 {
525         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
526         struct socket *sock = transport->sock;
527         struct sock *sk = transport->inet;
528
529         if (!sk)
530                 goto clear_close_wait;
531
532         dprintk("RPC:      xs_close xprt %p\n", xprt);
533
534         write_lock_bh(&sk->sk_callback_lock);
535         transport->inet = NULL;
536         transport->sock = NULL;
537
538         sk->sk_user_data = NULL;
539         sk->sk_data_ready = xprt->old_data_ready;
540         sk->sk_state_change = xprt->old_state_change;
541         sk->sk_write_space = xprt->old_write_space;
542         write_unlock_bh(&sk->sk_callback_lock);
543
544         sk->sk_no_check = 0;
545
546         sock_release(sock);
547 clear_close_wait:
548         smp_mb__before_clear_bit();
549         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
550         smp_mb__after_clear_bit();
551 }
552
553 /**
554  * xs_destroy - prepare to shutdown a transport
555  * @xprt: doomed transport
556  *
557  */
558 static void xs_destroy(struct rpc_xprt *xprt)
559 {
560         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
561
562         dprintk("RPC:      xs_destroy xprt %p\n", xprt);
563
564         cancel_delayed_work(&transport->connect_worker);
565         flush_scheduled_work();
566
567         xprt_disconnect(xprt);
568         xs_close(xprt);
569         xs_free_peer_addresses(xprt);
570         kfree(xprt->slot);
571         kfree(xprt);
572 }
573
574 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
575 {
576         return (struct rpc_xprt *) sk->sk_user_data;
577 }
578
579 /**
580  * xs_udp_data_ready - "data ready" callback for UDP sockets
581  * @sk: socket with data to read
582  * @len: how much data to read
583  *
584  */
585 static void xs_udp_data_ready(struct sock *sk, int len)
586 {
587         struct rpc_task *task;
588         struct rpc_xprt *xprt;
589         struct rpc_rqst *rovr;
590         struct sk_buff *skb;
591         int err, repsize, copied;
592         u32 _xid;
593         __be32 *xp;
594
595         read_lock(&sk->sk_callback_lock);
596         dprintk("RPC:      xs_udp_data_ready...\n");
597         if (!(xprt = xprt_from_sock(sk)))
598                 goto out;
599
600         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
601                 goto out;
602
603         if (xprt->shutdown)
604                 goto dropit;
605
606         repsize = skb->len - sizeof(struct udphdr);
607         if (repsize < 4) {
608                 dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
609                 goto dropit;
610         }
611
612         /* Copy the XID from the skb... */
613         xp = skb_header_pointer(skb, sizeof(struct udphdr),
614                                 sizeof(_xid), &_xid);
615         if (xp == NULL)
616                 goto dropit;
617
618         /* Look up and lock the request corresponding to the given XID */
619         spin_lock(&xprt->transport_lock);
620         rovr = xprt_lookup_rqst(xprt, *xp);
621         if (!rovr)
622                 goto out_unlock;
623         task = rovr->rq_task;
624
625         if ((copied = rovr->rq_private_buf.buflen) > repsize)
626                 copied = repsize;
627
628         /* Suck it into the iovec, verify checksum if not done by hw. */
629         if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
630                 goto out_unlock;
631
632         /* Something worked... */
633         dst_confirm(skb->dst);
634
635         xprt_adjust_cwnd(task, copied);
636         xprt_update_rtt(task);
637         xprt_complete_rqst(task, copied);
638
639  out_unlock:
640         spin_unlock(&xprt->transport_lock);
641  dropit:
642         skb_free_datagram(sk, skb);
643  out:
644         read_unlock(&sk->sk_callback_lock);
645 }
646
647 static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
648 {
649         if (len > desc->count)
650                 len = desc->count;
651         if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
652                 dprintk("RPC:      failed to copy %zu bytes from skb. %zu bytes remain\n",
653                                 len, desc->count);
654                 return 0;
655         }
656         desc->offset += len;
657         desc->count -= len;
658         dprintk("RPC:      copied %zu bytes from skb. %zu bytes remain\n",
659                         len, desc->count);
660         return len;
661 }
662
663 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
664 {
665         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
666         size_t len, used;
667         char *p;
668
669         p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
670         len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
671         used = xs_tcp_copy_data(desc, p, len);
672         transport->tcp_offset += used;
673         if (used != len)
674                 return;
675
676         transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
677         if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
678                 transport->tcp_flags |= TCP_RCV_LAST_FRAG;
679         else
680                 transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
681         transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
682
683         transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
684         transport->tcp_offset = 0;
685
686         /* Sanity check of the record length */
687         if (unlikely(transport->tcp_reclen < 4)) {
688                 dprintk("RPC:      invalid TCP record fragment length\n");
689                 xprt_disconnect(xprt);
690                 return;
691         }
692         dprintk("RPC:      reading TCP record fragment of length %d\n",
693                         transport->tcp_reclen);
694 }
695
696 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
697 {
698         if (transport->tcp_offset == transport->tcp_reclen) {
699                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
700                 transport->tcp_offset = 0;
701                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
702                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
703                         transport->tcp_flags |= TCP_RCV_COPY_XID;
704                         transport->tcp_copied = 0;
705                 }
706         }
707 }
708
709 static inline void xs_tcp_read_xid(struct sock_xprt *transport, skb_reader_t *desc)
710 {
711         size_t len, used;
712         char *p;
713
714         len = sizeof(transport->tcp_xid) - transport->tcp_offset;
715         dprintk("RPC:      reading XID (%Zu bytes)\n", len);
716         p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
717         used = xs_tcp_copy_data(desc, p, len);
718         transport->tcp_offset += used;
719         if (used != len)
720                 return;
721         transport->tcp_flags &= ~TCP_RCV_COPY_XID;
722         transport->tcp_flags |= TCP_RCV_COPY_DATA;
723         transport->tcp_copied = 4;
724         dprintk("RPC:      reading reply for XID %08x\n",
725                         ntohl(transport->tcp_xid));
726         xs_tcp_check_fraghdr(transport);
727 }
728
729 static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
730 {
731         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
732         struct rpc_rqst *req;
733         struct xdr_buf *rcvbuf;
734         size_t len;
735         ssize_t r;
736
737         /* Find and lock the request corresponding to this xid */
738         spin_lock(&xprt->transport_lock);
739         req = xprt_lookup_rqst(xprt, transport->tcp_xid);
740         if (!req) {
741                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
742                 dprintk("RPC:      XID %08x request not found!\n",
743                                 ntohl(transport->tcp_xid));
744                 spin_unlock(&xprt->transport_lock);
745                 return;
746         }
747
748         rcvbuf = &req->rq_private_buf;
749         len = desc->count;
750         if (len > transport->tcp_reclen - transport->tcp_offset) {
751                 skb_reader_t my_desc;
752
753                 len = transport->tcp_reclen - transport->tcp_offset;
754                 memcpy(&my_desc, desc, sizeof(my_desc));
755                 my_desc.count = len;
756                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
757                                           &my_desc, xs_tcp_copy_data);
758                 desc->count -= r;
759                 desc->offset += r;
760         } else
761                 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
762                                           desc, xs_tcp_copy_data);
763
764         if (r > 0) {
765                 transport->tcp_copied += r;
766                 transport->tcp_offset += r;
767         }
768         if (r != len) {
769                 /* Error when copying to the receive buffer,
770                  * usually because we weren't able to allocate
771                  * additional buffer pages. All we can do now
772                  * is turn off TCP_RCV_COPY_DATA, so the request
773                  * will not receive any additional updates,
774                  * and time out.
775                  * Any remaining data from this record will
776                  * be discarded.
777                  */
778                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
779                 dprintk("RPC:      XID %08x truncated request\n",
780                                 ntohl(transport->tcp_xid));
781                 dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
782                                 xprt, transport->tcp_copied, transport->tcp_offset,
783                                         transport->tcp_reclen);
784                 goto out;
785         }
786
787         dprintk("RPC:      XID %08x read %Zd bytes\n",
788                         ntohl(transport->tcp_xid), r);
789         dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
790                         xprt, transport->tcp_copied, transport->tcp_offset,
791                                 transport->tcp_reclen);
792
793         if (transport->tcp_copied == req->rq_private_buf.buflen)
794                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
795         else if (transport->tcp_offset == transport->tcp_reclen) {
796                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
797                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
798         }
799
800 out:
801         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
802                 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
803         spin_unlock(&xprt->transport_lock);
804         xs_tcp_check_fraghdr(transport);
805 }
806
807 static inline void xs_tcp_read_discard(struct sock_xprt *transport, skb_reader_t *desc)
808 {
809         size_t len;
810
811         len = transport->tcp_reclen - transport->tcp_offset;
812         if (len > desc->count)
813                 len = desc->count;
814         desc->count -= len;
815         desc->offset += len;
816         transport->tcp_offset += len;
817         dprintk("RPC:      discarded %Zu bytes\n", len);
818         xs_tcp_check_fraghdr(transport);
819 }
820
821 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
822 {
823         struct rpc_xprt *xprt = rd_desc->arg.data;
824         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
825         skb_reader_t desc = {
826                 .skb    = skb,
827                 .offset = offset,
828                 .count  = len,
829         };
830
831         dprintk("RPC:      xs_tcp_data_recv started\n");
832         do {
833                 /* Read in a new fragment marker if necessary */
834                 /* Can we ever really expect to get completely empty fragments? */
835                 if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
836                         xs_tcp_read_fraghdr(xprt, &desc);
837                         continue;
838                 }
839                 /* Read in the xid if necessary */
840                 if (transport->tcp_flags & TCP_RCV_COPY_XID) {
841                         xs_tcp_read_xid(transport, &desc);
842                         continue;
843                 }
844                 /* Read in the request data */
845                 if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
846                         xs_tcp_read_request(xprt, &desc);
847                         continue;
848                 }
849                 /* Skip over any trailing bytes on short reads */
850                 xs_tcp_read_discard(transport, &desc);
851         } while (desc.count);
852         dprintk("RPC:      xs_tcp_data_recv done\n");
853         return len - desc.count;
854 }
855
856 /**
857  * xs_tcp_data_ready - "data ready" callback for TCP sockets
858  * @sk: socket with data to read
859  * @bytes: how much data to read
860  *
861  */
862 static void xs_tcp_data_ready(struct sock *sk, int bytes)
863 {
864         struct rpc_xprt *xprt;
865         read_descriptor_t rd_desc;
866
867         read_lock(&sk->sk_callback_lock);
868         dprintk("RPC:      xs_tcp_data_ready...\n");
869         if (!(xprt = xprt_from_sock(sk)))
870                 goto out;
871         if (xprt->shutdown)
872                 goto out;
873
874         /* We use rd_desc to pass struct rpc_xprt to xs_tcp_data_recv */
875         rd_desc.arg.data = xprt;
876         rd_desc.count = 65536;
877         tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
878 out:
879         read_unlock(&sk->sk_callback_lock);
880 }
881
882 /**
883  * xs_tcp_state_change - callback to handle TCP socket state changes
884  * @sk: socket whose state has changed
885  *
886  */
887 static void xs_tcp_state_change(struct sock *sk)
888 {
889         struct rpc_xprt *xprt;
890
891         read_lock(&sk->sk_callback_lock);
892         if (!(xprt = xprt_from_sock(sk)))
893                 goto out;
894         dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
895         dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
896                                 sk->sk_state, xprt_connected(xprt),
897                                 sock_flag(sk, SOCK_DEAD),
898                                 sock_flag(sk, SOCK_ZAPPED));
899
900         switch (sk->sk_state) {
901         case TCP_ESTABLISHED:
902                 spin_lock_bh(&xprt->transport_lock);
903                 if (!xprt_test_and_set_connected(xprt)) {
904                         struct sock_xprt *transport = container_of(xprt,
905                                         struct sock_xprt, xprt);
906
907                         /* Reset TCP record info */
908                         transport->tcp_offset = 0;
909                         transport->tcp_reclen = 0;
910                         transport->tcp_copied = 0;
911                         transport->tcp_flags =
912                                 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
913
914                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
915                         xprt_wake_pending_tasks(xprt, 0);
916                 }
917                 spin_unlock_bh(&xprt->transport_lock);
918                 break;
919         case TCP_SYN_SENT:
920         case TCP_SYN_RECV:
921                 break;
922         case TCP_CLOSE_WAIT:
923                 /* Try to schedule an autoclose RPC call; fall through to disconnect */
924                 set_bit(XPRT_CLOSE_WAIT, &xprt->state);
925                 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
926                         schedule_work(&xprt->task_cleanup);
927         default:
928                 xprt_disconnect(xprt);
929         }
930  out:
931         read_unlock(&sk->sk_callback_lock);
932 }
933
934 /**
935  * xs_udp_write_space - callback invoked when socket buffer space
936  *                             becomes available
937  * @sk: socket whose state has changed
938  *
939  * Called when more output buffer space is available for this socket.
940  * We try not to wake our writers until they can make "significant"
941  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
942  * with a bunch of small requests.
943  */
944 static void xs_udp_write_space(struct sock *sk)
945 {
946         read_lock(&sk->sk_callback_lock);
947
948         /* from net/core/sock.c:sock_def_write_space */
949         if (sock_writeable(sk)) {
950                 struct socket *sock;
951                 struct rpc_xprt *xprt;
952
953                 if (unlikely(!(sock = sk->sk_socket)))
954                         goto out;
955                 if (unlikely(!(xprt = xprt_from_sock(sk))))
956                         goto out;
957                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
958                         goto out;
959
960                 xprt_write_space(xprt);
961         }
962
963  out:
964         read_unlock(&sk->sk_callback_lock);
965 }
966
967 /**
968  * xs_tcp_write_space - callback invoked when socket buffer space
969  *                             becomes available
970  * @sk: socket whose state has changed
971  *
972  * Called when more output buffer space is available for this socket.
973  * We try not to wake our writers until they can make "significant"
974  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
975  * with a bunch of small requests.
976  */
977 static void xs_tcp_write_space(struct sock *sk)
978 {
979         read_lock(&sk->sk_callback_lock);
980
981         /* from net/core/stream.c:sk_stream_write_space */
982         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
983                 struct socket *sock;
984                 struct rpc_xprt *xprt;
985
986                 if (unlikely(!(sock = sk->sk_socket)))
987                         goto out;
988                 if (unlikely(!(xprt = xprt_from_sock(sk))))
989                         goto out;
990                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
991                         goto out;
992
993                 xprt_write_space(xprt);
994         }
995
996  out:
997         read_unlock(&sk->sk_callback_lock);
998 }
999
1000 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1001 {
1002         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1003         struct sock *sk = transport->inet;
1004
1005         if (transport->rcvsize) {
1006                 sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1007                 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1008         }
1009         if (transport->sndsize) {
1010                 sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1011                 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1012                 sk->sk_write_space(sk);
1013         }
1014 }
1015
1016 /**
1017  * xs_udp_set_buffer_size - set send and receive limits
1018  * @xprt: generic transport
1019  * @sndsize: requested size of send buffer, in bytes
1020  * @rcvsize: requested size of receive buffer, in bytes
1021  *
1022  * Set socket send and receive buffer size limits.
1023  */
1024 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1025 {
1026         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1027
1028         transport->sndsize = 0;
1029         if (sndsize)
1030                 transport->sndsize = sndsize + 1024;
1031         transport->rcvsize = 0;
1032         if (rcvsize)
1033                 transport->rcvsize = rcvsize + 1024;
1034
1035         xs_udp_do_set_buffer_size(xprt);
1036 }
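/*
 * A worked example with assumed numbers: a caller requesting a 4096
 * byte send buffer gets transport->sndsize = 4096 + 1024 = 5120, and
 * with the default 16 request slots the socket buffer is set to
 *
 *      sk_sndbuf = 5120 * 16 * 2 = 163840 bytes
 *
 * i.e. room for roughly two padded datagrams per request slot.
 */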
1037
1038 /**
1039  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1040  * @task: task that timed out
1041  *
1042  * Adjust the congestion window after a retransmit timeout has occurred.
1043  */
1044 static void xs_udp_timer(struct rpc_task *task)
1045 {
1046         xprt_adjust_cwnd(task, -ETIMEDOUT);
1047 }
1048
1049 static unsigned short xs_get_random_port(void)
1050 {
1051         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1052         unsigned short rand = (unsigned short) net_random() % range;
1053         return rand + xprt_min_resvport;
1054 }
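/*
 * Note that the modulo above excludes xprt_max_resvport itself: with
 * the default range (665..1023) this returns a port in [665, 1022].
 */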
1055
1056 /**
1057  * xs_print_peer_address - format an IPv4 address for printing
1058  * @xprt: generic transport
1059  * @format: flags field indicating which parts of the address to render
1060  */
1061 static char *xs_print_peer_address(struct rpc_xprt *xprt, enum rpc_display_format_t format)
1062 {
1063         if (xprt->address_strings[format] != NULL)
1064                 return xprt->address_strings[format];
1065         else
1066                 return "unprintable";
1067 }
1068
1069 /**
1070  * xs_set_port - reset the port number in the remote endpoint address
1071  * @xprt: generic transport
1072  * @port: new port number
1073  *
1074  */
1075 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1076 {
1077         struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;
1078
1079         dprintk("RPC:      setting port for xprt %p to %u\n", xprt, port);
1080
1081         sap->sin_port = htons(port);
1082 }
1083
1084 static int xs_bindresvport(struct sock_xprt *transport, struct socket *sock)
1085 {
1086         struct sockaddr_in myaddr = {
1087                 .sin_family = AF_INET,
1088         };
1089         int err;
1090         unsigned short port = transport->port;
1091
1092         do {
1093                 myaddr.sin_port = htons(port);
1094                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1095                                                 sizeof(myaddr));
1096                 if (err == 0) {
1097                         transport->port = port;
1098                         dprintk("RPC:      xs_bindresvport bound to port %u\n",
1099                                         port);
1100                         return 0;
1101                 }
1102                 if (port <= xprt_min_resvport)
1103                         port = xprt_max_resvport;
1104                 else
1105                         port--;
1106         } while (err == -EADDRINUSE && port != transport->port);
1107
1108         dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
1109         return err;
1110 }
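/*
 * An illustration with assumed numbers: starting from port 800, the
 * loop tries 800, 799, ... down to xprt_min_resvport, wraps around to
 * xprt_max_resvport, and keeps descending; it returns -EADDRINUSE only
 * if it arrives back at 800 without finding a free port.
 */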
1111
1112 /**
1113  * xs_udp_connect_worker - set up a UDP socket
1114  * @args: RPC transport to connect
1115  *
1116  * Invoked by a work queue tasklet.
1117  */
1118 static void xs_udp_connect_worker(void *args)
1119 {
1120         struct sock_xprt *transport = (struct sock_xprt *)args;
1121         struct rpc_xprt *xprt = &transport->xprt;
1122         struct socket *sock = transport->sock;
1123         int err, status = -EIO;
1124
1125         if (xprt->shutdown || !xprt_bound(xprt))
1126                 goto out;
1127
1128         /* Start by resetting any existing state */
1129         xs_close(xprt);
1130
1131         if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1132                 dprintk("RPC:      can't create UDP transport socket (%d).\n", -err);
1133                 goto out;
1134         }
1135
1136         if (xprt->resvport && xs_bindresvport(transport, sock) < 0) {
1137                 sock_release(sock);
1138                 goto out;
1139         }
1140
1141         dprintk("RPC:      worker connecting xprt %p to address: %s\n",
1142                         xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1143
1144         if (!transport->inet) {
1145                 struct sock *sk = sock->sk;
1146
1147                 write_lock_bh(&sk->sk_callback_lock);
1148
1149                 sk->sk_user_data = xprt;
1150                 xprt->old_data_ready = sk->sk_data_ready;
1151                 xprt->old_state_change = sk->sk_state_change;
1152                 xprt->old_write_space = sk->sk_write_space;
1153                 sk->sk_data_ready = xs_udp_data_ready;
1154                 sk->sk_write_space = xs_udp_write_space;
1155                 sk->sk_no_check = UDP_CSUM_NORCV;
1156                 sk->sk_allocation = GFP_ATOMIC;
1157
1158                 xprt_set_connected(xprt);
1159
1160                 /* Reset to new socket */
1161                 transport->sock = sock;
1162                 transport->inet = sk;
1163
1164                 write_unlock_bh(&sk->sk_callback_lock);
1165         }
1166         xs_udp_do_set_buffer_size(xprt);
1167         status = 0;
1168 out:
1169         xprt_wake_pending_tasks(xprt, status);
1170         xprt_clear_connecting(xprt);
1171 }
1172
1173 /*
1174  * We need to preserve the port number so the reply cache on the server can
1175  * find our cached RPC replies when we get around to reconnecting.
1176  */
1177 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
1178 {
1179         int result;
1180         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1181         struct sockaddr any;
1182
1183         dprintk("RPC:      disconnecting xprt %p to reuse port\n", xprt);
1184
1185         /*
1186          * Disconnect the transport socket by doing a connect operation
1187          * with AF_UNSPEC.  This should return immediately...
1188          */
1189         memset(&any, 0, sizeof(any));
1190         any.sa_family = AF_UNSPEC;
1191         result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1192         if (result)
1193                 dprintk("RPC:      AF_UNSPEC connect return code %d\n",
1194                                 result);
1195 }
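/*
 * The AF_UNSPEC connect is the standard sockets idiom for dissolving
 * an association without releasing the local port; a minimal
 * user-space sketch of the same operation (illustrative only):
 *
 *      struct sockaddr any;
 *      memset(&any, 0, sizeof(any));
 *      any.sa_family = AF_UNSPEC;
 *      connect(fd, &any, sizeof(any));
 */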
1196
1197 /**
1198  * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
1199  * @args: RPC transport to connect
1200  *
1201  * Invoked by a work queue tasklet.
1202  */
1203 static void xs_tcp_connect_worker(void *args)
1204 {
1205         struct sock_xprt *transport = (struct sock_xprt *)args;
1206         struct rpc_xprt *xprt = &transport->xprt;
1207         struct socket *sock = transport->sock;
1208         int err, status = -EIO;
1209
1210         if (xprt->shutdown || !xprt_bound(xprt))
1211                 goto out;
1212
1213         if (!sock) {
1214                 /* start from scratch */
1215                 if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
1216                         dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
1217                         goto out;
1218                 }
1219
1220                 if (xprt->resvport && xs_bindresvport(transport, sock) < 0) {
1221                         sock_release(sock);
1222                         goto out;
1223                 }
1224         } else
1225                 /* "close" the socket, preserving the local port */
1226                 xs_tcp_reuse_connection(xprt);
1227
1228         dprintk("RPC:      worker connecting xprt %p to address: %s\n",
1229                         xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1230
1231         if (!transport->inet) {
1232                 struct sock *sk = sock->sk;
1233
1234                 write_lock_bh(&sk->sk_callback_lock);
1235
1236                 sk->sk_user_data = xprt;
1237                 xprt->old_data_ready = sk->sk_data_ready;
1238                 xprt->old_state_change = sk->sk_state_change;
1239                 xprt->old_write_space = sk->sk_write_space;
1240                 sk->sk_data_ready = xs_tcp_data_ready;
1241                 sk->sk_state_change = xs_tcp_state_change;
1242                 sk->sk_write_space = xs_tcp_write_space;
1243                 sk->sk_allocation = GFP_ATOMIC;
1244
1245                 /* socket options */
1246                 sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1247                 sock_reset_flag(sk, SOCK_LINGER);
1248                 tcp_sk(sk)->linger2 = 0;
1249                 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1250
1251                 xprt_clear_connected(xprt);
1252
1253                 /* Reset to new socket */
1254                 transport->sock = sock;
1255                 transport->inet = sk;
1256
1257                 write_unlock_bh(&sk->sk_callback_lock);
1258         }
1259
1260         /* Tell the socket layer to start connecting... */
1261         xprt->stat.connect_count++;
1262         xprt->stat.connect_start = jiffies;
1263         status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
1264                         xprt->addrlen, O_NONBLOCK);
1265         dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
1266                         xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
1267         if (status < 0) {
1268                 switch (status) {
1269                         case -EINPROGRESS:
1270                         case -EALREADY:
1271                                 goto out_clear;
1272                         case -ECONNREFUSED:
1273                         case -ECONNRESET:
1274                                 /* retry with existing socket, after a delay */
1275                                 break;
1276                         default:
1277                                 /* get rid of existing socket, and retry */
1278                                 xs_close(xprt);
1279                                 break;
1280                 }
1281         }
1282 out:
1283         xprt_wake_pending_tasks(xprt, status);
1284 out_clear:
1285         xprt_clear_connecting(xprt);
1286 }
1287
1288 /**
1289  * xs_connect - connect a socket to a remote endpoint
1290  * @task: address of RPC task that manages state of connect request
1291  *
1292  * TCP: If the remote end dropped the connection, delay reconnecting.
1293  *
1294  * UDP socket connects are synchronous, but we use a work queue anyway
1295  * to guarantee that even unprivileged user processes can set up a
1296  * socket on a privileged port.
1297  *
1298  * If a UDP socket connect fails, the delay behavior here prevents
1299  * retry floods (hard mounts).
1300  */
1301 static void xs_connect(struct rpc_task *task)
1302 {
1303         struct rpc_xprt *xprt = task->tk_xprt;
1304         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1305
1306         if (xprt_test_and_set_connecting(xprt))
1307                 return;
1308
1309         if (transport->sock != NULL) {
1310                 dprintk("RPC:      xs_connect delayed xprt %p for %lu seconds\n",
1311                                 xprt, xprt->reestablish_timeout / HZ);
1312                 schedule_delayed_work(&transport->connect_worker,
1313                                         xprt->reestablish_timeout);
1314                 xprt->reestablish_timeout <<= 1;
1315                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
1316                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1317         } else {
1318                 dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
1319                 schedule_work(&transport->connect_worker);
1320
1321                 /* flush_scheduled_work can sleep... */
1322                 if (!RPC_IS_ASYNC(task))
1323                         flush_scheduled_work();
1324         }
1325 }
1326
1327 /**
1328  * xs_udp_print_stats - display UDP socket-specific stats
1329  * @xprt: rpc_xprt struct containing statistics
1330  * @seq: output file
1331  *
1332  */
1333 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1334 {
1335         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1336
1337         seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
1338                         transport->port,
1339                         xprt->stat.bind_count,
1340                         xprt->stat.sends,
1341                         xprt->stat.recvs,
1342                         xprt->stat.bad_xids,
1343                         xprt->stat.req_u,
1344                         xprt->stat.bklog_u);
1345 }
1346
1347 /**
1348  * xs_tcp_print_stats - display TCP socket-specific stats
1349  * @xprt: rpc_xprt struct containing statistics
1350  * @seq: output file
1351  *
1352  */
1353 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1354 {
1355         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1356         long idle_time = 0;
1357
1358         if (xprt_connected(xprt))
1359                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
1360
1361         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1362                         transport->port,
1363                         xprt->stat.bind_count,
1364                         xprt->stat.connect_count,
1365                         xprt->stat.connect_time,
1366                         idle_time,
1367                         xprt->stat.sends,
1368                         xprt->stat.recvs,
1369                         xprt->stat.bad_xids,
1370                         xprt->stat.req_u,
1371                         xprt->stat.bklog_u);
1372 }
1373
1374 static struct rpc_xprt_ops xs_udp_ops = {
1375         .set_buffer_size        = xs_udp_set_buffer_size,
1376         .print_addr             = xs_print_peer_address,
1377         .reserve_xprt           = xprt_reserve_xprt_cong,
1378         .release_xprt           = xprt_release_xprt_cong,
1379         .rpcbind                = rpc_getport,
1380         .set_port               = xs_set_port,
1381         .connect                = xs_connect,
1382         .buf_alloc              = rpc_malloc,
1383         .buf_free               = rpc_free,
1384         .send_request           = xs_udp_send_request,
1385         .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
1386         .timer                  = xs_udp_timer,
1387         .release_request        = xprt_release_rqst_cong,
1388         .close                  = xs_close,
1389         .destroy                = xs_destroy,
1390         .print_stats            = xs_udp_print_stats,
1391 };
1392
1393 static struct rpc_xprt_ops xs_tcp_ops = {
1394         .print_addr             = xs_print_peer_address,
1395         .reserve_xprt           = xprt_reserve_xprt,
1396         .release_xprt           = xs_tcp_release_xprt,
1397         .rpcbind                = rpc_getport,
1398         .set_port               = xs_set_port,
1399         .connect                = xs_connect,
1400         .buf_alloc              = rpc_malloc,
1401         .buf_free               = rpc_free,
1402         .send_request           = xs_tcp_send_request,
1403         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
1404         .close                  = xs_close,
1405         .destroy                = xs_destroy,
1406         .print_stats            = xs_tcp_print_stats,
1407 };
1408
1409 static struct rpc_xprt *xs_setup_xprt(struct sockaddr *addr, size_t addrlen, unsigned int slot_table_size)
1410 {
1411         struct rpc_xprt *xprt;
1412         struct sock_xprt *new;
1413
1414         if (addrlen > sizeof(xprt->addr)) {
1415                 dprintk("RPC:      xs_setup_xprt: address too large\n");
1416                 return ERR_PTR(-EBADF);
1417         }
1418
1419         new = kzalloc(sizeof(*new), GFP_KERNEL);
1420         if (new == NULL) {
1421                 dprintk("RPC:      xs_setup_xprt: couldn't allocate rpc_xprt\n");
1422                 return ERR_PTR(-ENOMEM);
1423         }
1424         xprt = &new->xprt;
1425
1426         xprt->max_reqs = slot_table_size;
1427         xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
1428         if (xprt->slot == NULL) {
1429                 kfree(xprt);
1430                 dprintk("RPC:      xs_setup_xprt: couldn't allocate slot table\n");
1431                 return ERR_PTR(-ENOMEM);
1432         }
1433
1434         memcpy(&xprt->addr, addr, addrlen);
1435         xprt->addrlen = addrlen;
1436         new->port = xs_get_random_port();
1437
1438         return xprt;
1439 }
1440
1441 /**
1442  * xs_setup_udp - Set up transport to use a UDP socket
1443  * @addr: address of remote server
1444  * @addrlen: length of address in bytes
1445  * @to:   timeout parameters
1446  *
1447  */
1448 struct rpc_xprt *xs_setup_udp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
1449 {
1450         struct rpc_xprt *xprt;
1451         struct sock_xprt *transport;
1452
1453         xprt = xs_setup_xprt(addr, addrlen, xprt_udp_slot_table_entries);
1454         if (IS_ERR(xprt))
1455                 return xprt;
1456         transport = container_of(xprt, struct sock_xprt, xprt);
1457
1458         if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
1459                 xprt_set_bound(xprt);
1460
1461         xprt->prot = IPPROTO_UDP;
1462         xprt->tsh_size = 0;
1463         /* XXX: header size can vary due to auth type, IPv6, etc. */
1464         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1465
1466         INIT_WORK(&transport->connect_worker, xs_udp_connect_worker, transport);
1467         xprt->bind_timeout = XS_BIND_TO;
1468         xprt->connect_timeout = XS_UDP_CONN_TO;
1469         xprt->reestablish_timeout = XS_UDP_REEST_TO;
1470         xprt->idle_timeout = XS_IDLE_DISC_TO;
1471
1472         xprt->ops = &xs_udp_ops;
1473
1474         if (to)
1475                 xprt->timeout = *to;
1476         else
1477                 xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1478
1479         xs_format_peer_addresses(xprt);
1480         dprintk("RPC:      set up transport to address %s\n",
1481                         xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1482
1483         return xprt;
1484 }
1485
1486 /**
1487  * xs_setup_tcp - Set up transport to use a TCP socket
1488  * @addr: address of remote server
1489  * @addrlen: length of address in bytes
1490  * @to: timeout parameters
1491  *
1492  */
1493 struct rpc_xprt *xs_setup_tcp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
1494 {
1495         struct rpc_xprt *xprt;
1496         struct sock_xprt *transport;
1497
1498         xprt = xs_setup_xprt(addr, addrlen, xprt_tcp_slot_table_entries);
1499         if (IS_ERR(xprt))
1500                 return xprt;
1501         transport = container_of(xprt, struct sock_xprt, xprt);
1502
1503         if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
1504                 xprt_set_bound(xprt);
1505
1506         xprt->prot = IPPROTO_TCP;
1507         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1508         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1509
1510         INIT_WORK(&transport->connect_worker, xs_tcp_connect_worker, transport);
1511         xprt->bind_timeout = XS_BIND_TO;
1512         xprt->connect_timeout = XS_TCP_CONN_TO;
1513         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1514         xprt->idle_timeout = XS_IDLE_DISC_TO;
1515
1516         xprt->ops = &xs_tcp_ops;
1517
1518         if (to)
1519                 xprt->timeout = *to;
1520         else
1521                 xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1522
1523         xs_format_peer_addresses(xprt);
1524         dprintk("RPC:      set up transport to address %s\n",
1525                         xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
1526
1527         return xprt;
1528 }