blob: 6db0b1cc68c9baaac0c95be5230f9c79575f8d02 [file] [log] [blame]
/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.volley;
import android.os.Handler;
import android.os.Looper;
import androidx.annotation.IntDef;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A request dispatch queue with a thread pool of dispatchers.
*
* <p>Calling {@link #add(Request)} will enqueue the given Request for dispatch, resolving from
* either cache or network on a worker thread, and then delivering a parsed response on the main
* thread.
*/
public class RequestQueue {
/**
* Callback interface for completed requests.
*
* @param <T> type argument of the {@link Request} handed to {@link #onRequestFinished(Request)}
* @deprecated Use {@link RequestEventListener} instead.
*/
// TODO: This should not be a generic class, because the request type can't be determined at
// compile time, so all calls to onRequestFinished are unsafe. However, changing this would be
// an API-breaking change. See also: https://github.com/google/volley/pull/109
@Deprecated // Use RequestEventListener instead.
public interface RequestFinishedListener<T> {
/**
* Called when a request has finished processing.
*
* @param request the request that finished
*/
void onRequestFinished(Request<T> request);
}
/**
 * Life cycle event codes that are reported to every registered {@link RequestEventListener}.
 *
 * <p>The constants below are the complete set of values listed in the {@code IntDef}
 * declaration. Fields of an annotation type are implicitly {@code public static final}, so the
 * modifiers are not spelled out.
 */
@Retention(RetentionPolicy.SOURCE)
@IntDef({
    RequestEvent.REQUEST_QUEUED,
    RequestEvent.REQUEST_CACHE_LOOKUP_STARTED,
    RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED,
    RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED,
    RequestEvent.REQUEST_NETWORK_DISPATCH_FINISHED,
    RequestEvent.REQUEST_FINISHED
})
public @interface RequestEvent {
    /** The request has been added to the queue. */
    int REQUEST_QUEUED = 0;

    /** The cache lookup for the request has started. */
    int REQUEST_CACHE_LOOKUP_STARTED = 1;

    /**
     * The cache lookup for the request has finished: either the cached response was delivered
     * or the request was queued for network dispatching.
     */
    int REQUEST_CACHE_LOOKUP_FINISHED = 2;

    /** The network dispatch for the request has started. */
    int REQUEST_NETWORK_DISPATCH_STARTED = 3;

    /** The network dispatch for the request has finished and the response (if any) is delivered. */
    int REQUEST_NETWORK_DISPATCH_FINISHED = 4;

    /**
     * All work associated with the request is done and the request has been removed from all
     * the queues.
     */
    int REQUEST_FINISHED = 5;
}
/** Callback interface for request life cycle events. */
public interface RequestEventListener {
/**
* Called on every request lifecycle event. Can be called from different threads. The call
* is blocking request processing, so any processing should be kept at minimum or moved to
* another thread.
*
* @param request the request the event refers to
* @param event one of the {@link RequestEvent} constants identifying the life cycle stage
*/
void onRequestEvent(Request<?> request, @RequestEvent int event);
}
/** Used for generating monotonically-increasing sequence numbers for requests. */
private final AtomicInteger mSequenceGenerator = new AtomicInteger();
/**
* The set of all requests currently being processed by this RequestQueue. A Request will be in
* this set if it is waiting in any queue or currently being processed by any dispatcher.
*
* <p>Every access in this class is made while synchronized on the set itself.
*/
private final Set<Request<?>> mCurrentRequests = new HashSet<>();
/** The cache triage queue. */
private final PriorityBlockingQueue<Request<?>> mCacheQueue = new PriorityBlockingQueue<>();
/** The queue of requests that are actually going out to the network. */
private final PriorityBlockingQueue<Request<?>> mNetworkQueue = new PriorityBlockingQueue<>();
/**
* Default number of network request dispatcher threads, used by the constructor that does not
* take an explicit thread pool size.
*/
private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
/** Cache interface for retrieving and storing responses. */
private final Cache mCache;
/** Network interface for performing requests. */
private final Network mNetwork;
/** Response delivery mechanism. */
private final ResponseDelivery mDelivery;
/** The network dispatchers. Slots are filled by {@link #start()} and are null before that. */
private final NetworkDispatcher[] mDispatchers;
/** The cache dispatcher. Assigned by {@link #start()}; null until the first start. */
private CacheDispatcher mCacheDispatcher;
/**
* Listeners of the deprecated {@link RequestFinishedListener} kind, notified when a request
* finishes. Every access in this class is made while synchronized on the list itself.
*/
// The element type is raw on purpose: see the TODO on RequestFinishedListener.
private final List<RequestFinishedListener> mFinishedListeners = new ArrayList<>();
/**
* Collection of listeners for request life cycle events. Every access in this class is made
* while synchronized on the list itself.
*/
private final List<RequestEventListener> mEventListeners = new ArrayList<>();
/**
 * Builds a request queue with an explicit response delivery mechanism. No dispatcher is created
 * or started here; processing will not begin until {@link #start()} is called.
 *
 * @param cache A Cache to use for persisting responses to disk
 * @param network A Network interface for performing HTTP requests
 * @param threadPoolSize Number of network dispatcher threads to create
 * @param delivery A ResponseDelivery interface for posting responses and errors
 */
public RequestQueue(
        Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery) {
    // One (initially empty) slot per network dispatcher; start() populates the slots.
    mDispatchers = new NetworkDispatcher[threadPoolSize];
    mDelivery = delivery;
    mNetwork = network;
    mCache = cache;
}
/**
* Creates the worker pool. Processing will not begin until {@link #start()} is called.
*
* <p>Responses are delivered through an {@code ExecutorDelivery} built on a {@link Handler}
* bound to the main {@link Looper}.
*
* @param cache A Cache to use for persisting responses to disk
* @param network A Network interface for performing HTTP requests
* @param threadPoolSize Number of network dispatcher threads to create
*/
public RequestQueue(Cache cache, Network network, int threadPoolSize) {
this(
cache,
network,
threadPoolSize,
new ExecutorDelivery(new Handler(Looper.getMainLooper())));
}
/**
* Creates the worker pool. Processing will not begin until {@link #start()} is called.
*
* <p>Uses the default number of network dispatcher threads
* ({@code DEFAULT_NETWORK_THREAD_POOL_SIZE}).
*
* @param cache A Cache to use for persisting responses to disk
* @param network A Network interface for performing HTTP requests
*/
public RequestQueue(Cache cache, Network network) {
this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
}
/**
 * Starts the dispatchers in this queue.
 *
 * <p>Any dispatchers left over from an earlier call are asked to quit first; then one cache
 * dispatcher and one network dispatcher per pool slot are created and started.
 */
public void start() {
    // Make sure any currently running dispatchers are stopped.
    stop();

    // Bring up the cache dispatcher first.
    mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
    mCacheDispatcher.start();

    // Fill every slot of the pool with a freshly started network dispatcher.
    for (int slot = 0; slot < mDispatchers.length; slot++) {
        final NetworkDispatcher worker =
                new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery);
        mDispatchers[slot] = worker;
        worker.start();
    }
}
/**
 * Stops the cache and network dispatchers.
 *
 * <p>Dispatchers that were never created (null references) are skipped, so this is safe to call
 * before {@link #start()}.
 */
public void stop() {
    if (mCacheDispatcher != null) {
        mCacheDispatcher.quit();
    }
    for (int slot = 0; slot < mDispatchers.length; slot++) {
        final NetworkDispatcher dispatcher = mDispatchers[slot];
        if (dispatcher == null) {
            // Slot not populated yet: nothing to quit.
            continue;
        }
        dispatcher.quit();
    }
}
/**
* Gets a sequence number.
*
* @return the value of this queue's sequence generator after atomically incrementing it, so
*     every call yields a new number
*/
public int getSequenceNumber() {
return mSequenceGenerator.incrementAndGet();
}
/**
* Gets the {@link Cache} instance being used.
*
* @return the cache that was passed to the constructor
*/
public Cache getCache() {
return mCache;
}
/**
* A simple predicate or filter interface for Requests, for use by {@link
* RequestQueue#cancelAll(RequestFilter)}.
*/
public interface RequestFilter {
/**
* Decides whether the filter applies to the given request.
*
* @param request the request to test
* @return true if the filter applies; {@link RequestQueue#cancelAll(RequestFilter)} cancels
*     exactly the requests for which this returns true
*/
boolean apply(Request<?> request);
}
/**
 * Cancels every request currently tracked by this queue that the given filter accepts.
 *
 * <p>The filter is evaluated while the lock on the set of current requests is held.
 *
 * @param filter The filtering function to use
 */
public void cancelAll(RequestFilter filter) {
    synchronized (mCurrentRequests) {
        for (Request<?> candidate : mCurrentRequests) {
            if (!filter.apply(candidate)) {
                continue;
            }
            candidate.cancel();
        }
    }
}
/**
 * Cancels all requests in this queue that carry the given tag. The tag is compared by identity
 * ({@code ==}), not by {@code equals}.
 *
 * @param tag the tag to match; must be non-null
 * @throws IllegalArgumentException if {@code tag} is null
 */
public void cancelAll(final Object tag) {
    if (tag == null) {
        throw new IllegalArgumentException("Cannot cancelAll with a null tag");
    }
    // Express the tag match as a filter and delegate to the filter-based overload.
    final RequestFilter sameTag =
            new RequestFilter() {
                @Override
                public boolean apply(Request<?> request) {
                    return tag == request.getTag();
                }
            };
    cancelAll(sameTag);
}
/**
 * Adds a Request to the dispatch queue.
 *
 * <p>The request is bound to this queue, recorded in the set of current requests, given the
 * next sequence number, announced to event listeners as {@link RequestEvent#REQUEST_QUEUED} and
 * finally handed over for processing.
 *
 * @param request The request to service
 * @return The passed-in request
 */
public <T> Request<T> add(Request<T> request) {
    // Bind the request to this queue, then track it as in flight.
    request.setRequestQueue(this);
    synchronized (mCurrentRequests) {
        mCurrentRequests.add(request);
    }

    // Sequence numbers follow insertion order, so requests are processed in the order added.
    final int sequence = getSequenceNumber();
    request.setSequence(sequence);
    request.addMarker("add-to-queue");

    sendRequestEvent(request, RequestEvent.REQUEST_QUEUED);
    beginRequest(request);
    return request;
}
/**
 * Routes a request to its first queue: the cache queue when the request should be cached,
 * otherwise straight to the network.
 *
 * @param request the request to route
 */
<T> void beginRequest(Request<T> request) {
    if (request.shouldCache()) {
        mCacheQueue.add(request);
        return;
    }
    // Uncacheable requests skip the cache queue entirely.
    sendRequestOverNetwork(request);
}
/**
 * Called from {@link Request#finish(String)}, indicating that processing of the given request
 * has finished.
 *
 * <p>The request is removed from the set of current requests, every registered {@link
 * RequestFinishedListener} is notified, and then {@link RequestEvent#REQUEST_FINISHED} is sent
 * to the event listeners.
 *
 * <p>Listeners are notified from a snapshot of the listener list, so a listener may add or
 * remove listeners (including itself) from within its callback. Such changes take effect from
 * the next finished request onwards.
 *
 * @param request the request that finished
 */
@SuppressWarnings("unchecked") // see above note on RequestFinishedListener
<T> void finish(Request<T> request) {
    // Remove from the set of requests currently being processed.
    synchronized (mCurrentRequests) {
        mCurrentRequests.remove(request);
    }
    synchronized (mFinishedListeners) {
        // Iterating the live list would fail with ConcurrentModificationException (or silently
        // skip a listener) if a callback re-entrantly adds or removes a listener. The lock is
        // still held during the callbacks, so other threads observe the same blocking as before.
        final List<RequestFinishedListener> snapshot = new ArrayList<>(mFinishedListeners);
        for (RequestFinishedListener<T> listener : snapshot) {
            listener.onRequestFinished(request);
        }
    }
    sendRequestEvent(request, RequestEvent.REQUEST_FINISHED);
}
/**
 * Sends a request life cycle event to the listeners.
 *
 * <p>Listeners are invoked in registration order while the lock on the listener list is held.
 * They are notified from a snapshot of that list, so a listener may add or remove listeners
 * (including itself) from within its callback. Such changes take effect from the next event
 * onwards.
 *
 * @param request the request the event refers to
 * @param event the {@link RequestEvent} code to report
 */
void sendRequestEvent(Request<?> request, @RequestEvent int event) {
    synchronized (mEventListeners) {
        if (mEventListeners.isEmpty()) {
            // Common case: nobody is listening, so avoid allocating a snapshot per event.
            return;
        }
        // Iterating the live list would fail with ConcurrentModificationException (or silently
        // skip a listener) if a callback re-entrantly adds or removes a listener.
        final List<RequestEventListener> snapshot = new ArrayList<>(mEventListeners);
        for (RequestEventListener listener : snapshot) {
            listener.onRequestEvent(request, event);
        }
    }
}
/**
* Add a listener for request life cycle events.
*
* @param listener the listener to append to the list of event listeners
*/
public void addRequestEventListener(RequestEventListener listener) {
synchronized (mEventListeners) {
mEventListeners.add(listener);
}
}
/**
* Remove a listener for request life cycle events. Has no effect if the listener is not
* currently registered.
*
* @param listener the listener to remove from the list of event listeners
*/
public void removeRequestEventListener(RequestEventListener listener) {
synchronized (mEventListeners) {
mEventListeners.remove(listener);
}
}
/**
* Add a RequestFinishedListener that is notified whenever a request of this queue finishes.
*
* @param listener the listener to append to the list of finished listeners
* @deprecated Use {@link #addRequestEventListener(RequestEventListener)} instead.
*/
@Deprecated // Use RequestEventListener instead.
public <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) {
synchronized (mFinishedListeners) {
mFinishedListeners.add(listener);
}
}
/**
* Remove a RequestFinishedListener. Has no effect if listener was not previously added.
*
* @param listener the listener to remove from the list of finished listeners
* @deprecated Use {@link #removeRequestEventListener(RequestEventListener)} instead.
*/
@Deprecated // Use RequestEventListener instead.
public <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) {
synchronized (mFinishedListeners) {
mFinishedListeners.remove(listener);
}
}
/**
* Gets the {@link ResponseDelivery} instance being used.
*
* @return the response delivery mechanism this queue was constructed with
*/
public ResponseDelivery getResponseDelivery() {
return mDelivery;
}
/**
* Adds the given request to the network queue, bypassing the cache queue.
*
* @param request the request to enqueue for network dispatch
*/
<T> void sendRequestOverNetwork(Request<T> request) {
mNetworkQueue.add(request);
}
}