blob: eb93772dff22b09a29e0062b0b2b4c0f5e3d256a [file] [log] [blame]
/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define STATSD_DEBUG false // STOPSHIP if true
#include "Log.h"
#include "ShellSubscriberClient.h"
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
using android::base::unique_fd;
using android::util::ProtoOutputStream;
namespace android {
namespace os {
namespace statsd {
const static int FIELD_ID_ATOM = 1;
// Called by ShellSubscriber when a pushed event occurs
void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
mProto.clear();
for (const auto& matcher : mPushedMatchers) {
if (matchesSimple(mUidMap, matcher, event)) {
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
attemptWriteToPipeLocked(mProto.size());
}
}
}
// Read and parse single config. There should only one config per input.
bool ShellSubscriberClient::readConfig() {
// Read the size of the config.
size_t bufferSize;
if (!android::base::ReadFully(mDupIn.get(), &bufferSize, sizeof(bufferSize))) {
return false;
}
// Check bufferSize
if (bufferSize > (kMaxSizeKb * 1024)) {
ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
"bytes)",
bufferSize, (kMaxSizeKb * 1024));
return false;
}
// Read the config.
vector<uint8_t> buffer(bufferSize);
if (!android::base::ReadFully(mDupIn.get(), buffer.data(), bufferSize)) {
return false;
}
// Parse the config.
ShellSubscription config;
if (!config.ParseFromArray(buffer.data(), bufferSize)) {
return false;
}
// Update SubscriptionInfo with state from config
for (const auto& pushed : config.pushed()) {
mPushedMatchers.push_back(pushed);
}
for (const auto& pulled : config.pulled()) {
vector<string> packages;
vector<int32_t> uids;
for (const string& pkg : pulled.packages()) {
auto it = UidMap::sAidToUidMapping.find(pkg);
if (it != UidMap::sAidToUidMapping.end()) {
uids.push_back(it->second);
} else {
packages.push_back(pkg);
}
}
mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages, uids);
ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
pulled.matcher().atom_id());
}
return true;
}
// The pullAndHeartbeat threads sleep for the minimum time
// among all clients' input
int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
int64_t nowNanos) {
int64_t sleepTimeMs = kMsBetweenHeartbeats;
if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
mClientAlive = false;
return sleepTimeMs;
}
for (PullInfo& pullInfo : mPulledInfo) {
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
continue;
}
vector<int32_t> uids;
getUidsForPullAtom(&uids, pullInfo);
vector<std::shared_ptr<LogEvent>> data;
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
pullInfo.mPullerMatcher.atom_id());
writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
// Send a heartbeat, consisting of a data size of 0, if the user hasn't recently received
// data from statsd. When it receives the data size of 0, the user will not expect any
// atoms and recheck whether the subscription should end.
if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
attemptWriteToPipeLocked(/*dataSize=*/0);
if (!mClientAlive) return kMsBetweenHeartbeats;
}
// Determine how long to sleep before doing more work.
for (PullInfo& pullInfo : mPulledInfo) {
int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
sleepTimeMs = std::min(sleepTimeMs, timeBeforePull);
}
int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
sleepTimeMs = std::min(sleepTimeMs, timeBeforeHeartbeat);
return sleepTimeMs;
}
void ShellSubscriberClient::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) {
mProto.clear();
int count = 0;
for (const auto& event : data) {
if (matchesSimple(mUidMap, matcher, *event)) {
count++;
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event->ToProto(mProto);
mProto.end(atomToken);
}
}
if (count > 0) attemptWriteToPipeLocked(mProto.size());
}
// Tries to write the atom encoded in mProto to the pipe. If the write fails
// because the read end of the pipe has closed, change the client status so
// the manager knows the subscription is no longer active
void ShellSubscriberClient::attemptWriteToPipeLocked(size_t dataSize) {
// First, write the payload size.
if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
mClientAlive = false;
return;
}
// Then, write the payload if this is not just a heartbeat.
if (dataSize > 0 && !mProto.flush(mDupOut.get())) {
mClientAlive = false;
return;
}
mLastWriteMs = getElapsedRealtimeMillis();
}
void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
// This is slow. Consider storing the uids per app and listening to uidmap updates.
for (const string& pkg : pullInfo.mPullPackages) {
set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
}
uids->push_back(DEFAULT_PULL_UID);
}
} // namespace statsd
} // namespace os
} // namespace android