| /* |
| * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| |
| package jdk.jfr.internal.consumer; |
| |
| import java.io.IOException; |
| import java.security.AccessControlContext; |
| import java.security.AccessController; |
| import java.security.PrivilegedAction; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.Objects; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Consumer; |
| |
| import jdk.jfr.consumer.EventStream; |
| import jdk.jfr.consumer.RecordedEvent; |
| import jdk.jfr.internal.LogLevel; |
| import jdk.jfr.internal.LogTag; |
| import jdk.jfr.internal.Logger; |
| import jdk.jfr.internal.SecuritySupport; |
| |
| /* |
| * Purpose of this class is to simplify the implementation of |
| * an event stream. |
| */ |
| abstract class AbstractEventStream implements EventStream { |
| private final static AtomicLong counter = new AtomicLong(1); |
| |
| private final Object terminated = new Object(); |
| private final boolean active; |
| private final Runnable flushOperation = () -> dispatcher().runFlushActions(); |
| private final AccessControlContext accessControllerContext; |
| private final StreamConfiguration configuration = new StreamConfiguration(); |
| |
| private volatile Thread thread; |
| private Dispatcher dispatcher; |
| |
| private volatile boolean closed; |
| |
| AbstractEventStream(AccessControlContext acc, boolean active) throws IOException { |
| this.accessControllerContext = Objects.requireNonNull(acc); |
| this.active = active; |
| } |
| |
| @Override |
| abstract public void start(); |
| |
| @Override |
| abstract public void startAsync(); |
| |
| @Override |
| abstract public void close(); |
| |
| protected final Dispatcher dispatcher() { |
| if (configuration.hasChanged()) { |
| synchronized (configuration) { |
| dispatcher = new Dispatcher(configuration); |
| } |
| } |
| return dispatcher; |
| } |
| |
| @Override |
| public final void setOrdered(boolean ordered) { |
| configuration.setOrdered(ordered); |
| } |
| |
| @Override |
| public final void setReuse(boolean reuse) { |
| configuration.setReuse(reuse); |
| } |
| |
| @Override |
| public final void setStartTime(Instant startTime) { |
| Objects.nonNull(startTime); |
| synchronized (configuration) { |
| if (configuration.started) { |
| throw new IllegalStateException("Stream is already started"); |
| } |
| if (startTime.isBefore(Instant.EPOCH)) { |
| startTime = Instant.EPOCH; |
| } |
| configuration.setStartTime(startTime); |
| } |
| } |
| |
| @Override |
| public final void setEndTime(Instant endTime) { |
| Objects.requireNonNull(endTime); |
| synchronized (configuration) { |
| if (configuration.started) { |
| throw new IllegalStateException("Stream is already started"); |
| } |
| configuration.setEndTime(endTime); |
| } |
| } |
| |
| @Override |
| public final void onEvent(Consumer<RecordedEvent> action) { |
| Objects.requireNonNull(action); |
| configuration.addEventAction(action); |
| } |
| |
| @Override |
| public final void onEvent(String eventName, Consumer<RecordedEvent> action) { |
| Objects.requireNonNull(eventName); |
| Objects.requireNonNull(action); |
| configuration.addEventAction(eventName, action); |
| } |
| |
| @Override |
| public final void onFlush(Runnable action) { |
| Objects.requireNonNull(action); |
| configuration.addFlushAction(action); |
| } |
| |
| @Override |
| public final void onClose(Runnable action) { |
| Objects.requireNonNull(action); |
| configuration.addCloseAction(action); |
| } |
| |
| @Override |
| public final void onError(Consumer<Throwable> action) { |
| Objects.requireNonNull(action); |
| configuration.addErrorAction(action); |
| } |
| |
| @Override |
| public final boolean remove(Object action) { |
| Objects.requireNonNull(action); |
| return configuration.remove(action); |
| } |
| |
| @Override |
| public final void awaitTermination() throws InterruptedException { |
| awaitTermination(Duration.ofMillis(0)); |
| } |
| |
| @Override |
| public final void awaitTermination(Duration timeout) throws InterruptedException { |
| Objects.requireNonNull(timeout); |
| if (timeout.isNegative()) { |
| throw new IllegalArgumentException("timeout value is negative"); |
| } |
| |
| long base = System.currentTimeMillis(); |
| long now = 0; |
| |
| long millis; |
| try { |
| millis = Math.multiplyExact(timeout.getSeconds(), 1000); |
| } catch (ArithmeticException a) { |
| millis = Long.MAX_VALUE; |
| } |
| int nanos = timeout.toNanosPart(); |
| if (nanos == 0 && millis == 0) { |
| synchronized (terminated) { |
| while (!isClosed()) { |
| terminated.wait(0); |
| } |
| } |
| } else { |
| while (!isClosed()) { |
| long delay = millis - now; |
| if (delay <= 0) { |
| break; |
| } |
| synchronized (terminated) { |
| terminated.wait(delay, nanos); |
| } |
| now = System.currentTimeMillis() - base; |
| } |
| } |
| } |
| |
| protected abstract void process() throws IOException; |
| |
| protected final void setClosed(boolean closed) { |
| this.closed = closed; |
| } |
| |
| protected final boolean isClosed() { |
| return closed; |
| } |
| |
| public final void startAsync(long startNanos) { |
| startInternal(startNanos); |
| Runnable r = () -> run(accessControllerContext); |
| thread = SecuritySupport.createThreadWitNoPermissions(nextThreadName(), r); |
| thread.start(); |
| } |
| |
| public final void start(long startNanos) { |
| startInternal(startNanos); |
| thread = Thread.currentThread(); |
| run(accessControllerContext); |
| } |
| |
| protected final Runnable getFlushOperation() { |
| return flushOperation; |
| } |
| |
| private void startInternal(long startNanos) { |
| synchronized (configuration) { |
| if (configuration.started) { |
| throw new IllegalStateException("Event stream can only be started once"); |
| } |
| if (active && configuration.startTime == null) { |
| configuration.setStartNanos(startNanos); |
| } |
| configuration.setStarted(true); |
| } |
| } |
| |
| private void execute() { |
| try { |
| process(); |
| } catch (IOException ioe) { |
| // This can happen if a chunk file is removed, or |
| // a file is access that has been closed |
| // This is "normal" behavior for streaming and the |
| // stream will be closed when this happens. |
| } finally { |
| Logger.log(LogTag.JFR_SYSTEM_STREAMING, LogLevel.DEBUG, "Execution of stream ended."); |
| try { |
| close(); |
| } finally { |
| synchronized (terminated) { |
| terminated.notifyAll(); |
| } |
| } |
| } |
| } |
| |
| private void run(AccessControlContext accessControlContext) { |
| AccessController.doPrivileged(new PrivilegedAction<Void>() { |
| @Override |
| public Void run() { |
| execute(); |
| return null; |
| } |
| }, accessControlContext); |
| } |
| |
| private String nextThreadName() { |
| counter.incrementAndGet(); |
| return "JFR Event Stream " + counter; |
| } |
| } |