blob: 7d0e624df58c5dd4802f49b4f5b60703e26be81b [file] [log] [blame]
// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "compiler/compiler.h"
#include "common/debug.h"
#include "common/expected.h"
#include "perfetto/rx_producer.h" // TODO: refactor BinaryWireProtobuf to separate header.
#include "inode2filename/inode.h"
#include "inode2filename/search_directories.h"
#include "serialize/protobuf_io.h"
#include <android-base/unique_fd.h>
#include <android-base/parseint.h>
#include <android-base/file.h>
#include <perfetto/trace/trace.pb.h> // ::perfetto::protos::Trace
#include <perfetto/trace/trace_packet.pb.h> // ::perfetto::protos::TracePacket
#include "rxcpp/rx.hpp"
#include <iostream>
#include <fstream>
#include <optional>
#include <utility>
#include <regex>
#include <sched.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <syscall.h>
#include <fcntl.h>
#include <unistd.h>
namespace iorap::compiler {
// Short aliases for the inode2filename types referenced throughout this file.
using Inode = iorap::inode2filename::Inode;
using InodeResult = iorap::inode2filename::InodeResult;
using SearchDirectories = iorap::inode2filename::SearchDirectories;
// Alias for the project's protobuf pointer type (declared under iorap::perfetto).
template <typename T>
using ProtobufPtr = iorap::perfetto::ProtobufPtr<T>;
// A perfetto trace as read from a file, before it has been unserialized into a
// protobuf message, paired with the timestamp limit of the input it came from.
// Produced and consumed by the stages of ReadProtosFromFileNames.
struct PerfettoTraceProtoInfo {
/* The perfetto trace proto. */
::iorap::perfetto::PerfettoTraceProto proto;
/*
* The timestamp limit of the trace.
* It's used to truncate the trace file.
* Copied from CompilationInput::timestamp_limit_ns.
*/
uint64_t timestamp_limit_ns;
};
// A deserialized perfetto trace paired with the timestamp limit of the input it came from.
// This is the item type emitted by ReadProtosFromFileNames / ReadPerfettoTraceProtos.
struct PerfettoTracePtrInfo {
/* Deserialized protobuf data containing the perfetto trace. */
ProtobufPtr<::perfetto::protos::Trace> trace_ptr;
/*
* The timestamp limit of the trace.
* It's used to truncate the trace file:
* SelectPageCacheFtraceEvents skips ftrace events whose own timestamp exceeds it.
*/
uint64_t timestamp_limit_ns;
};
// Attempt to read protobufs from the files described by 'file_infos'.
// Emits one (or none) PerfettoTracePtrInfo for each input, in the same order as the inputs.
// On any errors, the items are dropped (errors are written to the error LOG).
//
// Each emitted item carries the timestamp_limit_ns of the input it was read from.
//
// All work is done on the same Coordinator as the Subscriber.
template <typename ProtoT /*extends MessageLite*/>
auto/*observable<PerfettoTracePtrInfo>*/ ReadProtosFromFileNames(
    rxcpp::observable<CompilationInput> file_infos) {
  using BinaryWireProtoT = ::iorap::perfetto::PerfettoTraceProto;

  return file_infos
    // Stage 1: read each file fully; a failed read becomes std::nullopt.
    .map([](const CompilationInput& file_info) ->
         std::optional<PerfettoTraceProtoInfo> {
      LOG(VERBOSE) << "compiler::ReadProtosFromFileNames " << file_info.filename
                   << " TimeStampLimit "<< file_info.timestamp_limit_ns << " [begin]";
      std::optional<BinaryWireProtoT> maybe_proto =
          BinaryWireProtoT::ReadFullyFromFile(file_info.filename);
      if (!maybe_proto) {
        LOG(ERROR) << "Failed to read file: " << file_info.filename;
        return std::nullopt;
      }
      return {{std::move(maybe_proto.value()), file_info.timestamp_limit_ns}};
    })
    // Stage 2: drop the inputs that failed to read.
    .filter([](const std::optional<PerfettoTraceProtoInfo>& proto_info) {
      return proto_info.has_value();
    })
    // Stage 3: unwrap the optional. Move the payload out instead of copying it:
    // the optional is not used again after this stage.
    .map([](std::optional<PerfettoTraceProtoInfo>& proto_info) ->
         PerfettoTraceProtoInfo {
      return std::move(proto_info.value());
    })  // TODO: refactor to something that flattens the optional, and logs in one operator.
    // Stage 4: unserialize into ProtoT; a failed parse becomes std::nullopt.
    .map([](PerfettoTraceProtoInfo& proto_info) ->
         std::optional<PerfettoTracePtrInfo> {
      std::optional<ProtobufPtr<ProtoT>> t = proto_info.proto.template MaybeUnserialize<ProtoT>();
      if (!t) {
        LOG(ERROR) << "Failed to parse protobuf: ";  // TODO: filename.
        return std::nullopt;
      }
      return {{std::move(t.value()), proto_info.timestamp_limit_ns}};
    })
    // Stage 5: drop the inputs that failed to parse.
    .filter([](const std::optional<PerfettoTracePtrInfo>& trace_info) {
      return trace_info.has_value();
    })
    // Stage 6: unwrap the optional (moved out, as in stage 3).
    .map([](std::optional<PerfettoTracePtrInfo>& trace_info) ->
         PerfettoTracePtrInfo {
      LOG(VERBOSE) << "compiler::ReadProtosFromFileNames [success]";
      return std::move(trace_info.value());
    });
}
// Reads every input in 'file_infos' as a ::perfetto::protos::Trace.
// Inputs that cannot be read or parsed are dropped (see ReadProtosFromFileNames).
auto/*observable<PerfettoTracePtrInfo>*/ ReadPerfettoTraceProtos(
    std::vector<CompilationInput> file_infos) {
  // as_dynamic() type-erases the operator chain so that the deduced return type is
  // exactly rxcpp::observable<PerfettoTracePtrInfo>.
  return ReadProtosFromFileNames<::perfetto::protos::Trace>(
             rxcpp::observable<>::iterate(std::move(file_infos)))
      .as_dynamic();
}
// A flattened data representation of an MmFileMap*FtraceEvent.
// This representation is used for streaming processing.
//
// Note: Perfetto applies a 'union' over all possible fields on all possible devices
// (and uses the max sizeof per-field).
//
// Since all protobuf fields are optional, fields not present on a particular device are always
// null
//
// NOTE(review): the members have no default initializers; SelectPageCacheFtraceEvents
// assigns every one of them before emitting an event.
struct PageCacheFtraceEvent {
/*
* Ftrace buffer-specific
*/
uint32_t cpu; // e.g. 0-7 for the cpu core number.
/*
* Ftrace-event general data
*/
// Nanoseconds since an epoch.
// Epoch is configurable by writing into trace_clock.
// By default this timestamp is CPU local.
uint64_t timestamp;
// Kernel pid (do not confuse with userspace pid aka tgid)
uint32_t pid;
// Tagged by our code while parsing the ftraces:
uint64_t timestamp_relative; // timestamp relative to first ftrace within a Trace protobuf.
bool add_to_page_cache; // AddToPageCache=true, DeleteFromPageCache=false.
/*
* mm_filemap-specific data
*
* Fields are common:
* - MmFilemapAddToPageCacheFtraceEvent
* - MmFilemapDeleteFromPageCacheFtraceEvent
*/
uint64_t pfn; // page frame number (physical) - null on some devices, e.g. marlin
uint64_t i_ino; // inode number (use in conjunction with s_dev)
uint64_t index; // offset into file: this is a multiple of the page size (usually 4096).
uint64_t s_dev; // (dev_t) device number
uint64_t page; // struct page*. - null on some devices, e.g. blueline.
// Builds the Inode that identifies the file of this event from (s_dev, i_ino).
Inode inode() const {
return Inode::FromDeviceAndInode(static_cast<dev_t>(s_dev),
static_cast<ino_t>(i_ino));
}
};
// Streams every field of a PageCacheFtraceEvent as "{name:value,...}".
std::ostream& operator<<(std::ostream& os, const PageCacheFtraceEvent& e) {
  return os << "{"
            << "cpu:" << e.cpu << ","
            << "timestamp:" << e.timestamp << ","
            << "pid:" << e.pid << ","
            << "timestamp_relative:" << e.timestamp_relative << ","
            << "add_to_page_cache:" << e.add_to_page_cache << ","
            << "pfn:" << e.pfn << ","
            << "i_ino:" << e.i_ino << ","
            << "index:" << e.index << ","
            << "s_dev:" << e.s_dev << ","
            << "page:" << e.page
            << "}";
}
/*
 * Gets the start timestamp of a trace.
 *
 * It is the minimum of all timestamps present on the trace's packets and on the
 * ftrace events inside those packets. Returns std::nullopt when the trace carries
 * no timestamp at all.
 */
std::optional<uint64_t> GetStartTimestamp(const ::perfetto::protos::Trace& trace) {
  std::optional<uint64_t> earliest;
  // Lowers 'earliest' to 'candidate' if it is still unset or currently larger.
  auto consider = [&earliest](uint64_t candidate) {
    if (!earliest || candidate < *earliest) {
      earliest = candidate;
    }
  };

  for (const ::perfetto::protos::TracePacket& packet : trace.packet()) {
    if (packet.has_timestamp()) {
      consider(packet.timestamp());
    }
    if (packet.has_ftrace_events()) {
      const ::perfetto::protos::FtraceEventBundle& bundle = packet.ftrace_events();
      for (const ::perfetto::protos::FtraceEvent& event : bundle.event()) {
        if (event.has_timestamp()) {
          consider(event.timestamp());
        }
      }
    }
  }
  return earliest;
}
/*
* Converts one parsed perfetto trace into an observable of PageCacheFtraceEvent.
*
* On subscription the trace is walked packet by packet; every ftrace event that is an
* mm_filemap_add_to_page_cache or mm_filemap_delete_from_page_cache event is flattened
* into a PageCacheFtraceEvent and emitted. Ftrace events whose own timestamp exceeds
* trace_info.timestamp_limit_ns are skipped. on_completed is sent after the last packet,
* unless the subscriber unsubscribed first.
*
* sample blueline output:
*
* $ adb shell cat /d/tracing/events/filemap/mm_filemap_add_to_page_cache/format
*
* name: mm_filemap_add_to_page_cache
* ID: 178
* format:
* field:unsigned short common_type; offset:0; size:2; signed:0;
* field:unsigned char common_flags; offset:2; size:1; signed:0;
* field:unsigned char common_preempt_count; offset:3; size:1; signed:0;
* field:int common_pid; offset:4; size:4; signed:1;
*
* field:unsigned long pfn; offset:8; size:8; signed:0;
* field:unsigned long i_ino; offset:16; size:8; signed:0;
* field:unsigned long index; offset:24; size:8; signed:0;
* field:dev_t s_dev; offset:32; size:4; signed:0;
*
* print fmt: "dev %d:%d ino %lx page=%p pfn=%lu ofs=%lu", ((unsigned int) ((REC->s_dev) >> 20)),
* ((unsigned int) ((REC->s_dev) & ((1U << 20) - 1))), REC->i_ino,
* (((struct page *)(((0xffffffffffffffffUL) - ((1UL) << ((39) - 1)) + 1) -
* ((1UL) << ((39) - 12 - 1 + 6))) - (memstart_addr >> 12)) + (REC->pfn)),
* REC->pfn, REC->index << 12
*/
auto /*observable<PageCacheFtraceEvent>*/ SelectPageCacheFtraceEvents(
PerfettoTracePtrInfo trace_info) {
const ::perfetto::protos::Trace& trace = *(trace_info.trace_ptr);
// Enables the extra VERBOSE logging below.
constexpr bool kDebugFunction = true;
return rxcpp::observable<>::create<PageCacheFtraceEvent>(
// NOTE(review): 'trace' is a const reference, so std::move(trace) yields a const rvalue;
// the Trace is therefore copied (not moved) into the closure. The copy keeps the data
// valid after 'trace_info' goes out of scope.
[trace=std::move(trace), timestamp_limit_ns=trace_info.timestamp_limit_ns]
(rxcpp::subscriber<PageCacheFtraceEvent> sub) {
// State shared between the loops below and the emit helper.
uint64_t timestamp = 0;
uint64_t timestamp_relative = 0;
std::optional<uint64_t> timestamp_relative_start = GetStartTimestamp(trace);
uint32_t cpu = 0;
uint32_t pid = 0;
bool add_to_page_cache = true;
// Emits one event built from the current loop state plus the mm_filemap-specific fields.
// Generic so it accepts both the AddToPageCache and DeleteFromPageCache message types.
auto on_next_page_cache_event = [&](const auto& mm_event) {
PageCacheFtraceEvent out;
out.timestamp = timestamp;
out.cpu = cpu;
out.pid = pid;
out.timestamp_relative = timestamp_relative;
out.add_to_page_cache = add_to_page_cache;
out.pfn = mm_event.pfn();
out.i_ino = mm_event.i_ino();
out.index = mm_event.index();
out.s_dev = mm_event.s_dev();
out.page = mm_event.page();
sub.on_next(std::move(out));
};
for (const ::perfetto::protos::TracePacket& packet : trace.packet()) {
// Break out of all loops if we are unsubscribed.
if (!sub.is_subscribed()) {
if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents unsubscribe";
return;
}
if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents TracePacket";
if (packet.has_timestamp()) {
// Only takes effect if no start timestamp has been set yet.
timestamp_relative_start = timestamp_relative_start.value_or(packet.timestamp());
timestamp = packet.timestamp(); // XX: should we call 'has_timestamp()' ?
} else {
timestamp = 0;
}
if (packet.has_ftrace_events()) {
const ::perfetto::protos::FtraceEventBundle& ftrace_event_bundle =
packet.ftrace_events();
cpu = ftrace_event_bundle.cpu(); // XX: has_cpu ?
for (const ::perfetto::protos::FtraceEvent& event : ftrace_event_bundle.event()) {
// Break out of all loops if we are unsubscribed.
if (!sub.is_subscribed()) {
return;
}
if (event.has_timestamp()) {
timestamp = event.timestamp();
// Truncate the trace: events past the limit are skipped (not emitted).
if(timestamp > timestamp_limit_ns) {
LOG(VERBOSE) << "The timestamp is " << timestamp <<
", which exceeds the limit "<< timestamp_limit_ns;
continue;
}
} else {
// NOTE(review): 'timestamp' keeps its previous value here (the packet timestamp, 0, or
// the timestamp of an earlier event in this bundle) and is not checked against the limit.
DCHECK(packet.has_timestamp() == false)
<< "Timestamp in outer packet but not inner packet";
// XX: use timestamp from the perfetto TracePacket ???
// REVIEWERS: not sure if this is ok, does it use the same clock source and
// is the packet data going to be the same clock sample as the Ftrace event?
}
if (timestamp_relative_start){
timestamp_relative = timestamp - *timestamp_relative_start;
} else {
timestamp_relative = 0;
}
pid = event.pid(); // XX: has_pid ?
// Only the two mm_filemap page cache event kinds are emitted; all others are ignored.
if (event.has_mm_filemap_add_to_page_cache()) {
add_to_page_cache = true;
const ::perfetto::protos::MmFilemapAddToPageCacheFtraceEvent& mm_event =
event.mm_filemap_add_to_page_cache();
on_next_page_cache_event(mm_event);
} else if (event.has_mm_filemap_delete_from_page_cache()) {
add_to_page_cache = false;
const ::perfetto::protos::MmFilemapDeleteFromPageCacheFtraceEvent& mm_event =
event.mm_filemap_delete_from_page_cache();
on_next_page_cache_event(mm_event);
}
}
} else {
if (kDebugFunction) {
LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents no ftrace event bundle";
}
}
}
if (kDebugFunction) {
LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents#on_completed";
}
// Let subscriber know there are no more items.
sub.on_completed();
});
}
// Emit only unique (s_dev, i_ino) pairs from all Trace protos.
//
// Every trace is flattened into its page cache ftrace events, each event is mapped to
// its Inode, and duplicates are removed.
auto /*observable<Inode*/ SelectDistinctInodesFromTraces(
    rxcpp::observable<PerfettoTracePtrInfo> traces) {
  return traces
      .flat_map([](PerfettoTracePtrInfo trace) {
        // The explicitly typed local converts the result to observable<PageCacheFtraceEvent>.
        // TODO: Upstream bug? using []()::perfetto::protos::Trace&) causes a compilation error.
        rxcpp::observable<PageCacheFtraceEvent> events =
            SelectPageCacheFtraceEvents(std::move(trace));
        return events;
      })
      .map([](const PageCacheFtraceEvent& event) -> Inode {
        return event.inode();
      })
      .tap([](const Inode& inode) {
        LOG(VERBOSE) << "SelectDistinctInodesFromTraces (pre-distinct): " << inode;
      })
      .distinct();  // observable<Inode>
}
// TODO: static assert checks for convertible return values.
//
// Creates an InodeResolver from 'dependencies' and asks it to find the filenames
// for 'inodes'.
auto/*observable<InodeResult>*/ ResolveInodesToFileNames(
    rxcpp::observable<Inode> inodes,
    inode2filename::InodeResolverDependencies dependencies) {
  using inode2filename::InodeResolver;

  std::shared_ptr<InodeResolver> resolver = InodeResolver::Create(std::move(dependencies));
  return resolver->FindFilenamesFromInodes(std::move(inodes));
}
using InodeMap = std::unordered_map<Inode, std::string /*filename*/>;

// Folds all inode resolution results into a single Inode -> filename map.
// Results that failed to resolve are logged and left out of the map.
// The returned observable emits exactly 1 InodeMap value.
auto /*just observable<InodeMap>*/ ReduceResolvedInodesToMap(
    rxcpp::observable<InodeResult> inode_results) {
  return inode_results.reduce(
      InodeMap{},
      [](InodeMap map, InodeResult result) {
        if (!result) {
          // TODO: side stats for how many of these are failed to resolve?
          LOG(WARNING) << "compiler: Failed to resolve inode, " << result;
          return map;
        }
        LOG(VERBOSE) << "compiler::ReduceResolvedInodesToMap insert " << result;
        // Like insert, emplace keeps an already-present entry untouched.
        map.emplace(std::move(result.inode), std::move(result.data.value()));
        return map;
      },
      [](InodeMap map) {
        return map;  // TODO: use an identity function
      });
}
// A page cache ftrace event together with the filename that its inode was resolved to.
// Emitted by ResolvePageCacheEntriesFromProtos.
struct ResolvedPageCacheFtraceEvent {
// Filename found in the InodeMap for event.inode().
std::string filename;
// The original flattened ftrace event.
PageCacheFtraceEvent event;
};
// Streams the quoted filename followed by the wrapped ftrace event.
std::ostream& operator<<(std::ostream& os, const ResolvedPageCacheFtraceEvent& e) {
  return os << "{"
            << "filename:\"" << e.filename << "\","
            << e.event
            << "}";
}
struct CombinedState {
CombinedState() = default;
explicit CombinedState(InodeMap inode_map) : inode_map{std::move(inode_map)} {}
explicit CombinedState(PageCacheFtraceEvent event) : ftrace_event{std::move(event)} {}
CombinedState(InodeMap inode_map, PageCacheFtraceEvent event)
: inode_map(std::move(inode_map)),
ftrace_event{std::move(event)} {}
std::optional<InodeMap> inode_map;
std::optional<PageCacheFtraceEvent> ftrace_event;
bool HasAll() const {
return inode_map.has_value() && ftrace_event.has_value();
}
const InodeMap& GetInodeMap() const {
DCHECK(HasAll());
return inode_map.value();
}
InodeMap& GetInodeMap() {
DCHECK(HasAll());
return inode_map.value();
}
const PageCacheFtraceEvent& GetEvent() const {
DCHECK(HasAll());
return ftrace_event.value();
}
PageCacheFtraceEvent& GetEvent() {
DCHECK(HasAll());
return ftrace_event.value();
}
void Merge(CombinedState&& other) {
if (other.inode_map) {
inode_map = std::move(other.inode_map);
}
if (other.ftrace_event) {
ftrace_event = std::move(other.ftrace_event);
}
}
};
// Streams a summary of a CombinedState: the size of the inode map and the full event,
// or "(null)" for whichever part is absent.
std::ostream& operator<<(std::ostream& os, const CombinedState& s) {
  os << "CombinedState{inode_map:";
  if (!s.inode_map) {
    os << "(null)";
  } else {
    os << "|sz=" << s.inode_map->size() << "|";
  }

  os << ",event:";
  if (!s.ftrace_event) {
    os << "(null)";
  } else {
    os << *s.ftrace_event;
  }

  return os << "}";
}
// Resolves the inode of every page cache ftrace event in 'traces' to a filename.
//
// Two chains are built from 'traces' and joined:
//  1) distinct inodes -> resolved filenames -> a single InodeMap
//  2) all page cache ftrace events
// Events whose inode has no entry in the InodeMap are logged as errors and dropped.
//
// NOTE(review): 'traces' is used as the source of both chains, so it is presumably
// subscribed to twice - confirm it is safe/cheap to replay.
auto/*observable<ResolvedPageCacheFtraceEvent>*/ ResolvePageCacheEntriesFromProtos(
rxcpp::observable<PerfettoTracePtrInfo> traces,
inode2filename::InodeResolverDependencies dependencies) {
// 1st chain = emits exactly 1 InodeMap.
// [proto, proto, proto...] -> [inode, inode, inode, ...]
auto/*observable<Inode>*/ distinct_inodes = SelectDistinctInodesFromTraces(traces);
rxcpp::observable<Inode> distinct_inodes_obs = distinct_inodes.as_dynamic();
// [inode, inode, inode, ...] -> [(inode, {filename|error}), ...]
auto/*observable<InodeResult>*/ inode_names = ResolveInodesToFileNames(distinct_inodes_obs,
std::move(dependencies));
// rxcpp has no 'join' operators, so do a manual join with concat.
auto/*observable<InodeMap>*/ inode_name_map = ReduceResolvedInodesToMap(inode_names);
// 2nd chain = emits all PageCacheFtraceEvent
auto/*observable<PageCacheFtraceEvent>*/ page_cache_ftrace_events = traces
.flat_map([](PerfettoTracePtrInfo trace) {
rxcpp::observable<PageCacheFtraceEvent> obs = SelectPageCacheFtraceEvents(std::move(trace));
return obs;
});
// Wrap the items of both chains in CombinedState so that they share one item type
// and can be concatenated.
auto inode_name_map_precombine = inode_name_map
.map([](InodeMap inode_map) {
LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#inode_name_map_precombine ";
return CombinedState{std::move(inode_map)};
});
auto page_cache_ftrace_events_precombine = page_cache_ftrace_events
.map([](PageCacheFtraceEvent event) {
LOG(VERBOSE)
<< "compiler::ResolvePageCacheEntriesFromProtos#page_cache_ftrace_events_precombine "
<< event;
return CombinedState{std::move(event)};
});
// Combine 1st+2nd chain.
//
// concat subscribes to each observable, waiting until its completed, before subscribing
// to the next observable and waiting again.
//
// During all this, every #on_next is immediately forwarded to the downstream observables.
// In our case, we want to block until InodeNameMap is ready, and re-iterate all ftrace events.
auto/*observable<ResolvedPageCacheFtraceEvent>*/ resolved_events = inode_name_map_precombine
.concat(page_cache_ftrace_events_precombine)
// NOTE(review): the accumulator holds the whole InodeMap and is passed by value here;
// this may copy the map for every event - TODO measure.
.scan(CombinedState{},
[](CombinedState current_state, CombinedState delta_state) {
LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#scan "
<< "current=" << current_state << ","
<< "delta=" << delta_state;
// IT0 = (,) + (InodeMap,)
// IT1 = (InodeMap,) + (,Event)
// IT2..N = (InodeMap,Event1) + (,Event2)
current_state.Merge(std::move(delta_state));
return current_state;
})
// Skips the states that do not hold both an InodeMap and an event yet (e.g. IT0).
.filter([](const CombinedState& state) {
return state.HasAll();
})
// Look up the filename for the event's inode; std::nullopt if it was not resolved.
.map([](CombinedState& state) -> std::optional<ResolvedPageCacheFtraceEvent> {
PageCacheFtraceEvent& event = state.GetEvent();
const InodeMap& inode_map = state.GetInodeMap();
auto it = inode_map.find(event.inode());
if (it != inode_map.end()) {
std::string filename = it->second;
LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos combine_latest " << event;
return ResolvedPageCacheFtraceEvent{std::move(filename), std::move(event)};
} else {
LOG(ERROR) << "compiler: FtraceEvent's inode did not have resolved filename: " << event;
return std::nullopt;
}
})
// Drop the unresolved events, then unwrap the optional.
.filter(
[](std::optional<ResolvedPageCacheFtraceEvent> maybe_event) {
return maybe_event.has_value();
})
.map([](std::optional<ResolvedPageCacheFtraceEvent> maybe_event) {
return std::move(maybe_event.value());
});
// -> observable<ResolvedPageCacheFtraceEvent>
return resolved_events;
}
namespace detail {

// a < b for strings, compared element by element.
bool multiless_one(const std::string& a, const std::string& b) {
  return std::lexicographical_compare(a.begin(), a.end(), b.begin(), b.end());
}

// a < b for any other type, using std::less of the decayed type.
template <typename T>
constexpr bool multiless_one(T&& a, T&& b) {
  using Compare = std::less<std::decay_t<T>>;
  return Compare{}(std::forward<T>(a), std::forward<T>(b));
}

// End of recursion: [] < [] is always false.
constexpr bool multiless() {
  return false;
}

// The first unequal (a, b) pair decides the result; equal pairs defer to the next pair.
template <typename T, typename ... Args>
constexpr bool multiless(T&& a, T&& b, Args&&... args) {
  return (a != b) ? multiless_one(std::forward<T>(a), std::forward<T>(b))
                  : multiless(std::forward<Args>(args)...);
}

}  // namespace detail

// Return [A0...An] < [B0...Bn] ; vector-like scalar comparison of each field.
// Arguments are passed in the order A0,B0,A1,B1,...,An,Bn.
template <typename ... Args>
constexpr bool multiless(Args&&... args) {
  return detail::multiless(std::forward<Args>(args)...);
}
struct CompilerPageCacheEvent {
std::string filename;
uint64_t timestamp_relative; // use relative timestamp because absolute values aren't comparable
// across different trace protos.
// relative timestamps can be said to be 'approximately' comparable.
// assuming we compare the same application startup's trace times.
bool add_to_page_cache; // AddToPageCache=true, DeleteFromPageCache=false.
uint64_t index; // offset into file: this is a multiple of the page size (usually 4096).
// All other data from the ftrace is dropped because we don't currently use it in the
// compiler algorithms.
CompilerPageCacheEvent() = default;
CompilerPageCacheEvent(const ResolvedPageCacheFtraceEvent& resolved)
: CompilerPageCacheEvent(resolved.filename, resolved.event) {
}
CompilerPageCacheEvent(ResolvedPageCacheFtraceEvent&& resolved)
: CompilerPageCacheEvent(std::move(resolved.filename), std::move(resolved.event)) {
}
// Compare all fields (except the timestamp field).
static bool LessIgnoringTimestamp(const CompilerPageCacheEvent& a,
const CompilerPageCacheEvent& b) {
return multiless(a.filename, b.filename,
a.add_to_page_cache, b.add_to_page_cache,
a.index, b.index);
}
// Compare all fields. Timestamps get highest precedence.
bool operator<(const CompilerPageCacheEvent& rhs) const {
return multiless(timestamp_relative, rhs.timestamp_relative,
filename, rhs.filename,
add_to_page_cache, rhs.add_to_page_cache,
index, rhs.index);
}
private:
CompilerPageCacheEvent(std::string filename, const PageCacheFtraceEvent& event)
: filename(std::move(filename)),
timestamp_relative(event.timestamp_relative),
add_to_page_cache(event.add_to_page_cache),
index(event.index) {
}
};
// Streams a CompilerPageCacheEvent as "{name:value,...}".
// The value printed under "timestamp" is the relative timestamp.
std::ostream& operator<<(std::ostream& os, const CompilerPageCacheEvent& e) {
  return os << "{"
            << "filename:\"" << e.filename << "\","
            << "timestamp:" << e.timestamp_relative << ","
            << "add_to_page_cache:" << e.add_to_page_cache << ","
            << "index:" << e.index
            << "}";
}
// Filter an observable chain of 'ResolvedPageCacheFtraceEvent'
// into an observable chain of 'ResolvedPageCacheFtraceEvent'.
//
// Any item whose filename contains a match for the regular expression in
// 'blacklist_filter' is not emitted into the output chain. Without a filter,
// every item passes through.
auto/*observable<ResolvedPageCacheFtraceEvent>*/ ApplyBlacklistToPageCacheEvents(
    rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events,
    std::optional<std::string> blacklist_filter) {
  const bool filter_enabled = blacklist_filter.has_value();
  // default regex engine is ecmascript.
  // An empty pattern is compiled when there is no filter; it is never consulted.
  std::regex pattern{blacklist_filter.value_or(std::string(""))};

  return resolved_events.filter(
      [pattern, filter_enabled](const ResolvedPageCacheFtraceEvent& event) {
        // Remove any entries that match the regex in --blacklist-filter/-bf.
        if (filter_enabled && std::regex_search(event.filename, pattern)) {
          LOG(VERBOSE) << "Blacklist filter removed '" << event.filename << "' from chain.";
          return false;
        }
        return true;
      });
}
// Compile an observable chain of 'ResolvedPageCacheFtraceEvent' into
// an observable chain of distinct, timestamp-ordered, CompilerPageCacheEvent.
//
// This is a reducing operation: No items are emitted until resolved_events is completed.
auto/*observable<CompilerPageCacheEvent>*/ CompilePageCacheEvents(
    rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events) {
  // Strict weak ordering over every field except the timestamp.
  struct CompilerPageCacheEventIgnoringTimestampLess {
    bool operator()(const CompilerPageCacheEvent& lhs,
                    const CompilerPageCacheEvent& rhs) const {
      return CompilerPageCacheEvent::LessIgnoringTimestamp(lhs, rhs);
    }
  };

  // Greedy compilation algorithm, O(N log N) because both passes go through std::set.
  //
  // This produces a suboptimal result (e.g. a small timestamp
  // that might occur only 1% of the time nevertheless wins out), but the
  // algorithm itself is quite simple, and doesn't require any heuristic tuning.

  // First pass: *Merge* into set that ignores the timestamp value for order, but retains
  // the smallest timestamp value if the same key is re-inserted.
  using IgnoreTimestampForOrderingSet =
      std::set<CompilerPageCacheEvent, CompilerPageCacheEventIgnoringTimestampLess>;
  // Second pass: *Sort* data by smallest timestamp first.
  using CompilerPageCacheEventSet =
      std::set<CompilerPageCacheEvent>;

  return resolved_events
    .map(
      [](ResolvedPageCacheFtraceEvent event) {
        // Drop all the extra metadata like pid, cpu, etc.
        // When we merge we could keep a list of the original data, but there is no advantage
        // to doing so.
        return CompilerPageCacheEvent{std::move(event)};
      }
    )
    .reduce(
      IgnoreTimestampForOrderingSet{},
      [](IgnoreTimestampForOrderingSet set, CompilerPageCacheEvent event) {
        // Add each event to the set, keying by everything but the timestamp.
        // If the key is already inserted, re-insert with the smaller timestamp value.
        auto it = set.find(event);
        if (it == set.end()) {
          // Need to insert new element.
          set.insert(std::move(event));
        } else if (it->timestamp_relative > event.timestamp_relative) {
          // Replace existing element: the new element has a smaller timestamp.
          it = set.erase(it);
          // Amortized O(1) time if insertion happens in the position before the hint.
          set.insert(it, std::move(event));
        }  // else: Skip insertion. Element already present with the minimum timestamp.
        return set;
      },
      [](IgnoreTimestampForOrderingSet set) {
        // Extract all elements from 'set', re-insert into the timestamp-ordered set.
        // The values are now ordered by timestamp (and then the rest of the fields).
        //
        // The result is allocated with make_shared and merged into directly, which avoids
        // a raw 'new' and an intermediate set.
        auto final_set = std::make_shared<CompilerPageCacheEventSet>();
        final_set->merge(std::move(set));
        return final_set;
      })  // observable<std::shared_ptr<CompilerPageCacheEventSet>> (just)
    .flat_map(
      [](std::shared_ptr<CompilerPageCacheEventSet> final_set) {
        // TODO: flat_map seems to make a copy of the parameter _every single iteration_
        // without the shared_ptr it would just make a copy of the set every time it went
        // through the iterate function.
        // Causing absurdly slow compile times x1000 slower than we wanted.
        // TODO: file a bug upstream and/or fix upstream.
        CompilerPageCacheEventSet& ts_set = *final_set;
        LOG(DEBUG) << "compiler: Merge-pass completed (" << ts_set.size() << " entries).";
        return rxcpp::sources::iterate(ts_set).map([](CompilerPageCacheEvent e) { return e; });
      }
    );  // observable<CompilerPageCacheEvent>
}
/** Makes a vector of info that includes filename and timestamp limit. */
std::vector<CompilationInput> MakeCompilationInputs(
std::vector<std::string> input_file_names,
std::vector<uint64_t> timestamp_limit_ns){
// If the timestamp limit is empty, set the limit to max value
// for each trace file.
if (timestamp_limit_ns.empty()) {
for (size_t i = 0; i < input_file_names.size(); i++) {
timestamp_limit_ns.push_back(std::numeric_limits<uint64_t>::max());
}
}
DCHECK_EQ(input_file_names.size(), timestamp_limit_ns.size());
std::vector<CompilationInput> file_infos;
for (size_t i = 0; i < input_file_names.size(); i++) {
file_infos.push_back({input_file_names[i], timestamp_limit_ns[i]});
}
return file_infos;
}
bool PerformCompilation(std::vector<CompilationInput> perfetto_traces,
std::string output_file_name,
bool output_proto,
std::optional<std::string> blacklist_filter,
inode2filename::InodeResolverDependencies dependencies) {
auto trace_protos = ReadPerfettoTraceProtos(std::move(perfetto_traces));
auto resolved_events = ResolvePageCacheEntriesFromProtos(std::move(trace_protos),
std::move(dependencies));
auto filtered_events =
ApplyBlacklistToPageCacheEvents(std::move(resolved_events), blacklist_filter);
auto compiled_events = CompilePageCacheEvents(std::move(filtered_events));
std::ofstream ofs;
if (!output_file_name.empty()) {
if (!output_proto) {
ofs.open(output_file_name);
if (!ofs) {
LOG(ERROR) << "compiler: Failed to open output file for writing: " << output_file_name;
return false;
}
}
}
auto trace_file_proto = serialize::ArenaPtr<serialize::proto::TraceFile>::Make();
// Fast lookup of filename -> FileIndex id.
std::unordered_map<std::string, int64_t /*file handle id*/> file_path_map;
int64_t file_handle_id = 0;
int counter = 0;
compiled_events
// .as_blocking()
.tap([&](CompilerPageCacheEvent& event) {
if (!output_proto) {
return;
}
if (!event.add_to_page_cache) {
// Skip DeleteFromPageCache events, they are only used for intermediate.
return;
}
DCHECK(trace_file_proto->mutable_index() != nullptr);
serialize::proto::TraceFileIndex& index = *trace_file_proto->mutable_index();
int64_t file_handle;
// Add TraceFileIndexEntry if it doesn't exist.
auto it = file_path_map.find(event.filename);
if (it == file_path_map.end()) {
file_handle = file_handle_id++;
file_path_map[event.filename] = file_handle;
serialize::proto::TraceFileIndexEntry* entry = index.add_entries();
DCHECK(entry != nullptr);
entry->set_id(file_handle);
entry->set_file_name(event.filename);
if (kIsDebugBuild) {
int i = static_cast<int>(file_handle);
const serialize::proto::TraceFileIndexEntry& entry_ex = index.entries(i);
DCHECK_EQ(entry->id(), entry_ex.id());
DCHECK_EQ(entry->file_name(), entry_ex.file_name());
}
} else {
file_handle = it->second;
}
int kPageSize = 4096; // TODO: don't hardcode the page size.
// Add TraceFileEntry.
DCHECK(trace_file_proto->mutable_list() != nullptr);
serialize::proto::TraceFileEntry* entry = trace_file_proto->mutable_list()->add_entries();
DCHECK(entry != nullptr);
entry->set_index_id(file_handle);
// Page index -> file offset in bytes.
entry->set_file_offset(static_cast<int64_t>(event.index) * kPageSize);
entry->set_file_length(kPageSize);
})
.subscribe([&](CompilerPageCacheEvent event) {
if (!output_proto) {
if (output_file_name.empty()) {
LOG(INFO) << "CompilerPageCacheEvent" << event << std::endl;
} else {
ofs << event << "\n"; // TODO: write in proto format instead.
}
}
++counter;
});
if (output_proto) {
LOG(DEBUG) << "compiler: WriteFully to begin into " << output_file_name;
::google::protobuf::MessageLite& message = *trace_file_proto.get();
if (auto res = serialize::ProtobufIO::WriteFully(message, output_file_name); !res) {
errno = res.error();
PLOG(ERROR) << "compiler: Failed to write protobuf to file: " << output_file_name;
return false;
} else {
LOG(INFO) << "compiler: Wrote protobuf " << output_file_name;
}
}
LOG(DEBUG) << "compiler: Compilation completed (" << counter << " events).";
return true;
}
} // namespace iorap::compiler