1 /* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
 
   3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 
   4  * Written by David Howells (dhowells@redhat.com)
 
   6  * This program is free software; you can redistribute it and/or
 
   7  * modify it under the terms of the GNU General Public License
 
   8  * as published by the Free Software Foundation; either version
 
   9  * 2 of the License, or (at your option) any later version.
 
  12 #include <linux/module.h>
 
  13 #include <linux/circ_buf.h>
 
  14 #include <linux/net.h>
 
  15 #include <linux/skbuff.h>
 
  16 #include <linux/udp.h>
 
  18 #include <net/af_rxrpc.h>
 
  19 #include "ar-internal.h"
 
  /*
   * Value assigned to the local expiry variable on the deferred-ACK path of
   * __rxrpc_propose_ACK(), which is then used when (re)arming the ACK timer
   * of the call.
   * NOTE(review): the units look like jiffies, but the line that combines
   * this value with the current time is not visible in this copy - confirm.
   */
  21 static unsigned rxrpc_ack_defer = 1;
 
  /*
   * Short printable tags for ACK reason codes.  The table is indexed by the
   * reason value when tracing (rxrpc_acks[ack_reason] in
   * __rxrpc_propose_ACK() and rxrpc_acks[ack.reason] in
   * rxrpc_process_rx_queue()).
   * NOTE(review): the remainder of the initializer is not visible here.
   */
  23 static const char *rxrpc_acks[] = {
 
  24         "---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
 
  /*
   * Relative priority of each ACK reason, indexed by reason code.
   * rxrpc_propose_ACK() only forwards a proposal whose priority is strictly
   * greater than that of the reason already recorded in call->ackr_reason,
   * and __rxrpc_propose_ACK() asserts that the looked-up priority is > 0,
   * so a reason with no entry in this table (implicitly 0) would trip that
   * assertion.
   * NOTE(review): some initializer lines are missing from this copy, so the
   * full set of reasons and values cannot be confirmed from here.
   */
  28 static const s8 rxrpc_ack_priority[] = {
 
  30         [RXRPC_ACK_DELAY]               = 1,
 
  31         [RXRPC_ACK_REQUESTED]           = 2,
 
  33         [RXRPC_ACK_PING_RESPONSE]       = 4,
 
  34         [RXRPC_ACK_DUPLICATE]           = 5,
 
  35         [RXRPC_ACK_OUT_OF_SEQUENCE]     = 6,
 
  36         [RXRPC_ACK_EXCEEDS_WINDOW]      = 7,
 
  37         [RXRPC_ACK_NOSPACE]             = 8,
 
  41  * propose an ACK be sent
 
  /*
   * Propose that an ACK with the given reason and serial be sent on a call.
   *
   * - call:       the call the ACK belongs to
   * - ack_reason: ACK reason code; used to index rxrpc_ack_priority[] and
   *               rxrpc_acks[]
   * - serial:     serial number in network byte order (it is passed through
   *               ntohl() for tracing and compared with cpu_to_be32(1))
   * - immediate:  when false, or when serial is 1, a REQUESTED ACK takes the
   *               defer-timer path instead of being sent at once
   *
   * The priority of the proposal is looked up and asserted to be > 0, then
   * compared with the priority of the reason currently held in
   * call->ackr_reason.  The visible code then records the new reason and
   * serial in the call and either arms call->ack_timer (delay or defer
   * expiry) or cancels the timer, sets the RXRPC_CALL_ACK event under
   * read_lock_bh(&call->state_lock) and queues the call for processing; the
   * event is only raised while call->state <= RXRPC_CALL_COMPLETE.
   *
   * This function takes no spinlock itself; rxrpc_propose_ACK() wraps it in
   * call->lock.
   *
   * NOTE(review): lines are missing from this copy (braces, the switch
   * header, some branches), so the exact branch structure between the
   * visible statements cannot be confirmed from here.
   */
  43 void __rxrpc_propose_ACK(struct rxrpc_call *call, uint8_t ack_reason,
 
  44                          __be32 serial, bool immediate)
 
  47         s8 prior = rxrpc_ack_priority[ack_reason];
 
  49         ASSERTCMP(prior, >, 0);
 
  51         _enter("{%d},%s,%%%x,%u",
 
  52                call->debug_id, rxrpc_acks[ack_reason], ntohl(serial),
 
  55         if (prior < rxrpc_ack_priority[call->ackr_reason]) {
 
  61         /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
 
  63         if (prior == rxrpc_ack_priority[call->ackr_reason]) {
 
  65                         call->ackr_serial = serial;
 
  71         call->ackr_reason = ack_reason;
 
  72         call->ackr_serial = serial;
 
  76                 _debug("run delay timer");
 
  77                 call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ;
 
  78                 add_timer(&call->ack_timer);
 
  83                         _debug("run defer timer");
 
  89         case RXRPC_ACK_REQUESTED:
 
  92                 if (!immediate || serial == cpu_to_be32(1)) {
 
  93                         _debug("run defer timer");
 
  94                         expiry = rxrpc_ack_defer;
 
  99                 _debug("immediate ACK");
 
 105         if (!timer_pending(&call->ack_timer) ||
 
 106             time_after(call->ack_timer.expires, expiry))
 
 107                 mod_timer(&call->ack_timer, expiry);
 
 111         _debug("cancel timer %%%u", ntohl(serial));
 
 112         try_to_del_timer_sync(&call->ack_timer);
 
 113         read_lock_bh(&call->state_lock);
 
 114         if (call->state <= RXRPC_CALL_COMPLETE &&
 
 115             !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 
 116                 rxrpc_queue_call(call);
 
 117         read_unlock_bh(&call->state_lock);
 
 121  * propose an ACK be sent, locking the call structure
 
 /*
  * Locked front end to __rxrpc_propose_ACK().
  *
  * The priority of ack_reason is compared with the priority of the reason
  * currently stored in call->ackr_reason; only when the new one is strictly
  * higher is call->lock taken (spin_lock_bh) and the proposal forwarded with
  * the same arguments.  Lower or equal priority proposals are dropped here.
  *
  * NOTE(review): the priority comparison reads call->ackr_reason before the
  * lock is taken - confirm that a stale read is acceptable to callers.
  */
 123 void rxrpc_propose_ACK(struct rxrpc_call *call, uint8_t ack_reason,
 
 124                        __be32 serial, bool immediate)
 
 126         s8 prior = rxrpc_ack_priority[ack_reason];
 
 128         if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 
 129                 spin_lock_bh(&call->lock);
 
 130                 __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
 
 131                 spin_unlock_bh(&call->lock);
 
 136  * set the resend timer
 
 /*
  * Apply the result of a Tx window scan to the resend machinery of a call.
  *
  * - call:      call whose resend event and timer are updated
  * - resend:    flag bits computed by the caller; the scanning loops in this
  *              file test the bit with value 2 to tell whether resend_at
  *              already holds a candidate time
  * - resend_at: absolute time handed to mod_timer() for call->resend_timer
  *
  * All work is done under read_lock_bh(&call->state_lock), after comparing
  * call->state against RXRPC_CALL_COMPLETE.  Three actions are visible:
  * setting the RXRPC_CALL_RESEND event; setting RXRPC_CALL_RUN_RTIMER and
  * reprogramming the resend timer; or deleting the timer synchronously and
  * clearing RXRPC_CALL_RESEND_TIMER and RXRPC_CALL_RUN_RTIMER.
  *
  * NOTE(review): the conditions that select between those actions are on
  * lines missing from this copy; presumably they test bits of resend.
  */
 138 static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
 
 139                              unsigned long resend_at)
 
 141         read_lock_bh(&call->state_lock);
 
 142         if (call->state >= RXRPC_CALL_COMPLETE)
 
 146                 _debug("SET RESEND");
 
 147                 set_bit(RXRPC_CALL_RESEND, &call->events);
 
 151                 _debug("MODIFY RESEND TIMER");
 
 152                 set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 
 153                 mod_timer(&call->resend_timer, resend_at);
 
 155                 _debug("KILL RESEND TIMER");
 
 156                 del_timer_sync(&call->resend_timer);
 
 157                 clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 
 158                 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 
 160         read_unlock_bh(&call->state_lock);
 
 /*
  * Retransmit packets in the Tx window that are flagged for resending.
  *
  * The window is walked from call->acks_tail towards call->acks_head, with
  * the index wrapped by masking with (call->acks_winsz - 1).  For a packet
  * whose private data has need_resend set, a fresh serial number is taken
  * from the connection (atomic_inc_return on call->conn->serial, converted
  * with htonl), copied into the wire header at txb->head, and the packet is
  * handed to rxrpc_send_packet().  A negative return from the send sets
  * resend_at to jiffies + 3; the other visible expression is
  * jiffies + rxrpc_resend_timeout * HZ (presumably the success case).
  *
  * While walking, the earliest per-packet resend_at is tracked in the local
  * resend_at (bit value 2 of resend says a candidate is already held), and
  * the result is passed to rxrpc_set_resend().
  *
  * NOTE(review): lines are missing from this copy.  In particular the slot
  * value is cast to an sk_buff pointer here without masking the low bit,
  * unlike the other scans in this file; presumably a skipped line checks
  * that bit first - confirm against the complete source.
  */
 166 static void rxrpc_resend(struct rxrpc_call *call)
 
 168         struct rxrpc_skb_priv *sp;
 
 169         struct rxrpc_header *hdr;
 
 171         unsigned long *p_txb, resend_at;
 
 175         _enter("{%d,%d,%d,%d},",
 
 176                call->acks_hard, call->acks_unacked,
 
 177                atomic_read(&call->sequence),
 
 178                CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 
 184         for (loop = call->acks_tail;
 
 185              loop != call->acks_head || stop;
 
 186              loop = (loop + 1) &  (call->acks_winsz - 1)
 
 188                 p_txb = call->acks_window + loop;
 
 189                 smp_read_barrier_depends();
 
 193                 txb = (struct sk_buff *) *p_txb;
 
 196                 if (sp->need_resend) {
 
 199                         /* each Tx packet has a new serial number */
 
 201                                 htonl(atomic_inc_return(&call->conn->serial));
 
 203                         hdr = (struct rxrpc_header *) txb->head;
 
 204                         hdr->serial = sp->hdr.serial;
 
 206                         _proto("Tx DATA %%%u { #%d }",
 
 207                                ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 
 208                         if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
 
 210                                 sp->resend_at = jiffies + 3;
 
 213                                         jiffies + rxrpc_resend_timeout * HZ;
 
 217                 if (time_after_eq(jiffies + 1, sp->resend_at)) {
 
 220                 } else if (resend & 2) {
 
 221                         if (time_before(sp->resend_at, resend_at))
 
 222                                 resend_at = sp->resend_at;
 
 224                         resend_at = sp->resend_at;
 
 229         rxrpc_set_resend(call, resend, resend_at);
 
 234  * handle resend timer expiry
 
 /*
  * Handle expiry of the resend timer for a call.
  *
  * Scans the part of the Tx window from call->acks_unacked up to
  * call->acks_head (index wrapped with acks_winsz - 1).  Each slot is read
  * as an sk_buff pointer with the low bit masked off, and the code asserts
  * that the low bit is in fact clear for every slot in this range.  For
  * each packet it looks at need_resend and at whether resend_at falls
  * within jiffies + 1, otherwise it tracks the earliest resend_at seen
  * (bit value 2 of resend marks that a candidate is already held).  The
  * accumulated flags and time are passed to rxrpc_set_resend().
  *
  * NOTE(review): the bodies of the need_resend and time_after_eq branches
  * are on lines missing from this copy; presumably they set bits in resend.
  */
 236 static void rxrpc_resend_timer(struct rxrpc_call *call)
 
 238         struct rxrpc_skb_priv *sp;
 
 240         unsigned long *p_txb, resend_at;
 
 245                call->acks_tail, call->acks_unacked, call->acks_head);
 
 250         for (loop = call->acks_unacked;
 
 251              loop != call->acks_head;
 
 252              loop = (loop + 1) &  (call->acks_winsz - 1)
 
 254                 p_txb = call->acks_window + loop;
 
 255                 smp_read_barrier_depends();
 
 256                 txb = (struct sk_buff *) (*p_txb & ~1);
 
 259                 ASSERT(!(*p_txb & 1));
 
 261                 if (sp->need_resend) {
 
 263                 } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 
 266                 } else if (resend & 2) {
 
 267                         if (time_before(sp->resend_at, resend_at))
 
 268                                 resend_at = sp->resend_at;
 
 270                         resend_at = sp->resend_at;
 
 275         rxrpc_set_resend(call, resend, resend_at);
 
 280  * process soft ACKs of our transmitted packets
 
 281  * - these indicate packets the peer has or has not received, but hasn't yet
 
 282  *   given to the consumer, and so can still be discarded and re-requested
 
 /*
  * Apply the soft-ACK array of a received ACK packet to the Tx window.
  *
  * - call: call whose Tx window is updated
  * - ack:  parsed ACK header; ack->nAcks gives the number of soft-ACK bytes
  * - the third parameter is declared on a line missing from this copy; the
  *   body reads the soft-ACK bytes from skb at offset 0
  *
  * First pass: ack->nAcks bytes are copied out of the packet into a local
  * array and each one is matched to the window slot at
  * (acks_tail + index) wrapped by acks_winsz - 1.  RXRPC_ACK_TYPE_ACK and
  * RXRPC_ACK_TYPE_NACK are recognised; anything else is traced as an
  * unsupported ACK type.  call->acks_unacked is then moved to the slot just
  * past the last soft-ACK entry.
  *
  * Second pass: the slots from acks_unacked to acks_head are scanned and the
  * earliest resend time is collected in the same way as in
  * rxrpc_resend_timer(); the result goes to rxrpc_set_resend().
  *
  * The caller treats a negative return as failure and the visible trace
  * shows -EPROTO as an error result.
  *
  * NOTE(review): no bound check of ack->nAcks against RXRPC_MAXACKS is
  * visible before the copy into sacks[] - confirm that the field type or a
  * missing line guarantees it fits.
  * NOTE(review): the case bodies and several branch bodies are on lines
  * missing from this copy.
  */
 284 static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
 
 285                                    struct rxrpc_ackpacket *ack,
 
 288         struct rxrpc_skb_priv *sp;
 
 290         unsigned long *p_txb, resend_at;
 
 292         u8 sacks[RXRPC_MAXACKS], resend;
 
 294         _enter("{%d,%d},{%d},",
 
 296                CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
 
 299         if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
 
 304         for (loop = 0; loop < ack->nAcks; loop++) {
 
 305                 p_txb = call->acks_window;
 
 306                 p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
 
 307                 smp_read_barrier_depends();
 
 308                 txb = (struct sk_buff *) (*p_txb & ~1);
 
 311                 switch (sacks[loop]) {
 
 312                 case RXRPC_ACK_TYPE_ACK:
 
 316                 case RXRPC_ACK_TYPE_NACK:
 
 322                         _debug("Unsupported ACK type %d", sacks[loop]);
 
 328         call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);
 
 330         /* anything not explicitly ACK'd is implicitly NACK'd, but may just not
 
 331          * have been received or processed yet by the far end */
 
 332         for (loop = call->acks_unacked;
 
 333              loop != call->acks_head;
 
 334              loop = (loop + 1) &  (call->acks_winsz - 1)
 
 336                 p_txb = call->acks_window + loop;
 
 337                 smp_read_barrier_depends();
 
 338                 txb = (struct sk_buff *) (*p_txb & ~1);
 
 342                         /* packet must have been discarded */
 
 346                 } else if (sp->need_resend) {
 
 348                 } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 
 351                 } else if (resend & 2) {
 
 352                         if (time_before(sp->resend_at, resend_at))
 
 353                                 resend_at = sp->resend_at;
 
 355                         resend_at = sp->resend_at;
 
 360         rxrpc_set_resend(call, resend, resend_at);
 
 365         _leave(" = -EPROTO");
 
 370  * discard hard-ACK'd packets from the Tx window
 
 /*
  * Drop hard-ACKed packets from the tail of the Tx window.
  *
  * - call: call whose Tx window is rotated
  * - hard: sequence number up to which packets have been hard-ACKed
  *
  * Asserts that (hard - call->acks_hard) does not exceed the number of
  * occupied slots (CIRC_CNT of head, tail and window size).  While
  * call->acks_hard is below hard, the packet in the tail slot (low bit
  * masked off) is released with rxrpc_free_skb(), the tail index is
  * advanced with wrap-around and stored in call->acks_tail, and
  * call->acks_unacked is dragged along when it was sitting on the old tail.
  * Finally anything sleeping on call->tx_waitq is woken.
  *
  * NOTE(review): the lines that advance call->acks_hard and assign
  * old_tail are missing from this copy.
  */
 372 static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 
 374         struct rxrpc_skb_priv *sp;
 
 376         int tail = call->acks_tail, old_tail;
 
 377         int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);
 
 379         _enter("{%u,%u},%u", call->acks_hard, win, hard);
 
 381         ASSERTCMP(hard - call->acks_hard, <=, win);
 
 383         while (call->acks_hard < hard) {
 
 384                 smp_read_barrier_depends();
 
 385                 _skb = call->acks_window[tail] & ~1;
 
 386                 sp = rxrpc_skb((struct sk_buff *) _skb);
 
 387                 rxrpc_free_skb((struct sk_buff *) _skb);
 
 389                 tail = (tail + 1) & (call->acks_winsz - 1);
 
 390                 call->acks_tail = tail;
 
 391                 if (call->acks_unacked == old_tail)
 
 392                         call->acks_unacked = tail;
 
 396         wake_up(&call->tx_waitq);
 
 400  * clear the Tx window in the event of a failure
 
 /*
  * Discard the whole Tx window of a call by rotating it up to the current
  * value of call->sequence, i.e. treating every packet sequenced so far as
  * hard-ACKed.  Called from rxrpc_process_call() on received abort/busy and
  * on an implicit ACKALL.
  */
 402 static void rxrpc_clear_tx_window(struct rxrpc_call *call)
 
 404         rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
 
 408  * drain the out of sequence received packet queue into the packet Rx queue
 
 /*
  * Move the next in-sequence packet from the out-of-sequence queue to the
  * socket receive queue.
  *
  * Runs under spin_lock_bh(&call->lock).  If the call has the
  * RXRPC_CALL_RELEASED flag set, control jumps to the socket_unavailable
  * label.  Otherwise the head of call->rx_oos_queue is dequeued:
  *  - if its sequence number is not call->rx_first_oos, it is put back at
  *    the head of the queue and rx_first_oos is updated to its sequence;
  *  - otherwise it is marked RXRPC_SKB_MARK_DATA and queued with
  *    rxrpc_queue_rcv_skb(); it is flagged terminal when RXRPC_LAST_PACKET
  *    is set and RXRPC_CLIENT_INITIATED is not.  call->rx_data_post is
  *    incremented and the next queued packet is peeked at to refresh
  *    rx_first_oos (0 is stored on the other branch, presumably when the
  *    queue is empty).
  *
  * The result variable ret is traced on exit and is presumably the return
  * value - the return statement itself is not visible in this copy.
  *
  * NOTE(review): lines are missing from this copy, including the handling
  * of an empty queue and of a failed rxrpc_queue_rcv_skb() call.
  */
 410 static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 
 412         struct rxrpc_skb_priv *sp;
 
 417         _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);
 
 419         spin_lock_bh(&call->lock);
 
 422         if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 
 423                 goto socket_unavailable;
 
 425         skb = skb_dequeue(&call->rx_oos_queue);
 
 429                 _debug("drain OOS packet %d [%d]",
 
 430                        ntohl(sp->hdr.seq), call->rx_first_oos);
 
 432                 if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
 
 433                         skb_queue_head(&call->rx_oos_queue, skb);
 
 434                         call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
 
 435                         _debug("requeue %p {%u}", skb, call->rx_first_oos);
 
 437                         skb->mark = RXRPC_SKB_MARK_DATA;
 
 438                         terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
 
 439                                 !(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
 
 440                         ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
 
 442                         _debug("drain #%u", call->rx_data_post);
 
 443                         call->rx_data_post++;
 
 445                         /* find out what the next packet is */
 
 446                         skb = skb_peek(&call->rx_oos_queue);
 
 449                                         ntohl(rxrpc_skb(skb)->hdr.seq);
 
 451                                 call->rx_first_oos = 0;
 
 452                         _debug("peek %p {%u}", skb, call->rx_first_oos);
 
 458         spin_unlock_bh(&call->lock);
 
 459         _leave(" = %d", ret);
 
 464  * insert an out of sequence packet into the buffer
 
 /*
  * Insert a received DATA packet into the out-of-sequence queue of a call,
  * keeping the queue ordered by sequence number.
  *
  * The second parameter is declared on a line missing from this copy; the
  * body uses it as skb.  The packet gets rxrpc_packet_destructor as its
  * destructor, its private call pointer is asserted to be NULL, and a
  * reference is taken on the call with rxrpc_get_call().
  *
  * Under spin_lock_bh(&call->lock) the queue is walked and the packet is
  * inserted before the first queued packet with a greater sequence number,
  * or appended to the tail.  call->rx_first_oos is lowered to this sequence
  * number when it was 0 or greater than it.  Then, under
  * read_lock(&call->state_lock), the RXRPC_CALL_DRAIN_RX_OOS event is set
  * when the call state is below RXRPC_CALL_COMPLETE and the next sequence
  * to post (rx_data_post) equals rx_first_oos.
  *
  * NOTE(review): lines are missing from this copy, including how a packet
  * with a sequence number equal to a queued one is treated and whether the
  * call is queued for processing after the event bit is set.
  */
 466 static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
 
 469         struct rxrpc_skb_priv *sp, *psp;
 
 474         seq = ntohl(sp->hdr.seq);
 
 475         _enter(",,{%u}", seq);
 
 477         skb->destructor = rxrpc_packet_destructor;
 
 478         ASSERTCMP(sp->call, ==, NULL);
 
 480         rxrpc_get_call(call);
 
 482         /* insert into the buffer in sequence order */
 
 483         spin_lock_bh(&call->lock);
 
 485         skb_queue_walk(&call->rx_oos_queue, p) {
 
 487                 if (ntohl(psp->hdr.seq) > seq) {
 
 488                         _debug("insert oos #%u before #%u",
 
 489                                seq, ntohl(psp->hdr.seq));
 
 490                         skb_insert(p, skb, &call->rx_oos_queue);
 
 495         _debug("append oos #%u", seq);
 
 496         skb_queue_tail(&call->rx_oos_queue, skb);
 
 499         /* we might now have a new front to the queue */
 
 500         if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
 
 501                 call->rx_first_oos = seq;
 
 503         read_lock(&call->state_lock);
 
 504         if (call->state < RXRPC_CALL_COMPLETE &&
 
 505             call->rx_data_post == call->rx_first_oos) {
 
 506                 _debug("drain rx oos now");
 
 507                 set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 
 509         read_unlock(&call->state_lock);
 
 511         spin_unlock_bh(&call->lock);
 
 512         _leave(" [stored #%u]", call->rx_first_oos);
 
 516  * clear the Tx window on final ACK reception
 
 /*
  * Tear down the Tx window of a call once it is no longer needed.
  *
  * The window array pointer is copied to a local and call->acks_window is
  * set to NULL, so the call no longer points at the array.  Then, while
  * CIRC_CNT() of head and tail reports occupied slots, the slot at the tail
  * is read (low bit masked off), call->acks_tail is advanced with
  * wrap-around, and the packet is traced by sequence number.
  *
  * NOTE(review): the lines that release each packet and the detached array
  * are missing from this copy; presumably both are freed - confirm.
  */
 518 static void rxrpc_zap_tx_window(struct rxrpc_call *call)
 
 520         struct rxrpc_skb_priv *sp;
 
 522         unsigned long _skb, *acks_window;
 
 523         uint8_t winsz = call->acks_winsz;
 
 526         acks_window = call->acks_window;
 
 527         call->acks_window = NULL;
 
 529         while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
 
 530                 tail = call->acks_tail;
 
 531                 smp_read_barrier_depends();
 
 532                 _skb = acks_window[tail] & ~1;
 
 534                 call->acks_tail = (call->acks_tail + 1) & (winsz - 1);
 
 536                 skb = (struct sk_buff *) _skb;
 
 538                 _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 
 546  * process the extra information that may be appended to an ACK packet
 
 /*
  * Read the optional ackinfo trailer of an ACK packet and use it to lower
  * the MTU recorded for the peer.
  *
  * - call:   call the ACK arrived on; the peer is reached through
  *           call->conn->trans->peer
  * - skb:    the ACK packet, already advanced past the ACK header by the
  *           caller
  * - latest: serial number of the ACK (presumably only used for tracing;
  *           the argument line of the trace call is not visible here)
  * - nAcks:  number of soft-ACK bytes; the trailer is read from offset
  *           nAcks + 3
  *
  * If the trailer cannot be copied out, the function leaves with the trace
  * [no ackinfo].  Otherwise mtu is the smaller of rxMTU and maxMTU (both
  * converted from network order), and when that is below peer->maxdata the
  * peer is updated under spin_lock_bh(&peer->lock): peer->mtu becomes
  * mtu + peer->hdrsize.
  *
  * NOTE(review): the 3 in the offset presumably skips padding after the
  * soft-ACK array - confirm against the wire format.  One line inside the
  * locked region is missing from this copy.
  */
 548 static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 
 549                                   unsigned latest, int nAcks)
 
 551         struct rxrpc_ackinfo ackinfo;
 
 552         struct rxrpc_peer *peer;
 
 555         if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
 
 556                 _leave(" [no ackinfo]");
 
 560         _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 
 562                ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
 
 563                ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));
 
 565         mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 
 567         peer = call->conn->trans->peer;
 
 568         if (mtu < peer->maxdata) {
 
 569                 spin_lock_bh(&peer->lock);
 
 571                 peer->mtu = mtu + peer->hdrsize;
 
 572                 spin_unlock_bh(&peer->lock);
 
 573                 _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
 
 578  * process packets in the reception queue
 
 /*
  * Process packets that were deferred onto call->rx_queue.
  *
  * The second parameter is declared on a line missing from this copy; the
  * body passes it on as _abort_code to rxrpc_verify_packet(), and the
  * caller in rxrpc_process_call() supplies the address of its abort code.
  *
  * A packet is dequeued and dispatched on its header type:
  *  - DATA: verified with rxrpc_verify_packet() and then placed in the
  *    out-of-sequence queue with rxrpc_insert_oos_packet().
  *  - ACK: the ACK header is copied out and pulled off the packet, the
  *    ackinfo trailer is handed to rxrpc_extract_ackinfo(), a PING reason
  *    makes the code propose a PING_RESPONSE ACK, stale ACKs are discarded,
  *    call->acks_latest is updated, and - only in the four states that have
  *    packets in flight (CLIENT_SEND_REQUEST, CLIENT_AWAIT_REPLY,
  *    SERVER_SEND_REPLY, SERVER_AWAIT_ACK) - the Tx window is rotated up to
  *    hard - 1 and the soft ACKs are applied with rxrpc_process_soft_ACKs().
  *  - ACKALL, BUSY, ABORT, CHALLENGE, RESPONSE, DEBUG: case labels are
  *    visible but their bodies are not.
  *
  * When everything sent has been hard-ACKed, the call state is advanced
  * under write_lock_bh(&call->state_lock) (CLIENT_AWAIT_REPLY becomes
  * CLIENT_RECV_REPLY, SERVER_AWAIT_ACK becomes COMPLETE), the resend timer
  * and its flag bits are cleared, the Tx window is zapped if still present,
  * and a packet marked RXRPC_SKB_MARK_FINAL_ACK is queued to the socket
  * under call->lock.
  *
  * The caller switches on the return value; the visible trace shows -EPROTO
  * on the protocol error path.
  *
  * NOTE(review): the stale-ACK test is written as
  * latest - call->acks_latest <= 0.  The declarations are not visible in
  * this copy; if both operands are unsigned the test is only true when the
  * two serials are equal, so older ACKs would not be discarded - confirm
  * the types.
  * NOTE(review): a large number of lines (labels, gotos, case bodies,
  * returns) are missing from this copy, so the control flow summarised
  * above is reconstructed only from the visible statements.
  */
 580 static int rxrpc_process_rx_queue(struct rxrpc_call *call,
 
 583         struct rxrpc_ackpacket ack;
 
 584         struct rxrpc_skb_priv *sp;
 
 593         skb = skb_dequeue(&call->rx_queue);
 
 597         _net("deferred skb %p", skb);
 
 601         _debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);
 
 605         switch (sp->hdr.type) {
 
 606                 /* data packets that wind up here have been received out of
 
 607                  * order, need security processing or are jumbo packets */
 
 608         case RXRPC_PACKET_TYPE_DATA:
 
 609                 _proto("OOSQ DATA %%%u { #%u }",
 
 610                        ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 
 612                 /* secured packets must be verified and possibly decrypted */
 
 613                 if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
 
 616                 rxrpc_insert_oos_packet(call, skb);
 
 617                 goto process_further;
 
 619                 /* partial ACK to process */
 
 620         case RXRPC_PACKET_TYPE_ACK:
 
 621                 if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
 
 622                         _debug("extraction failure");
 
 625                 if (!skb_pull(skb, sizeof(ack)))
 
 628                 latest = ntohl(sp->hdr.serial);
 
 629                 hard = ntohl(ack.firstPacket);
 
 630                 tx = atomic_read(&call->sequence);
 
 632                 _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 
 636                        ntohl(ack.previousPacket),
 
 638                        rxrpc_acks[ack.reason],
 
 641                 rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);
 
 643                 if (ack.reason == RXRPC_ACK_PING) {
 
 644                         _proto("Rx ACK %%%u PING Request", latest);
 
 645                         rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
 
 646                                           sp->hdr.serial, true);
 
 649                 /* discard any out-of-order or duplicate ACKs */
 
 650                 if (latest - call->acks_latest <= 0) {
 
 651                         _debug("discard ACK %d <= %d",
 
 652                                latest, call->acks_latest);
 
 655                 call->acks_latest = latest;
 
 657                 if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
 
 658                     call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
 
 659                     call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
 
 660                     call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
 
 663                 _debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);
 
 667                                 _debug("hard-ACK'd packet %d not transmitted"
 
 673                         if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
 
 674                              call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
 
 679                         rxrpc_rotate_tx_window(call, hard - 1);
 
 683                         if (hard - 1 + ack.nAcks > tx) {
 
 684                                 _debug("soft-ACK'd packet %d+%d not"
 
 685                                        " transmitted (%d top)",
 
 686                                        hard - 1, ack.nAcks, tx);
 
 690                         if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
 
 695                 /* complete ACK to process */
 
 696         case RXRPC_PACKET_TYPE_ACKALL:
 
 699                 /* abort and busy are handled elsewhere */
 
 700         case RXRPC_PACKET_TYPE_BUSY:
 
 701         case RXRPC_PACKET_TYPE_ABORT:
 
 704                 /* connection level events - also handled elsewhere */
 
 705         case RXRPC_PACKET_TYPE_CHALLENGE:
 
 706         case RXRPC_PACKET_TYPE_RESPONSE:
 
 707         case RXRPC_PACKET_TYPE_DEBUG:
 
 711         /* if we've had a hard ACK that covers all the packets we've sent, then
 
 712          * that ends that phase of the operation */
 
 714         write_lock_bh(&call->state_lock);
 
 715         _debug("ack all %d", call->state);
 
 717         switch (call->state) {
 
 718         case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 
 719                 call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 
 721         case RXRPC_CALL_SERVER_AWAIT_ACK:
 
 722                 _debug("srv complete");
 
 723                 call->state = RXRPC_CALL_COMPLETE;
 
 726         case RXRPC_CALL_CLIENT_SEND_REQUEST:
 
 727         case RXRPC_CALL_SERVER_RECV_REQUEST:
 
 728                 goto protocol_error_unlock; /* can't occur yet */
 
 730                 write_unlock_bh(&call->state_lock);
 
 731                 goto discard; /* assume packet left over from earlier phase */
 
 734         write_unlock_bh(&call->state_lock);
 
 736         /* if all the packets we sent are hard-ACK'd, then we can discard
 
 737          * whatever we've got left */
 
 738         _debug("clear Tx %d",
 
 739                CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 
 741         del_timer_sync(&call->resend_timer);
 
 742         clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 
 743         clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 
 745         if (call->acks_window)
 
 746                 rxrpc_zap_tx_window(call);
 
 749                 /* post the final ACK message for userspace to pick up */
 
 751                 skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
 
 753                 rxrpc_get_call(call);
 
 754                 spin_lock_bh(&call->lock);
 
 755                 if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
 
 757                 spin_unlock_bh(&call->lock);
 
 758                 goto process_further;
 
 763         goto process_further;
 
 765 protocol_error_unlock:
 
 766         write_unlock_bh(&call->state_lock);
 
 769         _leave(" = -EPROTO");
 
 774  * post a message to the socket Rx queue for recvmsg() to pick up
 
 /*
  * Queue a data-less notification packet on the socket receive queue so
  * that recvmsg() can report an event for the call.
  *
  * - call:  call the message is about
  * - mark:  RXRPC_SKB_MARK_* value describing the event
  * - error: error number associated with the event
  * - the remaining parameter is declared on a line missing from this copy;
  *   the body uses it as fatal, traces it and passes it on to
  *   rxrpc_queue_rcv_skb()
  *
  * The resend and ACK timers are deleted synchronously and
  * RXRPC_CALL_RUN_RTIMER is cleared on the path commented as being for
  * fatal messages.  Any mark other than RXRPC_SKB_MARK_NEW_CALL is dropped
  * with the trace [no userid] when the call has no RXRPC_CALL_HAS_USERID
  * flag.  If RXRPC_CALL_TERMINAL_MSG is not already set, a zero-length skb
  * is allocated with GFP_NOFS, its private area is zeroed, a reference is
  * taken on the call, and the skb is queued under spin_lock_bh(&call->lock).
  *
  * Callers in this file treat a negative return as failure.
  *
  * NOTE(review): the allocation-failure check, the lines that fill in the
  * private area, and the return statements are missing from this copy.
  */
 776 static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 
 779         struct rxrpc_skb_priv *sp;
 
 783         _enter("{%d,%lx},%u,%u,%d",
 
 784                call->debug_id, call->flags, mark, error, fatal);
 
 786         /* remove timers and things for fatal messages */
 
 788                 del_timer_sync(&call->resend_timer);
 
 789                 del_timer_sync(&call->ack_timer);
 
 790                 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 
 793         if (mark != RXRPC_SKB_MARK_NEW_CALL &&
 
 794             !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 
 795                 _leave("[no userid]");
 
 799         if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
 
 800                 skb = alloc_skb(0, GFP_NOFS);
 
 809                 memset(sp, 0, sizeof(*sp));
 
 812                 rxrpc_get_call(call);
 
 814                 spin_lock_bh(&call->lock);
 
 815                 ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
 
 816                 spin_unlock_bh(&call->lock);
 
 824  * handle background processing of incoming call packets and ACK / abort
 
 827 void rxrpc_process_call(struct work_struct *work)
 
 829         struct rxrpc_call *call =
 
 830                 container_of(work, struct rxrpc_call, processor);
 
 831         struct rxrpc_ackpacket ack;
 
 832         struct rxrpc_ackinfo ackinfo;
 
 833         struct rxrpc_header hdr;
 
 839         int genbit, loop, nbit, ioc, ret, mtu;
 
 840         u32 abort_code = RX_PROTOCOL_ERROR;
 
 843         //printk("\n--------------------\n");
 
 844         _enter("{%d,%s,%lx} [%lu]",
 
 845                call->debug_id, rxrpc_call_states[call->state], call->events,
 
 846                (jiffies - call->creation_jif) / (HZ / 10));
 
 848         if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
 
 849                 _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
 
 853         /* there's a good chance we're going to have to send a message, so set
 
 854          * one up in advance */
 
 855         msg.msg_name    = &call->conn->trans->peer->srx.transport.sin;
 
 856         msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin);
 
 857         msg.msg_control = NULL;
 
 858         msg.msg_controllen = 0;
 
 861         hdr.epoch       = call->conn->epoch;
 
 863         hdr.callNumber  = call->call_id;
 
 865         hdr.type        = RXRPC_PACKET_TYPE_ACK;
 
 866         hdr.flags       = call->conn->out_clientflag;
 
 868         hdr.securityIndex = call->conn->security_ix;
 
 870         hdr.serviceId   = call->conn->service_id;
 
 872         memset(iov, 0, sizeof(iov));
 
 873         iov[0].iov_base = &hdr;
 
 874         iov[0].iov_len  = sizeof(hdr);
 
 876         /* deal with events of a final nature */
 
 877         if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 
 878                 rxrpc_release_call(call);
 
 879                 clear_bit(RXRPC_CALL_RELEASE, &call->events);
 
 882         if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
 
 885                 clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 
 886                 clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 
 887                 clear_bit(RXRPC_CALL_ABORT, &call->events);
 
 889                 error = call->conn->trans->peer->net_error;
 
 890                 _debug("post net error %d", error);
 
 892                 if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
 
 895                 clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
 
 899         if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
 
 900                 ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 
 902                 clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 
 903                 clear_bit(RXRPC_CALL_ABORT, &call->events);
 
 905                 _debug("post conn abort");
 
 907                 if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 
 908                                        call->conn->error, true) < 0)
 
 910                 clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 
 914         if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
 
 915                 hdr.type = RXRPC_PACKET_TYPE_BUSY;
 
 916                 genbit = RXRPC_CALL_REJECT_BUSY;
 
 920         if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
 
 921                 ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 
 923                 if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 
 924                                        ECONNABORTED, true) < 0)
 
 926                 hdr.type = RXRPC_PACKET_TYPE_ABORT;
 
 927                 data = htonl(call->abort_code);
 
 928                 iov[1].iov_base = &data;
 
 929                 iov[1].iov_len = sizeof(data);
 
 930                 genbit = RXRPC_CALL_ABORT;
 
 934         if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
 
 935                 genbit = RXRPC_CALL_ACK_FINAL;
 
 937                 ack.bufferSpace = htons(8);
 
 940                 ack.reason      = RXRPC_ACK_IDLE;
 
 942                 call->ackr_reason = 0;
 
 944                 spin_lock_bh(&call->lock);
 
 945                 ack.serial = call->ackr_serial;
 
 946                 ack.previousPacket = call->ackr_prev_seq;
 
 947                 ack.firstPacket = htonl(call->rx_data_eaten + 1);
 
 948                 spin_unlock_bh(&call->lock);
 
 952                 iov[1].iov_base = &ack;
 
 953                 iov[1].iov_len  = sizeof(ack);
 
 954                 iov[2].iov_base = &pad;
 
 956                 iov[3].iov_base = &ackinfo;
 
 957                 iov[3].iov_len  = sizeof(ackinfo);
 
 961         if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
 
 962                             (1 << RXRPC_CALL_RCVD_ABORT))
 
 966                 if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
 
 967                         mark = RXRPC_SKB_MARK_REMOTE_ABORT;
 
 969                         mark = RXRPC_SKB_MARK_BUSY;
 
 971                 _debug("post abort/busy");
 
 972                 rxrpc_clear_tx_window(call);
 
 973                 if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
 
 976                 clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
 
 977                 clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
 
 981         if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
 
 982                 _debug("do implicit ackall");
 
 983                 rxrpc_clear_tx_window(call);
 
 986         if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
 
 987                 write_lock_bh(&call->state_lock);
 
 988                 if (call->state <= RXRPC_CALL_COMPLETE) {
 
 989                         call->state = RXRPC_CALL_LOCALLY_ABORTED;
 
 990                         call->abort_code = RX_CALL_TIMEOUT;
 
 991                         set_bit(RXRPC_CALL_ABORT, &call->events);
 
 993                 write_unlock_bh(&call->state_lock);
 
 995                 _debug("post timeout");
 
 996                 if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 
1000                 clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
 
1004         /* deal with assorted inbound messages */
 
1005         if (!skb_queue_empty(&call->rx_queue)) {
 
1006                 switch (rxrpc_process_rx_queue(call, &abort_code)) {
 
1015                         rxrpc_abort_call(call, abort_code);
 
1020         /* handle resending */
 
1021         if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
 
1022                 rxrpc_resend_timer(call);
 
1023         if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
 
1026         /* consider sending an ordinary ACK */
 
1027         if (test_bit(RXRPC_CALL_ACK, &call->events)) {
 
1028                 _debug("send ACK: window: %d - %d { %lx }",
 
1029                        call->rx_data_eaten, call->ackr_win_top,
 
1030                        call->ackr_window[0]);
 
1032                 if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
 
1033                     call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
 
1034                         /* ACK by sending reply DATA packet in this state */
 
1035                         clear_bit(RXRPC_CALL_ACK, &call->events);
 
1036                         goto maybe_reschedule;
 
1039                 genbit = RXRPC_CALL_ACK;
 
1041                 acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
 
1046                 //hdr.flags     = RXRPC_SLOW_START_OK;
 
1047                 ack.bufferSpace = htons(8);
 
1052                 spin_lock_bh(&call->lock);
 
1053                 ack.reason = call->ackr_reason;
 
1054                 ack.serial = call->ackr_serial;
 
1055                 ack.previousPacket = call->ackr_prev_seq;
 
1056                 ack.firstPacket = htonl(call->rx_data_eaten + 1);
 
1059                 for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
 
1060                         nbit = loop * BITS_PER_LONG;
 
1061                         for (bits = call->ackr_window[loop]; bits; bits >>= 1
 
1063                                 _debug("- l=%d n=%d b=%lx", loop, nbit, bits);
 
1065                                         acks[nbit] = RXRPC_ACK_TYPE_ACK;
 
1066                                         ack.nAcks = nbit + 1;
 
1071                 call->ackr_reason = 0;
 
1072                 spin_unlock_bh(&call->lock);
 
1076                 iov[1].iov_base = &ack;
 
1077                 iov[1].iov_len  = sizeof(ack);
 
1078                 iov[2].iov_base = acks;
 
1079                 iov[2].iov_len  = ack.nAcks;
 
1080                 iov[3].iov_base = &pad;
 
1082                 iov[4].iov_base = &ackinfo;
 
1083                 iov[4].iov_len  = sizeof(ackinfo);
 
1085                 switch (ack.reason) {
 
1086                 case RXRPC_ACK_REQUESTED:
 
1087                 case RXRPC_ACK_DUPLICATE:
 
1088                 case RXRPC_ACK_OUT_OF_SEQUENCE:
 
1089                 case RXRPC_ACK_EXCEEDS_WINDOW:
 
1090                 case RXRPC_ACK_NOSPACE:
 
1091                 case RXRPC_ACK_PING:
 
1092                 case RXRPC_ACK_PING_RESPONSE:
 
1093                         goto send_ACK_with_skew;
 
1094                 case RXRPC_ACK_DELAY:
 
1095                 case RXRPC_ACK_IDLE:
 
1100         /* handle completion of security negotiations on an incoming
 
1102         if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
 
1104                 spin_lock_bh(&call->lock);
 
1106                 if (call->state == RXRPC_CALL_SERVER_SECURING) {
 
1108                         write_lock(&call->conn->lock);
 
1109                         if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
 
1110                             !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 
1111                                 _debug("not released");
 
1112                                 call->state = RXRPC_CALL_SERVER_ACCEPTING;
 
1113                                 list_move_tail(&call->accept_link,
 
1114                                                &call->socket->acceptq);
 
1116                         write_unlock(&call->conn->lock);
 
1117                         read_lock(&call->state_lock);
 
1118                         if (call->state < RXRPC_CALL_COMPLETE)
 
1119                                 set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
 
1120                         read_unlock(&call->state_lock);
 
1123                 spin_unlock_bh(&call->lock);
 
1124                 if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
 
1125                         goto maybe_reschedule;
 
1128         /* post a notification of an acceptable connection to the app */
 
1129         if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
 
1130                 _debug("post accept");
 
1131                 if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
 
1134                 clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
 
1135                 goto maybe_reschedule;
 
1138         /* handle incoming call acceptance */
 
1139         if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
 
1141                 ASSERTCMP(call->rx_data_post, ==, 0);
 
1142                 call->rx_data_post = 1;
 
1143                 read_lock_bh(&call->state_lock);
 
1144                 if (call->state < RXRPC_CALL_COMPLETE)
 
1145                         set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 
1146                 read_unlock_bh(&call->state_lock);
 
1149         /* drain the out of sequence received packet queue into the packet Rx
 
1151         if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
 
1152                 while (call->rx_data_post == call->rx_first_oos)
 
1153                         if (rxrpc_drain_rx_oos_queue(call) < 0)
 
1155                 goto maybe_reschedule;
 
1158         /* other events may have been raised since we started checking */
 
1159         goto maybe_reschedule;
 
1162         ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
 
1165         mtu = call->conn->trans->peer->if_mtu;
 
1166         mtu -= call->conn->trans->peer->hdrsize;
 
1167         ackinfo.maxMTU  = htonl(mtu);
 
1168         ackinfo.rwind   = htonl(32);
 
1170         /* permit the peer to send us jumbo packets if it wants to */
 
1171         ackinfo.rxMTU   = htonl(5692);
 
1172         ackinfo.jumbo_max = htonl(4);
 
1174         hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
 
1175         _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 
1178                ntohl(ack.firstPacket),
 
1179                ntohl(ack.previousPacket),
 
1181                rxrpc_acks[ack.reason],
 
1184         del_timer_sync(&call->ack_timer);
 
1186                 set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
 
1187         goto send_message_2;
 
1190         _debug("send message");
 
1192         hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
 
1193         _proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
 
1196         len = iov[0].iov_len;
 
1198         if (iov[4].iov_len) {
 
1200                 len += iov[4].iov_len;
 
1201                 len += iov[3].iov_len;
 
1202                 len += iov[2].iov_len;
 
1203                 len += iov[1].iov_len;
 
1204         } else if (iov[3].iov_len) {
 
1206                 len += iov[3].iov_len;
 
1207                 len += iov[2].iov_len;
 
1208                 len += iov[1].iov_len;
 
1209         } else if (iov[2].iov_len) {
 
1211                 len += iov[2].iov_len;
 
1212                 len += iov[1].iov_len;
 
1213         } else if (iov[1].iov_len) {
 
1215                 len += iov[1].iov_len;
 
1218         ret = kernel_sendmsg(call->conn->trans->local->socket,
 
1219                              &msg, iov, ioc, len);
 
1221                 _debug("sendmsg failed: %d", ret);
 
1222                 read_lock_bh(&call->state_lock);
 
1223                 if (call->state < RXRPC_CALL_DEAD)
 
1224                         rxrpc_queue_call(call);
 
1225                 read_unlock_bh(&call->state_lock);
 
1230         case RXRPC_CALL_ABORT:
 
1231                 clear_bit(genbit, &call->events);
 
1232                 clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
 
1235         case RXRPC_CALL_ACK_FINAL:
 
1236                 write_lock_bh(&call->state_lock);
 
1237                 if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
 
1238                         call->state = RXRPC_CALL_COMPLETE;
 
1239                 write_unlock_bh(&call->state_lock);
 
1243                 clear_bit(genbit, &call->events);
 
1244                 switch (call->state) {
 
1245                 case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 
1246                 case RXRPC_CALL_CLIENT_RECV_REPLY:
 
1247                 case RXRPC_CALL_SERVER_RECV_REQUEST:
 
1248                 case RXRPC_CALL_SERVER_ACK_REQUEST:
 
1249                         _debug("start ACK timer");
 
1250                         rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
 
1251                                           call->ackr_serial, false);
 
1255                 goto maybe_reschedule;
 
1259         del_timer_sync(&call->ack_timer);
 
1260         if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
 
1261                 rxrpc_put_call(call);
 
1262         clear_bit(RXRPC_CALL_ACK, &call->events);
 
1265         if (call->events || !skb_queue_empty(&call->rx_queue)) {
 
1266                 read_lock_bh(&call->state_lock);
 
1267                 if (call->state < RXRPC_CALL_DEAD)
 
1268                         rxrpc_queue_call(call);
 
1269                 read_unlock_bh(&call->state_lock);
 
1272         /* don't leave aborted connections on the accept queue */
 
1273         if (call->state >= RXRPC_CALL_COMPLETE &&
 
1274             !list_empty(&call->accept_link)) {
 
1275                 _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
 
1276                        call, call->events, call->flags,
 
1277                        ntohl(call->conn->cid));
 
1279                 read_lock_bh(&call->state_lock);
 
1280                 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
 
1281                     !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
 
1282                         rxrpc_queue_call(call);
 
1283                 read_unlock_bh(&call->state_lock);
 
1287         clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
 
1290         /* because we don't want two CPUs both processing the work item for one
 
1291          * call at the same time, we use a flag to note when it's busy; however
 
1292          * this means there's a race between clearing the flag and setting the
 
1293          * work pending bit and the work item being processed again */
 
1294         if (call->events && !work_pending(&call->processor)) {
 
1295                 _debug("jumpstart %x", ntohl(call->conn->cid));
 
1296                 rxrpc_queue_call(call);
 
1303         _debug("out of memory");
 
1304         goto maybe_reschedule;