blob: 78bd95835f1989bb4b3c09dba344cbd407cbdb18 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/copresence/rpc/rpc_handler.h"
#include "base/bind.h"
#include "base/command_line.h"
#include "base/guid.h"
#include "base/logging.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
// TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
// to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
// we fix this with an #undef.
#include "base/time/time.h"
#if defined(OS_WIN)
#undef DeviceCapabilities
#endif
#include "components/copresence/copresence_switches.h"
#include "components/copresence/handlers/directive_handler.h"
#include "components/copresence/handlers/gcm_handler.h"
#include "components/copresence/proto/codes.pb.h"
#include "components/copresence/proto/data.pb.h"
#include "components/copresence/proto/rpcs.pb.h"
#include "components/copresence/public/copresence_constants.h"
#include "components/copresence/public/copresence_delegate.h"
#include "components/copresence/rpc/http_post.h"
#include "net/http/http_status_code.h"
// TODO(ckehoe): Return error messages for bad requests.
namespace copresence {
using google::protobuf::MessageLite;
using google::protobuf::RepeatedPtrField;
const char RpcHandler::kReportRequestRpcName[] = "report";
namespace {
const int kTokenLoggingSuffix = 5;
const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes.
const int kMaxInvalidTokens = 10000;
const char kRegisterDeviceRpcName[] = "registerdevice";
const char kDefaultCopresenceServer[] =
"https://www.googleapis.com/copresence/v2/copresence";
// UrlSafe is defined as:
// '/' represented by a '_' and '+' represented by a '-'
// TODO(rkc): Move this to the wrapper.
std::string ToUrlSafe(std::string token) {
base::ReplaceChars(token, "+", "-", &token);
base::ReplaceChars(token, "/", "_", &token);
return token;
}
// Logging
// Checks for a copresence error. If there is one, logs it and returns true.
bool IsErrorStatus(const Status& status) {
if (status.code() != OK) {
LOG(ERROR) << "Copresence error code " << status.code()
<< (status.message().empty() ? "" : ": " + status.message());
}
return status.code() != OK;
}
void LogIfErrorStatus(const util::error::Code& code,
const std::string& context) {
LOG_IF(ERROR, code != util::error::OK)
<< context << " error " << code << ". See "
<< "cs/google3/util/task/codes.proto for more info.";
}
// If any errors occurred, logs them and returns true.
bool ReportErrorLogged(const ReportResponse& response) {
bool result = IsErrorStatus(response.header().status());
// The Report fails or succeeds as a unit. If any responses had errors,
// the header will too. Thus we don't need to propagate individual errors.
if (response.has_update_signals_response())
LogIfErrorStatus(response.update_signals_response().status(), "Update");
if (response.has_manage_messages_response())
LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
if (response.has_manage_subscriptions_response()) {
LogIfErrorStatus(response.manage_subscriptions_response().status(),
"Subscribe");
}
return result;
}
const std::string LoggingStrForToken(const std::string& auth_token) {
if (auth_token.empty())
return "anonymous";
std::string token_suffix = auth_token.substr(
auth_token.length() - kTokenLoggingSuffix, kTokenLoggingSuffix);
return base::StringPrintf("token ...%s", token_suffix.c_str());
}
// Request construction
// TODO(ckehoe): Move these into a separate file?
template <typename T>
BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
if (msg.has_token_exchange_strategy() &&
msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
return msg.token_exchange_strategy().broadcast_scan_configuration();
}
return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
}
scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
scoped_ptr<DeviceState> state(new DeviceState);
TokenTechnology* ultrasound =
state->mutable_capabilities()->add_token_technology();
ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND);
ultrasound->add_instruction_type(TRANSMIT);
ultrasound->add_instruction_type(RECEIVE);
TokenTechnology* audible =
state->mutable_capabilities()->add_token_technology();
audible->set_medium(AUDIO_AUDIBLE_DTMF);
audible->add_instruction_type(TRANSMIT);
audible->add_instruction_type(RECEIVE);
return state.Pass();
}
// TODO(ckehoe): We're keeping this code in a separate function for now
// because we get a version string from Chrome, but the proto expects
// an int64 version. We should probably change the version proto
// to handle a more detailed version.
ClientVersion* CreateVersion(const std::string& client,
const std::string& version_name) {
ClientVersion* version = new ClientVersion;
version->set_client(client);
version->set_version_name(version_name);
return version;
}
void AddTokenToRequest(const AudioToken& token, ReportRequest* request) {
TokenObservation* token_observation =
request->mutable_update_signals_request()->add_token_observation();
token_observation->set_token_id(ToUrlSafe(token.token));
TokenSignals* signals = token_observation->add_signals();
signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
: AUDIO_ULTRASOUND_PASSBAND);
signals->set_observed_time_millis(base::Time::Now().ToJsTime());
}
} // namespace
// Public functions.
RpcHandler::RpcHandler(CopresenceDelegate* delegate,
DirectiveHandler* directive_handler,
GCMHandler* gcm_handler,
const PostCallback& server_post_callback)
: delegate_(delegate),
directive_handler_(directive_handler),
gcm_handler_(gcm_handler),
server_post_callback_(server_post_callback),
invalid_audio_token_cache_(
base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
kMaxInvalidTokens) {
DCHECK(delegate_);
DCHECK(directive_handler_);
// |gcm_handler_| is optional.
if (server_post_callback_.is_null()) {
server_post_callback_ =
base::Bind(&RpcHandler::SendHttpPost, base::Unretained(this));
}
if (gcm_handler_) {
gcm_handler_->GetGcmId(
base::Bind(&RpcHandler::RegisterGcmId, base::Unretained(this)));
}
}
RpcHandler::~RpcHandler() {
// Do not use |directive_handler_| or |gcm_handler_| here.
// They will already have been destructed.
for (HttpPost* post : pending_posts_)
delete post;
}
void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
const std::string& app_id,
const std::string& auth_token,
const StatusCallback& status_callback) {
DCHECK(request.get());
// Check that we have a "device" registered for this auth token.
bool queue_request = true;
const auto& registration = device_id_by_auth_token_.find(auth_token);
if (registration == device_id_by_auth_token_.end()) {
// Not registered.
RegisterForToken(auth_token);
} else if (!registration->second.empty()) {
// Registration complete.
queue_request = false;
}
// We're not registered, or registration is in progress.
if (queue_request) {
pending_requests_queue_.push_back(new PendingRequest(
request.Pass(), app_id, auth_token, status_callback));
return;
}
DVLOG(3) << "Sending ReportRequest to server.";
// If we are unpublishing or unsubscribing, we need to stop those publish or
// subscribes right away, we don't need to wait for the server to tell us.
ProcessRemovedOperations(*request);
request->mutable_update_signals_request()->set_allocated_state(
GetDeviceCapabilities(*request).release());
AddPlayingTokens(request.get());
SendServerRequest(kReportRequestRpcName,
registration->second,
app_id,
auth_token,
request.Pass(),
// On destruction, this request will be cancelled.
base::Bind(&RpcHandler::ReportResponseHandler,
base::Unretained(this),
status_callback));
}
void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
DCHECK(!tokens.empty());
if (device_id_by_auth_token_.empty()) {
VLOG(2) << "Skipping token reporting because no device IDs are registered";
return;
}
// Construct the ReportRequest.
ReportRequest request;
for (const AudioToken& token : tokens) {
if (invalid_audio_token_cache_.HasKey(ToUrlSafe(token.token)))
continue;
DVLOG(3) << "Sending token " << token.token << " to server under "
<< device_id_by_auth_token_.size() << " device ID(s)";
AddTokenToRequest(token, &request);
}
// Report under all active tokens.
for (const auto& registration : device_id_by_auth_token_) {
SendReportRequest(make_scoped_ptr(new ReportRequest(request)),
registration.first);
}
}
// Private functions.
RpcHandler::PendingRequest::PendingRequest(scoped_ptr<ReportRequest> report,
const std::string& app_id,
const std::string& auth_token,
const StatusCallback& callback)
: report(report.Pass()),
app_id(app_id),
auth_token(auth_token),
callback(callback) {}
RpcHandler::PendingRequest::~PendingRequest() {}
void RpcHandler::RegisterForToken(const std::string& auth_token) {
DVLOG(2) << "Sending " << LoggingStrForToken(auth_token)
<< " registration to server.";
scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
// Add a GCM ID for authenticated registration, if we have one.
if (auth_token.empty() || gcm_id_.empty()) {
request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
} else {
DVLOG(2) << "Registering GCM ID with " << LoggingStrForToken(auth_token);
request->mutable_push_service()->set_service(GCM);
request->mutable_push_service()->mutable_gcm_registration()
->set_device_token(gcm_id_);
}
// Only identify as a Chrome device if we're in anonymous mode.
// Authenticated calls come from a "GAIA device".
if (auth_token.empty()) {
Identity* identity =
request->mutable_device_identifiers()->mutable_registrant();
identity->set_type(CHROME);
identity->set_chrome_id(base::GenerateGUID());
// Since we're generating a new "Chrome ID" here,
// we need to make sure this isn't a duplicate registration.
DCHECK_EQ(0u, device_id_by_auth_token_.count(std::string()))
<< "Attempted anonymous re-registration";
}
bool gcm_pending = !auth_token.empty() && gcm_handler_ && gcm_id_.empty();
SendServerRequest(
kRegisterDeviceRpcName,
// This will have the side effect of populating an empty device ID
// for this auth token in the map. This is what we want,
// to mark registration as being in progress.
device_id_by_auth_token_[auth_token],
std::string(), // app ID
auth_token,
request.Pass(),
base::Bind(&RpcHandler::RegisterResponseHandler,
// On destruction, this request will be cancelled.
base::Unretained(this),
auth_token,
gcm_pending));
}
void RpcHandler::ProcessQueuedRequests(const std::string& auth_token) {
// Track requests that are not on this auth token.
ScopedVector<PendingRequest> still_pending_requests;
// If there is no device ID for this auth token, registration failed.
bool registration_failed =
(device_id_by_auth_token_.count(auth_token) == 0);
// We momentarily take ownership of all the pointers in the queue.
// They are either deleted here or passed on to a new queue.
for (PendingRequest* request : pending_requests_queue_) {
if (request->auth_token == auth_token) {
if (registration_failed) {
request->callback.Run(FAIL);
} else {
SendReportRequest(request->report.Pass(),
request->app_id,
request->auth_token,
request->callback);
}
delete request;
} else {
// The request is on a different auth token.
still_pending_requests.push_back(request);
}
}
// Only keep the requests that weren't processed.
// All the pointers in the queue are now spoken for.
pending_requests_queue_.weak_clear();
pending_requests_queue_ = still_pending_requests.Pass();
}
void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
const std::string& auth_token) {
SendReportRequest(request.Pass(),
std::string(),
auth_token,
StatusCallback());
}
// Store a GCM ID and send it to the server if needed. The constructor passes
// this callback to the GCMHandler to receive the ID whenever it's ready.
// It may be returned immediately, if the ID is cached, or require a server
// round-trip. This ID must then be passed along to the copresence server.
// There are a few ways this can happen for each auth token:
//
// 1. The GCM ID is available when we first register, and is passed along
// with the RegisterDeviceRequest.
//
// 2. The GCM ID becomes available after the RegisterDeviceRequest has
// completed. Then the loop in this function will invoke RegisterForToken()
// again to pass on the ID.
//
// 3. The GCM ID becomes available after the RegisterDeviceRequest is sent,
// but before it completes. In this case, the gcm_pending flag is passed
// through to the RegisterResponseHandler, which invokes RegisterForToken()
// again to pass on the ID. The loop here must skip pending registrations,
// as the device ID will be empty.
//
// TODO(ckehoe): Add tests for these scenarios.
void RpcHandler::RegisterGcmId(const std::string& gcm_id) {
gcm_id_ = gcm_id;
if (!gcm_id.empty()) {
for (const auto& registration : device_id_by_auth_token_) {
const std::string& auth_token = registration.first;
const std::string& device_id = registration.second;
if (!auth_token.empty() && !device_id.empty())
RegisterForToken(auth_token);
}
}
}
void RpcHandler::RegisterResponseHandler(
const std::string& auth_token,
bool gcm_pending,
HttpPost* completed_post,
int http_status_code,
const std::string& response_data) {
if (completed_post) {
int elements_erased = pending_posts_.erase(completed_post);
DCHECK_GT(elements_erased, 0);
delete completed_post;
}
// Registration is no longer in progress.
// If it was successful, we'll update below.
device_id_by_auth_token_.erase(auth_token);
RegisterDeviceResponse response;
if (http_status_code != net::HTTP_OK) {
// TODO(ckehoe): Retry registration if appropriate.
LOG(ERROR) << LoggingStrForToken(auth_token)
<< " device registration failed";
} else if (!response.ParseFromString(response_data)) {
LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
} else if (!IsErrorStatus(response.header().status())) {
const std::string& device_id = response.registered_device_id();
DCHECK(!device_id.empty());
device_id_by_auth_token_[auth_token] = device_id;
DVLOG(2) << LoggingStrForToken(auth_token)
<< " device registration successful. Id: " << device_id;
// If we have a GCM ID now, and didn't before, pass it on to the server.
if (gcm_pending && !gcm_id_.empty())
RegisterForToken(auth_token);
}
// Send or fail requests on this auth token.
ProcessQueuedRequests(auth_token);
}
void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
HttpPost* completed_post,
int http_status_code,
const std::string& response_data) {
if (completed_post) {
int elements_erased = pending_posts_.erase(completed_post);
DCHECK(elements_erased);
delete completed_post;
}
if (http_status_code != net::HTTP_OK) {
if (!status_callback.is_null())
status_callback.Run(FAIL);
return;
}
DVLOG(3) << "Received ReportResponse.";
ReportResponse response;
if (!response.ParseFromString(response_data)) {
LOG(ERROR) << "Invalid ReportResponse";
if (!status_callback.is_null())
status_callback.Run(FAIL);
return;
}
if (ReportErrorLogged(response)) {
if (!status_callback.is_null())
status_callback.Run(FAIL);
return;
}
for (const MessageResult& result :
response.manage_messages_response().published_message_result()) {
DVLOG(2) << "Published message with id " << result.published_message_id();
}
for (const SubscriptionResult& result :
response.manage_subscriptions_response().subscription_result()) {
DVLOG(2) << "Created subscription with id " << result.subscription_id();
}
if (response.has_update_signals_response()) {
const UpdateSignalsResponse& update_response =
response.update_signals_response();
DispatchMessages(update_response.message());
for (const Directive& directive : update_response.directive())
directive_handler_->AddDirective(directive);
for (const Token& token : update_response.token()) {
switch (token.status()) {
case VALID:
// TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
// short TTL (like 10s) and send it up with every report request.
// Then we'll still get messages while we're waiting to hear it again.
VLOG(1) << "Got valid token " << token.id();
break;
case INVALID:
DVLOG(3) << "Discarding invalid token " << token.id();
invalid_audio_token_cache_.Add(token.id(), true);
break;
default:
DVLOG(2) << "Token " << token.id() << " has status code "
<< token.status();
}
}
}
// TODO(ckehoe): Return a more detailed status response.
if (!status_callback.is_null())
status_callback.Run(SUCCESS);
}
void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
// Remove unpublishes.
if (request.has_manage_messages_request()) {
for (const std::string& unpublish :
request.manage_messages_request().id_to_unpublish()) {
directive_handler_->RemoveDirectives(unpublish);
}
}
// Remove unsubscribes.
if (request.has_manage_subscriptions_request()) {
for (const std::string& unsubscribe :
request.manage_subscriptions_request().id_to_unsubscribe()) {
directive_handler_->RemoveDirectives(unsubscribe);
}
}
}
void RpcHandler::AddPlayingTokens(ReportRequest* request) {
const std::string& audible_token =
directive_handler_->GetCurrentAudioToken(AUDIBLE);
const std::string& inaudible_token =
directive_handler_->GetCurrentAudioToken(INAUDIBLE);
if (!audible_token.empty())
AddTokenToRequest(AudioToken(audible_token, true), request);
if (!inaudible_token.empty())
AddTokenToRequest(AudioToken(inaudible_token, false), request);
}
void RpcHandler::DispatchMessages(
const RepeatedPtrField<SubscribedMessage>& messages) {
if (messages.size() == 0)
return;
// Index the messages by subscription id.
std::map<std::string, std::vector<Message>> messages_by_subscription;
DVLOG(3) << "Dispatching " << messages.size() << " messages";
for (const SubscribedMessage& message : messages) {
for (const std::string& subscription_id : message.subscription_id()) {
messages_by_subscription[subscription_id].push_back(
message.published_message());
}
}
// Send the messages for each subscription.
for (const auto& map_entry : messages_by_subscription) {
// TODO(ckehoe): Once we have the app ID from the server, we need to pass
// it in here and get rid of the app id registry from the main API class.
const std::string& subscription = map_entry.first;
const std::vector<Message>& messages = map_entry.second;
delegate_->HandleMessages(std::string(), subscription, messages);
}
}
// TODO(ckehoe): Pass in the version string and
// group this with the local functions up top.
RequestHeader* RpcHandler::CreateRequestHeader(
const std::string& client_name,
const std::string& device_id) const {
RequestHeader* header = new RequestHeader;
header->set_allocated_framework_version(CreateVersion(
"Chrome", delegate_->GetPlatformVersionString()));
if (!client_name.empty()) {
header->set_allocated_client_version(
CreateVersion(client_name, std::string()));
}
header->set_current_time_millis(base::Time::Now().ToJsTime());
if (!device_id.empty())
header->set_registered_device_id(device_id);
DeviceFingerprint* fingerprint = new DeviceFingerprint;
fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
fingerprint->set_type(CHROME_PLATFORM_TYPE);
header->set_allocated_device_fingerprint(fingerprint);
return header;
}
template <class T>
void RpcHandler::SendServerRequest(
const std::string& rpc_name,
const std::string& device_id,
const std::string& app_id,
const std::string& auth_token,
scoped_ptr<T> request,
const PostCleanupCallback& response_handler) {
request->set_allocated_header(CreateRequestHeader(app_id, device_id));
server_post_callback_.Run(delegate_->GetRequestContext(),
rpc_name,
delegate_->GetAPIKey(app_id),
auth_token,
make_scoped_ptr<MessageLite>(request.release()),
response_handler);
}
void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
const std::string& rpc_name,
const std::string& api_key,
const std::string& auth_token,
scoped_ptr<MessageLite> request_proto,
const PostCleanupCallback& callback) {
// Create the base URL to call.
CommandLine* command_line = CommandLine::ForCurrentProcess();
const std::string copresence_server_host =
command_line->HasSwitch(switches::kCopresenceServer) ?
command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
kDefaultCopresenceServer;
// Create the request and keep a pointer until it completes.
HttpPost* http_post = new HttpPost(
url_context_getter,
copresence_server_host,
rpc_name,
api_key,
auth_token,
command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
*request_proto);
http_post->Start(base::Bind(callback, http_post));
pending_posts_.insert(http_post);
}
} // namespace copresence