blob: e790d6c906cab66461958efea111a37e5679e2d6 [file] [log] [blame]
/*
* Copyright (C) 2015 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.repository.impl.manager;
import com.android.annotations.NonNull;
import com.android.annotations.Nullable;
import com.android.repository.api.Channel;
import com.android.repository.api.DelegatingProgressIndicator;
import com.android.repository.api.Downloader;
import com.android.repository.api.FallbackRemoteRepoLoader;
import com.android.repository.api.ProgressIndicator;
import com.android.repository.api.ProgressIndicatorAdapter;
import com.android.repository.api.RemotePackage;
import com.android.repository.api.Repository;
import com.android.repository.api.RepositorySource;
import com.android.repository.api.RepositorySourceProvider;
import com.android.repository.api.SettingsController;
import com.android.repository.impl.meta.SchemaModuleUtil;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBException;
/**
 * Utility class that loads {@link Repository}s from {@link RepositorySource}s.
 *
 * <p>Manifests are downloaded concurrently on a thread pool; parsing and merging of the
 * downloaded manifests happens on the thread that calls {@link #fetchPackages}.
 */
public class RemoteRepoLoaderImpl implements RemoteRepoLoader {
    /**
     * Timeout to wait for the packages to be fetched. Each time the timeout is reached, a warning
     * will be logged but waiting for the thread pool termination will continue. It is expected that
     * network operations will eventually time out on their own and/or throw exception in the worst
     * case, leading to the thread pool termination anyway.
     */
    private static final int FETCH_PACKAGES_WAITING_ITERATION_SECONDS = 10;

    private static final String FETCH_PACKAGES_WAITING_MESSAGE =
            "Still waiting for package manifests to be fetched remotely.";

    /** {@link FallbackRemoteRepoLoader} to use if we get an XML file we can't parse. */
    private final FallbackRemoteRepoLoader mFallback;

    /** The {@link RepositorySourceProvider}s to load from. */
    private final Collection<RepositorySourceProvider> mSourceProviders;

    /**
     * Constructor
     *
     * @param sources The {@link RepositorySourceProvider}s to get the {@link
     *     RepositorySource}s to load from. The collection is stored as-is (not copied).
     * @param fallback The {@link FallbackRemoteRepoLoader} to use if we can't parse an XML
     *     file, or {@code null} to disable the fallback.
     */
    public RemoteRepoLoaderImpl(
            @NonNull Collection<RepositorySourceProvider> sources,
            @Nullable FallbackRemoteRepoLoader fallback) {
        mSourceProviders = sources;
        mFallback = fallback;
    }

