flow-monitor: Make flow-monitor thread-safe

This commit is contained in:
F5
2022-10-25 18:47:12 +08:00
parent f657cd0e2a
commit 6764518fff
6 changed files with 78 additions and 0 deletions

View File

@@ -83,6 +83,9 @@ FlowMonitor::FlowMonitor ()
: m_enabled (false) : m_enabled (false)
{ {
NS_LOG_FUNCTION (this); NS_LOG_FUNCTION (this);
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_relaxed);
#endif
} }
void void
@@ -146,6 +149,12 @@ FlowMonitor::ReportFirstTx (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t pack
return; return;
} }
Time now = Simulator::Now (); Time now = Simulator::Now ();
#ifdef NS3_MTP
while (m_lock.exchange (true, std::memory_order_acquire))
;
#endif
TrackedPacket &tracked = m_trackedPackets[std::make_pair (flowId, packetId)]; TrackedPacket &tracked = m_trackedPackets[std::make_pair (flowId, packetId)];
tracked.firstSeenTime = now; tracked.firstSeenTime = now;
tracked.lastSeenTime = tracked.firstSeenTime; tracked.lastSeenTime = tracked.firstSeenTime;
@@ -163,6 +172,10 @@ FlowMonitor::ReportFirstTx (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t pack
stats.timeFirstTxPacket = now; stats.timeFirstTxPacket = now;
} }
stats.timeLastTxPacket = now; stats.timeLastTxPacket = now;
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_release);
#endif
} }
@@ -175,6 +188,12 @@ FlowMonitor::ReportForwarding (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t p
NS_LOG_DEBUG ("FlowMonitor not enabled; returning"); NS_LOG_DEBUG ("FlowMonitor not enabled; returning");
return; return;
} }
#ifdef NS3_MTP
while (m_lock.exchange (true, std::memory_order_acquire))
;
#endif
std::pair<FlowId, FlowPacketId> key (flowId, packetId); std::pair<FlowId, FlowPacketId> key (flowId, packetId);
TrackedPacketMap::iterator tracked = m_trackedPackets.find (key); TrackedPacketMap::iterator tracked = m_trackedPackets.find (key);
if (tracked == m_trackedPackets.end ()) if (tracked == m_trackedPackets.end ())
@@ -189,6 +208,10 @@ FlowMonitor::ReportForwarding (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t p
Time delay = (Simulator::Now () - tracked->second.firstSeenTime); Time delay = (Simulator::Now () - tracked->second.firstSeenTime);
probe->AddPacketStats (flowId, packetSize, delay); probe->AddPacketStats (flowId, packetSize, delay);
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_release);
#endif
} }
@@ -201,6 +224,12 @@ FlowMonitor::ReportLastRx (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t packe
NS_LOG_DEBUG ("FlowMonitor not enabled; returning"); NS_LOG_DEBUG ("FlowMonitor not enabled; returning");
return; return;
} }
#ifdef NS3_MTP
while (m_lock.exchange (true, std::memory_order_acquire))
;
#endif
TrackedPacketMap::iterator tracked = m_trackedPackets.find (std::make_pair (flowId, packetId)); TrackedPacketMap::iterator tracked = m_trackedPackets.find (std::make_pair (flowId, packetId));
if (tracked == m_trackedPackets.end ()) if (tracked == m_trackedPackets.end ())
{ {
@@ -255,6 +284,10 @@ FlowMonitor::ReportLastRx (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t packe
<< flowId << ", packetId=" << packetId << ")."); << flowId << ", packetId=" << packetId << ").");
m_trackedPackets.erase (tracked); // we don't need to track this packet anymore m_trackedPackets.erase (tracked); // we don't need to track this packet anymore
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_release);
#endif
} }
void void
@@ -268,6 +301,11 @@ FlowMonitor::ReportDrop (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t packetI
return; return;
} }
#ifdef NS3_MTP
while (m_lock.exchange (true, std::memory_order_acquire))
;
#endif
probe->AddPacketDropStats (flowId, packetSize, reasonCode); probe->AddPacketDropStats (flowId, packetSize, reasonCode);
FlowStats &stats = GetStatsForFlow (flowId); FlowStats &stats = GetStatsForFlow (flowId);
@@ -290,6 +328,10 @@ FlowMonitor::ReportDrop (Ptr<FlowProbe> probe, uint32_t flowId, uint32_t packetI
<< flowId << ", packetId=" << packetId << ")."); << flowId << ", packetId=" << packetId << ").");
m_trackedPackets.erase (tracked); m_trackedPackets.erase (tracked);
} }
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_release);
#endif
} }
const FlowMonitor::FlowStatsContainer& const FlowMonitor::FlowStatsContainer&

View File

@@ -23,6 +23,7 @@
#include <vector> #include <vector>
#include <map> #include <map>
#include <atomic>
#include "ns3/ptr.h" #include "ns3/ptr.h"
#include "ns3/object.h" #include "ns3/object.h"
@@ -294,6 +295,9 @@ private:
double m_packetSizeBinWidth; //!< packet size bin width (for histograms) double m_packetSizeBinWidth; //!< packet size bin width (for histograms)
double m_flowInterruptionsBinWidth; //!< Flow interruptions bin width (for histograms) double m_flowInterruptionsBinWidth; //!< Flow interruptions bin width (for histograms)
Time m_flowInterruptionsMinTime; //!< Flow interruptions minimum time Time m_flowInterruptionsMinTime; //!< Flow interruptions minimum time
#ifdef NS3_MTP
std::atomic<bool> m_lock;
#endif
/// Get the stats for a given flow /// Get the stats for a given flow
/// \param flowId the Flow identification /// \param flowId the Flow identification

