blob: 6d948ce138fceabfb3565082ecbf188ad3b4cf59 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "osp/public/presentation/presentation_controller.h"
#include <algorithm>
#include <sstream>
#include <type_traits>
#include "absl/types/optional.h"
#include "osp/impl/presentation/url_availability_requester.h"
#include "osp/msgs/osp_messages.h"
#include "osp/msgs/request_response_handler.h"
#include "osp/public/message_demuxer.h"
#include "osp/public/network_service_manager.h"
#include "osp/public/protocol_connection_client.h"
#include "util/osp_logging.h"
namespace openscreen {
namespace osp {
#define DECLARE_MSG_REQUEST_RESPONSE(base_name) \
using RequestMsgType = msgs::Presentation##base_name##Request; \
using ResponseMsgType = msgs::Presentation##base_name##Response; \
\
static constexpr MessageEncodingFunction<RequestMsgType> kEncoder = \
&msgs::EncodePresentation##base_name##Request; \
static constexpr MessageDecodingFunction<ResponseMsgType> kDecoder = \
&msgs::DecodePresentation##base_name##Response; \
static constexpr msgs::Type kResponseType = \
msgs::Type::kPresentation##base_name##Response
struct StartRequest {
DECLARE_MSG_REQUEST_RESPONSE(Start);
msgs::PresentationStartRequest request;
RequestDelegate* delegate;
Connection::Delegate* presentation_connection_delegate;
};
struct ConnectionOpenRequest {
DECLARE_MSG_REQUEST_RESPONSE(ConnectionOpen);
msgs::PresentationConnectionOpenRequest request;
RequestDelegate* delegate;
Connection::Delegate* presentation_connection_delegate;
std::unique_ptr<Connection> connection;
};
struct ConnectionCloseRequest {
DECLARE_MSG_REQUEST_RESPONSE(ConnectionClose);
msgs::PresentationConnectionCloseRequest request;
};
struct TerminationRequest {
DECLARE_MSG_REQUEST_RESPONSE(Termination);
msgs::PresentationTerminationRequest request;
};
class Controller::MessageGroupStreams final
: public ProtocolConnectionClient::ConnectionRequestCallback,
public ProtocolConnection::Observer,
public RequestResponseHandler<StartRequest>::Delegate,
public RequestResponseHandler<ConnectionOpenRequest>::Delegate,
public RequestResponseHandler<ConnectionCloseRequest>::Delegate,
public RequestResponseHandler<TerminationRequest>::Delegate {
public:
MessageGroupStreams(Controller* controller, const std::string& service_id);
~MessageGroupStreams();
uint64_t SendStartRequest(StartRequest request);
void CancelStartRequest(uint64_t request_id);
void OnMatchedResponse(StartRequest* request,
msgs::PresentationStartResponse* response,
uint64_t endpoint_id) override;
void OnError(StartRequest* request, Error error) override;
uint64_t SendConnectionOpenRequest(ConnectionOpenRequest request);
void CancelConnectionOpenRequest(uint64_t request_id);
void OnMatchedResponse(ConnectionOpenRequest* request,
msgs::PresentationConnectionOpenResponse* response,
uint64_t endpoint_id) override;
void OnError(ConnectionOpenRequest* request, Error error) override;
void SendConnectionCloseRequest(ConnectionCloseRequest request);
void OnMatchedResponse(ConnectionCloseRequest* request,
msgs::PresentationConnectionCloseResponse* response,
uint64_t endpoint_id) override;
void OnError(ConnectionCloseRequest* request, Error error) override;
void SendTerminationRequest(TerminationRequest request);
void OnMatchedResponse(TerminationRequest* request,
msgs::PresentationTerminationResponse* response,
uint64_t endpoint_id) override;
void OnError(TerminationRequest* request, Error error) override;
// ProtocolConnectionClient::ConnectionRequestCallback overrides.
void OnConnectionOpened(
uint64_t request_id,
std::unique_ptr<ProtocolConnection> connection) override;
void OnConnectionFailed(uint64_t request_id) override;
// ProtocolConnection::Observer overrides.
void OnConnectionClosed(const ProtocolConnection& connection) override;
private:
uint64_t GetNextInternalRequestId();
Controller* const controller_;
const std::string service_id_;
uint64_t next_internal_request_id_ = 1;
ProtocolConnectionClient::ConnectRequest initiation_connect_request_;
std::unique_ptr<ProtocolConnection> initiation_protocol_connection_;
ProtocolConnectionClient::ConnectRequest connection_connect_request_;
std::unique_ptr<ProtocolConnection> connection_protocol_connection_;
// TODO(btolsch): Improve the ergo of QuicClient::Connect because this is bad.
bool initiation_connect_request_stack_{false};
bool connection_connect_request_stack_{false};
RequestResponseHandler<StartRequest> initiation_handler_;
RequestResponseHandler<ConnectionOpenRequest> connection_open_handler_;
RequestResponseHandler<ConnectionCloseRequest> connection_close_handler_;
RequestResponseHandler<TerminationRequest> termination_handler_;
};
Controller::MessageGroupStreams::MessageGroupStreams(
Controller* controller,
const std::string& service_id)
: controller_(controller),
service_id_(service_id),
initiation_handler_(this),
connection_open_handler_(this),
connection_close_handler_(this),
termination_handler_(this) {}
Controller::MessageGroupStreams::~MessageGroupStreams() = default;
uint64_t Controller::MessageGroupStreams::SendStartRequest(
StartRequest request) {
uint64_t request_id = GetNextInternalRequestId();
if (!initiation_protocol_connection_ && !initiation_connect_request_) {
initiation_connect_request_stack_ = true;
initiation_connect_request_ =
NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect(
controller_->receiver_endpoints_[service_id_], this);
initiation_connect_request_stack_ = false;
}
initiation_handler_.WriteMessage(request_id, std::move(request));
return request_id;
}
void Controller::MessageGroupStreams::CancelStartRequest(uint64_t request_id) {
// TODO(btolsch): Instead, mark the |request_id| for immediate termination if
// we get a successful response.
initiation_handler_.CancelMessage(request_id);
}
void Controller::MessageGroupStreams::OnMatchedResponse(
StartRequest* request,
msgs::PresentationStartResponse* response,
uint64_t endpoint_id) {
if (response->result != msgs::PresentationStartResponse_result::kSuccess) {
std::stringstream ss;
ss << "presentation-start-response for " << request->request.url
<< " failed: " << static_cast<int>(response->result);
Error error(Error::Code::kUnknownStartError, ss.str());
OSP_LOG_INFO << error.message();
request->delegate->OnError(std::move(error));
return;
}
OSP_LOG_INFO << "presentation started for " << request->request.url;
Controller::ControlledPresentation& presentation =
controller_->presentations_[request->request.presentation_id];
presentation.service_id = service_id_;
presentation.url = request->request.url;
auto connection = std::make_unique<Connection>(
Connection::PresentationInfo{request->request.presentation_id,
request->request.url},
request->presentation_connection_delegate, controller_);
controller_->OpenConnection(response->connection_id, endpoint_id, service_id_,
request->delegate, std::move(connection),
NetworkServiceManager::Get()
->GetProtocolConnectionClient()
->CreateProtocolConnection(endpoint_id));
}
void Controller::MessageGroupStreams::OnError(StartRequest* request,
Error error) {
request->delegate->OnError(std::move(error));
}
uint64_t Controller::MessageGroupStreams::SendConnectionOpenRequest(
ConnectionOpenRequest request) {
uint64_t request_id = GetNextInternalRequestId();
if (!connection_protocol_connection_ && !connection_connect_request_) {
connection_connect_request_stack_ = true;
connection_connect_request_ =
NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect(
controller_->receiver_endpoints_[service_id_], this);
connection_connect_request_stack_ = false;
}
connection_open_handler_.WriteMessage(request_id, std::move(request));
return request_id;
}
void Controller::MessageGroupStreams::CancelConnectionOpenRequest(
uint64_t request_id) {
connection_open_handler_.CancelMessage(request_id);
}
void Controller::MessageGroupStreams::OnMatchedResponse(
ConnectionOpenRequest* request,
msgs::PresentationConnectionOpenResponse* response,
uint64_t endpoint_id) {
if (response->result !=
msgs::PresentationConnectionOpenResponse_result::kSuccess) {
std::stringstream ss;
ss << "presentation-connection-open-response for " << request->request.url
<< " failed: " << static_cast<int>(response->result);
Error error(Error::Code::kUnknownStartError, ss.str());
OSP_LOG_INFO << error.message();
request->delegate->OnError(std::move(error));
return;
}
OSP_LOG_INFO << "presentation connection opened to "
<< request->request.presentation_id;
if (request->presentation_connection_delegate) {
request->connection = std::make_unique<Connection>(
Connection::PresentationInfo{request->request.presentation_id,
request->request.url},
request->presentation_connection_delegate, controller_);
}
std::unique_ptr<ProtocolConnection> protocol_connection =
NetworkServiceManager::Get()
->GetProtocolConnectionClient()
->CreateProtocolConnection(endpoint_id);
request->connection->OnConnected(response->connection_id, endpoint_id,
std::move(protocol_connection));
controller_->AddConnection(request->connection.get());
request->delegate->OnConnection(std::move(request->connection));
}
void Controller::MessageGroupStreams::OnError(ConnectionOpenRequest* request,
Error error) {
request->delegate->OnError(std::move(error));
}
void Controller::MessageGroupStreams::SendConnectionCloseRequest(
ConnectionCloseRequest request) {
if (!connection_protocol_connection_ && !connection_connect_request_) {
connection_connect_request_stack_ = true;
connection_connect_request_ =
NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect(
controller_->receiver_endpoints_[service_id_], this);
connection_connect_request_stack_ = false;
}
connection_close_handler_.WriteMessage(std::move(request));
}
void Controller::MessageGroupStreams::OnMatchedResponse(
ConnectionCloseRequest* request,
msgs::PresentationConnectionCloseResponse* response,
uint64_t endpoint_id) {
OSP_LOG_IF(INFO,
response->result !=
msgs::PresentationConnectionCloseResponse_result::kSuccess)
<< "error in presentation-connection-close-response: "
<< static_cast<int>(response->result);
}
void Controller::MessageGroupStreams::OnError(ConnectionCloseRequest* request,
Error error) {
OSP_LOG_INFO << "got error when closing connection "
<< request->request.connection_id << ": " << error;
}
void Controller::MessageGroupStreams::SendTerminationRequest(
TerminationRequest request) {
if (!initiation_protocol_connection_ && !initiation_connect_request_) {
initiation_connect_request_ =
NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect(
controller_->receiver_endpoints_[service_id_], this);
}
termination_handler_.WriteMessage(std::move(request));
}
void Controller::MessageGroupStreams::OnMatchedResponse(
TerminationRequest* request,
msgs::PresentationTerminationResponse* response,
uint64_t endpoint_id) {
OSP_VLOG << "got presentation-termination-response for "
<< request->request.presentation_id << " with result "
<< static_cast<int>(response->result);
controller_->TerminatePresentationById(request->request.presentation_id);
}
void Controller::MessageGroupStreams::OnError(TerminationRequest* request,
Error error) {}
void Controller::MessageGroupStreams::OnConnectionOpened(
uint64_t request_id,
std::unique_ptr<ProtocolConnection> connection) {
if ((initiation_connect_request_ &&
initiation_connect_request_.request_id() == request_id) ||
initiation_connect_request_stack_) {
initiation_protocol_connection_ = std::move(connection);
initiation_protocol_connection_->SetObserver(this);
initiation_connect_request_.MarkComplete();
initiation_handler_.SetConnection(initiation_protocol_connection_.get());
termination_handler_.SetConnection(initiation_protocol_connection_.get());
} else if ((connection_connect_request_ &&
connection_connect_request_.request_id() == request_id) ||
connection_connect_request_stack_) {
connection_protocol_connection_ = std::move(connection);
connection_protocol_connection_->SetObserver(this);
connection_connect_request_.MarkComplete();
connection_open_handler_.SetConnection(
connection_protocol_connection_.get());
connection_close_handler_.SetConnection(
connection_protocol_connection_.get());
}
}
void Controller::MessageGroupStreams::OnConnectionFailed(uint64_t request_id) {
if (initiation_connect_request_ &&
initiation_connect_request_.request_id() == request_id) {
initiation_connect_request_.MarkComplete();
initiation_handler_.Reset();
termination_handler_.Reset();
} else if (connection_connect_request_ &&
connection_connect_request_.request_id() == request_id) {
connection_connect_request_.MarkComplete();
connection_open_handler_.Reset();
connection_close_handler_.Reset();
}
}
void Controller::MessageGroupStreams::OnConnectionClosed(
const ProtocolConnection& connection) {
if (&connection == initiation_protocol_connection_.get()) {
initiation_handler_.Reset();
termination_handler_.Reset();
}
}
uint64_t Controller::MessageGroupStreams::GetNextInternalRequestId() {
return ++next_internal_request_id_;
}
Controller::ReceiverWatch::ReceiverWatch() = default;
Controller::ReceiverWatch::ReceiverWatch(Controller* controller,
const std::vector<std::string>& urls,
ReceiverObserver* observer)
: urls_(urls), observer_(observer), controller_(controller) {}
Controller::ReceiverWatch::ReceiverWatch(
Controller::ReceiverWatch&& other) noexcept {
swap(*this, other);
}
Controller::ReceiverWatch::~ReceiverWatch() {
if (observer_) {
controller_->CancelReceiverWatch(urls_, observer_);
}
observer_ = nullptr;
}
Controller::ReceiverWatch& Controller::ReceiverWatch::operator=(
Controller::ReceiverWatch other) {
swap(*this, other);
return *this;
}
void swap(Controller::ReceiverWatch& a, Controller::ReceiverWatch& b) {
using std::swap;
swap(a.urls_, b.urls_);
swap(a.observer_, b.observer_);
swap(a.controller_, b.controller_);
}
Controller::ConnectRequest::ConnectRequest() = default;
Controller::ConnectRequest::ConnectRequest(Controller* controller,
const std::string& service_id,
bool is_reconnect,
absl::optional<uint64_t> request_id)
: service_id_(service_id),
is_reconnect_(is_reconnect),
request_id_(request_id),
controller_(controller) {}
Controller::ConnectRequest::ConnectRequest(ConnectRequest&& other) noexcept {
swap(*this, other);
}
Controller::ConnectRequest::~ConnectRequest() {
if (request_id_) {
controller_->CancelConnectRequest(service_id_, is_reconnect_,
request_id_.value());
}
request_id_ = 0;
}
Controller::ConnectRequest& Controller::ConnectRequest::operator=(
ConnectRequest other) {
swap(*this, other);
return *this;
}
void swap(Controller::ConnectRequest& a, Controller::ConnectRequest& b) {
using std::swap;
swap(a.service_id_, b.service_id_);
swap(a.is_reconnect_, b.is_reconnect_);
swap(a.request_id_, b.request_id_);
swap(a.controller_, b.controller_);
}
Controller::Controller(ClockNowFunctionPtr now_function) {
availability_requester_ =
std::make_unique<UrlAvailabilityRequester>(now_function);
connection_manager_ =
std::make_unique<ConnectionManager>(NetworkServiceManager::Get()
->GetProtocolConnectionClient()
->message_demuxer());
const std::vector<ServiceInfo>& receivers =
NetworkServiceManager::Get()->GetMdnsServiceListener()->GetReceivers();
for (const auto& info : receivers) {
// TODO(crbug.com/openscreen/33): Replace service_id with endpoint_id when
// endpoint_id is more than just an IPEndpoint counter and actually relates
// to a device's identity.
receiver_endpoints_.emplace(info.service_id, info.v4_endpoint.port
? info.v4_endpoint
: info.v6_endpoint);
availability_requester_->AddReceiver(info);
}
// TODO(btolsch): This is for |receiver_endpoints_|, but this should really be
// tracked elsewhere so it's available to other protocols as well.
NetworkServiceManager::Get()->GetMdnsServiceListener()->AddObserver(this);
}
Controller::~Controller() {
connection_manager_.reset();
NetworkServiceManager::Get()->GetMdnsServiceListener()->RemoveObserver(this);
}
Controller::ReceiverWatch Controller::RegisterReceiverWatch(
const std::vector<std::string>& urls,
ReceiverObserver* observer) {
availability_requester_->AddObserver(urls, observer);
return ReceiverWatch(this, urls, observer);
}
Controller::ConnectRequest Controller::StartPresentation(
const std::string& url,
const std::string& service_id,
RequestDelegate* delegate,
Connection::Delegate* conn_delegate) {
StartRequest request;
request.request.url = url;
request.request.presentation_id = MakePresentationId(url, service_id);
request.delegate = delegate;
request.presentation_connection_delegate = conn_delegate;
uint64_t request_id =
group_streams_[service_id]->SendStartRequest(std::move(request));
constexpr bool is_reconnect = false;
return ConnectRequest(this, service_id, is_reconnect, request_id);
}
Controller::ConnectRequest Controller::ReconnectPresentation(
const std::vector<std::string>& urls,
const std::string& presentation_id,
const std::string& service_id,
RequestDelegate* delegate,
Connection::Delegate* conn_delegate) {
auto presentation_entry = presentations_.find(presentation_id);
if (presentation_entry == presentations_.end()) {
delegate->OnError(Error::Code::kNoPresentationFound);
return ConnectRequest();
}
auto matching_url_it =
std::find(urls.begin(), urls.end(), presentation_entry->second.url);
if (matching_url_it == urls.end()) {
delegate->OnError(Error::Code::kNoPresentationFound);
return ConnectRequest();
}
ConnectionOpenRequest request;
request.request.url = presentation_entry->second.url;
request.request.presentation_id = presentation_id;
request.delegate = delegate;
request.presentation_connection_delegate = conn_delegate;
request.connection = nullptr;
uint64_t request_id =
group_streams_[service_id]->SendConnectionOpenRequest(std::move(request));
constexpr bool is_reconnect = true;
return ConnectRequest(this, service_id, is_reconnect, request_id);
}
Controller::ConnectRequest Controller::ReconnectConnection(
std::unique_ptr<Connection> connection,
RequestDelegate* delegate) {
if (connection->state() != Connection::State::kClosed) {
delegate->OnError(Error::Code::kInvalidConnectionState);
return ConnectRequest();
}
const Connection::PresentationInfo& info = connection->presentation_info();
auto presentation_entry = presentations_.find(info.id);
if (presentation_entry == presentations_.end() ||
presentation_entry->second.url != info.url) {
OSP_LOG_ERROR << "missing ControlledPresentation for non-terminated "
"connection with info ("
<< info.id << ", " << info.url << ")";
delegate->OnError(Error::Code::kNoPresentationFound);
return ConnectRequest();
}
OSP_DCHECK(connection_manager_->GetConnection(connection->connection_id()))
<< "otherwise valid connection for reconnect is unknown to the "
"connection manager";
connection_manager_->RemoveConnection(connection.get());
connection->OnConnecting();
ConnectionOpenRequest request;
request.request.url = info.url;
request.request.presentation_id = info.id;
request.delegate = delegate;
request.presentation_connection_delegate = nullptr;
request.connection = std::move(connection);
const std::string& service_id = presentation_entry->second.service_id;
uint64_t request_id =
group_streams_[service_id]->SendConnectionOpenRequest(std::move(request));
constexpr bool is_reconnect = true;
return ConnectRequest(this, service_id, is_reconnect, request_id);
}
Error Controller::CloseConnection(Connection* connection,
Connection::CloseReason reason) {
auto presentation_entry =
presentations_.find(connection->presentation_info().id);
if (presentation_entry == presentations_.end()) {
std::stringstream ss;
ss << "no presentation found when trying to close connection "
<< connection->presentation_info().id << ":"
<< connection->connection_id();
return Error(Error::Code::kNoPresentationFound, ss.str());
}
ConnectionCloseRequest request;
request.request.connection_id = connection->connection_id();
group_streams_[presentation_entry->second.service_id]
->SendConnectionCloseRequest(std::move(request));
return Error::None();
}
Error Controller::OnPresentationTerminated(const std::string& presentation_id,
TerminationReason reason) {
auto presentation_entry = presentations_.find(presentation_id);
if (presentation_entry == presentations_.end()) {
return Error::Code::kNoPresentationFound;
}
ControlledPresentation& presentation = presentation_entry->second;
for (auto* connection : presentation.connections) {
connection->OnTerminated();
}
TerminationRequest request;
request.request.presentation_id = presentation_id;
request.request.reason =
msgs::PresentationTerminationRequest_reason::kUserTerminatedViaController;
group_streams_[presentation.service_id]->SendTerminationRequest(
std::move(request));
presentations_.erase(presentation_entry);
termination_listener_by_id_.erase(presentation_id);
return Error::None();
}
void Controller::OnConnectionDestroyed(Connection* connection) {
auto presentation_entry =
presentations_.find(connection->presentation_info().id);
if (presentation_entry == presentations_.end()) {
return;
}
std::vector<Connection*>& connections =
presentation_entry->second.connections;
connections.erase(
std::remove(connections.begin(), connections.end(), connection),
connections.end());
connection_manager_->RemoveConnection(connection);
}
std::string Controller::GetServiceIdForPresentationId(
const std::string& presentation_id) const {
auto presentation_entry = presentations_.find(presentation_id);
if (presentation_entry == presentations_.end()) {
return "";
}
return presentation_entry->second.service_id;
}
ProtocolConnection* Controller::GetConnectionRequestGroupStream(
const std::string& service_id) {
OSP_UNIMPLEMENTED();
return nullptr;
}
void Controller::OnError(ServiceListenerError) {}
void Controller::OnMetrics(ServiceListener::Metrics) {}
class Controller::TerminationListener final
: public MessageDemuxer::MessageCallback {
public:
TerminationListener(Controller* controller,
const std::string& presentation_id,
uint64_t endpoint_id);
~TerminationListener() override;
// MessageDemuxer::MessageCallback overrides.
ErrorOr<size_t> OnStreamMessage(uint64_t endpoint_id,
uint64_t connection_id,
msgs::Type message_type,
const uint8_t* buffer,
size_t buffer_size,
Clock::time_point now) override;
private:
Controller* const controller_;
std::string presentation_id_;
MessageDemuxer::MessageWatch event_watch_;
};
Controller::TerminationListener::TerminationListener(
Controller* controller,
const std::string& presentation_id,
uint64_t endpoint_id)
: controller_(controller), presentation_id_(presentation_id) {
event_watch_ =
NetworkServiceManager::Get()
->GetProtocolConnectionClient()
->message_demuxer()
->WatchMessageType(endpoint_id,
msgs::Type::kPresentationTerminationEvent, this);
}
Controller::TerminationListener::~TerminationListener() = default;
ErrorOr<size_t> Controller::TerminationListener::OnStreamMessage(
uint64_t endpoint_id,
uint64_t connection_id,
msgs::Type message_type,
const uint8_t* buffer,
size_t buffer_size,
Clock::time_point now) {
OSP_CHECK_EQ(static_cast<int>(msgs::Type::kPresentationTerminationEvent),
static_cast<int>(message_type));
msgs::PresentationTerminationEvent event;
ssize_t result =
msgs::DecodePresentationTerminationEvent(buffer, buffer_size, &event);
if (result < 0) {
OSP_LOG_WARN << "decode presentation-termination-event error: " << result;
return Error::Code::kCborParsing;
} else if (event.presentation_id != presentation_id_) {
OSP_LOG_WARN << "got presentation-termination-event for wrong id: "
<< presentation_id_ << " vs. " << event.presentation_id;
return result;
}
OSP_LOG_INFO << "termination event";
auto presentation_entry =
controller_->presentations_.find(event.presentation_id);
if (presentation_entry != controller_->presentations_.end()) {
for (auto* connection : presentation_entry->second.connections)
connection->OnTerminated();
controller_->presentations_.erase(presentation_entry);
}
controller_->termination_listener_by_id_.erase(event.presentation_id);
return result;
}
// static
std::string Controller::MakePresentationId(const std::string& url,
const std::string& service_id) {
// TODO(btolsch): This is just a placeholder for the demo. It should
// eventually become a GUID/unguessable token routine.
std::string safe_id = service_id;
for (auto& c : safe_id)
if (c < ' ' || c > '~')
c = '.';
return safe_id + ":" + url;
}
void Controller::AddConnection(Connection* connection) {
connection_manager_->AddConnection(connection);
}
void Controller::OpenConnection(
uint64_t connection_id,
uint64_t endpoint_id,
const std::string& service_id,
RequestDelegate* request_delegate,
std::unique_ptr<Connection>&& connection,
std::unique_ptr<ProtocolConnection>&& protocol_connection) {
connection->OnConnected(connection_id, endpoint_id,
std::move(protocol_connection));
const std::string& presentation_id = connection->presentation_info().id;
auto presentation_entry = presentations_.find(presentation_id);
if (presentation_entry == presentations_.end()) {
auto emplace_entry = presentations_.emplace(
presentation_id,
ControlledPresentation{
service_id, connection->presentation_info().url, {}});
presentation_entry = emplace_entry.first;
}
ControlledPresentation& presentation = presentation_entry->second;
presentation.connections.push_back(connection.get());
AddConnection(connection.get());
auto terminate_entry = termination_listener_by_id_.find(presentation_id);
if (terminate_entry == termination_listener_by_id_.end()) {
termination_listener_by_id_.emplace(
presentation_id, std::make_unique<TerminationListener>(
this, presentation_id, endpoint_id));
}
request_delegate->OnConnection(std::move(connection));
}
void Controller::TerminatePresentationById(const std::string& presentation_id) {
auto presentation_entry = presentations_.find(presentation_id);
if (presentation_entry != presentations_.end()) {
for (auto* connection : presentation_entry->second.connections) {
connection->OnTerminated();
}
presentations_.erase(presentation_entry);
}
}
void Controller::CancelReceiverWatch(const std::vector<std::string>& urls,
ReceiverObserver* observer) {
availability_requester_->RemoveObserverUrls(urls, observer);
}
void Controller::CancelConnectRequest(const std::string& service_id,
bool is_reconnect,
uint64_t request_id) {
auto group_streams_entry = group_streams_.find(service_id);
if (group_streams_entry == group_streams_.end())
return;
if (is_reconnect) {
group_streams_entry->second->CancelConnectionOpenRequest(request_id);
} else {
group_streams_entry->second->CancelStartRequest(request_id);
}
}
void Controller::OnStarted() {}
void Controller::OnStopped() {}
void Controller::OnSuspended() {}
void Controller::OnSearching() {}
void Controller::OnReceiverAdded(const ServiceInfo& info) {
receiver_endpoints_.emplace(info.service_id, info.v4_endpoint.port
? info.v4_endpoint
: info.v6_endpoint);
auto group_streams =
std::make_unique<MessageGroupStreams>(this, info.service_id);
group_streams_[info.service_id] = std::move(group_streams);
availability_requester_->AddReceiver(info);
}
void Controller::OnReceiverChanged(const ServiceInfo& info) {
receiver_endpoints_[info.service_id] =
info.v4_endpoint.port ? info.v4_endpoint : info.v6_endpoint;
availability_requester_->ChangeReceiver(info);
}
void Controller::OnReceiverRemoved(const ServiceInfo& info) {
receiver_endpoints_.erase(info.service_id);
group_streams_.erase(info.service_id);
availability_requester_->RemoveReceiver(info);
}
void Controller::OnAllReceiversRemoved() {
receiver_endpoints_.clear();
availability_requester_->RemoveAllReceivers();
}
} // namespace osp
} // namespace openscreen