#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/mpi/datatype.hpp>
#include <boost/optional.hpp>

namespace mpi = boost::mpi;
#include <graybat/utils/serialize_tuple.hpp>
#include <graybat/communicationPolicy/bmpi/Context.hpp>
#include <graybat/communicationPolicy/bmpi/Event.hpp>
#include <graybat/communicationPolicy/bmpi/Config.hpp>
#include <graybat/communicationPolicy/Base.hpp>
#include <graybat/communicationPolicy/Traits.hpp>

namespace graybat {
namespace communicationPolicy {

// ...

/// Implementation of the Cage communicationPolicy interface based on the MPI
/// implementation boost::mpi.
struct BMPI {
    using Tag       = typename graybat::communicationPolicy::Tag<BMPI>;
    using ContextID = typename graybat::communicationPolicy::ContextID<BMPI>;
    using MsgType   = typename graybat::communicationPolicy::MsgType<BMPI>;
    using VAddr     = typename graybat::communicationPolicy::VAddr<BMPI>;
    using Context   = typename graybat::communicationPolicy::Context<BMPI>;
    using Event     = typename graybat::communicationPolicy::Event<BMPI>;
    using Config    = typename graybat::communicationPolicy::Config<BMPI>;

    // An Event is returned by non-blocking communication operations and can be
    // asked whether an operation has finished. A peer's Uri is its MPI rank.
    BMPI(Config const config) :
        contextCount(0),
        initialContext(contextCount, mpi::communicator())
    {
        // Initially, a peer's Uri is simply its rank in the global communicator.
        uriMap.push_back(std::vector<Uri>());
        for(unsigned i = 0; i < initialContext.size(); ++i){
            uriMap.back().push_back(i);
        }
    }
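    // Usage sketch, assuming Config can be default-constructed here:
    //
    //     graybat::communicationPolicy::BMPI bmpi{Config{}};
    //     auto context = bmpi.getGlobalContext();
    //     std::cout << context.getVAddr() << "/" << context.size() << std::endl;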
    /// Blocking transmission of a message sendData to peer with virtual address destVAddr.
    template <typename T_Send>
    void send(const VAddr destVAddr, const Tag tag, const Context context, const T_Send& sendData){
        Uri destUri = getVAddrUri(context, destVAddr);
        context.comm.send(destUri, tag, sendData.data(), sendData.size());
    }
    /// Non-blocking transmission of a message sendData to peer with virtual address destVAddr.
    template <typename T_Send>
    Event asyncSend(const VAddr destVAddr, const Tag tag, const Context context, const T_Send& sendData){
        Uri destUri = getVAddrUri(context, destVAddr);
        mpi::request request = context.comm.isend(destUri, tag, sendData.data(), sendData.size());
        return Event(request);
    }
    /// Blocking receive of a message recvData from peer with virtual address srcVAddr.
    template <typename T_Recv>
    void recv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv& recvData){
        Uri srcUri = getVAddrUri(context, srcVAddr);
        context.comm.recv(srcUri, tag, recvData.data(), recvData.size());
    }
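    // Sketch of the blocking pair above: peer 0 sends, peer 1 receives.
    // `bmpi` and `context` are assumed as in the constructor sketch; the
    // receive buffer must already have the size of the incoming message.
    //
    //     std::vector<int> data{1, 2, 3};
    //     if(context.getVAddr() == 0) bmpi.send(1, 99, context, data);
    //     if(context.getVAddr() == 1) bmpi.recv(0, 99, context, data);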
    /// Blocking receive of a message from any peer; source and tag are taken
    /// from the first message found by probing.
    template <typename T_Recv>
    Event recv(const Context context, T_Recv& recvData){
        auto status = context.comm.probe();
        context.comm.recv(status.source(), status.tag(), recvData.data(), recvData.size());
        return Event(status);
    }
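    // Sketch: receiving from an arbitrary source. The buffer must be large
    // enough for whatever message is probed first (here the assumption is
    // fixed-size messages of three ints):
    //
    //     std::vector<int> buffer(3);
    //     Event e = bmpi.recv(context, buffer);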
    /// Non-blocking receive of a message recvData from peer with virtual address srcVAddr.
    template <typename T_Recv>
    Event asyncRecv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv& recvData){
        Uri srcUri = getVAddrUri(context, srcVAddr);
        mpi::request request = context.comm.irecv(srcUri, tag, recvData.data(), recvData.size());
        return Event(request);
    }
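    // Sketch of the non-blocking pair: both events must complete before the
    // buffers may be touched again (`partner` is an assumed peer VAddr).
    //
    //     std::vector<int> out{1, 2, 3}, in(3);
    //     Event se = bmpi.asyncSend(partner, 0, context, out);
    //     Event re = bmpi.asyncRecv(partner, 0, context, in);
    //     se.wait(); re.wait();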
    /// Collects sendData from all peers of the context and transmits it as a
    /// list to the peer with rootVAddr.
    template <typename T_Send, typename T_Recv>
    void gather(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData){
        Uri rootUri = getVAddrUri(context, rootVAddr);
        mpi::gather(context.comm, sendData.data(), sendData.size(), recvData, rootUri);
    }
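    // Sketch: every peer contributes two ints; boost::mpi's vector overload
    // resizes the root's receive vector to 2 * context.size() elements, so it
    // may start out empty.
    //
    //     std::vector<int> part{(int)context.getVAddr(), 0};
    //     std::vector<int> whole;
    //     bmpi.gather(0, context, part, whole);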
    /// Collects sendData from all members of the context with varying size and
    /// transmits it as a list to peer rootVAddr.
    template <typename T_Send, typename T_Recv>
    void gatherVar(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData, std::vector<unsigned>& recvCount){
        // Distribute the number of elements each peer contributes, then make
        // room for the concatenation of all contributions.
        recvCount.resize(context.size());
        std::array<unsigned, 1> nElements{{(unsigned)sendData.size()}};
        allGather(context, nElements, recvCount);
        recvData.resize(std::accumulate(recvCount.begin(), recvCount.end(), 0U));

        Uri rootUri = getVAddrUri(context, rootVAddr);
        int rdispls[context.size()];

        // Compute the displacement of every peer's block in the receive buffer.
        unsigned offset = 0;
        for (unsigned i = 0; i < context.size(); ++i) {
            rdispls[i] = offset;
            offset += recvCount[i];
        }

        MPI_Gatherv(const_cast<typename T_Send::value_type*>(sendData.data()), sendData.size(),
                    mpi::get_mpi_datatype<typename T_Send::value_type>(*(sendData.data())),
                    const_cast<typename T_Recv::value_type*>(recvData.data()),
                    const_cast<int*>((int*)recvCount.data()), rdispls,
                    mpi::get_mpi_datatype<typename T_Recv::value_type>(*(recvData.data())),
                    rootUri, context.comm);
    }
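    // Sketch: contributions of varying length; recvData and recvCount are
    // resized inside gatherVar, so both can start out empty.
    //
    //     std::vector<int> part(context.getVAddr() + 1, 42);
    //     std::vector<int> whole;
    //     std::vector<unsigned> counts;
    //     bmpi.gatherVar(0, context, part, whole, counts);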
    /// Collects sendData from all members of the context and transmits it as a
    /// list to every peer in the context.
    template <typename T_Send, typename T_Recv>
    void allGather(Context context, const T_Send& sendData, T_Recv& recvData){
        mpi::all_gather(context.comm, sendData.data(), sendData.size(), recvData.data());
    }
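    // Sketch: like gather, but every peer ends up with the full list. The
    // receive buffer must be pre-sized on every peer, since mpi::all_gather
    // writes through the raw pointer and does not resize.
    //
    //     std::vector<int> part{(int)context.getVAddr()};
    //     std::vector<int> whole(context.size());
    //     bmpi.allGather(context, part, whole);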
    /// Collects sendData from all peers of the context; the size of sendData
    /// may vary per peer. The data is received by every peer in the context.
    template <typename T_Send, typename T_Recv>
    void allGatherVar(const Context context, const T_Send& sendData, T_Recv& recvData, std::vector<unsigned>& recvCount){
        recvCount.resize(context.size());
        allGather(context, std::array<unsigned, 1>{{(unsigned)sendData.size()}}, recvCount);
        recvData.resize(std::accumulate(recvCount.begin(), recvCount.end(), 0U));

        int rdispls[context.size()];

        // Compute the displacement of every peer's block in the receive buffer.
        unsigned offset = 0;
        for (unsigned i = 0; i < context.size(); ++i) {
            rdispls[i] = offset;
            offset += recvCount[i];
        }

        MPI_Allgatherv(const_cast<typename T_Send::value_type*>(sendData.data()), sendData.size(),
                       mpi::get_mpi_datatype<typename T_Send::value_type>(*(sendData.data())),
                       const_cast<typename T_Recv::value_type*>(recvData.data()),
                       const_cast<int*>((int*)recvCount.data()), rdispls,
                       mpi::get_mpi_datatype<typename T_Recv::value_type>(*(recvData.data())),
                       context.comm);
    }
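    // Sketch: as gatherVar, but the assembled list lands on every peer;
    // recvData and recvCount may start empty since both are resized above.
    //
    //     std::vector<int> part(context.getVAddr() + 1, 7);
    //     std::vector<int> whole;
    //     std::vector<unsigned> counts;
    //     bmpi.allGatherVar(context, part, whole, counts);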
    /// Distributes sendData from peer rootVAddr to all peers in context.
    /// Every peer will receive different data.
    template <typename T_Send, typename T_Recv>
    void scatter(const VAddr rootVAddr, const Context context, const T_Send& sendData, T_Recv& recvData){
        Uri rootUri = getVAddrUri(context, rootVAddr);
        mpi::scatter(context.comm, sendData.data(), recvData.data(), recvData.size(), rootUri);
    }
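    // Sketch: the root provides context.size() * chunk elements; every peer
    // (root included) receives one chunk. recvData's size determines the chunk
    // size, so it must be pre-sized.
    //
    //     std::vector<int> chunks(context.size() * 2); // filled at the root
    //     std::vector<int> mine(2);
    //     bmpi.scatter(0, context, chunks, mine);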
    /// Distributes sendData of all peers in the context to all peers in the
    /// context; every peer receives one block from every peer.
    template <typename T_Send, typename T_Recv>
    void allToAll(const Context context, const T_Send& sendData, T_Recv& recvData){
        unsigned elementsPerPeer = sendData.size() / context.size();
        mpi::all_to_all(context.comm, sendData.data(), elementsPerPeer, recvData.data());
    }
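    // Sketch: sendData holds one block per peer; block i goes to peer i, and
    // the blocks received from all peers are concatenated into recvData, which
    // therefore needs the same total size as sendData.
    //
    //     std::vector<int> out(context.size(), (int)context.getVAddr());
    //     std::vector<int> in(context.size());
    //     bmpi.allToAll(context, out, in);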
    /// Performs a reduction with a binary operator op on all sendData elements
    /// from all peers within the context; the result is available at peer rootVAddr.
    template <typename T_Send, typename T_Recv, typename T_Op>
    void reduce(const VAddr rootVAddr, const Context context, const T_Op op, const T_Send& sendData, T_Recv& recvData){
        Uri rootUri = getVAddrUri(context, rootVAddr);
        mpi::reduce(context.comm, sendData.data(), sendData.size(), recvData.data(), op, rootUri);
    }
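    // Sketch: element-wise sum over all peers, result only at the root.
    // std::plus<int> is mapped to MPI_SUM by boost::mpi.
    //
    //     std::vector<int> in(4, 1);
    //     std::vector<int> sum(4); // meaningful at the root only
    //     bmpi.reduce(0, context, std::plus<int>(), in, sum);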
    /// Performs a reduction with a binary operator op on all sendData elements
    /// from all peers within the context; every peer receives the result.
    template <typename T_Send, typename T_Recv, typename T_Op>
    void allReduce(const Context context, T_Op op, const T_Send& sendData, T_Recv& recvData){
        mpi::all_reduce(context.comm, sendData.data(), sendData.size(), recvData.data(), op);
    }
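    // Sketch: as reduce, but every peer obtains the reduced values.
    //
    //     std::vector<int> in(4, 1);
    //     std::vector<int> sum(4);
    //     bmpi.allReduce(context, std::plus<int>(), in, sum);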
    /// Sends data from peer rootVAddr to all peers in context. Every peer will
    /// receive the same data.
    template <typename T_SendRecv>
    void broadcast(const VAddr rootVAddr, const Context context, T_SendRecv& data){
        Uri rootUri = uriMap.at(context.getID()).at(rootVAddr);
        mpi::broadcast(context.comm, data.data(), data.size(), rootUri);
    }
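    // Sketch: data acts as send buffer at the root and as receive buffer
    // everywhere else, so it must have the same size on all peers.
    //
    //     std::vector<int> data(8); // filled at the root, overwritten elsewhere
    //     bmpi.broadcast(0, context, data);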
    /// Synchronizes all peers within context to the same point in the program
    /// execution (barrier).
    void synchronize(const Context context){
        context.comm.barrier();
    }

    /// Synchronizes all peers within the globalContext in the program
    /// execution (barrier).
    void synchronize(){
        synchronize(getGlobalContext());
    }
    /// Creates a new context with all peers that declared isMember as true.
    Context splitContext(const bool isMember, const Context oldContext){
        mpi::communicator newComm = oldContext.comm.split(isMember);

        Context newContext(++contextCount, newComm);
        std::array<Uri, 1> uri {{ (int) newContext.getVAddr() }};
        uriMap.push_back(std::vector<Uri>(newContext.size()));

        std::vector<Event> events;

        // Announce the own Uri to every peer of the new context ...
        for(unsigned i = 0; i < newContext.size(); ++i){
            events.push_back(Event(newContext.comm.isend(i, 0, uri.data(), 1)));
        }

        // ... and collect the Uris of all other peers.
        for(unsigned i = 0; i < newContext.size(); ++i){
            std::array<Uri, 1> otherUri {{ 0 }};
            newContext.comm.recv(i, 0, otherUri.data(), 1);
            uriMap.at(newContext.getID()).at(i) = otherUri[0];
        }

        // Wait until every Uri announcement has been delivered.
        for(unsigned i = 0; i < events.size(); ++i){
            events.at(i).wait();
        }

        return newContext;
    }
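    // Sketch: splitting the global context by the parity of the virtual
    // address; comm.split groups peers by the value of isMember, and the
    // returned context is the group the calling peer ended up in.
    //
    //     bool isMember = (context.getVAddr() % 2) == 0;
    //     Context sub = bmpi.splitContext(isMember, context);
    //     bmpi.synchronize(sub);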
    /// Returns the context that contains all peers.
    Context getGlobalContext(){
        return initialContext;
    }
    ContextID contextCount;               ///< ID assigned to the most recently created context
    std::vector<std::vector<Uri>> uriMap; ///< Maps (ContextID, VAddr) to an MPI rank (Uri)
    Context initialContext;               ///< Context that contains all peers
    mpi::environment env;                 ///< RAII handle for MPI initialization/finalization
    void error(VAddr vAddr, std::string msg){
        std::cout << "[" << vAddr << "] " << msg;
    }
    /// Resolves the Uri (MPI rank) of vAddr within context.
    template <typename T_Context>
    inline Uri getVAddrUri(const T_Context context, const VAddr vAddr){
        Uri uri = 0;
        try {
            uri = uriMap.at(context.getID()).at(vAddr);
        }
        catch(const std::out_of_range& e){
            std::stringstream errorStream;
            errorStream << "MPI::getVAddrUri::" << e.what()
                        << " : Communicator with ID " << vAddr
                        << " is not part of the context " << context.getID() << std::endl;
            error(context.getID(), errorStream.str());
        }
        return uri;
    }

};

} // namespace communicationPolicy
} // namespace graybat