| /* |
| * Copyright 2000-2012 JetBrains s.r.o. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.jetbrains.jps.client; |
| |
| import com.google.protobuf.MessageLite; |
| import com.intellij.openapi.diagnostic.Logger; |
| import io.netty.bootstrap.Bootstrap; |
| import io.netty.channel.*; |
| import io.netty.channel.nio.NioEventLoopGroup; |
| import io.netty.channel.socket.nio.NioSocketChannel; |
| import io.netty.handler.codec.protobuf.ProtobufDecoder; |
| import io.netty.handler.codec.protobuf.ProtobufEncoder; |
| import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; |
| import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; |
| import org.jetbrains.annotations.Nullable; |
| import org.jetbrains.jps.api.RequestFuture; |
| |
| import java.util.UUID; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * @author Eugene Zhuravlev |
| * Date: 1/22/12 |
| */ |
| public class SimpleProtobufClient<T extends ProtobufResponseHandler> { |
| private static final Logger LOG = Logger.getInstance("#org.jetbrains.jps.client.SimpleProtobufClient"); |
| |
| private enum State { |
| DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING |
| } |
| |
| private final AtomicReference<State> myState = new AtomicReference<State>(State.DISCONNECTED); |
| protected final ChannelInitializer myChannelInitializer; |
| protected final EventLoopGroup myEventLoopGroup; |
| protected volatile ChannelFuture myConnectFuture; |
| private final ProtobufClientMessageHandler<T> myMessageHandler; |
| |
| public SimpleProtobufClient(final MessageLite msgDefaultInstance, final Executor asyncExec, final UUIDGetter uuidGetter) { |
| myMessageHandler = new ProtobufClientMessageHandler<T>(uuidGetter, this, asyncExec); |
| myEventLoopGroup = new NioEventLoopGroup(1, asyncExec); |
| myChannelInitializer = new ChannelInitializer() { |
| @Override |
| protected void initChannel(Channel channel) throws Exception { |
| channel.pipeline().addLast(new ProtobufVarint32FrameDecoder(), |
| new ProtobufDecoder(msgDefaultInstance), |
| new ProtobufVarint32LengthFieldPrepender(), |
| new ProtobufEncoder(), |
| myMessageHandler); |
| } |
| }; |
| } |
| |
| public final void checkConnected() throws Exception { |
| if (myState.get() != State.CONNECTED) { |
| throw new Exception("Client not connected"); |
| } |
| } |
| |
| public final boolean connect(final String host, final int port) throws Throwable { |
| if (myState.compareAndSet(State.DISCONNECTED, State.CONNECTING)) { |
| boolean success = false; |
| |
| try { |
| final Bootstrap bootstrap = new Bootstrap().group(myEventLoopGroup).channel(NioSocketChannel.class).handler(myChannelInitializer); |
| bootstrap.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true); |
| final ChannelFuture future = bootstrap.connect(host, port).syncUninterruptibly(); |
| success = future.isSuccess(); |
| if (success) { |
| myConnectFuture = future; |
| try { |
| onConnect(); |
| } |
| catch (Throwable e) { |
| LOG.error(e); |
| } |
| } |
| return success; |
| } |
| finally { |
| myState.compareAndSet(State.CONNECTING, success? State.CONNECTED : State.DISCONNECTED); |
| } |
| } |
| // already connected |
| return true; |
| } |
| |
| protected void onConnect() { |
| } |
| protected void beforeDisconnect() { |
| } |
| protected void onDisconnect() { |
| } |
| |
| public final void disconnect() { |
| if (myState.compareAndSet(State.CONNECTED, State.DISCONNECTING)) { |
| try { |
| final ChannelFuture future = myConnectFuture; |
| if (future != null) { |
| try { |
| beforeDisconnect(); |
| } |
| catch (Throwable e) { |
| LOG.error(e); |
| } |
| final ChannelFuture closeFuture = future.channel().close(); |
| closeFuture.awaitUninterruptibly(); |
| } |
| } |
| finally { |
| myConnectFuture = null; |
| myState.compareAndSet(State.DISCONNECTING, State.DISCONNECTED); |
| try { |
| onDisconnect(); |
| } |
| catch (Throwable e) { |
| LOG.error(e); |
| } |
| } |
| } |
| } |
| |
| public final boolean isConnected() { |
| return myState.get() == State.CONNECTED; |
| } |
| |
| public final RequestFuture<T> sendMessage(final UUID messageId, MessageLite message, @Nullable final T responseHandler, @Nullable final RequestFuture.CancelAction<T> cancelAction) { |
| final RequestFuture<T> requestFuture = new RequestFuture<T>(responseHandler, messageId, cancelAction); |
| myMessageHandler.registerFuture(messageId, requestFuture); |
| final ChannelFuture connectFuture = myConnectFuture; |
| final Channel channel = connectFuture != null? connectFuture.channel() : null; |
| if (channel != null && channel.isActive()) { |
| channel.writeAndFlush(message).addListener(new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture future) throws Exception { |
| if (!future.isSuccess()) { |
| notifyTerminated(messageId, requestFuture, responseHandler); |
| } |
| } |
| }); |
| } |
| else { |
| notifyTerminated(messageId, requestFuture, responseHandler); |
| } |
| return requestFuture; |
| } |
| |
| private void notifyTerminated(UUID messageId, RequestFuture<T> requestFuture, @Nullable T responseHandler) { |
| try { |
| myMessageHandler.removeFuture(messageId); |
| if (responseHandler != null) { |
| responseHandler.sessionTerminated(); |
| } |
| } |
| finally { |
| requestFuture.setDone(); |
| } |
| } |
| } |