mpi: (merges !389) Enable user to provide an MPI communication context

This commit is contained in:
Steven Smith
2020-08-28 14:13:11 -07:00
committed by Tom Henderson
parent 2cea1e4ccd
commit 5b2f902d04
38 changed files with 1952 additions and 476 deletions

View File

@@ -0,0 +1,99 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright 2018. Lawrence Livermore National Security, LLC.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*/
#include "mpi-test-fixtures.h"
#include "ns3/simulator.h"
#include "ns3/packet.h"
#include "ns3/inet-socket-address.h"
#include "ns3/inet6-socket-address.h"
#include "ns3/address.h"
#include "ns3/ptr.h"
#include "mpi.h"
namespace ns3 {
unsigned long SinkTracer::m_sinkCount = 0;
unsigned long SinkTracer::m_line = 0;
int SinkTracer::m_worldRank = -1;
int SinkTracer::m_worldSize = -1;
void
SinkTracer::Init (void)
{
m_sinkCount = 0;
m_line = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &m_worldRank);
MPI_Comm_size (MPI_COMM_WORLD, &m_worldSize);
}
void
SinkTracer::SinkTrace (const ns3::Ptr<const ns3::Packet> packet,
const ns3::Address &srcAddress,
const ns3::Address &destAddress)
{
m_sinkCount++;
}
void
SinkTracer::Verify (unsigned long expectedCount)
{
unsigned long globalCount;
#ifdef NS3_MPI
MPI_Reduce(&m_sinkCount, &globalCount, 1, MPI_UNSIGNED_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
#else
globalCount = m_sinkCount;
#endif
if (expectedCount == globalCount)
{
RANK0COUT ("PASSED\n");
}
else
{
RANK0COUT ("FAILED Observed sink traces (" << globalCount << ") not equal to expected (" << expectedCount << ")\n");
}
}
std::string
SinkTracer::FormatAddress (const ns3::Address address)
{
std::stringstream ss;
if (InetSocketAddress::IsMatchingType (address))
{
ss << InetSocketAddress::ConvertFrom(address).GetIpv4 ()
<< ":"
<< InetSocketAddress::ConvertFrom (address).GetPort ();
}
else if (Inet6SocketAddress::IsMatchingType (address))
{
ss << Inet6SocketAddress::ConvertFrom(address).GetIpv6 ()
<< ":"
<< Inet6SocketAddress::ConvertFrom (address).GetPort ();
}
return ss.str ();
}
} // namespace ns3

View File

@@ -0,0 +1,143 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright 2018. Lawrence Livermore National Security, LLC.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*/
#include <iomanip>
#include <sstream>
#include <ios>
/**
* \file
* \ingroup mpi
*
* Common methods for MPI examples.
*
* Since MPI output is coming from multiple processors it is the
* ordering between the processors is non-deterministic. For
* regression testing the output is sorted to force a deterministic
* ordering. Methods include here add line number to support
* this sorting.
*
* Testing output is also grepped so only lines with "TEST" are
* included. Some MPI launchers emit extra text to output which must
* be excluded for regression comparisons.
*/
namespace ns3 {
template <typename T> class Ptr;
class Address;
class Packet;
/**
* Write to std::cout only from rank 0.
* Number line for sorting output of parallel runs.
*
* \param x The output operators.
*/
#define RANK0COUT(x) \
do \
if (SinkTracer::GetWorldRank () == 0) \
{ \
std::cout << "TEST : "; \
std::ios_base::fmtflags f( std::cout.flags() ); \
std::cout << std::setfill('0') << std::setw(5) << SinkTracer::GetLineCount (); \
std::cout.flags( f ); \
std::cout << " : " << x; \
} \
while (false)
#define RANK0COUTAPPEND(x) \
do \
if (SinkTracer::GetWorldRank () == 0) \
{ \
std::cout << x ; \
} \
while (false)
class SinkTracer
{
public:
/**
* PacketSink receive trace callback.
* \copydetails ns3::Packet::TwoAddressTracedCallback
*/
static void Init (void);
/**
* PacketSink receive trace callback.
* \copydetails ns3::Packet::TwoAddressTracedCallback
*/
static void SinkTrace (const ns3::Ptr<const ns3::Packet> packet,
const ns3::Address &srcAddress,
const ns3::Address &destAddress);
/**
* Verify the sink trace count observed matches the expected count.
* Prints message to std::cout indicating success or fail.
*/
static void Verify (unsigned long expectedCount);
/**
* Get the source address and port, as a formatted string.
*
* \param [in] address The ns3::Address.
* \return A string with the formatted address and port number.
*/
static std::string FormatAddress (const ns3::Address address);
/**
* Get the MPI rank in the world communicator.
*
* \return MPI world rank.
*/
static int GetWorldRank (void)
{
return m_worldRank;
}
/**
* Get the MPI size of the world communicator.
*
* \return MPI world size.
*/
static int GetWorldSize (void)
{
return m_worldSize;
}
/**
* Get current line count and increment.
*
*/
static int GetLineCount (void)
{
return m_line++;
}
private:
static unsigned long m_sinkCount; /*< Running sum of number of SinkTrace calls observed */
static unsigned long m_line; /*< Current output line number for ordering output */
static int m_worldRank; /*< MPI CommWorld rank */
static int m_worldSize; /*< MPI CommWorld size */
};
} // namespace ns3

View File

