#include #include #include #include #include #include #include #include #include "SystemToolkit.h" #include "defines.h" #include "Stream.h" #include "Decorations.h" #include "Visitor.h" #include "Log.h" #include "Connection.h" #include "NetworkSource.h" #ifndef NDEBUG #define NETWORK_DEBUG #endif // this is called when receiving an answer for streaming request void StreamerResponseListener::ProcessMessage( const osc::ReceivedMessage& m, const IpEndpointName& remoteEndpoint ) { char sender[IpEndpointName::ADDRESS_AND_PORT_STRING_LENGTH]; remoteEndpoint.AddressAndPortAsString(sender); try{ if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_OFFER ) == 0 ){ NetworkToolkit::StreamConfig conf; // someone is offering a stream osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); conf.port = (arg++)->AsInt32(); conf.protocol = (NetworkToolkit::Protocol) (arg++)->AsInt32(); conf.width = (arg++)->AsInt32(); conf.height = (arg++)->AsInt32(); // we got the offer from Streaming::manager() parent_->config_ = conf; parent_->connected_ = true; parent_->received_config_ = true; #ifdef NETWORK_DEBUG Log::Info("Received stream info from %s", sender); #endif } else if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_REJECT ) == 0 ){ parent_->connected_ = false; parent_->received_config_ = true; #ifdef NETWORK_DEBUG Log::Info("Received rejection from %s", sender); #endif } } catch( osc::Exception& e ){ // any parsing errors such as unexpected argument types, or // missing arguments get thrown as exceptions. Log::Info("error while parsing message '%s' from %s : %s", m.AddressPattern(), sender, e.what()); } } NetworkStream::NetworkStream(): Stream(), receiver_(nullptr) { received_config_ = false; connected_ = false; } glm::ivec2 NetworkStream::resolution() const { return glm::ivec2(config_.width, config_.height); } void wait_for_stream_(UdpListeningReceiveSocket *receiver) { receiver->Run(); } void NetworkStream::connect(const std::string &nameconnection) { // start fresh received_config_ = false; connected_ = false; if (receiver_) { delete receiver_; receiver_ = nullptr; close(); } if (nameconnection.compare(Connection::manager().info().name) == 0) { Log::Warning("Cannot create self-referencing Network Source '%s'", nameconnection.c_str()); failed_ = true; return; } // does this Connection exists? int streamer_index = Connection::manager().index(nameconnection); // Nope, cannot connect to unknown connection if (streamer_index < 0) { Log::Warning("Cannot connect to %s: please make sure the program is active.", nameconnection.c_str()); failed_ = true; return; } // prepare listener to receive stream config from remote streaming manager listener_.setParent(this); // find an available port to receive response from remote streaming manager int listener_port_ = -1; for (int trial = 0; receiver_ == nullptr && trial < 10 ; trial++) { try { // invent a port which would be available listener_port_ = 72000 + rand()%1000; // try to create receiver (through exception on fail) receiver_ = new UdpListeningReceiveSocket(IpEndpointName(Connection::manager().info().address.c_str(), listener_port_), &listener_); } catch (const std::runtime_error&) { receiver_ = nullptr; } } if (receiver_ == nullptr) { Log::Notify("Cannot establish connection with %s. Please check your network.", nameconnection.c_str()); failed_ = true; return; } // ok, we can ask to this connected streamer to send us a stream ConnectionInfo streamer = Connection::manager().info(streamer_index); IpEndpointName a( streamer.address.c_str(), streamer.port_stream_request ); streamer_address_ = a.address; streamer_address_ = a.port; // build OSC message char buffer[IP_MTU_SIZE]; osc::OutboundPacketStream p( buffer, IP_MTU_SIZE ); p.Clear(); p << osc::BeginMessage( OSC_PREFIX OSC_STREAM_REQUEST ); // send my listening port to indicate to Connection::manager where to reply p << listener_port_; p << Connection::manager().info().name.c_str(); p << osc::EndMessage; // send OSC message to streamer UdpTransmitSocket socket( streamer_address_ ); socket.Send( p.Data(), p.Size() ); // Now we wait for the offer from the streamer std::thread(wait_for_stream_, receiver_).detach(); #ifdef NETWORK_DEBUG Log::Info("Asking %s:%d for a stream", streamer.address.c_str(), streamer.port_stream_request); Log::Info("Waiting for response at %s:%d", Connection::manager().info().address.c_str(), listener_port_); #endif } void NetworkStream::disconnect() { if (receiver_) { delete receiver_; receiver_ = nullptr; } if (connected_) { // build OSC message to inform disconnection char buffer[IP_MTU_SIZE]; osc::OutboundPacketStream p( buffer, IP_MTU_SIZE ); p.Clear(); p << osc::BeginMessage( OSC_PREFIX OSC_STREAM_DISCONNECT ); p << config_.port; // send my stream port to identify myself to the streamer Connection::manager p << osc::EndMessage; // send OSC message to streamer UdpTransmitSocket socket( streamer_address_ ); socket.Send( p.Data(), p.Size() ); } } bool NetworkStream::connected() const { return connected_ && Stream::isPlaying(); } void NetworkStream::update() { Stream::update(); if ( !ready_ && !failed_ && received_config_) { // only once received_config_ = false; // stop receiving streamer info if (receiver_) receiver_->AsynchronousBreak(); if (connected_) { #ifdef NETWORK_DEBUG Log::Info("Creating Network Stream %d (%d x %d)", config_.port, config_.width, config_.height); #endif // prepare pipeline parameter with port given in config_ std::string parameter = std::to_string(config_.port); // make sure the shared memory socket exists if (config_.protocol == NetworkToolkit::SHM_RAW) { // for shared memory, the parameter is a file location in settings parameter = SystemToolkit::full_filename(SystemToolkit::settings_path(), "shm") + parameter; // try few times to see if file exists and wait 20ms each time for(int trial = 0; trial < 5; trial ++){ if ( SystemToolkit::file_exists(parameter)) break; std::this_thread::sleep_for (std::chrono::milliseconds(20)); } // failed to find the shm socket file: cannot connect if (!SystemToolkit::file_exists(parameter)) { Log::Warning("Cannot connect to shared memory."); failed_ = true; } } // general case : create pipeline and open if (!failed_) { // build the pipeline depending on stream info std::ostringstream pipeline; // get generic pipeline string std::string pipelinestring = NetworkToolkit::protocol_receive_pipeline[config_.protocol]; // find placeholder for PORT int xxxx = pipelinestring.find("XXXX"); // keep beginning of pipeline pipeline << pipelinestring.substr(0, xxxx); // Replace 'XXXX' by info on port config pipeline << parameter; // keep ending of pipeline pipeline << pipelinestring.substr(xxxx + 4); // add a videoconverter pipeline << " ! videoconvert"; // open the pipeline with generic stream class Stream::open(pipeline.str(), config_.width, config_.height); } } else { Log::Info("Connection rejected."); } } } NetworkSource::NetworkSource() : StreamSource() { // create stream stream_ = (Stream *) new NetworkStream; // set icons overlays_[View::MIXING]->attach( new Symbol(Symbol::SHARE, glm::vec3(0.8f, 0.8f, 0.01f)) ); overlays_[View::LAYER]->attach( new Symbol(Symbol::SHARE, glm::vec3(0.8f, 0.8f, 0.01f)) ); } NetworkSource::~NetworkSource() { networkStream()->disconnect(); } NetworkStream *NetworkSource::networkStream() const { return dynamic_cast(stream_); } void NetworkSource::setConnection(const std::string &nameconnection) { connection_name_ = nameconnection; Log::Notify("Creating Network Source '%s'", connection_name_.c_str()); // open network stream networkStream()->connect( connection_name_ ); stream_->play(true); } std::string NetworkSource::connection() const { return connection_name_; } void NetworkSource::accept(Visitor& v) { Source::accept(v); if (!failed()) v.visit(*this); }