17 #include <graybat/utils/exclusivePrefixSum.hpp>
18 #include <graybat/Vertex.hpp>
19 #include <graybat/Edge.hpp>
20 #include <graybat/pattern/None.hpp>
21 #include <graybat/communicationPolicy/Traits.hpp>
22 #include <graybat/graphPolicy/Traits.hpp>
45 template <
typename T_CommunicationPolicy,
typename T_GraphPolicy>
47 using CommunicationPolicy = T_CommunicationPolicy;
48 using GraphPolicy = T_GraphPolicy;
51 using VAddr = graybat::communicationPolicy::VAddr<CommunicationPolicy>;
52 using Context = graybat::communicationPolicy::Context<CommunicationPolicy>;
53 using Event = graybat::communicationPolicy::Event<CommunicationPolicy>;
54 using CPConfig = graybat::communicationPolicy::Config<CommunicationPolicy>;
58 using EdgeDescription = graybat::graphPolicy::EdgeDescription<GraphPolicy>;
59 using GraphDescription = graybat::graphPolicy::GraphDescription<GraphPolicy>;
60 using VertexID = graybat::graphPolicy::VertexID;
61 using EdgeID = graybat::graphPolicy::EdgeID;
62 using GraphID = graybat::graphPolicy::GraphID;
65 template <
class T_Functor>
66 Cage(CPConfig
const cpConfig, T_Functor graphFunctor) :
68 graph(GraphPolicy(graphFunctor())){
72 Cage(CPConfig
const cpConfig) :
94 CommunicationPolicy comm;
97 std::vector<Vertex> hostedVertices;
106 std::map<VertexID, VAddr> vertexMap;
109 std::map<GraphID, Context> graphMap;
112 std::map<VAddr, std::vector<Vertex> > peerMap;
122 template <
class T_Functor>
123 void setGraph(T_Functor graphFunctor){
124 graph = GraphPolicy(graphFunctor());
128 std::vector<Vertex> getVertices(){
129 using Iter = graybat::graphPolicy::AllVertexIter<GraphPolicy>;
130 std::vector<Vertex> vertices;
132 Iter vi_first, vi_last;
133 std::tie(vi_first, vi_last) = graph.getVertices();
135 while(vi_first != vi_last){
136 vertices.push_back(
Vertex(graph.getVertexProperty(*vi_first).first,
137 graph.getVertexProperty(*vi_first).second,
146 Vertex getVertex(
const VertexID vertexID){
147 typedef typename GraphPolicy::AllVertexIter Iter;
148 std::vector<Vertex> vertices;
150 Iter vi_first, vi_last;
151 std::tie(vi_first, vi_last) = graph.getVertices();
153 std::advance(vi_first, vertexID);
155 return Vertex(graph.getVertexProperty(*vi_first).first,
156 graph.getVertexProperty(*vi_first).second,
162 std::pair<EdgeID, bool> edge = graph.getEdge(source.id, target.id);
165 return Edge(graph.getEdgeProperty(edge.first).first,
166 getVertex(graph.getEdgeSource(edge.first)),
167 getVertex(graph.getEdgeTarget(edge.first)),
168 graph.getEdgeProperty(edge.first).second,
172 throw std::runtime_error(
"Edge between does not exist");
178 std::vector<Vertex> getAdjacentVertices(
const Vertex &v){
179 typedef typename GraphPolicy::AdjacentVertexIter Iter;
181 std::vector<Vertex> adjacentVertices;
183 Iter avi_first, avi_last;
184 std::tie(avi_first, avi_last) = graph.getAdjacentVertices(v.id);
186 while(avi_first != avi_last){
187 adjacentVertices.push_back(
Vertex(graph.getVertexProperty(*avi_first).first,
188 graph.getVertexProperty(*avi_first).second,
193 return adjacentVertices;
197 std::vector<Edge> getOutEdges(
const Vertex &v){
198 typedef typename GraphPolicy::OutEdgeIter Iter;
199 std::vector<Edge> outEdges;
201 Iter oi_first, oi_last;
202 std::tie(oi_first, oi_last) = graph.getOutEdges(v.id);
204 while(oi_first != oi_last){
205 outEdges.push_back(
Edge(graph.getEdgeProperty(*oi_first).first,
206 getVertex(graph.getEdgeSource(*oi_first)),
207 getVertex(graph.getEdgeTarget(*oi_first)),
208 graph.getEdgeProperty(*oi_first).second,
218 std::vector<Edge> getInEdges(
const Vertex v){
219 typedef typename GraphPolicy::InEdgeIter Iter;
220 std::vector<Edge> inEdges;
222 Iter ii_first, ii_last;
223 std::tie(ii_first, ii_last) = graph.getInEdges(v.id);
225 while(ii_first != ii_last){
226 inEdges.push_back(
Edge(graph.getEdgeProperty(*ii_first).first,
227 getVertex(graph.getEdgeSource(*ii_first)),
228 getVertex(graph.getEdgeTarget(*ii_first)),
229 graph.getEdgeProperty(*ii_first).second,
259 template<
class T_Functor>
261 hostedVertices = distFunctor(comm.getGlobalContext().getVAddr(),
262 comm.getGlobalContext().size(),
293 void announce(
const std::vector<Vertex> vertices,
const bool global=
true){
295 Context oldContext = graphContext;
298 oldContext = comm.getGlobalContext();
301 assert(oldContext.valid());
303 graphContext = comm.splitContext(vertices.size(), oldContext);
306 if(graphContext.valid()){
307 std::array<unsigned, 1> nVertices {{
static_cast<unsigned>(vertices.size())}};
308 std::vector<unsigned> vertexIDs;
310 std::for_each(vertices.begin(), vertices.end(), [&vertexIDs](
Vertex v){vertexIDs.push_back(v.id);});
313 for(
unsigned vAddr = 0; vAddr < graphContext.size(); ++vAddr){
314 assert(nVertices[0] != 0);
315 comm.asyncSend(vAddr, 0, graphContext, nVertices);
316 comm.asyncSend(vAddr, 0, graphContext, vertexIDs);
320 for(
unsigned vAddr = 0; vAddr < graphContext.size(); ++vAddr){
321 std::vector<Vertex> remoteVertices;
322 std::array<unsigned, 1> nVertices {{ 0 }};
323 comm.recv(vAddr, 0, graphContext, nVertices);
324 std::vector<unsigned> vertexIDs(nVertices[0]);
325 comm.recv(vAddr, 0, graphContext, vertexIDs);
327 for(
unsigned u : vertexIDs){
328 vertexMap[u] = vAddr;
329 remoteVertices.push_back(Cage::getVertex(u));
331 peerMap[vAddr] = remoteVertices;
338 template <
typename T>
341 T operator()(
const T a,
const T b){
342 return std::max(a, b);
360 auto it = vertexMap.find(vertex.id);
361 if(it != vertexMap.end()){
365 std::stringstream errorMsg;
366 errorMsg <<
"[" << comm.getGlobalContext().getVAddr() <<
"] No host of vertex " << vertex.id <<
" known.";
367 throw std::runtime_error(errorMsg.str());
378 return peerMap[vAddr];
388 VAddr vaddr = graphContext.getVAddr();
391 if(vertex.id == v.id)
399 std::vector<Peer> getPeers(){
400 unsigned nPeers = comm.getGlobalContext().size();
401 return std::vector<Peer>(nPeers);
423 template <
typename T>
426 comm.send(destVAddr, edge.id, graphContext, data);
442 template <
typename T>
443 void send(
const Edge edge,
const T& data, std::vector<Event> &events){
446 events.push_back(comm.asyncSend(destVAddr, edge.id, graphContext, data));
459 template <
typename T>
463 comm.recv(srcVAddr, edge.id, graphContext, data);
467 template <
typename T>
469 Event
event = comm.recv(graphContext, data);
471 return Edge(graph.getEdgeProperty(event.getTag()).first,
472 getVertex(graph.getEdgeSource(event.getTag())),
473 getVertex(graph.getEdgeTarget(event.getTag())),
474 graph.getEdgeProperty(event.getTag()).second,
488 template <
typename T>
489 void recv(
const Edge edge, T& data, std::vector<Event> &events){
491 events.push_back(comm.asyncRecv(srcVAddr, edge.id, graphContext, data));
505 template <
typename T_Data,
typename Op>
506 void reduce(
const Vertex rootVertex,
const Vertex srcVertex, Op op,
const std::vector<T_Data> sendData, std::vector<T_Data>& recvData){
507 static std::vector<T_Data> reduce;
508 static std::vector<T_Data>* rootRecvData;
509 static unsigned vertexCount = 0;
510 static bool hasRootVertex =
false;
515 Context context = graphContext;
521 reduce = std::vector<T_Data>(sendData.size(), 0);
525 std::transform(reduce.begin(), reduce.end(), sendData.begin(), reduce.begin(), op);
528 if(rootVertex.id == srcVertex.id){
529 hasRootVertex =
true;
530 rootRecvData = &recvData;
534 if(vertexCount == vertices.size()){
537 comm.reduce(rootVAddr, context, op, reduce, *rootRecvData);
540 comm.reduce(rootVAddr, context, op, reduce, recvData);
547 assert(vertexCount <= vertices.size());
551 template <
typename T_Data,
typename T_Recv,
typename Op>
552 void allReduce(
const Vertex srcVertex, Op op,
const std::vector<T_Data> sendData, T_Recv& recvData){
554 static std::vector<T_Data> reduce;
555 static unsigned vertexCount = 0;
556 static std::vector<T_Recv*> recvDatas;
559 Context context = graphContext;
562 recvDatas.push_back(&recvData);
567 reduce = std::vector<T_Data>(sendData.size(), 0);
571 std::transform(reduce.begin(), reduce.end(), sendData.begin(), reduce.begin(), op);
574 if(vertexCount == vertices.size()){
576 comm.allReduce(context, op, reduce, *(recvDatas[0]));
579 for(
unsigned i = 1; i < recvDatas.size(); ++i){
580 std::copy(recvDatas[0]->begin(), recvDatas[0]->end(), recvDatas[i]->begin());
588 assert(vertexCount <= vertices.size());
596 template <
typename T_Send,
typename T_Recv>
597 void gather(
const Vertex rootVertex,
const Vertex srcVertex,
const T_Send sendData, T_Recv& recvData,
const bool reorder){
598 typedef typename T_Send::value_type SendValueType;
599 typedef typename T_Recv::value_type RecvValueType;
601 static std::vector<SendValueType> gather;
602 static T_Recv* rootRecvData = NULL;
603 static bool peerHostsRootVertex =
false;
604 static unsigned nGatherCalls = 0;
610 Context context = graphContext;
613 gather.insert(gather.end(), sendData.begin(), sendData.end());
616 if(srcVertex.id == rootVertex.id){
617 rootRecvData = &recvData;
618 peerHostsRootVertex =
true;
621 if(nGatherCalls == hostedVertices.size()){
622 std::vector<unsigned> recvCount;
623 if(peerHostsRootVertex){
624 comm.gatherVar(rootVAddr, context, gather, *rootRecvData, recvCount);
630 std::vector<RecvValueType> recvDataReordered(recvData.size());
631 Cage::reorder(*rootRecvData, recvCount, recvDataReordered);
632 std::copy(recvDataReordered.begin(), recvDataReordered.end(), rootRecvData->begin());
638 comm.gatherVar(rootVAddr, context, gather, recvData, recvCount);
654 template <
typename T_Send,
typename T_Recv>
655 void allGather(
const Vertex srcVertex, T_Send sendData, T_Recv& recvData,
const bool reorder){
656 typedef typename T_Send::value_type SendValueType;
657 typedef typename T_Recv::value_type RecvValueType;
659 static std::vector<SendValueType> gather;
660 static std::vector<T_Recv*> recvDatas;
661 static unsigned nGatherCalls = 0;
665 Context context = graphContext;
668 gather.insert(gather.end(), sendData.begin(), sendData.end());
669 recvDatas.push_back(&recvData);
672 if(nGatherCalls == hostedVertices.size()){
673 std::vector<unsigned> recvCount;
675 comm.allGatherVar(context, gather, *(recvDatas[0]), recvCount);
679 std::vector<RecvValueType> recvDataReordered(recvData.size());
680 Cage::reorder(*(recvDatas[0]), recvCount, recvDataReordered);
681 std::copy(recvDataReordered.begin(), recvDataReordered.end(), recvDatas[0]->begin());
687 for(
unsigned i = 1; i < recvDatas.size(); ++i){
688 std::copy(recvDatas[0]->begin(), recvDatas[0]->end(), recvDatas[i]->begin());
707 template <
typename T>
708 void spread(
const Vertex vertex,
const T& data, std::vector<Event> &events){
709 std::vector<Edge> edges = getOutEdges(vertex);
710 for(
Edge edge: edges){
723 template <
typename T>
725 std::vector<Edge> edges = getOutEdges(vertex);
726 for(
Edge edge: edges){
741 template <
typename T>
743 std::vector<Edge> edges = getInEdges(vertex);
744 for(
unsigned i = 0; i < edges.size(); ++i){
745 unsigned elementsPerEdge = data.size() / edges.size();
746 std::vector<typename T::value_type> elements(elementsPerEdge);
748 std::copy(elements.begin(), elements.end(), data.begin() + (i*elementsPerEdge));
756 comm.synchronize(graphContext);
762 return graphContext.getID();
774 void reorder(
const std::vector<T> &data,
const std::vector<unsigned> &recvCount, std::vector<T> &dataReordered){
775 std::vector<unsigned> prefixsum(graphContext.size(), 0);
777 utils::exclusivePrefixSum(recvCount.begin(), recvCount.end(), prefixsum.begin());
779 for(
unsigned vAddr = 0; vAddr < graphContext.size(); vAddr++){
781 const unsigned nElementsPerVertex = recvCount.at(vAddr) / hostedVertices.size();
783 for(
unsigned hostVertex_i = 0; hostVertex_i < hostedVertices.size(); hostVertex_i++){
785 unsigned sourceOffset = prefixsum[vAddr] + (hostVertex_i * nElementsPerVertex);
786 unsigned targetOffset = hostedVertices[hostVertex_i].id * nElementsPerVertex;
788 std::copy(data.begin() + sourceOffset,
789 data.begin() + sourceOffset + nElementsPerVertex,
790 dataReordered.begin() + targetOffset);
VAddr locateVertex(Vertex vertex)
Returns the VAddr of the host of vertex in the graph.
Definition: Cage.hpp:359
void spread(const Vertex vertex, const T &data, std::vector< Event > &events)
Spread data from a vertex to all adjacent vertices connected by an outgoing edge (async).
Definition: Cage.hpp:708
The Communication And Graph Environment enables to communicate on basis of a graph with methods of a ...
Definition: Cage.hpp:46
void distribute(T_Functor distFunctor)
Distribution of the graph vertices to the peers of the global context. The distFunctor it the functio...
Definition: Cage.hpp:260
void allGather(const Vertex srcVertex, T_Send sendData, T_Recv &recvData, const bool reorder)
Definition: Cage.hpp:655
void announce(const std::vector< Vertex > vertices, const bool global=true)
Announces vertices of a graph to the network, so that other peers know that these vertices are hosted...
Definition: Cage.hpp:293
bool peerHostsVertex(Vertex vertex)
Returns true if the vertex is hosted by the calling peer otherwise false.
Definition: Cage.hpp:387
void spread(const Vertex vertex, const T &data)
Spread data from a vertex to all adjacent vertices connected by an outgoing edge (sync).
Definition: Cage.hpp:724
void send(const Edge edge, const T &data)
Synchron transmission of data to the destVertex on edge.
Definition: Cage.hpp:424
std::vector< Vertex > getHostedVertices(const VAddr vAddr)
Opposite operation of locateVertex(). It returns the vertices that are hosted by the peer with vAddr ...
Definition: Cage.hpp:377
void collect(const Vertex vertex, T &data)
Collects data from all incoming edges under the assumption that all vertices send the same number of ...
Definition: Cage.hpp:742
void send(const Edge edge, const T &data, std::vector< Event > &events)
Asynchron transmission of data to the destVertex on edge.
Definition: Cage.hpp:443
void recv(const Edge edge, T &data)
Synchron receive of data from the srcVertex on edge.
Definition: Cage.hpp:460
void recv(const Edge edge, T &data, std::vector< Event > &events)
Asynchron receive of data from the srcVertex on edge.
Definition: Cage.hpp:489