Graybat 1.1
Graph Approach for Highly Generic Communication Schemes Based on Adaptive Topologies
BMPI.hpp
#pragma once

// CLIB
#include <assert.h>   /* assert */

// STL
#include <array>      /* array */
#include <numeric>    /* std::accumulate */
#include <iostream>   /* std::cout */
#include <map>        /* std::map */
#include <exception>  /* std::out_of_range */
#include <sstream>    /* std::stringstream */
#include <algorithm>  /* std::transform */

// BOOST
#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/mpi/datatype.hpp>
#include <boost/optional.hpp>
namespace mpi = boost::mpi;

// MPI
#include <mpi.h>      /* MPI_* */

// GRAYBAT
#include <graybat/utils/serialize_tuple.hpp>
#include <graybat/communicationPolicy/bmpi/Context.hpp> /* Context */
#include <graybat/communicationPolicy/bmpi/Event.hpp>   /* Event */
#include <graybat/communicationPolicy/bmpi/Config.hpp>  /* Config */
#include <graybat/communicationPolicy/Base.hpp>
#include <graybat/communicationPolicy/Traits.hpp>

namespace graybat {

    namespace communicationPolicy {

        /************************************************************************/
        struct BMPI;

        namespace traits {

            template<>
            struct ContextType<BMPI> {
            };

            template<>
            struct EventType<BMPI> {
            };

            template<>
            struct ConfigType<BMPI> {
            };

        } // namespace traits

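        /**
         * @brief Implementation of the Cage communicationPolicy interface
         *        based on the MPI implementation boost::mpi.
         */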
        struct BMPI : Base<BMPI> {

            // Type defs
            using Tag       = typename graybat::communicationPolicy::Tag<BMPI>;
            using ContextID = typename graybat::communicationPolicy::ContextID<BMPI>;
            using MsgType   = typename graybat::communicationPolicy::MsgType<BMPI>;
            using VAddr     = typename graybat::communicationPolicy::VAddr<BMPI>;
            using Context   = typename graybat::communicationPolicy::Context<BMPI>;
            using Event     = typename graybat::communicationPolicy::Event<BMPI>;
            using Config    = typename graybat::communicationPolicy::Config<BMPI>;
            using Uri       = int;

            BMPI(Config const config) :
                contextCount(0),
                uriMap(0),
                initialContext(contextCount, mpi::communicator()) {

                uriMap.push_back(std::vector<Uri>());

                for(unsigned i = 0; i < initialContext.size(); ++i){
                    uriMap.back().push_back(i);
                }

            }

            // Destructor
            ~BMPI(){

            }

            /***********************************************************************/
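            /**
             * @brief Blocking transmission of a message sendData to the peer
             *        with virtual address destVAddr.
             */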
            template <typename T_Send>
            void send(const VAddr destVAddr, const Tag tag, const Context context, const T_Send& sendData){
                Uri destUri = getVAddrUri(context, destVAddr);
                context.comm.send(destUri, tag, sendData.data(), sendData.size());
            }

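            /**
             * @brief Non-blocking transmission of a message sendData to the peer
             *        with virtual address destVAddr.
             */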
            template <typename T_Send>
            Event asyncSend(const VAddr destVAddr, const Tag tag, const Context context, const T_Send& sendData){
                Uri destUri = getVAddrUri(context, destVAddr);
                mpi::request request = context.comm.isend(destUri, tag, sendData.data(), sendData.size());
                return Event(request);
            }

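            /**
             * @brief Blocking receive of a message recvData from the peer
             *        with virtual address srcVAddr.
             */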
            template <typename T_Recv>
            void recv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv& recvData){
                Uri srcUri = getVAddrUri(context, srcVAddr);
                context.comm.recv(srcUri, tag, recvData.data(), recvData.size());
            }

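            /**
             * @brief Blocking receive of a message recvData from any peer
             *        of the context.
             */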
            template <typename T_Recv>
            Event recv(const Context context, T_Recv& recvData){
                auto status = context.comm.probe();
                context.comm.recv(status.source(), status.tag(), recvData.data(), recvData.size());
                return Event(status);
            }

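            /**
             * @brief Non-blocking receive of a message recvData from the peer
             *        with virtual address srcVAddr.
             */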
            template <typename T_Recv>
            Event asyncRecv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv& recvData){
                Uri srcUri = getVAddrUri(context, srcVAddr);
                mpi::request request = context.comm.irecv(srcUri, tag, recvData.data(), recvData.size());
                return Event(request);
            }

            /************************************************************************/
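            /**
             * @brief Collects sendData from all peers of the context and
             *        transmits it as a list to the peer with rootVAddr.
             */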
            template <typename T_Send, typename T_Recv>
            void gather(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData){
                Uri rootUri = getVAddrUri(context, rootVAddr);
                mpi::gather(context.comm, sendData.data(), sendData.size(), recvData, rootUri);
            }

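            /**
             * @brief Collects sendData from all members of the context, where the
             *        amount of data can vary per peer, and transmits it as a list
             *        to the peer with rootVAddr.
             */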
            template <typename T_Send, typename T_Recv>
            void gatherVar(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData, std::vector<unsigned>& recvCount){
                // Retrieve number of elements each peer sends
                recvCount.resize(context.size());
                std::array<unsigned, 1> nElements{{(unsigned)sendData.size()}};
                allGather(context, nElements, recvCount);
                recvData.resize(std::accumulate(recvCount.begin(), recvCount.end(), 0U));

                Uri rootUri = getVAddrUri(context, rootVAddr);
                std::vector<int> rdispls(context.size());

                // Create offset map
                unsigned offset = 0;
                for (unsigned i = 0; i < context.size(); ++i) {
                    rdispls[i] = offset;
                    offset += recvCount[i];
                }

                // Gather data with varying size
                MPI_Gatherv(const_cast<typename T_Send::value_type*>(sendData.data()), sendData.size(),
                            mpi::get_mpi_datatype<typename T_Send::value_type>(*(sendData.data())),
                            recvData.data(),
                            reinterpret_cast<int*>(recvCount.data()), rdispls.data(),
                            mpi::get_mpi_datatype<typename T_Recv::value_type>(*(recvData.data())),
                            rootUri, context.comm);

            }

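            /**
             * @brief Collects sendData from all members of the context and
             *        transmits it as a list to every peer in the context.
             */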
            template <typename T_Send, typename T_Recv>
            void allGather(Context context, const T_Send& sendData, T_Recv& recvData){
                mpi::all_gather(context.comm, sendData.data(), sendData.size(), recvData.data());
            }

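            /**
             * @brief Collects sendData from all peers of the context, where the
             *        amount of data can vary per peer. The data is received by
             *        every peer in the context.
             */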
            template <typename T_Send, typename T_Recv>
            void allGatherVar(const Context context, const T_Send& sendData, T_Recv& recvData, std::vector<unsigned>& recvCount){
                // Retrieve number of elements each peer sends
                recvCount.resize(context.size());
                allGather(context, std::array<unsigned, 1>{{(unsigned)sendData.size()}}, recvCount);
                recvData.resize(std::accumulate(recvCount.begin(), recvCount.end(), 0U));

                std::vector<int> rdispls(context.size());

                // Create offset map
                unsigned offset = 0;
                for (unsigned i = 0; i < context.size(); ++i) {
                    rdispls[i] = offset;
                    offset += recvCount[i];
                }

                // Gather data with varying size
                MPI_Allgatherv(const_cast<typename T_Send::value_type*>(sendData.data()), sendData.size(),
                               mpi::get_mpi_datatype<typename T_Send::value_type>(*(sendData.data())),
                               recvData.data(),
                               reinterpret_cast<int*>(recvCount.data()), rdispls.data(),
                               mpi::get_mpi_datatype<typename T_Recv::value_type>(*(recvData.data())),
                               context.comm);

            }

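            /**
             * @brief Distributes sendData from peer rootVAddr to all peers in the
             *        context. Every peer will receive different data.
             */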
            template <typename T_Send, typename T_Recv>
            void scatter(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData){
                Uri rootUri = getVAddrUri(context, rootVAddr);
                mpi::scatter(context.comm, sendData.data(), recvData.data(), recvData.size(), rootUri);
            }

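            /**
             * @brief Distributes sendData of every peer in the context to all peers
             *        in the context. Every peer will receive data from every peer.
             */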
            template <typename T_Send, typename T_Recv>
            void allToAll(const Context context, const T_Send& sendData, T_Recv& recvData){
                unsigned elementsPerPeer = sendData.size() / context.size();

                mpi::all_to_all(context.comm, sendData.data(), elementsPerPeer, recvData.data());
            }

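            /**
             * @brief Performs a reduction with the binary operator op on all sendData
             *        elements from all peers within the context. The result is
             *        received by the peer with rootVAddr.
             */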
            template <typename T_Send, typename T_Recv, typename T_Op>
            void reduce(const VAddr rootVAddr, const Context context, const T_Op op, const T_Send& sendData, T_Recv& recvData){
                Uri rootUri = getVAddrUri(context, rootVAddr);
                mpi::reduce(context.comm, sendData.data(), sendData.size(), recvData.data(), op, rootUri);
            }

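            /**
             * @brief Performs a reduction with the binary operator op on all sendData
             *        elements from all peers within the context. Every peer receives
             *        the result.
             */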
            template <typename T_Send, typename T_Recv, typename T_Op>
            void allReduce(const Context context, T_Op op, const T_Send& sendData, T_Recv& recvData){
                mpi::all_reduce(context.comm, sendData.data(), sendData.size(), recvData.data(), op);
            }

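            /**
             * @brief Sends data from peer rootVAddr to all peers in the context.
             *        Every peer will receive the same data.
             */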
            template <typename T_SendRecv>
            void broadcast(const VAddr rootVAddr, const Context context, T_SendRecv& data){
                Uri rootUri = uriMap.at(context.getID()).at(rootVAddr);
                mpi::broadcast(context.comm, data.data(), data.size(), rootUri);
            }

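            /**
             * @brief Synchronizes all peers within the context to the same point
             *        in the program execution (barrier).
             */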
            void synchronize(const Context context){
                context.comm.barrier();
            }

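            /**
             * @brief Synchronizes all peers within the global context in the
             *        program execution (barrier).
             */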
            void synchronize(){
                synchronize(getGlobalContext());
            }

            /*************************************************************************/
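            /**
             * @brief Creates a new context that contains all peers that declared
             *        isMember as true.
             */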
            Context splitContext(const bool isMember, const Context oldContext){
                mpi::communicator newComm = oldContext.comm.split(isMember);

                if(isMember){
                    Context newContext(++contextCount, newComm);
                    std::array<Uri, 1> uri {{ (int) newContext.getVAddr() }};
                    uriMap.push_back(std::vector<Uri>(newContext.size()));

                    std::vector<Event> events;

                    for(unsigned i = 0; i < newContext.size(); ++i){
                        events.push_back(Event(newContext.comm.isend(i, 0, uri.data(), 1)));
                    }

                    for(unsigned i = 0; i < newContext.size(); ++i){
                        std::array<Uri, 1> otherUri {{ 0 }};
                        newContext.comm.recv(i, 0, otherUri.data(), 1);
                        uriMap.at(newContext.getID()).at(i) = otherUri[0];
                    }

                    while(!events.empty()){
                        events.back().wait();
                        events.pop_back();
                    }

                    return newContext;
                }
                else {
                    return Context();
                }

            }

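            /**
             * @brief Returns the context that contains all peers.
             */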
            Context getGlobalContext(){
                return initialContext;
            }

        private:

            /***************************************************************************
             *
             * @name Private Member
             *
             ***************************************************************************/
            ContextID contextCount;
            std::vector<std::vector<Uri>> uriMap;
            Context initialContext;
            mpi::environment env;

            /***************************************************************************
             *
             * @name Helper Functions
             *
             ***************************************************************************/

            void error(VAddr vAddr, std::string msg){
                std::cout << "[" << vAddr << "] " << msg;
            }

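            /**
             * @brief Returns the uri (MPI rank within the context) of the peer
             *        with the virtual address vAddr.
             */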
            template <typename T_Context>
            inline Uri getVAddrUri(const T_Context context, const VAddr vAddr){
                Uri uri = 0;
                try {
                    uri = uriMap.at(context.getID()).at(vAddr);

                } catch(const std::out_of_range& e){
                    std::stringstream errorStream;
                    errorStream << "MPI::getVAddrUri::" << e.what() << " : Peer with VAddr " << vAddr
                                << " is not part of the context " << context.getID() << std::endl;
                    error(context.getID(), errorStream.str());
                    exit(1);
                }

                return uri;
            }

        };

    } // namespace communicationPolicy

} // namespace graybat
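The following is a minimal usage sketch of the BMPI policy driven directly, without the Cage layer on top. It assumes the program is launched with mpiexec, that Config is default constructible, and that the VAddr and Tag traits resolve to plain integral types; these assumptions are not guaranteed by this header, and error handling is omitted.

// Hypothetical standalone example, not part of BMPI.hpp.
#include <graybat/communicationPolicy/BMPI.hpp>
#include <array>
#include <iostream>

int main(){
    using CP = graybat::communicationPolicy::BMPI;

    CP::Config config;                          // assumption: default constructible
    CP bmpi(config);                            // constructs the MPI environment

    CP::Context context = bmpi.getGlobalContext();

    std::array<int, 1> data{{ 0 }};
    if(context.getVAddr() == 0){
        // Peer 0 sends the value 42 to every other peer (blocking, tag 0)
        data[0] = 42;
        for(unsigned vAddr = 1; vAddr < context.size(); ++vAddr){
            bmpi.send(vAddr, 0, context, data);
        }
    }
    else {
        bmpi.recv(0, 0, context, data);
        std::cout << "[" << context.getVAddr() << "] received " << data[0] << std::endl;
    }

    bmpi.synchronize(context);
    return 0;
}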