Cut over UDP applications to use the new receive API

This commit is contained in:
Tom Henderson
2008-04-25 14:29:28 -07:00
parent d7a038431f
commit 288d193daa
12 changed files with 230 additions and 23 deletions

View File

@@ -103,7 +103,7 @@ UdpEchoClient::StartApplication (void)
m_socket->Connect (InetSocketAddress (m_peerAddress, m_peerPort));
}
m_socket->SetRecvCallback(MakeCallback(&UdpEchoClient::Receive, this));
m_socket->SetRecv_Callback(MakeCallback(&UdpEchoClient::HandleRead, this));
ScheduleTransmit (Seconds(0.));
}
@@ -148,6 +148,29 @@ UdpEchoClient::Send (void)
}
}
void
UdpEchoClient::HandleRead (Ptr<Socket> socket)
{
NS_LOG_FUNCTION (this << socket);
Ptr<Packet> packet;
uint32_t maxSize = std::numeric_limits<uint32_t>::max();
uint32_t flags = 0; // no flags
while (packet = socket->Recv (maxSize, flags))
{
SocketRxAddressTag tag;
bool found = packet->PeekTag (tag);
NS_ASSERT (found);
Address from = tag.GetAddress ();
packet->RemoveTag (tag);
if (InetSocketAddress::IsMatchingType (from))
{
InetSocketAddress address = InetSocketAddress::ConvertFrom (from);
NS_LOG_INFO ("Received " << packet->GetSize() << " bytes from " <<
address.GetIpv4());
}
}
}
void
UdpEchoClient::Receive(
Ptr<Socket> socket,

View File

@@ -57,6 +57,7 @@ private:
void Send (void);
void Receive(Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
void HandleRead (Ptr<Socket> socket);
uint32_t m_count;
Time m_interval;

View File

@@ -79,7 +79,8 @@ UdpEchoServer::StartApplication (void)
m_socket->Bind (local);
}
m_socket->SetRecvCallback(MakeCallback(&UdpEchoServer::Receive, this));
//m_socket->SetRecvCallback(MakeCallback(&UdpEchoServer::Receive, this));
m_socket->SetRecv_Callback(MakeCallback(&UdpEchoServer::HandleRead, this));
}
void
@@ -94,6 +95,31 @@ UdpEchoServer::StopApplication ()
}
}
void
UdpEchoServer::HandleRead (Ptr<Socket> socket)
{
Ptr<Packet> packet;
uint32_t maxSize = std::numeric_limits<uint32_t>::max();
uint32_t flags = 0; // no flags
while (packet = socket->Recv (maxSize, flags))
{
SocketRxAddressTag tag;
bool found = packet->PeekTag (tag);
NS_ASSERT (found);
Address from = tag.GetAddress ();
packet->RemoveTag (tag);
if (InetSocketAddress::IsMatchingType (from))
{
InetSocketAddress address = InetSocketAddress::ConvertFrom (from);
NS_LOG_INFO ("Received " << packet->GetSize() << " bytes from " <<
address.GetIpv4());
NS_LOG_LOGIC ("Echoing packet");
socket->SendTo (from, packet);
}
}
}
void
UdpEchoServer::Receive(
Ptr<Socket> socket,

View File

@@ -50,6 +50,7 @@ private:
virtual void StopApplication (void);
void Receive(Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
void HandleRead (Ptr<Socket> socket);
uint16_t m_port;
Ptr<Socket> m_socket;

View File

@@ -33,7 +33,7 @@ namespace ns3 {
* The maximum size (in bytes) of a Tag is stored
* in this constant.
*/
#define TAGS_MAX_SIZE 16
#define TAGS_MAX_SIZE 32
class Tags {
public:

View File

@@ -445,8 +445,19 @@ TcpSocket::Listen (uint32_t q)
Ptr<Packet>
TcpSocket::Recv (uint32_t maxSize, uint32_t flags)
{
if (m_deliveryQueue.empty() )
{
return 0;
}
Ptr<Packet> p = m_deliveryQueue.front ();
m_deliveryQueue.pop ();
if (p->GetSize() <= maxSize)
{
m_deliveryQueue.pop ();
}
else
{
p = 0;
}
return p;
}

View File

@@ -325,6 +325,25 @@ UdpSocket::SendTo(const Address &address, Ptr<Packet> p)
return DoSendTo (p, ipv4, port);
}
Ptr<Packet>
UdpSocket::Recv (uint32_t maxSize, uint32_t flags)
{
if (m_deliveryQueue.empty() )
{
return 0;
}
Ptr<Packet> p = m_deliveryQueue.front ();
if (p->GetSize() <= maxSize)
{
m_deliveryQueue.pop ();
}
else
{
p = 0;
}
return p;
}
void
UdpSocket::ForwardUp (Ptr<Packet> packet, Ipv4Address ipv4, uint16_t port)
{
@@ -334,9 +353,12 @@ UdpSocket::ForwardUp (Ptr<Packet> packet, Ipv4Address ipv4, uint16_t port)
{
return;
}
Address address = InetSocketAddress (ipv4, port);
NotifyDataReceived (packet, address);
SocketRxAddressTag tag;
tag.SetAddress (address);
packet->AddTag (tag);
m_deliveryQueue.push (packet);
NotifyDataRecv ();
}
} //namespace ns3
@@ -367,6 +389,8 @@ public:
void ReceivePacket (Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
void ReceivePacket2 (Ptr<Socket> socket, Ptr<Packet> packet, const Address &from);
void ReceivePkt (Ptr<Socket> socket);
void ReceivePkt2 (Ptr<Socket> socket);
};
@@ -384,6 +408,16 @@ void UdpSocketTest::ReceivePacket2 (Ptr<Socket> socket, Ptr<Packet> packet, cons
m_receivedPacket2 = packet;
}
void UdpSocketTest::ReceivePkt (Ptr<Socket> socket)
{
m_receivedPacket = socket->Recv (std::numeric_limits<uint32_t>::max(), 0);
}
void UdpSocketTest::ReceivePkt2 (Ptr<Socket> socket)
{
m_receivedPacket2 = socket->Recv (std::numeric_limits<uint32_t>::max(), 0);
}
bool
UdpSocketTest::RunTests (void)
{
@@ -457,10 +491,18 @@ UdpSocketTest::RunTests (void)
Ptr<SocketFactory> rxSocketFactory = rxNode->GetObject<Udp> ();
Ptr<Socket> rxSocket = rxSocketFactory->CreateSocket ();
NS_TEST_ASSERT_EQUAL (rxSocket->Bind (InetSocketAddress (Ipv4Address ("10.0.0.1"), 1234)), 0);
#ifdef OLDSEMANTICS
rxSocket->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket, this));
#else
rxSocket->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt, this));
#endif
Ptr<Socket> rxSocket2 = rxSocketFactory->CreateSocket ();
#ifdef OLDSEMANTICS
rxSocket2->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket2, this));
#else
rxSocket2->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt2, this));
#endif
NS_TEST_ASSERT_EQUAL (rxSocket2->Bind (InetSocketAddress (Ipv4Address ("10.0.1.1"), 1234)), 0);
Ptr<SocketFactory> txSocketFactory = txNode->GetObject<Udp> ();
@@ -477,6 +519,8 @@ UdpSocketTest::RunTests (void)
NS_TEST_ASSERT_EQUAL (m_receivedPacket->GetSize (), 123);
NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 0); // second interface should receive it
m_receivedPacket->RemoveAllTags ();
m_receivedPacket2->RemoveAllTags ();
// Simple broadcast test
@@ -489,6 +533,8 @@ UdpSocketTest::RunTests (void)
// second socket should not receive it (it is bound specifically to the second interface's address
NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 0);
m_receivedPacket->RemoveAllTags ();
m_receivedPacket2->RemoveAllTags ();
// Broadcast test with multiple receiving sockets
@@ -497,7 +543,11 @@ UdpSocketTest::RunTests (void)
// the socket address matches.
rxSocket2->Dispose ();
rxSocket2 = rxSocketFactory->CreateSocket ();
#ifdef OLDSEMANTICS
rxSocket2->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket2, this));
#else
rxSocket2->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt2, this));
#endif
NS_TEST_ASSERT_EQUAL (rxSocket2->Bind (InetSocketAddress (Ipv4Address ("0.0.0.0"), 1234)), 0);
m_receivedPacket = Create<Packet> ();
@@ -508,19 +558,14 @@ UdpSocketTest::RunTests (void)
NS_TEST_ASSERT_EQUAL (m_receivedPacket->GetSize (), 123);
NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 123);
m_receivedPacket->RemoveAllTags ();
m_receivedPacket2->RemoveAllTags ();
Simulator::Destroy ();
return result;
}
Ptr<Packet>
UdpSocket::Recv (uint32_t maxSize, uint32_t flags)
{
Ptr<Packet> p = m_deliveryQueue.front ();
m_deliveryQueue.pop ();
return p;
}
static UdpSocketTest gUdpSocketTest;

