// blob: f7a29497e1c211cbbe77db95f9d5224854fea473 [file] [log] [blame]
package org.jetbrains.rpc;
import com.intellij.openapi.diagnostic.Logger;
import com.intellij.util.containers.ConcurrentIntObjectMap;
import com.intellij.util.containers.StripedLockIntObjectConcurrentHashMap;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.Arrays;
/**
 * Correlates outgoing messages with their incoming responses by sequence number.
 * <p>
 * Every message passed to {@link #send} is registered together with its callback under the
 * sequence number supplied by the {@link Handler}. When {@link #processIncoming} receives a
 * message that carries a sequence number, the matching callback is removed from the registry
 * and the response is dispatched to it through the handler.
 *
 * @param <OUTGOING> type of outgoing message
 * @param <INCOMING> type of incoming message
 * @param <INCOMING_WITH_SEQ> type of incoming message that is a command (has sequence number)
 * @param <SUCCESS> first type argument of the {@code AsyncResultCallback} used for every request
 *                  (presumably the payload of a successful response)
 * @param <ERROR_DETAILS> second type argument of the {@code AsyncResultCallback} used for every request
 *                        (presumably the payload describing a failure)
 */
public final class MessageManager<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> {
  public static final Logger LOG = Logger.getInstance(MessageManager.class);

  /** Callbacks of messages that have been sent and not yet answered, keyed by sequence number. */
  private final ConcurrentIntObjectMap<AsyncResultCallback<SUCCESS, ERROR_DETAILS>> callbackMap = new StripedLockIntObjectConcurrentHashMap<AsyncResultCallback<SUCCESS, ERROR_DETAILS>>();
  private final Handler<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> handler;

  /** Set to {@code true} by {@link #closed()}; this class never resets it. */
  private volatile boolean closed;

  /**
   * @param handler protocol-specific operations used to number, write, classify and dispatch messages
   */
  public MessageManager(Handler<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> handler) {
    this.handler = handler;
  }

  /**
   * Protocol-specific operations the manager delegates to.
   */
  public interface Handler<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> {
    /**
     * @return the sequence number under which the callback for {@code message} is registered
     *         (presumably the implementation also stores that number in the message)
     */
    int getUpdatedSequence(OUTGOING message);

    /**
     * Writes the message to the transport.
     *
     * @return {@code false} if the message was not sent; the manager then fails the pending callback
     * @throws IOException if writing fails; the manager treats any throwable as a failed send
     */
    boolean write(@NotNull OUTGOING message) throws IOException;

    /**
     * @return the incoming message viewed as a response with a sequence number, or {@code null}
     *         if it has none (the manager then routes it to {@link #acceptNonSequence})
     */
    INCOMING_WITH_SEQ readIfHasSequence(INCOMING incoming);

    /**
     * @return the sequence number used to look up the callback registered for this response
     */
    int getSequence(INCOMING_WITH_SEQ incomingWithSeq);

    /**
     * Receives an incoming message without a sequence number. Not invoked once the manager is closed.
     */
    void acceptNonSequence(INCOMING incoming);

    /**
     * Dispatches {@code response} to {@code callback}. Not invoked once the manager is closed.
     */
    void call(INCOMING_WITH_SEQ response, AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback);
  }

  /**
   * Registers {@code callback} and writes {@code message} through the handler.
   * <p>
   * If the manager is already closed, the callback is rejected with "Connection is closed" and
   * nothing is written. If the handler reports or throws a failure while writing, the callback is
   * unregistered and rejected with "Failed to send"; a throwable from the write is logged instead
   * of being rethrown.
   */
  public void send(@NotNull OUTGOING message, @NotNull AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback) {
    if (closed) {
      callback.onError("Connection is closed", null);
      return;
    }

    int sequence = handler.getUpdatedSequence(message);
    // register before writing so that a response cannot arrive ahead of its callback
    callbackMap.put(sequence, callback);
    doSend(message, sequence);
  }

