// blob: cd93374e47950493af16dcd555347d5c3e0199dd [file] [log] [blame]
/*
* Copyright (C) 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dexfuzz;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
/**
 * process.waitFor() can block if its output buffers are not drained.
 * These threads are used to keep the buffers drained, and provide the final
 * output once the command has finished executing. Each Executor has its own
 * output and error StreamConsumers.
 *
 * <p>Lifecycle, as driven by the client: {@link #giveStreamAndStartConsuming(InputStream)}
 * moves the consumer from WAITING to CONSUMING, {@link #processFinished()} moves it to
 * SHOULD_STOP_CONSUMING, and the consumer thread itself returns to WAITING once it has
 * drained the stream. {@link #shutdown()} moves a WAITING consumer to FINISHED. Any
 * transition attempted from an unexpected state puts the consumer into ERROR.
 *
 * <p>All reads and writes of the state go through synchronized methods. The captured
 * lines are handed between the client thread and the consumer thread via the two
 * semaphores, so the list itself is not otherwise locked.
 */
public class StreamConsumer extends Thread {
  /** Delay, in milliseconds, between polls of the stream while the process is running. */
  private static final long POLL_INTERVAL_MS = 50;

  /** Lines captured from the current stream; cleared whenever a new stream is given. */
  private final List<String> output;
  /** Reader wrapping the stream currently being consumed. */
  private BufferedReader reader;
  /** Current lifecycle state; only accessed through the synchronized helpers below. */
  private State state;
  /** Released by the client to wake the consumer thread (new stream, or shutdown). */
  private final Semaphore workToBeDone;
  /** Released by the consumer thread once a consume pass has ended. */
  private final Semaphore outputIsReady;

  enum State {
    WAITING,
    CONSUMING,
    SHOULD_STOP_CONSUMING,
    FINISHED,
    ERROR
  }

  /**
   * Create a StreamConsumer, will be immediately ready to start consuming.
   */
  public StreamConsumer() {
    output = new ArrayList<String>();
    workToBeDone = new Semaphore(0);
    outputIsReady = new Semaphore(0);
    state = State.WAITING;
  }

  /**
   * Executor should call this to provide its StreamConsumers with the Streams
   * for a Process it is about to call waitFor() on.
   *
   * <p>Discards any previously captured output, and expects the consumer to be WAITING.
   *
   * @param stream the stream to drain
   */
  public void giveStreamAndStartConsuming(InputStream stream) {
    output.clear();
    reader = new BufferedReader(new InputStreamReader(stream));
    changeState(State.CONSUMING, State.WAITING);
    // Tell consumer there is work to be done.
    workToBeDone.release();
  }

  /**
   * Executor should call this once its call to waitFor() returns.
   * Expects the consumer to be CONSUMING.
   */
  public void processFinished() {
    changeState(State.SHOULD_STOP_CONSUMING, State.CONSUMING);
  }

  /**
   * Executor should call this to get the captured output of this StreamConsumer.
   * Blocks until the consumer thread signals that its consume pass has ended.
   *
   * @return a copy of the captured lines, or {@code null} if the calling thread was
   *     interrupted while waiting (its interrupt status is restored in that case)
   */
  public List<String> getOutput() {
    try {
      // Wait until the output is ready.
      outputIsReady.acquire();
    } catch (InterruptedException e) {
      Log.error("Client of StreamConsumer was interrupted while waiting for output?");
      // Keep the interruption visible to the caller instead of swallowing it.
      Thread.currentThread().interrupt();
      return null;
    }
    // Take a copy of the Strings, so when we call output.clear(), we don't
    // clear the ExecutionResult's list.
    return new ArrayList<String>(output);
  }

  /**
   * Executor should call this when we're shutting down.
   * Expects the consumer to be WAITING.
   */
  public void shutdown() {
    changeState(State.FINISHED, State.WAITING);
    // Tell Consumer there is work to be done (it will check first if FINISHED has been set.)
    workToBeDone.release();
  }

