| /* |
| * Copyright 2007-2008 Sun Microsystems, Inc. All Rights Reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
| * CA 95054 USA or visit www.sun.com if you need additional information or |
| * have any questions. |
| */ |
| |
| /* |
| * @test ReconnectableJMXConnector |
| * @bug 5108776 |
| * @summary Check that the Event Service can be used to build a |
| * ReconnectableJMXConnector. |
| * @author Eamonn McManus |
| */ |
| |
| import java.io.IOException; |
| import java.lang.reflect.InvocationHandler; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Proxy; |
| import java.util.Date; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import javax.management.ListenerNotFoundException; |
| import javax.management.MBeanServer; |
| import javax.management.MBeanServerConnection; |
| import javax.management.MBeanServerFactory; |
| import javax.management.Notification; |
| import javax.management.NotificationBroadcasterSupport; |
| import javax.management.NotificationFilter; |
| import javax.management.NotificationListener; |
| import javax.management.ObjectName; |
| import javax.management.event.EventClient; |
| import javax.management.remote.JMXConnectionNotification; |
| import javax.management.remote.JMXConnector; |
| import javax.management.remote.JMXConnectorFactory; |
| import javax.management.remote.JMXConnectorServer; |
| import javax.management.remote.JMXConnectorServerFactory; |
| import javax.management.remote.JMXServiceURL; |
| import javax.security.auth.Subject; |
| |
| /* |
| * This test checks that it is possible to use the Event Service to create |
| * a "reconnectable connector". |
| * |
| * In the JMX Remote API, we deliberately specified that a connector client |
| * (JMXConnector) that encounters a network failure is then permanently broken. |
| * The idea being that adding recovery logic to the basic connector client |
| * would make it much more complicated and less reliable, and the logic would |
| * in any case never correspond to what a given situation needs. Some of |
| * the tough questions are: Should the connector try to mask the failure by |
| * blocking operations until the failure is resolved? How long should the |
| * connector try to reestablish the connection before giving up? Rather than |
| * try to solve this problem in the connector, we suggested that people who |
| * wanted to recover from network failures could implement the JMXConnector |
| * interface themselves so that it forwards to a wrapped JMXConnector that can |
| * be replaced in case of network failure. |
| * |
| * This works fine except that the connector client has state, |
| * in the form of listeners added by the user through the |
| * MBeanServerConnection.addNotificationListener method. It's possible |
| * for the wrapper to keep track of these listeners as well as forwarding |
| * them to the wrapped JMXConnector, so that it can reapply them to |
| * a replacement JMXConnector after failure recover. But it's quite |
| * tricky, particularly because of the two- and four-argument versions of |
| * removeNotificationListener. |
| * |
| * The Event Service can take care of this for you through the EventClient |
| * class. Listeners added through that class are implemented in a way that |
| * doesn't require the connector client to maintain any state, so they should |
| * continue to work transparently after replacing the wrapped JMXConnector. |
| * This test is a proof of concept that shows it works. Quite a number of |
| * details would need to be changed to build a reliable reconnectable |
| * connector. |
| * |
| * The test simulates network failure by rewrapping the wrapped JMXConnector's |
| * MBeanServerConnection (MBSC) in a "breakable" MBSC which we can cause |
| * to stop working. We do this in two phases. The first phase suspends |
| * any MBSC calls just at the point where they would return to the caller. |
| * The goal here is to block an EventClientDelegateMBean.fetchNotifications |
| * operation when it has received notifications but not yet delivered them |
| * to the EventClient. This is the most delicate point where a breakage |
| * can occur, because the EventClientDelegate must not drop those notifs |
| * from its buffer until another fetchNotifs call arrives with a later |
| * sequence number (which is an implicit ack of the previous set of |
| * notifs). Once the fetchNotifs call is suspended, we "kill" the MBSC, |
| * causing it to throw IOException from this and any other calls. That |
| * triggers the reconnect logic, which will make a new MBSC and issue |
| * the same fetchNotifs call to it. |
| * |
| * The test could be improved by synchronizing explicitly between the |
| * breakable MBSC and the mainline, so we only proceed to kill the MBSC |
| * when we are sure that the fetchNotifs call is blocked. As it is, |
| * we have a small delay which both ensures that no notifs are delivered |
| * while the connection is suspended, and if the machine is fast enough |
| * allows the fetchNotifs call to reach the blocking point. |
| */ |
| public class ReconnectableConnectorTest { |
| private static class ReconnectableJMXConnector implements JMXConnector { |
| private final JMXServiceURL url; |
| private AtomicReference<JMXConnector> wrappedJMXC = |
| new AtomicReference<JMXConnector>(); |
| private AtomicReference<MBeanServerConnection> wrappedMBSC = |
| new AtomicReference<MBeanServerConnection>(); |
| private final NotificationBroadcasterSupport broadcaster = |
| new NotificationBroadcasterSupport(); |
| private final Lock connectLock = new ReentrantLock(); |
| |
| ReconnectableJMXConnector(JMXServiceURL url) { |
| this.url = url; |
| } |
| |
| private class ReconnectIH implements InvocationHandler { |
| public Object invoke(Object proxy, Method method, Object[] args) |
| throws Throwable { |
| try { |
| return method.invoke(wrappedMBSC.get(), args); |
| } catch (InvocationTargetException e) { |
| if (e.getCause() instanceof IOException) { |
| connect(); |
| try { |
| return method.invoke(wrappedMBSC.get(),args); |
| } catch (InvocationTargetException ee) { |
| throw ee.getCause(); |
| } |
| } |
| throw e.getCause(); |
| } |
| } |
| } |
| |
| private class FailureListener implements NotificationListener { |
| public void handleNotification(Notification n, Object h) { |
| String type = n.getType(); |
| if (type.equals(JMXConnectionNotification.FAILED)) { |
| try { |
| connect(); |
| } catch (IOException e) { |
| broadcaster.sendNotification(n); |
| } |
| } else if (type.equals(JMXConnectionNotification.NOTIFS_LOST)) |
| broadcaster.sendNotification(n); |
| } |
| } |
| |
| public void connect() throws IOException { |
| connectLock.lock(); |
| try { |
| connectWithLock(); |
| } finally { |
| connectLock.unlock(); |
| } |
| } |
| |
| private void connectWithLock() throws IOException { |
| MBeanServerConnection mbsc = wrappedMBSC.get(); |
| if (mbsc != null) { |
| try { |
| mbsc.getDefaultDomain(); |
| return; // the connection works |
| } catch (IOException e) { |
| // OK: the connection doesn't work, so make a new one |
| } |
| } |
| // This is where we would need to add the fancy logic that |
| // allows the connection to keep failing for a while |
| // before giving up. |
| JMXConnector jmxc = JMXConnectorFactory.connect(url); |
| jmxc.addConnectionNotificationListener( |
| new FailureListener(), null, null); |
| wrappedJMXC.set(jmxc); |
| if (false) |
| wrappedMBSC.set(jmxc.getMBeanServerConnection()); |
| else { |
| mbsc = jmxc.getMBeanServerConnection(); |
| InvocationHandler ih = new BreakableIH(mbsc); |
| mbsc = (MBeanServerConnection) Proxy.newProxyInstance( |
| MBeanServerConnection.class.getClassLoader(), |
| new Class<?>[] {MBeanServerConnection.class}, |
| ih); |
| wrappedMBSC.set(mbsc); |
| } |
| } |
| |
| private BreakableIH breakableIH() { |
| MBeanServerConnection mbsc = wrappedMBSC.get(); |
| return (BreakableIH) Proxy.getInvocationHandler(mbsc); |
| } |
| |
| void suspend() { |
| BreakableIH ih = breakableIH(); |
| ih.suspend(); |
| } |
| |
| void kill() throws IOException { |
| BreakableIH ih = breakableIH(); |
| wrappedJMXC.get().close(); |
| ih.kill(); |
| } |
| |
| public void connect(Map<String, ?> env) throws IOException { |
| throw new UnsupportedOperationException("Not supported yet."); |
| } |
| |
| private final AtomicReference<MBeanServerConnection> mbscRef = |
| new AtomicReference<MBeanServerConnection>(); |
| |
| public MBeanServerConnection getMBeanServerConnection() |
| throws IOException { |
| connect(); |
| // Synchro here is not strictly correct: two threads could make |
| // an MBSC at the same time. OK for a test but beware for real |
| // code. |
| MBeanServerConnection mbsc = mbscRef.get(); |
| if (mbsc != null) |
| return mbsc; |
| mbsc = (MBeanServerConnection) Proxy.newProxyInstance( |
| MBeanServerConnection.class.getClassLoader(), |
| new Class<?>[] {MBeanServerConnection.class}, |
| new ReconnectIH()); |
| mbsc = EventClient.getEventClientConnection(mbsc); |
| mbscRef.set(mbsc); |
| return mbsc; |
| } |
| |
| public MBeanServerConnection getMBeanServerConnection( |
| Subject delegationSubject) throws IOException { |
| throw new UnsupportedOperationException("Not supported yet."); |
| } |
| |
| public void close() throws IOException { |
| wrappedJMXC.get().close(); |
| } |
| |
| public void addConnectionNotificationListener( |
| NotificationListener l, NotificationFilter f, Object h) { |
| broadcaster.addNotificationListener(l, f, h); |
| } |
| |
| public void removeConnectionNotificationListener(NotificationListener l) |
| throws ListenerNotFoundException { |
| broadcaster.removeNotificationListener(l); |
| } |
| |
| public void removeConnectionNotificationListener( |
| NotificationListener l, NotificationFilter f, Object h) |
| throws ListenerNotFoundException { |
| broadcaster.removeNotificationListener(l, f, h); |
| } |
| |
| public String getConnectionId() throws IOException { |
| return wrappedJMXC.get().getConnectionId(); |
| } |
| } |
| |
| // InvocationHandler that allows us to perform a two-phase "break" of |
| // an object. The first phase suspends the object, so that calls to |
| // it are blocked just before they return. The second phase unblocks |
| // suspended threads and causes them to throw IOException. |
| private static class BreakableIH implements InvocationHandler { |
| private final Object wrapped; |
| private final Holder<String> state = new Holder<String>("running"); |
| |
| BreakableIH(Object wrapped) { |
| this.wrapped = wrapped; |
| } |
| |
| void suspend() { |
| state.set("suspended"); |
| } |
| |
| void kill() { |
| state.set("killed"); |
| } |
| |
| public Object invoke(Object proxy, Method method, Object[] args) |
| throws Throwable { |
| Object result; |
| try { |
| result = method.invoke(wrapped, args); |
| } catch (InvocationTargetException e) { |
| throw e.getCause(); |
| } |
| String s = state.get(); |
| if (s.equals("suspended")) |
| state.waitUntilEqual("killed", 3, TimeUnit.SECONDS); |
| else if (s.equals("killed")) |
| throw new IOException("Broken"); |
| return result; |
| } |
| } |
| |
| private static class Holder<T> { |
| private T held; |
| private Lock lock = new ReentrantLock(); |
| private Condition changed = lock.newCondition(); |
| |
| Holder(T value) { |
| lock.lock(); |
| this.held = value; |
| lock.unlock(); |
| } |
| |
| void waitUntilEqual(T value, long timeout, TimeUnit units) |
| throws InterruptedException { |
| long millis = units.toMillis(timeout); |
| long stop = System.currentTimeMillis() + millis; |
| Date stopDate = new Date(stop); |
| lock.lock(); |
| try { |
| while (!value.equals(held)) { |
| boolean ok = changed.awaitUntil(stopDate); |
| if (!ok) |
| throw new InterruptedException("Timed out"); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| void set(T value) { |
| lock.lock(); |
| try { |
| held = value; |
| changed.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| T get() { |
| lock.lock(); |
| try { |
| return held; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| private static class StoreListener implements NotificationListener { |
| final BlockingQueue<Notification> queue = |
| new ArrayBlockingQueue<Notification>(100); |
| |
| public void handleNotification(Notification n, Object h) { |
| queue.add(n); |
| } |
| |
| Notification nextNotification(long time, TimeUnit units) |
| throws InterruptedException { |
| Notification n = queue.poll(time, units); |
| if (n == null) |
| throw new NoSuchElementException("Notification wait timed out"); |
| return n; |
| } |
| |
| int notifCount() { |
| return queue.size(); |
| } |
| } |
| |
| public static interface SenderMBean {} |
| public static class Sender |
| extends NotificationBroadcasterSupport implements SenderMBean { |
| private AtomicLong seqNo = new AtomicLong(0); |
| |
| void send() { |
| Notification n = |
| new Notification("type", this, seqNo.getAndIncrement()); |
| sendNotification(n); |
| } |
| } |
| |
| public static void main(String[] args) throws Exception { |
| MBeanServer mbs = MBeanServerFactory.newMBeanServer(); |
| Sender sender = new Sender(); |
| ObjectName name = new ObjectName("a:b=c"); |
| mbs.registerMBean(sender, name); |
| |
| System.out.println("Creating connector server"); |
| JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///"); |
| JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer( |
| url, null, mbs); |
| cs.start(); |
| |
| StoreListener csListener = new StoreListener(); |
| cs.addNotificationListener(csListener, null, null); |
| |
| System.out.println("Creating reconnectable client"); |
| JMXServiceURL addr = cs.getAddress(); |
| ReconnectableJMXConnector cc = new ReconnectableJMXConnector(addr); |
| MBeanServerConnection mbsc = cc.getMBeanServerConnection(); |
| |
| System.out.println("Checking server has sent new-client notif"); |
| Notification csn = csListener.nextNotification(1, TimeUnit.SECONDS); |
| assertEquals("CS notif type", |
| JMXConnectionNotification.OPENED, csn.getType()); |
| |
| StoreListener listener = new StoreListener(); |
| mbsc.addNotificationListener(name, listener, null, null); |
| |
| System.out.println("Sending 10 notifs and checking they are received"); |
| for (int i = 0; i < 10; i++) |
| sender.send(); |
| checkNotifs(listener, 0, 10); |
| |
| System.out.println("Suspending the fetchNotifs operation"); |
| cc.suspend(); |
| System.out.println("Sending a notif while fetchNotifs is suspended"); |
| sender.send(); |
| System.out.println("Brief wait before checking no notif is received"); |
| Thread.sleep(2); |
| // dumpThreads(); |
| assertEquals("notif queue while connector suspended", |
| 0, listener.notifCount()); |
| assertEquals("connector server notif queue while connector suspended", |
| 0, csListener.notifCount()); |
| |
| System.out.println("Breaking the connection so fetchNotifs will fail over"); |
| cc.kill(); |
| |
| System.out.println("Checking that client has reconnected"); |
| csn = csListener.nextNotification(1, TimeUnit.SECONDS); |
| assertEquals("First CS notif type after kill", |
| JMXConnectionNotification.CLOSED, csn.getType()); |
| csn = csListener.nextNotification(1, TimeUnit.SECONDS); |
| assertEquals("Second CS notif type after kill", |
| JMXConnectionNotification.OPENED, csn.getType()); |
| |
| System.out.println("Checking that suspended notif has been received"); |
| checkNotifs(listener, 10, 11); |
| } |
| |
| private static void checkNotifs( |
| StoreListener sl, long start, long stop) |
| throws Exception { |
| for (long i = start; i < stop; i++) { |
| Notification n = sl.nextNotification(1, TimeUnit.SECONDS); |
| assertEquals("received sequence number", i, n.getSequenceNumber()); |
| } |
| } |
| |
| private static void assertEquals(String what, Object expect, Object actual) |
| throws Exception { |
| if (!expect.equals(actual)) { |
| fail(what + " should be " + expect + " but is " + actual); |
| } |
| } |
| |
| private static void fail(String why) throws Exception { |
| throw new Exception("TEST FAILED: " + why); |
| } |
| |
| private static void dumpThreads() { |
| System.out.println("Thread stack dump"); |
| Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces(); |
| for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) { |
| Thread t = entry.getKey(); |
| System.out.println("===Thread " + t.getName() + "==="); |
| for (StackTraceElement ste : entry.getValue()) |
| System.out.println(" " + ste); |
| } |
| } |
| } |