From 5b292479afd849577d5ada2b691b8f6aa91bdbef Mon Sep 17 00:00:00 2001 From: Martijn Muijsers Date: Mon, 30 Jan 2023 22:36:16 +0100 Subject: [PATCH] Run chunk worker tasks on base thread pool --- ...itisedQueueExecutorThread-agent-util.patch | 152 ++++ ...unk-worker-tasks-on-base-thread-pool.patch | 730 ++++++++++++++++++ ... => 0156-Non-blocking-PooledObjects.patch} | 0 3 files changed, 882 insertions(+) create mode 100644 patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch create mode 100644 patches/server/0155-Run-chunk-worker-tasks-on-base-thread-pool.patch rename patches/server/{0154-Non-blocking-PooledObjects.patch => 0156-Non-blocking-PooledObjects.patch} (100%) diff --git a/patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch b/patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch new file mode 100644 index 0000000..d94a3fd --- /dev/null +++ b/patches/server/0154-BaseThread-PrioritisedQueueExecutorThread-agent-util.patch @@ -0,0 +1,152 @@ +From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 +From: Martijn Muijsers +Date: Mon, 30 Jan 2023 22:34:48 +0100 +Subject: [PATCH] BaseThread PrioritisedQueueExecutorThread agent utility + +License: GPL-3.0 (https://www.gnu.org/licenses/gpl-3.0.html) +Gale - https://galemc.org + +diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java +new file mode 100644 +index 0000000000000000000000000000000000000000..1cd17795c8f8a7a0d111123e662904522a6c8819 +--- /dev/null ++++ b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java +@@ -0,0 +1,138 @@ ++// Gale - base thread pool - chunk worker task queue ++ ++package org.galemc.gale.executor.chunksystem; ++ ++import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedExecutor; ++import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread; ++import com.mojang.logging.LogUtils; ++import org.galemc.gale.executor.queue.BaseTaskQueues; ++import org.galemc.gale.executor.thread.BaseThread; ++import org.slf4j.Logger; ++ ++ ++/** ++ * This class is a copy of {@link PrioritisedQueueExecutorThread}, with the notable difference ++ * that it does not extend {@link Thread}, but may be instantiated on its own, as an agent representing ++ * a {@link BaseThread} to the {@link PrioritisedExecutor}. ++ */ ++public abstract class PrioritisedQueueExecutorThreadAgent implements PrioritisedExecutor { ++ ++ private static final Logger LOGGER = LogUtils.getLogger(); ++ ++ protected final PrioritisedExecutor queue; ++ protected final BaseThread baseThread; ++ ++ public PrioritisedQueueExecutorThreadAgent(final PrioritisedExecutor queue, final BaseThread baseThread) { ++ this.queue = queue; ++ this.baseThread = baseThread; ++ } ++ ++ protected boolean pollTasks() { ++ boolean ret = false; ++ ++ for (;;) { ++ try { ++ if (!this.queue.executeTask()) { ++ break; ++ } ++ ret = true; ++ } catch (final ThreadDeath death) { ++ throw death; // goodbye world... ++ } catch (final Throwable throwable) { ++ LOGGER.error("Exception thrown from prioritized runnable task in thread '" + this.baseThread.getName() + "'", throwable); ++ } ++ } ++ ++ return ret; ++ } ++ ++ @Override ++ public PrioritisedTask createTask(final Runnable task, final Priority priority) { ++ final PrioritisedTask queueTask = this.queue.createTask(task, priority); ++ ++ // need to override queue() to notify us of tasks ++ return new PrioritisedTask() { ++ @Override ++ public Priority getPriority() { ++ return queueTask.getPriority(); ++ } ++ ++ @Override ++ public boolean setPriority(final Priority priority) { ++ return queueTask.setPriority(priority); ++ } ++ ++ @Override ++ public boolean raisePriority(final Priority priority) { ++ return queueTask.raisePriority(priority); ++ } ++ ++ @Override ++ public boolean lowerPriority(final Priority priority) { ++ return queueTask.lowerPriority(priority); ++ } ++ ++ @Override ++ public boolean queue() { ++ final boolean ret = queueTask.queue(); ++ if (ret) { ++ BaseTaskQueues.chunkWorker.newTaskWasAdded(); ++ } ++ return ret; ++ } ++ ++ @Override ++ public boolean cancel() { ++ return queueTask.cancel(); ++ } ++ ++ @Override ++ public boolean execute() { ++ return queueTask.execute(); ++ } ++ }; ++ } ++ ++ @Override ++ public PrioritisedTask queueRunnable(final Runnable task, final Priority priority) { ++ final PrioritisedTask ret = this.queue.queueRunnable(task, priority); ++ ++ BaseTaskQueues.chunkWorker.newTaskWasAdded(); ++ ++ return ret; ++ } ++ ++ @Override ++ public boolean haveAllTasksExecuted() { ++ return this.queue.haveAllTasksExecuted(); ++ } ++ ++ @Override ++ public long getTotalTasksExecuted() { ++ return this.queue.getTotalTasksExecuted(); ++ } ++ ++ @Override ++ public long getTotalTasksScheduled() { ++ return this.queue.getTotalTasksScheduled(); ++ } ++ ++ /** ++ * {@inheritDoc} ++ * @throws IllegalStateException If the current thread is {@code this} thread, or the underlying queue throws this exception. ++ */ ++ @Override ++ public void waitUntilAllExecuted() throws IllegalStateException { ++ this.queue.waitUntilAllExecuted(); ++ } ++ ++ /** ++ * {@inheritDoc} ++ * @throws IllegalStateException Always ++ */ ++ @Override ++ public boolean executeTask() throws IllegalStateException { ++ throw new IllegalStateException(); ++ } ++ ++} diff --git a/patches/server/0155-Run-chunk-worker-tasks-on-base-thread-pool.patch b/patches/server/0155-Run-chunk-worker-tasks-on-base-thread-pool.patch new file mode 100644 index 0000000..8529707 --- /dev/null +++ b/patches/server/0155-Run-chunk-worker-tasks-on-base-thread-pool.patch @@ -0,0 +1,730 @@ +From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 +From: Martijn Muijsers +Date: Mon, 30 Jan 2023 22:35:08 +0100 +Subject: [PATCH] Run chunk worker tasks on base thread pool + +License: AGPL-3.0 (https://www.gnu.org/licenses/agpl-3.0.html) +Gale - https://galemc.org + +diff --git a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java +index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..ae335a8ab04f4dc08ca0224a4ee70fa9b2567fe0 100644 +--- a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java ++++ b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java +@@ -2,11 +2,17 @@ package ca.spottedleaf.concurrentutil.executor.standard; + + import com.mojang.logging.LogUtils; + import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet; ++import org.galemc.gale.concurrent.Mutex; ++import org.galemc.gale.executor.chunksystem.PrioritisedQueueExecutorThreadAgent; ++import org.galemc.gale.executor.queue.BaseTaskQueueTier; ++import org.galemc.gale.executor.queue.BaseTaskQueues; ++import org.galemc.gale.executor.thread.BaseThread; ++import org.galemc.gale.executor.thread.pool.BaseThreadActivation; ++import org.jetbrains.annotations.NotNull; + import org.slf4j.Logger; +-import java.util.ArrayList; +-import java.util.Arrays; +-import java.util.Comparator; +-import java.util.TreeSet; ++ ++import java.util.*; ++import java.util.concurrent.ConcurrentSkipListSet; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.function.BiConsumer; + +@@ -14,8 +20,9 @@ public final class PrioritisedThreadPool { + + private static final Logger LOGGER = LogUtils.getLogger(); + +- protected final PrioritisedThread[] threads; +- protected final TreeSet queues = new TreeSet<>(PrioritisedPoolExecutorImpl.comparator()); ++ public final Set activeThreads = new ConcurrentSkipListSet<>(); // Gale - base thread pool - chunk worker task queue ++ public final TreeSet queues = new TreeSet<>(PrioritisedPoolExecutorImpl.comparator()); // Gale - base thread pool - chunk worker task queue - protected -> public ++ public final Mutex queuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues + protected final String name; + protected final long queueMaxHoldTime; + +@@ -46,37 +53,14 @@ public final class PrioritisedThreadPool { + } + this.name = name; + this.queueMaxHoldTime = queueHoldTime; +- +- this.threads = new PrioritisedThread[threads]; +- for (int i = 0; i < threads; ++i) { +- this.threads[i] = new PrioritisedThread(this); +- +- // set default attributes +- this.threads[i].setName("Prioritised thread for pool '" + name + "' #" + i); +- this.threads[i].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { +- LOGGER.error("Uncaught exception in thread " + thread.getName(), throwable); +- }); +- +- // let thread modifier override defaults +- if (threadModifier != null) { +- threadModifier.accept(this.threads[i], Integer.valueOf(i)); +- } +- +- // now the thread can start +- this.threads[i].start(); +- } +- } +- +- public Thread[] getThreads() { +- return Arrays.copyOf(this.threads, this.threads.length, Thread[].class); + } + +- public PrioritisedPoolExecutor createExecutor(final String name, final int parallelism) { ++ public PrioritisedPoolExecutor createExecutor(final String name, boolean supportsParallelism) { // Gale - base thread pool - chunk worker task queue + synchronized (this.nonShutdownQueues) { + if (this.shutdown) { + throw new IllegalStateException("Queue is shutdown: " + this.toString()); + } +- final PrioritisedPoolExecutorImpl ret = new PrioritisedPoolExecutorImpl(this, name, Math.min(Math.max(1, parallelism), this.threads.length)); ++ final PrioritisedPoolExecutorImpl ret = new PrioritisedPoolExecutorImpl(this, name, supportsParallelism ? -1 : 1); // Gale - base thread pool - chunk worker task queue + + this.nonShutdownQueues.add(ret); + +@@ -88,136 +72,24 @@ public final class PrioritisedThreadPool { + } + } + +- /** +- * Prevents creation of new queues, shutdowns all non-shutdown queues if specified +- */ +- public void halt(final boolean shutdownQueues) { +- synchronized (this.nonShutdownQueues) { +- this.shutdown = true; +- } +- if (shutdownQueues) { +- final ArrayList queuesToShutdown; +- synchronized (this.nonShutdownQueues) { +- this.shutdown = true; +- queuesToShutdown = new ArrayList<>(this.nonShutdownQueues); +- } +- +- for (final PrioritisedPoolExecutorImpl queue : queuesToShutdown) { +- queue.shutdown(); +- } +- } +- +- +- for (final PrioritisedThread thread : this.threads) { +- // can't kill queue, queue is null +- thread.halt(false); +- } +- } +- +- /** +- * Waits until all threads in this pool have shutdown, or until the specified time has passed. +- * @param msToWait Maximum time to wait. +- * @return {@code false} if the maximum time passed, {@code true} otherwise. +- */ +- public boolean join(final long msToWait) { +- try { +- return this.join(msToWait, false); +- } catch (final InterruptedException ex) { +- throw new IllegalStateException(ex); +- } +- } +- +- /** +- * Waits until all threads in this pool have shutdown, or until the specified time has passed. +- * @param msToWait Maximum time to wait. +- * @return {@code false} if the maximum time passed, {@code true} otherwise. +- * @throws InterruptedException If this thread is interrupted. +- */ +- public boolean joinInterruptable(final long msToWait) throws InterruptedException { +- return this.join(msToWait, true); +- } +- +- protected final boolean join(final long msToWait, final boolean interruptable) throws InterruptedException { +- final long nsToWait = msToWait * (1000 * 1000); +- final long start = System.nanoTime(); +- final long deadline = start + nsToWait; +- boolean interrupted = false; +- try { +- for (final PrioritisedThread thread : this.threads) { +- for (;;) { +- if (!thread.isAlive()) { +- break; +- } +- final long current = System.nanoTime(); +- if (current >= deadline) { +- return false; +- } +- +- try { +- thread.join(Math.max(1L, (deadline - current) / (1000 * 1000))); +- } catch (final InterruptedException ex) { +- if (interruptable) { +- throw ex; +- } +- interrupted = true; +- } +- } +- } +- +- return true; +- } finally { +- if (interrupted) { +- Thread.currentThread().interrupt(); +- } +- } +- } +- +- public void shutdown(final boolean wait) { +- final ArrayList queuesToShutdown; +- synchronized (this.nonShutdownQueues) { +- this.shutdown = true; +- queuesToShutdown = new ArrayList<>(this.nonShutdownQueues); +- } +- +- for (final PrioritisedPoolExecutorImpl queue : queuesToShutdown) { +- queue.shutdown(); +- } +- +- for (final PrioritisedThread thread : this.threads) { +- // none of these can be true or else NPE +- thread.close(false, false); +- } +- +- if (wait) { +- final ArrayList queues; +- synchronized (this.activeQueues) { +- queues = new ArrayList<>(this.activeQueues); +- } +- for (final PrioritisedPoolExecutorImpl queue : queues) { +- queue.waitUntilAllExecuted(); +- } +- } +- } +- +- protected static final class PrioritisedThread extends PrioritisedQueueExecutorThread { ++ public static final class PrioritisedThreadAgent extends PrioritisedQueueExecutorThreadAgent implements Comparable { // Gale - base thread pool - chunk worker task queue + + protected final PrioritisedThreadPool pool; + protected final AtomicBoolean alertedHighPriority = new AtomicBoolean(); + +- public PrioritisedThread(final PrioritisedThreadPool pool) { +- super(null); ++ // Gale start - base thread pool - chunk worker task queue ++ public PrioritisedThreadAgent(final PrioritisedThreadPool pool, BaseThread baseThread) { ++ // Gale end - base thread pool - chunk worker task queue ++ super(null, baseThread); + this.pool = pool; + } + +- public boolean alertHighPriorityExecutor() { +- if (!this.notifyTasks()) { +- if (!this.alertedHighPriority.get()) { +- this.alertedHighPriority.set(true); +- } +- return false; ++ // Gale start - base thread pool - chunk worker task queue ++ public void alertHighPriorityExecutor() { ++ if (!this.alertedHighPriority.get()) { ++ this.alertedHighPriority.set(true); + } +- +- return true; ++ // Gale end - base thread pool - chunk worker task queue + } + + private boolean isAlertedHighPriority() { +@@ -225,21 +97,23 @@ public final class PrioritisedThreadPool { + } + + @Override +- protected boolean pollTasks() { ++ public boolean pollTasks() { // Gale - base thread pool - chunk worker task queue - protected -> public + final PrioritisedThreadPool pool = this.pool; + final TreeSet queues = this.pool.queues; ++ final Mutex queuesLock = this.pool.queuesLock; // Gale - base thread pool - chunk worker task queue - spin lock for pool queues + + boolean ret = false; + for (;;) { +- if (this.halted) { +- break; +- } + // try to find a queue + // note that if and ONLY IF the queues set is empty, this means there are no tasks for us to execute. + // so we can only break when it's empty + final PrioritisedPoolExecutorImpl queue; + // select queue +- synchronized (queues) { ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ //noinspection StatementWithEmptyBody ++ while (!queuesLock.tryAcquire()); ++ try { ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + queue = queues.pollFirst(); + if (queue == null) { + // no tasks to execute +@@ -249,7 +123,7 @@ public final class PrioritisedThreadPool { + queue.schedulingId = ++pool.schedulingIdGenerator; + // we own this queue now, so increment the executor count + // do we also need to push this queue up for grabs for another executor? +- if (++queue.concurrentExecutors < queue.maximumExecutors) { ++ if (++queue.concurrentExecutors < queue.maximumExecutors || queue.maximumExecutors == -1) { // Gale - base thread pool - chunk worker task queue + // re-add to queues + // it's very important this is done in the same synchronised block for polling, as this prevents + // us from possibly later adding a queue that should not exist in the set +@@ -261,16 +135,17 @@ public final class PrioritisedThreadPool { + // note: we cannot drain entries from the queue while holding this lock, as it will cause deadlock + // the queue addition holds the per-queue lock first then acquires the lock we have now, but if we + // try to poll now we don't hold the per queue lock but we do hold the global lock... ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ } finally { ++ queuesLock.release(); + } ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + + // parse tasks as long as we are allowed + final long start = System.nanoTime(); + final long deadline = start + pool.queueMaxHoldTime; + do { + try { +- if (this.halted) { +- break; +- } + if (!queue.executeTask()) { + // no more tasks, try next queue + break; +@@ -279,11 +154,15 @@ public final class PrioritisedThreadPool { + } catch (final ThreadDeath death) { + throw death; // goodbye world... + } catch (final Throwable throwable) { +- LOGGER.error("Exception thrown from thread '" + this.getName() + "' in queue '" + queue.toString() + "'", throwable); ++ LOGGER.error("Exception thrown from thread '" + this.baseThread.getName() + "' in queue '" + queue.toString() + "'", throwable); + } +- } while (!this.isAlertedHighPriority() && System.nanoTime() <= deadline); ++ } while (!this.isAlertedHighPriority() && System.nanoTime() <= deadline && BaseThreadActivation.tierInExcessOrdinal > BaseTaskQueueTier.LOW_PRIORITY_ASYNC.ordinal); // Gale - base thread pool - chunk worker task queue + +- synchronized (queues) { ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ //noinspection StatementWithEmptyBody ++ while (!queuesLock.tryAcquire()); ++ try { ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + // decrement executors, we are no longer executing + if (queue.isQueued) { + queues.remove(queue); +@@ -301,11 +180,21 @@ public final class PrioritisedThreadPool { + queues.add(queue); + queue.isQueued = true; + } ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ } finally { ++ queuesLock.release(); + } ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + } + + return ret; + } ++ ++ @Override ++ public int compareTo(@NotNull PrioritisedThreadPool.PrioritisedThreadAgent o) { ++ return Integer.compare(this.baseThread.baseThreadIndex, o.baseThread.baseThreadIndex); ++ } ++ + } + + public interface PrioritisedPoolExecutor extends PrioritisedExecutor { +@@ -322,7 +211,7 @@ public final class PrioritisedThreadPool { + public boolean isActive(); + } + +- protected static final class PrioritisedPoolExecutorImpl extends PrioritisedThreadedTaskQueue implements PrioritisedPoolExecutor { ++ public static final class PrioritisedPoolExecutorImpl extends PrioritisedThreadedTaskQueue implements PrioritisedPoolExecutor { // Gale - base thread pool - chunk worker task queue - protected -> public + + protected final PrioritisedThreadPool pool; + protected final long[] priorityCounts = new long[Priority.TOTAL_SCHEDULABLE_PRIORITIES]; +@@ -369,7 +258,12 @@ public final class PrioritisedThreadPool { + public void halt() { + final PrioritisedThreadPool pool = this.pool; + final TreeSet queues = pool.queues; +- synchronized (queues) { ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ final Mutex queuesLock = pool.queuesLock; ++ //noinspection StatementWithEmptyBody ++ while (!queuesLock.tryAcquire()); ++ try { ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + if (this.isHalted) { + return; + } +@@ -378,7 +272,11 @@ public final class PrioritisedThreadPool { + queues.remove(this); + this.isQueued = false; + } ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ } finally { ++ queuesLock.release(); + } ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + synchronized (pool.nonShutdownQueues) { + pool.nonShutdownQueues.remove(this); + } +@@ -391,8 +289,13 @@ public final class PrioritisedThreadPool { + public boolean isActive() { + final PrioritisedThreadPool pool = this.pool; + final TreeSet queues = pool.queues; ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ final Mutex queuesLock = pool.queuesLock; + +- synchronized (queues) { ++ //noinspection StatementWithEmptyBody ++ while (!queuesLock.tryAcquire()); ++ try { ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + if (this.concurrentExecutors != 0) { + return true; + } +@@ -401,7 +304,11 @@ public final class PrioritisedThreadPool { + return true; + } + } ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ } finally { ++ queuesLock.release(); + } ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + + return false; + } +@@ -469,8 +376,13 @@ public final class PrioritisedThreadPool { + + final PrioritisedThreadPool pool = this.pool; + final TreeSet queues = pool.queues; ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ final Mutex queuesLock = pool.queuesLock; + +- synchronized (queues) { ++ //noinspection StatementWithEmptyBody ++ while (!queuesLock.tryAcquire()); ++ try { ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + if (!this.isQueued) { + // see if we need to be queued + if (newPriority != null) { +@@ -478,7 +390,7 @@ public final class PrioritisedThreadPool { + this.schedulingId = ++pool.schedulingIdGenerator; + } + this.scheduledPriority = newPriority; // must be updated before queue add +- if (!this.isHalted && this.concurrentExecutors < this.maximumExecutors) { ++ if (!this.isHalted && (this.concurrentExecutors < this.maximumExecutors || this.maximumExecutors == -1)) { // Gale - base thread pool - chunk worker task queue + shouldNotifyHighPriority = newPriority.isHigherOrEqualPriority(Priority.HIGH); + queues.add(this); + this.isQueued = true; +@@ -508,7 +420,11 @@ public final class PrioritisedThreadPool { + } else { + executorsWanted = 0; + } ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ } finally { ++ queuesLock.release(); + } ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + + if (newPriority == null && shutdown) { + synchronized (pool.activeQueues) { +@@ -517,13 +433,14 @@ public final class PrioritisedThreadPool { + } + + // Wake up the number of executors we want ++ // Gale start - base thread pool - chunk worker task queue + if (executorsWanted > 0 || (shouldNotifyTasks | shouldNotifyHighPriority)) { +- int notified = 0; +- for (final PrioritisedThread thread : pool.threads) { +- if ((shouldNotifyHighPriority ? thread.alertHighPriorityExecutor() : thread.notifyTasks()) +- && (++notified >= executorsWanted)) { +- break; ++ BaseTaskQueues.chunkWorker.newTaskWasAdded(); ++ if (shouldNotifyHighPriority) { ++ for (final PrioritisedThreadAgent thread : pool.activeThreads) { ++ thread.alertHighPriorityExecutor(); + } ++ // Gale end - base thread pool - chunk worker task queue + } + } + } +@@ -543,9 +460,14 @@ public final class PrioritisedThreadPool { + } + + final TreeSet queues = pool.queues; ++ final Mutex queuesLock = pool.queuesLock; // Gale - base thread pool - chunk worker task queue - spin lock for pool queues + + // try and shift around our priority +- synchronized (queues) { ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ //noinspection StatementWithEmptyBody ++ while (!queuesLock.tryAcquire()); ++ try { ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + if (this.scheduledPriority == null) { + // no tasks are queued, ensure we aren't in activeQueues + synchronized (pool.activeQueues) { +@@ -571,7 +493,11 @@ public final class PrioritisedThreadPool { + } else { + this.scheduledPriority = Priority.HIGHEST; + } ++ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues ++ } finally { ++ queuesLock.release(); + } ++ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues + + return ret; + } +diff --git a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java b/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java +index f5c15d40094c2ddc6220b0595597d12103fcf425..79ef41d2bb30beee2355d1de3dc99c9e00d929d5 100644 +--- a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java ++++ b/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java +@@ -22,6 +22,7 @@ import net.minecraft.world.level.chunk.ChunkAccess; + import net.minecraft.world.level.chunk.ChunkStatus; + import net.minecraft.world.level.chunk.LevelChunk; + import org.bukkit.Bukkit; ++import org.galemc.gale.executor.thread.pool.BaseThreadPool; + import org.slf4j.Logger; + import java.io.File; + import java.util.ArrayDeque; +@@ -103,7 +104,7 @@ public final class ChunkTaskScheduler { + thread.setUncaughtExceptionHandler(io.papermc.paper.chunk.system.scheduling.NewChunkHolder.CHUNKSYSTEM_UNCAUGHT_EXCEPTION_HANDLER); + }, (long)(20.0e6)); // 20ms + +- LOGGER.info("Chunk system is using " + newChunkSystemIOThreads + " I/O threads, " + newChunkSystemWorkerThreads + " worker threads, and gen parallelism of " + ChunkTaskScheduler.newChunkSystemGenParallelism + " threads"); ++ LOGGER.info("Chunk system is using " + newChunkSystemIOThreads + " I/O thread" + (newChunkSystemIOThreads == 1 ? "" : "s")); // Gale - base thread pool - chunk worker task queue + } + + public final ServerLevel world; +@@ -196,12 +197,14 @@ public final class ChunkTaskScheduler { + this.workers = workers; + + final String worldName = world.getWorld().getName(); +- this.genExecutor = workers.createExecutor("Chunk single-threaded generation executor for world '" + worldName + "'", 1); ++ this.genExecutor = workers.createExecutor("Chunk single-threaded generation executor for world '" + worldName + "'", false); // Gale - base thread pool - chunk worker task queue + // same as genExecutor, as there are race conditions between updating blocks in FEATURE status while lighting chunks + this.lightExecutor = this.genExecutor; +- this.parallelGenExecutor = newChunkSystemGenParallelism <= 1 ? this.genExecutor +- : workers.createExecutor("Chunk parallel generation executor for world '" + worldName + "'", newChunkSystemGenParallelism); +- this.loadExecutor = workers.createExecutor("Chunk load executor for world '" + worldName + "'", newChunkSystemLoadParallelism); ++ // Gale start - base thread pool - chunk worker task queue ++ this.parallelGenExecutor = BaseThreadPool.targetParallelism <= 2 ? this.genExecutor ++ : workers.createExecutor("Chunk parallel generation executor for world '" + worldName + "'", true); ++ this.loadExecutor = workers.createExecutor("Chunk load executor for world '" + worldName + "'", true); ++ // Gale end - base thread pool - chunk worker task queue + this.chunkHolderManager = new ChunkHolderManager(world, this); + } + +diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java +index 0019e5eefc4b638526a75dd3706a54033dd9b811..7ed820d2483bf6741a355b062f062a04866ba938 100644 +--- a/src/main/java/net/minecraft/server/MinecraftServer.java ++++ b/src/main/java/net/minecraft/server/MinecraftServer.java +@@ -1069,6 +1069,10 @@ public abstract class MinecraftServer extends MinecraftServerBlockableEventLoop + } + } + ++ // Gale start - base thread pool - chunk worker task queue ++ LOGGER.info("Waiting for chunk worker tasks to finish..."); ++ serverThread.runTasksUntil(() -> !BaseTaskQueues.chunkWorker.hasTasks(), null); ++ // Gale end - base thread pool - chunk worker task queue + this.saveAllChunks(false, true, false, true); // Paper - rewrite chunk system - move closing into here + + this.isSaving = false; +diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java +index bfcec658cbf381cc793d7dd844a81fac27c43337..dc006d9940ef8114a3a3e4860fbc1da0f7c2ee60 100644 +--- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java ++++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java +@@ -63,13 +63,19 @@ public enum BaseTaskQueueTier { + /** + * A tier for queues that contain general tasks that must be performed at some point in time, + * asynchronously with respect to the {@link ServerThread} and the ticking of the server. +- * Execution of + */ + ASYNC(new AbstractTaskQueue[]{ + // The cleaner queue has high priority because it releases resources back to a pool, thereby saving memory + BaseTaskQueues.cleaner, + BaseTaskQueues.scheduledAsync +- }, Integer.getInteger("gale.thread.priority.async", 6)); ++ }, Integer.getInteger("gale.thread.priority.async", 6)), ++ /** ++ * A tier for queues that contain tasks with the same considerations as {@link #ASYNC}, ++ * but with a low priority. ++ */ ++ LOW_PRIORITY_ASYNC(new AbstractTaskQueue[]{ ++ BaseTaskQueues.chunkWorker ++ }, Integer.getInteger("gale.thread.priority.async.low", 3)); + + /** + * Equal to {@link #ordinal()}. +diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java +index ed3ccf2e64539363a7be2d507c68c40b5913f75c..a12250e5aaed02995b7bf09a8018a93f27e42920 100644 +--- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java ++++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java +@@ -113,4 +113,9 @@ public final class BaseTaskQueues { + */ + public static final SimpleTaskQueue scheduledAsync = SimpleTaskQueue.allSpans("ScheduledAsync"); + ++ /** ++ * @see ChunkWorkerTaskQueue ++ */ ++ public static final ChunkWorkerTaskQueue chunkWorker = new ChunkWorkerTaskQueue(); ++ + } +diff --git a/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java +new file mode 100644 +index 0000000000000000000000000000000000000000..0e2e11b883d8910e97c5700f7f6a7d1b545a6164 +--- /dev/null ++++ b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java +@@ -0,0 +1,108 @@ ++// Gale - base thread pool - chunk worker task queue ++ ++package org.galemc.gale.executor.queue; ++ ++import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread; ++import io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler; ++import org.galemc.gale.executor.TaskSpan; ++import org.galemc.gale.executor.annotation.YieldFree; ++import org.galemc.gale.executor.annotation.thread.AnyThreadSafe; ++import org.galemc.gale.executor.thread.BaseThread; ++import org.galemc.gale.executor.thread.pool.BaseThreadActivation; ++import org.jetbrains.annotations.Nullable; ++ ++/** ++ * This class provides access to, but does not store, the tasks related to chunk loading and chunk generation, ++ * that are scheduled and normally polled by the chunk system's {@link PrioritisedQueueExecutorThread}s. ++ *
++ * All tasks provided by this queue must be yield-free.//TODO Gale replace locks by spin locks ++ * While multiple chunk system tasks may be executed in one base thread pool task, ++ * tasks in this queue are not regarded as {@link TaskSpan#TINY}. ++ * ++ * @author Martijn Muijsers under AGPL-3.0 ++ */ ++@AnyThreadSafe ++@YieldFree ++public final class ChunkWorkerTaskQueue implements AbstractTaskQueue { ++ ++ /** ++ * Will be initialized in {@link #setTier}. ++ */ ++ private BaseTaskQueueTier tier; ++ ++ ChunkWorkerTaskQueue() {} ++ ++ @Override ++ public String getName() { ++ return "ChunkWorker"; ++ } ++ ++ @Override ++ public boolean hasTasks() { ++ var workerThreads = ChunkTaskScheduler.workerThreads; ++ if (workerThreads == null) { ++ return false; ++ } ++ //noinspection StatementWithEmptyBody ++ while (!workerThreads.queuesLock.tryAcquire()); ++ try { ++ for (var queue : workerThreads.queues) { ++ if (queue.hasScheduledUncompletedTasksVolatile()) { ++ return true; ++ } ++ } ++ } finally { ++ workerThreads.queuesLock.release(); ++ } ++ return false; ++ } ++ ++ @Override ++ public boolean hasTasks(TaskSpan span) { ++ return span == TaskSpan.FREE && this.hasTasks(); ++ } ++ ++ @Override ++ public boolean canHaveTasks(TaskSpan span) { ++ return span == TaskSpan.FREE; ++ } ++ ++ @Override ++ public @Nullable Runnable poll(BaseThread currentThread) { ++ if (!this.hasTasks()) { ++ return null; ++ } ++ return () -> { ++ var workerAgent = currentThread.getChunkWorkerAgent(); ++ ChunkTaskScheduler.workerThreads.activeThreads.add(workerAgent); ++ workerAgent.pollTasks(); ++ ChunkTaskScheduler.workerThreads.activeThreads.remove(workerAgent); ++ }; ++ } ++ ++ @Override ++ public @Nullable Runnable pollTiny(BaseThread currentThread) { ++ return null; ++ } ++ ++ @Override ++ public void add(Runnable task, TaskSpan span) { ++ throw new UnsupportedOperationException(); ++ } ++ ++ /** ++ * To be called when a new task has been added to the underlying storage of this queue. ++ */ ++ public void newTaskWasAdded() { ++ BaseThreadActivation.newTaskWasAdded(this.tier, TaskSpan.FREE, true); ++ } ++ ++ @Override ++ public void setTier(BaseTaskQueueTier tier) { ++ if (this.tier != null) { ++ throw new IllegalStateException(this.getClass().getSimpleName() + ".tier was already initialized"); ++ } ++ this.tier = tier; ++ } ++ ++} +diff --git a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java +index 1024c45be3d867af6563d4f6984e716cc0bc12d2..f384479004bdc9e412f14f35fd3bfedca371053e 100644 +--- a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java ++++ b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java +@@ -2,6 +2,8 @@ + + package org.galemc.gale.executor.thread; + ++import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedThreadPool; ++import io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler; + import io.papermc.paper.util.TickThread; + import net.minecraft.server.MinecraftServer; + import org.galemc.gale.executor.TaskSpan; +@@ -16,6 +18,7 @@ import org.galemc.gale.executor.lock.YieldingLock; + import org.galemc.gale.executor.queue.AbstractTaskQueue; + import org.galemc.gale.executor.queue.BaseTaskQueueTier; + import org.galemc.gale.executor.thread.pool.*; ++import org.jetbrains.annotations.NotNull; + import org.jetbrains.annotations.Nullable; + + import java.util.concurrent.TimeUnit; +@@ -659,6 +662,20 @@ public abstract class BaseThread extends Thread implements AbstractYieldingThrea + } + } + ++ // Gale start - base thread pool - chunk worker task queue ++ /** ++ * Lazily computed value for {@link #getChunkWorkerAgent}. ++ */ ++ private @Nullable PrioritisedThreadPool.PrioritisedThreadAgent chunkWorkerAgent; ++ ++ public @NotNull PrioritisedThreadPool.PrioritisedThreadAgent getChunkWorkerAgent() { ++ if (this.chunkWorkerAgent == null) { ++ this.chunkWorkerAgent = new PrioritisedThreadPool.PrioritisedThreadAgent(ChunkTaskScheduler.workerThreads, this); ++ } ++ return this.chunkWorkerAgent; ++ } ++ // Gale end - base thread pool - chunk worker task queue ++ + /** + * @return The current thread if it is a {@link BaseThread}, or null otherwise. + */ diff --git a/patches/server/0154-Non-blocking-PooledObjects.patch b/patches/server/0156-Non-blocking-PooledObjects.patch similarity index 100% rename from patches/server/0154-Non-blocking-PooledObjects.patch rename to patches/server/0156-Non-blocking-PooledObjects.patch