  /**
   * Drains the current reader until the client reports that the process has finished,
   * then closes the reader and signals that the output is ready. The reader is closed
   * and the signal is sent even when consuming fails, so that the stream is not leaked
   * and a client blocked in getOutput() is always released.
   */
  private void consume() {
    try {
      if (checkState(State.SHOULD_STOP_CONSUMING)) {
        // Caller already called processFinished() before we even started
        // consuming. Just get what we can and finish.
        drainAvailableLines();
      } else {
        // Caller's process is still executing, so just loop and consume.
        while (checkState(State.CONSUMING)) {
          Thread.sleep(POLL_INTERVAL_MS);
          drainAvailableLines();
        }
        // The state may have changed right after the last drain; pick up anything
        // that was written in between so the tail of the output is not lost.
        drainAvailableLines();
      }
      if (checkState(State.SHOULD_STOP_CONSUMING)) {
        changeState(State.WAITING, State.SHOULD_STOP_CONSUMING);
      } else {
        Log.error("StreamConsumer stopped consuming, but was not told to?");
        setErrorState();
      }
    } catch (IOException e) {
      Log.error("StreamConsumer caught IOException while consuming");
      setErrorState();
    } catch (InterruptedException e) {
      Log.error("StreamConsumer caught InterruptedException while consuming");
      setErrorState();
      Thread.currentThread().interrupt();
    } finally {
      closeReader();
      // Tell client of Consumer that the output is ready.
      outputIsReady.release();
    }
  }

  /**
   * Appends every line that is currently available on the reader to the output.
   *
   * @throws IOException if reading from the underlying stream fails
   */
  private void drainAvailableLines() throws IOException {
    while (reader.ready()) {
      String line = reader.readLine();
      if (line == null) {
        // End of stream: nothing more will arrive, and null must not be recorded.
        break;
      }
      output.add(line);
    }
  }

  /** Closes the current reader, moving to the ERROR state if closing fails. */
  private void closeReader() {
    if (reader == null) {
      return;
    }
    try {
      reader.close();
    } catch (IOException e) {
      Log.error("StreamConsumer caught IOException while consuming");
      setErrorState();
    }
  }

  /**
   * Consumer thread body: repeatedly waits to be told about work, then either exits
   * (FINISHED), consumes the given stream, or stops on an unexpected state.
   */
  @Override
  public void run() {
    while (checkState(State.WAITING)) {
      try {
        // Wait until there is work to be done
        workToBeDone.acquire();
      } catch (InterruptedException e) {
        Log.error("StreamConsumer caught InterruptedException while waiting for work");
        setErrorState();
        Thread.currentThread().interrupt();
        break;
      }
      // Check first if we're done
      if (checkState(State.FINISHED)) {
        break;
      }
      // Make sure we're either supposed to be consuming
      // or supposed to be finishing up consuming
      if (!(checkState(State.CONSUMING) || checkState(State.SHOULD_STOP_CONSUMING))) {
        Log.error("invalid state: StreamConsumer told about work, but not CONSUMING?");
        Log.error("state was: " + getCurrentState());
        setErrorState();
        break;
      }
      consume();
    }
  }

  /** Returns true if the consumer is currently in the given state. */
  private synchronized boolean checkState(State expectedState) {
    return (expectedState == state);
  }

  /**
   * Moves to newState if the consumer is currently in previousState; otherwise logs
   * the mismatch and moves to ERROR.
   */
  private synchronized void changeState(State newState, State previousState) {
    if (state != previousState) {
      Log.error("StreamConsumer Unexpected state: " + state + ", expected " + previousState);
      state = State.ERROR;
    } else {
      state = newState;
    }
  }

  /** Unconditionally moves the consumer to the ERROR state. */
  private synchronized void setErrorState() {
    state = State.ERROR;
  }

  /** Returns the current state. */
  private synchronized State getCurrentState() {
    return state;
  }
}