From efe4c2ed68beecd88b13bf057fe9a9ce246f43ea Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 22 Mar 2012 18:05:13 +0100 Subject: [PATCH] Finally enable a thread-safe ScheduleWithContext --- src/core/model/default-simulator-impl.cc | 79 ++++++- src/core/model/default-simulator-impl.h | 18 +- src/core/model/realtime-simulator-impl.cc | 23 +- src/core/model/realtime-simulator-impl.h | 3 + src/core/model/simulator.h | 25 ++ src/core/test/threaded-test-suite.cc | 266 ++++++++++++++++++++++ src/core/wscript | 1 + 7 files changed, 405 insertions(+), 10 deletions(-) create mode 100644 src/core/test/threaded-test-suite.cc diff --git a/src/core/model/default-simulator-impl.cc b/src/core/model/default-simulator-impl.cc index 144648648..512e19130 100644 --- a/src/core/model/default-simulator-impl.cc +++ b/src/core/model/default-simulator-impl.cc @@ -59,6 +59,8 @@ DefaultSimulatorImpl::DefaultSimulatorImpl () m_currentTs = 0; m_currentContext = 0xffffffff; m_unscheduledEvents = 0; + m_eventsWithContextEmpty = true; + m_main = SystemThread::Self(); } DefaultSimulatorImpl::~DefaultSimulatorImpl () @@ -128,6 +130,8 @@ DefaultSimulatorImpl::ProcessOneEvent (void) m_currentUid = next.key.m_uid; next.impl->Invoke (); next.impl->Unref (); + + ProcessEventsWithContext (); } bool @@ -150,10 +154,44 @@ DefaultSimulatorImpl::Next (void) const return TimeStep (NextTs ()); } +void +DefaultSimulatorImpl::ProcessEventsWithContext (void) +{ + if (m_eventsWithContextEmpty) + { + return; + } + + // swap queues + EventsWithContext eventsWithContext; + { + CriticalSection cs (m_eventsWithContextMutex); + m_eventsWithContext.swap(eventsWithContext); + m_eventsWithContextEmpty = true; + } + while (!eventsWithContext.empty ()) + { + EventWithContext event = eventsWithContext.front (); + eventsWithContext.pop_front (); + Scheduler::Event ev; + ev.impl = event.event; + ev.key.m_ts = m_currentTs + event.timestamp; + ev.key.m_context = event.context; + ev.key.m_uid = m_uid; + m_uid++; + m_unscheduledEvents++; + m_events->Insert (ev); + } +} + void DefaultSimulatorImpl::Run (void) { + // Set the current threadId as the main threadId + m_main = SystemThread::Self(); + ProcessEventsWithContext (); m_stop = false; + while (!m_events->IsEmpty () && !m_stop) { ProcessOneEvent (); @@ -167,6 +205,9 @@ DefaultSimulatorImpl::Run (void) void DefaultSimulatorImpl::RunOneEvent (void) { + // Set the current threadId as the main threadId + m_main = SystemThread::Self(); + ProcessEventsWithContext (); ProcessOneEvent (); } @@ -188,6 +229,8 @@ DefaultSimulatorImpl::Stop (Time const &time) EventId DefaultSimulatorImpl::Schedule (Time const &time, EventImpl *event) { + NS_ASSERT_MSG (SystemThread::Equals (m_main), "Simulator::Schedule Thread-unsafe invocation!"); + Time tAbsolute = time + TimeStep (m_currentTs); NS_ASSERT (tAbsolute.IsPositive ()); @@ -208,19 +251,37 @@ DefaultSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, E { NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event); - Scheduler::Event ev; - ev.impl = event; - ev.key.m_ts = m_currentTs + time.GetTimeStep (); - ev.key.m_context = context; - ev.key.m_uid = m_uid; - m_uid++; - m_unscheduledEvents++; - m_events->Insert (ev); + if (SystemThread::Equals (m_main)) + { + Time tAbsolute = time + TimeStep (m_currentTs); + Scheduler::Event ev; + ev.impl = event; + ev.key.m_ts = (uint64_t) tAbsolute.GetTimeStep (); + ev.key.m_context = context; + ev.key.m_uid = m_uid; + m_uid++; + m_unscheduledEvents++; + m_events->Insert (ev); + } + else + { + EventWithContext ev; + ev.context = context; + ev.timestamp = time.GetTimeStep (); + ev.event = event; + { + CriticalSection cs (m_eventsWithContextMutex); + m_eventsWithContext.push_back(ev); + m_eventsWithContextEmpty = false; + } + } } EventId DefaultSimulatorImpl::ScheduleNow (EventImpl *event) { + NS_ASSERT_MSG (SystemThread::Equals (m_main), "Simulator::ScheduleNow Thread-unsafe invocation!"); + Scheduler::Event ev; ev.impl = event; ev.key.m_ts = m_currentTs; @@ -235,6 +296,8 @@ DefaultSimulatorImpl::ScheduleNow (EventImpl *event) EventId DefaultSimulatorImpl::ScheduleDestroy (EventImpl *event) { + NS_ASSERT_MSG (SystemThread::Equals (m_main), "Simulator::ScheduleDestroy Thread-unsafe invocation!"); + EventId id (Ptr (event, false), m_currentTs, 0xffffffff, 2); m_destroyEvents.push_back (id); m_uid++; diff --git a/src/core/model/default-simulator-impl.h b/src/core/model/default-simulator-impl.h index f2f1a892d..fa999e051 100644 --- a/src/core/model/default-simulator-impl.h +++ b/src/core/model/default-simulator-impl.h @@ -24,6 +24,8 @@ #include "simulator-impl.h" #include "scheduler.h" #include "event-impl.h" +#include "system-thread.h" +#include "ns3/system-mutex.h" #include "ptr.h" @@ -64,11 +66,23 @@ private: virtual void DoDispose (void); void ProcessOneEvent (void); uint64_t NextTs (void) const; - typedef std::list DestroyEvents; + void ProcessEventsWithContext (void); + + struct EventWithContext { + uint32_t context; + uint64_t timestamp; + EventImpl *event; + }; + typedef std::list EventsWithContext; + EventsWithContext m_eventsWithContext; + bool m_eventsWithContextEmpty; + SystemMutex m_eventsWithContextMutex; + typedef std::list DestroyEvents; DestroyEvents m_destroyEvents; bool m_stop; Ptr m_events; + uint32_t m_uid; uint32_t m_currentUid; uint64_t m_currentTs; @@ -76,6 +90,8 @@ private: // number of events that have been inserted but not yet scheduled, // not counting the "destroy" events; this is used for validation int m_unscheduledEvents; + + SystemThread::ThreadId m_main; }; } // namespace ns3 diff --git a/src/core/model/realtime-simulator-impl.cc b/src/core/model/realtime-simulator-impl.cc index d76564cd9..b633925d9 100644 --- a/src/core/model/realtime-simulator-impl.cc +++ b/src/core/model/realtime-simulator-impl.cc @@ -80,6 +80,8 @@ RealtimeSimulatorImpl::RealtimeSimulatorImpl () m_currentContext = 0xffffffff; m_unscheduledEvents = 0; + m_main = SystemThread::Self(); + // Be very careful not to do anything that would cause a change or assignment // of the underlying reference counts of m_synchronizer or you will be sorry. m_synchronizer = CreateObject (); @@ -424,6 +426,9 @@ RealtimeSimulatorImpl::Run (void) NS_ASSERT_MSG (m_running == false, "RealtimeSimulatorImpl::Run(): Simulator already running"); + // Set the current threadId as the main threadId + m_main = SystemThread::Self(); + m_stop = false; m_running = true; m_synchronizer->SetOrigin (m_currentTs); @@ -510,6 +515,9 @@ RealtimeSimulatorImpl::RunOneEvent (void) // { CriticalSection cs (m_mutex); + + // Set the current threadId as the main threadId + m_main = SystemThread::Self(); Scheduler::Event next = m_events->RemoveNext (); @@ -581,7 +589,20 @@ RealtimeSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, CriticalSection cs (m_mutex); uint64_t ts; - ts = m_currentTs + time.GetTimeStep (); + if (SystemThread::Equals (m_main)) + { + ts = m_currentTs + time.GetTimeStep (); + } + else + { + // + // If the simulator is running, we're pacing and have a meaningful + // realtime clock. If we're not, then m_currentTs is where we stopped. + // + ts = m_running ? m_synchronizer->GetCurrentRealtime () : m_currentTs; + ts += time.GetTimeStep (); + } + NS_ASSERT_MSG (ts >= m_currentTs, "RealtimeSimulatorImpl::ScheduleRealtime(): schedule for time < m_currentTs"); Scheduler::Event ev; ev.impl = impl; diff --git a/src/core/model/realtime-simulator-impl.h b/src/core/model/realtime-simulator-impl.h index c5a0955d7..e5aa93ce4 100644 --- a/src/core/model/realtime-simulator-impl.h +++ b/src/core/model/realtime-simulator-impl.h @@ -20,6 +20,7 @@ #define REALTIME_SIMULATOR_IMPL_H #include "simulator-impl.h" +#include "system-thread.h" #include "scheduler.h" #include "synchronizer.h" @@ -118,6 +119,8 @@ private: * The maximum allowable drift from real-time in SYNC_HARD_LIMIT mode. */ Time m_hardLimit; + + SystemThread::ThreadId m_main; }; } // namespace ns3 diff --git a/src/core/model/simulator.h b/src/core/model/simulator.h index 15803b917..f143217cf 100644 --- a/src/core/model/simulator.h +++ b/src/core/model/simulator.h @@ -286,6 +286,7 @@ public: /** * Schedule an event with the given context. * A context of 0xffffffff means no context is specified. + * This method is thread-safe: it can be called from any thread. * * @param time the relative expiration time of the event. * @param context user-specified context parameter @@ -296,6 +297,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param mem_ptr member method pointer to invoke @@ -306,6 +309,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param mem_ptr member method pointer to invoke @@ -317,6 +322,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1, T2 a2); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param mem_ptr member method pointer to invoke @@ -330,6 +337,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1, T2 a2, T3 a3); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param mem_ptr member method pointer to invoke @@ -344,6 +353,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1, T2 a2, T3 a3, T4 a4); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param mem_ptr member method pointer to invoke @@ -359,6 +370,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param f the function to invoke @@ -366,6 +379,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(void)); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param f the function to invoke @@ -375,6 +390,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1), T1 a1); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param f the function to invoke @@ -385,6 +402,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1,U2), T1 a1, T2 a2); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param f the function to invoke @@ -396,6 +415,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1,U2,U3), T1 a1, T2 a2, T3 a3); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param f the function to invoke @@ -409,6 +430,8 @@ public: static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1,U2,U3,U4), T1 a1, T2 a2, T3 a3, T4 a4); /** + * This method is thread-safe: it can be called from any thread. + * * @param time the relative expiration time of the event. * @param context user-specified context parameter * @param f the function to invoke @@ -743,6 +766,8 @@ public: static EventId Schedule (Time const &time, const Ptr &event); /** + * This method is thread-safe: it can be called from any thread. + * * \param time delay until the event expires * \param context event context * \param event the event to schedule diff --git a/src/core/test/threaded-test-suite.cc b/src/core/test/threaded-test-suite.cc new file mode 100644 index 000000000..647193b78 --- /dev/null +++ b/src/core/test/threaded-test-suite.cc @@ -0,0 +1,266 @@ +/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ +/* + * Copyright (c) 2011 INRIA + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation; + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Claudio Freire + */ +#include "ns3/test.h" +#include "ns3/simulator.h" +#include "ns3/list-scheduler.h" +#include "ns3/heap-scheduler.h" +#include "ns3/map-scheduler.h" +#include "ns3/calendar-scheduler.h" +#include "ns3/config.h" +#include "ns3/string.h" +#include "ns3/system-thread.h" + +#include +#include +#include + +namespace ns3 { + +#define MAXTHREADS 64 + +class ThreadedSimulatorEventsTestCase : public TestCase +{ +public: + ThreadedSimulatorEventsTestCase (ObjectFactory schedulerFactory, const std::string &simulatorType, unsigned int threads); + void A (int a); + void B (int b); + void C (int c); + void D (int d); + void DoNothing (unsigned int threadno); + static void SchedulingThread (std::pair context); + void End (void); + uint64_t m_b; + uint64_t m_a; + uint64_t m_c; + uint64_t m_d; + unsigned int m_threads; + bool m_threadWaiting[MAXTHREADS]; + bool m_stop; + ObjectFactory m_schedulerFactory; + std::string m_simulatorType; + std::string m_error; + std::list > m_threadlist; + +private: + virtual void DoSetup (void); + virtual void DoRun (void); + virtual void DoTeardown (void); +}; + +ThreadedSimulatorEventsTestCase::ThreadedSimulatorEventsTestCase (ObjectFactory schedulerFactory, const std::string &simulatorType, unsigned int threads) + : TestCase ("Check that threaded event handling is working with " + + schedulerFactory.GetTypeId ().GetName () + " in " + simulatorType), + m_threads (threads), + m_schedulerFactory (schedulerFactory), + m_simulatorType (simulatorType) +{ +} + +void +ThreadedSimulatorEventsTestCase::End (void) +{ + m_stop = true; + for (std::list >::iterator it2 = m_threadlist.begin(); it2 != m_threadlist.end(); ++it2) + { + (*it2)->Join(); + } +} +void +ThreadedSimulatorEventsTestCase::SchedulingThread (std::pair context) +{ + ThreadedSimulatorEventsTestCase *me = context.first; + unsigned int threadno = context.second; + + while (!me->m_stop) + { + me->m_threadWaiting[threadno] = true; + Simulator::ScheduleWithContext ( + uint32_t (-1), + MicroSeconds (1), + MakeEvent (&ThreadedSimulatorEventsTestCase::DoNothing, me, threadno)); + while (!me->m_stop && me->m_threadWaiting[threadno]) + { + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 500; + nanosleep(&ts, NULL); + } + } +} +void +ThreadedSimulatorEventsTestCase::DoNothing (unsigned int threadno) +{ + if (!m_error.empty()) + { + m_error = "Bad threaded scheduling"; + } + m_threadWaiting[threadno] = false; +} +void +ThreadedSimulatorEventsTestCase::A (int a) +{ + if (m_a != m_b || m_a != m_c || m_a != m_d) + { + m_error = "Bad scheduling"; + Simulator::Stop(); + }; + ++m_a; + Simulator::Schedule ( + MicroSeconds (10), + MakeEvent (&ThreadedSimulatorEventsTestCase::B, this, a+1)); +} + +void +ThreadedSimulatorEventsTestCase::B (int b) +{ + if (m_a != (m_b+1) || m_a != (m_c+1) || m_a != (m_d+1)) + { + m_error = "Bad scheduling"; + Simulator::Stop(); + }; + ++m_b; + Simulator::Schedule ( + MicroSeconds (10), + MakeEvent (&ThreadedSimulatorEventsTestCase::C, this, b+1)); +} + +void +ThreadedSimulatorEventsTestCase::C (int c) +{ + if (m_a != m_b || m_a != (m_c+1) || m_a != (m_d+1)) + { + m_error = "Bad scheduling"; + Simulator::Stop(); + }; + ++m_c; + Simulator::Schedule ( + MicroSeconds (10), + MakeEvent (&ThreadedSimulatorEventsTestCase::D, this, c+1)); +} + +void +ThreadedSimulatorEventsTestCase::D (int d) +{ + if (m_a != m_b || m_a != m_c || m_a != (m_d+1)) + { + m_error = "Bad scheduling"; + Simulator::Stop(); + }; + ++m_d; + if (m_stop) + { + Simulator::Stop(); + } + else + { + Simulator::Schedule ( + MicroSeconds (10), + MakeEvent (&ThreadedSimulatorEventsTestCase::A, this, d+1)); + } +} + +void +ThreadedSimulatorEventsTestCase::DoSetup (void) +{ + if (!m_simulatorType.empty()) + { + Config::SetGlobal ("SimulatorImplementationType", StringValue (m_simulatorType)); + } + + m_error = ""; + + m_a = + m_b = + m_c = + m_d = 0; + + for (unsigned int i=0; i < m_threads; ++i) + { + m_threadlist.push_back( + Create (MakeBoundCallback ( + &ThreadedSimulatorEventsTestCase::SchedulingThread, + std::pair(this,1) )) ); + } +} +void +ThreadedSimulatorEventsTestCase::DoTeardown (void) +{ + m_threadlist.clear(); + + Config::SetGlobal ("SimulatorImplementationType", StringValue ("ns3::DefaultSimulatorImpl")); +} +void +ThreadedSimulatorEventsTestCase::DoRun (void) +{ + Simulator::SetScheduler (m_schedulerFactory); + + Simulator::Schedule (MicroSeconds (10), &ThreadedSimulatorEventsTestCase::A, this, 1); + Simulator::Schedule (Seconds (1), &ThreadedSimulatorEventsTestCase::End, this); + + + for (std::list >::iterator it = m_threadlist.begin(); it != m_threadlist.end(); ++it) + { + (*it)->Start(); + } + + Simulator::Run (); + Simulator::Destroy (); + + m_stop = true; + + NS_TEST_EXPECT_MSG_EQ (m_error.empty(), true, m_error.c_str()); + NS_TEST_EXPECT_MSG_EQ (m_a, m_b, "Bad scheduling"); + NS_TEST_EXPECT_MSG_EQ (m_a, m_c, "Bad scheduling"); + NS_TEST_EXPECT_MSG_EQ (m_a, m_d, "Bad scheduling"); +} + +class ThreadedSimulatorTestSuite : public TestSuite +{ +public: + ThreadedSimulatorTestSuite () + : TestSuite ("threaded-simulator") + { + std::string simulatorTypes[] = { + "", + "ns3::RealtimeSimulatorImpl" + }; + + unsigned int threadcounts[] = {2, 10, 20}; + + ObjectFactory factory; + + for (unsigned int i=0; i < (sizeof(simulatorTypes) / sizeof(simulatorTypes[0])); ++i) + { + for (unsigned int j=0; j < (sizeof(threadcounts) / sizeof(threadcounts[0])); ++j) + { + factory.SetTypeId (ListScheduler::GetTypeId ()); + AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j])); + factory.SetTypeId (MapScheduler::GetTypeId ()); + AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j])); + factory.SetTypeId (HeapScheduler::GetTypeId ()); + AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j])); + factory.SetTypeId (CalendarScheduler::GetTypeId ()); + AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j])); + } + } + } +} g_threadedSimulatorTestSuite; + +} // namespace ns3 diff --git a/src/core/wscript b/src/core/wscript index db7fb389c..446cf4f2c 100644 --- a/src/core/wscript +++ b/src/core/wscript @@ -168,6 +168,7 @@ def build(bld): 'test/traced-callback-test-suite.cc', 'test/type-traits-test-suite.cc', 'test/watchdog-test-suite.cc', + 'test/threaded-test-suite.cc', ] headers = bld.new_task_gen(features=['ns3header'])