@@ -14,6 +14,11 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* (c) 2009, GTech Systems, Inc. - Alfred Park <park@gtech-systems.com>
*/
/**
* \file
* \ingroup mpi
*
* DARPA NMS Campus Network Model
*
@@ -33,11 +38,7 @@
* to make very large simulations.
*/
// for timing functions
#include <cstdlib>
#include <sys/time.h>
#include <fstream>
#include <vector>
#include "mpi-test-fixtures.h"
#include "ns3/core-module.h"
#include "ns3/internet-module.h"
@@ -48,12 +49,10 @@
#include "ns3/mpi-interface.h"
#include "ns3/ipv4-nix-vector-helper.h"
using namespace ns3;
#include <fstream>
#include <vector>
typedef struct timeval TIMER_TYPE;
#define TIMER_NOW(_t) gettimeofday (&_t,NULL);
#define TIMER_SECONDS(_t) ((double)(_t).tv_sec + (_t).tv_usec * 1e-6)
#define TIMER_DIFF(_t1, _t2) (TIMER_SECONDS (_t1) - TIMER_SECONDS (_t2))
using namespace ns3;
NS_LOG_COMPONENT_DEFINE ("CampusNetworkModelDistributed");
@@ -67,66 +66,78 @@ main (int argc, char *argv[])
typedef std::vector<Ipv4InterfaceContainer> vectorOfIpv4InterfaceContainer;
typedef std::vector<vectorOfIpv4InterfaceContainer> vectorOfVectorOfIpv4InterfaceContainer;
typedef std::vector<vectorOfVectorOfIpv4InterfaceContainer> vectorOfVectorOfVectorOfIpv4InterfaceContainer;
typedef std::vector<NetDeviceContainer> vectorOfNetDeviceContainer;
typedef std::vector<vectorOfNetDeviceContainer> vectorOfVectorOfNetDeviceContainer;
// Enable parallel simulator with the command line arguments
MpiInterface::Enable (&argc, &argv);
TIMER_TYPE t0, t1, t2;
TIMER_NOW (t0);
std::cout << " ==== DARPA NMS CAMPUS NETWORK SIMULATION ====" << std::endl;
SinkTracer::Init ();
GlobalValue::Bind ("SimulatorImplementationType",
StringValue ("ns3::DistributedSimulatorImpl"));
SystemWallClockMs t0; // Total time
SystemWallClockMs t1; // Setup time
SystemWallClockMs t2; // Run time/
t0.Start ();
t1.Start ();
uint32_t systemId = MpiInterface::GetSystemId ();
uint32_t systemCount = MpiInterface::GetSize ();
uint32_t nCN = 2, nLANClients = 42;
int32_t single = 0;
int nBytes = 500000; // Bytes for each on/off app
RANK0COUT (" ==== DARPA NMS CAMPUS NETWORK SIMULATION ====" << std::endl);
GlobalValue::Bind ("SimulatorImplementationType",
StringValue ("ns3::DistributedSimulatorImpl"));
uint32_t nCN = 2;
uint32_t nLANClients = 10;
bool single = 0;
int nPackets = 10; // Packets sent by OnOff applications
bool nix = true;
Time stop = Seconds (100);
bool verbose = false;
bool testing = false;
CommandLine cmd (__FILE__);
cmd.AddValue ("CN", "Number of total CNs [2]", nCN);
cmd.AddValue ("LAN", "Number of nodes per LAN [42]", nLANClients);
cmd.AddValue ("single", "1 if use single flow", single);
cmd.AddValue ("nBytes", "Number of bytes for each on/off app", nBytes);
cmd.AddValue ("campuses", "Number of campus networks", nCN);
cmd.AddValue ("clients", "Number of client nodes per LAN", nLANClients);
cmd.AddValue ("packets", "Number of packets each on/off app should send", nPackets);
cmd.AddValue ("nix", "Toggle the use of nix-vector or global routing", nix);
cmd.AddValue ("stop", "Simulation run time", stop);
cmd.AddValue ("single", "Use single on/off app per campus network", single);
cmd.AddValue ("verbose", "Show extra timing information", verbose);
cmd.AddValue ("test", "Enable regression test output", testing);
cmd.Parse (argc,argv);
if (nCN < 2)
{
std::cout << "Number of total CNs (" << nCN << ") lower than minimum of 2"
<< std::endl;
RANK0COUT ("Number of total CNs (" << nCN << ") lower than minimum of 2"
<< std::endl);
return 1;
}
if (systemCount > nCN)
{
std::cout << "Number of total CNs (" << nCN << ") should be >= systemCount ("
<< systemCount << ")." << std::endl;
RANK0COUT ("Number of total CNs (" << nCN << ") should be >= systemCount ("
<< systemCount << ")." << std::endl);
return 1;
}
std::cout << "Number of CNs: " << nCN << ", LAN nodes: " << nLANClients << std::endl;
RANK0COUT ("Number of CNs: " << nCN << ", LAN nodes: " << nLANClients << std::endl);
vectorOfNodeContainer nodes_netLR(nCN);
vectorOfVectorOfNodeContainer nodes_net0(nCN,vectorOfNodeContainer(3));
vectorOfVectorOfNodeContainer nodes_net1(nCN,vectorOfNodeContainer(6));
vectorOfVectorOfNodeContainer nodes_net2(nCN,vectorOfNodeContainer(14));
vectorOfVectorOfNodeContainer nodes_net3(nCN,vectorOfNodeContainer(9));
vectorOfVectorOfVectorOfNodeContainer nodes_net2LAN(nCN,vectorOfVectorOfNodeContainer(7,vectorOfNodeContainer(nLANClients)));
vectorOfVectorOfVectorOfNodeContainer nodes_net3LAN(nCN,vectorOfVectorOfNodeContainer(5,vectorOfNodeContainer(nLANClients)));
PointToPointHelper p2p_2gb200ms, p2p_1gb5ms, p2p_100mb1ms;
InternetStackHelper stack;
Ipv4InterfaceContainer ifs;
Ipv4InterfaceContainer ifs;
vectorOfVectorOfIpv4InterfaceContainer ifs0(nCN,vectorOfIpv4InterfaceContainer(3));
vectorOfVectorOfIpv4InterfaceContainer ifs1(nCN,vectorOfIpv4InterfaceContainer(6));
@@ -134,7 +145,7 @@ main (int argc, char *argv[])
vectorOfVectorOfIpv4InterfaceContainer ifs3(nCN,vectorOfIpv4InterfaceContainer(9));
vectorOfVectorOfVectorOfIpv4InterfaceContainer ifs2LAN(nCN,vectorOfVectorOfIpv4InterfaceContainer(7,vectorOfIpv4InterfaceContainer(nLANClients)));
vectorOfVectorOfVectorOfIpv4InterfaceContainer ifs3LAN(nCN,vectorOfVectorOfIpv4InterfaceContainer(5,vectorOfIpv4InterfaceContainer(nLANClients)));
Ipv4AddressHelper address;
std::ostringstream oss;
p2p_1gb5ms.SetDeviceAttribute ("DataRate", StringValue ("1Gbps"));
@@ -153,9 +164,9 @@ main (int argc, char *argv[])
// Create Campus Networks
for (uint32_t z = 0; z < nCN; ++z)
{
std::cout << "Creating Campus Network " << z << ":" << std::endl;
RANK0COUT ("Creating Campus Network " << z << ":" << std::endl);
// Create Net0
std::cout << " SubNet [ 0";
RANK0COUT (" SubNet [ 0");
for (int i = 0; i < 3; ++i)
{
Ptr<Node> node = CreateObject<Node> (z % systemCount);
@@ -171,7 +182,7 @@ main (int argc, char *argv[])
ndc0[i] = p2p_1gb5ms.Install (nodes_net0[z][i]);
}
// Create Net1
std::cout << " 1";
RANK0COUTAPPEND (" 1");
for (int i = 0; i < 6; ++i)
{
Ptr<Node> node = CreateObject<Node> (z % systemCount);
@@ -203,7 +214,7 @@ main (int argc, char *argv[])
address.SetBase (oss.str ().c_str (), "255.255.255.0");
ifs = address.Assign (ndc0_1);
// Create Net2
std::cout << " 2";
RANK0COUTAPPEND (" 2");
for (int i = 0; i < 14; ++i)
{
Ptr<Node> node = CreateObject<Node> (z % systemCount);
@@ -246,7 +257,7 @@ main (int argc, char *argv[])
}
}
// Create Net3
std::cout << " 3 ]" << std::endl;
RANK0COUTAPPEND (" 3 ]" << std::endl);
for (int i = 0; i < 9; ++i)
{
Ptr<Node> node = CreateObject<Node> (z % systemCount);
@@ -283,7 +294,7 @@ main (int argc, char *argv[])
ifs3LAN[z][i][j] = address.Assign (ndc3LAN[i][j]);
}
}
std::cout << " Connecting Subnets..." << std::endl;
RANK0COUT (" Connecting Subnets..." << std::endl);
// Create Lone Routers (Node 4 & 5)
Ptr<Node> node1 = CreateObject<Node> (z % systemCount);
Ptr<Node> node2 = CreateObject<Node> (z % systemCount);
@@ -338,7 +349,7 @@ main (int argc, char *argv[])
address.SetBase (oss.str ().c_str (), "255.255.255.0");
ifs = address.Assign (ndc3_5b);
// Assign IP addresses
std::cout << " Assigning IP addresses..." << std::endl;
RANK0COUT (" Assigning IP addresses..." << std::endl);
for (int i = 0; i < 3; ++i)
{
oss.str ("");
@@ -379,7 +390,7 @@ main (int argc, char *argv[])
// Create Ring Links
if (nCN > 1)
{
std::cout << "Forming Ring Topology..." << std::endl;
RANK0COUT ("Forming Ring Topology..." << std::endl);
vectorOfNodeContainer nodes_ring(nCN);
for (uint32_t z = 0; z < nCN - 1; ++z)
{
@@ -400,9 +411,9 @@ main (int argc, char *argv[])
}
// Create Traffic Flows
std::cout << "Creating UDP Traffic Flows:" << std::endl;
RANK0COUT ("Creating UDP Traffic Flows:" << std::endl);
Config::SetDefault ("ns3::OnOffApplication::MaxBytes",
UintegerValue (nBytes));
UintegerValue (nPackets * 512));
Config::SetDefault ("ns3::OnOffApplication::OnTime",
StringValue ("ns3::ConstantRandomVariable[Constant=1]"));
Config::SetDefault ("ns3::OnOffApplication::OffTime",
@@ -418,10 +429,14 @@ main (int argc, char *argv[])
9999));
ApplicationContainer sinkApp = sinkHelper.Install (nodes_net1[0][2].Get (0));
sinkApp.Start (Seconds (0.0));
if (testing)
{
sinkApp.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
OnOffHelper client ("ns3::UdpSocketFactory", Address ());
AddressValue remoteAddress (InetSocketAddress (ifs1[0][2].GetAddress (0), 9999));
std::cout << "Remote Address is " << ifs1[0][2].GetAddress (0) << std::endl;
RANK0COUT ("Remote Address is " << ifs1[0][2].GetAddress (0) << std::endl);
client.SetAttribute ("Remote", remoteAddress);
ApplicationContainer clientApp;
@@ -435,8 +450,11 @@ main (int argc, char *argv[])
9999));
ApplicationContainer sinkApp =
sinkHelper.Install (nodes_net1[1][0].Get (0));
sinkApp.Start (Seconds (0.0));
if (testing)
{
sinkApp.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
else if (systemId == 0)
{
@@ -444,7 +462,7 @@ main (int argc, char *argv[])
AddressValue remoteAddress
(InetSocketAddress (ifs1[1][0].GetAddress (0), 9999));
std::cout << "Remote Address is " << ifs1[1][0].GetAddress (0) << std::endl;
RANK0COUT ("Remote Address is " << ifs1[1][0].GetAddress (0) << std::endl);
client.SetAttribute ("Remote", remoteAddress);
ApplicationContainer clientApp;
@@ -465,7 +483,7 @@ main (int argc, char *argv[])
x = 0;
}
// Subnet 2 LANs
std::cout << " Campus Network " << z << " Flows [ Net2 ";
RANK0COUT (" Campus Network " << z << " Flows [ Net2 ");
for (int i = 0; i < 7; ++i)
{
for (uint32_t j = 0; j < nLANClients; ++j)
@@ -481,6 +499,10 @@ main (int argc, char *argv[])
sinkHelper.Install (nodes_net2LAN[z][i][j].Get (0));
sinkApp.Start (Seconds (0.0));
if (testing)
{
sinkApp.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
else if (systemId == z % systemCount)
{
@@ -492,6 +514,10 @@ main (int argc, char *argv[])
sinkHelper.Install (nodes_net2LAN[z][i][j].Get (0));
sinkApp.Start (Seconds (0.0));
if (testing)
{
sinkApp.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
// Sources
if (systemCount == 1)
@@ -525,7 +551,7 @@ main (int argc, char *argv[])
}
}
// Subnet 3 LANs
std::cout << "Net3 ]" << std::endl;
RANK0COUTAPPEND ("Net3 ]" << std::endl);
for (int i = 0; i < 5; ++i)
{
for (uint32_t j = 0; j < nLANClients; ++j)
@@ -539,8 +565,11 @@ main (int argc, char *argv[])
ApplicationContainer sinkApp =
sinkHelper.Install (nodes_net3LAN[z][i][j].Get (0));
sinkApp.Start (Seconds (0.0));
if (testing)
{
sinkApp.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
else if (systemId == z % systemCount)
{
@@ -552,6 +581,10 @@ main (int argc, char *argv[])
sinkHelper.Install (nodes_net3LAN[z][i][j].Get (0));
sinkApp.Start (Seconds (0.0));
if (testing)
{
sinkApp.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
// Sources
if (systemCount == 1)
@@ -587,40 +620,57 @@ main (int argc, char *argv[])
}
}
std::cout << "Created " << NodeList::GetNNodes () << " nodes." << std::endl;
TIMER_TYPE routingStart;
TIMER_NOW (routingStart);
RANK0COUT ("Created " << NodeList::GetNNodes () << " nodes." << std::endl);
SystemWallClockMs tRouting;
tRouting.Start ();;
if (nix)
{
std::cout << "Using Nix-vectors..." << std::endl;
RANK0COUT ("Using Nix-vectors..." << std::endl);
}
else
{
// Calculate routing tables
std::cout << "Populating Routing tables..." << std::endl;
RANK0COUT ("Populating Routing tables..." << std::endl);
Ipv4GlobalRoutingHelper::PopulateRoutingTables ();
}
TIMER_TYPE routingEnd;
TIMER_NOW (routingEnd);
std::cout << "Routing tables population took "
<< TIMER_DIFF (routingEnd, routingStart) << std::endl;
tRouting.End ();
if (verbose)
{
RANK0COUT ("Routing tables population took "
<< tRouting.GetElapsedReal () << "ms" << std::endl);
}
std::cout << "Running simulator..." << std::endl;
TIMER_NOW (t1);
Simulator::Stop (Seconds (100.0));
RANK0COUT ("Running simulator..." << std::endl);
t1.End ();
t2.Start ();
Simulator::Stop (stop);
Simulator::Run ();
TIMER_NOW (t2);
std::cout << "Simulator finished." << std::endl;
RANK0COUT ("Simulator finished." << std::endl);
Simulator::Destroy ();
if (testing)
{
const int numberNodesSending = nCN * ( nLANClients * (7 + 5)); // 7 size of Net2, 5 size of Net3
const int expectedPacketCount = numberNodesSending * nPackets;
SinkTracer::Verify (expectedPacketCount);
}
// Exit the parallel execution environment
MpiInterface::Disable ();
double d1 = TIMER_DIFF (t1, t0), d2 = TIMER_DIFF (t2, t1);
std::cout << "-----" << std::endl << "Runtime Stats:" << std::endl;
std::cout << "Simulator init time: " << d1 << std::endl;
std::cout << "Simulator run time: " << d2 << std::endl;
std::cout << "Total elapsed time: " << d1 + d2 << std::endl;
t2.End ();
RANK0COUT ("-----" << std::endl);
if (verbose)
{
RANK0COUT ("Runtime Stats:\n"
<< "Simulator init time: " << t1.GetElapsedReal () << "ms\n"
<< "Simulator run time: " << t2.GetElapsedReal () << "ms\n"
<< "Total elapsed time: " << t0.GetElapsedReal () << "ms"
<< std::endl);
}
return 0;
}

View File

@@ -1,5 +1,7 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright 2013. Lawrence Livermore National Security, LLC.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
@@ -13,6 +15,12 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*/
/**
* \file
* \ingroup mpi
*
* This test is equivalent to simple-distributed but tests boundary cases
* when one of the ranks has no Nodes on it. When run on two tasks
@@ -56,6 +64,8 @@
* right leaf nodes output logging information when they receive the packet.
*/
#include "mpi-test-fixtures.h"
#include "ns3/core-module.h"
#include "ns3/network-module.h"
#include "ns3/mpi-interface.h"
@@ -66,11 +76,11 @@
#include "ns3/ipv4-address-helper.h"
#include "ns3/on-off-helper.h"
#include "ns3/packet-sink-helper.h"
#include <mpi.h>
#include "mpi.h"
using namespace ns3;
NS_LOG_COMPONENT_DEFINE ("SimpleDistributed");
NS_LOG_COMPONENT_DEFINE ("SimpleDistributedEmptyNode");
int
main (int argc, char *argv[])
@@ -78,12 +88,16 @@ main (int argc, char *argv[])
bool nix = true;
bool nullmsg = false;
bool tracing = false;
bool testing = false;
bool verbose = false;
// Parse command line
CommandLine cmd (__FILE__);
cmd.AddValue ("nix", "Enable the use of nix-vector or global routing", nix);
cmd.AddValue ("nullmsg", "Enable the use of null-message synchronization", nullmsg);
cmd.AddValue ("tracing", "Enable pcap tracing", tracing);
cmd.AddValue ("verbose", "verbose output", verbose);
cmd.AddValue ("test", "Enable regression test output", testing);
cmd.Parse (argc, argv);
// Distributed simulation setup; by default use granted time window algorithm.
@@ -100,8 +114,13 @@ main (int argc, char *argv[])
MpiInterface::Enable (&argc, &argv);
LogComponentEnable ("PacketSink", LOG_LEVEL_INFO);
SinkTracer::Init ();
if (verbose)
{
LogComponentEnable ("PacketSink", (LogLevel)(LOG_LEVEL_INFO | LOG_PREFIX_NODE | LOG_PREFIX_TIME));
}
uint32_t systemId = MpiInterface::GetSystemId ();
uint32_t systemCount = MpiInterface::GetSize ();
@@ -258,6 +277,10 @@ main (int argc, char *argv[])
for (uint32_t i = 0; i < 4; ++i)
{
sinkApp.Add (sinkHelper.Install (rightLeafNodes.Get (i)));
if (testing)
{
sinkApp.Get (i)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
sinkApp.Start (Seconds (1.0));
sinkApp.Stop (Seconds (5));
@@ -287,6 +310,12 @@ main (int argc, char *argv[])
Simulator::Stop (Seconds (5));
Simulator::Run ();
Simulator::Destroy ();
if (testing)
{
SinkTracer::Verify (4);
}
// Exit the MPI execution environment
MpiInterface::Disable ();
return 0;

View File

@@ -0,0 +1,520 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright 2018. Lawrence Livermore National Security, LLC.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*/
/**
* \file
* \ingroup mpi
*
* This test is equivalent to simple-distributed with the addition of
* initialization of MPI by user code (this script) and providing
* a communicator to ns-3. The ns-3 communicator is smaller than
* MPI Comm World as might be the case if ns-3 is run in parallel
* with another simulator.
*
* TestDistributed creates a dumbbell topology and logically splits it in
* half. The left half is placed on logical processor 0 and the right half
* is placed on logical processor 1.
*
* ------- -------
* RANK 0 RANK 1
* ------- | -------
* |
* n0 ---------| | |---------- n6
* | | |
* n1 -------\ | | | /------- n7
* n4 ----------|---------- n5
* n2 -------/ | | | \------- n8
* | | |
* n3 ---------| | |---------- n9
*
*
* OnOff clients are placed on each left leaf node. Each right leaf node
* is a packet sink for a left leaf node. As a packet travels from one
* logical processor to another (the link between n4 and n5), MPI messages
* are passed containing the serialized packet. The message is then
* deserialized into a new packet and sent on as normal.
*
* One packet is sent from each left leaf node. The packet sinks on the
* right leaf nodes output logging information when they receive the packet.
*/
#include "mpi-test-fixtures.h"
#include "ns3/core-module.h"
#include "ns3/network-module.h"
#include "ns3/mpi-interface.h"
#include "ns3/ipv4-global-routing-helper.h"
#include "ns3/ipv4-static-routing-helper.h"
#include "ns3/ipv4-list-routing-helper.h"
#include "ns3/point-to-point-helper.h"
#include "ns3/internet-stack-helper.h"
#include "ns3/ipv4-nix-vector-helper.h"
#include "ns3/ipv4-address-helper.h"
#include "ns3/on-off-helper.h"
#include "ns3/packet-sink.h"
#include "ns3/packet-sink-helper.h"
#include "mpi.h"
using namespace ns3;
NS_LOG_COMPONENT_DEFINE ("SimpleDistributedMpiComm");
// Tag for whether this rank should go into a new communicator
// ns-3 ranks will have color == 1.
const int NS_COLOR = 1;
const int NOT_NS_COLOR = NS_COLOR + 1;
/**
* Report my rank, in both MPI_COMM_WORLD and the split communicator.
*
* \param [in] color My role, either ns-3 rank or other rank.
* \param [in] spitComm The split communicator.
*/
void
ReportRank (int color, MPI_Comm splitComm)
{
int otherId=0;
int otherSize=1;
MPI_Comm_rank (splitComm, &otherId);
MPI_Comm_size (splitComm, &otherSize);
if (color == NS_COLOR)
{
RANK0COUT ( "ns-3 rank: ");
}
else
{
RANK0COUT ( "Other rank: ");
}
RANK0COUTAPPEND ( "in MPI_COMM_WORLD: " << SinkTracer::GetWorldRank () << ":" << SinkTracer::GetWorldSize ()
<< ", in splitComm: " << otherId << ":" << otherSize
<< std::endl);
} // ReportRank()
int
main (int argc, char *argv[])
{
bool nix = true;
bool nullmsg = false;
bool tracing = false;
bool init = false;
bool verbose = false;
bool testing = false;
// Parse command line
CommandLine cmd;
cmd.AddValue ("nix", "Enable the use of nix-vector or global routing", nix);
cmd.AddValue ("nullmsg", "Enable the use of null-message synchronization (instead of granted time window)", nullmsg);
cmd.AddValue ("tracing", "Enable pcap tracing", tracing);
cmd.AddValue ("init", "ns-3 should initialize MPI by calling MPI_Init", init);
cmd.AddValue ("verbose", "verbose output", verbose);
cmd.AddValue ("test", "Enable regression test output", testing);
cmd.Parse (argc, argv);
// Defer reporting the configuration until we know the communicator
// Distributed simulation setup; by default use granted time window algorithm.
if(nullmsg)
{
GlobalValue::Bind ("SimulatorImplementationType",
StringValue ("ns3::NullMessageSimulatorImpl"));
}
else
{
GlobalValue::Bind ("SimulatorImplementationType",
StringValue ("ns3::DistributedSimulatorImpl"));
}
// MPI_Init
if (init)
{
// Initialize MPI directly
MPI_Init(&argc, &argv);
}
else
{
// Let ns-3 call MPI_Init and MPI_Finalize
MpiInterface::Enable (&argc, &argv);
}
SinkTracer::Init ();
auto worldSize = SinkTracer::GetWorldSize ();
auto worldRank = SinkTracer::GetWorldRank ();
if ( (!init) && (worldSize != 2))
{
RANK0COUT ("This simulation requires exactly 2 logical processors if --init is not set." << std::endl);
return 1;
}
if (worldSize < 2)
{
RANK0COUT ("This simulation requires 2 or more logical processors." << std::endl);
return 1;
}
// Set up the MPI communicator for ns-3
// Condition ns-3 Communicator
// a. worldSize = 2 copy of MPI_COMM_WORLD
// b. worldSize > 2 communicator of ranks 1-2
// Flag to record that we created a communicator so we can free it at the end.
bool freeComm = false;
// The new communicator, if we create one
MPI_Comm splitComm = MPI_COMM_WORLD;
// The list of ranks assigned to ns-3
std::string ns3Ranks;
// Tag for whether this rank should go into a new communicator
int color = MPI_UNDEFINED;
if (worldSize == 2)
{
std::stringstream ss;
color = NS_COLOR;
ss << "MPI_COMM_WORLD (" << worldSize << " ranks)";
ns3Ranks = ss.str ();
splitComm = MPI_COMM_WORLD;
freeComm = false;
}
else
{
// worldSize > 2 communicator of ranks 1-2
// Put ranks 1-2 in the new communicator
if (worldRank == 1 || worldRank == 2)
{
color = NS_COLOR;
}
else
{
color = NOT_NS_COLOR;
}
std::stringstream ss;
ss << "Split [1-2] (out of " << worldSize << " ranks) from MPI_COMM_WORLD";
ns3Ranks = ss.str ();
// Now create the new communicator
MPI_Comm_split (MPI_COMM_WORLD, color, worldRank, &splitComm);
freeComm = true;
}
if(init)
{
MpiInterface::Enable (splitComm);
}
// Report the configuration from rank 0 only
RANK0COUT (cmd.GetName () << "\n");
RANK0COUT ("\n" );
RANK0COUT ("Configuration:\n" );
RANK0COUT ("Routing: " << (nix ? "nix-vector" : "global") << "\n");
RANK0COUT ("Synchronization: " << (nullmsg ? "null-message" : "granted time window (YAWNS)") << "\n");
RANK0COUT ("MPI_Init called: " << (init ? "explicitly by this program" : "implicitly by ns3::MpiInterface::Enable()") << "\n" );
RANK0COUT ("ns-3 Communicator: " << ns3Ranks << "\n");
RANK0COUT ("PCAP tracing: " << (tracing ? "" : "not") << " enabled\n");
RANK0COUT ("\n");
RANK0COUT ("Rank assignments:" << std::endl);
if (worldRank == 0)
{
ReportRank (color, splitComm);
}
if(verbose)
{
// Circulate a token to have each rank report in turn
int token;
if (worldRank == 0)
{
token = 1;
}
else
{
MPI_Recv (&token, 1, MPI_INT, worldRank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
ReportRank (color, splitComm);
}
MPI_Send (&token, 1, MPI_INT, (worldRank + 1) % worldSize, 0, MPI_COMM_WORLD);
if (worldRank == 0)
{
MPI_Recv (&token, 1, MPI_INT, worldSize - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
} // circulate token to report rank
RANK0COUT (std::endl);
if (color != NS_COLOR)
{
// Do other work outside the ns-3 communicator
// In real use of a separate communicator from ns-3
// the other tasks would be running another simulator
// or other desired work here..
// Our work is done, just wait for everyone else to finish.
MpiInterface::Disable ();
if(init)
{
MPI_Finalize ();
}
return 0;
}
// The code below here is essentially the same as simple-distributed.cc
// --------------------------------------------------------------------
// We use a trace instead of relying on NS_LOG
if (verbose)
{
LogComponentEnable ("PacketSink", LOG_LEVEL_INFO);
}
uint32_t systemId = MpiInterface::GetSystemId ();
uint32_t systemCount = MpiInterface::GetSize ();
// Check for valid distributed parameters.
// Both this script and simple-distributed.cc will work
// with arbitrary numbers of ranks, as long as there are at least 2.
if (systemCount < 2)
{
RANK0COUT ("This simulation requires at least 2 logical processors." << std::endl);
return 1;
}
// Some default values
Config::SetDefault ("ns3::OnOffApplication::PacketSize", UintegerValue (512));
Config::SetDefault ("ns3::OnOffApplication::DataRate", StringValue ("1Mbps"));
Config::SetDefault ("ns3::OnOffApplication::MaxBytes", UintegerValue (512));
// Create leaf nodes on left with system id 0
NodeContainer leftLeafNodes;
leftLeafNodes.Create (4, 0);
// Create router nodes. Left router
// with system id 0, right router with
// system id 1
NodeContainer routerNodes;
Ptr<Node> routerNode1 = CreateObject<Node> (0);
Ptr<Node> routerNode2 = CreateObject<Node> (1);
routerNodes.Add (routerNode1);
routerNodes.Add (routerNode2);
// Create leaf nodes on left with system id 1
NodeContainer rightLeafNodes;
rightLeafNodes.Create (4, 1);
PointToPointHelper routerLink;
routerLink.SetDeviceAttribute ("DataRate", StringValue ("5Mbps"));
routerLink.SetChannelAttribute ("Delay", StringValue ("5ms"));
PointToPointHelper leafLink;
leafLink.SetDeviceAttribute ("DataRate", StringValue ("1Mbps"));
leafLink.SetChannelAttribute ("Delay", StringValue ("2ms"));
// Add link connecting routers
NetDeviceContainer routerDevices;
routerDevices = routerLink.Install (routerNodes);
// Add links for left side leaf nodes to left router
NetDeviceContainer leftRouterDevices;
NetDeviceContainer leftLeafDevices;
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer temp = leafLink.Install (leftLeafNodes.Get (i), routerNodes.Get (0));
leftLeafDevices.Add (temp.Get (0));
leftRouterDevices.Add (temp.Get (1));
}
// Add links for right side leaf nodes to right router
NetDeviceContainer rightRouterDevices;
NetDeviceContainer rightLeafDevices;
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer temp = leafLink.Install (rightLeafNodes.Get (i), routerNodes.Get (1));
rightLeafDevices.Add (temp.Get (0));
rightRouterDevices.Add (temp.Get (1));
}
InternetStackHelper stack;
Ipv4NixVectorHelper nixRouting;
Ipv4StaticRoutingHelper staticRouting;
Ipv4ListRoutingHelper list;
list.Add (staticRouting, 0);
list.Add (nixRouting, 10);
if (nix)
{
stack.SetRoutingHelper (list); // has effect on the next Install ()
}
stack.InstallAll ();
Ipv4InterfaceContainer routerInterfaces;
Ipv4InterfaceContainer leftLeafInterfaces;
Ipv4InterfaceContainer leftRouterInterfaces;
Ipv4InterfaceContainer rightLeafInterfaces;
Ipv4InterfaceContainer rightRouterInterfaces;
Ipv4AddressHelper leftAddress;
leftAddress.SetBase ("10.1.1.0", "255.255.255.0");
Ipv4AddressHelper routerAddress;
routerAddress.SetBase ("10.2.1.0", "255.255.255.0");
Ipv4AddressHelper rightAddress;
rightAddress.SetBase ("10.3.1.0", "255.255.255.0");
// Router-to-Router interfaces
routerInterfaces = routerAddress.Assign (routerDevices);
// Left interfaces
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer ndc;
ndc.Add (leftLeafDevices.Get (i));
ndc.Add (leftRouterDevices.Get (i));
Ipv4InterfaceContainer ifc = leftAddress.Assign (ndc);
leftLeafInterfaces.Add (ifc.Get (0));
leftRouterInterfaces.Add (ifc.Get (1));
leftAddress.NewNetwork ();
}
// Right interfaces
for (uint32_t i = 0; i < 4; ++i)
{
NetDeviceContainer ndc;
ndc.Add (rightLeafDevices.Get (i));
ndc.Add (rightRouterDevices.Get (i));
Ipv4InterfaceContainer ifc = rightAddress.Assign (ndc);
rightLeafInterfaces.Add (ifc.Get (0));
rightRouterInterfaces.Add (ifc.Get (1));
rightAddress.NewNetwork ();
}
if (!nix)
{
Ipv4GlobalRoutingHelper::PopulateRoutingTables ();
}
if (tracing == true)
{
if (systemId == 0)
{
routerLink.EnablePcap("router-left", routerDevices, true);
leafLink.EnablePcap("leaf-left", leftLeafDevices, true);
}
if (systemId == 1)
{
routerLink.EnablePcap("router-right", routerDevices, true);
leafLink.EnablePcap("leaf-right", rightLeafDevices, true);
}
}
// Create a packet sink on the right leafs to receive packets from left leafs
uint16_t port = 50000;
if (systemId == 1)
{
Address sinkLocalAddress (InetSocketAddress (Ipv4Address::GetAny (), port));
PacketSinkHelper sinkHelper ("ns3::UdpSocketFactory", sinkLocalAddress);
ApplicationContainer sinkApp;
for (uint32_t i = 0; i < 4; ++i)
{
auto apps = sinkHelper.Install (rightLeafNodes.Get (i));
auto sink = DynamicCast<PacketSink> (apps.Get (0));
NS_ASSERT_MSG (sink, "Couldn't get PacketSink application.");
if (testing)
{
sink->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback(&SinkTracer::SinkTrace));
}
sinkApp.Add (apps);
}
sinkApp.Start (Seconds (1.0));
sinkApp.Stop (Seconds (5));
}
// Create the OnOff applications to send
if (systemId == 0)
{
OnOffHelper clientHelper ("ns3::UdpSocketFactory", Address ());
clientHelper.SetAttribute
("OnTime", StringValue ("ns3::ConstantRandomVariable[Constant=1]"));
clientHelper.SetAttribute
("OffTime", StringValue ("ns3::ConstantRandomVariable[Constant=0]"));
ApplicationContainer clientApps;
for (uint32_t i = 0; i < 4; ++i)
{
AddressValue remoteAddress
(InetSocketAddress (rightLeafInterfaces.GetAddress (i), port));
clientHelper.SetAttribute ("Remote", remoteAddress);
clientApps.Add (clientHelper.Install (leftLeafNodes.Get (i)));
}
clientApps.Start (Seconds (1.0));
clientApps.Stop (Seconds (5));
}
RANK0COUT (std::endl);
Simulator::Stop (Seconds (5));
Simulator::Run ();
Simulator::Destroy ();
// --------------------------------------------------------------------
// Conditional cleanup based on whether we built a communicator
// and called MPI_Init
if (freeComm)
{
MPI_Comm_free (&splitComm);
}
if (testing)
{
SinkTracer::Verify (4);
}
// Clean up the ns-3 MPI execution environment
// This will call MPI_Finalize if MpiInterface::Initialize was called
MpiInterface::Disable ();
if (init)
{
// We called MPI_Init, so we have to call MPI_Finalize
MPI_Finalize ();
}
return 0;
}

View File

@@ -12,7 +12,11 @@
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
/**
* \file
* \ingroup mpi
*
* TestDistributed creates a dumbbell topology and logically splits it in
* half. The left half is placed on logical processor 0 and the right half
@@ -41,6 +45,8 @@
* right leaf nodes output logging information when they receive the packet.
*/
#include "mpi-test-fixtures.h"
#include "ns3/core-module.h"
#include "ns3/network-module.h"
#include "ns3/mpi-interface.h"
@@ -53,6 +59,8 @@
#include "ns3/packet-sink-helper.h"
#include <mpi.h>
#include <iomanip>
using namespace ns3;
NS_LOG_COMPONENT_DEFINE ("SimpleDistributed");
@@ -63,12 +71,16 @@ main (int argc, char *argv[])
bool nix = true;
bool nullmsg = false;
bool tracing = false;
bool testing = false;
bool verbose = false;
// Parse command line
CommandLine cmd (__FILE__);
cmd.AddValue ("nix", "Enable the use of nix-vector or global routing", nix);
cmd.AddValue ("nullmsg", "Enable the use of null-message synchronization", nullmsg);
cmd.AddValue ("tracing", "Enable pcap tracing", tracing);
cmd.AddValue ("verbose", "verbose output", verbose);
cmd.AddValue ("test", "Enable regression test output", testing);
cmd.Parse (argc, argv);
// Distributed simulation setup; by default use granted time window algorithm.
@@ -86,7 +98,12 @@ main (int argc, char *argv[])
// Enable parallel simulator with the command line arguments
MpiInterface::Enable (&argc, &argv);
LogComponentEnable ("PacketSink", LOG_LEVEL_INFO);
SinkTracer::Init ();
if (verbose)
{
LogComponentEnable ("PacketSink", (LogLevel)(LOG_LEVEL_INFO | LOG_PREFIX_NODE | LOG_PREFIX_TIME));
}
uint32_t systemId = MpiInterface::GetSystemId ();
uint32_t systemCount = MpiInterface::GetSize ();
@@ -225,6 +242,7 @@ main (int argc, char *argv[])
}
// Create a packet sink on the right leafs to receive packets from left leafs
uint16_t port = 50000;
if (systemId == 1)
{
@@ -234,6 +252,10 @@ main (int argc, char *argv[])
for (uint32_t i = 0; i < 4; ++i)
{
sinkApp.Add (sinkHelper.Install (rightLeafNodes.Get (i)));
if (testing)
{
sinkApp.Get (i)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
sinkApp.Start (Seconds (1.0));
sinkApp.Stop (Seconds (5));
@@ -263,6 +285,12 @@ main (int argc, char *argv[])
Simulator::Stop (Seconds (5));
Simulator::Run ();
Simulator::Destroy ();
if (testing)
{
SinkTracer::Verify (4);
}
// Exit the MPI execution environment
MpiInterface::Disable ();
return 0;

View File

@@ -14,6 +14,8 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "mpi-test-fixtures.h"
#include "ns3/core-module.h"
#include "ns3/point-to-point-module.h"
#include "ns3/network-module.h"
@@ -25,21 +27,30 @@
#include "ns3/yans-wifi-helper.h"
#include "ns3/ssid.h"
// Default Network Topology
//
// (same as third.cc from tutorial)
// Distributed simulation, split across the p2p link
// |
// Rank 0 | Rank 1
// -------------------------|----------------------------
// Wifi 10.1.3.0
// AP
// * * * *
// | | | | 10.1.1.0
// n5 n6 n7 n0 -------------- n1 n2 n3 n4
// point-to-point | | | |
// ================
// LAN 10.1.2.0
#include <iomanip>
/**
* \file
* \ingroup mpi
*
* Distributed version of third.cc from the tutorial.
*
* Default Network Topology
*
* (same as third.cc from tutorial)
* Distributed simulation, split across the p2p link
* |
* Rank 0 | Rank 1
* -------------------------|----------------------------
* Wifi 10.1.3.0
* AP
* * * * *
* | | | | 10.1.1.0
* n5 n6 n7 n0 -------------- n1 n2 n3 n4
* point-to-point | | | |
* ================
* LAN 10.1.2.0
*/
using namespace ns3;
@@ -48,11 +59,12 @@ NS_LOG_COMPONENT_DEFINE ("ThirdExampleDistributed");
int
main (int argc, char *argv[])
{
bool verbose = true;
bool verbose = false;
uint32_t nCsma = 3;
uint32_t nWifi = 3;
bool tracing = false;
bool nullmsg = false;
bool testing = false;
CommandLine cmd (__FILE__);
cmd.AddValue ("nCsma", "Number of \"extra\" CSMA nodes/devices", nCsma);
@@ -60,6 +72,7 @@ main (int argc, char *argv[])
cmd.AddValue ("verbose", "Tell echo applications to log if true", verbose);
cmd.AddValue ("tracing", "Enable pcap tracing", tracing);
cmd.AddValue ("nullmsg", "Enable the use of null-message synchronization", nullmsg);
cmd.AddValue ("test", "Enable regression test output", testing);
cmd.Parse (argc,argv);
@@ -74,8 +87,8 @@ main (int argc, char *argv[])
if (verbose)
{
LogComponentEnable ("UdpEchoClientApplication", LOG_LEVEL_INFO);
LogComponentEnable ("UdpEchoServerApplication", LOG_LEVEL_INFO);
LogComponentEnable ("UdpEchoClientApplication", (LogLevel)(LOG_LEVEL_INFO | LOG_PREFIX_NODE | LOG_PREFIX_TIME));
LogComponentEnable ("UdpEchoServerApplication", (LogLevel)(LOG_LEVEL_INFO | LOG_PREFIX_NODE | LOG_PREFIX_TIME));
}
// Sequential fallback values
@@ -96,6 +109,8 @@ main (int argc, char *argv[])
MpiInterface::Enable (&argc, &argv);
SinkTracer::Init ();
systemId = MpiInterface::GetSystemId ();
systemCount = MpiInterface::GetSize ();
@@ -212,6 +227,11 @@ main (int argc, char *argv[])
ApplicationContainer serverApps = echoServer.Install (csmaNodes.Get (nCsma));
serverApps.Start (Seconds (1.0));
serverApps.Stop (Seconds (10.0));
if (testing)
{
serverApps.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
// If this rank is systemWifi
@@ -228,6 +248,11 @@ main (int argc, char *argv[])
echoClient.Install (wifiStaNodes.Get (nWifi - 1));
clientApps.Start (Seconds (2.0));
clientApps.Stop (Seconds (10.0));
if (testing)
{
clientApps.Get (0)->TraceConnectWithoutContext ("RxWithAddresses", MakeCallback (&SinkTracer::SinkTrace));
}
}
Ipv4GlobalRoutingHelper::PopulateRoutingTables ();
@@ -258,6 +283,11 @@ main (int argc, char *argv[])
Simulator::Run ();
Simulator::Destroy ();
if (testing)
{
SinkTracer::Verify (2);
}
// Exit the MPI execution environment
MpiInterface::Disable ();

View File

@@ -3,16 +3,23 @@
def build(bld):
obj = bld.create_ns3_program('simple-distributed',
['mpi', 'point-to-point', 'internet', 'nix-vector-routing', 'applications'])
obj.source = 'simple-distributed.cc'
obj.source = ['simple-distributed.cc', 'mpi-test-fixtures.cc']
obj = bld.create_ns3_program('simple-distributed-empty-node',
['mpi', 'point-to-point', 'internet', 'nix-vector-routing', 'applications'])
obj.source = ['simple-distributed-empty-node.cc', 'mpi-test-fixtures.cc']
obj = bld.create_ns3_program('simple-distributed-mpi-comm',
['point-to-point', 'internet', 'nix-vector-routing', 'applications'])
obj.source = ['simple-distributed-mpi-comm.cc', 'mpi-test-fixtures.cc']
obj = bld.create_ns3_program('third-distributed',
['mpi', 'point-to-point', 'internet', 'mobility', 'wifi', 'csma', 'applications'])
obj.source = 'third-distributed.cc'
obj.source = ['third-distributed.cc', 'mpi-test-fixtures.cc']
obj = bld.create_ns3_program('nms-p2p-nix-distributed',
['mpi', 'point-to-point', 'internet', 'nix-vector-routing', 'applications'])
obj.source = 'nms-p2p-nix-distributed.cc'
obj.source = ['nms-p2p-nix-distributed.cc', 'mpi-test-fixtures.cc']
obj = bld.create_ns3_program('simple-distributed-empty-node',
['mpi', 'point-to-point', 'internet', 'nix-vector-routing', 'applications'])
obj.source = 'simple-distributed-empty-node.cc'

View File

@@ -17,6 +17,12 @@
*
*/
/**
* \file
* \ingroup mpi
* Implementation of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
*/
#include "distributed-simulator-impl.h"
#include "granted-time-window-mpi-interface.h"
#include "mpi-interface.h"
@@ -73,7 +79,12 @@ LbtsMessage::IsFinished ()
return m_isFinished;
}
Time DistributedSimulatorImpl::m_lookAhead = Seconds (-1);
/**
* Initialize m_lookAhead to maximum, it will be constrained by
* user supplied time via BoundLookAhead and the
* minimum latency network between ranks.
*/
Time DistributedSimulatorImpl::m_lookAhead = Time::Max();
TypeId
DistributedSimulatorImpl::GetTypeId (void)
@@ -158,18 +169,13 @@ DistributedSimulatorImpl::CalculateLookAhead (void)
{
NS_LOG_FUNCTION (this);
/* If runnning sequential simulation can ignore lookahead */
if (MpiInterface::GetSize () <= 1)
{
m_lookAhead = Seconds (0);
}
else
{
if (m_lookAhead == Seconds (-1))
{
m_lookAhead = GetMaximumSimulationTime ();
}
// else it was already set by SetLookAhead
NodeContainer c = NodeContainer::GetGlobal ();
for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
{
@@ -254,7 +260,7 @@ DistributedSimulatorImpl::CalculateLookAhead (void)
sendbuf = m_lookAhead.GetInteger ();
}
MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MpiInterface::GetCommunicator ());
/* For nodes that did not compute a lookahead use max from ranks
* that did compute a value. An edge case occurs if all nodes have
@@ -270,16 +276,16 @@ DistributedSimulatorImpl::CalculateLookAhead (void)
}
void
DistributedSimulatorImpl::SetMaximumLookAhead (const Time lookAhead)
DistributedSimulatorImpl::BoundLookAhead (const Time lookAhead)
{
if (lookAhead > Time (0))
{
NS_LOG_FUNCTION (this << lookAhead);
m_lookAhead = lookAhead;
m_lookAhead = Min(m_lookAhead, lookAhead);
}
else
{
NS_LOG_WARN ("attempted to set look ahead negative: " << lookAhead);
NS_LOG_WARN ("attempted to set lookahead to a negative time: " << lookAhead);
}
}
@@ -385,7 +391,7 @@ DistributedSimulatorImpl::Run (void)
m_myId, IsLocalFinished (), nextTime);
m_pLBTS[m_myId] = lMsg;
MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
sizeof (LbtsMessage), MPI_BYTE, MpiInterface::GetCommunicator ());
Time smallestTime = m_pLBTS[0].GetSmallestTime ();
// The totRx and totTx counts insure there are no transient
// messages; If totRx != totTx, there are transients,
@@ -404,6 +410,11 @@ DistributedSimulatorImpl::Run (void)
totTx += m_pLBTS[i].GetTxCount ();
m_globalFinished &= m_pLBTS[i].IsFinished ();
}
// Global halting condition is all nodes have empty queue's and
// no messages are in-flight.
m_globalFinished &= totRx == totTx;
if (totRx == totTx)
{
// If lookahead is infinite then granted time should be as well.

View File

@@ -17,6 +17,12 @@
*
*/
/**
* \file
* \ingroup mpi
* Declaration of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
*/
#ifndef NS3_DISTRIBUTED_SIMULATOR_IMPL_H
#define NS3_DISTRIBUTED_SIMULATOR_IMPL_H
@@ -85,11 +91,11 @@ public:
bool IsFinished ();
private:
uint32_t m_txCount;
uint32_t m_rxCount;
uint32_t m_myId;
Time m_smallestTime;
bool m_isFinished;
uint32_t m_txCount; /**< Count of transmitted messages. */
uint32_t m_rxCount; /**< Count of received messages. */
uint32_t m_myId; /**< System Id of the rank sending this LBTS. */
Time m_smallestTime; /**< Earliest next event timestamp. */
bool m_isFinished; /**< \c true when this rank has no more events. */
};
/**
@@ -101,9 +107,15 @@ private:
class DistributedSimulatorImpl : public SimulatorImpl
{
public:
/**
* Register this type.
* \return The object TypeId.
*/
static TypeId GetTypeId (void);
/** Default constructor. */
DistributedSimulatorImpl ();
/** Destructor. */
~DistributedSimulatorImpl ();
// virtual from SimulatorImpl
@@ -122,41 +134,101 @@ public:
virtual Time Now (void) const;
virtual Time GetDelayLeft (const EventId &id) const;
virtual Time GetMaximumSimulationTime (void) const;
virtual void SetMaximumLookAhead (const Time lookAhead);
virtual void SetScheduler (ObjectFactory schedulerFactory);
virtual uint32_t GetSystemId (void) const;
virtual uint32_t GetContext (void) const;
virtual uint64_t GetEventCount (void) const;
/**
* Add additional bound to lookahead constraints.
*
* This may be used if there are additional constraints on lookahead
* in addition to the minimum inter rank latency time. For example
* when running ns-3 in a co-simulation setting the other simulators
* may have tighter lookahead constraints.
*
* The method may be invoked more than once, the minimum time will
* be used to constrain lookahead.
*
* \param [in] lookAhead The maximum lookahead; must be > 0.
*/
virtual void BoundLookAhead (const Time lookAhead);
private:
// Inherited from Object
virtual void DoDispose (void);
/**
* Calculate lookahead constraint based on network latency.
*
* The smallest cross-rank PointToPoint channel delay imposes
* a constraint on the conservative PDES time window. The
* user may impose additional constraints on lookahead
* using the ConstrainLookAhead() method.
*/
void CalculateLookAhead (void);
/**
* Check if this rank is finished. It's finished when there are
* no more events or stop has been requested.
*
* \returns \c true when this rank is finished.
*/
bool IsLocalFinished (void) const;
/** Process the next event. */
void ProcessOneEvent (void);
/**
* Get the timestep of the next event.
*
* If there are no more events the timestep is infinity.
*
* \return The next event timestep.
*/
uint64_t NextTs (void) const;
/**
* Get the time of the next event, as returned by NextTs().
*
* \return The next event time stamp.
*/
Time Next (void) const;
/** Container type for the events to run at Simulator::Destroy(). */
typedef std::list<EventId> DestroyEvents;
/** The container of events to run at Destroy() */
DestroyEvents m_destroyEvents;
/** Flag calling for the end of the simulation. */
bool m_stop;
bool m_globalFinished; // Are all parallel instances completed.
/** Are all parallel instances completed. */
bool m_globalFinished;
/** The event priority queue. */
Ptr<Scheduler> m_events;
/** Next event unique id. */
uint32_t m_uid;
/** Unique id of the current event. */
uint32_t m_currentUid;
/** Timestamp of the current event. */
uint64_t m_currentTs;
/** Execution context of the current event. */
uint32_t m_currentContext;
/** The event count. */
uint64_t m_eventCount;
// number of events that have been inserted but not yet scheduled,
// not counting the "destroy" events; this is used for validation
/**
* Number of events that have been inserted but not yet scheduled,
* not counting the "destroy" events; this is used for validation.
*/
int m_unscheduledEvents;
LbtsMessage* m_pLBTS; // Allocated once we know how many systems
uint32_t m_myId; // MPI Rank
uint32_t m_systemCount; // MPI Size
Time m_grantedTime; // Last LBTS
static Time m_lookAhead; // Lookahead value
/**
* Container for Lbts messages, one per rank.
* Allocated once we know how many systems there are.
*/
LbtsMessage* m_pLBTS;
uint32_t m_myId; /**< MPI rank. */
uint32_t m_systemCount; /**< MPI communicator size. */
Time m_grantedTime; /**< End of current window. */
static Time m_lookAhead; /**< Current window size. */
};

View File

@@ -14,7 +14,12 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: George Riley <riley@ece.gatech.edu>
*
*/
/**
* \file
* \ingroup mpi
* Implementation of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
*/
// This object contains static methods that provide an easy interface
@@ -42,6 +47,8 @@ namespace ns3 {
NS_LOG_COMPONENT_DEFINE ("GrantedTimeWindowMpiInterface");
NS_OBJECT_ENSURE_REGISTERED (GrantedTimeWindowMpiInterface);
SentBuffer::SentBuffer ()
{
m_buffer = 0;
@@ -71,16 +78,18 @@ SentBuffer::GetRequest ()
return &m_request;
}
uint32_t GrantedTimeWindowMpiInterface::m_sid = 0;
uint32_t GrantedTimeWindowMpiInterface::m_size = 1;
bool GrantedTimeWindowMpiInterface::m_initialized = false;
bool GrantedTimeWindowMpiInterface::m_enabled = false;
uint32_t GrantedTimeWindowMpiInterface::m_rxCount = 0;
uint32_t GrantedTimeWindowMpiInterface::m_txCount = 0;
std::list<SentBuffer> GrantedTimeWindowMpiInterface::m_pendingTx;
uint32_t GrantedTimeWindowMpiInterface::g_sid = 0;
uint32_t GrantedTimeWindowMpiInterface::g_size = 1;
bool GrantedTimeWindowMpiInterface::g_enabled = false;
bool GrantedTimeWindowMpiInterface::g_mpiInitCalled = false;
uint32_t GrantedTimeWindowMpiInterface::g_rxCount = 0;
uint32_t GrantedTimeWindowMpiInterface::g_txCount = 0;
std::list<SentBuffer> GrantedTimeWindowMpiInterface::g_pendingTx;
MPI_Request* GrantedTimeWindowMpiInterface::m_requests;
char** GrantedTimeWindowMpiInterface::m_pRxBuffers;
MPI_Request* GrantedTimeWindowMpiInterface::g_requests;
char** GrantedTimeWindowMpiInterface::g_pRxBuffers;
MPI_Comm GrantedTimeWindowMpiInterface::g_communicator = MPI_COMM_WORLD;
bool GrantedTimeWindowMpiInterface::g_freeCommunicator = false;;
TypeId
GrantedTimeWindowMpiInterface::GetTypeId (void)
@@ -99,79 +108,100 @@ GrantedTimeWindowMpiInterface::Destroy ()
for (uint32_t i = 0; i < GetSize (); ++i)
{
delete [] m_pRxBuffers[i];
delete [] g_pRxBuffers[i];
}
delete [] m_pRxBuffers;
delete [] m_requests;
delete [] g_pRxBuffers;
delete [] g_requests;
m_pendingTx.clear ();
g_pendingTx.clear ();
}
uint32_t
GrantedTimeWindowMpiInterface::GetRxCount ()
{
return m_rxCount;
NS_ASSERT (g_enabled);
return g_rxCount;
}
uint32_t
GrantedTimeWindowMpiInterface::GetTxCount ()
{
return m_txCount;
NS_ASSERT (g_enabled);
return g_txCount;
}
uint32_t
GrantedTimeWindowMpiInterface::GetSystemId ()
{
if (!m_initialized)
{
Simulator::GetImplementation ();
m_initialized = true;
}
return m_sid;
NS_ASSERT (g_enabled);
return g_sid;
}
uint32_t
GrantedTimeWindowMpiInterface::GetSize ()
{
if (!m_initialized)
{
Simulator::GetImplementation ();
m_initialized = true;
}
return m_size;
NS_ASSERT (g_enabled);
return g_size;
}
bool
GrantedTimeWindowMpiInterface::IsEnabled ()
{
if (!m_initialized)
{
Simulator::GetImplementation ();
m_initialized = true;
}
return m_enabled;
return g_enabled;
}
MPI_Comm
GrantedTimeWindowMpiInterface::GetCommunicator()
{
NS_ASSERT (g_enabled);
return g_communicator;
}
void
GrantedTimeWindowMpiInterface::Enable (int* pargc, char*** pargv)
{
NS_LOG_FUNCTION (this << pargc << pargv);
NS_LOG_FUNCTION (this << pargc << pargv);
NS_ASSERT (g_enabled == false);
// Initialize the MPI interface
MPI_Init (pargc, pargv);
MPI_Barrier (MPI_COMM_WORLD);
MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_sid));
MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_size));
m_enabled = true;
m_initialized = true;
Enable (MPI_COMM_WORLD);
g_mpiInitCalled = true;
g_enabled = true;
}
void
GrantedTimeWindowMpiInterface::Enable (MPI_Comm communicator)
{
NS_LOG_FUNCTION (this);
NS_ASSERT (g_enabled == false);
// Standard MPI practice is to duplicate the communicator for
// library to use. Library communicates in isolated communication
// context.
MPI_Comm_dup (communicator, &g_communicator);
g_freeCommunicator = true;
MPI_Barrier (g_communicator);
int mpiSystemId;
int mpiSize;
MPI_Comm_rank (g_communicator, &mpiSystemId);
MPI_Comm_size (g_communicator, &mpiSize);
g_sid = mpiSystemId;
g_size = mpiSize;
g_enabled = true;
// Post a non-blocking receive for all peers
m_pRxBuffers = new char*[m_size];
m_requests = new MPI_Request[m_size];
g_pRxBuffers = new char*[g_size];
g_requests = new MPI_Request[g_size];
for (uint32_t i = 0; i < GetSize (); ++i)
{
m_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
MPI_Irecv (m_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
MPI_COMM_WORLD, &m_requests[i]);
g_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
MPI_Irecv (g_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
g_communicator, &g_requests[i]);
}
}
@@ -181,8 +211,8 @@ GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, ui
NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
SentBuffer sendBuf;
m_pendingTx.push_back (sendBuf);
std::list<SentBuffer>::reverse_iterator i = m_pendingTx.rbegin (); // Points to the last element
g_pendingTx.push_back (sendBuf);
std::list<SentBuffer>::reverse_iterator i = g_pendingTx.rbegin (); // Points to the last element
uint32_t serializedSize = p->GetSerializedSize ();
uint8_t* buffer = new uint8_t[serializedSize + 16];
@@ -202,8 +232,8 @@ GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, ui
uint32_t nodeSysId = destNode->GetSystemId ();
MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
0, MPI_COMM_WORLD, (i->GetRequest ()));
m_txCount++;
0, g_communicator, (i->GetRequest ()));
g_txCount++;
}
void
@@ -218,17 +248,17 @@ GrantedTimeWindowMpiInterface::ReceiveMessages ()
int index = 0;
MPI_Status status;
MPI_Testany (MpiInterface::GetSize (), m_requests, &index, &flag, &status);
MPI_Testany (MpiInterface::GetSize (), g_requests, &index, &flag, &status);
if (!flag)
{
break; // No more messages
}
int count;
MPI_Get_count (&status, MPI_CHAR, &count);
m_rxCount++; // Count this receive
g_rxCount++; // Count this receive
// Get the meta data first
uint64_t* pTime = reinterpret_cast<uint64_t *> (m_pRxBuffers[index]);
uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
uint64_t time = *pTime++;
uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
uint32_t node = *pData++;
@@ -261,8 +291,8 @@ GrantedTimeWindowMpiInterface::ReceiveMessages ()
&MpiReceiver::Receive, pMpiRec, p);
// Re-queue the next read
MPI_Irecv (m_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
MPI_COMM_WORLD, &m_requests[index]);
MPI_Irecv (g_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
g_communicator, &g_requests[index]);
}
}
@@ -271,8 +301,8 @@ GrantedTimeWindowMpiInterface::TestSendComplete ()
{
NS_LOG_FUNCTION_NOARGS ();
std::list<SentBuffer>::iterator i = m_pendingTx.begin ();
while (i != m_pendingTx.end ())
std::list<SentBuffer>::iterator i = g_pendingTx.begin ();
while (i != g_pendingTx.end ())
{
MPI_Status status;
int flag = 0;
@@ -281,7 +311,7 @@ GrantedTimeWindowMpiInterface::TestSendComplete ()
i++; // Advance to next
if (flag)
{ // This message is complete
m_pendingTx.erase (current);
g_pendingTx.erase (current);
}
}
}
@@ -291,18 +321,29 @@ GrantedTimeWindowMpiInterface::Disable ()
{
NS_LOG_FUNCTION_NOARGS ();
int flag = 0;
MPI_Initialized (&flag);
if (flag)
if (g_freeCommunicator)
{
MPI_Finalize ();
m_enabled = false;
m_initialized = false;
MPI_Comm_free (&g_communicator);
g_freeCommunicator = false;
}
else
// ns-3 should MPI finalize only if ns-3 was used to initialize
if (g_mpiInitCalled)
{
NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
int flag = 0;
MPI_Initialized (&flag);
if (flag)
{
MPI_Finalize ();
}
else
{
NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
}
g_mpiInitCalled = false;
}
g_enabled = false;
}

