mpi, mtp: Add hybrid simulation support

This commit is contained in:
F5
2022-10-25 17:10:23 +08:00
parent f818faabcd
commit af331aed29
10 changed files with 934 additions and 30 deletions

View File

@@ -5,25 +5,52 @@ if(${ENABLE_EXAMPLES})
)
endif()
build_lib(
LIBNAME mpi
SOURCE_FILES
model/distributed-simulator-impl.cc
model/granted-time-window-mpi-interface.cc
model/mpi-interface.cc
model/mpi-receiver.cc
model/null-message-mpi-interface.cc
model/null-message-simulator-impl.cc
model/parallel-communication-interface.h
model/remote-channel-bundle-manager.cc
model/remote-channel-bundle.cc
HEADER_FILES
model/mpi-interface.h
model/mpi-receiver.h
model/parallel-communication-interface.h
LIBRARIES_TO_LINK
${libcore}
${libnetwork}
${MPI_CXX_LIBRARIES}
TEST_SOURCES ${example_as_test_suite}
)
if(${ENABLE_MTP})
build_lib(
LIBNAME mpi
SOURCE_FILES
model/distributed-simulator-impl.cc
model/granted-time-window-mpi-interface.cc
model/hybrid-simulator-impl.cc
model/mpi-interface.cc
model/mpi-receiver.cc
model/null-message-mpi-interface.cc
model/null-message-simulator-impl.cc
model/parallel-communication-interface.h
model/remote-channel-bundle-manager.cc
model/remote-channel-bundle.cc
HEADER_FILES
model/mpi-interface.h
model/mpi-receiver.h
model/parallel-communication-interface.h
LIBRARIES_TO_LINK
${libcore}
${libnetwork}
${libmtp}
${MPI_CXX_LIBRARIES}
TEST_SOURCES ${example_as_test_suite}
)
else()
build_lib(
LIBNAME mpi
SOURCE_FILES
model/distributed-simulator-impl.cc
model/granted-time-window-mpi-interface.cc
model/mpi-interface.cc
model/mpi-receiver.cc
model/null-message-mpi-interface.cc
model/null-message-simulator-impl.cc
model/parallel-communication-interface.h
model/remote-channel-bundle-manager.cc
model/remote-channel-bundle.cc
HEADER_FILES
model/mpi-interface.h
model/mpi-receiver.h
model/parallel-communication-interface.h
LIBRARIES_TO_LINK
${libcore}
${libnetwork}
${MPI_CXX_LIBRARIES}
TEST_SOURCES ${example_as_test_suite}
)
endif()

View File

@@ -37,3 +37,18 @@ build_lib_example(
${libcsma}
${libapplications}
)
if(${ENABLE_MTP})
build_lib_example(
NAME simple-hybrid
SOURCE_FILES simple-hybrid.cc
mpi-test-fixtures.cc
LIBRARIES_TO_LINK
${libmpi}
${libpoint-to-point}
${libinternet}
${libnix-vector-routing}
${libapplications}
${libmtp}
)
endif()

View File

