| /* |
| * Copyright 2007 Sun Microsystems, Inc. All Rights Reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Sun designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Sun in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
| * CA 95054 USA or visit www.sun.com if you need additional information or |
| * have any questions. |
| */ |
| |
| package javax.management.event; |
| |
| import com.sun.jmx.event.DaemonThreadFactory; |
| import com.sun.jmx.event.RepeatedSingletonJob; |
| import com.sun.jmx.remote.util.ClassLogger; |
| import java.rmi.RemoteException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import javax.management.Notification; |
| import javax.management.remote.NotificationResult; |
| import javax.management.remote.TargetedNotification; |
| |
| |
| /** |
| * This class is used by {@link RMIPushEventRelay}. When |
| * {@link RMIPushEventRelay} calls {@link |
| * EventClientDelegateMBean#addClient(String, Object[], String[])} to get a new |
| * client identifier, it uses this class name as the |
| * first argument to ask {@code EventClientDelegateMBean} to create an object of |
| * this class. |
| * Then {@code EventClientDelegateMBean} forwards client notifications |
| * to this object. This object then continues forwarding the notifications |
| * to the {@code RMIPushEventRelay}. |
| */ |
| public class RMIPushEventForwarder implements EventForwarder { |
| private static final int DEFAULT_BUFFER_SIZE = 6000; |
| |
| /** |
| * Creates a new instance of {@code RMIPushEventForwarder}. |
| * |
| * @param receiver An RMI stub exported to receive notifications |
| * from this object for its {@link RMIPushEventRelay}. |
| * |
| * @param bufferSize The maximum number of notifications to store |
| * while waiting for the last remote send to complete. |
| */ |
| public RMIPushEventForwarder(RMIPushServer receiver, int bufferSize) { |
| if (logger.traceOn()) { |
| logger.trace("RMIEventForwarder", "new one"); |
| } |
| |
| if (bufferSize < 0) { |
| throw new IllegalArgumentException( |
| "Negative buffer size: " + bufferSize); |
| } else if (bufferSize == 0) |
| bufferSize = DEFAULT_BUFFER_SIZE; |
| |
| if (receiver == null) { |
| throw new NullPointerException(); |
| } |
| |
| this.receiver = receiver; |
| this.buffer = new ArrayBlockingQueue<TargetedNotification>(bufferSize); |
| } |
| |
| public void forward(Notification n, Integer listenerId) { |
| if (logger.traceOn()) { |
| logger.trace("forward", "to the listener: "+listenerId); |
| } |
| synchronized(sendingJob) { |
| TargetedNotification tn = new TargetedNotification(n, listenerId); |
| while (!buffer.offer(tn)) { |
| buffer.remove(); |
| passed++; |
| } |
| sendingJob.resume(); |
| } |
| } |
| |
| public void close() { |
| if (logger.traceOn()) { |
| logger.trace("close", "called"); |
| } |
| |
| synchronized(sendingJob) { |
| ended = true; |
| buffer.clear(); |
| } |
| } |
| |
| public void setClientId(String clientId) { |
| if (logger.traceOn()) { |
| logger.trace("setClientId", clientId); |
| } |
| } |
| |
| private class SendingJob extends RepeatedSingletonJob { |
| public SendingJob() { |
| super(executor); |
| } |
| |
| public boolean isSuspended() { |
| return ended || buffer.isEmpty(); |
| } |
| |
| public void task() { |
| final long earliest = passed; |
| |
| List<TargetedNotification> tns = |
| new ArrayList<TargetedNotification>(buffer.size()); |
| synchronized(sendingJob) { |
| buffer.drainTo(tns); |
| passed += tns.size(); |
| } |
| |
| if (logger.traceOn()) { |
| logger.trace("SendingJob-task", "sending: "+tns.size()); |
| } |
| |
| if (!tns.isEmpty()) { |
| try { |
| TargetedNotification[] tnArray = |
| new TargetedNotification[tns.size()]; |
| tns.toArray(tnArray); |
| receiver.receive(new NotificationResult(earliest, passed, tnArray)); |
| } catch (RemoteException e) { |
| if (logger.debugOn()) { |
| logger.debug("SendingJob-task", |
| "Got exception to forward notifs.", e); |
| } |
| |
| long currentLost = passed - earliest; |
| if (FetchingEventRelay.isSerialOrClassNotFound(e)) { |
| // send one by one |
| long tmpPassed = earliest; |
| for (TargetedNotification tn : tns) { |
| try { |
| receiver.receive(new NotificationResult(earliest, |
| ++tmpPassed, new TargetedNotification[]{tn})); |
| } catch (RemoteException ioee) { |
| logger.trace( |
| "SendingJob-task", "send to remote", ioee); |
| // sends nonFatal notifs? |
| } |
| } |
| |
| currentLost = passed - tmpPassed; |
| } |
| |
| if (currentLost > 0) { // inform of the lost. |
| try { |
| receiver.receive(new NotificationResult( |
| passed, passed, |
| new TargetedNotification[]{})); |
| } catch (RemoteException ee) { |
| logger.trace( |
| "SendingJob-task", "receiver.receive", ee); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| private long passed = 0; |
| |
| private static final ExecutorService executor = |
| Executors.newCachedThreadPool( |
| new DaemonThreadFactory("RMIEventForwarder Executor")); |
| private final SendingJob sendingJob = new SendingJob(); |
| |
| private final BlockingQueue<TargetedNotification> buffer; |
| |
| private final RMIPushServer receiver; |
| private boolean ended = false; |
| |
| private static final ClassLogger logger = |
| new ClassLogger("javax.management.event", "RMIEventForwarder"); |
| } |