/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <stdint.h>

#include <algorithm>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/timestamped_trace_piece.h"
namespace perfetto {
namespace trace_processor {
class PacketSequenceState;
struct SystraceLine;
// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// timestamp order. To support streaming use-cases, sorting happens within a
// window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events by observing
// TracingServiceEvent Flush and ReadBuffer events
// 2. The trace EOF is reached
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces will
// be sorted fully in-memory implicitly because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the following assumptions:
// - Most events come from ftrace.
// - Ftrace events are sorted within each CPU most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1
// queues (N = number of CPUs, plus one queue for non-ftrace events). Each
// queue in turn gets sorted (if necessary) before proceeding with the global
// merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search to figure out the index within the
// first partition where sorting should start, and sort all events from
// there to the end.
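//
// A rough usage sketch (illustrative only; the real call sites live in the
// trace readers/tokenizers, which hold a pointer to the sorter):
//
//   // While tokenizing the trace:
//   sorter->PushFtraceEvent(cpu, ts, std::move(event), seq_state);
//   sorter->PushTracePacket(ts, seq_state, std::move(packet));
//   // On tracing service flush / read-buffer events:
//   sorter->NotifyFlushEvent();
//   sorter->NotifyReadBufferEvent();
//   // When the trace EOF is reached:
//   sorter->ExtractEventsForced();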
class TraceSorter {
public:
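// Controls when buffered events may be extracted: kDefault allows
// incremental extraction driven by tracing service flush / read-buffer
// events, while kFullSort defers everything to the forced extraction at the
// end of the trace.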
enum class SortingMode {
kDefault,
kFullSort,
};
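// |parser| is the next pipeline stage that sorted events get pushed to.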
TraceSorter(TraceProcessorContext* context,
std::unique_ptr<TraceParser> parser,
SortingMode);
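// Appends a (non-ftrace) trace packet, tagging it with the packet
// sequence's current incremental-state generation.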
inline void PushTracePacket(int64_t timestamp,
PacketSequenceState* state,
TraceBlobView packet) {
AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
std::move(packet),
state->current_generation()));
}
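// Appends a raw JSON event to the non-ftrace queue.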
inline void PushJsonValue(int64_t timestamp, std::string json_value) {
AppendNonFtraceEvent(
TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
}
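// Appends a Fuchsia trace record to the non-ftrace queue.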
inline void PushFuchsiaRecord(int64_t timestamp,
std::unique_ptr<FuchsiaRecord> record) {
AppendNonFtraceEvent(
TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
}
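// Appends a systrace text line to the non-ftrace queue; the timestamp is
// taken from the line itself.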
inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
int64_t timestamp = systrace_line->ts;
AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
std::move(systrace_line)));
}
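// Appends a track event packet to the non-ftrace queue.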
inline void PushTrackEventPacket(int64_t timestamp,
std::unique_ptr<TrackEventData> data) {
AppendNonFtraceEvent(
TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
}
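// Appends an ftrace event to the per-CPU queue (queues_[cpu + 1]).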
inline void PushFtraceEvent(uint32_t cpu,
int64_t timestamp,
TraceBlobView event,
PacketSequenceState* state) {
auto* queue = GetQueue(cpu + 1);
queue->Append(TimestampedTracePiece(
timestamp, packet_idx_++,
FtraceEventData{std::move(event), state->current_generation()}));
UpdateGlobalTs(queue);
}
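// Appends an inline (not protobuf-backed) sched_switch event to the per-CPU
// queue. The overload below does the same for sched_waking events.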
inline void PushInlineFtraceEvent(uint32_t cpu,
int64_t timestamp,
InlineSchedSwitch inline_sched_switch) {
// TODO(rsavitski): if a trace has a mix of normal & "compact" events (being
// pushed through this function), the ftrace batches will no longer be fully
// sorted by timestamp. In such situations, we will have to sort at the end
// of the batch. We could do better, since both sub-sequences are
// individually sorted: consider adding extra queues, or pushing the events
// in a merge-sort fashion instead.
auto* queue = GetQueue(cpu + 1);
queue->Append(
TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
UpdateGlobalTs(queue);
}
inline void PushInlineFtraceEvent(uint32_t cpu,
int64_t timestamp,
InlineSchedWaking inline_sched_waking) {
auto* queue = GetQueue(cpu + 1);
queue->Append(
TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
UpdateGlobalTs(queue);
}
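// Sorts and extracts all events currently buffered, regardless of the
// incremental extraction state (e.g. when the trace EOF is reached).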
void ExtractEventsForced() {
SortAndExtractEventsUntilPacket(packet_idx_);
queues_.resize(0);
packet_idx_for_extraction_ = packet_idx_;
flushes_since_extraction_ = 0;
}
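// Called when a flush event from the tracing service is seen in the trace.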
void NotifyFlushEvent() { flushes_since_extraction_++; }
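// Called when a read-buffer event from the tracing service is seen. If at
// least two flushes have happened since the last extraction (and we are not
// in full-sort mode), events up to the previously recorded packet index are
// sorted and extracted.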
void NotifyReadBufferEvent() {
if (sorting_mode_ == SortingMode::kFullSort ||
flushes_since_extraction_ < 2) {
return;
}
SortAndExtractEventsUntilPacket(packet_idx_for_extraction_);
packet_idx_for_extraction_ = packet_idx_;
flushes_since_extraction_ = 0;
}
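// Returns the maximum timestamp seen across all pushed events.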
int64_t max_timestamp() const { return global_max_ts_; }
private:
static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();
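// Staging queue for events coming from a single source (one per ftrace CPU,
// plus one for everything else). Keeps track of whether its tail has lost
// ordering, so that the eventual sort can be restricted to that tail.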
struct Queue {
inline void Append(TimestampedTracePiece ttp) {
const int64_t timestamp = ttp.timestamp;
events_.emplace_back(std::move(ttp));
min_ts_ = std::min(min_ts_, timestamp);
// Events are often seen in order.
if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
max_ts_ = timestamp;
} else {
// The event is breaking ordering. The first time it happens, keep
// track of which index we are at. We know that everything before that
// is sorted (because events were pushed monotonically). Everything
// after that index, instead, will need a sorting pass before moving
// events to the next pipeline stage.
if (sort_start_idx_ == 0) {
PERFETTO_DCHECK(events_.size() >= 2);
sort_start_idx_ = events_.size() - 1;
sort_min_ts_ = timestamp;
} else {
sort_min_ts_ = std::min(sort_min_ts_, timestamp);
}
}
PERFETTO_DCHECK(min_ts_ <= max_ts_);
}
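// True if at least one out-of-order event has been appended since the queue
// was last fully sorted.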
bool needs_sorting() const { return sort_start_idx_ != 0; }
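// Restores full ordering of |events_| by sorting the unsorted tail (defined
// in the .cc file).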
void Sort();
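// Events buffered for this queue, in insertion order.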
base::CircularQueue<TimestampedTracePiece> events_;
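// Minimum and maximum timestamps of the events appended to this queue.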
int64_t min_ts_ = std::numeric_limits<int64_t>::max();
int64_t max_ts_ = 0;
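// Index of the first event of the unsorted tail (0 if the queue is fully
// sorted).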
size_t sort_start_idx_ = 0;
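// Minimum timestamp among the out-of-order (tail) events.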
int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
};
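// Sorts the queues and pushes events up to |limit_packet_idx| to the next
// pipeline stage.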
void SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx);
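// Returns the queue at |index|, growing |queues_| if needed. Index 0 is the
// non-ftrace queue; index cpu + 1 is the ftrace queue for that CPU.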
inline Queue* GetQueue(size_t index) {
if (PERFETTO_UNLIKELY(index >= queues_.size()))
queues_.resize(index + 1);
return &queues_[index];
}
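// Appends |ttp| to the non-ftrace queue and updates the global timestamp
// bounds.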
inline void AppendNonFtraceEvent(TimestampedTracePiece ttp) {
Queue* queue = GetQueue(0);
queue->Append(std::move(ttp));
UpdateGlobalTs(queue);
}
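// Folds |queue|'s min/max timestamps into the global bounds.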
inline void UpdateGlobalTs(Queue* queue) {
global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
}
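// Hands a single event off to the next pipeline stage (unless bypassed for
// testing); defined in the .cc file.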
void MaybePushEvent(size_t queue_idx,
TimestampedTracePiece ttp) PERFETTO_ALWAYS_INLINE;
TraceProcessorContext* context_;
std::unique_ptr<TraceParser> parser_;
// Whether we should ignore incremental extraction and just wait for
// forced extraction at the end of the trace.
SortingMode sorting_mode_ = SortingMode::kDefault;
// The packet index until which events should be extracted. Set based
// on the packet index in |OnReadBuffer|.
uint64_t packet_idx_for_extraction_ = 0;
// The number of flushes which have happened since the last incremental
// extraction.
uint32_t flushes_since_extraction_ = 0;
// queues_[0] is the general (non-ftrace) queue.
// queues_[1] is the ftrace queue for CPU(0).
// queues_[x] is the ftrace queue for CPU(x - 1).
std::vector<Queue> queues_;
// max(e.timestamp for e in queues_).
int64_t global_max_ts_ = 0;
// min(e.timestamp for e in queues_).
int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();
// Monotonic increasing value used to index timestamped trace pieces.
uint64_t packet_idx_ = 0;
// Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
bool bypass_next_stage_for_testing_ = false;
// max(e.ts for e pushed to next stage)
int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};
} // namespace trace_processor
} // namespace perfetto
#endif // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_