ISAAC
Overview :: Library Doc :: Server Doc :: JSON Commands

In Situ Animation of Accelerated Computations

isaac_communicator.hpp
Go to the documentation of this file.
1 /* This file is part of ISAAC.
2  *
3  * ISAAC is free software: you can redistribute it and/or modify
4  * it under the terms of the GNU Lesser General Public License as
5  * published by the Free Software Foundation, either version 3 of the
6  * License, or (at your option) any later version.
7  *
8  * ISAAC is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with ISAAC. If not, see <www.gnu.org/licenses/>. */
15 
16 #pragma once
17 
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <netinet/tcp.h>
21 #include <netdb.h>
22 #include <unistd.h>
23 
24 #if ISAAC_JPEG == 1
25  #include <jpeglib.h>
26 #endif
27 
28 #include "isaac_macros.hpp"
29 
30 #include <boost/archive/iterators/base64_from_binary.hpp>
31 #include <boost/archive/iterators/transform_width.hpp>
32 #include <boost/archive/iterators/ostream_iterator.hpp>
33 #include <sstream>
34 
35 namespace isaac
36 {
37 
39 {
42 };
43 
45 {
46  public:
47  IsaacCommunicator(const std::string url,const isaac_uint port) :
48  id(0),
49  server_id(0),
50  url(url),
51  port(port),
52  sockfd(0),
53  jpeg_quality(90),
54  registerMessage(NULL)
55  {
56  pthread_mutex_init (&deleteMessageMutex, NULL);
57  }
58  json_t* getLastMessage()
59  {
60  json_t* result = NULL;
61  pthread_mutex_lock(&deleteMessageMutex);
62  if (!messageList.empty())
63  {
64  result = messageList.front();
65  messageList.pop_front();
66  }
67  pthread_mutex_unlock(&deleteMessageMutex);
68  return result;
69  }
71  {
// Resolves `url`, opens an IPv4 TCP socket, connects it to `port` and,
// on success, starts readThread running run_readAndSetMessages.
//
// Return values as implemented below:
//    0  connected, reader thread started
//   -1  host could not be resolved      (only if setting == ReturnAtError)
//   -2  socket could not be created     (only if setting == ReturnAtError)
//   -3  connect() failed                (only if setting == ReturnAtError)
//    1  any of the failures above with another setting; sockfd is reset
//       to 0 in that case.
//
// NOTE(review): in the ReturnAtError paths sockfd is NOT reset. After a
// failed socket() it stays negative, after a failed connect() it still
// holds the descriptor that was just closed - confirm callers never
// use sockfd after a negative return.
72  struct hostent *server;
73  server = gethostbyname(url.c_str());
74  if (!server)
75  {
76  if (setting == ReturnAtError)
77  {
78  fprintf(stderr,"Could not resolve %s.\n",url.c_str());
79  return -1;
80  }
81  else
82  {
83  sockfd = 0;
84  return 1;
85  }
86  }
87  sockfd = socket(AF_INET, SOCK_STREAM, 0);
88  if (sockfd < 0)
89  {
90  if (setting == ReturnAtError)
91  {
92  fprintf(stderr,"Could not create socket.\n");
93  return -2;
94  }
95  else
96  {
97  sockfd = 0;
98  return 1;
99  }
100  }
101  struct sockaddr_in serv_addr;
102  memset(&serv_addr,0, sizeof(serv_addr));
103  serv_addr.sin_family = AF_INET;
// Copies the first resolved address into the socket address.
// NOTE(review): h_length bytes are copied into the s_addr field without
// a size check - presumably safe because gethostbyname is expected to
// return IPv4 addresses here; confirm.
104  bcopy((char *)server->h_addr,(char *)&serv_addr.sin_addr.s_addr,server->h_length);
105  serv_addr.sin_port = htons(port);
106  if (connect(sockfd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0)
107  {
// The descriptor is released on every connect failure, whatever the setting.
108  close(sockfd);
109  if (setting == ReturnAtError)
110  {
111  fprintf(stderr,"Could not connect to %s.\n",url.c_str());
112  return -3;
113  }
114  else
115  {
116  sockfd = 0;
117  return 1;
118  }
119  }
// The reader thread receives `this` and calls readAndSetMessages() on it.
120  pthread_create(&readThread,NULL,run_readAndSetMessages,this);
121  return 0;
122  }
123 
// Sends one piece of a JSON message to the server.
//
// content   - NUL-terminated JSON object text, or NULL to send no content.
//             The buffer is MODIFIED: its first character is overwritten
//             with ',' and its last character (the closing '}') is not sent.
// starting  - if true, the message opening `{"uid": <id>` is sent first
//             and the local id is incremented.
// finishing - if true, the terminator "} " is sent last.
//
// Returns 0 if sockfd is negative, otherwise the accumulated byte count
// `n` of the send() calls.
// NOTE(review): ISAAC_HANDLE_EPIPE is defined elsewhere; it is given the
// send result, n, sockfd and readThread - presumably it handles a broken
// connection and may leave the function early; confirm in isaac_macros.hpp.
124  isaac_int serverSend(char * content, bool starting = true, bool finishing = false)
125  {
126  if (sockfd < 0)
127  return 0;
128  if (sockfd == 0) //Connection lost or never established
129  {
// NOTE(review): the listing numbering jumps from 129 to 131, so one
// source line is missing here. As shown, the return below is
// unconditional and the re-registration after it is unreachable;
// presumably the missing line is a reconnect attempt guarding this
// return - confirm against the real file.
131  return 0;
// Re-sends the stored register message. This local `content` shadows the
// parameter, and `result` is not used afterwards.
132  char* content = json_dumps( *registerMessage, 0 );
133  isaac_int result = serverSend(content,true,true);
134  free(content);
135  }
136  int n = 0;
137  if (starting)
138  {
// Throttle: wait in 1 ms steps while the local id is more than
// ISAAC_MAX_DIFFERENCE ahead of the last id confirmed by the server.
// After more than 1000 steps the wait is given up and id is set to
// server_id-1.
// NOTE(review): both ids are unsigned, so server_id-1 wraps when
// server_id is 0 - confirm this is intended.
139  int c = 0;
140  while (id > server_id + ISAAC_MAX_DIFFERENCE)
141  {
142  usleep(1000);
143  c++;
144  if (c > 1000) //1s!
145  {
146  id = server_id-1;
147  break;
148  }
149  }
// Opening of the JSON object. MSG_MORE marks that more data follows,
// MSG_NOSIGNAL suppresses SIGPIPE on a closed connection.
150  char id_string[32];
151  sprintf(id_string,"{\"uid\": %i",id);
152  int add = send(sockfd,id_string,strlen(id_string),MSG_MORE | MSG_NOSIGNAL);
153  ISAAC_HANDLE_EPIPE(add,n,sockfd,readThread)
154  n += add;
155  id++;
156  }
157  if (content)
158  {
// The leading '{' of content becomes ',' so that its members continue
// the object opened by the uid header.
// NOTE(review): an empty string would make strlen(content)-1 wrap
// around; callers presumably always pass a complete JSON object.
159  content[0] = ',';
160  uint32_t l = strlen(content)-1; //without closing }
161  //content[l] = 0;
// The l bytes are sent in chunks of at most 4096 bytes; only the last
// chunk may be shorter.
162  int amount = (l+4095)/4096;
163  for (int i = 0; i < amount; i++)
164  {
165  if (i == amount - 1)
166  {
167  int add = send(sockfd,&content[i*4096],l - i * 4096,MSG_MORE | MSG_NOSIGNAL);
168  ISAAC_HANDLE_EPIPE(add,n,sockfd,readThread)
169  n += add;
170  }
171  else
172  {
173  int add = send(sockfd,&content[i*4096],4096,MSG_MORE | MSG_NOSIGNAL);
174  ISAAC_HANDLE_EPIPE(add,n,sockfd,readThread)
175  n += add;
176  }
177  }
178  }
179  if (finishing)
180  {
// Closing brace plus one space; sent without MSG_MORE.
181  char finisher[] = "} ";
182  int add = send(sockfd,finisher,2,MSG_NOSIGNAL);
183  ISAAC_HANDLE_EPIPE(add,n,sockfd,readThread)
184  n += add;
185  }
186  return n;
187  }
188 
189  isaac_int serverSendRegister(json_t** registerMessage)
190  {
191  this->registerMessage = registerMessage;
192  char* content = json_dumps( *registerMessage, 0 );
193  isaac_int result = serverSend(content,true,true);
194  free(content);
195  return result;
196  }
197 
#if ISAAC_JPEG == 1
/* Callbacks for the jpeg_destination_mgr filled in by serverSendFrame.
 * That function hands libjpeg one pre-sized output buffer, so none of
 * the callbacks has any work to do. */

// Called by libjpeg before compression starts; intentionally empty.
static void isaac_init_destination(j_compress_ptr)
{
}

// Called by libjpeg when the output buffer is full. Reports success
// without touching the buffer.
// NOTE(review): the buffer pointers are not reset here, so this relies
// on the compressed image never filling the buffer - confirm.
static boolean isaac_jpeg_empty_output_buffer(j_compress_ptr)
{
    return true;
}

// Called by libjpeg after the last byte was written; intentionally empty.
static void isaac_jpeg_term_destination(j_compress_ptr)
{
}
#endif
210  void setJpegQuality(isaac_uint jpeg_quality)
211  {
212  if (jpeg_quality > 100)
213  jpeg_quality = 100;
214  this->jpeg_quality = jpeg_quality;
215  }
216  void serverSendFrame(void* ptr,const isaac_uint width,const isaac_uint height,const isaac_uint depth)
217  {
218  //First the size
219  uint32_t count = width*height*depth;
220  #if ISAAC_JPEG == 1
221  struct jpeg_compress_struct cinfo;
222  struct jpeg_error_mgr jerr;
223  jpeg_destination_mgr dest;
224  dest.init_destination = &isaac_init_destination;
225  dest.empty_output_buffer = &isaac_jpeg_empty_output_buffer;
226  dest.term_destination = &isaac_jpeg_term_destination;
227  cinfo.err = jpeg_std_error(&jerr);
228  jpeg_create_compress(&cinfo);
229  cinfo.dest = &dest;
230  std::vector<char> jpeg_buffer;
231  jpeg_buffer.resize( count );
232  cinfo.dest->next_output_byte = (JOCTET*)(jpeg_buffer.data());
233  cinfo.dest->free_in_buffer = count;
234  cinfo.image_width = width;
235  cinfo.image_height = height;
236  cinfo.input_components = depth;
237  cinfo.in_color_space = JCS_EXT_RGBX;
238  jpeg_set_defaults(&cinfo);
239  jpeg_set_quality(&cinfo, jpeg_quality, false);
240  jpeg_start_compress(&cinfo, TRUE);
241  while (cinfo.next_scanline < cinfo.image_height)
242  {
243  JSAMPROW row_pointer[1];
244  row_pointer[0] = & ((JSAMPROW)ptr)[cinfo.next_scanline * width * depth];
245  (void) jpeg_write_scanlines(&cinfo, row_pointer, 1);
246  }
247  jpeg_finish_compress(&cinfo);
248  count -= cinfo.dest->free_in_buffer;
249  ptr = jpeg_buffer.data();
250  jpeg_destroy_compress(&cinfo);
251  #endif
252 
253  using namespace boost::archive::iterators;
254  std::stringstream payload;
255  typedef
256  base64_from_binary
257  <
258  transform_width
259  <
260  const unsigned char *,
261  6,
262  8
263  >
264  >
265  base64_text; // compose all the above operations in to a new iterator
266 
267  std::copy(
268  base64_text( (char*)ptr ),
269  base64_text( (char*)ptr + count),
270  boost::archive::iterators::ostream_iterator<char>(payload)
271  );
272 
273  #if ISAAC_JPEG == 1
274  char header[] = "{\"payload\": \"data:image/jpeg;base64,";
275  #else
276  char header[] = "{\"payload\": \"data:image/raw-rgba;base64,";
277  #endif
278  char footer[] = "\"}";
279  int hl = strlen(header);
280  int pl = payload.str().length();
281  int fl = strlen(footer);
282  #if ISAAC_VALGRIND_TWEAKS == 1
283  //Allocating one letter more for \0 and 4 letter more because of
284  //strlen (of glib) always reading 4 aligned bytes - even after \0.
285  //It should never crash because of the missing 4 bytes - but
286  //valgrind does complain nevertheless.
287  char* message = (char*)malloc(hl+pl+fl+1+4);
288  #else
289  char* message = (char*)malloc(hl+pl+fl+1);
290  #endif
291  memcpy( message ,header,hl);
292  memcpy(&(message[hl ]),payload.str().c_str(),pl);
293  memcpy(&(message[hl+pl]),footer,fl+1); //with 0
294  serverSend( message, false, true );
295  free(message);
296  }
298  {
299  close(sockfd);
300  }
302  {
303  if (sockfd)
305  usleep(100000); //100ms
306  pthread_cancel(readThread);
307  pthread_mutex_destroy(&deleteMessageMutex);
308  }
309  private:
310  void readAndSetMessages()
311  {
312  while (json_t * content = json_load_callback(json_load_callback_function,&sockfd,JSON_DISABLE_EOF_CHECK,NULL))
313  {
314  //Search for ready messages:
315  json_t* js;
316  if (js = json_object_get( content, "fatal error"))
317  {
318  const char* fatal_error = json_string_value( js );
319  fprintf(stderr,"Fatal error: \"%s\".\n",fatal_error);
320  if (strcmp(fatal_error,"protocol mismatch") == 0)
321  {
322  close(sockfd);
323  sockfd = -1;
324  }
325  json_decref( content );
326  }
327  else
328  if (js = json_object_get( content, "done"))
329  {
330  isaac_uint new_server_id = json_integer_value( js );
331  if (new_server_id > server_id)
332  server_id = new_server_id;
333  json_decref( content );
334  }
335  else
336  {
337  pthread_mutex_lock(&deleteMessageMutex);
338  messageList.push_back(content);
339  pthread_mutex_unlock(&deleteMessageMutex);
340  }
341  }
342  }
343  static size_t json_load_callback_function (void *buffer, size_t buflen, void *data)
344  {
345  return recv(*((isaac_int*)data),buffer,1,0);
346  }
347  static void* run_readAndSetMessages(void* communicator)
348  {
349  ((IsaacCommunicator*)communicator)->readAndSetMessages();
350  return 0;
351  }
// Id written as "uid" into the next started message; incremented by
// serverSend after each message start.
352  isaac_uint id;
// Highest id reported by the server in a "done" message.
353  isaac_uint server_id;
// Host name that serverConnect resolves.
354  std::string url;
// TCP port that serverConnect connects to.
355  isaac_uint port;
// Socket descriptor. 0 means connection lost or never established;
// a negative value makes serverSend return immediately.
356  isaac_int sockfd;
// Quality passed to jpeg_set_quality; starts at 90, at most 100 via setJpegQuality.
357  isaac_uint jpeg_quality;
// Received messages that are neither "fatal error" nor "done", oldest first.
358  std::list<json_t*> messageList;
// Held whenever messageList is read or modified.
359  pthread_mutex_t deleteMessageMutex;
// Thread running run_readAndSetMessages; created by serverConnect.
360  pthread_t readThread;
// Pointer handed to serverSendRegister; NULL until then.
361  json_t** registerMessage;
362 };
363 
364 } //namespace isaac;
IsaacCommunicator(const std::string url, const isaac_uint port)
Definition: isaac.hpp:60
isaac_int serverSend(char *content, bool starting=true, bool finishing=false)
#define ISAAC_MAX_DIFFERENCE
int32_t isaac_int
Definition: isaac_types.hpp:26
uint32_t isaac_uint
Definition: isaac_types.hpp:27
void setJpegQuality(isaac_uint jpeg_quality)
void serverSendFrame(void *ptr, const isaac_uint width, const isaac_uint height, const isaac_uint depth)
isaac_int serverSendRegister(json_t **registerMessage)
#define ISAAC_HANDLE_EPIPE(add, n, sockfd, readThread)
isaac_int serverConnect(CommunicatorSetting setting)