View File

@@ -98,6 +98,9 @@ bool operator == (const Ipv4FlowClassifier::FiveTuple &t1,
Ipv4FlowClassifier::Ipv4FlowClassifier () Ipv4FlowClassifier::Ipv4FlowClassifier ()
{ {
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_relaxed);
#endif
} }
bool bool
@@ -147,6 +150,11 @@ Ipv4FlowClassifier::Classify (const Ipv4Header &ipHeader, Ptr<const Packet> ipPa
tuple.sourcePort = srcPort; tuple.sourcePort = srcPort;
tuple.destinationPort = dstPort; tuple.destinationPort = dstPort;
#ifdef NS3_MTP
while (m_lock.exchange (true, std::memory_order_acquire))
;
#endif
// try to insert the tuple, but check if it already exists // try to insert the tuple, but check if it already exists
std::pair<std::map<FiveTuple, FlowId>::iterator, bool> insert std::pair<std::map<FiveTuple, FlowId>::iterator, bool> insert
= m_flowMap.insert (std::pair<FiveTuple, FlowId> (tuple, 0)); = m_flowMap.insert (std::pair<FiveTuple, FlowId> (tuple, 0));
@@ -178,6 +186,10 @@ Ipv4FlowClassifier::Classify (const Ipv4Header &ipHeader, Ptr<const Packet> ipPa
*out_flowId = insert.first->second; *out_flowId = insert.first->second;
*out_packetId = m_flowPktIdMap[*out_flowId]; *out_packetId = m_flowPktIdMap[*out_flowId];
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_release);
#endif
return true; return true;
} }

View File

@@ -23,6 +23,7 @@
#include <stdint.h> #include <stdint.h>
#include <map> #include <map>
#include <atomic>
#include "ns3/ipv4-header.h" #include "ns3/ipv4-header.h"
#include "ns3/flow-classifier.h" #include "ns3/flow-classifier.h"
@@ -99,6 +100,9 @@ private:
/// Map FlowIds to (DSCP value, packet count) pairs /// Map FlowIds to (DSCP value, packet count) pairs
std::map<FlowId, std::map<Ipv4Header::DscpType, uint32_t> > m_flowDscpMap; std::map<FlowId, std::map<Ipv4Header::DscpType, uint32_t> > m_flowDscpMap;
#ifdef NS3_MTP
std::atomic<bool> m_lock;
#endif
}; };
/** /**

View File

@@ -99,6 +99,9 @@ bool operator == (const Ipv6FlowClassifier::FiveTuple &t1,
Ipv6FlowClassifier::Ipv6FlowClassifier () Ipv6FlowClassifier::Ipv6FlowClassifier ()
{ {
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_relaxed);
#endif
} }
bool bool
@@ -148,6 +151,11 @@ Ipv6FlowClassifier::Classify (const Ipv6Header &ipHeader, Ptr<const Packet> ipPa
tuple.sourcePort = srcPort; tuple.sourcePort = srcPort;
tuple.destinationPort = dstPort; tuple.destinationPort = dstPort;
#ifdef NS3_MTP
while (m_lock.exchange (true, std::memory_order_acquire))
;
#endif
// try to insert the tuple, but check if it already exists // try to insert the tuple, but check if it already exists
std::pair<std::map<FiveTuple, FlowId>::iterator, bool> insert std::pair<std::map<FiveTuple, FlowId>::iterator, bool> insert
= m_flowMap.insert (std::pair<FiveTuple, FlowId> (tuple, 0)); = m_flowMap.insert (std::pair<FiveTuple, FlowId> (tuple, 0));
@@ -179,6 +187,10 @@ Ipv6FlowClassifier::Classify (const Ipv6Header &ipHeader, Ptr<const Packet> ipPa
*out_flowId = insert.first->second; *out_flowId = insert.first->second;
*out_packetId = m_flowPktIdMap[*out_flowId]; *out_packetId = m_flowPktIdMap[*out_flowId];
#ifdef NS3_MTP
m_lock.store (false, std::memory_order_release);
#endif
return true; return true;
} }

View File

@@ -24,6 +24,7 @@
#include <stdint.h> #include <stdint.h>
#include <map> #include <map>
#include <atomic>
#include "ns3/ipv6-header.h" #include "ns3/ipv6-header.h"
#include "ns3/flow-classifier.h" #include "ns3/flow-classifier.h"
@@ -100,6 +101,9 @@ private:
/// Map FlowIds to (DSCP value, packet count) pairs /// Map FlowIds to (DSCP value, packet count) pairs
std::map<FlowId, std::map<Ipv6Header::DscpType, uint32_t> > m_flowDscpMap; std::map<FlowId, std::map<Ipv6Header::DscpType, uint32_t> > m_flowDscpMap;
#ifdef NS3_MTP
std::atomic<bool> m_lock;
#endif
}; };
/** /**