/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation; * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Author: George Riley */ // This object contains static methods that provide an easy interface // to the necessary MPI information. #include #include #include #include "mpi-interface.h" #include "ns3/node.h" #include "ns3/node-list.h" #include "ns3/point-to-point-net-device.h" #include "ns3/simulator.h" #include "ns3/simulator-impl.h" #include "ns3/nstime.h" #ifdef NS3_MPI #include #endif namespace ns3 { SentBuffer::SentBuffer () { m_buffer = 0; m_request = 0; } SentBuffer::~SentBuffer () { delete [] m_buffer; } uint8_t* SentBuffer::GetBuffer () { return m_buffer; } void SentBuffer::SetBuffer (uint8_t* buffer) { m_buffer = buffer; } #ifdef NS3_MPI MPI_Request* SentBuffer::GetRequest () { return &m_request; } #endif uint32_t MpiInterface::m_sid = 0; uint32_t MpiInterface::m_size = 1; bool MpiInterface::m_initialized = false; bool MpiInterface::m_enabled = false; uint32_t MpiInterface::m_rxCount = 0; uint32_t MpiInterface::m_txCount = 0; std::list MpiInterface::m_pendingTx; #ifdef NS3_MPI MPI_Request* MpiInterface::m_requests; char** MpiInterface::m_pRxBuffers; #endif void MpiInterface::Destroy () { #ifdef NS3_MPI for (uint32_t i = 0; i < GetSize (); ++i) { delete [] m_pRxBuffers[i]; } delete [] m_pRxBuffers; delete [] m_requests; m_pendingTx.clear (); #endif } uint32_t MpiInterface::GetRxCount () { return m_rxCount; } uint32_t MpiInterface::GetTxCount () { return m_txCount; } uint32_t MpiInterface::GetSystemId () { if (!m_initialized) { Simulator::GetImplementation (); m_initialized = true; } return m_sid; } uint32_t MpiInterface::GetSize () { if (!m_initialized) { Simulator::GetImplementation (); m_initialized = true; } return m_size; } bool MpiInterface::IsEnabled () { if (!m_initialized) { Simulator::GetImplementation (); m_initialized = true; } return m_enabled; } void MpiInterface::Enable (int* pargc, char*** pargv) { #ifdef NS3_MPI // Initialize the MPI interface MPI_Init (pargc, pargv); MPI_Barrier (MPI_COMM_WORLD); MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast (&m_sid)); MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast (&m_size)); m_enabled = true; m_initialized = true; // Post a non-blocking receive for all peers m_pRxBuffers = new char*[m_size]; m_requests = new MPI_Request[m_size]; for (uint32_t i = 0; i < GetSize (); ++i) { m_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE]; MPI_Irecv (m_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &m_requests[i]); } #else NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); #endif } void MpiInterface::SendPacket (Ptr p, const Time& rxTime, uint32_t node, uint32_t dev) { #ifdef NS3_MPI SentBuffer sendBuf; m_pendingTx.push_back (sendBuf); std::list::reverse_iterator i = m_pendingTx.rbegin (); // Points to the last element uint32_t serializedSize = p->GetSerializedSize (); uint8_t* buffer = new uint8_t[serializedSize + 16]; i->SetBuffer (buffer); // Add the time, dest node and dest device uint64_t t = rxTime.GetNanoSeconds (); uint64_t* pTime = reinterpret_cast (buffer); *pTime++ = t; uint32_t* pData = reinterpret_cast (pTime); *pData++ = node; *pData++ = dev; // Serialize the packet p->Serialize (reinterpret_cast (pData), serializedSize); // Find the system id for the destination node Ptr destNode = NodeList::GetNode (node); uint32_t nodeSysId = destNode->GetSystemId (); MPI_Isend (reinterpret_cast (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId, 0, MPI_COMM_WORLD, (i->GetRequest ())); m_txCount++; #else NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); #endif } void MpiInterface::ReceiveMessages () { // Poll the non-block reads to see if data arrived #ifdef NS3_MPI while (true) { int flag = 0; int index = 0; MPI_Status status; MPI_Testany (GetSize (), m_requests, &index, &flag, &status); if (!flag) { break; // No more messages } int count; MPI_Get_count (&status, MPI_CHAR, &count); m_rxCount++; // Count this receive // Get the meta data first uint64_t* pTime = reinterpret_cast (m_pRxBuffers[index]); uint64_t nanoSeconds = *pTime++; uint32_t* pData = reinterpret_cast (pTime); uint32_t node = *pData++; uint32_t dev = *pData++; Time rxTime = NanoSeconds (nanoSeconds); count -= sizeof (nanoSeconds) + sizeof (node) + sizeof (dev); Ptr p = Create (reinterpret_cast (pData), count, true); // Find the correct node/device to schedule receive event Ptr pNode = NodeList::GetNode (node); uint32_t nDevices = pNode->GetNDevices (); Ptr pDev = 0; for (uint32_t i = 0; i < nDevices; ++i) { Ptr pThisDev = pNode->GetDevice (i); if (pThisDev->GetIfIndex () == dev) { pDev = DynamicCast (pThisDev); break; } } NS_ASSERT (pNode && pDev); // Schedule the rx event Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (), &PointToPointNetDevice::Receive, pDev, p); // Re-queue the next read MPI_Irecv (m_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &m_requests[index]); } #else NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); #endif } void MpiInterface::TestSendComplete () { #ifdef NS3_MPI std::list::iterator i = m_pendingTx.begin (); while (i != m_pendingTx.end ()) { MPI_Status status; int flag = 0; MPI_Test (i->GetRequest (), &flag, &status); std::list::iterator current = i; // Save current for erasing i++; // Advance to next if (flag) { // This message is complete m_pendingTx.erase (current); } } #else NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in"); #endif } } // namespace ns3