9
0
mirror of https://github.com/Dreeam-qwq/Gale.git synced 2025-12-22 08:19:31 +00:00
Files
Gale/patches/server/0161-Run-chunk-worker-tasks-on-base-thread-pool.patch
2023-02-13 20:34:58 +01:00

705 lines
34 KiB
Diff

From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Martijn Muijsers <martijnmuijsers@live.nl>
Date: Mon, 30 Jan 2023 22:35:08 +0100
Subject: [PATCH] Run chunk worker tasks on base thread pool
License: AGPL-3.0 (https://www.gnu.org/licenses/agpl-3.0.html)
Gale - https://galemc.org
diff --git a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java
index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..2fa56c06ec16817d5e35d5283b8e5a0d0e01d643 100644
--- a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java
+++ b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java
@@ -2,11 +2,17 @@ package ca.spottedleaf.concurrentutil.executor.standard;
import com.mojang.logging.LogUtils;
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet;
+import org.galemc.gale.concurrent.Mutex;
+import org.galemc.gale.executor.chunksystem.PrioritisedQueueExecutorThreadAgent;
+import org.galemc.gale.executor.queue.BaseTaskQueueTier;
+import org.galemc.gale.executor.queue.BaseTaskQueues;
+import org.galemc.gale.executor.thread.BaseThread;
+import org.galemc.gale.executor.thread.pool.BaseThreadActivation;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.TreeSet;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
@@ -14,13 +20,16 @@ public final class PrioritisedThreadPool {
private static final Logger LOGGER = LogUtils.getLogger();
- protected final PrioritisedThread[] threads;
- protected final TreeSet<PrioritisedPoolExecutorImpl> queues = new TreeSet<>(PrioritisedPoolExecutorImpl.comparator());
+ public final Set<PrioritisedThreadAgent> activeThreads = new ConcurrentSkipListSet<>(); // Gale - base thread pool - chunk worker task queue
+ public final TreeSet<PrioritisedPoolExecutorImpl> queues = new TreeSet<>(PrioritisedPoolExecutorImpl.comparator()); // Gale - base thread pool - chunk worker task queue - protected -> public
+ public final Mutex queuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
protected final String name;
protected final long queueMaxHoldTime;
protected final ReferenceOpenHashSet<PrioritisedPoolExecutorImpl> nonShutdownQueues = new ReferenceOpenHashSet<>();
+ protected final Mutex nonShutdownQueuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
protected final ReferenceOpenHashSet<PrioritisedPoolExecutorImpl> activeQueues = new ReferenceOpenHashSet<>();
+ protected final Mutex activeQueuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
protected boolean shutdown;
@@ -46,41 +55,18 @@ public final class PrioritisedThreadPool {
}
this.name = name;
this.queueMaxHoldTime = queueHoldTime;
-
- this.threads = new PrioritisedThread[threads];
- for (int i = 0; i < threads; ++i) {
- this.threads[i] = new PrioritisedThread(this);
-
- // set default attributes
- this.threads[i].setName("Prioritised thread for pool '" + name + "' #" + i);
- this.threads[i].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
- LOGGER.error("Uncaught exception in thread " + thread.getName(), throwable);
- });
-
- // let thread modifier override defaults
- if (threadModifier != null) {
- threadModifier.accept(this.threads[i], Integer.valueOf(i));
- }
-
- // now the thread can start
- this.threads[i].start();
- }
}
- public Thread[] getThreads() {
- return Arrays.copyOf(this.threads, this.threads.length, Thread[].class);
- }
-
- public PrioritisedPoolExecutor createExecutor(final String name, final int parallelism) {
- synchronized (this.nonShutdownQueues) {
+ public PrioritisedPoolExecutor createExecutor(final String name, boolean supportsParallelism) { // Gale - base thread pool - chunk worker task queue
+ try (var ignored = nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
if (this.shutdown) {
throw new IllegalStateException("Queue is shutdown: " + this.toString());
}
- final PrioritisedPoolExecutorImpl ret = new PrioritisedPoolExecutorImpl(this, name, Math.min(Math.max(1, parallelism), this.threads.length));
+ final PrioritisedPoolExecutorImpl ret = new PrioritisedPoolExecutorImpl(this, name, supportsParallelism ? -1 : 1); // Gale - base thread pool - chunk worker task queue
this.nonShutdownQueues.add(ret);
- synchronized (this.activeQueues) {
+ try (var ignored2 = this.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
this.activeQueues.add(ret);
}
@@ -88,136 +74,24 @@ public final class PrioritisedThreadPool {
}
}
- /**
- * Prevents creation of new queues, shutdowns all non-shutdown queues if specified
- */
- public void halt(final boolean shutdownQueues) {
- synchronized (this.nonShutdownQueues) {
- this.shutdown = true;
- }
- if (shutdownQueues) {
- final ArrayList<PrioritisedPoolExecutorImpl> queuesToShutdown;
- synchronized (this.nonShutdownQueues) {
- this.shutdown = true;
- queuesToShutdown = new ArrayList<>(this.nonShutdownQueues);
- }
-
- for (final PrioritisedPoolExecutorImpl queue : queuesToShutdown) {
- queue.shutdown();
- }
- }
-
-
- for (final PrioritisedThread thread : this.threads) {
- // can't kill queue, queue is null
- thread.halt(false);
- }
- }
-
- /**
- * Waits until all threads in this pool have shutdown, or until the specified time has passed.
- * @param msToWait Maximum time to wait.
- * @return {@code false} if the maximum time passed, {@code true} otherwise.
- */
- public boolean join(final long msToWait) {
- try {
- return this.join(msToWait, false);
- } catch (final InterruptedException ex) {
- throw new IllegalStateException(ex);
- }
- }
-
- /**
- * Waits until all threads in this pool have shutdown, or until the specified time has passed.
- * @param msToWait Maximum time to wait.
- * @return {@code false} if the maximum time passed, {@code true} otherwise.
- * @throws InterruptedException If this thread is interrupted.
- */
- public boolean joinInterruptable(final long msToWait) throws InterruptedException {
- return this.join(msToWait, true);
- }
-
- protected final boolean join(final long msToWait, final boolean interruptable) throws InterruptedException {
- final long nsToWait = msToWait * (1000 * 1000);
- final long start = System.nanoTime();
- final long deadline = start + nsToWait;
- boolean interrupted = false;
- try {
- for (final PrioritisedThread thread : this.threads) {
- for (;;) {
- if (!thread.isAlive()) {
- break;
- }
- final long current = System.nanoTime();
- if (current >= deadline) {
- return false;
- }
-
- try {
- thread.join(Math.max(1L, (deadline - current) / (1000 * 1000)));
- } catch (final InterruptedException ex) {
- if (interruptable) {
- throw ex;
- }
- interrupted = true;
- }
- }
- }
-
- return true;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void shutdown(final boolean wait) {
- final ArrayList<PrioritisedPoolExecutorImpl> queuesToShutdown;
- synchronized (this.nonShutdownQueues) {
- this.shutdown = true;
- queuesToShutdown = new ArrayList<>(this.nonShutdownQueues);
- }
-
- for (final PrioritisedPoolExecutorImpl queue : queuesToShutdown) {
- queue.shutdown();
- }
-
- for (final PrioritisedThread thread : this.threads) {
- // none of these can be true or else NPE
- thread.close(false, false);
- }
-
- if (wait) {
- final ArrayList<PrioritisedPoolExecutorImpl> queues;
- synchronized (this.activeQueues) {
- queues = new ArrayList<>(this.activeQueues);
- }
- for (final PrioritisedPoolExecutorImpl queue : queues) {
- queue.waitUntilAllExecuted();
- }
- }
- }
-
- protected static final class PrioritisedThread extends PrioritisedQueueExecutorThread {
+ public static final class PrioritisedThreadAgent extends PrioritisedQueueExecutorThreadAgent implements Comparable<PrioritisedThreadAgent> { // Gale - base thread pool - chunk worker task queue
protected final PrioritisedThreadPool pool;
protected final AtomicBoolean alertedHighPriority = new AtomicBoolean();
- public PrioritisedThread(final PrioritisedThreadPool pool) {
- super(null);
+ // Gale start - base thread pool - chunk worker task queue
+ public PrioritisedThreadAgent(final PrioritisedThreadPool pool, BaseThread baseThread) {
+ // Gale end - base thread pool - chunk worker task queue
+ super(null, baseThread);
this.pool = pool;
}
- public boolean alertHighPriorityExecutor() {
- if (!this.notifyTasks()) {
- if (!this.alertedHighPriority.get()) {
- this.alertedHighPriority.set(true);
- }
- return false;
+ // Gale start - base thread pool - chunk worker task queue
+ public void alertHighPriorityExecutor() { // only raises the alert flag; unlike the replaced version it neither calls notifyTasks() nor returns a result
+ if (!this.alertedHighPriority.get()) { // read first, so an already-raised flag is not written again
+ this.alertedHighPriority.set(true);
+ }
-
- return true;
+ // Gale end - base thread pool - chunk worker task queue
}
private boolean isAlertedHighPriority() {
@@ -225,21 +99,19 @@ public final class PrioritisedThreadPool {
}
@Override
- protected boolean pollTasks() {
+ public boolean pollTasks() { // Gale - base thread pool - chunk worker task queue - protected -> public
final PrioritisedThreadPool pool = this.pool;
final TreeSet<PrioritisedPoolExecutorImpl> queues = this.pool.queues;
+ final Mutex queuesLock = this.pool.queuesLock; // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
boolean ret = false;
for (;;) {
- if (this.halted) {
- break;
- }
// try to find a queue
// note that if and ONLY IF the queues set is empty, this means there are no tasks for us to execute.
// so we can only break when it's empty
final PrioritisedPoolExecutorImpl queue;
// select queue
- synchronized (queues) {
+ try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
queue = queues.pollFirst();
if (queue == null) {
// no tasks to execute
@@ -249,7 +121,7 @@ public final class PrioritisedThreadPool {
queue.schedulingId = ++pool.schedulingIdGenerator;
// we own this queue now, so increment the executor count
// do we also need to push this queue up for grabs for another executor?
- if (++queue.concurrentExecutors < queue.maximumExecutors) {
+ if (++queue.concurrentExecutors < queue.maximumExecutors || queue.maximumExecutors == -1) { // Gale - base thread pool - chunk worker task queue
// re-add to queues
// it's very important this is done in the same synchronised block for polling, as this prevents
// us from possibly later adding a queue that should not exist in the set
@@ -268,9 +140,6 @@ public final class PrioritisedThreadPool {
final long deadline = start + pool.queueMaxHoldTime;
do {
try {
- if (this.halted) {
- break;
- }
if (!queue.executeTask()) {
// no more tasks, try next queue
break;
@@ -279,11 +148,11 @@ public final class PrioritisedThreadPool {
} catch (final ThreadDeath death) {
throw death; // goodbye world...
} catch (final Throwable throwable) {
- LOGGER.error("Exception thrown from thread '" + this.getName() + "' in queue '" + queue.toString() + "'", throwable);
+ LOGGER.error("Exception thrown from thread '" + this.baseThread.getName() + "' in queue '" + queue.toString() + "'", throwable);
}
- } while (!this.isAlertedHighPriority() && System.nanoTime() <= deadline);
+ } while (!this.isAlertedHighPriority() && System.nanoTime() <= deadline && BaseThreadActivation.tierInExcessOrdinal > BaseTaskQueueTier.LOW_PRIORITY_ASYNC.ordinal); // Gale - base thread pool - chunk worker task queue
- synchronized (queues) {
+ try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
// decrement executors, we are no longer executing
if (queue.isQueued) {
queues.remove(queue);
@@ -306,6 +175,12 @@ public final class PrioritisedThreadPool {
return ret;
}
+
+ @Override
+ public int compareTo(@NotNull PrioritisedThreadPool.PrioritisedThreadAgent o) { // natural ordering, used by the pool's ConcurrentSkipListSet of active agents (activeThreads), which is created without a comparator
+ return Integer.compare(this.baseThread.baseThreadIndex, o.baseThread.baseThreadIndex); // agents are ordered by the index of the base thread they belong to
+ }
+
}
public interface PrioritisedPoolExecutor extends PrioritisedExecutor {
@@ -322,7 +197,7 @@ public final class PrioritisedThreadPool {
public boolean isActive();
}
- protected static final class PrioritisedPoolExecutorImpl extends PrioritisedThreadedTaskQueue implements PrioritisedPoolExecutor {
+ public static final class PrioritisedPoolExecutorImpl extends PrioritisedThreadedTaskQueue implements PrioritisedPoolExecutor { // Gale - base thread pool - chunk worker task queue - protected -> public
protected final PrioritisedThreadPool pool;
protected final long[] priorityCounts = new long[Priority.TOTAL_SCHEDULABLE_PRIORITIES];
@@ -369,7 +244,10 @@ public final class PrioritisedThreadPool {
public void halt() {
final PrioritisedThreadPool pool = this.pool;
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
- synchronized (queues) {
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
+ final Mutex queuesLock = pool.queuesLock;
+ try (var ignored = queuesLock.withSpinLock()) {
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
if (this.isHalted) {
return;
}
@@ -379,10 +257,10 @@ public final class PrioritisedThreadPool {
this.isQueued = false;
}
}
- synchronized (pool.nonShutdownQueues) {
+ try (var ignored = pool.nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
pool.nonShutdownQueues.remove(this);
}
- synchronized (pool.activeQueues) {
+ try (var ignored = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
pool.activeQueues.remove(this);
}
}
@@ -391,12 +269,15 @@ public final class PrioritisedThreadPool {
public boolean isActive() {
final PrioritisedThreadPool pool = this.pool;
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
+ final Mutex queuesLock = pool.queuesLock;
- synchronized (queues) {
+ try (var ignored = queuesLock.withSpinLock()) {
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
if (this.concurrentExecutors != 0) {
return true;
}
- synchronized (pool.activeQueues) {
+ try (var ignored2 = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
if (pool.activeQueues.contains(this)) {
return true;
}
@@ -469,8 +350,11 @@ public final class PrioritisedThreadPool {
final PrioritisedThreadPool pool = this.pool;
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
+ // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues
+ final Mutex queuesLock = pool.queuesLock;
- synchronized (queues) {
+ try (var ignored = queuesLock.withSpinLock()) {
+ // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues
if (!this.isQueued) {
// see if we need to be queued
if (newPriority != null) {
@@ -478,7 +362,7 @@ public final class PrioritisedThreadPool {
this.schedulingId = ++pool.schedulingIdGenerator;
}
this.scheduledPriority = newPriority; // must be updated before queue add
- if (!this.isHalted && this.concurrentExecutors < this.maximumExecutors) {
+ if (!this.isHalted && (this.concurrentExecutors < this.maximumExecutors || this.maximumExecutors == -1)) { // Gale - base thread pool - chunk worker task queue
shouldNotifyHighPriority = newPriority.isHigherOrEqualPriority(Priority.HIGH);
queues.add(this);
this.isQueued = true;
@@ -511,19 +395,20 @@ public final class PrioritisedThreadPool {
}
if (newPriority == null && shutdown) {
- synchronized (pool.activeQueues) {
+ try (var ignored = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
pool.activeQueues.remove(this);
}
}
// Wake up the number of executors we want
+ // Gale start - base thread pool - chunk worker task queue
if (executorsWanted > 0 || (shouldNotifyTasks | shouldNotifyHighPriority)) {
- int notified = 0;
- for (final PrioritisedThread thread : pool.threads) {
- if ((shouldNotifyHighPriority ? thread.alertHighPriorityExecutor() : thread.notifyTasks())
- && (++notified >= executorsWanted)) {
- break;
+ BaseTaskQueues.chunkWorker.newTaskWasAdded();
+ if (shouldNotifyHighPriority) {
+ for (final PrioritisedThreadAgent thread : pool.activeThreads) {
+ thread.alertHighPriorityExecutor();
}
+ // Gale end - base thread pool - chunk worker task queue
}
}
}
@@ -538,17 +423,18 @@ public final class PrioritisedThreadPool {
final PrioritisedThreadPool pool = this.pool;
// remove from active queues
- synchronized (pool.nonShutdownQueues) {
+ try (var ignored = pool.nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
pool.nonShutdownQueues.remove(this);
}
final TreeSet<PrioritisedPoolExecutorImpl> queues = pool.queues;
+ final Mutex queuesLock = pool.queuesLock; // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
// try and shift around our priority
- synchronized (queues) {
+ try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
if (this.scheduledPriority == null) {
// no tasks are queued, ensure we aren't in activeQueues
- synchronized (pool.activeQueues) {
+ try (var ignored2 = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues
pool.activeQueues.remove(this);
}
diff --git a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java b/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java
index f5c15d40094c2ddc6220b0595597d12103fcf425..79ef41d2bb30beee2355d1de3dc99c9e00d929d5 100644
--- a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java
+++ b/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java
@@ -22,6 +22,7 @@ import net.minecraft.world.level.chunk.ChunkAccess;
import net.minecraft.world.level.chunk.ChunkStatus;
import net.minecraft.world.level.chunk.LevelChunk;
import org.bukkit.Bukkit;
+import org.galemc.gale.executor.thread.pool.BaseThreadPool;
import org.slf4j.Logger;
import java.io.File;
import java.util.ArrayDeque;
@@ -103,7 +104,7 @@ public final class ChunkTaskScheduler {
thread.setUncaughtExceptionHandler(io.papermc.paper.chunk.system.scheduling.NewChunkHolder.CHUNKSYSTEM_UNCAUGHT_EXCEPTION_HANDLER);
}, (long)(20.0e6)); // 20ms
- LOGGER.info("Chunk system is using " + newChunkSystemIOThreads + " I/O threads, " + newChunkSystemWorkerThreads + " worker threads, and gen parallelism of " + ChunkTaskScheduler.newChunkSystemGenParallelism + " threads");
+ LOGGER.info("Chunk system is using " + newChunkSystemIOThreads + " I/O thread" + (newChunkSystemIOThreads == 1 ? "" : "s")); // Gale - base thread pool - chunk worker task queue
}
public final ServerLevel world;
@@ -196,12 +197,14 @@ public final class ChunkTaskScheduler {
this.workers = workers;
final String worldName = world.getWorld().getName();
- this.genExecutor = workers.createExecutor("Chunk single-threaded generation executor for world '" + worldName + "'", 1);
+ this.genExecutor = workers.createExecutor("Chunk single-threaded generation executor for world '" + worldName + "'", false); // Gale - base thread pool - chunk worker task queue
// same as genExecutor, as there are race conditions between updating blocks in FEATURE status while lighting chunks
this.lightExecutor = this.genExecutor;
- this.parallelGenExecutor = newChunkSystemGenParallelism <= 1 ? this.genExecutor
- : workers.createExecutor("Chunk parallel generation executor for world '" + worldName + "'", newChunkSystemGenParallelism);
- this.loadExecutor = workers.createExecutor("Chunk load executor for world '" + worldName + "'", newChunkSystemLoadParallelism);
+ // Gale start - base thread pool - chunk worker task queue
+ this.parallelGenExecutor = BaseThreadPool.targetParallelism <= 2 ? this.genExecutor
+ : workers.createExecutor("Chunk parallel generation executor for world '" + worldName + "'", true);
+ this.loadExecutor = workers.createExecutor("Chunk load executor for world '" + worldName + "'", true);
+ // Gale end - base thread pool - chunk worker task queue
this.chunkHolderManager = new ChunkHolderManager(world, this);
}
diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java
index 77e39c713a4d022f05ac71e27abd9799587356f0..545f831b34fbaac1b698a90206e35a149f3606d6 100644
--- a/src/main/java/net/minecraft/server/MinecraftServer.java
+++ b/src/main/java/net/minecraft/server/MinecraftServer.java
@@ -1070,6 +1070,10 @@ public abstract class MinecraftServer extends MinecraftServerBlockableEventLoop
}
}
+ // Gale start - base thread pool - chunk worker task queue
+ LOGGER.info("Waiting for chunk worker tasks to finish...");
+ serverThread.runTasksUntil(null, () -> !BaseTaskQueues.chunkWorker.hasTasks(), null);
+ // Gale end - base thread pool - chunk worker task queue
this.saveAllChunks(false, true, false, true); // Paper - rewrite chunk system - move closing into here
this.isSaving = false;
diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java
index abdec5529763b77126494ae0c2be9b48de900bc1..35233587de14cf52da30324df89d9ec76b9c09fe 100644
--- a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java
+++ b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java
@@ -9,7 +9,6 @@ import org.galemc.gale.executor.queue.BaseTaskQueues;
import org.galemc.gale.executor.thread.BaseThread;
import org.slf4j.Logger;
-
/**
* This class is a copy of {@link PrioritisedQueueExecutorThread}, with the notable difference
* that it does not extend {@link Thread}, but may be instantiated on its own, as an agent representing
diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
index 0035d638667c6e0707ecf3e3c040f0123f8e68d5..afd25cb20200baea7c62cf6b3081e19e4188997a 100644
--- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
+++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
@@ -75,7 +75,6 @@ public enum BaseTaskQueueTier {
/**
* A tier for queues that contain general tasks that must be performed at some point in time,
* asynchronously with respect to the {@link ServerThread} and the ticking of the server.
- * Execution of
*/
ASYNC(new AbstractTaskQueue[]{
// The cleaner queue has high priority because it releases resources back to a pool, thereby saving memory
@@ -86,7 +85,9 @@ public enum BaseTaskQueueTier {
* A tier for queues that contain tasks with the same considerations as {@link #ASYNC},
* but with a low priority.
*/
- LOW_PRIORITY_ASYNC(new AbstractTaskQueue[0], Integer.getInteger("gale.thread.priority.async.low", 3));
+ LOW_PRIORITY_ASYNC(new AbstractTaskQueue[]{
+ BaseTaskQueues.chunkWorker
+ }, Integer.getInteger("gale.thread.priority.async.low", 3));
/**
* Equal to {@link #ordinal()}.
diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java
index ed3ccf2e64539363a7be2d507c68c40b5913f75c..a12250e5aaed02995b7bf09a8018a93f27e42920 100644
--- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java
+++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java
@@ -113,4 +113,9 @@ public final class BaseTaskQueues {
*/
public static final SimpleTaskQueue scheduledAsync = SimpleTaskQueue.allSpans("ScheduledAsync");
+ /**
+ * @see ChunkWorkerTaskQueue
+ */
+ public static final ChunkWorkerTaskQueue chunkWorker = new ChunkWorkerTaskQueue();
+
}
diff --git a/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..0fac4a8b238636adb826a3d1a8db204042fb8098
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java
@@ -0,0 +1,129 @@
+// Gale - base thread pool - chunk worker task queue
+
+package org.galemc.gale.executor.queue;
+
+import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread;
+import io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler;
+import org.galemc.gale.executor.TaskSpan;
+import org.galemc.gale.executor.annotation.YieldFree;
+import org.galemc.gale.executor.annotation.thread.AnyThreadSafe;
+import org.galemc.gale.executor.thread.BaseThread;
+import org.galemc.gale.executor.thread.pool.BaseThreadActivation;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class provides access to, but does not store, the tasks related to chunk loading and chunk generation,
+ * that are scheduled and normally polled by the chunk system's {@link PrioritisedQueueExecutorThread}s.
+ * <br>
+ * Tasks provided by this queue are {@link TaskSpan#YIELDING}. // TODO Gale replace potentially used synchronized blocks etc. by spin locks or yielding locks
+ *
+ * @author Martijn Muijsers under AGPL-3.0
+ */
+@AnyThreadSafe
+@YieldFree
+public final class ChunkWorkerTaskQueue implements AbstractTaskQueue {
+
+    /**
+     * Will be initialized in {@link #setTier}.
+     */
+    private BaseTaskQueueTier tier;
+
+    ChunkWorkerTaskQueue() {}
+
+    @Override
+    public String getName() {
+        return "ChunkWorker";
+    }
+
+    @Override
+    public boolean canBeYieldedTo() {
+        /*
+        A single returned chunk worker task typically performs a loop that executes as many smaller tasks as possible,
+        and which may keep running for a long time.
+         */
+        return false;
+    }
+
+    /**
+     * @return Whether any queue currently held in the chunk worker pool's queue set reports scheduled,
+     * uncompleted tasks. Returns false while {@link ChunkTaskScheduler#workerThreads} is null.
+     */
+    @Override
+    public boolean hasTasks() {
+        var workerThreads = ChunkTaskScheduler.workerThreads;
+        if (workerThreads == null) {
+            return false;
+        }
+        // The pool's queue set is only accessed under its queue lock, so take that lock while iterating
+        try (var ignored = workerThreads.queuesLock.withSpinLock()) {
+            for (var queue : workerThreads.queues) {
+                if (queue.hasScheduledUncompletedTasksVolatile()) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean hasTasks(TaskSpan span) {
+        return span == TaskSpan.YIELDING && this.hasTasks();
+    }
+
+    @Override
+    public boolean canHaveTasks(TaskSpan span) {
+        return span == TaskSpan.YIELDING;
+    }
+
+    /**
+     * @return A task that polls chunk worker tasks through the given thread's chunk worker agent,
+     * or null if {@link #hasTasks()} reports that there is nothing to do.
+     */
+    @Override
+    public @Nullable Runnable poll(BaseThread currentThread) {
+        if (!this.hasTasks()) {
+            return null;
+        }
+        return () -> {
+            var workerAgent = currentThread.getChunkWorkerAgent();
+            var activeThreads = ChunkTaskScheduler.workerThreads.activeThreads;
+            // Registered as active while polling, so that high priority alerts sent to active agents reach this one
+            activeThreads.add(workerAgent);
+            try {
+                workerAgent.pollTasks();
+            } finally {
+                // Always deregister, even if polling throws, so that no stale agent is left behind in the set
+                activeThreads.remove(workerAgent);
+            }
+        };
+    }
+
+    @Override
+    public @Nullable Runnable pollTiny(BaseThread currentThread) {
+        return null;
+    }
+
+    /**
+     * Unsupported: this queue does not store tasks itself, so nothing can be added through it.
+     */
+    @Override
+    public void add(Runnable task, TaskSpan span) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * To be called when a new task has been added to the underlying storage of this queue.
+     */
+    public void newTaskWasAdded() {
+        BaseThreadActivation.newTaskWasAdded(this.tier, TaskSpan.YIELDING, false, true);
+    }
+
+    @Override
+    public void setTier(BaseTaskQueueTier tier) {
+        if (this.tier != null) {
+            throw new IllegalStateException(this.getClass().getSimpleName() + ".tier was already initialized");
+        }
+        this.tier = tier;
+    }
+
+}
diff --git a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
index 4ec06cbff045bd42c1da5881cd0f2446cde39a7b..1a97a74e11a8c675980b3ec2e76b72057a203308 100644
--- a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
+++ b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
@@ -2,6 +2,8 @@
package org.galemc.gale.executor.thread;
+import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedThreadPool;
+import io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler;
import io.papermc.paper.util.TickThread;
import net.minecraft.server.MinecraftServer;
import org.galemc.gale.executor.TaskSpan;
@@ -16,6 +18,7 @@ import org.galemc.gale.executor.lock.YieldingLock;
import org.galemc.gale.executor.queue.AbstractTaskQueue;
import org.galemc.gale.executor.queue.BaseTaskQueueTier;
import org.galemc.gale.executor.thread.pool.*;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.TimeUnit;
@@ -742,6 +745,20 @@ public abstract class BaseThread extends Thread implements AbstractYieldingThrea
}
}
+ // Gale start - base thread pool - chunk worker task queue
+ /**
+ * Lazily computed value for {@link #getChunkWorkerAgent}.
+ */
+ private @Nullable PrioritisedThreadPool.PrioritisedThreadAgent chunkWorkerAgent;
+
+ public @NotNull PrioritisedThreadPool.PrioritisedThreadAgent getChunkWorkerAgent() { // NOTE(review): the lazy initialisation below is unsynchronized - presumably this is only ever called by this thread itself; confirm
+ if (this.chunkWorkerAgent == null) { // first call: create the agent and cache it in the field
+ this.chunkWorkerAgent = new PrioritisedThreadPool.PrioritisedThreadAgent(ChunkTaskScheduler.workerThreads, this); // agent for the chunk system's worker pool, with this thread as its base thread
+ }
+ return this.chunkWorkerAgent;
+ }
+ // Gale end - base thread pool - chunk worker task queue
+
/**
* @return The current thread if it is a {@link BaseThread}, or null otherwise.
*/