| /* |
| * Copyright 2007 Sun Microsystems, Inc. All Rights Reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
| * CA 95054 USA or visit www.sun.com if you need additional information or |
| * have any questions. |
| */ |
| |
| /* |
| * @test CustomForwarderTest |
| * @bug 5108776 |
| * @summary Test that a custom EventForwarder can be added |
| * @author Eamonn McManus |
| */ |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.lang.management.ManagementFactory; |
| import java.net.DatagramPacket; |
| import java.net.DatagramSocket; |
| import java.net.SocketAddress; |
| import java.util.Map; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.management.MBeanServer; |
| import javax.management.MBeanServerInvocationHandler; |
| import javax.management.Notification; |
| import javax.management.NotificationBroadcasterSupport; |
| import javax.management.NotificationFilter; |
| import javax.management.NotificationListener; |
| import javax.management.ObjectName; |
| import javax.management.event.EventClient; |
| import javax.management.event.EventClientDelegate; |
| import javax.management.event.EventClientDelegateMBean; |
| import javax.management.event.EventForwarder; |
| import javax.management.event.EventReceiver; |
| import javax.management.event.EventRelay; |
| import javax.management.remote.MBeanServerForwarder; |
| import javax.management.remote.NotificationResult; |
| import javax.management.remote.TargetedNotification; |
| |
| public class CustomForwarderTest { |
| public static class UdpEventRelay implements EventRelay { |
| private final EventClientDelegateMBean delegate; |
| private final DatagramSocket socket; |
| private final AtomicBoolean closed = new AtomicBoolean(); |
| private final String clientId; |
| private EventReceiver receiver; |
| |
| public UdpEventRelay(EventClientDelegateMBean delegate) |
| throws IOException { |
| this.delegate = delegate; |
| this.socket = new DatagramSocket(); |
| try { |
| clientId = delegate.addClient( |
| UdpEventForwarder.class.getName(), |
| new Object[] {socket.getLocalSocketAddress()}, |
| new String[] {SocketAddress.class.getName()}); |
| } catch (IOException e) { |
| throw e; |
| } catch (RuntimeException e) { |
| throw e; |
| } catch (Exception e) { |
| final IOException ioe = |
| new IOException("Exception creating EventForwarder"); |
| ioe.initCause(e); |
| throw ioe; |
| } |
| Thread t = new Thread(new Receiver()); |
| t.setDaemon(true); |
| t.start(); |
| } |
| |
| public String getClientId() throws IOException { |
| return clientId; |
| } |
| |
| public void setEventReceiver(EventReceiver eventReceiver) { |
| this.receiver = eventReceiver; |
| } |
| |
| public void stop() throws IOException { |
| closed.set(true); |
| socket.close(); |
| } |
| |
| private class Receiver implements Runnable { |
| public void run() { |
| byte[] buf = new byte[1024]; |
| DatagramPacket packet = new DatagramPacket(buf, buf.length); |
| while (true) { |
| try { |
| socket.receive(packet); |
| } catch (IOException e) { |
| if (closed.get()) { |
| System.out.println("Receiver got exception: " + e); |
| System.out.println("Normal because it has been closed"); |
| return; |
| } else { |
| System.err.println("UNEXPECTED EXCEPTION IN RECEIVER:"); |
| e.printStackTrace(); |
| System.exit(1); |
| } |
| } |
| try { |
| ByteArrayInputStream bin = new ByteArrayInputStream(buf); |
| ObjectInputStream oin = new ObjectInputStream(bin); |
| NotificationResult nr = (NotificationResult) |
| oin.readObject(); |
| receiver.receive(nr); |
| } catch (Exception e) { |
| System.err.println("UNEXPECTED EXCEPTION IN RECEIVER:"); |
| e.printStackTrace(); |
| System.exit(1); |
| } |
| } |
| } |
| } |
| } |
| |
| public static class UdpEventForwarder implements EventForwarder { |
| private final DatagramSocket socket; |
| private final AtomicLong seqNo = new AtomicLong(0); |
| private static volatile boolean drop; |
| |
| public UdpEventForwarder(SocketAddress addr) throws IOException { |
| this.socket = new DatagramSocket(); |
| socket.connect(addr); |
| } |
| |
| public static void setDrop(boolean drop) { |
| UdpEventForwarder.drop = drop; |
| } |
| |
| public void forward(Notification n, Integer listenerId) throws IOException { |
| long nextSeqNo = seqNo.incrementAndGet(); |
| long thisSeqNo = nextSeqNo - 1; |
| TargetedNotification tn = new TargetedNotification(n, listenerId); |
| NotificationResult nr = new NotificationResult( |
| thisSeqNo, nextSeqNo, new TargetedNotification[] {tn}); |
| ByteArrayOutputStream bout = new ByteArrayOutputStream(); |
| ObjectOutputStream oout = new ObjectOutputStream(bout); |
| oout.writeObject(nr); |
| oout.close(); |
| byte[] bytes = bout.toByteArray(); |
| DatagramPacket packet = new DatagramPacket(bytes, bytes.length); |
| if (!drop) |
| socket.send(packet); |
| } |
| |
| public void close() throws IOException { |
| socket.close(); |
| } |
| |
| public void setClientId(String clientId) throws IOException { |
| // Nothing to do. |
| } |
| } |
| |
| public static interface EmptyMBean {} |
| |
| public static class Empty |
| extends NotificationBroadcasterSupport implements EmptyMBean { |
| public void send(Notification n) { |
| super.sendNotification(n); |
| } |
| } |
| |
| public static void main(String[] args) throws Exception { |
| MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); |
| MBeanServerForwarder mbsf = EventClientDelegate.newForwarder(); |
| mbsf.setMBeanServer(mbs); |
| mbs = mbsf; |
| |
| // for 1.5 |
| if (System.getProperty("java.version").startsWith("1.5") && |
| !mbs.isRegistered(EventClientDelegateMBean.OBJECT_NAME)) { |
| System.out.print("Working on "+System.getProperty("java.version")+ |
| " register "+EventClientDelegateMBean.OBJECT_NAME); |
| |
| mbs.registerMBean(EventClientDelegate. |
| getEventClientDelegate(mbs), |
| EventClientDelegateMBean.OBJECT_NAME); |
| } |
| |
| ObjectName name = new ObjectName("a:b=c"); |
| Empty mbean = new Empty(); |
| mbs.registerMBean(mbean, name); |
| |
| EventClientDelegateMBean delegate = (EventClientDelegateMBean) |
| MBeanServerInvocationHandler.newProxyInstance( |
| mbs, |
| EventClientDelegateMBean.OBJECT_NAME, |
| EventClientDelegateMBean.class, |
| false); |
| EventRelay relay = new UdpEventRelay(delegate); |
| EventClient client = new EventClient(delegate, relay, null, null, 0L); |
| |
| final Semaphore lostCountSema = new Semaphore(0); |
| NotificationListener lostListener = new NotificationListener() { |
| public void handleNotification(Notification notification, Object handback) { |
| if (notification.getType().equals(EventClient.NOTIFS_LOST)) { |
| System.out.println("Got lost-notifs notif: count=" + |
| notification.getUserData()); |
| lostCountSema.release(((Long) notification.getUserData()).intValue()); |
| } else |
| System.out.println("Mysterious EventClient notif: " + notification); |
| } |
| }; |
| client.addEventClientListener(lostListener, null, null); |
| |
| final BlockingQueue<Notification> notifQueue = |
| new ArrayBlockingQueue<Notification>(10); |
| NotificationListener countListener = new NotificationListener() { |
| public void handleNotification(Notification notification, Object handback) { |
| System.out.println("Received: " + notification); |
| notifQueue.add(notification); |
| if (!"tiddly".equals(handback)) { |
| System.err.println("TEST FAILED: bad handback: " + handback); |
| System.exit(1); |
| } |
| } |
| }; |
| |
| final AtomicInteger filterCount = new AtomicInteger(0); |
| NotificationFilter countFilter = new NotificationFilter() { |
| private static final long serialVersionUID = 1234L; |
| |
| public boolean isNotificationEnabled(Notification notification) { |
| System.out.println("Filter called for: " + notification); |
| filterCount.incrementAndGet(); |
| return true; |
| } |
| }; |
| |
| client.addNotificationListener(name, countListener, countFilter, "tiddly"); |
| |
| assertEquals("Initial notif count", 0, notifQueue.size()); |
| assertEquals("Initial filter count", 0, filterCount.get()); |
| |
| Notification n = nextNotif(name); |
| mbean.send(n); |
| |
| System.out.println("Waiting for notification to arrive..."); |
| |
| Notification n1 = notifQueue.poll(10, TimeUnit.SECONDS); |
| |
| assertEquals("Received notif", n, n1); |
| assertEquals("Notif queue size after receive", 0, notifQueue.size()); |
| assertEquals("Filter count after notif", 1, filterCount.get()); |
| assertEquals("Lost notif count", 0, lostCountSema.availablePermits()); |
| |
| System.out.println("Dropping notifs"); |
| |
| UdpEventForwarder.setDrop(true); |
| for (int i = 0; i < 3; i++) |
| mbean.send(nextNotif(name)); |
| UdpEventForwarder.setDrop(false); |
| |
| Thread.sleep(2); |
| assertEquals("Notif queue size after drops", 0, notifQueue.size()); |
| |
| System.out.println("Turning off dropping and sending a notif"); |
| n = nextNotif(name); |
| mbean.send(n); |
| |
| System.out.println("Waiting for dropped notifications to be detected..."); |
| boolean acquired = lostCountSema.tryAcquire(3, 5, TimeUnit.SECONDS); |
| assertEquals("Correct count of lost notifs", true, acquired); |
| |
| n1 = notifQueue.poll(10, TimeUnit.SECONDS); |
| assertEquals("Received non-dropped notif", n, n1); |
| |
| assertEquals("Notif queue size", 0, notifQueue.size()); |
| assertEquals("Filter count after drops", 5, filterCount.get()); |
| |
| Thread.sleep(10); |
| assertEquals("Further lost-notifs", 0, lostCountSema.availablePermits()); |
| |
| client.close(); |
| |
| System.out.println("TEST PASSED"); |
| } |
| |
| private static AtomicLong nextSeqNo = new AtomicLong(0); |
| private static Notification nextNotif(ObjectName name) { |
| long n = nextSeqNo.incrementAndGet(); |
| return new Notification("type", name, n, "" + n); |
| } |
| |
| private static void assertEquals(String what, Object expected, Object got) { |
| if (equals(expected, got)) |
| System.out.println(what + " = " + expected + ", as expected"); |
| else { |
| Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces(); |
| for (Thread t : traces.keySet()) { |
| System.out.println(t.getName()); |
| for (StackTraceElement elmt : traces.get(t)) { |
| System.out.println(" " + elmt); |
| } |
| } |
| throw new RuntimeException( |
| "TEST FAILED: " + what + " is " + got + "; should be " + |
| expected); |
| } |
| } |
| |
| private static boolean equals(Object expected, Object got) { |
| if (!(expected instanceof Notification)) |
| return expected.equals(got); |
| if (expected.getClass() != got.getClass()) |
| return false; |
| // Notification doesn't override Object.equals so two distinct |
| // notifs are never equal even if they have the same contents. |
| // Although the test doesn't serialize the notifs, if at some |
| // stage it did then it would fail because the deserialized notif |
| // was not equal to the original one. Therefore we compare enough |
| // notif fields to detect when notifs really are different. |
| Notification en = (Notification) expected; |
| Notification gn = (Notification) got; |
| return (en.getType().equals(gn.getType()) && |
| en.getSource().equals(gn.getSource()) && |
| en.getSequenceNumber() == gn.getSequenceNumber()); |
| } |
| } |