ISAAC
Overview :: Library Doc :: Server Doc :: JSON Commands

In Situ Animation of Accelerated Computations

RTMPImageConnector.cpp
Go to the documentation of this file.
1 /* This file is part of ISAAC.
2  *
3  * ISAAC is free software: you can redistribute it and/or modify
4  * it under the terms of the GNU Lesser General Public License as
5  * published by the Free Software Foundation, either version 3 of the
6  * License, or (at your option) any later version.
7  *
8  * ISAAC is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public
14  * License along with ISAAC. If not, see <www.gnu.org/licenses/>. */
15 
16 #include "RTMPImageConnector.hpp"
17 #include <boost/preprocessor.hpp>
18 #include <pthread.h>
19 #include <inttypes.h>
20 
21 RTMPImageConnector::RTMPImageConnector(std::string name, std::string apikey, std::string base_url, int bitrate, bool dummy_audio) :
22  group(NULL),
23  name(name),
24  apikey(apikey),
25  base_url(base_url),
26  heartbeat_image(NULL),
27  bitrate(bitrate),
28  dummy_audio(dummy_audio)
29 {
30  showClient = false;
31 }
32 
// NOTE(review): the signature line of this definition (listing line 33) is
// absent from this extract — presumably the connector's name getter; confirm
// against the header.
// The body returns the text "RTMPImageConnector(" followed by the `name`
// member and a closing ")".
34 {
35  return std::string("RTMPImageConnector(") + name + std::string(")");
36 }
37 
38 errorCode RTMPImageConnector::init(int minport,int maxport)
39 {
40  return 0;
41 }
42 
/* Creates the GStreamer element whose factory name equals the identifier
 * `element` and stores it in the variable of that name.
 * Requires a variable `success` in the expanding scope: the macro does nothing
 * once `success` is 0, so a chain of invocations stops at the first failure.
 * On failure `success` is cleared and a message is written to stderr.
 * The macro is an if-block and is invoked without a trailing semicolon. */
#define RTMP_LOAD_ELEMENT_OR_DIE(element) \
    if (success) \
    { \
        if ((element = gst_element_factory_make(BOOST_PP_STRINGIZE(element), NULL)) == NULL) \
        { \
            success = 0; \
            fprintf(stderr, "RTMPImageConnector: Could not open " BOOST_PP_STRINGIZE(element)"\n"); \
        } \
    }
