ISAAC
Overview :: Library Doc :: Server Doc :: JSON Commands

In Situ Animation of Accelerated Computations

InsituConnectorMaster.cpp
Go to the documentation of this file.
1 /* This file is part of ISAAC.
2  *
3  * ISAAC is free software: you can redistribute it and/or modify
4  * it under the terms of the GNU Lesser General Public License as
5  * published by the Free Software Foundation, either version 3 of the
6  * License, or (at your option) any later version.
7  *
8  * ISAAC is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with ISAAC. If not, see <www.gnu.org/licenses/>. */
15 
17 #include "NetworkInterfaces.hpp"
18 #include "version.hpp"
19 
20 #include <jansson.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <sys/poll.h>
24 #include <netinet/in.h>
25 #include <string.h>
26 #include <pthread.h>
27 #include <errno.h>
28 #include <vector>
29 
31 {
32  sockfd = 0;
33  nextFreeNumber = 0;
34  force_exit = false;
35 }
36 
37 errorCode InsituConnectorMaster::init(int port,std::string interface)
38 {
39  sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
40  if (sockfd < 0)
41  return -1;
42  int enable = 1;
43  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
44  struct sockaddr_in serv_addr;
45  memset(&serv_addr,0, sizeof(serv_addr));
46  serv_addr.sin_family = AF_INET;
47  NetworkInterfaces::bindInterface(serv_addr.sin_addr.s_addr,interface);
48  serv_addr.sin_port = htons(port);
49  if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
50  {
51  printf("Bind failed with error %i\n",errno);
52  return -2;
53  }
54 }
55 
57 {
58  return sockfd;
59 }
60 
61 size_t json_load_callback_function (void *buffer, size_t buflen, void *data)
62 {
64  if (jlcb->pos < jlcb->count)
65  {
66  ((char*)buffer)[0] = jlcb->buffer[jlcb->pos];
67  jlcb->pos++;
68  return 1;
69  }
70  return 0;
71 }
72 
74 {
75  listen(sockfd,5);
76  struct sockaddr_in cli_addr;
77  socklen_t clilen = sizeof(cli_addr);
78 
79  struct pollfd fd_array[MAX_SOCKETS];
80  memset(fd_array,0,sizeof(fd_array));
81  std::vector< InsituConnectorContainer* > con_array = std::vector< InsituConnectorContainer* >(MAX_SOCKETS,NULL);
82 
83  fd_array[0].fd = sockfd;
84  fd_array[0].events = POLLIN;
85  int fdnum = 1;
86 
87  while (!force_exit)
88  {
89  int rv = poll(fd_array, fdnum, 1000); //1s timeout
90  if (rv < 0)
91  {
92  fprintf(stderr,"Error while calling poll\n");
93  return -1;
94  }
95  if (rv)
96  {
97  //First some extra sausage for the listening sockfd
98  if (fd_array[0].revents == POLLIN)
99  {
100  int newsockfd = 1;
101  while (newsockfd >= 0)
102  {
103  newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
104  if (newsockfd >= 0)
105  {
106  InsituConnector* insituConnector = new InsituConnector(newsockfd,nextFreeNumber++);
107  insituConnector->jlcb.count = 0;
109  d->connector = insituConnector;
110  con_array[fdnum] = d;
111  fd_array[fdnum].fd = newsockfd;
112  fd_array[fdnum].events = POLLIN;
113  insituConnectorList.push_back(d);
114  fdnum++;
115  }
116  }
117  }
118  for (int i = 1; i < fdnum; i++)
119  {
120  if (fd_array[i].revents & POLLIN)
121  {
122  while (1)
123  {
124  int add = recv(fd_array[i].fd,&(con_array[i]->connector->jlcb.buffer[con_array[i]->connector->jlcb.count]),4096,MSG_DONTWAIT);
125  if (add > 0)
126  con_array[i]->connector->jlcb.count += add;
127  else
128  break;
129  }
130  con_array[i]->connector->jlcb.pos = 0;
131  con_array[i]->connector->jlcb.buffer[con_array[i]->connector->jlcb.count] = 0;
132  bool closed = false;
133  if (con_array[i]->connector->jlcb.count > 0)
134  {
135  con_array[i]->connector->jlcb.buffer[con_array[i]->connector->jlcb.count] = 0;
136  json_error_t error;
137  int last_working_pos = 0;
138  while (json_t * content = json_load_callback(json_load_callback_function,&con_array[i]->connector->jlcb,JSON_DISABLE_EOF_CHECK,&error))
139  {
140  last_working_pos = con_array[i]->connector->jlcb.pos;
141  MessageContainer* message = new MessageContainer(NONE,content);
142  MessageType type = message->type;
143  if (type == REGISTER)
144  {
145  json_object_set_new( message->json_root, "id", json_integer( con_array[i]->connector->getID() ) );
146  json_t* protocol_version = json_object_get( message->json_root, "protocol" );
147  long version[2] =
148  {
149  json_integer_value( json_array_get( protocol_version, 0) ),
150  json_integer_value( json_array_get( protocol_version, 1) )
151  };
152  if ( version[0] != ISAAC_PROTOCOL_VERSION_MAJOR )
153  {
154  printf("Fatal error: Protocol version mismatch: Library has %i.%i, server needs %i.%i!\n",version[0],version[1],ISAAC_PROTOCOL_VERSION_MAJOR,ISAAC_PROTOCOL_VERSION_MINOR);
155  const char buffer[] = "{ \"fatal error\": \"protocol mismatch\" }";
156  send(fd_array[i].fd,buffer,strlen(buffer),MSG_NOSIGNAL);
157  closed = true;
158  }
159  else
160  if ( version[1] != ISAAC_PROTOCOL_VERSION_MINOR )
161  printf("Warning: Protocol minor version mismatch: Library has %i.%i, server can %i.%i!\n",version[0],version[1],ISAAC_PROTOCOL_VERSION_MAJOR,ISAAC_PROTOCOL_VERSION_MINOR);
162  }
163  if (!closed)
164  {
165  long long uid = json_integer_value( json_object_get(content, "uid") );
166  con_array[i]->connector->clientSendMessage(message);
167  if (type == EXIT_PLUGIN)
168  {
169  closed = true;
170  break;
171  }
172  else
173  {
174  //send, which uid we just got
175  char buffer[32];
176  sprintf(buffer,"{\"done\": %lld}", uid );
177  int l = strlen(buffer);
178  if ( send(fd_array[i].fd,buffer,l,MSG_NOSIGNAL) < l )
179  {
180  MessageContainer* message = new MessageContainer(EXIT_PLUGIN,json_object());
181  json_object_set_new( message->json_root, "type", json_string( "exit" ) );
182  con_array[i]->connector->clientSendMessage(message);
183  closed = true;
184  break;
185  }
186  }
187  }
188  }
189  //If the whole json message was not received yet, we need to keep the start
190  if ( error.position != 1 || strcmp(error.text,"'[' or '{' expected near end of file") != 0 )
191  {
192  for (int j = 0; j < con_array[i]->connector->jlcb.count - last_working_pos; j++)
193  con_array[i]->connector->jlcb.buffer[j] = con_array[i]->connector->jlcb.buffer[j + last_working_pos];
194  con_array[i]->connector->jlcb.count -= last_working_pos;
195  }
196  else
197  con_array[i]->connector->jlcb.count = 0;
198  }
199  else //Closed
200  {
201  MessageContainer* message = new MessageContainer(EXIT_PLUGIN,json_object());
202  json_object_set_new( message->json_root, "type", json_string( "exit" ) );
203  con_array[i]->connector->clientSendMessage(message);
204  closed = true;
205  }
206  if (closed)
207  {
208  close(fd_array[i].fd);
209  fdnum--;
210  for (int j = i; j < fdnum; j++)
211  {
212  fd_array[j] = fd_array[j+1];
213  con_array[j] = con_array[j+1];
214  }
215  memset(&(fd_array[fdnum]),0,sizeof(fd_array[fdnum]));
216  }
217  }
218  }
219  }
220  }
221 }
222 
224 {
225  force_exit = true;
226 }
227 
229 {
231  while (mom = insituConnectorList.pop_front())
232  {
233  shutdown(mom->connector->getSockFD(),SHUT_RDWR);
234  printf("Waiting for Connections %i to finish... ",mom->connector->getID());
235  fflush(stdout);
236  delete mom->connector;
237  delete mom;
238  printf("Done\n");
239  }
240 }
241 
Definition: Common.hpp:42
ThreadList< InsituConnectorContainer *> insituConnectorList
MessageType type
Definition: Common.hpp:112
size_t json_load_callback_function(void *buffer, size_t buflen, void *data)
#define ISAAC_PROTOCOL_VERSION_MINOR
Definition: version.hpp:8
#define ISAAC_PROTOCOL_VERSION_MAJOR
Definition: version.hpp:7
int errorCode
Definition: Common.hpp:24
MessageType
Definition: Common.hpp:28
static void bindInterface(in_addr_t &s_addr, std::string interface, bool ipv6=false)
json_t * json_root
Definition: Common.hpp:113
char buffer[ISAAC_MAX_RECEIVE]
#define MAX_SOCKETS
struct InsituConnectorContainer_struct InsituConnectorContainer
errorCode init(int port, std::string interface)