ISAAC
Overview :: Library Doc :: Server Doc :: JSON Commands

In Situ Animation of Accelerated Computations

TCPDataConnector.cpp
Go to the documentation of this file.
1 /* This file is part of ISAAC.
2  *
3  * ISAAC is free software: you can redistribute it and/or modify
4  * it under the terms of the GNU Lesser General Public License as
5  * published by the Free Software Foundation, either version 3 of the
6  * License, or (at your option) any later version.
7  *
8  * ISAAC is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with ISAAC. If not, see <www.gnu.org/licenses/>. */
15 
16 #include "TCPDataConnector.hpp"
17 #include "MetaDataClient.hpp"
18 #include "NetworkInterfaces.hpp"
19 
20 #include <sys/types.h>
21 #include <sys/socket.h>
22 #include <sys/poll.h>
23 #include <netinet/in.h>
24 
26 {
27 }
28 
30 {
31  return "TCPDataConnector";
32 }
33 
34 errorCode TCPDataConnector::init(int port,std::string interface)
35 {
36  sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
37  if (sockfd < 0)
38  return -1;
39  int enable = 1;
40  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
41  struct sockaddr_in serv_addr;
42  memset(&serv_addr,0, sizeof(serv_addr));
43  serv_addr.sin_family = AF_INET;
44  NetworkInterfaces::bindInterface(serv_addr.sin_addr.s_addr,interface);
45  serv_addr.sin_port = htons(port);
46  if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
47  {
48  printf("TCPDataConnector: Bind failed with error %i\n",errno);
49  return -2;
50  }
51 }
52 
54 {
56 };
57 
59 {
60  listen(sockfd,5);
61  struct sockaddr_in cli_addr;
62  socklen_t clilen = sizeof(cli_addr);
63 
64  struct pollfd fd_array[MAX_SOCKETS];
65  memset(fd_array,0,sizeof(fd_array));
66  std::vector< MetaDataClient* > client_array = std::vector< MetaDataClient* >(MAX_SOCKETS,NULL);
67  std::vector< jlcb_container* > jlcb_array = std::vector< jlcb_container* >(MAX_SOCKETS,NULL);
68 
69  fd_array[0].fd = sockfd;
70  fd_array[0].events = POLLIN;
71  int fdnum = 1;
72 
73  volatile bool force_exit = false;
74 
75  while (!force_exit)
76  {
77  //Own messages
78  while (MessageContainer* message = clientGetMessage())
79  {
80  if (message->type == FORCE_EXIT)
81  force_exit = true;
82  delete message;
83  }
84  int rv = poll(fd_array, fdnum, 10); //10ms timeout
85  if (rv < 0)
86  {
87  fprintf(stderr,"TCPDataConnector: Error while calling poll\n");
88  return -1;
89  }
90  if (rv)
91  {
92  //First some extra sausage for the listening sockfd
93  if (fd_array[0].revents == POLLIN)
94  {
95  int newsockfd = 1;
96  while (newsockfd >= 0)
97  {
98  newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
99  if (newsockfd >= 0)
100  {
101  printf("TCPDataConnector: New connection, giving id %i\n",fdnum);
102  client_array[fdnum] = broker->addDataClient();
103  jlcb_array[fdnum] = new jlcb_container();
104  fd_array[fdnum].fd = newsockfd;
105  fd_array[fdnum].events = POLLIN;
106  fdnum++;
107  }
108  }
109  }
110  }
111  for (int i = 1; i < fdnum; i++)
112  {
113  bool closed = false;
114  //Messages of children
115  while (MessageContainer* message = client_array[i]->clientGetMessage())
116  {
117  pthread_mutex_lock(&MessageContainer::deep_copy_mutex);
118  char* buffer = json_dumps( message->json_root, 0 );
119  pthread_mutex_unlock(&MessageContainer::deep_copy_mutex);
120  int l = strlen(buffer);
121  int n = 0;
122  int amount = (l+4095)/4096;
123  for (int j = 0; j < amount; j++)
124  {
125  if (j == amount - 1)
126  n += send(fd_array[i].fd,&buffer[j*4096],l - j * 4096,MSG_NOSIGNAL);
127  else
128  n += send(fd_array[i].fd,&buffer[j*4096],4096,MSG_MORE | MSG_NOSIGNAL);
129  }
130  if (n < l)
131  {
132  printf("TCPDataConnector: ERROR %d writing to socket %i\n", n, fd_array[i].fd);
133  client_array[i]->clientSendMessage(new MessageContainer(CLOSED));
134  closed = true;
135  }
136  delete message;
137  free(buffer);
138  }
139  if (fd_array[i].revents & POLLIN)
140  {
141  while (1)
142  {
143  int add = recv(fd_array[i].fd,&(jlcb_array[i]->jlcb.buffer[jlcb_array[i]->jlcb.count]),4096,MSG_DONTWAIT);
144  if (add > 0)
145  jlcb_array[i]->jlcb.count += add;
146  else
147  break;
148  }
149  jlcb_array[i]->jlcb.pos = 0;
150  jlcb_array[i]->jlcb.buffer[jlcb_array[i]->jlcb.count] = 0;
151  if (jlcb_array[i]->jlcb.count > 0)
152  {
153  jlcb_array[i]->jlcb.buffer[jlcb_array[i]->jlcb.count] = 0;
154  json_error_t error;
155  int last_working_pos = 0;
156  while (json_t * content = json_load_callback(json_load_callback_function,&jlcb_array[i]->jlcb,JSON_DISABLE_EOF_CHECK,&error))
157  {
158  last_working_pos = jlcb_array[i]->jlcb.pos;
159  MessageContainer* message = new MessageContainer(NONE,content);
160  MessageType type = message->type;
161  json_object_set_new( message->json_root, "url", json_string( "127.0.0.1" ) ); //TODO: Using real url
162  client_array[i]->clientSendMessage(message);
163  }
164  //If the whole json message was not received yet, we need to keep the start
165  if ( error.position != 1 || strcmp(error.text,"'[' or '{' expected near end of file") != 0 )
166  {
167  for (int j = 0; j < jlcb_array[i]->jlcb.count - last_working_pos; j++)
168  jlcb_array[i]->jlcb.buffer[j] = jlcb_array[i]->jlcb.buffer[j + last_working_pos];
169  jlcb_array[i]->jlcb.count -= last_working_pos;
170  }
171  else
172  jlcb_array[i]->jlcb.count = 0;
173  }
174  else //Closed
175  {
176  client_array[i]->clientSendMessage(new MessageContainer(CLOSED));
177  closed = true;
178  }
179  }
180  if (closed)
181  {
182  close(fd_array[i].fd);
183  printf("TCPDataConnector: Closed connection %i\n",i);
184  delete(jlcb_array[i]);
185  fdnum--;
186  for (int j = i; j < fdnum; j++)
187  {
188  fd_array[j] = fd_array[j+1];
189  client_array[j] = client_array[j+1];
190  }
191  memset(&(fd_array[fdnum]),0,sizeof(fd_array[fdnum]));
192  }
193  }
194  }
195  return 0;
196 }
json_load_callback_struct jlcb
Definition: Common.hpp:42
MessageContainer * clientGetMessage()
Definition: MessageAble.hpp:38
MessageType type
Definition: Common.hpp:112
std::string getName()
size_t json_load_callback_function(void *buffer, size_t buflen, void *data)
int errorCode
Definition: Common.hpp:24
MessageType
Definition: Common.hpp:28
static void bindInterface(in_addr_t &s_addr, std::string interface, bool ipv6=false)
static pthread_mutex_t deep_copy_mutex
Definition: Common.hpp:49
json_t * json_root
Definition: Common.hpp:113
MetaDataClient * addDataClient()
Definition: Broker.cpp:85
#define MAX_SOCKETS
errorCode init(int port, std::string interface)