// blob: b96d1883f8dc62b9fbd27e23b728743829e44c1a [file] [log] [blame]
/*
* Copyright 2009 Mike Cumings
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.kenai.jbosh;
import com.kenai.jbosh.ComposableBody.Builder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* BOSH Client session instance. Each communication session with a remote
* connection manager is represented and handled by an instance of this
* class. This is the main entry point for client-side communications.
* To create a new session, a client configuration must first be created
* and then used to create a client instance:
* <pre>
* BOSHClientConfig cfg = BOSHClientConfig.Builder.create(
* "http://server:1234/httpbind", "jabber.org")
* .setFrom("user@jabber.org")
* .build();
* BOSHClient client = BOSHClient.create(cfg);
* </pre>
* Additional client configuration options are available. See the
* {@code BOSHClientConfig.Builder} class for more information.
* <p/>
* Once a {@code BOSHClient} instance has been created, communication with
* the remote connection manager can begin. No attempt will be made to
* establish a connection to the connection manager until the first call
* is made to the {@code send(ComposableBody)} method. Note that it is
* possible to send an empty body to cause an immediate connection attempt
* to the connection manager. Sending an empty message would look like
* the following:
* <pre>
* client.send(ComposableBody.builder().build());
* </pre>
* For more information on creating body messages with content, see the
* {@code ComposableBody.Builder} class documentation.
* <p/>
* Once a session has been successfully started, the client instance can be
* used to send arbitrary payload data. All aspects of the BOSH
* protocol involving setting and processing attributes in the BOSH
* namespace will be handled by the client code transparently and behind the
* scenes. The user of the client instance can therefore concentrate
* entirely on the content of the message payload, leaving the semantics of
* the BOSH protocol to the client implementation.
* <p/>
* To be notified of incoming messages from the remote connection manager,
* a {@code BOSHClientResponseListener} should be added to the client instance.
* All incoming messages will be published to all response listeners as they
* arrive and are processed. As with the transmission of payload data via
* the {@code send(ComposableBody)} method, there is no need to worry about
* handling of the BOSH attributes, since this is handled behind the scenes.
* <p/>
* If the connection to the remote connection manager is terminated (either
* explicitly or due to a terminal condition of some sort), all connection
* listeners will be notified. After the connection has been closed, the
* client instance is considered dead and a new one must be created in order
* to resume communications with the remote server.
* <p/>
* Instances of this class are thread-safe.
*
* @see BOSHClientConfig.Builder
* @see BOSHClientResponseListener
* @see BOSHClientConnListener
* @see ComposableBody.Builder
*/
public final class BOSHClient {
/**
* Logger for this class, named after the fully-qualified class name.
*/
private static final Logger LOG = Logger.getLogger(
BOSHClient.class.getName());
/**
* Value of the 'type' attribute used for session termination. Compared
* against message bodies by {@code isTermination} and applied to outbound
* messages by {@code disconnect(ComposableBody)}.
*/
private static final String TERMINATE = "terminate";
/**
* Value of the 'type' attribute used for recoverable errors. Compared
* against response bodies by {@code isRecoverableBindingCondition}.
*/
private static final String ERROR = "error";
/**
* Log message used when a blocking wait on one of the lock conditions
* is interrupted.
*/
private static final String INTERRUPTED = "Interrupted";
/**
* Message used for unhandled exceptions.
*/
private static final String UNHANDLED = "Unhandled Exception";
/**
 * Message used when a {@code null} listener is passed to one of the
 * add/remove listener methods.
 */
private static final String NULL_LISTENER = "Listener may not be null";
/**
* Default empty request delay, in milliseconds. Used when the
* corresponding system property is not set.
*/
private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100;
/**
* Amount of time to wait before sending an empty request, in
* milliseconds. Read once at class initialization from the integer
* system property named by the fully-qualified class name followed by
* {@code .emptyRequestDelay}, falling back to
* {@code DEFAULT_EMPTY_REQUEST_DELAY}.
*/
private static final int EMPTY_REQUEST_DELAY = Integer.getInteger(
BOSHClient.class.getName() + ".emptyRequestDelay",
DEFAULT_EMPTY_REQUEST_DELAY);
/**
* Default value for the pause margin, in milliseconds. Used when the
* corresponding system property is not set.
*/
private static final int DEFAULT_PAUSE_MARGIN = 500;
/**
* The amount of time in milliseconds which will be reserved as a
* safety margin when scheduling empty requests against a maxpause
* value. This should give us enough time to build the message
* and transport it to the remote host. Read once at class
* initialization from the integer system property named by the
* fully-qualified class name followed by {@code .pauseMargin}, falling
* back to {@code DEFAULT_PAUSE_MARGIN}.
*/
private static final int PAUSE_MARGIN = Integer.getInteger(
BOSHClient.class.getName() + ".pauseMargin",
DEFAULT_PAUSE_MARGIN);
/**
* Flag indicating whether or not we want to perform assertions. Assigned
* exactly once by the static initializer and consulted by
* {@code assertLocked()} and {@code assertUnlocked()}.
*/
private static final boolean ASSERTIONS;
/**
* Connection listeners. Backed by a copy-on-write set, so listeners can be
* added and removed without holding {@code lock}.
*/
private final Set<BOSHClientConnListener> connListeners =
new CopyOnWriteArraySet<BOSHClientConnListener>();
/**
* Request listeners. Backed by a copy-on-write set, so listeners can be
* added and removed without holding {@code lock}.
*/
private final Set<BOSHClientRequestListener> requestListeners =
new CopyOnWriteArraySet<BOSHClientRequestListener>();
/**
* Response listeners. Backed by a copy-on-write set, so listeners can be
* added and removed without holding {@code lock}.
*/
private final Set<BOSHClientResponseListener> responseListeners =
new CopyOnWriteArraySet<BOSHClientResponseListener>();
/**
* Lock instance guarding the mutable session state declared further
* below (processing thread, session params, exchanges, ack bookkeeping).
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* Condition indicating that there are messages to be exchanged.
* Signalled by {@code send} after queueing an exchange and by
* {@code dispose}; awaited by {@code nextExchange}.
*/
private final Condition notEmpty = lock.newCondition();
/**
* Condition indicating that there are available slots for sending
* messages. Signalled by {@code processExchange} after an exchange has
* been removed and by {@code dispose}; awaited by
* {@code blockUntilSendable}.
*/
private final Condition notFull = lock.newCondition();
/**
* Condition indicating that there are no outstanding connections.
* Signalled by {@code scheduleEmptyRequest} and by {@code dispose};
* awaited by {@code drain}.
*/
private final Condition drained = lock.newCondition();
/**
* Session configuration, as supplied to the constructor.
*/
private final BOSHClientConfig cfg;
/**
* Processor thread runnable instance. Handed to the receive thread
* created in {@code init()}.
*/
private final Runnable procRunnable = new Runnable() {
/**
* Process incoming messages by delegating to
* {@code processMessages()}.
*/
public void run() {
processMessages();
}
};
/**
* Runnable used for deferred empty requests. Submitted to the scheduled
* executor by {@code scheduleEmptyRequest(long)}.
*/
private final Runnable emptyRequestRunnable = new Runnable() {
/**
* Send an empty request by delegating to {@code sendEmptyRequest()}.
*/
public void run() {
sendEmptyRequest();
}
};
/**
* HTTPSender instance used to transmit requests. Initialized in
* {@code init()} and destroyed in {@code dispose(Throwable)}.
*/
private final HTTPSender httpSender =
new ApacheHTTPSender();
/**
* Storage for test hook implementation. Holds {@code null} unless a hook
* has been installed via {@code setExchangeInterceptor}.
*/
private final AtomicReference<ExchangeInterceptor> exchInterceptor =
new AtomicReference<ExchangeInterceptor>();
/**
* Request ID sequence to use for the session.
*/
private final RequestIDSequence requestIDSeq = new RequestIDSequence();
/**
* ScheduledExecutor to use for deferred tasks (the deferred empty
* request). Shut down in {@code dispose(Throwable)}.
*/
private final ScheduledExecutorService schedExec =
Executors.newSingleThreadScheduledExecutor();
/************************************************************
* The following vars must be accessed via the lock instance.
*/
/**
* Thread which is used to process responses from the connection
* manager. Becomes null when session is terminated; {@code isWorking()}
* uses this as the "session is alive" indicator.
*/
private Thread procThread;
/**
 * Future for sending a deferred empty request, if needed. {@code null}
 * when no empty request is currently scheduled. Declared with a wildcard
 * type argument (rather than as a raw type) to match what
 * {@code ScheduledExecutorService.schedule(Runnable, long, TimeUnit)}
 * returns; only {@code cancel} and {@code isDone} are used on it.
 */
private ScheduledFuture<?> emptyRequestFuture;
/**
* Connection Manager session parameters. Only available when in a
* connected state. Populated by {@code processExchange} from the first
* response and reset to {@code null} by {@code dispose}.
*/
private CMSessionParams cmParams;
/**
* List of active/outstanding requests. Set to {@code null} by
* {@code dispose}.
*/
private Queue<HTTPExchange> exchanges = new LinkedList<HTTPExchange>();
/**
* Set of RIDs which have been received, for the purpose of sending
* response acknowledgements. Set to {@code null} by {@code dispose}.
*/
private SortedSet<Long> pendingResponseAcks = new TreeSet<Long>();
/**
* The highest RID that we've already received a response for. This value
* is used to implement response acks. The initial value of -1 means that
* no response has been received yet.
*/
private Long responseAck = Long.valueOf(-1L);
/**
* List of requests which have been made but not yet acknowledged. This
* list remains unpopulated if the CM is not acking requests. Set to
* {@code null} by {@code dispose}.
*/
private List<ComposableBody> pendingRequestAcks =
new ArrayList<ComposableBody>();
///////////////////////////////////////////////////////////////////////////
// Classes:
/**
* Class used in testing to dynamically manipulate received exchanges
* at test runtime. An instance is installed with
* {@code setExchangeInterceptor} and consulted by the processing loop
* for every exchange before it is processed.
*/
abstract static class ExchangeInterceptor {
/**
* Limit construction to this package.
*/
ExchangeInterceptor() {
// Empty;
}
/**
* Hook to manipulate an HTTPExchange as it is about to be processed.
*
* @param exch original exchange that would be processed
* @return replacement exchange instance, or {@code null} to skip
* processing of this exchange
*/
abstract HTTPExchange interceptExchange(final HTTPExchange exch);
}
///////////////////////////////////////////////////////////////////////////
// Constructors:
/**
 * Decides once, at class initialization, whether the internal lock
 * assertions are active. If the system property
 * {@code <simple class name>.assertionsEnabled} is set, its boolean value
 * is used; otherwise the flag mirrors whether JVM assertions are enabled
 * for this class.
 */
static {
    final String key =
            BOSHClient.class.getSimpleName() + ".assertionsEnabled";
    boolean active = false;
    if (System.getProperty(key) != null) {
        active = Boolean.getBoolean(key);
    } else {
        // The assignment only executes when the JVM runs with -ea, which
        // is exactly the signal we want to capture.
        assert active = true;
    }
    ASSERTIONS = active;
}
/**
 * Creates the session for the given configuration and initializes it
 * immediately. Private so that instances are obtained through
 * {@code create(BOSHClientConfig)}.
 *
 * @param sessCfg session configuration
 */
private BOSHClient(final BOSHClientConfig sessCfg) {
    this.cfg = sessCfg;
    this.init();
}
///////////////////////////////////////////////////////////////////////////
// Public methods:
/**
 * Factory method producing a new BOSH client session from the supplied
 * client configuration.
 *
 * @param clientCfg session configuration; must not be {@code null}
 * @return BOSH session instance
 * @throws IllegalArgumentException if {@code clientCfg} is {@code null}
 */
public static BOSHClient create(final BOSHClientConfig clientCfg) {
    if (clientCfg != null) {
        return new BOSHClient(clientCfg);
    }
    throw new IllegalArgumentException(
            "Client configuration may not be null");
}
/**
 * Returns the configuration this client instance was created with.
 *
 * @return client configuration
 */
public BOSHClientConfig getBOSHClientConfig() {
    return this.cfg;
}
/**
 * Registers a connection listener with the session. Registering a
 * listener that is already present has no additional effect.
 *
 * @param listener connection listener to add; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void addBOSHClientConnListener(
        final BOSHClientConnListener listener) {
    if (null == listener) {
        throw new IllegalArgumentException(NULL_LISTENER);
    }
    this.connListeners.add(listener);
}
/**
 * Unregisters a connection listener from the session, if it was
 * previously added.
 *
 * @param listener connection listener to remove; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void removeBOSHClientConnListener(
        final BOSHClientConnListener listener) {
    if (null == listener) {
        throw new IllegalArgumentException(NULL_LISTENER);
    }
    this.connListeners.remove(listener);
}
/**
 * Registers a request message listener with the session. Registering a
 * listener that is already present has no additional effect.
 *
 * @param listener request listener to add; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void addBOSHClientRequestListener(
        final BOSHClientRequestListener listener) {
    if (null == listener) {
        throw new IllegalArgumentException(NULL_LISTENER);
    }
    this.requestListeners.add(listener);
}
/**
 * Unregisters a request message listener from the session, if it was
 * previously added.
 *
 * @param listener instance to remove; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void removeBOSHClientRequestListener(
        final BOSHClientRequestListener listener) {
    if (null == listener) {
        throw new IllegalArgumentException(NULL_LISTENER);
    }
    this.requestListeners.remove(listener);
}
/**
 * Registers a response message listener with the session. Registering a
 * listener that is already present has no additional effect.
 *
 * @param listener response listener to add; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void addBOSHClientResponseListener(
        final BOSHClientResponseListener listener) {
    if (null == listener) {
        throw new IllegalArgumentException(NULL_LISTENER);
    }
    this.responseListeners.add(listener);
}
/**
 * Unregisters a response message listener from the session, if it was
 * previously added.
 *
 * @param listener instance to remove; must not be {@code null}
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 */
public void removeBOSHClientResponseListener(
        final BOSHClientResponseListener listener) {
    if (null == listener) {
        throw new IllegalArgumentException(NULL_LISTENER);
    }
    this.responseListeners.remove(listener);
}
/**
 * Send the provided message data to the remote connection manager. The
 * provided message body does not need to have any BOSH-specific attribute
 * information set. It only needs to contain the actual message payload
 * that should be delivered to the remote server.
 * <p/>
 * The first call to this method will result in a connection attempt
 * to the remote connection manager. Subsequent calls to this method
 * will block until the underlying session state allows for the message
 * to be transmitted. In certain scenarios - such as when the maximum
 * number of outbound connections has been reached - calls to this method
 * will block for short periods of time.
 * <p/>
 * Once the session has been disposed and its state released, every
 * message - including a termination message - is rejected with a
 * {@code BOSHException} instead of failing with a
 * {@code NullPointerException} on the released state.
 *
 * @param body message data to send to remote server
 * @throws IllegalArgumentException if {@code body} is {@code null}
 * @throws BOSHException on message transmission failure, or when the
 *  session is closed
 */
public void send(final ComposableBody body) throws BOSHException {
    assertUnlocked();
    if (body == null) {
        throw(new IllegalArgumentException(
                "Message body may not be null"));
    }
    HTTPExchange exch;
    CMSessionParams params;
    lock.lock();
    try {
        blockUntilSendable(body);
        if (!isWorking() && !isTermination(body)) {
            throw(new BOSHException(
                    "Cannot send message when session is closed"));
        }
        params = cmParams;
        // A termination message gets past the check above even when the
        // session is no longer working. In that case dispose() may already
        // have released the exchange queue, or the session may have been
        // torn down while the creation request was still outstanding (no
        // session params to apply). Neither state can carry a request, so
        // report it as a closed session rather than dereferencing null.
        if (exchanges == null
                || (params == null && !exchanges.isEmpty())) {
            throw(new BOSHException(
                    "Cannot send message when session is closed"));
        }
        long rid = requestIDSeq.getNextRID();
        final ComposableBody request;
        if (params == null) {
            // This is the first message being sent
            request = applySessionCreationRequest(rid, body);
        } else {
            request = applySessionData(rid, body);
            if (params.isAckingRequests()) {
                pendingRequestAcks.add(request);
            }
        }
        exch = new HTTPExchange(request);
        exchanges.add(exch);
        // Wake the processing thread and cancel any deferred empty request,
        // since a real request is now on its way.
        notEmpty.signalAll();
        clearEmptyRequest();
    } finally {
        lock.unlock();
    }
    // The actual transmission happens outside the lock so that other
    // senders and the processing thread are not blocked by network I/O.
    AbstractBody finalReq = exch.getRequest();
    HTTPResponse resp = httpSender.send(params, finalReq);
    exch.setHTTPResponse(resp);
    fireRequestSent(finalReq);
}
/**
 * Attempt to pause the current session. When supported by the remote
 * connection manager, pausing the session will result in the connection
 * manager closing out all outstanding requests (including the pause
 * request) and increases the inactivity timeout of the session. The
 * exact value of the temporary timeout is dependent upon the connection
 * manager. This method should be used if a client encounters an
 * exceptional temporary situation during which it will be unable to send
 * requests to the connection manager for a period of time greater than
 * the maximum inactivity period.
 *
 * The session will revert back to its normal, unpaused state when the
 * client sends its next message.
 *
 * A failure to transmit the pause request is logged and otherwise
 * ignored; the method still returns {@code true} in that case.
 *
 * @return {@code true} if the connection manager supports session pausing,
 *  {@code false} if the connection manager does not support session
 *  pausing or if the session has not yet been established
 */
public boolean pause() {
    assertUnlocked();
    AttrMaxPause maxPause;
    lock.lock();
    try {
        // No session params means the session is not established yet.
        maxPause = (cmParams == null) ? null : cmParams.getMaxPause();
    } finally {
        lock.unlock();
    }
    if (maxPause == null) {
        return false;
    }
    try {
        send(ComposableBody.builder()
                .setAttribute(Attributes.PAUSE, maxPause.toString())
                .build());
    } catch (BOSHException boshx) {
        LOG.log(Level.FINEST, "Could not send pause", boshx);
    }
    return true;
}
/**
 * Ends the BOSH session by sending a termination message without any
 * payload to the remote BOSH connection manager.
 *
 * @throws BOSHException when termination message cannot be sent
 */
public void disconnect() throws BOSHException {
    final ComposableBody emptyBody = ComposableBody.builder().build();
    disconnect(emptyBody);
}
/**
 * Ends the BOSH session by disconnecting from the remote BOSH connection
 * manager. The provided content is sent as the final message, with its
 * 'type' attribute set to mark it as a termination.
 *
 * @param msg final message to send; must not be {@code null}
 * @throws IllegalArgumentException if {@code msg} is {@code null}
 * @throws BOSHException when termination message cannot be sent
 */
public void disconnect(final ComposableBody msg) throws BOSHException {
    if (msg == null) {
        throw new IllegalArgumentException(
                "Message body may not be null");
    }
    final Builder terminating = msg.rebuild();
    terminating.setAttribute(Attributes.TYPE, TERMINATE);
    final ComposableBody finalMsg = terminating.build();
    send(finalMsg);
}
/**
 * Forcibly closes this client session instance. No termination message
 * is sent; the local session is simply disposed, with an exception
 * describing the explicit close as the cause. The preferred way to end a
 * session is to send a disconnect message and wait for organic
 * termination.
 */
public void close() {
    final BOSHException reason =
            new BOSHException("Session explicitly closed by caller");
    dispose(reason);
}
///////////////////////////////////////////////////////////////////////////
// Package-private methods:
/**
 * Reads the current CM session params under the session lock.
 *
 * @return current session params, or {@code null}
 */
CMSessionParams getCMSessionParams() {
    final CMSessionParams snapshot;
    lock.lock();
    try {
        snapshot = cmParams;
    } finally {
        lock.unlock();
    }
    return snapshot;
}
/**
 * Blocks the caller for as long as the session is working and there is
 * no scheduled, not-yet-completed empty request. Interruptions while
 * waiting are logged and the wait is resumed.
 */
void drain() {
    lock.lock();
    try {
        LOG.finest("Waiting while draining...");
        for (;;) {
            if (!isWorking()) {
                break;
            }
            final boolean emptyRequestPending =
                    emptyRequestFuture != null
                    && !emptyRequestFuture.isDone();
            if (emptyRequestPending) {
                break;
            }
            try {
                drained.await();
            } catch (InterruptedException intx) {
                LOG.log(Level.FINEST, INTERRUPTED, intx);
            }
        }
        LOG.finest("Drained");
    } finally {
        lock.unlock();
    }
}
/**
* Test hook: installs the interceptor that the processing loop consults
* for each exchange before processing it. The interceptor may replace the
* exchange or return {@code null} to have it discarded.
*
* @param interceptor exchange interceptor, or {@code null} to remove a
* previously installed one
*/
void setExchangeInterceptor(final ExchangeInterceptor interceptor) {
exchInterceptor.set(interceptor);
}
///////////////////////////////////////////////////////////////////////////
// Private methods:
/**
 * Initializes the session: sets up the underlying HTTP transport with the
 * session configuration and starts the daemon receive thread.
 */
private void init() {
    assertUnlocked();
    lock.lock();
    try {
        httpSender.init(cfg);
        final String threadName = BOSHClient.class.getSimpleName()
                + "[" + System.identityHashCode(this)
                + "]: Receive thread";
        final Thread receiver = new Thread(procRunnable);
        receiver.setName(threadName);
        receiver.setDaemon(true);
        // Publish the thread before starting it: the receive loop compares
        // itself against procThread to decide whether to keep running.
        procThread = receiver;
        receiver.start();
    } finally {
        lock.unlock();
    }
}
/**
* Destroy this session. The first caller marks the session as no longer
* working, notifies the connection listeners, releases the session state,
* wakes up all waiting threads and shuts down the HTTP sender and the
* scheduled executor. Subsequent calls return immediately.
*
* @param cause the reason for the session termination, or {@code null}
* for normal termination
*/
private void dispose(final Throwable cause) {
assertUnlocked();
// Phase 1: atomically claim the disposal by clearing procThread.
lock.lock();
try {
if (procThread == null) {
// Already disposed
return;
}
procThread = null;
} finally {
lock.unlock();
}
// Listeners are notified without holding the lock.
if (cause == null) {
fireConnectionClosed();
} else {
fireConnectionClosedOnError(cause);
}
// Phase 2: release the session state and wake every waiter so that it
// can observe that the session is no longer working.
lock.lock();
try {
clearEmptyRequest();
exchanges = null;
cmParams = null;
pendingResponseAcks = null;
pendingRequestAcks = null;
notEmpty.signalAll();
notFull.signalAll();
drained.signalAll();
} finally {
lock.unlock();
}
httpSender.destroy();
schedExec.shutdownNow();
}
/**
 * Tells whether the given message body is a request to pause the
 * session, i.e. whether it carries a 'pause' attribute.
 *
 * @param msg message to evaluate
 * @return {@code true} if the message is a pause request, {@code false}
 *  otherwise
 */
private static boolean isPause(final AbstractBody msg) {
    final String pauseAttr = msg.getAttribute(Attributes.PAUSE);
    return pauseAttr != null;
}
/**
 * Tells whether the given message body signals termination of the
 * session, i.e. whether its 'type' attribute equals the termination
 * value.
 *
 * @param msg message to evaluate
 * @return {@code true} if the message is a session termination,
 *  {@code false} otherwise
 */
private static boolean isTermination(final AbstractBody msg) {
    final String type = msg.getAttribute(Attributes.TYPE);
    return TERMINATE.equals(type);
}
/**
 * Derives the terminal binding condition, if any, described by an HTTP
 * response code and response body. A terminating body is resolved through
 * its 'condition' attribute. Otherwise the HTTP response code is only
 * consulted when session params exist and report no version (the
 * deprecated HTTP error conditions).
 *
 * @param respCode HTTP response code
 * @param respBody response body
 * @return terminal binding condition, or {@code null} if not a terminal
 *  binding condition message
 */
private TerminalBindingCondition getTerminalBindingCondition(
        final int respCode,
        final AbstractBody respBody) {
    assertLocked();
    if (!isTermination(respBody)) {
        // Check for deprecated HTTP Error Conditions
        final boolean legacySession =
                cmParams != null && cmParams.getVersion() == null;
        if (legacySession) {
            return TerminalBindingCondition.forHTTPResponseCode(respCode);
        }
        return null;
    }
    final String condition = respBody.getAttribute(Attributes.CONDITION);
    return TerminalBindingCondition.forString(condition);
}
/**
 * Decides whether the given message may be sent right now or has to wait
 * for the session state to change. Before the session is established
 * only one request may be outstanding. Afterwards the connection
 * manager's 'requests' limit applies, with one extra slot granted to a
 * terminate or pause message.
 *
 * @param msg message to evaluate
 * @return {@code true} if the message can be immediately sent,
 *  {@code false} otherwise
 */
private boolean isImmediatelySendable(final AbstractBody msg) {
    assertLocked();
    if (cmParams == null) {
        // block if we're waiting for a response to our first request
        return exchanges.isEmpty();
    }
    final AttrRequests limit = cmParams.getRequests();
    if (limit == null) {
        return true;
    }
    final int maxRequests = limit.intValue();
    final int outstanding = exchanges.size();
    if (outstanding < maxRequests) {
        return true;
    }
    // One additional terminate or pause message is allowed
    return outstanding == maxRequests
            && (isTermination(msg) || isPause(msg));
}
/**
 * Tells whether the session is still active, which is the case for as
 * long as a processing thread is registered.
 *
 * @return {@code true} if it is, {@code false} otherwise
 */
private boolean isWorking() {
    assertLocked();
    final Thread worker = procThread;
    return worker != null;
}
/**
 * Waits on the "not full" condition until the given message becomes
 * immediately sendable or the session is no longer working, whichever
 * happens first. Interruptions while waiting are logged and the wait is
 * resumed.
 *
 * @param msg message to evaluate
 */
private void blockUntilSendable(final AbstractBody msg) {
    assertLocked();
    for (;;) {
        if (!isWorking() || isImmediatelySendable(msg)) {
            return;
        }
        try {
            notFull.await();
        } catch (InterruptedException intx) {
            LOG.log(Level.FINEST, INTERRUPTED, intx);
        }
    }
}
/**
 * Turns the given body into a BOSH session creation request by applying
 * the session creation attributes (target, language, version, wait,
 * hold, RID, optional route and from, ack) and removing any session ID.
 *
 * @param rid request ID to use
 * @param orig original body to modify
 * @return modified message which acts as a session creation request
 */
private ComposableBody applySessionCreationRequest(
        final long rid, final ComposableBody orig) throws BOSHException {
    assertLocked();
    final Builder creation = orig.rebuild();
    creation.setAttribute(Attributes.TO, cfg.getTo());
    creation.setAttribute(Attributes.XML_LANG, cfg.getLang());
    final String version = AttrVersion.getSupportedVersion().toString();
    creation.setAttribute(Attributes.VER, version);
    creation.setAttribute(Attributes.WAIT, "60");
    creation.setAttribute(Attributes.HOLD, "1");
    creation.setAttribute(Attributes.RID, String.valueOf(rid));
    applyRoute(creation);
    applyFrom(creation);
    creation.setAttribute(Attributes.ACK, "1");
    // Make sure the following are NOT present (i.e., during retries)
    creation.setAttribute(Attributes.SID, null);
    return creation.build();
}
/**
 * Adds the configured routing information, if any, to the request
 * message whose builder has been provided.
 *
 * @param builder builder instance to add routing information to
 */
private void applyRoute(final Builder builder) {
    assertLocked();
    final String configuredRoute = cfg.getRoute();
    if (configuredRoute == null) {
        return;
    }
    builder.setAttribute(Attributes.ROUTE, configuredRoute);
}
/**
 * Adds the configured local station ID, if any, to the request message
 * whose builder has been provided.
 *
 * @param builder builder instance to add station ID information to
 */
private void applyFrom(final Builder builder) {
    assertLocked();
    final String configuredFrom = cfg.getFrom();
    if (configuredFrom == null) {
        return;
    }
    builder.setAttribute(Attributes.FROM, configuredFrom);
}
/**
 * Stamps an outbound request with the data of the established session:
 * session ID, request ID and, where required, a response
 * acknowledgement.
 *
 * This method assumes the lock is currently held.
 *
 * @param rid request ID to use
 * @param orig original/raw request
 * @return modified request with session information applied
 */
private ComposableBody applySessionData(
        final long rid,
        final ComposableBody orig) throws BOSHException {
    assertLocked();
    final Builder outbound = orig.rebuild();
    final String sessionId = cmParams.getSessionID().toString();
    outbound.setAttribute(Attributes.SID, sessionId);
    outbound.setAttribute(Attributes.RID, String.valueOf(rid));
    applyResponseAcknowledgement(outbound, rid);
    return outbound.build();
}
/**
 * Sets the 'ack' attribute of the request to the highest 'rid' for which
 * a response has already been received. The attribute is left out when
 * no response has been received yet, and when the acknowledged RID is
 * the one directly preceding this request (an implicit ack).
 *
 * @param builder message builder
 * @param rid current request RID
 */
private void applyResponseAcknowledgement(
        final Builder builder,
        final long rid) {
    assertLocked();
    final long highestAcked = responseAck.longValue();
    // -1 is the initial value: no responses received so far.
    final boolean nothingReceived = highestAcked == -1L;
    final boolean implicitAck = highestAcked == rid - 1L;
    if (nothingReceived || implicitAck) {
        return;
    }
    builder.setAttribute(Attributes.ACK, responseAck.toString());
}
/**
 * Main loop of the processing thread: takes exchanges one at a time and
 * processes them until no further exchange is handed out. If a test
 * interceptor is installed it sees each exchange first and may replace
 * it or have it discarded.
 *
 * This method is run in the processing thread.
 */
private void processMessages() {
    LOG.log(Level.FINEST, "Processing thread starting");
    try {
        for (HTTPExchange exch = nextExchange();
                exch != null;
                exch = nextExchange()) {
            // Test hook to manipulate what the client sees:
            final ExchangeInterceptor hook = exchInterceptor.get();
            if (hook == null) {
                processExchange(exch);
                continue;
            }
            final HTTPExchange replacement = hook.interceptExchange(exch);
            if (replacement != null) {
                processExchange(replacement);
                continue;
            }
            LOG.log(Level.FINE, "Discarding exchange on request "
                    + "of test hook: RID="
                    + exch.getRequest().getAttribute(
                            Attributes.RID));
            lock.lock();
            try {
                exchanges.remove(exch);
            } finally {
                lock.unlock();
            }
        }
    } finally {
        LOG.log(Level.FINEST, "Processing thread exiting");
    }
}
/**
 * Hands out the exchange at the head of the queue, without removing it,
 * waiting on the "not empty" condition while the queue has nothing to
 * offer. The wait ends without a result as soon as the calling thread is
 * no longer the registered processing thread.
 *
 * @return next exchange to process, or {@code null} if the calling
 *  thread is not (or no longer) the session's processing thread
 */
private HTTPExchange nextExchange() {
    assertUnlocked();
    final Thread self = Thread.currentThread();
    HTTPExchange head = null;
    lock.lock();
    try {
        while (head == null && self.equals(procThread)) {
            head = exchanges.peek();
            if (head != null) {
                continue;
            }
            try {
                notEmpty.await();
            } catch (InterruptedException intx) {
                LOG.log(Level.FINEST, INTERRUPTED, intx);
            }
        }
    } finally {
        lock.unlock();
    }
    return head;
}
/**
* Process the next, provided exchange. This is the main processing
* method of the receive thread. It obtains the response for the exchange,
* publishes it to the response listeners, updates the session state under
* the lock (session establishment, terminal/recoverable conditions,
* acknowledgements) and finally, outside the lock, retransmits any
* requests that were selected for resending.
*
* @param exch message exchange to process
*/
private void processExchange(final HTTPExchange exch) {
assertUnlocked();
HTTPResponse resp;
AbstractBody body;
int respCode;
// Obtain the response; any failure here ends the session.
try {
resp = exch.getHTTPResponse();
body = resp.getBody();
respCode = resp.getHTTPStatus();
} catch (BOSHException boshx) {
LOG.log(Level.FINEST, "Could not obtain response", boshx);
dispose(boshx);
return;
} catch (InterruptedException intx) {
LOG.log(Level.FINEST, INTERRUPTED, intx);
dispose(intx);
return;
}
fireResponseReceived(body);
// Process the message with the current session state
AbstractBody req = exch.getRequest();
CMSessionParams params;
List<HTTPExchange> toResend = null;
lock.lock();
try {
// Check for session creation response info, if needed
if (cmParams == null) {
cmParams = CMSessionParams.fromSessionInit(req, body);
// The following call handles the lock. It's not an escape.
fireConnectionEstablished();
}
params = cmParams;
checkForTerminalBindingConditions(body, respCode);
if (isTermination(body)) {
// Explicit termination
// The lock is released by hand because dispose() must be entered
// unlocked; the finally block below then skips its cleanup since
// the lock is no longer held by this thread.
lock.unlock();
dispose(null);
return;
}
if (isRecoverableBindingCondition(body)) {
// Retransmit outstanding requests
if (toResend == null) {
toResend = new ArrayList<HTTPExchange>(exchanges.size());
}
// NOTE(review): every queued exchange - including the one being
// processed - is copied into a fresh exchange, and the copies are
// appended to the queue while the originals stay queued. Confirm
// that keeping the originals is intended.
for (HTTPExchange exchange : exchanges) {
HTTPExchange resendExch =
new HTTPExchange(exchange.getRequest());
toResend.add(resendExch);
}
for (HTTPExchange exchange : toResend) {
exchanges.add(exchange);
}
} else {
// Process message as normal
processRequestAcknowledgements(req, body);
processResponseAcknowledgementData(req);
HTTPExchange resendExch =
processResponseAcknowledgementReport(body);
// NOTE(review): toResend is only assigned in the other branch, so
// the null check on it is always true here.
if (resendExch != null && toResend == null) {
toResend = new ArrayList<HTTPExchange>(1);
toResend.add(resendExch);
exchanges.add(resendExch);
}
}
} catch (BOSHException boshx) {
LOG.log(Level.FINEST, "Could not process response", boshx);
// Same manual unlock as for explicit termination above.
lock.unlock();
dispose(boshx);
return;
} finally {
// Only clean up when the lock is still held, i.e. when neither of the
// dispose paths above released it.
if (lock.isHeldByCurrentThread()) {
try {
exchanges.remove(exch);
if (exchanges.isEmpty()) {
// Nothing outstanding: schedule a deferred empty request.
scheduleEmptyRequest(processPauseRequest(req));
}
notFull.signalAll();
} finally {
lock.unlock();
}
}
}
// Retransmissions are sent outside the lock.
if (toResend != null) {
for (HTTPExchange resend : toResend) {
HTTPResponse response =
httpSender.send(params, resend.getRequest());
resend.setHTTPResponse(response);
fireRequestSent(resend.getRequest());
}
}
}
/**
 * Cancels the scheduled empty request, if there is one, without
 * interrupting it should it already be running, and forgets about it.
 */
private void clearEmptyRequest() {
    assertLocked();
    if (emptyRequestFuture == null) {
        return;
    }
    emptyRequestFuture.cancel(false);
    emptyRequestFuture = null;
}
/**
 * Determines the default empty request delay/interval for the active
 * session: the connection manager's polling interval when it announced
 * one, the configured empty request delay otherwise.
 *
 * @return delay in milliseconds
 */
private long getDefaultEmptyRequestDelay() {
    assertLocked();
    // Figure out how long we should wait before sending an empty request
    final AttrPolling polling = cmParams.getPollingInterval();
    if (polling != null) {
        return polling.getInMilliseconds();
    }
    return EMPTY_REQUEST_DELAY;
}
/**
 * Schedules an empty request to be sent after the given delay, replacing
 * any previously scheduled one. Nothing is scheduled when the session is
 * no longer working. Threads waiting in {@code drain()} are woken once
 * the scheduling attempt has been made.
 *
 * @param delay delay in milliseconds; must not be negative
 * @throws IllegalArgumentException if {@code delay} is negative
 */
private void scheduleEmptyRequest(long delay) {
    assertLocked();
    if (delay < 0L) {
        throw new IllegalArgumentException(
                "Empty request delay must be >= 0 (was: " + delay + ")");
    }
    clearEmptyRequest();
    if (isWorking()) {
        // Schedule the transmission
        if (LOG.isLoggable(Level.FINER)) {
            LOG.finer("Scheduling empty request in " + delay + "ms");
        }
        try {
            emptyRequestFuture = schedExec.schedule(
                    emptyRequestRunnable, delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException rex) {
            LOG.log(Level.FINEST, "Could not schedule empty request", rex);
        }
        drained.signalAll();
    }
}
/**
 * Sends a request without payload to maintain session requirements. Runs
 * as the deferred task scheduled by {@code scheduleEmptyRequest(long)}.
 * If the transmission fails, the session is disposed with the failure as
 * its cause.
 */
private void sendEmptyRequest() {
    assertUnlocked();
    // Send an empty request
    LOG.finest("Sending empty request");
    try {
        final ComposableBody emptyBody = ComposableBody.builder().build();
        send(emptyBody);
    } catch (BOSHException boshx) {
        dispose(boshx);
    }
}
/**
 * Verifies, when assertions are active, that the calling thread holds the
 * internal lock.
 *
 * @throws AssertionError if assertions are active and the lock is not
 *  held by the current thread
 */
private void assertLocked() {
    if (ASSERTIONS && !lock.isHeldByCurrentThread()) {
        throw new AssertionError("Lock is not held by current thread");
    }
}
/**
 * Verifies, when assertions are active, that the calling thread does
 * *not* hold the internal lock.
 *
 * @throws AssertionError if assertions are active and the lock is held by
 *  the current thread
 */
private void assertUnlocked() {
    if (ASSERTIONS && lock.isHeldByCurrentThread()) {
        throw new AssertionError("Lock is held by current thread");
    }
}
/**
 * Inspects the response for a terminal binding condition (as per
 * XEP-0124 section 17) and raises an exception describing it if one is
 * found.
 *
 * @param body response body to evaluate
 * @param code HTTP response code
 * @throws BOSHException if a terminal binding condition is detected
 */
private void checkForTerminalBindingConditions(
        final AbstractBody body,
        final int code)
        throws BOSHException {
    final TerminalBindingCondition terminal =
            getTerminalBindingCondition(code, body);
    if (terminal == null) {
        return;
    }
    final String description =
            "Terminal binding condition encountered: "
            + terminal.getCondition() + " ("
            + terminal.getMessage() + ")";
    throw new BOSHException(description);
}
/**
 * Tells whether the response signals a recoverable binding condition
 * (as per XEP-0124 section 17), i.e. whether its 'type' attribute equals
 * the error value.
 *
 * @param resp response body
 * @return {@code true} if it does, {@code false} otherwise
 */
private static boolean isRecoverableBindingCondition(
        final AbstractBody resp) {
    final String type = resp.getAttribute(Attributes.TYPE);
    return ERROR.equals(type);
}
/**
 * Works out how long to wait before the next empty request is sent,
 * taking a pause request into account. The request's {@code pause}
 * attribute is only consulted when session parameters are available and
 * specify a maximum pause. When a pause is present, the delay is the
 * pause duration less {@code PAUSE_MARGIN}; should that be negative,
 * {@code EMPTY_REQUEST_DELAY} is used instead. In every other case,
 * including a pause attribute that cannot be parsed, the default delay
 * is returned.
 *
 * This method assumes the lock is currently held.
 *
 * @param req request to inspect for a pause attribute
 * @return delay in milliseconds that should elapse prior to an
 *  empty message being sent
 */
private long processPauseRequest(
        final AbstractBody req) {
    assertLocked();
    if (cmParams == null || cmParams.getMaxPause() == null) {
        // Pausing is not available for this session
        return getDefaultEmptyRequestDelay();
    }
    try {
        final AttrPause requested = AttrPause.createFromString(
                req.getAttribute(Attributes.PAUSE));
        if (requested != null) {
            final long adjusted =
                    requested.getInMilliseconds() - PAUSE_MARGIN;
            return adjusted < 0 ? EMPTY_REQUEST_DELAY : adjusted;
        }
    } catch (BOSHException boshx) {
        // Unparseable pause value: fall back to the default delay
        LOG.log(Level.FINEST, "Could not extract", boshx);
    }
    return getDefaultEmptyRequestDelay();
}
/**
 * Examines a response for request acknowledgements and discards every
 * pending request that the response acknowledges.
 * <p>
 * Nothing is done when the session is not acking requests or when the
 * response carries a {@code report} attribute. Otherwise the response's
 * {@code ack} attribute gives the highest acknowledged RID; when it is
 * absent, the RID of {@code req} is used instead.
 *
 * This method assumes the lock is currently held.
 *
 * @param req request
 * @param resp response
 */
private void processRequestAcknowledgements(
        final AbstractBody req, final AbstractBody resp) {
    assertLocked();
    // A response that carries a report attribute acks nothing
    if (!cmParams.isAckingRequests()
            || resp.getAttribute(Attributes.REPORT) != null) {
        return;
    }

    // Explicit ack if present, otherwise an implicit ack of all prior
    // requests up until this request's RID
    final String explicitAck = resp.getAttribute(Attributes.ACK);
    final long highestAcked = Long.parseLong(
            explicitAck == null
                    ? req.getAttribute(Attributes.RID)
                    : explicitAck);

    if (LOG.isLoggable(Level.FINEST)) {
        LOG.finest("Removing pending acks up to: " + highestAcked);
    }
    for (Iterator<ComposableBody> it = pendingRequestAcks.iterator();
            it.hasNext();) {
        final long pendingRid = Long.parseLong(
                it.next().getAttribute(Attributes.RID));
        if (pendingRid <= highestAcked) {
            it.remove();
        }
    }
}
/**
 * Process the response in order to update the response acknowledgement
 * data.
 * <p>
 * The RID of the first request processed becomes the initial value of
 * {@code responseAck}. Every later RID is recorded in
 * {@code pendingResponseAcks}, after which {@code responseAck} is
 * advanced across each directly following RID found there, stopping at
 * the first gap or when no recorded RIDs remain.
 *
 * This method assumes the lock is currently held.
 *
 * @param req request
 */
private void processResponseAcknowledgementData(
        final AbstractBody req) {
    assertLocked();
    Long rid = Long.parseLong(req.getAttribute(Attributes.RID));
    if (responseAck.equals(Long.valueOf(-1L))) {
        // This is the first request
        responseAck = rid;
        return;
    }

    pendingResponseAcks.add(rid);
    // Remove up until the first missing response (or end of queue).
    // The emptiness check is required because first() throws
    // NoSuchElementException on an empty set.
    while (!pendingResponseAcks.isEmpty()) {
        Long lowest = pendingResponseAcks.first();
        long expected = responseAck.longValue() + 1L;
        if (lowest.longValue() > expected) {
            // Gap: the response for 'expected' has not been seen yet
            break;
        }
        if (lowest.longValue() == expected) {
            responseAck = lowest;
        }
        // Either just consumed, or a stale RID at or below responseAck
        // which would otherwise block the head of the set forever.
        pendingResponseAcks.remove(lowest);
    }
}
/**
 * Process the response in order to check for and respond to any potential
 * ack reports.
 * <p>
 * When the response carries a {@code report} attribute, the pending
 * request with that RID is looked up, wrapped in a new exchange, queued
 * for transmission and returned. The accompanying {@code time} attribute
 * is only used for logging, so a missing or malformed value is tolerated.
 *
 * This method assumes the lock is currently held.
 *
 * @param resp response
 * @return exchange to transmit if a resend is to be performed, or
 *  {@code null} if no resend is necessary
 * @throws BOSHException when a retry is needed but cannot be performed,
 *  either because the reported RID is not a valid number or because no
 *  local copy of the reported request exists
 */
private HTTPExchange processResponseAcknowledgementReport(
        final AbstractBody resp)
        throws BOSHException {
    assertLocked();
    final String reportStr = resp.getAttribute(Attributes.REPORT);
    if (reportStr == null) {
        // No report on this message
        return null;
    }

    final long report;
    try {
        report = Long.parseLong(reportStr);
    } catch (NumberFormatException nfx) {
        // Surface bad server input through the declared exception type
        // rather than letting an unchecked exception escape.
        BOSHException boshx = new BOSHException(
                "Report of missing message with malformed RID '"
                + reportStr + "'");
        boshx.initCause(nfx);
        throw boshx;
    }

    if (LOG.isLoggable(Level.FINE)) {
        // The time value is diagnostic only; log it as received instead
        // of parsing it, so that its absence cannot abort processing.
        LOG.fine("Received report of missing request (RID="
                + report + ", time="
                + resp.getAttribute(Attributes.TIME) + "ms)");
    }

    // Find the missing request
    AbstractBody req = null;
    Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
    while (iter.hasNext() && req == null) {
        AbstractBody pending = iter.next();
        long pendingRID = Long.parseLong(
                pending.getAttribute(Attributes.RID));
        if (pendingRID == report) {
            req = pending;
        }
    }
    if (req == null) {
        throw(new BOSHException("Report of missing message with RID '"
                + reportStr
                + "' but local copy of that request was not found"));
    }

    // Resend the missing request
    HTTPExchange exch = new HTTPExchange(req);
    exchanges.add(exch);
    notEmpty.signalAll();
    return exch;
}
/**
 * Delivers a "request sent" event to each listener in
 * {@code requestListeners}. The event is created once, and only when
 * there is at least one listener to notify. An exception thrown by a
 * listener is logged and does not stop the remaining notifications.
 *
 * @param request request being sent
 */
private void fireRequestSent(final AbstractBody request) {
    assertUnlocked();
    final Iterator<BOSHClientRequestListener> iter =
            requestListeners.iterator();
    if (!iter.hasNext()) {
        // Nobody to notify, so no event needs to be built
        return;
    }
    final BOSHMessageEvent event =
            BOSHMessageEvent.createRequestSentEvent(this, request);
    while (iter.hasNext()) {
        final BOSHClientRequestListener target = iter.next();
        try {
            target.requestSent(event);
        } catch (Exception ex) {
            LOG.log(Level.WARNING, UNHANDLED, ex);
        }
    }
}
/**
 * Delivers a "response received" event to each listener in
 * {@code responseListeners}. The event is created once, and only when
 * there is at least one listener to notify. An exception thrown by a
 * listener is logged and does not stop the remaining notifications.
 *
 * @param response response received
 */
private void fireResponseReceived(final AbstractBody response) {
    assertUnlocked();
    final Iterator<BOSHClientResponseListener> iter =
            responseListeners.iterator();
    if (!iter.hasNext()) {
        // Nobody to notify, so no event needs to be built
        return;
    }
    final BOSHMessageEvent event =
            BOSHMessageEvent.createResponseReceivedEvent(this, response);
    while (iter.hasNext()) {
        final BOSHClientResponseListener target = iter.next();
        try {
            target.responseReceived(event);
        } catch (Exception ex) {
            LOG.log(Level.WARNING, UNHANDLED, ex);
        }
    }
}
/**
 * Notifies all connection listeners that the session has been successfully
 * established.
 * <p>
 * If the calling thread holds the internal lock on entry, the lock is
 * unlocked once before the listeners are invoked and locked again
 * afterwards, even when notification terminates abnormally. An exception
 * thrown by a listener is logged and does not stop the remaining
 * notifications.
 */
private void fireConnectionEstablished() {
    final boolean reacquire = lock.isHeldByCurrentThread();
    if (reacquire) {
        lock.unlock();
    }
    try {
        final Iterator<BOSHClientConnListener> iter =
                connListeners.iterator();
        if (iter.hasNext()) {
            // Built once, and only because there is someone to notify
            final BOSHClientConnEvent event = BOSHClientConnEvent
                    .createConnectionEstablishedEvent(this);
            while (iter.hasNext()) {
                final BOSHClientConnListener target = iter.next();
                try {
                    target.connectionEvent(event);
                } catch (Exception ex) {
                    LOG.log(Level.WARNING, UNHANDLED, ex);
                }
            }
        }
    } finally {
        if (reacquire) {
            lock.lock();
        }
    }
}
/**
 * Notifies all connection listeners that the session has been
 * terminated normally. The event is created once, and only when there is
 * at least one listener to notify. An exception thrown by a listener is
 * logged and does not stop the remaining notifications.
 */
private void fireConnectionClosed() {
    assertUnlocked();
    final Iterator<BOSHClientConnListener> iter = connListeners.iterator();
    if (!iter.hasNext()) {
        // Nobody to notify, so no event needs to be built
        return;
    }
    final BOSHClientConnEvent event =
            BOSHClientConnEvent.createConnectionClosedEvent(this);
    while (iter.hasNext()) {
        final BOSHClientConnListener target = iter.next();
        try {
            target.connectionEvent(event);
        } catch (Exception ex) {
            LOG.log(Level.WARNING, UNHANDLED, ex);
        }
    }
}
/**
 * Notifies all connection listeners that the session has been
 * terminated due to the exceptional condition provided. The event is
 * created once from {@code pendingRequestAcks} and the supplied cause,
 * and only when there is at least one listener to notify. An exception
 * thrown by a listener is logged and does not stop the remaining
 * notifications.
 *
 * @param cause cause of the termination
 */
private void fireConnectionClosedOnError(
        final Throwable cause) {
    assertUnlocked();
    final Iterator<BOSHClientConnListener> iter = connListeners.iterator();
    if (!iter.hasNext()) {
        // Nobody to notify, so no event needs to be built
        return;
    }
    final BOSHClientConnEvent event = BOSHClientConnEvent
            .createConnectionClosedOnErrorEvent(
                    this, pendingRequestAcks, cause);
    while (iter.hasNext()) {
        final BOSHClientConnListener target = iter.next();
        try {
            target.connectionEvent(event);
        } catch (Exception ex) {
            LOG.log(Level.WARNING, UNHANDLED, ex);
        }
    }
}
}