A Discrete-Event Network Simulator
API
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
22 #include "mpi-interface.h"
23 
24 #include "ns3/simulator.h"
25 #include "ns3/scheduler.h"
26 #include "ns3/event-impl.h"
27 #include "ns3/channel.h"
28 #include "ns3/node-container.h"
29 #include "ns3/ptr.h"
30 #include "ns3/pointer.h"
31 #include "ns3/assert.h"
32 #include "ns3/log.h"
33 
34 #include <cmath>
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 namespace ns3 {
41 
42 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
43 
44 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
45 
47 {
48 }
49 
50 Time
52 {
53  return m_smallestTime;
54 }
55 
56 uint32_t
58 {
59  return m_txCount;
60 }
61 
62 uint32_t
64 {
65  return m_rxCount;
66 }
67 uint32_t
69 {
70  return m_myId;
71 }
72 
73 bool
75 {
76  return m_isFinished;
77 }
78 
80 
81 TypeId
83 {
84  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
86  .SetGroupName ("Mpi")
87  .AddConstructor<DistributedSimulatorImpl> ()
88  ;
89  return tid;
90 }
91 
93 {
94  NS_LOG_FUNCTION (this);
95 
96 #ifdef NS3_MPI
99 
100  // Allocate the LBTS message buffer
102  m_grantedTime = Seconds (0);
103 #else
105  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
106 #endif
107 
108  m_stop = false;
109  m_globalFinished = false;
110  // uids are allocated from 4.
111  // uid 0 is "invalid" events
112  // uid 1 is "now" events
113  // uid 2 is "destroy" events
114  m_uid = 4;
115  // before ::Run is entered, the m_currentUid will be zero
116  m_currentUid = 0;
117  m_currentTs = 0;
120  m_events = 0;
121 }
122 
124 {
125  NS_LOG_FUNCTION (this);
126 }
127 
128 void
130 {
131  NS_LOG_FUNCTION (this);
132 
133  while (!m_events->IsEmpty ())
134  {
135  Scheduler::Event next = m_events->RemoveNext ();
136  next.impl->Unref ();
137  }
138  m_events = 0;
139  delete [] m_pLBTS;
141 }
142 
143 void
145 {
146  NS_LOG_FUNCTION (this);
147 
148  while (!m_destroyEvents.empty ())
149  {
150  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
151  m_destroyEvents.pop_front ();
152  NS_LOG_LOGIC ("handle destroy " << ev);
153  if (!ev->IsCancelled ())
154  {
155  ev->Invoke ();
156  }
157  }
158 
160 }
161 
162 
163 void
165 {
166  NS_LOG_FUNCTION (this);
167 
168 #ifdef NS3_MPI
169  if (MpiInterface::GetSize () <= 1)
170  {
171  m_lookAhead = Seconds (0);
172  }
173  else
174  {
175  if (m_lookAhead == Seconds (-1))
176  {
178  }
179  // else it was already set by SetLookAhead
180 
182  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
183  {
184  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
185  {
186  continue;
187  }
188 
189  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
190  {
191  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
192  // only works for p2p links currently
193  if (!localNetDevice->IsPointToPoint ())
194  {
195  continue;
196  }
197  Ptr<Channel> channel = localNetDevice->GetChannel ();
198  if (channel == 0)
199  {
200  continue;
201  }
202 
203  // grab the adjacent node
204  Ptr<Node> remoteNode;
205  if (channel->GetDevice (0) == localNetDevice)
206  {
207  remoteNode = (channel->GetDevice (1))->GetNode ();
208  }
209  else
210  {
211  remoteNode = (channel->GetDevice (0))->GetNode ();
212  }
213 
214  // if it's not remote, don't consider it
215  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
216  {
217  continue;
218  }
219 
220  // compare delay on the channel with current value of
221  // m_lookAhead. if delay on channel is smaller, make
222  // it the new lookAhead.
223  TimeValue delay;
224  channel->GetAttribute ("Delay", delay);
225 
226  if (delay.Get () < m_lookAhead)
227  {
228  m_lookAhead = delay.Get ();
229  }
230  }
231  }
232  }
233 
234  // m_lookAhead is now set
236 
237  /*
238  * Compute the maximum inter-task latency and use that value
239  * for tasks with no inter-task links.
240  *
241  * Special processing for edge cases. Tasks that have no
242  * nodes still need a reasonable lookAhead value. Infinity
243  * would work correctly but introduces a performance issue; tasks
244  * with an infinite lookAhead would execute all their events
245  * before doing an AllGather resulting in very bad load balance
246  * during the first time window. Since all tasks participate in
247  * the AllGather it is desirable to have all the tasks advance in
248  * simulation time at a similar rate assuming roughly equal events
249  * per unit of simulation time in order to equalize the amount of
250  * work per time window.
251  */
252  long sendbuf;
253  long recvbuf;
254 
255  /* Tasks with no inter-task links do not contribute to max */
257  {
258  sendbuf = 0;
259  }
260  else
261  {
262  sendbuf = m_lookAhead.GetInteger ();
263  }
264 
265  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
266 
267  /* For nodes that did not compute a lookahead use max from ranks
268  * that did compute a value. An edge case occurs if all nodes have
269  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
270  * will proceed without synchronization until a single AllGather
271  * occurs when all tasks have finished.
272  */
273  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
274  {
275  m_lookAhead = Time (recvbuf);
277  }
278 
279 #else
280  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
281 #endif
282 }
283 
284 void
286 {
287  if (lookAhead > 0)
288  {
289  NS_LOG_FUNCTION (this << lookAhead);
290  m_lookAhead = lookAhead;
291  }
292  else
293  {
294  NS_LOG_WARN ("attempted to set look ahead negative: " << lookAhead);
295  }
296 }
297 
298 void
300 {
301  NS_LOG_FUNCTION (this << schedulerFactory);
302 
303  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
304 
305  if (m_events != 0)
306  {
307  while (!m_events->IsEmpty ())
308  {
309  Scheduler::Event next = m_events->RemoveNext ();
310  scheduler->Insert (next);
311  }
312  }
313  m_events = scheduler;
314 }
315 
316 void
318 {
319  NS_LOG_FUNCTION (this);
320 
321  Scheduler::Event next = m_events->RemoveNext ();
322 
323  NS_ASSERT (next.key.m_ts >= m_currentTs);
325 
326  NS_LOG_LOGIC ("handle " << next.key.m_ts);
327  m_currentTs = next.key.m_ts;
329  m_currentUid = next.key.m_uid;
330  next.impl->Invoke ();
331  next.impl->Unref ();
332 }
333 
334 bool
336 {
337  return m_globalFinished;
338 }
339 
340 bool
342 {
343  return m_events->IsEmpty () || m_stop;
344 }
345 
346 uint64_t
348 {
349  // If the local MPI task has no more events or stop was called
350  // next event time is infinity.
351  if (IsLocalFinished ())
352  {
354  }
355  else
356  {
357  Scheduler::Event ev = m_events->PeekNext ();
358  return ev.key.m_ts;
359  }
360 }
361 
362 Time
364 {
365  return TimeStep (NextTs ());
366 }
367 
368 void
370 {
371  NS_LOG_FUNCTION (this);
372 
373 #ifdef NS3_MPI
375  m_stop = false;
376  m_globalFinished = false;
377  while (!m_globalFinished)
378  {
379  Time nextTime = Next ();
380 
381  // If local event is beyond grantedTime then need to synchronize
382  // with other tasks to determine new time window. If local task
383  // is finished then continue to participate in allgather
384  // synchronizations with other tasks until all tasks have
385  // completed.
386  if (nextTime > m_grantedTime || IsLocalFinished () )
387  {
388  // Can't process next event, calculate a new LBTS
389  // First receive any pending messages
391  // reset next time
392  nextTime = Next ();
393  // And check for send completes
395  // Finally calculate the lbts
397  m_myId, IsLocalFinished (), nextTime);
398  m_pLBTS[m_myId] = lMsg;
399  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
400  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
401  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
402  // The totRx and totTx counts ensure there are no transient
403  // messages; If totRx != totTx, there are transients,
404  // so we don't update the granted time.
405  uint32_t totRx = m_pLBTS[0].GetRxCount ();
406  uint32_t totTx = m_pLBTS[0].GetTxCount ();
408 
409  for (uint32_t i = 1; i < m_systemCount; ++i)
410  {
411  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
412  {
413  smallestTime = m_pLBTS[i].GetSmallestTime ();
414  }
415  totRx += m_pLBTS[i].GetRxCount ();
416  totTx += m_pLBTS[i].GetTxCount ();
418  }
419  if (totRx == totTx)
420  {
421  // If lookahead is infinite then granted time should be as well.
422  // Covers the edge case if all the tasks have no inter tasks
423  // links, prevents overflow of granted time.
425  {
427  }
428  else
429  {
430  // Overflow is possible here if near end of representable time.
431  m_grantedTime = smallestTime + m_lookAhead;
432  }
433  }
434  }
435 
436  // Execute next event if it is within the current time window.
437  // Local task may be completed.
438  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
439  { // Safe to process
440  ProcessOneEvent ();
441  }
442  }
443 
444  // If the simulator stopped naturally by lack of events, make a
445  // consistency test to check that we didn't lose any events along the way.
446  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
447 #else
448  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
449 #endif
450 }
451 
453 {
454  return m_myId;
455 }
456 
457 void
459 {
460  NS_LOG_FUNCTION (this);
461 
462  m_stop = true;
463 }
464 
465 void
467 {
468  NS_LOG_FUNCTION (this << delay.GetTimeStep ());
469 
471 }
472 
473 //
474 // Schedule an event for a _relative_ time in the future.
475 //
476 EventId
478 {
479  NS_LOG_FUNCTION (this << delay.GetTimeStep () << event);
480 
481  Time tAbsolute = delay + TimeStep (m_currentTs);
482 
483  NS_ASSERT (tAbsolute.IsPositive ());
484  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
485  Scheduler::Event ev;
486  ev.impl = event;
487  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
488  ev.key.m_context = GetContext ();
489  ev.key.m_uid = m_uid;
490  m_uid++;
492  m_events->Insert (ev);
493  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
494 }
495 
//
// Schedule an event for a _relative_ time in the future, tagged with a
// caller-supplied context instead of the current one.
//
// context: value stored verbatim in the event key's m_context.
// delay:   relative offset; its time-step count is added to m_currentTs to
//          form the event's absolute timestamp.
// event:   event implementation stored in the scheduler entry as a raw
//          pointer (presumably the caller's reference is handed over, since
//          other code in this file Unref()s an event on removal -- confirm).
//
// Returns nothing: no EventId is produced for the queued event.
//
// NOTE(review): the listing numbering jumps from 506 to 508, so one
// statement of this body is absent from this extract -- confirm against the
// original file before relying on this listing.
//
496 void
497 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event)
498 {
499  NS_LOG_FUNCTION (this << context << delay.GetTimeStep () << m_currentTs << event);
500 
501  Scheduler::Event ev;
502  ev.impl = event;
 // Absolute timestamp = current timestamp + relative delay. No range
 // assertions are applied to the result in this function.
503  ev.key.m_ts = m_currentTs + delay.GetTimeStep ();
504  ev.key.m_context = context;
 // Take the next uid for this event, then advance the counter.
505  ev.key.m_uid = m_uid;
506  m_uid++;
508  m_events->Insert (ev);
509 }
510 
511 EventId
513 {
514  NS_LOG_FUNCTION (this << event);
515 
516  Scheduler::Event ev;
517  ev.impl = event;
518  ev.key.m_ts = m_currentTs;
519  ev.key.m_context = GetContext ();
520  ev.key.m_uid = m_uid;
521  m_uid++;
523  m_events->Insert (ev);
524  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
525 }
526 
527 EventId
529 {
530  NS_LOG_FUNCTION (this << event);
531 
532  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
533  m_destroyEvents.push_back (id);
534  m_uid++;
535  return id;
536 }
537 
538 Time
540 {
541  return TimeStep (m_currentTs);
542 }
543 
544 Time
546 {
547  if (IsExpired (id))
548  {
549  return TimeStep (0);
550  }
551  else
552  {
553  return TimeStep (id.GetTs () - m_currentTs);
554  }
555 }
556 
557 void
559 {
560  if (id.GetUid () == 2)
561  {
562  // destroy events.
563  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
564  {
565  if (*i == id)
566  {
567  m_destroyEvents.erase (i);
568  break;
569  }
570  }
571  return;
572  }
573  if (IsExpired (id))
574  {
575  return;
576  }
577  Scheduler::Event event;
578  event.impl = id.PeekEventImpl ();
579  event.key.m_ts = id.GetTs ();
580  event.key.m_context = id.GetContext ();
581  event.key.m_uid = id.GetUid ();
582  m_events->Remove (event);
583  event.impl->Cancel ();
584  // whenever we remove an event from the event list, we have to unref it.
585  event.impl->Unref ();
586 
588 }
589 
590 void
592 {
593  if (!IsExpired (id))
594  {
595  id.PeekEventImpl ()->Cancel ();
596  }
597 }
598 
599 bool
601 {
602  if (id.GetUid () == 2)
603  {
604  if (id.PeekEventImpl () == 0
605  || id.PeekEventImpl ()->IsCancelled ())
606  {
607  return true;
608  }
609  // destroy events.
610  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
611  {
612  if (*i == id)
613  {
614  return false;
615  }
616  }
617  return true;
618  }
619  if (id.PeekEventImpl () == 0
620  || id.GetTs () < m_currentTs
621  || (id.GetTs () == m_currentTs
622  && id.GetUid () <= m_currentUid)
623  || id.PeekEventImpl ()->IsCancelled ())
624  {
625  return true;
626  }
627  else
628  {
629  return false;
630  }
631 }
632 
633 Time
635 {
638  return TimeStep (0x7fffffffffffffffLL);
639 }
640 
641 uint32_t
643 {
644  return m_currentContext;
645 }
646 
647 } // namespace ns3
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:102
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
virtual void SetScheduler(ObjectFactory schedulerFactory)
Set the Scheduler to be used to manage the event list.
virtual uint32_t GetContext(void) const
Get the current simulation context.
void Unref(void) const
Decrement the reference count.
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:45
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:399
std::vector< Ptr< Node > >::const_iterator Iterator
Node container iterator.
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:81
virtual EventId Schedule(Time const &delay, EventImpl *event)
Schedule a future event execution (in the same context).
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:94
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
virtual EventId ScheduleDestroy(EventImpl *event)
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been re...
#define NS_UNUSED(x)
Mark a local variable as unused.
Definition: unused.h:36
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:162
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:740
virtual void DoDispose(void)
Destructor implementation.
Definition: object.cc:346
virtual void SetMaximumLookAhead(const Time lookAhead)
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
virtual Time GetDelayLeft(const EventId &id) const
Get the remaining time until this event will execute.
virtual void DoDispose(void)
Destructor implementation.
channel
Definition: third.py:85
void Invoke(void)
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:46
virtual Time GetMaximumSimulationTime(void) const
Get the maximum representable simulation time.
static void TestSendComplete()
Check for completed sends.
static EventId Schedule(Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event to expire after delay.
Definition: simulator.h:1381
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:95
virtual bool IsFinished(void) const
Check if the simulation should finish.
static void Destroy()
Deletes storage used by the parallel environment.
AttributeValue implementation for Time.
Definition: nstime.h:1076
Ptr< Object > Create(void) const
Create an Object instance of the configured TypeId.
uint32_t m_uid
Event unique id.
Definition: scheduler.h:82
virtual void Cancel(const EventId &id)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
virtual EventId ScheduleNow(EventImpl *event)
Schedule an event to run at the current virtual time.
Maintain the event list.
Definition: scheduler.h:66
virtual bool IsExpired(const EventId &id) const
Check if an event has already run or been cancelled.
Scheduler event.
Definition: scheduler.h:92
Distributed simulator implementation using lookahead.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
keep track of a set of node pointers.
virtual uint32_t GetSystemId(void) const
Get the system id of this simulator.
virtual void Destroy()
Execute the events scheduled with ScheduleDestroy().
uint32_t GetSystemId(void) const
Definition: node.cc:121
Time TimeStep(uint64_t ts)
Definition: nstime.h:1071
NS_LOG_LOGIC("Net device "<< nd<< " is not bridged")
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
virtual void Remove(const EventId &id)
Remove an event from the event list.
Instantiate subclasses of ns3::Object.
A simulation event.
Definition: event-impl.h:44
static void ReceiveMessages()
Check for received messages complete.
An identifier for simulation events.
Definition: event-id.h:53
static uint32_t GetSystemId()
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:262
static void Stop(void)
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:234
Time Get(void) const
Definition: time.cc:443
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:1014
Flag for events not associated with any particular context.
Definition: simulator.h:198
virtual void ScheduleWithContext(uint32_t context, Time const &delay, EventImpl *event)
Schedule a future event execution (in a different context).
virtual Time Now(void) const
Return the current simulation virtual time.
virtual void Run(void)
Run the simulation.
virtual void Stop(void)
Tell the Simulator the calling event should be the last one executed.
a unique identifier for an interface.
Definition: type-id.h:58
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:915
static uint32_t GetSize()
uint32_t m_context
Event context.
Definition: scheduler.h:83
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
The SimulatorImpl base class.
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:391