View File

@@ -14,7 +14,12 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: George Riley <riley@ece.gatech.edu>
*
*/
/**
* \file
* \ingroup mpi
* Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
*/
// This object contains static methods that provide an easy interface
@@ -69,11 +74,12 @@ public:
MPI_Request* GetRequest ();
private:
uint8_t* m_buffer;
MPI_Request m_request;
uint8_t* m_buffer; /**< The buffer. */
MPI_Request m_request; /**< The MPI request handle. */
};
class Packet;
class DistributedSimulatorImpl;
/**
* \ingroup mpi
@@ -87,46 +93,33 @@ class Packet;
class GrantedTimeWindowMpiInterface : public ParallelCommunicationInterface, Object
{
public:
/**
* Register this type.
* \return The object TypeId.
*/
static TypeId GetTypeId (void);
/**
* Delete all buffers
*/
// Inherited
virtual void Destroy ();
/**
* \return MPI rank
*/
virtual uint32_t GetSystemId ();
/**
* \return MPI size (number of systems)
*/
virtual uint32_t GetSize ();
/**
* \return true if using MPI
*/
virtual bool IsEnabled ();
/**
* \param pargc number of command line arguments
* \param pargv command line arguments
*
* Sets up MPI interface
*/
virtual void Enable (int* pargc, char*** pargv);
/**
* Terminates the MPI environment by calling MPI_Finalize
* This function must be called after Destroy ()
* It also resets m_initialized, m_enabled
*/
virtual void Disable ();
/**
* \param p packet to send
* \param rxTime received time at destination node
* \param node destination node
* \param dev destination device
*
* Serialize and send a packet to the specified node and net device
*/
virtual void Enable (MPI_Comm communicator);
virtual void Disable();
virtual void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev);
virtual MPI_Comm GetCommunicator();
private:
/*
* The granted time window implementation is a collaboration of several
* classes. Methods that should be invoked only by the
* collaborators are private to restrict use.
* It is not intended for state to be shared.
*/
friend ns3::DistributedSimulatorImpl;
/**
* Check for received messages complete
*/
@@ -143,27 +136,41 @@ public:
* \return transmitted count in packets
*/
static uint32_t GetTxCount ();
/** System ID (rank) for this task. */
static uint32_t g_sid;
/** Size of the MPI COM_WORLD group. */
static uint32_t g_size;
private:
static uint32_t m_sid;
static uint32_t m_size;
/** Total packets received. */
static uint32_t g_rxCount;
// Total packets received
static uint32_t m_rxCount;
/** Total packets sent. */
static uint32_t g_txCount;
// Total packets sent
static uint32_t m_txCount;
static bool m_initialized;
static bool m_enabled;
/** Has this interface been enabled. */
static bool g_enabled;
// Pending non-blocking receives
static MPI_Request* m_requests;
/**
* Has MPI Init been called by this interface.
* Alternatively user supplies a communicator.
*/
static bool g_mpiInitCalled;
// Data buffers for non-blocking reads
static char** m_pRxBuffers;
/** Pending non-blocking receives. */
static MPI_Request* g_requests;
// List of pending non-blocking sends
static std::list<SentBuffer> m_pendingTx;
/** Data buffers for non-blocking reads. */
static char** g_pRxBuffers;
/** List of pending non-blocking sends. */
static std::list<SentBuffer> g_pendingTx;
/** MPI communicator being used for ns-3 tasks. */
static MPI_Comm g_communicator;
/** Did ns-3 create the communicator? Have to free it. */
static bool g_freeCommunicator;
};
} // namespace ns3

