From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Martijn Muijsers Date: Mon, 30 Jan 2023 22:34:48 +0100 Subject: [PATCH] BaseThread PrioritisedQueueExecutorThread agent utility License: GPL-3.0 (https://www.gnu.org/licenses/gpl-3.0.html) Gale - https://galemc.org diff --git a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java index 91fe0f7049122f62f05ba09c24cba5d758340cff..a26735a795052d6770498561826e8c5c1bf9fa79 100644 --- a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java +++ b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java @@ -2,6 +2,7 @@ package ca.spottedleaf.concurrentutil.executor.standard; import ca.spottedleaf.concurrentutil.util.ConcurrentUtil; import com.mojang.logging.LogUtils; +import org.galemc.gale.executor.chunksystem.AbstractPrioritisedQueueExecutorThreadBase; import org.slf4j.Logger; import java.lang.invoke.VarHandle; import java.util.concurrent.locks.LockSupport; @@ -15,7 +16,7 @@ import java.util.concurrent.locks.LockSupport; * methods. *

*/ -public class PrioritisedQueueExecutorThread extends Thread implements PrioritisedExecutor { +public class PrioritisedQueueExecutorThread extends Thread implements AbstractPrioritisedQueueExecutorThreadBase { private static final Logger LOGGER = LogUtils.getLogger(); @@ -99,28 +100,6 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise } } - protected boolean pollTasks() { - boolean ret = false; - - for (;;) { - if (this.halted) { - break; - } - try { - if (!this.queue.executeTask()) { - break; - } - ret = true; - } catch (final ThreadDeath death) { - throw death; // goodbye world... - } catch (final Throwable throwable) { - LOGGER.error("Exception thrown from prioritized runnable task in thread '" + this.getName() + "'", throwable); - } - } - - return ret; - } - protected boolean handleClose() { if (this.threadShutdown) { this.pollTasks(); // this ensures we've emptied the queue @@ -141,12 +120,18 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise return false; } - @Override - public PrioritisedTask createTask(final Runnable task, final Priority priority) { - final PrioritisedExecutor.PrioritisedTask queueTask = this.queue.createTask(task, priority); + // Gale start - base thread pool - chunk worker task queue + public static class PrioritisedQueueExecutorThreadTask implements PrioritisedTask { + + private final PrioritisedExecutor.PrioritisedTask queueTask; + private final AbstractPrioritisedQueueExecutorThreadBase owningThreadBase; + + public PrioritisedQueueExecutorThreadTask(PrioritisedExecutor.PrioritisedTask queueTask, AbstractPrioritisedQueueExecutorThreadBase owningThreadBase) { + this.queueTask = queueTask; + this.owningThreadBase = owningThreadBase; + } + // Gale end - base thread pool - chunk worker task queue - // need to override queue() to notify us of tasks - return new PrioritisedTask() { @Override public Priority getPriority() { return queueTask.getPriority(); @@ -171,7 +156,7 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise public boolean queue() { final boolean ret = queueTask.queue(); if (ret) { - PrioritisedQueueExecutorThread.this.notifyTasks(); + this.owningThreadBase.notifyOfTasks(); // Gale - base thread pool - chunk worker task queue } return ret; } @@ -185,52 +170,7 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise public boolean execute() { return queueTask.execute(); } - }; - } - - @Override - public PrioritisedExecutor.PrioritisedTask queueRunnable(final Runnable task, final PrioritisedExecutor.Priority priority) { - final PrioritisedExecutor.PrioritisedTask ret = this.queue.queueRunnable(task, priority); - - this.notifyTasks(); - - return ret; - } - @Override - public boolean haveAllTasksExecuted() { - return this.queue.haveAllTasksExecuted(); - } - - @Override - public long getTotalTasksExecuted() { - return this.queue.getTotalTasksExecuted(); - } - - @Override - public long getTotalTasksScheduled() { - return this.queue.getTotalTasksScheduled(); - } - - /** - * {@inheritDoc} - * @throws IllegalStateException If the current thread is {@code this} thread, or the underlying queue throws this exception. - */ - @Override - public void waitUntilAllExecuted() throws IllegalStateException { - if (Thread.currentThread() == this) { - throw new IllegalStateException("Cannot block on our own queue"); - } - this.queue.waitUntilAllExecuted(); - } - - /** - * {@inheritDoc} - * @throws IllegalStateException Always - */ - @Override - public boolean executeTask() throws IllegalStateException { - throw new IllegalStateException(); } /** @@ -294,4 +234,27 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise protected final void setThreadParkedVolatile(final boolean value) { THREAD_PARKED_HANDLE.setVolatile(this, value); } + + // Gale start + @Override + public void notifyOfTasks() { + this.notifyTasks(); + } + + @Override + public PrioritisedExecutor getQueue() { + return this.queue; + } + + @Override + public boolean isHalted() { + return this.halted; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + // Gale end + } diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/AbstractPrioritisedQueueExecutorThreadBase.java b/src/main/java/org/galemc/gale/executor/chunksystem/AbstractPrioritisedQueueExecutorThreadBase.java new file mode 100644 index 0000000000000000000000000000000000000000..37048a100e48f0d103ee8454c7c8c58b25f04c98 --- /dev/null +++ b/src/main/java/org/galemc/gale/executor/chunksystem/AbstractPrioritisedQueueExecutorThreadBase.java @@ -0,0 +1,98 @@ +// Gale - base thread pool - chunk worker task queue + +package org.galemc.gale.executor.chunksystem; + +import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedExecutor; +import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread; +import org.slf4j.Logger; + +/** + * A common abstract base for {@link PrioritisedQueueExecutorThread} and + * {@link PrioritisedQueueExecutorThreadAgent}, based on {@link PrioritisedQueueExecutorThread}. + */ +public interface AbstractPrioritisedQueueExecutorThreadBase extends PrioritisedExecutor { + + void notifyOfTasks(); + + PrioritisedExecutor getQueue(); + + boolean isHalted(); + + Logger getLogger(); + + String getName(); + + default boolean pollTasks() { + boolean ret = false; + + var queue = this.getQueue(); + for (;;) { + if (this.isHalted()) { + break; + } + try { + if (!queue.executeTask()) { + break; + } + ret = true; + } catch (final ThreadDeath death) { + throw death; // goodbye world... + } catch (final Throwable throwable) { + this.getLogger().error("Exception thrown from prioritized runnable task in thread '" + this.getName() + "'", throwable); + } + } + + return ret; + } + + @Override + default PrioritisedTask createTask(final Runnable task, final Priority priority) { + final PrioritisedExecutor.PrioritisedTask queueTask = this.getQueue().createTask(task, priority); + + // need to override queue() to notify us of tasks + return new PrioritisedQueueExecutorThread.PrioritisedQueueExecutorThreadTask(queueTask, this); + } + + @Override + default PrioritisedTask queueRunnable(final Runnable task, final Priority priority) { + final PrioritisedTask ret = this.getQueue().queueRunnable(task, priority); + + this.notifyOfTasks(); + + return ret; + } + + @Override + default boolean haveAllTasksExecuted() { + return this.getQueue().haveAllTasksExecuted(); + } + + @Override + default long getTotalTasksExecuted() { + return this.getQueue().getTotalTasksExecuted(); + } + + @Override + default long getTotalTasksScheduled() { + return this.getQueue().getTotalTasksScheduled(); + } + + /** + * {@inheritDoc} + * @throws IllegalStateException If the current thread is {@code this} thread, or the underlying queue throws this exception. + */ + @Override + default void waitUntilAllExecuted() throws IllegalStateException { + this.getQueue().waitUntilAllExecuted(); + } + + /** + * {@inheritDoc} + * @throws IllegalStateException Always + */ + @Override + default boolean executeTask() throws IllegalStateException { + throw new IllegalStateException(); + } + +} diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java new file mode 100644 index 0000000000000000000000000000000000000000..abdec5529763b77126494ae0c2be9b48de900bc1 --- /dev/null +++ b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java @@ -0,0 +1,55 @@ +// Gale - base thread pool - chunk worker task queue + +package org.galemc.gale.executor.chunksystem; + +import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedExecutor; +import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread; +import com.mojang.logging.LogUtils; +import org.galemc.gale.executor.queue.BaseTaskQueues; +import org.galemc.gale.executor.thread.BaseThread; +import org.slf4j.Logger; + + +/** + * This class is a copy of {@link PrioritisedQueueExecutorThread}, with the notable difference + * that it does not extend {@link Thread}, but may be instantiated on its own, as an agent representing + * a {@link BaseThread} to the {@link PrioritisedExecutor}. + */ +public abstract class PrioritisedQueueExecutorThreadAgent implements AbstractPrioritisedQueueExecutorThreadBase { + + private static final Logger LOGGER = LogUtils.getLogger(); + + protected final PrioritisedExecutor queue; + protected final BaseThread baseThread; + + public PrioritisedQueueExecutorThreadAgent(final PrioritisedExecutor queue, final BaseThread baseThread) { + this.queue = queue; + this.baseThread = baseThread; + } + + @Override + public void notifyOfTasks() { + BaseTaskQueues.chunkWorker.newTaskWasAdded(); + } + + @Override + public PrioritisedExecutor getQueue() { + return this.queue; + } + + @Override + public boolean isHalted() { + return false; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public String getName() { + return this.baseThread.getName(); + } + +}