20 #include <sys/socket.h> 27 #include <boost/archive/iterators/binary_from_base64.hpp> 28 #include <boost/archive/iterators/transform_width.hpp> 38 template <
typename Type>
48 this->inner_port = inner_port;
49 this->inner_interface = interface;
50 this->insituThread = 0;
51 masterHello = json_object();
52 json_object_set_new( masterHello,
"type", json_string(
"hello" ) );
53 json_object_set_new( masterHello,
"name", json_string( name.c_str() ) );
54 masterHelloConnectorList = json_array();
55 json_object_set_new( masterHello,
"streams", masterHelloConnectorList );
64 dataConnectorList.push_back(d);
74 imageConnectorList.push_back(d);
77 json_t* element = json_object();
78 json_object_set_new( element,
"name", json_string( imageConnector->
getName().c_str() ) );
79 json_object_set_new( element,
"id", json_integer( imageConnectorList.size() - 1 ) );
80 json_array_append_new( masterHelloConnectorList, element );
96 json_incref( mom->
t->initData );
105 void isaac_jpeg_init_source(j_decompress_ptr cinfo)
108 boolean isaac_jpeg_fill_input_buffer(j_decompress_ptr cinfo)
112 void isaac_jpeg_skip_input_data(j_decompress_ptr cinfo,
long num_bytes)
115 boolean isaac_jpeg_resync_to_restart(j_decompress_ptr cinfo,
int desired)
119 void isaac_jpeg_term_source(j_decompress_ptr cinfo)
123 struct isaac_jpeg_error_mgr {
124 struct jpeg_error_mgr pub;
125 jmp_buf setjmp_buffer;
127 typedef struct isaac_jpeg_error_mgr * isaac_jpeg_error_ptr;
130 isaac_jpeg_error_exit (j_common_ptr cinfo)
132 isaac_jpeg_error_ptr err = (isaac_jpeg_error_ptr) cinfo->err;
133 (*cinfo->err->output_message) (cinfo);
134 longjmp(err->setjmp_buffer, 1);
141 char* colon = strchr(payload,
':');
146 char* semicolon = strchr(colon,
';');
147 if (semicolon == NULL)
150 char* comma = strchr(semicolon,
',');
156 int whole_length = strlen(comma);
157 comma[whole_length-1] = 0;
160 if (strcmp(colon,
"image/jpeg") == 0)
163 uint8_t* temp_buffer = video_buffer;
165 temp_buffer = (uint8_t*)malloc(strlen(comma)+4);
168 using namespace boost::archive::iterators;
183 base64_dec src_it(comma);
184 for(
int i = 0; i < (whole_length+1)*3/4; ++i)
186 temp_buffer[i] = *src_it;
190 catch (dataflow_exception&)
197 struct jpeg_decompress_struct cinfo;
198 struct isaac_jpeg_error_mgr jerr;
199 cinfo.err = jpeg_std_error(&jerr.pub);
200 jerr.pub.error_exit = isaac_jpeg_error_exit;
201 if (setjmp(jerr.setjmp_buffer))
203 jpeg_destroy_decompress(&cinfo);
204 printf(
"Got invalid jpeg from simulation. Ignoring.\n");
209 src.init_source = &isaac_jpeg_init_source;
210 src.fill_input_buffer = &isaac_jpeg_fill_input_buffer;
211 src.skip_input_data = &isaac_jpeg_skip_input_data;
212 src.resync_to_restart = &isaac_jpeg_resync_to_restart;
213 src.term_source = &isaac_jpeg_term_source;
214 jpeg_create_decompress(&cinfo);
216 cinfo.src->next_input_byte = (JOCTET*)(temp_buffer);
218 (void) jpeg_read_header(&cinfo, TRUE);
219 (void) jpeg_start_decompress(&cinfo);
220 int row_stride = cinfo.output_width * cinfo.output_components;
221 JSAMPARRAY buffer = (*cinfo.mem->alloc_sarray)
222 ((j_common_ptr) &cinfo, JPOOL_IMAGE, row_stride, 1);
223 while (cinfo.output_scanline < cinfo.output_height)
225 int y = cinfo.output_scanline;
226 (void) jpeg_read_scanlines(&cinfo, buffer, 1);
227 for (
int x = 0; x < cinfo.output_width; x++)
229 video_buffer[4*(x+y*cinfo.output_width)+0] = buffer[0][x*3+0];
230 video_buffer[4*(x+y*cinfo.output_width)+1] = buffer[0][x*3+1];
231 video_buffer[4*(x+y*cinfo.output_width)+2] = buffer[0][x*3+2];
234 (void) jpeg_finish_decompress(&cinfo);
235 jpeg_destroy_decompress(&cinfo);
238 memset( video_buffer, rand()%255, group->video_buffer_size );
249 reference = (
void*)std::stol(ref);
255 std::string result (
"");
259 if (group->
t->name == name)
261 pthread_mutex_lock(&group->
t->streams_mutex);
262 std::map< std::string, std::map< void* , std::string > >::iterator it = group->
t->streams.find( connector );
263 if (it != group->
t->streams.end())
265 std::map< void* , std::string >::iterator it2 = (*it).second.find( reference );
266 if (it2 != (*it).second.end())
267 result = (*it2).second;
269 pthread_mutex_unlock(&group->
t->streams_mutex);
278 printf(
"Running ISAAC Master\n");
280 printf(
"Starting insitu plugin listener\n");
281 if (insituMaster.init(inner_port,inner_interface))
283 fprintf(stderr,
"Error while starting insitu plugin listener\n");
284 signal(SIGINT, SIG_DFL);
289 for (
auto it = dataConnectorList.begin(); it != dataConnectorList.end(); it++)
291 printf(
"Launching %s\n",(*it).connector->getName().c_str());
294 for (
auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
296 printf(
"Launching %s\n",(*it).connector->getName().c_str());
310 if (insitu->
t->connector->messagesOut.back)
311 lastMessage = insitu->
t->connector->messagesOut.back->t;
314 bool delete_message =
true;
315 if (message->type ==
PERIOD)
317 if (insitu->
t->group == NULL)
319 delete_message =
false;
320 insitu->
t->connector->clientSendMessage( message );
326 if ( js = json_object_get(message->json_root,
"init") )
328 json_decref( insitu->
t->group->initData );
329 insitu->
t->group->initData = js;
331 json_object_del( message->json_root,
"init" );
334 json_t* payload = json_object_get( message->json_root,
"payload" );
337 json_incref( payload );
338 json_incref( message->json_root );
339 json_object_del( message->json_root,
"payload" );
341 uint8_t*video_buffer=(uint8_t*)malloc(insitu->
t->group->video_buffer_size);
342 receiveVideo(insitu->
t->group,video_buffer,json_dumps( payload , JSON_ENCODE_ANY ));
343 int l=imageConnectorList.size();
348 for(
auto ic=imageConnectorList.begin();ic!=imageConnectorList.end();ic++)
349 (*ic).connector->masterSendMessage(container);
359 if(dc->t->doesObserve(insitu->
t->group->getID(),stream,dropable))
360 dc->t->masterSendMessage(
new MessageContainer(message->type,message->json_root,
true,dropable));
370 std::string name( json_string_value( json_object_get( message->json_root,
"name" ) ) );
375 if (it->
t->name == name)
385 insituConnectorGroupList.push_back( group );
387 insitu->
t->group = group;
389 switch (message->type)
392 group->initData = message->json_root;
393 group->framebuffer_width = json_integer_value( json_object_get(group->initData,
"framebuffer width") );
394 group->framebuffer_height = json_integer_value( json_object_get(group->initData,
"framebuffer height") );
395 group->video_buffer_size = group->framebuffer_width*group->framebuffer_height*4;
396 delete_message =
false;
397 printf(
"New connection, giving id %i (control)\n",insitu->
t->connector->getID());
401 group->master = insitu->
t;
402 group->id = insitu->
t->connector->getID();
406 printf(
"Group complete, sending to connected interfaces\n");
413 for (
auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
422 printf(
"Connection %i closed.\n",
id);
424 if (insituContainer->
group)
428 json_object_set_new( message->json_root,
"id", json_integer( group->
getID() ) );
435 for (
auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
438 group->master->
group = NULL;
439 printf(
"Removed group %i\n",group->id);
446 pthread_create(&thread,NULL,delete_pointer_later<InsituConnectorGroup>,insituConnectorGroupList.remove(it));
452 delete insituMaster.insituConnectorList.remove(insitu);
457 if (message == lastMessage)
475 json_t* observe_id = json_object_get(message->json_root,
"observe id");
478 int id = json_integer_value( observe_id );
483 char* buffer = json_dumps( message->json_root, 0 );
484 if ( group->
t->master->connector->getID() == id)
486 send(group->
t->master->connector->getSockFD(),buffer,strlen(buffer),0);
496 int id = json_integer_value( json_object_get(message->json_root,
"observe id") );
497 int stream = json_integer_value( json_object_get(message->json_root,
"stream") );
498 bool dropable = json_boolean_value( json_object_get(message->json_root,
"dropable") );
501 if ( stream >= imageConnectorList.size() )
502 stream = imageConnectorList.size()-1;
503 const char* url = json_string_value( json_object_get(message->json_root,
"url") );
504 void* ref = (
void*)dc->t;
505 dc->t->observe(
id, stream, dropable );
510 if (it->
t->getID() == id)
519 json_t *js, *root = json_object();
520 if (json_array_size( js = json_object_get( group->initData,
"projection") ) == 16)
521 json_object_set( root,
"projection", js );
522 if (json_array_size( js = json_object_get( group->initData,
"rotation") ) == 9)
523 json_object_set( root,
"rotation", js );
524 if (json_array_size( js = json_object_get( group->initData,
"position") ) == 3)
525 json_object_set( root,
"position", js );
526 if ( js = json_object_get( group->initData,
"distance") )
527 json_object_set( root,
"distance", js );
528 json_object_set_new( root,
"type", json_string(
"update" ) );
533 "{\"type\": \"feedback\", \"request\": \"transfer\"} " 534 "{\"type\": \"feedback\", \"request\": \"functions\"} " 535 "{\"type\": \"feedback\", \"request\": \"weight\"} " 536 "{\"type\": \"feedback\", \"request\": \"clipping\"} " 537 "{\"type\": \"feedback\", \"request\": \"redraw\"} " 538 "{\"type\": \"feedback\", \"request\": \"controller\"}";
542 if (message->type ==
STOP)
544 int id = json_integer_value( json_object_get(message->json_root,
"observe id") );
545 const char* url = json_string_value( json_object_get(message->json_root,
"url") );
546 void* ref = (
void*)dc->t;
548 bool dropable =
false;
549 dc->t->stopObserve(
id, stream, dropable );
554 if (it->
t->getID() == id)
564 if (message->type ==
CLOSED)
566 void* ref = (
void*)dc->t;
572 if (dc->t->doesObserve(gr->t->getID(), stream, dropable))
578 dataClientList.remove(dc);
589 for (
auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
595 pthread_mutex_lock(&message->group->streams_mutex);
596 message->group->streams[(*it).connector->getName()].insert( std::pair< void*,std::string >( message->reference, std::string((
char*)message->image->buffer) ));
597 pthread_mutex_unlock(&message->group->streams_mutex);
598 json_t* root = json_object();
599 json_object_set_new( root,
"type", json_string (
"register video") );
600 json_object_set_new( root,
"name", json_string ( message->group->getName().c_str() ) );
601 json_object_set_new( root,
"connector", json_string ( (*it).connector->getName().c_str() ) );
602 json_object_set_new( root,
"reference", json_integer ( (
long)message->reference ) );
611 message->image->suicide();
612 message->ref_count--;
613 if (message->ref_count == 0)
624 if(dc->t->doesObserve(message->insitu_id,stream,dropable))
628 json_decref(message->json);
629 json_decref(message->payload);
640 insituMaster.setExit();
641 printf(
"Waiting for insitu Master thread to finish... ");
643 pthread_join(insituThread,NULL);
645 for (
auto it = dataConnectorList.begin(); it != dataConnectorList.end(); it++)
647 printf(
"Asking %s to exit\n",(*it).connector->getName().c_str());
650 for (
auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
652 printf(
"Asking %s to exit\n",(*it).connector->getName().c_str());
655 for (
auto it = dataConnectorList.begin(); it != dataConnectorList.end(); it++)
657 pthread_join((*it).thread,NULL);
658 printf(
"%s finished\n",(*it).connector->getName().c_str());
660 for (
auto it = imageConnectorList.begin(); it != imageConnectorList.end(); it++)
662 pthread_join((*it).thread,NULL);
663 printf(
"%s finished\n",(*it).connector->getName().c_str());
665 signal(SIGINT, SIG_DFL);
671 json_decref( masterHello );
672 dataConnectorList.clear();
676 delete( insituConnectorGroupList.remove(it) );
static void * run_runable(void *ptr)
void * delete_pointer_later(void *ptr)
ThreadListContainer_ptr getFront()
std::string getStream(std::string connector, std::string name, std::string ref)
InsituConnectorGroup * group
struct ThreadListContainer_struct * next
virtual std::string getName()=0
void setBroker(Broker *broker)
static volatile sig_atomic_t force_exit
Broker(std::string name, int inner_port, std::string interface)
errorCode masterSendMessage(MessageTemplate *message)
MetaDataClient * addDataClient()
errorCode addImageConnector(ImageConnector *imageConnector)
InsituConnector * connector
void receiveVideo(InsituConnectorGroup *group, uint8_t *video_buffer, char *payload)
ImageConnector * connector
errorCode addDataConnector(MetaDataConnector *dataConnector)