summaryrefslogtreecommitdiff
path: root/src/de/lmu/ifi/dbs/elki/parallel/ParallelCore.java
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2019-03-09 22:30:40 +0000
committerAndrej Shadura <andrewsh@debian.org>2019-03-09 22:30:40 +0000
commit337087b668d3a54f3afee3a9adb597a32e9f7e94 (patch)
treed860094269622472f8079d497ac7af02dbb4e038 /src/de/lmu/ifi/dbs/elki/parallel/ParallelCore.java
parent14a486343aef55f97f54082d6b542dedebf6f3ba (diff)
Import Upstream version 0.6.5~20141030
Diffstat (limited to 'src/de/lmu/ifi/dbs/elki/parallel/ParallelCore.java')
-rw-r--r--src/de/lmu/ifi/dbs/elki/parallel/ParallelCore.java134
1 files changed, 134 insertions, 0 deletions
diff --git a/src/de/lmu/ifi/dbs/elki/parallel/ParallelCore.java b/src/de/lmu/ifi/dbs/elki/parallel/ParallelCore.java
new file mode 100644
index 00000000..98ca2ec8
--- /dev/null
+++ b/src/de/lmu/ifi/dbs/elki/parallel/ParallelCore.java
@@ -0,0 +1,134 @@
+package de.lmu.ifi.dbs.elki.parallel;
+
+/*
+ This file is part of ELKI:
+ Environment for Developing KDD-Applications Supported by Index-Structures
+
+ Copyright (C) 2014
+ Ludwig-Maximilians-Universität München
+ Lehr- und Forschungseinheit für Datenbanksysteme
+ ELKI Development Team
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Core for parallel processing in ELKI, based on {@link ThreadPoolExecutor}.
+ *
+ * TODO: make configurable how many threads are used.
+ *
+ * @author Erich Schubert
+ */
+public class ParallelCore {
+ /**
+ * The number of CPUs to use.
+ */
+ public static final int ALL_PROCESSORS = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * Static core
+ */
+ private static final ParallelCore STATIC = new ParallelCore(ALL_PROCESSORS);
+
+ /**
+ * Executor service.
+ */
+ ThreadPoolExecutor executor;
+
+ /**
+ * Number of connected submitters.
+ */
+ private AtomicInteger connected = new AtomicInteger(0);
+
+ /**
+ * Maximum number of processors to use.
+ */
+ private int processors;
+
+ /**
+ * Constructor.
+ */
+ protected ParallelCore(int processors) {
+ super();
+ this.processors = processors;
+ }
+
+ /**
+ * Get the static core object.
+ *
+ * @return Core
+ */
+ public static ParallelCore getCore() {
+ return STATIC;
+ }
+
+ /**
+ * Get desired level of parallelism
+ *
+ * @return Number of threads to run in parallel
+ */
+ public int getParallelism() {
+ return executor.getMaximumPoolSize();
+ }
+
+ /**
+ * Submit a task to the executor core.
+ *
+ * @param task Submitted task
+ *
+ * @return Future to observe completion
+ */
+ public <T> Future<T> submit(Callable<T> task) {
+ return executor.submit(task);
+ }
+
+ /**
+ * Connect to the executor.
+ */
+ public void connect() {
+ if(executor == null) {
+ synchronized(this) {
+ if(executor == null) {
+ executor = new ThreadPoolExecutor(0, processors, 10L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ executor.allowCoreThreadTimeOut(true);
+ }
+ }
+ }
+ int c = this.connected.incrementAndGet();
+ if(c == 1) {
+ executor.allowCoreThreadTimeOut(false);
+ executor.setCorePoolSize(executor.getMaximumPoolSize());
+ }
+ }
+
+ /**
+ * Disconnect to the executor.
+ */
+ public void disconnect() {
+ int c = this.connected.decrementAndGet();
+ if(c == 0) {
+ synchronized(this) {
+ executor.allowCoreThreadTimeOut(true);
+ executor.setCorePoolSize(0);
+ }
+ }
+ }
+}