@@ -0,0 +1,301 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/**
* \file
* \ingroup mpi
*
* TestDistributed creates a dumbbell topology and logically splits it in
* half. The left half is placed on logical processor 0 and the right half
* is placed on logical processor 1.
*
* ------- -------
* RANK 0 RANK 1
* ------- | -------
* |
* n0 ---------| | |---------- n6
* | | |
* n1 -------\ | | | /------- n7
* n4 ----------|---------- n5
* n2 -------/ | | | \------- n8
* | | |
* n3 ---------| | |---------- n9
*
*
* OnOff clients are placed on each left leaf node. Each right leaf node
* is a packet sink for a left leaf node. As a packet travels from one
* logical processor to another (the link between n4 and n5), MPI messages
* are passed containing the serialized packet. The message is then
* deserialized into a new packet and sent on as normal.
*
* One packet is sent from each left leaf node. The packet sinks on the
* right leaf nodes output logging information when they receive the packet.
*/
#include "mpi-test-fixtures.h"
#include "ns3/core-module.h"
#include "ns3/network-module.h"
#include "ns3/mpi-interface.h"
#include "ns3/ipv4-global-routing-helper.h"
#include "ns3/point-to-point-helper.h"
#include "ns3/internet-stack-helper.h"
#include "ns3/nix-vector-helper.h"
#include "ns3/ipv4-address-helper.h"
#include "ns3/on-off-helper.h"
#include "ns3/packet-sink-helper.h"
#include "ns3/mtp-interface.h"
#include <mpi.h>
#include <iomanip>
using namespace ns3;
NS_LOG_COMPONENT_DEFINE ("SimpleDistributed");
int
main (int argc, char *argv[])
{
LogComponentEnable ("LogicalProcess", LOG_LEVEL_INFO);
bool nix = true;
bool nullmsg = false;
bool tracing = false;
bool testing = false;
bool verbose = false;
// Parse command line
CommandLine cmd (__FILE__);
cmd.AddValue ("nix", "Enable the use of nix-vector or global routing", nix);
cmd.AddValue ("nullmsg", "Enable the use of null-message synchronization", nullmsg);
cmd.AddValue ("tracing", "Enable pcap tracing", tracing);
cmd.AddValue ("verbose", "verbose output", verbose);
cmd.AddValue ("test", "Enable regression test output", testing);
cmd.Parse (argc, argv);
// Distributed simulation setup; by default use granted time window algorithm.
if(nullmsg)
{
GlobalValue::Bind ("SimulatorImplementationType",
StringValue ("ns3::NullMessageSimulatorImpl"));
}
else
{
GlobalValue::Bind ("SimulatorImplementationType",
StringValue ("ns3::DistributedSimulatorImpl"));
}
// Enable parallel simulator with the command line arguments
MtpInterface::Enable ();
MpiInterface::Enable (&argc, &argv);
SinkTracer::Init ();
if (verbose)
{
LogComponentEnable ("PacketSink", (LogLevel)(LOG_LEVEL_INFO | LOG_PREFIX_NODE | LOG_PREFIX_TIME));
}
uint32_t systemId = MpiInterface::GetSystemId ();
uint32_t systemCount = MpiInterface::GetSize ();
// Check for valid distributed parameters.
// Must have 2 and only 2 Logical Processors (LPs)
if (systemCount != 2)
{
std::cout << "This simulation requires 2 and only 2 logical processors." << std::endl;
return 1;
}
// Some default values
Config::SetDefault ("ns3::OnOffApplication::PacketSize", UintegerValue (512));
Config::SetDefault ("ns3::OnOffApplication::DataRate", StringValue ("1Mbps"));
Config::SetDefault ("ns3::OnOffApplication::MaxBytes", UintegerValue (512));
// Create leaf nodes on left with system id 0
NodeContainer leftLeafNodes;
leftLeafNodes.Create (4, 0);
// Create router nodes. Left router
// with system id 0, right router with
// system id 1
NodeContainer routerNodes;
Ptr<Node> routerNode1 = CreateObject<Node> (0);
Ptr<Node> routerNode2 = CreateObject<Node> (1);
routerNodes.Add (routerNode1);
routerNodes.Add (routerNode2);
// Create leaf nodes on right with system id 1
NodeContainer rightLeafNodes;
rightLeafNodes.Create (4, 1);
PointToPointHelper routerLink;
routerLink.SetDeviceAttribute ("DataRate", StringValue ("5Mbps"));
routerLink.SetChannelAttribute ("Delay", StringValue ("5ms"));
PointToPointHelper leafLink;
leafLink.SetDeviceAttribute ("DataRate", StringValue ("1Mbps"));
leafLink.SetChannelAttribute ("Delay", StringValue ("2ms"));
// Add link connecting routers
NetDeviceContainer routerDevices;
routerDevices = routerLink.Install (routerNodes);
// Add links for left side leaf nodes to left router
NetDeviceContainer leftRouterDevices;
NetDeviceContainer leftLeafDevices;
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer temp = leafLink.Install (leftLeafNodes.Get (i), routerNodes.Get (0));
leftLeafDevices.Add (temp.Get (0));
leftRouterDevices.Add (temp.Get (1));
}
// Add links for right side leaf nodes to right router
NetDeviceContainer rightRouterDevices;
NetDeviceContainer rightLeafDevices;
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer temp = leafLink.Install (rightLeafNodes.Get (i), routerNodes.Get (1));
rightLeafDevices.Add (temp.Get (0));
rightRouterDevices.Add (temp.Get (1));
}
InternetStackHelper stack;
if (nix)
{
Ipv4NixVectorHelper nixRouting;
stack.SetRoutingHelper (nixRouting); // has effect on the next Install ()
}
stack.InstallAll ();
Ipv4InterfaceContainer routerInterfaces;
Ipv4InterfaceContainer leftLeafInterfaces;
Ipv4InterfaceContainer leftRouterInterfaces;
Ipv4InterfaceContainer rightLeafInterfaces;
Ipv4InterfaceContainer rightRouterInterfaces;
Ipv4AddressHelper leftAddress;
leftAddress.SetBase ("10.1.1.0", "255.255.255.0");
Ipv4AddressHelper routerAddress;
routerAddress.SetBase ("10.2.1.0", "255.255.255.0");
Ipv4AddressHelper rightAddress;
rightAddress.SetBase ("10.3.1.0", "255.255.255.0");
// Router-to-Router interfaces
routerInterfaces = routerAddress.Assign (routerDevices);
// Left interfaces
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer ndc;
ndc.Add (leftLeafDevices.Get (i));
ndc.Add (leftRouterDevices.Get (i));
Ipv4InterfaceContainer ifc = leftAddress.Assign (ndc);
leftLeafInterfaces.Add (ifc.Get (0));
leftRouterInterfaces.Add (ifc.Get (1));
leftAddress.NewNetwork ();
}
// Right interfaces
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer ndc;
ndc.Add (rightLeafDevices.Get (i));
ndc.Add (rightRouterDevices.Get (i));
Ipv4InterfaceContainer ifc = rightAddress.Assign (ndc);
rightLeafInterfaces.Add (ifc.Get (0));
rightRouterInterfaces.Add (ifc.Get (1));
rightAddress.NewNetwork ();
}
if (!nix)
{
Ipv4GlobalRoutingHelper::PopulateRoutingTables ();
}
if (tracing == true)
{
if (systemId == 0)
{
routerLink.EnablePcap("router-left", routerDevices, true);
leafLink.EnablePcap("leaf-left", leftLeafDevices, true);
}
if (systemId == 1)
{
routerLink.EnablePcap("router-right", routerDevices, true);
leafLink.EnablePcap("leaf-right", rightLeafDevices, true);
}
}
// Create a packet sink on the right leafs to receive packets from left leafs
uint16_t port = 50000;
if (systemId == 1)
{
Address sinkLocalAddress (InetSocketAddress (Ipv4Address::GetAny (), port));
PacketSinkHelper sinkHelper ("ns3::UdpSocketFactory", sinkLocalAddress);
ApplicationContainer sinkApp;
for (uint32_t i = 0; i < 4; ++i)
{
sinkApp.Add (sinkHelper.Install (rightLeafNodes.Get (i)));
if (testing)
{
sinkApp.Get (i)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
sinkApp.Start (Seconds (1.0));
sinkApp.Stop (Seconds (5));
}
// Create the OnOff applications to send
if (systemId == 0)
{
OnOffHelper clientHelper ("ns3::UdpSocketFactory", Address ());
clientHelper.SetAttribute
("OnTime", StringValue ("ns3::ConstantRandomVariable[Constant=1]"));
clientHelper.SetAttribute
("OffTime", StringValue ("ns3::ConstantRandomVariable[Constant=0]"));
ApplicationContainer clientApps;
for (uint32_t i = 0; i < 4; ++i)
{
AddressValue remoteAddress
(InetSocketAddress (rightLeafInterfaces.GetAddress (i), port));
clientHelper.SetAttribute ("Remote", remoteAddress);
clientApps.Add (clientHelper.Install (leftLeafNodes.Get (i)));
}
clientApps.Start (Seconds (1.0));
clientApps.Stop (Seconds (5));
}
Simulator::Stop (Seconds (5));
Simulator::Run ();
Simulator::Destroy ();
if (testing)
{
SinkTracer::Verify (4);
}
// Exit the MPI execution environment
MpiInterface::Disable ();
return 0;
}

View File

@@ -40,6 +40,9 @@
#include "ns3/simulator-impl.h"
#include "ns3/nstime.h"
#include "ns3/log.h"
#ifdef NS3_MTP
#include "ns3/mtp-interface.h"
#endif
#include <mpi.h>
@@ -91,6 +94,10 @@ char** GrantedTimeWindowMpiInterface::g_pRxBuffers;
MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
bool GrantedTimeWindowMpiInterface::g_freeCommunicator = false;;
#ifdef NS3_MTP
std::atomic<bool> GrantedTimeWindowMpiInterface::g_sending (false);
#endif
TypeId
GrantedTimeWindowMpiInterface::GetTypeId (void)
{
@@ -210,6 +217,11 @@ GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, ui
{
NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
#ifdef NS3_MTP
while (g_sending.exchange (true, std::memory_order_acquire))
;
#endif
SentBuffer sendBuf;
g_pendingTx.push_back (sendBuf);
std::list<SentBuffer>::reverse_iterator i = g_pendingTx.rbegin (); // Points to the last element
@@ -229,11 +241,19 @@ GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, ui
// Find the system id for the destination node
Ptr<Node> destNode = NodeList::GetNode (node);
#ifdef NS3_MTP
uint32_t nodeSysId = destNode->GetSystemId () & 0xFFFF;
#else
uint32_t nodeSysId = destNode->GetSystemId ();
#endif
MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
0, g_communicator, (i->GetRequest ()));
g_txCount++;
#ifdef NS3_MTP
g_sending.store(false, std::memory_order_release);
#endif
}
void
@@ -287,8 +307,13 @@ GrantedTimeWindowMpiInterface::ReceiveMessages ()
NS_ASSERT (pNode && pMpiRec);
// Schedule the rx event
#ifdef NS3_MTP
MtpInterface::GetSystem (pNode->GetSystemId () >> 16)
->ScheduleAt (pNode->GetId (), rxTime, MakeEvent (&MpiReceiver::Receive, pMpiRec, p));
#else
Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
&MpiReceiver::Receive, pMpiRec, p);
#endif
// Re-queue the next read
MPI_Irecv (g_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,

View File

@@ -29,6 +29,7 @@
#define NS3_GRANTED_TIME_WINDOW_MPI_INTERFACE_H
#include <stdint.h>
#include <atomic>
#include <list>
#include "ns3/nstime.h"
@@ -80,6 +81,7 @@ private:
class Packet;
class DistributedSimulatorImpl;
class HybridSimulatorImpl;
/**
* \ingroup mpi
@@ -110,8 +112,6 @@ public:
virtual void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev);
virtual MPI_Comm GetCommunicator();
private:
/*
* The granted time window implementation is a collaboration of several
* classes. Methods that should be invoked only by the
@@ -119,7 +119,8 @@ private:
* It is not intended for state to be shared.
*/
friend ns3::DistributedSimulatorImpl;
friend ns3::HybridSimulatorImpl;
/**
* Check for received messages complete
*/
@@ -136,7 +137,7 @@ private:
* \return transmitted count in packets
*/
static uint32_t GetTxCount ();
/** System ID (rank) for this task. */
static uint32_t g_sid;
/** Size of the MPI COM_WORLD group. */
@@ -171,6 +172,10 @@ private:
/** Did ns-3 create the communicator? Have to free it. */
static bool g_freeCommunicator;
#ifdef NS3_MTP
static std::atomic<bool> g_sending;
#endif
};
} // namespace ns3

