A Discrete-Event Network Simulator
API
granted-time-window-mpi-interface.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
20 // This object contains static methods that provide an easy interface
21 // to the necessary MPI information.
22 
23 #include <iostream>
24 #include <iomanip>
25 #include <list>
26 
28 #include "mpi-receiver.h"
29 #include "mpi-interface.h"
30 
31 #include "ns3/node.h"
32 #include "ns3/node-list.h"
33 #include "ns3/net-device.h"
34 #include "ns3/simulator.h"
35 #include "ns3/simulator-impl.h"
36 #include "ns3/nstime.h"
37 #include "ns3/log.h"
38 
39 #ifdef NS3_MPI
40 #include <mpi.h>
41 #endif
42 
43 namespace ns3 {
44 
45 NS_LOG_COMPONENT_DEFINE ("GrantedTimeWindowMpiInterface");
46 
48 {
49  m_buffer = 0;
50  m_request = 0;
51 }
52 
54 {
55  delete [] m_buffer;
56 }
57 
58 uint8_t*
60 {
61  return m_buffer;
62 }
63 
64 void
65 SentBuffer::SetBuffer (uint8_t* buffer)
66 {
67  m_buffer = buffer;
68 }
69 
70 #ifdef NS3_MPI
73 {
74  return &m_request;
75 }
76 #endif
77 
84 std::list<SentBuffer> GrantedTimeWindowMpiInterface::m_pendingTx;
85 
86 #ifdef NS3_MPI
89 #endif
90 
91 TypeId
93 {
94  static TypeId tid = TypeId ("ns3::GrantedTimeWindowMpiInterface")
95  .SetParent<Object> ()
96  .SetGroupName ("Mpi")
97  ;
98  return tid;
99 }
100 
101 void
103 {
104  NS_LOG_FUNCTION (this);
105 
106 #ifdef NS3_MPI
107  for (uint32_t i = 0; i < GetSize (); ++i)
108  {
109  delete [] m_pRxBuffers[i];
110  }
111  delete [] m_pRxBuffers;
112  delete [] m_requests;
113 
114  m_pendingTx.clear ();
115 #endif
116 }
117 
118 uint32_t
120 {
121  return m_rxCount;
122 }
123 
124 uint32_t
126 {
127  return m_txCount;
128 }
129 
130 uint32_t
132 {
133  if (!m_initialized)
134  {
136  m_initialized = true;
137  }
138  return m_sid;
139 }
140 
141 uint32_t
143 {
144  if (!m_initialized)
145  {
147  m_initialized = true;
148  }
149  return m_size;
150 }
151 
152 bool
154 {
155  if (!m_initialized)
156  {
158  m_initialized = true;
159  }
160  return m_enabled;
161 }
162 
163 void
164 GrantedTimeWindowMpiInterface::Enable (int* pargc, char*** pargv)
165 {
166  NS_LOG_FUNCTION (this << pargc << pargv);
167 
168 #ifdef NS3_MPI
169  // Initialize the MPI interface
170  MPI_Init (pargc, pargv);
171  MPI_Barrier (MPI_COMM_WORLD);
172  MPI_Comm_rank (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_sid));
173  MPI_Comm_size (MPI_COMM_WORLD, reinterpret_cast <int *> (&m_size));
174  m_enabled = true;
175  m_initialized = true;
176  // Post a non-blocking receive for all peers
177  m_pRxBuffers = new char*[m_size];
179  for (uint32_t i = 0; i < GetSize (); ++i)
180  {
181  m_pRxBuffers[i] = new char[MAX_MPI_MSG_SIZE];
182  MPI_Irecv (m_pRxBuffers[i], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
183  MPI_COMM_WORLD, &m_requests[i]);
184  }
185 #else
186  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
187 #endif
188 }
189 
190 void
191 GrantedTimeWindowMpiInterface::SendPacket (Ptr<Packet> p, const Time& rxTime, uint32_t node, uint32_t dev)
192 {
193  NS_LOG_FUNCTION (this << p << rxTime.GetTimeStep () << node << dev);
194 
195 #ifdef NS3_MPI
196  SentBuffer sendBuf;
197  m_pendingTx.push_back (sendBuf);
198  std::list<SentBuffer>::reverse_iterator i = m_pendingTx.rbegin (); // Points to the last element
199 
200  uint32_t serializedSize = p->GetSerializedSize ();
201  uint8_t* buffer = new uint8_t[serializedSize + 16];
202  i->SetBuffer (buffer);
203  // Add the time, dest node and dest device
204  uint64_t t = rxTime.GetInteger ();
205  uint64_t* pTime = reinterpret_cast <uint64_t *> (buffer);
206  *pTime++ = t;
207  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
208  *pData++ = node;
209  *pData++ = dev;
210  // Serialize the packet
211  p->Serialize (reinterpret_cast<uint8_t *> (pData), serializedSize);
212 
213  // Find the system id for the destination node
214  Ptr<Node> destNode = NodeList::GetNode (node);
215  uint32_t nodeSysId = destNode->GetSystemId ();
216 
217  MPI_Isend (reinterpret_cast<void *> (i->GetBuffer ()), serializedSize + 16, MPI_CHAR, nodeSysId,
218  0, MPI_COMM_WORLD, (i->GetRequest ()));
219  m_txCount++;
220 #else
221  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
222 #endif
223 }
224 
225 void
227 {
229 
230 #ifdef NS3_MPI
231  // Poll the non-block reads to see if data arrived
232  while (true)
233  {
234  int flag = 0;
235  int index = 0;
236  MPI_Status status;
237 
238  MPI_Testany (MpiInterface::GetSize (), m_requests, &index, &flag, &status);
239  if (!flag)
240  {
241  break; // No more messages
242  }
243  int count;
244  MPI_Get_count (&status, MPI_CHAR, &count);
245  m_rxCount++; // Count this receive
246 
247  // Get the meta data first
248  uint64_t* pTime = reinterpret_cast<uint64_t *> (m_pRxBuffers[index]);
249  uint64_t time = *pTime++;
250  uint32_t* pData = reinterpret_cast<uint32_t *> (pTime);
251  uint32_t node = *pData++;
252  uint32_t dev = *pData++;
253 
254  Time rxTime (time);
255 
256  count -= sizeof (time) + sizeof (node) + sizeof (dev);
257 
258  Ptr<Packet> p = Create<Packet> (reinterpret_cast<uint8_t *> (pData), count, true);
259 
260  // Find the correct node/device to schedule receive event
261  Ptr<Node> pNode = NodeList::GetNode (node);
262  Ptr<MpiReceiver> pMpiRec = 0;
263  uint32_t nDevices = pNode->GetNDevices ();
264  for (uint32_t i = 0; i < nDevices; ++i)
265  {
266  Ptr<NetDevice> pThisDev = pNode->GetDevice (i);
267  if (pThisDev->GetIfIndex () == dev)
268  {
269  pMpiRec = pThisDev->GetObject<MpiReceiver> ();
270  break;
271  }
272  }
273 
274  NS_ASSERT (pNode && pMpiRec);
275 
276  // Schedule the rx event
277  Simulator::ScheduleWithContext (pNode->GetId (), rxTime - Simulator::Now (),
278  &MpiReceiver::Receive, pMpiRec, p);
279 
280  // Re-queue the next read
281  MPI_Irecv (m_pRxBuffers[index], MAX_MPI_MSG_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 0,
282  MPI_COMM_WORLD, &m_requests[index]);
283  }
284 #else
285  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
286 #endif
287 }
288 
289 void
291 {
293 
294 #ifdef NS3_MPI
295  std::list<SentBuffer>::iterator i = m_pendingTx.begin ();
296  while (i != m_pendingTx.end ())
297  {
298  MPI_Status status;
299  int flag = 0;
300  MPI_Test (i->GetRequest (), &flag, &status);
301  std::list<SentBuffer>::iterator current = i; // Save current for erasing
302  i++; // Advance to next
303  if (flag)
304  { // This message is complete
305  m_pendingTx.erase (current);
306  }
307  }
308 #else
309  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
310 #endif
311 }
312 
313 void
315 {
317 
318 #ifdef NS3_MPI
319  int flag = 0;
320  MPI_Initialized (&flag);
321  if (flag)
322  {
323  MPI_Finalize ();
324  m_enabled = false;
325  m_initialized = false;
326  }
327  else
328  {
329  NS_FATAL_ERROR ("Cannot disable MPI environment without Initializing it first");
330  }
331 #else
332  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
333 #endif
334 }
335 
336 
337 } // namespace ns3
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:102
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
uint32_t GetId(void) const
Definition: node.cc:107
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:399
static Ptr< SimulatorImpl > GetImplementation(void)
Get the SimulatorImpl singleton.
Definition: simulator.cc:425
static Ptr< Node > GetNode(uint32_t n)
Definition: node-list.cc:241
Ptr< NetDevice > GetDevice(uint32_t index) const
Retrieve the index-th NetDevice associated to this node.
Definition: node.cc:142
uint32_t GetSerializedSize(void) const
Returns number of bytes required for packet serialization.
Definition: packet.cc:585
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:202
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:162
#define NS_LOG_FUNCTION_NOARGS()
Output the name of the function.
virtual void Enable(int *pargc, char ***pargv)
static void TestSendComplete()
Check for completed sends.
void Receive(Ptr< Packet > p)
Direct an incoming packet to the device Receive() method.
Definition: mpi-receiver.cc:44
Tracks non-blocking sends.
Class to aggregate to a NetDevice if it supports MPI capability.
Definition: mpi-receiver.h:42
Every class exported by the ns3 library is enclosed in the ns3 namespace.
uint32_t GetSystemId(void) const
Definition: node.cc:121
static Time Now(void)
Return the current simulation virtual time.
Definition: simulator.cc:249
virtual void Disable()
Terminates the MPI environment by calling MPI_Finalize. This function must be called after Destroy () ...
virtual void SendPacket(Ptr< Packet > p, const Time &rxTime, uint32_t node, uint32_t dev)
MPI_Request * GetRequest()
static void ScheduleWithContext(uint32_t context, Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event with the given context.
Definition: simulator.h:1475
static void ReceiveMessages()
Check for received messages complete.
const uint32_t MAX_MPI_MSG_SIZE
maximum MPI message size for easy buffer creation
A base class which provides memory management and object aggregation.
Definition: object.h:87
a unique identifier for an interface.
Definition: type-id.h:58
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:915
static uint32_t GetSize()
uint32_t GetNDevices(void) const
Definition: node.cc:150
uint32_t Serialize(uint8_t *buffer, uint32_t maxSize) const
Serialize a packet, tags, and metadata into a byte buffer.
Definition: packet.cc:628
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:391