From 6e52366b2822acbc86e3767c4e3a35480bd129e1 Mon Sep 17 00:00:00 2001 From: Martijn Muijsers Date: Tue, 31 Jan 2023 01:44:19 +0100 Subject: [PATCH] Separate PrioritisedQueueExecutorThread and agent common code --- ...itisedQueueExecutorThread-agent-util.patch | 375 +++++++++++++----- 1 file changed, 280 insertions(+), 95 deletions(-) diff --git a/patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch b/patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch index d94a3fd..e1112cd 100644 --- a/patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch +++ b/patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch @@ -6,12 +6,280 @@ Subject: [PATCH] BaseThread PrioritisedQueueExecutorThread agent utility License: GPL-3.0 (https://www.gnu.org/licenses/gpl-3.0.html) Gale - https://galemc.org +diff --git a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java +index 91fe0f7049122f62f05ba09c24cba5d758340cff..38da123e465130445ba094054bc5e9dc44d9c20d 100644 +--- a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java ++++ b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedQueueExecutorThread.java +@@ -2,6 +2,7 @@ package ca.spottedleaf.concurrentutil.executor.standard; + + import ca.spottedleaf.concurrentutil.util.ConcurrentUtil; + import com.mojang.logging.LogUtils; ++import org.galemc.gale.executor.chunksystem.AbstractPrioritisedQueueExecutorThreadBase; + import org.slf4j.Logger; + import java.lang.invoke.VarHandle; + import java.util.concurrent.locks.LockSupport; +@@ -15,7 +16,7 @@ import java.util.concurrent.locks.LockSupport; + * methods. + *

+ */ +-public class PrioritisedQueueExecutorThread extends Thread implements PrioritisedExecutor { ++public class PrioritisedQueueExecutorThread extends Thread implements AbstractPrioritisedQueueExecutorThreadBase { + + private static final Logger LOGGER = LogUtils.getLogger(); + +@@ -99,28 +100,6 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise + } + } + +- protected boolean pollTasks() { +- boolean ret = false; +- +- for (;;) { +- if (this.halted) { +- break; +- } +- try { +- if (!this.queue.executeTask()) { +- break; +- } +- ret = true; +- } catch (final ThreadDeath death) { +- throw death; // goodbye world... +- } catch (final Throwable throwable) { +- LOGGER.error("Exception thrown from prioritized runnable task in thread '" + this.getName() + "'", throwable); +- } +- } +- +- return ret; +- } +- + protected boolean handleClose() { + if (this.threadShutdown) { + this.pollTasks(); // this ensures we've emptied the queue +@@ -141,12 +120,18 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise + return false; + } + +- @Override +- public PrioritisedTask createTask(final Runnable task, final Priority priority) { +- final PrioritisedExecutor.PrioritisedTask queueTask = this.queue.createTask(task, priority); ++ // Gale start - base thread pool - chunk worker task queue ++ public static class PrioritisedQueueExecutorThreadTask implements PrioritisedTask { ++ ++ private final PrioritisedExecutor.PrioritisedTask queueTask; ++ private final AbstractPrioritisedQueueExecutorThreadBase owningThreadBase; ++ ++ public PrioritisedQueueExecutorThreadTask(PrioritisedExecutor.PrioritisedTask queueTask, AbstractPrioritisedQueueExecutorThreadBase owningThreadBase) { ++ this.queueTask = queueTask; ++ this.owningThreadBase = owningThreadBase; ++ } ++ // Gale end - base thread pool - chunk worker task queue + +- // need to override queue() to notify us of tasks +- return new PrioritisedTask() { + @Override + public Priority getPriority() { + return queueTask.getPriority(); +@@ -171,7 +156,7 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise + public boolean queue() { + final boolean ret = queueTask.queue(); + if (ret) { +- PrioritisedQueueExecutorThread.this.notifyTasks(); ++ this.owningThreadBase.notifyOfTasks(); // Gale - base thread pool - chunk worker task queue + } + return ret; + } +@@ -185,52 +170,7 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise + public boolean execute() { + return queueTask.execute(); + } +- }; +- } +- +- @Override +- public PrioritisedExecutor.PrioritisedTask queueRunnable(final Runnable task, final PrioritisedExecutor.Priority priority) { +- final PrioritisedExecutor.PrioritisedTask ret = this.queue.queueRunnable(task, priority); +- +- this.notifyTasks(); +- +- return ret; +- } + +- @Override +- public boolean haveAllTasksExecuted() { +- return this.queue.haveAllTasksExecuted(); +- } +- +- @Override +- public long getTotalTasksExecuted() { +- return this.queue.getTotalTasksExecuted(); +- } +- +- @Override +- public long getTotalTasksScheduled() { +- return this.queue.getTotalTasksScheduled(); +- } +- +- /** +- * {@inheritDoc} +- * @throws IllegalStateException If the current thread is {@code this} thread, or the underlying queue throws this exception. +- */ +- @Override +- public void waitUntilAllExecuted() throws IllegalStateException { +- if (Thread.currentThread() == this) { +- throw new IllegalStateException("Cannot block on our own queue"); +- } +- this.queue.waitUntilAllExecuted(); +- } +- +- /** +- * {@inheritDoc} +- * @throws IllegalStateException Always +- */ +- @Override +- public boolean executeTask() throws IllegalStateException { +- throw new IllegalStateException(); + } + + /** +@@ -294,4 +234,27 @@ public class PrioritisedQueueExecutorThread extends Thread implements Prioritise + protected final void setThreadParkedVolatile(final boolean value) { + THREAD_PARKED_HANDLE.setVolatile(this, value); + } ++ ++ // Gale start ++ @Override ++ public void notifyOfTasks() { ++ this.notifyTasks(); ++ } ++ ++ @Override ++ public PrioritisedExecutor getQueue() { ++ return this.queue; ++ } ++ ++ @Override ++ public boolean isHalted() { ++ return this.isHalted(); ++ } ++ ++ @Override ++ public Logger getLogger() { ++ return LOGGER; ++ } ++ // Gale end ++ + } +diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/AbstractPrioritisedQueueExecutorThreadBase.java b/src/main/java/org/galemc/gale/executor/chunksystem/AbstractPrioritisedQueueExecutorThreadBase.java +new file mode 100644 +index 0000000000000000000000000000000000000000..37048a100e48f0d103ee8454c7c8c58b25f04c98 +--- /dev/null ++++ b/src/main/java/org/galemc/gale/executor/chunksystem/AbstractPrioritisedQueueExecutorThreadBase.java +@@ -0,0 +1,98 @@ ++// Gale - base thread pool - chunk worker task queue ++ ++package org.galemc.gale.executor.chunksystem; ++ ++import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedExecutor; ++import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread; ++import org.slf4j.Logger; ++ ++/** ++ * A common abstract base for {@link PrioritisedQueueExecutorThread} and ++ * {@link PrioritisedQueueExecutorThreadAgent}, based on {@link PrioritisedQueueExecutorThread}. ++ */ ++public interface AbstractPrioritisedQueueExecutorThreadBase extends PrioritisedExecutor { ++ ++ void notifyOfTasks(); ++ ++ PrioritisedExecutor getQueue(); ++ ++ boolean isHalted(); ++ ++ Logger getLogger(); ++ ++ String getName(); ++ ++ default boolean pollTasks() { ++ boolean ret = false; ++ ++ var queue = this.getQueue(); ++ for (;;) { ++ if (this.isHalted()) { ++ break; ++ } ++ try { ++ if (!queue.executeTask()) { ++ break; ++ } ++ ret = true; ++ } catch (final ThreadDeath death) { ++ throw death; // goodbye world... ++ } catch (final Throwable throwable) { ++ this.getLogger().error("Exception thrown from prioritized runnable task in thread '" + this.getName() + "'", throwable); ++ } ++ } ++ ++ return ret; ++ } ++ ++ @Override ++ default PrioritisedTask createTask(final Runnable task, final Priority priority) { ++ final PrioritisedExecutor.PrioritisedTask queueTask = this.getQueue().createTask(task, priority); ++ ++ // need to override queue() to notify us of tasks ++ return new PrioritisedQueueExecutorThread.PrioritisedQueueExecutorThreadTask(queueTask, this); ++ } ++ ++ @Override ++ default PrioritisedTask queueRunnable(final Runnable task, final Priority priority) { ++ final PrioritisedTask ret = this.getQueue().queueRunnable(task, priority); ++ ++ this.notifyOfTasks(); ++ ++ return ret; ++ } ++ ++ @Override ++ default boolean haveAllTasksExecuted() { ++ return this.getQueue().haveAllTasksExecuted(); ++ } ++ ++ @Override ++ default long getTotalTasksExecuted() { ++ return this.getQueue().getTotalTasksExecuted(); ++ } ++ ++ @Override ++ default long getTotalTasksScheduled() { ++ return this.getQueue().getTotalTasksScheduled(); ++ } ++ ++ /** ++ * {@inheritDoc} ++ * @throws IllegalStateException If the current thread is {@code this} thread, or the underlying queue throws this exception. ++ */ ++ @Override ++ default void waitUntilAllExecuted() throws IllegalStateException { ++ this.getQueue().waitUntilAllExecuted(); ++ } ++ ++ /** ++ * {@inheritDoc} ++ * @throws IllegalStateException Always ++ */ ++ @Override ++ default boolean executeTask() throws IllegalStateException { ++ throw new IllegalStateException(); ++ } ++ ++} diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java new file mode 100644 -index 0000000000000000000000000000000000000000..1cd17795c8f8a7a0d111123e662904522a6c8819 +index 0000000000000000000000000000000000000000..abdec5529763b77126494ae0c2be9b48de900bc1 --- /dev/null +++ b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java -@@ -0,0 +1,138 @@ +@@ -0,0 +1,55 @@ +// Gale - base thread pool - chunk worker task queue + +package org.galemc.gale.executor.chunksystem; @@ -29,7 +297,7 @@ index 0000000000000000000000000000000000000000..1cd17795c8f8a7a0d111123e66290452 + * that it does not extend {@link Thread}, but may be instantiated on its own, as an agent representing + * a {@link BaseThread} to the {@link PrioritisedExecutor}. + */ -+public abstract class PrioritisedQueueExecutorThreadAgent implements PrioritisedExecutor { ++public abstract class PrioritisedQueueExecutorThreadAgent implements AbstractPrioritisedQueueExecutorThreadBase { + + private static final Logger LOGGER = LogUtils.getLogger(); + @@ -41,112 +309,29 @@ index 0000000000000000000000000000000000000000..1cd17795c8f8a7a0d111123e66290452 + this.baseThread = baseThread; + } + -+ protected boolean pollTasks() { -+ boolean ret = false; -+ -+ for (;;) { -+ try { -+ if (!this.queue.executeTask()) { -+ break; -+ } -+ ret = true; -+ } catch (final ThreadDeath death) { -+ throw death; // goodbye world... -+ } catch (final Throwable throwable) { -+ LOGGER.error("Exception thrown from prioritized runnable task in thread '" + this.baseThread.getName() + "'", throwable); -+ } -+ } -+ -+ return ret; -+ } -+ + @Override -+ public PrioritisedTask createTask(final Runnable task, final Priority priority) { -+ final PrioritisedTask queueTask = this.queue.createTask(task, priority); -+ -+ // need to override queue() to notify us of tasks -+ return new PrioritisedTask() { -+ @Override -+ public Priority getPriority() { -+ return queueTask.getPriority(); -+ } -+ -+ @Override -+ public boolean setPriority(final Priority priority) { -+ return queueTask.setPriority(priority); -+ } -+ -+ @Override -+ public boolean raisePriority(final Priority priority) { -+ return queueTask.raisePriority(priority); -+ } -+ -+ @Override -+ public boolean lowerPriority(final Priority priority) { -+ return queueTask.lowerPriority(priority); -+ } -+ -+ @Override -+ public boolean queue() { -+ final boolean ret = queueTask.queue(); -+ if (ret) { -+ BaseTaskQueues.chunkWorker.newTaskWasAdded(); -+ } -+ return ret; -+ } -+ -+ @Override -+ public boolean cancel() { -+ return queueTask.cancel(); -+ } -+ -+ @Override -+ public boolean execute() { -+ return queueTask.execute(); -+ } -+ }; -+ } -+ -+ @Override -+ public PrioritisedTask queueRunnable(final Runnable task, final Priority priority) { -+ final PrioritisedTask ret = this.queue.queueRunnable(task, priority); -+ ++ public void notifyOfTasks() { + BaseTaskQueues.chunkWorker.newTaskWasAdded(); -+ -+ return ret; + } + + @Override -+ public boolean haveAllTasksExecuted() { -+ return this.queue.haveAllTasksExecuted(); ++ public PrioritisedExecutor getQueue() { ++ return this.queue; + } + + @Override -+ public long getTotalTasksExecuted() { -+ return this.queue.getTotalTasksExecuted(); ++ public boolean isHalted() { ++ return false; + } + + @Override -+ public long getTotalTasksScheduled() { -+ return this.queue.getTotalTasksScheduled(); ++ public Logger getLogger() { ++ return LOGGER; + } + -+ /** -+ * {@inheritDoc} -+ * @throws IllegalStateException If the current thread is {@code this} thread, or the underlying queue throws this exception. -+ */ + @Override -+ public void waitUntilAllExecuted() throws IllegalStateException { -+ this.queue.waitUntilAllExecuted(); -+ } -+ -+ /** -+ * {@inheritDoc} -+ * @throws IllegalStateException Always -+ */ -+ @Override -+ public boolean executeTask() throws IllegalStateException { -+ throw new IllegalStateException(); ++ public String getName() { ++ return this.baseThread.getName(); + } + +}