View File

@@ -301,15 +301,30 @@ PacketSocket::ForwardUp (Ptr<NetDevice> device, Ptr<Packet> packet,
address.SetSingleDevice (device->GetIfIndex ());
address.SetProtocol (protocol);
SocketRxAddressTag tag;
tag.SetAddress (address);
packet->AddTag (tag);
m_deliveryQueue.push (packet);
NS_LOG_LOGIC ("UID is " << packet->GetUid() << " PacketSocket " << this);
NotifyDataReceived (packet, address);
NotifyDataRecv ();
}
Ptr<Packet>
PacketSocket::Recv (uint32_t maxSize, uint32_t flags)
{
if (m_deliveryQueue.empty() )
{
return 0;
}
Ptr<Packet> p = m_deliveryQueue.front ();
m_deliveryQueue.pop ();
if (p->GetSize() <= maxSize)
{
m_deliveryQueue.pop ();
}
else
{
p = 0;
}
return p;
}

View File

@@ -246,4 +246,53 @@ Socket::NotifyDataRecv (void)
m_receivedData_ (this);
}
}
SocketRxAddressTag::SocketRxAddressTag ()
{
}
uint32_t
SocketRxAddressTag::GetUid (void)
{
static uint32_t uid = ns3::Tag::AllocateUid<SocketRxAddressTag> ("SocketRxAddressTag.ns3");
return uid;
}
void
SocketRxAddressTag::Print (std::ostream &os) const
{
os << "address="<< m_address;
}
uint32_t
SocketRxAddressTag::GetSerializedSize (void) const
{
return 0;
}
void
SocketRxAddressTag::Serialize (Buffer::Iterator i) const
{
// for local use in stack only
}
uint32_t
SocketRxAddressTag::Deserialize (Buffer::Iterator i)
{
// for local use in stack only
return 0;
}
void
SocketRxAddressTag::SetAddress (Address addr)
{
m_address = addr;
}
Address
SocketRxAddressTag::GetAddress (void) const
{
return m_address;
}
}//namespace ns3