    /**
     * Downloads the manifests of all enabled sources in parallel, parses each one as soon as it
     * arrives, and merges the resulting packages into a single map keyed by package path.
     *
     * <p>Packages parsed by the regular schema-based parser are merged before packages produced
     * by the fallback loader, so that legacy packages have lower priority.
     *
     * <p>If the calling thread is interrupted while waiting for downloads, the fetch still runs
     * to completion (as it always did), but the thread's interrupt status is restored before
     * this method returns so that callers can observe it.
     *
     * @param progress indicator used for progress reporting and logging; its indeterminate
     *     state is restored to its initial value before returning.
     * @param downloader used to download the manifests.
     * @param settings optional settings, used to determine the channel to filter packages by.
     * @return the merged packages, keyed by {@link RemotePackage#getPath()}. Never {@code null}.
     */
    @Override
    @NonNull
    public Map<String, RemotePackage> fetchPackages(
            @NonNull ProgressIndicator progress,
            @NonNull Downloader downloader,
            @Nullable SettingsController settings) {
        Map<RepositorySource, Collection<? extends RemotePackage>> parsedPackages = new HashMap<>();
        Map<RepositorySource, Collection<? extends RemotePackage>> legacyParsedPackages =
                new HashMap<>();

        // The first 10% of the progress is reserved for retrieving the list of sources.
        List<RepositorySource> sources = Lists.newArrayList();
        double progressMax = 0;
        for (RepositorySourceProvider provider : mSourceProviders) {
            progressMax += 0.1 / mSourceProviders.size();
            sources.addAll(
                    provider.getSources(
                            downloader, progress.createSubProgress(progressMax), false));
        }

        // In the context below we are not concerned that much about the precise progress
        // reporting, because the manifests downloading is about a dozen tiny files, so it's
        // expected to be relatively fast, and a lock-based thread-safe ProgressIndicator
        // implementation would likely be a bottleneck. So precise progress is reported only when
        // one download result has been processed; the partial progress of individual downloads
        // is not reported. For the latter, progress reporting is limited to logging in case there
        // are any issues (and logging is deemed thread-safe).
        boolean wasIndeterminate = progress.isIndeterminate();
        progress.setIndeterminate(true);
        LoggingOnlyProgressIndicator loggingOnlyProgress =
                new LoggingOnlyProgressIndicator(progress);

        // ArrayBlockingQueue rejects a capacity below 1, so never size it with zero even when
        // no sources were found. The same value guards the progress increment divisor.
        final int sourceCount = Math.max(1, sources.size());

        // This is a typical producer-consumer context - first spawn all the download threads, and
        // then process the download results one by one in this thread, waiting where necessary.
        // This ensures that starvation on both sides is minimized (as opposed to e.g. first
        // waiting for _all_ downloads to complete, and then processing them sequentially).
        ArrayBlockingQueue<Map.Entry<RepositorySource, InputStream>> downloadedRepoManifests =
                new ArrayBlockingQueue<>(sourceCount);
        ExecutorService sourceThreadPool = Executors.newCachedThreadPool();
        int threadsSubmitted = 0;
        int resultsReceived = 0;
        // Set when this thread is interrupted while waiting; the status is restored on exit.
        boolean interrupted = false;
        try {
            for (RepositorySource source : sources) {
                if (!source.isEnabled()) {
                    continue;
                }
                ++threadsSubmitted;
                sourceThreadPool.submit(
                        () -> {
                            downloadManifest(
                                    source,
                                    downloader,
                                    loggingOnlyProgress,
                                    progress,
                                    downloadedRepoManifests);
                        });
            }
            // Indicate we're not going to add any more tasks for processing.
            sourceThreadPool.shutdown();

            // Collect & process the results, blocking where necessary & checking whether there
            // is anything to wait for before blocking again.
            progress.setIndeterminate(false);
            double progressIncrement = 0.9 / (sourceCount * 2);
            while ((!sourceThreadPool.isTerminated() && (resultsReceived < threadsSubmitted))
                    || !downloadedRepoManifests.isEmpty()) {
                Map.Entry<RepositorySource, InputStream> repoResult = null;
                try {
                    for (int waitedSeconds = 0;
                            waitedSeconds < FETCH_PACKAGES_WAITING_ITERATION_SECONDS;
                            ++waitedSeconds) {
                        // Check sourceThreadPool every second to detect early termination.
                        repoResult = downloadedRepoManifests.poll(1, TimeUnit.SECONDS);
                        if (repoResult != null || sourceThreadPool.isTerminated()) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    // Keep going so that the fetch completes, but remember the interrupt so it
                    // can be re-asserted once we are done waiting. This is not a timeout, so no
                    // "still waiting" warning is logged.
                    interrupted = true;
                    continue;
                }
                if (repoResult == null) {
                    // Either the timeout has elapsed, or all downloads have finished without
                    // producing another result; only the former deserves a warning.
                    if (!sourceThreadPool.isTerminated()) {
                        progress.logWarning(FETCH_PACKAGES_WAITING_MESSAGE);
                    }
                    continue;
                }
                // We have got a download result - process it while other downloaders are working.
                ++resultsReceived;
                progressMax += progressIncrement;
                parseSource(
                        repoResult.getKey(),
                        repoResult.getValue(),
                        downloader,
                        settings,
                        parsedPackages,
                        legacyParsedPackages,
                        progress,
                        progressMax);
                progressMax += progressIncrement;
                progress.setFraction(progressMax);
            }
        } finally {
            shutdownAndJoin(sourceThreadPool, progress);
            // Normally the queue is empty here; if we are leaving because of an exception, make
            // sure the streams of any unprocessed manifests do not leak.
            Map.Entry<RepositorySource, InputStream> leftover;
            while ((leftover = downloadedRepoManifests.poll()) != null) {
                closeManifestStream(leftover.getValue(), leftover.getKey(), progress);
            }
            progress.setIndeterminate(wasIndeterminate);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        Map<String, RemotePackage> result = new HashMap<>();
        for (RepositorySource source : sources) {
            Collection<? extends RemotePackage> regularPackages = parsedPackages.get(source);
            if (regularPackages != null) {
                mergePackages(regularPackages, source, settings, result);
            }
        }
        for (RepositorySource source : sources) {
            // Legacy after since they are lower priority
            Collection<? extends RemotePackage> legacyPackages = legacyParsedPackages.get(source);
            if (legacyPackages != null) {
                mergePackages(legacyPackages, source, settings, result);
            }
        }
        return result;
    }

    /**
     * Downloads the manifest of a single source and enqueues the resulting stream for parsing.
     * Runs on a download thread.
     *
     * <p>On failure the error is recorded on the source via {@link
     * RepositorySource#setFetchError} and logged as a warning; nothing is enqueued.
     *
     * @param source the source whose manifest should be downloaded.
     * @param downloader used to perform the download.
     * @param downloadProgress indicator handed to the downloader.
     * @param progress indicator used for logging failures.
     * @param results queue to which the (source, stream) pair is added on success.
     */
    private static void downloadManifest(
            @NonNull RepositorySource source,
            @NonNull Downloader downloader,
            @NonNull ProgressIndicator downloadProgress,
            @NonNull ProgressIndicator progress,
            @NonNull ArrayBlockingQueue<Map.Entry<RepositorySource, InputStream>> results) {
        String errorMessage = null;
        Throwable error = null;
        InputStream repoStream = null;
        try {
            repoStream = downloader.downloadAndStream(new URL(source.getUrl()), downloadProgress);
            results.put(new AbstractMap.SimpleImmutableEntry<>(source, repoStream));
        } catch (MalformedURLException e) {
            errorMessage = "Malformed URL";
            error = e;
        } catch (IOException e) {
            errorMessage = "IO exception while downloading manifest";
            error = e;
        } catch (InterruptedException e) {
            errorMessage = "Thread interrupted while enqueuing downloaded manifest";
            error = e;
            // The stream was never handed over to the consumer, so nobody else will close it.
            closeManifestStream(repoStream, source, progress);
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        if (errorMessage != null) {
            source.setFetchError(errorMessage);
            progress.logWarning(errorMessage, error);
        }
    }

    /**
     * Closes a manifest stream, logging (rather than propagating) any failure to do so.
     *
     * @param stream the stream to close; {@code null} is tolerated and ignored.
     * @param source the source the stream was downloaded from, used for the log message.
     * @param progress indicator used to log a warning if closing fails.
     */
    private static void closeManifestStream(
            @Nullable InputStream stream,
            @NonNull RepositorySource source,
            @NonNull ProgressIndicator progress) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            progress.logWarning("Failed to close manifest stream for " + source.getUrl(), e);
        }
    }

    /**
     * Parses one downloaded manifest and records the packages it contains.
     *
     * <p>The stream is first unmarshalled with {@link SchemaModuleUtil}. If that yields no
     * {@link Repository} and a fallback loader was supplied, the fallback loader is asked to
     * parse the source instead (which currently downloads it again). If no packages are obtained
     * either way, the collected errors are logged and a fetch error is set on the source.
     *
     * <p>The stream is always closed before this method returns.
     *
     * @param source the source the manifest belongs to.
     * @param repoStream the downloaded manifest content.
     * @param downloader passed to the fallback loader.
     * @param settings passed to the fallback loader.
     * @param result receives packages parsed by the regular parser, keyed by source.
     * @param legacyResult receives packages parsed by the fallback loader, keyed by source.
     * @param progress indicator used for progress reporting and logging.
     * @param progressMax the fraction {@code progress} should reach once parsing is done.
     */
    private void parseSource(
            @NonNull RepositorySource source,
            @NonNull InputStream repoStream,
            @NonNull Downloader downloader,
            @Nullable SettingsController settings,
            @NonNull Map<RepositorySource, Collection<? extends RemotePackage>> result,
            @NonNull Map<RepositorySource, Collection<? extends RemotePackage>> legacyResult,
            @NonNull ProgressIndicator progress,
            double progressMax) {
        final List<String> errors = Lists.newArrayList();

        // Don't show the errors, in case the fallback loader can read it. But keep
        // track of them to show later in case not.
        ProgressIndicator unmarshalProgress =
                new ProgressIndicatorAdapter() {
                    @Override
                    public void logWarning(@NonNull String s, Throwable e) {
                        errors.add(s);
                        if (e != null) {
                            errors.add(e.toString());
                        }
                    }

                    @Override
                    public void logError(@NonNull String s, Throwable e) {
                        errors.add(s);
                        if (e != null) {
                            errors.add(e.toString());
                        }
                    }
                };

        Repository repo = null;
        try {
            repo =
                    (Repository)
                            SchemaModuleUtil.unmarshal(
                                    repoStream,
                                    source.getPermittedModules(),
                                    true,
                                    unmarshalProgress);
        } catch (JAXBException e) {
            errors.add(e.toString());
        } finally {
            // The fallback loader downloads the manifest again, so the stream is no longer
            // needed regardless of the outcome.
            closeManifestStream(repoStream, source, progress);
        }

        Collection<? extends RemotePackage> parsedPackages = null;
        boolean legacy = false;
        if (repo != null) {
            parsedPackages = repo.getRemotePackage();
            progress.setFraction(progressMax);
        } else if (mFallback != null) {
            // TODO: don't require downloading again
            parsedPackages =
                    mFallback.parseLegacyXml(
                            source, downloader, settings, progress.createSubProgress(progressMax));
            legacy = true;
        }

        if (parsedPackages != null && !parsedPackages.isEmpty()) {
            (legacy ? legacyResult : result).put(source, parsedPackages);
        } else {
            progress.logWarning("Errors during XML parse:");
            for (String error : errors) {
                progress.logWarning(error);
            }
            // Only blame the fallback loader if it was actually invoked.
            if (legacy) {
                progress.logWarning("Additionally, the fallback loader failed to parse the XML.");
            }
            source.setFetchError(errors.isEmpty() ? "unknown error" : errors.get(0));
        }
    }

    /**
     * Merges the packages of one source into the overall result.
     *
     * <p>When a package with the same path is already present:
     *
     * <ul>
     *   <li>if the existing one has a higher version, the new one is skipped;
     *   <li>if both have the same version, the new one is considered only when its source URL
     *       uses the {@code file} protocol (and is skipped if the URL is malformed);
     *   <li>if the new one has a higher version, it is considered.
     * </ul>
     *
     * A considered package is stored only if it has an archive and its channel compares as less
     * than or equal to the channel from {@code settings} (or the default channel if none is
     * set). The fetch error of {@code source} is cleared for every package that was not skipped.
     *
     * @param packagesFromSource the packages parsed from {@code source}.
     * @param source the source the packages came from.
     * @param settings optional settings providing the channel to filter by.
     * @param result the map, keyed by package path, to merge into.
     */
    private void mergePackages(
            @NonNull Collection<? extends RemotePackage> packagesFromSource,
            @NonNull RepositorySource source,
            @Nullable SettingsController settings,
            @NonNull Map<String, RemotePackage> result) {
        for (RemotePackage pkg : packagesFromSource) {
            RemotePackage existing = result.get(pkg.getPath());
            if (existing != null) {
                int compare = existing.getVersion().compareTo(pkg.getVersion());
                if (compare > 0) {
                    // If there are multiple versions of the same package available,
                    // pick the latest.
                    continue;
                } else if (compare == 0) {
                    // If there's a file:// version (for debugging) and an http:// version, use the
                    // file:// version.
                    try {
                        URL newUrl = new URL(source.getUrl());
                        String newProtocol = newUrl.getProtocol();
                        if (!newProtocol.equals("file")) {
                            // The new package isn't from a file:// source, so keep the one we
                            // already have.
                            continue;
                        }
                    } catch (MalformedURLException ignore) {
                        // If it's not a valid url, don't prioritize it.
                        continue;
                    }
                }
            }
            Channel settingsChannel =
                    settings == null || settings.getChannel() == null
                            ? pkg.createFactory().createChannelType(Channel.DEFAULT_ID)
                            : settings.getChannel();

            if (pkg.getArchive() != null && pkg.getChannel().compareTo(settingsChannel) <= 0) {
                pkg.setSource(source);
                result.put(pkg.getPath(), pkg);
            }
            source.setFetchError(null);
        }
    }

    /**
     * Shuts the thread pool down and waits for its tasks to finish, logging a warning every
     * {@link #FETCH_PACKAGES_WAITING_ITERATION_SECONDS} seconds while still waiting.
     *
     * <p>If the calling thread is interrupted while waiting, waiting stops and the thread's
     * interrupt status is restored.
     *
     * @param threadPool the pool to shut down; nothing is done if it has already terminated.
     * @param progress indicator used to log the waiting warnings.
     */
    private static void shutdownAndJoin(
            @NonNull ExecutorService threadPool, @NonNull ProgressIndicator progress) {
        if (threadPool.isTerminated()) {
            return;
        }
        threadPool.shutdown();
        try {
            while (!threadPool.awaitTermination(
                    FETCH_PACKAGES_WAITING_ITERATION_SECONDS, TimeUnit.SECONDS)) {
                progress.logWarning(FETCH_PACKAGES_WAITING_MESSAGE);
            }
        } catch (InterruptedException e) {
            // Stop waiting, but don't lose the interrupt: let the caller observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * A {@link DelegatingProgressIndicator} intended to be shared between download threads: it
     * does not report the fraction or texts, but preserves the ability to report the
     * errors/warnings through the wrapped indicator, as most underlying logging implementations
     * are thread-safe.
     */
    private static class LoggingOnlyProgressIndicator extends DelegatingProgressIndicator {
        LoggingOnlyProgressIndicator(@NonNull ProgressIndicator progress) {
            super(progress);
        }

        /** Ignored: fraction updates are not forwarded. */
        @Override
        public void setFraction(double fraction) {}

        /** Always reports zero, since fraction updates are discarded. */
        @Override
        public double getFraction() {
            return 0;
        }

        /** Ignored: text updates are not forwarded. */
        @Override
        public void setText(@Nullable String text) {}

        /** Ignored: secondary text updates are not forwarded. */
        @Override
        public void setSecondaryText(@Nullable String text) {}

        /** Returns this same indicator, so sub-progress is logging-only as well. */
        @Override
        public ProgressIndicator createSubProgress(double max) {
            return this;
        }
    }
}