blob: f53e1cda0ef37d10698e8a3fa623e66e812f3a5f [file] [log] [blame]
/*
* Copyright 2017 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Decompressor;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Nullable;
/**
* Sits between {@link AbstractStream.TransportState} and {@link MessageDeframer} to deframe in the
* client thread. Calls from the transport to the deframer are wrapped into an {@link
* InitializingMessageProducer} and given back to the transport, where they will be run on the
* client thread. Calls from the deframer back to the transport use {@link
* TransportExecutor#runOnTransportThread} to run on the transport thread.
*/
public class ApplicationThreadDeframer implements Deframer {
interface TransportExecutor extends ApplicationThreadDeframerListener.TransportExecutor {}
private final MessageDeframer.Listener storedListener;
private final ApplicationThreadDeframerListener appListener;
private final MessageDeframer deframer;
ApplicationThreadDeframer(
MessageDeframer.Listener listener,
TransportExecutor transportExecutor,
MessageDeframer deframer) {
this.storedListener =
new SquelchLateMessagesAvailableDeframerListener(checkNotNull(listener, "listener"));
this.appListener = new ApplicationThreadDeframerListener(storedListener, transportExecutor);
deframer.setListener(appListener);
this.deframer = deframer;
}
@Override
public void setMaxInboundMessageSize(int messageSize) {
deframer.setMaxInboundMessageSize(messageSize);
}
@Override
public void setDecompressor(Decompressor decompressor) {
deframer.setDecompressor(decompressor);
}
@Override
public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
deframer.setFullStreamDecompressor(fullStreamDecompressor);
}
@Override
public void request(final int numMessages) {
storedListener.messagesAvailable(
new InitializingMessageProducer(
new Runnable() {
@Override
public void run() {
if (deframer.isClosed()) {
return;
}
try {
deframer.request(numMessages);
} catch (Throwable t) {
appListener.deframeFailed(t);
deframer.close(); // unrecoverable state
}
}
}));
}
@Override
public void deframe(final ReadableBuffer data) {
storedListener.messagesAvailable(
new CloseableInitializingMessageProducer(
new Runnable() {
@Override
public void run() {
try {
deframer.deframe(data);
} catch (Throwable t) {
appListener.deframeFailed(t);
deframer.close(); // unrecoverable state
}
}
},
new Closeable() {
@Override
public void close() {
data.close();
}
}));
}
@Override
public void closeWhenComplete() {
storedListener.messagesAvailable(
new InitializingMessageProducer(
new Runnable() {
@Override
public void run() {
deframer.closeWhenComplete();
}
}));
}
@Override
public void close() {
deframer.stopDelivery();
storedListener.messagesAvailable(
new InitializingMessageProducer(
new Runnable() {
@Override
public void run() {
deframer.close();
}
}));
}
@VisibleForTesting
MessageDeframer.Listener getAppListener() {
return appListener;
}
private class InitializingMessageProducer implements StreamListener.MessageProducer {
private final Runnable runnable;
private boolean initialized = false;
private InitializingMessageProducer(Runnable runnable) {
this.runnable = runnable;
}
private void initialize() {
if (!initialized) {
runnable.run();
initialized = true;
}
}
@Nullable
@Override
public InputStream next() {
initialize();
return appListener.messageReadQueuePoll();
}
}
private class CloseableInitializingMessageProducer extends InitializingMessageProducer
implements Closeable {
private final Closeable closeable;
public CloseableInitializingMessageProducer(Runnable runnable, Closeable closeable) {
super(runnable);
this.closeable = closeable;
}
@Override
public void close() throws IOException {
closeable.close();
}
}
}