// blob: f2fb94f5f1f56cd1075242e11b02399543cdefdd
// Copyright (C) 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "icing/result/result-state-manager.h"
#include <memory>
#include <queue>
#include <utility>
#include "icing/result/page-result.h"
#include "icing/result/result-adjustment-info.h"
#include "icing/result/result-retriever-v2.h"
#include "icing/result/result-state-v2.h"
#include "icing/scoring/scored-document-hits-ranker.h"
#include "icing/util/clock.h"
#include "icing/util/logging.h"
#include "icing/util/status-macros.h"
namespace icing {
namespace lib {
// Constructs a ResultStateManager.
//
// Parameters:
//   max_total_hits: copied into max_total_hits_, which this file uses as the
//     hit budget when truncating and evicting cached result states.
//   document_store: used to initialize document_store_ (presumably a
//     reference member -- confirm against the class declaration).
//   clock: dereferenced unconditionally to initialize clock_, so it must not
//     be null. NOTE(review): if clock_ is a reference member, the Clock must
//     also outlive this manager -- confirm against the header.
ResultStateManager::ResultStateManager(int max_total_hits,
const DocumentStore& document_store,
const Clock* clock)
: document_store_(document_store),
max_total_hits_(max_total_hits),
// Nothing is cached yet, so the running hit counter starts at zero.
num_total_hits_(0),
// The token generator is seeded with the value of GetSteadyTimeNanoseconds()
// taken at construction time.
random_generator_(GetSteadyTimeNanoseconds()),
// No null check is performed on `clock` before dereferencing it.
clock_(*clock) {}
// Wraps the given ranker and adjustment info in a new ResultStateV2, retrieves
// its first page and, only if further pages remain, caches the state under a
// newly generated token.
//
// Returns:
//   - An InvalidArgumentError if `ranker` is null.
//   - (kInvalidNextPageToken, first page) if the first page exhausted the
//     results; in that case the state is not cached.
//   - (new next-page token, first page) otherwise.
libtextclassifier3::StatusOr<std::pair<uint64_t, PageResult>>
ResultStateManager::CacheAndRetrieveFirstPage(
    std::unique_ptr<ScoredDocumentHitsRanker> ranker,
    std::unique_ptr<ResultAdjustmentInfo> parent_adjustment_info,
    std::unique_ptr<ResultAdjustmentInfo> child_adjustment_info,
    const ResultSpecProto& result_spec, const DocumentStore& document_store,
    const ResultRetrieverV2& result_retriever) {
  if (!ranker) {
    return absl_ports::InvalidArgumentError("Should not provide null ranker");
  }

  // ResultState objects are meant to be created by ResultStateManager only;
  // the state is held through a shared pointer.
  auto new_state = std::make_shared<ResultStateV2>(
      std::move(ranker), std::move(parent_adjustment_info),
      std::move(child_adjustment_info), result_spec, document_store);

  // Fetch the first page without holding the manager lock. The retriever
  // enters the ResultState critical section on its own.
  auto [page_result, has_more_results] =
      result_retriever.RetrieveNextPage(*new_state);
  if (!has_more_results) {
    // Single-page result: there is nothing worth caching.
    return std::make_pair(kInvalidNextPageToken, std::move(page_result));
  }

  // More pages remain, so the state has to be cached. First cap it to the
  // budget and hook it up to the shared hit counter.
  int hits_in_new_state = 0;
  {
    // ResultState critical section.
    absl_ports::unique_lock state_lock(&new_state->mutex);
    new_state->scored_document_hits_ranker->TruncateHitsTo(max_total_hits_);
    new_state->RegisterNumTotalHits(&num_total_hits_);
    hits_in_new_state = new_state->scored_document_hits_ranker->size();
  }

  // Leaving the ResultState critical section here is safe: the state was
  // created in this call and no other thread can reach it yet, so it cannot
  // change before the manager lock is taken below.
  uint64_t issued_token = kInvalidNextPageToken;
  {
    // ResultStateManager critical section.
    absl_ports::unique_lock manager_lock(&mutex_);
    // Expired states go first ...
    InternalInvalidateExpiredResultStates(kDefaultResultStateTtlInMs);
    // ... then older states are evicted to make room for the new one ...
    RemoveStatesIfNeeded(hits_in_new_state);
    // ... and finally the new state is registered under a unique token.
    issued_token = Add(std::move(new_state));
  }

  return std::make_pair(issued_token, std::move(page_result));
}
// Stores `result_state` under a newly generated unique token and returns that
// token. The token is also appended to token_queue_ together with the current
// system time, which records insertion order and creation time.
uint64_t ResultStateManager::Add(std::shared_ptr<ResultStateV2> result_state) {
  const uint64_t issued_token = GetUniqueToken();
  result_state_map_.emplace(issued_token, std::move(result_state));
  // Queue entry: (token, creation time in milliseconds).
  token_queue_.emplace(issued_token, clock_.GetSystemTimeMilliseconds());
  return issued_token;
}
// Retrieves the next page of the result state cached under `next_page_token`.
//
// Returns:
//   - A NotFoundError if no live state is registered under the token (expired
//     states are purged before the lookup).
//   - (next_page_token, page) if the state still has more pages.
//   - (kInvalidNextPageToken, page) if this was the last page; the state is
//     invalidated in that case.
libtextclassifier3::StatusOr<std::pair<uint64_t, PageResult>>
ResultStateManager::GetNextPage(uint64_t next_page_token,
                                const ResultRetrieverV2& result_retriever) {
  std::shared_ptr<ResultStateV2> cached_state;
  {
    // ResultStateManager critical section.
    absl_ports::unique_lock manager_lock(&mutex_);
    // Purge expired states first so that an expired token is not served.
    InternalInvalidateExpiredResultStates(kDefaultResultStateTtlInMs);
    auto entry = result_state_map_.find(next_page_token);
    if (entry == result_state_map_.end()) {
      return absl_ports::NotFoundError("next_page_token not found");
    }
    // Copy the shared pointer so the state can be used after the manager lock
    // is released.
    cached_state = entry->second;
  }

  // Documents are retrieved without holding the manager lock; the retriever
  // enters the ResultState critical section on its own.
  auto [page_result, has_more_results] =
      result_retriever.RetrieveNextPage(*cached_state);
  if (has_more_results) {
    return std::make_pair(next_page_token, std::move(page_result));
  }

  // That was the final page: drop the cached state and tell the caller there
  // is nothing left to fetch.
  {
    // ResultStateManager critical section.
    absl_ports::unique_lock manager_lock(&mutex_);
    InternalInvalidateResultState(next_page_token);
  }
  return std::make_pair(kInvalidNextPageToken, std::move(page_result));
}
// Invalidates the result state registered under `next_page_token`, taking the
// manager lock for the duration. kInvalidNextPageToken is ignored without
// acquiring the lock.
void ResultStateManager::InvalidateResultState(uint64_t next_page_token) {
  if (next_page_token != kInvalidNextPageToken) {
    absl_ports::unique_lock manager_lock(&mutex_);
    InternalInvalidateResultState(next_page_token);
  }
}
// Locked entry point: acquires mutex_ and then drops every cached result state
// via InternalInvalidateAllResultStates().
void ResultStateManager::InvalidateAllResultStates() {
  absl_ports::unique_lock manager_lock(&mutex_);
  InternalInvalidateAllResultStates();
}
// Drops every cached result state together with all token bookkeeping. The
// callers visible in this file hold mutex_ when calling.
void ResultStateManager::InternalInvalidateAllResultStates() {
  // num_total_hits_ is intentionally not reset to 0 here. Clearing the map
  // releases the shared pointers, and the ResultState destructor (which runs
  // "eventually", once the last owner lets go) decrements num_total_hits_, so
  // the counter reaches 0 on its own.
  result_state_map_.clear();
  invalidated_token_set_.clear();

  // std::queue has no clear(); swap in an empty queue instead.
  std::queue<std::pair<uint64_t, int64_t>> empty_queue;
  token_queue_.swap(empty_queue);
}
// Draws random tokens until one is found that is neither the reserved
// kInvalidNextPageToken nor already known to the manager, and returns it.
uint64_t ResultStateManager::GetUniqueToken() {
  uint64_t candidate = kInvalidNextPageToken;
  // Collisions between random numbers are unlikely but possible, so every
  // candidate is checked against the live tokens and the invalidated ones.
  do {
    candidate = random_generator_();
  } while (candidate == kInvalidNextPageToken ||
           result_state_map_.count(candidate) > 0 ||
           invalidated_token_set_.count(candidate) > 0);
  return candidate;
}
// Evicts cached result states, oldest first, so that a new state holding
// `num_hits_to_add` hits fits into the max_total_hits_ budget.
//
// The only caller in this file invokes this while holding mutex_, after the
// new state's hits have already been registered in num_total_hits_.
//
// Bookkeeping: a token that is still queued in token_queue_ is either live (in
// result_state_map_) or invalidated (in invalidated_token_set_). A token leaves
// invalidated_token_set_ only when its queue entry is popped, so
// GetUniqueToken() never hands out a token that still has a stale queue entry.
void ResultStateManager::RemoveStatesIfNeeded(int num_hits_to_add) {
  if (result_state_map_.empty() || token_queue_.empty()) {
    return;
  }

  // 1. Check if this new result state would take up the entire result state
  // manager budget.
  if (num_hits_to_add > max_total_hits_) {
    // This single result state will exceed our budget. Drop everything else to
    // accommodate it.
    InternalInvalidateAllResultStates();
    return;
  }

  // 2. Remove any tokens at the front of the queue that were previously
  // invalidated; their states are already gone from result_state_map_.
  while (!token_queue_.empty() &&
         invalidated_token_set_.find(token_queue_.front().first) !=
             invalidated_token_set_.end()) {
    invalidated_token_set_.erase(token_queue_.front().first);
    token_queue_.pop();
  }

  // 3. If we're over budget, remove states from oldest to newest until we fit
  // into our budget.
  // Note: num_total_hits_ may not be decremented immediately after removing a
  // result state, since other threads may still hold the shared pointer.
  // Thus, we have to check whether token_queue_ is empty, since it is possible
  // that num_total_hits_ is still greater than max_total_hits_ when
  // token_queue_ is empty. It will "eventually" be decremented after the last
  // thread releases the shared pointer.
  while (!token_queue_.empty() && num_total_hits_ > max_total_hits_) {
    const uint64_t oldest_token = token_queue_.front().first;
    token_queue_.pop();
    // The queue entry is gone, so the token needs no further tracking. Erase
    // it from whichever container held it: the map if it was live, otherwise
    // the invalidated set. Tokens that remain queued stay in
    // invalidated_token_set_ on purpose (no blanket clear()).
    if (result_state_map_.erase(oldest_token) == 0) {
      invalidated_token_set_.erase(oldest_token);
    }
  }
}
// Removes the state registered under `token` (if any) from result_state_map_
// and records the token in invalidated_token_set_. Unknown tokens are ignored.
//
// The matching entry in token_queue_ is left in place, because removing it
// would need O(n) time; it is dropped later, when it is popped from the queue
// (see RemoveStatesIfNeeded()).
void ResultStateManager::InternalInvalidateResultState(uint64_t token) {
  // num_total_hits_ is not adjusted here: releasing the shared pointer will
  // "eventually" run the ResultState destructor, which takes care of it.
  const bool was_cached = result_state_map_.erase(token) > 0;
  if (was_cached) {
    invalidated_token_set_.insert(token);
  }
}
// Pops entries from the front of token_queue_ for as long as their age (current
// system time minus the recorded creation time) is at least
// `result_state_ttl`, removing each popped token from result_state_map_ or
// invalidated_token_set_.
void ResultStateManager::InternalInvalidateExpiredResultStates(
    int64_t result_state_ttl) {
  const int64_t now_ms = clock_.GetSystemTimeMilliseconds();
  while (!token_queue_.empty()) {
    // Copy the front entry; it is popped at the end of this iteration.
    const auto [oldest_token, created_ms] = token_queue_.front();
    if (now_ms - created_ms < result_state_ttl) {
      // The queue is in insertion order, so stop at the first entry that has
      // not expired yet.
      break;
    }

    // A token is in at most one of result_state_map_ and
    // invalidated_token_set_, so the set is only touched when the map did not
    // contain the token. num_total_hits_ is not adjusted here: releasing the
    // shared pointer "eventually" runs the ResultState destructor, which
    // handles it.
    if (result_state_map_.erase(oldest_token) == 0) {
      invalidated_token_set_.erase(oldest_token);
    }
    token_queue_.pop();
  }
}
} // namespace lib
} // namespace icing