Finally enable a thread-safe ScheduleWithContext

This commit is contained in:
Alina Quereilhac
2012-03-22 18:05:13 +01:00
parent ef36bcce9e
commit efe4c2ed68
7 changed files with 405 additions and 10 deletions

View File

@@ -59,6 +59,8 @@ DefaultSimulatorImpl::DefaultSimulatorImpl ()
m_currentTs = 0;
m_currentContext = 0xffffffff;
m_unscheduledEvents = 0;
m_eventsWithContextEmpty = true;
m_main = SystemThread::Self();
}
DefaultSimulatorImpl::~DefaultSimulatorImpl ()
@@ -128,6 +130,8 @@ DefaultSimulatorImpl::ProcessOneEvent (void)
m_currentUid = next.key.m_uid;
next.impl->Invoke ();
next.impl->Unref ();
ProcessEventsWithContext ();
}
bool
@@ -150,10 +154,44 @@ DefaultSimulatorImpl::Next (void) const
return TimeStep (NextTs ());
}
void
DefaultSimulatorImpl::ProcessEventsWithContext (void)
{
if (m_eventsWithContextEmpty)
{
return;
}
// swap queues
EventsWithContext eventsWithContext;
{
CriticalSection cs (m_eventsWithContextMutex);
m_eventsWithContext.swap(eventsWithContext);
m_eventsWithContextEmpty = true;
}
while (!eventsWithContext.empty ())
{
EventWithContext event = eventsWithContext.front ();
eventsWithContext.pop_front ();
Scheduler::Event ev;
ev.impl = event.event;
ev.key.m_ts = m_currentTs + event.timestamp;
ev.key.m_context = event.context;
ev.key.m_uid = m_uid;
m_uid++;
m_unscheduledEvents++;
m_events->Insert (ev);
}
}
void
DefaultSimulatorImpl::Run (void)
{
// Set the current threadId as the main threadId
m_main = SystemThread::Self();
ProcessEventsWithContext ();
m_stop = false;
while (!m_events->IsEmpty () && !m_stop)
{
ProcessOneEvent ();
@@ -167,6 +205,9 @@ DefaultSimulatorImpl::Run (void)
void
DefaultSimulatorImpl::RunOneEvent (void)
{
// Set the current threadId as the main threadId
m_main = SystemThread::Self();
ProcessEventsWithContext ();
ProcessOneEvent ();
}
@@ -188,6 +229,8 @@ DefaultSimulatorImpl::Stop (Time const &time)
EventId
DefaultSimulatorImpl::Schedule (Time const &time, EventImpl *event)
{
NS_ASSERT_MSG (SystemThread::Equals (m_main), "Simulator::Schedule Thread-unsafe invocation!");
Time tAbsolute = time + TimeStep (m_currentTs);
NS_ASSERT (tAbsolute.IsPositive ());
@@ -208,19 +251,37 @@ DefaultSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, E
{
NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event);
Scheduler::Event ev;
ev.impl = event;
ev.key.m_ts = m_currentTs + time.GetTimeStep ();
ev.key.m_context = context;
ev.key.m_uid = m_uid;
m_uid++;
m_unscheduledEvents++;
m_events->Insert (ev);
if (SystemThread::Equals (m_main))
{
Time tAbsolute = time + TimeStep (m_currentTs);
Scheduler::Event ev;
ev.impl = event;
ev.key.m_ts = (uint64_t) tAbsolute.GetTimeStep ();
ev.key.m_context = context;
ev.key.m_uid = m_uid;
m_uid++;
m_unscheduledEvents++;
m_events->Insert (ev);
}
else
{
EventWithContext ev;
ev.context = context;
ev.timestamp = time.GetTimeStep ();
ev.event = event;
{
CriticalSection cs (m_eventsWithContextMutex);
m_eventsWithContext.push_back(ev);
m_eventsWithContextEmpty = false;
}
}
}
EventId
DefaultSimulatorImpl::ScheduleNow (EventImpl *event)
{
NS_ASSERT_MSG (SystemThread::Equals (m_main), "Simulator::ScheduleNow Thread-unsafe invocation!");
Scheduler::Event ev;
ev.impl = event;
ev.key.m_ts = m_currentTs;
@@ -235,6 +296,8 @@ DefaultSimulatorImpl::ScheduleNow (EventImpl *event)
EventId
DefaultSimulatorImpl::ScheduleDestroy (EventImpl *event)
{
NS_ASSERT_MSG (SystemThread::Equals (m_main), "Simulator::ScheduleDestroy Thread-unsafe invocation!");
EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
m_destroyEvents.push_back (id);
m_uid++;

View File

@@ -24,6 +24,8 @@
#include "simulator-impl.h"
#include "scheduler.h"
#include "event-impl.h"
#include "system-thread.h"
#include "ns3/system-mutex.h"
#include "ptr.h"
@@ -64,11 +66,23 @@ private:
virtual void DoDispose (void);
void ProcessOneEvent (void);
uint64_t NextTs (void) const;
typedef std::list<EventId> DestroyEvents;
void ProcessEventsWithContext (void);
struct EventWithContext {
uint32_t context;
uint64_t timestamp;
EventImpl *event;
};
typedef std::list<struct EventWithContext> EventsWithContext;
EventsWithContext m_eventsWithContext;
bool m_eventsWithContextEmpty;
SystemMutex m_eventsWithContextMutex;
typedef std::list<EventId> DestroyEvents;
DestroyEvents m_destroyEvents;
bool m_stop;
Ptr<Scheduler> m_events;
uint32_t m_uid;
uint32_t m_currentUid;
uint64_t m_currentTs;
@@ -76,6 +90,8 @@ private:
// number of events that have been inserted but not yet scheduled,
// not counting the "destroy" events; this is used for validation
int m_unscheduledEvents;
SystemThread::ThreadId m_main;
};
} // namespace ns3

View File

@@ -80,6 +80,8 @@ RealtimeSimulatorImpl::RealtimeSimulatorImpl ()
m_currentContext = 0xffffffff;
m_unscheduledEvents = 0;
m_main = SystemThread::Self();
// Be very careful not to do anything that would cause a change or assignment
// of the underlying reference counts of m_synchronizer or you will be sorry.
m_synchronizer = CreateObject<WallClockSynchronizer> ();
@@ -424,6 +426,9 @@ RealtimeSimulatorImpl::Run (void)
NS_ASSERT_MSG (m_running == false,
"RealtimeSimulatorImpl::Run(): Simulator already running");
// Set the current threadId as the main threadId
m_main = SystemThread::Self();
m_stop = false;
m_running = true;
m_synchronizer->SetOrigin (m_currentTs);
@@ -510,6 +515,9 @@ RealtimeSimulatorImpl::RunOneEvent (void)
//
{
CriticalSection cs (m_mutex);
// Set the current threadId as the main threadId
m_main = SystemThread::Self();
Scheduler::Event next = m_events->RemoveNext ();
@@ -581,7 +589,20 @@ RealtimeSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time,
CriticalSection cs (m_mutex);
uint64_t ts;
ts = m_currentTs + time.GetTimeStep ();
if (SystemThread::Equals (m_main))
{
ts = m_currentTs + time.GetTimeStep ();
}
else
{
//
// If the simulator is running, we're pacing and have a meaningful
// realtime clock. If we're not, then m_currentTs is where we stopped.
//
ts = m_running ? m_synchronizer->GetCurrentRealtime () : m_currentTs;
ts += time.GetTimeStep ();
}
NS_ASSERT_MSG (ts >= m_currentTs, "RealtimeSimulatorImpl::ScheduleRealtime(): schedule for time < m_currentTs");
Scheduler::Event ev;
ev.impl = impl;

View File

@@ -20,6 +20,7 @@
#define REALTIME_SIMULATOR_IMPL_H
#include "simulator-impl.h"
#include "system-thread.h"
#include "scheduler.h"
#include "synchronizer.h"
@@ -118,6 +119,8 @@ private:
* The maximum allowable drift from real-time in SYNC_HARD_LIMIT mode.
*/
Time m_hardLimit;
SystemThread::ThreadId m_main;
};
} // namespace ns3

View File

@@ -286,6 +286,7 @@ public:
/**
* Schedule an event with the given context.
* A context of 0xffffffff means no context is specified.
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
@@ -296,6 +297,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param mem_ptr member method pointer to invoke
@@ -306,6 +309,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param mem_ptr member method pointer to invoke
@@ -317,6 +322,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1, T2 a2);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param mem_ptr member method pointer to invoke
@@ -330,6 +337,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1, T2 a2, T3 a3);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param mem_ptr member method pointer to invoke
@@ -344,6 +353,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj, T1 a1, T2 a2, T3 a3, T4 a4);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param mem_ptr member method pointer to invoke
@@ -359,6 +370,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, MEM mem_ptr, OBJ obj,
T1 a1, T2 a2, T3 a3, T4 a4, T5 a5);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param f the function to invoke
@@ -366,6 +379,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(void));
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param f the function to invoke
@@ -375,6 +390,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1), T1 a1);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param f the function to invoke
@@ -385,6 +402,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1,U2), T1 a1, T2 a2);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param f the function to invoke
@@ -396,6 +415,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1,U2,U3), T1 a1, T2 a2, T3 a3);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param f the function to invoke
@@ -409,6 +430,8 @@ public:
static void ScheduleWithContext (uint32_t context, Time const &time, void (*f)(U1,U2,U3,U4), T1 a1, T2 a2, T3 a3, T4 a4);
/**
* This method is thread-safe: it can be called from any thread.
*
* @param time the relative expiration time of the event.
* @param context user-specified context parameter
* @param f the function to invoke
@@ -743,6 +766,8 @@ public:
static EventId Schedule (Time const &time, const Ptr<EventImpl> &event);
/**
* This method is thread-safe: it can be called from any thread.
*
* \param time delay until the event expires
* \param context event context
* \param event the event to schedule

View File

@@ -0,0 +1,266 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2011 INRIA
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation;
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Author: Claudio Freire <claudio-daniel.freire@inria.fr>
*/
#include "ns3/test.h"
#include "ns3/simulator.h"
#include "ns3/list-scheduler.h"
#include "ns3/heap-scheduler.h"
#include "ns3/map-scheduler.h"
#include "ns3/calendar-scheduler.h"
#include "ns3/config.h"
#include "ns3/string.h"
#include "ns3/system-thread.h"
#include <time.h>
#include <list>
#include <utility>
namespace ns3 {
#define MAXTHREADS 64
class ThreadedSimulatorEventsTestCase : public TestCase
{
public:
ThreadedSimulatorEventsTestCase (ObjectFactory schedulerFactory, const std::string &simulatorType, unsigned int threads);
void A (int a);
void B (int b);
void C (int c);
void D (int d);
void DoNothing (unsigned int threadno);
static void SchedulingThread (std::pair<ThreadedSimulatorEventsTestCase *, unsigned int> context);
void End (void);
uint64_t m_b;
uint64_t m_a;
uint64_t m_c;
uint64_t m_d;
unsigned int m_threads;
bool m_threadWaiting[MAXTHREADS];
bool m_stop;
ObjectFactory m_schedulerFactory;
std::string m_simulatorType;
std::string m_error;
std::list<Ptr<SystemThread> > m_threadlist;
private:
virtual void DoSetup (void);
virtual void DoRun (void);
virtual void DoTeardown (void);
};
ThreadedSimulatorEventsTestCase::ThreadedSimulatorEventsTestCase (ObjectFactory schedulerFactory, const std::string &simulatorType, unsigned int threads)
: TestCase ("Check that threaded event handling is working with " +
schedulerFactory.GetTypeId ().GetName () + " in " + simulatorType),
m_threads (threads),
m_schedulerFactory (schedulerFactory),
m_simulatorType (simulatorType)
{
}
void
ThreadedSimulatorEventsTestCase::End (void)
{
m_stop = true;
for (std::list<Ptr<SystemThread> >::iterator it2 = m_threadlist.begin(); it2 != m_threadlist.end(); ++it2)
{
(*it2)->Join();
}
}
void
ThreadedSimulatorEventsTestCase::SchedulingThread (std::pair<ThreadedSimulatorEventsTestCase *, unsigned int> context)
{
ThreadedSimulatorEventsTestCase *me = context.first;
unsigned int threadno = context.second;
while (!me->m_stop)
{
me->m_threadWaiting[threadno] = true;
Simulator::ScheduleWithContext (
uint32_t (-1),
MicroSeconds (1),
MakeEvent (&ThreadedSimulatorEventsTestCase::DoNothing, me, threadno));
while (!me->m_stop && me->m_threadWaiting[threadno])
{
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 500;
nanosleep(&ts, NULL);
}
}
}
void
ThreadedSimulatorEventsTestCase::DoNothing (unsigned int threadno)
{
if (!m_error.empty())
{
m_error = "Bad threaded scheduling";
}
m_threadWaiting[threadno] = false;
}
void
ThreadedSimulatorEventsTestCase::A (int a)
{
if (m_a != m_b || m_a != m_c || m_a != m_d)
{
m_error = "Bad scheduling";
Simulator::Stop();
};
++m_a;
Simulator::Schedule (
MicroSeconds (10),
MakeEvent (&ThreadedSimulatorEventsTestCase::B, this, a+1));
}
void
ThreadedSimulatorEventsTestCase::B (int b)
{
if (m_a != (m_b+1) || m_a != (m_c+1) || m_a != (m_d+1))
{
m_error = "Bad scheduling";
Simulator::Stop();
};
++m_b;
Simulator::Schedule (
MicroSeconds (10),
MakeEvent (&ThreadedSimulatorEventsTestCase::C, this, b+1));
}
void
ThreadedSimulatorEventsTestCase::C (int c)
{
if (m_a != m_b || m_a != (m_c+1) || m_a != (m_d+1))
{
m_error = "Bad scheduling";
Simulator::Stop();
};
++m_c;
Simulator::Schedule (
MicroSeconds (10),
MakeEvent (&ThreadedSimulatorEventsTestCase::D, this, c+1));
}
void
ThreadedSimulatorEventsTestCase::D (int d)
{
if (m_a != m_b || m_a != m_c || m_a != (m_d+1))
{
m_error = "Bad scheduling";
Simulator::Stop();
};
++m_d;
if (m_stop)
{
Simulator::Stop();
}
else
{
Simulator::Schedule (
MicroSeconds (10),
MakeEvent (&ThreadedSimulatorEventsTestCase::A, this, d+1));
}
}
void
ThreadedSimulatorEventsTestCase::DoSetup (void)
{
if (!m_simulatorType.empty())
{
Config::SetGlobal ("SimulatorImplementationType", StringValue (m_simulatorType));
}
m_error = "";
m_a =
m_b =
m_c =
m_d = 0;
for (unsigned int i=0; i < m_threads; ++i)
{
m_threadlist.push_back(
Create<SystemThread> (MakeBoundCallback (
&ThreadedSimulatorEventsTestCase::SchedulingThread,
std::pair<ThreadedSimulatorEventsTestCase *, unsigned int>(this,1) )) );
}
}
void
ThreadedSimulatorEventsTestCase::DoTeardown (void)
{
m_threadlist.clear();
Config::SetGlobal ("SimulatorImplementationType", StringValue ("ns3::DefaultSimulatorImpl"));
}
void
ThreadedSimulatorEventsTestCase::DoRun (void)
{
Simulator::SetScheduler (m_schedulerFactory);
Simulator::Schedule (MicroSeconds (10), &ThreadedSimulatorEventsTestCase::A, this, 1);
Simulator::Schedule (Seconds (1), &ThreadedSimulatorEventsTestCase::End, this);
for (std::list<Ptr<SystemThread> >::iterator it = m_threadlist.begin(); it != m_threadlist.end(); ++it)
{
(*it)->Start();
}
Simulator::Run ();
Simulator::Destroy ();
m_stop = true;
NS_TEST_EXPECT_MSG_EQ (m_error.empty(), true, m_error.c_str());
NS_TEST_EXPECT_MSG_EQ (m_a, m_b, "Bad scheduling");
NS_TEST_EXPECT_MSG_EQ (m_a, m_c, "Bad scheduling");
NS_TEST_EXPECT_MSG_EQ (m_a, m_d, "Bad scheduling");
}
class ThreadedSimulatorTestSuite : public TestSuite
{
public:
ThreadedSimulatorTestSuite ()
: TestSuite ("threaded-simulator")
{
std::string simulatorTypes[] = {
"",
"ns3::RealtimeSimulatorImpl"
};
unsigned int threadcounts[] = {2, 10, 20};
ObjectFactory factory;
for (unsigned int i=0; i < (sizeof(simulatorTypes) / sizeof(simulatorTypes[0])); ++i)
{
for (unsigned int j=0; j < (sizeof(threadcounts) / sizeof(threadcounts[0])); ++j)
{
factory.SetTypeId (ListScheduler::GetTypeId ());
AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j]));
factory.SetTypeId (MapScheduler::GetTypeId ());
AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j]));
factory.SetTypeId (HeapScheduler::GetTypeId ());
AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j]));
factory.SetTypeId (CalendarScheduler::GetTypeId ());
AddTestCase (new ThreadedSimulatorEventsTestCase (factory, simulatorTypes[i], threadcounts[j]));
}
}
}
} g_threadedSimulatorTestSuite;
} // namespace ns3

View File

@@ -168,6 +168,7 @@ def build(bld):
'test/traced-callback-test-suite.cc',
'test/type-traits-test-suite.cc',
'test/watchdog-test-suite.cc',
'test/threaded-test-suite.cc',
]
headers = bld.new_task_gen(features=['ns3header'])