ISAAC
Overview :: Library Doc :: Server Doc :: JSON Commands

In Situ Animation of Accelerated Computations

RTPImageConnector.cpp
Go to the documentation of this file.
1 /* This file is part of ISAAC.
2  *
3  * ISAAC is free software: you can redistribute it and/or modify
4  * it under the terms of the GNU Lesser General Public License as
5  * published by the Free Software Foundation, either version 3 of the
6  * License, or (at your option) any later version.
7  *
8  * ISAAC is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with ISAAC. If not, see <www.gnu.org/licenses/>. */
15 
16 #include "RTPImageConnector.hpp"
17 #include <boost/preprocessor.hpp>
18 #include <pthread.h>
19 #include <inttypes.h>
20 
21 RTPImageConnector::RTPImageConnector(std::string url,bool zerolatency,bool raw)
22 {
23  this->url = url;
24  this->zerolatency = zerolatency;
25  this->raw = raw;
26 }
27 
29 {
30  if (raw)
31  return "JPEG_RTP_Stream";
32  else
33  return "H264_RTP_Stream";
34 }
35 
36 #define GST_LOAD_ELEMENT_OR_DIE(stream,element) \
37  if (success) \
38  { \
39  stream.element = gst_element_factory_make(BOOST_PP_STRINGIZE(element), NULL); \
40  if(!stream.element) \
41  { \
42  fprintf(stderr,"Could not open " BOOST_PP_STRINGIZE(element)"\n"); \
43  success = 0; \
44  } \
45  }
46 
47 errorCode RTPImageConnector::init(int minport,int maxport)
48 {
49  this->minport = minport;
50  this->maxport = maxport;
51  tStream defaultstream;
52  defaultstream.is_used = false;
53  streams.resize(maxport-minport,defaultstream);
54  gst_init(NULL,NULL);
55  return 0;
56 }
57 
58 void suicideNotify(gpointer data)
59 {
60  ImageBuffer* image = (ImageBuffer*)data;
61  image->suicide();
62 }
63 
65 {
66  uint64_t finish = 0;
67  while(finish == 0)
68  {
69  ImageBufferContainer* message;
70  while(message = clientGetMessage())
71  {
72  if (message->type == GROUP_OBSERVED)
73  {
74  int nr;
75  for (nr = 0; nr < streams.size(); nr++)
76  if (!streams[nr].is_used)
77  break;
78  if (nr < streams.size())
79  {
80  //gst-launch-1.0 appsrc ! videoconf ! x264enc ! rtph264pay config-interval=10 pt=96 ! udpsink host=127.0.0.1 port=5000
81  gboolean success = 1;
82  GST_LOAD_ELEMENT_OR_DIE(streams[nr],appsrc)
83  if (success)
84  g_object_set (G_OBJECT (streams[nr].appsrc), "caps",
85  gst_caps_new_simple ("video/x-raw",
86  "format", G_TYPE_STRING, "RGBx",
87  "bpp", G_TYPE_INT, 32,
88  "depth", G_TYPE_INT, 32,
89  "width", G_TYPE_INT, message->group->getFramebufferWidth(),
90  "height", G_TYPE_INT, message->group->getFramebufferHeight(),
91  "framerate", GST_TYPE_FRACTION, 0, 1,
92  NULL), NULL);
93  g_object_set (G_OBJECT (streams[nr].appsrc),
94  "do-timestamp", 1,
95  "min-percent", 0,
96  "emit-signals", 0,
97  "format", GST_FORMAT_TIME, NULL);
98  GST_LOAD_ELEMENT_OR_DIE(streams[nr],videoconvert)
99  if (raw)
100  {
101  GST_LOAD_ELEMENT_OR_DIE(streams[nr],capsfilter)
102  if (success)
103  g_object_set (G_OBJECT (streams[nr].capsfilter), "caps",
104  gst_caps_new_simple ("video/x-raw",
105  "format", G_TYPE_STRING, "I420",
106  NULL), NULL);
107  GST_LOAD_ELEMENT_OR_DIE(streams[nr],jpegenc)
108  GST_LOAD_ELEMENT_OR_DIE(streams[nr],rtpjpegpay)
109  if (success)
110  g_object_set(G_OBJECT(streams[nr].rtpjpegpay),
111  "pt", 96, NULL);
112  }
113  else
114  {
115  GST_LOAD_ELEMENT_OR_DIE(streams[nr],x264enc)
116  size_t bitrate_heuristic = (size_t) (
117  (uint64_t)3000 *
118  (uint64_t)message->group->getFramebufferWidth() *
119  (uint64_t)message->group->getFramebufferHeight() /
120  (uint64_t)800 /
121  (uint64_t)600 );
122  if (success)
123  g_object_set (G_OBJECT (streams[nr].x264enc),
124  "tune", zerolatency ? 0x00000004 : 0x00000000,
125  "psy-tune", 2,
126  "speed-preset", 1,
127  "bitrate", bitrate_heuristic,
128  "threads", 2,
129  "byte-stream", 1, NULL);
130  GST_LOAD_ELEMENT_OR_DIE(streams[nr],rtph264pay)
131  if (success)
132  g_object_set(G_OBJECT(streams[nr].rtph264pay),
133  "config-interval", 10,
134  "pt", 96, NULL);
135  }
136  GST_LOAD_ELEMENT_OR_DIE(streams[nr],udpsink)
137  if (success)
138  g_object_set(G_OBJECT(streams[nr].udpsink),
139  "host", message->target.c_str(),
140  "port", nr+minport, NULL);
141 
142  if (success)
143  {
144  streams[nr].pipeline = gst_pipeline_new( NULL );
145  streams[nr].bin = gst_bin_new( NULL );
146  if (raw)
147  {
148  gst_bin_add_many(GST_BIN(streams[nr].bin), streams[nr].appsrc, streams[nr].videoconvert, streams[nr].capsfilter, streams[nr].jpegenc, streams[nr].rtpjpegpay, streams[nr].udpsink, NULL);
149  gst_bin_add(GST_BIN(streams[nr].pipeline), streams[nr].bin);
150  success = gst_element_link_many(streams[nr].appsrc, streams[nr].videoconvert, streams[nr].capsfilter, streams[nr].jpegenc, streams[nr].rtpjpegpay, streams[nr].udpsink, NULL);
151  }
152  else
153  {
154  gst_bin_add_many(GST_BIN(streams[nr].bin), streams[nr].appsrc, streams[nr].videoconvert, streams[nr].x264enc, streams[nr].rtph264pay, streams[nr].udpsink, NULL);
155  gst_bin_add(GST_BIN(streams[nr].pipeline), streams[nr].bin);
156  success = gst_element_link_many(streams[nr].appsrc, streams[nr].videoconvert, streams[nr].x264enc, streams[nr].rtph264pay, streams[nr].udpsink, NULL);
157  }
158  }
159  if ( !success )
160  fprintf(stderr,"RTPImageConnector: Could not link elements for rtp stream.\n");
161  else
162  {
163  streams[nr].is_used = true;
164  streams[nr].group = message->group;
165  streams[nr].ref = message->reference;
166  streams[nr].url = message->target;
167  char* register_message = (char*)malloc(128);
168  if (raw)
169  sprintf(register_message,"v=0\nm=video %i RTP/AVP 96\nc=IN IP4 %s\na=rtpmap:96 JPEG/90000\n",nr+minport,url.c_str());
170  else
171  sprintf(register_message,"v=0\nm=video %i RTP/AVP 96\nc=IN IP4 %s\na=rtpmap:96 H264/90000\n",nr+minport,url.c_str());
172  clientSendMessage(new ImageBufferContainer(REGISTER_STREAM,(uint8_t*)register_message,message->group,1,message->target,message->reference));
173  if (raw)
174  printf("RTIPImageConnector: Openend JPEG Stream at port %i\n",minport+nr);
175  else
176  printf("RTIPImageConnector: Openend H264 Stream at port %i\n",minport+nr);
177  if (gst_element_set_state(GST_ELEMENT(streams[nr].pipeline), GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE)
178  printf("RTIPImageConnector: Could not play stream!\n");
179  }
180  }
181  else
182  fprintf(stderr,"RTPImageConnector: No free port!\n");
183  }
184  if (message->type == GROUP_OBSERVED_STOPPED || message->type == GROUP_FINISHED)
185  {
186  int nr;
187  for (nr = 0; nr < streams.size(); nr++)
188  if ((message->type == GROUP_OBSERVED_STOPPED && streams[nr].ref == message->reference) ||
189  (message->type == GROUP_FINISHED && streams[nr].group == message->group))
190  break;
191  if (nr < streams.size() && streams[nr].is_used)
192  {
193  gst_app_src_end_of_stream( (GstAppSrc*)streams[nr].appsrc );
194  gst_element_set_state(streams[nr].pipeline, GST_STATE_NULL);
195  gst_object_unref(GST_OBJECT(streams[nr].pipeline));
196  streams[nr].is_used = false;
197  printf("RTIPImageConnector: Closed Stream\n");
198  }
199  }
200  if (message->type == UPDATE_BUFFER)
201  {
202  int nr;
203  for (nr = 0; nr < streams.size(); nr++)
204  if (streams[nr].is_used && streams[nr].group == message->group)
205  {
206  uint64_t val = gst_app_src_get_current_level_bytes( (GstAppSrc*)streams[nr].appsrc );
207  if ( val == 0)
208  {
209  message->image->incref();
210  GstBuffer *buffer = gst_buffer_new_wrapped_full (GstMemoryFlags(0), message->image->buffer, streams[nr].group->getVideoBufferSize(), 0, streams[nr].group->getVideoBufferSize(), (gpointer)(message->image), suicideNotify);
211  if (gst_app_src_push_buffer( (GstAppSrc*)streams[nr].appsrc, buffer) != GST_FLOW_OK)
212  printf("RTIPImageConnector: Error while sending buffer\n");
213  }
214  }
215  }
216  if (message->type == IMG_FORCE_EXIT)
217  finish = 1;
218  clientSendMessage( message );
219  }
220  usleep(1000);
221  }
222  int nr;
223  for (nr = 0; nr < streams.size(); nr++)
224  if (streams[nr].is_used)
225  {
226  gst_app_src_end_of_stream( (GstAppSrc*)streams[nr].appsrc );
227  gst_element_set_state(streams[nr].pipeline, GST_STATE_NULL);
228  gst_object_unref(GST_OBJECT(streams[nr].pipeline));
229  streams[nr].is_used = false;
230  }
231 }
232 
234 {
235 }
236 
ImageBufferContainer * clientGetMessage()
Definition: MessageAble.hpp:38
errorCode clientSendMessage(ImageBufferContainer *message)
Definition: MessageAble.hpp:34
void suicideNotify(gpointer data)
InsituConnectorGroup * group
Definition: Common.hpp:196
RTPImageConnector(std::string url, bool zerolatency=false, bool raw=false)
void suicide()
Definition: Common.hpp:149
errorCode init(int minport, int maxport)
int errorCode
Definition: Common.hpp:24
#define GST_LOAD_ELEMENT_OR_DIE(stream, element)
void incref()
Definition: Common.hpp:143
int getFramebufferHeight()
Definition: Broker.hpp:74
ImageBufferType type
Definition: Common.hpp:195
int getFramebufferWidth()
Definition: Broker.hpp:70
uint8_t * buffer
Definition: Common.hpp:162
std::string target
Definition: Common.hpp:197
ImageBuffer * image
Definition: Common.hpp:199