OSC Peer to peer request

Added OSC mechanism to request network stream. Improved stability of H264 streaming.
This commit is contained in:
Bruno Herbelin
2023-06-04 16:14:45 +02:00
parent 1f9bff6182
commit 216d9a1686
5 changed files with 113 additions and 11 deletions

View File

@@ -38,6 +38,7 @@
#include "TransitionView.h"
#include "NetworkToolkit.h"
#include "UserInterfaceManager.h"
#include "Streamer.h"
#include "ControlManager.h"
@@ -119,6 +120,11 @@ void Control::RequestListener::ProcessMessage( const osc::ReceivedMessage& m,
Control::manager().sendBatchStatus(remoteEndpoint);
}
}
// Request stream
else if ( target.compare(OSC_STREAM) == 0 )
{
Control::manager().receiveStreamAttribute(attribute, m.ArgumentStream(), sender);
}
// ALL sources target: apply attribute to all sources of the session
else if ( target.compare(OSC_ALL) == 0 )
{
@@ -999,6 +1005,68 @@ void Control::receiveMultitouchAttribute(const std::string &attribute,
}
void Control::receiveStreamAttribute(const std::string &attribute,
osc::ReceivedMessageArgumentStream arguments,
const std::string &sender)
{
try {
if (Streaming::manager().enabled()) {
// Stream request /vimix/stream/request
if ( attribute.compare(OSC_STREAM_REQUEST) == 0 ) {
int port = 0;
const char *label = nullptr;
// read arguments, port is mandatory, label optionnal
arguments >> port;
if (arguments.Eos())
arguments >> osc::EndMessage;
else
arguments >> label >> osc::EndMessage;
// remove prevous identical stream
Streaming::manager().removeStream(sender, port);
// add the requested stream to manager
Streaming::manager().addStream(sender, port, label == nullptr ? sender : label);
}
// Stream disconnection request /vimix/stream/disconnect
else if ( attribute.compare(OSC_STREAM_DISCONNECT) == 0 ) {
NetworkToolkit::StreamConfig removed;
// Port is given: remove stream from that sender at given port
try {
osc::ReceivedMessageArgumentStream arg = arguments;
int port = 0;
arg >> port;
// no exception, remove that port
removed = Streaming::manager().removeStream(sender, port);
// silently ignore any other argument
}
catch (osc::WrongArgumentTypeException &) {
}
// no stream was removed by port
if (!removed.port) {
// try by client name if given: remove all streams with that name
const char *label = nullptr;
arguments >> label >> osc::EndMessage;
// no exception, remove that label
Streaming::manager().removeStreams(label);
}
}
}
}
catch (osc::MissingArgumentException &e) {
Log::Info(CONTROL_OSC_MSG "Missing argument for attribute '%s' for target %s.", attribute.c_str(), OSC_STREAM);
}
catch (osc::ExcessArgumentException &e) {
Log::Info(CONTROL_OSC_MSG "Too many arguments for attribute '%s' for target %s.", attribute.c_str(), OSC_STREAM);
}
catch (osc::WrongArgumentTypeException &e) {
Log::Info(CONTROL_OSC_MSG "Invalid argument for attribute '%s' for target %s.", attribute.c_str(), OSC_STREAM);
}
}
void Control::sendSourceAttibutes(const IpEndpointName &remoteEndpoint, std::string target, Source *s)
{
// default values

View File

@@ -67,6 +67,7 @@
#define OSC_SESSION_SAVE "/save"
#define OSC_SESSION_CLOSE "/close"
#define OSC_STREAM "/peertopeer"
#define OSC_MULTITOUCH "/multitouch"
#define INPUT_UNDEFINED 0
@@ -155,6 +156,9 @@ protected:
void sendBatchStatus(const IpEndpointName& remoteEndpoint);
void sendOutputStatus(const IpEndpointName& remoteEndpoint);
void receiveStreamAttribute(const std::string &attribute,
osc::ReceivedMessageArgumentStream arguments, const std::string &sender);
static void keyboardCalback(GLFWwindow*, int, int, int, int);
private:

View File

@@ -97,21 +97,23 @@ const char* NetworkToolkit::stream_protocol_label[NetworkToolkit::DEFAULT] = {
const std::vector<std::string> NetworkToolkit::stream_send_pipeline {
"video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=10 ! rtpvrawpay ! application/x-rtp,sampling=RGB ! udpsink name=sink",
"video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! jpegenc ! rtpjpegpay ! udpsink name=sink",
"video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! x264enc tune=zerolatency pass=4 quantizer=22 speed-preset=2 ! rtph264pay aggregate-mode=1 ! udpsink name=sink",
"video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! x264enc tune=\"zerolatency\" pass=4 quantizer=22 speed-preset=2 ! h264parse ! rtph264pay aggregate-mode=1 ! udpsink name=sink",
"video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=10 ! shmsink buffer-time=100000 wait-for-connection=true name=sink"
};
const std::vector<std::string> NetworkToolkit::stream_receive_pipeline {
"udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)RAW,sampling=(string)RGB,width=(string)WWWW,height=(string)HHHH\" ! rtpvrawdepay ! queue max-size-buffers=10",
"udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)JPEG\" ! rtpjpegdepay ! decodebin",
"udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)H264\" ! rtph264depay ! decodebin",
"udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)JPEG\" ! queue ! rtpjpegdepay ! decodebin",
"udpsrc port=XXXX caps=\"application/x-rtp,media=(string)video,encoding-name=(string)H264\" ! queue ! rtph264depay ! h264parse ! decodebin",
"shmsrc socket-path=XXXX ! video/x-raw, format=RGB, framerate=30/1 ! queue max-size-buffers=10",
};
const std::vector< std::pair<std::string, std::string> > NetworkToolkit::stream_h264_send_pipeline {
// {"vtenc_h264_hw", "video/x-raw, format=I420, framerate=30/1 ! queue max-size-buffers=10 ! vtenc_h264_hw realtime=1 allow-frame-reordering=0 ! rtph264pay aggregate-mode=1 ! udpsink name=sink"},
{"nvh264enc", "video/x-raw, format=RGBA, framerate=30/1 ! queue max-size-buffers=10 ! nvh264enc rc-mode=cbr-ld-hq zerolatency=true ! video/x-h264, profile=(string)main ! rtph264pay aggregate-mode=1 ! udpsink name=sink"},
{"vaapih264enc", "video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! vaapih264enc rate-control=cqp init-qp=26 ! video/x-h264, profile=(string)main ! rtph264pay aggregate-mode=1 ! udpsink name=sink"}
{"nvh264enc", "video/x-raw, format=RGBA, framerate=30/1 ! queue max-size-buffers=10 ! "
"nvh264enc rc-mode=1 zerolatency=true ! video/x-h264, profile=(string)main ! h264parse ! rtph264pay aggregate-mode=1 ! udpsink name=sink"},
{"vaapih264enc", "video/x-raw, format=NV12, framerate=30/1 ! queue max-size-buffers=10 ! "
"vaapih264enc rate-control=cqp init-qp=26 ! video/x-h264, profile=(string)main ! h264parse ! rtph264pay aggregate-mode=1 ! udpsink name=sink"}
};
bool initialized_ = false;

View File

@@ -83,10 +83,10 @@ void Streaming::RequestListener::ProcessMessage( const osc::ReceivedMessage& m,
// then enforce local UDP transfer
protocol = NetworkToolkit::UDP_RAW;
// add stream answering to request
Streaming::manager().addStream(sender, reply_to_port, client_name, protocol);
Streaming::manager()._addStream(sender, reply_to_port, client_name, protocol);
}
else
Streaming::manager().refuseStream(sender, reply_to_port);
Streaming::manager()._refuseStream(sender, reply_to_port);
}
else if( std::strcmp( m.AddressPattern(), OSC_PREFIX OSC_STREAM_DISCONNECT) == 0 ){
// receive info on disconnection
@@ -254,7 +254,7 @@ void Streaming::removeStream(const VideoStreamer *vs)
}
}
void Streaming::refuseStream(const std::string &sender, int reply_to)
void Streaming::_refuseStream(const std::string &sender, int reply_to)
{
// get ip of client
std::string sender_ip = sender.substr(0, sender.find_last_of(":"));
@@ -273,7 +273,32 @@ void Streaming::refuseStream(const std::string &sender, int reply_to)
Log::Warning("A connection request for streaming came in and was refused.\nYou can enable the Sharing on local network from the menu of the Output window.");
}
void Streaming::addStream(const std::string &sender, int reply_to,
void Streaming::addStream(const std::string &sender, int port,
const std::string &clientname)
{
// get ip of client
std::string sender_ip = sender.substr(0, sender.find_last_of(":"));
// prepare an offer
NetworkToolkit::StreamConfig conf;
conf.client_address = sender_ip;
conf.client_name = clientname;
conf.port = port;
conf.width = FrameGrabbing::manager().width();
conf.height = FrameGrabbing::manager().height();
conf.protocol = Settings::application.stream_protocol > 0 ? NetworkToolkit::UDP_H264 : NetworkToolkit::UDP_JPEG;
// create streamer & remember it
VideoStreamer *streamer = new VideoStreamer(conf);
streamers_lock_.lock();
streamers_.push_back(streamer);
streamers_lock_.unlock();
// start streamer
FrameGrabbing::manager().add(streamer);
}
void Streaming::_addStream(const std::string &sender, int reply_to,
const std::string &clientname, NetworkToolkit::StreamProtocol protocol)
{
// get ip of client
@@ -395,10 +420,12 @@ std::string VideoStreamer::init(GstCaps *caps)
std::string path = SystemToolkit::full_filename(SystemToolkit::temp_path(), "shm");
path += std::to_string(config_.port);
g_object_set (G_OBJECT (gst_bin_get_by_name (GST_BIN (pipeline_), "sink")),
"sync", FALSE,
"socket-path", path.c_str(), NULL);
}
else {
g_object_set (G_OBJECT (gst_bin_get_by_name (GST_BIN (pipeline_), "sink")),
"sync", FALSE,
"host", config_.client_address.c_str(),
"port", config_.port, NULL);
}

View File

@@ -36,6 +36,7 @@ public:
void removeStreams(const std::string &clientname);
NetworkToolkit::StreamConfig removeStream(const std::string &sender, int port);
void removeStream(const VideoStreamer *vs);
void addStream(const std::string &sender, int port, const std::string &clientname);
bool busy();
std::vector<std::string> listStreams();
@@ -48,9 +49,9 @@ protected:
virtual void ProcessMessage( const osc::ReceivedMessage& m,
const IpEndpointName& remoteEndpoint );
};
void addStream(const std::string &sender, int reply_to, const std::string &clientname,
void _addStream(const std::string &sender, int reply_to, const std::string &clientname,
NetworkToolkit::StreamProtocol protocol = NetworkToolkit::DEFAULT);
void refuseStream(const std::string &sender, int reply_to);
void _refuseStream(const std::string &sender, int reply_to);
private: