| /* |
| * Copyright (C) 2018 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_ |
| #define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_ |
| |
| #include <vector> |
| |
| #include "perfetto/ext/base/circular_queue.h" |
| #include "perfetto/trace_processor/basic_types.h" |
| #include "src/trace_processor/fuchsia_provider_view.h" |
| #include "src/trace_processor/proto_incremental_state.h" |
| #include "src/trace_processor/trace_blob_view.h" |
| #include "src/trace_processor/trace_processor_context.h" |
| #include "src/trace_processor/trace_storage.h" |
| |
| #if PERFETTO_BUILDFLAG(PERFETTO_TP_JSON) |
| #include <json/value.h> |
| #else |
| // Json traces are only supported in standalone and Chromium builds. |
| namespace Json { |
| class Value {}; |
| } // namespace Json |
| #endif |
| |
| namespace perfetto { |
| namespace trace_processor { |
| |
| // This class takes care of sorting events parsed from the trace stream in |
| // arbitrary order and pushing them to the next pipeline stages (parsing) in |
| // order. In order to support streaming use-cases, sorting happens within a |
| // max window. Events are held in the TraceSorter staging area (events_) until |
| // either (1) the (max - min) timestamp > window_size; (2) trace EOF. |
| // |
| // This class is designed around the assumption that: |
| // - Most events come from ftrace. |
| // - Ftrace events are sorted within each cpu most of the times. |
| // |
| // Due to this, this class is oprerates as a streaming merge-sort of N+1 queues |
| // (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted (if |
| // necessary) before proceeding with the global merge-sort-extract. |
| // When an event is pushed through, it is just appeneded to the end of one of |
| // the N queues. While appending, we keep track of the fact that the queue |
| // is still ordered or just lost ordering. When an out-of-order event is |
| // detected on a queue we keep track of: (1) the offset within the queue where |
| // the chaos begun, (2) the timestamp that broke the ordering. |
| // When we decide to extract events from the queues into the next stages of |
| // the trace processor, we re-sort the events in the queue. Rather than |
| // re-sorting everything all the times, we use the above knowledge to restrict |
| // sorting to the (hopefully smaller) tail of the |events_| staging area. |
| // At any time, the first partition of |events_| [0 .. sort_start_idx_) is |
| // ordered, and the second partition [sort_start_idx_.. end] is not. |
| // We use a logarithmic bound search operation to figure out what is the index |
| // within the first partition where sorting should start, and sort all events |
| // from there to the end. |
| class TraceSorter { |
| public: |
| struct InlineSchedSwitch { |
| int64_t prev_state; |
| int32_t next_pid; |
| int32_t next_prio; |
| StringId next_comm; |
| }; |
| |
| // Discriminated union of events that are cannot be easily read from the |
| // mapped trace. |
| struct InlineEvent { |
| enum class Type { kInvalid = 0, kSchedSwitch }; |
| |
| static InlineEvent SchedSwitch(InlineSchedSwitch content) { |
| InlineEvent evt; |
| evt.type = Type::kSchedSwitch; |
| evt.sched_switch = content; |
| return evt; |
| } |
| |
| Type type = Type::kInvalid; |
| union { |
| InlineSchedSwitch sched_switch; |
| }; |
| }; |
| |
| struct TimestampedTracePiece { |
| TimestampedTracePiece( |
| int64_t ts, |
| uint64_t idx, |
| TraceBlobView tbv, |
| ProtoIncrementalState::PacketSequenceState* sequence_state) |
| : TimestampedTracePiece(ts, |
| /*thread_ts=*/0, |
| /*thread_instructions=*/0, |
| idx, |
| std::move(tbv), |
| /*value=*/nullptr, |
| /*fpv=*/nullptr, |
| /*sequence_state=*/sequence_state, |
| InlineEvent{}) {} |
| |
| TimestampedTracePiece(int64_t ts, uint64_t idx, TraceBlobView tbv) |
| : TimestampedTracePiece(ts, |
| /*thread_ts=*/0, |
| /*thread_instructions=*/0, |
| idx, |
| std::move(tbv), |
| /*value=*/nullptr, |
| /*fpv=*/nullptr, |
| /*sequence_state=*/nullptr, |
| InlineEvent{}) {} |
| |
| TimestampedTracePiece(int64_t ts, |
| uint64_t idx, |
| std::unique_ptr<Json::Value> value) |
| : TimestampedTracePiece(ts, |
| /*thread_ts=*/0, |
| /*thread_instructions=*/0, |
| idx, |
| // TODO(dproy): Stop requiring TraceBlobView in |
| // TimestampedTracePiece. |
| TraceBlobView(nullptr, 0, 0), |
| std::move(value), |
| /*fpv=*/nullptr, |
| /*sequence_state=*/nullptr, |
| InlineEvent{}) {} |
| |
| TimestampedTracePiece(int64_t ts, |
| uint64_t idx, |
| TraceBlobView tbv, |
| std::unique_ptr<FuchsiaProviderView> fpv) |
| : TimestampedTracePiece(ts, |
| /*thread_ts=*/0, |
| /*thread_instructions=*/0, |
| idx, |
| std::move(tbv), |
| /*value=*/nullptr, |
| std::move(fpv), |
| /*sequence_state=*/nullptr, |
| InlineEvent{}) {} |
| |
| TimestampedTracePiece( |
| int64_t ts, |
| int64_t thread_ts, |
| int64_t thread_instructions, |
| uint64_t idx, |
| TraceBlobView tbv, |
| ProtoIncrementalState::PacketSequenceState* sequence_state) |
| : TimestampedTracePiece(ts, |
| thread_ts, |
| thread_instructions, |
| idx, |
| std::move(tbv), |
| /*value=*/nullptr, |
| /*fpv=*/nullptr, |
| sequence_state, |
| InlineEvent{}) {} |
| |
| TimestampedTracePiece(int64_t ts, uint64_t idx, InlineEvent inline_evt) |
| : TimestampedTracePiece(ts, |
| /*thread_ts=*/0, |
| /*thread_instructions=*/0, |
| idx, |
| /*tbv=*/TraceBlobView(nullptr, 0, 0), |
| /*value=*/nullptr, |
| /*fpv=*/nullptr, |
| /*sequence_state=*/nullptr, |
| inline_evt) {} |
| |
| TimestampedTracePiece( |
| int64_t ts, |
| int64_t thread_ts, |
| int64_t thread_instructions, |
| uint64_t idx, |
| TraceBlobView tbv, |
| std::unique_ptr<Json::Value> value, |
| std::unique_ptr<FuchsiaProviderView> fpv, |
| ProtoIncrementalState::PacketSequenceState* sequence_state, |
| InlineEvent inline_evt) |
| : json_value(std::move(value)), |
| fuchsia_provider_view(std::move(fpv)), |
| packet_sequence_state(sequence_state), |
| packet_sequence_state_generation( |
| sequence_state ? sequence_state->current_generation() : 0), |
| timestamp(ts), |
| thread_timestamp(thread_ts), |
| thread_instruction_count(thread_instructions), |
| packet_idx_(idx), |
| blob_view(std::move(tbv)), |
| inline_event(inline_evt) {} |
| |
| TimestampedTracePiece(TimestampedTracePiece&&) noexcept = default; |
| TimestampedTracePiece& operator=(TimestampedTracePiece&&) = default; |
| |
| // For std::lower_bound(). |
| static inline bool Compare(const TimestampedTracePiece& x, int64_t ts) { |
| return x.timestamp < ts; |
| } |
| |
| // For std::sort(). |
| inline bool operator<(const TimestampedTracePiece& o) const { |
| return timestamp < o.timestamp || |
| (timestamp == o.timestamp && packet_idx_ < o.packet_idx_); |
| } |
| |
| std::unique_ptr<Json::Value> json_value; |
| std::unique_ptr<FuchsiaProviderView> fuchsia_provider_view; |
| ProtoIncrementalState::PacketSequenceState* packet_sequence_state; |
| size_t packet_sequence_state_generation; |
| |
| int64_t timestamp; |
| int64_t thread_timestamp; |
| int64_t thread_instruction_count; |
| uint64_t packet_idx_; |
| TraceBlobView blob_view; |
| InlineEvent inline_event; |
| }; |
| |
| TraceSorter(TraceProcessorContext*, int64_t window_size_ns); |
| |
| inline void PushTracePacket(int64_t timestamp, |
| ProtoIncrementalState::PacketSequenceState* state, |
| TraceBlobView packet) { |
| DCHECK_ftrace_batch_cpu(kNoBatch); |
| auto* queue = GetQueue(0); |
| queue->Append(TimestampedTracePiece(timestamp, packet_idx_++, |
| std::move(packet), state)); |
| MaybeExtractEvents(queue); |
| } |
| |
| inline void PushJsonValue(int64_t timestamp, |
| std::unique_ptr<Json::Value> json_value) { |
| auto* queue = GetQueue(0); |
| queue->Append( |
| TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value))); |
| MaybeExtractEvents(queue); |
| } |
| |
| inline void PushFuchsiaRecord( |
| int64_t timestamp, |
| TraceBlobView record, |
| std::unique_ptr<FuchsiaProviderView> provider_view) { |
| DCHECK_ftrace_batch_cpu(kNoBatch); |
| auto* queue = GetQueue(0); |
| queue->Append(TimestampedTracePiece( |
| timestamp, packet_idx_++, std::move(record), std::move(provider_view))); |
| MaybeExtractEvents(queue); |
| } |
| |
| inline void PushFtraceEvent(uint32_t cpu, |
| int64_t timestamp, |
| TraceBlobView event) { |
| set_ftrace_batch_cpu_for_DCHECK(cpu); |
| GetQueue(cpu + 1)->Append( |
| TimestampedTracePiece(timestamp, packet_idx_++, std::move(event))); |
| |
| // The caller must call FinalizeFtraceEventBatch() after having pushed a |
| // batch of ftrace events. This is to amortize the overhead of handling |
| // global ordering and doing that in batches only after all ftrace events |
| // for a bundle are pushed. |
| } |
| |
| // As with |PushFtraceEvent|, doesn't immediately sort the affected queues. |
| // TODO(rsavitski): if a trace has a mix of normal & "compact" events (being |
| // pushed through this function), the ftrace batches will no longer be fully |
| // sorted by timestamp. In such situations, we will have to sort at the end of |
| // the batch. We can do better as both sub-sequences are sorted however. |
| // Consider adding extra queues, or pushing them in a merge-sort fashion |
| // instead. |
| inline void PushInlineFtraceEvent(uint32_t cpu, |
| int64_t timestamp, |
| InlineEvent inline_event) { |
| set_ftrace_batch_cpu_for_DCHECK(cpu); |
| GetQueue(cpu + 1)->Append( |
| TimestampedTracePiece(timestamp, packet_idx_++, inline_event)); |
| } |
| |
| inline void PushTrackEventPacket( |
| int64_t timestamp, |
| int64_t thread_time, |
| int64_t thread_instruction_count, |
| ProtoIncrementalState::PacketSequenceState* state, |
| TraceBlobView packet) { |
| auto* queue = GetQueue(0); |
| queue->Append(TimestampedTracePiece(timestamp, thread_time, |
| thread_instruction_count, packet_idx_++, |
| std::move(packet), state)); |
| MaybeExtractEvents(queue); |
| } |
| |
| inline void FinalizeFtraceEventBatch(uint32_t cpu) { |
| DCHECK_ftrace_batch_cpu(cpu); |
| set_ftrace_batch_cpu_for_DCHECK(kNoBatch); |
| MaybeExtractEvents(GetQueue(cpu + 1)); |
| } |
| |
| // Extract all events ignoring the window. |
| void ExtractEventsForced() { |
| SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0); |
| } |
| |
| // Sets the window size to be the size specified (which should be lower than |
| // any previous window size specified) and flushes any data beyond |
| // this window size. |
| // It is undefined to call this function with a window size greater than than |
| // the current size. |
| void SetWindowSizeNs(int64_t window_size_ns) { |
| PERFETTO_DCHECK(window_size_ns <= window_size_ns_); |
| |
| PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns); |
| window_size_ns_ = window_size_ns; |
| |
| // Fast path: if, globally, we are within the window size, then just exit. |
| if (global_max_ts_ - global_min_ts_ < window_size_ns) |
| return; |
| SortAndExtractEventsBeyondWindow(window_size_ns_); |
| } |
| |
| private: |
| static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max(); |
| |
| struct Queue { |
| inline void Append(TimestampedTracePiece ttp) { |
| const int64_t timestamp = ttp.timestamp; |
| events_.emplace_back(std::move(ttp)); |
| min_ts_ = std::min(min_ts_, timestamp); |
| |
| // Events are often seen in order. |
| if (PERFETTO_LIKELY(timestamp >= max_ts_)) { |
| max_ts_ = timestamp; |
| } else { |
| // The event is breaking ordering. The first time it happens, keep |
| // track of which index we are at. We know that everything before that |
| // is sorted (because events were pushed monotonically). Everything |
| // after that index, instead, will need a sorting pass before moving |
| // events to the next pipeline stage. |
| if (sort_start_idx_ == 0) { |
| PERFETTO_DCHECK(events_.size() >= 2); |
| sort_start_idx_ = events_.size() - 1; |
| sort_min_ts_ = timestamp; |
| } else { |
| sort_min_ts_ = std::min(sort_min_ts_, timestamp); |
| } |
| } |
| |
| PERFETTO_DCHECK(min_ts_ <= max_ts_); |
| } |
| |
| bool needs_sorting() const { return sort_start_idx_ != 0; } |
| void Sort(); |
| |
| base::CircularQueue<TimestampedTracePiece> events_; |
| int64_t min_ts_ = std::numeric_limits<int64_t>::max(); |
| int64_t max_ts_ = 0; |
| size_t sort_start_idx_ = 0; |
| int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max(); |
| }; |
| |
| // This method passes any events older than window_size_ns to the |
| // parser to be parsed and then stored. |
| void SortAndExtractEventsBeyondWindow(int64_t windows_size_ns); |
| |
| inline Queue* GetQueue(size_t index) { |
| if (PERFETTO_UNLIKELY(index >= queues_.size())) |
| queues_.resize(index + 1); |
| return &queues_[index]; |
| } |
| |
| inline void MaybeExtractEvents(Queue* queue) { |
| DCHECK_ftrace_batch_cpu(kNoBatch); |
| global_max_ts_ = std::max(global_max_ts_, queue->max_ts_); |
| global_min_ts_ = std::min(global_min_ts_, queue->min_ts_); |
| |
| // Fast path: if, globally, we are within the window size, then just exit. |
| if (global_max_ts_ - global_min_ts_ < window_size_ns_) |
| return; |
| SortAndExtractEventsBeyondWindow(window_size_ns_); |
| } |
| |
| TraceProcessorContext* const context_; |
| |
| // queues_[0] is the general (non-ftrace) queue. |
| // queues_[1] is the ftrace queue for CPU(0). |
| // queues_[x] is the ftrace queue for CPU(x - 1). |
| std::vector<Queue> queues_; |
| |
| // Events are propagated to the next stage only after (max - min) timestamp |
| // is larger than this value. |
| int64_t window_size_ns_; |
| |
| // max(e.timestamp for e in queues_). |
| int64_t global_max_ts_ = 0; |
| |
| // min(e.timestamp for e in queues_). |
| int64_t global_min_ts_ = std::numeric_limits<int64_t>::max(); |
| |
| // Monotonic increasing value used to index timestamped trace pieces. |
| uint64_t packet_idx_ = 0; |
| |
| // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1. |
| bool bypass_next_stage_for_testing_ = false; |
| |
| #if PERFETTO_DCHECK_IS_ON() |
| // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called. |
| uint32_t ftrace_batch_cpu_ = kNoBatch; |
| |
| inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) { |
| PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu); |
| } |
| |
| inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) { |
| PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch || |
| cpu == kNoBatch); |
| ftrace_batch_cpu_ = cpu; |
| } |
| #else |
| inline void DCHECK_ftrace_batch_cpu(uint32_t) {} |
| inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {} |
| #endif |
| }; |
| |
| } // namespace trace_processor |
| } // namespace perfetto |
| |
| #endif // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_ |