Graybat  1.1
Graph Approach for Highly Generic Communication Schemes Based on Adaptive Topologies
MinBMPI.hpp
1 #pragma once
2 #include <assert.h> /* assert */
3 #include <array> /* array */
4 #include <numeric> /* std::accumulate */
5 #include <iostream> /* std::cout */
6 
7 #include <map> /* std::map */
8 #include <exception> /* std::out_of_range */
9 #include <sstream> /* std::stringstream */
10 #include <algorithm> /* std::transform */
11 #include <mpi.h> /* MPI_* */
12 
13 
14 // Boost mpi stuff
15 #include <boost/mpi/environment.hpp>
16 #include <boost/mpi/communicator.hpp>
17 #include <boost/mpi/collectives.hpp>
18 #include <boost/mpi/datatype.hpp>
19 #include <boost/optional.hpp>
20 
21 
22 namespace mpi = boost::mpi;
23 
24 namespace graybat {
25 
26  namespace communicationPolicy {
27 
28  /************************************************************************/
35  struct MinBMPI {
41  class Context {
42  typedef unsigned ContextID;
43  typedef unsigned VAddr;
44 
45  public:
46  Context() :
47  id(0),
48  isValid(false){
49 
50  }
51 
52  Context(ContextID contextID, mpi::communicator comm) :
53  comm(comm),
54  id(contextID),
55  isValid(true){
56 
57  }
58 
59  Context& operator=(const Context& otherContext){
60  id = otherContext.getID();
61  isValid = otherContext.valid();
62  comm = otherContext.comm;
63  return *this;
64 
65  }
66 
67  size_t size() const{
68  return comm.size();
69  }
70 
71  VAddr getVAddr() const {
72  return comm.rank();
73  }
74 
75  ContextID getID() const {
76  return id;
77  }
78 
79  bool valid() const{
80  return isValid;
81  }
82 
83  mpi::communicator comm;
84 
85  private:
86  ContextID id;
87  bool isValid;
88  };
89 
98  class Event {
99  public:
100  Event(mpi::request request) : request(request){
101 
102  }
103 
104  ~Event(){
105 
106  }
107 
108  void wait(){
109  request.wait();
110 
111  }
112 
113  bool ready(){
114  boost::optional<mpi::status> status = request.test();
115 
116  if(status){
117  return true;
118  }
119  else {
120  return false;
121  }
122 
123  }
124 
125  private:
126  mpi::request request;
127  };
128 
129 
130  // Type defs
131  typedef unsigned Tag;
132  typedef unsigned ContextID;
133  typedef unsigned VAddr;
134 
135  typedef unsigned MsgType;
136  typedef int Uri;
137 
138 
139  MinBMPI() :contextCount(0),
140  uriMap(0),
141  initialContext(contextCount, mpi::communicator()){
142 
143  uriMap.push_back(std::vector<Uri>());
144 
145  for(unsigned i = 0; i < initialContext.size(); ++i){
146  uriMap.back().push_back(i);
147  }
148 
149  }
150 
151  // Destructor
152  ~MinBMPI(){
153 
154  }
155  /***********************************************************************/
176  // template <typename T_Send>
177  // Event asyncSend(const VAddr destVAddr, const Tag tag, const Context context, const T_Send& sendData){
178  // Uri destUri = getVAddrUri(context, destVAddr);
179  // mpi::request request = context.comm.isend(destUri, tag, sendData.data(), sendData.size());
180  // return Event(request);
181 
182  // }
183 
184 
197  // template <typename T_Send>
198  // void send(const VAddr destVAddr, const Tag tag, const Context context, const T_Send& sendData){
199  // Uri destUri = getVAddrUri(context, destVAddr);
200  // context.comm.send(destUri, tag, sendData.data(), sendData.size());
201  // }
202 
203 
219  // template <typename T_Recv>
220  // Event asyncRecv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv& recvData){
221  // Uri srcUri = getVAddrUri(context, srcVAddr);
222  // mpi::request request = context.comm.irecv(srcUri, tag, recvData.data(), recvData.size());
223  // return Event(request);
224  // }
225 
226 
239  // template <typename T_Recv>
240  // void recv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv& recvData){
241  // Uri srcUri = getVAddrUri(context, srcVAddr);
242  // context.comm.recv(srcUri, tag, recvData.data(), recvData.size());
243 
244  // }
247  /************************************************************************/
268  // template <typename T_Send, typename T_Recv>
269  // void gather(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData){
270  // Uri rootUri = getVAddrUri(context, rootVAddr);
271  // mpi::gather(context.comm, sendData.data(), sendData.size(), recvData, rootUri);
272  // }
273 
274 
297  // template <typename T_Send, typename T_Recv>
298  // void gatherVar(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData, std::vector<unsigned>& recvCount){
299  // // Retrieve number of elements each peer sends
300  // recvCount.resize(context.size());
301  // std::array<unsigned, 1> nElements{{(unsigned)sendData.size()}};
302  // allGather(context, nElements, recvCount);
303  // recvData.resize(std::accumulate(recvCount.begin(), recvCount.end(), 0U));
304 
305  // Uri rootUri = getVAddrUri(context, rootVAddr);
306  // int rdispls[context.size()];
307 
308  // // Create offset map
309  // unsigned offset = 0;
310  // for (unsigned i=0; i < context.size(); ++i) {
311  // rdispls[i] = offset;
312  // offset += recvCount[i];
313 
314  // }
315 
316  // // Gather data with varying size
317  // MPI_Gatherv(const_cast<typename T_Send::value_type*>(sendData.data()), sendData.size(),
318  // mpi::get_mpi_datatype<typename T_Send::value_type>(*(sendData.data())),
319  // const_cast<typename T_Recv::value_type*>(recvData.data()),
320  // const_cast<int*>((int*)recvCount.data()), rdispls,
321  // mpi::get_mpi_datatype<typename T_Recv::value_type>(*(recvData.data())),
322  // rootUri, context.comm);
323 
324 
325  // }
326 
327 
337  template <typename T_Send, typename T_Recv>
338  void allGather(const Context context, const T_Send& sendData, T_Recv& recvData){
339  mpi::all_gather(context.comm, sendData.data(), sendData.size(), recvData.data());
340 
341  }
342 
343 
354  // template <typename T_Send, typename T_Recv>
355  // void allGatherVar(const Context context, const T_Send& sendData, T_Recv& recvData, std::vector<unsigned>& recvCount){
356  // // Retrieve number of elements each peer sends
357  // recvCount.resize(context.size());
358  // allGather(context, std::array<unsigned, 1>{{(unsigned)sendData.size()}}, recvCount);
359  // recvData.resize(std::accumulate(recvCount.begin(), recvCount.end(), 0U));
360 
361  // int rdispls[context.size()];
362 
363  // // Create offset map
364  // unsigned offset = 0;
365  // for (unsigned i=0; i < context.size(); ++i) {
366  // rdispls[i] = offset;
367  // offset += recvCount[i];
368 
369  // }
370 
371  // // Gather data with varying size
372  // MPI_Allgatherv(const_cast<typename T_Send::value_type*>(sendData.data()), sendData.size(),
373  // mpi::get_mpi_datatype<typename T_Send::value_type>(*(sendData.data())),
374  // const_cast<typename T_Recv::value_type*>(recvData.data()),
375  // const_cast<int*>((int*)recvCount.data()), rdispls,
376  // mpi::get_mpi_datatype<typename T_Recv::value_type>(*(recvData.data())),
377  // context.comm);
378 
379 
380  // }
381 
382 
395  // template <typename T_Send, typename T_Recv>
396  // void scatter(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData){
397  // Uri rootUri = getVAddrUri(context, rootVAddr);
398  // mpi::scatter(context.comm, sendData.data(), recvData.data(), recvData.size(), rootUri);
399 
400  // }
401 
402 
415  // template <typename T_Send, typename T_Recv>
416  // void allToAll(const Context context, const T_Send& sendData, T_Recv& recvData){
417  // unsigned elementsPerPeer = sendData.size() / context.size();
418 
419  // mpi::all_to_all(context.comm, sendData.data(), elementsPerPeer, recvData.data());
420 
421  // }
422 
423 
440  // template <typename T_Send, typename T_Recv, typename T_Op>
441  // void reduce(const VAddr rootVAddr, const Context context, const T_Op op, const T_Send& sendData, const T_Recv& recvData){
442  // Uri rootUri = getVAddrUri(context, rootVAddr);
443  // mpi::reduce(context.comm, sendData.data(), sendData.size(), recvData.data(), op, rootUri);
444 
445  // }
446 
459  template <typename T_Send, typename T_Recv, typename T_Op>
460  void allReduce(const Context context, T_Op op, const T_Send& sendData, T_Recv& recvData){
461  mpi::all_reduce(context.comm, sendData.data(), sendData.size(), recvData.data(), op);
462 
463  }
464 
465 
478  // template <typename T_SendRecv>
479  // void broadcast(const VAddr rootVAddr, const Context context, const T_SendRecv& data){
480  // Uri rootUri = uriMap.at(context.getID()).at(rootVAddr);
481  // mpi::broadcast(context.comm, data.data(), data.size(), rootUri);
482  // }
483 
484 
490  // void synchronize(const Context context){
491  // context.comm.barrier();
492  // }
493 
494 
502  // void synchronize(){
503  // synchronize(getGlobalContext());
504  // }
508  /*************************************************************************/
519  Context createContext(const std::vector<VAddr> vAddrs, const Context oldContext){
520  assert(vAddrs.size() > 0);
521 
522  VAddr myVAddr = oldContext.getVAddr();
523  bool isPartOfNewContext = false;
524 
525  for(VAddr vAddr : vAddrs){
526  if(vAddr == myVAddr){
527  isPartOfNewContext = true;
528  }
529 
530  }
531 
532  mpi::communicator newComm = oldContext.comm.split(isPartOfNewContext);
533 
534  if(isPartOfNewContext){
535  std::array<Uri, 1> uri;
536 
537  Context newContext(++contextCount, newComm);
538  uri[0] = newContext.getVAddr();
539 
540  // Update UriMap
541  uriMap.push_back(std::vector<Uri>(newContext.size()));
542  std::vector<Uri> otherUris(newContext.size());
543 
544  allGather(newContext, uri, otherUris);
545 
546  std::copy(otherUris.begin(), otherUris.end(), uriMap[newContext.getID()].begin());
547 
548  return newContext;
549 
550  }
551  else {
552  // return invalid context
553  // for peers not anymore included
554  return Context();
555 
556  }
557 
558  }
559 
560 
566  return initialContext;
567  }
570  private:
571 
572  /***************************************************************************
573  *
574  * @name Private Member
575  *
576  ***************************************************************************/
577  ContextID contextCount;
578  std::vector<std::vector<Uri>> uriMap;
579  Context initialContext;
580  mpi::environment env;
581 
582  /***************************************************************************
583  *
584  * @name Helper Functions
585  *
586  ***************************************************************************/
587 
588 
589  void error(VAddr vAddr, std::string msg){
590  std::cout << "[" << vAddr << "] " << msg;
591 
592  }
593 
599  template <typename T_Context>
600  inline Uri getVAddrUri(const T_Context context, const VAddr vAddr){
601  Uri uri = 0;
602  try {
603  uri = uriMap.at(context.getID()).at(vAddr);
604 
605  } catch(const std::out_of_range& e){
606  std::stringstream errorStream;
607  errorStream << "MPI::getVAddrUri::" << e.what()<< " : Communicator with ID " << vAddr << " is not part of the context " << context.getID() << std::endl;
608  error(context.getID(), errorStream.str());
609  exit(1);
610  }
611 
612  return uri;
613  }
614 
615  };
616 
617  } // namespace communicationPolicy
618 
619 } // namespace graybat
Context createContext(const std::vector< VAddr > vAddrs, const Context oldContext)
Creates a new context from peer ids of an oldContext
Definition: MinBMPI.hpp:519
void allReduce(const Context context, T_Op op, const T_Send &sendData, T_Recv &recvData)
Collects sendData from all peers of the context. Size of sendData can vary in size. The data is received by every peer in the context.
Definition: MinBMPI.hpp:460
Definition: chain.cpp:31
void allGather(const Context context, const T_Send &sendData, T_Recv &recvData)
Collects sendData from all peers of the context and transmits it as a list to the peer with rootVAddr...
Definition: MinBMPI.hpp:338
Implementation of the Cage communicationPolicy interface based on the MPI implementation boost::mpi...
Definition: MinBMPI.hpp:35
Context getGlobalContext()
Returns the context that contains all peers.
Definition: MinBMPI.hpp:565
A context represents a set of peers which are able to communicate with each other.
Definition: MinBMPI.hpp:41
Definition: BiStar.hpp:8
An event is returned by non-blocking communication operations and can be asked whether an operation h...
Definition: MinBMPI.hpp:98