Graybat  1.1
Graph Approach for Highly Generic Communication Schemes Based on Adaptive Topologies
ZMQ.hpp
1 #pragma once
2 
3 // CLIB
4 #include <assert.h> /* assert */
5 #include <string.h> /* strup */
6 
7 // STL
8 #include <assert.h> /* assert */
9 #include <array> /* array */
10 #include <iostream> /* std::cout */
11 #include <map> /* std::map */
12 #include <exception> /* std::out_of_range */
13 #include <sstream> /* std::stringstream, std::istringstream */
14 #include <string> /* std::string */
15 #include <queue> /* std::queue */
16 #include <utility> /* std::move */
17 #include <thread> /* std::thread */
18 #include <mutex> /* std::mutex */
19 
20 // ZMQ
21 #include <zmq.hpp> /* zmq::socket_t, zmq::context_t */
22 
23 // HANA
24 #include <boost/hana.hpp>
25 namespace hana = boost::hana;
26 
27 // GrayBat
28 #include <graybat/utils/MultiKeyMap.hpp> /* utils::MultiKeyMap */
29 #include <graybat/communicationPolicy/Base.hpp> /* Base */
30 #include <graybat/communicationPolicy/zmq/Context.hpp> /* Context */
31 #include <graybat/communicationPolicy/zmq/Event.hpp> /* Event */
32 #include <graybat/communicationPolicy/zmq/Config.hpp> /* Config */
33 #include <graybat/communicationPolicy/Traits.hpp>
34 
35 namespace graybat {
36 
37  namespace communicationPolicy {
38 
39  /************************************************************************/
46  struct ZMQ;
47 
48  namespace traits {
49 
50  template<>
51  struct ContextType<ZMQ> {
53  };
54 
55  template<>
56  struct EventType<ZMQ> {
58  };
59 
60  template<>
61  struct ConfigType<ZMQ> {
63  };
64 
65  }
66 
67  struct ZMQ : public graybat::communicationPolicy::Base<ZMQ> {
68 
69  // Type defs
70  using Tag = typename graybat::communicationPolicy::Tag<ZMQ>;
71  using ContextID = typename graybat::communicationPolicy::ContextID<ZMQ>;
72  using MsgType = typename graybat::communicationPolicy::MsgType<ZMQ>;
73  using MsgID = typename graybat::communicationPolicy::MsgID<ZMQ>;
74  using VAddr = typename graybat::communicationPolicy::VAddr<ZMQ>;
75  using Context = typename graybat::communicationPolicy::Context<ZMQ>;
76  using Event = typename graybat::communicationPolicy::Event<ZMQ>;
77  using Config = typename graybat::communicationPolicy::Config<ZMQ>;
78  using Uri = std::string;
79 
80  // Message types for signaling server
81  static const MsgType VADDR_REQUEST = 0;
82  static const MsgType VADDR_LOOKUP = 1;
83  static const MsgType DESTRUCT = 2;
84  static const MsgType RETRY = 3;
85  static const MsgType ACK = 4;
86  static const MsgType CONTEXT_INIT = 5;
87  static const MsgType CONTEXT_REQUEST = 6;
88 
89  // Message types between peers
90  static const MsgType PEER = 7;
91  static const MsgType CONFIRM = 8;
92  static const MsgType SPLIT = 9;
93 
94  // zmq related
95  ::zmq::context_t zmqContext;
96  ::zmq::context_t zmqSignalingContext;
97  ::zmq::socket_t recvSocket;
98  ::zmq::socket_t signalingSocket;
99  const int zmqHwm;
100 
101  // policy related
102  Context initialContext;
103  std::map<ContextID, std::map<VAddr, std::size_t> >sendSocketMappings;
104  std::vector<::zmq::socket_t> sendSockets;
105  std::map<ContextID, std::map<VAddr, Uri> >phoneBook;
106  std::map<ContextID, std::map<Uri, VAddr> >inversePhoneBook;
107  std::map<ContextID, Context> contexts;
108 
109  utils::MessageBox<::zmq::message_t, MsgType, ContextID, VAddr, Tag> inBox;
110 
111  unsigned maxMsgID;
112  std::thread recvHandler;
113  std::mutex sendMtx;
114  std::mutex recvMtx;
115 
116  // Uris
117  const Uri masterUri;
118  const Uri peerUri;
119 
120  ZMQ(Config const config) :
121 
122  zmqContext(1),
123  zmqSignalingContext(1),
124  recvSocket(zmqContext, ZMQ_PULL),
125  signalingSocket(zmqSignalingContext, ZMQ_REQ),
126  zmqHwm(10000),
127  maxMsgID(0),
128  masterUri(config.masterUri),
129  peerUri(bindToNextFreePort(recvSocket, config.peerUri)){
130 
131  // Connect to signaling process
132  signalingSocket.connect(masterUri.c_str());
133 
134  // Retrieve Context id for initial context from signaling process
135  ContextID contextID = getInitialContextID(signalingSocket, config.contextSize);
136 
137  // Retrieve own vAddr from signaling process for initial context
138  VAddr vAddr = getVAddr(signalingSocket, contextID, peerUri);
139  initialContext = Context(contextID, vAddr, config.contextSize);
140  contexts[initialContext.getID()] = initialContext;
141 
142  // Retrieve for uris of other peers from signaling process for the initial context
143  for(unsigned vAddr = 0; vAddr < initialContext.size(); vAddr++){
144  Uri remoteUri = getUri(signalingSocket, initialContext.getID(), vAddr);
145  phoneBook[initialContext.getID()][vAddr] = remoteUri;
146  inversePhoneBook[initialContext.getID()][remoteUri] = vAddr;
147  }
148 
149  // Create socket connection to other peers
150  // Create socketmapping from initial context to sockets of VAddrs
151  for(unsigned vAddr = 0; vAddr < initialContext.size(); vAddr++){
152  sendSockets.emplace_back(::zmq::socket_t(zmqContext, ZMQ_PUSH));
153  sendSocketMappings[initialContext.getID()][vAddr] = sendSockets.size() - 1;
154  sendSockets.at(sendSocketMappings[initialContext.getID()].at(vAddr)).connect(phoneBook[initialContext.getID()].at(vAddr).c_str());
155  }
156 
157  // Create thread which recv all messages to this peer
158  recvHandler = std::thread(&ZMQ::handleRecv, this);
159 
160  }
161 
162  ZMQ(ZMQ &&other) = delete;
163  ZMQ(ZMQ &other) = delete;
164 
165  // Destructor
166  ~ZMQ(){
167  // Send exit to signaling server
168  s_send(signalingSocket, std::to_string(DESTRUCT).c_str());
169 
170  // Send exit to receive handler
171  std::array<unsigned, 1> null;
172  asyncSendImpl(DESTRUCT, 0, initialContext, initialContext.getVAddr(), 0, null);
173  recvHandler.join();
174 
175  //std::cout << "Destruct ZMQ" << std::endl;
176 
177  }
178 
179  /***********************************************************************/
187  ContextID getInitialContextID(::zmq::socket_t &socket, const size_t contextSize){
188  ContextID contextID = 0;
189  // Send vAddr request
190  std::stringstream ss;
191  ss << CONTEXT_INIT << " " << contextSize;
192  s_send(socket, ss.str().c_str());
193 
194  // Recv vAddr
195  std::stringstream sss;
196  sss << s_recv(socket);
197  sss >> contextID;
198  return contextID;
199 
200  }
201 
202  ContextID getContextID(::zmq::socket_t &socket){
203  ContextID contextID = 0;
204 
205  // Send vAddr request
206  std::stringstream ss;
207  ss << CONTEXT_REQUEST;
208  s_send(socket, ss.str().c_str());
209 
210  // Recv vAddr
211  std::stringstream sss;
212  sss << s_recv(socket);
213  sss >> contextID;
214  return contextID;
215 
216  }
217 
218  VAddr getVAddr(::zmq::socket_t &socket, const ContextID contextID, const Uri uri){
219  VAddr vAddr(0);
220  // Send vAddr request
221  std::stringstream ss;
222  ss << VADDR_REQUEST << " " << contextID << " " << uri << " ";
223  s_send(socket, ss.str().c_str());
224 
225  // Recv vAddr
226  std::stringstream sss;
227  sss << s_recv(socket);
228  sss >> vAddr;
229 
230  return vAddr;
231 
232  }
233 
234  Uri getUri(::zmq::socket_t &socket, const ContextID contextID, const VAddr vAddr){
235  MsgType type = RETRY;
236 
237  while(type == RETRY){
238  // Send vAddr lookup
239  std::stringstream ss;
240  ss << VADDR_LOOKUP << " " << contextID << " " << vAddr;
241  s_send(socket, ss.str().c_str());
242 
243  // Recv uri
244  std::string remoteUri;
245  std::stringstream sss;
246  sss << s_recv(socket);
247  sss >> type;
248  if(type == ACK){
249  sss >> remoteUri;
250  return remoteUri;
251  }
252 
253  }
254 
255  }
256 
257  MsgID getMsgID(){
258  return maxMsgID++;
259  }
260 
261  static char * s_recv (::zmq::socket_t& socket) {
262  ::zmq::message_t message(256);
263  socket.recv(&message);
264  if (message.size() == static_cast<size_t>(-1))
265  return NULL;
266  if (message.size() > 255)
267  static_cast<char*>(message.data())[255] = 0;
268  return strdup (static_cast<char*>(message.data()));
269  }
270 
271  static int s_send (::zmq::socket_t& socket, const char *string) {
272  ::zmq::message_t message(sizeof(char) * strlen(string));
273  memcpy (static_cast<char*>(message.data()), string, sizeof(char) * strlen(string));
274  socket.send(message);
275  return 0;
276  }
277 
278  template <typename T_Data>
279  void zmqMessageToData(::zmq::message_t &message, T_Data& data){
280  size_t msgOffset = 0;
281  MsgType remoteMsgType;
282  MsgID remoteMsgID;
283  ContextID remoteContextID;
284  VAddr remoteVAddr;
285  Tag remoteTag;
286 
287  memcpy (&remoteMsgType, static_cast<char*>(message.data()) + msgOffset, sizeof(MsgType)); msgOffset += sizeof(MsgType);
288  memcpy (&remoteMsgID, static_cast<char*>(message.data()) + msgOffset, sizeof(MsgID)); msgOffset += sizeof(MsgID);
289  memcpy (&remoteContextID, static_cast<char*>(message.data()) + msgOffset, sizeof(ContextID)); msgOffset += sizeof(ContextID);
290  memcpy (&remoteVAddr, static_cast<char*>(message.data()) + msgOffset, sizeof(VAddr)); msgOffset += sizeof(VAddr);
291  memcpy (&remoteTag, static_cast<char*>(message.data()) + msgOffset, sizeof(Tag)); msgOffset += sizeof(Tag);
292 
293  memcpy (static_cast<void*>(data.data()),
294  static_cast<char*>(message.data()) + msgOffset,
295  sizeof(typename T_Data::value_type) * data.size());
296 
297  }
298 
299  Uri bindToNextFreePort(::zmq::socket_t &socket, const std::string peerUri){
300  std::string peerBaseUri = peerUri.substr(0, peerUri.rfind(":"));
301  unsigned peerBasePort = std::stoi(peerUri.substr(peerUri.rfind(":") + 1));
302  bool connected = false;
303 
304  std::string uri;
305  while(!connected){
306  try {
307  uri = peerBaseUri + ":" + std::to_string(peerBasePort);
308  socket.bind(uri.c_str());
309  connected = true;
310  }
311  catch(::zmq::error_t e){
312  //std::cout << e.what() << ". PeerUri \"" << uri << "\". Try to increment port and rebind." << std::endl;
313  peerBasePort++;
314  }
315 
316  }
317 
318  return uri;
319 
320  }
321 
322  void handleRecv(){
323 
324  while(true){
325 
326  ::zmq::message_t message;
327  recvSocket.recv(&message);
328 
329  {
330  size_t msgOffset = 0;
331  MsgType remoteMsgType;
332  MsgID remoteMsgID;
333  ContextID remoteContextID;
334  VAddr remoteVAddr;
335  Tag remoteTag;
336 
337  memcpy (&remoteMsgType, static_cast<char*>(message.data()) + msgOffset, sizeof(MsgType)); msgOffset += sizeof(MsgType);
338  memcpy (&remoteMsgID, static_cast<char*>(message.data()) + msgOffset, sizeof(MsgID)); msgOffset += sizeof(MsgID);
339  memcpy (&remoteContextID, static_cast<char*>(message.data()) + msgOffset, sizeof(ContextID)); msgOffset += sizeof(ContextID);
340  memcpy (&remoteVAddr, static_cast<char*>(message.data()) + msgOffset, sizeof(VAddr)); msgOffset += sizeof(VAddr);
341  memcpy (&remoteTag, static_cast<char*>(message.data()) + msgOffset, sizeof(Tag)); msgOffset += sizeof(Tag);
342 
343  //std::cout << "recv handler: " << remoteMsgType << " " << remoteMsgID << " " << remoteContextID << " " << remoteVAddr << " " << remoteTag << std::endl;
344 
345  if(remoteMsgType == DESTRUCT){
346  return;
347  }
348 
349  if(remoteMsgType == PEER){
350  std::array<unsigned,0> null;
351  Context context = contexts.at(remoteContextID);
352  asyncSendImpl(CONFIRM, remoteMsgID, context, remoteVAddr, remoteTag, null);
353  }
354 
355  inBox.enqueue(std::move(message), remoteMsgType, remoteContextID, remoteVAddr, remoteTag);
356 
357  }
358 
359  }
360 
361  }
362 
363 
367  /***********************************************************************/
388  template <typename T_Send>
389  void send(const VAddr destVAddr, const Tag tag, const Context context, const T_Send& sendData){
390  Event e = asyncSend(destVAddr, tag, context, sendData);
391  e.wait();
392  }
393 
408  template <typename T_Send>
409  Event asyncSend(const VAddr destVAddr, const Tag tag, const Context context, T_Send& sendData){
410  //std::cout << "send method[" << context.getVAddr() << "]: " << context.getID() << " " << destVAddr << " " << tag << std::endl;
411  MsgID msgID = getMsgID();
412  asyncSendImpl(PEER, msgID, context, destVAddr, tag, sendData);
413  return Event(msgID, context, destVAddr, tag, *this);
414 
415  }
416 
417 
418  template <typename T_Send>
419  void asyncSendImpl(const MsgType msgType, const MsgID msgID, const Context context, const VAddr destVAddr, const Tag tag, T_Send& sendData){
420  // Create message
421  ::zmq::message_t message(sizeof(MsgType) +
422  sizeof(MsgID) +
423  sizeof(ContextID) +
424  sizeof(VAddr) +
425  sizeof(Tag) +
426  sendData.size() * sizeof(typename T_Send::value_type));
427 
428  size_t msgOffset(0);
429  ContextID contextID(context.getID());
430  VAddr srcVAddr(context.getVAddr());
431  memcpy (static_cast<char*>(message.data()) + msgOffset, &msgType, sizeof(MsgType)); msgOffset += sizeof(MsgType);
432  memcpy (static_cast<char*>(message.data()) + msgOffset, &msgID, sizeof(MsgID)); msgOffset += sizeof(MsgID);
433  memcpy (static_cast<char*>(message.data()) + msgOffset, &contextID, sizeof(ContextID)); msgOffset += sizeof(ContextID);
434  memcpy (static_cast<char*>(message.data()) + msgOffset, &srcVAddr, sizeof(VAddr)); msgOffset += sizeof(VAddr);
435  memcpy (static_cast<char*>(message.data()) + msgOffset, &tag, sizeof(Tag)); msgOffset += sizeof(Tag);
436  memcpy (static_cast<char*>(message.data()) + msgOffset, sendData.data(), sizeof(typename T_Send::value_type) * sendData.size());
437 
438  //std::cout << "[" << context.getVAddr() << "] sendImpl: " << msgType << " " << msgID << " " << context.getID() << " " << destVAddr << " " << tag << std::endl;
439 
440  std::size_t sendSocket_i = sendSocketMappings.at(context.getID()).at(destVAddr);
441  ::zmq::socket_t &sendSocket = sendSockets.at(sendSocket_i);
442 
443  sendMtx.lock();
444  sendSocket.send(message);
445  sendMtx.unlock();
446 
447  }
448 
449 
462  template <typename T_Recv>
463  void recv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv& recvData){
464  //std::cout << "recv method[" << context.getVAddr() << "]:" << context.getID() << " " << srcVAddr << " " << tag << std::endl;
465  recvImpl(PEER, context, srcVAddr, tag, recvData);
466 
467  }
468 
469  template <typename T_Recv>
470  Event recv(const Context context, T_Recv& recvData){
471  return recvImpl(context, recvData);
472  }
473 
474  template <typename T_Recv>
475  void recvImpl(const MsgType msgType, const Context context, const VAddr srcVAddr, const Tag tag, T_Recv& recvData){
476  //std::cout << "[" << context.getVAddr() << "] recvImpl: " << msgType << " " << context.getID() << " " << srcVAddr << " " << tag << std::endl;
477  ::zmq::message_t message(std::move(inBox.waitDequeue(msgType, context.getID(), srcVAddr, tag)));
478  zmqMessageToData(message, recvData);
479 
480  }
481 
482 
483  template <typename T_Recv>
484  Event recvImpl(const Context context, T_Recv& recvData){
485 
486  hana::tuple<MsgType, ContextID, VAddr, Tag>keys;
487  VAddr destVAddr;
488  Tag tag;
489 
490  ::zmq::message_t message(std::move(inBox.waitDequeue(keys, PEER, context.getID())));
491  destVAddr = hana::at(keys, hana::size_c<2>);
492  tag = hana::at(keys, hana::size_c<3>);
493 
494  zmqMessageToData(message, recvData);
495  return Event(getMsgID(), context, destVAddr, tag, *this);
496 
497  }
498 
499  void wait(const MsgType msgID, const Context context, const VAddr vAddr, const Tag tag){
500  //std::cout << "wait method: " << msgID << " " << context.getID() << " " << vAddr << " " << tag << std::endl;
501  while(!ready(msgID, context, vAddr, tag));
502 
503  }
504 
505  bool ready(const MsgType msgID, const Context context, const VAddr vAddr, const Tag tag){
506 
507  ::zmq::message_t message(std::move(inBox.waitDequeue(CONFIRM, context.getID(), vAddr, tag)));
508 
509  size_t msgOffset = 0;
510  MsgType remoteMsgType;
511  MsgID remoteMsgID;
512 
513  memcpy (&remoteMsgType, static_cast<char*>(message.data()) + msgOffset, sizeof(MsgType)); msgOffset += sizeof(MsgType);
514  memcpy (&remoteMsgID, static_cast<char*>(message.data()) + msgOffset, sizeof(MsgID)); msgOffset += sizeof(MsgID);
515 
516  if(remoteMsgID == msgID){
517  return true;
518  }
519  else {
520  inBox.enqueue(std::move(message), CONFIRM, context.getID(), vAddr, tag);
521  }
522 
523  return false;
524 
525  }
526 
527 
528 
529 
532  /************************************************************************/
541  /*************************************************************************/
552  Context splitContext(const bool isMember, const Context oldContext){
553  //std::cout << oldContext.getVAddr() << " splitcontext entry" << std::endl;
554  ::zmq::message_t reqMessage;
555  Context newContext;
556 
557  // Request old master for new context
558  std::array<unsigned, 1> member {{ isMember }};
559  ZMQ::asyncSendImpl(SPLIT, getMsgID(), oldContext, 0, 0, member);
560 
561  // Peer with VAddr 0 collects new members
562  if( oldContext.getVAddr() == 0){
563  std::array<unsigned, 2> nMembers {{ 0 }};
564  std::vector<VAddr> vAddrs;
565 
566  for(unsigned vAddr = 0; vAddr < oldContext.size(); ++vAddr){
567  std::array<unsigned, 1> remoteIsMember {{ 0 }};
568  ZMQ::recvImpl(SPLIT, oldContext, vAddr, 0, remoteIsMember);
569 
570  if(remoteIsMember[0]) {
571  nMembers[0]++;
572  vAddrs.push_back(vAddr);
573 
574  }
575 
576  }
577 
578  nMembers[1] = getContextID(signalingSocket);
579 
580  for(VAddr vAddr : vAddrs){
581  ZMQ::asyncSendImpl(SPLIT, getMsgID(), oldContext, vAddr, 0, nMembers);
582 
583  }
584 
585  }
586 
587  //std::cout << oldContext.getVAddr() << " check 0" << std::endl;
588 
589  if(isMember){
590  std::array<unsigned, 2> nMembers {{ 0 , 0 }};
591 
592  ZMQ::recvImpl(SPLIT, oldContext, 0, 0, nMembers);
593  ContextID newContextID = nMembers[1];
594 
595  newContext = Context(newContextID, getVAddr(signalingSocket, newContextID, peerUri), nMembers[0]);
596  contexts[newContext.getID()] = newContext;
597 
598  //std::cout << oldContext.getVAddr() << " check 1" << std::endl;
599  // Update phonebook for new context
600  for(unsigned vAddr = 0; vAddr < newContext.size(); vAddr++){
601  Uri remoteUri = getUri(signalingSocket, newContext.getID(), vAddr);
602  phoneBook[newContext.getID()][vAddr] = remoteUri;
603  inversePhoneBook[newContext.getID()][remoteUri] = vAddr;
604  }
605 
606  //std::cout << oldContext.getVAddr() << " check 2" << std::endl;
607  // Create mappings to sockets for new context
608  for(unsigned vAddr = 0; vAddr < newContext.size(); vAddr++){
609  Uri uri = phoneBook[newContext.getID()][vAddr];
610  VAddr oldVAddr = inversePhoneBook[oldContext.getID()].at(uri);
611  sendSocketMappings[newContext.getID()][vAddr] = sendSocketMappings[oldContext.getID()].at(oldVAddr);
612  }
613 
614  }
615  else{
616  // Invalid context for "not members"
617  newContext = Context();
618  }
619 
620  //std::cout << oldContext.getVAddr() << " check 3" << std::endl;
621 
622  // Barrier thus recvHandler is up to date with sendSocketMappings
623  // Necessary in environment with multiple zmq objects
624  std::array<unsigned, 0> null;
625  for(unsigned vAddr = 0; vAddr < oldContext.size(); ++vAddr){
626  ZMQ::asyncSendImpl(SPLIT, getMsgID(), oldContext, vAddr, 0, null);
627  }
628  for(unsigned vAddr = 0; vAddr < oldContext.size(); ++vAddr){
629  ZMQ::recvImpl(SPLIT, oldContext, vAddr, 0, null);
630  }
631 
632  //std::cout << oldContext.getVAddr() << " splitContext end" << std::endl;
633  return newContext;
634 
635  }
636 
637 
642  Context getGlobalContext(){
643  return initialContext;
644  }
647  };
648 
649 
650 
651 
652  } // namespace communicationPolicy
653 
654 } // namespace graybat
Definition: chain.cpp:31
void recv(const VAddr srcVAddr, const Tag tag, const Context context, T_Recv &recvData)
Blocking receive of a message recvData from peer with virtual address srcVAddr.
Definition: ZMQ.hpp:463
Definition: Base.hpp:20
Context splitContext(const bool isMember, const Context oldContext)
Definition: ZMQ.hpp:552
An event is returned by non-blocking communication operations and can be asked whether an operation h...
Definition: Event.hpp:20
A context represents a set of peers which are able to communicate with each other.
Definition: Context.hpp:17
Context getGlobalContext()
Returns the context that contains all peers.
Definition: ZMQ.hpp:642
Implementation of the Cage communicationPolicy interface based on ZMQ.
Definition: ZMQ.hpp:67
Definition: BiStar.hpp:8
Event asyncSend(const VAddr destVAddr, const Tag tag, const Context context, T_Send &sendData)
Non blocking transmission of a message sendData to peer with virtual address destVAddr.
Definition: ZMQ.hpp:409
void send(const VAddr destVAddr, const Tag tag, const Context context, const T_Send &sendData)
Blocking transmission of a message sendData to peer with virtual address destVAddr.
Definition: ZMQ.hpp:389