// blob: 5093e5285da77e9bd9fc0e44eed953123b7cea1c [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/cloud_trace_processor/trace_processor_wrapper.h"
#include <atomic>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "perfetto/base/status.h"
#include "perfetto/ext/base/file_utils.h"
#include "perfetto/ext/base/status_or.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/poll.h"
#include "perfetto/ext/base/threading/stream.h"
#include "perfetto/ext/base/threading/thread_pool.h"
#include "perfetto/ext/base/threading/util.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/protozero/scattered_heap_buffer.h"
#include "perfetto/trace_processor/trace_blob.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "perfetto/trace_processor/trace_processor.h"
#include "protos/perfetto/cloud_trace_processor/worker.pb.h"
#include "src/protozero/proto_ring_buffer.h"
#include "src/trace_processor/rpc/query_result_serializer.h"
#include "src/trace_processor/util/status_macros.h"
namespace perfetto {
namespace cloud_trace_processor {
namespace {
// Short names for the trace_processor types and the wrapper's Statefulness
// enum that are used throughout this translation unit.
using trace_processor::QueryResultSerializer;
using trace_processor::TraceBlob;
using trace_processor::TraceBlobView;
using trace_processor::TraceProcessor;
using Statefulness = TraceProcessorWrapper::Statefulness;
struct QueryRunner {
QueryRunner(std::shared_ptr<TraceProcessor> _tp,
std::string _query,
std::string _trace_path,
Statefulness _statefulness)
: tp(std::move(_tp)),
query(std::move(_query)),
trace_path(std::move(_trace_path)),
statefulness(_statefulness) {}
std::optional<protos::TracePoolShardQueryResponse> operator()() {
if (!has_more) {
if (statefulness == Statefulness::kStateless) {
tp->RestoreInitialTables();
}
return std::nullopt;
}
// If the serializer does not exist yet, that means we have not yet run
// the query so make sure to do that first.
EnsureSerializerExists();
has_more = serializer->Serialize(&result);
protos::TracePoolShardQueryResponse resp;
*resp.mutable_trace() = trace_path;
resp.mutable_result()->ParseFromArray(result.data(),
static_cast<int>(result.size()));
result.clear();
return std::make_optional(std::move(resp));
}
void EnsureSerializerExists() {
if (serializer) {
return;
}
auto it = tp->ExecuteQuery(query);
serializer.reset(new QueryResultSerializer(std::move(it)));
}
std::shared_ptr<TraceProcessor> tp;
std::string query;
std::string trace_path;
TraceProcessorWrapper::Statefulness statefulness;
// shared_ptr to allow copying when this type is coerced to std::function.
std::shared_ptr<QueryResultSerializer> serializer;
std::vector<uint8_t> result;
bool has_more = true;
};
} // namespace
// Stores the trace path, thread pool and statefulness mode, then creates the
// TraceProcessor instance this wrapper operates on.
TraceProcessorWrapper::TraceProcessorWrapper(std::string trace_path,
                                             base::ThreadPool* thread_pool,
                                             Statefulness statefulness)
    : trace_path_(std::move(trace_path)),
      thread_pool_(thread_pool),
      statefulness_(statefulness) {
  // The instance is created from a default config with the single override
  // |ingest_ftrace_in_raw_table| = false.
  trace_processor_ = TraceProcessor::CreateInstance([] {
    trace_processor::Config tp_config;
    tp_config.ingest_ftrace_in_raw_table = false;
    return tp_config;
  }());
}
// Parses every chunk yielded by |file_stream| with the wrapped trace processor
// and, if all chunks parsed successfully, notifies it of end-of-file. Both the
// parsing and the notification are submitted through
// base::RunOnceOnThreadPool using |thread_pool_|.
//
// Returns an error status straight away ("Request is already in flight") when
// |trace_processor_| does not have exactly one owner.
base::StatusFuture TraceProcessorWrapper::LoadTrace(
    base::StatusOrStream<std::vector<uint8_t>> file_stream) {
  if (trace_processor_.use_count() != 1) {
    return base::ErrStatus("Request is already in flight");
  }

  // Stage 1: applied to each element of the stream. A failed read is
  // propagated unchanged; a successful one is copied into a TraceBlob and
  // parsed.
  auto parse_chunk =
      [this](base::StatusOr<std::vector<uint8_t>> chunk_or)
      -> base::StatusFuture {
    RETURN_IF_ERROR(chunk_or.status());
    return base::RunOnceOnThreadPool<base::Status>(
        thread_pool_,
        [bytes = std::move(*chunk_or), tp = trace_processor_] {
          return tp->Parse(
              TraceBlobView(TraceBlob::CopyFrom(bytes.data(), bytes.size())));
        });
  };

  // Stage 2: applied once to the collected status of all chunks. The
  // end-of-file notification is only issued when that status is ok.
  auto notify_eof = [this](base::Status parse_status) -> base::StatusFuture {
    RETURN_IF_ERROR(parse_status);
    return base::RunOnceOnThreadPool<base::Status>(
        thread_pool_, [tp = trace_processor_] {
          tp->NotifyEndOfFile();
          return base::OkStatus();
        });
  };

  return std::move(file_stream)
      .MapFuture(std::move(parse_chunk))
      .Collect(base::AllOkCollector())
      .ContinueWith(std::move(notify_eof));
}
// Runs |query| against the wrapped trace processor and returns the responses
// as a stream. The work is handed to base::RunOnThreadPool with
// |thread_pool_|, using a QueryRunner as the producer.
//
// When |trace_processor_| does not have exactly one owner, the returned stream
// instead holds a single "Request is already in flight" error.
base::StatusOrStream<protos::TracePoolShardQueryResponse>
TraceProcessorWrapper::Query(const std::string& query) {
  using ResponseOr = base::StatusOr<protos::TracePoolShardQueryResponse>;

  const bool sole_owner = trace_processor_.use_count() == 1;
  if (sole_owner) {
    return base::RunOnThreadPool<ResponseOr>(
        thread_pool_,
        QueryRunner(trace_processor_, query, trace_path_, statefulness_));
  }
  return base::StreamOf<ResponseOr>(
      base::ErrStatus("Request is already in flight"));
}
} // namespace cloud_trace_processor
} // namespace perfetto