diff --git a/src/mpi/CMakeLists.txt b/src/mpi/CMakeLists.txt index f5184e8ed..cad1653a9 100644 --- a/src/mpi/CMakeLists.txt +++ b/src/mpi/CMakeLists.txt @@ -5,25 +5,52 @@ if(${ENABLE_EXAMPLES}) ) endif() -build_lib( - LIBNAME mpi - SOURCE_FILES - model/distributed-simulator-impl.cc - model/granted-time-window-mpi-interface.cc - model/mpi-interface.cc - model/mpi-receiver.cc - model/null-message-mpi-interface.cc - model/null-message-simulator-impl.cc - model/parallel-communication-interface.h - model/remote-channel-bundle-manager.cc - model/remote-channel-bundle.cc - HEADER_FILES - model/mpi-interface.h - model/mpi-receiver.h - model/parallel-communication-interface.h - LIBRARIES_TO_LINK - ${libcore} - ${libnetwork} - ${MPI_CXX_LIBRARIES} - TEST_SOURCES ${example_as_test_suite} -) +if(${ENABLE_MTP}) + build_lib( + LIBNAME mpi + SOURCE_FILES + model/distributed-simulator-impl.cc + model/granted-time-window-mpi-interface.cc + model/hybrid-simulator-impl.cc + model/mpi-interface.cc + model/mpi-receiver.cc + model/null-message-mpi-interface.cc + model/null-message-simulator-impl.cc + model/parallel-communication-interface.h + model/remote-channel-bundle-manager.cc + model/remote-channel-bundle.cc + HEADER_FILES + model/mpi-interface.h + model/mpi-receiver.h + model/parallel-communication-interface.h + LIBRARIES_TO_LINK + ${libcore} + ${libnetwork} + ${libmtp} + ${MPI_CXX_LIBRARIES} + TEST_SOURCES ${example_as_test_suite} + ) +else() + build_lib( + LIBNAME mpi + SOURCE_FILES + model/distributed-simulator-impl.cc + model/granted-time-window-mpi-interface.cc + model/mpi-interface.cc + model/mpi-receiver.cc + model/null-message-mpi-interface.cc + model/null-message-simulator-impl.cc + model/parallel-communication-interface.h + model/remote-channel-bundle-manager.cc + model/remote-channel-bundle.cc + HEADER_FILES + model/mpi-interface.h + model/mpi-receiver.h + model/parallel-communication-interface.h + LIBRARIES_TO_LINK + ${libcore} + ${libnetwork} + ${MPI_CXX_LIBRARIES} + TEST_SOURCES ${example_as_test_suite} + ) +endif() diff --git a/src/mpi/examples/CMakeLists.txt b/src/mpi/examples/CMakeLists.txt index 7688d4c6a..f778747e9 100644 --- a/src/mpi/examples/CMakeLists.txt +++ b/src/mpi/examples/CMakeLists.txt @@ -37,3 +37,18 @@ build_lib_example( ${libcsma} ${libapplications} ) + +if(${ENABLE_MTP}) + build_lib_example( + NAME simple-hybrid + SOURCE_FILES simple-hybrid.cc + mpi-test-fixtures.cc + LIBRARIES_TO_LINK + ${libmpi} + ${libpoint-to-point} + ${libinternet} + ${libnix-vector-routing} + ${libapplications} + ${libmtp} + ) +endif() \ No newline at end of file diff --git a/src/mpi/examples/simple-hybrid.cc b/src/mpi/examples/simple-hybrid.cc new file mode 100644 index 000000000..812ffac52 --- /dev/null +++ b/src/mpi/examples/simple-hybrid.cc @@ -0,0 +1,301 @@ +/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation; + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/** + * \file + * \ingroup mpi + * + * TestDistributed creates a dumbbell topology and logically splits it in + * half. The left half is placed on logical processor 0 and the right half + * is placed on logical processor 1. + * + * ------- ------- + * RANK 0 RANK 1 + * ------- | ------- + * | + * n0 ---------| | |---------- n6 + * | | | + * n1 -------\ | | | /------- n7 + * n4 ----------|---------- n5 + * n2 -------/ | | | \------- n8 + * | | | + * n3 ---------| | |---------- n9 + * + * + * OnOff clients are placed on each left leaf node. Each right leaf node + * is a packet sink for a left leaf node. As a packet travels from one + * logical processor to another (the link between n4 and n5), MPI messages + * are passed containing the serialized packet. The message is then + * deserialized into a new packet and sent on as normal. + * + * One packet is sent from each left leaf node. The packet sinks on the + * right leaf nodes output logging information when they receive the packet. + */ + +#include "mpi-test-fixtures.h" + +#include "ns3/core-module.h" +#include "ns3/network-module.h" +#include "ns3/mpi-interface.h" +#include "ns3/ipv4-global-routing-helper.h" +#include "ns3/point-to-point-helper.h" +#include "ns3/internet-stack-helper.h" +#include "ns3/nix-vector-helper.h" +#include "ns3/ipv4-address-helper.h" +#include "ns3/on-off-helper.h" +#include "ns3/packet-sink-helper.h" +#include "ns3/mtp-interface.h" +#include + +#include + +using namespace ns3; + +NS_LOG_COMPONENT_DEFINE ("SimpleDistributed"); + +int +main (int argc, char *argv[]) +{ + LogComponentEnable ("LogicalProcess", LOG_LEVEL_INFO); + + bool nix = true; + bool nullmsg = false; + bool tracing = false; + bool testing = false; + bool verbose = false; + + // Parse command line + CommandLine cmd (__FILE__); + cmd.AddValue ("nix", "Enable the use of nix-vector or global routing", nix); + cmd.AddValue ("nullmsg", "Enable the use of null-message synchronization", nullmsg); + cmd.AddValue ("tracing", "Enable pcap tracing", tracing); + cmd.AddValue ("verbose", "verbose output", verbose); + cmd.AddValue ("test", "Enable regression test output", testing); + cmd.Parse (argc, argv); + + // Distributed simulation setup; by default use granted time window algorithm. + if(nullmsg) + { + GlobalValue::Bind ("SimulatorImplementationType", + StringValue ("ns3::NullMessageSimulatorImpl")); + } + else + { + GlobalValue::Bind ("SimulatorImplementationType", + StringValue ("ns3::DistributedSimulatorImpl")); + } + + // Enable parallel simulator with the command line arguments + MtpInterface::Enable (); + MpiInterface::Enable (&argc, &argv); + + SinkTracer::Init (); + + if (verbose) + { + LogComponentEnable ("PacketSink", (LogLevel)(LOG_LEVEL_INFO | LOG_PREFIX_NODE | LOG_PREFIX_TIME)); + } + + uint32_t systemId = MpiInterface::GetSystemId (); + uint32_t systemCount = MpiInterface::GetSize (); + + // Check for valid distributed parameters. + // Must have 2 and only 2 Logical Processors (LPs) + if (systemCount != 2) + { + std::cout << "This simulation requires 2 and only 2 logical processors." << std::endl; + return 1; + } + + // Some default values + Config::SetDefault ("ns3::OnOffApplication::PacketSize", UintegerValue (512)); + Config::SetDefault ("ns3::OnOffApplication::DataRate", StringValue ("1Mbps")); + Config::SetDefault ("ns3::OnOffApplication::MaxBytes", UintegerValue (512)); + + // Create leaf nodes on left with system id 0 + NodeContainer leftLeafNodes; + leftLeafNodes.Create (4, 0); + + // Create router nodes. Left router + // with system id 0, right router with + // system id 1 + NodeContainer routerNodes; + Ptr routerNode1 = CreateObject (0); + Ptr routerNode2 = CreateObject (1); + routerNodes.Add (routerNode1); + routerNodes.Add (routerNode2); + + // Create leaf nodes on right with system id 1 + NodeContainer rightLeafNodes; + rightLeafNodes.Create (4, 1); + + PointToPointHelper routerLink; + routerLink.SetDeviceAttribute ("DataRate", StringValue ("5Mbps")); + routerLink.SetChannelAttribute ("Delay", StringValue ("5ms")); + + PointToPointHelper leafLink; + leafLink.SetDeviceAttribute ("DataRate", StringValue ("1Mbps")); + leafLink.SetChannelAttribute ("Delay", StringValue ("2ms")); + + // Add link connecting routers + NetDeviceContainer routerDevices; + routerDevices = routerLink.Install (routerNodes); + + // Add links for left side leaf nodes to left router + NetDeviceContainer leftRouterDevices; + NetDeviceContainer leftLeafDevices; + for (uint32_t i = 0; i < 4; ++i) + { + NetDeviceContainer temp = leafLink.Install (leftLeafNodes.Get (i), routerNodes.Get (0)); + leftLeafDevices.Add (temp.Get (0)); + leftRouterDevices.Add (temp.Get (1)); + } + + // Add links for right side leaf nodes to right router + NetDeviceContainer rightRouterDevices; + NetDeviceContainer rightLeafDevices; + for (uint32_t i = 0; i < 4; ++i) + { + NetDeviceContainer temp = leafLink.Install (rightLeafNodes.Get (i), routerNodes.Get (1)); + rightLeafDevices.Add (temp.Get (0)); + rightRouterDevices.Add (temp.Get (1)); + } + + InternetStackHelper stack; + if (nix) + { + Ipv4NixVectorHelper nixRouting; + stack.SetRoutingHelper (nixRouting); // has effect on the next Install () + } + + stack.InstallAll (); + + Ipv4InterfaceContainer routerInterfaces; + Ipv4InterfaceContainer leftLeafInterfaces; + Ipv4InterfaceContainer leftRouterInterfaces; + Ipv4InterfaceContainer rightLeafInterfaces; + Ipv4InterfaceContainer rightRouterInterfaces; + + Ipv4AddressHelper leftAddress; + leftAddress.SetBase ("10.1.1.0", "255.255.255.0"); + + Ipv4AddressHelper routerAddress; + routerAddress.SetBase ("10.2.1.0", "255.255.255.0"); + + Ipv4AddressHelper rightAddress; + rightAddress.SetBase ("10.3.1.0", "255.255.255.0"); + + // Router-to-Router interfaces + routerInterfaces = routerAddress.Assign (routerDevices); + + // Left interfaces + for (uint32_t i = 0; i < 4; ++i) + { + NetDeviceContainer ndc; + ndc.Add (leftLeafDevices.Get (i)); + ndc.Add (leftRouterDevices.Get (i)); + Ipv4InterfaceContainer ifc = leftAddress.Assign (ndc); + leftLeafInterfaces.Add (ifc.Get (0)); + leftRouterInterfaces.Add (ifc.Get (1)); + leftAddress.NewNetwork (); + } + + // Right interfaces + for (uint32_t i = 0; i < 4; ++i) + { + NetDeviceContainer ndc; + ndc.Add (rightLeafDevices.Get (i)); + ndc.Add (rightRouterDevices.Get (i)); + Ipv4InterfaceContainer ifc = rightAddress.Assign (ndc); + rightLeafInterfaces.Add (ifc.Get (0)); + rightRouterInterfaces.Add (ifc.Get (1)); + rightAddress.NewNetwork (); + } + + if (!nix) + { + Ipv4GlobalRoutingHelper::PopulateRoutingTables (); + } + + if (tracing == true) + { + if (systemId == 0) + { + routerLink.EnablePcap("router-left", routerDevices, true); + leafLink.EnablePcap("leaf-left", leftLeafDevices, true); + } + + if (systemId == 1) + { + routerLink.EnablePcap("router-right", routerDevices, true); + leafLink.EnablePcap("leaf-right", rightLeafDevices, true); + } + } + + // Create a packet sink on the right leafs to receive packets from left leafs + + uint16_t port = 50000; + if (systemId == 1) + { + Address sinkLocalAddress (InetSocketAddress (Ipv4Address::GetAny (), port)); + PacketSinkHelper sinkHelper ("ns3::UdpSocketFactory", sinkLocalAddress); + ApplicationContainer sinkApp; + for (uint32_t i = 0; i < 4; ++i) + { + sinkApp.Add (sinkHelper.Install (rightLeafNodes.Get (i))); + if (testing) + { + sinkApp.Get (i)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace)); + } + } + sinkApp.Start (Seconds (1.0)); + sinkApp.Stop (Seconds (5)); + } + + // Create the OnOff applications to send + if (systemId == 0) + { + OnOffHelper clientHelper ("ns3::UdpSocketFactory", Address ()); + clientHelper.SetAttribute + ("OnTime", StringValue ("ns3::ConstantRandomVariable[Constant=1]")); + clientHelper.SetAttribute + ("OffTime", StringValue ("ns3::ConstantRandomVariable[Constant=0]")); + + ApplicationContainer clientApps; + for (uint32_t i = 0; i < 4; ++i) + { + AddressValue remoteAddress + (InetSocketAddress (rightLeafInterfaces.GetAddress (i), port)); + clientHelper.SetAttribute ("Remote", remoteAddress); + clientApps.Add (clientHelper.Install (leftLeafNodes.Get (i))); + } + clientApps.Start (Seconds (1.0)); + clientApps.Stop (Seconds (5)); + } + + Simulator::Stop (Seconds (5)); + Simulator::Run (); + Simulator::Destroy (); + + if (testing) + { + SinkTracer::Verify (4); + } + + // Exit the MPI execution environment + MpiInterface::Disable (); + return 0; +} diff --git a/src/mpi/model/granted-time-window-mpi-interface.cc b/src/mpi/model/granted-time-window-mpi-interface.cc index cbef1c109..8397c9b22 100644 --- a/src/mpi/model/granted-time-window-mpi-interface.cc +++ b/src/mpi/model/granted-time-window-mpi-interface.cc @@ -40,6 +40,9 @@ #include "ns3/simulator-impl.h" #include "ns3/nstime.h" #include "ns3/log.h" +#ifdef NS3_MTP +#include "ns3/mtp-interface.h" +#endif #include @@ -91,6 +94,10 @@ char** GrantedTimeWindowMpiInterface::g_pRxBuffers; MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD; bool GrantedTimeWindowMpiInterface::g_freeCommunicator = false;; +#ifdef NS3_MTP +std::atomic GrantedTimeWindowMpiInterface::g_sending (false); +#endif + TypeId GrantedTimeWindowMpiInterface::GetTypeId (void) { @@ -210,6 +217,11 @@ GrantedTimeWindowMpiInterface::SendPacket (Ptr p, const Time& rxTime, ui { NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev); +#ifdef NS3_MTP + while (g_sending.exchange (true, std::memory_order_acquire)) + ; +#endif + SentBuffer sendBuf; g_pendingTx.push_back (sendBuf); std::list::reverse_iterator i = g_pendingTx.rbegin (); // Points to the last element @@ -229,11 +241,19 @@ GrantedTimeWindowMpiInterface::SendPacket (Ptr p, const Time& rxTime, ui // Find the system id for the destination node Ptr destNode = NodeList::GetNode (node); +#ifdef NS3_MTP + uint32_t nodeSysId = destNode->GetSystemId () & 0xFFFF; +#else uint32_t nodeSysId = destNode->GetSystemId (); +#endif MPI_Isend (reinterpret_cast (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId, 0, g_communicator, (i->GetRequest ())); g_txCount++; + +#ifdef NS3_MTP + g_sending.store(false, std::memory_order_release); +#endif } void @@ -287,8 +307,13 @@ GrantedTimeWindowMpiInterface::ReceiveMessages () NS_ASSERT (pNode && pMpiRec); // Schedule the rx event +#ifdef NS3_MTP + MtpInterface::GetSystem (pNode->GetSystemId () >> 16) + ->ScheduleAt (pNode->GetId (), rxTime, MakeEvent (&MpiReceiver::Receive, pMpiRec, p)); +#else Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (), &MpiReceiver::Receive, pMpiRec, p); +#endif // Re-queue the next read MPI_Irecv (g_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, diff --git a/src/mpi/model/granted-time-window-mpi-interface.h b/src/mpi/model/granted-time-window-mpi-interface.h index 95be1af6e..10c9938c9 100644 --- a/src/mpi/model/granted-time-window-mpi-interface.h +++ b/src/mpi/model/granted-time-window-mpi-interface.h @@ -29,6 +29,7 @@ #define NS3_GRANTED_TIME_WINDOW_MPI_INTERFACE_H #include +#include #include #include "ns3/nstime.h" @@ -80,6 +81,7 @@ private: class Packet; class DistributedSimulatorImpl; +class HybridSimulatorImpl; /** * \ingroup mpi @@ -110,8 +112,6 @@ public: virtual void SendPacket (Ptr p, const Time &rxTime, uint32_t node, uint32_t dev); virtual MPI_Comm GetCommunicator(); -private: - /* * The granted time window implementation is a collaboration of several * classes. Methods that should be invoked only by the @@ -119,7 +119,8 @@ private: * It is not intended for state to be shared. */ friend ns3::DistributedSimulatorImpl; - + friend ns3::HybridSimulatorImpl; + /** * Check for received messages complete */ @@ -136,7 +137,7 @@ private: * \return transmitted count in packets */ static uint32_t GetTxCount (); - + /** System ID (rank) for this task. */ static uint32_t g_sid; /** Size of the MPI COM_WORLD group. */ @@ -171,6 +172,10 @@ private: /** Did ns-3 create the communicator? Have to free it. */ static bool g_freeCommunicator; + +#ifdef NS3_MTP + static std::atomic g_sending; +#endif }; } // namespace ns3 diff --git a/src/mpi/model/hybrid-simulator-impl.cc b/src/mpi/model/hybrid-simulator-impl.cc new file mode 100644 index 000000000..2488d2ffb --- /dev/null +++ b/src/mpi/model/hybrid-simulator-impl.cc @@ -0,0 +1,439 @@ +/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ + +#include "hybrid-simulator-impl.h" +#include "granted-time-window-mpi-interface.h" +#include "mpi-interface.h" + +#include "ns3/channel.h" +#include "ns3/simulator.h" +#include "ns3/node.h" +#include "ns3/node-container.h" +#include "ns3/node-list.h" +#include "ns3/uinteger.h" +#include "ns3/mtp-interface.h" + +#include +#include +#include +#include + +namespace ns3 { + +NS_LOG_COMPONENT_DEFINE ("HybridSimulatorImpl"); + +NS_OBJECT_ENSURE_REGISTERED (HybridSimulatorImpl); + +HybridSimulatorImpl::HybridSimulatorImpl () +{ + NS_LOG_FUNCTION (this); + + MtpInterface::Enable (1, 0); + m_myId = MpiInterface::GetSystemId (); + m_systemCount = MpiInterface::GetSize (); + + // Allocate the LBTS message buffer + m_pLBTS = new LbtsMessage[m_systemCount]; + m_smallestTime = Seconds (0); + m_globalFinished = false; +} + +HybridSimulatorImpl::~HybridSimulatorImpl () +{ + NS_LOG_FUNCTION (this); +} + +TypeId +HybridSimulatorImpl::GetTypeId (void) +{ + static TypeId tid = + TypeId ("ns3::HybridSimulatorImpl") + .SetParent () + .SetGroupName ("Mtp") + .AddConstructor () + .AddAttribute ("MaxThreads", "The maximum threads used in simulation", + UintegerValue (std::thread::hardware_concurrency ()), + MakeUintegerAccessor (&HybridSimulatorImpl::m_maxThreads), + MakeUintegerChecker (1)) + .AddAttribute ("MinLookahead", "The minimum lookahead in a partition", + TimeValue (TimeStep (1)), + MakeTimeAccessor (&HybridSimulatorImpl::m_minLookahead), + MakeTimeChecker (TimeStep (0))); + return tid; +} + +void +HybridSimulatorImpl::Destroy () +{ + while (!m_destroyEvents.empty ()) + { + Ptr ev = m_destroyEvents.front ().PeekEventImpl (); + m_destroyEvents.pop_front (); + NS_LOG_LOGIC ("handle destroy " << ev); + if (!ev->IsCancelled ()) + { + ev->Invoke (); + } + } + + MtpInterface::Disable (); + MpiInterface::Destroy (); +} + +bool +HybridSimulatorImpl::IsFinished (void) const +{ + return m_globalFinished; +} + +bool +HybridSimulatorImpl::IsLocalFinished (void) const +{ + return MtpInterface::isFinished (); +} + +void +HybridSimulatorImpl::Stop (void) +{ + NS_LOG_FUNCTION (this); + for (uint32_t i = 0; i < MtpInterface::GetSize (); i++) + { + MtpInterface::GetSystem (i)->Stop (); + } +} + +void +HybridSimulatorImpl::Stop (Time const &delay) +{ + NS_LOG_FUNCTION (this << delay.GetTimeStep ()); + Simulator::Schedule (delay, &Simulator::Stop); +} + +EventId +HybridSimulatorImpl::Schedule (Time const &delay, EventImpl *event) +{ + NS_LOG_FUNCTION (this << delay.GetTimeStep () << event); + return MtpInterface::GetSystem ()->Schedule (delay, event); +} + +void +HybridSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event) +{ + NS_LOG_FUNCTION (this << context << delay.GetTimeStep () << event); + + if (MtpInterface::GetSize () == 1) + { + // initialization stage, do not schedule remote + LogicalProcess *local = MtpInterface::GetSystem (); + local->ScheduleWithContext (local, context, delay, event); + } + else + { + LogicalProcess *remote = + MtpInterface::GetSystem (NodeList::GetNode (context)->GetSystemId () >> 16); + MtpInterface::GetSystem ()->ScheduleWithContext (remote, context, delay, event); + } +} + +EventId +HybridSimulatorImpl::ScheduleNow (EventImpl *event) +{ + return Schedule (TimeStep (0), event); +} + +EventId +HybridSimulatorImpl::ScheduleDestroy (EventImpl *event) +{ + EventId id (Ptr (event, false), GetMaximumSimulationTime ().GetTimeStep (), 0xffffffff, + EventId::DESTROY); + MtpInterface::CriticalSection cs; + m_destroyEvents.push_back (id); + return id; +} + +void +HybridSimulatorImpl::Remove (const EventId &id) +{ + if (id.GetUid () == EventId::DESTROY) + { + // destroy events. + for (std::list::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); + i++) + { + if (*i == id) + { + m_destroyEvents.erase (i); + break; + } + } + } + else + { + MtpInterface::GetSystem ()->Remove (id); + } +} + +void +HybridSimulatorImpl::Cancel (const EventId &id) +{ + if (!IsExpired (id)) + { + id.PeekEventImpl ()->Cancel (); + } +} + +bool +HybridSimulatorImpl::IsExpired (const EventId &id) const +{ + if (id.GetUid () == EventId::DESTROY) + { + // destroy events. + if (id.PeekEventImpl () == 0 || id.PeekEventImpl ()->IsCancelled ()) + { + return true; + } + for (std::list::const_iterator i = m_destroyEvents.begin (); + i != m_destroyEvents.end (); i++) + { + if (*i == id) + { + return false; + } + } + return true; + } + else + { + return MtpInterface::GetSystem ()->IsExpired (id); + } +} + +void +HybridSimulatorImpl::Run (void) +{ + NS_LOG_FUNCTION (this); + + Partition (); + MtpInterface::RunBefore (); + + m_globalFinished = false; + while (!m_globalFinished) + { + GrantedTimeWindowMpiInterface::ReceiveMessages (); + GrantedTimeWindowMpiInterface::TestSendComplete (); + MtpInterface::CalculateSmallestTime (); + LbtsMessage lMsg (GrantedTimeWindowMpiInterface::GetRxCount (), + GrantedTimeWindowMpiInterface::GetTxCount (), m_myId, IsLocalFinished (), + MtpInterface::GetSmallestTime ()); + m_pLBTS[m_myId] = lMsg; + MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS, sizeof (LbtsMessage), MPI_BYTE, + MpiInterface::GetCommunicator ()); + m_smallestTime = m_pLBTS[0].GetSmallestTime (); + + // The totRx and totTx counts insure there are no transient + // messages; If totRx != totTx, there are transients, + // so we don't update the granted time. + uint32_t totRx = m_pLBTS[0].GetRxCount (); + uint32_t totTx = m_pLBTS[0].GetTxCount (); + m_globalFinished = m_pLBTS[0].IsFinished (); + + // calculate smallest time of all hosts + for (uint32_t i = 1; i < m_systemCount; ++i) + { + if (m_pLBTS[i].GetSmallestTime () < m_smallestTime) + { + m_smallestTime = m_pLBTS[i].GetSmallestTime (); + } + totRx += m_pLBTS[i].GetRxCount (); + totTx += m_pLBTS[i].GetTxCount (); + m_globalFinished &= m_pLBTS[i].IsFinished (); + } + MtpInterface::SetSmallestTime (m_smallestTime); + + // Global halting condition is all nodes have empty queue's and + // no messages are in-flight. + m_globalFinished &= totRx == totTx; + + // Execute next event if it is within the current time window. + // Local task may be completed. + if (totRx == totTx && !IsLocalFinished ()) + { // Safe to process + MtpInterface::ProcessOneRound (); + } + } + + MtpInterface::RunAfter (); +} + +Time +HybridSimulatorImpl::Now (void) const +{ + // Do not add function logging here, to avoid stack overflow + return MtpInterface::GetSystem ()->Now (); +} + +Time +HybridSimulatorImpl::GetDelayLeft (const EventId &id) const +{ + if (IsExpired (id)) + { + return TimeStep (0); + } + else + { + return MtpInterface::GetSystem ()->GetDelayLeft (id); + } +} + +Time +HybridSimulatorImpl::GetMaximumSimulationTime (void) const +{ + return Time::Max () / 2; +} + +void +HybridSimulatorImpl::SetScheduler (ObjectFactory schedulerFactory) +{ + NS_LOG_FUNCTION (this << schedulerFactory); + for (uint32_t i = 0; i < MtpInterface::GetSize (); i++) + { + MtpInterface::GetSystem (i)->SetScheduler (schedulerFactory); + } + m_schedulerTypeId = schedulerFactory.GetTypeId (); +} + +uint32_t +HybridSimulatorImpl::GetSystemId () const +{ + return m_myId; +} + +uint32_t +HybridSimulatorImpl::GetContext (void) const +{ + return MtpInterface::GetSystem ()->GetContext (); +} + +uint64_t +HybridSimulatorImpl::GetEventCount (void) const +{ + uint64_t eventCount = 0; + for (uint32_t i = 0; i < MtpInterface::GetSize (); i++) + { + eventCount += MtpInterface::GetSystem (i)->GetEventCount (); + } + return eventCount; +} + +void +HybridSimulatorImpl::DoDispose (void) +{ + delete[] m_pLBTS; + SimulatorImpl::DoDispose (); +} + +void +HybridSimulatorImpl::Partition () +{ + NS_LOG_FUNCTION (this); + uint32_t localSystemId = 0; + NodeContainer nodes = NodeContainer::GetGlobal (); + bool *visited = new bool[nodes.GetN ()]{false}; + std::queue> q; + + // perform a BFS on the whole network topo to assign each node a localSystemId + for (NodeContainer::Iterator it = nodes.Begin (); it != nodes.End (); it++) + { + Ptr node = *it; + if (!visited[node->GetId ()] && node->GetSystemId () == m_myId) + { + q.push (node); + localSystemId++; + while (!q.empty ()) + { + // pop from BFS queue + node = q.front (); + q.pop (); + visited[node->GetId ()] = true; + // assign this node the current localSystemId + node->SetSystemId (localSystemId << 16 | m_myId); + NS_LOG_INFO ("node " << node->GetId () << " is set to local system " + << localSystemId); + + for (uint32_t i = 0; i < node->GetNDevices (); i++) + { + Ptr localNetDevice = node->GetDevice (i); + Ptr channel = localNetDevice->GetChannel (); + if (channel == 0) + { + continue; + } + // cut-off p2p links for partition + if (localNetDevice->IsPointToPoint ()) + { + TimeValue delay; + channel->GetAttribute ("Delay", delay); + // if delay is below threshold, do not cut-off + if (delay.Get () >= m_minLookahead) + { + continue; + } + } + // grab the adjacent nodes + for (uint32_t j = 0; j < channel->GetNDevices (); j++) + { + Ptr remote = channel->GetDevice (j)->GetNode (); + // if it's not visited, and not remote, add it to the current partition + if (!visited[remote->GetId ()] && node->GetSystemId () == m_myId) + { + q.push (remote); + } + } + } + } + } + } + delete[] visited; + + // after the partition, we finally know the system count + uint32_t systemCount = localSystemId; + uint32_t threadCount = std::min (m_maxThreads, systemCount); + NS_LOG_INFO ("Partition done! " << systemCount << " systems share " << threadCount << " threads"); + + // create new systems + Ptr events = MtpInterface::GetSystem ()->GetPendingEvents (); + MtpInterface::Disable (); + MtpInterface::Enable (threadCount, systemCount); + + // set scheduler + ObjectFactory schedulerFactory; + schedulerFactory.SetTypeId (m_schedulerTypeId); + for (uint32_t i = 0; i <= systemCount; i++) + { + MtpInterface::GetSystem (i)->SetScheduler (schedulerFactory); + } + + // transfer events to new system + while (!events->IsEmpty ()) + { + Scheduler::Event ev = events->RemoveNext (); + // invoke initialization events (at time 0) by their insertion order + // since they may not be in the same system, causing error + if (ev.key.m_ts == 0) + { + MtpInterface::GetSystem (ev.key.m_context == Simulator::NO_CONTEXT + ? 0 + : NodeList::GetNode (ev.key.m_context)->GetSystemId () >> 16) + ->InvokeNow (ev); + } + else if (ev.key.m_context == Simulator::NO_CONTEXT) + { + Schedule (TimeStep (ev.key.m_ts), ev.impl); + } + else + { + ScheduleWithContext (ev.key.m_context, TimeStep (ev.key.m_ts), ev.impl); + } + } +} + +} // namespace ns3 diff --git a/src/mpi/model/hybrid-simulator-impl.h b/src/mpi/model/hybrid-simulator-impl.h new file mode 100644 index 000000000..61e4e21a9 --- /dev/null +++ b/src/mpi/model/hybrid-simulator-impl.h @@ -0,0 +1,72 @@ +/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ +#ifndef NS3_HYBRID_SIMULATOR_IMPL_H +#define NS3_HYBRID_SIMULATOR_IMPL_H + +#include "distributed-simulator-impl.h" + +#include "ns3/event-id.h" +#include "ns3/event-impl.h" +#include "ns3/nstime.h" +#include "ns3/object-factory.h" +#include "ns3/simulator-impl.h" + +#include + +namespace ns3 { + +class HybridSimulatorImpl : public SimulatorImpl +{ +public: + static TypeId GetTypeId (void); + + /** Default constructor. */ + HybridSimulatorImpl (); + /** Destructor. */ + ~HybridSimulatorImpl (); + + // virtual from SimulatorImpl + virtual void Destroy (); + virtual bool IsFinished (void) const; + virtual void Stop (void); + virtual void Stop (Time const &delay); + virtual EventId Schedule (Time const &delay, EventImpl *event); + virtual void ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event); + virtual EventId ScheduleNow (EventImpl *event); + virtual EventId ScheduleDestroy (EventImpl *event); + virtual void Remove (const EventId &id); + virtual void Cancel (const EventId &id); + virtual bool IsExpired (const EventId &id) const; + virtual void Run (void); + virtual Time Now (void) const; + virtual Time GetDelayLeft (const EventId &id) const; + virtual Time GetMaximumSimulationTime (void) const; + virtual void SetScheduler (ObjectFactory schedulerFactory); + virtual uint32_t GetSystemId (void) const; + virtual uint32_t GetContext (void) const; + virtual uint64_t GetEventCount (void) const; + +private: + // Inherited from Object + virtual void DoDispose (void); + + bool IsLocalFinished (void) const; + + /** Are all parallel instances completed. */ + bool m_globalFinished; + + LbtsMessage *m_pLBTS; + uint32_t m_myId; /**< MPI rank. */ + uint32_t m_systemCount; /**< MPI communicator size. */ + Time m_smallestTime; /**< End of current window. */ + + void Partition (); + + uint32_t m_maxThreads; + Time m_minLookahead; + TypeId m_schedulerTypeId; + std::list m_destroyEvents; +}; + +} // namespace ns3 + +#endif /* NS3_HYBRID_SIMULATOR_IMPL_H */ diff --git a/src/mpi/model/mpi-interface.cc b/src/mpi/model/mpi-interface.cc index 983ed0472..2fd2dd6e8 100644 --- a/src/mpi/model/mpi-interface.cc +++ b/src/mpi/model/mpi-interface.cc @@ -94,7 +94,7 @@ MpiInterface::SetParallelSimulatorImpl (void) g_parallelCommunicationInterface = new NullMessageMpiInterface (); useDefault = false; } - else if (simulationType.compare ("ns3::DistributedSimulatorImpl") == 0) + else if (simulationType.compare ("ns3::DistributedSimulatorImpl") == 0 || simulationType.compare("ns3::HybridSimulatorImpl") == 0) { g_parallelCommunicationInterface = new GrantedTimeWindowMpiInterface (); useDefault = false; diff --git a/src/mtp/model/logical-process.cc b/src/mtp/model/logical-process.cc index be12de7f6..5ae82b8c1 100644 --- a/src/mtp/model/logical-process.cc +++ b/src/mtp/model/logical-process.cc @@ -65,10 +65,17 @@ LogicalProcess::CalculateLookAhead () NodeContainer c = NodeContainer::GetGlobal (); for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter) { +#ifdef NS3_MPI + if (((*iter)->GetSystemId () >> 16) != m_systemId) + { + continue; + } +#else if ((*iter)->GetSystemId () != m_systemId) { continue; } +#endif for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i) { Ptr localNetDevice = (*iter)->GetDevice (i); @@ -184,8 +191,20 @@ LogicalProcess::Schedule (Time const &delay, EventImpl *event) } void -LogicalProcess::ScheduleWithContext (LogicalProcess *remote, uint32_t context, Time const &delay, - EventImpl *event) +LogicalProcess::ScheduleAt (const uint32_t context, Time const &time, EventImpl *event) +{ + Scheduler::Event ev; + + ev.impl = event; + ev.key.m_ts = time.GetTimeStep (); + ev.key.m_context = context; + ev.key.m_uid = m_uid++; + m_events->Insert (ev); +} + +void +LogicalProcess::ScheduleWithContext (LogicalProcess *remote, const uint32_t context, + Time const &delay, EventImpl *event) { Scheduler::Event ev; diff --git a/src/mtp/model/logical-process.h b/src/mtp/model/logical-process.h index 32ccb04f1..fa0ba3e2d 100644 --- a/src/mtp/model/logical-process.h +++ b/src/mtp/model/logical-process.h @@ -48,7 +48,8 @@ public: // mapped from MultithreadedSimulatorImpl EventId Schedule (Time const &delay, EventImpl *event); - void ScheduleWithContext (LogicalProcess *remote, uint32_t context, Time const &delay, + void ScheduleAt (const uint32_t context, Time const &time, EventImpl *event); + void ScheduleWithContext (LogicalProcess *remote, const uint32_t context, Time const &delay, EventImpl *event); void InvokeNow (Scheduler::Event const &ev); // cross context immediate invocation void Remove (const EventId &id);