View File

@@ -16,7 +16,12 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*
*/
/**
* \file
* \ingroup mpi
* Implementation of class ns3::MpiInterface.
*/
#include "mpi-interface.h"
@@ -73,7 +78,7 @@ MpiInterface::IsEnabled ()
}
void
MpiInterface::Enable (int* pargc, char*** pargv)
MpiInterface::SetParallelSimulatorImpl (void)
{
StringValue simulationTypeValue;
bool useDefault = true;
@@ -104,10 +109,24 @@ MpiInterface::Enable (int* pargc, char*** pargv)
StringValue ("ns3::DistributedSimulatorImpl"));
NS_LOG_WARN ("SimulatorImplementationType was set to non-parallel simulator; setting type to ns3::DistributedSimulatorImp");
}
}
void
MpiInterface::Enable (int* pargc, char*** pargv)
{
SetParallelSimulatorImpl ();
g_parallelCommunicationInterface->Enable (pargc, pargv);
}
void
MpiInterface::Enable (MPI_Comm communicator)
{
SetParallelSimulatorImpl ();
g_parallelCommunicationInterface->Enable (communicator);
}
void
MpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
{
@@ -115,6 +134,13 @@ MpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint
g_parallelCommunicationInterface->SendPacket (p, rxTime, node, dev);
}
MPI_Comm
MpiInterface::GetCommunicator()
{
NS_ASSERT (g_parallelCommunicationInterface);
return g_parallelCommunicationInterface->GetCommunicator ();
}
void
MpiInterface::Disable ()

