diff --git a/src/ControlManager.cpp b/src/ControlManager.cpp index cd410f2..0c22cf0 100644 --- a/src/ControlManager.cpp +++ b/src/ControlManager.cpp @@ -38,6 +38,7 @@ #include "TransitionView.h" #include "NetworkToolkit.h" #include "UserInterfaceManager.h" +#include "Streamer.h" #include "ControlManager.h" @@ -119,6 +120,11 @@ void Control::RequestListener::ProcessMessage( const osc::ReceivedMessage& m, Control::manager().sendBatchStatus(remoteEndpoint); } } + // Request stream + else if ( target.compare(OSC_STREAM) == 0 ) + { + Control::manager().receiveStreamAttribute(attribute, m.ArgumentStream(), sender); + } // ALL sources target: apply attribute to all sources of the session else if ( target.compare(OSC_ALL) == 0 ) { @@ -999,6 +1005,68 @@ void Control::receiveMultitouchAttribute(const std::string &attribute, } + +void Control::receiveStreamAttribute(const std::string &attribute, + osc::ReceivedMessageArgumentStream arguments, + const std::string &sender) +{ + try { + if (Streaming::manager().enabled()) { + // Stream request /vimix/stream/request + if ( attribute.compare(OSC_STREAM_REQUEST) == 0 ) { + int port = 0; + const char *label = nullptr; + // read arguments, port is mandatory, label optionnal + arguments >> port; + if (arguments.Eos()) + arguments >> osc::EndMessage; + else + arguments >> label >> osc::EndMessage; + // remove prevous identical stream + Streaming::manager().removeStream(sender, port); + // add the requested stream to manager + Streaming::manager().addStream(sender, port, label == nullptr ? sender : label); + } + // Stream disconnection request /vimix/stream/disconnect + else if ( attribute.compare(OSC_STREAM_DISCONNECT) == 0 ) { + + NetworkToolkit::StreamConfig removed; + + // Port is given: remove stream from that sender at given port + try { + osc::ReceivedMessageArgumentStream arg = arguments; + int port = 0; + arg >> port; + // no exception, remove that port + removed = Streaming::manager().removeStream(sender, port); + // silently ignore any other argument + } + catch (osc::WrongArgumentTypeException &) { + } + // no stream was removed by port + if (!removed.port) { + // try by client name if given: remove all streams with that name + const char *label = nullptr; + arguments >> label >> osc::EndMessage; + // no exception, remove that label + Streaming::manager().removeStreams(label); + } + } + } + } + catch (osc::MissingArgumentException &e) { + Log::Info(CONTROL_OSC_MSG "Missing argument for attribute '%s' for target %s.", attribute.c_str(), OSC_STREAM); + } + catch (osc::ExcessArgumentException &e) { + Log::Info(CONTROL_OSC_MSG "Too many arguments for attribute '%s' for target %s.", attribute.c_str(), OSC_STREAM); + } + catch (osc::WrongArgumentTypeException &e) { + Log::Info(CONTROL_OSC_MSG "Invalid argument for attribute '%s' for target %s.", attribute.c_str(), OSC_STREAM); + } + +} + + void Control::sendSourceAttibutes(const IpEndpointName &remoteEndpoint, std::string target, Source *s) { // default values diff --git a/src/ControlManager.h b/src/ControlManager.h index f3c0efa..5b3ae78 100644 --- a/src/ControlManager.h +++ b/src/ControlManager.h @@ -67,6 +67,7 @@ #define OSC_SESSION_SAVE "/save" #define OSC_SESSION_CLOSE "/close" +#define OSC_STREAM "/peertopeer" #define OSC_MULTITOUCH "/multitouch" #define INPUT_UNDEFINED 0 @@ -155,6 +156,9 @@ protected: void sendBatchStatus(const IpEndpointName& remoteEndpoint); void sendOutputStatus(const IpEndpointName& remoteEndpoint); + void receiveStreamAttribute(const std::string &attribute, + osc::ReceivedMessageArgumentStream arguments, const std::string &sender); + static void keyboardCalback(GLFWwindow*, int, int, int, int); private: diff --git a/src/NetworkToolkit.cpp b/src/NetworkToolkit.cpp index 4614995..6722816 100644 --- a/src/NetworkToolkit.cpp +++ b/src/NetworkToolkit.cpp @@ -97,21 +97,23 @@ const char* NetworkToolkit::stream_protocol_label[NetworkToolkit::DEFAULT] = { const std::vector NetworkToolkit::stream_send_pipeline { "video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=10 ! rtpvrawpay ! application/x-rtp,sampling=RGB ! udpsink name=sink", "video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! jpegenc ! rtpjpegpay ! udpsink name=sink", - "video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! x264enc tune=zerolatency pass=4 quantizer=22 speed-preset=2 ! rtph264pay aggregate-mode=1 ! udpsink name=sink", + "video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! x264enc tune=\"zerolatency\" pass=4 quantizer=22 speed-preset=2 ! h264parse ! rtph264pay aggregate-mode=1 ! udpsink name=sink", "video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=10 ! shmsink buffer-time=100000 wait-for-connection=true name=sink" }; const std::vector NetworkToolkit::stream_receive_pipeline { "udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)RAW,sampling=(string)RGB,width=(string)WWWW,height=(string)HHHH\" ! rtpvrawdepay ! queue max-size-buffers=10", - "udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)JPEG\" ! rtpjpegdepay ! decodebin", - "udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)H264\" ! rtph264depay ! decodebin", + "udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)JPEG\" ! queue ! rtpjpegdepay ! decodebin", + "udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)H264\" ! queue ! rtph264depay ! h264parse ! decodebin", "shmsrc socket-path=XXXX ! video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=10", }; const std::vector< std::pair > NetworkToolkit::stream_h264_send_pipeline { // {"vtenc_h264_hw", "video/x-raw, format=I420, framerate=30/1 ! queue max-size-buffers=10 ! vtenc_h264_hw realtime=1 allow-frame-reordering=0 ! rtph264pay aggregate-mode=1 ! udpsink name=sink"}, - {"nvh264enc", "video/x-raw, format=RGBA, framerate=30/1 ! queue max-size-buffers=10 ! nvh264enc rc-mode=cbr-ld-hq zerolatency=true ! video/x-h264, profile=(string)main ! rtph264pay aggregate-mode=1 ! udpsink name=sink"}, - {"vaapih264enc", "video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! vaapih264enc rate-control=cqp init-qp=26 ! video/x-h264, profile=(string)main ! rtph264pay aggregate-mode=1 ! udpsink name=sink"} + {"nvh264enc", "video/x-raw, format=RGBA, framerate=30/1 ! queue max-size-buffers=10 ! " + "nvh264enc rc-mode=1 zerolatency=true ! video/x-h264, profile=(string)main ! h264parse ! rtph264pay aggregate-mode=1 ! udpsink name=sink"}, + {"vaapih264enc", "video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! " + "vaapih264enc rate-control=cqp init-qp=26 ! video/x-h264, profile=(string)main ! h264parse ! rtph264pay aggregate-mode=1 ! udpsink name=sink"} }; bool initialized_ = false; diff --git a/src/Streamer.cpp b/src/Streamer.cpp index 47b24ec..54df70b 100644 --- a/src/Streamer.cpp +++ b/src/Streamer.cpp @@ -83,10 +83,10 @@ void Streaming::RequestListener::ProcessMessage( const osc::ReceivedMessage& m, // then enforce local UDP transfer protocol = NetworkToolkit::UDP_RAW; // add stream answering to request - Streaming::manager().addStream(sender, reply_to_port, client_name, protocol); + Streaming::manager()._addStream(sender, reply_to_port, client_name, protocol); } else - Streaming::manager().refuseStream(sender, reply_to_port); + Streaming::manager()._refuseStream(sender, reply_to_port); } else if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_DISCONNECT) == 0 ){ // receive info on disconnection @@ -254,7 +254,7 @@ void Streaming::removeStream(const VideoStreamer *vs) } } -void Streaming::refuseStream(const std::string &sender, int reply_to) +void Streaming::_refuseStream(const std::string &sender, int reply_to) { // get ip of client std::string sender_ip = sender.substr(0, sender.find_last_of(":")); @@ -273,7 +273,32 @@ void Streaming::refuseStream(const std::string &sender, int reply_to) Log::Warning("A connection request for streaming came in and was refused.\nYou can enable the Sharing on local network from the menu of the Output window."); } -void Streaming::addStream(const std::string &sender, int reply_to, +void Streaming::addStream(const std::string &sender, int port, + const std::string &clientname) +{ + // get ip of client + std::string sender_ip = sender.substr(0, sender.find_last_of(":")); + + // prepare an offer + NetworkToolkit::StreamConfig conf; + conf.client_address = sender_ip; + conf.client_name = clientname; + conf.port = port; + conf.width = FrameGrabbing::manager().width(); + conf.height = FrameGrabbing::manager().height(); + conf.protocol = Settings::application.stream_protocol > 0 ? NetworkToolkit::UDP_H264 : NetworkToolkit::UDP_JPEG; + + // create streamer & remember it + VideoStreamer *streamer = new VideoStreamer(conf); + streamers_lock_.lock(); + streamers_.push_back(streamer); + streamers_lock_.unlock(); + + // start streamer + FrameGrabbing::manager().add(streamer); +} + +void Streaming::_addStream(const std::string &sender, int reply_to, const std::string &clientname, NetworkToolkit::StreamProtocol protocol) { // get ip of client @@ -395,10 +420,12 @@ std::string VideoStreamer::init(GstCaps *caps) std::string path = SystemToolkit::full_filename(SystemToolkit::temp_path(), "shm"); path += std::to_string(config_.port); g_object_set (G_OBJECT (gst_bin_get_by_name (GST_BIN (pipeline_), "sink")), + "sync", FALSE, "socket-path", path.c_str(), NULL); } else { g_object_set (G_OBJECT (gst_bin_get_by_name (GST_BIN (pipeline_), "sink")), + "sync", FALSE, "host", config_.client_address.c_str(), "port", config_.port, NULL); } diff --git a/src/Streamer.h b/src/Streamer.h index cae88ef..3e09914 100644 --- a/src/Streamer.h +++ b/src/Streamer.h @@ -36,6 +36,7 @@ public: void removeStreams(const std::string &clientname); NetworkToolkit::StreamConfig removeStream(const std::string &sender, int port); void removeStream(const VideoStreamer *vs); + void addStream(const std::string &sender, int port, const std::string &clientname); bool busy(); std::vector listStreams(); @@ -48,9 +49,9 @@ protected: virtual void ProcessMessage( const osc::ReceivedMessage& m, const IpEndpointName& remoteEndpoint ); }; - void addStream(const std::string &sender, int reply_to, const std::string &clientname, + void _addStream(const std::string &sender, int reply_to, const std::string &clientname, NetworkToolkit::StreamProtocol protocol = NetworkToolkit::DEFAULT); - void refuseStream(const std::string &sender, int reply_to); + void _refuseStream(const std::string &sender, int reply_to); private: