From 32c83a6eee66d83c4250874312fc6100c160f484 Mon Sep 17 00:00:00 2001 From: Bruno Herbelin Date: Thu, 3 Oct 2024 14:56:58 +0200 Subject: [PATCH] BugFix Gst Pipeline closing and unreferencing Need to empty pipeline bus if IGNORE_GST_BUS_MESSAGE is not set. Ensuring all gst_objects are unreferenced properly to be erased. --- CMakeLists.txt | 11 +++-- src/FrameGrabber.cpp | 2 +- src/MediaPlayer.cpp | 93 ++++++++++++++++++++++++------------ src/MediaPlayer.h | 3 +- src/MultiFileRecorder.cpp | 3 ++ src/Stream.cpp | 81 ++++++++++++++++++++----------- src/Stream.h | 3 +- src/UserInterfaceManager.cpp | 10 ++++ 8 files changed, 141 insertions(+), 65 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e6b6ae..766b88f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,9 +87,6 @@ if(UNIX) else() add_definitions(-DLINUX) - # add_definitions(-DUSE_GST_OPENGL_SYNC_HANDLER) - # add_definitions(-DUSE_GL_BUFFER_SUBDATA) - # add_definitions(-DIGNORE_GST_ERROR_MESSAGE) # CPACK set(CPACK_SYSTEM_NAME "${CMAKE_HOST_SYSTEM_NAME}") @@ -113,6 +110,14 @@ elseif(WIN32) add_definitions(-DMINGW32) endif() +##### +##### Preprocessor options +##### + +# add_definitions(-DUSE_GST_OPENGL_SYNC_HANDLER) +# add_definitions(-DUSE_GL_BUFFER_SUBDATA) +add_definitions(-DIGNORE_GST_BUS_MESSAGE) + ##### ##### Dependencies ##### diff --git a/src/FrameGrabber.cpp b/src/FrameGrabber.cpp index 2be6e2b..5d85c89 100644 --- a/src/FrameGrabber.cpp +++ b/src/FrameGrabber.cpp @@ -436,7 +436,7 @@ void FrameGrabber::addFrame (GstBuffer *buffer, GstCaps *caps) // if initialization succeeded if (initialized_) { -#ifdef IGNORE_GST_ERROR_MESSAGE +#ifdef IGNORE_GST_BUS_MESSAGE // avoid filling up bus with messages gst_bus_set_flushing(gst_element_get_bus(pipeline_), true); #else diff --git a/src/MediaPlayer.cpp b/src/MediaPlayer.cpp index d8e36b9..b66b3c0 100644 --- a/src/MediaPlayer.cpp +++ b/src/MediaPlayer.cpp @@ -104,7 +104,9 @@ MediaPlayer::~MediaPlayer() if (pbo_[0]) glDeleteBuffers(2, pbo_); - Log::Info("MediaPlayer %s Deleted", std::to_string(id_).c_str()); +#ifdef MEDIA_PLAYER_DEBUG + g_printerr("MediaPlayer %s deleted\n", std::to_string(id_).c_str()); +#endif } void MediaPlayer::accept(Visitor& v) { @@ -362,6 +364,7 @@ GstBusSyncReply MediaPlayer::signal_handler(GstBus *, GstMessage *msg, gpointer } // drop all messages to avoid filling up the stack + gst_message_unref (msg); return GST_BUS_DROP; } @@ -372,8 +375,6 @@ void MediaPlayer::execute_open() { // create playbin pipeline_ = gst_element_factory_make ("playbin", std::to_string(id_).c_str()); - // ref gst_object (unrefed on close) - gst_object_ref(pipeline_); // set uri of file to open g_object_set ( G_OBJECT (pipeline_), "uri", uri_.c_str(), NULL); @@ -477,7 +478,6 @@ void MediaPlayer::execute_open() // set playbin sink g_object_set ( G_OBJECT (pipeline_), "video-sink", sink, NULL); - gst_object_unref (sink); // done with ref to caps gst_caps_unref (caps); @@ -488,7 +488,7 @@ void MediaPlayer::execute_open() #endif // set to desired state (PLAY or PAUSE) - GstStateChangeReturn ret = gst_element_set_state (pipeline_, desired_state_); + GstStateChangeReturn ret = gst_element_set_state (GST_ELEMENT(pipeline_), desired_state_); if (ret == GST_STATE_CHANGE_FAILURE) { Log::Warning("MediaPlayer %s Could not open '%s'", std::to_string(id_).c_str(), uri_.c_str()); failed_ = true; @@ -502,13 +502,13 @@ void MediaPlayer::execute_open() timeline_.setEnd(d); } -#ifdef IGNORE_GST_ERROR_MESSAGE + bus_ = gst_element_get_bus(pipeline_); +#ifdef IGNORE_GST_BUS_MESSAGE // avoid filling up bus with messages - gst_bus_set_flushing(gst_element_get_bus(pipeline_), true); + gst_bus_set_flushing(bus_, true); #else // set message handler for the pipeline's bus - gst_bus_set_sync_handler(gst_element_get_bus(pipeline_), - MediaPlayer::signal_handler, this, NULL); + gst_bus_set_sync_handler(bus_, MediaPlayer::signal_handler, this, NULL); #endif // all good @@ -519,9 +519,8 @@ void MediaPlayer::execute_open() Log::Info("MediaPlayer %s Timeline [%ld %ld] %ld frames, %d gaps", std::to_string(id_).c_str(), timeline_.begin(), timeline_.end(), timeline_.numFrames(), timeline_.numGaps()); - if (media_.hasaudio) { + if (media_.hasaudio) Log::Info("MediaPlayer %s Audio track %s", std::to_string(id_).c_str(), audio_enabled_ ? "enabled" : "disabled"); - } opened_ = true; @@ -690,6 +689,15 @@ void MediaPlayer::execute_open() timeline_.setEnd(d); } + bus_ = gst_element_get_bus(pipeline_); +#ifdef IGNORE_GST_BUS_MESSAGE + // avoid filling up bus with messages + gst_bus_set_flushing(bus_, true); +#else + // set message handler for the pipeline's bus + gst_bus_set_sync_handler(bus_, MediaPlayer::signal_handler, this, NULL); +#endif + // all good Log::Info("MediaPlayer %s Opened '%s' (%s %d x %d)", std::to_string(id_).c_str(), SystemToolkit::filename(uri_).c_str(), media_.codec_name.c_str(), media_.width, media_.height); @@ -746,26 +754,41 @@ void MediaPlayer::Frame::unmap() { if (full) gst_video_frame_unmap(&vframe); + full = false; } - -void MediaPlayer::pipeline_terminate( GstElement *p ) +void MediaPlayer::pipeline_terminate( GstElement *p, GstBus *b ) { -#ifdef MEDIA_PLAYER_DEBUG gchar *name = gst_element_get_name(p); - g_printerr("MediaPlayer %s close\n", name); - Log::Info("MediaPlayer %s Closed", name); - g_free(name); +#ifdef MEDIA_PLAYER_DEBUG + g_printerr("MediaPlayer %s closing\n", name); #endif - // force end - GstStateChangeReturn ret = gst_element_set_state (p, GST_STATE_NULL); +#ifndef IGNORE_GST_BUS_MESSAGE + // empty pipeline bus (if used) + GstMessage *msg = NULL; + do { + if (msg) + gst_message_unref (msg); + msg = gst_bus_timed_pop_filtered(b, 1000000, GST_MESSAGE_ANY); + } while (msg != NULL); +#endif + // unref bus + gst_object_unref( GST_OBJECT(b) ); + + // end pipeline + GstStateChangeReturn ret = gst_element_set_state(p, GST_STATE_NULL); if (ret == GST_STATE_CHANGE_ASYNC) - gst_element_get_state (p, NULL, NULL, 1000000); + gst_element_get_state(p, NULL, NULL, 1000000); // unref to free pipeline - gst_object_unref ( p ); + while (GST_OBJECT_REFCOUNT_VALUE(p) > 0) + gst_object_unref( GST_OBJECT(p) ); + + // all done + Log::Info("MediaPlayer %s Closed", name); + g_free(name); // unregister MediaPlayer::registered_.remove(p); @@ -783,13 +806,6 @@ void MediaPlayer::close() return; } - // clean up GST - if (pipeline_ != nullptr) { - // end pipeline asynchronously - std::thread(MediaPlayer::pipeline_terminate, pipeline_).detach(); - pipeline_ = nullptr; - } - // cleanup eventual remaining frame memory for(guint i = 0; i < N_VFRAME; i++) { frame_[i].access.lock(); @@ -797,10 +813,19 @@ void MediaPlayer::close() frame_[i].status = INVALID; frame_[i].access.unlock(); } - write_index_ = 0; - last_index_ = 0; + + // clean up GST + if (pipeline_ != nullptr) { + // end pipeline asynchronously + std::thread(MediaPlayer::pipeline_terminate, pipeline_, bus_).detach(); + // immediately invalidate access for other methods + pipeline_ = nullptr; + bus_ = nullptr; + } // un-ready the media player + write_index_ = 0; + last_index_ = 0; opened_ = false; failed_ = false; pending_ = false; @@ -1358,6 +1383,12 @@ void MediaPlayer::update() if (need_loop && desired_state_ == GST_STATE_PLAYING) // avoid repeated call execute_loop_command(); +#ifndef IGNORE_GST_BUS_MESSAGE + GstMessage *msg = gst_bus_pop_filtered(bus_, GST_MESSAGE_ANY); + if (msg != NULL) + gst_message_unref(msg); +#endif + force_update_ = false; } @@ -1417,7 +1448,7 @@ void MediaPlayer::execute_seek_command(GstClockTime target, bool force) if (seek_event && gst_element_send_event(pipeline_, seek_event) ) { seeking_ = true; #ifdef MEDIA_PLAYER_DEBUG - Log::Info("MediaPlayer %s Seek %ld %.1f", std::to_string(id_).c_str(), seek_pos, rate_); + g_printerr("MediaPlayer %s Seek %ld %.1f", std::to_string(id_).c_str(), seek_pos, rate_); #endif } else diff --git a/src/MediaPlayer.h b/src/MediaPlayer.h index bc40413..c6c7afe 100644 --- a/src/MediaPlayer.h +++ b/src/MediaPlayer.h @@ -322,6 +322,7 @@ private: LoopMode loop_; GstState desired_state_; GstElement *pipeline_; + GstBus *bus_; GstVideoInfo v_frame_video_info_; std::atomic opened_; std::atomic failed_; @@ -409,7 +410,7 @@ private: static void callback_element_setup (GstElement *pipeline, GstElement *element, MediaPlayer *mp); // global list of registered media player - static void pipeline_terminate(GstElement *p); + static void pipeline_terminate(GstElement *p, GstBus *b); static std::list registered_; }; diff --git a/src/MultiFileRecorder.cpp b/src/MultiFileRecorder.cpp index 96769c1..a1de8d1 100644 --- a/src/MultiFileRecorder.cpp +++ b/src/MultiFileRecorder.cpp @@ -263,6 +263,9 @@ bool MultiFileRecorder::end_record () ret = false; } + gst_message_unref (msg); + gst_object_unref (bus); + // stop the pipeline GstStateChangeReturn r = gst_element_set_state (pipeline_, GST_STATE_NULL); if (r == GST_STATE_CHANGE_ASYNC) { diff --git a/src/Stream.cpp b/src/Stream.cpp index 4a00826..32161d1 100644 --- a/src/Stream.cpp +++ b/src/Stream.cpp @@ -84,6 +84,10 @@ Stream::~Stream() // cleanup picture buffer if (pbo_[0]) glDeleteBuffers(2, pbo_); + +#ifdef STREAM_DEBUG + g_printerr("Stream %s deleted\n", std::to_string(id_).c_str()); +#endif } void Stream::accept(Visitor& v) { @@ -231,18 +235,15 @@ GstBusSyncReply stream_signal_handler(GstBus *, GstMessage *msg, gpointer ptr) // only handle error messages if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR && ptr != nullptr) { GError *error; - gchar *debugs; - gst_message_parse_error(msg, &error, &debugs); - - Log::Warning("Stream %s Error %s", + gst_message_parse_error(msg, &error, NULL); + Log::Warning("Stream %s : %s", std::to_string(reinterpret_cast(ptr)->id()).c_str(), error->message); - g_error_free(error); - free(debugs); } // drop all messages to avoid filling up the stack + gst_message_unref (msg); return GST_BUS_DROP; } @@ -330,13 +331,13 @@ void Stream::execute_open() // instruct the sink to send samples synched in time if not live source gst_base_sink_set_sync (GST_BASE_SINK(sink), !live_); -#ifdef IGNORE_GST_ERROR_MESSAGE + bus_ = gst_element_get_bus(pipeline_); +#ifdef IGNORE_GST_BUS_MESSAGE // avoid filling up bus with messages - gst_bus_set_flushing(gst_element_get_bus(pipeline_), true); + gst_bus_set_flushing(bus_, true); #else // set message handler for the pipeline's bus - gst_bus_set_sync_handler(gst_element_get_bus(pipeline_), - stream_signal_handler, this, NULL); + gst_bus_set_sync_handler(bus_, stream_signal_handler, this, NULL); #endif // done with refs @@ -383,28 +384,44 @@ bool Stream::failed() const void Stream::Frame::unmap() { - if ( full ) + if (full) gst_video_frame_unmap(&vframe); + full = false; } -void Stream::pipeline_terminate( GstElement *p ) +void Stream::pipeline_terminate( GstElement *p, GstBus *b ) { gchar *name = gst_element_get_name(p); #ifdef STREAM_DEBUG - g_printerr("Stream %s close\n", name); + g_printerr("Stream %s closing\n", name); #endif - Log::Info("Stream %s Closed", name); - g_free(name); - // force end +#ifndef IGNORE_GST_BUS_MESSAGE + // empty pipeline bus (if used) + GstMessage *msg = NULL; + do { + if (msg) + gst_message_unref (msg); + msg = gst_bus_timed_pop_filtered(b, 1000000, GST_MESSAGE_ANY); + } while (msg != NULL); +#endif + // unref bus + gst_object_unref( GST_OBJECT(b) ); + + // end pipeline GstStateChangeReturn ret = gst_element_set_state (p, GST_STATE_NULL); if (ret == GST_STATE_CHANGE_ASYNC) gst_element_get_state (p, NULL, NULL, 1000000); // unref to free pipeline - gst_object_unref ( p ); + while (GST_OBJECT_REFCOUNT_VALUE(p) > 0) + gst_object_unref( GST_OBJECT(p) ); + + // all done + Log::Info("Stream %s Closed", name); + g_free(name); // unregister Stream::registered_.remove(p); @@ -421,25 +438,27 @@ void Stream::close() return; } - // un-ready - opened_ = false; - - // clean up GST - if (pipeline_ != nullptr) { - // end pipeline asynchronously - std::thread(Stream::pipeline_terminate, pipeline_).detach(); - pipeline_ = nullptr; - } - // cleanup eventual remaining frame memory for(guint i = 0; i < N_FRAME; ++i){ frame_[i].access.lock(); frame_[i].unmap(); + frame_[i].status = INVALID; frame_[i].access.unlock(); } + + // clean up GST + if (pipeline_ != nullptr) { + // end pipeline asynchronously + std::thread(Stream::pipeline_terminate, pipeline_, bus_).detach(); + // immediately invalidate access for other methods + pipeline_ = nullptr; + bus_ = nullptr; + } + + // un-ready + opened_ = false; write_index_ = 0; last_index_ = 0; - } @@ -785,6 +804,12 @@ void Stream::update() // stop on end of stream play(false); } + +#ifndef IGNORE_GST_BUS_MESSAGE + GstMessage *msg = gst_bus_pop_filtered(bus_, GST_MESSAGE_ANY); + if (msg != NULL) + gst_message_unref(msg); +#endif } double Stream::updateFrameRate() const diff --git a/src/Stream.h b/src/Stream.h index d110b55..32a635c 100644 --- a/src/Stream.h +++ b/src/Stream.h @@ -184,6 +184,7 @@ protected: GstClockTime position_; GstState desired_state_; GstElement *pipeline_; + GstBus *bus_; GstVideoInfo v_frame_video_info_; std::atomic opened_; std::atomic failed_; @@ -254,7 +255,7 @@ protected: static GstFlowReturn callback_new_sample (GstAppSink *, gpointer); // global list of registered streams - static void pipeline_terminate(GstElement *p); + static void pipeline_terminate(GstElement *p, GstBus *b); static std::list registered_; }; diff --git a/src/UserInterfaceManager.cpp b/src/UserInterfaceManager.cpp index 989d9b7..d5e16fc 100644 --- a/src/UserInterfaceManager.cpp +++ b/src/UserInterfaceManager.cpp @@ -6364,6 +6364,16 @@ void ShowSandbox(bool* p_open) ImGui::Text("Testing sandox"); ImGui::Separator(); + ImGui::Separator(); + ImGui::Text("Reset GST"); + + if (ImGui::Button("RESET GSTREAMER")){ + + gst_deinit(); + // gst_init (NULL, NULL); + } + + ImGui::Text("Source list"); Session *se = Mixer::manager().session(); for (auto sit = se->begin(); sit != se->end(); ++sit) {