BugFix DeviceSource: shared access to stream vis Device::manager

Creation of multiple DeviceSources is possible (also for multiple sessions and transitions) through centralized management of gst streams. Creation and deletion of a shared stream accross DeviceSources is handled.
This commit is contained in:
Bruno Herbelin
2022-01-20 01:28:30 +01:00
parent 625e2305ba
commit b82c83de5e
6 changed files with 208 additions and 139 deletions

View File

@@ -114,27 +114,52 @@ Device::callback_device_monitor (GstBus *, GstMessage * message, gpointer )
return G_SOURCE_CONTINUE; return G_SOURCE_CONTINUE;
} }
struct hasDeviceName: public std::unary_function<DeviceHandle, bool>
{
inline bool operator()(const DeviceHandle &elem) const {
return (elem.name.compare(_name) == 0);
}
explicit hasDeviceName(const std::string &name) : _name(name) { }
private:
std::string _name;
};
struct hasConnectedSource: public std::unary_function<DeviceHandle, bool>
{
inline bool operator()(const DeviceHandle &elem) const {
auto sit = std::find(elem.connected_sources.begin(), elem.connected_sources.end(), s_);
return sit != elem.connected_sources.end();
}
explicit hasConnectedSource(Source *s) : s_(s) { }
private:
Source *s_;
};
void Device::add(GstDevice *device) void Device::add(GstDevice *device)
{ {
if (device==nullptr) if (device==nullptr)
return; return;
gchar *device_name = gst_device_get_display_name (device);
// lock before change // lock before change
access_.lock(); access_.lock();
gchar *device_name = gst_device_get_display_name (device); // if device with this name is not already listed
auto handle = std::find_if(handles_.cbegin(), handles_.cend(), hasDeviceName(device_name) );
if ( handle == handles_.cend() ) {
if ( std::find(manager().src_name_.begin(), manager().src_name_.end(), device_name) == manager().src_name_.end()) { std::string p = pipelineForDevice(device, handles_.size());
std::string p = pipelineForDevice(device, manager().src_description_.size());
DeviceConfigSet confs = getDeviceConfigs(p); DeviceConfigSet confs = getDeviceConfigs(p);
// add if not in the list and valid // add if not in the list and valid
if (!p.empty() && !confs.empty()) { if (!p.empty() && !confs.empty()) {
src_name_.push_back(device_name); DeviceHandle dev;
src_description_.push_back(p); dev.name = device_name;
src_config_.push_back(confs); dev.pipeline = p;
dev.configs = confs;
handles_.push_back(dev);
#ifdef GST_DEVICE_DEBUG #ifdef GST_DEVICE_DEBUG
gchar *stru = gst_structure_to_string( gst_device_get_properties(device) ); gchar *stru = gst_structure_to_string( gst_device_get_properties(device) );
g_print("\nDevice %s plugged : %s\n", device_name, stru); g_print("\nDevice %s plugged : %s\n", device_name, stru);
@@ -143,10 +168,10 @@ void Device::add(GstDevice *device)
} }
} }
g_free (device_name);
// unlock access // unlock access
access_.unlock(); access_.unlock();
g_free (device_name);
} }
void Device::remove(GstDevice *device) void Device::remove(GstDevice *device)
@@ -154,40 +179,35 @@ void Device::remove(GstDevice *device)
if (device==nullptr) if (device==nullptr)
return; return;
gchar *device_name = gst_device_get_display_name (device);
// lock before change // lock before change
access_.lock(); access_.lock();
gchar *device_name = gst_device_get_display_name (device); // if a device with this name is listed
auto handle = std::find_if(handles_.cbegin(), handles_.cend(), hasDeviceName(device_name) );
if ( handle != handles_.cend() ) {
std::vector< std::string >::iterator nameit = src_name_.begin(); // remove the handle if there is no source connected
std::vector< std::string >::iterator descit = src_description_.begin(); if (handle->connected_sources.empty())
std::vector< DeviceConfigSet >::iterator coit = src_config_.begin(); handles_.erase(handle);
while (nameit != src_name_.end()){ else {
// otherwise unplug all sources
if ( (*nameit).compare(device_name) == 0 ) for (auto sit = handle->connected_sources.begin(); sit != handle->connected_sources.end(); ++sit)
{ (*sit)->unplug();
src_name_.erase(nameit); // and inform user
src_description_.erase(descit); Log::Warning("Device %s unplugged.", device_name);
src_config_.erase(coit); // NB; the handle will be removed at the destruction of the last DeviceSource
monitor_unplug_event_ = true;
#ifdef GST_DEVICE_DEBUG
g_print("\nDevice %s unplugged\n", device_name);
#endif
break;
} }
++nameit;
++descit;
++coit;
} }
g_free (device_name);
// unlock access // unlock access
access_.unlock(); access_.unlock();
g_free (device_name);
} }
Device::Device(): monitor_initialized_(false), monitor_unplug_event_(false) Device::Device(): monitor_initialized_(false), monitor_unplug_event_(false)
{ {
std::thread(launchMonitoring, this).detach(); std::thread(launchMonitoring, this).detach();
@@ -226,9 +246,12 @@ void Device::launchMonitoring(Device *d)
confscreen.insert(best); confscreen.insert(best);
// add to config list // add to config list
d->access_.lock(); d->access_.lock();
d->src_name_.push_back("Screen capture");
d->src_description_.push_back(gst_plugin_vidcap); DeviceHandle dev;
d->src_config_.push_back(confscreen); dev.name = "Screen capture";
dev.pipeline = gst_plugin_vidcap;
dev.configs = confscreen;
d->handles_.push_back(dev);
d->access_.unlock(); d->access_.unlock();
} }
@@ -263,7 +286,7 @@ bool Device::initialized()
int Device::numDevices() int Device::numDevices()
{ {
access_.lock(); access_.lock();
int ret = src_name_.size(); int ret = handles_.size();
access_.unlock(); access_.unlock();
return ret; return ret;
@@ -272,8 +295,8 @@ int Device::numDevices()
bool Device::exists(const std::string &device) bool Device::exists(const std::string &device)
{ {
access_.lock(); access_.lock();
std::vector< std::string >::const_iterator d = std::find(src_name_.begin(), src_name_.end(), device); auto h = std::find_if(handles_.cbegin(), handles_.cend(), hasDeviceName(device));
bool ret = (d != src_name_.end()); bool ret = (h != handles_.cend());
access_.unlock(); access_.unlock();
return ret; return ret;
@@ -289,35 +312,13 @@ private:
std::string _d; std::string _d;
}; };
Source *Device::createSource(const std::string &device) const
{
Source *s = nullptr;
// find if a DeviceSource with this device is already registered
std::list< DeviceSource *>::const_iterator d = std::find_if(device_sources_.begin(), device_sources_.end(), hasDevice(device));
// if already registered, clone the device source
if ( d != device_sources_.end()) {
CloneSource *cs = (*d)->clone();
s = cs;
}
// otherwise, we are free to create a new device source
else {
DeviceSource *ds = new DeviceSource();
ds->setDevice(device);
s = ds;
}
return s;
}
std::string Device::name(int index) std::string Device::name(int index)
{ {
std::string ret = ""; std::string ret = "";
access_.lock(); access_.lock();
if (index > -1 && index < (int) src_name_.size()) if (index > -1 && index < (int) handles_.size())
ret = src_name_[index]; ret = handles_[index].name;
access_.unlock(); access_.unlock();
return ret; return ret;
@@ -328,8 +329,8 @@ std::string Device::description(int index)
std::string ret = ""; std::string ret = "";
access_.lock(); access_.lock();
if (index > -1 && index < (int) src_description_.size()) if (index > -1 && index < (int) handles_.size())
ret = src_description_[index]; ret = handles_[index].pipeline;
access_.unlock(); access_.unlock();
return ret; return ret;
@@ -340,8 +341,8 @@ DeviceConfigSet Device::config(int index)
DeviceConfigSet ret; DeviceConfigSet ret;
access_.lock(); access_.lock();
if (index > -1 && index < (int) src_config_.size()) if (index > -1 && index < (int) handles_.size())
ret = src_config_[index]; ret = handles_[index].configs;
access_.unlock(); access_.unlock();
return ret; return ret;
@@ -350,20 +351,18 @@ DeviceConfigSet Device::config(int index)
int Device::index(const std::string &device) int Device::index(const std::string &device)
{ {
int i = -1; int i = -1;
access_.lock(); access_.lock();
std::vector< std::string >::iterator p = std::find(src_name_.begin(), src_name_.end(), device); auto h = std::find_if(handles_.cbegin(), handles_.cend(), hasDeviceName(device));
if (p != src_name_.end()) if (h != handles_.cend())
i = std::distance(src_name_.begin(), p); i = std::distance(handles_.cbegin(), h);
access_.unlock(); access_.unlock();
return i; return i;
} }
DeviceSource::DeviceSource(uint64_t id) : StreamSource(id) DeviceSource::DeviceSource(uint64_t id) : StreamSource(id), unplugged_(false)
{ {
// create stream
stream_ = new Stream;
// set symbol // set symbol
symbol_ = new Symbol(Symbol::CAMERA, glm::vec3(0.75f, 0.75f, 0.01f)); symbol_ = new Symbol(Symbol::CAMERA, glm::vec3(0.75f, 0.75f, 0.01f));
symbol_->scale_.y = 1.5f; symbol_->scale_.y = 1.5f;
@@ -371,10 +370,32 @@ DeviceSource::DeviceSource(uint64_t id) : StreamSource(id)
DeviceSource::~DeviceSource() DeviceSource::~DeviceSource()
{ {
// unregister this device source // unregister this device source from a Device handler
Device::manager().device_sources_.remove(this); auto h = std::find_if(Device::manager().handles_.begin(), Device::manager().handles_.end(), hasConnectedSource(this));
if (h != Device::manager().handles_.end())
{
// remove this pointer to the list of connected sources
h->connected_sources.remove(this);
// if this is the last source connected to the device handler
// the stream will be removed by the ~StreamSource destructor
// and the device handler should not keep reference to it
if (h->connected_sources.empty()) {
// if the cause of deletion is the unplugging of the device
if (unplugged_)
// remove the handle entirely
Device::manager().handles_.erase(h);
else
// otherwise just cancel the reference to the stream
h->stream = nullptr;
}
// else this means another DeviceSource is using this stream
// and we should avoid to delete the stream in the ~StreamSource destructor
else
stream_ = nullptr;
}
} }
void DeviceSource::setDevice(const std::string &devicename) void DeviceSource::setDevice(const std::string &devicename)
{ {
// instanciate and wait for monitor initialization if not already initialized // instanciate and wait for monitor initialization if not already initialized
@@ -385,60 +406,106 @@ void DeviceSource::setDevice(const std::string &devicename)
// remember device name // remember device name
device_ = devicename; device_ = devicename;
// check existence of a device with that name // check existence of a device handle with that name
int index = Device::manager().index(device_); auto h = std::find_if(Device::manager().handles_.begin(), Device::manager().handles_.end(), hasDeviceName(device_));
if (index > -1) { if ( h != Device::manager().handles_.end()) {
// register this device source // find if a DeviceHandle with this device name already has a stream
Device::manager().device_sources_.push_back(this); if ( h->stream != nullptr) {
// just use it !
stream_ = h->stream;
}
else {
// start filling in the gstreamer pipeline
std::ostringstream pipeline;
pipeline << h->pipeline;
// start filling in the gstreamer pipeline // test the device and get config
std::ostringstream pipeline; DeviceConfigSet confs = h->configs;
pipeline << Device::manager().description(index);
// test the device and get config
DeviceConfigSet confs = Device::manager().config(index);
#ifdef DEVICE_DEBUG #ifdef DEVICE_DEBUG
Log::Info("Device %s supported configs:", devicename.c_str()); Log::Info("Device %s supported configs:", devicename.c_str());
for( DeviceConfigSet::iterator it = confs.begin(); it != confs.end(); ++it ){ for( DeviceConfigSet::iterator it = confs.begin(); it != confs.end(); ++it ){
float fps = static_cast<float>((*it).fps_numerator) / static_cast<float>((*it).fps_denominator); float fps = static_cast<float>((*it).fps_numerator) / static_cast<float>((*it).fps_denominator);
Log::Info(" - %s %s %d x %d %.1f fps", (*it).stream.c_str(), (*it).format.c_str(), (*it).width, (*it).height, fps); Log::Info(" - %s %s %d x %d %.1f fps", (*it).stream.c_str(), (*it).format.c_str(), (*it).width, (*it).height, fps);
} }
#endif #endif
if (!confs.empty()) { if (!confs.empty()) {
DeviceConfig best = *confs.rbegin(); DeviceConfig best = *confs.rbegin();
float fps = static_cast<float>(best.fps_numerator) / static_cast<float>(best.fps_denominator); float fps = static_cast<float>(best.fps_numerator) / static_cast<float>(best.fps_denominator);
Log::Info("Device %s selected its optimal config: %s %s %dx%d@%.1ffps", device_.c_str(), best.stream.c_str(), best.format.c_str(), best.width, best.height, fps); Log::Info("Device %s selected its optimal config: %s %s %dx%d@%.1ffps", device_.c_str(), best.stream.c_str(), best.format.c_str(), best.width, best.height, fps);
pipeline << " ! " << best.stream; pipeline << " ! " << best.stream;
if (!best.format.empty()) if (!best.format.empty())
pipeline << ",format=" << best.format; pipeline << ",format=" << best.format;
pipeline << ",framerate=" << best.fps_numerator << "/" << best.fps_denominator; pipeline << ",framerate=" << best.fps_numerator << "/" << best.fps_denominator;
pipeline << ",width=" << best.width; pipeline << ",width=" << best.width;
pipeline << ",height=" << best.height; pipeline << ",height=" << best.height;
if ( best.stream.find("jpeg") != std::string::npos ) if ( best.stream.find("jpeg") != std::string::npos )
pipeline << " ! jpegdec"; pipeline << " ! jpegdec";
if ( device_.find("Screen") != std::string::npos ) if ( device_.find("Screen") != std::string::npos )
pipeline << " ! videoconvert ! video/x-raw,format=RGB ! queue max-size-buffers=3"; pipeline << " ! videoconvert ! video/x-raw,format=RGB ! queue max-size-buffers=3";
pipeline << " ! videoconvert"; pipeline << " ! videoconvert";
// resize render buffer // resize render buffer
if (renderbuffer_) if (renderbuffer_)
renderbuffer_->resize(best.width, best.height); renderbuffer_->resize(best.width, best.height);
// open gstreamer // new stream
stream_->open( pipeline.str(), best.width, best.height); stream_ = h->stream = new Stream;
stream_->play(true);
// open gstreamer
h->stream->open( pipeline.str(), best.width, best.height);
h->stream->play(true);
}
} }
// reference this source in the handle
h->connected_sources.push_back(this);
// will be ready after init and one frame rendered // will be ready after init and one frame rendered
ready_ = false; ready_ = false;
} }
else else {
Log::Warning("No such device '%s'", device_.c_str()); Log::Warning("No such device '%s'", device_.c_str());
}
}
void DeviceSource::setActive (bool on)
{
bool was_active = active_;
// try to activate (may fail if source is cloned)
Source::setActive(on);
if (stream_) {
// change status of stream (only if status changed)
if (active_ != was_active) {
// activate a source if any of the handled device source is active
auto h = std::find_if(Device::manager().handles_.begin(), Device::manager().handles_.end(), hasConnectedSource(this));
if (h != Device::manager().handles_.end()) {
bool streamactive = false;
for (auto sit = h->connected_sources.begin(); sit != h->connected_sources.end(); ++sit) {
if ( (*sit)->active_)
streamactive = true;
}
stream_->enable(streamactive);
}
}
// change visibility of active surface (show preview of stream when inactive)
if (activesurface_) {
if (active_)
activesurface_->setTextureIndex(Resource::getTextureTransparent());
else
activesurface_->setTextureIndex(stream_->texture());
}
}
} }
void DeviceSource::accept(Visitor& v) void DeviceSource::accept(Visitor& v)
@@ -450,23 +517,7 @@ void DeviceSource::accept(Visitor& v)
bool DeviceSource::failed() const bool DeviceSource::failed() const
{ {
// fail if stream openned and failed return unplugged_ || StreamSource::failed();
if (stream_ && stream_->failed())
return true;
// if there was a unplug event
if (Device::manager().monitor_unplug_event_) {
// check if it was this device that was unplugged
if (!Device::manager().exists(device_)){
// the unplug event is resolved
Device::manager().monitor_unplug_event_ = false;
// this device source fail
return true;
}
}
// no other reason to fail
return false;
} }
DeviceConfigSet Device::getDeviceConfigs(const std::string &src_description) DeviceConfigSet Device::getDeviceConfigs(const std::string &src_description)

View File

@@ -1,6 +1,7 @@
#ifndef DEVICESOURCE_H #ifndef DEVICESOURCE_H
#define DEVICESOURCE_H #define DEVICESOURCE_H
#include <string>
#include <vector> #include <vector>
#include <set> #include <set>
@@ -16,6 +17,7 @@ public:
// Source interface // Source interface
bool failed() const override; bool failed() const override;
void accept (Visitor& v) override; void accept (Visitor& v) override;
void setActive (bool on) override;
// StreamSource interface // StreamSource interface
Stream *stream() const override { return stream_; } Stream *stream() const override { return stream_; }
@@ -23,13 +25,14 @@ public:
// specific interface // specific interface
void setDevice(const std::string &devicename); void setDevice(const std::string &devicename);
inline std::string device() const { return device_; } inline std::string device() const { return device_; }
void unplug() { unplugged_ = true; }
glm::ivec2 icon() const override; glm::ivec2 icon() const override;
std::string info() const override; std::string info() const override;
private: private:
std::string device_; std::string device_;
std::atomic<bool> unplugged_;
}; };
struct DeviceConfig { struct DeviceConfig {
@@ -82,6 +85,19 @@ struct better_device_comparator
typedef std::set<DeviceConfig, better_device_comparator> DeviceConfigSet; typedef std::set<DeviceConfig, better_device_comparator> DeviceConfigSet;
struct DeviceHandle {
std::string name;
std::string pipeline;
DeviceConfigSet configs;
Stream *stream;
std::list<DeviceSource *> connected_sources;
DeviceHandle() {
stream = nullptr;
}
};
class Device class Device
{ {
@@ -108,8 +124,6 @@ public:
int index (const std::string &device); int index (const std::string &device);
bool exists (const std::string &device) ; bool exists (const std::string &device) ;
Source *createSource(const std::string &device) const;
static gboolean callback_device_monitor (GstBus *, GstMessage *, gpointer); static gboolean callback_device_monitor (GstBus *, GstMessage *, gpointer);
static DeviceConfigSet getDeviceConfigs(const std::string &src_description); static DeviceConfigSet getDeviceConfigs(const std::string &src_description);
@@ -121,15 +135,13 @@ private:
void add(GstDevice *device); void add(GstDevice *device);
std::mutex access_; std::mutex access_;
std::vector< std::string > src_name_; std::vector< DeviceHandle > handles_;
std::vector< std::string > src_description_;
std::vector< DeviceConfigSet > src_config_;
GstDeviceMonitor *monitor_; GstDeviceMonitor *monitor_;
std::condition_variable monitor_initialization_; std::condition_variable monitor_initialization_;
bool monitor_initialized_; bool monitor_initialized_;
bool monitor_unplug_event_; bool monitor_unplug_event_;
std::list< DeviceSource * > device_sources_;
}; };

View File

@@ -346,7 +346,8 @@ Source * Mixer::createSourcePattern(uint pattern, glm::ivec2 res)
Source * Mixer::createSourceDevice(const std::string &namedevice) Source * Mixer::createSourceDevice(const std::string &namedevice)
{ {
// ready to create a source // ready to create a source
Source *s = Device::manager().createSource(namedevice); DeviceSource *s = new DeviceSource;
s->setDevice(namedevice);
// propose a new name based on pattern name // propose a new name based on pattern name
s->setName( namedevice.substr(0, namedevice.find(" ")) ); s->setName( namedevice.substr(0, namedevice.find(" ")) );

View File

@@ -153,6 +153,10 @@ SourceList join (const SourceList &first, const SourceList &second)
SourceLink::SourceLink(Source *s): host_(nullptr), target_(nullptr), id_(0)
{
connect(s);
}
void SourceLink::connect(uint64_t id, Session *se) void SourceLink::connect(uint64_t id, Session *se)
{ {

View File

@@ -33,6 +33,7 @@ class SourceLink {
public: public:
SourceLink(): host_(nullptr), target_(nullptr), id_(0) { } SourceLink(): host_(nullptr), target_(nullptr), id_(0) { }
SourceLink(Source *s);
~SourceLink(); ~SourceLink();
void connect(uint64_t id, Session *se); void connect(uint64_t id, Session *se);

View File

@@ -374,7 +374,7 @@ float Stream::aspectRatio() const
void Stream::enable(bool on) void Stream::enable(bool on)
{ {
if ( !opened_ || pipeline_ == nullptr) if ( !opened_ || pipeline_ == nullptr || !textureinitialized_)
return; return;
if ( enabled_ != on ) { if ( enabled_ != on ) {