blob: c94c9ef9d06b2bcf5f741d47f2e6ce6fe3193ded [file] [log] [blame]
// Copyright 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.android.downloader;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.instanceOf;
import static com.google.common.base.Throwables.getCausalChain;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.net.HttpHeaders;
import com.google.common.util.concurrent.ClosingFuture;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.CheckReturnValue;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.channels.WritableByteChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.TimeZone;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
/**
* In-process download system. Provides a light-weight mechanism to download resources over the
* network. Downloader includes the following features:
*
* <ul>
* <li>Configurable choice of network stack, with several defaults available out of the box.
* <li>Fully asynchronous behavior, allowing downloads to progress and complete without blocking
* (assuming the underlying network stack can avoid blocking).
* <li>Support for HTTP range requests, allowing partial downloads to resume mid-way through, and
* avoiding redownloading of fully-downloaded requests.
* <li>Detection of network interruptions, and a configurable way to retry requests that have
* failed after losing connectivity.
* </ul>
*
* <p>Note that because this library performs downloads in-process, it is subject to having
* downloads stall or abort when the app is suspended or killed. Download requests only live in
* memory, and thus are lost when the process ends. Library users should persist the set of
* downloads to be executed in persistent storage, such as a SQLite database, and run the downloads
* in the context of a persistent operation (either a foreground service or via Android's {@link
* android.app.job.JobScheduler} mechanism).
*
* <p>This is intended as a functional (but not drop-in) replacement for Android's {@link
* android.app.DownloadManager}, which has a number of issues:
*
* <ul>
* <li>It relies on the network stack built into Android, and thus cannot be patched to receive
* updates. For older versions of Android, that means the DownloadManager is vulnerable to
* issues in the network stack, such as b/18432707, which can result in MITM attacks.
* <li>When downloading to external storage on older versions of Android (via {@link
* android.app.DownloadManager.Request#setDestinationInExternalFilesDir} or {@link
* android.app.DownloadManager.Request#setDestinationInExternalPublicDir}), downloads may
* exceed the maximum size of the download directory (see b/22605830).
* <li>Downloads may pause and never resume again without external interference (b/18110151)
* <li>The DownloadManager may lose track of some files (b/18265497)
* </ul>
*
* <p>This library mitigates these issues by performing downloads in-process over the app-provided
* network stack, thus ensuring that an up-to-date network stack is used, and handing off storage
* management directly to the app without any additional restrictions.
*/
public class Downloader {
@VisibleForTesting static final int HTTP_RANGE_NOT_SATISFIABLE = 416;
@VisibleForTesting static final int HTTP_PARTIAL_CONTENT = 206;
private static final ImmutableSet<String> SCHEMES_REQUIRING_CONNECTIVITY =
ImmutableSet.of("http", "https");
private static final Pattern CONTENT_RANGE_HEADER_PATTERN =
Pattern.compile("bytes (\\d+)-(\\d+)/(\\d+|\\*)");
@GuardedBy("SIMPLE_DATE_FORMAT_LOCK")
private static final SimpleDateFormat RFC_1123_FORMATTER;
private static final Object SIMPLE_DATE_FORMAT_LOCK = new Object();
static {
synchronized (SIMPLE_DATE_FORMAT_LOCK) {
RFC_1123_FORMATTER = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.US);
RFC_1123_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC"));
}
}
/** Builder for configuring and constructing an instance of the Downloader. */
public static class Builder {
private final Map<String, UrlEngine> urlEngineMap = new HashMap<>();
private Executor ioExecutor;
private DownloaderLogger logger;
private ConnectivityHandler connectivityHandler;
private int maxConcurrentDownloads = 3;
/**
* Creates a downloader builder.
*
* <p>Note that all parameters are required except for {@link #withMaxConcurrentDownloads}.
*/
public Builder() {}
/**
* Specifies the executor to use internally for I/O.
*
* <p>I/O operations can block so don't use a direct executor or one that runs on the main
* thread.
*/
public Builder withIOExecutor(Executor ioExecutor) {
this.ioExecutor = ioExecutor;
return this;
}
/**
* Sets the {@link ConnectivityHandler} to use in order to determine if connectivity is
* sufficient, and if not, how to handle it.
*/
public Builder withConnectivityHandler(ConnectivityHandler connectivityHandler) {
this.connectivityHandler = connectivityHandler;
return this;
}
/**
* Limits the number of downloads in flight at a time. If a download request arrives that would
* exceed this limit, it will be queued until one already in flight completes. Beware that a
* download that is waiting for connectivity requirements is still considered to be in flight,
* so it is possible to saturate the downloader with requests waiting on connectivity
* requirements if the number of concurrent downloads isn't set high enough.
*
* <p>Note that other factors might restrict download concurrency even further, for instance the
* number of threads on the I/O executor when using a blocking engine.
*/
public Builder withMaxConcurrentDownloads(int maxConcurrentDownloads) {
checkArgument(maxConcurrentDownloads > 0);
this.maxConcurrentDownloads = maxConcurrentDownloads;
return this;
}
/**
* Adds an {@link UrlEngine} to handle network requests for the given URL scheme. Note that the
* engine must support the provided scheme, and only one engine may ever be registered for a
* specific URL scheme. An {@link IllegalArgumentException} will be thrown if the engine doesn't
* support the scheme or if an engine is already registered for the scheme.
*/
public Builder addUrlEngine(String scheme, UrlEngine urlEngine) {
checkArgument(
urlEngine.supportedSchemes().contains(scheme),
"Provided UrlEngine must support URL scheme: %s",
scheme);
checkArgument(
!urlEngineMap.containsKey(scheme),
"Requested scheme already has a UrlEngine registered: %s",
scheme);
urlEngineMap.put(scheme, urlEngine);
return this;
}
/**
* Adds an {@link UrlEngine} to handle network requests for the given URL schemes. Note that the
* engine must support the provided schemes, and only one engine may ever be registered for a
* specific URL scheme. An error will be thrown if the engine doesn't support the schemes or if
* an engine is already registered for the schemes.
*/
public Builder addUrlEngine(Iterable<String> schemes, UrlEngine urlEngine) {
for (String scheme : schemes) {
addUrlEngine(scheme, urlEngine);
}
return this;
}
/** Sets the {@link DownloaderLogger} to use for logging in this downloader instance. */
public Builder withLogger(DownloaderLogger logger) {
this.logger = logger;
return this;
}
public Downloader build() {
return new Downloader(this);
}
}
/**
* Value class for capturing a state snapshot of the downloader, for use in the state callbacks
* that can be registered via {@link #registerStateChangeCallback}.
*/
@AutoValue
public abstract static class State {
/**
* Returns the current number of downloads currently in flight, i.e. the number of downloads
* that are concurrently executing, or attempting to make progress on their underlying network
* stack. Note that this number should be in the range of [0, maxConcurrentDownloads), as
* configured by {@link Builder#withMaxConcurrentDownloads}.
*/
public abstract int getNumDownloadsInFlight();
/**
* Returns the number of downloads that have been requested via {@link #execute} but not yet
* started. They may not have been started due to internal asynchronous code, due to waiting on
* connectivity requirements, or due to the limit enforced by {@code maxConcurrentDownloads}.
*/
public abstract int getNumQueuedDownloads();
/**
* Returns the current number of downloads that are waiting for sufficient connectivity
* conditions to be met. These downloads will start running once the device network conditions
* change (e.g. connects to WiFi), if there is enough space as determined by {@code
* maxConcurrentDownloads}.
*
* <p>Note that this number should be in the range of [0, numQueuedDownloads], as a download
* pending connectivity is necessarily queued. However, a download may also be queued due to
* {@code maxConcurrentDownloads}.
*/
public abstract int getNumDownloadsPendingConnectivity();
/** Creates an instance of the state object. Internal-only. */
@VisibleForTesting
public static State create(
int numDownloadsInFlight, int numQueuedDownloads, int numDownloadsPendingConnectivity) {
return new AutoValue_Downloader_State(
numDownloadsInFlight, numQueuedDownloads, numDownloadsPendingConnectivity);
}
}
/**
* Functional callback interface for observing changes to Downloader {@link State}. Used with
* {@link #registerStateChangeCallback} and {@link #unregisterStateChangeCallback}.
*
* <p>Note that the callbacks could just use {@code java.util.function.Consumer} in contexts that
* support the Java 8 SDK, and {@code com.google.common.base.Receiver} in contexts that don't but
* have access to Guava APIs that aren't made public.
*/
public interface StateChangeCallback {
void onStateChange(State state);
}
private final ImmutableMap<String, UrlEngine> urlEngineMap;
private final Executor ioExecutor;
private final DownloaderLogger logger;
private final ConnectivityHandler connectivityHandler;
private final int maxConcurrentDownloads;
@GuardedBy("lock")
private final IdentityHashMap<StateChangeCallback, Executor> stateCallbackMap =
new IdentityHashMap<>();
// TODO: Consider using a PriorityQueue here instead, so that queued downloads can
// retain the order in the queue as they get added/removed due to waiting on connectivity.
@GuardedBy("lock")
private final Queue<QueuedDownload> queuedDownloads = new ArrayDeque<>();
@GuardedBy("lock")
private final List<FluentFuture<DownloadResult>> unresolvedFutures = new ArrayList<>();
private final Object lock = new Object();
@GuardedBy("lock")
private int numDownloadsInFlight = 0;
@GuardedBy("lock")
private int numDownloadsPendingConnectivity = 0;
private Downloader(Builder builder) {
ImmutableMap<String, UrlEngine> urlEngineMap = ImmutableMap.copyOf(builder.urlEngineMap);
checkArgument(!urlEngineMap.isEmpty(), "Must have at least one UrlEngine");
checkArgument(builder.ioExecutor != null, "Must set a callback executor");
checkArgument(builder.logger != null, "Must set a logger");
checkArgument(builder.connectivityHandler != null, "Must set a connectivity handler");
this.urlEngineMap = urlEngineMap;
this.ioExecutor = builder.ioExecutor;
this.logger = builder.logger;
this.connectivityHandler = builder.connectivityHandler;
this.maxConcurrentDownloads = builder.maxConcurrentDownloads;
}
/**
* Creates a new {@link DownloadRequest.Builder} for the given {@link URI} and {@link
* DownloadDestination}. The builder may be used to further customize the request. To execute the
* request, pass the built request to {@link #execute}.
*/
@CheckReturnValue
public DownloadRequest.Builder newRequestBuilder(URI uri, DownloadDestination destination) {
return DownloadRequest.newBuilder()
.setDestination(destination)
.setUri(uri)
.setDownloadConstraints(DownloadConstraints.NETWORK_CONNECTED);
}
/**
* Executes the provided request. The request will be handled by the underlying {@link UrlEngine}
* and the result is streamed to the {@link DownloadDestination} created for the request. The
* download result is provided asynchronously as a {@link FluentFuture} that resolves when the
* download is complete.
*
* <p>The download can be cancelled by calling {@link FluentFuture#cancel} on the future instance
* returned by this method. Cancellation is best-effort and does not guarantee that the download
* will stop immediately, as it is impossible to stop a thread that is in the middle of reading
* bytes off the network.
*
* <p>Note that this method is not idempotent! The downloader does not attempt to merge/de-dupe
* incoming requests, even if the same exact request is executed twice. The calling code needs to
* be careful to manage its downloads in such a way that duplicated downloads don't occur.
*
* <p>TODO: Document what types of exceptions can be set on the returned future, and how clients
* are expected to handle them.
*/
@CheckReturnValue
public FluentFuture<DownloadResult> execute(DownloadRequest request) {
FluentFuture<DownloadResult> resultFuture;
synchronized (lock) {
ClosingFuture<DownloadResult> closingFuture = enqueueRequest(request);
closingFuture.statusFuture().addListener(() -> onDownloadComplete(request), ioExecutor);
resultFuture = closingFuture.finishToFuture();
unresolvedFutures.add(resultFuture);
resultFuture.addListener(
() -> {
synchronized (lock) {
unresolvedFutures.remove(resultFuture);
}
},
MoreExecutors.directExecutor());
}
logger.logFine("New request enqueued, running queue: %s", request.uri());
maybeRunQueuedDownloads();
return resultFuture;
}
/**
* Cancels <strong>all</strong> ongoing downloads. This will result in all unresolved {@link
* FluentFuture} instances returned by {@link #execute} to be cancelled immediately and have their
* error callbacks invoked.
*
* <p>Because implementations of {@link FluentFuture} allow callbacks to be garbage collected
* after the future is resolved, calling {@code cancelAll} is an effective way to avoid having the
* Downloader leak memory after it is logically no longer needed, e.g. if it is only used from an
* Android {@link android.app.Activity}, and that Activity is destroyed.
*
* <p>However, in general the Downloader should be run from a process-level context, e.g. in an
* Android {@link android.app.Service}, so that the downloader doesn't implicitly hold on to
* UI-scoped objects.
*/
public void cancelAll() {
List<FluentFuture<DownloadResult>> unresolvedFuturesCopy;
synchronized (lock) {
// Copy the set of unresolved futures to a local variable to avoid hitting
// ConcurrentModificationExceptions, which could happen since canceling the future may
// trigger the future callback in execute() that removes the future from the set of
// unresolved futures.
unresolvedFuturesCopy = new ArrayList<>(unresolvedFutures);
unresolvedFutures.clear();
}
for (FluentFuture<DownloadResult> unresolvedFuture : unresolvedFuturesCopy) {
unresolvedFuture.cancel(true);
}
}
/**
* Registers an {@link StateChangeCallback} with this downloader instance, to be run when various
* state changes occur. The callback will be executed on the provided {@link Executor}. Use {@link
* #unregisterStateChangeCallback} to unregister a previously registered callback. The callback is
* invoked when the following state transitions occur:
*
* <ul>
* <li>A download was requested via a call to {@link #execute}
* <li>A download completed
* <li>A download started waiting for its connectivity constraints to be satisfied
* <li>A download stopped waiting for its connectivity constraints to be satisfied
* </ul>
*
* <p>A newly registered callback instance will only called for state changes that are triggered
* after the registration; there is no mechanism to replay previous state changes on the callback.
*
* <p>Registering the same callback multiple times has no effect, except that it will overwrite
* the {@link Executor} that was used in a previous registration call.
*
* <p>Invocation of the callbacks will be internally serialized to avoid concurrent invocations of
* the callback with possibly conflicting state. A new invocation of the callback will not start
* running until the previous one has completed. If multiple callbacks are being registered that
* must be synchronized with each other, then the caller must take care to coordinate this
* externally, such as locking in the callbacks or using an executor that guarantees
* serialization, such as {@link MoreExecutors#newSequentialExecutor}.
*
* <p>Warning: Do not use {@link MoreExecutors#directExecutor} or similar executor mechanisms, as
* doing so can easily lead to a deadlock, since the internal downloader lock is held while
* scheduling the callbacks on the provided executor.
*/
public void registerStateChangeCallback(StateChangeCallback callback, Executor executor) {
synchronized (lock) {
stateCallbackMap.put(callback, MoreExecutors.newSequentialExecutor(executor));
}
}
/**
* Unregisters a previously registered {@link StateChangeCallback} with this downloader instance.
*
* <p>Attempting to unregister a callback that was never registered is a no-op.
*/
public void unregisterStateChangeCallback(StateChangeCallback callback) {
synchronized (lock) {
stateCallbackMap.remove(callback);
}
}
private void onDownloadComplete(DownloadRequest request) {
synchronized (lock) {
// The number of downloads should be well-balanced and this case should never
// trigger, so this is just a defensive check.
checkState(
numDownloadsInFlight >= 0, "Encountered < 0 downloads in flight, this shouldn't happen");
}
logger.logFine("Download complete, running queued downloads: %s", request.uri());
maybeRunQueuedDownloads();
}
private void maybeRunQueuedDownloads() {
// Loop until we run out of download slots or out of queued downloads. It should be impossible
// for this to loop forever. Also, note that the synchronized block is inside the loop, since
// the outer loop conditions don't touch shared mutable state.
while (true) {
synchronized (lock) {
if (numDownloadsInFlight >= maxConcurrentDownloads) {
logger.logInfo("Exceeded max concurrent downloads, not running another queued request");
return;
}
QueuedDownload queuedDownload = queuedDownloads.poll();
if (queuedDownload == null) {
return;
}
DownloadRequest request = queuedDownload.request();
ListenableFuture<Void> connectivityFuture = checkConnectivity(request);
if (connectivityFuture.isDone()) {
logger.logFine("Connectivity satisfied; running request. uri=%s", request.uri());
numDownloadsInFlight++;
ListenableFuture<?> statusFuture = queuedDownload.resultFuture().statusFuture();
statusFuture.addListener(
() -> {
synchronized (lock) {
numDownloadsInFlight--;
// One less download in flight, let the state change listeners know.
runStateCallbacks();
}
// A download slot was just freed up, run queued downloads again.
logger.logFine(
"Queued download completed, running queued downloads: %s", request.uri());
maybeRunQueuedDownloads();
},
ioExecutor);
// A new download is about to be in flight, let state callbacks know about the
// state change.
runStateCallbacks();
queuedDownload.task().run();
} else {
logger.logInfo("Waiting on connectivity for request: uri=%s", request.uri());
handlePendingConnectivity(connectivityFuture, queuedDownload);
}
}
}
}
@GuardedBy("lock")
private void handlePendingConnectivity(
ListenableFuture<Void> connectivityFuture, QueuedDownload queuedDownload) {
// Keep track of the number of requests waiting.
numDownloadsPendingConnectivity++;
connectivityFuture.addListener(
() -> {
synchronized (lock) {
numDownloadsPendingConnectivity--;
// Let the listeners know we are no longer waiting.
runStateCallbacks();
}
},
MoreExecutors.directExecutor());
// Let the listeners know we're waiting.
runStateCallbacks();
Futures.addCallback(
connectivityFuture,
new FutureCallback<Void>() {
@Override
public void onSuccess(Void result1) {
logger.logInfo("Connectivity changed, running queued requests");
requeue(queuedDownload);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof TimeoutException) {
logger.logInfo("Timed out waiting for connectivity change");
requeue(queuedDownload);
} else if (t instanceof CancellationException) {
logger.logFine("Connectivity future cancelled, running queued downloads");
maybeRunQueuedDownloads();
} else {
logger.logError(t, "Error observing connectivity changes");
cancelAll();
}
}
},
ioExecutor);
// Run state callbacks and cancel the connectivity future when the result task resolves.
// It doesn't matter if it succeeded or failed, either way it means we no longer need to wait
// for connectivity.
queuedDownload
.task()
.addListener(
() -> {
synchronized (lock) {
logger.logInfo("Queued task completed, cancelling connectivity check");
runStateCallbacks();
connectivityFuture.cancel(false);
}
},
MoreExecutors.directExecutor());
}
private void requeue(QueuedDownload queuedDownload) {
synchronized (lock) {
addToQueue(queuedDownload);
}
logger.logInfo(
"Requeuing download after connectivity change: %s", queuedDownload.request().uri());
maybeRunQueuedDownloads();
}
private ClosingFuture<DownloadResult> enqueueRequest(DownloadRequest request) {
synchronized (lock) {
ListenableFutureTask<Void> task = ListenableFutureTask.create(() -> null);
// When the task runs (i.e. is taken off the queue and is explicitly run), all pre-flight
// checks should have been made, so at that point the request is send to the underlying
// network stack for execution.
ClosingFuture<DownloadResult> resultFuture =
ClosingFuture.from(task)
.transformAsync((closer, result) -> runRequest(request), ioExecutor);
addToQueue(QueuedDownload.create(request, task, resultFuture));
return resultFuture;
}
}
@GuardedBy("lock")
private void addToQueue(QueuedDownload queuedDownload) {
queuedDownloads.add(queuedDownload);
// Make sure that when the task completes, the queued download is definitely removed
// from the queue. This is necessary to be robust in the face of cancellation, as
// canceled tasks may not get removed from the queue otherwise.
queuedDownload
.task()
.addListener(
() -> {
synchronized (lock) {
if (queuedDownloads.remove(queuedDownload)) {
// If the queued download was actually removed, update the state callbacks
// to reflect the state change.
runStateCallbacks();
}
}
},
MoreExecutors.directExecutor());
// Now that a new request is on the queue, run the state callbacks.
runStateCallbacks();
}
private ClosingFuture<DownloadResult> runRequest(DownloadRequest request) throws IOException {
URI uri = request.uri();
UrlEngine urlEngine = checkNotNull(urlEngineMap.get(uri.getScheme()));
UrlRequest.Builder urlRequestBuilder = urlEngine.createRequest(uri.toString());
for (Map.Entry<String, String> entry : request.headers().entries()) {
urlRequestBuilder.addHeader(entry.getKey(), entry.getValue());
}
long numExistingBytes = request.destination().numExistingBytes();
if (numExistingBytes > 0) {
logger.logInfo(
"Existing bytes found. numExistingBytes=%d, uri=%s", numExistingBytes, request.uri());
urlRequestBuilder.addHeader(HttpHeaders.RANGE, "bytes=" + numExistingBytes + "-");
DownloadMetadata destinationMetadata = request.destination().readMetadata();
String contentTag = destinationMetadata.getContentTag();
long lastModifiedTimeSeconds = destinationMetadata.getLastModifiedTimeSeconds();
if (!contentTag.isEmpty()) {
urlRequestBuilder.addHeader(HttpHeaders.IF_RANGE, contentTag);
} else if (lastModifiedTimeSeconds > 0) {
urlRequestBuilder.addHeader(
HttpHeaders.IF_RANGE, formatRfc1123Date(lastModifiedTimeSeconds));
} else {
// TODO: This should probably just clear the destination and remove the Range
// header so there's no chance of data corruption. Leaving this as-is for now to
// keep supporting range requests for offline maps.
logger.logWarning(
"Sending range request without If-Range header, due to missing destination "
+ "metadata. Data corruption is possible.");
}
}
ListenableFuture<UrlRequest> urlRequestFuture;
OAuthTokenProvider oAuthTokenProvider = request.oAuthTokenProvider();
if (oAuthTokenProvider != null) {
urlRequestFuture =
Futures.transform(
oAuthTokenProvider.provideOAuthToken(uri),
authToken -> {
if (authToken != null) {
urlRequestBuilder.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + authToken);
}
return urlRequestBuilder.build();
},
ioExecutor);
} else {
urlRequestFuture = Futures.immediateFuture(urlRequestBuilder.build());
}
return ClosingFuture.from(urlRequestFuture)
.transform((closer, urlRequest) -> checkNotNull(urlRequest).send(), ioExecutor)
.transformAsync(
(closer, responseFuture) -> completeRequest(request, checkNotNull(responseFuture)),
ioExecutor);
}
/**
* @param request the original request that triggered this call
* @param responseFuture the {@link UrlResponse}, provided asynchronously via a {@link
* ListenableFuture}
*/
private ClosingFuture<DownloadResult> completeRequest(
DownloadRequest request, ListenableFuture<UrlResponse> responseFuture) {
return ClosingFuture.from(responseFuture)
.transformAsync(
(closer, urlResponse) -> {
checkNotNull(urlResponse);
// We want to close the response regardless of whether we succeed or fail.
closer.eventuallyClose(urlResponse, ioExecutor);
logger.logFine(
"Got URL response, starting to read response body. uri=%s", request.uri());
DownloadDestination destination = request.destination();
if (request.headers().containsKey(HttpHeaders.RANGE)
&& checkNotNull(urlResponse).getResponseCode() != HTTP_PARTIAL_CONTENT) {
logger.logFine("Clearing %s as our range request wasn't honored", destination);
destination.clear();
}
WritableByteChannel byteChannel =
destination.openByteChannel(
parseResponseStartOffset(urlResponse), parseResponseMetadata(urlResponse));
closer.eventuallyClose(byteChannel, ioExecutor);
return ClosingFuture.from(checkNotNull(urlResponse).readResponseBody(byteChannel));
},
ioExecutor)
.catching(
RequestException.class,
(closer, requestException) -> {
if (checkNotNull(requestException).getErrorDetails().getHttpStatusCode()
== HTTP_RANGE_NOT_SATISFIABLE) {
// This is a bit of a special edge case. Encountering this error means the server
// rejected our HTTP range request because it was outside the range of available
// bytes. This may well mean that the request was malformed (e.g. data on disk was
// corrupted and the existing file size ended up larger than what exists on the
// server). But the more common cause for this error is that the entire file was
// in fact already downloaded, so the requested range would cover 0 bytes, which
// the server interprets as not satisfiable. To mitigate that case we simply return
// a success state here by indicating 0 bytes were downloaded.
// This isn't exactly ideal, as it means that a potential class of errors will
// go unnoticed. A better solution might find a way to distinguish between the
// common, file-already-downloaded case and the file corrupted case. This solution
// is put in place mainly to retain parity with the older downloader implementation.
return 0L;
} else {
throw new DownloadException(requestException);
}
},
ioExecutor)
.transform(
(closer, bytesWritten) -> {
logger.logFine(
"Response body written. bytesWritten=%d, uri=%s",
checkNotNull(bytesWritten), request.uri());
return DownloadResult.create(checkNotNull(bytesWritten));
},
ioExecutor)
.catchingAsync(
Exception.class,
(closer, exception) -> {
ClosingFuture<DownloadResult> result;
synchronized (lock) {
logger.logWarning(
exception, "Error reading download result. uri=%s", request.uri());
RequestException requestException = getRequestException(exception);
if (requestException != null
&& requestException.getErrorDetails().isRetryableAsIs()) {
// Retry the request by just re-enqueueing it. Note that we also need to
// call maybeRunQueuedRequest to keep the queue moving. Also, in this particular
// case we need to decrement the in-flight downloads count, as we are taking a
// previously in-flight download and putting it back in the queue without
// resolving the underlying result future.
numDownloadsInFlight--;
result = enqueueRequest(request);
} else {
throw new DownloadException(exception);
}
}
logger.logInfo("Running queued downloads after handling request exception");
maybeRunQueuedDownloads();
return result;
},
ioExecutor);
}
@GuardedBy("lock")
private ListenableFuture<Void> checkConnectivity(DownloadRequest request) {
if (!SCHEMES_REQUIRING_CONNECTIVITY.contains(request.uri().getScheme())) {
return Futures.immediateVoidFuture();
}
return connectivityHandler.checkConnectivity(request.downloadConstraints());
}
@GuardedBy("lock")
private void runStateCallbacks() {
State state =
State.create(numDownloadsInFlight, queuedDownloads.size(), numDownloadsPendingConnectivity);
for (Map.Entry<StateChangeCallback, Executor> callbackEntry : stateCallbackMap.entrySet()) {
callbackEntry.getValue().execute(() -> callbackEntry.getKey().onStateChange(state));
}
}
private static String formatRfc1123Date(long unixTimeSeconds) {
synchronized (SIMPLE_DATE_FORMAT_LOCK) {
return RFC_1123_FORMATTER.format(new Date(SECONDS.toMillis(unixTimeSeconds)));
}
}
private static long parseResponseStartOffset(UrlResponse response) throws DownloadException {
if (response.getResponseCode() != HttpURLConnection.HTTP_PARTIAL) {
return 0;
}
List<String> contentRangeHeaders = response.getResponseHeaders().get(HttpHeaders.CONTENT_RANGE);
checkDownloadState(
contentRangeHeaders != null && !contentRangeHeaders.isEmpty(),
"Host returned 206/PARTIAL response code but didn't provide a "
+ "'Content-Range' response header");
String contentRangeHeader = checkNotNull(contentRangeHeaders).get(0);
Matcher matcher = CONTENT_RANGE_HEADER_PATTERN.matcher(contentRangeHeader);
checkDownloadState(
matcher.matches() && matcher.groupCount() > 0,
"Content-Range response header didn't match expected pattern. " + "Was '%s', expected '%s'",
contentRangeHeader,
CONTENT_RANGE_HEADER_PATTERN.pattern());
return Long.parseLong(checkNotNull(matcher.group(1)));
}
private static DownloadMetadata parseResponseMetadata(UrlResponse response)
throws DownloadException {
String contentTag = parseResponseContentTag(response);
long lastModifiedTimeSeconds = parseResponseModifiedTime(response);
return DownloadMetadata.create(contentTag, lastModifiedTimeSeconds);
}
private static long parseResponseModifiedTime(UrlResponse response) throws DownloadException {
List<String> lastModifiedHeaders = response.getResponseHeaders().get(HttpHeaders.LAST_MODIFIED);
if (lastModifiedHeaders == null || lastModifiedHeaders.isEmpty()) {
return 0L;
}
String lastModifiedHeader = lastModifiedHeaders.get(0);
Date date;
try {
synchronized (SIMPLE_DATE_FORMAT_LOCK) {
date = RFC_1123_FORMATTER.parse(lastModifiedHeader);
}
if (date == null) {
throw new DownloadException("Invalid Last-Modified header: " + lastModifiedHeader);
}
} catch (ParseException e) {
throw new DownloadException("Invalid Last-Modified header: " + lastModifiedHeader, e);
}
return MILLISECONDS.toSeconds(date.getTime());
}
private static String parseResponseContentTag(UrlResponse response) {
List<String> contentTagHeaders = response.getResponseHeaders().get(HttpHeaders.ETAG);
if (contentTagHeaders == null || contentTagHeaders.isEmpty()) {
return "";
}
return contentTagHeaders.get(0);
}
@AutoValue
abstract static class QueuedDownload {
abstract DownloadRequest request();
abstract ListenableFutureTask<?> task();
abstract ClosingFuture<DownloadResult> resultFuture();
static QueuedDownload create(
DownloadRequest request,
ListenableFutureTask<?> task,
ClosingFuture<DownloadResult> resultFuture) {
return new AutoValue_Downloader_QueuedDownload(request, task, resultFuture);
}
}
@VisibleForTesting
@Nullable
static RequestException getRequestException(@Nullable Throwable throwable) {
if (throwable == null) {
return null;
} else {
return (RequestException)
Iterables.find(
getCausalChain(throwable),
instanceOf(RequestException.class),
/* defaultValue= */ null);
}
}
@FormatMethod
private static void checkDownloadState(
boolean state, @FormatString String message, Object... args) throws DownloadException {
if (!state) {
throw new DownloadException(String.format(message, args));
}
}
}