blob: 4ae8e93f39857e8b45cc156f7c7f4b5f2fd2f86e [file] [log] [blame]
/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define STATSD_DEBUG false // STOPSHIP if true
#include "Log.h"
#include "ShellSubscriberClient.h"
#include "FieldValue.h"
#include "guardrail/StatsdStats.h"
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
using android::base::unique_fd;
using android::util::ProtoOutputStream;
using Status = ::ndk::ScopedAStatus;
namespace android {
namespace os {
namespace statsd {
const static int FIELD_ID_SHELL_DATA__ATOM = 1;
const static int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
// Store next subscription ID for StatsdStats.
// Not thread-safe; should only be accessed while holding ShellSubscriber::mMutex lock.
static int nextSubId = 0;
struct ReadConfigResult {
vector<SimpleAtomMatcher> pushedMatchers;
vector<ShellSubscriberClient::PullInfo> pullInfo;
};
// Read and parse single config. There should only one config in the input.
static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
int64_t startTimeMs, int64_t minPullIntervalMs) {
// Parse the config.
ShellSubscription config;
if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
ALOGE("ShellSubscriberClient: failed to parse the config");
return nullopt;
}
ReadConfigResult result;
result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end());
vector<ShellSubscriberClient::PullInfo> pullInfo;
for (const auto& pulled : config.pulled()) {
vector<string> packages;
vector<int32_t> uids;
for (const string& pkg : pulled.packages()) {
auto it = UidMap::sAidToUidMapping.find(pkg);
if (it != UidMap::sAidToUidMapping.end()) {
uids.push_back(it->second);
} else {
packages.push_back(pkg);
}
}
const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs);
result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids);
ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
pulled.matcher().atom_id());
}
return result;
}
ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
int64_t intervalMs,
const std::vector<std::string>& packages,
const std::vector<int32_t>& uids)
: mPullerMatcher(matcher),
mIntervalMs(intervalMs),
mPrevPullElapsedRealtimeMs(startTimeMs),
mPullPackages(packages),
mPullUids(uids) {
}
ShellSubscriberClient::ShellSubscriberClient(
int id, int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
const std::vector<SimpleAtomMatcher>& pushedMatchers,
const std::vector<PullInfo>& pulledInfo, int64_t timeoutSec, int64_t startTimeSec,
const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr)
: mId(id),
mUidMap(uidMap),
mPullerMgr(pullerMgr),
mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)),
mPushedMatchers(pushedMatchers),
mPulledInfo(pulledInfo),
mCallback(callback),
mTimeoutSec(timeoutSec),
mStartTimeSec(startTimeSec),
mLastWriteMs(startTimeSec * 1000),
mCacheSize(0){};
unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
const sp<StatsPullerManager>& pullerMgr) {
// Read the size of the config.
size_t bufferSize;
if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) {
return nullptr;
}
// Check bufferSize
if (bufferSize > (kMaxSizeKb * 1024)) {
ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
"bytes)",
bufferSize, (kMaxSizeKb * 1024));
return nullptr;
}
// Read the config.
vector<uint8_t> buffer(bufferSize);
if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
ALOGE("ShellSubscriberClient: failed to read the config from file descriptor");
return nullptr;
}
const optional<ReadConfigResult> readConfigResult =
readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0);
if (!readConfigResult.has_value()) {
return nullptr;
}
return make_unique<ShellSubscriberClient>(
nextSubId++, out, /*callback=*/nullptr, readConfigResult->pushedMatchers,
readConfigResult->pullInfo, timeoutSec, startTimeSec, uidMap, pullerMgr);
}
unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
const vector<uint8_t>& subscriptionConfig,
const shared_ptr<IStatsSubscriptionCallback>& callback, int64_t startTimeSec,
const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr) {
if (callback == nullptr) {
ALOGE("ShellSubscriberClient: received nullptr callback");
return nullptr;
}
if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) {
ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
"bytes)",
subscriptionConfig.size(), (kMaxSizeKb * 1024));
return nullptr;
}
const optional<ReadConfigResult> readConfigResult =
readConfig(subscriptionConfig, startTimeSec * 1000,
ShellSubscriberClient::kMinCallbackPullIntervalMs);
if (!readConfigResult.has_value()) {
return nullptr;
}
const int id = nextSubId++;
StatsdStats::getInstance().noteSubscriptionStarted(id, readConfigResult->pushedMatchers.size(),
readConfigResult->pullInfo.size());
return make_unique<ShellSubscriberClient>(
id, /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
/*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr);
}
bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event,
const SimpleAtomMatcher& matcher,
const sp<UidMap>& uidMap) {
auto [matched, transformedEvent] = matchesSimple(mUidMap, matcher, event);
if (!matched) {
return false;
}
const LogEvent& eventRef = transformedEvent == nullptr ? event : *transformedEvent;
// Cache atom event in mProtoOut.
uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED |
FIELD_ID_SHELL_DATA__ATOM);
eventRef.ToProto(mProtoOut);
mProtoOut.end(atomToken);
const int64_t timestampNs = truncateTimestampIfNecessary(eventRef);
mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
static_cast<long long>(timestampNs));
// Update byte size of cached data.
mCacheSize += getSize(eventRef.getValues()) + sizeof(timestampNs);
return true;
}
// Called by ShellSubscriber when a pushed event occurs
void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
for (const auto& matcher : mPushedMatchers) {
if (writeEventToProtoIfMatched(event, matcher, mUidMap)) {
flushProtoIfNeeded();
break;
}
}
}
void ShellSubscriberClient::flushProtoIfNeeded() {
if (mCallback == nullptr) { // Using file descriptor.
triggerFdFlush();
} else if (mCacheSize >= kMaxCacheSizeBytes) { // Using callback.
// Flush data if cache is full.
triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
}
}
int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
int64_t sleepTimeMs = 24 * 60 * 60 * 1000; // 24 hours.
for (PullInfo& pullInfo : mPulledInfo) {
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) {
vector<int32_t> uids;
getUidsForPullAtom(&uids, pullInfo);
vector<shared_ptr<LogEvent>> data;
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
pullInfo.mPullerMatcher.atom_id());
if (mCallback != nullptr) { // Callback subscription
StatsdStats::getInstance().noteSubscriptionAtomPulled(
pullInfo.mPullerMatcher.atom_id());
}
writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
// Determine how long to sleep before doing more work.
const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
const int64_t timeBeforePullMs =
nextPullTimeMs - nowMillis; // guaranteed to be non-negative
sleepTimeMs = min(sleepTimeMs, timeBeforePullMs);
}
return sleepTimeMs;
}
// The pullAndHeartbeat threads sleep for the minimum time
// among all clients' input
int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
int64_t nowNanos) {
int64_t sleepTimeMs;
if (mCallback == nullptr) { // File descriptor subscription
if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
mClientAlive = false;
return kMsBetweenHeartbeats;
}
sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos));
// Send a heartbeat consisting of data size of 0, if
// the user hasn't recently received data from statsd. When it receives the data size of 0,
// the user will not expect any atoms and recheck whether the subscription should end.
if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
triggerFdFlush();
if (!mClientAlive) return kMsBetweenHeartbeats;
}
int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis;
sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat);
} else { // Callback subscription.
sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos));
if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) {
// Flush data if cache has kept data for longer than kMsBetweenCallbacks.
triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
}
// Cache should be flushed kMsBetweenCallbacks after mLastWrite.
const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis;
// For callback subscriptions, ensure minimum sleep time is at least
// kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left
// before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the
// effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from
// now to have their pulls batched together, mitigating frequent wakeups of the puller
// thread.
sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs));
}
return sleepTimeMs;
}
void ShellSubscriberClient::writePulledAtomsLocked(const vector<shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) {
bool hasData = false;
for (const shared_ptr<LogEvent>& event : data) {
if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) {
hasData = true;
}
}
if (hasData) {
flushProtoIfNeeded();
}
}
// Tries to write the atom encoded in mProtoOut to the pipe. If the write fails
// because the read end of the pipe has closed, change the client status so
// the manager knows the subscription is no longer active
void ShellSubscriberClient::attemptWriteToPipeLocked() {
const size_t dataSize = mProtoOut.size();
// First, write the payload size.
if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
mClientAlive = false;
return;
}
// Then, write the payload if this is not just a heartbeat.
if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) {
mClientAlive = false;
return;
}
mLastWriteMs = getElapsedRealtimeMillis();
}
void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
// This is slow. Consider storing the uids per app and listening to uidmap updates.
for (const string& pkg : pullInfo.mPullPackages) {
set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
}
uids->push_back(DEFAULT_PULL_UID);
}
void ShellSubscriberClient::clearCache() {
mProtoOut.clear();
mCacheSize = 0;
}
void ShellSubscriberClient::triggerFdFlush() {
attemptWriteToPipeLocked();
clearCache();
}
void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) {
// Invoke Binder callback with cached event data.
vector<uint8_t> payloadBytes;
mProtoOut.serializeToVector(&payloadBytes);
StatsdStats::getInstance().noteSubscriptionFlushed(mId);
const Status status = mCallback->onSubscriptionData(reason, payloadBytes);
if (status.getStatus() == STATUS_DEAD_OBJECT &&
status.getExceptionCode() == EX_TRANSACTION_FAILED) {
mClientAlive = false;
return;
}
mLastWriteMs = getElapsedRealtimeMillis();
clearCache();
}
void ShellSubscriberClient::flush() {
triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED);
}
void ShellSubscriberClient::onUnsubscribe() {
StatsdStats::getInstance().noteSubscriptionEnded(mId);
if (mClientAlive) {
triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED);
}
}
void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const {
for (const auto& matcher : mPushedMatchers) {
allAtomIds.insert(matcher.atom_id());
}
}
} // namespace statsd
} // namespace os
} // namespace android