View File

@@ -16,7 +16,12 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*
*/
/**
* \file
* \ingroup mpi
* Declaration of class ns3::MpiInterface.
*/
#ifndef NS3_MPI_INTERFACE_H
@@ -25,6 +30,8 @@
#include <ns3/nstime.h>
#include <ns3/packet.h>
#include "mpi.h"
namespace ns3 {
/**
* \defgroup mpi MPI Distributed Simulation
@@ -51,41 +58,78 @@ class MpiInterface
{
public:
/**
* Deletes storage used by the parallel environment.
* \brief Deletes storage used by the parallel environment.
*/
static void Destroy ();
/**
* \return system identification
* \brief Get the id number of this rank.
*
* When running a sequential simulation this will return a systemID of 0.
*
* \return system identification
*/
static uint32_t GetSystemId ();
/**
* \return number of parallel tasks
* \brief Get the number of ranks used by ns-3.
*
* When running a sequential simulation this will return a size of 1.
* Returns the size (number of MPI ranks) of the communicator used by
* ns-3. When running a sequential simulation this will return a
* size of 1.
*
* \return number of parallel tasks
*/
static uint32_t GetSize ();
/**
* \brief Returns enabled state of parallel environment.
*
* \return true if parallel communication is enabled
*/
static bool IsEnabled ();
/**
* \brief Setup the parallel communication interface.
*
* There are two ways to setup the communications interface. This
* Enable method is the easiest method and should be used in most
* situations.
*
* Disable() must be invoked at end of an ns-3 application to
* properly cleanup the parallel communication interface.
*
* This method will call MPI_Init and configure ns-3 to use the
* MPI_COMM_WORLD communicator.
*
* For more complex situations, such as embedding ns-3 with other
* MPI simulators or libraries, the Enable(MPI_Comm communcicator)
* may be used if MPI is initialized externally or if ns-3 needs to
* be run unique communicator. For example if there are two
* parallel simulators and the goal is to run each simulator on a
* different set of ranks.
*
* \note The `SimulatorImplementationType attribute in
* ns3::GlobalValues must be set before calling Enable()
*
* \param pargc number of command line arguments
* \param pargv command line arguments
*
* \brief Sets up parallel communication interface.
*
* SimulatorImplementationType attribute in ns3::GlobalValues must be set before
* Enable is invoked.
*/
static void Enable (int* pargc, char*** pargv);
/**
* Terminates the parallel environment.
* This function must be called after Destroy ()
* \brief Setup the parallel communication interface using the specified communicator.
*
* See @ref Enable (int* pargc, char*** pargv) for additional information.
*
* \param communicator MPI Communicator that should be used by ns-3
*/
static void Enable (MPI_Comm communicator);
/**
* \brief Clean up the ns-3 parallel communications interface.
*
* MPI_Finalize will be called only if Enable (int* pargc, char***
* pargv) was called.
*/
static void Disable ();
/**
* \brief Send a packet to a remote node.
*
* \param p packet to send
* \param rxTime received time at destination node
* \param node destination node
@@ -94,8 +138,25 @@ public:
* Serialize and send a packet to the specified node and net device
*/
static void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev);
/**
* \brief Return the communicator used to run ns-3.
*
* The communicator returned will be MPI_COMM_WORLD if Enable (int*
* pargc, char*** pargv) is used to enable or the user specified
* communicator if Enable (MPI_Comm communicator) is used.
*
* \return The MPI Communicator.
*/
static MPI_Comm GetCommunicator();
private:
/**
* Common enable logic.
*/
static void SetParallelSimulatorImpl (void);
/**
* Static instance of the instantiated parallel controller.
*/

