From caaae1c1dd4d00f96ac10e23955f527f65d3496d Mon Sep 17 00:00:00 2001 From: Raj Bhattacharjea Date: Tue, 29 Jul 2008 15:36:41 -0400 Subject: [PATCH] Fix TcpSocketImpl rx buffer logic (closes 248) --- src/internet-stack/tcp-socket-impl.cc | 147 +++++++++++++++++++++----- src/internet-stack/tcp-socket-impl.h | 1 + 2 files changed, 121 insertions(+), 27 deletions(-) diff --git a/src/internet-stack/tcp-socket-impl.cc b/src/internet-stack/tcp-socket-impl.cc index cb970df79..abd008ee1 100644 --- a/src/internet-stack/tcp-socket-impl.cc +++ b/src/internet-stack/tcp-socket-impl.cc @@ -1053,11 +1053,21 @@ void TcpSocketImpl::NewRx (Ptr p, // 3) Received seq is > expected, just re-ack previous and buffer data if (tcpHeader.GetSequenceNumber () == m_nextRxSequence) { // If seq is expected seq + // Trim the end if necessary // 1) Update nextRxSeq // 2) Buffer this packet so Recv can read it // 3) Send the ack + UnAckData_t::iterator next = m_bufferedData.upper_bound (m_nextRxSequence); + if (next != m_bufferedData.end ()) + { + SequenceNumber nextBufferedSeq = next->first; + if (m_nextRxSequence + SequenceNumber(s) > nextBufferedSeq) + {//tail end isn't all new, trim enough off the end + s = nextBufferedSeq - m_nextRxSequence; + } + } + p = p->CreateFragment (0,s); m_nextRxSequence += s; // Advance next expected sequence - //bytesReceived += s; // Statistics NS_LOG_LOGIC("Case 1, advanced nrxs to " << m_nextRxSequence ); SocketAddressTag tag; tag.SetAddress (fromAddress); @@ -1072,27 +1082,7 @@ void TcpSocketImpl::NewRx (Ptr p, // Save for later delivery m_bufferedData[tcpHeader.GetSequenceNumber () ] = p; m_rxAvailable += p->GetSize (); - //putting this into the buffer might have filled in a sequence gap - //so we have to iterate through the list to find the largest contiguous - //sequenced chunk, and update m_rxAvailable appropriately - i = m_bufferedData.find (tcpHeader.GetSequenceNumber () ); - UnAckData_t::iterator next = i; - next++; - while(next != m_bufferedData.end()) - { - if(i->first + SequenceNumber(i->second->GetSize ()) == next->first) - { - //next packet is in sequence, count it - m_rxAvailable += next->second->GetSize(); - m_nextRxSequence += next->second->GetSize(); - } - else - { - break; //no more in this contiguous chunk - } - ++i; - ++next; - } + RxBufFinishInsert (tcpHeader.GetSequenceNumber ()); NotifyDataRecv (); if (m_closeNotified) { @@ -1107,20 +1097,93 @@ void TcpSocketImpl::NewRx (Ptr p, } } } - else if (SequenceNumber (tcpHeader.GetSequenceNumber ()) >= m_nextRxSequence) - { // Need to buffer this one + else if (tcpHeader.GetSequenceNumber () > m_nextRxSequence) + { // Need to buffer this one, but trim off the front and back if necessary NS_LOG_LOGIC ("Case 2, buffering " << tcpHeader.GetSequenceNumber () ); + UnAckData_t::iterator previous = + m_bufferedData.lower_bound (tcpHeader.GetSequenceNumber ()); + SequenceNumber startSeq = tcpHeader.GetSequenceNumber(); + if (previous != m_bufferedData.begin ()) + { + --previous; + startSeq = previous->first + SequenceNumber(previous->second->GetSize()); + if (startSeq > tcpHeader.GetSequenceNumber ()) + { + s = tcpHeader.GetSequenceNumber () + SequenceNumber(s) - startSeq; + } + else + { + startSeq = tcpHeader.GetSequenceNumber(); + } + } + //possibly trim off the end + UnAckData_t::iterator next = m_bufferedData.upper_bound (tcpHeader.GetSequenceNumber()); + if (next != m_bufferedData.end ()) + { + SequenceNumber nextBufferedSeq = next->first; + if (startSeq + SequenceNumber(s) > nextBufferedSeq) + {//tail end isn't all new either, trim enough off the end + s = nextBufferedSeq - startSeq; + } + } + p = p->CreateFragment (startSeq - tcpHeader.GetSequenceNumber (),s); UnAckData_t::iterator i = - m_bufferedData.find (tcpHeader.GetSequenceNumber () ); + m_bufferedData.find (startSeq); if (i != m_bufferedData.end () ) { - i->second = 0; // relase reference to already buffered + if(p->GetSize() > i->second->GetSize()) + { + i->second = 0; // relase reference to already buffered + } + else + { + p = i->second; + } } // Save for later delivery SocketAddressTag tag; tag.SetAddress (fromAddress); p->AddTag (tag); - m_bufferedData[tcpHeader.GetSequenceNumber () ] = p; + m_bufferedData[startSeq] = p; + i = m_bufferedData.find (startSeq); + next = i; + ++next; + if(next != m_bufferedData.end()) + { + NS_ASSERT(next->first >= i->first + SequenceNumber(i->second->GetSize ())); + } + } + else if (tcpHeader.GetSequenceNumber () + SequenceNumber(s) > m_nextRxSequence) + {//parial new data case, only part of the packet is new data + //trim the beginning + s = tcpHeader.GetSequenceNumber () + SequenceNumber(s) - m_nextRxSequence; //how much new + //possibly trim off the end + UnAckData_t::iterator next = m_bufferedData.upper_bound (m_nextRxSequence); + if (next != m_bufferedData.end ()) + { + SequenceNumber nextBufferedSeq = next->first; + if (m_nextRxSequence + SequenceNumber(s) > nextBufferedSeq) + {//tail end isn't all new either, trim enough off the end + s = nextBufferedSeq - m_nextRxSequence; + } + } + p = p->CreateFragment (m_nextRxSequence - tcpHeader.GetSequenceNumber (),s); + SequenceNumber start = m_nextRxSequence; + m_nextRxSequence += s; // Advance next expected sequence + SocketAddressTag tag; + tag.SetAddress (fromAddress); + p->AddTag (tag); + //buffer the new fragment, it'll be read by call to Recv + UnAckData_t::iterator i = m_bufferedData.find (start); + if (i != m_bufferedData.end () ) //we found it already in the buffer + { + i->second = 0; // relase reference to already buffered + } + // Save for later delivery + m_bufferedData[start] = p; + m_rxAvailable += p->GetSize (); + RxBufFinishInsert(start); + NotifyDataRecv (); } else { // debug @@ -1142,6 +1205,36 @@ void TcpSocketImpl::NewRx (Ptr p, } } +void TcpSocketImpl::RxBufFinishInsert (SequenceNumber seq) +{ + //putting data into the buffer might have filled in a sequence gap so we have + //to iterate through the list to find the largest contiguous sequenced chunk, + //and update m_rxAvailable and m_nextRxSequence appropriately + UnAckData_t::iterator i = m_bufferedData.find (seq); + UnAckData_t::iterator next = i; + ++next; + //make sure the buffer is logically sequenced + if(next != m_bufferedData.end()) + { + NS_ASSERT(next->first >= i->first + SequenceNumber(i->second->GetSize ())); + } + while(next != m_bufferedData.end()) + { + if(i->first + SequenceNumber(i->second->GetSize ()) == next->first) + { + //next packet is in sequence, count it + m_rxAvailable += next->second->GetSize(); + m_nextRxSequence += next->second->GetSize(); + } + else + { + break; //no more in this contiguous chunk + } + ++i; + ++next; + } +} + void TcpSocketImpl::DelAckTimeout () { m_delAckCount = 0; diff --git a/src/internet-stack/tcp-socket-impl.h b/src/internet-stack/tcp-socket-impl.h index 3817a28c1..8f2597124 100644 --- a/src/internet-stack/tcp-socket-impl.h +++ b/src/internet-stack/tcp-socket-impl.h @@ -123,6 +123,7 @@ private: // Manage data tx/rx void NewRx (Ptr, const TcpHeader&, const Address&); + void RxBufFinishInsert (SequenceNumber); // XXX This should be virtual and overridden Ptr Copy (); void NewAck (SequenceNumber seq);