manual update from upstream: net/sunrpc/xprt.c (linux-2.6)
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */
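
/*
 * Editor's sketch, not upstream code: the flow described above, seen from
 * a synchronous caller, maps onto the entry points in this file roughly
 * like this (error handling omitted):
 *
 *      xprt_reserve(task);             // grab a slot or sleep on xprt->backlog
 *      // ...encode the RPC message into task->tk_rqstp...
 *      xprt_prepare_transmit(task);    // take the transport write lock
 *      xprt_transmit(task);            // send; sleep on xprt->pending
 *      // the transport's data_ready path matches the reply XID via
 *      // xprt_lookup_rqst() and calls xprt_complete_rqst(), or else
 *      // xprt_timer() wakes the task with -ETIMEDOUT
 *      xprt_release(task);             // return the slot, wake backlog waiters
 */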

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <linux/sunrpc/clnt.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void      do_xprt_reserve(struct rpc_task *);
static void     xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

/*
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT           (8U)
#define RPC_CWNDSCALE           (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND            RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)       ((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)

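/*
 * Worked example (editor's note; the numbers just follow from the macros
 * above): cwnd and cong are fixed-point, with RPC_CWNDSCALE = 256
 * representing one request.  A transport with max_reqs = 16 slots has
 * RPC_MAXCWND = 16 << 8 = 4096.  "Grow by 1/cwnd per reply" becomes, in
 * xprt_adjust_cwnd() below,
 *
 *      cwnd += (256 * 256 + cwnd/2) / cwnd;
 *
 * i.e. cwnd += 65536/cwnd, rounded to nearest.  Starting from
 * RPC_INITCWND = 256 (one slot), the first reply doubles the window to
 * 512, the next takes it to 640, and growth slows as cwnd approaches the
 * cap; a timeout halves the window, but never below one slot.
 */
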
/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}

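/*
 * Editor's note: XPRT_LOCKED is the per-transport send lock.  A task that
 * has already begun transmitting (req->rq_ntrans != 0) sleeps on the
 * resend queue rather than on sending; __xprt_lock_write_next() below
 * wakes resend waiters first, so partially-sent requests get the
 * transport back before brand-new ones.
 */
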
/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->state);
        smp_mb__after_clear_bit();
out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->state);
        smp_mb__after_clear_bit();
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->state);
        smp_mb__after_clear_bit();
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt->snd_task = NULL;
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
                __xprt_lock_write_next(xprt);
        }
}

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt->snd_task = NULL;
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
                __xprt_lock_write_next_cong(xprt);
        }
}

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req->rq_cong)
                return 1;
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:      cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 *
 */
void xprt_wait_for_buffer_space(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, NULL, NULL);
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        if (xprt->snd_task) {
                dprintk("RPC:      write space: waking waiting task on xprt %p\n",
                                xprt);
                rpc_wake_up_task(xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;

        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}

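/*
 * Editor's note, with assumed numbers: if rpc_calc_rto() estimates an RTO
 * of 100 jiffies and rpc_ntimeo() plus rq_retries sums to 2 for this
 * request, the shift above yields 100 << 2 = 400 jiffies, clamped to the
 * transport's to_maxval.
 */
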
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        struct rpc_timeout *to = &req->rq_xprt->timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct rpc_timeout *to = &xprt->timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
                pprintk("RPC: %lu retrans\n", jiffies);
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                pprintk("RPC: %lu timeout\n", jiffies);
                status = -ETIMEDOUT;
        }

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}

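/*
 * Worked example (editor's note; assumes linear backoff, to_exponential
 * == 0, with hypothetical values to_initval = to_increment = 5 * HZ and
 * to_retries = 3): xprt_reset_majortimeo() sets the major timeout
 * 5 + 5*3 = 20 seconds out.  Each minor timeout before that deadline adds
 * another 5 seconds to rq_timeout and bumps rq_retries; once the deadline
 * passes, the values reset to their initial state and
 * xprt_adjust_timeout() returns -ETIMEDOUT so the caller sees a major
 * timeout.
 */
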
static void xprt_autoclose(void *args)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;

        xprt_disconnect(xprt);
        xprt->ops->close(xprt);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC:      disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        if (xprt_connecting(xprt))
                xprt_release_write(xprt, NULL);
        else
                schedule_work(&xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (xprt->shutdown) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                task->tk_timeout = xprt->connect_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
                xprt->ops->connect(task);
        }
        return;
}

static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -ECONNREFUSED:
        case -ECONNRESET:
                dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
                                task->tk_pid, task->tk_client->cl_server);
                break;
        case -ENOTCONN:
                dprintk("RPC: %4d xprt_connect_status: connection broken\n",
                                task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
                                task->tk_pid);
                break;
        default:
                dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
                                task->tk_pid, -task->tk_status, task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
                return;
        }

        /* if soft mounted, just cause this RPC to fail */
        if (RPC_IS_SOFT(task)) {
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct list_head *pos;
        struct rpc_rqst *req = NULL;

        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid) {
                        req = entry;
                        break;
                }
        }
        return req;
}

/**
 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
 * @task: RPC request that recently completed
 *
 */
void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned timer = task->tk_msg.rpc_proc->p_timer;

        if (timer) {
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer,
                                        (long)jiffies - req->rq_xtime);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;

        dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
                        task->tk_pid, ntohl(req->rq_xid), copied);

        list_del_init(&req->rq_list);
        req->rq_received = req->rq_private_buf.len = copied;
        rpc_wake_up_task(task);
}

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_timer\n", task->tk_pid);

        spin_lock(&xprt->transport_lock);
        if (!req->rq_received) {
                if (xprt->ops->timer)
                        xprt->ops->timer(task);
                task->tk_status = -ETIMEDOUT;
        }
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

        if (xprt->shutdown)
                return -EIO;

        spin_lock_bh(&xprt->transport_lock);
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task)) {
                err = -EAGAIN;
                goto out_unlock;
        }

        if (!xprt_connected(xprt)) {
                err = -ENOTCONN;
                goto out_unlock;
        }
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

void
xprt_abort_transmit(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        xprt_release_write(xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        smp_rmb();
        if (!req->rq_received) {
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                return;

        status = xprt->ops->send_request(task);
        if (status == 0) {
                dprintk("RPC: %4d xmit complete\n", task->tk_pid);
                spin_lock_bh(&xprt->transport_lock);
                xprt->ops->set_retrans_timeout(task);
                /* Don't race with disconnect */
                if (!xprt_connected(xprt))
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
                xprt->ops->release_xprt(xprt, task);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;

        switch (status) {
        case -ECONNREFUSED:
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
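                /* fall through: the now-sleeping sender just returns */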
        case -EAGAIN:
        case -ENOTCONN:
                return;
        default:
                break;
        }
        xprt_release_write(xprt, task);
        return;
}

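/*
 * Editor's note: the memcpy into rq_private_buf above is what lets the
 * reply path run in softirq context: the receive side works on
 * req->rq_private_buf (xprt_complete_rqst() records the reply length in
 * rq_private_buf.len) and never touches rq_rcv_buf, which the task may
 * still be using.
 */
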
static inline void do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC:      waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        if (!xprt->shutdown) {
                spin_lock(&xprt->reserve_lock);
                do_xprt_reserve(task);
                spin_unlock(&xprt->reserve_lock);
        }
}

static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}

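/*
 * Editor's note: the XID sequence starts at a random value and then simply
 * increments, so XIDs from before a client reboot are unlikely to collide
 * with replies the server may still hold in its duplicate request cache.
 */
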
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xprt_alloc_xid(xprt);
        req->rq_release_snd_buf = NULL;
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        if (xprt->ops->release_request)
                xprt->ops->release_request(task);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        if (list_empty(&xprt->recv) && !xprt->shutdown)
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        rpc_wake_up_next(&xprt->backlog);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_set_timeout - set constant RPC timeout
 * @to: RPC timeout parameters to set up
 * @retr: number of retries
 * @incr: amount of increase after each retry
 *
 */
void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_initval   = to->to_increment = incr;
        to->to_maxval    = to->to_initval + (incr * retr);
        to->to_retries   = retr;
        to->to_exponential = 0;
}

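/*
 * Example (editor's note, hypothetical values): xprt_set_timeout(to, 5,
 * 10 * HZ) yields a linear schedule: first timeout after 10 seconds,
 * another 10 seconds added per retry, to_retries = 5, and a major timeout
 * of to_maxval = 10 + 10*5 = 60 seconds.
 */
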
static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
        int result;
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return ERR_PTR(-ENOMEM);
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;

        switch (proto) {
        case IPPROTO_UDP:
                result = xs_setup_udp(xprt, to);
                break;
        case IPPROTO_TCP:
                result = xs_setup_tcp(xprt, to);
                break;
        default:
                printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
                                proto);
                result = -EIO;
                break;
        }
        if (result) {
                kfree(xprt);
                return ERR_PTR(result);
        }

        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt);
        init_timer(&xprt->timer);
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;

        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC:      created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}

/**
 * xprt_create_proto - create an RPC client transport
 * @proto: requested transport protocol
 * @sap: remote peer's address
 * @to: timeout parameters for new transport
 *
 */
struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;

        xprt = xprt_setup(proto, sap, to);
        if (IS_ERR(xprt))
                dprintk("RPC:      xprt_create_proto failed\n");
        else
                dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
        return xprt;
}

static void xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->resend);
        xprt_wake_pending_tasks(xprt, -EIO);
        rpc_wake_up(&xprt->backlog);
        del_timer_sync(&xprt->timer);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
int xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      destroying transport %p\n", xprt);
        xprt_shutdown(xprt);
        xprt->ops->destroy(xprt);
        kfree(xprt);

        return 0;
}