Graybat  1.1
Graph Approach for Highly Generic Communication Schemes Based on Adaptive Topologies
Cage.hpp
1 #pragma once
2 
3 // CLIB
4 #include <assert.h> /* assert */
5 #include <cstddef> /* nullptr_t */
6 
7 // STL
8 #include <map> /* map */
9 #include <exception> /* std::exception */
10 #include <algorithm> /* std::max */
11 #include <stdexcept> /* std::runtime_error */
12 #include <tuple> /* std::tie */
13 #include <memory> /* std::shared_ptr */
14 #include <sstream> /* std::stringstream */
15 
16 // GRAYBAT
17 #include <graybat/utils/exclusivePrefixSum.hpp> /* exclusivePrefixSum */
18 #include <graybat/Vertex.hpp> /* CommunicationVertex */
19 #include <graybat/Edge.hpp> /* CommunicationEdge */
20 #include <graybat/pattern/None.hpp> /* graybat::pattern::None */
21 #include <graybat/communicationPolicy/Traits.hpp>
22 #include <graybat/graphPolicy/Traits.hpp>
23 
24 namespace graybat {
25 
26 
27 
28 
29  /************************************************************************/
45  template <typename T_CommunicationPolicy, typename T_GraphPolicy>
46  struct Cage {
47  using CommunicationPolicy = T_CommunicationPolicy;
48  using GraphPolicy = T_GraphPolicy;
50 
51  using VAddr = graybat::communicationPolicy::VAddr<CommunicationPolicy>;
52  using Context = graybat::communicationPolicy::Context<CommunicationPolicy>;
53  using Event = graybat::communicationPolicy::Event<CommunicationPolicy>;
54  using CPConfig = graybat::communicationPolicy::Config<CommunicationPolicy>;
57 
58  using EdgeDescription = graybat::graphPolicy::EdgeDescription<GraphPolicy>;
59  using GraphDescription = graybat::graphPolicy::GraphDescription<GraphPolicy>;
60  using VertexID = graybat::graphPolicy::VertexID;
61  using EdgeID = graybat::graphPolicy::EdgeID;
62  using GraphID = graybat::graphPolicy::GraphID;
63  using Peer = size_t;
64 
65  template <class T_Functor>
66  Cage(CPConfig const cpConfig, T_Functor graphFunctor) :
67  comm(cpConfig),
68  graph(GraphPolicy(graphFunctor())){
69 
70  }
71 
72  Cage(CPConfig const cpConfig) :
73  comm(cpConfig),
74  graph(GraphPolicy(graybat::pattern::None<GraphPolicy>()())){
75 
76  }
77 
78  Cage() :
79  comm(CPConfig()),
80  graph(GraphPolicy(graybat::pattern::None<GraphPolicy>()())){
81 
82  }
83 
84  ~Cage(){
85  //std::cout << "Destruct Cage" << std::endl;
86  }
87 
88 
89  /***************************************************************************
90  *
91  * MEMBER
92  *
93  ***************************************************************************/
94  CommunicationPolicy comm; // communication backend, constructed from the CPConfig given to the constructor
95  GraphPolicy graph; // graph backend, constructed from the result of the graph functor
96  Context graphContext; // context used for all graph communication; (re)assigned by announce() via comm.splitContext()
97  std::vector<Vertex> hostedVertices; // vertices assigned to this peer; set by distribute()
98 
99 
100  /***************************************************************************
101  *
102  * MAPS
103  *
104  ***************************************************************************/
105  // Maps vertices to its hosts
106  std::map<VertexID, VAddr> vertexMap;
107 
108  // Each graph is mapped to a context of the peer
109  std::map<GraphID, Context> graphMap;
110 
111  // List of vertices of the hosts
112  std::map<VAddr, std::vector<Vertex> > peerMap;
113 
114 
115  /***********************************************************************/
122  template <class T_Functor>
123  void setGraph(T_Functor graphFunctor){
124  graph = GraphPolicy(graphFunctor());
125 
126  }
127 
128  std::vector<Vertex> getVertices(){
129  using Iter = graybat::graphPolicy::AllVertexIter<GraphPolicy>;
130  std::vector<Vertex> vertices;
131 
132  Iter vi_first, vi_last;
133  std::tie(vi_first, vi_last) = graph.getVertices();
134 
135  while(vi_first != vi_last){
136  vertices.push_back(Vertex(graph.getVertexProperty(*vi_first).first,
137  graph.getVertexProperty(*vi_first).second,
138  *this));
139  vi_first++;
140  }
141 
142  return vertices;
143  }
144 
145 
146  Vertex getVertex(const VertexID vertexID){
147  typedef typename GraphPolicy::AllVertexIter Iter;
148  std::vector<Vertex> vertices;
149 
150  Iter vi_first, vi_last;
151  std::tie(vi_first, vi_last) = graph.getVertices();
152 
153  std::advance(vi_first, vertexID);
154 
155  return Vertex(graph.getVertexProperty(*vi_first).first,
156  graph.getVertexProperty(*vi_first).second,
157  *this);
158 
159  }
160 
161  Edge getEdge(const Vertex source, const Vertex target){
162  std::pair<EdgeID, bool> edge = graph.getEdge(source.id, target.id);
163 
164  if(edge.second){
165  return Edge(graph.getEdgeProperty(edge.first).first,
166  getVertex(graph.getEdgeSource(edge.first)),
167  getVertex(graph.getEdgeTarget(edge.first)),
168  graph.getEdgeProperty(edge.first).second,
169  *this);
170  }
171  else {
172  throw std::runtime_error("Edge between does not exist");
173  }
174 
175  }
176 
177 
178  std::vector<Vertex> getAdjacentVertices(const Vertex &v){
179  typedef typename GraphPolicy::AdjacentVertexIter Iter;
180 
181  std::vector<Vertex> adjacentVertices;
182 
183  Iter avi_first, avi_last;
184  std::tie(avi_first, avi_last) = graph.getAdjacentVertices(v.id);
185 
186  while(avi_first != avi_last){
187  adjacentVertices.push_back(Vertex(graph.getVertexProperty(*avi_first).first,
188  graph.getVertexProperty(*avi_first).second,
189  *this));
190  avi_first++;
191  }
192 
193  return adjacentVertices;
194 
195  }
196 
197  std::vector<Edge> getOutEdges(const Vertex &v){
198  typedef typename GraphPolicy::OutEdgeIter Iter;
199  std::vector<Edge> outEdges;
200 
201  Iter oi_first, oi_last;
202  std::tie(oi_first, oi_last) = graph.getOutEdges(v.id);
203 
204  while(oi_first != oi_last){
205  outEdges.push_back(Edge(graph.getEdgeProperty(*oi_first).first,
206  getVertex(graph.getEdgeSource(*oi_first)),
207  getVertex(graph.getEdgeTarget(*oi_first)),
208  graph.getEdgeProperty(*oi_first).second,
209  *this));
210  oi_first++;
211 
212  }
213 
214  return outEdges;
215 
216  }
217 
218  std::vector<Edge> getInEdges(const Vertex v){
219  typedef typename GraphPolicy::InEdgeIter Iter;
220  std::vector<Edge> inEdges;
221 
222  Iter ii_first, ii_last;
223  std::tie(ii_first, ii_last) = graph.getInEdges(v.id);
224 
225  while(ii_first != ii_last){
226  inEdges.push_back(Edge(graph.getEdgeProperty(*ii_first).first,
227  getVertex(graph.getEdgeSource(*ii_first)),
228  getVertex(graph.getEdgeTarget(*ii_first)),
229  graph.getEdgeProperty(*ii_first).second,
230  *this));
231  ii_first++;
232 
233  }
234 
235  return inEdges;
236 
237 
238  }
241  /***********************************************************************/
259  template<class T_Functor>
260  void distribute(T_Functor distFunctor){
261  hostedVertices = distFunctor(comm.getGlobalContext().getVAddr(),
262  comm.getGlobalContext().size(),
263  *this);
264 
265  announce(hostedVertices);
266  }
267 
268 
293  void announce(const std::vector<Vertex> vertices, const bool global=true){
294  // Get old context from graph
295  Context oldContext = graphContext;
296 
297  if(global){
298  oldContext = comm.getGlobalContext();
299  }
300 
301  assert(oldContext.valid());
302 
303  graphContext = comm.splitContext(vertices.size(), oldContext);
304 
305  // Each peer announces the vertices it hosts
306  if(graphContext.valid()){
307  std::array<unsigned, 1> nVertices {{static_cast<unsigned>(vertices.size())}};
308  std::vector<unsigned> vertexIDs;
309 
310  std::for_each(vertices.begin(), vertices.end(), [&vertexIDs](Vertex v){vertexIDs.push_back(v.id);});
311 
312  // Send hostedVertices to all other peers
313  for(unsigned vAddr = 0; vAddr < graphContext.size(); ++vAddr){
314  assert(nVertices[0] != 0);
315  comm.asyncSend(vAddr, 0, graphContext, nVertices);
316  comm.asyncSend(vAddr, 0, graphContext, vertexIDs);
317  }
318 
319  // Recv hostedVertices from all other peers
320  for(unsigned vAddr = 0; vAddr < graphContext.size(); ++vAddr){
321  std::vector<Vertex> remoteVertices;
322  std::array<unsigned, 1> nVertices {{ 0 }};
323  comm.recv(vAddr, 0, graphContext, nVertices);
324  std::vector<unsigned> vertexIDs(nVertices[0]);
325  comm.recv(vAddr, 0, graphContext, vertexIDs);
326 
327  for(unsigned u : vertexIDs){
328  vertexMap[u] = vAddr;
329  remoteVertices.push_back(Cage::getVertex(u));
330  }
331  peerMap[vAddr] = remoteVertices;
332  }
333 
334  }
335 
336  }
337 
/**
 * @brief Binary functor that returns the larger of its two arguments.
 *
 * Comparison is done with operator<; when neither argument is smaller than
 * the other, the first one is returned.
 */
template <typename T>
struct maximum {

    T operator()(const T a, const T b){
        return (a < b) ? b : a;
    }

};
346 
347 
359  VAddr locateVertex(Vertex vertex){
360  auto it = vertexMap.find(vertex.id);
361  if(it != vertexMap.end()){
362  return (*it).second;
363  }
364  else {
365  std::stringstream errorMsg;
366  errorMsg << "[" << comm.getGlobalContext().getVAddr() << "] No host of vertex " << vertex.id << " known.";
367  throw std::runtime_error(errorMsg.str());
368  }
369 
370  }
371 
377  std::vector<Vertex> getHostedVertices(const VAddr vAddr){
378  return peerMap[vAddr];
379 
380  }
381 
387  bool peerHostsVertex(Vertex vertex){
388  VAddr vaddr = graphContext.getVAddr();
389 
390  for(Vertex &v : getHostedVertices(vaddr)){
391  if(vertex.id == v.id)
392  return true;
393  }
394  return false;
395 
396 
397  }
398 
399  std::vector<Peer> getPeers(){
400  unsigned nPeers = comm.getGlobalContext().size();
401  return std::vector<Peer>(nPeers);
402  }
403 
406  /***********************************************************************/
423  template <typename T>
424  void send(const Edge edge, const T& data){
425  VAddr destVAddr = locateVertex(edge.target);
426  comm.send(destVAddr, edge.id, graphContext, data);
427 
428  }
429 
430 
442  template <typename T>
443  void send(const Edge edge, const T& data, std::vector<Event> &events){
444  //std::cout << "send cage:" << edge.target.id << " " << edge.id << std::endl;
445  VAddr destVAddr = locateVertex(edge.target);
446  events.push_back(comm.asyncSend(destVAddr, edge.id, graphContext, data));
447 
448  }
449 
450 
459  template <typename T>
460  void recv(const Edge edge, T& data){
461  //std::cout << "recv cage:" << edge.source.id << " " << edge.id << std::endl;
462  VAddr srcVAddr = locateVertex(edge.source);
463  comm.recv(srcVAddr, edge.id, graphContext, data);
464 
465  }
466 
467  template <typename T>
468  Edge recv(T& data){
469  Event event = comm.recv(graphContext, data);
470 
471  return Edge(graph.getEdgeProperty(event.getTag()).first,
472  getVertex(graph.getEdgeSource(event.getTag())),
473  getVertex(graph.getEdgeTarget(event.getTag())),
474  graph.getEdgeProperty(event.getTag()).second,
475  *this);
476  }
477 
478 
488  template <typename T>
489  void recv(const Edge edge, T& data, std::vector<Event> &events){
490  VAddr srcVAddr = locateVertex(edge.source);
491  events.push_back(comm.asyncRecv(srcVAddr, edge.id, graphContext, data));
492 
493  }
494 
497  /**********************************************************************/
505  template <typename T_Data, typename Op>
506  void reduce(const Vertex rootVertex, const Vertex srcVertex, Op op, const std::vector<T_Data> sendData, std::vector<T_Data>& recvData){
507  static std::vector<T_Data> reduce;
508  static std::vector<T_Data>* rootRecvData;
509  static unsigned vertexCount = 0;
510  static bool hasRootVertex = false;
511 
512 
513  VAddr rootVAddr = locateVertex(rootVertex);
514  VAddr srcVAddr = locateVertex(srcVertex);
515  Context context = graphContext;
516  std::vector<Vertex> vertices = getHostedVertices(srcVAddr);
517 
518  vertexCount++;
519 
520  if(reduce.empty()){
521  reduce = std::vector<T_Data>(sendData.size(), 0);
522  }
523 
524  // Reduce locally
525  std::transform(reduce.begin(), reduce.end(), sendData.begin(), reduce.begin(), op);
526 
527  // Remember pointer of recvData from rootVertex
528  if(rootVertex.id == srcVertex.id){
529  hasRootVertex = true;
530  rootRecvData = &recvData;
531  }
532 
533  // Finally start reduction
534  if(vertexCount == vertices.size()){
535 
536  if(hasRootVertex){
537  comm.reduce(rootVAddr, context, op, reduce, *rootRecvData);
538  }
539  else{
540  comm.reduce(rootVAddr, context, op, reduce, recvData);
541 
542  }
543 
544  reduce.clear();
545  vertexCount = 0;
546  }
547  assert(vertexCount <= vertices.size());
548 
549  }
550 
551  template <typename T_Data, typename T_Recv, typename Op>
552  void allReduce(const Vertex srcVertex, Op op, const std::vector<T_Data> sendData, T_Recv& recvData){
553 
554  static std::vector<T_Data> reduce;
555  static unsigned vertexCount = 0;
556  static std::vector<T_Recv*> recvDatas;
557 
558  VAddr srcVAddr = locateVertex(srcVertex);
559  Context context = graphContext;
560  std::vector<Vertex> vertices = getHostedVertices(srcVAddr);
561 
562  recvDatas.push_back(&recvData);
563 
564  vertexCount++;
565 
566  if(reduce.empty()){
567  reduce = std::vector<T_Data>(sendData.size(), 0);
568  }
569 
570  // Reduce locally
571  std::transform(reduce.begin(), reduce.end(), sendData.begin(), reduce.begin(), op);
572 
573  // Finally start reduction
574  if(vertexCount == vertices.size()){
575 
576  comm.allReduce(context, op, reduce, *(recvDatas[0]));
577 
578  // Distribute Received Data to Hosted Vertices
579  for(unsigned i = 1; i < recvDatas.size(); ++i){
580  std::copy(recvDatas[0]->begin(), recvDatas[0]->end(), recvDatas[i]->begin());
581 
582  }
583 
584 
585  reduce.clear();
586  vertexCount = 0;
587  }
588  assert(vertexCount <= vertices.size());
589 
590  }
591 
592 
593  // This function is the hell
594  // TODO: Simplify !!!
595  // TODO: Better software design required !!!
596  template <typename T_Send, typename T_Recv>
597  void gather(const Vertex rootVertex, const Vertex srcVertex, const T_Send sendData, T_Recv& recvData, const bool reorder){
598  typedef typename T_Send::value_type SendValueType;
599  typedef typename T_Recv::value_type RecvValueType;
600 
601  static std::vector<SendValueType> gather;
602  static T_Recv* rootRecvData = NULL;
603  static bool peerHostsRootVertex = false;
604  static unsigned nGatherCalls = 0;
605 
606  nGatherCalls++;
607 
608 
609  VAddr rootVAddr = locateVertex(rootVertex);
610  Context context = graphContext;
611 
612  // Insert data of srcVertex to the end of the gather vector
613  gather.insert(gather.end(), sendData.begin(), sendData.end());
614 
615  // Store recv pointer of rootVertex
616  if(srcVertex.id == rootVertex.id){
617  rootRecvData = &recvData;
618  peerHostsRootVertex = true;
619  }
620 
621  if(nGatherCalls == hostedVertices.size()){
622  std::vector<unsigned> recvCount;
623  if(peerHostsRootVertex){
624  comm.gatherVar(rootVAddr, context, gather, *rootRecvData, recvCount);
625 
626  // Reorder the received data, so that the data
627  // is in vertex id order. This operation is no
628  // sorting since the mapping is known before.
629  if(reorder){
630  std::vector<RecvValueType> recvDataReordered(recvData.size());
631  Cage::reorder(*rootRecvData, recvCount, recvDataReordered);
632  std::copy(recvDataReordered.begin(), recvDataReordered.end(), rootRecvData->begin());
633 
634  }
635 
636  }
637  else {
638  comm.gatherVar(rootVAddr, context, gather, recvData, recvCount);
639  }
640 
641  gather.clear();
642  nGatherCalls = 0;
643 
644  }
645 
646  }
647 
648 
654  template <typename T_Send, typename T_Recv>
655  void allGather(const Vertex srcVertex, T_Send sendData, T_Recv& recvData, const bool reorder){
656  typedef typename T_Send::value_type SendValueType;
657  typedef typename T_Recv::value_type RecvValueType;
658 
659  static std::vector<SendValueType> gather;
660  static std::vector<T_Recv*> recvDatas;
661  static unsigned nGatherCalls = 0;
662  nGatherCalls++;
663 
664  VAddr srcVAddr = locateVertex(srcVertex);
665  Context context = graphContext;
666  std::vector<Vertex> vertices = getHostedVertices(srcVAddr);
667 
668  gather.insert(gather.end(), sendData.begin(), sendData.end());
669  recvDatas.push_back(&recvData);
670 
671 
672  if(nGatherCalls == hostedVertices.size()){
673  std::vector<unsigned> recvCount;
674 
675  comm.allGatherVar(context, gather, *(recvDatas[0]), recvCount);
676 
677  // Reordering code
678  if(reorder){
679  std::vector<RecvValueType> recvDataReordered(recvData.size());
680  Cage::reorder(*(recvDatas[0]), recvCount, recvDataReordered);
681  std::copy(recvDataReordered.begin(), recvDataReordered.end(), recvDatas[0]->begin());
682 
683  }
684 
685  // Distribute Received Data to Hosted Vertices
686  //unsigned nElements = std::accumulate(recvCount.begin(), recvCount.end(), 0);
687  for(unsigned i = 1; i < recvDatas.size(); ++i){
688  std::copy(recvDatas[0]->begin(), recvDatas[0]->end(), recvDatas[i]->begin());
689 
690  }
691 
692  gather.clear();
693 
694  }
695 
696  }
697 
707  template <typename T>
708  void spread(const Vertex vertex, const T& data, std::vector<Event> &events){
709  std::vector<Edge> edges = getOutEdges(vertex);
710  for(Edge edge: edges){
711  Cage::send(edge, data, events);
712  }
713  }
714 
723  template <typename T>
724  void spread(const Vertex vertex, const T& data){
725  std::vector<Edge> edges = getOutEdges(vertex);
726  for(Edge edge: edges){
727  Cage::send(edge, data);
728  }
729  }
730 
741  template <typename T>
742  void collect(const Vertex vertex, T& data){
743  std::vector<Edge> edges = getInEdges(vertex);
744  for(unsigned i = 0; i < edges.size(); ++i){
745  unsigned elementsPerEdge = data.size() / edges.size();
746  std::vector<typename T::value_type> elements(elementsPerEdge);
747  Cage::recv(edges[i], elements);
748  std::copy(elements.begin(), elements.end(), data.begin() + (i*elementsPerEdge));
749  }
750 
751  }
752 
753 
754 
755  void synchronize(){
756  comm.synchronize(graphContext);
757 
758  }
759 
760 
761  int ContextID(){
762  return graphContext.getID();
763  }
764 
767  private:
768 
773  template <class T>
774  void reorder(const std::vector<T> &data, const std::vector<unsigned> &recvCount, std::vector<T> &dataReordered){
775  std::vector<unsigned> prefixsum(graphContext.size(), 0);
776 
777  utils::exclusivePrefixSum(recvCount.begin(), recvCount.end(), prefixsum.begin());
778 
779  for(unsigned vAddr = 0; vAddr < graphContext.size(); vAddr++){
780  const std::vector<Vertex> hostedVertices = getHostedVertices(vAddr);
781  const unsigned nElementsPerVertex = recvCount.at(vAddr) / hostedVertices.size();
782 
783  for(unsigned hostVertex_i = 0; hostVertex_i < hostedVertices.size(); hostVertex_i++){
784 
785  unsigned sourceOffset = prefixsum[vAddr] + (hostVertex_i * nElementsPerVertex);
786  unsigned targetOffset = hostedVertices[hostVertex_i].id * nElementsPerVertex;
787 
788  std::copy(data.begin() + sourceOffset,
789  data.begin() + sourceOffset + nElementsPerVertex,
790  dataReordered.begin() + targetOffset);
791  }
792  }
793 
794  }
795 
796  };
797 
798 } // namespace graybat
799 
800 
801 
VAddr locateVertex(Vertex vertex)
Returns the VAddr of the host of vertex in the graph.
Definition: Cage.hpp:359
void spread(const Vertex vertex, const T &data, std::vector< Event > &events)
Spread data from a vertex to all adjacent vertices connected by an outgoing edge (async).
Definition: Cage.hpp:708
The Communication And Graph Environment enables communication on the basis of a graph with methods of a ...
Definition: Cage.hpp:46
void distribute(T_Functor distFunctor)
Distribution of the graph vertices to the peers of the global context. The distFunctor is the functio...
Definition: Cage.hpp:260
Definition: Vertex.hpp:9
void allGather(const Vertex srcVertex, T_Send sendData, T_Recv &recvData, const bool reorder)
Definition: Cage.hpp:655
void announce(const std::vector< Vertex > vertices, const bool global=true)
Announces vertices of a graph to the network, so that other peers know that these vertices are hosted...
Definition: Cage.hpp:293
bool peerHostsVertex(Vertex vertex)
Returns true if the vertex is hosted by the calling peer otherwise false.
Definition: Cage.hpp:387
Definition: Edge.hpp:8
void spread(const Vertex vertex, const T &data)
Spread data from a vertex to all adjacent vertices connected by an outgoing edge (sync).
Definition: Cage.hpp:724
Definition: Cage.hpp:339
void send(const Edge edge, const T &data)
Synchronous transmission of data to the destVertex on edge.
Definition: Cage.hpp:424
Definition: None.hpp:11
std::vector< Vertex > getHostedVertices(const VAddr vAddr)
Opposite operation of locateVertex(). It returns the vertices that are hosted by the peer with vAddr ...
Definition: Cage.hpp:377
void collect(const Vertex vertex, T &data)
Collects data from all incoming edges under the assumption that all vertices send the same number of ...
Definition: Cage.hpp:742
void send(const Edge edge, const T &data, std::vector< Event > &events)
Asynchronous transmission of data to the destVertex on edge.
Definition: Cage.hpp:443
void recv(const Edge edge, T &data)
Synchronous receive of data from the srcVertex on edge.
Definition: Cage.hpp:460
Definition: BiStar.hpp:8
void recv(const Edge edge, T &data, std::vector< Event > &events)
Asynchronous receive of data from the srcVertex on edge.
Definition: Cage.hpp:489