| // Copyright 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "osp/public/presentation/presentation_controller.h" |
| |
| #include <algorithm> |
| #include <sstream> |
| #include <type_traits> |
| |
| #include "absl/types/optional.h" |
| #include "osp/impl/presentation/url_availability_requester.h" |
| #include "osp/msgs/osp_messages.h" |
| #include "osp/msgs/request_response_handler.h" |
| #include "osp/public/message_demuxer.h" |
| #include "osp/public/network_service_manager.h" |
| #include "osp/public/protocol_connection_client.h" |
| #include "util/osp_logging.h" |
| |
| namespace openscreen { |
| namespace osp { |
| |
// Expands, inside a request struct, to the type aliases and constants that
// describe one Presentation<base_name>Request/Response message pair:
//   RequestMsgType / ResponseMsgType - the generated message structs,
//   kResponseType                    - the msgs::Type tag of the response,
//   kEncoder / kDecoder              - the generated codec functions.
// These are presumably what RequestResponseHandler<T> reads from its request
// type.  The expansion deliberately ends without a ';' so that the invocation
// supplies it.
#define DECLARE_MSG_REQUEST_RESPONSE(base_name)                        \
  using RequestMsgType = msgs::Presentation##base_name##Request;       \
  using ResponseMsgType = msgs::Presentation##base_name##Response;     \
                                                                       \
  static constexpr msgs::Type kResponseType =                          \
      msgs::Type::kPresentation##base_name##Response;                  \
  static constexpr MessageEncodingFunction<RequestMsgType> kEncoder =  \
      &msgs::EncodePresentation##base_name##Request;                   \
  static constexpr MessageDecodingFunction<ResponseMsgType> kDecoder = \
      &msgs::DecodePresentation##base_name##Response
| |
| struct StartRequest { |
| DECLARE_MSG_REQUEST_RESPONSE(Start); |
| |
| msgs::PresentationStartRequest request; |
| RequestDelegate* delegate; |
| Connection::Delegate* presentation_connection_delegate; |
| }; |
| |
| struct ConnectionOpenRequest { |
| DECLARE_MSG_REQUEST_RESPONSE(ConnectionOpen); |
| |
| msgs::PresentationConnectionOpenRequest request; |
| RequestDelegate* delegate; |
| Connection::Delegate* presentation_connection_delegate; |
| std::unique_ptr<Connection> connection; |
| }; |
| |
| struct ConnectionCloseRequest { |
| DECLARE_MSG_REQUEST_RESPONSE(ConnectionClose); |
| |
| msgs::PresentationConnectionCloseRequest request; |
| }; |
| |
| struct TerminationRequest { |
| DECLARE_MSG_REQUEST_RESPONSE(Termination); |
| |
| msgs::PresentationTerminationRequest request; |
| }; |
| |
| class Controller::MessageGroupStreams final |
| : public ProtocolConnectionClient::ConnectionRequestCallback, |
| public ProtocolConnection::Observer, |
| public RequestResponseHandler<StartRequest>::Delegate, |
| public RequestResponseHandler<ConnectionOpenRequest>::Delegate, |
| public RequestResponseHandler<ConnectionCloseRequest>::Delegate, |
| public RequestResponseHandler<TerminationRequest>::Delegate { |
| public: |
| MessageGroupStreams(Controller* controller, const std::string& service_id); |
| ~MessageGroupStreams(); |
| |
| uint64_t SendStartRequest(StartRequest request); |
| void CancelStartRequest(uint64_t request_id); |
| void OnMatchedResponse(StartRequest* request, |
| msgs::PresentationStartResponse* response, |
| uint64_t endpoint_id) override; |
| void OnError(StartRequest* request, Error error) override; |
| |
| uint64_t SendConnectionOpenRequest(ConnectionOpenRequest request); |
| void CancelConnectionOpenRequest(uint64_t request_id); |
| void OnMatchedResponse(ConnectionOpenRequest* request, |
| msgs::PresentationConnectionOpenResponse* response, |
| uint64_t endpoint_id) override; |
| void OnError(ConnectionOpenRequest* request, Error error) override; |
| |
| void SendConnectionCloseRequest(ConnectionCloseRequest request); |
| void OnMatchedResponse(ConnectionCloseRequest* request, |
| msgs::PresentationConnectionCloseResponse* response, |
| uint64_t endpoint_id) override; |
| void OnError(ConnectionCloseRequest* request, Error error) override; |
| |
| void SendTerminationRequest(TerminationRequest request); |
| void OnMatchedResponse(TerminationRequest* request, |
| msgs::PresentationTerminationResponse* response, |
| uint64_t endpoint_id) override; |
| void OnError(TerminationRequest* request, Error error) override; |
| |
| // ProtocolConnectionClient::ConnectionRequestCallback overrides. |
| void OnConnectionOpened( |
| uint64_t request_id, |
| std::unique_ptr<ProtocolConnection> connection) override; |
| void OnConnectionFailed(uint64_t request_id) override; |
| |
| // ProtocolConnection::Observer overrides. |
| void OnConnectionClosed(const ProtocolConnection& connection) override; |
| |
| private: |
| uint64_t GetNextInternalRequestId(); |
| |
| Controller* const controller_; |
| const std::string service_id_; |
| |
| uint64_t next_internal_request_id_ = 1; |
| ProtocolConnectionClient::ConnectRequest initiation_connect_request_; |
| std::unique_ptr<ProtocolConnection> initiation_protocol_connection_; |
| ProtocolConnectionClient::ConnectRequest connection_connect_request_; |
| std::unique_ptr<ProtocolConnection> connection_protocol_connection_; |
| |
| // TODO(btolsch): Improve the ergo of QuicClient::Connect because this is bad. |
| bool initiation_connect_request_stack_{false}; |
| bool connection_connect_request_stack_{false}; |
| |
| RequestResponseHandler<StartRequest> initiation_handler_; |
| RequestResponseHandler<ConnectionOpenRequest> connection_open_handler_; |
| RequestResponseHandler<ConnectionCloseRequest> connection_close_handler_; |
| RequestResponseHandler<TerminationRequest> termination_handler_; |
| }; |
| |
| Controller::MessageGroupStreams::MessageGroupStreams( |
| Controller* controller, |
| const std::string& service_id) |
| : controller_(controller), |
| service_id_(service_id), |
| initiation_handler_(this), |
| connection_open_handler_(this), |
| connection_close_handler_(this), |
| termination_handler_(this) {} |
| |
| Controller::MessageGroupStreams::~MessageGroupStreams() = default; |
| |
| uint64_t Controller::MessageGroupStreams::SendStartRequest( |
| StartRequest request) { |
| uint64_t request_id = GetNextInternalRequestId(); |
| if (!initiation_protocol_connection_ && !initiation_connect_request_) { |
| initiation_connect_request_stack_ = true; |
| initiation_connect_request_ = |
| NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect( |
| controller_->receiver_endpoints_[service_id_], this); |
| initiation_connect_request_stack_ = false; |
| } |
| initiation_handler_.WriteMessage(request_id, std::move(request)); |
| return request_id; |
| } |
| |
| void Controller::MessageGroupStreams::CancelStartRequest(uint64_t request_id) { |
| // TODO(btolsch): Instead, mark the |request_id| for immediate termination if |
| // we get a successful response. |
| initiation_handler_.CancelMessage(request_id); |
| } |
| |
| void Controller::MessageGroupStreams::OnMatchedResponse( |
| StartRequest* request, |
| msgs::PresentationStartResponse* response, |
| uint64_t endpoint_id) { |
| if (response->result != msgs::PresentationStartResponse_result::kSuccess) { |
| std::stringstream ss; |
| ss << "presentation-start-response for " << request->request.url |
| << " failed: " << static_cast<int>(response->result); |
| Error error(Error::Code::kUnknownStartError, ss.str()); |
| OSP_LOG_INFO << error.message(); |
| request->delegate->OnError(std::move(error)); |
| return; |
| } |
| OSP_LOG_INFO << "presentation started for " << request->request.url; |
| Controller::ControlledPresentation& presentation = |
| controller_->presentations_[request->request.presentation_id]; |
| presentation.service_id = service_id_; |
| presentation.url = request->request.url; |
| auto connection = std::make_unique<Connection>( |
| Connection::PresentationInfo{request->request.presentation_id, |
| request->request.url}, |
| request->presentation_connection_delegate, controller_); |
| controller_->OpenConnection(response->connection_id, endpoint_id, service_id_, |
| request->delegate, std::move(connection), |
| NetworkServiceManager::Get() |
| ->GetProtocolConnectionClient() |
| ->CreateProtocolConnection(endpoint_id)); |
| } |
| |
| void Controller::MessageGroupStreams::OnError(StartRequest* request, |
| Error error) { |
| request->delegate->OnError(std::move(error)); |
| } |
| |
| uint64_t Controller::MessageGroupStreams::SendConnectionOpenRequest( |
| ConnectionOpenRequest request) { |
| uint64_t request_id = GetNextInternalRequestId(); |
| if (!connection_protocol_connection_ && !connection_connect_request_) { |
| connection_connect_request_stack_ = true; |
| connection_connect_request_ = |
| NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect( |
| controller_->receiver_endpoints_[service_id_], this); |
| connection_connect_request_stack_ = false; |
| } |
| connection_open_handler_.WriteMessage(request_id, std::move(request)); |
| return request_id; |
| } |
| |
| void Controller::MessageGroupStreams::CancelConnectionOpenRequest( |
| uint64_t request_id) { |
| connection_open_handler_.CancelMessage(request_id); |
| } |
| |
| void Controller::MessageGroupStreams::OnMatchedResponse( |
| ConnectionOpenRequest* request, |
| msgs::PresentationConnectionOpenResponse* response, |
| uint64_t endpoint_id) { |
| if (response->result != |
| msgs::PresentationConnectionOpenResponse_result::kSuccess) { |
| std::stringstream ss; |
| ss << "presentation-connection-open-response for " << request->request.url |
| << " failed: " << static_cast<int>(response->result); |
| Error error(Error::Code::kUnknownStartError, ss.str()); |
| OSP_LOG_INFO << error.message(); |
| request->delegate->OnError(std::move(error)); |
| return; |
| } |
| OSP_LOG_INFO << "presentation connection opened to " |
| << request->request.presentation_id; |
| if (request->presentation_connection_delegate) { |
| request->connection = std::make_unique<Connection>( |
| Connection::PresentationInfo{request->request.presentation_id, |
| request->request.url}, |
| request->presentation_connection_delegate, controller_); |
| } |
| std::unique_ptr<ProtocolConnection> protocol_connection = |
| NetworkServiceManager::Get() |
| ->GetProtocolConnectionClient() |
| ->CreateProtocolConnection(endpoint_id); |
| request->connection->OnConnected(response->connection_id, endpoint_id, |
| std::move(protocol_connection)); |
| controller_->AddConnection(request->connection.get()); |
| request->delegate->OnConnection(std::move(request->connection)); |
| } |
| |
| void Controller::MessageGroupStreams::OnError(ConnectionOpenRequest* request, |
| Error error) { |
| request->delegate->OnError(std::move(error)); |
| } |
| |
| void Controller::MessageGroupStreams::SendConnectionCloseRequest( |
| ConnectionCloseRequest request) { |
| if (!connection_protocol_connection_ && !connection_connect_request_) { |
| connection_connect_request_stack_ = true; |
| connection_connect_request_ = |
| NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect( |
| controller_->receiver_endpoints_[service_id_], this); |
| connection_connect_request_stack_ = false; |
| } |
| connection_close_handler_.WriteMessage(std::move(request)); |
| } |
| |
| void Controller::MessageGroupStreams::OnMatchedResponse( |
| ConnectionCloseRequest* request, |
| msgs::PresentationConnectionCloseResponse* response, |
| uint64_t endpoint_id) { |
| OSP_LOG_IF(INFO, |
| response->result != |
| msgs::PresentationConnectionCloseResponse_result::kSuccess) |
| << "error in presentation-connection-close-response: " |
| << static_cast<int>(response->result); |
| } |
| |
| void Controller::MessageGroupStreams::OnError(ConnectionCloseRequest* request, |
| Error error) { |
| OSP_LOG_INFO << "got error when closing connection " |
| << request->request.connection_id << ": " << error; |
| } |
| |
| void Controller::MessageGroupStreams::SendTerminationRequest( |
| TerminationRequest request) { |
| if (!initiation_protocol_connection_ && !initiation_connect_request_) { |
| initiation_connect_request_ = |
| NetworkServiceManager::Get()->GetProtocolConnectionClient()->Connect( |
| controller_->receiver_endpoints_[service_id_], this); |
| } |
| termination_handler_.WriteMessage(std::move(request)); |
| } |
| |
| void Controller::MessageGroupStreams::OnMatchedResponse( |
| TerminationRequest* request, |
| msgs::PresentationTerminationResponse* response, |
| uint64_t endpoint_id) { |
| OSP_VLOG << "got presentation-termination-response for " |
| << request->request.presentation_id << " with result " |
| << static_cast<int>(response->result); |
| controller_->TerminatePresentationById(request->request.presentation_id); |
| } |
| |
| void Controller::MessageGroupStreams::OnError(TerminationRequest* request, |
| Error error) {} |
| |
| void Controller::MessageGroupStreams::OnConnectionOpened( |
| uint64_t request_id, |
| std::unique_ptr<ProtocolConnection> connection) { |
| if ((initiation_connect_request_ && |
| initiation_connect_request_.request_id() == request_id) || |
| initiation_connect_request_stack_) { |
| initiation_protocol_connection_ = std::move(connection); |
| initiation_protocol_connection_->SetObserver(this); |
| initiation_connect_request_.MarkComplete(); |
| initiation_handler_.SetConnection(initiation_protocol_connection_.get()); |
| termination_handler_.SetConnection(initiation_protocol_connection_.get()); |
| } else if ((connection_connect_request_ && |
| connection_connect_request_.request_id() == request_id) || |
| connection_connect_request_stack_) { |
| connection_protocol_connection_ = std::move(connection); |
| connection_protocol_connection_->SetObserver(this); |
| connection_connect_request_.MarkComplete(); |
| connection_open_handler_.SetConnection( |
| connection_protocol_connection_.get()); |
| connection_close_handler_.SetConnection( |
| connection_protocol_connection_.get()); |
| } |
| } |
| |
| void Controller::MessageGroupStreams::OnConnectionFailed(uint64_t request_id) { |
| if (initiation_connect_request_ && |
| initiation_connect_request_.request_id() == request_id) { |
| initiation_connect_request_.MarkComplete(); |
| initiation_handler_.Reset(); |
| termination_handler_.Reset(); |
| } else if (connection_connect_request_ && |
| connection_connect_request_.request_id() == request_id) { |
| connection_connect_request_.MarkComplete(); |
| connection_open_handler_.Reset(); |
| connection_close_handler_.Reset(); |
| } |
| } |
| |
| void Controller::MessageGroupStreams::OnConnectionClosed( |
| const ProtocolConnection& connection) { |
| if (&connection == initiation_protocol_connection_.get()) { |
| initiation_handler_.Reset(); |
| termination_handler_.Reset(); |
| } |
| } |
| |
| uint64_t Controller::MessageGroupStreams::GetNextInternalRequestId() { |
| return ++next_internal_request_id_; |
| } |
| |
| Controller::ReceiverWatch::ReceiverWatch() = default; |
| Controller::ReceiverWatch::ReceiverWatch(Controller* controller, |
| const std::vector<std::string>& urls, |
| ReceiverObserver* observer) |
| : urls_(urls), observer_(observer), controller_(controller) {} |
| |
| Controller::ReceiverWatch::ReceiverWatch( |
| Controller::ReceiverWatch&& other) noexcept { |
| swap(*this, other); |
| } |
| |
| Controller::ReceiverWatch::~ReceiverWatch() { |
| if (observer_) { |
| controller_->CancelReceiverWatch(urls_, observer_); |
| } |
| observer_ = nullptr; |
| } |
| |
| Controller::ReceiverWatch& Controller::ReceiverWatch::operator=( |
| Controller::ReceiverWatch other) { |
| swap(*this, other); |
| return *this; |
| } |
| |
| void swap(Controller::ReceiverWatch& a, Controller::ReceiverWatch& b) { |
| using std::swap; |
| swap(a.urls_, b.urls_); |
| swap(a.observer_, b.observer_); |
| swap(a.controller_, b.controller_); |
| } |
| |
| Controller::ConnectRequest::ConnectRequest() = default; |
| Controller::ConnectRequest::ConnectRequest(Controller* controller, |
| const std::string& service_id, |
| bool is_reconnect, |
| absl::optional<uint64_t> request_id) |
| : service_id_(service_id), |
| is_reconnect_(is_reconnect), |
| request_id_(request_id), |
| controller_(controller) {} |
| |
| Controller::ConnectRequest::ConnectRequest(ConnectRequest&& other) noexcept { |
| swap(*this, other); |
| } |
| |
| Controller::ConnectRequest::~ConnectRequest() { |
| if (request_id_) { |
| controller_->CancelConnectRequest(service_id_, is_reconnect_, |
| request_id_.value()); |
| } |
| request_id_ = 0; |
| } |
| |
| Controller::ConnectRequest& Controller::ConnectRequest::operator=( |
| ConnectRequest other) { |
| swap(*this, other); |
| return *this; |
| } |
| |
| void swap(Controller::ConnectRequest& a, Controller::ConnectRequest& b) { |
| using std::swap; |
| swap(a.service_id_, b.service_id_); |
| swap(a.is_reconnect_, b.is_reconnect_); |
| swap(a.request_id_, b.request_id_); |
| swap(a.controller_, b.controller_); |
| } |
| |
| Controller::Controller(ClockNowFunctionPtr now_function) { |
| availability_requester_ = |
| std::make_unique<UrlAvailabilityRequester>(now_function); |
| connection_manager_ = |
| std::make_unique<ConnectionManager>(NetworkServiceManager::Get() |
| ->GetProtocolConnectionClient() |
| ->message_demuxer()); |
| const std::vector<ServiceInfo>& receivers = |
| NetworkServiceManager::Get()->GetMdnsServiceListener()->GetReceivers(); |
| for (const auto& info : receivers) { |
| // TODO(crbug.com/openscreen/33): Replace service_id with endpoint_id when |
| // endpoint_id is more than just an IPEndpoint counter and actually relates |
| // to a device's identity. |
| receiver_endpoints_.emplace(info.service_id, info.v4_endpoint.port |
| ? info.v4_endpoint |
| : info.v6_endpoint); |
| availability_requester_->AddReceiver(info); |
| } |
| // TODO(btolsch): This is for |receiver_endpoints_|, but this should really be |
| // tracked elsewhere so it's available to other protocols as well. |
| NetworkServiceManager::Get()->GetMdnsServiceListener()->AddObserver(this); |
| } |
| |
| Controller::~Controller() { |
| connection_manager_.reset(); |
| NetworkServiceManager::Get()->GetMdnsServiceListener()->RemoveObserver(this); |
| } |
| |
| Controller::ReceiverWatch Controller::RegisterReceiverWatch( |
| const std::vector<std::string>& urls, |
| ReceiverObserver* observer) { |
| availability_requester_->AddObserver(urls, observer); |
| return ReceiverWatch(this, urls, observer); |
| } |
| |
| Controller::ConnectRequest Controller::StartPresentation( |
| const std::string& url, |
| const std::string& service_id, |
| RequestDelegate* delegate, |
| Connection::Delegate* conn_delegate) { |
| StartRequest request; |
| request.request.url = url; |
| request.request.presentation_id = MakePresentationId(url, service_id); |
| request.delegate = delegate; |
| request.presentation_connection_delegate = conn_delegate; |
| uint64_t request_id = |
| group_streams_[service_id]->SendStartRequest(std::move(request)); |
| constexpr bool is_reconnect = false; |
| return ConnectRequest(this, service_id, is_reconnect, request_id); |
| } |
| |
| Controller::ConnectRequest Controller::ReconnectPresentation( |
| const std::vector<std::string>& urls, |
| const std::string& presentation_id, |
| const std::string& service_id, |
| RequestDelegate* delegate, |
| Connection::Delegate* conn_delegate) { |
| auto presentation_entry = presentations_.find(presentation_id); |
| if (presentation_entry == presentations_.end()) { |
| delegate->OnError(Error::Code::kNoPresentationFound); |
| return ConnectRequest(); |
| } |
| auto matching_url_it = |
| std::find(urls.begin(), urls.end(), presentation_entry->second.url); |
| if (matching_url_it == urls.end()) { |
| delegate->OnError(Error::Code::kNoPresentationFound); |
| return ConnectRequest(); |
| } |
| ConnectionOpenRequest request; |
| request.request.url = presentation_entry->second.url; |
| request.request.presentation_id = presentation_id; |
| request.delegate = delegate; |
| request.presentation_connection_delegate = conn_delegate; |
| request.connection = nullptr; |
| uint64_t request_id = |
| group_streams_[service_id]->SendConnectionOpenRequest(std::move(request)); |
| constexpr bool is_reconnect = true; |
| return ConnectRequest(this, service_id, is_reconnect, request_id); |
| } |
| |
| Controller::ConnectRequest Controller::ReconnectConnection( |
| std::unique_ptr<Connection> connection, |
| RequestDelegate* delegate) { |
| if (connection->state() != Connection::State::kClosed) { |
| delegate->OnError(Error::Code::kInvalidConnectionState); |
| return ConnectRequest(); |
| } |
| const Connection::PresentationInfo& info = connection->presentation_info(); |
| auto presentation_entry = presentations_.find(info.id); |
| if (presentation_entry == presentations_.end() || |
| presentation_entry->second.url != info.url) { |
| OSP_LOG_ERROR << "missing ControlledPresentation for non-terminated " |
| "connection with info (" |
| << info.id << ", " << info.url << ")"; |
| delegate->OnError(Error::Code::kNoPresentationFound); |
| return ConnectRequest(); |
| } |
| OSP_DCHECK(connection_manager_->GetConnection(connection->connection_id())) |
| << "otherwise valid connection for reconnect is unknown to the " |
| "connection manager"; |
| connection_manager_->RemoveConnection(connection.get()); |
| connection->OnConnecting(); |
| ConnectionOpenRequest request; |
| request.request.url = info.url; |
| request.request.presentation_id = info.id; |
| request.delegate = delegate; |
| request.presentation_connection_delegate = nullptr; |
| request.connection = std::move(connection); |
| const std::string& service_id = presentation_entry->second.service_id; |
| uint64_t request_id = |
| group_streams_[service_id]->SendConnectionOpenRequest(std::move(request)); |
| constexpr bool is_reconnect = true; |
| return ConnectRequest(this, service_id, is_reconnect, request_id); |
| } |
| |
| Error Controller::CloseConnection(Connection* connection, |
| Connection::CloseReason reason) { |
| auto presentation_entry = |
| presentations_.find(connection->presentation_info().id); |
| if (presentation_entry == presentations_.end()) { |
| std::stringstream ss; |
| ss << "no presentation found when trying to close connection " |
| << connection->presentation_info().id << ":" |
| << connection->connection_id(); |
| return Error(Error::Code::kNoPresentationFound, ss.str()); |
| } |
| ConnectionCloseRequest request; |
| request.request.connection_id = connection->connection_id(); |
| group_streams_[presentation_entry->second.service_id] |
| ->SendConnectionCloseRequest(std::move(request)); |
| return Error::None(); |
| } |
| |
| Error Controller::OnPresentationTerminated(const std::string& presentation_id, |
| TerminationReason reason) { |
| auto presentation_entry = presentations_.find(presentation_id); |
| if (presentation_entry == presentations_.end()) { |
| return Error::Code::kNoPresentationFound; |
| } |
| ControlledPresentation& presentation = presentation_entry->second; |
| for (auto* connection : presentation.connections) { |
| connection->OnTerminated(); |
| } |
| TerminationRequest request; |
| request.request.presentation_id = presentation_id; |
| request.request.reason = |
| msgs::PresentationTerminationRequest_reason::kUserTerminatedViaController; |
| group_streams_[presentation.service_id]->SendTerminationRequest( |
| std::move(request)); |
| presentations_.erase(presentation_entry); |
| termination_listener_by_id_.erase(presentation_id); |
| return Error::None(); |
| } |
| |
| void Controller::OnConnectionDestroyed(Connection* connection) { |
| auto presentation_entry = |
| presentations_.find(connection->presentation_info().id); |
| if (presentation_entry == presentations_.end()) { |
| return; |
| } |
| |
| std::vector<Connection*>& connections = |
| presentation_entry->second.connections; |
| |
| connections.erase( |
| std::remove(connections.begin(), connections.end(), connection), |
| connections.end()); |
| |
| connection_manager_->RemoveConnection(connection); |
| } |
| |
| std::string Controller::GetServiceIdForPresentationId( |
| const std::string& presentation_id) const { |
| auto presentation_entry = presentations_.find(presentation_id); |
| if (presentation_entry == presentations_.end()) { |
| return ""; |
| } |
| return presentation_entry->second.service_id; |
| } |
| |
| ProtocolConnection* Controller::GetConnectionRequestGroupStream( |
| const std::string& service_id) { |
| OSP_UNIMPLEMENTED(); |
| return nullptr; |
| } |
| |
| void Controller::OnError(ServiceListenerError) {} |
| void Controller::OnMetrics(ServiceListener::Metrics) {} |
| |
| class Controller::TerminationListener final |
| : public MessageDemuxer::MessageCallback { |
| public: |
| TerminationListener(Controller* controller, |
| const std::string& presentation_id, |
| uint64_t endpoint_id); |
| ~TerminationListener() override; |
| |
| // MessageDemuxer::MessageCallback overrides. |
| ErrorOr<size_t> OnStreamMessage(uint64_t endpoint_id, |
| uint64_t connection_id, |
| msgs::Type message_type, |
| const uint8_t* buffer, |
| size_t buffer_size, |
| Clock::time_point now) override; |
| |
| private: |
| Controller* const controller_; |
| std::string presentation_id_; |
| MessageDemuxer::MessageWatch event_watch_; |
| }; |
| |
| Controller::TerminationListener::TerminationListener( |
| Controller* controller, |
| const std::string& presentation_id, |
| uint64_t endpoint_id) |
| : controller_(controller), presentation_id_(presentation_id) { |
| event_watch_ = |
| NetworkServiceManager::Get() |
| ->GetProtocolConnectionClient() |
| ->message_demuxer() |
| ->WatchMessageType(endpoint_id, |
| msgs::Type::kPresentationTerminationEvent, this); |
| } |
| |
| Controller::TerminationListener::~TerminationListener() = default; |
| |
| ErrorOr<size_t> Controller::TerminationListener::OnStreamMessage( |
| uint64_t endpoint_id, |
| uint64_t connection_id, |
| msgs::Type message_type, |
| const uint8_t* buffer, |
| size_t buffer_size, |
| Clock::time_point now) { |
| OSP_CHECK_EQ(static_cast<int>(msgs::Type::kPresentationTerminationEvent), |
| static_cast<int>(message_type)); |
| msgs::PresentationTerminationEvent event; |
| ssize_t result = |
| msgs::DecodePresentationTerminationEvent(buffer, buffer_size, &event); |
| if (result < 0) { |
| OSP_LOG_WARN << "decode presentation-termination-event error: " << result; |
| return Error::Code::kCborParsing; |
| } else if (event.presentation_id != presentation_id_) { |
| OSP_LOG_WARN << "got presentation-termination-event for wrong id: " |
| << presentation_id_ << " vs. " << event.presentation_id; |
| return result; |
| } |
| OSP_LOG_INFO << "termination event"; |
| auto presentation_entry = |
| controller_->presentations_.find(event.presentation_id); |
| if (presentation_entry != controller_->presentations_.end()) { |
| for (auto* connection : presentation_entry->second.connections) |
| connection->OnTerminated(); |
| controller_->presentations_.erase(presentation_entry); |
| } |
| controller_->termination_listener_by_id_.erase(event.presentation_id); |
| return result; |
| } |
| |
| // static |
| std::string Controller::MakePresentationId(const std::string& url, |
| const std::string& service_id) { |
| // TODO(btolsch): This is just a placeholder for the demo. It should |
| // eventually become a GUID/unguessable token routine. |
| std::string safe_id = service_id; |
| for (auto& c : safe_id) |
| if (c < ' ' || c > '~') |
| c = '.'; |
| return safe_id + ":" + url; |
| } |
| |
| void Controller::AddConnection(Connection* connection) { |
| connection_manager_->AddConnection(connection); |
| } |
| |
| void Controller::OpenConnection( |
| uint64_t connection_id, |
| uint64_t endpoint_id, |
| const std::string& service_id, |
| RequestDelegate* request_delegate, |
| std::unique_ptr<Connection>&& connection, |
| std::unique_ptr<ProtocolConnection>&& protocol_connection) { |
| connection->OnConnected(connection_id, endpoint_id, |
| std::move(protocol_connection)); |
| const std::string& presentation_id = connection->presentation_info().id; |
| auto presentation_entry = presentations_.find(presentation_id); |
| if (presentation_entry == presentations_.end()) { |
| auto emplace_entry = presentations_.emplace( |
| presentation_id, |
| ControlledPresentation{ |
| service_id, connection->presentation_info().url, {}}); |
| presentation_entry = emplace_entry.first; |
| } |
| ControlledPresentation& presentation = presentation_entry->second; |
| presentation.connections.push_back(connection.get()); |
| AddConnection(connection.get()); |
| |
| auto terminate_entry = termination_listener_by_id_.find(presentation_id); |
| if (terminate_entry == termination_listener_by_id_.end()) { |
| termination_listener_by_id_.emplace( |
| presentation_id, std::make_unique<TerminationListener>( |
| this, presentation_id, endpoint_id)); |
| } |
| request_delegate->OnConnection(std::move(connection)); |
| } |
| |
| void Controller::TerminatePresentationById(const std::string& presentation_id) { |
| auto presentation_entry = presentations_.find(presentation_id); |
| if (presentation_entry != presentations_.end()) { |
| for (auto* connection : presentation_entry->second.connections) { |
| connection->OnTerminated(); |
| } |
| presentations_.erase(presentation_entry); |
| } |
| } |
| |
| void Controller::CancelReceiverWatch(const std::vector<std::string>& urls, |
| ReceiverObserver* observer) { |
| availability_requester_->RemoveObserverUrls(urls, observer); |
| } |
| |
| void Controller::CancelConnectRequest(const std::string& service_id, |
| bool is_reconnect, |
| uint64_t request_id) { |
| auto group_streams_entry = group_streams_.find(service_id); |
| if (group_streams_entry == group_streams_.end()) |
| return; |
| if (is_reconnect) { |
| group_streams_entry->second->CancelConnectionOpenRequest(request_id); |
| } else { |
| group_streams_entry->second->CancelStartRequest(request_id); |
| } |
| } |
| |
| void Controller::OnStarted() {} |
| void Controller::OnStopped() {} |
| void Controller::OnSuspended() {} |
| void Controller::OnSearching() {} |
| |
| void Controller::OnReceiverAdded(const ServiceInfo& info) { |
| receiver_endpoints_.emplace(info.service_id, info.v4_endpoint.port |
| ? info.v4_endpoint |
| : info.v6_endpoint); |
| auto group_streams = |
| std::make_unique<MessageGroupStreams>(this, info.service_id); |
| group_streams_[info.service_id] = std::move(group_streams); |
| availability_requester_->AddReceiver(info); |
| } |
| |
| void Controller::OnReceiverChanged(const ServiceInfo& info) { |
| receiver_endpoints_[info.service_id] = |
| info.v4_endpoint.port ? info.v4_endpoint : info.v6_endpoint; |
| availability_requester_->ChangeReceiver(info); |
| } |
| |
| void Controller::OnReceiverRemoved(const ServiceInfo& info) { |
| receiver_endpoints_.erase(info.service_id); |
| group_streams_.erase(info.service_id); |
| availability_requester_->RemoveReceiver(info); |
| } |
| |
| void Controller::OnAllReceiversRemoved() { |
| receiver_endpoints_.clear(); |
| availability_requester_->RemoveAllReceivers(); |
| } |
| |
| } // namespace osp |
| } // namespace openscreen |