blob: d0fd6fac74b7002e7fe7270bd3f958d9e5d3965e [file] [log] [blame]
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.libraries.mobiledatadownload.lite;
import android.net.Uri;
import androidx.annotation.VisibleForTesting;
import com.google.android.libraries.mobiledatadownload.TimeSource;
import com.google.android.libraries.mobiledatadownload.file.spi.Monitor;
import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
/** A Download Progress Monitor to support {@link DownloadListener}. */
@ThreadSafe
public class DownloadProgressMonitor implements Monitor, SingleFileDownloadProgressMonitor {
private static final String TAG = "DownloadProgressMonitor";
private final TimeSource timeSource;
private final Executor sequentialControlExecutor;
// NOTE: GuardRails prohibits multiple public constructors
private DownloadProgressMonitor(TimeSource timeSource, Executor controlExecutor) {
this.timeSource = timeSource;
// We want onProgress to be executed in order otherwise clients will observe out of order
// updates (bigger current size update appears before smaller current size update).
// We use Sequential Executor to ensure the onProgress will be processed sequentially.
this.sequentialControlExecutor = MoreExecutors.newSequentialExecutor(controlExecutor);
}
/** Constructor overload with {@link TimeSource}. */
// NOTE: this is necessary for use by other MDD components.
public static DownloadProgressMonitor create(TimeSource timeSource, Executor controlExecutor) {
return new DownloadProgressMonitor(timeSource, controlExecutor);
}
// We will only broadcast on progress notification at most once in this time frame.
// Currently MobStore Monitor notify every 8KB of downloaded bytes. This may be too chatty on
// fast network.
// 1000 was chosen arbitrarily.
@VisibleForTesting static final long BUFFERED_TIME_MS = 1000;
@GuardedBy("DownloadProgressMonitor.class")
private final HashMap<Uri, DownloadedBytesCounter> uriToDownloadedBytesCounter = new HashMap<>();
@Override
@Nullable
public Monitor.InputMonitor monitorRead(Uri uri) {
return null;
}
@Override
@Nullable
public Monitor.OutputMonitor monitorWrite(Uri uri) {
synchronized (DownloadProgressMonitor.class) {
if (uriToDownloadedBytesCounter.get(uri) == null) {
// All monitors for a shared FileStorage will be invoked for all file accesses through this
// shared FileStorage. So this monitor can receive non-MDD-Lite Uri.
return null;
}
return uriToDownloadedBytesCounter.get(uri);
}
}
@Override
@Nullable
public Monitor.OutputMonitor monitorAppend(Uri uri) {
return monitorWrite(uri);
}
public void pausedForConnectivity() {
synchronized (DownloadProgressMonitor.class) {
for (DownloadedBytesCounter downloadedBytesCounter : uriToDownloadedBytesCounter.values()) {
downloadedBytesCounter.pausedForConnectivity();
}
}
}
@Override
public void addDownloadListener(Uri uri, DownloadListener downloadListener) {
synchronized (DownloadProgressMonitor.class) {
if (!uriToDownloadedBytesCounter.containsKey(uri)) {
uriToDownloadedBytesCounter.put(uri, new DownloadedBytesCounter(uri, downloadListener));
}
}
}
@Override
public void removeDownloadListener(Uri uri) {
synchronized (DownloadProgressMonitor.class) {
uriToDownloadedBytesCounter.remove(uri);
}
}
// A counter for bytes downloaded.
private final class DownloadedBytesCounter implements Monitor.OutputMonitor {
private final Uri uri;
private final DownloadListener downloadListener;
private final AtomicLong byteCounter = new AtomicLong();
// Last timestamp that we broadcast on progress.
private long lastBroadcastOnProgressTimestampMs;
DownloadedBytesCounter(Uri uri, DownloadListener downloadListener) {
this.uri = uri;
this.downloadListener = downloadListener;
lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis();
}
@Override
public void bytesWritten(byte[] b, int off, int len) {
notifyProgress(len);
}
private void notifyProgress(long len) {
// Only broadcast progress update every BUFFERED_TIME_MS.
// It will be fast (no locking) when there is no need to broadcast progress.
// When there is a need to broadcast progress, we need to obtain the lock due to 2 reasons:
// 1- Concurrent access to uriToDownloadedBytesCounter.
// 2- Prevent out of order progress update.
if (timeSource.currentTimeMillis() - lastBroadcastOnProgressTimestampMs < BUFFERED_TIME_MS) {
byteCounter.getAndAdd(len);
LogUtil.v(
"%s: Received data for uri = %s, len = %d, Counter = %d",
TAG, uri, len, byteCounter.get());
} else {
synchronized (DownloadProgressMonitor.class) {
// Reset timestamp.
lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis();
byteCounter.getAndAdd(len);
LogUtil.v(
"%s: Received data for uri = %s, len = %d, Counter = %d",
TAG, uri, len, byteCounter.get());
if (uriToDownloadedBytesCounter.containsKey(uri)) {
sequentialControlExecutor.execute(() -> downloadListener.onProgress(byteCounter.get()));
}
}
}
}
public void pausedForConnectivity() {
downloadListener.onPausedForConnectivity();
}
}
}