View File

@@ -0,0 +1,439 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
#include "hybrid-simulator-impl.h"
#include "granted-time-window-mpi-interface.h"
#include "mpi-interface.h"
#include "ns3/channel.h"
#include "ns3/simulator.h"
#include "ns3/node.h"
#include "ns3/node-container.h"
#include "ns3/node-list.h"
#include "ns3/uinteger.h"
#include "ns3/mtp-interface.h"
#include <mpi.h>
#include <algorithm>
#include <queue>
#include <thread>
namespace ns3 {
NS_LOG_COMPONENT_DEFINE ("HybridSimulatorImpl");
NS_OBJECT_ENSURE_REGISTERED (HybridSimulatorImpl);
HybridSimulatorImpl::HybridSimulatorImpl ()
{
NS_LOG_FUNCTION (this);
MtpInterface::Enable (1, 0);
m_myId = MpiInterface::GetSystemId ();
m_systemCount = MpiInterface::GetSize ();
// Allocate the LBTS message buffer
m_pLBTS = new LbtsMessage[m_systemCount];
m_smallestTime = Seconds (0);
m_globalFinished = false;
}
HybridSimulatorImpl::~HybridSimulatorImpl ()
{
NS_LOG_FUNCTION (this);
}
TypeId
HybridSimulatorImpl::GetTypeId (void)
{
static TypeId tid =
TypeId ("ns3::HybridSimulatorImpl")
.SetParent<SimulatorImpl> ()
.SetGroupName ("Mtp")
.AddConstructor<HybridSimulatorImpl> ()
.AddAttribute ("MaxThreads", "The maximum threads used in simulation",
UintegerValue (std::thread::hardware_concurrency ()),
MakeUintegerAccessor (&HybridSimulatorImpl::m_maxThreads),
MakeUintegerChecker<uint32_t> (1))
.AddAttribute ("MinLookahead", "The minimum lookahead in a partition",
TimeValue (TimeStep (1)),
MakeTimeAccessor (&HybridSimulatorImpl::m_minLookahead),
MakeTimeChecker (TimeStep (0)));
return tid;
}
void
HybridSimulatorImpl::Destroy ()
{
while (!m_destroyEvents.empty ())
{
Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
m_destroyEvents.pop_front ();
NS_LOG_LOGIC ("handle destroy " << ev);
if (!ev->IsCancelled ())
{
ev->Invoke ();
}
}
MtpInterface::Disable ();
MpiInterface::Destroy ();
}
bool
HybridSimulatorImpl::IsFinished (void) const
{
return m_globalFinished;
}
bool
HybridSimulatorImpl::IsLocalFinished (void) const
{
return MtpInterface::isFinished ();
}
void
HybridSimulatorImpl::Stop (void)
{
NS_LOG_FUNCTION (this);
for (uint32_t i = 0; i < MtpInterface::GetSize (); i++)
{
MtpInterface::GetSystem (i)->Stop ();
}
}
void
HybridSimulatorImpl::Stop (Time const &delay)
{
NS_LOG_FUNCTION (this << delay.GetTimeStep ());
Simulator::Schedule (delay, &Simulator::Stop);
}
EventId
HybridSimulatorImpl::Schedule (Time const &delay, EventImpl *event)
{
NS_LOG_FUNCTION (this << delay.GetTimeStep () << event);
return MtpInterface::GetSystem ()->Schedule (delay, event);
}
void
HybridSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event)
{
NS_LOG_FUNCTION (this << context << delay.GetTimeStep () << event);
if (MtpInterface::GetSize () == 1)
{
// initialization stage, do not schedule remote
LogicalProcess *local = MtpInterface::GetSystem ();
local->ScheduleWithContext (local, context, delay, event);
}
else
{
LogicalProcess *remote =
MtpInterface::GetSystem (NodeList::GetNode (context)->GetSystemId () >> 16);
MtpInterface::GetSystem ()->ScheduleWithContext (remote, context, delay, event);
}
}
EventId
HybridSimulatorImpl::ScheduleNow (EventImpl *event)
{
return Schedule (TimeStep (0), event);
}
EventId
HybridSimulatorImpl::ScheduleDestroy (EventImpl *event)
{
EventId id (Ptr<EventImpl> (event, false), GetMaximumSimulationTime ().GetTimeStep (), 0xffffffff,
EventId::DESTROY);
MtpInterface::CriticalSection cs;
m_destroyEvents.push_back (id);
return id;
}
void
HybridSimulatorImpl::Remove (const EventId &id)
{
if (id.GetUid () == EventId::DESTROY)
{
// destroy events.
for (std::list<EventId>::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end ();
i++)
{
if (*i == id)
{
m_destroyEvents.erase (i);
break;
}
}
}
else
{
MtpInterface::GetSystem ()->Remove (id);
}
}
void
HybridSimulatorImpl::Cancel (const EventId &id)
{
if (!IsExpired (id))
{
id.PeekEventImpl ()->Cancel ();
}
}
bool
HybridSimulatorImpl::IsExpired (const EventId &id) const
{
if (id.GetUid () == EventId::DESTROY)
{
// destroy events.
if (id.PeekEventImpl () == 0 || id.PeekEventImpl ()->IsCancelled ())
{
return true;
}
for (std::list<EventId>::const_iterator i = m_destroyEvents.begin ();
i != m_destroyEvents.end (); i++)
{
if (*i == id)
{
return false;
}
}
return true;
}
else
{
return MtpInterface::GetSystem ()->IsExpired (id);
}
}
void
HybridSimulatorImpl::Run (void)
{
NS_LOG_FUNCTION (this);
Partition ();
MtpInterface::RunBefore ();
m_globalFinished = false;
while (!m_globalFinished)
{
GrantedTimeWindowMpiInterface::ReceiveMessages ();
GrantedTimeWindowMpiInterface::TestSendComplete ();
MtpInterface::CalculateSmallestTime ();
LbtsMessage lMsg (GrantedTimeWindowMpiInterface::GetRxCount (),
GrantedTimeWindowMpiInterface::GetTxCount (), m_myId, IsLocalFinished (),
MtpInterface::GetSmallestTime ());
m_pLBTS[m_myId] = lMsg;
MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS, sizeof (LbtsMessage), MPI_BYTE,
MpiInterface::GetCommunicator ());
m_smallestTime = m_pLBTS[0].GetSmallestTime ();
// The totRx and totTx counts insure there are no transient
// messages; If totRx != totTx, there are transients,
// so we don't update the granted time.
uint32_t totRx = m_pLBTS[0].GetRxCount ();
uint32_t totTx = m_pLBTS[0].GetTxCount ();
m_globalFinished = m_pLBTS[0].IsFinished ();
// calculate smallest time of all hosts
for (uint32_t i = 1; i < m_systemCount; ++i)
{
if (m_pLBTS[i].GetSmallestTime () < m_smallestTime)
{
m_smallestTime = m_pLBTS[i].GetSmallestTime ();
}
totRx += m_pLBTS[i].GetRxCount ();
totTx += m_pLBTS[i].GetTxCount ();
m_globalFinished &= m_pLBTS[i].IsFinished ();
}
MtpInterface::SetSmallestTime (m_smallestTime);
// Global halting condition is all nodes have empty queue's and
// no messages are in-flight.
m_globalFinished &= totRx == totTx;
// Execute next event if it is within the current time window.
// Local task may be completed.
if (totRx == totTx && !IsLocalFinished ())
{ // Safe to process
MtpInterface::ProcessOneRound ();
}
}
MtpInterface::RunAfter ();
}
Time
HybridSimulatorImpl::Now (void) const
{
// Do not add function logging here, to avoid stack overflow
return MtpInterface::GetSystem ()->Now ();
}
Time
HybridSimulatorImpl::GetDelayLeft (const EventId &id) const
{
if (IsExpired (id))
{
return TimeStep (0);
}
else
{
return MtpInterface::GetSystem ()->GetDelayLeft (id);
}
}
Time
HybridSimulatorImpl::GetMaximumSimulationTime (void) const
{
return Time::Max () / 2;
}
void
HybridSimulatorImpl::SetScheduler (ObjectFactory schedulerFactory)
{
NS_LOG_FUNCTION (this << schedulerFactory);
for (uint32_t i = 0; i < MtpInterface::GetSize (); i++)
{
MtpInterface::GetSystem (i)->SetScheduler (schedulerFactory);
}
m_schedulerTypeId = schedulerFactory.GetTypeId ();
}
uint32_t
HybridSimulatorImpl::GetSystemId () const
{
return m_myId;
}
uint32_t
HybridSimulatorImpl::GetContext (void) const
{
return MtpInterface::GetSystem ()->GetContext ();
}
uint64_t
HybridSimulatorImpl::GetEventCount (void) const
{
uint64_t eventCount = 0;
for (uint32_t i = 0; i < MtpInterface::GetSize (); i++)
{
eventCount += MtpInterface::GetSystem (i)->GetEventCount ();
}
return eventCount;
}
void
HybridSimulatorImpl::DoDispose (void)
{
delete[] m_pLBTS;
SimulatorImpl::DoDispose ();
}
void
HybridSimulatorImpl::Partition ()
{
NS_LOG_FUNCTION (this);
uint32_t localSystemId = 0;
NodeContainer nodes = NodeContainer::GetGlobal ();
bool *visited = new bool[nodes.GetN ()]{false};
std::queue<Ptr<Node>> q;
// perform a BFS on the whole network topo to assign each node a localSystemId
for (NodeContainer::Iterator it = nodes.Begin (); it != nodes.End (); it++)
{
Ptr<Node> node = *it;
if (!visited[node->GetId ()] && node->GetSystemId () == m_myId)
{
q.push (node);
localSystemId++;
while (!q.empty ())
{
// pop from BFS queue
node = q.front ();
q.pop ();
visited[node->GetId ()] = true;
// assign this node the current localSystemId
node->SetSystemId (localSystemId << 16 | m_myId);
NS_LOG_INFO ("node " << node->GetId () << " is set to local system "
<< localSystemId);
for (uint32_t i = 0; i < node->GetNDevices (); i++)
{
Ptr<NetDevice> localNetDevice = node->GetDevice (i);
Ptr<Channel> channel = localNetDevice->GetChannel ();
if (channel == 0)
{
continue;
}
// cut-off p2p links for partition
if (localNetDevice->IsPointToPoint ())
{
TimeValue delay;
channel->GetAttribute ("Delay", delay);
// if delay is below threshold, do not cut-off
if (delay.Get () >= m_minLookahead)
{
continue;
}
}
// grab the adjacent nodes
for (uint32_t j = 0; j < channel->GetNDevices (); j++)
{
Ptr<Node> remote = channel->GetDevice (j)->GetNode ();
// if it's not visited, and not remote, add it to the current partition
if (!visited[remote->GetId ()] && node->GetSystemId () == m_myId)
{
q.push (remote);
}
}
}
}
}
}
delete[] visited;
// after the partition, we finally know the system count
uint32_t systemCount = localSystemId;
uint32_t threadCount = std::min (m_maxThreads, systemCount);
NS_LOG_INFO ("Partition done! " << systemCount << " systems share " << threadCount << " threads");
// create new systems
Ptr<Scheduler> events = MtpInterface::GetSystem ()->GetPendingEvents ();
MtpInterface::Disable ();
MtpInterface::Enable (threadCount, systemCount);
// set scheduler
ObjectFactory schedulerFactory;
schedulerFactory.SetTypeId (m_schedulerTypeId);
for (uint32_t i = 0; i <= systemCount; i++)
{
MtpInterface::GetSystem (i)->SetScheduler (schedulerFactory);
}
// transfer events to new system
while (!events->IsEmpty ())
{
Scheduler::Event ev = events->RemoveNext ();
// invoke initialization events (at time 0) by their insertion order
// since they may not be in the same system, causing error
if (ev.key.m_ts == 0)
{
MtpInterface::GetSystem (ev.key.m_context == Simulator::NO_CONTEXT
? 0
: NodeList::GetNode (ev.key.m_context)->GetSystemId () >> 16)
->InvokeNow (ev);
}
else if (ev.key.m_context == Simulator::NO_CONTEXT)
{
Schedule (TimeStep (ev.key.m_ts), ev.impl);
}
else
{
ScheduleWithContext (ev.key.m_context, TimeStep (ev.key.m_ts), ev.impl);
}
}
}
} // namespace ns3

