Fix TcpSocketImpl rx buffer logic (closes 248)

This commit is contained in:
Raj Bhattacharjea
2008-07-29 15:36:41 -04:00
parent 1f86e826e5
commit caaae1c1dd
2 changed files with 121 additions and 27 deletions

View File

@@ -1053,11 +1053,21 @@ void TcpSocketImpl::NewRx (Ptr<Packet> p,
// 3) Received seq is > expected, just re-ack previous and buffer data
if (tcpHeader.GetSequenceNumber () == m_nextRxSequence)
{ // If seq is expected seq
// Trim the end if necessary
// 1) Update nextRxSeq
// 2) Buffer this packet so Recv can read it
// 3) Send the ack
UnAckData_t::iterator next = m_bufferedData.upper_bound (m_nextRxSequence);
if (next != m_bufferedData.end ())
{
SequenceNumber nextBufferedSeq = next->first;
if (m_nextRxSequence + SequenceNumber(s) > nextBufferedSeq)
{//tail end isn't all new, trim enough off the end
s = nextBufferedSeq - m_nextRxSequence;
}
}
p = p->CreateFragment (0,s);
m_nextRxSequence += s; // Advance next expected sequence
//bytesReceived += s; // Statistics
NS_LOG_LOGIC("Case 1, advanced nrxs to " << m_nextRxSequence );
SocketAddressTag tag;
tag.SetAddress (fromAddress);
@@ -1072,27 +1082,7 @@ void TcpSocketImpl::NewRx (Ptr<Packet> p,
// Save for later delivery
m_bufferedData[tcpHeader.GetSequenceNumber () ] = p;
m_rxAvailable += p->GetSize ();
//putting this into the buffer might have filled in a sequence gap
//so we have to iterate through the list to find the largest contiguous
//sequenced chunk, and update m_rxAvailable appropriately
i = m_bufferedData.find (tcpHeader.GetSequenceNumber () );
UnAckData_t::iterator next = i;
next++;
while(next != m_bufferedData.end())
{
if(i->first + SequenceNumber(i->second->GetSize ()) == next->first)
{
//next packet is in sequence, count it
m_rxAvailable += next->second->GetSize();
m_nextRxSequence += next->second->GetSize();
}
else
{
break; //no more in this contiguous chunk
}
++i;
++next;
}
RxBufFinishInsert (tcpHeader.GetSequenceNumber ());
NotifyDataRecv ();
if (m_closeNotified)
{
@@ -1107,20 +1097,93 @@ void TcpSocketImpl::NewRx (Ptr<Packet> p,
}
}
}
else if (SequenceNumber (tcpHeader.GetSequenceNumber ()) >= m_nextRxSequence)
{ // Need to buffer this one
else if (tcpHeader.GetSequenceNumber () > m_nextRxSequence)
{ // Need to buffer this one, but trim off the front and back if necessary
NS_LOG_LOGIC ("Case 2, buffering " << tcpHeader.GetSequenceNumber () );
UnAckData_t::iterator previous =
m_bufferedData.lower_bound (tcpHeader.GetSequenceNumber ());
SequenceNumber startSeq = tcpHeader.GetSequenceNumber();
if (previous != m_bufferedData.begin ())
{
--previous;
startSeq = previous->first + SequenceNumber(previous->second->GetSize());
if (startSeq > tcpHeader.GetSequenceNumber ())
{
s = tcpHeader.GetSequenceNumber () + SequenceNumber(s) - startSeq;
}
else
{
startSeq = tcpHeader.GetSequenceNumber();
}
}
//possibly trim off the end
UnAckData_t::iterator next = m_bufferedData.upper_bound (tcpHeader.GetSequenceNumber());
if (next != m_bufferedData.end ())
{
SequenceNumber nextBufferedSeq = next->first;
if (startSeq + SequenceNumber(s) > nextBufferedSeq)
{//tail end isn't all new either, trim enough off the end
s = nextBufferedSeq - startSeq;
}
}
p = p->CreateFragment (startSeq - tcpHeader.GetSequenceNumber (),s);
UnAckData_t::iterator i =
m_bufferedData.find (tcpHeader.GetSequenceNumber () );
m_bufferedData.find (startSeq);
if (i != m_bufferedData.end () )
{
i->second = 0; // relase reference to already buffered
if(p->GetSize() > i->second->GetSize())
{
i->second = 0; // relase reference to already buffered
}
else
{
p = i->second;
}
}
// Save for later delivery
SocketAddressTag tag;
tag.SetAddress (fromAddress);
p->AddTag (tag);
m_bufferedData[tcpHeader.GetSequenceNumber () ] = p;
m_bufferedData[startSeq] = p;
i = m_bufferedData.find (startSeq);
next = i;
++next;
if(next != m_bufferedData.end())
{
NS_ASSERT(next->first >= i->first + SequenceNumber(i->second->GetSize ()));
}
}
else if (tcpHeader.GetSequenceNumber () + SequenceNumber(s) > m_nextRxSequence)
{//parial new data case, only part of the packet is new data
//trim the beginning
s = tcpHeader.GetSequenceNumber () + SequenceNumber(s) - m_nextRxSequence; //how much new
//possibly trim off the end
UnAckData_t::iterator next = m_bufferedData.upper_bound (m_nextRxSequence);
if (next != m_bufferedData.end ())
{
SequenceNumber nextBufferedSeq = next->first;
if (m_nextRxSequence + SequenceNumber(s) > nextBufferedSeq)
{//tail end isn't all new either, trim enough off the end
s = nextBufferedSeq - m_nextRxSequence;
}
}
p = p->CreateFragment (m_nextRxSequence - tcpHeader.GetSequenceNumber (),s);
SequenceNumber start = m_nextRxSequence;
m_nextRxSequence += s; // Advance next expected sequence
SocketAddressTag tag;
tag.SetAddress (fromAddress);
p->AddTag (tag);
//buffer the new fragment, it'll be read by call to Recv
UnAckData_t::iterator i = m_bufferedData.find (start);
if (i != m_bufferedData.end () ) //we found it already in the buffer
{
i->second = 0; // relase reference to already buffered
}
// Save for later delivery
m_bufferedData[start] = p;
m_rxAvailable += p->GetSize ();
RxBufFinishInsert(start);
NotifyDataRecv ();
}
else
{ // debug
@@ -1142,6 +1205,36 @@ void TcpSocketImpl::NewRx (Ptr<Packet> p,
}
}
void TcpSocketImpl::RxBufFinishInsert (SequenceNumber seq)
{
//putting data into the buffer might have filled in a sequence gap so we have
//to iterate through the list to find the largest contiguous sequenced chunk,
//and update m_rxAvailable and m_nextRxSequence appropriately
UnAckData_t::iterator i = m_bufferedData.find (seq);
UnAckData_t::iterator next = i;
++next;
//make sure the buffer is logically sequenced
if(next != m_bufferedData.end())
{
NS_ASSERT(next->first >= i->first + SequenceNumber(i->second->GetSize ()));
}
while(next != m_bufferedData.end())
{
if(i->first + SequenceNumber(i->second->GetSize ()) == next->first)
{
//next packet is in sequence, count it
m_rxAvailable += next->second->GetSize();
m_nextRxSequence += next->second->GetSize();
}
else
{
break; //no more in this contiguous chunk
}
++i;
++next;
}
}
void TcpSocketImpl::DelAckTimeout ()
{
m_delAckCount = 0;

View File

@@ -123,6 +123,7 @@ private:
// Manage data tx/rx
void NewRx (Ptr<Packet>, const TcpHeader&, const Address&);
void RxBufFinishInsert (SequenceNumber);
// XXX This should be virtual and overridden
Ptr<TcpSocketImpl> Copy ();
void NewAck (SequenceNumber seq);