From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Martijn Muijsers Date: Mon, 30 Jan 2023 22:35:08 +0100 Subject: [PATCH] Run chunk worker tasks on base thread pool License: AGPL-3.0 (https://www.gnu.org/licenses/agpl-3.0.html) Gale - https://galemc.org diff --git a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java index 26fa2caa18a9194e57574a4a7fa9f7a4265740e0..2fa56c06ec16817d5e35d5283b8e5a0d0e01d643 100644 --- a/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java +++ b/src/main/java/ca/spottedleaf/concurrentutil/executor/standard/PrioritisedThreadPool.java @@ -2,11 +2,17 @@ package ca.spottedleaf.concurrentutil.executor.standard; import com.mojang.logging.LogUtils; import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet; +import org.galemc.gale.concurrent.Mutex; +import org.galemc.gale.executor.chunksystem.PrioritisedQueueExecutorThreadAgent; +import org.galemc.gale.executor.queue.BaseTaskQueueTier; +import org.galemc.gale.executor.queue.BaseTaskQueues; +import org.galemc.gale.executor.thread.BaseThread; +import org.galemc.gale.executor.thread.pool.BaseThreadActivation; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.TreeSet; + +import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -14,13 +20,16 @@ public final class PrioritisedThreadPool { private static final Logger LOGGER = LogUtils.getLogger(); - protected final PrioritisedThread[] threads; - protected final TreeSet queues = new TreeSet<>(PrioritisedPoolExecutorImpl.comparator()); + public final Set activeThreads = new ConcurrentSkipListSet<>(); // Gale - base thread pool - chunk worker 
task queue + public final TreeSet queues = new TreeSet<>(PrioritisedPoolExecutorImpl.comparator()); // Gale - base thread pool - chunk worker task queue - protected -> public + public final Mutex queuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues protected final String name; protected final long queueMaxHoldTime; protected final ReferenceOpenHashSet nonShutdownQueues = new ReferenceOpenHashSet<>(); + protected final Mutex nonShutdownQueuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues protected final ReferenceOpenHashSet activeQueues = new ReferenceOpenHashSet<>(); + protected final Mutex activeQueuesLock = Mutex.create(); // Gale - base thread pool - chunk worker task queue - spin lock for pool queues protected boolean shutdown; @@ -46,41 +55,18 @@ public final class PrioritisedThreadPool { } this.name = name; this.queueMaxHoldTime = queueHoldTime; - - this.threads = new PrioritisedThread[threads]; - for (int i = 0; i < threads; ++i) { - this.threads[i] = new PrioritisedThread(this); - - // set default attributes - this.threads[i].setName("Prioritised thread for pool '" + name + "' #" + i); - this.threads[i].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { - LOGGER.error("Uncaught exception in thread " + thread.getName(), throwable); - }); - - // let thread modifier override defaults - if (threadModifier != null) { - threadModifier.accept(this.threads[i], Integer.valueOf(i)); - } - - // now the thread can start - this.threads[i].start(); - } } - public Thread[] getThreads() { - return Arrays.copyOf(this.threads, this.threads.length, Thread[].class); - } - - public PrioritisedPoolExecutor createExecutor(final String name, final int parallelism) { - synchronized (this.nonShutdownQueues) { + public PrioritisedPoolExecutor createExecutor(final String name, boolean supportsParallelism) { // Gale - base thread pool - chunk 
worker task queue + try (var ignored = nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues if (this.shutdown) { throw new IllegalStateException("Queue is shutdown: " + this.toString()); } - final PrioritisedPoolExecutorImpl ret = new PrioritisedPoolExecutorImpl(this, name, Math.min(Math.max(1, parallelism), this.threads.length)); + final PrioritisedPoolExecutorImpl ret = new PrioritisedPoolExecutorImpl(this, name, supportsParallelism ? -1 : 1); // Gale - base thread pool - chunk worker task queue this.nonShutdownQueues.add(ret); - synchronized (this.activeQueues) { + try (var ignored2 = this.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues this.activeQueues.add(ret); } @@ -88,136 +74,24 @@ public final class PrioritisedThreadPool { } } - /** - * Prevents creation of new queues, shutdowns all non-shutdown queues if specified - */ - public void halt(final boolean shutdownQueues) { - synchronized (this.nonShutdownQueues) { - this.shutdown = true; - } - if (shutdownQueues) { - final ArrayList queuesToShutdown; - synchronized (this.nonShutdownQueues) { - this.shutdown = true; - queuesToShutdown = new ArrayList<>(this.nonShutdownQueues); - } - - for (final PrioritisedPoolExecutorImpl queue : queuesToShutdown) { - queue.shutdown(); - } - } - - - for (final PrioritisedThread thread : this.threads) { - // can't kill queue, queue is null - thread.halt(false); - } - } - - /** - * Waits until all threads in this pool have shutdown, or until the specified time has passed. - * @param msToWait Maximum time to wait. - * @return {@code false} if the maximum time passed, {@code true} otherwise. 
- */ - public boolean join(final long msToWait) { - try { - return this.join(msToWait, false); - } catch (final InterruptedException ex) { - throw new IllegalStateException(ex); - } - } - - /** - * Waits until all threads in this pool have shutdown, or until the specified time has passed. - * @param msToWait Maximum time to wait. - * @return {@code false} if the maximum time passed, {@code true} otherwise. - * @throws InterruptedException If this thread is interrupted. - */ - public boolean joinInterruptable(final long msToWait) throws InterruptedException { - return this.join(msToWait, true); - } - - protected final boolean join(final long msToWait, final boolean interruptable) throws InterruptedException { - final long nsToWait = msToWait * (1000 * 1000); - final long start = System.nanoTime(); - final long deadline = start + nsToWait; - boolean interrupted = false; - try { - for (final PrioritisedThread thread : this.threads) { - for (;;) { - if (!thread.isAlive()) { - break; - } - final long current = System.nanoTime(); - if (current >= deadline) { - return false; - } - - try { - thread.join(Math.max(1L, (deadline - current) / (1000 * 1000))); - } catch (final InterruptedException ex) { - if (interruptable) { - throw ex; - } - interrupted = true; - } - } - } - - return true; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - public void shutdown(final boolean wait) { - final ArrayList queuesToShutdown; - synchronized (this.nonShutdownQueues) { - this.shutdown = true; - queuesToShutdown = new ArrayList<>(this.nonShutdownQueues); - } - - for (final PrioritisedPoolExecutorImpl queue : queuesToShutdown) { - queue.shutdown(); - } - - for (final PrioritisedThread thread : this.threads) { - // none of these can be true or else NPE - thread.close(false, false); - } - - if (wait) { - final ArrayList queues; - synchronized (this.activeQueues) { - queues = new ArrayList<>(this.activeQueues); - } - for (final 
PrioritisedPoolExecutorImpl queue : queues) { - queue.waitUntilAllExecuted(); - } - } - } - - protected static final class PrioritisedThread extends PrioritisedQueueExecutorThread { + public static final class PrioritisedThreadAgent extends PrioritisedQueueExecutorThreadAgent implements Comparable { // Gale - base thread pool - chunk worker task queue protected final PrioritisedThreadPool pool; protected final AtomicBoolean alertedHighPriority = new AtomicBoolean(); - public PrioritisedThread(final PrioritisedThreadPool pool) { - super(null); + // Gale start - base thread pool - chunk worker task queue + public PrioritisedThreadAgent(final PrioritisedThreadPool pool, BaseThread baseThread) { + // Gale end - base thread pool - chunk worker task queue + super(null, baseThread); this.pool = pool; } - public boolean alertHighPriorityExecutor() { - if (!this.notifyTasks()) { - if (!this.alertedHighPriority.get()) { - this.alertedHighPriority.set(true); - } - return false; + // Gale start - base thread pool - chunk worker task queue + public void alertHighPriorityExecutor() { + if (!this.alertedHighPriority.get()) { + this.alertedHighPriority.set(true); } - - return true; + // Gale end - base thread pool - chunk worker task queue } private boolean isAlertedHighPriority() { @@ -225,21 +99,19 @@ public final class PrioritisedThreadPool { } @Override - protected boolean pollTasks() { + public boolean pollTasks() { // Gale - base thread pool - chunk worker task queue - protected -> public final PrioritisedThreadPool pool = this.pool; final TreeSet queues = this.pool.queues; + final Mutex queuesLock = this.pool.queuesLock; // Gale - base thread pool - chunk worker task queue - spin lock for pool queues boolean ret = false; for (;;) { - if (this.halted) { - break; - } // try to find a queue // note that if and ONLY IF the queues set is empty, this means there are no tasks for us to execute. 
// so we can only break when it's empty final PrioritisedPoolExecutorImpl queue; // select queue - synchronized (queues) { + try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues queue = queues.pollFirst(); if (queue == null) { // no tasks to execute @@ -249,7 +121,7 @@ public final class PrioritisedThreadPool { queue.schedulingId = ++pool.schedulingIdGenerator; // we own this queue now, so increment the executor count // do we also need to push this queue up for grabs for another executor? - if (++queue.concurrentExecutors < queue.maximumExecutors) { + if (++queue.concurrentExecutors < queue.maximumExecutors || queue.maximumExecutors == -1) { // Gale - base thread pool - chunk worker task queue // re-add to queues // it's very important this is done in the same synchronised block for polling, as this prevents // us from possibly later adding a queue that should not exist in the set @@ -268,9 +140,6 @@ public final class PrioritisedThreadPool { final long deadline = start + pool.queueMaxHoldTime; do { try { - if (this.halted) { - break; - } if (!queue.executeTask()) { // no more tasks, try next queue break; @@ -279,11 +148,11 @@ public final class PrioritisedThreadPool { } catch (final ThreadDeath death) { throw death; // goodbye world... 
} catch (final Throwable throwable) { - LOGGER.error("Exception thrown from thread '" + this.getName() + "' in queue '" + queue.toString() + "'", throwable); + LOGGER.error("Exception thrown from thread '" + this.baseThread.getName() + "' in queue '" + queue.toString() + "'", throwable); } - } while (!this.isAlertedHighPriority() && System.nanoTime() <= deadline); + } while (!this.isAlertedHighPriority() && System.nanoTime() <= deadline && BaseThreadActivation.tierInExcessOrdinal > BaseTaskQueueTier.LOW_PRIORITY_ASYNC.ordinal); // Gale - base thread pool - chunk worker task queue - synchronized (queues) { + try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues // decrement executors, we are no longer executing if (queue.isQueued) { queues.remove(queue); @@ -306,6 +175,12 @@ public final class PrioritisedThreadPool { return ret; } + + @Override + public int compareTo(@NotNull PrioritisedThreadPool.PrioritisedThreadAgent o) { + return Integer.compare(this.baseThread.baseThreadIndex, o.baseThread.baseThreadIndex); + } + } public interface PrioritisedPoolExecutor extends PrioritisedExecutor { @@ -322,7 +197,7 @@ public final class PrioritisedThreadPool { public boolean isActive(); } - protected static final class PrioritisedPoolExecutorImpl extends PrioritisedThreadedTaskQueue implements PrioritisedPoolExecutor { + public static final class PrioritisedPoolExecutorImpl extends PrioritisedThreadedTaskQueue implements PrioritisedPoolExecutor { // Gale - base thread pool - chunk worker task queue - protected -> public protected final PrioritisedThreadPool pool; protected final long[] priorityCounts = new long[Priority.TOTAL_SCHEDULABLE_PRIORITIES]; @@ -369,7 +244,10 @@ public final class PrioritisedThreadPool { public void halt() { final PrioritisedThreadPool pool = this.pool; final TreeSet queues = pool.queues; - synchronized (queues) { + // Gale start - base thread pool - chunk worker task queue - 
spin lock for pool queues + final Mutex queuesLock = pool.queuesLock; + try (var ignored = queuesLock.withSpinLock()) { + // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues if (this.isHalted) { return; } @@ -379,10 +257,10 @@ public final class PrioritisedThreadPool { this.isQueued = false; } } - synchronized (pool.nonShutdownQueues) { + try (var ignored = pool.nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues pool.nonShutdownQueues.remove(this); } - synchronized (pool.activeQueues) { + try (var ignored = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues pool.activeQueues.remove(this); } } @@ -391,12 +269,15 @@ public final class PrioritisedThreadPool { public boolean isActive() { final PrioritisedThreadPool pool = this.pool; final TreeSet queues = pool.queues; + // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues + final Mutex queuesLock = pool.queuesLock; - synchronized (queues) { + try (var ignored = queuesLock.withSpinLock()) { + // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues if (this.concurrentExecutors != 0) { return true; } - synchronized (pool.activeQueues) { + try (var ignored2 = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues if (pool.activeQueues.contains(this)) { return true; } @@ -469,8 +350,11 @@ public final class PrioritisedThreadPool { final PrioritisedThreadPool pool = this.pool; final TreeSet queues = pool.queues; + // Gale start - base thread pool - chunk worker task queue - spin lock for pool queues + final Mutex queuesLock = pool.queuesLock; - synchronized (queues) { + try (var ignored = queuesLock.withSpinLock()) { + // Gale end - base thread pool - chunk worker task queue - spin lock for pool queues if (!this.isQueued) { // 
see if we need to be queued if (newPriority != null) { @@ -478,7 +362,7 @@ public final class PrioritisedThreadPool { this.schedulingId = ++pool.schedulingIdGenerator; } this.scheduledPriority = newPriority; // must be updated before queue add - if (!this.isHalted && this.concurrentExecutors < this.maximumExecutors) { + if (!this.isHalted && (this.concurrentExecutors < this.maximumExecutors || this.maximumExecutors == -1)) { // Gale - base thread pool - chunk worker task queue shouldNotifyHighPriority = newPriority.isHigherOrEqualPriority(Priority.HIGH); queues.add(this); this.isQueued = true; @@ -511,19 +395,20 @@ public final class PrioritisedThreadPool { } if (newPriority == null && shutdown) { - synchronized (pool.activeQueues) { + try (var ignored = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues pool.activeQueues.remove(this); } } // Wake up the number of executors we want + // Gale start - base thread pool - chunk worker task queue if (executorsWanted > 0 || (shouldNotifyTasks | shouldNotifyHighPriority)) { - int notified = 0; - for (final PrioritisedThread thread : pool.threads) { - if ((shouldNotifyHighPriority ? 
thread.alertHighPriorityExecutor() : thread.notifyTasks()) - && (++notified >= executorsWanted)) { - break; + BaseTaskQueues.chunkWorker.newTaskWasAdded(); + if (shouldNotifyHighPriority) { + for (final PrioritisedThreadAgent thread : pool.activeThreads) { + thread.alertHighPriorityExecutor(); } + // Gale end - base thread pool - chunk worker task queue } } } @@ -538,17 +423,18 @@ public final class PrioritisedThreadPool { final PrioritisedThreadPool pool = this.pool; // remove from active queues - synchronized (pool.nonShutdownQueues) { + try (var ignored = pool.nonShutdownQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues pool.nonShutdownQueues.remove(this); } final TreeSet queues = pool.queues; + final Mutex queuesLock = pool.queuesLock; // Gale - base thread pool - chunk worker task queue - spin lock for pool queues // try and shift around our priority - synchronized (queues) { + try (var ignored = queuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues if (this.scheduledPriority == null) { // no tasks are queued, ensure we aren't in activeQueues - synchronized (pool.activeQueues) { + try (var ignored2 = pool.activeQueuesLock.withSpinLock()) { // Gale - base thread pool - chunk worker task queue - spin lock for pool queues pool.activeQueues.remove(this); } diff --git a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java b/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java index f5c15d40094c2ddc6220b0595597d12103fcf425..79ef41d2bb30beee2355d1de3dc99c9e00d929d5 100644 --- a/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java +++ b/src/main/java/io/papermc/paper/chunk/system/scheduling/ChunkTaskScheduler.java @@ -22,6 +22,7 @@ import net.minecraft.world.level.chunk.ChunkAccess; import net.minecraft.world.level.chunk.ChunkStatus; import 
net.minecraft.world.level.chunk.LevelChunk; import org.bukkit.Bukkit; +import org.galemc.gale.executor.thread.pool.BaseThreadPool; import org.slf4j.Logger; import java.io.File; import java.util.ArrayDeque; @@ -103,7 +104,7 @@ public final class ChunkTaskScheduler { thread.setUncaughtExceptionHandler(io.papermc.paper.chunk.system.scheduling.NewChunkHolder.CHUNKSYSTEM_UNCAUGHT_EXCEPTION_HANDLER); }, (long)(20.0e6)); // 20ms - LOGGER.info("Chunk system is using " + newChunkSystemIOThreads + " I/O threads, " + newChunkSystemWorkerThreads + " worker threads, and gen parallelism of " + ChunkTaskScheduler.newChunkSystemGenParallelism + " threads"); + LOGGER.info("Chunk system is using " + newChunkSystemIOThreads + " I/O thread" + (newChunkSystemIOThreads == 1 ? "" : "s")); // Gale - base thread pool - chunk worker task queue } public final ServerLevel world; @@ -196,12 +197,14 @@ public final class ChunkTaskScheduler { this.workers = workers; final String worldName = world.getWorld().getName(); - this.genExecutor = workers.createExecutor("Chunk single-threaded generation executor for world '" + worldName + "'", 1); + this.genExecutor = workers.createExecutor("Chunk single-threaded generation executor for world '" + worldName + "'", false); // Gale - base thread pool - chunk worker task queue // same as genExecutor, as there are race conditions between updating blocks in FEATURE status while lighting chunks this.lightExecutor = this.genExecutor; - this.parallelGenExecutor = newChunkSystemGenParallelism <= 1 ? this.genExecutor - : workers.createExecutor("Chunk parallel generation executor for world '" + worldName + "'", newChunkSystemGenParallelism); - this.loadExecutor = workers.createExecutor("Chunk load executor for world '" + worldName + "'", newChunkSystemLoadParallelism); + // Gale start - base thread pool - chunk worker task queue + this.parallelGenExecutor = BaseThreadPool.targetParallelism <= 2 ? 
this.genExecutor + : workers.createExecutor("Chunk parallel generation executor for world '" + worldName + "'", true); + this.loadExecutor = workers.createExecutor("Chunk load executor for world '" + worldName + "'", true); + // Gale end - base thread pool - chunk worker task queue this.chunkHolderManager = new ChunkHolderManager(world, this); } diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java index c2930cc38d02fb62cf7b1a8bdde53753cb4bc64e..29c86b4ee9b147a7645f6577ce22b7a2e6f0b213 100644 --- a/src/main/java/net/minecraft/server/MinecraftServer.java +++ b/src/main/java/net/minecraft/server/MinecraftServer.java @@ -1070,6 +1070,10 @@ public abstract class MinecraftServer extends MinecraftServerBlockableEventLoop } } + // Gale start - base thread pool - chunk worker task queue + LOGGER.info("Waiting for chunk worker tasks to finish..."); + serverThread.runTasksUntil(null, () -> !BaseTaskQueues.chunkWorker.hasTasks(), null); + // Gale end - base thread pool - chunk worker task queue this.saveAllChunks(false, true, false, true); // Paper - rewrite chunk system - move closing into here this.isSaving = false; diff --git a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java index abdec5529763b77126494ae0c2be9b48de900bc1..35233587de14cf52da30324df89d9ec76b9c09fe 100644 --- a/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java +++ b/src/main/java/org/galemc/gale/executor/chunksystem/PrioritisedQueueExecutorThreadAgent.java @@ -9,7 +9,6 @@ import org.galemc.gale.executor.queue.BaseTaskQueues; import org.galemc.gale.executor.thread.BaseThread; import org.slf4j.Logger; - /** * This class is a copy of {@link PrioritisedQueueExecutorThread}, with the notable difference * that it does not extend {@link Thread}, but may be 
instantiated on its own, as an agent representing diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java index 0035d638667c6e0707ecf3e3c040f0123f8e68d5..afd25cb20200baea7c62cf6b3081e19e4188997a 100644 --- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java +++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java @@ -75,7 +75,6 @@ public enum BaseTaskQueueTier { /** * A tier for queues that contain general tasks that must be performed at some point in time, * asynchronously with respect to the {@link ServerThread} and the ticking of the server. - * Execution of */ ASYNC(new AbstractTaskQueue[]{ // The cleaner queue has high priority because it releases resources back to a pool, thereby saving memory @@ -86,7 +85,9 @@ public enum BaseTaskQueueTier { * A tier for queues that contain tasks with the same considerations as {@link #ASYNC}, * but with a low priority. */ - LOW_PRIORITY_ASYNC(new AbstractTaskQueue[0], Integer.getInteger("gale.thread.priority.async.low", 3)); + LOW_PRIORITY_ASYNC(new AbstractTaskQueue[]{ + BaseTaskQueues.chunkWorker + }, Integer.getInteger("gale.thread.priority.async.low", 3)); /** * Equal to {@link #ordinal()}. 
diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java index ed3ccf2e64539363a7be2d507c68c40b5913f75c..a12250e5aaed02995b7bf09a8018a93f27e42920 100644 --- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java +++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java @@ -113,4 +113,9 @@ public final class BaseTaskQueues { */ public static final SimpleTaskQueue scheduledAsync = SimpleTaskQueue.allSpans("ScheduledAsync"); + /** + * @see ChunkWorkerTaskQueue + */ + public static final ChunkWorkerTaskQueue chunkWorker = new ChunkWorkerTaskQueue(); + } diff --git a/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..9c36119b03ba9a20dc6994ef2d2704ebe9cc4800 --- /dev/null +++ b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java @@ -0,0 +1,107 @@ +// Gale - base thread pool - chunk worker task queue + +package org.galemc.gale.executor.queue; + +import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread; +import io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler; +import net.minecraft.server.level.ServerChunkCache; +import org.galemc.gale.executor.TaskSpan; +import org.galemc.gale.executor.annotation.YieldFree; +import org.galemc.gale.executor.annotation.thread.AnyThreadSafe; +import org.galemc.gale.executor.thread.BaseThread; +import org.galemc.gale.executor.thread.pool.BaseThreadActivation; +import org.jetbrains.annotations.Nullable; + +/** + * This class provides access to, but does not store, the tasks related to chunk loading and chunk generation, + * that are scheduled and normally polled by the chunk system's {@link PrioritisedQueueExecutorThread}s. + *
+ * Tasks provided by this queue are {@link TaskSpan#YIELDING}. // TODO Gale replace potential used synchronized blocks etc. by spin locks or yielding locks + * + * @author Martijn Muijsers under AGPL-3.0 + */ +@AnyThreadSafe +@YieldFree +public final class ChunkWorkerTaskQueue implements AbstractTaskQueue { + + /** + * Will be initialized in {@link #setTier}. + */ + private BaseTaskQueueTier tier; + + ChunkWorkerTaskQueue() {} + + @Override + public String getName() { + return "ChunkWorker"; + } + + @Override + public boolean hasTasks() { + var workerThreads = ChunkTaskScheduler.workerThreads; + if (workerThreads == null) { + return false; + } + try (var ignored = workerThreads.queuesLock.withSpinLock()) { + for (var queue : workerThreads.queues) { + if (queue.hasScheduledUncompletedTasksVolatile()) { + return true; + } + } + } + return false; + } + + @Override + public boolean hasTasks(TaskSpan span) { + return span == TaskSpan.YIELDING && this.hasTasks(); + } + + @Override + public boolean canHaveTasks(TaskSpan span) { + return span == TaskSpan.YIELDING; + } + + @Override + public @Nullable Runnable poll(BaseThread currentThread) { + if (!this.hasTasks()) { + return null; + } + return () -> { + var workerAgent = currentThread.getChunkWorkerAgent(); + ChunkTaskScheduler.workerThreads.activeThreads.add(workerAgent); + try { + workerAgent.pollTasks(); + } finally { + // Always deregister the agent, even when pollTasks() exits abruptly (it rethrows ThreadDeath), so that no stale agent stays in activeThreads + ChunkTaskScheduler.workerThreads.activeThreads.remove(workerAgent); + } + }; + } + + @Override + public @Nullable Runnable pollTiny(BaseThread currentThread) { + return null; + } + + @Override + public void add(Runnable task, TaskSpan span) { + throw new UnsupportedOperationException(); + } + + /** + * To be called when a new task has been added to the underlying storage of this queue.
+ */ + public void newTaskWasAdded() { + BaseThreadActivation.newTaskWasAdded(this.tier, TaskSpan.YIELDING, true); + } + + @Override + public void setTier(BaseTaskQueueTier tier) { + if (this.tier != null) { + throw new IllegalStateException(this.getClass().getSimpleName() + ".tier was already initialized"); + } + this.tier = tier; + } + +} diff --git a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java index 17ae524053949073fcdcbccc53327c42ba1903c6..a01ce790927ff79071c582ca84d64207fd850dfa 100644 --- a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java +++ b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java @@ -2,6 +2,8 @@ package org.galemc.gale.executor.thread; +import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedThreadPool; +import io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler; import io.papermc.paper.util.TickThread; import net.minecraft.server.MinecraftServer; import org.galemc.gale.executor.TaskSpan; @@ -16,6 +18,7 @@ import org.galemc.gale.executor.lock.YieldingLock; import org.galemc.gale.executor.queue.AbstractTaskQueue; import org.galemc.gale.executor.queue.BaseTaskQueueTier; import org.galemc.gale.executor.thread.pool.*; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.concurrent.TimeUnit; @@ -699,6 +702,20 @@ public abstract class BaseThread extends Thread implements AbstractYieldingThrea } } + // Gale start - base thread pool - chunk worker task queue + /** + * Lazily computed value for {@link #getChunkWorkerAgent}. 
+ */ + private @Nullable PrioritisedThreadPool.PrioritisedThreadAgent chunkWorkerAgent; + + public @NotNull PrioritisedThreadPool.PrioritisedThreadAgent getChunkWorkerAgent() { + if (this.chunkWorkerAgent == null) { + this.chunkWorkerAgent = new PrioritisedThreadPool.PrioritisedThreadAgent(ChunkTaskScheduler.workerThreads, this); + } + return this.chunkWorkerAgent; + } + // Gale end - base thread pool - chunk worker task queue + /** * @return The current thread if it is a {@link BaseThread}, or null otherwise. */