| /* |
| * Copyright 2000-2014 JetBrains s.r.o. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.intellij.concurrency; |
| |
| import com.intellij.codeInsight.daemon.impl.DaemonProgressIndicator; |
| import com.intellij.openapi.application.ApplicationManager; |
| import com.intellij.openapi.progress.ProcessCanceledException; |
| import com.intellij.openapi.progress.ProgressIndicator; |
| import com.intellij.openapi.progress.ProgressManager; |
| import com.intellij.openapi.progress.util.AbstractProgressIndicatorBase; |
| import com.intellij.openapi.progress.util.ProgressIndicatorBase; |
| import com.intellij.testFramework.PlatformLangTestCase; |
| import com.intellij.util.Processor; |
| |
| import java.math.BigDecimal; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| public class JobUtilTest extends PlatformLangTestCase { |
| private static final AtomicInteger COUNT = new AtomicInteger(); |
| |
| @Override |
| protected boolean isRunInWriteAction() { |
| return false; |
| } |
| |
| public void testUnbalancedTaskJobUtilPerformance() { |
| List<Integer> things = new ArrayList<Integer>(Collections.<Integer>nCopies(10000, null)); |
| int sum = 0; |
| for (int i = 0; i < things.size(); i++) { |
| int v = i < 9950 ? 1 : 1000; |
| things.set(i, v); |
| sum += things.get(i); |
| } |
| assertEquals(59950, sum); |
| |
| long start = System.currentTimeMillis(); |
| boolean b = JobLauncher.getInstance().invokeConcurrentlyUnderProgress(things, new ProgressIndicatorBase(), false, false, new Processor<Integer>() { |
| @Override |
| public boolean process(Integer o) { |
| busySleep(o); |
| return true; |
| } |
| }); |
| assertTrue(b); |
| long elapsed = System.currentTimeMillis() - start; |
| int expected = 2 * (9950 + 50 * 1000) / JobSchedulerImpl.CORES_COUNT; |
| String message = "Elapsed: " + elapsed + "; expected: " + expected; |
| System.out.println(message); |
| assertTrue(message, elapsed < expected); |
| } |
| |
| private static int busySleep(int ms) { |
| long end = System.currentTimeMillis() + ms; |
| while (System.currentTimeMillis() < end); |
| return COUNT.incrementAndGet(); |
| } |
| |
| public void testJobUtilCorrectlySplitsUpHugeWorkAndFinishes_Performance() throws Exception { |
| COUNT.set(0); |
| int N = 100000; |
| List<String> list = Collections.nCopies(N, null); |
| final AtomicReference<Exception> exception = new AtomicReference<Exception>(); |
| final AtomicBoolean finished = new AtomicBoolean(); |
| |
| JobLauncher.getInstance().invokeConcurrentlyUnderProgress(list, null, false, new Processor<String>() { |
| @Override |
| public boolean process(String name) { |
| try { |
| if (finished.get()) { |
| throw new RuntimeException(); |
| } |
| for (int i = 0; i < 1000; i++) { |
| new BigDecimal(i).multiply(new BigDecimal(1)); |
| } |
| busySleep(1); |
| if (finished.get()) { |
| throw new RuntimeException(); |
| } |
| } |
| catch (Exception e) { |
| exception.set(e); |
| } |
| return true; |
| } |
| }); |
| finished.set(true); |
| Thread.sleep(1000); |
| if (exception.get() != null) throw exception.get(); |
| assertEquals(N, COUNT.get()); |
| } |
| |
| public void testJobUtilProcessesAllItems_Performance() throws Exception { |
| List<String> list = Collections.nCopies(10000, null); |
| final AtomicReference<Exception> exception = new AtomicReference<Exception>(); |
| for (int i=0; i<10; i++) { |
| long start = System.currentTimeMillis(); |
| COUNT.set(0); |
| JobLauncher.getInstance().invokeConcurrentlyUnderProgress(list, null, false, new Processor<String>() { |
| @Override |
| public boolean process(String name) { |
| busySleep(1); |
| return true; |
| } |
| }); |
| if (exception.get() != null) throw exception.get(); |
| long finish = System.currentTimeMillis(); |
| System.out.println("Elapsed: "+(finish-start)+"ms"); |
| assertEquals(list.size(), COUNT.get()); |
| } |
| } |
| |
| public void testJobUtilRecursive_Performance() throws Exception { |
| final List<String> list = Collections.nCopies(100, null); |
| for (int i=0; i<10; i++) { |
| COUNT.set(0); |
| long start = System.currentTimeMillis(); |
| JobLauncher.getInstance().invokeConcurrentlyUnderProgress(list, null, false, new Processor<String>() { |
| @Override |
| public boolean process(String name) { |
| JobLauncher.getInstance().invokeConcurrentlyUnderProgress(list, null, false, new Processor<String>() { |
| @Override |
| public boolean process(String name) { |
| busySleep(1); |
| return true; |
| } |
| }); |
| return true; |
| } |
| }); |
| long finish = System.currentTimeMillis(); |
| System.out.println("Elapsed: "+(finish-start)+"ms"); |
| assertEquals(list.size()*list.size(), COUNT.get()); |
| } |
| } |
| |
| public void testCorrectProgressAndReadAction() throws Throwable { |
| checkProgressAndReadAction(Collections.singletonList(null), new DaemonProgressIndicator(), true); |
| checkProgressAndReadAction(Collections.singletonList(null), new DaemonProgressIndicator(), false); |
| checkProgressAndReadAction(Collections.emptyList(), new DaemonProgressIndicator(), true); |
| checkProgressAndReadAction(Collections.emptyList(), new DaemonProgressIndicator(), false); |
| checkProgressAndReadAction(Arrays.asList(new Object(), new Object()), new DaemonProgressIndicator(), true); |
| checkProgressAndReadAction(Arrays.asList(new Object(), new Object()), new DaemonProgressIndicator(), false); |
| checkProgressAndReadAction(Arrays.asList(new Object(), new Object()), null, false); |
| } |
| |
| private static void checkProgressAndReadAction(final List<Object> objects, |
| final DaemonProgressIndicator progress, |
| final boolean runInReadAction) throws Throwable { |
| final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); |
| JobLauncher.getInstance().invokeConcurrentlyUnderProgress(objects, progress, runInReadAction, new Processor<Object>() { |
| @Override |
| public boolean process(Object o) { |
| try { |
| if (objects.size() <= 1 || JobSchedulerImpl.CORES_COUNT <= JobLauncherImpl.CORES_FORK_THRESHOLD) { |
| assertTrue(ApplicationManager.getApplication().isDispatchThread()); |
| } |
| else { |
| // generally we know nothing about current thread since FJP can help others task to execute while in current context |
| } |
| ProgressIndicator actualIndicator = ProgressManager.getInstance().getProgressIndicator(); |
| if (progress == null) { |
| assertNotNull(actualIndicator); |
| assertTrue(actualIndicator instanceof AbstractProgressIndicatorBase); |
| } |
| else { |
| assertTrue(actualIndicator instanceof SensitiveProgressWrapper); |
| ProgressIndicator original = ((SensitiveProgressWrapper)actualIndicator).getOriginalProgressIndicator(); |
| assertSame(progress, original); |
| } |
| // there can be read access even if we didn't ask for it (e.g. when task under read action steals others work) |
| assertTrue(!runInReadAction || ApplicationManager.getApplication().isReadAccessAllowed()); |
| } |
| catch (Throwable e) { |
| exception.set(e); |
| } |
| return true; |
| } |
| }); |
| if (exception.get() != null) throw exception.get(); |
| } |
| |
| public void testExceptionalCompletion() throws Throwable { |
| final List<Object> objects = Collections.nCopies(100000000, null); |
| COUNT.set(0); |
| try { |
| JobLauncher.getInstance().invokeConcurrentlyUnderProgress(objects, null, true, new Processor<Object>() { |
| @Override |
| public boolean process(Object o) { |
| if (COUNT.incrementAndGet() == 100000) { |
| System.out.println("PCE"); |
| throw new ProcessCanceledException(); |
| } |
| return true; |
| } |
| }); |
| fail("PCE must have been thrown"); |
| } |
| catch (ProcessCanceledException e) { |
| // caught OK |
| } |
| } |
| public void testNotNormalCompletion() throws Throwable { |
| final List<Object> objects = Collections.nCopies(100000000, null); |
| COUNT.set(0); |
| try { |
| boolean success = JobLauncher.getInstance().invokeConcurrentlyUnderProgress(objects, null, true, new Processor<Object>() { |
| @Override |
| public boolean process(Object o) { |
| if (COUNT.incrementAndGet() == 100000) { |
| System.out.println("PCE"); |
| return false; |
| } |
| return true; |
| } |
| }); |
| assertFalse(success); |
| } |
| catch (ProcessCanceledException e) { |
| } |
| } |
| |
| public void testJobUtilCompletesEvenIfCannotGrabReadAction() throws Throwable { |
| final List<Object> objects = Collections.nCopies(1000000, null); |
| COUNT.set(0); |
| ApplicationManager.getApplication().runWriteAction(new Runnable() { |
| @Override |
| public void run() { |
| boolean success = JobLauncher.getInstance().invokeConcurrentlyUnderProgress(objects, null, true, false, new Processor<Object>() { |
| @Override |
| public boolean process(Object o) { |
| COUNT.incrementAndGet(); |
| return true; |
| } |
| }); |
| assertTrue(success); |
| assertEquals(objects.size(), COUNT.get()); |
| } |
| }); |
| } |
| |
| public void testJobUtilRecursiveCancel() throws Exception { |
| final List<String> list = Collections.nCopies(100, ""); |
| final List<Integer> ilist = Collections.nCopies(100, 0); |
| for (int i=0; i<10; i++) { |
| COUNT.set(0); |
| long start = System.currentTimeMillis(); |
| boolean success = false; |
| try { |
| success = JobLauncher.getInstance().invokeConcurrentlyUnderProgress(list, null, false, new Processor<String>() { |
| @Override |
| public boolean process(String name) { |
| boolean nestedSuccess = JobLauncher.getInstance().invokeConcurrentlyUnderProgress(ilist, null, false, new Processor<Integer>() { |
| @Override |
| public boolean process(Integer integer) { |
| if (busySleep(1) == 1000) { |
| System.out.println("PCE"); |
| throw new RuntimeException("xxx"); |
| } |
| return true; |
| } |
| }); |
| //System.out.println("nestedSuccess = " + nestedSuccess); |
| return true; |
| } |
| }); |
| } |
| catch (ProcessCanceledException e) { |
| // OK |
| } |
| catch (RuntimeException e) { |
| assertEquals("xxx", e.getMessage()); |
| } |
| long finish = System.currentTimeMillis(); |
| System.out.println("Elapsed: "+(finish-start)+"ms"); |
| //assertEquals(list.size()*list.size(), COUNT.get()); |
| assertFalse(success); |
| } |
| } |
| |
| public void testSaturation() throws InterruptedException { |
| final CountDownLatch latch = new CountDownLatch(1); |
| for (int i=0; i<100; i++) { |
| JobLauncher.getInstance().submitToJobThread(0, new Runnable() { |
| @Override |
| public void run() { |
| try { |
| latch.await(); |
| } |
| catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }); |
| } |
| JobLauncher.getInstance().submitToJobThread(0, new Runnable() { |
| @Override |
| public void run() { |
| latch.countDown(); |
| } |
| }); |
| |
| try { |
| boolean scheduled = latch.await(3, TimeUnit.SECONDS); |
| assertFalse(scheduled); // pool saturated, no thread can be scheduled |
| } |
| finally { |
| latch.countDown(); |
| } |
| } |
| } |