ISAAC
Overview :: Library Doc :: Server Doc :: JSON Commands

In Situ Animation of Accelerated Computations

Broker.cpp
Go to the documentation of this file.
1 /* This file is part of ISAAC.
2  *
3  * ISAAC is free software: you can redistribute it and/or modify
4  * it under the terms of the GNU Lesser General Public License as
5  * published by the Free Software Foundation, either version 3 of the
6  * License, or (at your option) any later version.
7  *
8  * ISAAC is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with ISAAC. If not, see <www.gnu.org/licenses/>. */
15 
16 #include "Broker.hpp"
17 #include <stdio.h>
18 #include <pthread.h>
19 #include "ThreadList.hpp"
20 #include <sys/socket.h>
21 #include <string>
22 #if ISAAC_JPEG == 1
23  #include <jpeglib.h>
24  #include <setjmp.h>
25 #endif
26 
27 #include <boost/archive/iterators/binary_from_base64.hpp>
28 #include <boost/archive/iterators/transform_width.hpp>
29 
//Exit flag of the broker, starts at 0; the main loop further down keeps running while it is 0.
30 volatile sig_atomic_t Broker::force_exit = 0;
31 
//Signal handler that is registered for SIGINT via signal() further down in this file.
//The visible body only prints a line break.
//NOTE(review): the embedded listing numbers jump from 34 to 36, so one statement of this
//handler is not visible here -- presumably the one that sets Broker::force_exit; confirm
//against the real file before touching this function.
//NOTE(review): printf() is not async-signal-safe; consider write(2) if this is reworked.
32 void sighandler(int sig)
33 {
34  printf("\n");
36 }
37 
/**
 * Thread start routine (pthread signature) that destroys an object after a delay.
 *
 * Sleeps for 300 seconds and then deletes ptr as a Type*.
 *
 * @tparam Type real type of the object behind ptr
 * @param  ptr  object allocated with new; it is deleted by this function
 * @return always NULL
 */
