From 288d193daaf34aa078c2cb3d59cfd02b9ae4c731 Mon Sep 17 00:00:00 2001 From: Tom Henderson Date: Fri, 25 Apr 2008 14:29:28 -0700 Subject: [PATCH] Cut over UDP applications to use the new receive API --- src/applications/udp-echo/udp-echo-client.cc | 25 +++++++- src/applications/udp-echo/udp-echo-client.h | 1 + src/applications/udp-echo/udp-echo-server.cc | 28 ++++++++- src/applications/udp-echo/udp-echo-server.h | 1 + src/common/tags.h | 2 +- src/internet-node/tcp-socket.cc | 13 +++- src/internet-node/udp-socket.cc | 65 +++++++++++++++++--- src/node/packet-socket.cc | 19 +++++- src/node/socket.cc | 49 +++++++++++++++ src/node/socket.h | 30 +++++++++ src/routing/olsr/olsr-agent-impl.cc | 16 +++-- src/routing/olsr/olsr-agent-impl.h | 4 +- 12 files changed, 230 insertions(+), 23 deletions(-) diff --git a/src/applications/udp-echo/udp-echo-client.cc b/src/applications/udp-echo/udp-echo-client.cc index 6d122d872..458beb105 100644 --- a/src/applications/udp-echo/udp-echo-client.cc +++ b/src/applications/udp-echo/udp-echo-client.cc @@ -103,7 +103,7 @@ UdpEchoClient::StartApplication (void) m_socket->Connect (InetSocketAddress (m_peerAddress, m_peerPort)); } - m_socket->SetRecvCallback(MakeCallback(&UdpEchoClient::Receive, this)); + m_socket->SetRecv_Callback(MakeCallback(&UdpEchoClient::HandleRead, this)); ScheduleTransmit (Seconds(0.)); } @@ -148,6 +148,29 @@ UdpEchoClient::Send (void) } } +void +UdpEchoClient::HandleRead (Ptr socket) +{ + NS_LOG_FUNCTION (this << socket); + Ptr packet; + uint32_t maxSize = std::numeric_limits::max(); + uint32_t flags = 0; // no flags + while (packet = socket->Recv (maxSize, flags)) + { + SocketRxAddressTag tag; + bool found = packet->PeekTag (tag); + NS_ASSERT (found); + Address from = tag.GetAddress (); + packet->RemoveTag (tag); + if (InetSocketAddress::IsMatchingType (from)) + { + InetSocketAddress address = InetSocketAddress::ConvertFrom (from); + NS_LOG_INFO ("Received " << packet->GetSize() << " bytes from " << + address.GetIpv4()); + } + } +} + void UdpEchoClient::Receive( Ptr socket, diff --git a/src/applications/udp-echo/udp-echo-client.h b/src/applications/udp-echo/udp-echo-client.h index 9a924edc2..848bdd211 100644 --- a/src/applications/udp-echo/udp-echo-client.h +++ b/src/applications/udp-echo/udp-echo-client.h @@ -57,6 +57,7 @@ private: void Send (void); void Receive(Ptr socket, Ptr packet, const Address &from); + void HandleRead (Ptr socket); uint32_t m_count; Time m_interval; diff --git a/src/applications/udp-echo/udp-echo-server.cc b/src/applications/udp-echo/udp-echo-server.cc index 3535c3ffc..f0129b160 100644 --- a/src/applications/udp-echo/udp-echo-server.cc +++ b/src/applications/udp-echo/udp-echo-server.cc @@ -79,7 +79,8 @@ UdpEchoServer::StartApplication (void) m_socket->Bind (local); } - m_socket->SetRecvCallback(MakeCallback(&UdpEchoServer::Receive, this)); + //m_socket->SetRecvCallback(MakeCallback(&UdpEchoServer::Receive, this)); + m_socket->SetRecv_Callback(MakeCallback(&UdpEchoServer::HandleRead, this)); } void @@ -94,6 +95,31 @@ UdpEchoServer::StopApplication () } } +void +UdpEchoServer::HandleRead (Ptr socket) +{ + Ptr packet; + uint32_t maxSize = std::numeric_limits::max(); + uint32_t flags = 0; // no flags + while (packet = socket->Recv (maxSize, flags)) + { + SocketRxAddressTag tag; + bool found = packet->PeekTag (tag); + NS_ASSERT (found); + Address from = tag.GetAddress (); + packet->RemoveTag (tag); + if (InetSocketAddress::IsMatchingType (from)) + { + InetSocketAddress address = InetSocketAddress::ConvertFrom (from); + NS_LOG_INFO ("Received " << packet->GetSize() << " bytes from " << + address.GetIpv4()); + + NS_LOG_LOGIC ("Echoing packet"); + socket->SendTo (from, packet); + } + } +} + void UdpEchoServer::Receive( Ptr socket, diff --git a/src/applications/udp-echo/udp-echo-server.h b/src/applications/udp-echo/udp-echo-server.h index e0f55b4cd..6b42ec2fc 100644 --- a/src/applications/udp-echo/udp-echo-server.h +++ b/src/applications/udp-echo/udp-echo-server.h @@ -50,6 +50,7 @@ private: virtual void StopApplication (void); void Receive(Ptr socket, Ptr packet, const Address &from); + void HandleRead (Ptr socket); uint16_t m_port; Ptr m_socket; diff --git a/src/common/tags.h b/src/common/tags.h index a88b6a6e0..d8ca513b5 100644 --- a/src/common/tags.h +++ b/src/common/tags.h @@ -33,7 +33,7 @@ namespace ns3 { * The maximum size (in bytes) of a Tag is stored * in this constant. */ -#define TAGS_MAX_SIZE 16 +#define TAGS_MAX_SIZE 32 class Tags { public: diff --git a/src/internet-node/tcp-socket.cc b/src/internet-node/tcp-socket.cc index e88693979..34f0476f3 100644 --- a/src/internet-node/tcp-socket.cc +++ b/src/internet-node/tcp-socket.cc @@ -445,8 +445,19 @@ TcpSocket::Listen (uint32_t q) Ptr TcpSocket::Recv (uint32_t maxSize, uint32_t flags) { + if (m_deliveryQueue.empty() ) + { + return 0; + } Ptr p = m_deliveryQueue.front (); - m_deliveryQueue.pop (); + if (p->GetSize() <= maxSize) + { + m_deliveryQueue.pop (); + } + else + { + p = 0; + } return p; } diff --git a/src/internet-node/udp-socket.cc b/src/internet-node/udp-socket.cc index 00ba74e9a..7a1976639 100644 --- a/src/internet-node/udp-socket.cc +++ b/src/internet-node/udp-socket.cc @@ -325,6 +325,25 @@ UdpSocket::SendTo(const Address &address, Ptr p) return DoSendTo (p, ipv4, port); } +Ptr +UdpSocket::Recv (uint32_t maxSize, uint32_t flags) +{ + if (m_deliveryQueue.empty() ) + { + return 0; + } + Ptr p = m_deliveryQueue.front (); + if (p->GetSize() <= maxSize) + { + m_deliveryQueue.pop (); + } + else + { + p = 0; + } + return p; +} + void UdpSocket::ForwardUp (Ptr packet, Ipv4Address ipv4, uint16_t port) { @@ -334,9 +353,12 @@ UdpSocket::ForwardUp (Ptr packet, Ipv4Address ipv4, uint16_t port) { return; } - Address address = InetSocketAddress (ipv4, port); - NotifyDataReceived (packet, address); + SocketRxAddressTag tag; + tag.SetAddress (address); + packet->AddTag (tag); + m_deliveryQueue.push (packet); + NotifyDataRecv (); } } //namespace ns3 @@ -367,6 +389,8 @@ public: void ReceivePacket (Ptr socket, Ptr packet, const Address &from); void ReceivePacket2 (Ptr socket, Ptr packet, const Address &from); + void ReceivePkt (Ptr socket); + void ReceivePkt2 (Ptr socket); }; @@ -384,6 +408,16 @@ void UdpSocketTest::ReceivePacket2 (Ptr socket, Ptr packet, cons m_receivedPacket2 = packet; } +void UdpSocketTest::ReceivePkt (Ptr socket) +{ + m_receivedPacket = socket->Recv (std::numeric_limits::max(), 0); +} + +void UdpSocketTest::ReceivePkt2 (Ptr socket) +{ + m_receivedPacket2 = socket->Recv (std::numeric_limits::max(), 0); +} + bool UdpSocketTest::RunTests (void) { @@ -457,10 +491,18 @@ UdpSocketTest::RunTests (void) Ptr rxSocketFactory = rxNode->GetObject (); Ptr rxSocket = rxSocketFactory->CreateSocket (); NS_TEST_ASSERT_EQUAL (rxSocket->Bind (InetSocketAddress (Ipv4Address ("10.0.0.1"), 1234)), 0); +#ifdef OLDSEMANTICS rxSocket->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket, this)); +#else + rxSocket->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt, this)); +#endif Ptr rxSocket2 = rxSocketFactory->CreateSocket (); +#ifdef OLDSEMANTICS rxSocket2->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket2, this)); +#else + rxSocket2->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt2, this)); +#endif NS_TEST_ASSERT_EQUAL (rxSocket2->Bind (InetSocketAddress (Ipv4Address ("10.0.1.1"), 1234)), 0); Ptr txSocketFactory = txNode->GetObject (); @@ -477,6 +519,8 @@ UdpSocketTest::RunTests (void) NS_TEST_ASSERT_EQUAL (m_receivedPacket->GetSize (), 123); NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 0); // second interface should receive it + m_receivedPacket->RemoveAllTags (); + m_receivedPacket2->RemoveAllTags (); // Simple broadcast test @@ -489,6 +533,8 @@ UdpSocketTest::RunTests (void) // second socket should not receive it (it is bound specifically to the second interface's address NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 0); + m_receivedPacket->RemoveAllTags (); + m_receivedPacket2->RemoveAllTags (); // Broadcast test with multiple receiving sockets @@ -497,7 +543,11 @@ UdpSocketTest::RunTests (void) // the socket address matches. rxSocket2->Dispose (); rxSocket2 = rxSocketFactory->CreateSocket (); +#ifdef OLDSEMANTICS rxSocket2->SetRecvCallback (MakeCallback (&UdpSocketTest::ReceivePacket2, this)); +#else + rxSocket2->SetRecv_Callback (MakeCallback (&UdpSocketTest::ReceivePkt2, this)); +#endif NS_TEST_ASSERT_EQUAL (rxSocket2->Bind (InetSocketAddress (Ipv4Address ("0.0.0.0"), 1234)), 0); m_receivedPacket = Create (); @@ -508,19 +558,14 @@ UdpSocketTest::RunTests (void) NS_TEST_ASSERT_EQUAL (m_receivedPacket->GetSize (), 123); NS_TEST_ASSERT_EQUAL (m_receivedPacket2->GetSize (), 123); + m_receivedPacket->RemoveAllTags (); + m_receivedPacket2->RemoveAllTags (); + Simulator::Destroy (); return result; } -Ptr -UdpSocket::Recv (uint32_t maxSize, uint32_t flags) -{ - Ptr p = m_deliveryQueue.front (); - m_deliveryQueue.pop (); - return p; -} - static UdpSocketTest gUdpSocketTest; diff --git a/src/node/packet-socket.cc b/src/node/packet-socket.cc index fa045e3b0..ad2041333 100644 --- a/src/node/packet-socket.cc +++ b/src/node/packet-socket.cc @@ -301,15 +301,30 @@ PacketSocket::ForwardUp (Ptr device, Ptr packet, address.SetSingleDevice (device->GetIfIndex ()); address.SetProtocol (protocol); + SocketRxAddressTag tag; + tag.SetAddress (address); + packet->AddTag (tag); + m_deliveryQueue.push (packet); NS_LOG_LOGIC ("UID is " << packet->GetUid() << " PacketSocket " << this); - NotifyDataReceived (packet, address); + NotifyDataRecv (); } Ptr PacketSocket::Recv (uint32_t maxSize, uint32_t flags) { + if (m_deliveryQueue.empty() ) + { + return 0; + } Ptr p = m_deliveryQueue.front (); - m_deliveryQueue.pop (); + if (p->GetSize() <= maxSize) + { + m_deliveryQueue.pop (); + } + else + { + p = 0; + } return p; } diff --git a/src/node/socket.cc b/src/node/socket.cc index 86d5d2ec8..071449a31 100644 --- a/src/node/socket.cc +++ b/src/node/socket.cc @@ -246,4 +246,53 @@ Socket::NotifyDataRecv (void) m_receivedData_ (this); } } + +SocketRxAddressTag::SocketRxAddressTag () +{ +} + +uint32_t +SocketRxAddressTag::GetUid (void) +{ + static uint32_t uid = ns3::Tag::AllocateUid ("SocketRxAddressTag.ns3"); + return uid; +} + +void +SocketRxAddressTag::Print (std::ostream &os) const +{ + os << "address="<< m_address; +} + +uint32_t +SocketRxAddressTag::GetSerializedSize (void) const +{ + return 0; +} + +void +SocketRxAddressTag::Serialize (Buffer::Iterator i) const +{ + // for local use in stack only +} + +uint32_t +SocketRxAddressTag::Deserialize (Buffer::Iterator i) +{ + // for local use in stack only + return 0; +} + +void +SocketRxAddressTag::SetAddress (Address addr) +{ + m_address = addr; +} + +Address +SocketRxAddressTag::GetAddress (void) const +{ + return m_address; +} + }//namespace ns3 diff --git a/src/node/socket.h b/src/node/socket.h index 8340ae665..f60233738 100644 --- a/src/node/socket.h +++ b/src/node/socket.h @@ -25,12 +25,14 @@ #include "ns3/callback.h" #include "ns3/ptr.h" +#include "ns3/tag.h" #include "ns3/object.h" #include "address.h" #include namespace ns3 { + class Node; class Packet; @@ -257,6 +259,14 @@ public: */ int SendTo (const Address &address, const uint8_t* buf, uint32_t size); + /** + * \brief Read a single packet from the socket + * \param maxSize reader will accept packet up to maxSize + * \param flags Socket recv flags + * \returns Ptr of the next in-sequence packet. Returns + * 0 if the socket cannot return a next in-sequence packet conforming + * to the maxSize and flags. + */ virtual Ptr Recv (uint32_t maxSize, uint32_t flags) = 0; protected: @@ -285,6 +295,26 @@ protected: Callback > m_receivedData_; }; +/** + * \brief This class implements a tag that carries the source address + * of a packet across the receiving socket interface. + */ +class SocketRxAddressTag : public Tag +{ +public: + SocketRxAddressTag (); + static uint32_t GetUid (void); + void Print (std::ostream &os) const; + uint32_t GetSerializedSize (void) const; + void Serialize (Buffer::Iterator i) const; + uint32_t Deserialize (Buffer::Iterator i); + + void SetAddress (Address addr); + Address GetAddress (void) const; +private: + Address m_address; +}; + } //namespace ns3 #endif /* SOCKET_H */ diff --git a/src/routing/olsr/olsr-agent-impl.cc b/src/routing/olsr/olsr-agent-impl.cc index 5c27a7d6b..f0bebc2e8 100644 --- a/src/routing/olsr/olsr-agent-impl.cc +++ b/src/routing/olsr/olsr-agent-impl.cc @@ -282,7 +282,7 @@ void AgentImpl::Start () // Create a socket to listen only on this interface Ptr socket = socketFactory->CreateSocket (); - socket->SetRecvCallback (MakeCallback (&AgentImpl::RecvOlsr, this)); + socket->SetRecv_Callback (MakeCallback (&AgentImpl::RecvOlsr, this)); if (socket->Bind (InetSocketAddress (addr, OLSR_PORT_NUMBER))) { NS_FATAL_ERROR ("Failed to bind() OLSR receive socket"); @@ -307,10 +307,18 @@ void AgentImpl::SetMainInterface (uint32_t interface) // // \brief Processes an incoming %OLSR packet following RFC 3626 specification. void -AgentImpl::RecvOlsr (Ptr socket, - Ptr receivedPacket, - const Address &sourceAddress) +AgentImpl::RecvOlsr (Ptr socket) { + Ptr receivedPacket; + uint32_t maxSize = std::numeric_limits::max(); + uint32_t flags = 0; // no flags + receivedPacket = socket->Recv (maxSize, flags); + + SocketRxAddressTag tag; + bool found = receivedPacket->PeekTag (tag); + NS_ASSERT (found); + Address sourceAddress = tag.GetAddress (); + InetSocketAddress inetSourceAddr = InetSocketAddress::ConvertFrom (sourceAddress); Ipv4Address senderIfaceAddr = inetSourceAddr.GetIpv4 (); Ipv4Address receiverIfaceAddr = m_socketAddresses[socket]; diff --git a/src/routing/olsr/olsr-agent-impl.h b/src/routing/olsr/olsr-agent-impl.h index 6c4f4b68a..01e486d35 100644 --- a/src/routing/olsr/olsr-agent-impl.h +++ b/src/routing/olsr/olsr-agent-impl.h @@ -98,9 +98,7 @@ protected: /// Increments message sequence number and returns the new value. inline uint16_t GetMessageSequenceNumber (); - void RecvOlsr (Ptr socket, - Ptr receivedPacket, - const Address &sourceAddress); + void RecvOlsr (Ptr socket); void MprComputation (); void RoutingTableComputation ();