A Discrete-Event Network Simulator
API
null-message-mpi-interface.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright 2013. Lawrence Livermore National Security, LLC.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License version 2 as
7  * published by the Free Software Foundation;
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Author: Steven Smith <smith84@llnl.gov>
19  *
20  */
21 
23 
26 #include "remote-channel-bundle.h"
27 
28 #include "ns3/mpi-receiver.h"
29 #include "ns3/node.h"
30 #include "ns3/node-list.h"
31 #include "ns3/net-device.h"
32 #include "ns3/nstime.h"
33 #include "ns3/simulator.h"
34 #include "ns3/log.h"
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 #include <iostream>
41 #include <iomanip>
42 #include <list>
43 
44 namespace ns3 {
45 
46 NS_LOG_COMPONENT_DEFINE ("NullMessageMpiInterface");
47 
52 #ifdef NS3_MPI
// Number of chars allocated for each receive buffer in this file and the
// element count passed to every MPI_Irecv posted on those buffers.
// NOTE(review): no comparison of an outgoing message size against this
// limit is visible in the send code of this listing -- confirm that larger
// messages cannot occur.
53 const uint32_t NULL_MESSAGE_MAX_MPI_MSG_SIZE = 2000;
54 #endif
55 
// Body of a definition whose signature line is absent from this listing
// (presumably the NullMessageSentBuffer constructor -- confirm).
// Starts with no buffer attached (m_buffer is 0) and m_request set to 0.
57 {
58  m_buffer = 0;
59  m_request = 0;
60 }
61 
// Body of a definition whose signature line is absent from this listing
// (presumably the NullMessageSentBuffer destructor -- confirm).
// Releases the array pointed to by m_buffer; delete [] on a null pointer
// does nothing, so an object that never received a buffer is safe here.
63 {
64  delete [] m_buffer;
65 }
66 
// Returns the pointer currently stored in m_buffer (may be 0).
// NOTE(review): the signature line is absent from this listing; presumably
// this is the GetBuffer () accessor called in SendPacket -- confirm.
67 uint8_t*
69 {
70  return m_buffer;
71 }
72 
// Stores the given pointer in m_buffer. Any pointer previously held is
// overwritten without being freed.
// NOTE(review): the signature line is absent from this listing; presumably
// this is the SetBuffer () method called in SendPacket -- confirm.
73 void
75 {
76  m_buffer = buffer;
77 }
78 
// Returns the address of the m_request member, so callers can hand it to
// MPI calls that fill in or inspect the request.
// NOTE(review): return type and signature lines are absent from this
// listing; presumably this is the GetRequest () accessor -- confirm.
81 {
82  return &m_request;
83 }
84 
// Definition of the static list of send buffers. In this file entries are
// appended by the send functions, erased once MPI_Test reports the
// associated request complete, and cleared when MPI is shut down.
90 std::list<NullMessageSentBuffer> NullMessageMpiInterface::g_pendingTx;
91 
94 
// Body of a definition whose signature line is absent from this listing
// (presumably the NullMessageMpiInterface constructor -- confirm).
// Logs the call and, when NS3_MPI is not defined, terminates with a fatal
// error.
96 {
97  NS_LOG_FUNCTION (this);
98 
99 #ifndef NS3_MPI
100  /*
101  * This class can only be constructed if MPI is available. Fail if an
102  * attempt is made to instantiate this class without MPI.
103  */
104  NS_FATAL_ERROR ("Must compile with MPI if Null Message simulator is used, see --enable-mpi option for waf");
105 #endif
106 }
107 
// Body of a definition whose signature line is absent from this listing
// (presumably the NullMessageMpiInterface destructor -- confirm).
// Only logs the call; no cleanup is performed here.
109 {
110  NS_LOG_FUNCTION (this);
111 }
112 
// Only logs the call; the visible body does no other work.
// NOTE(review): the signature line is absent from this listing; presumably
// NullMessageMpiInterface::Destroy -- confirm.
113 void
115 {
116  NS_LOG_FUNCTION (this);
117 }
118 
// Returns g_sid, which Enable () sets from MPI_Comm_rank on MPI_COMM_WORLD.
// NOTE(review): the signature line and one statement (listing line 122) are
// absent from this listing; presumably GetSystemId -- confirm.
119 uint32_t
121 {
123  return g_sid;
124 }
125 
// Returns g_size, which Enable () sets from MPI_Comm_size on MPI_COMM_WORLD.
// NOTE(review): the signature line and one statement (listing line 129) are
// absent from this listing; presumably GetSize -- confirm.
126 uint32_t
128 {
130  return g_size;
131 }
132 
// Returns the g_enabled flag. If g_initialized is still false on entry it
// is set to true first.
// NOTE(review): the signature line and one statement inside the if-body
// (listing line 138) are absent from this listing, so whatever else happens
// on the first call cannot be seen here; presumably IsEnabled -- confirm.
133 bool
135 {
136  if (!g_initialized)
137  {
139  g_initialized = true;
140  }
141  return g_enabled;
142 }
143 
// Starts the MPI environment and records this process's place in it.
//
// With NS3_MPI defined: calls MPI_Init with the caller's argument
// pointers, waits at a barrier on MPI_COMM_WORLD, stores the rank in g_sid
// and the communicator size in g_size, then sets g_enabled and
// g_initialized. Without NS3_MPI the body does nothing beyond logging.
//
// pargc  pointer to the argument count; forwarded to MPI_Init and
//        dereferenced by the log statement.
// pargv  pointer to the argument vector; forwarded to MPI_Init.
//
// The return codes of the MPI calls are not checked.
144 void
145 NullMessageMpiInterface::Enable (int* pargc, char*** pargv)
146 {
147  NS_LOG_FUNCTION (this << *pargc);
148 
149 #ifndef NS3_MPI
150  NS_UNUSED(pargv);
151 #else
152 
153  // Initialize the MPI interface
154  MPI_Init (pargc, pargv);
155  MPI_Barrier (MPI_COMM_WORLD);
156 
157  // SystemId and Size are uint32_t in interface but MPI uses int so convert.
158  int mpiSystemId;
159  int mpiSize;
160  MPI_Comm_rank (MPI_COMM_WORLD, &mpiSystemId);
161  MPI_Comm_size (MPI_COMM_WORLD, &mpiSize);
162 
163  g_sid = mpiSystemId;
164  g_size = mpiSize;
165 
166  g_enabled = true;
167  g_initialized = true;
168 
169 #endif
170 }
171 
// Allocates the table of receive buffers and, for every rank in
// [0, g_size) for which `bundle` is non-null, allocates one buffer of
// NULL_MESSAGE_MAX_MPI_MSG_SIZE chars and posts a non-blocking receive
// (tag 0, MPI_COMM_WORLD) from that rank into it.
// NOTE(review): the signature line and several statements are absent from
// this listing, including whatever sets g_numNeighbors, allocates
// g_requests and defines `bundle`; presumably InitializeSendReceiveBuffers
// -- confirm against the real source before relying on this description.
172 void
174 {
176 #ifdef NS3_MPI
178 
180 
181  // Post a non-blocking receive for all peers
183  g_pRxBuffers = new char*[g_numNeighbors];
// `index` counts posted receives; it selects the slot in both
// g_pRxBuffers and g_requests.
// NOTE(review): nothing visible here bounds index by g_numNeighbors.
184  int index = 0;
185  for (uint32_t rank = 0; rank < g_size; ++rank)
186  {
188  if (bundle)
189  {
190  g_pRxBuffers[index] = new char[NULL_MESSAGE_MAX_MPI_MSG_SIZE];
191  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, rank, 0,
192  MPI_COMM_WORLD, &g_requests[index]);
193  ++index;
194  }
195  }
196 #endif
197 }
198 
// Serializes a packet together with its delivery metadata and starts a
// non-blocking MPI send to the rank that owns the destination node.
//
// p       packet to serialize into the message.
// rxTime  receive time written as the first 64-bit field.
// node    destination node id; also used to look up the target rank.
// dev     destination device index written after the node id.
//
// Message layout, in order: uint64 receive time, uint64 guarantee time,
// uint32 node, uint32 dev, then the serialized packet bytes.
//
// NOTE(review): listing lines 204 and 237 are absent from this listing, so
// this function may do more than is shown here.
// NOTE(review): the receive loop in this file treats a first field of 0 as
// a Null Message, so a packet sent with rxTime 0 would not be delivered --
// confirm that cannot occur.
199 void
200 NullMessageMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
201 {
202  NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
203 
205 
206 #ifdef NS3_MPI
207 
208  // Find the system id for the destination node
209  Ptr<Node> destNode = NodeList::GetNode (node);
210  uint32_t nodeSysId = destNode->GetSystemId ();
211 
// A copy of the still-empty sendBuf is appended first; the buffer is then
// attached to the list element itself, so the memory handed to MPI_Isend
// stays referenced by g_pendingTx after this function returns.
212  NullMessageSentBuffer sendBuf;
213  g_pendingTx.push_back (sendBuf);
214  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
215 
// Header is two 64-bit fields plus two 32-bit fields, followed by the packet.
// NOTE(review): bufferSize is not compared with
// NULL_MESSAGE_MAX_MPI_MSG_SIZE in the visible code, although receives in
// this file are posted with that count -- confirm.
216  uint32_t serializedSize = p->GetSerializedSize ();
217  uint32_t bufferSize = serializedSize + ( 2 * sizeof (uint64_t) ) + ( 2 * sizeof (uint32_t) );
218  uint8_t* buffer = new uint8_t[bufferSize];
219  iter->SetBuffer (buffer);
220  // Add the receive time, guarantee time, dest node and dest device
221  uint64_t t = rxTime.GetInteger ();
222  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
223  *pTime++ = t;
224 
225  Time guarantee_update = NullMessageSimulatorImpl::GetInstance ()->CalculateGuaranteeTime (nodeSysId);
226  *pTime++ = guarantee_update.GetTimeStep ();
227 
228  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
229  *pData++ = node;
230  *pData++ = dev;
231  // Serialize the packet
232  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
233 
// Non-blocking send with tag 0; the request is stored in the list element
// so its completion can be tested later.
234  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
235  0, MPI_COMM_WORLD, (iter->GetRequest ()));
236 
238 
239 #endif
240 }
241 
// Builds a header-only message carrying a guarantee time and starts a
// non-blocking MPI send to the rank returned by bundle->GetSystemId ().
// The message has the same four header fields as a packet message, with
// the receive time, node and device all written as 0 and no payload.
// NOTE(review): the signature line and listing line 247 are absent from
// this listing; presumably SendNullMessage, taking the `guarantee_update`
// time and `bundle` used below -- confirm.
242 void
244 {
245  NS_LOG_FUNCTION (guarantee_update.GetTimeStep () << bundle);
246 
248 
249 #ifdef NS3_MPI
250 
// Append an empty entry, then attach the buffer to the list element so the
// memory stays referenced by g_pendingTx while the send is in flight.
251  NullMessageSentBuffer sendBuf;
252  g_pendingTx.push_back (sendBuf);
253  std::list<NullMessageSentBuffer>::reverse_iterator iter = g_pendingTx.rbegin (); // Points to the last element
254 
255  uint32_t bufferSize = 2 * sizeof (uint64_t) + 2 * sizeof (uint32_t);
256  uint8_t* buffer = new uint8_t[bufferSize];
257  iter->SetBuffer (buffer);
258  // Receive time of 0 marks a Null Message; node and device are written as 0
259  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
260  *pTime++ = 0;
261  *pTime++ = guarantee_update.GetInteger ();
262  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
263  *pData++ = 0;
264  *pData++ = 0;
265 
266  // Find the system id for the destination MPI rank
267  uint32_t nodeSysId = bundle->GetSystemId ();
268 
269  MPI_Isend (reinterpret_cast<void *> (iter->GetBuffer ()), bufferSize, MPI_CHAR, nodeSysId,
270  0, MPI_COMM_WORLD, (iter->GetRequest ()));
271 #endif
272 }
273 
// Calls ReceiveMessages with blocking set to true.
// NOTE(review): the signature line and listing line 277 are absent from
// this listing; presumably ReceiveMessagesBlocking -- confirm.
274 void
276 {
278 
279  ReceiveMessages(true);
280 }
281 
282 
// Calls ReceiveMessages with blocking set to false.
// NOTE(review): the signature line and listing line 286 are absent from
// this listing; presumably ReceiveMessagesNonBlocking -- confirm.
283 void
285 {
287 
288  ReceiveMessages(false);
289 }
290 
291 
// Processes completed receives from the posted request array.
//
// When `blocking` is true the function waits until any one request
// completes, handles that single message and returns. When it is false the
// function keeps testing and handling completed requests until a test finds
// none complete. It returns at once if g_numNeighbors is 0.
//
// For each message: the four header fields are read; if the receive time is
// greater than 0 the remaining bytes are turned into a packet and a receive
// event is scheduled on the matching device's MpiReceiver; in all cases the
// sender's bundle gets the new guarantee time and the receive is re-posted
// on the same buffer and request slot.
//
// NOTE(review): the signature line and listing line 297 are absent from
// this listing; presumably ReceiveMessages (bool blocking) -- confirm.
292 void
294 {
295  NS_LOG_FUNCTION (blocking);
296 
298 
299 #ifdef NS3_MPI
300 
301  // stop flag set to true when no more messages are found to
302  // process.
303  bool stop = false;
304 
305 
306  if (!g_numNeighbors) {
307  // Not communicating with anyone.
308  return;
309  }
310 
311  do
312  {
313  int messageReceived = 0;
314  int index = 0;
315  MPI_Status status;
316 
317  if (blocking)
318  {
319  MPI_Waitany (g_numNeighbors, g_requests, &index, &status);
320  messageReceived = 1; /* Wait always implies message was received */
321  stop = true;
322  }
323  else
324  {
325  MPI_Testany (g_numNeighbors, g_requests, &index, &messageReceived, &status);
326  }
327 
328  if (messageReceived)
329  {
// count is the number of chars actually received for this message.
330  int count;
331  MPI_Get_count (&status, MPI_CHAR, &count);
332 
333  // Get the meta data first
334  uint64_t* pTime = reinterpret_cast<uint64_t *> (g_pRxBuffers[index]);
335  uint64_t time = *pTime++;
336  uint64_t guaranteeUpdate = *pTime++;
337 
338  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
339  uint32_t node = *pData++;
340  uint32_t dev = *pData++;
341 
342  Time rxTime (time);
343 
344  // rxtime == 0 means this is a Null Message
345  if (rxTime > Time (0))
346  {
// Remove the header size so count is the length of the serialized packet.
347  count -= sizeof (time) + sizeof (guaranteeUpdate) + sizeof (node) + sizeof (dev);
348 
349  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
350 
351  // Find the correct node/device to schedule receive event
352  Ptr<Node> pNode = NodeList::GetNode (node);
353  Ptr<MpiReceiver> pMpiRec = 0;
354  uint32_t nDevices = pNode->GetNDevices ();
355  for (uint32_t i = 0; i < nDevices; ++i)
356  {
357  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
358  if (pThisDev->GetIfIndex () == dev)
359  {
360  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
361  break;
362  }
363  }
// NOTE(review): pNode has already been dereferenced above, so this
// assertion cannot catch a null node; only the pMpiRec half is an
// effective check.
364  NS_ASSERT (pNode && pMpiRec);
365 
366  // Schedule the rx event
367  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
368  &MpiReceiver::Receive, pMpiRec, p);
369 
370  }
371 
372  // Update guarantee time for both packet receives and Null Messages.
373  Ptr<RemoteChannelBundle> bundle = RemoteChannelBundleManager::Find (status.MPI_SOURCE);
374  NS_ASSERT (bundle);
375 
376  bundle->SetGuaranteeTime (Time (guaranteeUpdate));
377 
378  // Re-queue the next read
379  MPI_Irecv (g_pRxBuffers[index], NULL_MESSAGE_MAX_MPI_MSG_SIZE, MPI_CHAR, status.MPI_SOURCE, 0,
380  MPI_COMM_WORLD, &g_requests[index]);
381 
382  }
383  else
384  {
385  // if non-blocking and no message received in testany then stop message loop
386  stop = true;
387  }
388  }
389  while (!stop);
390 #endif
391 }
392 
// Walks g_pendingTx, tests each entry's send request with MPI_Test and
// erases the entries whose request has completed.
// NOTE(review): the signature line and listing lines 396 and 398 are absent
// from this listing; presumably TestSendComplete -- confirm.
393 void
395 {
397 
399 
400 #ifdef NS3_MPI
401  std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
402  while (iter != g_pendingTx.end ())
403  {
404  MPI_Status status;
405  int flag = 0;
406  MPI_Test (iter->GetRequest (), &flag, &status);
// Step past the element before a possible erase: erasing invalidates only
// the iterator to the removed element.
407  std::list<NullMessageSentBuffer>::iterator current = iter; // Save current for erasing
408  ++iter; // Advance to next
409  if (flag)
410  { // This message is complete
411  g_pendingTx.erase (current);
412  }
413  }
414 #endif
415 }
416 
// Shuts down MPI. If MPI_Initialized reports that MPI is running, this
// cancels and frees every pending send request and every posted receive
// request, calls MPI_Finalize, frees the receive buffers and request array,
// clears g_pendingTx and resets g_enabled and g_initialized to false.
// Otherwise it terminates with a fatal error.
// NOTE(review): the signature line is absent from this listing; presumably
// NullMessageMpiInterface::Disable -- confirm.
417 void
419 {
420  NS_LOG_FUNCTION (this);
421 
422 #ifdef NS3_MPI
423  int flag = 0;
424  MPI_Initialized (&flag);
425  if (flag)
426  {
427 
428  for (std::list<NullMessageSentBuffer>::iterator iter = g_pendingTx.begin ();
429  iter != g_pendingTx.end ();
430  ++iter)
431  {
432  MPI_Cancel (iter->GetRequest ());
433  MPI_Request_free (iter->GetRequest ());
434  }
435 
436  for (uint32_t i = 0; i < g_numNeighbors; ++i)
437  {
438  MPI_Cancel (&g_requests[i]);
439  MPI_Request_free (&g_requests[i]);
440  }
441 
442  MPI_Finalize ();
443 
// Buffers are released only after MPI_Finalize has returned.
// NOTE(review): g_pRxBuffers, g_requests and g_numNeighbors are not reset
// in the visible code after the arrays are freed -- confirm nothing uses
// them afterwards.
444  for (uint32_t i = 0; i < g_numNeighbors; ++i)
445  {
446  delete [] g_pRxBuffers[i];
447  }
448  delete [] g_pRxBuffers;
449  delete [] g_requests;
450 
451  g_pendingTx.clear ();
452 
453  g_enabled = false;
454  g_initialized = false;
455 
456  }
457  else
458  {
459  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
460  }
461 #endif
462 }
463 
464 } // namespace ns3
virtual void Destroy()
Delete all buffers.
Time CalculateGuaranteeTime(uint32_t systemId)
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:102
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
uint32_t GetId(void) const
Definition: node.cc:107
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:399
static Ptr< SimulatorImpl > GetImplementation(void)
Get the SimulatorImpl singleton.
Definition: simulator.cc:425
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:241
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:142
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
Definition: packet.cc:585
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_UNUSED(x)
Mark a local variable as unused.
Definition: unused.h:36
static void ReceiveMessagesNonBlocking()
Non-blocking check for received messages complete.
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:162
void RescheduleNullMessageEvent(Ptr< RemoteChannelBundle > bundle)
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
static NullMessageSimulatorImpl * GetInstance(void)
static void TestSendComplete()
Check for completed sends.
virtual void Enable(int *pargc, char ***pargv)
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:44
static void SendNullMessage(const Time &guaranteeUpdate, Ptr< RemoteChannelBundle > bundle)
Send a Null Message across the specified bundle.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:42
static void ReceiveMessages(bool blocking=false)
Check for received messages complete.
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize. This function must be called after Destroy ()...
Every class exported by the ns3 library is enclosed in the ns3 namespace.
static void ReceiveMessagesBlocking()
Blocking message receive.
uint32_t GetSystemId(void) const
Definition: node.cc:121
static Ptr< RemoteChannelBundle > Find(uint32_t systemId)
NullMessageSentBuffer()
maximum MPI message size for easy buffer creation
static Time Now(void)
Return the current simulation virtual time.
Definition: simulator.cc:249
uint8_t * m_buffer
Buffer for send.
static void ScheduleWithContext(uint32_t context, Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
Definition: simulator.h:1475
Non-blocking send buffers for Null Message implementation.
static void InitializeSendReceiveBuffers(void)
Initialize send and receive buffers.
static std::list< NullMessageSentBuffer > g_pendingTx
uint32_t GetNDevices(void) const
Definition: node.cc:150
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:628
MPI_Request m_request
MPI request posted for the send.
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:391