Improved connection robustness and diconnection/connection behaviors

(added connection rejection to streamer).
This commit is contained in:
brunoherbelin
2020-10-25 22:01:04 +01:00
parent e60c7a4cad
commit 01410a59cf
7 changed files with 148 additions and 79 deletions

View File

@@ -34,7 +34,6 @@
#define STREAMER_DEBUG
#endif
// this is called when a U
void StreamingRequestListener::ProcessMessage( const osc::ReceivedMessage& m,
const IpEndpointName& remoteEndpoint )
{
@@ -48,8 +47,10 @@ void StreamingRequestListener::ProcessMessage( const osc::ReceivedMessage& m,
osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin();
int reply_to_port = (arg++)->AsInt32();
const char *client_name = (arg++)->AsString();
Streaming::manager().addStream(sender, reply_to_port, client_name);
if (Streaming::manager().enabled())
Streaming::manager().addStream(sender, reply_to_port, client_name);
else
Streaming::manager().refuseStream(sender, reply_to_port);
#ifdef STREAMER_DEBUG
Log::Info("%s wants a stream.", sender);
#endif
@@ -72,12 +73,17 @@ void StreamingRequestListener::ProcessMessage( const osc::ReceivedMessage& m,
}
}
void wait_for_request_(UdpListeningReceiveSocket *receiver)
{
receiver->Run();
}
Streaming::Streaming() : session_(nullptr), width_(0), height_(0)
Streaming::Streaming() : enabled_(false), session_(nullptr), width_(0), height_(0)
{
int port = Connection::manager().info().port_stream_request;
receiver_ = new UdpListeningReceiveSocket(IpEndpointName( IpEndpointName::ANY_ADDRESS, port ), &listener_ );
std::thread(wait_for_request_, receiver_).detach();
}
@@ -104,20 +110,16 @@ std::vector<std::string> Streaming::listStreams() const
return ls;
}
void wait_for_request_(UdpListeningReceiveSocket *receiver)
{
receiver->Run();
}
void Streaming::enable(bool on)
{
if (on) {
std::thread(wait_for_request_, receiver_).detach();
// accept streaming requests
enabled_ = true;
Log::Info("Accepting stream requests to %s.", Connection::manager().info().name.c_str());
}
else {
// end streaming requests
receiver_->AsynchronousBreak();
// refuse streaming requests
enabled_ = false;
// ending and removing all streaming
for (auto sit = streamers_.begin(); sit != streamers_.end(); sit=streamers_.erase(sit))
(*sit)->stop();
@@ -183,6 +185,25 @@ void Streaming::removeStreams(const std::string &clientname)
}
}
void Streaming::refuseStream(const std::string &sender, int reply_to)
{
// get ip of client
std::string sender_ip = sender.substr(0, sender.find_last_of(":"));
// prepare to reply to client
IpEndpointName host( sender_ip.c_str(), reply_to );
UdpTransmitSocket socket( host );
// build OSC message
char buffer[IP_MTU_SIZE];
osc::OutboundPacketStream p( buffer, IP_MTU_SIZE );
p.Clear();
p << osc::BeginMessage( OSC_PREFIX OSC_STREAM_REJECT );
p << osc::EndMessage;
// send OSC message to client
socket.Send( p.Data(), p.Size() );
#ifdef STREAMER_DEBUG
Log::Info("Refusing streaming to %s", sender_ip.c_str());
#endif
}
void Streaming::addStream(const std::string &sender, int reply_to, const std::string &clientname)
{