template <typename Type>
void* delete_pointer_later(void* ptr)
{
    sleep(300); //5 minutes sleeping
    delete static_cast<Type*>(ptr);
    //A function returning void* must return a value; flowing off the end is undefined behaviour
    return NULL;
}
44 
45 Broker::Broker(std::string name,int inner_port,std::string interface)
46 {
47  this->name = name;
48  this->inner_port = inner_port;
49  this->inner_interface = interface;
50  this->insituThread = 0;
51  masterHello = json_object();
52  json_object_set_new( masterHello, "type", json_string( "hello" ) );
53  json_object_set_new( masterHello, "name", json_string( name.c_str() ) );
54  masterHelloConnectorList = json_array();
55  json_object_set_new( masterHello, "streams", masterHelloConnectorList );
56 }
57 
59 {
60  dataConnector->setBroker(this);
62  d.connector = dataConnector;
63  d.thread = 0;
64  dataConnectorList.push_back(d);
65  return 0;
66 }
67 
69 {
70  imageConnector->setBroker(this);
72  d.connector = imageConnector;
73  d.thread = 0;
74  imageConnectorList.push_back(d);
75  if (imageConnector->showClient)
76  {
77  json_t* element = json_object();
78  json_object_set_new( element, "name", json_string( imageConnector->getName().c_str() ) );
79  json_object_set_new( element, "id", json_integer( imageConnectorList.size() - 1 ) ); //last element
80  json_array_append_new( masterHelloConnectorList, element );
81  }
82  return 0;
83 }
84 
86 {
87  MetaDataClient* client = new MetaDataClient();
88  dataClientList.push_back(client);
89  client->masterSendMessage(new MessageContainer(MASTER_HELLO,masterHello,true));
90  //Send all registered visualizations
92  while (mom)
93  {
94  if (mom->t->master)
95  {
96  json_incref( mom->t->initData );
97  client->masterSendMessage(new MessageContainer(REGISTER,mom->t->initData,true));
98  }
99  mom = mom->next;
100  }
101  return client;
102 }
103 
104 #if ISAAC_JPEG == 1
105  void isaac_jpeg_init_source(j_decompress_ptr cinfo)
106  {
107  }
108  boolean isaac_jpeg_fill_input_buffer(j_decompress_ptr cinfo)
109  {
110  return true;
111  }
112  void isaac_jpeg_skip_input_data(j_decompress_ptr cinfo,long num_bytes)
113  {
114  }
115  boolean isaac_jpeg_resync_to_restart(j_decompress_ptr cinfo, int desired)
116  {
117  return true;
118  }
119  void isaac_jpeg_term_source(j_decompress_ptr cinfo)
120  {
121  }
122 
123  struct isaac_jpeg_error_mgr {
124  struct jpeg_error_mgr pub;
125  jmp_buf setjmp_buffer;
126  };
127  typedef struct isaac_jpeg_error_mgr * isaac_jpeg_error_ptr;
128 
129  METHODDEF(void)
130  isaac_jpeg_error_exit (j_common_ptr cinfo)
131  {
132  isaac_jpeg_error_ptr err = (isaac_jpeg_error_ptr) cinfo->err;
133  (*cinfo->err->output_message) (cinfo);
134  longjmp(err->setjmp_buffer, 1);
135  }
136 #endif
137 
138 void Broker::receiveVideo(InsituConnectorGroup* group,uint8_t* video_buffer,char* payload)
139 {
140  //Search for : in payload
141  char* colon = strchr(payload, ':');
142  if (colon == NULL)
143  return;
144  colon++;
145  //Search for ; in payload
146  char* semicolon = strchr(colon, ';');
147  if (semicolon == NULL)
148  return;
149  //Search for , in payload
150  char* comma = strchr(semicolon, ',');
151  if (comma == NULL)
152  return;
153  semicolon[0] = 0; //in colon is now the image type
154  //After the comma the base64 stream starts
155  comma++;
156  int whole_length = strlen(comma);
157  comma[whole_length-1] = 0;
158 
159  bool jpeg = false;
160  if (strcmp(colon, "image/jpeg") == 0)
161  jpeg = true;
162 
163  uint8_t* temp_buffer = video_buffer;
164  if (jpeg)
165  temp_buffer = (uint8_t*)malloc(strlen(comma)+4); //Should always be enough
166 
167  //base64 -> binary data
168  using namespace boost::archive::iterators;
169  typedef
170  transform_width
171  <
172  binary_from_base64
173  <
174  const uint8_t *
175  >,
176  8,
177  6
178  >
179  base64_dec;
180 
181  try
182  {
183  base64_dec src_it(comma);
184  for(int i = 0; i < (whole_length+1)*3/4; ++i)
185  {
186  temp_buffer[i] = *src_it;
187  ++src_it;
188  }
189  }
190  catch (dataflow_exception&)
191  {
192  }
193 
194  if (jpeg)
195  {
196  #if ISAAC_JPEG == 1
197  struct jpeg_decompress_struct cinfo;
198  struct isaac_jpeg_error_mgr jerr;
199  cinfo.err = jpeg_std_error(&jerr.pub);
200  jerr.pub.error_exit = isaac_jpeg_error_exit;
201  if (setjmp(jerr.setjmp_buffer))
202  {
203  jpeg_destroy_decompress(&cinfo);
204  printf("Got invalid jpeg from simulation. Ignoring.\n");
205  free(payload);
206  return;
207  }
208  jpeg_source_mgr src;
209  src.init_source = &isaac_jpeg_init_source;
210  src.fill_input_buffer = &isaac_jpeg_fill_input_buffer;
211  src.skip_input_data = &isaac_jpeg_skip_input_data;
212  src.resync_to_restart = &isaac_jpeg_resync_to_restart;
213  src.term_source = &isaac_jpeg_term_source;
214  jpeg_create_decompress(&cinfo);
215  cinfo.src = &src;
216  cinfo.src->next_input_byte = (JOCTET*)(temp_buffer);
217  cinfo.src->bytes_in_buffer = group->getVideoBufferSize();
218  (void) jpeg_read_header(&cinfo, TRUE);
219  (void) jpeg_start_decompress(&cinfo);
220  int row_stride = cinfo.output_width * cinfo.output_components;
221  JSAMPARRAY buffer = (*cinfo.mem->alloc_sarray)
222  ((j_common_ptr) &cinfo, JPOOL_IMAGE, row_stride, 1);
223  while (cinfo.output_scanline < cinfo.output_height)
224  {
225  int y = cinfo.output_scanline;
226  (void) jpeg_read_scanlines(&cinfo, buffer, 1);
227  for (int x = 0; x < cinfo.output_width; x++)
228  {
229  video_buffer[4*(x+y*cinfo.output_width)+0] = buffer[0][x*3+0];
230  video_buffer[4*(x+y*cinfo.output_width)+1] = buffer[0][x*3+1];
231  video_buffer[4*(x+y*cinfo.output_width)+2] = buffer[0][x*3+2];
232  }
233  }
234  (void) jpeg_finish_decompress(&cinfo);
235  jpeg_destroy_decompress(&cinfo);
236  free(temp_buffer);
237  #else
238  memset( video_buffer, rand()%255, group->video_buffer_size );
239  #endif
240  }
241  free(payload);
242 }
243 
244 std::string Broker::getStream(std::string connector,std::string name,std::string ref)
245 {
246  void* reference;
247  try
248  {
249  reference = (void*)std::stol(ref);
250  }
251  catch (...)
252  {
253  return "";
254  }
255  std::string result ("");
256  ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr group = insituConnectorGroupList.getFront();
257  while (group)
258  {
259  if (group->t->name == name)
260  {
261  pthread_mutex_lock(&group->t->streams_mutex);
262  std::map< std::string, std::map< void* , std::string > >::iterator it = group->t->streams.find( connector );
263  if (it != group->t->streams.end())
264  {
265  std::map< void* , std::string >::iterator it2 = (*it).second.find( reference );
266  if (it2 != (*it).second.end())
267  result = (*it2).second;
268  }
269  pthread_mutex_unlock(&group->t->streams_mutex);
270  }
271  group = group->next;
272  }
273  return result;
274 }
275 
277 {
278  printf("Running ISAAC Master\n");
279  signal(SIGINT, sighandler);
280  printf("Starting insitu plugin listener\n");
281  if (insituMaster.init(inner_port,inner_interface))
282  {
283  fprintf(stderr,"Error while starting insitu plugin listener\n");
284  signal(SIGINT, SIG_DFL);
285  return -1;
286  }
287  pthread_create(&insituThread,NULL,Runable::run_runable,&insituMaster);
288 
289  for (auto it = dataConnectorList.begin(); it != dataConnectorList.end(); it++)
290  {
291  printf("Launching %s\n",(*it).connector->getName().c_str());
292  pthread_create(&((*it).thread),NULL,Runable::run_runable,(*it).connector);
293  }
294  for (auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
295  {
296  printf("Launching %s\n",(*it).connector->getName().c_str());
297  pthread_create(&((*it).thread),NULL,Runable::run_runable,(*it).connector);
298  }
299  while (force_exit == 0)
300  {
302  // Iterate over all insitu clients //
304  ThreadList< InsituConnectorContainer* >::ThreadListContainer_ptr insitu = insituMaster.insituConnectorList.getFront();
305  while (insitu)
306  {
308  //Check for new messages for every insituConnector
309  MessageContainer* lastMessage = NULL;
310  if (insitu->t->connector->messagesOut.back)
311  lastMessage = insitu->t->connector->messagesOut.back->t;
312  while (MessageContainer* message = insitu->t->connector->masterGetMessage())
313  {
314  bool delete_message = true;
315  if (message->type == PERIOD)
316  {
317  if (insitu->t->group == NULL) //Later!
318  {
319  delete_message = false;
320  insitu->t->connector->clientSendMessage( message );
321  }
322  else
323  {
324  //Let's see, whether some options are broadcastet and change them in the initData
325  json_t* js;
326  if ( js = json_object_get(message->json_root, "init") )
327  {
328  json_decref( insitu->t->group->initData );
329  insitu->t->group->initData = js;
330  json_incref( js );
331  json_object_del( message->json_root, "init" );
332  }
333  //Filter payload
334  json_t* payload = json_object_get( message->json_root, "payload" );
335  if (payload)
336  {
337  json_incref( payload );
338  json_incref( message->json_root );
339  json_object_del( message->json_root, "payload" );
340  //Allocate, receive and send video
341  uint8_t*video_buffer=(uint8_t*)malloc(insitu->t->group->video_buffer_size);
342  receiveVideo(insitu->t->group,video_buffer,json_dumps( payload , JSON_ENCODE_ANY ));
343  int l=imageConnectorList.size();
344  ImageBufferContainer *container=new ImageBufferContainer(UPDATE_BUFFER,video_buffer,insitu->t->group,l,"",NULL,message->json_root,payload,insitu->t->group->getID());
345  if(l==0)
346  container->image->suicide();
347  else
348  for(auto ic=imageConnectorList.begin();ic!=imageConnectorList.end();ic++)
349  (*ic).connector->masterSendMessage(container);
350  }
351  else
352  {
353  //Direct send of metadata!
355  while(dc)
356  {
357  int stream;
358  bool dropable;
359  if(dc->t->doesObserve(insitu->t->group->getID(),stream,dropable))
360  dc->t->masterSendMessage(new MessageContainer(message->type,message->json_root,true,dropable));
361  dc=dc->next;
362  }
363  }
364  }
365  }
366  else
367  if (message->type == REGISTER) //Saving the metadata description for later
368  {
369  //Get group
370  std::string name( json_string_value( json_object_get( message->json_root, "name" ) ) );
371  InsituConnectorGroup* group = NULL;
372  ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr it = insituConnectorGroupList.getFront();
373  while (it)
374  {
375  if (it->t->name == name)
376  {
377  group = it->t;
378  break;
379  }
380  it = it->next;
381  }
382  if (group == NULL)
383  {
384  group = new InsituConnectorGroup(name);
385  insituConnectorGroupList.push_back( group );
386  }
387  insitu->t->group = group;
388 
389  switch (message->type)
390  {
391  case REGISTER:
392  group->initData = message->json_root;
393  group->framebuffer_width = json_integer_value( json_object_get(group->initData, "framebuffer width") );
394  group->framebuffer_height = json_integer_value( json_object_get(group->initData, "framebuffer height") );
395  group->video_buffer_size = group->framebuffer_width*group->framebuffer_height*4;
396  delete_message = false;
397  printf("New connection, giving id %i (control)\n",insitu->t->connector->getID());
398  break;
399  }
400 
401  group->master = insitu->t;
402  group->id = insitu->t->connector->getID();
403 
404  if (group->master)
405  {
406  printf("Group complete, sending to connected interfaces\n");
408  while (dc)
409  {
410  dc->t->masterSendMessage(new MessageContainer(REGISTER,group->initData,true));
411  dc = dc->next;
412  }
413  for (auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
414  (*it).connector->masterSendMessage(new ImageBufferContainer(GROUP_ADDED,NULL,group,1));
415  }
416  }
417  else
418  if (message->type == EXIT_PLUGIN) //Let's tell everybody and remove it from the list
419  {
420  InsituConnectorContainer* insituContainer = insitu->t;
421  int id = insituContainer->connector->getID();
422  printf("Connection %i closed.\n",id);
423  //Group does still exist?
424  if (insituContainer->group)
425  {
426  InsituConnectorGroup* group = insituContainer->group;
427  //Add id of group
428  json_object_set_new( message->json_root, "id", json_integer( group->getID() ) );
430  while (dc)
431  {
432  dc->t->masterSendMessage(new MessageContainer(EXIT_PLUGIN,message->json_root,true));
433  dc = dc->next;
434  }
435  for (auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
436  (*it).connector->masterSendMessage(new ImageBufferContainer(GROUP_FINISHED,NULL,group,1));
437  //Now let's remove the whole group
438  group->master->group = NULL;
439  printf("Removed group %i\n",group->id);
440  ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr it = insituConnectorGroupList.getFront();
441  while (it)
442  {
443  if (it->t == group)
444  {
445  pthread_t thread;
446  pthread_create(&thread,NULL,delete_pointer_later<InsituConnectorGroup>,insituConnectorGroupList.remove(it));
447  break;
448  }
449  it = it->next;
450  }
451  }
452  delete insituMaster.insituConnectorList.remove(insitu);
453  break;
454  }
455  if (delete_message)
456  delete message;
457  if (message == lastMessage)
458  break;
459  }
460  insitu = next;
461  }
462 
464  // Iterate over all metadata clients //
467  while (dc)
468  {
470  //Check for new messages for every client
471  while (MessageContainer* message = dc->t->masterGetMessage())
472  {
473  if (message->type == FEEDBACK)
474  {
475  json_t* observe_id = json_object_get(message->json_root, "observe id");
476  if (observe_id)
477  {
478  int id = json_integer_value( observe_id );
479  //Send feedback to observing insitu
480  ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr group = insituConnectorGroupList.getFront();
481  while (group)
482  {
483  char* buffer = json_dumps( message->json_root, 0 );
484  if ( group->t->master->connector->getID() == id)
485  {
486  send(group->t->master->connector->getSockFD(),buffer,strlen(buffer),0);
487  break;
488  }
489  free(buffer);
490  group = group->next;
491  }
492  }
493  }
494  if (message->type == OBSERVE)
495  {
496  int id = json_integer_value( json_object_get(message->json_root, "observe id") );
497  int stream = json_integer_value( json_object_get(message->json_root, "stream") );
498  bool dropable = json_boolean_value( json_object_get(message->json_root, "dropable") );
499  if ( stream < 0 )
500  stream = 0;
501  if ( stream >= imageConnectorList.size() )
502  stream = imageConnectorList.size()-1;
503  const char* url = json_string_value( json_object_get(message->json_root, "url") );
504  void* ref = (void*)dc->t;
505  dc->t->observe( id, stream, dropable );
506  InsituConnectorGroup* group = NULL;
507  ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr it = insituConnectorGroupList.getFront();
508  while (it)
509  {
510  if (it->t->getID() == id)
511  {
512  group = it->t;
513  break;
514  }
515  it = it->next;
516  }
517  if (group)
518  {
519  json_t *js, *root = json_object();
520  if (json_array_size( js = json_object_get( group->initData, "projection") ) == 16)
521  json_object_set( root, "projection", js );
522  if (json_array_size( js = json_object_get( group->initData, "rotation") ) == 9)
523  json_object_set( root, "rotation", js );
524  if (json_array_size( js = json_object_get( group->initData, "position") ) == 3)
525  json_object_set( root, "position", js );
526  if ( js = json_object_get( group->initData, "distance") )
527  json_object_set( root, "distance", js );
528  json_object_set_new( root, "type", json_string( "update" ) );
529  dc->t->masterSendMessage(new MessageContainer(UPDATE,root));
530  imageConnectorList[ stream ].connector->masterSendMessage(new ImageBufferContainer(GROUP_OBSERVED,NULL,group,1,url,ref));
531  //Send request for (transfer) functions and most recent frame
532  char buffer[] =
533  "{\"type\": \"feedback\", \"request\": \"transfer\"} "
534  "{\"type\": \"feedback\", \"request\": \"functions\"} "
535  "{\"type\": \"feedback\", \"request\": \"weight\"} "
536  "{\"type\": \"feedback\", \"request\": \"clipping\"} "
537  "{\"type\": \"feedback\", \"request\": \"redraw\"} "
538  "{\"type\": \"feedback\", \"request\": \"controller\"}";
539  send(group->master->connector->getSockFD(),buffer,strlen(buffer),MSG_NOSIGNAL);
540  }
541  }
542  if (message->type == STOP)
543  {
544  int id = json_integer_value( json_object_get(message->json_root, "observe id") );
545  const char* url = json_string_value( json_object_get(message->json_root, "url") );
546  void* ref = (void*)dc->t;
547  int stream = 0;
548  bool dropable = false;
549  dc->t->stopObserve( id, stream, dropable );
550  InsituConnectorGroup* group = NULL;
551  ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr it = insituConnectorGroupList.getFront();
552  while (it)
553  {
554  if (it->t->getID() == id)
555  {
556  group = it->t;
557  break;
558  }
559  it = it->next;
560  }
561  if (group)
562  imageConnectorList[ stream ].connector->masterSendMessage(new ImageBufferContainer(GROUP_OBSERVED_STOPPED,NULL,group,1,url,ref));
563  }
564  if (message->type == CLOSED)
565  {
566  void* ref = (void*)dc->t;
567  ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr gr = insituConnectorGroupList.getFront();
568  while (gr)
569  {
570  int stream;
571  bool dropable;
572  if (dc->t->doesObserve(gr->t->getID(), stream, dropable))
573  {
574  imageConnectorList[ stream ].connector->masterSendMessage(new ImageBufferContainer(GROUP_OBSERVED_STOPPED,NULL,gr->t,1,"",ref));
575  }
576  gr = gr->next;
577  }
578  dataClientList.remove(dc);
579  break;
580  }
581  delete message;
582  }
583  dc = next;
584  }
585 
587  // Iterate over all image connectors //
589  for (auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
590  {
591  while (ImageBufferContainer* message = (*it).connector->masterGetMessage())
592  {
593  if (message->type == REGISTER_STREAM)
594  {
595  pthread_mutex_lock(&message->group->streams_mutex);
596  message->group->streams[(*it).connector->getName()].insert( std::pair< void*,std::string >( message->reference, std::string((char*)message->image->buffer) ));
597  pthread_mutex_unlock(&message->group->streams_mutex);
598  json_t* root = json_object();
599  json_object_set_new( root, "type", json_string ("register video") );
600  json_object_set_new( root, "name", json_string ( message->group->getName().c_str() ) );
601  json_object_set_new( root, "connector", json_string ( (*it).connector->getName().c_str() ) );
602  json_object_set_new( root, "reference", json_integer ( (long)message->reference ) );
604  while (dc)
605  {
606  dc->t->masterSendMessage(new MessageContainer(REGISTER_VIDEO,root,true));
607  dc = dc->next;
608  }
609  json_decref( root );
610  }
611  message->image->suicide();
612  message->ref_count--;
613  if (message->ref_count == 0)
614  {
615  //Sending json message
616  if (message->type == UPDATE_BUFFER)
617  {
618  //Send json data
620  while(dc)
621  {
622  int stream;
623  bool dropable;
624  if(dc->t->doesObserve(message->insitu_id,stream,dropable))
625  dc->t->masterSendMessage(new MessageContainer(NONE,message->json,true,dropable));
626  dc=dc->next;
627  }
628  json_decref(message->json);
629  json_decref(message->payload);
630  }
631  delete(message);
632  }
633  }
634  }
635  usleep(100);
636  }
637 
638 
639  //shutdown(insituMaster.getSockFD(),SHUT_RDWR);
640  insituMaster.setExit();
641  printf("Waiting for insitu Master thread to finish... ");
642  fflush(stdout);
643  pthread_join(insituThread,NULL);
644  printf("Done\n");
645  for (auto it = dataConnectorList.begin(); it != dataConnectorList.end(); it++)
646  {
647  printf("Asking %s to exit\n",(*it).connector->getName().c_str());
648  (*it).connector->masterSendMessage(new MessageContainer(FORCE_EXIT));
649  }
650  for (auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
651  {
652  printf("Asking %s to exit\n",(*it).connector->getName().c_str());
653  (*it).connector->masterSendMessage(new ImageBufferContainer(IMG_FORCE_EXIT,NULL,NULL,1));
654  }
655  for (auto it = dataConnectorList.begin(); it != dataConnectorList.end(); it++)
656  {
657  pthread_join((*it).thread,NULL);
658  printf("%s finished\n",(*it).connector->getName().c_str());
659  }
660  for (auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
661  {
662  pthread_join((*it).thread,NULL);
663  printf("%s finished\n",(*it).connector->getName().c_str());
664  }
665  signal(SIGINT, SIG_DFL);
666  return 0;
667 }
668 
670 {
671  json_decref( masterHello );
672  dataConnectorList.clear();
673  while (MetaDataClient* mom = dataClientList.pop_front())
674  delete mom;
675  while ( ThreadList< InsituConnectorGroup* >::ThreadListContainer_ptr it = insituConnectorGroupList.getFront() )
676  delete( insituConnectorGroupList.remove(it) );
677 }
Definition: Common.hpp:42
static void * run_runable(void *ptr)
Definition: Runable.cpp:18
void * delete_pointer_later(void *ptr)
Definition: Broker.cpp:39
Definition: Common.hpp:39
int getVideoBufferSize()
Definition: Broker.hpp:66
ThreadListContainer_ptr getFront()
Definition: ThreadList.hpp:87
std::string getStream(std::string connector, std::string name, std::string ref)
Definition: Broker.cpp:244
struct ThreadListContainer_struct * next
Definition: ThreadList.hpp:35
void suicide()
Definition: Common.hpp:149
virtual std::string getName()=0
void setBroker(Broker *broker)
static volatile sig_atomic_t force_exit
Definition: Broker.hpp:110
~Broker()
Definition: Broker.cpp:669
Broker(std::string name, int inner_port, std::string interface)
Definition: Broker.cpp:45
int errorCode
Definition: Common.hpp:24
errorCode masterSendMessage(MessageTemplate *message)
Definition: MessageAble.hpp:43
void push_back(T t)
Definition: ThreadList.hpp:45
MetaDataClient * addDataClient()
Definition: Broker.cpp:85
errorCode addImageConnector(ImageConnector *imageConnector)
Definition: Broker.cpp:68
MetaDataConnector * connector
Definition: Broker.hpp:36
void receiveVideo(InsituConnectorGroup *group, uint8_t *video_buffer, char *payload)
Definition: Broker.cpp:138
void sighandler(int sig)
Definition: Broker.cpp:32
ImageConnector * connector
Definition: Broker.hpp:42
errorCode run()
Definition: Broker.cpp:276
void setBroker(Broker *broker)
errorCode addDataConnector(MetaDataConnector *dataConnector)
Definition: Broker.cpp:58
ImageBuffer * image
Definition: Common.hpp:199