From 7246dfa08e49a49febb4893810ae47d75ef33a24 Mon Sep 17 00:00:00 2001 From: brunoherbelin Date: Sat, 24 Oct 2020 23:53:11 +0200 Subject: [PATCH] Work-in progress: connection manager now used in Streamer and NetworkSource to identify possible connections, and exchange streaming configuration. --- Connection.cpp | 25 ++- Connection.h | 14 +- MediaPlayer.cpp | 2 +- Mixer.cpp | 6 +- Mixer.h | 2 +- NetworkSource.cpp | 465 +++++++++++++++++++-------------------------- NetworkSource.h | 123 +++++------- NetworkToolkit.cpp | 4 +- NetworkToolkit.h | 39 +++- Settings.cpp | 20 -- Settings.h | 14 -- Streamer.cpp | 227 +++++++++++++--------- Streamer.h | 37 ++-- 13 files changed, 459 insertions(+), 519 deletions(-) diff --git a/Connection.cpp b/Connection.cpp index efbca7d..d24927a 100644 --- a/Connection.cpp +++ b/Connection.cpp @@ -8,6 +8,8 @@ #include "defines.h" #include "Connection.h" +#include "Settings.h" +#include "Streamer.h" #include "Log.h" #ifndef NDEBUG @@ -31,8 +33,8 @@ bool Connection::init() try { // increment the port to have unique ports connections_[0].port_handshake = HANDSHAKE_PORT + trial; - connections_[0].port_stream_send = STREAM_REQUEST_PORT + trial; - connections_[0].port_stream_receive = STREAM_RESPONSE_PORT + trial; + connections_[0].port_stream_request = STREAM_REQUEST_PORT + trial; + connections_[0].port_osc = OSC_DIALOG_PORT + trial; // try to create listenning socket // through exception runtime if fails @@ -44,7 +46,7 @@ bool Connection::init() // all good trial = MAX_HANDSHAKE; } - catch (const std::runtime_error& e) { + catch (const std::runtime_error&) { // arg, the receiver could not be initialized // because the port was not available receiver_ = nullptr; @@ -59,6 +61,9 @@ bool Connection::init() std::thread(listen).detach(); // regularly check for available streaming hosts std::thread(ask).detach(); + // restore state of Streamer + Streaming::manager().enable( Settings::application.accept_connections ); + } return receiver_ != nullptr; @@ -68,6 +73,9 @@ void Connection::terminate() { if (receiver_!=nullptr) receiver_->AsynchronousBreak(); + + // restore state of Streamer + Streaming::manager().enable( false ); } @@ -174,6 +182,9 @@ void Connection::ask() (*it).alive--; // erase connection if its life score is negative (not responding too many times) if ( it!=Connection::manager().connections_.begin() && (*it).alive < 0 ) { + // inform streamer to cancel streaming to this client + Streaming::manager().removeStreams( (*it).address ); + // remove from list it = Connection::manager().connections_.erase(it); #ifdef CONNECTION_DEBUG Log::Info("A connection was lost"); @@ -213,8 +224,8 @@ void ConnectionRequestListener::ProcessMessage( const osc::ReceivedMessage& m, p << osc::BeginMessage( OSC_PREFIX OSC_PONG ); p << Connection::manager().connections_[0].name.c_str(); p << Connection::manager().connections_[0].port_handshake; - p << Connection::manager().connections_[0].port_stream_send; - p << Connection::manager().connections_[0].port_stream_receive; + p << Connection::manager().connections_[0].port_stream_request; + p << Connection::manager().connections_[0].port_osc; p << osc::EndMessage; // send OSC message to port indicated by remote @@ -234,8 +245,8 @@ void ConnectionRequestListener::ProcessMessage( const osc::ReceivedMessage& m, osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); info.name = std::string( (arg++)->AsString() ); info.port_handshake = (arg++)->AsInt32(); - info.port_stream_send = (arg++)->AsInt32(); - info.port_stream_receive = (arg++)->AsInt32(); + info.port_stream_request = (arg++)->AsInt32(); + info.port_osc = (arg++)->AsInt32(); // do we know this connection ? int i = Connection::manager().index(info); diff --git a/Connection.h b/Connection.h index 26b9faf..2d38209 100644 --- a/Connection.h +++ b/Connection.h @@ -20,17 +20,17 @@ struct ConnectionInfo { std::string address; int port_handshake; - int port_stream_send; - int port_stream_receive; + int port_stream_request; + int port_osc; std::string name; int alive; ConnectionInfo () { address = "127.0.0.1"; port_handshake = HANDSHAKE_PORT; - port_stream_send = STREAM_REQUEST_PORT; - port_stream_receive = STREAM_RESPONSE_PORT; - name = "user@localhost"; + port_stream_request = STREAM_REQUEST_PORT; + port_osc = OSC_DIALOG_PORT; + name = ""; alive = ALIVE; } @@ -39,8 +39,8 @@ struct ConnectionInfo { if (this != &o) { this->address = o.address; this->port_handshake = o.port_handshake; - this->port_stream_send = o.port_stream_send; - this->port_stream_receive = o.port_stream_receive; + this->port_stream_request = o.port_stream_request; + this->port_osc = o.port_osc; this->name = o.name; } return *this; diff --git a/MediaPlayer.cpp b/MediaPlayer.cpp index c8a3df2..18f34b1 100644 --- a/MediaPlayer.cpp +++ b/MediaPlayer.cpp @@ -75,7 +75,7 @@ guint MediaPlayer::texture() const static MediaInfo UriDiscoverer_(std::string uri) { #ifdef MEDIA_PLAYER_DEBUG - Log::Info("Checking '%s'", uri.c_str()); + Log::Info("Checking file '%s'", uri.c_str()); #endif MediaInfo video_stream_info; diff --git a/Mixer.cpp b/Mixer.cpp index 711717f..85a1103 100644 --- a/Mixer.cpp +++ b/Mixer.cpp @@ -308,14 +308,14 @@ Source * Mixer::createSourceDevice(const std::string &namedevice) } -Source * Mixer::createSourceNetwork(const std::string &address) +Source * Mixer::createSourceNetwork(const std::string &nameconnection) { // ready to create a source NetworkSource *s = new NetworkSource; - s->setAddress(address); + s->setConnection(nameconnection); // propose a new name based on address - s->setName(address); + s->setName(nameconnection); return s; } diff --git a/Mixer.h b/Mixer.h index 4585946..ca31b2a 100644 --- a/Mixer.h +++ b/Mixer.h @@ -42,7 +42,7 @@ public: Source * createSourceStream (const std::string &gstreamerpipeline); Source * createSourcePattern(uint pattern, glm::ivec2 res); Source * createSourceDevice (const std::string &namedevice); - Source * createSourceNetwork(const std::string &address); + Source * createSourceNetwork(const std::string &nameconnection); // operations on sources void addSource (Source *s); diff --git a/NetworkSource.cpp b/NetworkSource.cpp index 0677087..f2d6c01 100644 --- a/NetworkSource.cpp +++ b/NetworkSource.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include @@ -14,6 +15,7 @@ #include "Decorations.h" #include "Visitor.h" #include "Log.h" +#include "Connection.h" #include "NetworkSource.h" @@ -21,6 +23,182 @@ #define NETWORK_DEBUG #endif + + +// this is called when receiving an answer for streaming request +void StreamerResponseListener::ProcessMessage( const osc::ReceivedMessage& m, + const IpEndpointName& remoteEndpoint ) +{ + char sender[IpEndpointName::ADDRESS_AND_PORT_STRING_LENGTH]; + remoteEndpoint.AddressAndPortAsString(sender); + + try{ + if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_OFFER ) == 0 ){ + + NetworkToolkit::StreamConfig conf; + + // someone is offering a stream + osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); + conf.port = (arg++)->AsInt32(); + conf.protocol = (NetworkToolkit::Protocol) (arg++)->AsInt32(); + conf.width = (arg++)->AsInt32(); + conf.height = (arg++)->AsInt32(); + + // we got the offer from Streaming::manager() + parent_->setConfig(conf); +#ifdef NETWORK_DEBUG + Log::Info("Received stream info from %s", sender); +#endif + } + } + catch( osc::Exception& e ){ + // any parsing errors such as unexpected argument types, or + // missing arguments get thrown as exceptions. + Log::Info("error while parsing message '%s' from %s : %s", m.AddressPattern(), sender, e.what()); + } +} + + +NetworkStream::NetworkStream(): Stream(), receiver_(nullptr) +{ +// listener_port_ = 5400; // TODO Find a free port, unique each time + confirmed_ = false; +} + +glm::ivec2 NetworkStream::resolution() const +{ + return glm::ivec2(config_.width, config_.height); +} + +void wait_for_stream_(UdpListeningReceiveSocket *receiver) +{ + receiver->Run(); +} + +void NetworkStream::open(const std::string &nameconnection) +{ + // does this Connection exists? + int streamer_index = Connection::manager().index(nameconnection); + + // Nope, cannot connect to unknown connection + if (streamer_index < 0) { + Log::Notify("Cannot connect to %s: please make sure the program is active.", nameconnection); + failed_ = true; + return; + } + + // prepare listener to receive stream config from remote streaming manager + listener_.setParent(this); + + // find an available port to receive response from remote streaming manager + int listener_port_ = -1; + for (int trial = 0; receiver_ == nullptr && trial < 10 ; trial++) { + try { + // invent a port which would be available + listener_port_ = 72000 + rand()%1000; + // try to create receiver (through exception on fail) + receiver_ = new UdpListeningReceiveSocket(IpEndpointName(Connection::manager().info().address.c_str(), listener_port_), &listener_); + } + catch (const std::runtime_error&) { + receiver_ = nullptr; + } + } + if (receiver_ == nullptr) { + Log::Notify("Cannot open %s.", nameconnection); + failed_ = true; + return; + } + + // ok, we can ask to this connected streamer to send us a stream + ConnectionInfo streamer = Connection::manager().info(streamer_index); + IpEndpointName a( streamer.address.c_str(), streamer.port_stream_request ); + streamer_address_ = a.address; + streamer_address_ = a.port; + + // build OSC message + char buffer[IP_MTU_SIZE]; + osc::OutboundPacketStream p( buffer, IP_MTU_SIZE ); + p.Clear(); + p << osc::BeginMessage( OSC_PREFIX OSC_STREAM_REQUEST ); + // send my listening port to indicate to Connection::manager where to reply + p << listener_port_; + p << osc::EndMessage; + + // send OSC message to streamer + UdpTransmitSocket socket( streamer_address_ ); + socket.Send( p.Data(), p.Size() ); + + // Now we wait for the offer from the streamer + std::thread(wait_for_stream_, receiver_).detach(); + +#ifdef NETWORK_DEBUG + Log::Info("Asking %s:%d for a stream", streamer.address.c_str(), streamer.port_stream_request); + Log::Info("Waiting for response at %s:%d", Connection::manager().info().address.c_str(), listener_port_); +#endif +} + +void NetworkStream::close() +{ + if (receiver_) + delete receiver_; + + // build OSC message to inform disconnection + char buffer[IP_MTU_SIZE]; + osc::OutboundPacketStream p( buffer, IP_MTU_SIZE ); + p.Clear(); + p << osc::BeginMessage( OSC_PREFIX OSC_STREAM_DISCONNECT ); + p << config_.port; // send my stream port to identify myself to the streamer Connection::manager + p << osc::EndMessage; + + // send OSC message to streamer + UdpTransmitSocket socket( streamer_address_ ); + socket.Send( p.Data(), p.Size() ); +} + + +void NetworkStream::setConfig(NetworkToolkit::StreamConfig conf) +{ + config_ = conf; + confirmed_ = true; +} + +void NetworkStream::update() +{ + Stream::update(); + + if ( !ready_ && !failed_ && confirmed_) + { + // stop receiving streamer info + receiver_->AsynchronousBreak(); + +#ifdef NETWORK_DEBUG + Log::Info("Creating Network Stream %d %d x %d", config_.port, config_.width, config_.height); +#endif + // build the pipeline depending on stream info + std::ostringstream pipeline; + if (config_.protocol == NetworkToolkit::TCP_JPEG || config_.protocol == NetworkToolkit::TCP_H264) { + + pipeline << "tcpclientsrc name=src timeout=1 port=" << config_.port; + } + else if (config_.protocol == NetworkToolkit::SHM_RAW) { + + std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm"); + path += std::to_string(config_.port); + pipeline << "shmsrc name=src is-live=true socket-path=" << path; + // TODO rename SHM socket "shm_PORT" + } + + pipeline << NetworkToolkit::protocol_receive_pipeline[config_.protocol]; + pipeline << " ! videoconvert"; + + // open the pipeline with generic stream class + Stream::open(pipeline.str(), config_.width, config_.height); + + } + +} + + NetworkSource::NetworkSource() : StreamSource() { // create stream @@ -31,29 +209,31 @@ NetworkSource::NetworkSource() : StreamSource() overlays_[View::LAYER]->attach( new Symbol(Symbol::EMPTY, glm::vec3(0.8f, 0.8f, 0.01f)) ); } -void NetworkSource::setAddress(const std::string &address) + +NetworkSource::~NetworkSource() { - address_ = address; - Log::Notify("Creating Network Source '%s'", address_.c_str()); + networkStream()->close(); +} -// // extract host and port from "host:port" -// std::string host = address.substr(0, address.find_last_of(":")); -// std::string port_s = address.substr(address.find_last_of(":") + 1); -// uint port = std::stoi(port_s); +NetworkStream *NetworkSource::networkStream() const +{ + return dynamic_cast(stream_); +} -// // validate protocol -// if (protocol < NetworkToolkit::TCP_JPEG || protocol >= NetworkToolkit::DEFAULT) -// protocol = NetworkToolkit::TCP_JPEG; +void NetworkSource::setConnection(const std::string &nameconnection) +{ + connection_name_ = nameconnection; + Log::Notify("Creating Network Source '%s'", connection_name_.c_str()); -// // open network stream -// networkstream()->open( protocol, host, port ); -// stream_->play(true); + // open network stream + networkStream()->open( connection_name_ ); + stream_->play(true); } -std::string NetworkSource::address() const +std::string NetworkSource::connection() const { - return address_; + return connection_name_; } @@ -65,259 +245,4 @@ void NetworkSource::accept(Visitor& v) } -void NetworkHosts::listen() -{ - Log::Info("Accepting Streaming responses on port %d", STREAM_RESPONSE_PORT); - NetworkHosts::manager().receiver_->Run(); - Log::Info("Refusing Streaming responses"); -} -void NetworkHosts::ask() -{ - Log::Info("Broadcasting Streaming requests on port %d", STREAM_REQUEST_PORT); - - IpEndpointName host( "255.255.255.255", STREAM_REQUEST_PORT ); - UdpTransmitSocket socket( host ); - socket.SetEnableBroadcast(true); - - char buffer[IP_MTU_SIZE]; - osc::OutboundPacketStream p( buffer, IP_MTU_SIZE ); - p.Clear(); - p << osc::BeginMessage( OSC_PREFIX OSC_STREAM_REQUEST ) << osc::EndMessage; - - while(true) - { - socket.Send( p.Data(), p.Size() ); - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - -} - -// this is called when receiving an answer for streaming request -void HostsResponsesListener::ProcessMessage( const osc::ReceivedMessage& m, - const IpEndpointName& remoteEndpoint ) -{ - char sender[IpEndpointName::ADDRESS_AND_PORT_STRING_LENGTH]; - remoteEndpoint.AddressAndPortAsString(sender); - - std::string se(sender); - std::string senderip = se.substr(0, se.find_last_of(":")); - - try{ - if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_OFFER ) == 0 ){ - - // someone is offering a stream - osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); - const char *address = (arg++)->AsString(); - int p = (arg++)->AsInt32(); - int w = (arg++)->AsInt32(); - int h = (arg++)->AsInt32(); - - Log::Info("Able to receive %s %d x %d stream from %s ", address, w, h, senderip.c_str()); - NetworkHosts::manager().addHost(address, p, w, h); - - } - else if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_RETRACT ) == 0 ){ - // someone is retracting a stream - osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); - const char *address = (arg++)->AsString(); - NetworkHosts::manager().removeHost(address); - } - } - catch( osc::Exception& e ){ - // any parsing errors such as unexpected argument types, or - // missing arguments get thrown as exceptions. - Log::Info("error while parsing message '%s' from %s : %s", m.AddressPattern(), sender, e.what()); - } -} - - - -NetworkHosts::NetworkHosts() -{ -// // listen for answers -// receiver_ = new UdpListeningReceiveSocket(IpEndpointName( IpEndpointName::ANY_ADDRESS, STREAM_RESPONSE_PORT ), &listener_ ); -// std::thread(listen).detach(); - -// // regularly check for available streaming hosts -// std::thread(ask).detach(); -} - - -void NetworkHosts::addHost(const std::string &address, int protocol, int width, int height) -{ - // add only new - if (disconnected(address)) { - src_address_.push_back(address); - - // TODO : fill description - src_description_.push_back(address); - - list_uptodate_ = false; - } - -} -void NetworkHosts::removeHost(const std::string &address) -{ - // remove existing only - if (connected(address)) { - std::vector< std::string >::iterator addrit = src_address_.begin(); - std::vector< std::string >::iterator descit = src_description_.begin(); - while (addrit != src_address_.end()){ - - if ( (*addrit).compare(address) == 0 ) - { - src_address_.erase(addrit); - src_description_.erase(descit); - break; - } - - addrit++; - descit++; - } - list_uptodate_ = false; - } -} - -int NetworkHosts::numHosts() const -{ - return src_address_.size(); -} - -std::string NetworkHosts::name(int index) const -{ - if (index > -1 && index < (int) src_address_.size()) - return src_address_[index]; - else - return ""; -} - -std::string NetworkHosts::description(int index) const -{ - if (index > -1 && index < (int) src_description_.size()) - return src_description_[index]; - else - return ""; -} - -int NetworkHosts::index(const std::string &address) const -{ - int i = -1; - std::vector< std::string >::const_iterator p = std::find(src_address_.begin(), src_address_.end(), address); - if (p != src_address_.end()) - i = std::distance(src_address_.begin(), p); - - return i; -} - -bool NetworkHosts::connected(const std::string &address) const -{ - std::vector< std::string >::const_iterator d = std::find(src_address_.begin(), src_address_.end(), address); - return d != src_address_.end(); -} - -bool NetworkHosts::disconnected(const std::string &address) const -{ - if (list_uptodate_) - return false; - return !connected(address); -} - -struct hasAddress: public std::unary_function -{ - inline bool operator()(const NetworkSource* elem) const { - return (elem && elem->address() == _a); - } - hasAddress(std::string a) : _a(a) { } -private: - std::string _a; -}; - -Source *NetworkHosts::createSource(const std::string &address) const -{ - Source *s = nullptr; - - // find if a DeviceSource with this device is already registered - std::list< NetworkSource *>::const_iterator d = std::find_if(network_sources_.begin(), network_sources_.end(), hasAddress(address)); - - // if already registered, clone the device source - if ( d != network_sources_.end()) { - CloneSource *cs = (*d)->clone(); - s = cs; - } - // otherwise, we are free to create a new Network source - else { - NetworkSource *ns = new NetworkSource(); - ns->setAddress(address); - s = ns; - } - - return s; -} - - - - - - - - - - - - - - - - - - - -NetworkStream::NetworkStream(): Stream(), protocol_(NetworkToolkit::DEFAULT), host_("127.0.0.1"), port_(5000) -{ - -} - -glm::ivec2 NetworkStream::resolution() -{ - return glm::ivec2( width_, height_); -} - - -void NetworkStream::open(NetworkToolkit::Protocol protocol, const std::string &host, uint port ) -{ - protocol_ = protocol; - host_ = host; - port_ = port; - - int w = 1920; - int h = 1080; - - std::ostringstream pipeline; - if (protocol_ == NetworkToolkit::TCP_JPEG || protocol_ == NetworkToolkit::TCP_H264) { - - pipeline << "tcpclientsrc name=src timeout=1 host=" << host_ << " port=" << port_; - } - else if (protocol_ == NetworkToolkit::SHM_RAW) { - - std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm_socket"); - pipeline << "shmsrc name=src is-live=true socket-path=" << path; - // TODO SUPPORT multiple sockets shared memory - } - - pipeline << NetworkToolkit::protocol_receive_pipeline[protocol_]; - pipeline << " ! videoconvert"; - -// if ( ping(&w, &h) ) - // (private) open stream - Stream::open(pipeline.str(), w, h); -// else { -// Log::Notify("Failed to connect to %s:%d", host.c_str(), port); -// failed_ = true; -// } - - - - -} diff --git a/NetworkSource.h b/NetworkSource.h index 6833fd6..64be95b 100644 --- a/NetworkSource.h +++ b/NetworkSource.h @@ -9,81 +9,16 @@ #include "NetworkToolkit.h" #include "StreamSource.h" -class NetworkSource : public StreamSource +class NetworkStream; + +class StreamerResponseListener : public osc::OscPacketListener { - std::string address_; - -public: - NetworkSource(); - - // Source interface - void accept (Visitor& v) override; - - // StreamSource interface - Stream *stream() const override { return stream_; } - - // specific interface - void setAddress(const std::string &address); - std::string address() const; - - glm::ivec2 icon() const override { return glm::ivec2(11, 8); } - -}; - - -class HostsResponsesListener : public osc::OscPacketListener { - protected: - + class NetworkStream *parent_; virtual void ProcessMessage( const osc::ReceivedMessage& m, const IpEndpointName& remoteEndpoint ); -}; - - -class NetworkHosts -{ - friend class NetworkSource; - friend class HostsResponsesListener; - - NetworkHosts(); - NetworkHosts(NetworkHosts const& copy); // Not Implemented - NetworkHosts& operator=(NetworkHosts const& copy); // Not Implemented - - public: - - static NetworkHosts& manager() - { - // The only instance - static NetworkHosts _instance; - return _instance; - } - - int numHosts () const; - std::string name (int index) const; - std::string description (int index) const; - - int index (const std::string &address) const; - bool connected (const std::string &address) const; - bool disconnected (const std::string &address) const; - - Source *createSource(const std::string &address) const; - -private: - - std::vector< std::string > src_address_; - std::vector< std::string > src_description_; - bool list_uptodate_; - - std::list< NetworkSource * > network_sources_; - - static void ask(); - static void listen(); - HostsResponsesListener listener_; - UdpListeningReceiveSocket *receiver_; - void addHost(const std::string &address, int protocol, int width, int height); - void removeHost(const std::string &address); - + inline void setParent(NetworkStream *s) { parent_ = s; } }; @@ -92,20 +27,52 @@ class NetworkStream : public Stream public: NetworkStream(); - void open(NetworkToolkit::Protocol protocol, const std::string &host, uint port ); + void open(const std::string &nameconnection); + void close(); - glm::ivec2 resolution(); + void setConfig(NetworkToolkit::StreamConfig conf); + void update() override; - inline NetworkToolkit::Protocol protocol() const { return protocol_; } - inline std::string host() const { return host_; } - inline uint port() const { return port_; } + glm::ivec2 resolution() const; + inline NetworkToolkit::Protocol protocol() const { return config_.protocol; } + inline std::string IP() const { return config_.client_address; } + inline uint port() const { return config_.port; } private: - NetworkToolkit::Protocol protocol_; - std::string host_; - uint port_; + // connection information + IpEndpointName streamer_address_; +// int listener_port_; + StreamerResponseListener listener_; + UdpListeningReceiveSocket *receiver_; + std::atomic confirmed_; + + NetworkToolkit::StreamConfig config_; +}; + + +class NetworkSource : public StreamSource +{ + std::string connection_name_; + +public: + NetworkSource(); + ~NetworkSource(); + + // Source interface + void accept (Visitor& v) override; + + // StreamSource interface + Stream *stream() const override { return stream_; } + NetworkStream *networkStream() const; + + // specific interface + void setConnection(const std::string &nameconnection); + std::string connection() const; + + glm::ivec2 icon() const override { return glm::ivec2(11, 8); } }; + #endif // NETWORKSOURCE_H diff --git a/NetworkToolkit.cpp b/NetworkToolkit.cpp index cc0b6a5..59f896b 100644 --- a/NetworkToolkit.cpp +++ b/NetworkToolkit.cpp @@ -60,8 +60,8 @@ const std::vector NetworkToolkit::protocol_receive_pipeline { * SND * gst-launch-1.0 videotestsrc is-live=true ! jpegenc ! rtpjpegpay ! multiudpsink clients="127.0.0.1:5000,127.0.0.1:5001" * RCV - * gst-launch-1.0 -v udpsrc address=127.0.0.1 port=5000 ! application/x-rtp,encoding-name=JPEG,payload=26 ! rtpjpegdepay ! jpegdec ! autovideosink - * gst-launch-1.0 -v udpsrc address=127.0.0.1 port=5001 ! application/x-rtp,encoding-name=JPEG,payload=26 ! rtpjpegdepay ! jpegdec ! autovideosink + * gst-launch-1.0 -v udpsrc port=5000 ! application/x-rtp,encoding-name=JPEG,payload=26 ! rtpjpegdepay ! jpegdec ! autovideosink + * gst-launch-1.0 -v udpsrc port=5001 ! application/x-rtp,encoding-name=JPEG,payload=26 ! rtpjpegdepay ! jpegdec ! autovideosink * * RAW UDP (caps has to match exactly, and depends on resolution) * SND diff --git a/NetworkToolkit.h b/NetworkToolkit.h index f48036f..096ba45 100644 --- a/NetworkToolkit.h +++ b/NetworkToolkit.h @@ -9,15 +9,13 @@ #define OSC_PONG "/pong" #define OSC_STREAM_REQUEST "/request" #define OSC_STREAM_OFFER "/offer" -#define OSC_STREAM_RETRACT "/retract" -#define OSC_STREAM_CONNECT "/connect" #define OSC_STREAM_DISCONNECT "/disconnect" -#define MAX_HANDSHAKE 10 +#define MAX_HANDSHAKE 20 #define HANDSHAKE_PORT 71310 #define STREAM_REQUEST_PORT 71510 -#define STREAM_RESPONSE_PORT 71610 +#define OSC_DIALOG_PORT 71010 #define IP_MTU_SIZE 1536 namespace NetworkToolkit @@ -30,16 +28,45 @@ typedef enum { DEFAULT } Protocol; + +struct StreamConfig { + + std::string client_address; + int port; + Protocol protocol; + int width; + int height; + + StreamConfig () { + client_address = "127.0.0.1"; + port = 0; + protocol = DEFAULT; + width = 0; + height = 0; + } + + inline StreamConfig& operator = (const StreamConfig& o) + { + if (this != &o) { + this->client_address = o.client_address; + this->port = o.port; + this->protocol = o.protocol; + this->width = o.width; + this->height = o.height; + } + return *this; + } +}; + extern const char* protocol_name[DEFAULT]; extern const std::vector protocol_send_pipeline; extern const std::vector protocol_receive_pipeline; +std::string hostname(); std::vector host_ips(); bool is_host_ip(const std::string &ip); std::string closest_host_ip(const std::string &ip); -std::string hostname(); - } #endif // NETWORKTOOLKIT_H diff --git a/Settings.cpp b/Settings.cpp index 01443d7..e68f760 100644 --- a/Settings.cpp +++ b/Settings.cpp @@ -90,13 +90,6 @@ void Settings::Save() RecordNode->SetAttribute("timeout", application.record.timeout); pRoot->InsertEndChild(RecordNode); -// // Record -// XMLElement *StreamNode = xmlDoc.NewElement( "Stream" ); -// StreamNode->SetAttribute("profile", application.stream.profile); -// StreamNode->SetAttribute("ip", application.stream.ip.c_str()); -// StreamNode->SetAttribute("port", application.stream.port); -// pRoot->InsertEndChild(StreamNode); - // Transition XMLElement *TransitionNode = xmlDoc.NewElement( "Transition" ); TransitionNode->SetAttribute("auto_open", application.transition.auto_open); @@ -262,19 +255,6 @@ void Settings::Load() application.record.path = SystemToolkit::home_path(); } -// // Stream -// XMLElement * streamnode = pRoot->FirstChildElement("Stream"); -// if (streamnode != nullptr) { -// streamnode->QueryIntAttribute("profile", &application.stream.profile); -// streamnode->QueryIntAttribute("port", &application.stream.port); - -// const char *ip_ = streamnode->Attribute("ip"); -// if (ip_) -// application.stream.ip = std::string(ip_); -// else -// application.stream.ip = "localhost"; -// } - // Source XMLElement * sourceconfnode = pRoot->FirstChildElement("Source"); if (sourceconfnode != nullptr) { diff --git a/Settings.h b/Settings.h index 8cbc28e..a663ffd 100644 --- a/Settings.h +++ b/Settings.h @@ -75,19 +75,6 @@ struct RecordConfig }; -//struct StreamingConfig -//{ -// std::string ip; -// int port; -// int profile; - -// StreamingConfig() : ip("localhost") { -// profile = 0; -// port = 5400; -// } - -//}; - struct History { std::string path; @@ -202,7 +189,6 @@ struct Application // settings exporters RecordConfig record; -// StreamingConfig stream; // settings new source SourceConfig source; diff --git a/Streamer.cpp b/Streamer.cpp index 43a788a..025a053 100644 --- a/Streamer.cpp +++ b/Streamer.cpp @@ -22,12 +22,17 @@ #include "FrameBuffer.h" #include "Log.h" +#include "Connection.h" #include "NetworkToolkit.h" #include "Streamer.h" #include #include +#ifndef NDEBUG +#define STREAMER_DEBUG +#endif + // this is called when a U void StreamingRequestListener::ProcessMessage( const osc::ReceivedMessage& m, const IpEndpointName& remoteEndpoint ) @@ -39,19 +44,23 @@ void StreamingRequestListener::ProcessMessage( const osc::ReceivedMessage& m, if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_REQUEST) == 0 ){ - Log::Info("%s wants to know if i can stream", sender); - Streaming::manager().makeOffer(sender); - - } - else if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_CONNECT) == 0 ){ - - Log::Info("%s wants me stream", sender); + osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); + int reply_to_port = (arg++)->AsInt32(); + Streaming::manager().addStream(sender, reply_to_port); +#ifdef STREAMER_DEBUG + Log::Info("%s wants a stream.", sender); +#endif } else if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_DISCONNECT) == 0 ){ - Log::Info("%s ended reception", sender); + osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); + int port = (arg++)->AsInt32(); + Streaming::manager().removeStream(sender, port); +#ifdef STREAMER_DEBUG + Log::Info("%s does not need streaming anymore.", sender); +#endif } } catch( osc::Exception& e ){ @@ -64,33 +73,32 @@ void StreamingRequestListener::ProcessMessage( const osc::ReceivedMessage& m, Streaming::Streaming() : session_(nullptr), width_(0), height_(0) { -// receiver_ = new UdpListeningReceiveSocket(IpEndpointName( IpEndpointName::ANY_ADDRESS, STREAM_REQUEST_PORT ), &listener_ ); -//// receiver_ = new UdpListeningReceiveSocket(IpEndpointName( IpEndpointName::ANY_ADDRESS, STREAM_REQUEST_PORT ), &listener_ ); - - + int port = Connection::manager().info().port_stream_request; + receiver_ = new UdpListeningReceiveSocket(IpEndpointName( IpEndpointName::ANY_ADDRESS, port ), &listener_ ); } -void Streaming::listen() +void wait_for_request_(UdpListeningReceiveSocket *receiver) { - Log::Info("Accepting Streaming requests on port %d", STREAM_REQUEST_PORT); - Streaming::manager().receiver_->Run(); - Log::Info("Refusing Streaming requests"); + receiver->Run(); } void Streaming::enable(bool on) { -// if (on){ -// std::thread(listen).detach(); -// } -// else { -// // end streaming requests -// receiver_->AsynchronousBreak(); -// // end all streaming TODO -// } + if (on) { + std::thread(wait_for_request_, receiver_).detach(); + Log::Info("Accepting stream requests to %s.", Connection::manager().info().name.c_str()); + } + else { + // end streaming requests + receiver_->AsynchronousBreak(); + // end all streaming + for (auto sit = streamers_.begin(); sit != streamers_.end(); sit=streamers_.erase(sit)) + (*sit)->stop(); + Log::Info("Refusing stream requests to %s. No streaming ongoing.", Connection::manager().info().name.c_str()); + } } - void Streaming::setSession(Session *se) { if (se != nullptr && session_ != se) { @@ -106,73 +114,110 @@ void Streaming::setSession(Session *se) } } -void Streaming::makeOffer(const std::string &client) +void Streaming::removeStream(const std::string &sender, int port) { - // do not reply if no session can be streamed (should not happen) - if (session_ == nullptr) - return; + // get ip of sender + std::string sender_ip = sender.substr(0, sender.find_last_of(":")); + // parse the list for a streamers matching IP and port + std::vector::const_iterator sit = streamers_.begin(); + for (; sit != streamers_.end(); sit++){ + NetworkToolkit::StreamConfig config = (*sit)->config_; + if (config.client_address.compare(sender_ip) == 0 && config.port == port ) { +#ifdef STREAMER_DEBUG + Log::Info("Ending streaming to %s:%d", config.client_address.c_str(), config.port); +#endif + // match: stop this streamer + (*sit)->stop(); + // remove from list + streamers_.erase(sit); + break; + } + } + +} + +void Streaming::removeStreams(const std::string &ip) +{ + // remove all streamers matching given IP + std::vector::const_iterator sit = streamers_.begin(); + while ( sit != streamers_.end() ){ + NetworkToolkit::StreamConfig config = (*sit)->config_; + if (config.client_address.compare(ip) == 0) { +#ifdef STREAMER_DEBUG + Log::Info("Ending streaming to %s:%d", config.client_address.c_str(), config.port); +#endif + // match: stop this streamer + (*sit)->stop(); + // remove from list + sit = streamers_.erase(sit); + } + else + sit++; + } +} + + +void Streaming::addStream(const std::string &sender, int reply_to) +{ // get ip of client - std::string clientip = client.substr(0, client.find_last_of(":")); - - // get port used to request - std::string clientport_s = client.substr(client.find_last_of(":") + 1); + std::string sender_ip = sender.substr(0, sender.find_last_of(":")); + // get port used to send the request + std::string sender_port = sender.substr(sender.find_last_of(":") + 1); // prepare to reply to client - IpEndpointName host( clientip.c_str(), STREAM_RESPONSE_PORT ); + IpEndpointName host( sender_ip.c_str(), reply_to ); UdpTransmitSocket socket( host ); // prepare an offer - offer o; - o.client = clientip; - o.width = width_; - o.height = height_; + NetworkToolkit::StreamConfig conf; + conf.client_address = sender_ip; + conf.port = std::stoi(sender_port); // this port seems free, so re-use it! + conf.width = width_; + conf.height = height_; // offer SHM if same IP that our host IP (i.e. on the same machine) - if( NetworkToolkit::is_host_ip(clientip) ){ - // this port seems free, so re-use it! - o.address = "localhost:" + clientport_s; - o.protocol = NetworkToolkit::SHM_RAW; + if( NetworkToolkit::is_host_ip(conf.client_address) ) { + conf.protocol = NetworkToolkit::SHM_RAW; } // any other IP : offer UDP streaming else { - o.address = NetworkToolkit::closest_host_ip(clientip) + ":" + clientport_s; - o.protocol = NetworkToolkit::TCP_JPEG; + conf.protocol = NetworkToolkit::TCP_JPEG; } - // build message + // build OSC message char buffer[IP_MTU_SIZE]; osc::OutboundPacketStream p( buffer, IP_MTU_SIZE ); p.Clear(); p << osc::BeginMessage( OSC_PREFIX OSC_STREAM_OFFER ); - p << o.address.c_str(); - p << (int) o.protocol; - p << o.width << o.height; + p << conf.port; + p << (int) conf.protocol; + p << conf.width << conf.height; p << osc::EndMessage; - // send OSC message + // send OSC message to client socket.Send( p.Data(), p.Size() ); + +#ifdef STREAMER_DEBUG + Log::Info("Accepting stream request from %s:%d", sender_ip.c_str(), reply_to); +#endif + + // create streamer & remember it + VideoStreamer *streamer = new VideoStreamer(conf); + streamers_.push_back(streamer); + + // start streamer + session_->addFrameGrabber(streamer); } -void Streaming::addStream(const std::string &address) -{ -} -void Streaming::removeStream(const std::string &address) -{ - -} - -VideoStreamer::VideoStreamer(Streaming::offer config): FrameGrabber(), frame_buffer_(nullptr), width_(0), height_(0), +VideoStreamer::VideoStreamer(NetworkToolkit::StreamConfig conf): FrameGrabber(), frame_buffer_(nullptr), width_(0), height_(0), streaming_(false), accept_buffer_(false), pipeline_(nullptr), src_(nullptr), timestamp_(0) { // configure fix parameter frame_duration_ = gst_util_uint64_scale_int (1, GST_SECOND, 30); // 30 FPS timeframe_ = 2 * frame_duration_; - protocol_ = config.protocol; - address_ = config.address; - width_ = config.width; - height_ = config.height; + config_ = conf; } VideoStreamer::~VideoStreamer() @@ -203,11 +248,19 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) // set frame buffer as input frame_buffer_ = frame_buffer; - // define stream properties + // define frames properties width_ = frame_buffer_->width(); height_ = frame_buffer_->height(); size_ = width_ * height_ * (frame_buffer_->use_alpha() ? 4 : 3); + // if an incompatilble frame buffer given: cancel streaming + if ( config_.width != width_ || config_.height != height_) { + Log::Warning("Streaming cannot start: given frames (%d x %d) incompatible with stream (%d x %d)", + width_, height_, config_.width, config_.height); + finished_ = true; + return; + } + // create PBOs glGenBuffers(2, pbo_); glBindBuffer(GL_PIXEL_PACK_BUFFER, pbo_[1]); @@ -218,9 +271,9 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) // create a gstreamer pipeline std::string description = "appsrc name=src ! videoconvert ! "; - if (protocol_ < 0 || protocol_ >= NetworkToolkit::DEFAULT) - protocol_ = NetworkToolkit::TCP_JPEG; - description += NetworkToolkit::protocol_send_pipeline[protocol_]; + if (config_.protocol < 0 || config_.protocol >= NetworkToolkit::DEFAULT) + config_.protocol = NetworkToolkit::TCP_JPEG; + description += NetworkToolkit::protocol_send_pipeline[config_.protocol]; // parse pipeline descriptor GError *error = NULL; @@ -233,18 +286,15 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) } // setup streaming sink - if (protocol_ == NetworkToolkit::TCP_JPEG || protocol_ == NetworkToolkit::TCP_H264) { - // extract host and port from "host:port" - std::string ip = address_.substr(0, address_.find_last_of(":")); - std::string port_s = address_.substr(address_.find_last_of(":") + 1); - uint port = std::stoi(port_s); + if (config_.protocol == NetworkToolkit::TCP_JPEG || config_.protocol == NetworkToolkit::TCP_H264) { g_object_set (G_OBJECT (gst_bin_get_by_name (GST_BIN (pipeline_), "sink")), - "host", ip.c_str(), - "port", port, NULL); + "host", config_.client_address.c_str(), + "port", config_.port, NULL); } - else if (protocol_ == NetworkToolkit::SHM_RAW) { - std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm_socket"); - SystemToolkit::remove_file(path); + else if (config_.protocol == NetworkToolkit::SHM_RAW) { + // TODO rename SHM socket "shm_PORT" + std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm"); + path += std::to_string(config_.port); g_object_set (G_OBJECT (gst_bin_get_by_name (GST_BIN (pipeline_), "sink")), "socket-path", path.c_str(), NULL); } @@ -257,7 +307,6 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) "stream-type", GST_APP_STREAM_TYPE_STREAM, "is-live", TRUE, "format", GST_FORMAT_TIME, - // "do-timestamp", TRUE, NULL); // Direct encoding (no buffering) @@ -296,9 +345,7 @@ void VideoStreamer::addFrame (FrameBuffer *frame_buffer, float dt) } // all good - Log::Info("VideoStreamer start (%s %d x %d)", NetworkToolkit::protocol_name[protocol_], width_, height_); - -Log::Info("%s", description.c_str()); + Log::Info("VideoStreamer start (%s %d x %d)", config_.client_address.c_str(), width_, height_); // start streaming !! streaming_ = true; @@ -393,15 +440,14 @@ Log::Info("%s", description.c_str()); } } - // did the recording terminate with sink receiving end-of-stream ? + // did the streaming terminate with sink receiving end-of-stream ? else { // Wait for EOS message GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline_)); - GstMessage *msg = gst_bus_poll(bus, GST_MESSAGE_EOS, GST_TIME_AS_USECONDS(1)); + GstMessage *msg = gst_bus_poll(bus, GST_MESSAGE_EOS, GST_TIME_AS_USECONDS(4)); if (msg) { -// Log::Info("received EOS"); // stop the pipeline GstStateChangeReturn ret = gst_element_set_state (pipeline_, GST_STATE_NULL); if (ret == GST_STATE_CHANGE_FAILURE) @@ -413,8 +459,10 @@ Log::Info("%s", description.c_str()); } // make sure the shared memory socket is deleted - if (protocol_ == NetworkToolkit::SHM_RAW) { - std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm_socket"); + if (config_.protocol == NetworkToolkit::SHM_RAW) { + // TODO rename SHM socket "shm_PORT" + std::string path = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm"); + path += std::to_string(config_.port); SystemToolkit::remove_file(path); } } @@ -424,12 +472,13 @@ Log::Info("%s", description.c_str()); void VideoStreamer::stop () { + // stop recording + streaming_ = false; + // send end of stream if (src_) gst_app_src_end_of_stream (src_); - // stop recording - streaming_ = false; } std::string VideoStreamer::info() @@ -437,13 +486,13 @@ std::string VideoStreamer::info() std::string ret = "Streaming terminated."; if (streaming_) { - if (protocol_ == NetworkToolkit::TCP_JPEG || protocol_ == NetworkToolkit::TCP_H264) { + if (config_.protocol == NetworkToolkit::TCP_JPEG || config_.protocol == NetworkToolkit::TCP_H264) { ret = "TCP"; } - else if (protocol_ == NetworkToolkit::SHM_RAW) { + else if (config_.protocol == NetworkToolkit::SHM_RAW) { ret = "Shared Memory"; } diff --git a/Streamer.h b/Streamer.h index dc58c69..a394433 100644 --- a/Streamer.h +++ b/Streamer.h @@ -12,6 +12,7 @@ #include "FrameGrabber.h" class Session; +class VideoStreamer; class StreamingRequestListener : public osc::OscPacketListener { @@ -22,6 +23,8 @@ protected: class Streaming { + friend class StreamingRequestListener; + // Private Constructor Streaming(); Streaming(Streaming const& copy); // Not Implemented @@ -38,44 +41,36 @@ public: void enable(bool on); void setSession(Session *se); + void removeStreams(const std::string &ip); - typedef struct { - std::string client; - std::string address; - NetworkToolkit::Protocol protocol; - int width; - int height; - } offer; - - void makeOffer(const std::string &client); - void retractOffer(offer o); - - void addStream(const std::string &address); - void removeStream(const std::string &address); +protected: + void addStream(const std::string &sender, int reply_to); + void removeStream(const std::string &sender, int port); private: - std::string hostname_; - static void listen(); StreamingRequestListener listener_; UdpListeningReceiveSocket *receiver_; Session *session_; int width_; - int height_; - std::vector offers_; + int height_; + + // TODO Mutex to protect access to list of streamers + std::vector streamers_; }; class VideoStreamer : public FrameGrabber { + friend class Streaming; + // Frame buffer information FrameBuffer *frame_buffer_; uint width_; uint height_; - // - NetworkToolkit::Protocol protocol_; - std::string address_; + // connection information + NetworkToolkit::StreamConfig config_; // operation std::atomic streaming_; @@ -93,7 +88,7 @@ class VideoStreamer : public FrameGrabber public: - VideoStreamer(Streaming::offer config); + VideoStreamer(NetworkToolkit::StreamConfig conf); ~VideoStreamer(); void addFrame(FrameBuffer *frame_buffer, float dt) override;