blob: 1544aa6a16211cd28ddce00e5fea0a2726703b58 [file] [log] [blame]
/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.adservices.service.measurement.attribution;
import static com.android.adservices.service.measurement.PrivacyParams.AGGREGATE_REPORT_DELAY_SPAN;
import static com.android.adservices.service.measurement.PrivacyParams.AGGREGATE_REPORT_MIN_DELAY;
import static com.android.adservices.service.stats.AdServicesStatsLog.AD_SERVICES_MEASUREMENT_ATTRIBUTION;
import static com.android.adservices.service.stats.AdServicesStatsLog.AD_SERVICES_MEASUREMENT_DELAYED_SOURCE_REGISTRATION;
import android.annotation.NonNull;
import android.net.Uri;
import android.util.Pair;
import com.android.adservices.LoggerFactory;
import com.android.adservices.data.measurement.DatastoreException;
import com.android.adservices.data.measurement.DatastoreManager;
import com.android.adservices.data.measurement.IMeasurementDao;
import com.android.adservices.service.AdServicesConfig;
import com.android.adservices.service.Flags;
import com.android.adservices.service.FlagsFactory;
import com.android.adservices.service.common.WebAddresses;
import com.android.adservices.service.measurement.AttributedTrigger;
import com.android.adservices.service.measurement.Attribution;
import com.android.adservices.service.measurement.AttributionConfig;
import com.android.adservices.service.measurement.EventReport;
import com.android.adservices.service.measurement.EventSurfaceType;
import com.android.adservices.service.measurement.EventTrigger;
import com.android.adservices.service.measurement.FilterMap;
import com.android.adservices.service.measurement.PrivacyParams;
import com.android.adservices.service.measurement.Source;
import com.android.adservices.service.measurement.Trigger;
import com.android.adservices.service.measurement.TriggerSpec;
import com.android.adservices.service.measurement.TriggerSpecs;
import com.android.adservices.service.measurement.aggregation.AggregatableAttributionSource;
import com.android.adservices.service.measurement.aggregation.AggregatableAttributionTrigger;
import com.android.adservices.service.measurement.aggregation.AggregateAttributionData;
import com.android.adservices.service.measurement.aggregation.AggregateDeduplicationKey;
import com.android.adservices.service.measurement.aggregation.AggregateHistogramContribution;
import com.android.adservices.service.measurement.aggregation.AggregatePayloadGenerator;
import com.android.adservices.service.measurement.aggregation.AggregateReport;
import com.android.adservices.service.measurement.noising.SourceNoiseHandler;
import com.android.adservices.service.measurement.reporting.DebugKeyAccessor;
import com.android.adservices.service.measurement.reporting.DebugReportApi;
import com.android.adservices.service.measurement.reporting.DebugReportApi.Type;
import com.android.adservices.service.measurement.reporting.EventReportWindowCalcDelegate;
import com.android.adservices.service.measurement.util.BaseUriExtractor;
import com.android.adservices.service.measurement.util.Filter;
import com.android.adservices.service.measurement.util.UnsignedLong;
import com.android.adservices.service.stats.AdServicesLogger;
import com.android.adservices.service.stats.AdServicesLoggerImpl;
import com.android.adservices.service.stats.MeasurementAttributionStats;
import com.android.adservices.service.stats.MeasurementDelayedSourceRegistrationStats;
import com.google.common.collect.ImmutableList;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
class AttributionJobHandler {
private static final String API_VERSION = "0.1";
private static final String AGGREGATE_REPORT_DELAY_DELIMITER = ",";
private final DatastoreManager mDatastoreManager;
private final DebugReportApi mDebugReportApi;
private final EventReportWindowCalcDelegate mEventReportWindowCalcDelegate;
private final SourceNoiseHandler mSourceNoiseHandler;
private final AdServicesLogger mLogger;
private final XnaSourceCreator mXnaSourceCreator;
private final Flags mFlags;
private final Filter mFilter;
private enum TriggeringStatus {
DROPPED,
ATTRIBUTED
}
AttributionJobHandler(DatastoreManager datastoreManager, DebugReportApi debugReportApi) {
this(
datastoreManager,
FlagsFactory.getFlags(),
debugReportApi,
new EventReportWindowCalcDelegate(FlagsFactory.getFlags()),
new SourceNoiseHandler(FlagsFactory.getFlags()),
AdServicesLoggerImpl.getInstance(),
new XnaSourceCreator(FlagsFactory.getFlags()));
}
AttributionJobHandler(
DatastoreManager datastoreManager,
Flags flags,
DebugReportApi debugReportApi,
EventReportWindowCalcDelegate eventReportWindowCalcDelegate,
SourceNoiseHandler sourceNoiseHandler,
AdServicesLogger logger,
XnaSourceCreator xnaSourceCreator) {
mDatastoreManager = datastoreManager;
mFlags = flags;
mDebugReportApi = debugReportApi;
mEventReportWindowCalcDelegate = eventReportWindowCalcDelegate;
mSourceNoiseHandler = sourceNoiseHandler;
mLogger = logger;
mXnaSourceCreator = xnaSourceCreator;
mFilter = new Filter(mFlags);
}
/**
* Perform attribution by finding relevant {@link Source} and generates {@link EventReport}.
*
* @return false if there are datastore failures or pending {@link Trigger} left, true otherwise
*/
boolean performPendingAttributions() {
Optional<List<String>> pendingTriggersOpt = mDatastoreManager
.runInTransactionWithResult(IMeasurementDao::getPendingTriggerIds);
if (!pendingTriggersOpt.isPresent()) {
// Failure during trigger retrieval
// Reschedule for retry
return false;
}
List<String> pendingTriggers = pendingTriggersOpt.get();
for (int i = 0;
i < pendingTriggers.size()
&& i < mFlags.getMeasurementMaxAttributionsPerInvocation();
i++) {
AttributionStatus attributionStatus = new AttributionStatus();
boolean success = performAttribution(pendingTriggers.get(i), attributionStatus);
logAttributionStats(attributionStatus);
if (!success) {
// Failure during trigger attribution
// Reschedule for retry
return false;
}
}
// Reschedule if there are unprocessed pending triggers.
return mFlags.getMeasurementMaxAttributionsPerInvocation() >= pendingTriggers.size();
}
/**
* Perform attribution for {@code triggerId}.
*
* @param triggerId datastore id of the {@link Trigger}
* @return success
*/
private boolean performAttribution(String triggerId, AttributionStatus attributionStatus) {
return mDatastoreManager.runInTransaction(
measurementDao -> {
Trigger trigger = measurementDao.getTrigger(triggerId);
attributionStatus.setAttributionDelay(
System.currentTimeMillis() - trigger.getTriggerTime());
if (trigger.getStatus() != Trigger.Status.PENDING) {
attributionStatus.setFailureTypeFromTriggerStatus(trigger.getStatus());
return;
}
Optional<Pair<Source, List<Source>>> sourceOpt =
selectSourceToAttribute(trigger, measurementDao, attributionStatus);
// Log competing source that did not win attribution because of delay
Optional<Source> matchingDelayedSource =
measurementDao.getNearestDelayedMatchingActiveSource(trigger);
if (matchingDelayedSource.isPresent()) {
logDelayedSourceRegistrationStats(matchingDelayedSource.get(), trigger);
}
if (sourceOpt.isEmpty()) {
mDebugReportApi.scheduleTriggerNoMatchingSourceDebugReport(
trigger, measurementDao, Type.TRIGGER_NO_MATCHING_SOURCE);
attributionStatus.setAttributionResult(
AttributionStatus.AttributionResult.NOT_ATTRIBUTED);
attributionStatus.setFailureType(
AttributionStatus.FailureType.NO_MATCHING_SOURCE);
ignoreTrigger(trigger, measurementDao);
return;
}
Source source = sourceOpt.get().first;
List<Source> remainingMatchingSources = sourceOpt.get().second;
attributionStatus.setSourceType(source.getSourceType());
attributionStatus.setSurfaceTypeFromSourceAndTrigger(source, trigger);
attributionStatus.setSourceRegistrant(source.getRegistrant().toString());
if (source.isInstallAttributed()) {
attributionStatus.setInstallAttribution(true);
}
if (!doTopLevelFiltersMatch(source, trigger, measurementDao)) {
attributionStatus.setAttributionResult(
AttributionStatus.AttributionResult.NOT_ATTRIBUTED);
attributionStatus.setFailureType(
AttributionStatus.FailureType.TOP_LEVEL_FILTER_MATCH_FAILURE);
ignoreTrigger(trigger, measurementDao);
return;
}
if (mFlags.getMeasurementEnableSourceDeactivationAfterFiltering()) {
ignoreCompetingSources(
measurementDao,
remainingMatchingSources,
trigger.getEnrollmentId());
}
if (shouldAttributionBeBlockedByRateLimits(source, trigger, measurementDao)) {
attributionStatus.setAttributionResult(
AttributionStatus.AttributionResult.NOT_ATTRIBUTED);
attributionStatus.setFailureType(
AttributionStatus.FailureType.RATE_LIMIT_EXCEEDED);
ignoreTrigger(trigger, measurementDao);
return;
}
TriggeringStatus aggregateTriggeringStatus =
maybeGenerateAggregateReport(
source, trigger, measurementDao, attributionStatus);
TriggeringStatus eventTriggeringStatus =
maybeGenerateEventReport(
source, trigger, measurementDao, attributionStatus);
boolean isEventTriggeringStatusAttributed =
eventTriggeringStatus == TriggeringStatus.ATTRIBUTED;
boolean isAggregateTriggeringStatusAttributed =
aggregateTriggeringStatus == TriggeringStatus.ATTRIBUTED;
if (isEventTriggeringStatusAttributed
|| isAggregateTriggeringStatusAttributed) {
if (!mFlags.getMeasurementEnableSourceDeactivationAfterFiltering()) {
ignoreCompetingSources(
measurementDao,
remainingMatchingSources,
trigger.getEnrollmentId());
}
attributeTrigger(trigger, measurementDao);
if (mFlags.getMeasurementEnableScopedAttributionRateLimit()) {
if (isEventTriggeringStatusAttributed) {
insertAttribution(Attribution.Scope.EVENT, source, trigger,
measurementDao);
}
if (isAggregateTriggeringStatusAttributed) {
insertAttribution(Attribution.Scope.AGGREGATE, source, trigger,
measurementDao);
}
} else {
insertAttribution(source, trigger, measurementDao);
}
attributionStatus.setAttributionResult(
isAggregateTriggeringStatusAttributed,
isEventTriggeringStatusAttributed);
} else {
attributionStatus.setAttributionResult(
AttributionStatus.AttributionResult.NOT_ATTRIBUTED);
// TODO (b/309323690) Consider logging implications for scoped attribution
// rate limit.
attributionStatus.setFailureType(
AttributionStatus.FailureType.NO_REPORTS_GENERATED);
ignoreTrigger(trigger, measurementDao);
}
});
}
private boolean shouldAttributionBeBlockedByRateLimits(
Source source, Trigger trigger, IMeasurementDao measurementDao)
throws DatastoreException {
if ((!mFlags.getMeasurementEnableScopedAttributionRateLimit()
&& !hasAttributionQuota(source, trigger, measurementDao))
|| !isReportingOriginWithinPrivacyBounds(source, trigger, measurementDao)) {
LoggerFactory.getMeasurementLogger()
.d(
"Attribution blocked by rate limits. Source ID: %s ; Trigger ID: %s ",
source.getId(), trigger.getId());
return true;
}
return false;
}
private TriggeringStatus maybeGenerateAggregateReport(
Source source,
Trigger trigger,
IMeasurementDao measurementDao,
AttributionStatus attributionStatus)
throws DatastoreException {
if (mFlags.getMeasurementEnableScopedAttributionRateLimit()
&& !hasAttributionQuota(
Attribution.Scope.AGGREGATE, source, trigger, measurementDao)) {
LoggerFactory.getMeasurementLogger()
.d("Attribution blocked by aggregate rate limits. Source ID: %s ; "
+ "Trigger ID: %s ", source.getId(), trigger.getId());
return TriggeringStatus.DROPPED;
}
if (trigger.getTriggerTime() >= source.getAggregatableReportWindow()) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
null,
measurementDao,
Type.TRIGGER_AGGREGATE_REPORT_WINDOW_PASSED);
return TriggeringStatus.DROPPED;
}
int numReportsPerDestination =
measurementDao.getNumAggregateReportsPerDestination(
trigger.getAttributionDestination(), trigger.getDestinationType());
if (numReportsPerDestination >= mFlags.getMeasurementMaxAggregateReportsPerDestination()) {
LoggerFactory.getMeasurementLogger()
.d(
String.format(
Locale.ENGLISH,
"Aggregate reports for destination %1$s exceeds system health"
+ " limit of %2$d.",
trigger.getAttributionDestination(),
mFlags.getMeasurementMaxAggregateReportsPerDestination()));
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
String.valueOf(numReportsPerDestination),
measurementDao,
Type.TRIGGER_AGGREGATE_STORAGE_LIMIT);
return TriggeringStatus.DROPPED;
}
if (mFlags.getMeasurementEnableMaxAggregateReportsPerSource()) {
int numReportsPerSource =
measurementDao.getNumAggregateReportsPerSource(source.getId());
if (numReportsPerSource >= mFlags.getMeasurementMaxAggregateReportsPerSource()) {
LoggerFactory.getMeasurementLogger()
.d(
String.format(
Locale.ENGLISH,
"Aggregate reports for source %1$s exceeds system"
+ " health limit of %2$d.",
source.getId(),
mFlags.getMeasurementMaxAggregateReportsPerSource()));
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
String.valueOf(numReportsPerSource),
measurementDao,
Type.TRIGGER_AGGREGATE_EXCESSIVE_REPORTS);
return TriggeringStatus.DROPPED;
}
}
try {
Optional<AggregateDeduplicationKey> aggregateDeduplicationKeyOptional =
maybeGetAggregateDeduplicationKey(source, trigger);
if (aggregateDeduplicationKeyOptional.isPresent()
&& source.getAggregateReportDedupKeys()
.contains(
aggregateDeduplicationKeyOptional
.get()
.getDeduplicationKey()
.get())) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
/* limit = */ null,
measurementDao,
Type.TRIGGER_AGGREGATE_DEDUPLICATED);
return TriggeringStatus.DROPPED;
}
Optional<List<AggregateHistogramContribution>> contributions =
new AggregatePayloadGenerator(mFlags)
.generateAttributionReport(source, trigger);
if (!contributions.isPresent()) {
if (source.getAggregatableAttributionSource(trigger, mFlags).isPresent()
&& trigger.getAggregatableAttributionTrigger(mFlags).isPresent()) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
/* limit = */ null,
measurementDao,
Type.TRIGGER_AGGREGATE_NO_CONTRIBUTIONS);
}
return TriggeringStatus.DROPPED;
}
OptionalInt newAggregateContributions =
validateAndGetUpdatedAggregateContributions(
contributions.get(), source, trigger, measurementDao);
if (!newAggregateContributions.isPresent()) {
LoggerFactory.getMeasurementLogger()
.d(
"Aggregate contributions exceeded bound. Source ID: %s ; "
+ "Trigger ID: %s ",
source.getId(), trigger.getId());
return TriggeringStatus.DROPPED;
}
source.setAggregateContributions(newAggregateContributions.getAsInt());
long randomTime = getAggregateReportDelay();
Pair<UnsignedLong, UnsignedLong> debugKeyPair =
new DebugKeyAccessor(measurementDao).getDebugKeys(source, trigger);
UnsignedLong sourceDebugKey = debugKeyPair.first;
UnsignedLong triggerDebugKey = debugKeyPair.second;
int debugReportStatus = AggregateReport.DebugReportStatus.NONE;
if (sourceDebugKey != null && triggerDebugKey != null) {
debugReportStatus = AggregateReport.DebugReportStatus.PENDING;
}
AggregateReport.Builder aggregateReportBuilder =
new AggregateReport.Builder()
// TODO: b/254855494 unused field, incorrect value; cleanup
.setPublisher(source.getRegistrant())
.setAttributionDestination(trigger.getAttributionDestinationBaseUri())
.setSourceRegistrationTime(roundDownToDay(source.getEventTime()))
.setScheduledReportTime(trigger.getTriggerTime() + randomTime)
.setEnrollmentId(trigger.getEnrollmentId())
.setDebugCleartextPayload(
AggregateReport.generateDebugPayload(contributions.get()))
.setAggregateAttributionData(
new AggregateAttributionData.Builder()
.setContributions(contributions.get())
.build())
.setStatus(AggregateReport.Status.PENDING)
.setDebugReportStatus(debugReportStatus)
.setApiVersion(API_VERSION)
.setSourceDebugKey(sourceDebugKey)
.setTriggerDebugKey(triggerDebugKey)
.setSourceId(source.getId())
.setTriggerId(trigger.getId())
.setRegistrationOrigin(trigger.getRegistrationOrigin());
if (trigger.getAggregationCoordinatorOrigin() != null) {
aggregateReportBuilder.setAggregationCoordinatorOrigin(
trigger.getAggregationCoordinatorOrigin());
} else {
aggregateReportBuilder.setAggregationCoordinatorOrigin(
Uri.parse(
AdServicesConfig
.getMeasurementDefaultAggregationCoordinatorOrigin()));
}
if (aggregateDeduplicationKeyOptional.isPresent()) {
aggregateReportBuilder.setDedupKey(
aggregateDeduplicationKeyOptional.get().getDeduplicationKey().get());
}
AggregateReport aggregateReport = aggregateReportBuilder.build();
if (mFlags.getMeasurementNullAggregateReportEnabled()) {
generateNullAggregateReports(trigger, aggregateReport, measurementDao);
}
finalizeAggregateReportCreation(
source, aggregateDeduplicationKeyOptional, aggregateReport, measurementDao);
incrementAggregateReportCountBy(attributionStatus, 1);
if (aggregateReport.getDebugReportStatus()
== AggregateReport.DebugReportStatus.PENDING) {
incrementAggregateDebugReportCountBy(attributionStatus, 1);
}
// TODO (b/230618328): read from DB and upload unencrypted aggregate report.
return TriggeringStatus.ATTRIBUTED;
} catch (JSONException e) {
LoggerFactory.getMeasurementLogger()
.e(
e,
"AttributionJobHandler::maybeGenerateAggregateReport JSONException when"
+ " parse aggregate fields.");
return TriggeringStatus.DROPPED;
}
}
private void generateNullAggregateReports(
Trigger trigger, AggregateReport aggregateReport, IMeasurementDao measurementDao)
throws DatastoreException, JSONException {
long maxSourceExpiry =
mFlags.getMeasurementMaxReportingRegisterSourceExpirationInSeconds()
* TimeUnit.SECONDS.toMillis(1);
maxSourceExpiry = roundDownToDay(maxSourceExpiry);
long roundedAttributedSourceTime =
roundDownToDay(aggregateReport.getSourceRegistrationTime());
float nullRate = mFlags.getMeasurementNullAggReportRateInclSourceRegistrationTime();
for (long daysInMillis = 0L;
daysInMillis <= maxSourceExpiry;
daysInMillis += TimeUnit.DAYS.toMillis(1)) {
long fakeSourceTime = trigger.getTriggerTime() - daysInMillis;
if (roundDownToDay(fakeSourceTime) == roundedAttributedSourceTime) {
continue;
}
if (Math.random() < nullRate) {
AggregateReport nullReport = getNullAggregateReport(trigger, fakeSourceTime);
measurementDao.insertAggregateReport(nullReport);
}
}
}
private AggregateReport getNullAggregateReport(Trigger trigger, long sourceTime)
throws JSONException {
AggregateReport.Builder nullReportBuilder =
new AggregateReport.Builder()
.getNullAggregateReportBuilder(
trigger, sourceTime, getAggregateReportDelay(), API_VERSION);
if (mFlags.getMeasurementEnableAggregatableReportPayloadPadding()) {
AggregateHistogramContribution paddingContribution =
new AggregateHistogramContribution.Builder().setPaddingContribution().build();
List<AggregateHistogramContribution> contributions = new ArrayList<>();
contributions.add(paddingContribution);
AggregatePayloadGenerator generator = new AggregatePayloadGenerator(mFlags);
generator.padContributions(contributions, paddingContribution);
nullReportBuilder.setDebugCleartextPayload(
AggregateReport.generateDebugPayload(contributions));
}
return nullReportBuilder.build();
}
private Optional<Pair<Source, List<Source>>> selectSourceToAttribute(
Trigger trigger, IMeasurementDao measurementDao, AttributionStatus attributionStatus)
throws DatastoreException {
List<Source> matchingSources;
if (!mFlags.getMeasurementEnableXNA() || trigger.getAttributionConfig() == null) {
matchingSources = measurementDao.getMatchingActiveSources(trigger);
} else {
// XNA attribution is possible
Set<String> enrollmentIds = extractEnrollmentIds(trigger.getAttributionConfig());
List<Source> allSources =
measurementDao.fetchTriggerMatchingSourcesForXna(trigger, enrollmentIds);
List<Source> triggerEnrollmentMatchingSources = new ArrayList<>();
List<Source> otherEnrollmentBasedSources = new ArrayList<>();
for (Source source : allSources) {
if (Objects.equals(source.getEnrollmentId(), trigger.getEnrollmentId())) {
triggerEnrollmentMatchingSources.add(source);
} else {
otherEnrollmentBasedSources.add(source);
}
}
List<Source> derivedSources =
mXnaSourceCreator.generateDerivedSources(trigger, otherEnrollmentBasedSources);
matchingSources = new ArrayList<>();
matchingSources.addAll(triggerEnrollmentMatchingSources);
matchingSources.addAll(derivedSources);
}
if (matchingSources.isEmpty()) {
return Optional.empty();
}
// Sort based on isInstallAttributed, Priority and Event Time.
// Is a valid install-attributed source.
Function<Source, Boolean> installAttributionComparator =
(Source source) ->
source.isInstallAttributed()
&& isWithinInstallCooldownWindow(source, trigger);
matchingSources.sort(
Comparator.comparing(installAttributionComparator, Comparator.reverseOrder())
.thenComparing(Source::getPriority, Comparator.reverseOrder())
.thenComparing(Source::getEventTime, Comparator.reverseOrder()));
Source selectedSource = matchingSources.remove(0);
if (selectedSource.getParentId() != null) {
attributionStatus.setSourceDerived(true);
}
return Optional.of(Pair.create(selectedSource, matchingSources));
}
private Set<String> extractEnrollmentIds(String attributionConfigsString) {
Set<String> enrollmentIds = new HashSet<>();
try {
JSONArray attributionConfigsJsonArray = new JSONArray(attributionConfigsString);
for (int i = 0; i < attributionConfigsJsonArray.length(); i++) {
JSONObject attributionConfigJson = attributionConfigsJsonArray.getJSONObject(i);
// It can't be null, has already been validated at fetcher
enrollmentIds.add(
attributionConfigJson.getString(
AttributionConfig.AttributionConfigContract.SOURCE_NETWORK));
}
} catch (JSONException e) {
LoggerFactory.getMeasurementLogger().d(e, "Failed to parse attribution configs.");
}
return enrollmentIds;
}
private Optional<AggregateDeduplicationKey> maybeGetAggregateDeduplicationKey(
Source source, Trigger trigger) {
try {
Optional<AggregateDeduplicationKey> dedupKey;
Optional<AggregatableAttributionSource> optionalAggregateAttributionSource =
source.getAggregatableAttributionSource(trigger, mFlags);
Optional<AggregatableAttributionTrigger> optionalAggregateAttributionTrigger =
trigger.getAggregatableAttributionTrigger(mFlags);
if (!optionalAggregateAttributionSource.isPresent()
|| !optionalAggregateAttributionTrigger.isPresent()) {
return Optional.empty();
}
AggregatableAttributionSource aggregateAttributionSource =
optionalAggregateAttributionSource.get();
AggregatableAttributionTrigger aggregateAttributionTrigger =
optionalAggregateAttributionTrigger.get();
dedupKey =
aggregateAttributionTrigger.maybeExtractDedupKey(
aggregateAttributionSource.getFilterMap(), mFlags);
return dedupKey;
} catch (JSONException e) {
LoggerFactory.getMeasurementLogger()
.e(
e,
"AttributionJobHandler::maybeGetAggregateDeduplicationKey JSONException"
+ " when parse aggregate dedup key fields in"
+ " AttributionJobHandler.");
return Optional.empty();
}
}
private void ignoreCompetingSources(
IMeasurementDao measurementDao,
List<Source> remainingMatchingSources,
String triggerEnrollmentId)
throws DatastoreException {
if (!remainingMatchingSources.isEmpty()) {
List<String> ignoredOriginalSourceIds = new ArrayList<>();
for (Source source : remainingMatchingSources) {
source.setStatus(Source.Status.IGNORED);
if (source.getParentId() == null) {
// Original source
ignoredOriginalSourceIds.add(source.getId());
} else {
// Derived source (XNA)
measurementDao.insertIgnoredSourceForEnrollment(
source.getParentId(), triggerEnrollmentId);
}
}
measurementDao.updateSourceStatus(ignoredOriginalSourceIds, Source.Status.IGNORED);
}
}
private TriggeringStatus maybeGenerateEventReport(
Source source,
Trigger trigger,
IMeasurementDao measurementDao,
AttributionStatus attributionStatus)
throws DatastoreException {
if (source.getParentId() != null) {
LoggerFactory.getMeasurementLogger()
.d("Event report generation skipped because it's a derived source.");
return TriggeringStatus.DROPPED;
}
if (mFlags.getMeasurementEnableScopedAttributionRateLimit()
&& !hasAttributionQuota(
Attribution.Scope.EVENT, source, trigger, measurementDao)) {
LoggerFactory.getMeasurementLogger()
.d("Attribution blocked by event rate limits. Source ID: %s ; "
+ "Trigger ID: %s ", source.getId(), trigger.getId());
return TriggeringStatus.DROPPED;
}
// TODO: Handle attribution rate limit consideration for non-truthful cases.
if (source.getAttributionMode() != Source.AttributionMode.TRUTHFULLY) {
mDebugReportApi.scheduleTriggerDebugReport(
source, trigger, null, measurementDao, Type.TRIGGER_EVENT_NOISE);
return TriggeringStatus.DROPPED;
}
Optional<EventTrigger> matchingEventTrigger =
findFirstMatchingEventTrigger(source, trigger, measurementDao);
if (!matchingEventTrigger.isPresent()) {
return TriggeringStatus.DROPPED;
}
EventTrigger eventTrigger = matchingEventTrigger.get();
// Check if deduplication key clashes with existing reports.
if (eventTrigger.getDedupKey() != null) {
boolean alreadyAttributed;
if (mFlags.getMeasurementEnableAraDeduplicationAlignmentV1()) {
try {
source.buildAttributedTriggers();
alreadyAttributed = hasDeduplicationKey(source, eventTrigger.getDedupKey());
} catch (JSONException e) {
LoggerFactory.getMeasurementLogger()
.e(e, "maybeGenerateEventReport: failed to build attributed triggers.");
return TriggeringStatus.DROPPED;
}
} else {
alreadyAttributed = source.getEventReportDedupKeys().contains(
eventTrigger.getDedupKey());
}
if (alreadyAttributed) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
/* limit = */ null,
measurementDao,
Type.TRIGGER_EVENT_DEDUPLICATED);
return TriggeringStatus.DROPPED;
}
}
if (mEventReportWindowCalcDelegate.getReportingTime(
source, trigger.getTriggerTime(), trigger.getDestinationType()) == -1
&& (source.getTriggerSpecsString() == null
|| source.getTriggerSpecsString().isEmpty())) {
mDebugReportApi.scheduleTriggerDebugReport(
source, trigger, null, measurementDao, Type.TRIGGER_EVENT_REPORT_WINDOW_PASSED);
return TriggeringStatus.DROPPED;
}
int numReports =
measurementDao.getNumEventReportsPerDestination(
trigger.getAttributionDestination(), trigger.getDestinationType());
if (numReports >= mFlags.getMeasurementMaxEventReportsPerDestination()) {
LoggerFactory.getMeasurementLogger()
.d(
String.format(
Locale.ENGLISH,
"Event reports for destination %1$s exceeds system health limit"
+ " of %2$d.",
trigger.getAttributionDestination(),
mFlags.getMeasurementMaxEventReportsPerDestination()));
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
String.valueOf(numReports),
measurementDao,
Type.TRIGGER_EVENT_STORAGE_LIMIT);
return TriggeringStatus.DROPPED;
}
Pair<List<Uri>, List<Uri>> destinations =
measurementDao.getSourceDestinations(source.getId());
source.setAppDestinations(destinations.first);
source.setWebDestinations(destinations.second);
Pair<UnsignedLong, UnsignedLong> debugKeyPair =
new DebugKeyAccessor(measurementDao).getDebugKeys(source, trigger);
if (!mFlags.getMeasurementFlexibleEventReportingApiEnabled()
|| source.getTriggerSpecsString() == null
|| source.getTriggerSpecsString().isEmpty()) {
EventReport newEventReport =
new EventReport.Builder()
.populateFromSourceAndTrigger(
source,
trigger,
eventTrigger,
debugKeyPair,
mEventReportWindowCalcDelegate,
mSourceNoiseHandler,
getEventReportDestinations(
source, trigger.getDestinationType()))
.build();
if (!provisionEventReportQuota(source, trigger, newEventReport, measurementDao)) {
return TriggeringStatus.DROPPED;
}
if (mFlags.getMeasurementEnableAraDeduplicationAlignmentV1()) {
finalizeEventReportCreation(
source, eventTrigger, trigger, newEventReport, measurementDao);
} else {
finalizeEventReportCreation(source, eventTrigger, newEventReport, measurementDao);
}
incrementEventReportCountBy(attributionStatus, 1);
if (newEventReport.getDebugReportStatus() == EventReport.DebugReportStatus.PENDING) {
incrementEventDebugReportCountBy(attributionStatus, 1);
}
// The source is using flexible event API
} else if (source.getTriggerSpecsString() != null
&& !source.getTriggerSpecsString().isEmpty()) {
try {
source.buildTriggerSpecs();
if (!generateFlexEventReports(
source, trigger, eventTrigger, debugKeyPair, measurementDao)) {
return TriggeringStatus.DROPPED;
}
} catch (JSONException e) {
LoggerFactory.getMeasurementLogger().e(
e, "AttributionJobHandler::maybeGenerateEventReport cannot build trigger"
+ "specs");
return TriggeringStatus.DROPPED;
}
}
return TriggeringStatus.ATTRIBUTED;
}
private static int restoreTriggerContributionsAndProvisionFlexEventReportQuota(
Source source,
Trigger trigger,
Map<UnsignedLong, Integer> triggerDataToBucketIndexMap,
IMeasurementDao measurementDao) throws DatastoreException {
List<EventReport> sourceEventReports = measurementDao.getSourceEventReports(source);
List<EventReport> reportsToDelete = new ArrayList<>();
source.getTriggerSpecs().prepareFlexAttribution(
sourceEventReports,
trigger.getTriggerTime(),
reportsToDelete,
triggerDataToBucketIndexMap);
int numEarlierScheduledReports = sourceEventReports.size() - reportsToDelete.size();
int maxEventReports = source.getTriggerSpecs().getMaxReports();
// Completed reports already covered the allotted quota.
if (numEarlierScheduledReports == maxEventReports) {
return 0;
}
// Delete pending reports. We will recreate an updated sequence below.
for (EventReport eventReport : reportsToDelete) {
measurementDao.deleteEventReport(eventReport);
}
return maxEventReports - numEarlierScheduledReports;
}
private boolean generateFlexEventReports(
Source source,
Trigger trigger,
EventTrigger eventTrigger,
Pair<UnsignedLong, UnsignedLong> debugKeyPair,
IMeasurementDao measurementDao) throws DatastoreException {
TriggerSpecs triggerSpecs = source.getTriggerSpecs();
if (!triggerSpecs.containsTriggerData(eventTrigger.getTriggerData())) {
return false;
}
// Store the current bucket index for each trigger data
Map<UnsignedLong, Integer> triggerDataToBucketIndexMap = new HashMap<>();
int remainingReportQuota =
restoreTriggerContributionsAndProvisionFlexEventReportQuota(
source, trigger, triggerDataToBucketIndexMap, measurementDao);
if (remainingReportQuota == 0) {
return false;
}
List<AttributedTrigger> attributedTriggers = source.getAttributedTriggers();
long triggerValue =
triggerSpecs.getSummaryOperatorType(eventTrigger.getTriggerData())
== TriggerSpec.SummaryOperatorType.COUNT
? 1L
: eventTrigger.getTriggerValue();
attributedTriggers.add(new AttributedTrigger(
trigger.getId(),
eventTrigger.getTriggerPriority(),
eventTrigger.getTriggerData(),
triggerValue,
trigger.getTriggerTime(),
eventTrigger.getDedupKey(),
debugKeyPair.second,
debugKeyPair.first != null));
attributedTriggers.sort(
Comparator.comparingLong(AttributedTrigger::getPriority).reversed()
.thenComparing(AttributedTrigger::getTriggerTime));
// Store for each trigger data any amount already covered for the current bucket.
Map<UnsignedLong, Long> triggerDataToBucketAmountMap = new HashMap<>();
Map<UnsignedLong, List<AttributedTrigger>> triggerDataToContributingTriggersMap =
new HashMap<>();
for (AttributedTrigger attributedTrigger : attributedTriggers) {
// Flex API already inserts the attributed trigger and does not need an explicit action
// for that.
if (attributedTrigger.getDedupKey() != null
&& !mFlags.getMeasurementEnableAraDeduplicationAlignmentV1()) {
source.getEventReportDedupKeys().add(attributedTrigger.getDedupKey());
measurementDao.updateSourceEventReportDedupKeys(source);
}
remainingReportQuota -= updateFlexAttributionStateAndGetNumReports(
source,
trigger,
attributedTrigger,
remainingReportQuota,
triggerDataToBucketIndexMap,
triggerDataToBucketAmountMap,
triggerDataToContributingTriggersMap,
measurementDao);
if (remainingReportQuota == 0) {
break;
}
}
measurementDao.updateSourceAttributedTriggers(
source.getId(),
source.attributedTriggersToJsonFlexApi());
// TODO (b/307786346): represent actual report count.
return true;
}
private int updateFlexAttributionStateAndGetNumReports(
Source source,
Trigger trigger,
AttributedTrigger attributedTrigger,
int remainingReportQuota,
Map<UnsignedLong, Integer> triggerDataToBucketIndexMap,
Map<UnsignedLong, Long> triggerDataToBucketAmountMap,
Map<UnsignedLong, List<AttributedTrigger>> triggerDataToContributingTriggersMap,
IMeasurementDao measurementDao) throws DatastoreException {
TriggerSpecs triggerSpecs = source.getTriggerSpecs();
UnsignedLong triggerData = attributedTrigger.getTriggerData();
triggerDataToBucketIndexMap.putIfAbsent(triggerData, 0);
int bucketIndex = triggerDataToBucketIndexMap.get(triggerData);
List<Long> buckets = triggerSpecs.getSummaryBucketsForTriggerData(triggerData);
// Once we've generated a report for the last bucket, subsequent triggers cannot
// generate more reports.
if (bucketIndex == buckets.size()) {
return 0;
}
triggerDataToBucketAmountMap.putIfAbsent(triggerData, 0L);
triggerDataToContributingTriggersMap.putIfAbsent(triggerData, new ArrayList<>());
List<AttributedTrigger> contributingTriggers =
triggerDataToContributingTriggersMap.get(triggerData);
if (attributedTrigger.remainingValue() > 0L) {
contributingTriggers.add(attributedTrigger);
}
long prevBucket = bucketIndex == 0 ? 0L : buckets.get(bucketIndex - 1);
int numReportsCreated = 0;
for (int i = bucketIndex; i < buckets.size(); i++) {
long bucket = buckets.get(i);
long bucketSize = bucket - prevBucket;
long bucketAmount = triggerDataToBucketAmountMap.get(triggerData);
if (attributedTrigger.remainingValue() >= bucketSize - bucketAmount) {
finalizeEventReportCreationForFlex(
source,
trigger,
attributedTrigger,
contributingTriggers,
TriggerSpecs.getSummaryBucketFromIndex(i, buckets),
measurementDao);
numReportsCreated += 1;
if (remainingReportQuota - numReportsCreated == 0) {
return numReportsCreated;
}
attributedTrigger.addContribution(bucketSize - bucketAmount);
triggerDataToBucketIndexMap.put(triggerData, i + 1);
triggerDataToBucketAmountMap.put(triggerData, 0L);
contributingTriggers.clear();
if (attributedTrigger.remainingValue() > 0L) {
contributingTriggers.add(attributedTrigger);
}
} else {
triggerDataToBucketIndexMap.put(triggerData, i);
long diff = attributedTrigger.remainingValue();
triggerDataToBucketAmountMap.merge(
triggerData, diff, (oldValue, value) -> oldValue + diff);
attributedTrigger.addContribution(diff);
break;
}
prevBucket = bucket;
}
return numReportsCreated;
}
private List<Uri> getEventReportDestinations(@NonNull Source source, int destinationType) {
ImmutableList.Builder<Uri> destinations = new ImmutableList.Builder<>();
if (mFlags.getMeasurementEnableCoarseEventReportDestinations()
&& source.getCoarseEventReportDestinations()) {
Optional.ofNullable(source.getAppDestinations()).ifPresent(destinations::addAll);
Optional.ofNullable(source.getWebDestinations()).ifPresent(destinations::addAll);
} else {
destinations.addAll(source.getAttributionDestinations(destinationType));
}
return destinations.build();
}
private boolean provisionEventReportQuota(
Source source,
Trigger trigger,
EventReport newEventReport,
IMeasurementDao measurementDao)
throws DatastoreException {
List<EventReport> sourceEventReports = measurementDao.getSourceEventReports(source);
if (isWithinReportLimit(source, sourceEventReports.size(), trigger.getDestinationType())) {
return true;
}
List<EventReport> relevantEventReports =
sourceEventReports.stream()
.filter(
(r) ->
r.getStatus() == EventReport.Status.PENDING
&& r.getReportTime()
== newEventReport.getReportTime())
.sorted(
Comparator.comparingLong(EventReport::getTriggerPriority)
.thenComparing(
EventReport::getTriggerTime,
Comparator.reverseOrder()))
.collect(Collectors.toList());
if (relevantEventReports.isEmpty()) {
UnsignedLong triggerData = newEventReport.getTriggerData();
mDebugReportApi.scheduleTriggerDebugReportWithAllFields(
source,
trigger,
triggerData,
measurementDao,
Type.TRIGGER_EVENT_EXCESSIVE_REPORTS);
return false;
}
EventReport lowestPriorityEventReport = relevantEventReports.get(0);
if (lowestPriorityEventReport.getTriggerPriority() >= newEventReport.getTriggerPriority()) {
UnsignedLong triggerData = newEventReport.getTriggerData();
mDebugReportApi.scheduleTriggerDebugReportWithAllFields(
source, trigger, triggerData, measurementDao, Type.TRIGGER_EVENT_LOW_PRIORITY);
return false;
}
if (lowestPriorityEventReport.getTriggerDedupKey() != null
&& !mFlags.getMeasurementEnableAraDeduplicationAlignmentV1()) {
source.getEventReportDedupKeys().remove(lowestPriorityEventReport.getTriggerDedupKey());
}
measurementDao.deleteEventReport(lowestPriorityEventReport);
return true;
}
private static void finalizeEventReportCreation(
Source source,
EventTrigger eventTrigger,
EventReport eventReport,
IMeasurementDao measurementDao)
throws DatastoreException {
if (eventTrigger.getDedupKey() != null) {
source.getEventReportDedupKeys().add(eventTrigger.getDedupKey());
}
measurementDao.updateSourceEventReportDedupKeys(source);
measurementDao.insertEventReport(eventReport);
}
private static void finalizeEventReportCreation(
Source source,
EventTrigger eventTrigger,
Trigger trigger,
EventReport eventReport,
IMeasurementDao measurementDao)
throws DatastoreException {
if (eventTrigger.getDedupKey() != null) {
source.getAttributedTriggers().add(
new AttributedTrigger(
trigger.getId(),
eventTrigger.getTriggerData(),
eventTrigger.getDedupKey()));
measurementDao.updateSourceAttributedTriggers(
source.getId(),
source.attributedTriggersToJson());
}
measurementDao.insertEventReport(eventReport);
}
private void finalizeEventReportCreationForFlex(
Source source,
Trigger trigger,
AttributedTrigger attributedTrigger,
List<AttributedTrigger> contributingTriggers,
Pair<Long, Long> triggerSummaryBucket,
IMeasurementDao measurementDao)
throws DatastoreException {
long reportTime = mEventReportWindowCalcDelegate.getFlexEventReportingTime(
source.getTriggerSpecs(),
source.getEventTime(),
// We can make an assertion that any report generated for any trigger data can only
// be associated with the next report window after trigger time that's configured
// for that trigger data: (1) if report time were to be before trigger time, that
// would mean attributed triggers all with an earlier time filled a bucket during
// the current iteration, which is disputed by counting all of those buckets before,
// and (2) if report time is to be after trigger time, it necessarily will be the
// next report window configured for the current trigger data, regardless of how
// much earlier were the actual attributed triggers counted towards the bucket.
trigger.getTriggerTime(),
attributedTrigger.getTriggerData());
Pair<UnsignedLong, List<UnsignedLong>> debugKeys =
getDebugKeysForFlex(contributingTriggers, source);
EventReport eventReport =
new EventReport.Builder()
.getForFlex(
source,
trigger,
attributedTrigger,
reportTime,
triggerSummaryBucket,
debugKeys.first,
debugKeys.second,
mEventReportWindowCalcDelegate,
mSourceNoiseHandler,
getEventReportDestinations(
source, trigger.getDestinationType()))
.build();
measurementDao.insertEventReport(eventReport);
}
private static void finalizeAggregateReportCreation(
Source source,
Optional<AggregateDeduplicationKey> aggregateDeduplicationKeyOptional,
AggregateReport aggregateReport,
IMeasurementDao measurementDao)
throws DatastoreException {
if (aggregateDeduplicationKeyOptional.isPresent()) {
source.getAggregateReportDedupKeys()
.add(aggregateDeduplicationKeyOptional.get().getDeduplicationKey().get());
}
if (source.getParentId() == null) {
// Only update aggregate contributions for an original source, not for a derived
// source
measurementDao.updateSourceAggregateContributions(source);
measurementDao.updateSourceAggregateReportDedupKeys(source);
}
measurementDao.insertAggregateReport(aggregateReport);
}
private static void ignoreTrigger(Trigger trigger, IMeasurementDao measurementDao)
throws DatastoreException {
trigger.setStatus(Trigger.Status.IGNORED);
measurementDao.updateTriggerStatus(
Collections.singletonList(trigger.getId()), Trigger.Status.IGNORED);
}
private static void attributeTrigger(Trigger trigger, IMeasurementDao measurementDao)
throws DatastoreException {
trigger.setStatus(Trigger.Status.ATTRIBUTED);
measurementDao.updateTriggerStatus(
Collections.singletonList(trigger.getId()), Trigger.Status.ATTRIBUTED);
}
private static void insertAttribution(Source source, Trigger trigger,
IMeasurementDao measurementDao) throws DatastoreException {
measurementDao.insertAttribution(createAttributionBuilder(source, trigger).build());
}
private static void insertAttribution(@Attribution.Scope int scope, Source source,
Trigger trigger, IMeasurementDao measurementDao) throws DatastoreException {
measurementDao.insertAttribution(
createAttributionBuilder(source, trigger)
.setScope(scope)
.build());
}
private boolean hasAttributionQuota(
Source source, Trigger trigger, IMeasurementDao measurementDao)
throws DatastoreException {
long attributionCount = measurementDao.getAttributionsPerRateLimitWindow(source, trigger);
if (attributionCount >= mFlags.getMeasurementMaxAttributionPerRateLimitWindow()) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
String.valueOf(attributionCount),
measurementDao,
Type.TRIGGER_ATTRIBUTIONS_PER_SOURCE_DESTINATION_LIMIT);
}
return attributionCount < mFlags.getMeasurementMaxAttributionPerRateLimitWindow();
}
private boolean hasAttributionQuota(
@Attribution.Scope int scope,
Source source,
Trigger trigger,
IMeasurementDao measurementDao) throws DatastoreException {
long attributionCount = measurementDao.getAttributionsPerRateLimitWindow(
scope, source, trigger);
int limit = scope == Attribution.Scope.EVENT
? mFlags.getMeasurementMaxEventAttributionPerRateLimitWindow()
: mFlags.getMeasurementMaxAggregateAttributionPerRateLimitWindow();
boolean isWithinLimit = attributionCount < limit;
if (!isWithinLimit) {
// TODO (b/309324404) Consider debug report implications for scoped attribution rate
// limit.
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
String.valueOf(attributionCount),
measurementDao,
Type.TRIGGER_ATTRIBUTIONS_PER_SOURCE_DESTINATION_LIMIT);
}
return isWithinLimit;
}
private boolean isWithinReportLimit(
Source source, int existingReportCount, @EventSurfaceType int destinationType) {
return mEventReportWindowCalcDelegate.getMaxReportCount(
source, hasAppInstallAttributionOccurred(source, destinationType))
> existingReportCount;
}
private static boolean hasAppInstallAttributionOccurred(
Source source, @EventSurfaceType int destinationType) {
return destinationType == EventSurfaceType.APP && source.isInstallAttributed();
}
private static boolean isWithinInstallCooldownWindow(Source source, Trigger trigger) {
return trigger.getTriggerTime()
< (source.getEventTime() + source.getInstallCooldownWindow());
}
/**
* The logic works as following - 1. If source OR trigger filters are empty, we call it a match
* since there is no restriction. 2. If source and trigger filters have no common keys, it's a
* match. 3. All common keys between source and trigger filters should have intersection between
* their list of values.
*
* @return true for a match, false otherwise
*/
private boolean doTopLevelFiltersMatch(
@NonNull Source source, @NonNull Trigger trigger, IMeasurementDao measurementDao)
throws DatastoreException {
try {
FilterMap sourceFilters = source.getFilterData(trigger, mFlags);
List<FilterMap> triggerFilterSet = extractFilterSet(trigger.getFilters());
List<FilterMap> triggerNotFilterSet = extractFilterSet(trigger.getNotFilters());
boolean isFilterMatch =
mFilter.isFilterMatch(sourceFilters, triggerFilterSet, true)
&& mFilter.isFilterMatch(sourceFilters, triggerNotFilterSet, false);
if (!isFilterMatch
&& !sourceFilters.isEmpty(mFlags)
&& (!triggerFilterSet.isEmpty() || !triggerNotFilterSet.isEmpty())) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
/* limit = */ null,
measurementDao,
Type.TRIGGER_NO_MATCHING_FILTER_DATA);
}
return isFilterMatch;
} catch (JSONException e) {
// If JSON is malformed, we shall consider as not matched.
LoggerFactory.getMeasurementLogger()
.e(e, "AttributionJobHandler::doTopLevelFiltersMatch: JSON parse failed.");
return false;
}
}
private Optional<EventTrigger> findFirstMatchingEventTrigger(
Source source, Trigger trigger, IMeasurementDao measurementDao)
throws DatastoreException {
try {
FilterMap sourceFiltersData = source.getFilterData(trigger, mFlags);
List<EventTrigger> eventTriggers = trigger.parseEventTriggers(mFlags);
Optional<EventTrigger> matchingEventTrigger =
eventTriggers.stream()
.filter(
eventTrigger ->
doEventLevelFiltersMatch(
sourceFiltersData, eventTrigger))
.findFirst();
// trigger-no-matching-configurations verbose debug report is generated when event
// trigger "filters/not_filters" field doesn't match source "filter_data" field. It
// won't be generated when trigger doesn't have event_trigger_data field.
if (!matchingEventTrigger.isPresent() && !eventTriggers.isEmpty()) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
/* limit = */ null,
measurementDao,
Type.TRIGGER_EVENT_NO_MATCHING_CONFIGURATIONS);
}
return matchingEventTrigger;
} catch (JSONException e) {
// If JSON is malformed, we shall consider as not matched.
LoggerFactory.getMeasurementLogger()
.e(
e,
"AttributionJobHandler::findFirstMatchingEventTrigger: Malformed JSON"
+ " string.");
return Optional.empty();
}
}
private boolean doEventLevelFiltersMatch(
FilterMap sourceFiltersData, EventTrigger eventTrigger) {
if (eventTrigger.getFilterSet().isPresent()
&& !mFilter.isFilterMatch(
sourceFiltersData, eventTrigger.getFilterSet().get(), true)) {
return false;
}
if (eventTrigger.getNotFilterSet().isPresent()
&& !mFilter.isFilterMatch(
sourceFiltersData, eventTrigger.getNotFilterSet().get(), false)) {
return false;
}
return true;
}
private List<FilterMap> extractFilterSet(String str) throws JSONException {
return mFlags.getMeasurementEnableLookbackWindowFilter()
? extractFilterSetV2(str)
: extractFilterSetV1(str);
}
private List<FilterMap> extractFilterSetV1(String str) throws JSONException {
String json = (str == null || str.isEmpty()) ? "[]" : str;
List<FilterMap> filterSet = new ArrayList<>();
JSONArray filters = new JSONArray(json);
for (int i = 0; i < filters.length(); i++) {
FilterMap filterMap =
new FilterMap.Builder()
.buildFilterData(filters.getJSONObject(i))
.build();
filterSet.add(filterMap);
}
return filterSet;
}
private List<FilterMap> extractFilterSetV2(String str) throws JSONException {
String json = (str == null || str.isEmpty()) ? "[]" : str;
JSONArray filters = new JSONArray(json);
return mFilter.deserializeFilterSet(filters);
}
private OptionalInt validateAndGetUpdatedAggregateContributions(
List<AggregateHistogramContribution> contributions,
Source source,
Trigger trigger,
IMeasurementDao measurementDao)
throws DatastoreException {
int newAggregateContributions = source.getAggregateContributions();
for (AggregateHistogramContribution contribution : contributions) {
try {
newAggregateContributions =
Math.addExact(newAggregateContributions, contribution.getValue());
if (newAggregateContributions
>= PrivacyParams.MAX_SUM_OF_AGGREGATE_VALUES_PER_SOURCE) {
// When histogram value is >= 65536 (aggregatable_budget_per_source),
// generate verbose debug report, record the actual histogram value.
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
String.valueOf(PrivacyParams.MAX_SUM_OF_AGGREGATE_VALUES_PER_SOURCE),
measurementDao,
Type.TRIGGER_AGGREGATE_INSUFFICIENT_BUDGET);
}
if (newAggregateContributions
> PrivacyParams.MAX_SUM_OF_AGGREGATE_VALUES_PER_SOURCE) {
return OptionalInt.empty();
}
} catch (ArithmeticException e) {
LoggerFactory.getMeasurementLogger()
.e(
e,
"AttributionJobHandler::validateAndGetUpdatedAggregateContributions"
+ " Error adding aggregate contribution values.");
return OptionalInt.empty();
}
}
return OptionalInt.of(newAggregateContributions);
}
private static long roundDownToDay(long timestamp) {
return Math.floorDiv(timestamp, TimeUnit.DAYS.toMillis(1)) * TimeUnit.DAYS.toMillis(1);
}
private boolean isReportingOriginWithinPrivacyBounds(
Source source, Trigger trigger, IMeasurementDao measurementDao)
throws DatastoreException {
Optional<Pair<Uri, Uri>> publisherAndDestination =
getPublisherAndDestinationTopPrivateDomains(source, trigger);
if (publisherAndDestination.isPresent()) {
Integer count =
measurementDao.countDistinctReportingOriginsPerPublisherXDestInAttribution(
publisherAndDestination.get().first,
publisherAndDestination.get().second,
trigger.getRegistrationOrigin(),
trigger.getTriggerTime() - PrivacyParams.RATE_LIMIT_WINDOW_MILLISECONDS,
trigger.getTriggerTime());
if (count >= mFlags.getMeasurementMaxDistinctEnrollmentsInAttribution()) {
mDebugReportApi.scheduleTriggerDebugReport(
source,
trigger,
String.valueOf(count),
measurementDao,
Type.TRIGGER_REPORTING_ORIGIN_LIMIT);
}
return count < mFlags.getMeasurementMaxDistinctEnrollmentsInAttribution();
} else {
LoggerFactory.getMeasurementLogger()
.d(
"isEnrollmentWithinPrivacyBounds:"
+ " getPublisherAndDestinationTopPrivateDomains failed. %s %s",
source.getPublisher(), trigger.getAttributionDestination());
return true;
}
}
private static Optional<Pair<Uri, Uri>> getPublisherAndDestinationTopPrivateDomains(
Source source, Trigger trigger) {
Uri attributionDestination = trigger.getAttributionDestination();
Optional<Uri> triggerDestinationTopPrivateDomain =
trigger.getDestinationType() == EventSurfaceType.APP
? Optional.of(BaseUriExtractor.getBaseUri(attributionDestination))
: WebAddresses.topPrivateDomainAndScheme(attributionDestination);
Uri publisher = source.getPublisher();
Optional<Uri> publisherTopPrivateDomain =
source.getPublisherType() == EventSurfaceType.APP
? Optional.of(publisher)
: WebAddresses.topPrivateDomainAndScheme(publisher);
if (!triggerDestinationTopPrivateDomain.isPresent()
|| !publisherTopPrivateDomain.isPresent()) {
return Optional.empty();
} else {
return Optional.of(Pair.create(
publisherTopPrivateDomain.get(),
triggerDestinationTopPrivateDomain.get()));
}
}
public static Attribution.Builder createAttributionBuilder(@NonNull Source source,
@NonNull Trigger trigger) {
Optional<Uri> publisherTopPrivateDomain =
getTopPrivateDomain(source.getPublisher(), source.getPublisherType());
Uri destination = trigger.getAttributionDestination();
Optional<Uri> destinationTopPrivateDomain =
getTopPrivateDomain(destination, trigger.getDestinationType());
if (!publisherTopPrivateDomain.isPresent()
|| !destinationTopPrivateDomain.isPresent()) {
throw new IllegalArgumentException(
String.format(
"insertAttributionRateLimit: "
+ "getSourceAndDestinationTopPrivateDomains"
+ " failed. Publisher: %s; Attribution destination: %s",
source.getPublisher(), destination));
}
return new Attribution.Builder()
.setSourceSite(publisherTopPrivateDomain.get().toString())
.setSourceOrigin(source.getPublisher().toString())
.setDestinationSite(destinationTopPrivateDomain.get().toString())
.setDestinationOrigin(BaseUriExtractor.getBaseUri(destination).toString())
.setEnrollmentId(trigger.getEnrollmentId())
// TODO: b/276638412 rename to Attribution::setSourceTime
.setTriggerTime(source.getEventTime())
.setRegistrant(trigger.getRegistrant().toString())
.setSourceId(source.getId())
.setTriggerId(trigger.getId())
.setRegistrationOrigin(trigger.getRegistrationOrigin());
}
private static Optional<Uri> getTopPrivateDomain(
Uri uri, @EventSurfaceType int eventSurfaceType) {
return eventSurfaceType == EventSurfaceType.APP
? Optional.of(BaseUriExtractor.getBaseUri(uri))
: WebAddresses.topPrivateDomainAndScheme(uri);
}
private static Pair<UnsignedLong, List<UnsignedLong>> getDebugKeysForFlex(
List<AttributedTrigger> contributingTriggers, Source source) {
List<UnsignedLong> triggerDebugKeys = new ArrayList<>();
// To provide a source debug key in the event report, the source debug key must have been
// populated for each evaluation for source and trigger for all triggers contributing to the
// bucket.
boolean allBucketContributorsHadNonNullSourceDebugKeys = true;
for (AttributedTrigger trigger : contributingTriggers) {
// Only add a debug key to the result if the invariant is maintained. Otherwise, the
// invariant has been broken, but conclude the iteration to process source debug-key.
if (trigger.getDebugKey() != null) {
triggerDebugKeys.add(trigger.getDebugKey());
}
// Update the value of the boolean for source debug key as a series of AND
// operations that must all be true.
allBucketContributorsHadNonNullSourceDebugKeys &= trigger.hasSourceDebugKey();
}
// We are allowed to access the actual source debug key value if the invariant has been
// maintained.
UnsignedLong sourceDebugKey = allBucketContributorsHadNonNullSourceDebugKeys
? source.getDebugKey()
: null;
// All triggers must have debug keys for the report to include any.
if (contributingTriggers.size() == triggerDebugKeys.size()) {
return Pair.create(sourceDebugKey, triggerDebugKeys);
} else {
return Pair.create(sourceDebugKey, Collections.emptyList());
}
}
private static boolean hasDeduplicationKey(@NonNull Source source,
@NonNull UnsignedLong dedupKey) {
for (AttributedTrigger attributedTrigger : source.getAttributedTriggers()) {
if (dedupKey.equals(attributedTrigger.getDedupKey())) {
return true;
}
}
return false;
}
private void logAttributionStats(AttributionStatus attributionStatus) {
mLogger.logMeasurementAttributionStats(
new MeasurementAttributionStats.Builder()
.setCode(AD_SERVICES_MEASUREMENT_ATTRIBUTION)
.setSourceType(attributionStatus.getSourceType().getValue())
.setSurfaceType(attributionStatus.getAttributionSurface().getValue())
.setResult(attributionStatus.getAttributionResult().getValue())
.setFailureType(attributionStatus.getFailureType().getValue())
.setSourceDerived(attributionStatus.isSourceDerived())
.setInstallAttribution(attributionStatus.isInstallAttribution())
.setAttributionDelay(attributionStatus.getAttributionDelay())
.setSourceRegistrant(attributionStatus.getSourceRegistrant())
.build());
}
private void logDelayedSourceRegistrationStats(Source source, Trigger trigger) {
DelayedSourceRegistrationStatus delayedSourceRegistrationStatus =
new DelayedSourceRegistrationStatus();
delayedSourceRegistrationStatus.setRegistrationDelay(
source.getEventTime() - trigger.getTriggerTime());
mLogger.logMeasurementDelayedSourceRegistrationStats(
new MeasurementDelayedSourceRegistrationStats.Builder()
.setCode(AD_SERVICES_MEASUREMENT_DELAYED_SOURCE_REGISTRATION)
.setRegistrationStatus(delayedSourceRegistrationStatus.UNKNOWN)
.setRegistrationDelay(
delayedSourceRegistrationStatus.getRegistrationDelay())
.setRegistrant(source.getRegistrant().toString())
.build());
}
private long getAggregateReportDelay() {
long reportDelayFromDefaults =
(long) (Math.random() * AGGREGATE_REPORT_DELAY_SPAN + AGGREGATE_REPORT_MIN_DELAY);
if (!mFlags.getMeasurementEnableConfigurableAggregateReportDelay()) {
return reportDelayFromDefaults;
}
String aggregateReportDelayString = mFlags.getMeasurementAggregateReportDelayConfig();
if (aggregateReportDelayString == null) {
LoggerFactory.getMeasurementLogger()
.d("Invalid configurable aggregate report delay: null");
return reportDelayFromDefaults;
}
String[] split = aggregateReportDelayString.split(AGGREGATE_REPORT_DELAY_DELIMITER);
if (split.length != 2) {
LoggerFactory.getMeasurementLogger()
.d("Invalid configurable aggregate report delay: length is not two");
return reportDelayFromDefaults;
}
try {
final long minDelay = Long.parseLong(split[0].trim());
final long delaySpan = Long.parseLong(split[1].trim());
return (long) (Math.random() * delaySpan + minDelay);
} catch (NumberFormatException e) {
LoggerFactory.getMeasurementLogger()
.e(e, "Configurable aggregate report delay parsing failed.");
return reportDelayFromDefaults;
}
}
private void incrementEventReportCountBy(AttributionStatus attributionStatus, int count) {
attributionStatus.setEventReportCount(attributionStatus.getEventReportCount() + count);
}
private void incrementEventDebugReportCountBy(AttributionStatus attributionStatus, int count) {
attributionStatus.setEventDebugReportCount(
attributionStatus.getEventDebugReportCount() + count);
}
private void incrementAggregateReportCountBy(AttributionStatus attributionStatus, int count) {
attributionStatus.setAggregateReportCount(
attributionStatus.getAggregateReportCount() + count);
}
private void incrementAggregateDebugReportCountBy(
AttributionStatus attributionStatus, int count) {
attributionStatus.setAggregateDebugReportCount(
attributionStatus.getAggregateDebugReportCount() + count);
}
}