View File

@@ -16,6 +16,13 @@
* Author: George Riley <riley@ece.gatech.edu>
*/
/**
* \file
* \ingroup mpi
* ns3::MpiReciver implementation,
* provides an interface to aggregate to MPI-compatible NetDevices.
*/
#include "mpi-receiver.h"
namespace ns3 {

View File

@@ -16,7 +16,12 @@
* Author: George Riley <riley@ece.gatech.edu>
*/
// Provides an interface to aggregate to MPI-compatible NetDevices
/**
* \file
* \ingroup mpi
* ns3::MpiReciver declaration,
* provides an interface to aggregate to MPI-compatible NetDevices.
*/
#ifndef NS3_MPI_RECEIVER_H
#define NS3_MPI_RECEIVER_H
@@ -42,6 +47,10 @@ namespace ns3 {
class MpiReceiver : public Object
{
public:
/**
* Register this type.
* \return The object TypeId.
*/
static TypeId GetTypeId (void);
virtual ~MpiReceiver ();
@@ -58,6 +67,7 @@ public:
private:
virtual void DoDispose (void);
/** Callback to send received packets to. */
Callback<void, Ptr<Packet> > m_rxCallback;
};

View File

@@ -19,6 +19,12 @@
*
*/
/**
* \file
* \ingroup mpi
* Implementation of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
*/
#include "null-message-mpi-interface.h"
#include "null-message-simulator-impl.h"
@@ -42,6 +48,47 @@
namespace ns3 {
NS_LOG_COMPONENT_DEFINE ("NullMessageMpiInterface");
NS_OBJECT_ENSURE_REGISTERED (NullMessageMpiInterface);
/**
* \ingroup mpi
*
* \brief Non-blocking send buffers for Null Message implementation.
*
* One buffer is allocated for each non-blocking send.
*/
class NullMessageSentBuffer
{
public:
NullMessageSentBuffer ();
~NullMessageSentBuffer ();
/**
* \return pointer to sent buffer
*/
uint8_t* GetBuffer ();
/**
* \param buffer pointer to sent buffer
*/
void SetBuffer (uint8_t* buffer);
/**
* \return MPI request
*/
MPI_Request* GetRequest ();
private:
/**
* Buffer for send.
*/
uint8_t* m_buffer;
/**
* MPI request posted for the send.
*/
MPI_Request m_request;
};
/**
* maximum MPI message size for easy
@@ -81,13 +128,26 @@ NullMessageSentBuffer::GetRequest ()
uint32_t NullMessageMpiInterface::g_sid = 0;
uint32_t NullMessageMpiInterface::g_size = 1;
uint32_t NullMessageMpiInterface::g_numNeighbors = 0;
bool NullMessageMpiInterface::g_initialized = false;
bool NullMessageMpiInterface::g_enabled = false;
bool NullMessageMpiInterface::g_mpiInitCalled = false;
std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
MPI_Comm NullMessageMpiInterface::g_communicator = MPI_COMM_WORLD;
bool NullMessageMpiInterface::g_freeCommunicator = false;
MPI_Request* NullMessageMpiInterface::g_requests;
char** NullMessageMpiInterface::g_pRxBuffers;
TypeId
NullMessageMpiInterface::GetTypeId (void)
{
static TypeId tid = TypeId ("ns3::NullMessageMpiInterface")
.SetParent<Object> ()
.SetGroupName ("Mpi")
;
return tid;
}
NullMessageMpiInterface::NullMessageMpiInterface ()
{
NS_LOG_FUNCTION (this);
@@ -118,14 +178,16 @@ NullMessageMpiInterface::GetSize ()
return g_size;
}
MPI_Comm
NullMessageMpiInterface::GetCommunicator()
{
NS_ASSERT (g_enabled);
return g_communicator;
}
bool
NullMessageMpiInterface::IsEnabled ()
{
if (!g_initialized)
{
Simulator::GetImplementation ();
g_initialized = true;
}
return g_enabled;
}
@@ -134,21 +196,39 @@ NullMessageMpiInterface::Enable (int* pargc, char*** pargv)
{
NS_LOG_FUNCTION (this << *pargc);
NS_ASSERT (g_enabled == false);
// Initialize the MPI interface
MPI_Init (pargc, pargv);
MPI_Barrier (MPI_COMM_WORLD);
Enable (MPI_COMM_WORLD);
g_mpiInitCalled = true;
}
void
NullMessageMpiInterface::Enable (MPI_Comm communicator)
{
NS_LOG_FUNCTION (this);
NS_ASSERT (g_enabled == false);
// Standard MPI practice is to duplicate the communicator for
// library to use. Library communicates in isolated communication
// context.
MPI_Comm_dup (communicator, &g_communicator);
g_freeCommunicator = true;
// SystemId and Size are unit32_t in interface but MPI uses int so convert.
int mpiSystemId;
int mpiSize;
MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
MPI_Comm_rank (g_communicator, &mpiSystemId);
MPI_Comm_size (g_communicator, &mpiSize);
g_sid = mpiSystemId;
g_size = mpiSize;
g_enabled = true;
g_initialized = true;
MPI_Barrier(g_communicator);
}
void
@@ -170,7 +250,7 @@ NullMessageMpiInterface::InitializeSendReceiveBuffers(void)
{
g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0,
MPI_COMM_WORLD, &g_requests[index]);
g_communicator, &g_requests[index]);
++index;
}
}
@@ -210,7 +290,7 @@ NullMessageMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t
p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
0, MPI_COMM_WORLD, (iter->GetRequest ()));
0, g_communicator, (iter->GetRequest ()));
NullMessageSimulatorImpl::GetInstance ()->RescheduleNullMessageEvent (nodeSysId);
}
@@ -241,7 +321,7 @@ NullMessageMpiInterface::SendNullMessage (const Time& guarantee_update, Ptr<Remo
uint32_t nodeSysId = bundle->GetSystemId ();
MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
0, MPI_COMM_WORLD, (iter->GetRequest ()));
0, g_communicator, (iter->GetRequest ()));
}
void
@@ -261,7 +341,6 @@ NullMessageMpiInterface::ReceiveMessagesNonBlocking ()
ReceiveMessages(false);
}
void
NullMessageMpiInterface::ReceiveMessages (bool blocking)
{
@@ -348,7 +427,7 @@ NullMessageMpiInterface::ReceiveMessages (bool blocking)
// Re-queue the next read
MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0,
MPI_COMM_WORLD, &g_requests[index]);
g_communicator, &g_requests[index]);
}
else
@@ -387,11 +466,8 @@ NullMessageMpiInterface::Disable ()
{
NS_LOG_FUNCTION (this);
int flag = 0;
MPI_Initialized (&flag);
if (flag)
if (g_enabled)
{
for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
iter != g_pendingTx.end ();
++iter)
@@ -406,7 +482,6 @@ NullMessageMpiInterface::Disable ()
MPI_Request_free (&g_requests[i]);
}
MPI_Finalize ();
for (uint32_t i = 0; i < g_numNeighbors; ++i)
{
@@ -417,9 +492,29 @@ NullMessageMpiInterface::Disable ()
g_pendingTx.clear ();
g_enabled = false;
g_initialized = false;
if (g_freeCommunicator)
{
MPI_Comm_free (&g_communicator);
g_freeCommunicator = false;
}
if (g_mpiInitCalled)
{
int flag = 0;
MPI_Initialized (&flag);
if (flag)
{
MPI_Finalize ();
}
else
{
NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
}
}
g_enabled = false;
g_mpiInitCalled = false;
}
else
{

View File

@@ -16,7 +16,12 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*
*/
/**
* \file
* \ingroup mpi
* Declaration of classes ns3::NullMessageSentBuffer and ns3::NullMessageMpiInterface.
*/
#ifndef NS3_NULLMESSAGE_MPI_INTERFACE_H
@@ -32,132 +37,68 @@
namespace ns3 {
class NullMessageSimulatorImpl;
class NullMessageSentBuffer;
class RemoteChannelBundle;
class Packet;
/**
* \ingroup mpi
*
* \brief Non-blocking send buffers for Null Message implementation.
*
* One buffer is allocated for each non-blocking send.
*/
class NullMessageSentBuffer
{
public:
NullMessageSentBuffer ();
~NullMessageSentBuffer ();
/**
* \return pointer to sent buffer
*/
uint8_t* GetBuffer ();
/**
* \param buffer pointer to sent buffer
*/
void SetBuffer (uint8_t* buffer);
/**
* \return MPI request
*/
MPI_Request* GetRequest ();
private:
/**
* Buffer for send.
*/
uint8_t* m_buffer;
/**
* MPI request posted for the send.
*/
MPI_Request m_request;
};
/**
* \ingroup mpi
*
* \brief Interface between ns-3 and MPI for the Null Message
* distributed simulation implementation.
*/
class NullMessageMpiInterface : public ParallelCommunicationInterface
class NullMessageMpiInterface : public ParallelCommunicationInterface, Object
{
public:
/**
* Register this type.
* \return The object TypeId.
*/
static TypeId GetTypeId (void);
NullMessageMpiInterface ();
~NullMessageMpiInterface ();
/**
* Delete all buffers
*/
// Inherited
virtual void Destroy ();
/**
* \return system id (MPI rank)
*/
virtual uint32_t GetSystemId ();
/**
* \return number of systems (MPI size)
*/
virtual uint32_t GetSize ();
/**
* \return true if interface is enabled
*/
virtual bool IsEnabled ();
/**
* \param pargc number of command line arguments
* \param pargv command line arguments
*
* Sets up interface. Calls MPI Init and
* posts receives.
*/
virtual void Enable (int* pargc, char*** pargv);
/**
* Terminates the MPI environment by calling MPI_Finalize This
* function must be called after Destroy (). Resets m_initialized
* and m_enabled.
*/
virtual void Enable (MPI_Comm communicator);
virtual void Disable ();
/**
* \param p packet to send
* \param rxTime received time at destination node
* \param node destination node
* \param dev destination device
*
* Serialize and send a packet to the specified node and net device.
*
* \internal
* The MPI buffer format packs a delivery information and the serialized packet.
*
* uint64_t time the packed should be delivered
* uint64_t guarantee time for the Null Message algorithm.
* uint32_t node id of destination
* unit32_t dev id on destination
* uint8_t[] serialized packet
*/
virtual void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev);
virtual MPI_Comm GetCommunicator();
private:
/**
* The null message implementation is a collaboration of several
* classes. Methods that should be invoked only by the
* collaborators are private to restrict use.
* It is not intended for state to be shared.
*/
friend ns3::RemoteChannelBundle;
friend ns3::NullMessageSimulatorImpl;
/**
* \param guaranteeUpdate guarantee update time for the Null Message
* \bundle the destination bundle for the Null Message.
*
* \brief Send a Null Message to across the specified bundle.
*
* Guarantee update time is the lower bound time on the next
* possible event from this MPI task to the remote MPI task across
* the bundle. Remote task may execute events up to time.
*
* Null Messages are sent when a packet has not been sent across
* this bundle in order to allow time advancement on the remote
* MPI task.
*
* \internal
* The Null Message MPI buffer format is based on the format for sending a packet with
* several fields set to 0 to signal that it is a Null Message. Overloading the normal packet
* format simplifies receive logic.
* \param [in] guaranteeUpdate Lower bound time on the next
* possible event from this MPI task to the remote MPI task across
* the bundle. Remote task may execute events up to this time.
*
* uint64_t 0 must be zero for Null Message
* uint64_t guarantee time
* uint32_t 0 must be zero for Null Message
* uint32_t 0 must be zero for Null Message
* \param [in] bundle The bundle of links between two ranks.
*
* \internal The Null Message MPI buffer format uses the same packet
* metadata format as sending a normal packet with the time,
* destination node, and destination device set to zero. Using the
* same packet metadata simplifies receive logic.
*/
static void SendNullMessage (const Time& guaranteeUpdate, Ptr<RemoteChannelBundle> bundle);
/**
@@ -183,37 +124,49 @@ public:
*/
static void InitializeSendReceiveBuffers (void);
private:
/**
* Check for received messages complete. Will block until message
* has been received if blocking flag is true. When blocking will
* return after the first message is received. Non-blocking mode will
* Non-blocking check for received messages complete. Will
* receive all messages that are queued up locally.
* only check for received messages complete, and return
* all messages that are queued up locally.
*
* \param [in] blocking Whether this call should block.
*/
static void ReceiveMessages (bool blocking = false);
// System ID (rank) for this task
/** System ID (rank) for this task. */
static uint32_t g_sid;
// Size of the MPI COM_WORLD group.
/** Size of the MPI COM_WORLD group. */
static uint32_t g_size;
// Number of neighbor tasks, tasks that this task shares a link with.
/** Number of neighbor tasks, tasks that this task shares a link with. */
static uint32_t g_numNeighbors;
static bool g_initialized;
/** Has this interface been enabled. */
static bool g_enabled;
// Pending non-blocking receives
/**
* Has MPI Init been called by this interface.
* Alternatively user supplies a communicator.
*/
static bool g_mpiInitCalled;
/** Pending non-blocking receives. */
static MPI_Request* g_requests;
// Data buffers for non-blocking receives
/** Data buffers for non-blocking receives. */
static char** g_pRxBuffers;
// List of pending non-blocking sends
/** List of pending non-blocking sends. */
static std::list<NullMessageSentBuffer> g_pendingTx;
/** MPI communicator being used for ns-3 tasks. */
static MPI_Comm g_communicator;
/** Did we create the communicator? Have to free it. */
static bool g_freeCommunicator;
};
} // namespace ns3

View File

@@ -16,9 +16,15 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*
*/
/**
* \file
* \ingroup mpi
* Implementation of class ns3::NullMessageSimulatorImpl.
*/
#include "null-message-simulator-impl.h"
#include "null-message-mpi-interface.h"
@@ -369,14 +375,6 @@ NullMessageSimulatorImpl::GetSystemId () const
return m_myId;
}
void
NullMessageSimulatorImpl::RunOneEvent (void)
{
NS_LOG_FUNCTION (this);
ProcessOneEvent ();
}
void
NullMessageSimulatorImpl::Stop (void)
{

View File

@@ -16,9 +16,15 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Steven Smith <smith84@llnl.gov>
*
*/
/**
* \file
* \ingroup mpi
* Declaration of class ns3::NullMessageSimulatorImpl.
*/
#ifndef NULLMESSAGE_SIMULATOR_IMPL_H
#define NULLMESSAGE_SIMULATOR_IMPL_H
@@ -45,10 +51,16 @@ class RemoteChannelBundle;
class NullMessageSimulatorImpl : public SimulatorImpl
{
public:
/**
* Register this type.
* \return The object TypeId.
*/
static TypeId GetTypeId (void);
/** Default constructor. */
NullMessageSimulatorImpl ();
/** Destructor. */
~NullMessageSimulatorImpl ();
// virtual from SimulatorImpl
@@ -64,7 +76,7 @@ public:
virtual void Cancel (const EventId &id);
virtual bool IsExpired (const EventId &id) const;
virtual void Run (void);
virtual void RunOneEvent (void);
virtual Time Now (void) const;
virtual Time GetDelayLeft (const EventId &id) const;
virtual Time GetMaximumSimulationTime (void) const;
@@ -98,7 +110,7 @@ private:
virtual void DoDispose (void);
/**
* Calculate the look ahead allowable for this MPI task. Basically
* Calculate the lookahead allowable for this MPI task. Basically
* the minimum latency on links to neighbor MPI tasks.
*/
void CalculateLookAhead (void);
@@ -168,31 +180,42 @@ private:
*/
void NullMessageEventHandler(RemoteChannelBundle* bundle);
/** Container type for the events to run at Simulator::Destroy(). */
typedef std::list<EventId> DestroyEvents;
/** The container of events to run at Destroy() */
DestroyEvents m_destroyEvents;
/** Flag calling for the end of the simulation. */
bool m_stop;
/** The event priority queue. */
Ptr<Scheduler> m_events;
/** Next event unique id. */
uint32_t m_uid;
/** Unique id of the current event. */
uint32_t m_currentUid;
/** Timestamp of the current event. */
uint64_t m_currentTs;
/** Execution context of the current event. */
uint32_t m_currentContext;
/** The event count. */
uint64_t m_eventCount;
// number of events that have been inserted but not yet scheduled,
// not counting the "destroy" events; this is used for validation
/**
* Number of events that have been inserted but not yet scheduled,
* not counting the "destroy" events; this is used for validation.
*/
int m_unscheduledEvents;
uint32_t m_myId; // MPI Rank
uint32_t m_systemCount; // MPI Size
uint32_t m_myId; /**< MPI rank. */
uint32_t m_systemCount; /**< MPI communicator size. */
/*
/**
* The time for which it is safe for this task to execute events
* without danger of out-of-order events.
*/
Time m_safeTime;
/*
/**
* Null Message performance tuning parameter. Controls when Null
* messages are sent. When value is 1 the minimum number of Null
* messages are sent conserving bandwidth. The delay in arrival of
@@ -204,9 +227,7 @@ private:
*/
double m_schedulerTune;
/*
* Singleton instance.
*/
/** Singleton instance. */
static NullMessageSimulatorImpl* g_instance;
};

View File

@@ -19,6 +19,12 @@
*
*/
/**
* \file
* \ingroup mpi
* Declaration of class ns3::ParallelCommunicationInterface.
*/
#ifndef NS3_PARALLEL_COMMUNICATION_INTERFACE_H
#define NS3_PARALLEL_COMMUNICATION_INTERFACE_H
@@ -40,11 +46,17 @@ namespace ns3 {
* \brief Pure virtual base class for the interface between ns-3 and
* the parallel communication layer being used.
*
* Each type of parallel communication layer is required to implement
* this interface. This interface is called through the
* MpiInterface.
* This class is implemented for each of the parallel versions of
* SimulatorImpl to manage communication between process (ranks).
*
* This interface is called through the singleton MpiInterface class.
* MpiInterface has the same API as ParallelCommunicationInterface but
* being a singleton uses static methods to delegate to methods
* defined in classes that implement the
* ParallelCommunicationInterface. For example, SendPacket is likely
* to be specialized for a specific parallel SimulatorImpl.
*/
class ParallelCommunicationInterface
class ParallelCommunicationInterface
{
public:
/**
@@ -52,43 +64,41 @@ public:
*/
virtual ~ParallelCommunicationInterface() {}
/**
* Deletes storage used by the parallel environment.
* \copydoc MpiInterface::Destroy
*/
virtual void Destroy () = 0;
/**
* \return system identification
* \copydoc MpiInterface::GetSystemId
*/
virtual uint32_t GetSystemId () = 0;
/**
* \return number of parallel tasks
* \copydoc MpiInterface::GetSize
*/
virtual uint32_t GetSize () = 0;
/**
* \return true if parallel communication is enabled
* \copydoc MpiInterface::IsEnabled
*/
virtual bool IsEnabled () = 0;
/**
* \param pargc number of command line arguments
* \param pargv command line arguments
*
* Sets up parallel communication interface
* \copydoc MpiInterface::Enable(int* pargc,char*** pargv)
*/
virtual void Enable (int* pargc, char*** pargv) = 0;
/**
* Terminates the parallel environment.
* This function must be called after Destroy ()
* \copydoc MpiInterface::Enable(MPI_Comm communicator)
*/
virtual void Enable (MPI_Comm communicator) = 0;
/**
* \copydoc MpiInterface::Disable
*/
virtual void Disable () = 0;
/**
* \param p packet to send
* \param rxTime received time at destination node
* \param node destination node
* \param dev destination device
*
* Serialize and send a packet to the specified node and net device
* \copydoc MpiInterface::SendPacket
*/
virtual void SendPacket (Ptr<Packet> p, const Time &rxTime, uint32_t node, uint32_t dev) = 0;
/**
* \copydoc MpiInterface::GetCommunicator
*/
virtual MPI_Comm GetCommunicator () = 0;
private:
};

View File

@@ -19,6 +19,12 @@
*
*/
/**
* \file
* \ingroup mpi
* Implementation of class ns3::RemoteChannelBundleManager.
*/
#include "remote-channel-bundle-manager.h"
#include "remote-channel-bundle.h"

View File

@@ -19,18 +19,24 @@
*
*/
/**
* \file
* \ingroup mpi
* Declaration of class ns3::RemoteChannelBundleManager.
*/
#ifndef NS3_REMOTE_CHANNEL_BUNDLE_MANAGER
#define NS3_REMOTE_CHANNEL_BUNDLE_MANAGER
#include <ns3/nstime.h>
#include <ns3/ptr.h>
#include <map>
#include <unordered_map>
namespace ns3 {
class RemoteChannelBundle;
/*
/**
* \ingroup mpi
*
* \brief Singleton for managing the RemoteChannelBundles for each process.
@@ -42,19 +48,26 @@ class RemoteChannelBundleManager
public:
/**
* \return remote channel bundle for specified SystemId.
* Get the bundle corresponding to a remote rank.
*
* \param [in] systemId The remote system id.
* \return The bundle for the specified system id.
*/
static Ptr<RemoteChannelBundle> Find (uint32_t systemId);
/**
* Add RemoteChannelBundle from this task to MPI task on other side of the link.
* Add RemoteChannelBundle from this task to MPI task
* on other side of the link.
* Can not be invoked after InitializeNullMessageEvents has been invoked.
*
* \param [in] systemId The remote system id.
* \return The newly added bundle.
*/
static Ptr<RemoteChannelBundle> Add (uint32_t systemId);
/**
* \return number of remote channel bundles
*
* Get the number of ns-3 channels in this bundle
* \return The number of channels.
*/
static std::size_t Size (void);
@@ -65,13 +78,12 @@ public:
static void InitializeNullMessageEvents (void);
/**
* \return safe time across all remote channels.
* Get the safe time across all channels in this bundle.
* \return The safe time.
*/
static Time GetSafeTime (void);
/**
* Destroy the singleton.
*/
/** Destroy the singleton. */
static void Destroy (void);
private:
@@ -83,19 +95,22 @@ private:
{
}
/**
* Private dtor to prevent destruction outside of singleton pattern.
*/
~RemoteChannelBundleManager ()
{
}
/*
/**
* Container for all remote channel bundles for this task.
*
* Would be more efficient to use unordered_map when C++11 is adopted for NS3.
*/
typedef std::map<uint32_t, Ptr<RemoteChannelBundle> > RemoteChannelMap;
typedef std::unordered_map<uint32_t, Ptr<RemoteChannelBundle> > RemoteChannelMap;
/** The remote channel bundles. */
static RemoteChannelMap g_remoteChannelBundles;
/*
/**
* Protect manager class from being initialized twice or incorrect
* ordering of method calls.
*/

View File

@@ -19,6 +19,12 @@
*
*/
/**
* \file
* \ingroup mpi
* Implementation of class ns3::RemoteChannelBundle.
*/
#include "remote-channel-bundle.h"
#include "null-message-mpi-interface.h"
@@ -28,7 +34,6 @@
namespace ns3 {
#define NS_TIME_INFINITY ns3::Time (0x7fffffffffffffffLL)
TypeId RemoteChannelBundle::GetTypeId (void)
{
@@ -42,14 +47,14 @@ TypeId RemoteChannelBundle::GetTypeId (void)
RemoteChannelBundle::RemoteChannelBundle ()
: m_remoteSystemId (UINT32_MAX),
m_guaranteeTime (0),
m_delay (NS_TIME_INFINITY)
m_delay (Time::Max ())
{
}
RemoteChannelBundle::RemoteChannelBundle (const uint32_t remoteSystemId)
: m_remoteSystemId (remoteSystemId),
m_guaranteeTime (0),
m_delay (NS_TIME_INFINITY)
m_delay (Time::Max ())
{
}
@@ -115,12 +120,10 @@ std::ostream& operator<< (std::ostream& out, ns3::RemoteChannelBundle& bundle )
out << "RemoteChannelBundle Rank = " << bundle.m_remoteSystemId
<< ", GuaranteeTime = " << bundle.m_guaranteeTime
<< ", Delay = " << bundle.m_delay << std::endl;
for (std::map < uint32_t, Ptr < Channel > > ::const_iterator pair = bundle.m_channels.begin ();
pair != bundle.m_channels.end ();
++pair)
for (auto element : bundle.m_channels)
{
out << "\t" << (*pair).second << std::endl;
out << "\t" << element.second << std::endl;
}
return out;

View File

@@ -19,6 +19,12 @@
*
*/
/**
* \file
* \ingroup mpi
* Declaration of class ns3::RemoteChannelBundle.
*/
#ifndef NS3_REMOTE_CHANNEL_BUNDLE
#define NS3_REMOTE_CHANNEL_BUNDLE
@@ -28,14 +34,14 @@
#include <ns3/ptr.h>
#include <ns3/pointer.h>
#include <map>
#include <unordered_map>
namespace ns3 {
/**
* \ingroup mpi
*
* \brief Collection of NS3 channels between local and remote nodes.
* \brief Collection of ns-3 channels between local and remote nodes.
*
* An instance exists for each remote system that the local system is
* in communication with. These are created and managed by the
@@ -45,34 +51,47 @@ namespace ns3 {
class RemoteChannelBundle : public Object
{
public:
/**
* Register this type.
* \return The object TypeId.
*/
static TypeId GetTypeId (void);
/** Default constructor. */
RemoteChannelBundle ();
/**
* Construct and assing system Id.
* \param [in] remoteSystemId The system id.
*/
RemoteChannelBundle (const uint32_t remoteSystemId);
/** Destructor. */
~RemoteChannelBundle ()
{
}
/**
* Add a channel to this bundle.
* \param channel to add to the bundle
* \param delay time for the channel (usually the latency)
*/
void AddChannel (Ptr<Channel> channel, Time delay);
/**
* Get the system Id for this side.
* \return SystemID for remote side of this bundle
*/
uint32_t GetSystemId () const;
/**
* Get the current guarantee time for this bundle.
* \return guarantee time
*/
Time GetGuaranteeTime (void) const;
/**
* \param guarantee time
* \param time The guarantee time.
*
* Set the guarantee time for the bundle. This should be called
* after a packet or Null Message received.
@@ -80,28 +99,33 @@ public:
void SetGuaranteeTime (Time time);
/**
* \return the minimum delay along any channel in this bundle
* Get the minimum delay along any channel in this bundle
* \return The minimum delay.
*/
Time GetDelay (void) const;
/**
* Set the event ID of the Null Message send event current scheduled
* Set the event ID of the Null Message send event currently scheduled
* for this channel.
*
* \param [in] id The null message event id.
*/
void SetEventId (EventId id);
/**
* \return the event ID of the Null Message send event for this bundle
* Get the event ID of the Null Message send event for this bundle
* \return The null message event id.
*/
EventId GetEventId (void) const;
/**
* \return number of NS3 channels in this bundle
* Get the number of ns-3 channels in this bundle
* \return The number of channels.
*/
std::size_t GetSize (void) const;
/**
* \param time
* \param time The delay from now when the null message should be received.
*
* Send Null Message to the remote task associated with this bundle.
* Message will be delivered at current simulation time + the time
@@ -111,37 +135,39 @@ public:
/**
* Output for debugging purposes.
*
* \param [in,out] out The stream.
* \param [in] bundle The bundle to print.
* \return The stream.
*/
friend std::ostream& operator<< (std::ostream& out, ns3::RemoteChannelBundle& bundle );
private:
/*
* Remote rank.
*/
/** Remote rank. */
uint32_t m_remoteSystemId;
/*
* NS3 Channels that are connected from nodes in this MPI task to remote_rank.
*
* Would be more efficient to use unordered_map when C++11 is adopted by NS3.
/**
* Container of channels that are connected from nodes in this MPI task
* to nodes in a remote rank.
*/
std::map < uint32_t, Ptr < Channel > > m_channels;
typedef std::unordered_map < uint32_t, Ptr < Channel > > ChannelMap;
ChannelMap m_channels; /**< ChannelId to Channel map */
/*
/**
* Guarantee time for the incoming Channels from MPI task remote_rank.
* No PacketMessage will ever arrive on any incoming channel in this bundle with a
* ReceiveTime less than this. Initialized to StartTime.
* No PacketMessage will ever arrive on any incoming channel
* in this bundle with a ReceiveTime less than this.
* Initialized to StartTime.
*/
Time m_guaranteeTime;
/*
* Delay for this Channel bundle. min link delay over all incoming channels;
/**
* Delay for this Channel bundle, which is
* the min link delay over all incoming channels;
*/
Time m_delay;
/*
* Event scheduled to send Null Message for this bundle.
*/
/** Event scheduled to send Null Message for this bundle. */
EventId m_nullEventId;
};

View File

@@ -0,0 +1,14 @@
TEST : 00000 :
TEST : 00001 :
TEST : 00002 : Configuration:
TEST : 00003 : Routing: nix-vector
TEST : 00004 : Synchronization: granted time window (YAWNS)
TEST : 00005 : MPI_Init called: explicitly by this program
TEST : 00006 : ns-3 Communicator: MPI_COMM_WORLD (2 ranks)
TEST : 00007 : PCAP tracing: not enabled
TEST : 00008 :
TEST : 00009 : Rank assignments:
TEST : 00010 : ns-3 rank: in MPI_COMM_WORLD: 0:2, in splitComm: 0:2
TEST : 00011 :
TEST : 00012 :
TEST : 00013 : PASSED

View File

@@ -0,0 +1,14 @@
TEST : 00000 :
TEST : 00001 :
TEST : 00002 : Configuration:
TEST : 00003 : Routing: nix-vector
TEST : 00004 : Synchronization: granted time window (YAWNS)
TEST : 00005 : MPI_Init called: implicitly by ns3::MpiInterface::Enable()
TEST : 00006 : ns-3 Communicator: MPI_COMM_WORLD (2 ranks)
TEST : 00007 : PCAP tracing: not enabled
TEST : 00008 :
TEST : 00009 : Rank assignments:
TEST : 00010 : ns-3 rank: in MPI_COMM_WORLD: 0:2, in splitComm: 0:2
TEST : 00011 :
TEST : 00012 :
TEST : 00013 : PASSED

View File

@@ -0,0 +1,12 @@
TEST : 00000 :
TEST : 00001 :
TEST : 00002 : Configuration:
TEST : 00003 : Routing: nix-vector
TEST : 00004 : Synchronization: granted time window (YAWNS)
TEST : 00005 : MPI_Init called: explicitly by this program
TEST : 00006 : ns-3 Communicator: Split [1-2] (out of 3 ranks) from MPI_COMM_WORLD
TEST : 00007 : PCAP tracing: not enabled
TEST : 00008 :
TEST : 00009 : Rank assignments:
TEST : 00010 : Other rank: in MPI_COMM_WORLD: 0:3, in splitComm: 0:1
TEST : 00011 :

View File

@@ -0,0 +1 @@
TEST : 00000 : PASSED

View File

@@ -0,0 +1 @@
TEST : 00000 : PASSED

View File

@@ -0,0 +1 @@
TEST : 00000 : PASSED

View File

@@ -0,0 +1 @@
TEST : 00000 : PASSED

View File

@@ -0,0 +1,20 @@
TEST : 00000 : ==== DARPA NMS CAMPUS NETWORK SIMULATION ====
TEST : 00001 : Number of CNs: 2, LAN nodes: 10
TEST : 00002 : Creating Campus Network 0:
TEST : 00003 : SubNet [ 0 1 2 3 ]
TEST : 00004 : Connecting Subnets...
TEST : 00005 : Assigning IP addresses...
TEST : 00006 : Creating Campus Network 1:
TEST : 00007 : SubNet [ 0 1 2 3 ]
TEST : 00008 : Connecting Subnets...
TEST : 00009 : Assigning IP addresses...
TEST : 00010 : Forming Ring Topology...
TEST : 00011 : Creating UDP Traffic Flows:
TEST : 00012 : Campus Network 0 Flows [ Net2 Net3 ]
TEST : 00013 : Campus Network 1 Flows [ Net2 Net3 ]
TEST : 00014 : Created 308 nodes.
TEST : 00015 : Using Nix-vectors...
TEST : 00016 : Running simulator...
TEST : 00017 : Simulator finished.
TEST : 00018 : PASSED
TEST : 00019 : -----

View File

@@ -0,0 +1 @@
TEST : 00000 : PASSED

View File

@@ -0,0 +1 @@
TEST : 00000 : PASSED

View File

@@ -0,0 +1 @@
TEST : 00000 : PASSED

View File

@@ -0,0 +1,134 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2018 Lawrence Livermore National Laboratory
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Peter D. Barnes, Jr. <pdbarnes@llnl.gov>
*/
#include "ns3/example-as-test.h"
#include <sstream>
using namespace ns3;
/**
* This version of ns3::ExampleTestCase is specialized for MPI
* by accepting the number of ranks as a parameter,
* then building a `--command-template` string which
* invokes `mpiexec` correctly to execute MPI examples.
*/
class MpiTestCase : public ExampleAsTestCase
{
public:
/**
* \copydoc ns3::ExampleAsTestCase::ExampleAsTestCase
*
* \param [in] ranks The number of ranks to use
*/
MpiTestCase (const std::string name,
const std::string program,
const std::string dataDir,
const int ranks,
const std::string args = "");
/** Destructor */
virtual ~MpiTestCase (void) {};
/**
* Produce the `--command-template` argument which will invoke
* `mpiexec` with the requested number of ranks.
*
* \returns The `--command-template` string.
*/
std::string GetCommandTemplate (void) const;
/**
* Sort the output from parallel execution.
* stdout from multiple ranks is not ordered.
*
* \returns Sort command
*/
std::string
GetPostProcessingCommand (void) const;
private:
/** The number of ranks. */
int m_ranks;
};
MpiTestCase::MpiTestCase (const std::string name,
const std::string program,
const std::string dataDir,
const int ranks,
const std::string args /* = "" */)
: ExampleAsTestCase (name, program, dataDir, args),
m_ranks (ranks)
{
}
std::string
MpiTestCase::GetCommandTemplate (void) const
{
std::stringstream ss;
ss << "mpiexec -n " << m_ranks << " %s --test " << m_args;
return ss.str ();
}
std::string
MpiTestCase::GetPostProcessingCommand (void) const
{
std::string command ("| grep TEST | sort ");
return command;
}
/** MPI specialization of ns3::ExampleTestSuite. */
class MpiTestSuite : public TestSuite
{
public:
/**
* \copydoc MpiTestCase::MpiTestCase
*
* \param [in] duration Amount of time this test takes to execute
* (defaults to QUICK).
*/
MpiTestSuite (const std::string name,
const std::string program,
const std::string dataDir,
const int ranks,
const std::string args = "",
const TestDuration duration=QUICK)
: TestSuite (name, EXAMPLE)
{
AddTestCase (new MpiTestCase (name, program, dataDir, ranks, args), duration);
}
}; // class MpiTestSuite
/* Tests using SimpleDistributedSimulatorImpl */
static MpiTestSuite g_mpiNms2 ("mpi-example-nms-2", "nms-p2p-nix-distributed", NS_TEST_SOURCEDIR, 2);
static MpiTestSuite g_mpiComm2 ("mpi-example-comm-2", "simple-distributed-mpi-comm", NS_TEST_SOURCEDIR, 2);
static MpiTestSuite g_mpiComm2comm ("mpi-example-comm-2-init", "simple-distributed-mpi-comm", NS_TEST_SOURCEDIR, 2, "--init");
static MpiTestSuite g_mpiComm3comm ("mpi-example-comm-3-init", "simple-distributed-mpi-comm", NS_TEST_SOURCEDIR, 3, "--init");
static MpiTestSuite g_mpiEmpty2 ("mpi-example-empty-2", "simple-distributed-empty-node", NS_TEST_SOURCEDIR, 2);
static MpiTestSuite g_mpiEmpty3 ("mpi-example-empty-3", "simple-distributed-empty-node", NS_TEST_SOURCEDIR, 3);
static MpiTestSuite g_mpiSimple2 ("mpi-example-simple-2", "simple-distributed", NS_TEST_SOURCEDIR, 2);
static MpiTestSuite g_mpiThird2 ("mpi-example-third-2", "third-distributed", NS_TEST_SOURCEDIR, 2);
/* Tests using NullMessageSimulatorImpl */
static MpiTestSuite g_mpiSimple2NullMsg ("mpi-example-simple-2-nullmsg", "simple-distributed", NS_TEST_SOURCEDIR, 2, "--nullmsg");
static MpiTestSuite g_mpiEmpty2NullMsg ("mpi-example-empty-2-nullmsg", "simple-distributed-empty-node", NS_TEST_SOURCEDIR, 2, "-nullmsg");
static MpiTestSuite g_mpiEmpty3NullMsg ("mpi-example-empty-3-nullmsg", "simple-distributed-empty-node", NS_TEST_SOURCEDIR, 3, "-nullmsg");

View File

@@ -54,12 +54,19 @@ def build(bld):
'model/mpi-interface.cc',
]
# MPI tests are based on examples that are run as tests, only test when examples are built.
if bld.env['ENABLE_EXAMPLES']:
module_test = bld.create_ns3_module_test_library('mpi')
module_test.source = [
'test/mpi-test-suite.cc',
]
headers = bld(features='ns3header')
headers.module = 'mpi'
headers.source = [
'model/mpi-receiver.h',
'model/mpi-interface.h',
'model/parallel-communication-interface.h',
'model/parallel-communication-interface.h',
]
if bld.env['ENABLE_MPI']: