| /* |
| * Copyright (C) 2020 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.android.volley; |
| |
| import android.os.Handler; |
| import android.os.Looper; |
| import android.os.SystemClock; |
| import androidx.annotation.NonNull; |
| import androidx.annotation.Nullable; |
| import com.android.volley.AsyncCache.OnGetCompleteCallback; |
| import com.android.volley.AsyncNetwork.OnRequestComplete; |
| import com.android.volley.Cache.Entry; |
| import java.net.HttpURLConnection; |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.PriorityBlockingQueue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * An asynchronous request dispatch queue. |
| * |
| * <p>Add requests to the queue with {@link #add(Request)}. Once completed, responses will be |
| * delivered on the main thread (unless a custom {@link ResponseDelivery} has been provided). |
| * |
| * <p><b>WARNING</b>: This API is experimental and subject to breaking changes. Please see |
| * https://github.com/google/volley/wiki/Asynchronous-Volley for more details. |
| */ |
| public class AsyncRequestQueue extends RequestQueue { |
| /** Default number of blocking threads to start. */ |
| private static final int DEFAULT_BLOCKING_THREAD_POOL_SIZE = 4; |
| |
| /** |
| * AsyncCache used to retrieve and store responses. |
| * |
| * <p>{@code null} indicates use of blocking Cache. |
| */ |
| @Nullable private final AsyncCache mAsyncCache; |
| |
| /** AsyncNetwork used to perform nework requests. */ |
| private final AsyncNetwork mNetwork; |
| |
| /** Executor for non-blocking tasks. */ |
| private ExecutorService mNonBlockingExecutor; |
| |
| /** Executor to be used for non-blocking tasks that need to be scheduled. */ |
| private ScheduledExecutorService mNonBlockingScheduledExecutor; |
| |
| /** |
| * Executor for blocking tasks. |
| * |
| * <p>Some tasks in handling requests may not be easy to implement in a non-blocking way, such |
| * as reading or parsing the response data. This executor is used to run these tasks. |
| */ |
| private ExecutorService mBlockingExecutor; |
| |
| /** |
| * This interface may be used by advanced applications to provide custom executors according to |
| * their needs. Apps must create ExecutorServices dynamically given a blocking queue rather than |
| * providing them directly so that Volley can provide a PriorityQueue which will prioritize |
| * requests according to Request#getPriority. |
| */ |
| private ExecutorFactory mExecutorFactory; |
| |
| /** Manage list of waiting requests and de-duplicate requests with same cache key. */ |
| private final WaitingRequestManager mWaitingRequestManager = new WaitingRequestManager(this); |
| |
| /** |
| * Requests which have been queued before cache initialization has completed. |
| * |
| * <p>These requests are kicked off once cache initialization finishes. We avoid enqueuing them |
| * sooner as the cache may not yet be ready. |
| */ |
| private final List<Request<?>> mRequestsAwaitingCacheInitialization = new ArrayList<>(); |
| |
| private volatile boolean mIsCacheInitialized = false; |
| private final Object mCacheInitializationLock = new Object[0]; |
| |
| /** |
| * Sets all the variables, but processing does not begin until {@link #start()} is called. |
| * |
| * @param cache to use for persisting responses to disk. If an AsyncCache was provided, then |
| * this will be a {@link ThrowingCache} |
| * @param network to perform HTTP requests |
| * @param asyncCache to use for persisting responses to disk. May be null to indicate use of |
| * blocking cache |
| * @param responseDelivery interface for posting responses and errors |
| * @param executorFactory Interface to be used to provide custom executors according to the |
| * users needs. |
| */ |
| private AsyncRequestQueue( |
| Cache cache, |
| AsyncNetwork network, |
| @Nullable AsyncCache asyncCache, |
| ResponseDelivery responseDelivery, |
| ExecutorFactory executorFactory) { |
| super(cache, network, /* threadPoolSize= */ 0, responseDelivery); |
| mAsyncCache = asyncCache; |
| mNetwork = network; |
| mExecutorFactory = executorFactory; |
| } |
| |
| /** Sets the executors and initializes the cache. */ |
| @Override |
| public void start() { |
| stop(); // Make sure any currently running threads are stopped |
| |
| // Create blocking / non-blocking executors and set them in the network and stack. |
| mNonBlockingExecutor = mExecutorFactory.createNonBlockingExecutor(getBlockingQueue()); |
| mBlockingExecutor = mExecutorFactory.createBlockingExecutor(getBlockingQueue()); |
| mNonBlockingScheduledExecutor = mExecutorFactory.createNonBlockingScheduledExecutor(); |
| mNetwork.setBlockingExecutor(mBlockingExecutor); |
| mNetwork.setNonBlockingExecutor(mNonBlockingExecutor); |
| mNetwork.setNonBlockingScheduledExecutor(mNonBlockingScheduledExecutor); |
| |
| // Kick off cache initialization, which must complete before any requests can be processed. |
| if (mAsyncCache != null) { |
| mNonBlockingExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| mAsyncCache.initialize( |
| new AsyncCache.OnWriteCompleteCallback() { |
| @Override |
| public void onWriteComplete() { |
| onCacheInitializationComplete(); |
| } |
| }); |
| } |
| }); |
| } else { |
| mBlockingExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| getCache().initialize(); |
| mNonBlockingExecutor.execute( |
| new Runnable() { |
| @Override |
| public void run() { |
| onCacheInitializationComplete(); |
| } |
| }); |
| } |
| }); |
| } |
| } |
| |
| /** Shuts down and nullifies both executors */ |
| @Override |
| public void stop() { |
| if (mNonBlockingExecutor != null) { |
| mNonBlockingExecutor.shutdownNow(); |
| mNonBlockingExecutor = null; |
| } |
| if (mBlockingExecutor != null) { |
| mBlockingExecutor.shutdownNow(); |
| mBlockingExecutor = null; |
| } |
| if (mNonBlockingScheduledExecutor != null) { |
| mNonBlockingScheduledExecutor.shutdownNow(); |
| mNonBlockingScheduledExecutor = null; |
| } |
| } |
| |
| /** Begins the request by sending it to the Cache or Network. */ |
| @Override |
| <T> void beginRequest(Request<T> request) { |
| // If the cache hasn't been initialized yet, add the request to a temporary queue to be |
| // flushed once initialization completes. |
| if (!mIsCacheInitialized) { |
| synchronized (mCacheInitializationLock) { |
| if (!mIsCacheInitialized) { |
| mRequestsAwaitingCacheInitialization.add(request); |
| return; |
| } |
| } |
| } |
| |
| // If the request is uncacheable, send it over the network. |
| if (request.shouldCache()) { |
| if (mAsyncCache != null) { |
| mNonBlockingExecutor.execute(new CacheTask<>(request)); |
| } else { |
| mBlockingExecutor.execute(new CacheTask<>(request)); |
| } |
| } else { |
| sendRequestOverNetwork(request); |
| } |
| } |
| |
| private void onCacheInitializationComplete() { |
| List<Request<?>> requestsToDispatch; |
| synchronized (mCacheInitializationLock) { |
| requestsToDispatch = new ArrayList<>(mRequestsAwaitingCacheInitialization); |
| mRequestsAwaitingCacheInitialization.clear(); |
| mIsCacheInitialized = true; |
| } |
| |
| // Kick off any requests that were queued while waiting for cache initialization. |
| for (Request<?> request : requestsToDispatch) { |
| beginRequest(request); |
| } |
| } |
| |
| @Override |
| <T> void sendRequestOverNetwork(Request<T> request) { |
| mNonBlockingExecutor.execute(new NetworkTask<>(request)); |
| } |
| |
| /** Runnable that gets an entry from the cache. */ |
| private class CacheTask<T> extends RequestTask<T> { |
| CacheTask(Request<T> request) { |
| super(request); |
| } |
| |
| @Override |
| public void run() { |
| // If the request has been canceled, don't bother dispatching it. |
| if (mRequest.isCanceled()) { |
| mRequest.finish("cache-discard-canceled"); |
| return; |
| } |
| |
| mRequest.addMarker("cache-queue-take"); |
| |
| // Attempt to retrieve this item from cache. |
| if (mAsyncCache != null) { |
| mAsyncCache.get( |
| mRequest.getCacheKey(), |
| new OnGetCompleteCallback() { |
| @Override |
| public void onGetComplete(Entry entry) { |
| handleEntry(entry, mRequest); |
| } |
| }); |
| } else { |
| Entry entry = getCache().get(mRequest.getCacheKey()); |
| handleEntry(entry, mRequest); |
| } |
| } |
| } |
| |
| /** Helper method that handles the cache entry after getting it from the Cache. */ |
| private void handleEntry(final Entry entry, final Request<?> mRequest) { |
| if (entry == null) { |
| mRequest.addMarker("cache-miss"); |
| // Cache miss; send off to the network dispatcher. |
| if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { |
| sendRequestOverNetwork(mRequest); |
| } |
| return; |
| } |
| |
| // Use a single instant to evaluate cache expiration. Otherwise, a cache entry with |
| // identical soft and hard TTL times may appear to be valid when checking isExpired but |
| // invalid upon checking refreshNeeded(), triggering a soft TTL refresh which should be |
| // impossible. |
| long currentTimeMillis = System.currentTimeMillis(); |
| |
| // If it is completely expired, just send it to the network. |
| if (entry.isExpired(currentTimeMillis)) { |
| mRequest.addMarker("cache-hit-expired"); |
| mRequest.setCacheEntry(entry); |
| if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { |
| sendRequestOverNetwork(mRequest); |
| } |
| return; |
| } |
| |
| // We have a cache hit; parse its data for delivery back to the request. |
| mBlockingExecutor.execute(new CacheParseTask<>(mRequest, entry, currentTimeMillis)); |
| } |
| |
| private class CacheParseTask<T> extends RequestTask<T> { |
| Cache.Entry entry; |
| long startTimeMillis; |
| |
| CacheParseTask(Request<T> request, Cache.Entry entry, long startTimeMillis) { |
| super(request); |
| this.entry = entry; |
| this.startTimeMillis = startTimeMillis; |
| } |
| |
| @Override |
| public void run() { |
| mRequest.addMarker("cache-hit"); |
| Response<?> response = |
| mRequest.parseNetworkResponse( |
| new NetworkResponse( |
| HttpURLConnection.HTTP_OK, |
| entry.data, |
| /* notModified= */ false, |
| /* networkTimeMs= */ 0, |
| entry.allResponseHeaders)); |
| mRequest.addMarker("cache-hit-parsed"); |
| |
| if (!entry.refreshNeeded(startTimeMillis)) { |
| // Completely unexpired cache hit. Just deliver the response. |
| getResponseDelivery().postResponse(mRequest, response); |
| } else { |
| // Soft-expired cache hit. We can deliver the cached response, |
| // but we need to also send the request to the network for |
| // refreshing. |
| mRequest.addMarker("cache-hit-refresh-needed"); |
| mRequest.setCacheEntry(entry); |
| // Mark the response as intermediate. |
| response.intermediate = true; |
| |
| if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { |
| // Post the intermediate response back to the user and have |
| // the delivery then forward the request along to the network. |
| getResponseDelivery() |
| .postResponse( |
| mRequest, |
| response, |
| new Runnable() { |
| @Override |
| public void run() { |
| sendRequestOverNetwork(mRequest); |
| } |
| }); |
| } else { |
| // request has been added to list of waiting requests |
| // to receive the network response from the first request once it |
| // returns. |
| getResponseDelivery().postResponse(mRequest, response); |
| } |
| } |
| } |
| } |
| |
| private class ParseErrorTask<T> extends RequestTask<T> { |
| VolleyError volleyError; |
| |
| ParseErrorTask(Request<T> request, VolleyError volleyError) { |
| super(request); |
| this.volleyError = volleyError; |
| } |
| |
| @Override |
| public void run() { |
| VolleyError parsedError = mRequest.parseNetworkError(volleyError); |
| getResponseDelivery().postError(mRequest, parsedError); |
| mRequest.notifyListenerResponseNotUsable(); |
| } |
| } |
| |
| /** Runnable that performs the network request */ |
| private class NetworkTask<T> extends RequestTask<T> { |
| NetworkTask(Request<T> request) { |
| super(request); |
| } |
| |
| @Override |
| public void run() { |
| // If the request was cancelled already, do not perform the network request. |
| if (mRequest.isCanceled()) { |
| mRequest.finish("network-discard-cancelled"); |
| mRequest.notifyListenerResponseNotUsable(); |
| return; |
| } |
| |
| final long startTimeMs = SystemClock.elapsedRealtime(); |
| mRequest.addMarker("network-queue-take"); |
| |
| // TODO: Figure out what to do with traffic stats tags. Can this be pushed to the |
| // HTTP stack, or is it no longer feasible to support? |
| |
| // Perform the network request. |
| mNetwork.performRequest( |
| mRequest, |
| new OnRequestComplete() { |
| @Override |
| public void onSuccess(final NetworkResponse networkResponse) { |
| mRequest.addMarker("network-http-complete"); |
| |
| // If the server returned 304 AND we delivered a response already, |
| // we're done -- don't deliver a second identical response. |
| if (networkResponse.notModified && mRequest.hasHadResponseDelivered()) { |
| mRequest.finish("not-modified"); |
| mRequest.notifyListenerResponseNotUsable(); |
| return; |
| } |
| |
| // Parse the response here on the worker thread. |
| mBlockingExecutor.execute( |
| new NetworkParseTask<>(mRequest, networkResponse)); |
| } |
| |
| @Override |
| public void onError(final VolleyError volleyError) { |
| volleyError.setNetworkTimeMs( |
| SystemClock.elapsedRealtime() - startTimeMs); |
| mBlockingExecutor.execute(new ParseErrorTask<>(mRequest, volleyError)); |
| } |
| }); |
| } |
| } |
| |
| /** Runnable that parses a network response. */ |
| private class NetworkParseTask<T> extends RequestTask<T> { |
| NetworkResponse networkResponse; |
| |
| NetworkParseTask(Request<T> request, NetworkResponse networkResponse) { |
| super(request); |
| this.networkResponse = networkResponse; |
| } |
| |
| @Override |
| public void run() { |
| final Response<?> response = mRequest.parseNetworkResponse(networkResponse); |
| mRequest.addMarker("network-parse-complete"); |
| |
| // Write to cache if applicable. |
| // TODO: Only update cache metadata instead of entire |
| // record for 304s. |
| if (mRequest.shouldCache() && response.cacheEntry != null) { |
| if (mAsyncCache != null) { |
| mNonBlockingExecutor.execute(new CachePutTask<>(mRequest, response)); |
| } else { |
| mBlockingExecutor.execute(new CachePutTask<>(mRequest, response)); |
| } |
| } else { |
| finishRequest(mRequest, response, /* cached= */ false); |
| } |
| } |
| } |
| |
| private class CachePutTask<T> extends RequestTask<T> { |
| Response<?> response; |
| |
| CachePutTask(Request<T> request, Response<?> response) { |
| super(request); |
| this.response = response; |
| } |
| |
| @Override |
| public void run() { |
| if (mAsyncCache != null) { |
| mAsyncCache.put( |
| mRequest.getCacheKey(), |
| response.cacheEntry, |
| new AsyncCache.OnWriteCompleteCallback() { |
| @Override |
| public void onWriteComplete() { |
| finishRequest(mRequest, response, /* cached= */ true); |
| } |
| }); |
| } else { |
| getCache().put(mRequest.getCacheKey(), response.cacheEntry); |
| finishRequest(mRequest, response, /* cached= */ true); |
| } |
| } |
| } |
| |
| /** Posts response and notifies listener */ |
| private void finishRequest(Request<?> mRequest, Response<?> response, boolean cached) { |
| if (cached) { |
| mRequest.addMarker("network-cache-written"); |
| } |
| // Post the response back. |
| mRequest.markDelivered(); |
| getResponseDelivery().postResponse(mRequest, response); |
| mRequest.notifyListenerResponseReceived(response); |
| } |
| |
| /** |
| * Factory to create/provide the executors which Volley will use. |
| * |
| * <p>This class may be used by advanced applications to provide custom executors according to |
| * their needs. |
| * |
| * <p>For applications which rely on setting request priority via {@link Request#getPriority}, a |
| * task queue is provided which will prioritize requests of higher priority should the thread |
| * pool itself be exhausted. If a shared pool is provided which does not make use of the given |
| * queue, then lower-priority requests may have tasks executed before higher-priority requests |
| * when enough tasks are in flight to fully saturate the shared pool. |
| */ |
| public abstract static class ExecutorFactory { |
| public abstract ExecutorService createNonBlockingExecutor( |
| BlockingQueue<Runnable> taskQueue); |
| |
| public abstract ExecutorService createBlockingExecutor(BlockingQueue<Runnable> taskQueue); |
| |
| public abstract ScheduledExecutorService createNonBlockingScheduledExecutor(); |
| } |
| |
| /** Provides a BlockingQueue to be used to create executors. */ |
| private static PriorityBlockingQueue<Runnable> getBlockingQueue() { |
| return new PriorityBlockingQueue<>( |
| /* initialCapacity= */ 11, |
| new Comparator<Runnable>() { |
| @Override |
| public int compare(Runnable r1, Runnable r2) { |
| // Vanilla runnables are prioritized first, then RequestTasks are ordered |
| // by the underlying Request. |
| if (r1 instanceof RequestTask) { |
| if (r2 instanceof RequestTask) { |
| return ((RequestTask<?>) r1).compareTo(((RequestTask<?>) r2)); |
| } |
| return 1; |
| } |
| return r2 instanceof RequestTask ? -1 : 0; |
| } |
| }); |
| } |
| |
| /** |
| * Builder is used to build an instance of {@link AsyncRequestQueue} from values configured by |
| * the setters. |
| */ |
| public static class Builder { |
| @Nullable private AsyncCache mAsyncCache = null; |
| private final AsyncNetwork mNetwork; |
| @Nullable private Cache mCache = null; |
| @Nullable private ExecutorFactory mExecutorFactory = null; |
| @Nullable private ResponseDelivery mResponseDelivery = null; |
| |
| public Builder(AsyncNetwork asyncNetwork) { |
| if (asyncNetwork == null) { |
| throw new IllegalArgumentException("Network cannot be null"); |
| } |
| mNetwork = asyncNetwork; |
| } |
| |
| /** |
| * Sets the executor factory to be used by the AsyncRequestQueue. If this is not called, |
| * Volley will create suitable private thread pools. |
| */ |
| public Builder setExecutorFactory(ExecutorFactory executorFactory) { |
| mExecutorFactory = executorFactory; |
| return this; |
| } |
| |
| /** |
| * Sets the response deliver to be used by the AsyncRequestQueue. If this is not called, we |
| * will default to creating a new {@link ExecutorDelivery} with the application's main |
| * thread. |
| */ |
| public Builder setResponseDelivery(ResponseDelivery responseDelivery) { |
| mResponseDelivery = responseDelivery; |
| return this; |
| } |
| |
| /** Sets the AsyncCache to be used by the AsyncRequestQueue. */ |
| public Builder setAsyncCache(AsyncCache asyncCache) { |
| mAsyncCache = asyncCache; |
| return this; |
| } |
| |
| /** Sets the Cache to be used by the AsyncRequestQueue. */ |
| public Builder setCache(Cache cache) { |
| mCache = cache; |
| return this; |
| } |
| |
| /** Provides a default ExecutorFactory to use, if one is never set. */ |
| private ExecutorFactory getDefaultExecutorFactory() { |
| return new ExecutorFactory() { |
| @Override |
| public ExecutorService createNonBlockingExecutor( |
| BlockingQueue<Runnable> taskQueue) { |
| return getNewThreadPoolExecutor( |
| /* maximumPoolSize= */ 1, |
| /* threadNameSuffix= */ "Non-BlockingExecutor", |
| taskQueue); |
| } |
| |
| @Override |
| public ExecutorService createBlockingExecutor(BlockingQueue<Runnable> taskQueue) { |
| return getNewThreadPoolExecutor( |
| /* maximumPoolSize= */ DEFAULT_BLOCKING_THREAD_POOL_SIZE, |
| /* threadNameSuffix= */ "BlockingExecutor", |
| taskQueue); |
| } |
| |
| @Override |
| public ScheduledExecutorService createNonBlockingScheduledExecutor() { |
| return new ScheduledThreadPoolExecutor( |
| /* corePoolSize= */ 0, getThreadFactory("ScheduledExecutor")); |
| } |
| |
| private ThreadPoolExecutor getNewThreadPoolExecutor( |
| int maximumPoolSize, |
| final String threadNameSuffix, |
| BlockingQueue<Runnable> taskQueue) { |
| return new ThreadPoolExecutor( |
| /* corePoolSize= */ 0, |
| /* maximumPoolSize= */ maximumPoolSize, |
| /* keepAliveTime= */ 60, |
| /* unit= */ TimeUnit.SECONDS, |
| taskQueue, |
| getThreadFactory(threadNameSuffix)); |
| } |
| |
| private ThreadFactory getThreadFactory(final String threadNameSuffix) { |
| return new ThreadFactory() { |
| @Override |
| public Thread newThread(@NonNull Runnable runnable) { |
| Thread t = Executors.defaultThreadFactory().newThread(runnable); |
| t.setName("Volley-" + threadNameSuffix); |
| return t; |
| } |
| }; |
| } |
| }; |
| } |
| |
| public AsyncRequestQueue build() { |
| // If neither cache is set by the caller, throw an illegal argument exception. |
| if (mCache == null && mAsyncCache == null) { |
| throw new IllegalArgumentException("You must set one of the cache objects"); |
| } |
| if (mCache == null) { |
| // if no cache is provided, we will provide one that throws |
| // UnsupportedOperationExceptions to pass into the parent class. |
| mCache = new ThrowingCache(); |
| } |
| if (mResponseDelivery == null) { |
| mResponseDelivery = new ExecutorDelivery(new Handler(Looper.getMainLooper())); |
| } |
| if (mExecutorFactory == null) { |
| mExecutorFactory = getDefaultExecutorFactory(); |
| } |
| return new AsyncRequestQueue( |
| mCache, mNetwork, mAsyncCache, mResponseDelivery, mExecutorFactory); |
| } |
| } |
| |
| /** A cache that throws an error if a method is called. */ |
| private static class ThrowingCache implements Cache { |
| @Override |
| public Entry get(String key) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void put(String key, Entry entry) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void initialize() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void invalidate(String key, boolean fullExpire) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void remove(String key) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void clear() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| } |