20 #include <sys/types.h> 21 #include <sys/socket.h> 23 #include <netinet/in.h> 31 return "TCPDataConnector";
36 sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
40 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable,
sizeof(
int));
41 struct sockaddr_in serv_addr;
42 memset(&serv_addr,0,
sizeof(serv_addr));
43 serv_addr.sin_family = AF_INET;
45 serv_addr.sin_port = htons(port);
46 if (bind(sockfd, (
struct sockaddr *) &serv_addr,
sizeof(serv_addr)) < 0)
48 printf(
"TCPDataConnector: Bind failed with error %i\n",errno);
61 struct sockaddr_in cli_addr;
62 socklen_t clilen =
sizeof(cli_addr);
64 struct pollfd fd_array[MAX_SOCKETS];
65 memset(fd_array,0,
sizeof(fd_array));
66 std::vector< MetaDataClient* > client_array = std::vector< MetaDataClient* >(
MAX_SOCKETS,NULL);
67 std::vector< jlcb_container* > jlcb_array = std::vector< jlcb_container* >(
MAX_SOCKETS,NULL);
69 fd_array[0].fd = sockfd;
70 fd_array[0].events = POLLIN;
73 volatile bool force_exit =
false;
84 int rv = poll(fd_array, fdnum, 10);
87 fprintf(stderr,
"TCPDataConnector: Error while calling poll\n");
93 if (fd_array[0].revents == POLLIN)
96 while (newsockfd >= 0)
98 newsockfd = accept(sockfd, (
struct sockaddr *) &cli_addr, &clilen);
101 printf(
"TCPDataConnector: New connection, giving id %i\n",fdnum);
104 fd_array[fdnum].fd = newsockfd;
105 fd_array[fdnum].events = POLLIN;
111 for (
int i = 1; i < fdnum; i++)
118 char* buffer = json_dumps( message->json_root, 0 );
120 int l = strlen(buffer);
122 int amount = (l+4095)/4096;
123 for (
int j = 0; j < amount; j++)
126 n += send(fd_array[i].fd,&buffer[j*4096],l - j * 4096,MSG_NOSIGNAL);
128 n += send(fd_array[i].fd,&buffer[j*4096],4096,MSG_MORE | MSG_NOSIGNAL);
132 printf(
"TCPDataConnector: ERROR %d writing to socket %i\n", n, fd_array[i].fd);
139 if (fd_array[i].revents & POLLIN)
143 int add = recv(fd_array[i].fd,&(jlcb_array[i]->jlcb.buffer[jlcb_array[i]->jlcb.count]),4096,MSG_DONTWAIT);
145 jlcb_array[i]->jlcb.count += add;
149 jlcb_array[i]->jlcb.pos = 0;
150 jlcb_array[i]->jlcb.buffer[jlcb_array[i]->jlcb.count] = 0;
151 if (jlcb_array[i]->jlcb.count > 0)
153 jlcb_array[i]->jlcb.buffer[jlcb_array[i]->jlcb.count] = 0;
155 int last_working_pos = 0;
158 last_working_pos = jlcb_array[i]->jlcb.pos;
161 json_object_set_new( message->
json_root,
"url", json_string(
"127.0.0.1" ) );
162 client_array[i]->clientSendMessage(message);
165 if ( error.position != 1 || strcmp(error.text,
"'[' or '{' expected near end of file") != 0 )
167 for (
int j = 0; j < jlcb_array[i]->jlcb.count - last_working_pos; j++)
168 jlcb_array[i]->jlcb.buffer[j] = jlcb_array[i]->jlcb.buffer[j + last_working_pos];
169 jlcb_array[i]->jlcb.count -= last_working_pos;
172 jlcb_array[i]->jlcb.count = 0;
182 close(fd_array[i].fd);
183 printf(
"TCPDataConnector: Closed connection %i\n",i);
184 delete(jlcb_array[i]);
186 for (
int j = i; j < fdnum; j++)
188 fd_array[j] = fd_array[j+1];
189 client_array[j] = client_array[j+1];
191 memset(&(fd_array[fdnum]),0,
sizeof(fd_array[fdnum]));
json_load_callback_struct jlcb
MessageContainer * clientGetMessage()
size_t json_load_callback_function(void *buffer, size_t buflen, void *data)
static void bindInterface(in_addr_t &s_addr, std::string interface, bool ipv6=false)
static pthread_mutex_t deep_copy_mutex
MetaDataClient * addDataClient()
errorCode init(int port, std::string interface)