ISAAC
Overview :: Library Doc :: Server Doc :: JSON Commands

In Situ Animation of Accelerated Computations

WebSocketDataConnector.cpp
Go to the documentation of this file.
1 /* This file is part of ISAAC.
2  *
3  * ISAAC is free software: you can redistribute it and/or modify
4  * it under the terms of the GNU Lesser General Public License as
5  * published by the Free Software Foundation, either version 3 of the
6  * License, or (at your option) any later version.
7  *
8  * ISAAC is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with ISAAC. If not, see <www.gnu.org/licenses/>. */
15 
17 
18 
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <getopt.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <fcntl.h>
25 #include <assert.h>
26 
27 #include <syslog.h>
28 #include <sys/time.h>
29 #include <unistd.h>
30 
31 #include <pthread.h>
32 
33 #include "MetaDataClient.hpp"
34 
35 #include <sstream>
36 #include <iostream>
37 
39 {
40  context = NULL;
41 }
42 
44 {
45  return "WebSocketDataConnector";
46 }
47 
48 static int callback_http(
49  struct lws *wsi,
50  enum lws_callback_reasons reason,
51  void *user,
52  void *in,
53  size_t len )
54 {
55  Broker** broker_ptr = (Broker**)lws_context_user(lws_get_context(wsi));
56  Broker* broker = NULL;
57  if (broker_ptr)
58  broker = *broker_ptr;
59  switch (reason)
60  {
61  case LWS_CALLBACK_HTTP:
62  {
63  std::istringstream request( (char*)in );
64  std::string left,middle,right;
65  std::getline(request, left, '/'); //first /
66  std::getline(request, left, '/');
67  std::getline(request, middle, '/');
68  std::getline(request, right, '/');
69  std::string description = broker->getStream(left,middle,right);
70  char buf[LWS_SEND_BUFFER_PRE_PADDING + 2048 + LWS_SEND_BUFFER_POST_PADDING];
71  char* use = &(buf[LWS_SEND_BUFFER_PRE_PADDING]);
72  sprintf(use,"HTTP/1.1 200 OK\n\n%s",description.c_str());
73  lws_write(wsi, (unsigned char*) use, strlen(use), LWS_WRITE_HTTP);
74  char name[256];
75  char rip[256];
76  lws_get_peer_addresses(wsi,lws_get_socket_fd(wsi),name,256,rip,256);
77  printf("HTTP Connection from %s (%s)!\n",name,rip);
78  return -1;
79  }
80  }
81  return 0;
82 }
83 
86  char url[32];
87 };
88 
89 static int
90 callback_isaac(
91  struct lws *wsi,
92  enum lws_callback_reasons reason,
93  void *user,
94  void *in,
95  size_t len )
96 {
97  int n, m;
98  char buf[LWS_SEND_BUFFER_PRE_PADDING + ISAAC_MAX_RECEIVE +
99  LWS_SEND_BUFFER_POST_PADDING];
100  char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
101  struct per_session_data__isaac *pss = (struct per_session_data__isaac *)user;
102  Broker** broker_ptr = (Broker**)lws_context_user(lws_get_context(wsi));
103  Broker* broker = NULL;
104  if (broker_ptr)
105  broker = *broker_ptr;
106 
107  switch (reason) {
108 
109  case LWS_CALLBACK_ESTABLISHED:
110  printf("callback_isaac: LWS_CALLBACK_ESTABLISHED\n");
111  char dummy[32];
112  lws_get_peer_addresses(wsi,lws_get_socket_fd(wsi),dummy,32,pss->url,32);
113  break;
114 
115  case LWS_CALLBACK_SERVER_WRITEABLE:
116  if (pss->client)
117  {
118  MessageContainer* message = NULL;
119  int l = 0;
120  do
121  {
122  l = pss->client->messagesIn.length();
123  while ( (message = pss->client->clientGetMessage()) != NULL && //new message
124  l > 1 && //at least two
125  message->drop_able ) //only skip if dropable!
126  {
127  printf("WebSocketDataConnector: Dropped one dropable package!\n");
128  delete message;
129  l--;
130  }
131  if (message) //New message from master sama!
132  {
133  pthread_mutex_lock(&MessageContainer::deep_copy_mutex);
134  char* buffer = json_dumps( message->json_root, 0 );
135  pthread_mutex_unlock(&MessageContainer::deep_copy_mutex);
136  n = strlen(buffer);
137  sprintf(p,"%s",buffer);
138  m = lws_write(wsi, (unsigned char*)p, n, LWS_WRITE_TEXT);
139  free(buffer);
140  if (m < n)
141  {
142  lwsl_err("ERROR %d writing to socket\n", n);
144  return -1;
145  }
146  delete message;
147  }
148  }
149  while (l > 1 && !lws_send_pipe_choked(wsi));
150  }
151  break;
152  //case LWS_CALLBACK_CLOSED:
153  // pss->client->clientSendMessage(new MessageContainer(CLOSED));
154  // return -1;
155  case LWS_CALLBACK_RECEIVE:
156  if (pss->client)
157  {
158  json_t* input = json_loads((const char *)in, 0, NULL);
159  MessageContainer* message = new MessageContainer(NONE,input);
160  int finish = (message->type == CLOSED);
161  json_object_set_new( message->json_root, "url", json_string( pss->url ) );
162  pss->client->clientSendMessage(message);
163  if (finish)
164  return -1;
165  }
166  break;
167 
168  case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
169  {
170  char name[256];
171  char rip[256];
172  lws_get_peer_addresses(wsi,lws_get_socket_fd(wsi),name,256,rip,256);
173  printf("ISAAC Connection from %s (%s)!\n",name,rip);
174  pss->client = broker->addDataClient();
175  break;
176  }
177 
178  default:
179  break;
180  }
181 
182  return 0;
183 }
184 
185 static struct lws_protocols protocols[] = {
186  {
187  "http-only", /* name */
188  callback_http, /* callback */
189  0, /* per_session_data_size */
190  0, /* max frame size / rx buffer */
191  },
192  {
193  "isaac-json-protocol",
194  callback_isaac,
195  sizeof(struct per_session_data__isaac),
196  ISAAC_MAX_RECEIVE,
197  },
198  { NULL, NULL, 0, 0 } /* terminator */
199 };
200 
201 errorCode WebSocketDataConnector::init(int port,std::string interface)
202 {
203  setlogmask(LOG_UPTO (LOG_DEBUG));
204  openlog("lwsts", LOG_PID | LOG_PERROR, LOG_DAEMON);
205  lws_set_log_level(7, lwsl_emit_syslog);
206  struct lws_context_creation_info info;
207  memset(&info, 0, sizeof info);
208  info.protocols = protocols;
209  #ifndef LWS_NO_EXTENSIONS
210  info.extensions = NULL;
211  #endif
212  info.user = (void*)(&broker);
213  info.port = port;
214  info.gid = -1;
215  info.uid = -1;
216  if (interface.compare(std::string("*")) != 0)
217  info.iface = interface.c_str();
218  context = lws_create_context(&info);
219  if (context == NULL) {
220  lwsl_err("libwebsocket init failed\n");
221  return -1;
222  }
223  return 0;
224 }
225 
227 {
228  int n = 0;
229  bool force_exit = false;
230  while (n >= 0 && !force_exit)
231  {
232  n = lws_service(context, 0);
233  lws_callback_on_writable_all_protocol(context,&protocols[1]);
234  while (MessageContainer* message = clientGetMessage())
235  {
236  if (message->type == FORCE_EXIT)
237  force_exit = true;
238  delete message;
239  }
240  usleep(100);
241  }
242  lws_context_destroy(context);
243  return 0;
244 }
Definition: Common.hpp:42
MessageTemplate * clientGetMessage()
Definition: MessageAble.hpp:38
errorCode clientSendMessage(MessageTemplate *message)
Definition: MessageAble.hpp:34
#define ISAAC_MAX_RECEIVE
Definition: Common.hpp:26
MessageType type
Definition: Common.hpp:112
std::string getStream(std::string connector, std::string name, std::string ref)
Definition: Broker.cpp:244
int length()
Definition: ThreadList.hpp:77
int errorCode
Definition: Common.hpp:24
ThreadList< MessageTemplate * > messagesIn
Definition: MessageAble.hpp:52
static pthread_mutex_t deep_copy_mutex
Definition: Common.hpp:49
json_t * json_root
Definition: Common.hpp:113
MetaDataClient * addDataClient()
Definition: Broker.cpp:85
errorCode init(int port, std::string interface)