View File

@@ -0,0 +1,72 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
#ifndef NS3_HYBRID_SIMULATOR_IMPL_H
#define NS3_HYBRID_SIMULATOR_IMPL_H
#include "distributed-simulator-impl.h"
#include "ns3/event-id.h"
#include "ns3/event-impl.h"
#include "ns3/nstime.h"
#include "ns3/object-factory.h"
#include "ns3/simulator-impl.h"
#include <list>
namespace ns3 {
class HybridSimulatorImpl : public SimulatorImpl
{
public:
static TypeId GetTypeId (void);
/** Default constructor. */
HybridSimulatorImpl ();
/** Destructor. */
~HybridSimulatorImpl ();
// virtual from SimulatorImpl
virtual void Destroy ();
virtual bool IsFinished (void) const;
virtual void Stop (void);
virtual void Stop (Time const &delay);
virtual EventId Schedule (Time const &delay, EventImpl *event);
virtual void ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event);
virtual EventId ScheduleNow (EventImpl *event);
virtual EventId ScheduleDestroy (EventImpl *event);
virtual void Remove (const EventId &id);
virtual void Cancel (const EventId &id);
virtual bool IsExpired (const EventId &id) const;
virtual void Run (void);
virtual Time Now (void) const;
virtual Time GetDelayLeft (const EventId &id) const;
virtual Time GetMaximumSimulationTime (void) const;
virtual void SetScheduler (ObjectFactory schedulerFactory);
virtual uint32_t GetSystemId (void) const;
virtual uint32_t GetContext (void) const;
virtual uint64_t GetEventCount (void) const;
private:
// Inherited from Object
virtual void DoDispose (void);
bool IsLocalFinished (void) const;
/** Are all parallel instances completed. */
bool m_globalFinished;
LbtsMessage *m_pLBTS;
uint32_t m_myId; /**< MPI rank. */
uint32_t m_systemCount; /**< MPI communicator size. */
Time m_smallestTime; /**< End of current window. */
void Partition ();
uint32_t m_maxThreads;
Time m_minLookahead;
TypeId m_schedulerTypeId;
std::list<EventId> m_destroyEvents;
};
} // namespace ns3
#endif /* NS3_HYBRID_SIMULATOR_IMPL_H */

View File

@@ -94,7 +94,7 @@ MpiInterface::SetParallelSimulatorImpl (void)
g_parallelCommunicationInterface = new NullMessageMpiInterface ();
useDefault = false;
}
else if (simulationType.compare ("ns3::DistributedSimulatorImpl") == 0)
else if (simulationType.compare ("ns3::DistributedSimulatorImpl") == 0 || simulationType.compare("ns3::HybridSimulatorImpl") == 0)
{
g_parallelCommunicationInterface = new GrantedTimeWindowMpiInterface ();
useDefault = false;