blob: e98ba993c308aa54e75baf4c27d9e02106fb4efe [file] [log] [blame]
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.security.AccessControlContext;
import java.security.ProtectionDomain;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
* {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*
* This class just maintains links to its pool and WorkQueue. The
* pool field is set immediately upon construction, but the
* workQueue field is not set until a call to registerWorker
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/**
* Version for InnocuousForkJoinWorkerThread.
*/
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
AccessControlContext acc) {
super(threadGroup, null, "aForkJoinWorkerThread");
U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
eraseThreadLocals(); // clear before registering
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/**
* Returns the pool hosting this thread.
*
* @return the pool
*/
public ForkJoinPool getPool() {
return pool;
}
/**
* Returns the unique index number of this thread in its pool.
* The returned value ranges from zero to the maximum number of
* threads (minus one) that may exist in the pool, and does not
* change during the lifetime of the thread. This method may be
* useful for applications that track status or collect results
* per-worker-thread rather than per-task.
*
* @return the index number
*/
public int getPoolIndex() {
return workQueue.getPoolIndex();
}
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
* invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
}
/**
* Performs cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
* {@code super.onTermination} at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
}
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
/**
* Erases ThreadLocals by nulling out Thread maps.
*/
final void eraseThreadLocals() {
U.putObject(this, THREADLOCALS, null);
U.putObject(this, INHERITABLETHREADLOCALS, null);
}
/**
* Non-public hook method for InnocuousForkJoinWorkerThread.
*/
void afterTopLevelExec() {
}
// Set up to allow setting thread fields in constructor
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long THREADLOCALS;
private static final long INHERITABLETHREADLOCALS;
private static final long INHERITEDACCESSCONTROLCONTEXT;
static {
try {
THREADLOCALS = U.objectFieldOffset
(Thread.class.getDeclaredField("threadLocals"));
INHERITABLETHREADLOCALS = U.objectFieldOffset
(Thread.class.getDeclaredField("inheritableThreadLocals"));
INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
(Thread.class.getDeclaredField("inheritedAccessControlContext"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
/**
* A worker thread that has no permissions, is not a member of any
* user-defined ThreadGroup, and erases all ThreadLocals after
* running each top-level task.
*/
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
/** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
private static final ThreadGroup innocuousThreadGroup =
createThreadGroup();
/** An AccessControlContext supporting no privileges */
private static final AccessControlContext INNOCUOUS_ACC =
new AccessControlContext(
new ProtectionDomain[] {
new ProtectionDomain(null, null)
});
InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
}
@Override // to erase ThreadLocals
void afterTopLevelExec() {
eraseThreadLocals();
}
@Override // to always report system loader
public ClassLoader getContextClassLoader() {
return ClassLoader.getSystemClassLoader();
}
@Override // to silently fail
public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
@Override // paranoically
public void setContextClassLoader(ClassLoader cl) {
throw new SecurityException("setContextClassLoader");
}
/**
* Returns a new group with the system ThreadGroup (the
* topmost, parent-less group) as parent. Uses Unsafe to
* traverse Thread.group and ThreadGroup.parent fields.
*/
private static ThreadGroup createThreadGroup() {
try {
sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
long tg = u.objectFieldOffset
(Thread.class.getDeclaredField("group"));
long gp = u.objectFieldOffset
(ThreadGroup.class.getDeclaredField("parent"));
ThreadGroup group = (ThreadGroup)
u.getObject(Thread.currentThread(), tg);
while (group != null) {
ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
if (parent == null)
return new ThreadGroup(group,
"InnocuousForkJoinWorkerThreadGroup");
group = parent;
}
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// fall through if null as cannot-happen safeguard
throw new Error("Cannot create ThreadGroup");
}
}
}