blob: 9dc3d6a81338b79ad59b37a3b0ba140ae61603f2 [file] [log] [blame]
/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.server.wm;
import android.os.Process;
import android.os.SystemClock;
import android.util.Slog;
import com.android.internal.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.function.Predicate;
/**
* The common threading logic for persisters to use so that they can run in the same threads.
* Methods in this class are synchronized on its instance, so caller could also synchronize on
* its instance to perform modifications in items.
*/
class PersisterQueue {
private static final String TAG = "PersisterQueue";
private static final boolean DEBUG = false;
/** When not flushing don't write out files faster than this */
private static final long INTER_WRITE_DELAY_MS = 500;
/**
* When not flushing delay this long before writing the first file out. This gives the next task
* being launched a chance to load its resources without this occupying IO bandwidth.
*/
private static final long PRE_TASK_DELAY_MS = 3000;
/** The maximum number of entries to keep in the queue before draining it automatically. */
private static final int MAX_WRITE_QUEUE_LENGTH = 6;
/** Special value for mWriteTime to mean don't wait, just write */
private static final long FLUSH_QUEUE = -1;
/** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link
* Listener#onPreProcessItem}. */
static final WriteQueueItem EMPTY_ITEM = () -> { };
private final long mInterWriteDelayMs;
private final long mPreTaskDelayMs;
private final LazyTaskWriterThread mLazyTaskWriterThread;
private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>();
private final ArrayList<Listener> mListeners = new ArrayList<>();
/**
* Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes
* until the image queue is drained and all tasks needing persisting are written to disk. There
* is no delay between writes. == 0 We are Idle. Next writes will be delayed by
* #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent
* writes will be delayed by #INTER_WRITE_DELAY_MS.
*/
private long mNextWriteTime = 0;
PersisterQueue() {
this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS);
}
/** Used for tests to reduce waiting time. */
@VisibleForTesting
PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) {
if (interWriteDelayMs < 0 || preTaskDelayMs < 0) {
throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to"
+ "be non-negative. inter-write delay: " + interWriteDelayMs
+ "ms pre-task delay: " + preTaskDelayMs);
}
mInterWriteDelayMs = interWriteDelayMs;
mPreTaskDelayMs = preTaskDelayMs;
mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread");
}
synchronized void startPersisting() {
if (!mLazyTaskWriterThread.isAlive()) {
mLazyTaskWriterThread.start();
}
}
/** Stops persisting thread. Should only be used in tests. */
@VisibleForTesting
void stopPersisting() throws InterruptedException {
if (!mLazyTaskWriterThread.isAlive()) {
return;
}
synchronized (this) {
mLazyTaskWriterThread.interrupt();
}
mLazyTaskWriterThread.join();
}
synchronized void addItem(WriteQueueItem item, boolean flush) {
mWriteQueue.add(item);
if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) {
mNextWriteTime = FLUSH_QUEUE;
} else if (mNextWriteTime == 0) {
mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs;
}
notify();
}
synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) {
for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
WriteQueueItem writeQueueItem = mWriteQueue.get(i);
if (clazz.isInstance(writeQueueItem)) {
T item = clazz.cast(writeQueueItem);
if (predicate.test(item)) {
return item;
}
}
}
return null;
}
/**
* Updates the last item found in the queue that matches the given item, or adds it to the end
* of the queue if no such item is found.
*/
synchronized <T extends WriteQueueItem> void updateLastOrAddItem(T item, boolean flush) {
final T itemToUpdate = findLastItem(item::matches, (Class<T>) item.getClass());
if (itemToUpdate == null) {
addItem(item, flush);
} else {
itemToUpdate.updateFrom(item);
}
yieldIfQueueTooDeep();
}
/**
* Removes all items with which given predicate returns {@code true}.
*/
synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate,
Class<T> clazz) {
for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
WriteQueueItem writeQueueItem = mWriteQueue.get(i);
if (clazz.isInstance(writeQueueItem)) {
T item = clazz.cast(writeQueueItem);
if (predicate.test(item)) {
if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue.");
mWriteQueue.remove(i);
}
}
}
}
synchronized void flush() {
mNextWriteTime = FLUSH_QUEUE;
notifyAll();
do {
try {
wait();
} catch (InterruptedException e) {
}
} while (mNextWriteTime == FLUSH_QUEUE);
}
void yieldIfQueueTooDeep() {
boolean stall = false;
synchronized (this) {
if (mNextWriteTime == FLUSH_QUEUE) {
stall = true;
}
}
if (stall) {
Thread.yield();
}
}
void addListener(Listener listener) {
mListeners.add(listener);
}
@VisibleForTesting
boolean removeListener(Listener listener) {
return mListeners.remove(listener);
}
private void processNextItem() throws InterruptedException {
// This part is extracted into a method so that the GC can clearly see the end of the
// scope of the variable 'item'. If this part was in the loop in LazyTaskWriterThread, the
// last item it processed would always "leak".
// See https://b.corp.google.com/issues/64438652#comment7
// If mNextWriteTime, then don't delay between each call to saveToXml().
final WriteQueueItem item;
synchronized (this) {
if (mNextWriteTime != FLUSH_QUEUE) {
// The next write we don't have to wait so long.
mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs;
if (DEBUG) {
Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs
+ " msec. (" + mNextWriteTime + ")");
}
}
while (mWriteQueue.isEmpty()) {
if (mNextWriteTime != 0) {
mNextWriteTime = 0; // idle.
notify(); // May need to wake up flush().
}
// Make sure we exit this thread correctly when interrupted before going to
// indefinite wait.
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely.");
wait();
// Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS
// from now.
}
item = mWriteQueue.remove(0);
long now = SystemClock.uptimeMillis();
if (DEBUG) {
Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime
+ " mWriteQueue.size=" + mWriteQueue.size());
}
while (now < mNextWriteTime) {
if (DEBUG) {
Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now));
}
wait(mNextWriteTime - now);
now = SystemClock.uptimeMillis();
}
// Got something to do.
}
item.process();
}
interface WriteQueueItem<T extends WriteQueueItem<T>> {
void process();
default void updateFrom(T item) {}
default boolean matches(T item) {
return false;
}
}
interface Listener {
/**
* Called before {@link PersisterQueue} tries to process next item.
*
* Note if the queue is empty, this callback will be called before the indefinite wait. This
* will be called once when {@link PersisterQueue} starts the internal thread before the
* indefinite wait.
*
* This callback is called w/o locking the instance of {@link PersisterQueue}.
*
* @param queueEmpty {@code true} if the queue is empty, which indicates {@link
* PersisterQueue} is likely to enter indefinite wait; or {@code false} if there is still
* item to process.
*/
void onPreProcessItem(boolean queueEmpty);
}
private class LazyTaskWriterThread extends Thread {
private LazyTaskWriterThread(String name) {
super(name);
}
@Override
public void run() {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
try {
while (true) {
final boolean probablyDone;
synchronized (PersisterQueue.this) {
probablyDone = mWriteQueue.isEmpty();
}
for (int i = mListeners.size() - 1; i >= 0; --i) {
mListeners.get(i).onPreProcessItem(probablyDone);
}
processNextItem();
}
} catch (InterruptedException e) {
Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but"
+ "it's OK in tests.");
}
}
}
}