blob: d6aa22f2b38e3968661e42d113b4dbbc5fab051a [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/bigtrace/orchestrator_impl.h"
#include <memory>
#include <optional>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "perfetto/base/status.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/flat_hash_map.h"
#include "perfetto/ext/base/periodic_task.h"
#include "perfetto/ext/base/status_or.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/poll.h"
#include "perfetto/ext/base/threading/spawn.h"
#include "perfetto/ext/base/threading/stream.h"
#include "perfetto/ext/bigtrace/orchestrator.h"
#include "perfetto/ext/bigtrace/worker.h"
#include "protos/perfetto/bigtrace/orchestrator.pb.h"
#include "protos/perfetto/bigtrace/worker.pb.h"
#include "src/trace_processor/util/status_macros.h"
namespace perfetto {
namespace bigtrace {
namespace {
base::Future<base::StatusOr<protos::TracePoolQueryResponse>>
RpcResponseToPoolResponse(base::StatusOr<protos::QueryTraceResponse> resp) {
RETURN_IF_ERROR(resp.status());
protos::TracePoolQueryResponse ret;
ret.set_trace(std::move(resp->trace()));
*ret.mutable_result() = std::move(*resp->mutable_result());
return ret;
}
// The period of sync of state from the orchestrator to all the workers. This
// constant trades freshness (i.e. lower period) vs unnecessary work (i.e.
// higher period). 15s seems an acceptable number even for interactive trace
// loads.
static constexpr uint32_t kDefaultWorkerSyncPeriod = 15000;
} // namespace
Orchestrator::~Orchestrator() = default;
std::unique_ptr<Orchestrator> Orchestrator::CreateInProcess(
base::TaskRunner* task_runner,
std::vector<std::unique_ptr<Worker>> workers) {
return std::unique_ptr<Orchestrator>(
new OrchestratorImpl(task_runner, std::move(workers)));
}
OrchestratorImpl::OrchestratorImpl(base::TaskRunner* task_runner,
std::vector<std::unique_ptr<Worker>> workers)
: task_runner_(task_runner),
periodic_sync_task_(task_runner),
workers_(std::move(workers)) {
base::PeriodicTask::Args args;
args.task = [this] { ExecuteSyncWorkers(); };
args.period_ms = kDefaultWorkerSyncPeriod;
args.start_first_task_immediately = true;
periodic_sync_task_.Start(std::move(args));
}
base::StatusOrFuture<protos::TracePoolCreateResponse>
OrchestratorImpl::TracePoolCreate(const protos::TracePoolCreateArgs& args) {
if (!args.has_pool_name()) {
return base::StatusOr<protos::TracePoolCreateResponse>(
base::ErrStatus("Pool name must be provided"));
}
std::string id = "stateless:" + args.pool_name();
if (auto it_inserted = pools_.Insert(id, TracePool()); !it_inserted.second) {
return base::StatusOr<protos::TracePoolCreateResponse>(
base::ErrStatus("Pool '%s' already exists", id.c_str()));
}
return protos::TracePoolCreateResponse();
}
base::StatusOrFuture<protos::TracePoolSetTracesResponse>
OrchestratorImpl::TracePoolSetTraces(
const protos::TracePoolSetTracesArgs& args) {
std::string id = args.pool_id();
TracePool* pool = pools_.Find(id);
if (!pool) {
return base::StatusOr<protos::TracePoolSetTracesResponse>(
base::ErrStatus("Unable to find pool %s", id.c_str()));
}
if (!pool->traces.empty()) {
return base::StatusOr<protos::TracePoolSetTracesResponse>(base::ErrStatus(
"Incrementally adding/removing items to pool not currently supported"));
}
pool->traces.assign(args.traces().begin(), args.traces().end());
uint32_t round_robin_worker_idx = 0;
for (const std::string& trace_path : pool->traces) {
auto it_and_inserted = traces_.Insert(trace_path, Trace());
it_and_inserted.first->refcount++;
if (it_and_inserted.second) {
it_and_inserted.first->worker = workers_[round_robin_worker_idx].get();
// Set the worker index to the next worker in a round-robin fashion.
round_robin_worker_idx = (round_robin_worker_idx + 1) % workers_.size();
} else {
PERFETTO_CHECK(it_and_inserted.first);
}
}
return protos::TracePoolSetTracesResponse();
}
base::StatusOrStream<protos::TracePoolQueryResponse>
OrchestratorImpl::TracePoolQuery(const protos::TracePoolQueryArgs& args) {
TracePool* pool = pools_.Find(args.pool_id());
if (!pool) {
return base::StreamOf(base::StatusOr<protos::TracePoolQueryResponse>(
base::ErrStatus("Unable to find pool %s", args.pool_id().c_str())));
}
std::vector<base::StatusOrStream<protos::QueryTraceResponse>> streams;
protos::QueryTraceArgs query_args;
*query_args.mutable_sql_query() = args.sql_query();
for (const std::string& trace_path : pool->traces) {
auto* trace = traces_.Find(trace_path);
*query_args.mutable_trace() = trace_path;
streams.emplace_back(trace->worker->QueryTrace(query_args));
}
return base::FlattenStreams(std::move(streams))
.MapFuture(&RpcResponseToPoolResponse);
}
base::StatusOrFuture<protos::TracePoolDestroyResponse>
OrchestratorImpl::TracePoolDestroy(const protos::TracePoolDestroyArgs& args) {
std::string id = args.pool_id();
TracePool* pool = pools_.Find(id);
if (!pool) {
return base::StatusOr<protos::TracePoolDestroyResponse>(
base::ErrStatus("Unable to find pool %s", id.c_str()));
}
std::unordered_set<std::string> to_erase;
for (auto it = traces_.GetIterator(); it; ++it) {
PERFETTO_CHECK(it.value().refcount-- > 0);
if (it.value().refcount == 0) {
to_erase.emplace(it.key());
}
}
for (const std::string& trace_path : to_erase) {
traces_.Erase(trace_path);
}
PERFETTO_CHECK(pools_.Erase(id));
return protos::TracePoolDestroyResponse();
}
void OrchestratorImpl::ExecuteSyncWorkers() {
if (periodic_sync_handle_) {
return;
}
periodic_sync_handle_ = base::SpawnFuture(task_runner_, [this]() {
return SyncWorkers().ContinueWith([this](base::Status status) {
if (!status.ok()) {
PERFETTO_ELOG("%s", status.c_message());
}
periodic_sync_handle_ = std::nullopt;
return base::Future<base::FVoid>(base::FVoid());
});
});
}
void OrchestratorImpl::ExecuteForceSyncWorkers() {
// Destroy the sync handle to cancel any currently running sync.
periodic_sync_handle_ = std::nullopt;
ExecuteSyncWorkers();
}
base::StatusFuture OrchestratorImpl::SyncWorkers() {
std::vector<base::StatusOrStream<protos::SyncTraceStateResponse>> streams;
base::FlatHashMap<Worker*, std::vector<std::string>> worker_to_traces;
for (auto it = traces_.GetIterator(); it; ++it) {
auto it_and_inserted = worker_to_traces.Insert(it.value().worker, {});
it_and_inserted.first->emplace_back(it.key());
}
for (auto& worker : workers_) {
auto* traces = worker_to_traces.Find(worker.get());
if (!traces) {
continue;
}
protos::SyncTraceStateArgs args;
for (const auto& trace : *traces) {
args.add_traces(trace);
}
streams.push_back(worker->SyncTraceState(std::move(args)));
}
return base::FlattenStreams(std::move(streams))
.MapFuture([](base::StatusOr<protos::SyncTraceStateResponse> resp) {
return base::StatusFuture(resp.status());
})
.Collect(base::AllOkCollector());
}
} // namespace bigtrace
} // namespace perfetto