From f02a99a4e2b24c166d14da6ca9eacc6af3ed2b77 Mon Sep 17 00:00:00 2001 From: Bruno Herbelin Date: Fri, 31 Dec 2021 13:15:23 +0100 Subject: [PATCH] Improved GenericStreamSource, with stream discoverer Also timeout to fail if open does not works + new GST icon. --- Source.h | 3 +- Stream.cpp | 93 ++++++++++++++++++++++++------------------- Stream.h | 13 ++++-- StreamSource.cpp | 33 ++++++++++++--- StreamSource.h | 8 ++++ rsc/images/icons.dds | Bin 1638528 -> 1638528 bytes 6 files changed, 98 insertions(+), 52 deletions(-) diff --git a/Source.h b/Source.h index 49820f4..853dcda 100644 --- a/Source.h +++ b/Source.h @@ -23,7 +23,8 @@ #define ICON_SOURCE_GROUP 10, 6 #define ICON_SOURCE_RENDER 0, 2 #define ICON_SOURCE_CLONE 9, 2 -#define ICON_SOURCE 12, 11 +#define ICON_SOURCE_GSTREAMER 16, 16 +#define ICON_SOURCE 13, 11 class SourceCallback; class ImageShader; diff --git a/Stream.cpp b/Stream.cpp index 6bba9f8..0f603f1 100644 --- a/Stream.cpp +++ b/Stream.cpp @@ -17,6 +17,9 @@ * along with this program. If not, see . **/ +#include +#include + // Desktop OpenGL function loader #include @@ -124,15 +127,10 @@ GstFlowReturn callback_stream_discoverer (GstAppSink *sink, gpointer p) StreamInfo StreamDiscoverer(const std::string &description, guint w, guint h) { // the stream info to return - StreamInfo info; + StreamInfo info(w, h); - // obvious fast answer: valid values are provided in argument - if (w > 0 && h > 0 ) { - info.width = w; - info.height = h; - } - // otherwise, run a test pipeline to discover the size of the stream - else { + // no valid info, run a test pipeline to discover the size of the stream + if ( !info.valid() ) { // complete the pipeline description with an appsink (to add a callback) std::string _description = description; _description += " ! appsink name=sink"; @@ -157,19 +155,15 @@ StreamInfo StreamDiscoverer(const std::string &description, guint w, guint h) // start to play the pipeline gst_element_set_state (_pipeline, GST_STATE_PLAYING); - // wait for the callback_stream_discoverer to return + // wait for the callback_stream_discoverer to return, no more than 4 sec std::mutex mtx; std::unique_lock lck(mtx); - // if waited more than 2 seconds, its dead :( - if ( info.discovered.wait_for(lck,std::chrono::seconds(2)) == std::cv_status::timeout) - Log::Warning("Failed to discover stream size."); + info.discovered.wait_for(lck,std::chrono::seconds(TIMEOUT)); // stop and delete pipeline GstStateChangeReturn ret = gst_element_set_state (_pipeline, GST_STATE_NULL); - if (ret == GST_STATE_CHANGE_ASYNC) { - GstState state; - gst_element_get_state (_pipeline, &state, NULL, GST_CLOCK_TIME_NONE); - } + if (ret == GST_STATE_CHANGE_ASYNC) + gst_element_get_state (_pipeline, NULL, NULL, 1000000); gst_object_unref (_pipeline); } } @@ -214,9 +208,8 @@ void Stream::execute_open() GError *error = NULL; pipeline_ = gst_parse_launch (description.c_str(), &error); if (error != NULL) { - Log::Warning("Stream %s Could not construct pipeline %s:\n%s", std::to_string(id_).c_str(), description.c_str(), error->message); + fail(std::string("Could not construct pipeline: ") + error->message + "\n" + description); g_clear_error (&error); - failed_ = true; return; } g_object_set(G_OBJECT(pipeline_), "name", std::to_string(id_).c_str(), NULL); @@ -227,16 +220,14 @@ void Stream::execute_open() ",height=" + std::to_string(height_); GstCaps *caps = gst_caps_from_string(capstring.c_str()); if (!caps || !gst_video_info_from_caps (&v_frame_video_info_, caps)) { - Log::Warning("Stream %d Could not configure video frame info", id_); - failed_ = true; + fail("Could not configure video frame info"); return; } // setup appsink GstElement *sink = gst_bin_get_by_name (GST_BIN (pipeline_), "sink"); if (!sink) { - Log::Warning("Stream %s Could not configure sink", std::to_string(id_).c_str()); - failed_ = true; + fail("Could not configure pipeline sink."); return; } @@ -276,8 +267,7 @@ void Stream::execute_open() live_ = false; GstStateChangeReturn ret = gst_element_set_state (pipeline_, desired_state_); if (ret == GST_STATE_CHANGE_FAILURE) { - Log::Warning("Stream %s Could not open '%s'", std::to_string(id_).c_str(), description_.c_str()); - failed_ = true; + fail(std::string("Could not open ") + description_); return; } else if (ret == GST_STATE_CHANGE_NO_PREROLL) { @@ -295,6 +285,24 @@ void Stream::execute_open() // all good Log::Info("Stream %s Opened '%s' (%d x %d)", std::to_string(id_).c_str(), description.c_str(), width_, height_); opened_ = true; + + // launch a timeout to check on open status + std::thread( timeout_open, this ).detach(); +} + +void Stream::fail(const std::string &message) +{ + Log::Warning("Stream %s %s.", std::to_string(id_).c_str(), message.c_str() ); + failed_ = true; +} + +void Stream::timeout_open(Stream *str) +{ + // vait for timeout + std::this_thread::sleep_for(std::chrono::seconds(TIMEOUT)); + + if (!str->textureinitialized_) + str->fail("Failed to initialize"); } bool Stream::isOpen() const @@ -332,15 +340,13 @@ void Stream::close() // clean up GST if (pipeline_ != nullptr) { // force flush - GstState state; gst_element_send_event(pipeline_, gst_event_new_seek (1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_END, 0) ); - gst_element_get_state (pipeline_, &state, NULL, GST_CLOCK_TIME_NONE); - + gst_element_get_state (pipeline_, NULL, NULL, 1000000); + // force end GstStateChangeReturn ret = gst_element_set_state (pipeline_, GST_STATE_NULL); - if (ret == GST_STATE_CHANGE_ASYNC) { - gst_element_get_state (pipeline_, &state, NULL, GST_CLOCK_TIME_NONE); - } + if (ret == GST_STATE_CHANGE_ASYNC) + gst_element_get_state (pipeline_, NULL, NULL, 1000000); gst_object_unref (pipeline_); pipeline_ = nullptr; } @@ -391,10 +397,8 @@ void Stream::enable(bool on) // apply state change GstStateChangeReturn ret = gst_element_set_state (pipeline_, requested_state); - if (ret == GST_STATE_CHANGE_FAILURE) { - Log::Warning("Stream %s Failed to enable", std::to_string(id_).c_str()); - failed_ = true; - } + if (ret == GST_STATE_CHANGE_FAILURE) + fail("Failed to enable"); } } @@ -436,10 +440,9 @@ void Stream::play(bool on) // all ready, apply state change immediately GstStateChangeReturn ret = gst_element_set_state (pipeline_, desired_state_); - if (ret == GST_STATE_CHANGE_FAILURE) { - Log::Warning("Stream %s Failed to play", std::to_string(id_).c_str()); - failed_ = true; - } + if (ret == GST_STATE_CHANGE_FAILURE) + fail("Failed to play"); + #ifdef STREAM_DEBUG else if (on) Log::Info("Stream %s Start", std::to_string(id_).c_str()); @@ -616,11 +619,17 @@ void Stream::update() // try to get info from discoverer if (discoverer_.wait_for( std::chrono::milliseconds(4) ) == std::future_status::ready ) { - // got all info needed for openning ! + // get info StreamInfo i(discoverer_.get()); - width_ = i.width; - height_ = i.height; - execute_open(); + if (i.valid()) { + // got all info needed for openning ! + width_ = i.width; + height_ = i.height; + execute_open(); + } + // invalid info; fail + else + fail("Failed to determine resolution"); } } // wait next frame to display diff --git a/Stream.h b/Stream.h index 465e7b2..66a6c8a 100644 --- a/Stream.h +++ b/Stream.h @@ -15,6 +15,7 @@ class Visitor; #define N_FRAME 3 +#define TIMEOUT 4 struct StreamInfo { @@ -22,15 +23,17 @@ struct StreamInfo { guint height; std::condition_variable discovered; - StreamInfo() { - width = 640; - height = 480; + StreamInfo(guint w=0, guint h=0) { + width = w; + height = h; } StreamInfo(const StreamInfo& b) { width = b.width; height = b.height; } + + inline bool valid() { return width > 0 && height > 0; } }; class Stream { @@ -207,9 +210,11 @@ protected: // gst pipeline control virtual void execute_open(); + virtual void fail(const std::string &message); + static void timeout_open(Stream *str); // gst frame filling - bool textureinitialized_; + std::atomic textureinitialized_; void init_texture(guint index); void fill_texture(guint index); bool fill_frame(GstBuffer *buf, FrameStatus status); diff --git a/StreamSource.cpp b/StreamSource.cpp index 2773891..1bc52b0 100644 --- a/StreamSource.cpp +++ b/StreamSource.cpp @@ -28,6 +28,7 @@ #include "Stream.h" #include "Visitor.h" #include "Log.h" +#include "BaseToolkit.h" #include "StreamSource.h" @@ -44,19 +45,29 @@ GenericStreamSource::GenericStreamSource() : StreamSource() void GenericStreamSource::setDescription(const std::string &desc) { - Log::Notify("Creating Source with Stream description '%s'", desc.c_str()); - - std::string pipeline = desc; - pipeline.append(" ! queue max-size-buffers=10 ! videoconvert"); + gst_description_ = desc; + gst_elements_ = BaseToolkit::splitted(desc, '!'); + Log::Notify("Creating Source with Stream description '%s'", gst_description_.c_str()); // open gstreamer - stream_->open(pipeline); + stream_->open(gst_description_ + " ! queue max-size-buffers=10 ! videoconvert" ); stream_->play(true); // will be ready after init and one frame rendered ready_ = false; } + +std::string GenericStreamSource::description() const +{ + return gst_description_; +} + +std::list GenericStreamSource::gstElements() const +{ + return gst_elements_; +} + void GenericStreamSource::accept(Visitor& v) { Source::accept(v); @@ -64,6 +75,18 @@ void GenericStreamSource::accept(Visitor& v) v.visit(*this); } +glm::ivec2 GenericStreamSource::icon() const +{ + return glm::ivec2(ICON_SOURCE_GSTREAMER); +} + +std::string GenericStreamSource::info() const +{ + std::string src_element = gst_elements_.front(); + src_element = src_element.substr(0, src_element.find(" ")); + return std::string("Gstreamer custom pipeline with source '")+src_element+"'"; +} + StreamSource::StreamSource(uint64_t id) : Source(id), stream_(nullptr) { } diff --git a/StreamSource.h b/StreamSource.h index cbc6df8..eb788ef 100644 --- a/StreamSource.h +++ b/StreamSource.h @@ -59,6 +59,9 @@ protected: */ class GenericStreamSource : public StreamSource { + std::string gst_description_; + std::list gst_elements_; + public: GenericStreamSource(); @@ -70,6 +73,11 @@ public: // specific interface void setDescription(const std::string &desc); + std::string description() const; + std::list gstElements() const; + + glm::ivec2 icon() const override; + std::string info() const override; }; #endif // STREAMSOURCE_H diff --git a/rsc/images/icons.dds b/rsc/images/icons.dds index 623faa839a739f65814ca9b969518bdae8587b43..22799f97c83ed4251dfd90caf30b77efda374387 100644 GIT binary patch delta 2426 zcmZ`(U2GIp6h5=dP(YN~GDAz?VX*~&LJ1g)g8MM_C&5I_AUicCjY^9UZF~^LC=s^6 zGz4jYl?@8nkOu#kK-BmbIv{I z`_4Ug+BUCl!=I;lAusGr@N{pYSK&?aCVNx7d%UUMz21G^G|%uNUZpqvsF|%@VhB;! zN<=n(SSzv$rb*(5P zX+5SD-;nwBQJR|KMbaN5n$xZfHXBwo4-U%&6nEWq!~9+@Sk(JMFVjrutZQ{GOJ^Z- z!r9F+(=-xMLcgHjw(W!BUvVq{@#3+>&z*~C%Uh?iko}zV+>p`YjKFi0kShJ|ZlR(C`d|Ip#w~_)4>$+*p%n_?;H1 znE5TP!?8b?jL-RrX1K1+HHJjX*?@zpIWo%We;n8_aDLfn{DFZPOC$%*XK7_lb<9bV zr0j5Z_iA&M5sNYvMHpOY^(SN~400H_eeXwD2gm{AT<0>tVu4n2EzxfxhO^tM)n)pS;bv)ei`or%uF(Msj)Pw`2w_1npvaM8-A z<>yB=mt7}H(G<^$O77q}BGz4@6*As+<4VgJNk)j$WrdZRJ9MLA^s97Rd-@n-a~Hl* zZypI($)kZ9y$YkO21f=Dc=F3K+c=V3xf*#SVDAFw zcT^&Y-zw^Upw>?g^_D86ihYz~6kP0I%z4I=s1UINjHtsazQ)IX2OA$9k2O@el}BL3 z9%IdGvsO0a+_1kB(})$KYWO*yHu_!s!>lwCcJ!}L(TC&5?9NQ4;lxGR(56DH(1>mf z`R$DX%kIk0H|J6K!@25%9xKc)oaQq0tXBMhH(kn4#(%azoPL4ot?63YVulSr56#eQ8=AE8y@><|B#r>7+r z(vNf#X9S<^IF6d*^M``HGOgIZZ#@3KG#>MAU%c9Icig7}f_P^`y~wjEFg&&QPkDcu zkZ`>cBX2ok4^^r36+bj)2+&EQ* zw`ltqzfCkOng^+YD1YPepE8-RPjl^yV&RF6#aIE<8YLG|8s)BVS&MzU;g0x7-;7vc z)&NSk6|0&E9|uX;wI<6^Uojz@oN3w9UWvkiCB*CkPkZ|y;S%|=jr?r?l^~GIgB+ei wy^*oFdqXW>ALX}b9&g5C%(sjiw{S{!_W~ms+IFqp!fytCRoQFx)w|CA2Po*wTmS$7 delta 349 zcmZo@Olbh37RDB)7UmX~7S