24 #include <boost/hana.hpp>
25 namespace hana = boost::hana;
28 #include <graybat/utils/MultiKeyMap.hpp>
29 #include <graybat/communicationPolicy/Base.hpp>
30 #include <graybat/communicationPolicy/zmq/Context.hpp>
31 #include <graybat/communicationPolicy/zmq/Event.hpp>
32 #include <graybat/communicationPolicy/zmq/Config.hpp>
33 #include <graybat/communicationPolicy/Traits.hpp>
37 namespace communicationPolicy {
// Policy-specific types. Each alias is resolved through the corresponding
// graybat::communicationPolicy traits template instantiated for ZMQ.
70 using Tag =
typename graybat::communicationPolicy::Tag<ZMQ>;
71 using ContextID =
typename graybat::communicationPolicy::ContextID<ZMQ>;
72 using MsgType =
typename graybat::communicationPolicy::MsgType<ZMQ>;
73 using MsgID =
typename graybat::communicationPolicy::MsgID<ZMQ>;
74 using VAddr =
typename graybat::communicationPolicy::VAddr<ZMQ>;
75 using Context =
typename graybat::communicationPolicy::Context<ZMQ>;
76 using Event =
typename graybat::communicationPolicy::Event<ZMQ>;
77 using Config =
typename graybat::communicationPolicy::Config<ZMQ>;
// Endpoint address of a peer or of the master, kept as a plain string.
78 using Uri = std::string;
// Message types that this excerpt writes to the signaling socket or compares
// against its replies (VADDR_REQUEST, VADDR_LOOKUP, DESTRUCT, RETRY,
// CONTEXT_INIT, CONTEXT_REQUEST). ACK is not referenced in the visible code.
// DESTRUCT is additionally sent peer-to-peer during teardown.
81 static const MsgType VADDR_REQUEST = 0;
82 static const MsgType VADDR_LOOKUP = 1;
83 static const MsgType DESTRUCT = 2;
84 static const MsgType RETRY = 3;
85 static const MsgType ACK = 4;
86 static const MsgType CONTEXT_INIT = 5;
87 static const MsgType CONTEXT_REQUEST = 6;
// Message types carried in the header of peer-to-peer messages:
// PEER    - user payload sent by asyncSend,
// CONFIRM - acknowledgement the receive loop sends back for a PEER message,
// SPLIT   - internal traffic of the context-splitting protocol.
90 static const MsgType PEER = 7;
91 static const MsgType CONFIRM = 8;
92 static const MsgType SPLIT = 9;
// Two separate ZMQ contexts: one for peer traffic, one for the signaling
// connection to the master.
95 ::zmq::context_t zmqContext;
96 ::zmq::context_t zmqSignalingContext;
// recvSocket is created as ZMQ_PULL on zmqContext, signalingSocket as
// ZMQ_REQ on zmqSignalingContext (see the constructor's initializer list).
97 ::zmq::socket_t recvSocket;
98 ::zmq::socket_t signalingSocket;
// Context obtained from the master during construction.
102 Context initialContext;
// context id -> virtual address -> index into sendSockets.
103 std::map<ContextID, std::map<VAddr, std::size_t> >sendSocketMappings;
// One outgoing socket per peer; shared between contexts through the
// index mapping above.
104 std::vector<::zmq::socket_t> sendSockets;
// context id -> virtual address -> peer URI, and the reverse lookup.
105 std::map<ContextID, std::map<VAddr, Uri> >phoneBook;
106 std::map<ContextID, std::map<Uri, VAddr> >inversePhoneBook;
// All contexts this peer belongs to, keyed by their id.
107 std::map<ContextID, Context> contexts;
// Received messages, filed under (message type, context id, source
// virtual address, tag) by the receive loop and taken out by recv/ready.
109 utils::MessageBox<::zmq::message_t, MsgType, ContextID, VAddr, Tag> inBox;
// Thread that runs ZMQ::handleRecv; started at the end of the constructor.
112 std::thread recvHandler;
// Sets up this peer: binds the receive socket, registers with the master
// over the signaling socket, learns the URIs of all peers of the initial
// context, opens one outgoing socket per peer and starts the receive thread.
//
// config provides masterUri, peerUri and contextSize.
120 ZMQ(Config
const config) :
123 zmqSignalingContext(1),
124 recvSocket(zmqContext, ZMQ_PULL),
125 signalingSocket(zmqSignalingContext, ZMQ_REQ),
128 masterUri(config.masterUri),
// Binding happens here; the URI that was actually bound is kept as peerUri.
129 peerUri(bindToNextFreePort(recvSocket, config.peerUri)){
// Connect to the master.
132 signalingSocket.connect(masterUri.c_str());
// Ask the master for the id of the initial context.
135 ContextID contextID = getInitialContextID(signalingSocket, config.contextSize);
// Register our URI in that context and receive our virtual address.
138 VAddr vAddr = getVAddr(signalingSocket, contextID, peerUri);
139 initialContext = Context(contextID, vAddr, config.contextSize);
140 contexts[initialContext.getID()] = initialContext;
// Fill the phone book (and its inverse) with the URI of every virtual
// address in the initial context.
// NOTE(review): the loop variable shadows the outer `vAddr` declared above.
143 for(
unsigned vAddr = 0; vAddr < initialContext.size(); vAddr++){
144 Uri remoteUri = getUri(signalingSocket, initialContext.getID(), vAddr);
145 phoneBook[initialContext.getID()][vAddr] = remoteUri;
146 inversePhoneBook[initialContext.getID()][remoteUri] = vAddr;
// Create one PUSH socket per virtual address (including our own), record
// its index and connect it to the URI looked up above.
151 for(
unsigned vAddr = 0; vAddr < initialContext.size(); vAddr++){
152 sendSockets.emplace_back(::zmq::socket_t(zmqContext, ZMQ_PUSH));
153 sendSocketMappings[initialContext.getID()][vAddr] = sendSockets.size() - 1;
154 sendSockets.at(sendSocketMappings[initialContext.getID()].at(vAddr)).connect(phoneBook[initialContext.getID()].at(vAddr).c_str());
// Start the receive loop on its own thread; the thread keeps `this`.
158 recvHandler = std::thread(&ZMQ::handleRecv,
this);
// Move construction is disabled.
// NOTE(review): presumably because the receive thread is started with
// `this` and would keep pointing at the moved-from object - confirm.
162 ZMQ(
ZMQ &&other) =
delete;
// Teardown sequence (presumably the destructor body; the enclosing
// signature is not part of this excerpt).
// Announce DESTRUCT to the master over the signaling socket.
168 s_send(signalingSocket, std::to_string(DESTRUCT).c_str());
// Send a DESTRUCT message to our own virtual address so that the receive
// loop sees it in its DESTRUCT branch.
// NOTE(review): `null` has one element and is not initialized, so
// asyncSendImpl copies one indeterminate unsigned into the payload; the
// other "empty" sends in this file use std::array<unsigned, 0>.
171 std::array<unsigned, 1> null;
172 asyncSendImpl(DESTRUCT, 0, initialContext, initialContext.getVAddr(), 0, null);
// Requests the id of the initial context from the master.
//
// socket       signaling socket connected to the master.
// contextSize  number of peers of the initial context; sent along with
//              the request.
// The request is CONTEXT_INIT followed by contextSize, separated by a space.
187 ContextID getInitialContextID(::zmq::socket_t &socket,
const size_t contextSize){
188 ContextID contextID = 0;
190 std::stringstream ss;
191 ss << CONTEXT_INIT <<
" " << contextSize;
192 s_send(socket, ss.str().c_str());
// The reply is streamed into `sss`; parsing and return are not part of
// this excerpt.
// NOTE(review): s_recv returns a strdup'ed buffer; the pointer is not kept
// here, so the buffer cannot be freed afterwards (leak on every call).
195 std::stringstream sss;
196 sss << s_recv(socket);
// Requests a fresh context id from the master by sending CONTEXT_REQUEST
// on the given signaling socket.
202 ContextID getContextID(::zmq::socket_t &socket){
203 ContextID contextID = 0;
206 std::stringstream ss;
207 ss << CONTEXT_REQUEST;
208 s_send(socket, ss.str().c_str());
// The reply is streamed into `sss`; parsing and return are not part of
// this excerpt.
// NOTE(review): the strdup'ed buffer returned by s_recv is never freed.
211 std::stringstream sss;
212 sss << s_recv(socket);
// Registers `uri` with the master for context `contextID` and asks for the
// virtual address assigned to it.
//
// The request is VADDR_REQUEST, contextID and uri, separated by spaces and
// followed by a trailing space.
218 VAddr getVAddr(::zmq::socket_t &socket,
const ContextID contextID,
const Uri uri){
221 std::stringstream ss;
222 ss << VADDR_REQUEST <<
" " << contextID <<
" " << uri <<
" ";
223 s_send(socket, ss.str().c_str());
// The reply is streamed into `sss`; parsing and return are not part of
// this excerpt.
// NOTE(review): the strdup'ed buffer returned by s_recv is never freed.
226 std::stringstream sss;
227 sss << s_recv(socket);
// Asks the master for the URI registered under virtual address `vAddr` in
// context `contextID`.
//
// The lookup is repeated for as long as `type` equals RETRY.
// NOTE(review): the code that updates `type` from the reply is not part of
// this excerpt; presumably the master answers RETRY until the peer has
// registered - confirm.
234 Uri getUri(::zmq::socket_t &socket,
const ContextID contextID,
const VAddr vAddr){
235 MsgType type = RETRY;
237 while(type == RETRY){
// Request: VADDR_LOOKUP, contextID and vAddr, separated by spaces.
239 std::stringstream ss;
240 ss << VADDR_LOOKUP <<
" " << contextID <<
" " << vAddr;
241 s_send(socket, ss.str().c_str());
// NOTE(review): the strdup'ed buffer returned by s_recv is never freed,
// once per loop iteration.
244 std::string remoteUri;
245 std::stringstream sss;
246 sss << s_recv(socket);
// Receives one message from `socket` and returns its content as a C string.
//
// The returned buffer is allocated by strdup; the caller owns it and has to
// release it with free().
261 static char * s_recv (::zmq::socket_t& socket) {
262 ::zmq::message_t message(256);
263 socket.recv(&message);
// The statement guarded by this size check is not part of this excerpt.
264 if (message.size() ==
static_cast<size_t>(-1))
// Messages longer than 255 bytes are cut off by writing a terminator at
// index 255.
266 if (message.size() > 255)
267 static_cast<char*>(message.data())[255] = 0;
// NOTE(review): for shorter messages no terminator is written in the
// visible code, while strdup needs one. s_send in this file transmits
// strlen() bytes without a terminator - confirm what the master sends.
268 return strdup (static_cast<char*>(message.data()));
// Sends the C string `string` as one message on `socket`.
//
// Exactly strlen(string) bytes are copied into the message, so the
// terminating NUL is not transmitted. The return statement is not part of
// this excerpt.
271 static int s_send (::zmq::socket_t& socket,
const char *
string) {
272 ::zmq::message_t message(
sizeof(
char) * strlen(
string));
273 memcpy (static_cast<char*>(message.data()),
string,
sizeof(
char) * strlen(
string));
274 socket.send(message);
// Copies the payload of a peer message into `data`.
//
// The message starts with a header holding, in this order, MsgType, MsgID,
// ContextID, VAddr and Tag (the layout written by asyncSendImpl). The
// header is read field by field to advance `msgOffset`; afterwards
// data.size() elements of T_Data::value_type are copied to data.data().
//
// T_Data has to provide data(), size() and value_type.
// NOTE(review): no check that the message is large enough for header plus
// payload is visible in this excerpt. The declarations of remoteMsgID,
// remoteVAddr and remoteTag are not part of this excerpt either.
278 template <
typename T_Data>
279 void zmqMessageToData(::zmq::message_t &message, T_Data& data){
280 size_t msgOffset = 0;
281 MsgType remoteMsgType;
283 ContextID remoteContextID;
// Header fields, each read at the running offset.
287 memcpy (&remoteMsgType, static_cast<char*>(message.data()) + msgOffset,
sizeof(MsgType)); msgOffset +=
sizeof(MsgType);
288 memcpy (&remoteMsgID, static_cast<char*>(message.data()) + msgOffset,
sizeof(MsgID)); msgOffset +=
sizeof(MsgID);
289 memcpy (&remoteContextID, static_cast<char*>(message.data()) + msgOffset,
sizeof(ContextID)); msgOffset +=
sizeof(ContextID);
290 memcpy (&remoteVAddr, static_cast<char*>(message.data()) + msgOffset,
sizeof(VAddr)); msgOffset +=
sizeof(VAddr);
291 memcpy (&remoteTag, static_cast<char*>(message.data()) + msgOffset,
sizeof(
Tag)); msgOffset +=
sizeof(
Tag);
// Payload: everything after the header, sized by the destination.
293 memcpy (static_cast<void*>(data.data()),
294 static_cast<char*>(message.data()) + msgOffset,
295 sizeof(
typename T_Data::value_type) * data.size());
// Binds `socket` to an endpoint derived from `peerUri` and returns the URI
// (the constructor stores the result as this peer's URI).
//
// peerUri is split at its last ':' into a base part and a port number.
// NOTE(review): the surrounding retry loop is not part of this excerpt;
// judging by the function name and the `connected` flag, the port is
// presumably advanced after a failed bind - confirm.
// NOTE(review): std::stoi throws if the text after the last ':' is not a
// number.
299 Uri bindToNextFreePort(::zmq::socket_t &socket,
const std::string peerUri){
300 std::string peerBaseUri = peerUri.substr(0, peerUri.rfind(
":"));
301 unsigned peerBasePort = std::stoi(peerUri.substr(peerUri.rfind(
":") + 1));
302 bool connected =
false;
// Candidate endpoint: base part plus current port.
307 uri = peerBaseUri +
":" + std::to_string(peerBasePort);
308 socket.bind(uri.c_str());
// NOTE(review): the exception is caught by value; catching by const
// reference would avoid the copy.
311 catch(::zmq::error_t e){
// Body of the receive loop (presumably ZMQ::handleRecv, the function the
// constructor starts on recvHandler; the signature and the loop statement
// are not part of this excerpt).
// Take the next message from the receive socket.
326 ::zmq::message_t message;
327 recvSocket.recv(&message);
// Decode the header: MsgType, MsgID, ContextID, VAddr, Tag, in this order.
// The declarations of remoteMsgID, remoteVAddr and remoteTag are not part
// of this excerpt.
330 size_t msgOffset = 0;
331 MsgType remoteMsgType;
333 ContextID remoteContextID;
337 memcpy (&remoteMsgType, static_cast<char*>(message.data()) + msgOffset,
sizeof(MsgType)); msgOffset +=
sizeof(MsgType);
338 memcpy (&remoteMsgID, static_cast<char*>(message.data()) + msgOffset,
sizeof(MsgID)); msgOffset +=
sizeof(MsgID);
339 memcpy (&remoteContextID, static_cast<char*>(message.data()) + msgOffset,
sizeof(ContextID)); msgOffset +=
sizeof(ContextID);
340 memcpy (&remoteVAddr, static_cast<char*>(message.data()) + msgOffset,
sizeof(VAddr)); msgOffset +=
sizeof(VAddr);
341 memcpy (&remoteTag, static_cast<char*>(message.data()) + msgOffset,
sizeof(
Tag)); msgOffset +=
sizeof(
Tag);
// DESTRUCT handling; the branch body is not part of this excerpt
// (presumably it ends the loop - confirm).
345 if(remoteMsgType == DESTRUCT){
// A PEER message is acknowledged immediately: an empty CONFIRM message
// with the same message id and tag goes back to the sender.
// contexts.at() throws std::out_of_range for an unknown context id.
// NOTE(review): `contexts` and the send sockets are also used by the
// calling thread (splitContext, asyncSendImpl); no synchronization is
// visible in this excerpt - confirm.
349 if(remoteMsgType == PEER){
350 std::array<unsigned,0> null;
351 Context context = contexts.at(remoteContextID);
352 asyncSendImpl(CONFIRM, remoteMsgID, context, remoteVAddr, remoteTag, null);
// File the message in the inbox under (type, context, source, tag).
// CONFIRM and SPLIT messages are later taken from the inbox as well, so
// this presumably runs for every message type, not only for PEER.
355 inBox.enqueue(std::move(message), remoteMsgType, remoteContextID, remoteVAddr, remoteTag);
// Blocking transmission of `sendData` to the peer with virtual address
// `destVAddr` in `context`, labelled with `tag`.
//
// The message is handed to asyncSend and the returned Event is kept in `e`.
// The statement that uses `e` afterwards is not part of this excerpt
// (presumably it waits on the event - confirm).
388 template <
typename T_Send>
389 void send(
const VAddr destVAddr,
const Tag tag,
const Context context,
const T_Send& sendData){
390 Event e =
asyncSend(destVAddr, tag, context, sendData);
// Non-blocking transmission of `sendData` to the peer with virtual address
// `destVAddr` in `context`, labelled with `tag`.
//
// A new message id is drawn from getMsgID(), the data is sent as a PEER
// message, and an Event built from that id, the context, the destination,
// the tag and this policy object is returned.
408 template <
typename T_Send>
409 Event
asyncSend(
const VAddr destVAddr,
const Tag tag,
const Context context, T_Send& sendData){
411 MsgID msgID = getMsgID();
412 asyncSendImpl(PEER, msgID, context, destVAddr, tag, sendData);
413 return Event(msgID, context, destVAddr, tag, *
this);
// Builds one message from a header plus the raw bytes of `sendData` and
// sends it on the socket mapped to (context, destVAddr).
//
// Header layout, in this order: msgType, msgID, id of `context`, this
// peer's virtual address in `context` (the source), tag. The payload is
// sendData.size() elements of T_Send::value_type copied from
// sendData.data().
//
// The socket lookup uses at() on both levels, so an unknown context id or
// destination raises std::out_of_range.
// NOTE(review): the remaining terms of the message size and the
// declaration of `msgOffset` are not part of this excerpt.
418 template <
typename T_Send>
419 void asyncSendImpl(
const MsgType msgType,
const MsgID msgID,
const Context context,
const VAddr destVAddr,
const Tag tag, T_Send& sendData){
421 ::zmq::message_t message(
sizeof(MsgType) +
426 sendData.size() *
sizeof(
typename T_Send::value_type));
429 ContextID contextID(context.getID());
430 VAddr srcVAddr(context.getVAddr());
// Header fields, each written at the running offset.
431 memcpy (static_cast<char*>(message.data()) + msgOffset, &msgType,
sizeof(MsgType)); msgOffset +=
sizeof(MsgType);
432 memcpy (static_cast<char*>(message.data()) + msgOffset, &msgID,
sizeof(MsgID)); msgOffset +=
sizeof(MsgID);
433 memcpy (static_cast<char*>(message.data()) + msgOffset, &contextID,
sizeof(ContextID)); msgOffset +=
sizeof(ContextID);
434 memcpy (static_cast<char*>(message.data()) + msgOffset, &srcVAddr,
sizeof(VAddr)); msgOffset +=
sizeof(VAddr);
435 memcpy (static_cast<char*>(message.data()) + msgOffset, &tag,
sizeof(
Tag)); msgOffset +=
sizeof(
Tag);
// Payload directly after the header.
436 memcpy (static_cast<char*>(message.data()) + msgOffset, sendData.data(),
sizeof(
typename T_Send::value_type) * sendData.size());
// Resolve the outgoing socket for this destination and send.
440 std::size_t sendSocket_i = sendSocketMappings.at(context.getID()).at(destVAddr);
441 ::zmq::socket_t &sendSocket = sendSockets.at(sendSocket_i);
444 sendSocket.send(message);
// Blocking receive of a message into `recvData` from the peer with virtual
// address `srcVAddr` in `context`, matching `tag`.
// Delegates to recvImpl with message type PEER.
462 template <
typename T_Recv>
463 void recv(
const VAddr srcVAddr,
const Tag tag,
const Context context, T_Recv& recvData){
465 recvImpl(PEER, context, srcVAddr, tag, recvData);
// Receive into `recvData` without naming a source or tag: forwards to the
// two-argument recvImpl and returns the Event it produces.
469 template <
typename T_Recv>
470 Event
recv(
const Context context, T_Recv& recvData){
471 return recvImpl(context, recvData);
// Takes the message filed under (msgType, id of `context`, srcVAddr, tag)
// out of the inbox and copies its payload into `recvData`.
// NOTE(review): waitDequeue presumably blocks until such a message is
// available; its definition is not part of this excerpt.
474 template <
typename T_Recv>
475 void recvImpl(
const MsgType msgType,
const Context context,
const VAddr srcVAddr,
const Tag tag, T_Recv& recvData){
477 ::zmq::message_t message(std::move(inBox.waitDequeue(msgType, context.getID(), srcVAddr, tag)));
478 zmqMessageToData(message, recvData);
// Takes a PEER message of `context` out of the inbox, whatever its source
// and tag, copies its payload into `recvData`, and returns an Event that
// names the source and tag the message was filed under.
//
// `keys` is passed to waitDequeue together with the fixed part of the key
// (PEER, context id); afterwards positions 2 and 3 of the tuple are read
// as the virtual address and the tag.
// NOTE(review): the declarations of `destVAddr` and `tag` are not part of
// this excerpt. The Event gets a fresh id from getMsgID(), not the id
// stored in the received message header.
483 template <
typename T_Recv>
484 Event recvImpl(
const Context context, T_Recv& recvData){
486 hana::tuple<MsgType, ContextID, VAddr, Tag>keys;
490 ::zmq::message_t message(std::move(inBox.waitDequeue(keys, PEER, context.getID())));
491 destVAddr = hana::at(keys, hana::size_c<2>);
492 tag = hana::at(keys, hana::size_c<3>);
494 zmqMessageToData(message, recvData);
495 return Event(getMsgID(), context, destVAddr, tag, *
this);
// Returns once ready() reports true for the given message; implemented as
// a loop that calls ready() repeatedly with an empty body.
// NOTE(review): `msgID` is declared as MsgType although it carries a
// message id (ready() compares it with a MsgID).
499 void wait(
const MsgType msgID,
const Context context,
const VAddr vAddr,
const Tag tag){
501 while(!ready(msgID, context, vAddr, tag));
// Checks whether the message `msgID` sent to `vAddr` in `context` with
// `tag` has been confirmed.
//
// A CONFIRM message filed under (context id, vAddr, tag) is taken out of
// the inbox and the message id in its header is compared with `msgID`.
// NOTE(review): the return statements and the declaration of remoteMsgID
// are not part of this excerpt. waitDequeue presumably blocks until a
// CONFIRM message is available, so this call may block too - confirm.
// `msgID` is declared as MsgType although it is compared with a MsgID.
505 bool ready(
const MsgType msgID,
const Context context,
const VAddr vAddr,
const Tag tag){
507 ::zmq::message_t message(std::move(inBox.waitDequeue(CONFIRM, context.getID(), vAddr, tag)));
// Only the first two header fields (type, message id) are needed.
509 size_t msgOffset = 0;
510 MsgType remoteMsgType;
513 memcpy (&remoteMsgType, static_cast<char*>(message.data()) + msgOffset,
sizeof(MsgType)); msgOffset +=
sizeof(MsgType);
514 memcpy (&remoteMsgID, static_cast<char*>(message.data()) + msgOffset,
sizeof(MsgID)); msgOffset +=
sizeof(MsgID);
516 if(remoteMsgID == msgID){
// The message is put back under the same key (presumably on the path
// where the ids differ, so that a later call can find it).
520 inBox.enqueue(std::move(message), CONFIRM, context.getID(), vAddr, tag);
// Body of splitContext(isMember, oldContext), which creates a new context
// from those peers of oldContext that pass isMember = true. The signature
// is not part of this excerpt, and neither are several closing braces and
// branch heads.
554 ::zmq::message_t reqMessage;
// Step 1: every peer reports its membership flag to virtual address 0 of
// the old context.
558 std::array<unsigned, 1> member {{ isMember }};
559 ZMQ::asyncSendImpl(SPLIT, getMsgID(), oldContext, 0, 0, member);
// Step 2 (only on virtual address 0): collect the flags of all peers and
// remember the virtual addresses of the members.
// nMembers is zero-filled; slot 0 is later read as the size of the new
// context and slot 1 as its id. The code that sets slot 0 is not part of
// this excerpt.
562 if( oldContext.getVAddr() == 0){
563 std::array<unsigned, 2> nMembers {{ 0 }};
564 std::vector<VAddr> vAddrs;
566 for(
unsigned vAddr = 0; vAddr < oldContext.size(); ++vAddr){
567 std::array<unsigned, 1> remoteIsMember {{ 0 }};
568 ZMQ::recvImpl(SPLIT, oldContext, vAddr, 0, remoteIsMember);
570 if(remoteIsMember[0]) {
572 vAddrs.push_back(vAddr);
// Obtain the id of the new context from the master and announce
// {size, id} to every member.
578 nMembers[1] = getContextID(signalingSocket);
580 for(VAddr vAddr : vAddrs){
581 ZMQ::asyncSendImpl(SPLIT, getMsgID(), oldContext, vAddr, 0, nMembers);
// Step 3 (presumably members only): receive {size, id} from virtual
// address 0, register with the master for the new context and store it.
590 std::array<unsigned, 2> nMembers {{ 0 , 0 }};
592 ZMQ::recvImpl(SPLIT, oldContext, 0, 0, nMembers);
593 ContextID newContextID = nMembers[1];
595 newContext = Context(newContextID, getVAddr(signalingSocket, newContextID, peerUri), nMembers[0]);
596 contexts[newContext.getID()] = newContext;
// Look up the URI of every virtual address of the new context.
600 for(
unsigned vAddr = 0; vAddr < newContext.size(); vAddr++){
601 Uri remoteUri = getUri(signalingSocket, newContext.getID(), vAddr);
602 phoneBook[newContext.getID()][vAddr] = remoteUri;
603 inversePhoneBook[newContext.getID()][remoteUri] = vAddr;
// No new sockets are opened: each peer of the new context is found in the
// old context through its URI and the existing socket index is reused.
608 for(
unsigned vAddr = 0; vAddr < newContext.size(); vAddr++){
609 Uri uri = phoneBook[newContext.getID()][vAddr];
610 VAddr oldVAddr = inversePhoneBook[oldContext.getID()].at(uri);
611 sendSocketMappings[newContext.getID()][vAddr] = sendSocketMappings[oldContext.getID()].at(oldVAddr);
// Presumably the non-member path: the result is a default-constructed
// Context.
617 newContext = Context();
// All-to-all exchange of empty SPLIT messages within oldContext: first one
// message is sent to every virtual address, then one message is received
// from every virtual address.
// NOTE(review): the enclosing signature is not part of this excerpt;
// presumably this acts as a synchronization point among the peers of
// oldContext - confirm.
624 std::array<unsigned, 0> null;
625 for(
unsigned vAddr = 0; vAddr < oldContext.size(); ++vAddr){
626 ZMQ::asyncSendImpl(SPLIT, getMsgID(), oldContext, vAddr, 0, null);
628 for(
unsigned vAddr = 0; vAddr < oldContext.size(); ++vAddr){
629 ZMQ::recvImpl(SPLIT, oldContext, vAddr, 0, null);
// Body of getGlobalContext(): hands back the context created in the
// constructor, i.e. the context that contains all peers.
643 return initialContext;
void recv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv &recvData)
Blocking receive of a message recvData from peer with virtual address srcVAddr.
Definition: ZMQ.hpp:463
Definition: Traits.hpp:10
Definition: Traits.hpp:16
Context splitContext(const bool isMember, const Context oldContext)
Definition: ZMQ.hpp:552
An event is returned by non-blocking communication operations and can be asked whether an operation h...
Definition: Event.hpp:20
Definition: Traits.hpp:13
A context represents a set of peers which are able to communicate with each other.
Definition: Context.hpp:17
Context getGlobalContext()
Returns the context that contains all peers.
Definition: ZMQ.hpp:642
Implementation of the Cage communicationPolicy interface based on ZMQ.
Definition: ZMQ.hpp:67
Event asyncSend(const VAddr destVAddr, const Tag tag, const Context context, T_Send &sendData)
Non blocking transmission of a message sendData to peer with virtual address destVAddr.
Definition: ZMQ.hpp:409
void send(const VAddr destVAddr, const Tag tag, const Context context, const T_Send &sendData)
Blocking transmission of a message sendData to peer with virtual address destVAddr.
Definition: ZMQ.hpp:389