1 /* call.c: Rx call routines
3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #include <linux/sched.h>
13 #include <linux/slab.h>
14 #include <linux/module.h>
15 #include <rxrpc/rxrpc.h>
16 #include <rxrpc/transport.h>
17 #include <rxrpc/peer.h>
18 #include <rxrpc/connection.h>
19 #include <rxrpc/call.h>
20 #include <rxrpc/message.h>
23 __RXACCT_DECL(atomic_t rxrpc_call_count);
24 __RXACCT_DECL(atomic_t rxrpc_message_count);
26 LIST_HEAD(rxrpc_calls);
27 DECLARE_RWSEM(rxrpc_calls_sem);
29 unsigned rxrpc_call_rcv_timeout = HZ/3;
30 static unsigned rxrpc_call_acks_timeout = HZ/3;
31 static unsigned rxrpc_call_dfr_ack_timeout = HZ/20;
32 static unsigned short rxrpc_call_max_resend = HZ/10;
34 const char *rxrpc_call_states[] = {
47 const char *rxrpc_call_error_states[] = {
55 const char *rxrpc_pkts[] = {
57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",
58 "?09", "?10", "?11", "?12", "?13", "?14", "?15"
61 static const char *rxrpc_acks[] = {
62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
66 static const char _acktype[] = "NA-";
68 static void rxrpc_call_receive_packet(struct rxrpc_call *call);
69 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
70 struct rxrpc_message *msg);
71 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
72 struct rxrpc_message *msg);
73 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
75 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest);
76 static int __rxrpc_call_read_data(struct rxrpc_call *call);
78 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
79 struct rxrpc_message *msg,
83 static int rxrpc_call_flush(struct rxrpc_call *call);
85 #define _state(call) \
86 _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]);
88 static void rxrpc_call_default_attn_func(struct rxrpc_call *call)
90 wake_up(&call->waitq);
93 static void rxrpc_call_default_error_func(struct rxrpc_call *call)
95 wake_up(&call->waitq);
98 static void rxrpc_call_default_aemap_func(struct rxrpc_call *call)
100 switch (call->app_err_state) {
101 case RXRPC_ESTATE_LOCAL_ABORT:
102 call->app_abort_code = -call->app_errno;
103 case RXRPC_ESTATE_PEER_ABORT:
104 call->app_errno = -ECONNABORTED;
110 static void __rxrpc_call_acks_timeout(unsigned long _call)
112 struct rxrpc_call *call = (struct rxrpc_call *) _call;
114 _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif);
116 call->flags |= RXRPC_CALL_ACKS_TIMO;
117 rxrpc_krxiod_queue_call(call);
120 static void __rxrpc_call_rcv_timeout(unsigned long _call)
122 struct rxrpc_call *call = (struct rxrpc_call *) _call;
124 _debug("RCV TIMEOUT %05lu", jiffies - call->cjif);
126 call->flags |= RXRPC_CALL_RCV_TIMO;
127 rxrpc_krxiod_queue_call(call);
130 static void __rxrpc_call_ackr_timeout(unsigned long _call)
132 struct rxrpc_call *call = (struct rxrpc_call *) _call;
134 _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif);
136 call->flags |= RXRPC_CALL_ACKR_TIMO;
137 rxrpc_krxiod_queue_call(call);
140 /*****************************************************************************/
142 * calculate a timeout based on an RTT value
144 static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call,
147 unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ);
150 if (expiry < HZ / 25)
155 _leave(" = %lu jiffies", expiry);
156 return jiffies + expiry;
157 } /* end __rxrpc_rtt_based_timeout() */
159 /*****************************************************************************/
161 * create a new call record
163 static inline int __rxrpc_create_call(struct rxrpc_connection *conn,
164 struct rxrpc_call **_call)
166 struct rxrpc_call *call;
170 /* allocate and initialise a call record */
171 call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL);
177 atomic_set(&call->usage, 1);
179 init_waitqueue_head(&call->waitq);
180 spin_lock_init(&call->lock);
181 INIT_LIST_HEAD(&call->link);
182 INIT_LIST_HEAD(&call->acks_pendq);
183 INIT_LIST_HEAD(&call->rcv_receiveq);
184 INIT_LIST_HEAD(&call->rcv_krxiodq_lk);
185 INIT_LIST_HEAD(&call->app_readyq);
186 INIT_LIST_HEAD(&call->app_unreadyq);
187 INIT_LIST_HEAD(&call->app_link);
188 INIT_LIST_HEAD(&call->app_attn_link);
190 init_timer(&call->acks_timeout);
191 call->acks_timeout.data = (unsigned long) call;
192 call->acks_timeout.function = __rxrpc_call_acks_timeout;
194 init_timer(&call->rcv_timeout);
195 call->rcv_timeout.data = (unsigned long) call;
196 call->rcv_timeout.function = __rxrpc_call_rcv_timeout;
198 init_timer(&call->ackr_dfr_timo);
199 call->ackr_dfr_timo.data = (unsigned long) call;
200 call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout;
203 call->ackr_win_bot = 1;
204 call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1;
205 call->ackr_prev_seq = 0;
206 call->app_mark = RXRPC_APP_MARK_EOF;
207 call->app_attn_func = rxrpc_call_default_attn_func;
208 call->app_error_func = rxrpc_call_default_error_func;
209 call->app_aemap_func = rxrpc_call_default_aemap_func;
210 call->app_scr_alloc = call->app_scratch;
212 call->cjif = jiffies;
214 _leave(" = 0 (%p)", call);
219 } /* end __rxrpc_create_call() */
221 /*****************************************************************************/
223 * create a new call record for outgoing calls
225 int rxrpc_create_call(struct rxrpc_connection *conn,
226 rxrpc_call_attn_func_t attn,
227 rxrpc_call_error_func_t error,
228 rxrpc_call_aemap_func_t aemap,
229 struct rxrpc_call **_call)
231 DECLARE_WAITQUEUE(myself, current);
233 struct rxrpc_call *call;
238 /* allocate and initialise a call record */
239 ret = __rxrpc_create_call(conn, &call);
241 _leave(" = %d", ret);
245 call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS;
247 call->app_attn_func = attn;
249 call->app_error_func = error;
251 call->app_aemap_func = aemap;
255 spin_lock(&conn->lock);
256 set_current_state(TASK_INTERRUPTIBLE);
257 add_wait_queue(&conn->chanwait, &myself);
260 /* try to find an unused channel */
261 for (cix = 0; cix < 4; cix++)
262 if (!conn->channels[cix])
265 /* no free channels - wait for one to become available */
267 if (signal_pending(current))
270 spin_unlock(&conn->lock);
273 set_current_state(TASK_INTERRUPTIBLE);
275 spin_lock(&conn->lock);
278 /* got a channel - now attach to the connection */
280 remove_wait_queue(&conn->chanwait, &myself);
281 set_current_state(TASK_RUNNING);
283 /* concoct a unique call number */
285 call->call_id = htonl(++conn->call_counter);
286 for (loop = 0; loop < 4; loop++)
287 if (conn->channels[loop] &&
288 conn->channels[loop]->call_id == call->call_id)
291 rxrpc_get_connection(conn);
292 conn->channels[cix] = call; /* assign _after_ done callid check loop */
293 do_gettimeofday(&conn->atime);
294 call->chan_ix = htonl(cix);
296 spin_unlock(&conn->lock);
298 down_write(&rxrpc_calls_sem);
299 list_add_tail(&call->call_link, &rxrpc_calls);
300 up_write(&rxrpc_calls_sem);
302 __RXACCT(atomic_inc(&rxrpc_call_count));
305 _leave(" = 0 (call=%p cix=%u)", call, cix);
309 remove_wait_queue(&conn->chanwait, &myself);
310 set_current_state(TASK_RUNNING);
311 spin_unlock(&conn->lock);
313 free_page((unsigned long) call);
314 _leave(" = %d", ret);
316 } /* end rxrpc_create_call() */
318 /*****************************************************************************/
320 * create a new call record for incoming calls
322 int rxrpc_incoming_call(struct rxrpc_connection *conn,
323 struct rxrpc_message *msg,
324 struct rxrpc_call **_call)
326 struct rxrpc_call *call;
330 cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK;
332 _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix);
334 /* allocate and initialise a call record */
335 ret = __rxrpc_create_call(conn, &call);
337 _leave(" = %d", ret);
341 call->pkt_rcv_count = 1;
342 call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID;
343 call->app_mark = sizeof(uint32_t);
347 /* attach to the connection */
349 call->chan_ix = htonl(cix);
350 call->call_id = msg->hdr.callNumber;
352 spin_lock(&conn->lock);
354 if (!conn->channels[cix] ||
355 conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE ||
356 conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR
358 conn->channels[cix] = call;
359 rxrpc_get_connection(conn);
363 spin_unlock(&conn->lock);
366 free_page((unsigned long) call);
371 down_write(&rxrpc_calls_sem);
372 list_add_tail(&call->call_link, &rxrpc_calls);
373 up_write(&rxrpc_calls_sem);
374 __RXACCT(atomic_inc(&rxrpc_call_count));
378 _leave(" = %d [%p]", ret, call);
380 } /* end rxrpc_incoming_call() */
382 /*****************************************************************************/
386 void rxrpc_put_call(struct rxrpc_call *call)
388 struct rxrpc_connection *conn = call->conn;
389 struct rxrpc_message *msg;
391 _enter("%p{u=%d}",call,atomic_read(&call->usage));
394 if (atomic_read(&call->usage) <= 0)
397 /* to prevent a race, the decrement and the de-list must be effectively
399 spin_lock(&conn->lock);
400 if (likely(!atomic_dec_and_test(&call->usage))) {
401 spin_unlock(&conn->lock);
406 if (conn->channels[ntohl(call->chan_ix)] == call)
407 conn->channels[ntohl(call->chan_ix)] = NULL;
409 spin_unlock(&conn->lock);
411 wake_up(&conn->chanwait);
413 rxrpc_put_connection(conn);
415 /* clear the timers and dequeue from krxiod */
416 del_timer_sync(&call->acks_timeout);
417 del_timer_sync(&call->rcv_timeout);
418 del_timer_sync(&call->ackr_dfr_timo);
420 rxrpc_krxiod_dequeue_call(call);
422 /* clean up the contents of the struct */
423 if (call->snd_nextmsg)
424 rxrpc_put_message(call->snd_nextmsg);
427 rxrpc_put_message(call->snd_ping);
429 while (!list_empty(&call->acks_pendq)) {
430 msg = list_entry(call->acks_pendq.next,
431 struct rxrpc_message, link);
432 list_del(&msg->link);
433 rxrpc_put_message(msg);
436 while (!list_empty(&call->rcv_receiveq)) {
437 msg = list_entry(call->rcv_receiveq.next,
438 struct rxrpc_message, link);
439 list_del(&msg->link);
440 rxrpc_put_message(msg);
443 while (!list_empty(&call->app_readyq)) {
444 msg = list_entry(call->app_readyq.next,
445 struct rxrpc_message, link);
446 list_del(&msg->link);
447 rxrpc_put_message(msg);
450 while (!list_empty(&call->app_unreadyq)) {
451 msg = list_entry(call->app_unreadyq.next,
452 struct rxrpc_message, link);
453 list_del(&msg->link);
454 rxrpc_put_message(msg);
457 module_put(call->owner);
459 down_write(&rxrpc_calls_sem);
460 list_del(&call->call_link);
461 up_write(&rxrpc_calls_sem);
463 __RXACCT(atomic_dec(&rxrpc_call_count));
464 free_page((unsigned long) call);
466 _leave(" [destroyed]");
467 } /* end rxrpc_put_call() */
469 /*****************************************************************************/
471 * actually generate a normal ACK
473 static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call,
476 struct rxrpc_message *msg;
481 /* ACKs default to DELAY */
482 if (!call->ackr.reason)
483 call->ackr.reason = RXRPC_ACK_DELAY;
485 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
486 jiffies - call->cjif,
487 ntohs(call->ackr.maxSkew),
488 ntohl(call->ackr.firstPacket),
489 ntohl(call->ackr.previousPacket),
490 ntohl(call->ackr.serial),
491 rxrpc_acks[call->ackr.reason],
494 aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */
495 aux[1] = htonl(1444); /* max MTU */
496 aux[2] = htonl(16); /* rwind */
497 aux[3] = htonl(4); /* max packets */
499 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
500 diov[0].iov_base = &call->ackr;
501 diov[1].iov_len = call->ackr_pend_cnt + 3;
502 diov[1].iov_base = call->ackr_array;
503 diov[2].iov_len = sizeof(aux);
504 diov[2].iov_base = &aux;
506 /* build and send the message */
507 ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
508 3, diov, GFP_KERNEL, &msg);
513 msg->hdr.seq = htonl(seq);
514 msg->hdr.flags |= RXRPC_SLOW_START_OK;
516 ret = rxrpc_conn_sendmsg(call->conn, msg);
517 rxrpc_put_message(msg);
520 call->pkt_snd_count++;
522 /* count how many actual ACKs there were at the front */
523 for (delta = 0; delta < call->ackr_pend_cnt; delta++)
524 if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK)
527 call->ackr_pend_cnt -= delta; /* all ACK'd to this point */
529 /* crank the ACK window around */
531 /* un-ACK'd window */
533 else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) {
534 /* partially ACK'd window
535 * - shuffle down to avoid losing out-of-sequence packets
537 call->ackr_win_bot += delta;
538 call->ackr_win_top += delta;
540 memmove(&call->ackr_array[0],
541 &call->ackr_array[delta],
542 call->ackr_pend_cnt);
544 memset(&call->ackr_array[call->ackr_pend_cnt],
546 sizeof(call->ackr_array) - call->ackr_pend_cnt);
549 /* fully ACK'd window
550 * - just clear the whole thing
552 memset(&call->ackr_array,
554 sizeof(call->ackr_array));
558 memset(&call->ackr, 0, sizeof(call->ackr));
561 if (!call->app_call_state)
562 printk("___ STATE 0 ___\n");
564 } /* end __rxrpc_call_gen_normal_ACK() */
566 /*****************************************************************************/
568 * note the reception of a packet in the call's ACK records and generate an
569 * appropriate ACK packet if necessary
570 * - returns 0 if packet should be processed, 1 if packet should be ignored
571 * and -ve on an error
573 static int rxrpc_call_generate_ACK(struct rxrpc_call *call,
574 struct rxrpc_header *hdr,
575 struct rxrpc_ackpacket *ack)
577 struct rxrpc_message *msg;
581 u8 special_ACK, do_ACK, force;
583 _enter("%p,%p { seq=%d tp=%d fl=%02x }",
584 call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags);
586 seq = ntohl(hdr->seq);
587 offset = seq - call->ackr_win_bot;
588 do_ACK = RXRPC_ACK_DELAY;
592 if (call->ackr_high_seq < seq)
593 call->ackr_high_seq = seq;
595 /* deal with generation of obvious special ACKs first */
596 if (ack && ack->reason == RXRPC_ACK_PING) {
597 special_ACK = RXRPC_ACK_PING_RESPONSE;
602 if (seq < call->ackr_win_bot) {
603 special_ACK = RXRPC_ACK_DUPLICATE;
608 if (seq >= call->ackr_win_top) {
609 special_ACK = RXRPC_ACK_EXCEEDS_WINDOW;
614 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) {
615 special_ACK = RXRPC_ACK_DUPLICATE;
620 /* okay... it's a normal data packet inside the ACK window */
621 call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK;
623 if (offset < call->ackr_pend_cnt) {
625 else if (offset > call->ackr_pend_cnt) {
626 do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE;
627 call->ackr_pend_cnt = offset;
631 if (hdr->flags & RXRPC_REQUEST_ACK) {
632 do_ACK = RXRPC_ACK_REQUESTED;
635 /* generate an ACK on the final packet of a reply just received */
636 if (hdr->flags & RXRPC_LAST_PACKET) {
637 if (call->conn->out_clientflag)
640 else if (!(hdr->flags & RXRPC_MORE_PACKETS)) {
641 do_ACK = RXRPC_ACK_REQUESTED;
644 /* re-ACK packets previously received out-of-order */
645 for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++)
646 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK)
649 call->ackr_pend_cnt = offset;
651 /* generate an ACK if we fill up the window */
652 if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE)
656 _debug("%05lu ACKs pend=%u norm=%s special=%s%s",
657 jiffies - call->cjif,
660 rxrpc_acks[special_ACK],
661 force ? " immediate" :
662 do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" :
663 hdr->flags & RXRPC_LAST_PACKET ? " finalise" :
667 /* send any pending normal ACKs if need be */
668 if (call->ackr_pend_cnt > 0) {
669 /* fill out the appropriate form */
670 call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
671 call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq,
673 call->ackr.firstPacket = htonl(call->ackr_win_bot);
674 call->ackr.previousPacket = call->ackr_prev_seq;
675 call->ackr.serial = hdr->serial;
676 call->ackr.nAcks = call->ackr_pend_cnt;
678 if (do_ACK == RXRPC_ACK_REQUESTED)
679 call->ackr.reason = do_ACK;
681 /* generate the ACK immediately if necessary */
682 if (special_ACK || force) {
683 err = __rxrpc_call_gen_normal_ACK(
684 call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq);
692 if (call->ackr.reason == RXRPC_ACK_REQUESTED)
693 call->ackr_dfr_seq = seq;
695 /* start the ACK timer if not running if there are any pending deferred
697 if (call->ackr_pend_cnt > 0 &&
698 call->ackr.reason != RXRPC_ACK_REQUESTED &&
699 !timer_pending(&call->ackr_dfr_timo)
703 timo = rxrpc_call_dfr_ack_timeout + jiffies;
705 _debug("START ACKR TIMER for cj=%lu", timo - call->cjif);
707 spin_lock(&call->lock);
708 mod_timer(&call->ackr_dfr_timo, timo);
709 spin_unlock(&call->lock);
711 else if ((call->ackr_pend_cnt == 0 ||
712 call->ackr.reason == RXRPC_ACK_REQUESTED) &&
713 timer_pending(&call->ackr_dfr_timo)
715 /* stop timer if no pending ACKs */
716 _debug("CLEAR ACKR TIMER");
717 del_timer_sync(&call->ackr_dfr_timo);
720 /* send a special ACK if one is required */
722 struct rxrpc_ackpacket ack;
724 uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK };
726 /* fill out the appropriate form */
727 ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
728 ack.maxSkew = htons(min(call->ackr_high_seq - seq,
730 ack.firstPacket = htonl(call->ackr_win_bot);
731 ack.previousPacket = call->ackr_prev_seq;
732 ack.serial = hdr->serial;
733 ack.reason = special_ACK;
736 _proto("Rx Sending s-ACK"
737 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
739 ntohl(ack.firstPacket),
740 ntohl(ack.previousPacket),
742 rxrpc_acks[ack.reason],
745 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
746 diov[0].iov_base = &ack;
747 diov[1].iov_len = sizeof(acks);
748 diov[1].iov_base = acks;
750 /* build and send the message */
751 err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
752 hdr->seq ? 2 : 1, diov,
761 msg->hdr.seq = htonl(seq);
762 msg->hdr.flags |= RXRPC_SLOW_START_OK;
764 err = rxrpc_conn_sendmsg(call->conn, msg);
765 rxrpc_put_message(msg);
770 call->pkt_snd_count++;
775 call->ackr_prev_seq = hdr->seq;
777 _leave(" = %d", ret);
779 } /* end rxrpc_call_generate_ACK() */
781 /*****************************************************************************/
783 * handle work to be done on a call
784 * - includes packet reception and timeout processing
786 void rxrpc_call_do_stuff(struct rxrpc_call *call)
788 _enter("%p{flags=%lx}", call, call->flags);
790 /* handle packet reception */
791 if (call->flags & RXRPC_CALL_RCV_PKT) {
792 _debug("- receive packet");
793 call->flags &= ~RXRPC_CALL_RCV_PKT;
794 rxrpc_call_receive_packet(call);
797 /* handle overdue ACKs */
798 if (call->flags & RXRPC_CALL_ACKS_TIMO) {
799 _debug("- overdue ACK timeout");
800 call->flags &= ~RXRPC_CALL_ACKS_TIMO;
801 rxrpc_call_resend(call, call->snd_seq_count);
804 /* handle lack of reception */
805 if (call->flags & RXRPC_CALL_RCV_TIMO) {
806 _debug("- reception timeout");
807 call->flags &= ~RXRPC_CALL_RCV_TIMO;
808 rxrpc_call_abort(call, -EIO);
811 /* handle deferred ACKs */
812 if (call->flags & RXRPC_CALL_ACKR_TIMO ||
813 (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED)
815 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
816 jiffies - call->cjif,
817 rxrpc_acks[call->ackr.reason],
820 call->flags &= ~RXRPC_CALL_ACKR_TIMO;
822 if (call->ackr.nAcks > 0 &&
823 call->app_call_state != RXRPC_CSTATE_ERROR) {
825 __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq);
826 call->ackr_dfr_seq = 0;
832 } /* end rxrpc_call_do_stuff() */
834 /*****************************************************************************/
836 * send an abort message at call or connection level
837 * - must be called with call->lock held
838 * - the supplied error code is sent as the packet data
840 static int __rxrpc_call_abort(struct rxrpc_call *call, int errno)
842 struct rxrpc_connection *conn = call->conn;
843 struct rxrpc_message *msg;
848 _enter("%p{%08x},%p{%d},%d",
849 conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno);
851 /* if this call is already aborted, then just wake up any waiters */
852 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
853 spin_unlock(&call->lock);
854 call->app_error_func(call);
859 rxrpc_get_call(call);
861 /* change the state _with_ the lock still held */
862 call->app_call_state = RXRPC_CSTATE_ERROR;
863 call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT;
864 call->app_errno = errno;
865 call->app_mark = RXRPC_APP_MARK_EOF;
866 call->app_read_buf = NULL;
867 call->app_async_read = 0;
871 /* ask the app to translate the error code */
872 call->app_aemap_func(call);
874 spin_unlock(&call->lock);
876 /* flush any outstanding ACKs */
877 del_timer_sync(&call->acks_timeout);
878 del_timer_sync(&call->rcv_timeout);
879 del_timer_sync(&call->ackr_dfr_timo);
881 if (rxrpc_call_is_ack_pending(call))
882 __rxrpc_call_gen_normal_ACK(call, 0);
884 /* send the abort packet only if we actually traded some other
887 if (call->pkt_snd_count || call->pkt_rcv_count) {
888 /* actually send the abort */
889 _proto("Rx Sending Call ABORT { data=%d }",
890 call->app_abort_code);
892 _error = htonl(call->app_abort_code);
894 diov[0].iov_len = sizeof(_error);
895 diov[0].iov_base = &_error;
897 ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT,
898 1, diov, GFP_KERNEL, &msg);
900 ret = rxrpc_conn_sendmsg(conn, msg);
901 rxrpc_put_message(msg);
905 /* tell the app layer to let go */
906 call->app_error_func(call);
908 rxrpc_put_call(call);
910 _leave(" = %d", ret);
912 } /* end __rxrpc_call_abort() */
914 /*****************************************************************************/
916 * send an abort message at call or connection level
917 * - the supplied error code is sent as the packet data
919 int rxrpc_call_abort(struct rxrpc_call *call, int error)
921 spin_lock(&call->lock);
923 return __rxrpc_call_abort(call, error);
925 } /* end rxrpc_call_abort() */
927 /*****************************************************************************/
929 * process packets waiting for this call
931 static void rxrpc_call_receive_packet(struct rxrpc_call *call)
933 struct rxrpc_message *msg;
934 struct list_head *_p;
938 rxrpc_get_call(call); /* must not go away too soon if aborted by
941 while (!list_empty(&call->rcv_receiveq)) {
942 /* try to get next packet */
944 spin_lock(&call->lock);
945 if (!list_empty(&call->rcv_receiveq)) {
946 _p = call->rcv_receiveq.next;
949 spin_unlock(&call->lock);
954 msg = list_entry(_p, struct rxrpc_message, link);
956 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
957 jiffies - call->cjif,
958 rxrpc_pkts[msg->hdr.type],
959 ntohl(msg->hdr.serial),
961 msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-',
962 msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-',
963 msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-',
964 msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-',
965 msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S'
968 switch (msg->hdr.type) {
969 /* deal with data packets */
970 case RXRPC_PACKET_TYPE_DATA:
971 /* ACK the packet if necessary */
972 switch (rxrpc_call_generate_ACK(call, &msg->hdr,
974 case 0: /* useful packet */
975 rxrpc_call_receive_data_packet(call, msg);
977 case 1: /* duplicate or out-of-window packet */
980 rxrpc_put_message(msg);
985 /* deal with ACK packets */
986 case RXRPC_PACKET_TYPE_ACK:
987 rxrpc_call_receive_ack_packet(call, msg);
990 /* deal with abort packets */
991 case RXRPC_PACKET_TYPE_ABORT: {
994 dp = skb_header_pointer(msg->pkt, msg->offset,
995 sizeof(_dbuf), &_dbuf);
997 printk("Rx Received short ABORT packet\n");
999 _proto("Rx Received Call ABORT { data=%d }",
1000 (dp ? ntohl(*dp) : 0));
1002 spin_lock(&call->lock);
1003 call->app_call_state = RXRPC_CSTATE_ERROR;
1004 call->app_err_state = RXRPC_ESTATE_PEER_ABORT;
1005 call->app_abort_code = (dp ? ntohl(*dp) : 0);
1006 call->app_errno = -ECONNABORTED;
1007 call->app_mark = RXRPC_APP_MARK_EOF;
1008 call->app_read_buf = NULL;
1009 call->app_async_read = 0;
1011 /* ask the app to translate the error code */
1012 call->app_aemap_func(call);
1014 spin_unlock(&call->lock);
1015 call->app_error_func(call);
1019 /* deal with other packet types */
1020 _proto("Rx Unsupported packet type %u (#%u)",
1021 msg->hdr.type, msg->seq);
1025 rxrpc_put_message(msg);
1029 rxrpc_put_call(call);
1031 } /* end rxrpc_call_receive_packet() */
1033 /*****************************************************************************/
1035 * process next data packet
1036 * - as the next data packet arrives:
1037 * - it is queued on app_readyq _if_ it is the next one expected
1039 * - it is queued on app_unreadyq _if_ it is not the next one expected
1040 * - if a packet placed on app_readyq completely fills a hole leading up to
1041 * the first packet on app_unreadyq, then packets now in sequence are
1042 * tranferred to app_readyq
1043 * - the application layer can only see packets on app_readyq
1044 * (app_ready_qty bytes)
1045 * - the application layer is prodded every time a new packet arrives
1047 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
1048 struct rxrpc_message *msg)
1050 const struct rxrpc_operation *optbl, *op;
1051 struct rxrpc_message *pmsg;
1052 struct list_head *_p;
1053 int ret, lo, hi, rmtimo;
1056 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1058 rxrpc_get_message(msg);
1060 /* add to the unready queue if we'd have to create a hole in the ready
1061 * queue otherwise */
1062 if (msg->seq != call->app_ready_seq + 1) {
1063 _debug("Call add packet %d to unreadyq", msg->seq);
1065 /* insert in seq order */
1066 list_for_each(_p, &call->app_unreadyq) {
1067 pmsg = list_entry(_p, struct rxrpc_message, link);
1068 if (pmsg->seq > msg->seq)
1072 list_add_tail(&msg->link, _p);
1074 _leave(" [unreadyq]");
1078 /* next in sequence - simply append into the call's ready queue */
1079 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
1080 msg->seq, msg->dsize, call->app_ready_qty);
1082 spin_lock(&call->lock);
1083 call->app_ready_seq = msg->seq;
1084 call->app_ready_qty += msg->dsize;
1085 list_add_tail(&msg->link, &call->app_readyq);
1087 /* move unready packets to the readyq if we got rid of a hole */
1088 while (!list_empty(&call->app_unreadyq)) {
1089 pmsg = list_entry(call->app_unreadyq.next,
1090 struct rxrpc_message, link);
1092 if (pmsg->seq != call->app_ready_seq + 1)
1095 /* next in sequence - just move list-to-list */
1096 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
1097 pmsg->seq, pmsg->dsize, call->app_ready_qty);
1099 call->app_ready_seq = pmsg->seq;
1100 call->app_ready_qty += pmsg->dsize;
1101 list_del_init(&pmsg->link);
1102 list_add_tail(&pmsg->link, &call->app_readyq);
1105 /* see if we've got the last packet yet */
1106 if (!list_empty(&call->app_readyq)) {
1107 pmsg = list_entry(call->app_readyq.prev,
1108 struct rxrpc_message, link);
1109 if (pmsg->hdr.flags & RXRPC_LAST_PACKET) {
1110 call->app_last_rcv = 1;
1111 _debug("Last packet on readyq");
1115 switch (call->app_call_state) {
1116 /* do nothing if call already aborted */
1117 case RXRPC_CSTATE_ERROR:
1118 spin_unlock(&call->lock);
1122 /* extract the operation ID from an incoming call if that's not
1124 case RXRPC_CSTATE_SRVR_RCV_OPID:
1125 spin_unlock(&call->lock);
1127 /* handle as yet insufficient data for the operation ID */
1128 if (call->app_ready_qty < 4) {
1129 if (call->app_last_rcv)
1130 /* trouble - last packet seen */
1131 rxrpc_call_abort(call, -EINVAL);
1137 /* pull the operation ID out of the buffer */
1138 ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0);
1140 printk("Unexpected error from read-data: %d\n", ret);
1141 if (call->app_call_state != RXRPC_CSTATE_ERROR)
1142 rxrpc_call_abort(call, ret);
1146 call->app_opcode = ntohl(opid);
1148 /* locate the operation in the available ops table */
1149 optbl = call->conn->service->ops_begin;
1151 hi = call->conn->service->ops_end - optbl;
1154 int mid = (hi + lo) / 2;
1156 if (call->app_opcode == op->id)
1158 if (call->app_opcode > op->id)
1165 kproto("Rx Client requested operation %d from %s service",
1166 call->app_opcode, call->conn->service->name);
1167 rxrpc_call_abort(call, -EINVAL);
1172 _proto("Rx Client requested operation %s from %s service",
1173 op->name, call->conn->service->name);
1175 /* we're now waiting for the argument block (unless the call
1177 spin_lock(&call->lock);
1178 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID ||
1179 call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) {
1180 if (!call->app_last_rcv)
1181 call->app_call_state =
1182 RXRPC_CSTATE_SRVR_RCV_ARGS;
1183 else if (call->app_ready_qty > 0)
1184 call->app_call_state =
1185 RXRPC_CSTATE_SRVR_GOT_ARGS;
1187 call->app_call_state =
1188 RXRPC_CSTATE_SRVR_SND_REPLY;
1189 call->app_mark = op->asize;
1190 call->app_user = op->user;
1192 spin_unlock(&call->lock);
1197 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1198 /* change state if just received last packet of arg block */
1199 if (call->app_last_rcv)
1200 call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS;
1201 spin_unlock(&call->lock);
1206 case RXRPC_CSTATE_CLNT_RCV_REPLY:
1207 /* change state if just received last packet of reply block */
1209 if (call->app_last_rcv) {
1210 call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY;
1213 spin_unlock(&call->lock);
1216 del_timer_sync(&call->acks_timeout);
1217 del_timer_sync(&call->rcv_timeout);
1218 del_timer_sync(&call->ackr_dfr_timo);
1225 /* deal with data reception in an unexpected state */
1226 printk("Unexpected state [[[ %u ]]]\n", call->app_call_state);
1227 __rxrpc_call_abort(call, -EBADMSG);
1232 if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY &&
1236 /* otherwise just invoke the data function whenever we can satisfy its desire for more
1239 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s",
1240 call->app_call_state, call->app_ready_qty, call->app_mark,
1241 call->app_last_rcv ? " last-rcvd" : "");
1243 spin_lock(&call->lock);
1245 ret = __rxrpc_call_read_data(call);
1248 spin_unlock(&call->lock);
1249 call->app_attn_func(call);
1252 spin_unlock(&call->lock);
1255 spin_unlock(&call->lock);
1258 __rxrpc_call_abort(call, ret);
1266 } /* end rxrpc_call_receive_data_packet() */
1268 /*****************************************************************************/
1270 * received an ACK packet
1272 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
1273 struct rxrpc_message *msg)
1275 struct rxrpc_ackpacket _ack, *ap;
1276 rxrpc_serial_net_t serial;
1280 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1282 /* extract the basic ACK record */
1283 ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack);
1285 printk("Rx Received short ACK packet\n");
1288 msg->offset += sizeof(_ack);
1290 serial = ap->serial;
1291 seq = ntohl(ap->firstPacket);
1293 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }",
1294 ntohl(msg->hdr.serial),
1295 ntohs(ap->bufferSpace),
1298 ntohl(ap->previousPacket),
1300 rxrpc_acks[ap->reason],
1304 /* check the other side isn't ACK'ing a sequence number I haven't sent
1306 if (ap->nAcks > 0 &&
1307 (seq > call->snd_seq_count ||
1308 seq + ap->nAcks - 1 > call->snd_seq_count)) {
1309 printk("Received ACK (#%u-#%u) for unsent packet\n",
1310 seq, seq + ap->nAcks - 1);
1311 rxrpc_call_abort(call, -EINVAL);
1316 /* deal with RTT calculation */
1318 struct rxrpc_message *rttmsg;
1320 /* find the prompting packet */
1321 spin_lock(&call->lock);
1322 if (call->snd_ping && call->snd_ping->hdr.serial == serial) {
1323 /* it was a ping packet */
1324 rttmsg = call->snd_ping;
1325 call->snd_ping = NULL;
1326 spin_unlock(&call->lock);
1329 rttmsg->rttdone = 1;
1330 rxrpc_peer_calculate_rtt(call->conn->peer,
1332 rxrpc_put_message(rttmsg);
1336 struct list_head *_p;
1338 /* it ought to be a data packet - look in the pending
1340 list_for_each(_p, &call->acks_pendq) {
1341 rttmsg = list_entry(_p, struct rxrpc_message,
1343 if (rttmsg->hdr.serial == serial) {
1344 if (rttmsg->rttdone)
1345 /* never do RTT twice without
1349 rttmsg->rttdone = 1;
1350 rxrpc_peer_calculate_rtt(
1351 call->conn->peer, rttmsg, msg);
1355 spin_unlock(&call->lock);
1359 switch (ap->reason) {
1360 /* deal with negative/positive acknowledgement of data
1362 case RXRPC_ACK_REQUESTED:
1363 case RXRPC_ACK_DELAY:
1364 case RXRPC_ACK_IDLE:
1365 rxrpc_call_definitively_ACK(call, seq - 1);
1367 case RXRPC_ACK_DUPLICATE:
1368 case RXRPC_ACK_OUT_OF_SEQUENCE:
1369 case RXRPC_ACK_EXCEEDS_WINDOW:
1370 call->snd_resend_cnt = 0;
1371 ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks);
1373 rxrpc_call_abort(call, ret);
1376 /* respond to ping packets immediately */
1377 case RXRPC_ACK_PING:
1378 rxrpc_call_generate_ACK(call, &msg->hdr, ap);
1381 /* only record RTT on ping response packets */
1382 case RXRPC_ACK_PING_RESPONSE:
1383 if (call->snd_ping) {
1384 struct rxrpc_message *rttmsg;
1386 /* only do RTT stuff if the response matches the
1389 spin_lock(&call->lock);
1390 if (call->snd_ping &&
1391 call->snd_ping->hdr.serial == ap->serial) {
1392 rttmsg = call->snd_ping;
1393 call->snd_ping = NULL;
1395 spin_unlock(&call->lock);
1398 rttmsg->rttdone = 1;
1399 rxrpc_peer_calculate_rtt(call->conn->peer,
1401 rxrpc_put_message(rttmsg);
1407 printk("Unsupported ACK reason %u\n", ap->reason);
1412 } /* end rxrpc_call_receive_ack_packet() */
1414 /*****************************************************************************/
1416 * record definitive ACKs for all messages up to and including the one with the
1419 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
1420 rxrpc_seq_t highest)
1422 struct rxrpc_message *msg;
1425 _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest);
1427 while (call->acks_dftv_seq < highest) {
1428 call->acks_dftv_seq++;
1430 _proto("Definitive ACK on packet #%u", call->acks_dftv_seq);
1432 /* discard those at front of queue until message with highest
1434 spin_lock(&call->lock);
1436 if (!list_empty(&call->acks_pendq)) {
1437 msg = list_entry(call->acks_pendq.next,
1438 struct rxrpc_message, link);
1439 list_del_init(&msg->link); /* dequeue */
1440 if (msg->state == RXRPC_MSG_SENT)
1441 call->acks_pend_cnt--;
1443 spin_unlock(&call->lock);
1445 /* insanity check */
1447 panic("%s(): acks_pendq unexpectedly empty\n",
1450 if (msg->seq != call->acks_dftv_seq)
1451 panic("%s(): Packet #%u expected at front of acks_pendq"
1453 __FUNCTION__, call->acks_dftv_seq, msg->seq);
1455 /* discard the message */
1456 msg->state = RXRPC_MSG_DONE;
1457 rxrpc_put_message(msg);
1460 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */
1462 spin_lock(&call->lock);
1463 if (call->acks_dftv_seq == call->snd_seq_count) {
1464 if (call->app_call_state != RXRPC_CSTATE_COMPLETE) {
1465 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1470 spin_unlock(&call->lock);
1473 del_timer_sync(&call->acks_timeout);
1474 del_timer_sync(&call->rcv_timeout);
1475 del_timer_sync(&call->ackr_dfr_timo);
1476 call->app_attn_func(call);
1480 } /* end rxrpc_call_definitively_ACK() */
1482 /*****************************************************************************/
1484 * record the specified amount of ACKs/NAKs
1486 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
1487 struct rxrpc_message *msg,
1491 struct rxrpc_message *dmsg;
1492 struct list_head *_p;
1493 rxrpc_seq_t highest;
1496 char resend, now_complete;
1499 _enter("%p{apc=%u ads=%u},%p,%u,%Zu",
1500 call, call->acks_pend_cnt, call->acks_dftv_seq,
1503 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order
1505 if (seq <= call->acks_dftv_seq) {
1506 unsigned delta = call->acks_dftv_seq - seq;
1508 if (count <= delta) {
1509 _leave(" = 0 [all definitively ACK'd]");
1515 msg->offset += delta;
1518 highest = seq + count - 1;
1521 /* extract up to 16 ACK slots at a time */
1522 chunk = min(count, sizeof(acks));
1525 memset(acks, 2, sizeof(acks));
1527 if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) {
1528 printk("Rx Received short ACK packet\n");
1529 _leave(" = -EINVAL");
1532 msg->offset += chunk;
1534 /* check that the ACK set is valid */
1535 for (ix = 0; ix < chunk; ix++) {
1537 case RXRPC_ACK_TYPE_ACK:
1539 case RXRPC_ACK_TYPE_NACK:
1543 printk("Rx Received unsupported ACK state"
1545 _leave(" = -EINVAL");
1550 _proto("Rx ACK of packets #%u-#%u "
1551 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)",
1552 seq, (unsigned) (seq + chunk - 1),
1553 _acktype[acks[0x0]],
1554 _acktype[acks[0x1]],
1555 _acktype[acks[0x2]],
1556 _acktype[acks[0x3]],
1557 _acktype[acks[0x4]],
1558 _acktype[acks[0x5]],
1559 _acktype[acks[0x6]],
1560 _acktype[acks[0x7]],
1561 _acktype[acks[0x8]],
1562 _acktype[acks[0x9]],
1563 _acktype[acks[0xA]],
1564 _acktype[acks[0xB]],
1565 _acktype[acks[0xC]],
1566 _acktype[acks[0xD]],
1567 _acktype[acks[0xE]],
1568 _acktype[acks[0xF]],
1572 /* mark the packets in the ACK queue as being provisionally
1575 spin_lock(&call->lock);
1577 /* find the first packet ACK'd/NAK'd here */
1578 list_for_each(_p, &call->acks_pendq) {
1579 dmsg = list_entry(_p, struct rxrpc_message, link);
1580 if (dmsg->seq == seq)
1582 _debug("- %u: skipping #%u", ix, dmsg->seq);
1588 _debug("- %u: processing #%u (%c) apc=%u",
1589 ix, dmsg->seq, _acktype[acks[ix]],
1590 call->acks_pend_cnt);
1592 if (acks[ix] == RXRPC_ACK_TYPE_ACK) {
1593 if (dmsg->state == RXRPC_MSG_SENT)
1594 call->acks_pend_cnt--;
1595 dmsg->state = RXRPC_MSG_ACKED;
1598 if (dmsg->state == RXRPC_MSG_ACKED)
1599 call->acks_pend_cnt++;
1600 dmsg->state = RXRPC_MSG_SENT;
1605 _p = dmsg->link.next;
1606 dmsg = list_entry(_p, struct rxrpc_message, link);
1607 } while(ix < chunk &&
1608 _p != &call->acks_pendq &&
1614 spin_unlock(&call->lock);
1618 rxrpc_call_resend(call, highest);
1620 /* if all packets are provisionally ACK'd, then wake up anyone who's
1621 * waiting for that */
1623 spin_lock(&call->lock);
1624 if (call->acks_pend_cnt == 0) {
1625 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) {
1626 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1631 spin_unlock(&call->lock);
1634 _debug("- wake up waiters");
1635 del_timer_sync(&call->acks_timeout);
1636 del_timer_sync(&call->rcv_timeout);
1637 del_timer_sync(&call->ackr_dfr_timo);
1638 call->app_attn_func(call);
1641 _leave(" = 0 (apc=%u)", call->acks_pend_cnt);
1645 panic("%s(): acks_pendq in bad state (packet #%u absent)\n",
1648 } /* end rxrpc_call_record_ACK() */
1650 /*****************************************************************************/
1652 * transfer data from the ready packet queue to the asynchronous read buffer
1653 * - since this func is the only one going to look at packets queued on
1654 * app_readyq, we don't need a lock to modify or access them, only to modify
1655 * the queue pointers
1656 * - called with call->lock held
1657 * - the buffer must be in kernel space
1659 * 0 if buffer filled
1660 * -EAGAIN if buffer not filled and more data to come
1661 * -EBADMSG if last packet received and insufficient data left
1662 * -ECONNABORTED if the call has in an error state
1664 static int __rxrpc_call_read_data(struct rxrpc_call *call)
1666 struct rxrpc_message *msg;
1670 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}",
1672 call->app_async_read, call->app_read_buf,
1673 call->app_ready_qty, call->app_mark);
1675 /* check the state */
1676 switch (call->app_call_state) {
1677 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1678 case RXRPC_CSTATE_CLNT_RCV_REPLY:
1679 if (call->app_last_rcv) {
1680 printk("%s(%p,%p,%Zd):"
1681 " Inconsistent call state (%s, last pkt)",
1683 call, call->app_read_buf, call->app_mark,
1684 rxrpc_call_states[call->app_call_state]);
1689 case RXRPC_CSTATE_SRVR_RCV_OPID:
1690 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1691 case RXRPC_CSTATE_CLNT_GOT_REPLY:
1694 case RXRPC_CSTATE_SRVR_SND_REPLY:
1695 if (!call->app_last_rcv) {
1696 printk("%s(%p,%p,%Zd):"
1697 " Inconsistent call state (%s, not last pkt)",
1699 call, call->app_read_buf, call->app_mark,
1700 rxrpc_call_states[call->app_call_state]);
1703 _debug("Trying to read data from call in SND_REPLY state");
1706 case RXRPC_CSTATE_ERROR:
1707 _leave(" = -ECONNABORTED");
1708 return -ECONNABORTED;
1711 printk("reading in unexpected state [[[ %u ]]]\n",
1712 call->app_call_state);
1716 /* handle the case of not having an async buffer */
1717 if (!call->app_async_read) {
1718 if (call->app_mark == RXRPC_APP_MARK_EOF) {
1719 ret = call->app_last_rcv ? 0 : -EAGAIN;
1722 if (call->app_mark >= call->app_ready_qty) {
1723 call->app_mark = RXRPC_APP_MARK_EOF;
1727 ret = call->app_last_rcv ? -EBADMSG : -EAGAIN;
1731 _leave(" = %d [no buf]", ret);
1735 while (!list_empty(&call->app_readyq) && call->app_mark > 0) {
1736 msg = list_entry(call->app_readyq.next,
1737 struct rxrpc_message, link);
1739 /* drag as much data as we need out of this packet */
1740 qty = min(call->app_mark, msg->dsize);
1742 _debug("reading %Zu from skb=%p off=%lu",
1743 qty, msg->pkt, msg->offset);
1745 if (call->app_read_buf)
1746 if (skb_copy_bits(msg->pkt, msg->offset,
1747 call->app_read_buf, qty) < 0)
1748 panic("%s: Failed to copy data from packet:"
1751 call, call->app_read_buf, qty);
1753 /* if that packet is now empty, discard it */
1754 call->app_ready_qty -= qty;
1757 if (msg->dsize == 0) {
1758 list_del_init(&msg->link);
1759 rxrpc_put_message(msg);
1765 call->app_mark -= qty;
1766 if (call->app_read_buf)
1767 call->app_read_buf += qty;
1770 if (call->app_mark == 0) {
1771 call->app_async_read = 0;
1772 call->app_mark = RXRPC_APP_MARK_EOF;
1773 call->app_read_buf = NULL;
1775 /* adjust the state if used up all packets */
1776 if (list_empty(&call->app_readyq) && call->app_last_rcv) {
1777 switch (call->app_call_state) {
1778 case RXRPC_CSTATE_SRVR_RCV_OPID:
1779 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1780 call->app_mark = RXRPC_APP_MARK_EOF;
1782 del_timer_sync(&call->rcv_timeout);
1784 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1785 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1787 del_timer_sync(&call->rcv_timeout);
1790 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1792 del_timer_sync(&call->acks_timeout);
1793 del_timer_sync(&call->ackr_dfr_timo);
1794 del_timer_sync(&call->rcv_timeout);
1803 if (call->app_last_rcv) {
1804 _debug("Insufficient data (%Zu/%Zu)",
1805 call->app_ready_qty, call->app_mark);
1806 call->app_async_read = 0;
1807 call->app_mark = RXRPC_APP_MARK_EOF;
1808 call->app_read_buf = NULL;
1810 _leave(" = -EBADMSG");
1814 _leave(" = -EAGAIN");
1816 } /* end __rxrpc_call_read_data() */
1818 /*****************************************************************************/
1820 * attempt to read the specified amount of data from the call's ready queue
1821 * into the buffer provided
1822 * - since this func is the only one going to look at packets queued on
1823 * app_readyq, we don't need a lock to modify or access them, only to modify
1824 * the queue pointers
1825 * - if the buffer pointer is NULL, then data is merely drained, not copied
1826 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
1827 * enough data or an error will be generated
1828 * - note that the caller must have added the calling task to the call's wait
1830 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
1831 * function doesn't read all available data
1833 int rxrpc_call_read_data(struct rxrpc_call *call,
1834 void *buffer, size_t size, int flags)
1838 _enter("%p{arq=%Zu},%p,%Zd,%x",
1839 call, call->app_ready_qty, buffer, size, flags);
1841 spin_lock(&call->lock);
1843 if (unlikely(!!call->app_read_buf)) {
1844 spin_unlock(&call->lock);
1845 _leave(" = -EBUSY");
1849 call->app_mark = size;
1850 call->app_read_buf = buffer;
1851 call->app_async_read = 1;
1852 call->app_read_count++;
1854 /* read as much data as possible */
1855 ret = __rxrpc_call_read_data(call);
1858 if (flags & RXRPC_CALL_READ_ALL &&
1859 (!call->app_last_rcv || call->app_ready_qty > 0)) {
1860 _leave(" = -EBADMSG");
1861 __rxrpc_call_abort(call, -EBADMSG);
1865 spin_unlock(&call->lock);
1866 call->app_attn_func(call);
1871 spin_unlock(&call->lock);
1872 _leave(" = %d [aborted]", ret);
1876 __rxrpc_call_abort(call, ret);
1877 _leave(" = %d", ret);
1881 spin_unlock(&call->lock);
1883 if (!(flags & RXRPC_CALL_READ_BLOCK)) {
1884 _leave(" = -EAGAIN");
1888 /* wait for the data to arrive */
1889 _debug("blocking for data arrival");
1892 set_current_state(TASK_INTERRUPTIBLE);
1893 if (!call->app_async_read || signal_pending(current))
1897 set_current_state(TASK_RUNNING);
1899 if (signal_pending(current)) {
1900 _leave(" = -EINTR");
1904 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
1905 _leave(" = -ECONNABORTED");
1906 return -ECONNABORTED;
1913 } /* end rxrpc_call_read_data() */
1915 /*****************************************************************************/
1917 * write data to a call
1918 * - the data may not be sent immediately if it doesn't fill a buffer
1919 * - if we can't queue all the data for buffering now, siov[] will have been
1920 * adjusted to take account of what has been sent
1922 int rxrpc_call_write_data(struct rxrpc_call *call,
1930 struct rxrpc_message *msg;
1932 size_t space, size, chunk, tmp;
1936 _enter("%p,%Zu,%p,%02x,%x,%d,%p",
1937 call, sioc, siov, rxhdr_flags, alloc_flags, dup_data,
1944 /* can't send more if we've sent last packet from this end */
1945 switch (call->app_call_state) {
1946 case RXRPC_CSTATE_SRVR_SND_REPLY:
1947 case RXRPC_CSTATE_CLNT_SND_ARGS:
1949 case RXRPC_CSTATE_ERROR:
1950 ret = call->app_errno;
1955 /* calculate how much data we've been given */
1957 for (; sioc > 0; sptr++, sioc--) {
1961 if (!sptr->iov_base)
1964 size += sptr->iov_len;
1967 _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size);
1970 /* make sure there's a message under construction */
1971 if (!call->snd_nextmsg) {
1972 /* no - allocate a message with no data yet attached */
1973 ret = rxrpc_conn_newmsg(call->conn, call,
1974 RXRPC_PACKET_TYPE_DATA,
1975 0, NULL, alloc_flags,
1976 &call->snd_nextmsg);
1979 _debug("- allocated new message [ds=%Zu]",
1980 call->snd_nextmsg->dsize);
1983 msg = call->snd_nextmsg;
1984 msg->hdr.flags |= rxhdr_flags;
1986 /* deal with zero-length terminal packet */
1988 if (rxhdr_flags & RXRPC_LAST_PACKET) {
1989 ret = rxrpc_call_flush(call);
1996 /* work out how much space current packet has available */
1997 space = call->conn->mtu_size - msg->dsize;
1998 chunk = min(space, size);
2000 _debug("- [before] space=%Zu chunk=%Zu", space, chunk);
2002 while (!siov->iov_len)
2005 /* if we are going to have to duplicate the data then coalesce
2008 /* don't allocate more that 1 page at a time */
2009 if (chunk > PAGE_SIZE)
2012 /* allocate a data buffer and attach to the message */
2013 buf = kmalloc(chunk, alloc_flags);
2014 if (unlikely(!buf)) {
2016 sizeof(struct rxrpc_header)) {
2017 /* discard an empty msg and wind back
2018 * the seq counter */
2019 rxrpc_put_message(msg);
2020 call->snd_nextmsg = NULL;
2021 call->snd_seq_count--;
2028 tmp = msg->dcount++;
2029 set_bit(tmp, &msg->dfree);
2030 msg->data[tmp].iov_base = buf;
2031 msg->data[tmp].iov_len = chunk;
2032 msg->dsize += chunk;
2033 *size_sent += chunk;
2036 /* load the buffer with data */
2038 tmp = min(chunk, siov->iov_len);
2039 memcpy(buf, siov->iov_base, tmp);
2041 siov->iov_base += tmp;
2042 siov->iov_len -= tmp;
2049 /* we want to attach the supplied buffers directly */
2051 msg->dcount < RXRPC_MSG_MAX_IOCS) {
2052 tmp = msg->dcount++;
2053 msg->data[tmp].iov_base = siov->iov_base;
2054 msg->data[tmp].iov_len = siov->iov_len;
2055 msg->dsize += siov->iov_len;
2056 *size_sent += siov->iov_len;
2057 size -= siov->iov_len;
2058 chunk -= siov->iov_len;
2063 _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size);
2065 /* dispatch the message when full, final or requesting ACK */
2066 if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) {
2067 ret = rxrpc_call_flush(call);
2076 _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size);
2079 } /* end rxrpc_call_write_data() */
2081 /*****************************************************************************/
2083 * flush outstanding packets to the network
2085 static int rxrpc_call_flush(struct rxrpc_call *call)
2087 struct rxrpc_message *msg;
2092 rxrpc_get_call(call);
2094 /* if there's a packet under construction, then dispatch it now */
2095 if (call->snd_nextmsg) {
2096 msg = call->snd_nextmsg;
2097 call->snd_nextmsg = NULL;
2099 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2100 msg->hdr.flags &= ~RXRPC_MORE_PACKETS;
2101 if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS)
2102 msg->hdr.flags |= RXRPC_REQUEST_ACK;
2105 msg->hdr.flags |= RXRPC_MORE_PACKETS;
2108 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
2109 msg->dsize, msg->dcount, msg->dfree);
2111 /* queue and adjust call state */
2112 spin_lock(&call->lock);
2113 list_add_tail(&msg->link, &call->acks_pendq);
2115 /* decide what to do depending on current state and if this is
2116 * the last packet */
2118 switch (call->app_call_state) {
2119 case RXRPC_CSTATE_SRVR_SND_REPLY:
2120 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2121 call->app_call_state =
2122 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK;
2127 case RXRPC_CSTATE_CLNT_SND_ARGS:
2128 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2129 call->app_call_state =
2130 RXRPC_CSTATE_CLNT_RCV_REPLY;
2135 case RXRPC_CSTATE_ERROR:
2136 ret = call->app_errno;
2138 spin_unlock(&call->lock);
2142 call->acks_pend_cnt++;
2144 mod_timer(&call->acks_timeout,
2145 __rxrpc_rtt_based_timeout(call,
2146 rxrpc_call_acks_timeout));
2148 spin_unlock(&call->lock);
2150 ret = rxrpc_conn_sendmsg(call->conn, msg);
2152 call->pkt_snd_count++;
2156 rxrpc_put_call(call);
2158 _leave(" = %d", ret);
2161 } /* end rxrpc_call_flush() */
2163 /*****************************************************************************/
2165 * resend NAK'd or unacknowledged packets up to the highest one specified
2167 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest)
2169 struct rxrpc_message *msg;
2170 struct list_head *_p;
2171 rxrpc_seq_t seq = 0;
2173 _enter("%p,%u", call, highest);
2175 _proto("Rx Resend required");
2177 /* handle too many resends */
2178 if (call->snd_resend_cnt >= rxrpc_call_max_resend) {
2179 _debug("Aborting due to too many resends (rcv=%d)",
2180 call->pkt_rcv_count);
2181 rxrpc_call_abort(call,
2182 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT);
2187 spin_lock(&call->lock);
2188 call->snd_resend_cnt++;
2190 /* determine which the next packet we might need to ACK is */
2191 if (seq <= call->acks_dftv_seq)
2192 seq = call->acks_dftv_seq;
2198 /* look for the packet in the pending-ACK queue */
2199 list_for_each(_p, &call->acks_pendq) {
2200 msg = list_entry(_p, struct rxrpc_message, link);
2201 if (msg->seq == seq)
2206 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
2207 __FUNCTION__, call, highest,
2208 call->acks_dftv_seq, call->snd_seq_count, seq);
2211 if (msg->state != RXRPC_MSG_SENT)
2212 continue; /* only un-ACK'd packets */
2214 rxrpc_get_message(msg);
2215 spin_unlock(&call->lock);
2217 /* send each message again (and ignore any errors we might
2219 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
2220 msg->dsize, msg->dcount, msg->dfree);
2222 if (rxrpc_conn_sendmsg(call->conn, msg) == 0)
2223 call->pkt_snd_count++;
2225 rxrpc_put_message(msg);
2227 spin_lock(&call->lock);
2230 /* reset the timeout */
2231 mod_timer(&call->acks_timeout,
2232 __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout));
2234 spin_unlock(&call->lock);
2237 } /* end rxrpc_call_resend() */
2239 /*****************************************************************************/
2241 * handle an ICMP error being applied to a call
2243 void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno)
2245 _enter("%p{%u},%d", call, ntohl(call->call_id), errno);
2247 /* if this call is already aborted, then just wake up any waiters */
2248 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
2249 call->app_error_func(call);
2252 /* tell the app layer what happened */
2253 spin_lock(&call->lock);
2254 call->app_call_state = RXRPC_CSTATE_ERROR;
2257 call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR;
2259 call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR;
2260 call->app_errno = errno;
2261 call->app_mark = RXRPC_APP_MARK_EOF;
2262 call->app_read_buf = NULL;
2263 call->app_async_read = 0;
2266 call->app_aemap_func(call);
2268 del_timer_sync(&call->acks_timeout);
2269 del_timer_sync(&call->rcv_timeout);
2270 del_timer_sync(&call->ackr_dfr_timo);
2272 spin_unlock(&call->lock);
2274 call->app_error_func(call);
2278 } /* end rxrpc_call_handle_error() */