1 /* call.c: Rx call routines
3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #include <linux/sched.h>
13 #include <linux/slab.h>
14 #include <linux/module.h>
15 #include <rxrpc/rxrpc.h>
16 #include <rxrpc/transport.h>
17 #include <rxrpc/peer.h>
18 #include <rxrpc/connection.h>
19 #include <rxrpc/call.h>
20 #include <rxrpc/message.h>
// Accounting counters, declared through __RXACCT_DECL. rxrpc_call_count is
// incremented when a call is added to rxrpc_calls and decremented when a call
// record is destroyed in rxrpc_put_call().
23 __RXACCT_DECL(atomic_t rxrpc_call_count);
24 __RXACCT_DECL(atomic_t rxrpc_message_count);
// Global list of call records (linked through call->call_link) and the
// rw-semaphore that is write-held around insertions and removals.
26 LIST_HEAD(rxrpc_calls);
27 DECLARE_RWSEM(rxrpc_calls_sem);
// Timeout tunables, each initialised to a fraction of HZ.
// rxrpc_call_dfr_ack_timeout is added to jiffies when the deferred-ACK timer
// is armed in rxrpc_call_generate_ACK().
// NOTE(review): rxrpc_call_max_resend is named like a count but is initialised
// from HZ like the timeouts; its users are not visible here - confirm.
29 unsigned rxrpc_call_rcv_timeout = HZ/3;
30 static unsigned rxrpc_call_acks_timeout = HZ/3;
31 static unsigned rxrpc_call_dfr_ack_timeout = HZ/20;
32 static unsigned short rxrpc_call_max_resend = HZ/10;
// Name tables used by the debug and protocol trace output.
// rxrpc_call_states is indexed by call->app_call_state (see _state()),
// rxrpc_pkts by the packet type from the message header and rxrpc_acks by
// ACK reason.
// NOTE(review): most of the initialisers are missing from this listing.
34 const char *rxrpc_call_states[] = {
47 const char *rxrpc_call_error_states[] = {
55 const char *rxrpc_pkts[] = {
57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",
58 "?09", "?10", "?11", "?12", "?13", "?14", "?15"
61 static const char *rxrpc_acks[] = {
62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
66 static const char _acktype[] = "NA-";
// Forward declarations of the file-local helpers that are used before their
// definitions further down in this file.
// NOTE(review): some parameter lines of these prototypes are missing from
// this listing.
68 static void rxrpc_call_receive_packet(struct rxrpc_call *call);
69 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
70 struct rxrpc_message *msg);
71 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
72 struct rxrpc_message *msg);
73 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
75 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest);
76 static int __rxrpc_call_read_data(struct rxrpc_call *call);
78 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
79 struct rxrpc_message *msg,
83 static int rxrpc_call_flush(struct rxrpc_call *call);
// Debug helper: logs the name of the current application call state of the
// given call through _debug(), using the rxrpc_call_states name table.
// NOTE(review): the expansion already ends in a semicolon, so a use followed
// by its own semicolon yields an extra empty statement.
85 #define _state(call) \
86 _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]);
// Default attention handler (installed as app_attn_func by
// __rxrpc_create_call): wakes up any sleepers on the call wait queue.
88 static void rxrpc_call_default_attn_func(struct rxrpc_call *call)
90 wake_up(&call->waitq);
// Default error handler (installed as app_error_func by
// __rxrpc_create_call): wakes up any sleepers on the call wait queue.
93 static void rxrpc_call_default_error_func(struct rxrpc_call *call)
95 wake_up(&call->waitq);
// Default abort/error mapping handler (installed as app_aemap_func by
// __rxrpc_create_call). Dispatches on call->app_err_state:
//  - local abort: the abort code is set to the negated app_errno;
//  - peer abort: app_errno is set to -ECONNABORTED.
// NOTE(review): no break is visible between the two cases in this listing, so
// the local-abort case may fall through into the peer-abort assignment -
// confirm against the complete source.
98 static void rxrpc_call_default_aemap_func(struct rxrpc_call *call)
100 switch (call->app_err_state) {
101 case RXRPC_ESTATE_LOCAL_ABORT:
102 call->app_abort_code = -call->app_errno;
103 case RXRPC_ESTATE_PEER_ABORT:
104 call->app_errno = -ECONNABORTED;
// Timer handler for call->acks_timeout. _call carries the call pointer that
// __rxrpc_create_call stored in the timer data field. Flags the call with
// RXRPC_CALL_ACKS_TIMO and passes it to rxrpc_krxiod_queue_call(); the flag
// is acted on (by resending) in rxrpc_call_do_stuff().
110 static void __rxrpc_call_acks_timeout(unsigned long _call)
112 struct rxrpc_call *call = (struct rxrpc_call *) _call;
114 _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif);
116 call->flags |= RXRPC_CALL_ACKS_TIMO;
117 rxrpc_krxiod_queue_call(call);
// Timer handler for call->rcv_timeout. _call carries the call pointer that
// __rxrpc_create_call stored in the timer data field. Flags the call with
// RXRPC_CALL_RCV_TIMO and passes it to rxrpc_krxiod_queue_call(); the flag is
// acted on (by aborting the call with -EIO) in rxrpc_call_do_stuff().
120 static void __rxrpc_call_rcv_timeout(unsigned long _call)
122 struct rxrpc_call *call = (struct rxrpc_call *) _call;
124 _debug("RCV TIMEOUT %05lu", jiffies - call->cjif);
126 call->flags |= RXRPC_CALL_RCV_TIMO;
127 rxrpc_krxiod_queue_call(call);
// Timer handler for call->ackr_dfr_timo (the deferred-ACK timer). _call
// carries the call pointer that __rxrpc_create_call stored in the timer data
// field. Flags the call with RXRPC_CALL_ACKR_TIMO and passes it to
// rxrpc_krxiod_queue_call(); the flag is acted on in rxrpc_call_do_stuff(),
// which generates the pending normal ACK.
130 static void __rxrpc_call_ackr_timeout(unsigned long _call)
132 struct rxrpc_call *call = (struct rxrpc_call *) _call;
134 _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif);
136 call->flags |= RXRPC_CALL_ACKR_TIMO;
137 rxrpc_krxiod_queue_call(call);
140 /*****************************************************************************/
142 * calculate a timeout based on an RTT value
// Computes an absolute expiry time (in jiffies) from the RTT recorded on the
// peer of the connection this call belongs to.
// Returns jiffies + expiry.
// NOTE(review): the second parameter and the statements that scale and clamp
// expiry are missing from this listing.
144 static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call,
// Divide the peer RTT by the number of microseconds per jiffy.
// presumably peer->rtt is kept in microseconds - TODO confirm
147 unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ);
// Small values are special-cased below HZ / 25; the action taken is not
// visible in this listing.
150 if (expiry < HZ / 25)
155 _leave(" = %lu jiffies", expiry);
156 return jiffies + expiry;
157 } /* end __rxrpc_rtt_based_timeout() */
159 /*****************************************************************************/
161 * create a new call record
// Common constructor shared by rxrpc_create_call() and rxrpc_incoming_call().
// Allocates one zeroed page to hold the call record and initialises it:
// usage count of 1, wait queue, spinlock, list heads, the three per-call
// timers, the receive ACK window, the default application handlers and the
// creation timestamp.
// conn  - connection the call will belong to
// _call - presumably receives the new call record on success; the store
//         itself is not visible in this listing
// NOTE(review): the allocation failure path and the return statements are
// missing from this listing.
163 static inline int __rxrpc_create_call(struct rxrpc_connection *conn,
164 struct rxrpc_call **_call)
166 struct rxrpc_call *call;
170 /* allocate and initialise a call record */
171 call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL);
// the creator holds the initial reference
177 atomic_set(&call->usage, 1);
179 init_waitqueue_head(&call->waitq);
180 spin_lock_init(&call->lock);
181 INIT_LIST_HEAD(&call->link);
182 INIT_LIST_HEAD(&call->acks_pendq);
183 INIT_LIST_HEAD(&call->rcv_receiveq);
184 INIT_LIST_HEAD(&call->rcv_krxiodq_lk);
185 INIT_LIST_HEAD(&call->app_readyq);
186 INIT_LIST_HEAD(&call->app_unreadyq);
187 INIT_LIST_HEAD(&call->app_link);
188 INIT_LIST_HEAD(&call->app_attn_link);
// each timer passes the call pointer back to its handler via the data field
190 init_timer(&call->acks_timeout);
191 call->acks_timeout.data = (unsigned long) call;
192 call->acks_timeout.function = __rxrpc_call_acks_timeout;
194 init_timer(&call->rcv_timeout);
195 call->rcv_timeout.data = (unsigned long) call;
196 call->rcv_timeout.function = __rxrpc_call_rcv_timeout;
198 init_timer(&call->ackr_dfr_timo);
199 call->ackr_dfr_timo.data = (unsigned long) call;
200 call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout;
// receive ACK window starts at sequence number 1 and is
// RXRPC_CALL_ACK_WINDOW_SIZE wide
203 call->ackr_win_bot = 1;
204 call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1;
205 call->ackr_prev_seq = 0;
206 call->app_mark = RXRPC_APP_MARK_EOF;
// default handlers; rxrpc_create_call() may replace them
207 call->app_attn_func = rxrpc_call_default_attn_func;
208 call->app_error_func = rxrpc_call_default_error_func;
209 call->app_aemap_func = rxrpc_call_default_aemap_func;
210 call->app_scr_alloc = call->app_scratch;
// creation time; debug output prints times relative to this
212 call->cjif = jiffies;
214 _leave(" = 0 (%p)", call);
219 } /* end __rxrpc_create_call() */
221 /*****************************************************************************/
223 * create a new call record for outgoing calls
// Creates a call record for an outgoing (client) call on a connection.
// conn  - connection to attach the call to
// attn  - attention handler to install on the call
// error - error handler to install on the call
// aemap - abort/error mapping handler to install on the call
// _call - presumably receives the new call on success; the store is not
//         visible in this listing
// Steps visible here: build the record with __rxrpc_create_call(), put it in
// the client send-arguments state, install the handlers, then claim one of
// the 4 channel slots on the connection (sleeping interruptibly on
// conn->chanwait while none is free), pick a call ID, take a reference on the
// connection and add the call to the global rxrpc_calls list.
// The trailing statements form the failure path: leave the wait queue, drop
// conn->lock and free the page holding the call.
// NOTE(review): loop structure, labels, gotos, returns and any NULL checks
// around the handler assignments are missing from this listing.
225 int rxrpc_create_call(struct rxrpc_connection *conn,
226 rxrpc_call_attn_func_t attn,
227 rxrpc_call_error_func_t error,
228 rxrpc_call_aemap_func_t aemap,
229 struct rxrpc_call **_call)
231 DECLARE_WAITQUEUE(myself, current);
233 struct rxrpc_call *call;
238 /* allocate and initialise a call record */
239 ret = __rxrpc_create_call(conn, &call);
241 _leave(" = %d", ret);
245 call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS;
247 call->app_attn_func = attn;
249 call->app_error_func = error;
251 call->app_aemap_func = aemap;
// channel slots are inspected and claimed under conn->lock
255 spin_lock(&conn->lock);
256 set_current_state(TASK_INTERRUPTIBLE);
257 add_wait_queue(&conn->chanwait, &myself);
260 /* try to find an unused channel */
261 for (cix = 0; cix < 4; cix++)
262 if (!conn->channels[cix])
265 /* no free channels - wait for one to become available */
267 if (signal_pending(current))
// the lock is dropped before waiting and retaken afterwards
270 spin_unlock(&conn->lock);
273 set_current_state(TASK_INTERRUPTIBLE);
275 spin_lock(&conn->lock);
278 /* got a channel - now attach to the connection */
280 remove_wait_queue(&conn->chanwait, &myself);
281 set_current_state(TASK_RUNNING);
283 /* concoct a unique call number */
// candidate ID comes from the per-connection counter and is compared against
// the IDs of the calls currently occupying the channels
285 call->call_id = htonl(++conn->call_counter);
286 for (loop = 0; loop < 4; loop++)
287 if (conn->channels[loop] &&
288 conn->channels[loop]->call_id == call->call_id)
291 rxrpc_get_connection(conn);
292 conn->channels[cix] = call; /* assign _after_ done callid check loop */
293 do_gettimeofday(&conn->atime);
294 call->chan_ix = htonl(cix);
296 spin_unlock(&conn->lock);
298 down_write(&rxrpc_calls_sem);
299 list_add_tail(&call->call_link, &rxrpc_calls);
300 up_write(&rxrpc_calls_sem);
302 __RXACCT(atomic_inc(&rxrpc_call_count));
305 _leave(" = 0 (call=%p cix=%u)", call, cix);
309 remove_wait_queue(&conn->chanwait, &myself);
310 set_current_state(TASK_RUNNING);
311 spin_unlock(&conn->lock);
313 free_page((unsigned long) call);
314 _leave(" = %d", ret);
316 } /* end rxrpc_create_call() */
318 /*****************************************************************************/
320 * create a new call record for incoming calls
// Creates a call record for an incoming (server side) call.
// conn  - connection the first packet arrived on
// msg   - the received message; its header supplies the channel index
//         (cid masked with RXRPC_CHANNELMASK) and the call number
// _call - presumably receives the new call on success; the store is not
//         visible in this listing
// The new call starts in the server receive-opid state with app_mark set to
// the size of a 32-bit word and with one packet already counted as received.
// It is attached to the channel slot only if the slot is empty or the call
// currently in it is in the COMPLETE or ERROR state.
// NOTE(review): the else branch, the condition guarding free_page() and the
// return statements are missing from this listing.
322 int rxrpc_incoming_call(struct rxrpc_connection *conn,
323 struct rxrpc_message *msg,
324 struct rxrpc_call **_call)
326 struct rxrpc_call *call;
330 cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK;
332 _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix);
334 /* allocate and initialise a call record */
335 ret = __rxrpc_create_call(conn, &call);
337 _leave(" = %d", ret);
341 call->pkt_rcv_count = 1;
342 call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID;
343 call->app_mark = sizeof(uint32_t);
347 /* attach to the connection */
// chan_ix is stored in network byte order; call_id is copied unconverted
// from the header
349 call->chan_ix = htonl(cix);
350 call->call_id = msg->hdr.callNumber;
352 spin_lock(&conn->lock);
354 if (!conn->channels[cix] ||
355 conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE ||
356 conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR
358 conn->channels[cix] = call;
359 rxrpc_get_connection(conn);
363 spin_unlock(&conn->lock);
366 free_page((unsigned long) call);
371 down_write(&rxrpc_calls_sem);
372 list_add_tail(&call->call_link, &rxrpc_calls);
373 up_write(&rxrpc_calls_sem);
374 __RXACCT(atomic_inc(&rxrpc_call_count));
378 _leave(" = %d [%p]", ret, call);
380 } /* end rxrpc_incoming_call() */
382 /*****************************************************************************/
// Drops one reference on a call and destroys the call when the last
// reference goes away.
// The usage count is decremented under conn->lock; if it does not reach zero
// the lock is simply released. On the final put the call is removed from its
// channel slot on the connection, channel waiters are woken, the connection
// reference is dropped, the three timers are stopped, the call is dequeued
// from krxiod, every message still held by the call is released, the owner
// module reference is dropped, the call is unlinked from rxrpc_calls and the
// page holding the record is freed.
// NOTE(review): the statements following the usage sanity check and the
// early return for the non-final put are missing from this listing.
386 void rxrpc_put_call(struct rxrpc_call *call)
388 struct rxrpc_connection *conn = call->conn;
389 struct rxrpc_message *msg;
391 _enter("%p{u=%d}",call,atomic_read(&call->usage));
394 if (atomic_read(&call->usage) <= 0)
397 /* to prevent a race, the decrement and the de-list must be effectively
399 spin_lock(&conn->lock);
400 if (likely(!atomic_dec_and_test(&call->usage))) {
401 spin_unlock(&conn->lock);
// only clear the channel slot if it still points at this call
406 if (conn->channels[ntohl(call->chan_ix)] == call)
407 conn->channels[ntohl(call->chan_ix)] = NULL;
409 spin_unlock(&conn->lock);
// a slot may have become free for a sleeper in rxrpc_create_call()
411 wake_up(&conn->chanwait);
413 rxrpc_put_connection(conn);
415 /* clear the timers and dequeue from krxiod */
416 del_timer_sync(&call->acks_timeout);
417 del_timer_sync(&call->rcv_timeout);
418 del_timer_sync(&call->ackr_dfr_timo);
420 rxrpc_krxiod_dequeue_call(call);
422 /* clean up the contents of the struct */
423 if (call->snd_nextmsg)
424 rxrpc_put_message(call->snd_nextmsg);
427 rxrpc_put_message(call->snd_ping);
// drain each message queue, unlinking and releasing every entry
429 while (!list_empty(&call->acks_pendq)) {
430 msg = list_entry(call->acks_pendq.next,
431 struct rxrpc_message, link);
432 list_del(&msg->link);
433 rxrpc_put_message(msg);
436 while (!list_empty(&call->rcv_receiveq)) {
437 msg = list_entry(call->rcv_receiveq.next,
438 struct rxrpc_message, link);
439 list_del(&msg->link);
440 rxrpc_put_message(msg);
443 while (!list_empty(&call->app_readyq)) {
444 msg = list_entry(call->app_readyq.next,
445 struct rxrpc_message, link);
446 list_del(&msg->link);
447 rxrpc_put_message(msg);
450 while (!list_empty(&call->app_unreadyq)) {
451 msg = list_entry(call->app_unreadyq.next,
452 struct rxrpc_message, link);
453 list_del(&msg->link);
454 rxrpc_put_message(msg);
457 module_put(call->owner);
459 down_write(&rxrpc_calls_sem);
460 list_del(&call->call_link);
461 up_write(&rxrpc_calls_sem);
463 __RXACCT(atomic_dec(&rxrpc_call_count));
464 free_page((unsigned long) call);
466 _leave(" [destroyed]");
467 } /* end rxrpc_put_call() */
469 /*****************************************************************************/
471 * actually generate a normal ACK
// Builds and sends the pending normal ACK packet for a call from the state
// accumulated in call->ackr and call->ackr_array, then advances the receive
// ACK window past the leading run of positively ACKed slots and clears
// call->ackr.
// The packet payload is three iovecs: the ACK header, the per-packet ACK
// array and a four-word trailer (interface MTU, max MTU, rwind, max packets).
// NOTE(review): the second parameter (used below as seq for the header
// sequence number), the local declarations, error checks and the return
// statement are missing from this listing.
473 static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call,
476 struct rxrpc_message *msg;
481 /* ACKs default to DELAY */
482 if (!call->ackr.reason)
483 call->ackr.reason = RXRPC_ACK_DELAY;
485 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
486 jiffies - call->cjif,
487 ntohs(call->ackr.maxSkew),
488 ntohl(call->ackr.firstPacket),
489 ntohl(call->ackr.previousPacket),
490 ntohl(call->ackr.serial),
491 rxrpc_acks[call->ackr.reason],
494 aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */
495 aux[1] = htonl(1444); /* max MTU */
496 aux[2] = htonl(16); /* rwind */
497 aux[3] = htonl(4); /* max packets */
499 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
500 diov[0].iov_base = &call->ackr;
// NOTE(review): the ACK array is sent with 3 bytes more than the pending
// count; presumably padding required by the wire format - confirm
501 diov[1].iov_len = call->ackr_pend_cnt + 3;
502 diov[1].iov_base = call->ackr_array;
503 diov[2].iov_len = sizeof(aux);
504 diov[2].iov_base = &aux;
506 /* build and send the message */
507 ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
508 3, diov, GFP_KERNEL, &msg);
513 msg->hdr.seq = htonl(seq);
514 msg->hdr.flags |= RXRPC_SLOW_START_OK;
516 ret = rxrpc_conn_sendmsg(call->conn, msg);
517 rxrpc_put_message(msg);
520 call->pkt_snd_count++;
522 /* count how many actual ACKs there were at the front */
523 for (delta = 0; delta < call->ackr_pend_cnt; delta++)
524 if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK)
527 call->ackr_pend_cnt -= delta; /* all ACK'd to this point */
529 /* crank the ACK window around */
531 /* un-ACK'd window */
533 else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) {
534 /* partially ACK'd window
535 * - shuffle down to avoid losing out-of-sequence packets
537 call->ackr_win_bot += delta;
538 call->ackr_win_top += delta;
540 memmove(&call->ackr_array[0],
541 &call->ackr_array[delta],
542 call->ackr_pend_cnt);
544 memset(&call->ackr_array[call->ackr_pend_cnt],
546 sizeof(call->ackr_array) - call->ackr_pend_cnt);
549 /* fully ACK'd window
550 * - just clear the whole thing
552 memset(&call->ackr_array,
554 sizeof(call->ackr_array));
558 memset(&call->ackr, 0, sizeof(call->ackr));
561 if (!call->app_call_state)
562 printk("___ STATE 0 ___\n");
564 } /* end __rxrpc_call_gen_normal_ACK() */
566 /*****************************************************************************/
568 * note the reception of a packet in the call's ACK records and generate an
569 * appropriate ACK packet if necessary
570 * - returns 0 if packet should be processed, 1 if packet should be ignored
571 * and -ve on an error
// Records the reception of a packet in the receive ACK window of the call and
// decides which ACK traffic it should trigger.
// call - call the packet belongs to
// hdr  - header of the received packet (sequence number, flags, serial)
// ack  - ACK payload when the received packet is itself an ACK; tested
//        against NULL before use, so callers may pass NULL
// Two kinds of ACK can result:
//  - a normal ACK, accumulated in call->ackr and either sent at once through
//    __rxrpc_call_gen_normal_ACK() or deferred via the ackr_dfr_timo timer;
//  - a special ACK (ping response, duplicate, exceeds window), built on the
//    stack and sent immediately.
// NOTE(review): local declarations, gotos, labels, several closing braces and
// the return statement are missing from this listing.
573 static int rxrpc_call_generate_ACK(struct rxrpc_call *call,
574 struct rxrpc_header *hdr,
575 struct rxrpc_ackpacket *ack)
577 struct rxrpc_message *msg;
581 u8 special_ACK, do_ACK, force;
583 _enter("%p,%p { seq=%d tp=%d fl=%02x }",
584 call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags);
// offset is the slot of this packet relative to the bottom of the window
586 seq = ntohl(hdr->seq);
587 offset = seq - call->ackr_win_bot;
588 do_ACK = RXRPC_ACK_DELAY;
// remember the highest sequence number seen; used for maxSkew below
592 if (call->ackr_high_seq < seq)
593 call->ackr_high_seq = seq;
595 /* deal with generation of obvious special ACKs first */
596 if (ack && ack->reason == RXRPC_ACK_PING) {
597 special_ACK = RXRPC_ACK_PING_RESPONSE;
602 if (seq < call->ackr_win_bot) {
603 special_ACK = RXRPC_ACK_DUPLICATE;
// NOTE(review): ackr_win_top is initialised to win_bot + WINDOW_SIZE - 1, so
// this test also rejects seq == ackr_win_top - confirm whether the top of the
// window is meant to be inclusive
608 if (seq >= call->ackr_win_top) {
609 special_ACK = RXRPC_ACK_EXCEEDS_WINDOW;
// a slot that is no longer marked NACK has been seen before
614 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) {
615 special_ACK = RXRPC_ACK_DUPLICATE;
620 /* okay... it's a normal data packet inside the ACK window */
621 call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK;
623 if (offset < call->ackr_pend_cnt) {
625 else if (offset > call->ackr_pend_cnt) {
626 do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE;
627 call->ackr_pend_cnt = offset;
631 if (hdr->flags & RXRPC_REQUEST_ACK) {
632 do_ACK = RXRPC_ACK_REQUESTED;
635 /* generate an ACK on the final packet of a reply just received */
636 if (hdr->flags & RXRPC_LAST_PACKET) {
637 if (call->conn->out_clientflag)
640 else if (!(hdr->flags & RXRPC_MORE_PACKETS)) {
641 do_ACK = RXRPC_ACK_REQUESTED;
644 /* re-ACK packets previously received out-of-order */
645 for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++)
646 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK)
649 call->ackr_pend_cnt = offset;
651 /* generate an ACK if we fill up the window */
652 if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE)
656 _debug("%05lu ACKs pend=%u norm=%s special=%s%s",
657 jiffies - call->cjif,
660 rxrpc_acks[special_ACK],
661 force ? " immediate" :
662 do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" :
663 hdr->flags & RXRPC_LAST_PACKET ? " finalise" :
667 /* send any pending normal ACKs if need be */
668 if (call->ackr_pend_cnt > 0) {
669 /* fill out the appropriate form */
670 call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
671 call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq,
673 call->ackr.firstPacket = htonl(call->ackr_win_bot);
674 call->ackr.previousPacket = call->ackr_prev_seq;
675 call->ackr.serial = hdr->serial;
676 call->ackr.nAcks = call->ackr_pend_cnt;
678 if (do_ACK == RXRPC_ACK_REQUESTED)
679 call->ackr.reason = do_ACK;
681 /* generate the ACK immediately if necessary */
682 if (special_ACK || force) {
// a pure delay ACK is sent with sequence number 0 in its header
683 err = __rxrpc_call_gen_normal_ACK(
684 call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq);
// remember the sequence number for the ACK that rxrpc_call_do_stuff() will
// generate for a requested ACK
692 if (call->ackr.reason == RXRPC_ACK_REQUESTED)
693 call->ackr_dfr_seq = seq;
695 /* start the ACK timer if not running if there are any pending deferred
697 if (call->ackr_pend_cnt > 0 &&
698 call->ackr.reason != RXRPC_ACK_REQUESTED &&
699 !timer_pending(&call->ackr_dfr_timo)
703 timo = rxrpc_call_dfr_ack_timeout + jiffies;
705 _debug("START ACKR TIMER for cj=%lu", timo - call->cjif);
707 spin_lock(&call->lock);
708 mod_timer(&call->ackr_dfr_timo, timo);
709 spin_unlock(&call->lock);
711 else if ((call->ackr_pend_cnt == 0 ||
712 call->ackr.reason == RXRPC_ACK_REQUESTED) &&
713 timer_pending(&call->ackr_dfr_timo)
715 /* stop timer if no pending ACKs */
716 _debug("CLEAR ACKR TIMER");
717 del_timer_sync(&call->ackr_dfr_timo);
720 /* send a special ACK if one is required */
// the special ACK uses its own header and a single-entry ACK array; this
// local ack shadows the ack parameter
722 struct rxrpc_ackpacket ack;
724 uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK };
726 /* fill out the appropriate form */
727 ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
728 ack.maxSkew = htons(min(call->ackr_high_seq - seq,
730 ack.firstPacket = htonl(call->ackr_win_bot);
731 ack.previousPacket = call->ackr_prev_seq;
732 ack.serial = hdr->serial;
733 ack.reason = special_ACK;
736 _proto("Rx Sending s-ACK"
737 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
739 ntohl(ack.firstPacket),
740 ntohl(ack.previousPacket),
742 rxrpc_acks[ack.reason],
745 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
746 diov[0].iov_base = &ack;
747 diov[1].iov_len = sizeof(acks);
748 diov[1].iov_base = acks;
750 /* build and send the message */
// the one-entry ACK array is only attached when the packet being answered
// carries a non-zero sequence number
751 err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
752 hdr->seq ? 2 : 1, diov,
761 msg->hdr.seq = htonl(seq);
762 msg->hdr.flags |= RXRPC_SLOW_START_OK;
764 err = rxrpc_conn_sendmsg(call->conn, msg);
765 rxrpc_put_message(msg);
770 call->pkt_snd_count++;
// kept in network byte order, as copied straight from the header
775 call->ackr_prev_seq = hdr->seq;
777 _leave(" = %d", ret);
779 } /* end rxrpc_call_generate_ACK() */
781 /*****************************************************************************/
783 * handle work to be done on a call
784 * - includes packet reception and timeout processing
// Performs the work outstanding on a call, as indicated by call->flags:
//  - RXRPC_CALL_RCV_PKT:   process queued packets;
//  - RXRPC_CALL_ACKS_TIMO: resend up to snd_seq_count;
//  - RXRPC_CALL_RCV_TIMO:  abort the call with -EIO;
//  - RXRPC_CALL_ACKR_TIMO, or a pending requested ACK: send the normal ACK.
// Each flag is cleared before its handler runs.
// NOTE(review): the flags are updated with plain read-modify-write operations
// here and in the timer handlers, with no lock visible in this listing -
// confirm how these are serialised.
786 void rxrpc_call_do_stuff(struct rxrpc_call *call)
788 _enter("%p{flags=%lx}", call, call->flags);
790 /* handle packet reception */
791 if (call->flags & RXRPC_CALL_RCV_PKT) {
792 _debug("- receive packet");
793 call->flags &= ~RXRPC_CALL_RCV_PKT;
794 rxrpc_call_receive_packet(call);
797 /* handle overdue ACKs */
798 if (call->flags & RXRPC_CALL_ACKS_TIMO) {
799 _debug("- overdue ACK timeout");
800 call->flags &= ~RXRPC_CALL_ACKS_TIMO;
801 rxrpc_call_resend(call, call->snd_seq_count);
804 /* handle lack of reception */
805 if (call->flags & RXRPC_CALL_RCV_TIMO) {
806 _debug("- reception timeout");
807 call->flags &= ~RXRPC_CALL_RCV_TIMO;
808 rxrpc_call_abort(call, -EIO);
811 /* handle deferred ACKs */
812 if (call->flags & RXRPC_CALL_ACKR_TIMO ||
813 (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED)
815 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
816 jiffies - call->cjif,
817 rxrpc_acks[call->ackr.reason],
820 call->flags &= ~RXRPC_CALL_ACKR_TIMO;
// nothing is sent for a call that is already in the error state
822 if (call->ackr.nAcks > 0 &&
823 call->app_call_state != RXRPC_CSTATE_ERROR) {
825 __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq);
826 call->ackr_dfr_seq = 0;
832 } /* end rxrpc_call_do_stuff() */
834 /*****************************************************************************/
836 * send an abort message at call or connection level
837 * - must be called with call->lock held
838 * - the supplied error code is sent as the packet data
// Aborts a call locally and, if any packets were exchanged, sends an ABORT
// packet to the peer.
// call  - call to abort; the caller must hold call->lock
// errno - error code recorded in call->app_errno
// Locking: the lock is released inside this function on both visible paths,
// before the application error handler is invoked.
// If the call is already in the error state, only the error handler is
// called. Otherwise the call is pinned with an extra reference, moved to the
// ERROR state with a LOCAL_ABORT error state, the application aemap handler
// translates the error, the timers are stopped, a pending ACK is flushed, the
// ABORT packet carrying call->app_abort_code is sent, the error handler is
// called and the extra reference is dropped.
// NOTE(review): local declarations, the early return and the final return
// statement are missing from this listing.
840 static int __rxrpc_call_abort(struct rxrpc_call *call, int errno)
842 struct rxrpc_connection *conn = call->conn;
843 struct rxrpc_message *msg;
848 _enter("%p{%08x},%p{%d},%d",
849 conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno);
851 /* if this call is already aborted, then just wake up any waiters */
852 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
853 spin_unlock(&call->lock);
854 call->app_error_func(call);
// keep the call alive until the matching rxrpc_put_call() at the end
859 rxrpc_get_call(call);
861 /* change the state _with_ the lock still held */
862 call->app_call_state = RXRPC_CSTATE_ERROR;
863 call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT;
864 call->app_errno = errno;
865 call->app_mark = RXRPC_APP_MARK_EOF;
866 call->app_read_buf = NULL;
867 call->app_async_read = 0;
871 /* ask the app to translate the error code */
872 call->app_aemap_func(call);
874 spin_unlock(&call->lock);
876 /* flush any outstanding ACKs */
877 del_timer_sync(&call->acks_timeout);
878 del_timer_sync(&call->rcv_timeout);
879 del_timer_sync(&call->ackr_dfr_timo);
881 if (rxrpc_call_is_ack_pending(call))
882 __rxrpc_call_gen_normal_ACK(call, 0);
884 /* send the abort packet only if we actually traded some other
887 if (call->pkt_snd_count || call->pkt_rcv_count) {
888 /* actually send the abort */
889 _proto("Rx Sending Call ABORT { data=%d }",
890 call->app_abort_code);
// the abort code travels as a single 32-bit word in network byte order
892 _error = htonl(call->app_abort_code);
894 diov[0].iov_len = sizeof(_error);
895 diov[0].iov_base = &_error;
897 ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT,
898 1, diov, GFP_KERNEL, &msg);
900 ret = rxrpc_conn_sendmsg(conn, msg);
901 rxrpc_put_message(msg);
905 /* tell the app layer to let go */
906 call->app_error_func(call);
908 rxrpc_put_call(call);
910 _leave(" = %d", ret);
912 } /* end __rxrpc_call_abort() */
914 /*****************************************************************************/
916 * send an abort message at call or connection level
917 * - the supplied error code is sent as the packet data
// Aborts a call: takes call->lock and hands over to __rxrpc_call_abort(),
// which releases the lock itself.
// call  - call to abort
// error - error code passed through to __rxrpc_call_abort()
// Returns whatever __rxrpc_call_abort() returns.
919 int rxrpc_call_abort(struct rxrpc_call *call, int error)
921 spin_lock(&call->lock);
923 return __rxrpc_call_abort(call, error);
925 } /* end rxrpc_call_abort() */
927 /*****************************************************************************/
929 * process packets waiting for this call
// Drains the receive queue of a call (call->rcv_receiveq), handling each
// message by packet type:
//  - DATA:  rxrpc_call_generate_ACK() decides whether the packet is useful
//           (0) or should be ignored (1); useful packets go to
//           rxrpc_call_receive_data_packet();
//  - ACK:   passed to rxrpc_call_receive_ack_packet();
//  - ABORT: the abort code is read from the packet, the call is put into the
//           ERROR state with a PEER_ABORT error state and the application
//           aemap and error handlers are invoked;
//  - other: logged as unsupported.
// The call is pinned with an extra reference for the duration of the loop.
// NOTE(review): the dequeue statement, break statements, the default label
// and several closing braces are missing from this listing.
931 static void rxrpc_call_receive_packet(struct rxrpc_call *call)
933 struct rxrpc_message *msg;
934 struct list_head *_p;
938 rxrpc_get_call(call); /* must not go away too soon if aborted by
941 while (!list_empty(&call->rcv_receiveq)) {
942 /* try to get next packet */
// the queue is rechecked under the lock before the head entry is taken
944 spin_lock(&call->lock);
945 if (!list_empty(&call->rcv_receiveq)) {
946 _p = call->rcv_receiveq.next;
949 spin_unlock(&call->lock);
954 msg = list_entry(_p, struct rxrpc_message, link);
956 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
957 jiffies - call->cjif,
958 rxrpc_pkts[msg->hdr.type],
959 ntohl(msg->hdr.serial),
961 msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-',
962 msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-',
963 msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-',
964 msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-',
965 msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S'
968 switch (msg->hdr.type) {
969 /* deal with data packets */
970 case RXRPC_PACKET_TYPE_DATA:
971 /* ACK the packet if necessary */
972 switch (rxrpc_call_generate_ACK(call, &msg->hdr,
974 case 0: /* useful packet */
975 rxrpc_call_receive_data_packet(call, msg);
977 case 1: /* duplicate or out-of-window packet */
980 rxrpc_put_message(msg);
985 /* deal with ACK packets */
986 case RXRPC_PACKET_TYPE_ACK:
987 rxrpc_call_receive_ack_packet(call, msg);
990 /* deal with abort packets */
991 case RXRPC_PACKET_TYPE_ABORT: {
// dp is NULL when the packet is too short to hold the abort code; the code
// is then treated as 0
994 dp = skb_header_pointer(msg->pkt, msg->offset,
995 sizeof(_dbuf), &_dbuf);
997 printk("Rx Received short ABORT packet\n");
999 _proto("Rx Received Call ABORT { data=%d }",
1000 (dp ? ntohl(*dp) : 0));
1002 spin_lock(&call->lock);
1003 call->app_call_state = RXRPC_CSTATE_ERROR;
1004 call->app_err_state = RXRPC_ESTATE_PEER_ABORT;
1005 call->app_abort_code = (dp ? ntohl(*dp) : 0);
1006 call->app_errno = -ECONNABORTED;
1007 call->app_mark = RXRPC_APP_MARK_EOF;
1008 call->app_read_buf = NULL;
1009 call->app_async_read = 0;
1011 /* ask the app to translate the error code */
1012 call->app_aemap_func(call);
1014 spin_unlock(&call->lock);
1015 call->app_error_func(call);
1019 /* deal with other packet types */
1020 _proto("Rx Unsupported packet type %u (#%u)",
1021 msg->hdr.type, msg->seq);
1025 rxrpc_put_message(msg);
1029 rxrpc_put_call(call);
1031 } /* end rxrpc_call_receive_packet() */
1033 /*****************************************************************************/
1035 * process next data packet
1036 * - as the next data packet arrives:
1037 * - it is queued on app_readyq _if_ it is the next one expected
1039 * - it is queued on app_unreadyq _if_ it is not the next one expected
1040 * - if a packet placed on app_readyq completely fills a hole leading up to
1041 * the first packet on app_unreadyq, then packets now in sequence are
1042 * transferred to app_readyq
1043 * - the application layer can only see packets on app_readyq
1044 * (app_ready_qty bytes)
1045 * - the application layer is prodded every time a new packet arrives
// Queues a received data packet on the call and advances the application
// level state machine.
// call - call the packet belongs to
// msg  - the data message; a reference is taken before it is queued
// A packet that is not the next one expected (app_ready_seq + 1) is inserted
// into app_unreadyq in sequence order. Otherwise it is appended to
// app_readyq, followed by any packets from app_unreadyq that are now in
// sequence, and app_ready_qty grows by the size of each packet moved.
// The rest of the function depends on call->app_call_state: on the server
// side the operation ID is read and looked up in the ops table of the
// service, and the state moves on to receiving or having got the arguments;
// on the client side the state moves to got-reply once the last packet has
// arrived. Finally __rxrpc_call_read_data() is run and the application
// attention handler is invoked.
// NOTE(review): many lines (declarations, breaks, returns, labels, the switch
// on ret near the end and closing braces) are missing from this listing, so
// the exact control flow cannot be confirmed from here.
1047 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
1048 struct rxrpc_message *msg)
1050 const struct rxrpc_operation *optbl, *op;
1051 struct rxrpc_message *pmsg;
1052 struct list_head *_p;
1053 int ret, lo, hi, rmtimo;
1056 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1058 rxrpc_get_message(msg);
1060 /* add to the unready queue if we'd have to create a hole in the ready
1061 * queue otherwise */
1062 if (msg->seq != call->app_ready_seq + 1) {
1063 _debug("Call add packet %d to unreadyq", msg->seq);
1065 /* insert in seq order */
1066 list_for_each(_p, &call->app_unreadyq) {
1067 pmsg = list_entry(_p, struct rxrpc_message, link);
1068 if (pmsg->seq > msg->seq)
// list_add_tail() links msg in just before _p, i.e. before the first entry
// with a higher sequence number, or at the end if the loop ran to completion
1072 list_add_tail(&msg->link, _p);
1074 _leave(" [unreadyq]");
1078 /* next in sequence - simply append into the call's ready queue */
1079 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
1080 msg->seq, msg->dsize, call->app_ready_qty);
1082 spin_lock(&call->lock);
1083 call->app_ready_seq = msg->seq;
1084 call->app_ready_qty += msg->dsize;
1085 list_add_tail(&msg->link, &call->app_readyq);
1087 /* move unready packets to the readyq if we got rid of a hole */
1088 while (!list_empty(&call->app_unreadyq)) {
1089 pmsg = list_entry(call->app_unreadyq.next,
1090 struct rxrpc_message, link);
1092 if (pmsg->seq != call->app_ready_seq + 1)
1095 /* next in sequence - just move list-to-list */
1096 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
1097 pmsg->seq, pmsg->dsize, call->app_ready_qty);
1099 call->app_ready_seq = pmsg->seq;
1100 call->app_ready_qty += pmsg->dsize;
1101 list_move_tail(&pmsg->link, &call->app_readyq);
1104 /* see if we've got the last packet yet */
1105 if (!list_empty(&call->app_readyq)) {
1106 pmsg = list_entry(call->app_readyq.prev,
1107 struct rxrpc_message, link);
1108 if (pmsg->hdr.flags & RXRPC_LAST_PACKET) {
1109 call->app_last_rcv = 1;
1110 _debug("Last packet on readyq");
// call->lock is still held on entry to this switch; each visible case
// releases it or hands it to __rxrpc_call_abort()
1114 switch (call->app_call_state) {
1115 /* do nothing if call already aborted */
1116 case RXRPC_CSTATE_ERROR:
1117 spin_unlock(&call->lock);
1121 /* extract the operation ID from an incoming call if that's not
1123 case RXRPC_CSTATE_SRVR_RCV_OPID:
1124 spin_unlock(&call->lock);
1126 /* handle as yet insufficient data for the operation ID */
1127 if (call->app_ready_qty < 4) {
1128 if (call->app_last_rcv)
1129 /* trouble - last packet seen */
1130 rxrpc_call_abort(call, -EINVAL);
1136 /* pull the operation ID out of the buffer */
1137 ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0);
1139 printk("Unexpected error from read-data: %d\n", ret);
1140 if (call->app_call_state != RXRPC_CSTATE_ERROR)
1141 rxrpc_call_abort(call, ret);
1145 call->app_opcode = ntohl(opid);
1147 /* locate the operation in the available ops table */
// appears to be a bisection over the range ops_begin to ops_end, which would
// require the table to be sorted by id - TODO confirm
1148 optbl = call->conn->service->ops_begin;
1150 hi = call->conn->service->ops_end - optbl;
1153 int mid = (hi + lo) / 2;
1155 if (call->app_opcode == op->id)
1157 if (call->app_opcode > op->id)
1164 kproto("Rx Client requested operation %d from %s service",
1165 call->app_opcode, call->conn->service->name);
1166 rxrpc_call_abort(call, -EINVAL);
1171 _proto("Rx Client requested operation %s from %s service",
1172 op->name, call->conn->service->name);
1174 /* we're now waiting for the argument block (unless the call
// the state is rechecked under the lock because it was dropped above
1176 spin_lock(&call->lock);
1177 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID ||
1178 call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) {
1179 if (!call->app_last_rcv)
1180 call->app_call_state =
1181 RXRPC_CSTATE_SRVR_RCV_ARGS;
1182 else if (call->app_ready_qty > 0)
1183 call->app_call_state =
1184 RXRPC_CSTATE_SRVR_GOT_ARGS;
1186 call->app_call_state =
1187 RXRPC_CSTATE_SRVR_SND_REPLY;
1188 call->app_mark = op->asize;
1189 call->app_user = op->user;
1191 spin_unlock(&call->lock);
1196 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1197 /* change state if just received last packet of arg block */
1198 if (call->app_last_rcv)
1199 call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS;
1200 spin_unlock(&call->lock);
1205 case RXRPC_CSTATE_CLNT_RCV_REPLY:
1206 /* change state if just received last packet of reply block */
1208 if (call->app_last_rcv) {
1209 call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY;
1212 spin_unlock(&call->lock);
1215 del_timer_sync(&call->acks_timeout);
1216 del_timer_sync(&call->rcv_timeout);
1217 del_timer_sync(&call->ackr_dfr_timo);
1224 /* deal with data reception in an unexpected state */
1225 printk("Unexpected state [[[ %u ]]]\n", call->app_call_state);
1226 __rxrpc_call_abort(call, -EBADMSG);
1231 if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY &&
1235 /* otherwise just invoke the data function whenever we can satisfy its desire for more
1238 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s",
1239 call->app_call_state, call->app_ready_qty, call->app_mark,
1240 call->app_last_rcv ? " last-rcvd" : "");
1242 spin_lock(&call->lock);
1244 ret = __rxrpc_call_read_data(call);
1247 spin_unlock(&call->lock);
1248 call->app_attn_func(call);
1251 spin_unlock(&call->lock);
1254 spin_unlock(&call->lock);
1257 __rxrpc_call_abort(call, ret);
1265 } /* end rxrpc_call_receive_data_packet() */
1267 /*****************************************************************************/
1269 * received an ACK packet
// Handles a received ACK packet.
// call - call the ACK belongs to
// msg  - the ACK message; msg->offset is advanced past the ACK header
// Steps visible here:
//  1. read the ACK header from the packet with skb_header_pointer();
//  2. abort the call with -EINVAL if the ACK covers sequence numbers beyond
//     call->snd_seq_count, i.e. packets that were never sent;
//  3. find the packet that prompted the ACK by serial number (the
//     outstanding ping, else an entry on acks_pendq) and feed it to
//     rxrpc_peer_calculate_rtt(), at most once per packet (rttdone);
//  4. dispatch on the ACK reason: record definitive and soft ACKs, answer a
//     ping, or consume a ping response.
// NOTE(review): declarations, the short-packet check, break statements and
// closing braces are missing from this listing.
1271 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
1272 struct rxrpc_message *msg)
1274 struct rxrpc_ackpacket _ack, *ap;
1275 rxrpc_serial_net_t serial;
1279 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1281 /* extract the basic ACK record */
1282 ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack);
1284 printk("Rx Received short ACK packet\n");
1287 msg->offset += sizeof(_ack);
// serial stays in network byte order for comparison against stored headers;
// seq is the first sequence number covered by the ACK array
1289 serial = ap->serial;
1290 seq = ntohl(ap->firstPacket);
1292 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }",
1293 ntohl(msg->hdr.serial),
1294 ntohs(ap->bufferSpace),
1297 ntohl(ap->previousPacket),
1299 rxrpc_acks[ap->reason],
1303 /* check the other side isn't ACK'ing a sequence number I haven't sent
1305 if (ap->nAcks > 0 &&
1306 (seq > call->snd_seq_count ||
1307 seq + ap->nAcks - 1 > call->snd_seq_count)) {
1308 printk("Received ACK (#%u-#%u) for unsent packet\n",
1309 seq, seq + ap->nAcks - 1);
1310 rxrpc_call_abort(call, -EINVAL);
1315 /* deal with RTT calculation */
1317 struct rxrpc_message *rttmsg;
1319 /* find the prompting packet */
1320 spin_lock(&call->lock);
1321 if (call->snd_ping && call->snd_ping->hdr.serial == serial) {
1322 /* it was a ping packet */
// ownership of the ping message moves to rttmsg; it is released below
1323 rttmsg = call->snd_ping;
1324 call->snd_ping = NULL;
1325 spin_unlock(&call->lock);
1328 rttmsg->rttdone = 1;
1329 rxrpc_peer_calculate_rtt(call->conn->peer,
1331 rxrpc_put_message(rttmsg);
1335 struct list_head *_p;
1337 /* it ought to be a data packet - look in the pending
1339 list_for_each(_p, &call->acks_pendq) {
1340 rttmsg = list_entry(_p, struct rxrpc_message,
1342 if (rttmsg->hdr.serial == serial) {
1343 if (rttmsg->rttdone)
1344 /* never do RTT twice without
1348 rttmsg->rttdone = 1;
1349 rxrpc_peer_calculate_rtt(
1350 call->conn->peer, rttmsg, msg);
1354 spin_unlock(&call->lock);
1358 switch (ap->reason) {
1359 /* deal with negative/positive acknowledgement of data
1361 case RXRPC_ACK_REQUESTED:
1362 case RXRPC_ACK_DELAY:
1363 case RXRPC_ACK_IDLE:
// everything before the first packet of the ACK array is definitively ACKed
// NOTE(review): no break is visible after this call; presumably control falls
// through to record the soft ACKs as well - confirm
1364 rxrpc_call_definitively_ACK(call, seq - 1);
1366 case RXRPC_ACK_DUPLICATE:
1367 case RXRPC_ACK_OUT_OF_SEQUENCE:
1368 case RXRPC_ACK_EXCEEDS_WINDOW:
1369 call->snd_resend_cnt = 0;
1370 ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks);
1372 rxrpc_call_abort(call, ret);
1375 /* respond to ping packets immediately */
1376 case RXRPC_ACK_PING:
1377 rxrpc_call_generate_ACK(call, &msg->hdr, ap);
1380 /* only record RTT on ping response packets */
1381 case RXRPC_ACK_PING_RESPONSE:
1382 if (call->snd_ping) {
1383 struct rxrpc_message *rttmsg;
1385 /* only do RTT stuff if the response matches the
// snd_ping is rechecked under the lock before it is claimed
1388 spin_lock(&call->lock);
1389 if (call->snd_ping &&
1390 call->snd_ping->hdr.serial == ap->serial) {
1391 rttmsg = call->snd_ping;
1392 call->snd_ping = NULL;
1394 spin_unlock(&call->lock);
1397 rttmsg->rttdone = 1;
1398 rxrpc_peer_calculate_rtt(call->conn->peer,
1400 rxrpc_put_message(rttmsg);
1406 printk("Unsupported ACK reason %u\n", ap->reason);
1411 } /* end rxrpc_call_receive_ack_packet() */
1413 /*****************************************************************************/
1415 * record definitive ACKs for all messages up to and including the one with the
/*
 * Record definitive ACKs for every packet up to and including sequence
 * number "highest".
 *
 * For each step call->acks_dftv_seq is incremented and the message at the
 * head of call->acks_pendq is unlinked under call->lock.  acks_pend_cnt is
 * decremented if that message was still in state RXRPC_MSG_SENT.  The
 * message must carry the sequence number just reached (a mismatch panics);
 * it is then marked RXRPC_MSG_DONE and released with rxrpc_put_message().
 *
 * Once acks_dftv_seq equals snd_seq_count the call is moved to
 * RXRPC_CSTATE_COMPLETE if it was not there already; the visible tail then
 * stops the three call timers and calls app_attn_func().
 *
 * NOTE(review): the embedded line numbers skip in this listing, so braces,
 * one declaration and some conditions are not shown; these notes describe
 * only the statements that are visible.
 */
1418 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
1419 rxrpc_seq_t highest)
1421 struct rxrpc_message *msg;
1424 _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest);
/* walk forward one sequence number at a time */
1426 while (call->acks_dftv_seq < highest) {
1427 call->acks_dftv_seq++;
1429 _proto("Definitive ACK on packet #%u", call->acks_dftv_seq);
1431 /* discard those at front of queue until message with highest
1433 spin_lock(&call->lock);
1435 if (!list_empty(&call->acks_pendq)) {
1436 msg = list_entry(call->acks_pendq.next,
1437 struct rxrpc_message, link);
1438 list_del_init(&msg->link); /* dequeue */
/* a message still awaiting its ACK no longer counts as pending */
1439 if (msg->state == RXRPC_MSG_SENT)
1440 call->acks_pend_cnt--;
1442 spin_unlock(&call->lock);
1444 /* insanity check */
/* presumably guarded by a test that nothing was dequeued - not shown */
1446 panic("%s(): acks_pendq unexpectedly empty\n",
1449 if (msg->seq != call->acks_dftv_seq)
1450 panic("%s(): Packet #%u expected at front of acks_pendq"
1452 __FUNCTION__, call->acks_dftv_seq, msg->seq);
1454 /* discard the message */
1455 msg->state = RXRPC_MSG_DONE;
1456 rxrpc_put_message(msg);
1459 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */
1461 spin_lock(&call->lock);
1462 if (call->acks_dftv_seq == call->snd_seq_count) {
1463 if (call->app_call_state != RXRPC_CSTATE_COMPLETE) {
1464 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1469 spin_unlock(&call->lock);
/* stop all three call timers and then notify the application; the
 * condition guarding this tail is not shown in this listing */
1472 del_timer_sync(&call->acks_timeout);
1473 del_timer_sync(&call->rcv_timeout);
1474 del_timer_sync(&call->ackr_dfr_timo);
1475 call->app_attn_func(call);
1479 } /* end rxrpc_call_definitively_ACK() */
1481 /*****************************************************************************/
1483 * record the specified amount of ACKs/NAKs
/*
 * Apply the per-packet ACK/NAK bytes that follow the fixed ACK record in
 * msg to the messages waiting on call->acks_pendq.
 * - seq and count (declared on signature lines not shown here) are passed
 *   by the caller in this file as the first sequence number covered and
 *   the number of ACK slots
 *
 * Visible behaviour:
 * - slots that refer to packets at or below acks_dftv_seq are skipped by
 *   advancing msg->offset; if every slot is covered the function leaves
 *   with " = 0 [all definitively ACK'd]"
 * - the slot bytes are copied out with skb_copy_bits() in chunks of at most
 *   sizeof(acks) and each byte is checked to be RXRPC_ACK_TYPE_ACK or
 *   RXRPC_ACK_TYPE_NACK
 * - under call->lock the matching queued messages are switched between
 *   RXRPC_MSG_ACKED and RXRPC_MSG_SENT, with acks_pend_cnt kept in step
 * - rxrpc_call_resend() is called with the highest sequence number covered
 * - when acks_pend_cnt reaches 0 in state RXRPC_CSTATE_SRVR_RCV_FINAL_ACK
 *   the call is moved to RXRPC_CSTATE_COMPLETE
 *
 * The _leave() traces indicate 0 on success and -EINVAL for a short or
 * malformed ACK array; the return statements themselves are not shown.
 *
 * NOTE(review): the embedded line numbers skip in this listing, so braces,
 * several declarations and some conditions are not shown; these notes
 * describe only the statements that are visible.
 */
1485 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
1486 struct rxrpc_message *msg,
1490 struct rxrpc_message *dmsg;
1491 struct list_head *_p;
1492 rxrpc_seq_t highest;
1495 char resend, now_complete;
1498 _enter("%p{apc=%u ads=%u},%p,%u,%Zu",
1499 call, call->acks_pend_cnt, call->acks_dftv_seq,
1502 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order
1504 if (seq <= call->acks_dftv_seq) {
1505 unsigned delta = call->acks_dftv_seq - seq;
1507 if (count <= delta) {
1508 _leave(" = 0 [all definitively ACK'd]");
1514 msg->offset += delta;
1517 highest = seq + count - 1;
1520 /* extract up to 16 ACK slots at a time */
1521 chunk = min(count, sizeof(acks));
/* pre-fill with 2: _acktype[2] is '-', so slots beyond chunk print as '-'
 * in the _proto() trace below */
1524 memset(acks, 2, sizeof(acks));
1526 if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) {
1527 printk("Rx Received short ACK packet\n");
1528 _leave(" = -EINVAL");
1531 msg->offset += chunk;
1533 /* check that the ACK set is valid */
/* only the two known slot values are accepted; anything else is logged
 * and traced as -EINVAL */
1534 for (ix = 0; ix < chunk; ix++) {
1536 case RXRPC_ACK_TYPE_ACK:
1538 case RXRPC_ACK_TYPE_NACK:
1542 printk("Rx Received unsupported ACK state"
1544 _leave(" = -EINVAL");
1549 _proto("Rx ACK of packets #%u-#%u "
1550 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)",
1551 seq, (unsigned) (seq + chunk - 1),
1552 _acktype[acks[0x0]],
1553 _acktype[acks[0x1]],
1554 _acktype[acks[0x2]],
1555 _acktype[acks[0x3]],
1556 _acktype[acks[0x4]],
1557 _acktype[acks[0x5]],
1558 _acktype[acks[0x6]],
1559 _acktype[acks[0x7]],
1560 _acktype[acks[0x8]],
1561 _acktype[acks[0x9]],
1562 _acktype[acks[0xA]],
1563 _acktype[acks[0xB]],
1564 _acktype[acks[0xC]],
1565 _acktype[acks[0xD]],
1566 _acktype[acks[0xE]],
1567 _acktype[acks[0xF]],
1571 /* mark the packets in the ACK queue as being provisionally
1574 spin_lock(&call->lock);
1576 /* find the first packet ACK'd/NAK'd here */
1577 list_for_each(_p, &call->acks_pendq) {
1578 dmsg = list_entry(_p, struct rxrpc_message, link);
1579 if (dmsg->seq == seq)
1581 _debug("- %u: skipping #%u", ix, dmsg->seq);
1587 _debug("- %u: processing #%u (%c) apc=%u",
1588 ix, dmsg->seq, _acktype[acks[ix]],
1589 call->acks_pend_cnt);
/* acks_pend_cnt drops when a SENT message becomes ACKED and rises again
 * when an ACKED message is put back to SENT */
1591 if (acks[ix] == RXRPC_ACK_TYPE_ACK) {
1592 if (dmsg->state == RXRPC_MSG_SENT)
1593 call->acks_pend_cnt--;
1594 dmsg->state = RXRPC_MSG_ACKED;
1597 if (dmsg->state == RXRPC_MSG_ACKED)
1598 call->acks_pend_cnt++;
1599 dmsg->state = RXRPC_MSG_SENT;
/* step to the next queued message for the next slot */
1604 _p = dmsg->link.next;
1605 dmsg = list_entry(_p, struct rxrpc_message, link);
1606 } while(ix < chunk &&
1607 _p != &call->acks_pendq &&
1613 spin_unlock(&call->lock);
/* NOTE(review): the condition guarding this resend is not shown */
1617 rxrpc_call_resend(call, highest);
1619 /* if all packets are provisionally ACK'd, then wake up anyone who's
1620 * waiting for that */
1622 spin_lock(&call->lock);
1623 if (call->acks_pend_cnt == 0) {
1624 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) {
1625 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1630 spin_unlock(&call->lock);
1633 _debug("- wake up waiters");
1634 del_timer_sync(&call->acks_timeout);
1635 del_timer_sync(&call->rcv_timeout);
1636 del_timer_sync(&call->ackr_dfr_timo);
1637 call->app_attn_func(call);
1640 _leave(" = 0 (apc=%u)", call->acks_pend_cnt);
/* presumably the target of the path taken when the first covered packet is
 * not found on acks_pendq; the jump to it is not shown */
1644 panic("%s(): acks_pendq in bad state (packet #%u absent)\n",
1647 } /* end rxrpc_call_record_ACK() */
1649 /*****************************************************************************/
1651 * transfer data from the ready packet queue to the asynchronous read buffer
1652 * - since this func is the only one going to look at packets queued on
1653 * app_readyq, we don't need a lock to modify or access them, only to modify
1654 * the queue pointers
1655 * - called with call->lock held
1656 * - the buffer must be in kernel space
1658 * 0 if buffer filled
1659 * -EAGAIN if buffer not filled and more data to come
1660 * -EBADMSG if last packet received and insufficient data left
1661 * -ECONNABORTED if the call is in an error state
/*
 * Move data from call->app_readyq towards the pending read described by
 * app_mark (bytes still wanted), app_read_buf (destination, may be NULL)
 * and app_async_read (a read is outstanding).
 *
 * Visible behaviour:
 * - the call state is checked first; RXRPC_CSTATE_ERROR gives -ECONNABORTED
 *   and inconsistent or unexpected states are logged with printk()
 * - with no read outstanding only a status is computed from app_mark,
 *   app_ready_qty and app_last_rcv
 * - otherwise packets are consumed from the head of app_readyq, copying
 *   min(app_mark, msg->dsize) bytes at a time with skb_copy_bits() when a
 *   buffer is set, and emptied packets are unlinked and released
 * - when app_mark reaches 0 the read state is cleared; if the queue is also
 *   empty and the last packet has been received the call state is advanced
 *   and the relevant timers are stopped
 * - if the data runs out first the _leave() traces indicate -EBADMSG once
 *   the last packet has been received and -EAGAIN otherwise
 *
 * NOTE(review): the embedded line numbers skip in this listing, so braces,
 * declarations (ret, qty) and several statements are not shown; these notes
 * describe only the statements that are visible.
 */
1663 static int __rxrpc_call_read_data(struct rxrpc_call *call)
1665 struct rxrpc_message *msg;
1669 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}",
1671 call->app_async_read, call->app_read_buf,
1672 call->app_ready_qty, call->app_mark);
1674 /* check the state */
1675 switch (call->app_call_state) {
1676 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1677 case RXRPC_CSTATE_CLNT_RCV_REPLY:
/* still receiving, so the last packet should not have been seen yet */
1678 if (call->app_last_rcv) {
1679 printk("%s(%p,%p,%Zd):"
1680 " Inconsistent call state (%s, last pkt)",
1682 call, call->app_read_buf, call->app_mark,
1683 rxrpc_call_states[call->app_call_state]);
1688 case RXRPC_CSTATE_SRVR_RCV_OPID:
1689 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1690 case RXRPC_CSTATE_CLNT_GOT_REPLY:
1693 case RXRPC_CSTATE_SRVR_SND_REPLY:
/* in the reply-sending state the last packet should already be in */
1694 if (!call->app_last_rcv) {
1695 printk("%s(%p,%p,%Zd):"
1696 " Inconsistent call state (%s, not last pkt)",
1698 call, call->app_read_buf, call->app_mark,
1699 rxrpc_call_states[call->app_call_state]);
1702 _debug("Trying to read data from call in SND_REPLY state");
1705 case RXRPC_CSTATE_ERROR:
1706 _leave(" = -ECONNABORTED");
1707 return -ECONNABORTED;
1710 printk("reading in unexpected state [[[ %u ]]]\n",
1711 call->app_call_state);
1715 /* handle the case of not having an async buffer */
1716 if (!call->app_async_read) {
1717 if (call->app_mark == RXRPC_APP_MARK_EOF) {
1718 ret = call->app_last_rcv ? 0 : -EAGAIN;
1721 if (call->app_mark >= call->app_ready_qty) {
1722 call->app_mark = RXRPC_APP_MARK_EOF;
1726 ret = call->app_last_rcv ? -EBADMSG : -EAGAIN;
1730 _leave(" = %d [no buf]", ret);
/* consume queued packets until the request is satisfied or the queue
 * runs dry */
1734 while (!list_empty(&call->app_readyq) && call->app_mark > 0) {
1735 msg = list_entry(call->app_readyq.next,
1736 struct rxrpc_message, link);
1738 /* drag as much data as we need out of this packet */
1739 qty = min(call->app_mark, msg->dsize);
1741 _debug("reading %Zu from skb=%p off=%lu",
1742 qty, msg->pkt, msg->offset);
/* with a NULL app_read_buf nothing is copied; the data is only consumed */
1744 if (call->app_read_buf)
1745 if (skb_copy_bits(msg->pkt, msg->offset,
1746 call->app_read_buf, qty) < 0)
1747 panic("%s: Failed to copy data from packet:"
1750 call, call->app_read_buf, qty);
1752 /* if that packet is now empty, discard it */
1753 call->app_ready_qty -= qty;
1756 if (msg->dsize == 0) {
1757 list_del_init(&msg->link);
1758 rxrpc_put_message(msg);
1764 call->app_mark -= qty;
1765 if (call->app_read_buf)
1766 call->app_read_buf += qty;
/* request fully satisfied: clear the outstanding-read state */
1769 if (call->app_mark == 0) {
1770 call->app_async_read = 0;
1771 call->app_mark = RXRPC_APP_MARK_EOF;
1772 call->app_read_buf = NULL;
1774 /* adjust the state if used up all packets */
1775 if (list_empty(&call->app_readyq) && call->app_last_rcv) {
1776 switch (call->app_call_state) {
/* NOTE(review): the header comment says call->lock is held here, yet
 * del_timer_sync() is called below - confirm that none of the timer
 * handlers can take call->lock */
1777 case RXRPC_CSTATE_SRVR_RCV_OPID:
1778 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1779 call->app_mark = RXRPC_APP_MARK_EOF;
1781 del_timer_sync(&call->rcv_timeout);
1783 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1784 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1786 del_timer_sync(&call->rcv_timeout);
1789 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1791 del_timer_sync(&call->acks_timeout);
1792 del_timer_sync(&call->ackr_dfr_timo);
1793 call->app_async_read = 0;
1802 if (call->app_last_rcv) {
1803 _debug("Insufficient data (%Zu/%Zu)",
1804 call->app_ready_qty, call->app_mark);
1805 call->app_async_read = 0;
1806 call->app_mark = RXRPC_APP_MARK_EOF;
1807 call->app_read_buf = NULL;
1809 _leave(" = -EBADMSG");
1813 _leave(" = -EAGAIN");
1815 } /* end __rxrpc_call_read_data() */
1817 /*****************************************************************************/
1819 * attempt to read the specified amount of data from the call's ready queue
1820 * into the buffer provided
1821 * - since this func is the only one going to look at packets queued on
1822 * app_readyq, we don't need a lock to modify or access them, only to modify
1823 * the queue pointers
1824 * - if the buffer pointer is NULL, then data is merely drained, not copied
1825 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
1826 * enough data or an error will be generated
1827 * - note that the caller must have added the calling task to the call's wait
1829 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
1830 * function doesn't read all available data
/*
 * Start a read of "size" bytes from the call into "buffer" and try to
 * satisfy it immediately from the ready queue.
 * - call:   the call to read from
 * - buffer: destination; stored in call->app_read_buf
 * - size:   number of bytes wanted; stored in call->app_mark
 * - flags:  RXRPC_CALL_READ_ALL and RXRPC_CALL_READ_BLOCK are tested
 *
 * Visible behaviour:
 * - under call->lock, a non-NULL app_read_buf is traced as -EBUSY
 * - otherwise the read state is armed and __rxrpc_call_read_data() is run
 * - with RXRPC_CALL_READ_ALL, data still to come or still queued aborts the
 *   call with -EBADMSG
 * - without RXRPC_CALL_READ_BLOCK an unfinished read is traced as -EAGAIN
 * - with it, the task waits in TASK_INTERRUPTIBLE until app_async_read is
 *   cleared or a signal is pending (-EINTR); a call found in
 *   RXRPC_CSTATE_ERROR afterwards returns -ECONNABORTED
 *
 * NOTE(review): the embedded line numbers skip in this listing, so braces,
 * the ret declaration, the dispatch on ret and most return statements are
 * not shown; these notes describe only the statements that are visible.
 */
1832 int rxrpc_call_read_data(struct rxrpc_call *call,
1833 void *buffer, size_t size, int flags)
1837 _enter("%p{arq=%Zu},%p,%Zd,%x",
1838 call, call->app_ready_qty, buffer, size, flags);
1840 spin_lock(&call->lock);
/* a non-NULL app_read_buf is treated as a read already in progress */
1842 if (unlikely(!!call->app_read_buf)) {
1843 spin_unlock(&call->lock);
1844 _leave(" = -EBUSY");
/* arm the read state that __rxrpc_call_read_data() consumes */
1848 call->app_mark = size;
1849 call->app_read_buf = buffer;
1850 call->app_async_read = 1;
1851 call->app_read_count++;
1853 /* read as much data as possible */
1854 ret = __rxrpc_call_read_data(call);
/* the statements from here to the final unlock are presumably arms of a
 * dispatch on ret; the case labels are not shown */
1857 if (flags & RXRPC_CALL_READ_ALL &&
1858 (!call->app_last_rcv || call->app_ready_qty > 0)) {
1859 _leave(" = -EBADMSG");
1860 __rxrpc_call_abort(call, -EBADMSG);
1864 spin_unlock(&call->lock);
1865 call->app_attn_func(call);
1870 spin_unlock(&call->lock);
1871 _leave(" = %d [aborted]", ret);
1875 __rxrpc_call_abort(call, ret);
1876 _leave(" = %d", ret);
1880 spin_unlock(&call->lock);
1882 if (!(flags & RXRPC_CALL_READ_BLOCK)) {
1883 _leave(" = -EAGAIN");
1887 /* wait for the data to arrive */
1888 _debug("blocking for data arrival");
/* the task state is set before the condition is tested; the enclosing loop
 * and the call that actually sleeps are presumably on lines not shown */
1891 set_current_state(TASK_INTERRUPTIBLE);
1892 if (!call->app_async_read || signal_pending(current))
1896 set_current_state(TASK_RUNNING);
1898 if (signal_pending(current)) {
1899 _leave(" = -EINTR");
1903 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
1904 _leave(" = -ECONNABORTED");
1905 return -ECONNABORTED;
1912 } /* end rxrpc_call_read_data() */
1914 /*****************************************************************************/
1916 * write data to a call
1917 * - the data may not be sent immediately if it doesn't fill a buffer
1918 * - if we can't queue all the data for buffering now, siov[] will have been
1919 * adjusted to take account of what has been sent
/*
 * Queue data from an iovec array onto the call's message under
 * construction, flushing it when it is full or when header flags ask for
 * it.  The _enter() trace shows the parameters call, sioc, siov,
 * rxhdr_flags, alloc_flags and dup_data plus one more pointer; *size_sent
 * is incremented by every amount attached to a message.
 *
 * Visible behaviour:
 * - the call state is checked; RXRPC_CSTATE_ERROR loads ret from
 *   call->app_errno
 * - the total size is summed over siov[]
 * - if call->snd_nextmsg is NULL a new DATA message is allocated with
 *   rxrpc_conn_newmsg(); rxhdr_flags are ORed into its header flags
 * - on one path a buffer of at most PAGE_SIZE is allocated with kmalloc(),
 *   attached as a data segment with its dfree bit set, and filled from
 *   siov with memcpy(); on the other the caller's iovecs are attached
 *   directly while dcount is below RXRPC_MSG_MAX_IOCS
 * - rxrpc_call_flush() is called once dsize reaches the connection MTU or
 *   whenever rxhdr_flags is non-zero
 *
 * NOTE(review): the embedded line numbers skip in this listing, so most of
 * the signature, the declarations of ret, buf and sptr, the loops and many
 * conditions are not shown; these notes describe only the statements that
 * are visible.
 */
1921 int rxrpc_call_write_data(struct rxrpc_call *call,
1929 struct rxrpc_message *msg;
1931 size_t space, size, chunk, tmp;
1935 _enter("%p,%Zu,%p,%02x,%x,%d,%p",
1936 call, sioc, siov, rxhdr_flags, alloc_flags, dup_data,
1943 /* can't send more if we've sent last packet from this end */
1944 switch (call->app_call_state) {
1945 case RXRPC_CSTATE_SRVR_SND_REPLY:
1946 case RXRPC_CSTATE_CLNT_SND_ARGS:
1948 case RXRPC_CSTATE_ERROR:
1949 ret = call->app_errno;
1954 /* calculate how much data we've been given */
1956 for (; sioc > 0; sptr++, sioc--) {
1960 if (!sptr->iov_base)
1963 size += sptr->iov_len;
1966 _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size);
1969 /* make sure there's a message under construction */
1970 if (!call->snd_nextmsg) {
1971 /* no - allocate a message with no data yet attached */
1972 ret = rxrpc_conn_newmsg(call->conn, call,
1973 RXRPC_PACKET_TYPE_DATA,
1974 0, NULL, alloc_flags,
1975 &call->snd_nextmsg);
1978 _debug("- allocated new message [ds=%Zu]",
1979 call->snd_nextmsg->dsize);
/* the caller's header flags accumulate on the message being built */
1982 msg = call->snd_nextmsg;
1983 msg->hdr.flags |= rxhdr_flags;
1985 /* deal with zero-length terminal packet */
1987 if (rxhdr_flags & RXRPC_LAST_PACKET) {
1988 ret = rxrpc_call_flush(call);
1995 /* work out how much space current packet has available */
1996 space = call->conn->mtu_size - msg->dsize;
1997 chunk = min(space, size);
1999 _debug("- [before] space=%Zu chunk=%Zu", space, chunk);
2001 while (!siov->iov_len)
2004 /* if we are going to have to duplicate the data then coalesce
2007 /* don't allocate more than 1 page at a time */
2008 if (chunk > PAGE_SIZE)
2011 /* allocate a data buffer and attach to the message */
2012 buf = kmalloc(chunk, alloc_flags);
2013 if (unlikely(!buf)) {
2015 sizeof(struct rxrpc_header)) {
2016 /* discard an empty msg and wind back
2017 * the seq counter */
2018 rxrpc_put_message(msg);
2019 call->snd_nextmsg = NULL;
2020 call->snd_seq_count--;
/* take the next data slot and set its dfree bit - presumably so that the
 * buffer is freed with the message (TODO confirm).
 * NOTE(review): no dcount bound is visible on this path, unlike the
 * direct-attach path further down */
2027 tmp = msg->dcount++;
2028 set_bit(tmp, &msg->dfree);
2029 msg->data[tmp].iov_base = buf;
2030 msg->data[tmp].iov_len = chunk;
2031 msg->dsize += chunk;
2032 *size_sent += chunk;
2035 /* load the buffer with data */
2037 tmp = min(chunk, siov->iov_len);
2038 memcpy(buf, siov->iov_base, tmp);
/* the caller's iovec is advanced past what was copied */
2040 siov->iov_base += tmp;
2041 siov->iov_len -= tmp;
2048 /* we want to attach the supplied buffers directly */
2050 msg->dcount < RXRPC_MSG_MAX_IOCS) {
2051 tmp = msg->dcount++;
2052 msg->data[tmp].iov_base = siov->iov_base;
2053 msg->data[tmp].iov_len = siov->iov_len;
2054 msg->dsize += siov->iov_len;
2055 *size_sent += siov->iov_len;
2056 size -= siov->iov_len;
2057 chunk -= siov->iov_len;
2062 _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size);
2064 /* dispatch the message when full, final or requesting ACK */
/* any non-zero rxhdr_flags value forces the flush, not just specific bits */
2065 if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) {
2066 ret = rxrpc_call_flush(call);
2075 _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size);
2078 } /* end rxrpc_call_write_data() */
2080 /*****************************************************************************/
2082 * flush outstanding packets to the network
/*
 * Send the message under construction (call->snd_nextmsg), if there is one.
 *
 * Visible behaviour:
 * - a reference on the call is held from rxrpc_get_call() at the top to
 *   rxrpc_put_call() at the bottom
 * - the message is detached from the call and its MORE_PACKETS and
 *   REQUEST_ACK header flags are adjusted according to RXRPC_LAST_PACKET
 * - under call->lock the message is appended to acks_pendq, the call state
 *   is advanced when this is the last packet, acks_pend_cnt is incremented
 *   and the acks_timeout timer is re-armed
 * - a call in RXRPC_CSTATE_ERROR loads ret from call->app_errno and drops
 *   the lock
 * - the message is passed to rxrpc_conn_sendmsg() and pkt_snd_count is
 *   incremented
 *
 * NOTE(review): the embedded line numbers skip in this listing, so braces,
 * the ret declaration, else/break lines and the return are not shown; these
 * notes describe only the statements that are visible.
 */
2084 static int rxrpc_call_flush(struct rxrpc_call *call)
2086 struct rxrpc_message *msg;
/* keep the call pinned for the duration of the flush */
2091 rxrpc_get_call(call);
2093 /* if there's a packet under construction, then dispatch it now */
2094 if (call->snd_nextmsg) {
2095 msg = call->snd_nextmsg;
2096 call->snd_nextmsg = NULL;
2098 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2099 msg->hdr.flags &= ~RXRPC_MORE_PACKETS;
/* an ACK is requested on the last packet except in CLNT_SND_ARGS */
2100 if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS)
2101 msg->hdr.flags |= RXRPC_REQUEST_ACK;
/* presumably the else arm for a packet that is not the last - not shown */
2104 msg->hdr.flags |= RXRPC_MORE_PACKETS;
2107 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
2108 msg->dsize, msg->dcount, msg->dfree);
2110 /* queue and adjust call state */
2111 spin_lock(&call->lock);
2112 list_add_tail(&msg->link, &call->acks_pendq);
2114 /* decide what to do depending on current state and if this is
2115 * the last packet */
2117 switch (call->app_call_state) {
2118 case RXRPC_CSTATE_SRVR_SND_REPLY:
2119 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2120 call->app_call_state =
2121 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK;
2126 case RXRPC_CSTATE_CLNT_SND_ARGS:
2127 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2128 call->app_call_state =
2129 RXRPC_CSTATE_CLNT_RCV_REPLY;
2134 case RXRPC_CSTATE_ERROR:
2135 ret = call->app_errno;
2137 spin_unlock(&call->lock);
/* one more message is now awaiting an ACK; re-arm the ACK timeout */
2141 call->acks_pend_cnt++;
2143 mod_timer(&call->acks_timeout,
2144 __rxrpc_rtt_based_timeout(call,
2145 rxrpc_call_acks_timeout));
2147 spin_unlock(&call->lock);
2149 ret = rxrpc_conn_sendmsg(call->conn, msg);
/* presumably counted only when the send succeeded - the test is not shown */
2151 call->pkt_snd_count++;
2155 rxrpc_put_call(call);
2157 _leave(" = %d", ret);
2160 } /* end rxrpc_call_flush() */
2162 /*****************************************************************************/
2164 * resend NAK'd or unacknowledged packets up to the highest one specified
/*
 * Retransmit queued messages that are still in state RXRPC_MSG_SENT, up to
 * sequence number "highest".
 *
 * Visible behaviour:
 * - once call->snd_resend_cnt has reached rxrpc_call_max_resend the call is
 *   aborted, with -EIO if any packet was ever received on it and
 *   -ETIMEDOUT otherwise
 * - under call->lock snd_resend_cnt is incremented and the starting message
 *   is looked up on acks_pendq by sequence number
 * - each message still in state RXRPC_MSG_SENT is pinned with
 *   rxrpc_get_message(), sent with rxrpc_conn_sendmsg() with the lock
 *   dropped, counted in pkt_snd_count on success, and released
 * - finally the acks_timeout timer is re-armed
 *
 * NOTE(review): the embedded line numbers skip in this listing, so braces,
 * the loop heads, the panic call and several statements are not shown;
 * these notes describe only the statements that are visible.
 */
2166 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest)
2168 struct rxrpc_message *msg;
2169 struct list_head *_p;
2170 rxrpc_seq_t seq = 0;
2172 _enter("%p,%u", call, highest);
2174 _proto("Rx Resend required");
2176 /* handle too many resends */
2177 if (call->snd_resend_cnt >= rxrpc_call_max_resend) {
2178 _debug("Aborting due to too many resends (rcv=%d)",
2179 call->pkt_rcv_count);
2180 rxrpc_call_abort(call,
2181 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT);
2186 spin_lock(&call->lock);
2187 call->snd_resend_cnt++;
2189 /* determine which the next packet we might need to ACK is */
/* NOTE(review): seq was initialised to 0 above and nothing visible changes
 * it before this test, so if rxrpc_seq_t is unsigned the test is always
 * true - confirm against the lines not shown */
2190 if (seq <= call->acks_dftv_seq)
2191 seq = call->acks_dftv_seq;
2197 /* look for the packet in the pending-ACK queue */
2198 list_for_each(_p, &call->acks_pendq) {
2199 msg = list_entry(_p, struct rxrpc_message, link);
2200 if (msg->seq == seq)
2205 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
2206 __FUNCTION__, call, highest,
2207 call->acks_dftv_seq, call->snd_seq_count, seq);
2210 if (msg->state != RXRPC_MSG_SENT)
2211 continue; /* only un-ACK'd packets */
/* pin the message, then drop call->lock around the transmit */
2213 rxrpc_get_message(msg);
2214 spin_unlock(&call->lock);
2216 /* send each message again (and ignore any errors we might
2218 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
2219 msg->dsize, msg->dcount, msg->dfree);
2221 if (rxrpc_conn_sendmsg(call->conn, msg) == 0)
2222 call->pkt_snd_count++;
2224 rxrpc_put_message(msg);
2226 spin_lock(&call->lock);
2229 /* reset the timeout */
2230 mod_timer(&call->acks_timeout,
2231 __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout));
2233 spin_unlock(&call->lock);
2236 } /* end rxrpc_call_resend() */
2238 /*****************************************************************************/
2240 * handle an ICMP error being applied to a call
/*
 * Apply an error to a call.
 * - call:  the call affected
 * - local: appears in the signature only; the statement that tests it is
 *   not shown (presumably it selects between the two app_err_state values)
 * - errno: error code stored in call->app_errno
 *
 * Visible behaviour:
 * - a call already in RXRPC_CSTATE_ERROR only has app_error_func() called
 * - otherwise, under call->lock, the call is put into RXRPC_CSTATE_ERROR,
 *   the error state and errno are recorded, any read in progress is reset,
 *   app_aemap_func() is called and the three call timers are stopped;
 *   app_error_func() is called after the lock is dropped
 *
 * NOTE(review): the embedded line numbers skip in this listing, so braces
 * and the if/else lines are not shown; these notes describe only the
 * statements that are visible.
 */
2242 void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno)
2244 _enter("%p{%u},%d", call, ntohl(call->call_id), errno);
2246 /* if this call is already aborted, then just wake up any waiters */
2247 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
2248 call->app_error_func(call);
2251 /* tell the app layer what happened */
2252 spin_lock(&call->lock);
2253 call->app_call_state = RXRPC_CSTATE_ERROR;
/* the condition choosing between these two assignments is not shown */
2256 call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR;
2258 call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR;
2259 call->app_errno = errno;
/* reset the read state so that no read remains outstanding */
2260 call->app_mark = RXRPC_APP_MARK_EOF;
2261 call->app_read_buf = NULL;
2262 call->app_async_read = 0;
2265 call->app_aemap_func(call);
/* NOTE(review): del_timer_sync() is called with call->lock still held -
 * confirm that none of the timer handlers can take call->lock */
2267 del_timer_sync(&call->acks_timeout);
2268 del_timer_sync(&call->rcv_timeout);
2269 del_timer_sync(&call->ackr_dfr_timo);
2271 spin_unlock(&call->lock);
/* the application callback runs with the lock released */
2273 call->app_error_func(call);
2277 } /* end rxrpc_call_handle_error() */