View File

@@ -25,12 +25,14 @@
#include "ns3/callback.h"
#include "ns3/ptr.h"
#include "ns3/tag.h"
#include "ns3/object.h"
#include "address.h"
#include <stdint.h>
namespace ns3 {
class Node;
class Packet;
@@ -257,6 +259,14 @@ public:
*/
int SendTo (const Address &address, const uint8_t* buf, uint32_t size);
/**
* \brief Read a single packet from the socket
* \param maxSize reader will accept packet up to maxSize
* \param flags Socket recv flags
* \returns Ptr<Packet> of the next in-sequence packet. Returns
* 0 if the socket cannot return a next in-sequence packet conforming
* to the maxSize and flags.
*/
virtual Ptr<Packet> Recv (uint32_t maxSize, uint32_t flags) = 0;
protected:
@@ -285,6 +295,26 @@ protected:
Callback<void, Ptr<Socket> > m_receivedData_;
};
/**
* \brief This class implements a tag that carries the source address
* of a packet across the receiving socket interface.
*/
class SocketRxAddressTag : public Tag
{
public:
SocketRxAddressTag ();
static uint32_t GetUid (void);
void Print (std::ostream &os) const;
uint32_t GetSerializedSize (void) const;
void Serialize (Buffer::Iterator i) const;
uint32_t Deserialize (Buffer::Iterator i);
void SetAddress (Address addr);
Address GetAddress (void) const;
private:
Address m_address;
};
} //namespace ns3
#endif /* SOCKET_H */

View File

@@ -282,7 +282,7 @@ void AgentImpl::Start ()
// Create a socket to listen only on this interface
Ptr<Socket> socket = socketFactory->CreateSocket ();
socket->SetRecvCallback (MakeCallback (&AgentImpl::RecvOlsr, this));
socket->SetRecv_Callback (MakeCallback (&AgentImpl::RecvOlsr, this));
if (socket->Bind (InetSocketAddress (addr, OLSR_PORT_NUMBER)))
{
NS_FATAL_ERROR ("Failed to bind() OLSR receive socket");
@@ -307,10 +307,18 @@ void AgentImpl::SetMainInterface (uint32_t interface)
//
// \brief Processes an incoming %OLSR packet following RFC 3626 specification.
void
AgentImpl::RecvOlsr (Ptr<Socket> socket,
Ptr<Packet> receivedPacket,
const Address &sourceAddress)
AgentImpl::RecvOlsr (Ptr<Socket> socket)
{
Ptr<Packet> receivedPacket;
uint32_t maxSize = std::numeric_limits<uint32_t>::max();
uint32_t flags = 0; // no flags
receivedPacket = socket->Recv (maxSize, flags);
SocketRxAddressTag tag;
bool found = receivedPacket->PeekTag (tag);
NS_ASSERT (found);
Address sourceAddress = tag.GetAddress ();
InetSocketAddress inetSourceAddr = InetSocketAddress::ConvertFrom (sourceAddress);
Ipv4Address senderIfaceAddr = inetSourceAddr.GetIpv4 ();
Ipv4Address receiverIfaceAddr = m_socketAddresses[socket];

View File

@@ -98,9 +98,7 @@ protected:
/// Increments message sequence number and returns the new value.
inline uint16_t GetMessageSequenceNumber ();
void RecvOlsr (Ptr<Socket> socket,
Ptr<Packet> receivedPacket,
const Address &sourceAddress);
void RecvOlsr (Ptr<Socket> socket);
void MprComputation ();
void RoutingTableComputation ();