diff --git a/DeviceSource.cpp b/DeviceSource.cpp index 89d1dfa..b459e5d 100644 --- a/DeviceSource.cpp +++ b/DeviceSource.cpp @@ -19,6 +19,7 @@ #ifndef NDEBUG #define DEVICE_DEBUG +//#define GST_DEVICE_DEBUG #endif @@ -112,7 +113,7 @@ Device::callback_device_monitor (GstBus *, GstMessage * message, gpointer ) break; manager().src_name_.push_back(name); -#ifdef DEVICE_DEBUG +#ifdef GST_DEVICE_DEBUG gchar *stru = gst_structure_to_string( gst_device_get_properties(device) ); g_print("\nDevice %s plugged : %s\n", name, stru); g_free (stru); @@ -134,7 +135,7 @@ Device::callback_device_monitor (GstBus *, GstMessage * message, gpointer ) gst_message_parse_device_removed (message, &device); name = gst_device_get_display_name (device); manager().remove(name); -#ifdef DEVICE_DEBUG +#ifdef GST_DEVICE_DEBUG g_print("\nDevice %s unplugged\n", name); #endif g_free (name); @@ -204,7 +205,7 @@ Device::Device() src_name_.push_back(name); g_free (name); -#ifdef DEVICE_DEBUG +#ifdef GST_DEVICE_DEBUG gchar *stru = gst_structure_to_string( gst_device_get_properties(device) ); g_print("\nDevice %s already plugged : %s", name, stru); g_free (stru); @@ -441,7 +442,7 @@ DeviceConfigSet Device::getDeviceConfigs(const std::string &src_description) for (int c = 0; c < C; ++c) { // get GST cap GstStructure *decice_cap_struct = gst_caps_get_structure (device_caps, c); -#ifdef DEVICE_DEBUG +#ifdef GST_DEVICE_DEBUG gchar *capstext = gst_structure_to_string (decice_cap_struct); g_print("\nDevice caps: %s", capstext); g_free(capstext); diff --git a/ImGuiVisitor.cpp b/ImGuiVisitor.cpp index 51ecbc8..8e10d35 100644 --- a/ImGuiVisitor.cpp +++ b/ImGuiVisitor.cpp @@ -539,7 +539,10 @@ void ImGuiVisitor::visit (NetworkSource& s) ImGui::SameLine(0, 10); ImGui::Text("Network connection"); - ImGui::Text("Connected to %s", s.connection().c_str()); + ImGui::Text("Connection to %s", s.connection().c_str()); + NetworkStream *ns = s.networkStream(); + ImGui::Text(" - %s (%dx%d)\n - Network host %s:%d", NetworkToolkit::protocol_name[ns->protocol()], + ns->resolution().x, ns->resolution().y, ns->IP().c_str(), ns->port()); } diff --git a/NetworkSource.cpp b/NetworkSource.cpp index 7960285..cdc3ec5 100644 --- a/NetworkSource.cpp +++ b/NetworkSource.cpp @@ -61,7 +61,6 @@ void StreamerResponseListener::ProcessMessage( const osc::ReceivedMessage& m, NetworkStream::NetworkStream(): Stream(), receiver_(nullptr) { -// listener_port_ = 5400; // TODO Find a free port, unique each time confirmed_ = false; } @@ -174,30 +173,45 @@ void NetworkStream::update() #ifdef NETWORK_DEBUG Log::Info("Creating Network Stream %d (%d x %d)", config_.port, config_.width, config_.height); #endif - // build the pipeline depending on stream info - std::ostringstream pipeline; + // prepare pipeline parameter with port given in config_ + std::string parameter = std::to_string(config_.port); - // get generic pipeline string - std::string pipelinestring = NetworkToolkit::protocol_receive_pipeline[config_.protocol]; - // find placeholder for PORT - int xxxx = pipelinestring.find("XXXX"); - // keep beginning of pipeline - pipeline << pipelinestring.substr(0, xxxx); - // Replace 'XXXX' + // make sure the shared memory socket exists if (config_.protocol == NetworkToolkit::SHM_RAW) { - std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm"); - path += std::to_string(config_.port); - pipeline << path; + // for shared memory, the parameter is a file location in settings + parameter = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm") + parameter; + // try few times to see if file exists and wait 20ms each time + for(int trial = 0; trial < 5; trial ++){ + if ( SystemToolkit::file_exists(parameter)) + break; + std::this_thread::sleep_for (std::chrono::milliseconds(20)); + } + // failed to find the shm socket file: cannot connect + if (!SystemToolkit::file_exists(parameter)) { + Log::Warning("Cannot connect to shared memory."); + failed_ = true; + } } - else - pipeline << config_.port; - // keep ending of pipeline - pipeline << pipelinestring.substr(xxxx + 4); - // add a videoconverter - pipeline << " ! videoconvert"; - // open the pipeline with generic stream class - Stream::open(pipeline.str(), config_.width, config_.height); + if (!failed_) { + // build the pipeline depending on stream info + std::ostringstream pipeline; + // get generic pipeline string + std::string pipelinestring = NetworkToolkit::protocol_receive_pipeline[config_.protocol]; + // find placeholder for PORT + int xxxx = pipelinestring.find("XXXX"); + // keep beginning of pipeline + pipeline << pipelinestring.substr(0, xxxx); + // Replace 'XXXX' by info on port config + pipeline << parameter; + // keep ending of pipeline + pipeline << pipelinestring.substr(xxxx + 4); + // add a videoconverter + pipeline << " ! videoconvert"; + + // open the pipeline with generic stream class + Stream::open(pipeline.str(), config_.width, config_.height); + } } } diff --git a/NetworkSource.h b/NetworkSource.h index 64be95b..39036ad 100644 --- a/NetworkSource.h +++ b/NetworkSource.h @@ -41,7 +41,6 @@ public: private: // connection information IpEndpointName streamer_address_; -// int listener_port_; StreamerResponseListener listener_; UdpListeningReceiveSocket *receiver_; std::atomic confirmed_; diff --git a/NetworkToolkit.cpp b/NetworkToolkit.cpp index 5a2d2eb..c7d3824 100644 --- a/NetworkToolkit.cpp +++ b/NetworkToolkit.cpp @@ -59,15 +59,15 @@ const char* NetworkToolkit::protocol_name[NetworkToolkit::DEFAULT] = { "Shared Memory", - "UDP Broadcast JPEG", - "UDP Broadcast H264", + "UDP RTP Stream JPEG", + "UDP RTP Stream H264", "TCP Broadcast JPEG", "TCP Broadcast H264" }; const std::vector NetworkToolkit::protocol_send_pipeline { - "video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=3 ! shmsink buffer-time=100000 name=sink", + "video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=3 ! shmsink buffer-time=100000 wait-for-connection=true name=sink", "video/x-raw, format=I420, framerate=30/1 ! queue max-size-buffers=3 ! jpegenc ! rtpjpegpay ! udpsink name=sink", "video/x-raw, format=I420, framerate=30/1 ! queue max-size-buffers=3 ! x264enc tune=\"zerolatency\" threads=2 ! rtph264pay ! udpsink name=sink", "video/x-raw, format=I420 ! queue max-size-buffers=3 ! jpegenc ! rtpjpegpay ! rtpstreampay ! tcpserversink name=sink", @@ -76,9 +76,9 @@ const std::vector NetworkToolkit::protocol_send_pipeline { const std::vector NetworkToolkit::protocol_receive_pipeline { - "shmsrc is-live=true socket-path=XXXX ! queue max-size-buffers=3 ! video/x-raw, format=RGB, framerate=30/1", - "udpsrc port=XXXX ! queue max-size-buffers=3 ! application/x-rtp,encoding-name=JPEG,payload=26 ! rtpjpegdepay ! jpegdec", - "udpsrc port=XXXX ! queue max-size-buffers=3 ! application/x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 ! rtph264depay ! avdec_h264", + "shmsrc socket-path=XXXX ! video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=3", + "udpsrc buffer-size=200000 port=XXXX ! application/x-rtp,encoding-name=JPEG,payload=26 ! queue max-size-buffers=3 ! rtpjpegdepay ! jpegdec", + "udpsrc buffer-size=200000 port=XXXX ! application/x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 ! queue max-size-buffers=3 ! rtph264depay ! avdec_h264", "tcpclientsrc timeout=1 port=XXXX ! queue max-size-buffers=3 ! application/x-rtp-stream,media=video,encoding-name=JPEG,payload=26 ! rtpstreamdepay ! rtpjpegdepay ! jpegdec", "tcpclientsrc timeout=1 port=XXXX ! queue max-size-buffers=3 ! application/x-rtp-stream,media=video,encoding-name=H264,payload=96,clock-rate=90000 ! rtpstreamdepay ! rtph264depay ! avdec_h264" }; diff --git a/SessionCreator.cpp b/SessionCreator.cpp index 43426cf..4f3610e 100644 --- a/SessionCreator.cpp +++ b/SessionCreator.cpp @@ -188,6 +188,9 @@ void SessionLoader::load(XMLElement *sessionNode) else if ( std::string(pType) == "DeviceSource") { load_source = new DeviceSource; } + else if ( std::string(pType) == "NetworkSource") { + load_source = new NetworkSource; + } // skip failed (including clones) if (!load_source) diff --git a/Streamer.cpp b/Streamer.cpp index 6598ffc..3211acb 100644 --- a/Streamer.cpp +++ b/Streamer.cpp @@ -183,7 +183,7 @@ void Streaming::addStream(const std::string &sender, int reply_to) else { conf.protocol = NetworkToolkit::UDP_JPEG; } - conf.protocol = NetworkToolkit::UDP_JPEG; +// conf.protocol = NetworkToolkit::UDP_JPEG; // build OSC message char buffer[IP_MTU_SIZE]; @@ -199,7 +199,7 @@ void Streaming::addStream(const std::string &sender, int reply_to) socket.Send( p.Data(), p.Size() ); #ifdef STREAMER_DEBUG - Log::Info("Accepting stream request from %s:%d", sender_ip.c_str(), reply_to); + Log::Info("Starting streaming to %s:%d", sender_ip.c_str(), conf.port); #endif // create streamer & remember it @@ -244,7 +244,7 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) return; // first frame for initialization - if (frame_buffer_ == nullptr) { + if (frame_buffer_ == nullptr) { // set frame buffer as input frame_buffer_ = frame_buffer; @@ -354,7 +354,6 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) streaming_ = true; } - // frame buffer changed ? else if (frame_buffer_ != frame_buffer) { @@ -422,7 +421,6 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) gst_buffer_unmap (buffer, &map); // push -// Log::Info("VideoRecorder push data %ld", buffer->pts); gst_app_src_push_buffer (src_, buffer); // NB: buffer will be unrefed by the appsrc @@ -443,8 +441,8 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) } } - // did the streaming terminate with sink receiving end-of-stream ? - else + // did the streaming receive end-of-stream ? + else if (!finished_) { // Wait for EOS message GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_)); @@ -453,21 +451,28 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) if (msg) { // stop the pipeline GstStateChangeReturn ret = gst_element_set_state (pipeline_, GST_STATE_NULL); +#ifdef STREAMER_DEBUG if (ret == GST_STATE_CHANGE_FAILURE) - Log::Warning("VideoStreamer Could not stop"); + Log::Info("Streaming to %s:%d could not stop properly.", config_.client_address.c_str(), config_.port); else - Log::Notify("Stream finished after %s s.", GstToolkit::time_to_string(timestamp_).c_str()); - + Log::Info("Streaming to %s:%d ending...", config_.client_address.c_str(), config_.port); +#endif finished_ = true; } + } + // finished ! + else { // make sure the shared memory socket is deleted if (config_.protocol == NetworkToolkit::SHM_RAW) { - // TODO rename SHM socket "shm_PORT" std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm"); path += std::to_string(config_.port); SystemToolkit::remove_file(path); } + + Log::Notify("Streaming to %s:%d finished after %s s.", config_.client_address.c_str(), config_.port, + GstToolkit::time_to_string(timestamp_).c_str()); + } @@ -477,10 +482,7 @@ void VideoStreamer::stop () { // stop recording streaming_ = false; - - // send end of stream - if (src_) - gst_app_src_end_of_stream (src_); + finished_ = true; } @@ -488,17 +490,8 @@ std::string VideoStreamer::info() { std::string ret = "Streaming terminated."; if (streaming_) { - - if (config_.protocol == NetworkToolkit::TCP_JPEG || config_.protocol == NetworkToolkit::TCP_H264) { - - - ret = "TCP"; - - } - else if (config_.protocol == NetworkToolkit::SHM_RAW) { - ret = "Shared Memory"; - } - + ret = "Streaming "; + ret += NetworkToolkit::protocol_name[config_.protocol]; } return ret; }