  /**
   * Writes the message and fails the callback registered under {@code sequence} if writing
   * returns {@code false} or throws.
   */
  private void doSend(@NotNull OUTGOING message, int sequence) {
    boolean success;
    try {
      success = handler.write(message);
    }
    catch (Throwable e) {
      try {
        failedToSend(sequence);
      }
      finally {
        // log even if the callback itself throws while being notified
        LOG.error("Failed to send", e);
      }
      return;
    }

    if (!success) {
      failedToSend(sequence);
    }
  }

  /**
   * Unregisters the callback for {@code sequence}, if still present, and rejects it with "Failed to send".
   */
  private void failedToSend(int sequence) {
    AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = callbackMap.remove(sequence);
    if (callback != null) {
      callback.onError("Failed to send", null);
    }
  }

  /**
   * Routes an incoming message.
   * <p>
   * A message without a sequence number is passed to {@link Handler#acceptNonSequence}, or ignored
   * (with an info log entry) if the manager is closed. A message with a sequence number has its
   * callback unregistered; the callback is then rejected with "Connection closed" if the manager is
   * closed, otherwise the response is dispatched through {@link Handler#call}. A throwable raised
   * while doing so is logged and reported to the callback instead of being rethrown.
   *
   * @throws IllegalArgumentException if no callback is registered for the response's sequence number
   */
  public void processIncoming(INCOMING incomingParsed) {
    INCOMING_WITH_SEQ commandResponse = handler.readIfHasSequence(incomingParsed);
    if (commandResponse == null) {
      if (closed) {
        // just ignore
        LOG.info("Connection closed, ignore incoming");
      }
      else {
        handler.acceptNonSequence(incomingParsed);
      }
      return;
    }

    AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = getCallbackAndRemove(handler.getSequence(commandResponse));
    try {
      if (closed) {
        callback.onError("Connection closed", null);
      }
      else {
        handler.call(commandResponse, callback);
      }
    }
    catch (Throwable e) {
      try {
        callback.onError("Failed to dispatch response to callback", null);
      }
      finally {
        // same guard as in doSend: the original failure must reach the log even if
        // the callback throws while being notified
        LOG.error("Failed to dispatch response to callback", e);
      }
    }
  }

  /**
   * Unregisters and returns the callback stored under {@code id}.
   *
   * @throws IllegalArgumentException if no callback is registered under {@code id}
   */
  public AsyncResultCallback<SUCCESS, ERROR_DETAILS> getCallbackAndRemove(int id) {
    AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = callbackMap.remove(id);
    if (callback == null) {
      throw new IllegalArgumentException("Cannot find callback with id " + id);
    }
    return callback;
  }

  /**
   * Marks the manager as closed. Subsequent {@link #send} calls are rejected immediately and
   * incoming messages are no longer dispatched to the handler. Pending callbacks are not
   * notified by this method; see {@link #cancelWaitingRequests()}.
   */
  public void closed() {
    closed = true;
  }

  /**
   * Rejects every currently registered callback with "Connection closed", in ascending order of
   * sequence number. A throwable raised by one callback is logged and does not stop the others
   * from being notified.
   * <p>
   * Callbacks are looked up, not removed, so they stay registered after this call.
   */
  public void cancelWaitingRequests() {
    // we should call them in the order they have been submitted
    ConcurrentIntObjectMap<AsyncResultCallback<SUCCESS, ERROR_DETAILS>> map = callbackMap;
    int[] keys = map.keys();
    Arrays.sort(keys);
    for (int key : keys) {
      try {
        // the entry may have been removed concurrently since the keys were collected
        AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = map.get(key);
        if (callback != null) {
          callback.onError("Connection closed", null);
        }
      }
      catch (Throwable e) {
        LOG.error("Failed to reject callback on connection closed", e);
      }
    }
  }
}