53 
55 {
56  struct timespec ts;
57  if (clock_gettime(CLOCK_MONOTONIC_RAW,&ts) == 0)
58  return ts.tv_sec*1000 + ts.tv_nsec/1000000;
59  return 0;
60 }
61 
62 void RTMPImageConnector::addFrame( ImageBuffer* image, GstAppSrc* appsrc, InsituConnectorGroup* group)
63 {
64  uint64_t val = gst_app_src_get_current_level_bytes( appsrc );
65  if ( val == 0)
66  {
67  image->incref();
68  GstBuffer *buffer = gst_buffer_new_wrapped_full (GstMemoryFlags(0), image->buffer, group->getVideoBufferSize(), 0, group->getVideoBufferSize(), (gpointer)(image), suicideNotify);
69  if (gst_app_src_push_buffer( appsrc, buffer) != GST_FLOW_OK)
70  printf("RTMPImageConnector: Error while sending buffer\n");
71  }
72 }
73 
75 {
76  RTMPImageConnector* myself = (RTMPImageConnector*)ptr;
77  while (myself->heartbeat_finish == false)
78  {
79  uint64_t now = getTicksMs();
80  if (myself->heartbeat_image)
81  {
82  pthread_mutex_lock(&myself->heartbeat_mutex);
83  if ( now - myself->heartbeat > ISAAC_MAX_HEARTBEAT )
84  {
85  addFrame( myself->heartbeat_image, (GstAppSrc*)(myself->appsrc), myself->group );
86  myself->heartbeat = now;
87  }
88  pthread_mutex_unlock(&myself->heartbeat_mutex);
89  }
90  usleep(10000);
91  }
92 }
93 
// Main loop of the connector.
// NOTE(review): the signature line (listing line 94) is absent from this
// extract, and no return statement is visible in the body — confirm the
// declared return type against the header.
//
// Starts the heartbeat thread, then polls clientGetMessage() until an
// IMG_FORCE_EXIT message has been seen. Every message is handed back through
// clientSendMessage(). The first group that delivers an UPDATE_BUFFER message
// while no group is set becomes the streamed group; its frames are pushed into
// appsrc and the latest one is cached for the heartbeat thread.
95 {
96  pthread_mutex_init (&heartbeat_mutex, NULL);
97  heartbeat_finish = false;
    // The heartbeat thread receives this object as its argument.
98  pthread_create(&heartbeat_thread,NULL,RTMPImageConnector::heartbeatFunction,this);
    // Set by IMG_FORCE_EXIT; the inner loop still drains all pending messages first.
99  int finish = 0;
100  while (finish == 0)
101  {
102  ImageBufferContainer* message;
    // The assignment is intended: loop until no message is pending.
103  while (message = clientGetMessage())
104  {
105  if (message->type == IMG_FORCE_EXIT)
106  finish = 1;
107  if (message->type == GROUP_FINISHED)
108  {
    // Only the group currently being streamed triggers a teardown.
109  if (group == message->group)
110  {
    // Drop the cached frame under the mutex shared with the heartbeat thread.
111  pthread_mutex_lock(&heartbeat_mutex);
112  if (heartbeat_image) //Releasing old frame
113  heartbeat_image->suicide();
114  heartbeat_image = NULL;
115  pthread_mutex_unlock(&heartbeat_mutex);
    // NOTE(review): group is reset outside the mutex.
116  group = NULL;
    // Signal end of stream, stop the pipeline and release it.
117  gst_app_src_end_of_stream( (GstAppSrc*)appsrc );
118  gst_element_set_state(pipeline, GST_STATE_NULL);
119  gst_object_unref(GST_OBJECT(pipeline));
120  printf("RTMPImageConnector: Closed Stream\n");
121  }
122  }
123  if (message->type == UPDATE_BUFFER)
124  {
    // No group is being streamed yet: build the GStreamer elements for this one.
125  if (group == NULL)
126  {
127  //gst-launch-1.0 appsrc ! video/x-raw,… ! videoconvert ! capsfilter ! video/x-raw,format=I420 ! videorate ! video/x-raw,framerate=15/1 ! x264enc threads=2 bitrate=400 tune=zerolatency ! flvmux ! rtmpsink location=rtmp://live-fra.twitch.tv/app/$APIKEY
128  heartbeat = getTicksMs();
    // success gates every following step; RTMP_LOAD_ELEMENT_OR_DIE clears it on failure.
129  gboolean success = 1;
    // NOTE(review): listing line 130 is missing from this extract; appsrc is
    // configured below without a visible creation call — presumably
    // RTMP_LOAD_ELEMENT_OR_DIE(appsrc) stood here.
    // Input caps: RGBx frames with the group's framebuffer size and
    // framerate 0/1 (variable frame rate in GStreamer caps).
131  if (success)
132  g_object_set (G_OBJECT (appsrc), "caps",
133  gst_caps_new_simple ("video/x-raw",
134  "format", G_TYPE_STRING, "RGBx",
135  "bpp", G_TYPE_INT, 32,
136  "depth", G_TYPE_INT, 32,
137  "width", G_TYPE_INT, message->group->getFramebufferWidth(),
138  "height", G_TYPE_INT, message->group->getFramebufferHeight(),
139  "framerate", GST_TYPE_FRACTION, 0, 1,
140  NULL), NULL);
141  if (success)
142  g_object_set (G_OBJECT (appsrc),
143  "do-timestamp", 1,
144  "min-percent", 0,
145  "min-latency", 0,
146  "emit-signals", 0,
147  "is-live", 1,
148  "format", GST_FORMAT_TIME,
149  NULL);
    // NOTE(review): videorate and videorate_capsfilter are created here but are
    // not added to the bin or linked further down.
150  RTMP_LOAD_ELEMENT_OR_DIE(videorate)
151  RTMP_LOAD_ELEMENT_OR_DIE(capsfilter)
    // Keep the first capsfilter under its own name; capsfilter is loaded again below.
152  videorate_capsfilter = capsfilter;
153  if (success)
154  g_object_set (G_OBJECT (videorate_capsfilter), "caps",
155  gst_caps_new_simple ("video/x-raw",
156  "framerate", GST_TYPE_FRACTION, 15, 1,
157  NULL), NULL);
158  RTMP_LOAD_ELEMENT_OR_DIE(videoconvert)
    // Second capsfilter: forces I420 between videoconvert and the encoder.
159  RTMP_LOAD_ELEMENT_OR_DIE(capsfilter)
160  if (success)
161  g_object_set (G_OBJECT (capsfilter), "caps",
162  gst_caps_new_simple ("video/x-raw",
163  "format", G_TYPE_STRING, "I420",
164  NULL), NULL);
165  RTMP_LOAD_ELEMENT_OR_DIE(x264enc)
    // Encoder settings; tune 0x00000004 is presumably the zerolatency flag
    // named in the gst-launch line above — confirm against the x264enc docs.
166  if (success)
167  g_object_set (G_OBJECT (x264enc),
168  "tune", 0x00000004,
169  "psy-tune", 2,
170  "speed-preset", 1,
171  "bitrate", bitrate,
172  "threads", 2,
173  "byte-stream", 1,
174  "key-int-max", 2000/ISAAC_MAX_HEARTBEAT, // key-int-max * min_frame_rate <= 2s!
175  NULL);
    // NOTE(review): listing line 176 is missing from this extract; flvmux is
    // configured below without a visible creation call — presumably
    // RTMP_LOAD_ELEMENT_OR_DIE(flvmux) stood here.
177  if (success)
178  g_object_set (G_OBJECT (flvmux),
179  "streamable", 1,
180  NULL);
181  RTMP_LOAD_ELEMENT_OR_DIE(rtmpsink)
    // Target location is "rtmp://<base_url>/<apikey>".
    // NOTE(review): sprintf is unbounded; base_url plus apikey longer than the
    // 512-byte buffer would overflow it.
182  char location[512];
183  sprintf(location,"rtmp://%s/%s",base_url.c_str(),apikey.c_str());
184  if (success)
185  g_object_set(G_OBJECT(rtmpsink),
186  "location", location, NULL);
187 
    // NOTE(review): the brace opened here is not closed until after the
    // pipeline has been built (the `if (success)` block below sits inside it).
    // As written, the pipeline is only created and `group` only assigned when
    // dummy_audio is true, and the inner `if (dummy_audio)` tests are always
    // true. This looks like a missing `}` after the voaacenc load — confirm
    // against the repository.
188  if (dummy_audio)
189  {
    // Silent test tone encoded with voaacenc, fed into flvmux below.
190  RTMP_LOAD_ELEMENT_OR_DIE(audiotestsrc)
191  if (success)
192  g_object_set(G_OBJECT(audiotestsrc),
193  "volume", 0.0,
194  "is-live", 1,
195  NULL);
196  RTMP_LOAD_ELEMENT_OR_DIE(voaacenc)
197  if (success)
198  {
    // All elements go into one bin inside a new pipeline.
199  pipeline = gst_pipeline_new( NULL );
200  bin = gst_bin_new( NULL );
201  gst_bin_add_many(GST_BIN(bin), appsrc, videoconvert, capsfilter, x264enc, flvmux, rtmpsink, NULL);
202  if (dummy_audio)
203  gst_bin_add_many(GST_BIN(bin), audiotestsrc, voaacenc, NULL);
204  gst_bin_add(GST_BIN(pipeline), bin);
    // Video chain, optional audio chain, then muxer to sink.
205  success = gst_element_link_many( appsrc, videoconvert, capsfilter, x264enc, flvmux, NULL);
206  if (dummy_audio)
207  success &= gst_element_link_many( audiotestsrc, voaacenc, flvmux, NULL);
208  success &= gst_element_link_many( flvmux, rtmpsink, NULL);
209  if ( !success )
210  fprintf(stderr,"RTMPImageConnector: Could not link elements for rtmp stream.\n");
    // NOTE(review): the pipeline is set to PLAYING and `group` is assigned even
    // when linking failed. The success message below contains the typo
    // "Openend"; it is a runtime string and is left unchanged here.
211  if (gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE)
212  printf("RTMPImageConnector: Could not play stream!\n");
213  else
214  printf("RTMPImageConnector: Openend H264 Stream\n");
215  group = message->group;
216  }
217  }
218  }
219  if (group == message->group) //We show always the very first group
220  {
    // Push the new frame and cache it for the heartbeat thread. The previous
    // cached frame is released, and incref() is called on the new one for the cache.
221  pthread_mutex_lock(&heartbeat_mutex);
222  addFrame( message->image, (GstAppSrc*)appsrc, group );
223  heartbeat = getTicksMs();
224  if (heartbeat_image) //Releasing old frame
225  heartbeat_image->suicide();
226  heartbeat_image = message->image;
227  heartbeat_image->incref();
228  pthread_mutex_unlock(&heartbeat_mutex);
229  }
230  }
    // Every message, whatever its type, is handed back.
231  clientSendMessage( message );
232  }
    // Nothing pending: wait 1 ms before polling again.
233  usleep(1000);
234  }
    // Ask the heartbeat thread to stop, wait for it, then release the mutex.
235  heartbeat_finish = true;
236  pthread_join(heartbeat_thread,NULL);
237  pthread_mutex_destroy(&heartbeat_mutex);
238 }
ImageBufferContainer * clientGetMessage()
Definition: MessageAble.hpp:38
errorCode clientSendMessage(ImageBufferContainer *message)
Definition: MessageAble.hpp:34
void suicideNotify(gpointer data)
static void addFrame(ImageBuffer *image, GstAppSrc *appsrc, InsituConnectorGroup *group)
InsituConnectorGroup * group
Definition: Common.hpp:196
static uint64_t getTicksMs()
int getVideoBufferSize()
Definition: Broker.hpp:66
void suicide()
Definition: Common.hpp:149
#define RTMP_LOAD_ELEMENT_OR_DIE(element)
RTMPImageConnector(std::string name, std::string apikey, std::string base_url, int bitrate=400, bool dummy_audio=false)
static void * heartbeatFunction(void *ptr)
int errorCode
Definition: Common.hpp:24
errorCode init(int minport, int maxport)
void incref()
Definition: Common.hpp:143
int getFramebufferHeight()
Definition: Broker.hpp:74
#define ISAAC_MAX_HEARTBEAT
ImageBufferType type
Definition: Common.hpp:195
int getFramebufferWidth()
Definition: Broker.hpp:70
uint8_t * buffer
Definition: Common.hpp:162
ImageBuffer * image
Definition: Common.hpp:199