blob: 90b242961dd4932e14f44a1e1325e3b8f6b86f74 [file] [log] [blame]
package com.intellij.util.io.socketConnection.impl;
import com.intellij.openapi.application.ApplicationManager;
import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.util.MultiValuesMap;
import com.intellij.openapi.util.Ref;
import com.intellij.util.Alarm;
import com.intellij.util.SmartList;
import com.intellij.util.io.socketConnection.*;
import gnu.trove.TIntObjectHashMap;
import gnu.trove.TIntObjectProcedure;
import gnu.trove.TObjectProcedure;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* @author nik
*/
public class ResponseProcessor<R extends AbstractResponse> {
private static final Logger LOG = Logger.getInstance("#com.intellij.util.io.socketConnection.impl.ResponseProcessor");
private final TIntObjectHashMap<AbstractResponseToRequestHandler<?>> myHandlers = new TIntObjectHashMap<AbstractResponseToRequestHandler<?>>();
private final MultiValuesMap<Class<? extends R>, AbstractResponseHandler<? extends R>> myClassHandlers = new MultiValuesMap<Class<? extends R>, AbstractResponseHandler<? extends R>>();
private final TIntObjectHashMap<TimeoutHandler> myTimeoutHandlers = new TIntObjectHashMap<TimeoutHandler>();
private boolean myStopped;
private final Object myLock = new Object();
private Thread myThread;
private final Alarm myTimeoutAlarm;
public ResponseProcessor(SocketConnection<?, R> connection) {
myTimeoutAlarm = new Alarm(Alarm.ThreadToUse.SHARED_THREAD, connection);
}
public void startReading(final ResponseReader<R> reader) {
ApplicationManager.getApplication().executeOnPooledThread(new Runnable() {
public void run() {
myThread = Thread.currentThread();
try {
while (true) {
final R r = reader.readResponse();
if (r == null) break;
if (r instanceof ResponseToRequest) {
final int requestId = ((ResponseToRequest)r).getRequestId();
processResponse(requestId, r);
}
else {
processResponse(r);
}
}
}
catch (InterruptedException ignored) {
}
catch (IOException e) {
LOG.info(e);
}
finally {
synchronized (myLock) {
myStopped = true;
}
}
}
});
}
private void processResponse(int requestId, R response) {
synchronized (myLock) {
myTimeoutHandlers.remove(requestId);
}
final AbstractResponseToRequestHandler handler;
synchronized (myLock) {
handler = myHandlers.remove(requestId);
if (handler == null) return;
}
//noinspection unchecked
if (!handler.processResponse(response)) {
synchronized (myLock) {
myHandlers.put(requestId, handler);
}
}
}
private void processResponse(R response) throws IOException {
//noinspection unchecked
final Class<R> responseClass = (Class<R>)response.getClass();
List<AbstractResponseHandler<?>> handlers;
synchronized (myLock) {
final Collection<AbstractResponseHandler<? extends R>> responseHandlers = myClassHandlers.get(responseClass);
if (responseHandlers == null) return;
handlers = new SmartList<AbstractResponseHandler<?>>(responseHandlers);
}
for (AbstractResponseHandler handler : handlers) {
//noinspection unchecked
handler.processResponse(response);
}
}
public void stopReading() {
synchronized (myLock) {
if (myStopped) return;
myStopped = true;
}
if (myThread != null) {
myThread.interrupt();
}
}
public <T extends R> void registerHandler(@NotNull Class<T> responseClass, @NotNull AbstractResponseHandler<T> handler) {
synchronized (myLock) {
myClassHandlers.put(responseClass, handler);
}
}
public void registerHandler(int id, @NotNull AbstractResponseToRequestHandler<?> handler) {
synchronized (myLock) {
myHandlers.put(id, handler);
}
}
public void checkTimeout() {
LOG.debug("Checking timeout");
final List<TimeoutHandler> timedOut = new ArrayList<TimeoutHandler>();
synchronized (myLock) {
final long time = System.currentTimeMillis();
myTimeoutHandlers.retainEntries(new TIntObjectProcedure<TimeoutHandler>() {
public boolean execute(int a, TimeoutHandler b) {
if (time > b.myLastTime) {
timedOut.add(b);
return false;
}
return true;
}
});
}
for (TimeoutHandler handler : timedOut) {
LOG.debug("performing timeout action: " + handler.myAction);
handler.myAction.run();
}
scheduleTimeoutCheck();
}
private void scheduleTimeoutCheck() {
final Ref<Long> nextTime = Ref.create(Long.MAX_VALUE);
synchronized (myLock) {
if (myTimeoutHandlers.isEmpty()) return;
myTimeoutHandlers.forEachValue(new TObjectProcedure<TimeoutHandler>() {
public boolean execute(TimeoutHandler handler) {
nextTime.set(Math.min(nextTime.get(), handler.myLastTime));
return true;
}
});
}
final int delay = (int)(nextTime.get() - System.currentTimeMillis() + 100);
LOG.debug("schedule timeout check in " + delay + "ms");
if (delay > 10) {
myTimeoutAlarm.cancelAllRequests();
myTimeoutAlarm.addRequest(new Runnable() {
public void run() {
checkTimeout();
}
}, delay);
}
else {
checkTimeout();
}
}
public void registerTimeoutHandler(int commandId, int timeout, Runnable onTimeout) {
synchronized (myLock) {
myTimeoutHandlers.put(commandId, new TimeoutHandler(onTimeout, System.currentTimeMillis() + timeout));
}
scheduleTimeoutCheck();
}
private static class TimeoutHandler {
private final Runnable myAction;
private final long myLastTime;
private TimeoutHandler(Runnable action, long lastTime) {
myAction = action;
myLastTime = lastTime;
}
}
}