blob: 8dc34fd22f4e3f4de751f20de453840dac36d325 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"cmp"
"encoding/binary"
"fmt"
"internal/trace/v2/event"
"internal/trace/v2/event/go122"
)
// batchCursor tracks a read position within a slice of batches and holds
// the most recently decoded event. It is advanced one event at a time by
// nextEvent, and the heap helpers in this file order cursors by ev.time.
type batchCursor struct {
// NOTE(review): presumably the thread whose batches this cursor walks;
// nothing in this file reads it, so confirm against the callers.
m ThreadID
// lastTs is the timestamp of the last event read from the current batch.
// Zero means "not seeded yet": nextEvent resets it to zero when it moves
// on to another batch and then seeds it from that batch's time.
lastTs Time
idx int // index into []batch of the batch currently being read
dataOff int // offset into batch.data of the next event to read
ev baseEvent // last read event; filled in by nextEvent
}
// nextEvent decodes the next event from batches into b.ev and advances the
// cursor past it.
//
// It returns ok == true when an event was read. It returns ok == false with
// a nil error once every batch has been consumed, and ok == false with a
// non-nil error if the event data could not be decoded.
//
// Event timestamps are stored as deltas: the first event of a batch is
// relative to the batch's own time and each later one is relative to the
// event before it. Both the base and the deltas are passed through freq.mul
// before being summed into b.ev.time.
func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) {
	// Step over every batch that has been fully consumed. Batches are
	// expected to be non-empty, but an empty one is tolerated and skipped
	// here too.
	for b.idx < len(batches) {
		if b.dataOff != len(batches[b.idx].data) {
			break
		}
		b.idx++
		b.dataOff = 0
		// Zero marks the timestamp as unseeded for the next batch.
		b.lastTs = 0
	}
	if b.idx == len(batches) {
		// Nothing left to read.
		return false, nil
	}
	cur := &batches[b.idx]

	// Seed the running timestamp from the batch if that hasn't happened yet.
	if b.lastTs == 0 {
		b.lastTs = freq.mul(cur.time)
	}

	// Decode one event starting at the current offset.
	n, tsdiff, err := readTimedBaseEvent(cur.data[b.dataOff:], &b.ev)
	if err != nil {
		return false, err
	}

	// Accumulate the delta into the running timestamp and stamp the event
	// with the result.
	b.lastTs += freq.mul(tsdiff)
	b.ev.time = b.lastTs

	// Skip past the bytes that were just decoded.
	b.dataOff += n
	return true, nil
}
// compare orders two cursors by the timestamp of their last read event.
// The result is -1 if b's event is earlier than a's, +1 if it is later,
// and 0 if the timestamps are equal.
func (b *batchCursor) compare(a *batchCursor) int {
	mine, theirs := b.ev.time, a.ev.time
	return cmp.Compare(mine, theirs)
}
// readTimedBaseEvent reads the raw data for a single event from the front
// of b into e. It does not try to interpret the arguments, but it does
// validate that the event is a regular event with a timestamp (vs. a
// structural event).
//
// The layout it decodes is: one byte of event type, a uvarint timestamp
// delta, then one uvarint for each remaining argument in the event's spec.
// The timestamp is the first entry in spec.Args, so len(spec.Args)-1 values
// are stored into e.args.
//
// It requires that the event it's reading be timed, which must be the case
// for every event in a plain EventBatch.
//
// On success it returns the number of bytes consumed from b and the
// timestamp delta exactly as encoded. On failure it returns zeros and a
// non-nil error; e may already have been partially overwritten by then.
func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) {
	// An empty buffer has no type byte to read. Report that as an error
	// like any other malformed input instead of panicking on b[0].
	if len(b) == 0 {
		return 0, 0, fmt.Errorf("found no event data to read")
	}

	// Get the event type.
	typ := event.Type(b[0])
	specs := go122.Specs()
	if int(typ) >= len(specs) {
		return 0, 0, fmt.Errorf("found invalid event type: %v", typ)
	}
	e.typ = typ

	// Get spec. A timed event must list at least its timestamp argument.
	spec := &specs[typ]
	if len(spec.Args) == 0 || !spec.IsTimedEvent {
		return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ)
	}
	n := 1 // the type byte has been consumed

	// Read timestamp diff. Uvarint reports a non-positive byte count when
	// the buffer is too short or the value overflows 64 bits.
	ts, nb := binary.Uvarint(b[n:])
	if nb <= 0 {
		return 0, 0, fmt.Errorf("found invalid uvarint for timestamp")
	}
	n += nb

	// Read the rest of the arguments.
	// NOTE(review): assumes e.args has room for len(spec.Args)-1 values;
	// confirm against the baseEvent definition.
	for i := 0; i < len(spec.Args)-1; i++ {
		arg, nb := binary.Uvarint(b[n:])
		if nb <= 0 {
			return 0, 0, fmt.Errorf("found invalid uvarint")
		}
		e.args[i] = arg
		n += nb
	}
	return n, timestamp(ts), nil
}
// heapInsert adds bc to the min-heap of cursors and returns the grown
// slice. The new cursor is appended as the last element and then sifted up
// until its parent's event time is no longer greater than its own.
func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor {
	heap = append(heap, bc)
	last := len(heap) - 1
	heapSiftUp(heap, last)
	return heap
}
// heapUpdate repositions the cursor at index i within the heap. It first
// tries to move the cursor toward the root; only if that leaves it where it
// started does it try moving it toward the leaves.
func heapUpdate(heap []*batchCursor, i int) {
	if heapSiftUp(heap, i) == i {
		// Sifting up did not move the entry, so it may belong further down.
		heapSiftDown(heap, i)
	}
}
// heapRemove removes the cursor at index i from the heap and returns the
// slice shortened by one element.
func heapRemove(heap []*batchCursor, i int) []*batchCursor {
	// Walk the entry up to the root by swapping with each parent in turn,
	// without comparing any timestamps.
	for child := i; child > 0; {
		parent := (child - 1) / 2
		heap[parent], heap[child] = heap[child], heap[parent]
		child = parent
	}

	// The entry to drop now sits at the root. Trade it with the final
	// element and cut the slice short so that it falls off the end.
	last := len(heap) - 1
	heap[0], heap[last] = heap[last], heap[0]
	heap = heap[:last]

	// The element that was moved into the root may be out of order; push it
	// back down to where it belongs.
	heapSiftDown(heap, 0)
	return heap
}
// heapSiftUp moves the cursor at index i toward the root for as long as its
// parent's event time is strictly greater than its own. It returns the
// index at which the cursor ends up.
func heapSiftUp(heap []*batchCursor, i int) int {
	for i > 0 {
		parent := (i - 1) / 2
		if !(heap[parent].ev.time > heap[i].ev.time) {
			// Parent is not later than this entry, so the order holds.
			break
		}
		heap[parent], heap[i] = heap[i], heap[parent]
		i = parent
	}
	return i
}
// heapSiftDown moves the cursor at index i toward the leaves, repeatedly
// swapping it with whichever child min3 selects, until min3 selects the
// cursor itself. It returns the index at which the cursor ends up.
func heapSiftDown(heap []*batchCursor, i int) int {
	for {
		smallest := min3(heap, i, 2*i+1, 2*i+2)
		if smallest == i {
			// Neither child is earlier; the heap invariant already applies.
			return i
		}
		heap[i], heap[smallest] = heap[smallest], heap[i]
		i = smallest
	}
}
// min3 picks, from the indices i0, i1 and i2, the one whose cursor has the
// smallest event time and returns it.
//
// Indices at or beyond len(b) are skipped, except that an out-of-range i0
// is treated as having time maxTime. i1 and i2 are chosen only when their
// time is strictly smaller than the best seen so far, so ties go to the
// earlier argument and i0 is returned when no other candidate beats it.
func min3(b []*batchCursor, i0, i1, i2 int) int {
	best, bestT := i0, maxTime
	if i0 < len(b) {
		bestT = b[i0].ev.time
	}
	for _, cand := range [...]int{i1, i2} {
		if cand >= len(b) {
			continue
		}
		if t := b[cand].ev.time; t < bestT {
			best, bestT = cand, t
		}
	}
	return best
}