| /** |
| * $RCSfile$ |
| * $Revision$ |
| * $Date$ |
| * |
| * Copyright 2003-2007 Jive Software. |
| * |
| * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.jivesoftware.smack; |
| |
| import org.jivesoftware.smack.packet.Packet; |
| |
| import java.io.IOException; |
| import java.io.Writer; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| |
| /** |
| * Writes packets to a XMPP server. Packets are sent using a dedicated thread. Packet |
| * interceptors can be registered to dynamically modify packets before they're actually |
| * sent. Packet listeners can be registered to listen for all outgoing packets. |
| * |
| * @see Connection#addPacketInterceptor |
| * @see Connection#addPacketSendingListener |
| * |
| * @author Matt Tucker |
| */ |
| class PacketWriter { |
| |
| private Thread writerThread; |
| private Thread keepAliveThread; |
| private Writer writer; |
| private XMPPConnection connection; |
| private final BlockingQueue<Packet> queue; |
| volatile boolean done; |
| |
| /** |
| * Creates a new packet writer with the specified connection. |
| * |
| * @param connection the connection. |
| */ |
| protected PacketWriter(XMPPConnection connection) { |
| this.queue = new ArrayBlockingQueue<Packet>(500, true); |
| this.connection = connection; |
| init(); |
| } |
| |
| /** |
| * Initializes the writer in order to be used. It is called at the first connection and also |
| * is invoked if the connection is disconnected by an error. |
| */ |
| protected void init() { |
| this.writer = connection.writer; |
| done = false; |
| |
| writerThread = new Thread() { |
| public void run() { |
| writePackets(this); |
| } |
| }; |
| writerThread.setName("Smack Packet Writer (" + connection.connectionCounterValue + ")"); |
| writerThread.setDaemon(true); |
| } |
| |
| /** |
| * Sends the specified packet to the server. |
| * |
| * @param packet the packet to send. |
| */ |
| public void sendPacket(Packet packet) { |
| if (!done) { |
| // Invoke interceptors for the new packet that is about to be sent. Interceptors |
| // may modify the content of the packet. |
| connection.firePacketInterceptors(packet); |
| |
| try { |
| queue.put(packet); |
| } |
| catch (InterruptedException ie) { |
| ie.printStackTrace(); |
| return; |
| } |
| synchronized (queue) { |
| queue.notifyAll(); |
| } |
| |
| // Process packet writer listeners. Note that we're using the sending |
| // thread so it's expected that listeners are fast. |
| connection.firePacketSendingListeners(packet); |
| } |
| } |
| |
| /** |
| * Starts the packet writer thread and opens a connection to the server. The |
| * packet writer will continue writing packets until {@link #shutdown} or an |
| * error occurs. |
| */ |
| public void startup() { |
| writerThread.start(); |
| } |
| |
| void setWriter(Writer writer) { |
| this.writer = writer; |
| } |
| |
| /** |
| * Shuts down the packet writer. Once this method has been called, no further |
| * packets will be written to the server. |
| */ |
| public void shutdown() { |
| done = true; |
| synchronized (queue) { |
| queue.notifyAll(); |
| } |
| // Interrupt the keep alive thread if one was created |
| if (keepAliveThread != null) |
| keepAliveThread.interrupt(); |
| } |
| |
| /** |
| * Cleans up all resources used by the packet writer. |
| */ |
| void cleanup() { |
| connection.interceptors.clear(); |
| connection.sendListeners.clear(); |
| } |
| |
| /** |
| * Returns the next available packet from the queue for writing. |
| * |
| * @return the next packet for writing. |
| */ |
| private Packet nextPacket() { |
| Packet packet = null; |
| // Wait until there's a packet or we're done. |
| while (!done && (packet = queue.poll()) == null) { |
| try { |
| synchronized (queue) { |
| queue.wait(); |
| } |
| } |
| catch (InterruptedException ie) { |
| // Do nothing |
| } |
| } |
| return packet; |
| } |
| |
| private void writePackets(Thread thisThread) { |
| try { |
| // Open the stream. |
| openStream(); |
| // Write out packets from the queue. |
| while (!done && (writerThread == thisThread)) { |
| Packet packet = nextPacket(); |
| if (packet != null) { |
| writer.write(packet.toXML()); |
| if (queue.isEmpty()) { |
| writer.flush(); |
| } |
| } |
| } |
| // Flush out the rest of the queue. If the queue is extremely large, it's possible |
| // we won't have time to entirely flush it before the socket is forced closed |
| // by the shutdown process. |
| try { |
| while (!queue.isEmpty()) { |
| Packet packet = queue.remove(); |
| writer.write(packet.toXML()); |
| } |
| writer.flush(); |
| } |
| catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| // Delete the queue contents (hopefully nothing is left). |
| queue.clear(); |
| |
| // Close the stream. |
| try { |
| writer.write("</stream:stream>"); |
| writer.flush(); |
| } |
| catch (Exception e) { |
| // Do nothing |
| } |
| finally { |
| try { |
| writer.close(); |
| } |
| catch (Exception e) { |
| // Do nothing |
| } |
| } |
| } |
| catch (IOException ioe) { |
| // The exception can be ignored if the the connection is 'done' |
| // or if the it was caused because the socket got closed |
| if (!(done || connection.isSocketClosed())) { |
| done = true; |
| // packetReader could be set to null by an concurrent disconnect() call. |
| // Therefore Prevent NPE exceptions by checking packetReader. |
| if (connection.packetReader != null) { |
| connection.notifyConnectionError(ioe); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Sends to the server a new stream element. This operation may be requested several times |
| * so we need to encapsulate the logic in one place. This message will be sent while doing |
| * TLS, SASL and resource binding. |
| * |
| * @throws IOException If an error occurs while sending the stanza to the server. |
| */ |
| void openStream() throws IOException { |
| StringBuilder stream = new StringBuilder(); |
| stream.append("<stream:stream"); |
| stream.append(" to=\"").append(connection.getServiceName()).append("\""); |
| stream.append(" xmlns=\"jabber:client\""); |
| stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\""); |
| stream.append(" version=\"1.0\">"); |
| writer.write(stream.toString()); |
| writer.flush(); |
| } |
| } |