9
0
mirror of https://github.com/Dreeam-qwq/Gale.git synced 2026-01-03 22:16:41 +00:00

Pre-poll chunk cache tasks off the main thread

This commit is contained in:
Martijn Muijsers
2023-02-13 00:26:00 +01:00
parent 1fcd8018f9
commit 0601d22510
3 changed files with 214 additions and 70 deletions

View File

@@ -2969,10 +2969,10 @@ index 0000000000000000000000000000000000000000..44b8bd5fd9a3ee2e484c81104523ba95
+}
diff --git a/src/main/java/org/galemc/gale/executor/queue/AbstractTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/AbstractTaskQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..552e82a33c59261b06911b479400a7b11965bbd6
index 0000000000000000000000000000000000000000..633a9b7998b304057d1780b2a4a1f0bc7160c6f6
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/queue/AbstractTaskQueue.java
@@ -0,0 +1,93 @@
@@ -0,0 +1,102 @@
+// Gale - base thread pool
+
+package org.galemc.gale.executor.queue;
@@ -3002,6 +3002,15 @@ index 0000000000000000000000000000000000000000..552e82a33c59261b06911b479400a7b1
+ String getName();
+
+ /**
+ * @return Whether this queue can ever be polled from while yielding.
+ * <br>
+ * Some queues should not be yielded to, because they contain tasks that may take excessively long.
+ */
+ default boolean canBeYieldedTo() {
+ return true;
+ }
+
+ /**
+ * @return Whether this queue has any tasks at all.
+ */
+ boolean hasTasks();
@@ -3068,7 +3077,7 @@ index 0000000000000000000000000000000000000000..552e82a33c59261b06911b479400a7b1
+}
diff --git a/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..657c3663ed54043e7e4e6660d34903ef746fd8e7
index 0000000000000000000000000000000000000000..b4172f285fbed1f314891b2f729aa2dc27b9ab9b
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
@@ -0,0 +1,133 @@
@@ -3193,7 +3202,7 @@ index 0000000000000000000000000000000000000000..657c3663ed54043e7e4e6660d34903ef
+ * To be called when a new task has been added to the underlying storage of this queue.
+ */
+ public void newTaskWasAdded() {
+ BaseThreadActivation.newTaskWasAdded(this.tier, this.span, onlyNotifyBaseThreadPoolOfNewTasksIfLastTimeIsTooLongAgo);
+ BaseThreadActivation.newTaskWasAdded(this.tier, this.span, true, onlyNotifyBaseThreadPoolOfNewTasksIfLastTimeIsTooLongAgo);
+ }
+
+ @Override
@@ -4438,10 +4447,10 @@ index 0000000000000000000000000000000000000000..eab769d7319f26db1f4db9599a3c263c
+}
diff --git a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
new file mode 100644
index 0000000000000000000000000000000000000000..76ca22e30b41e4a39ed215cfb03c9a793eefcc09
index 0000000000000000000000000000000000000000..4ec06cbff045bd42c1da5881cd0f2446cde39a7b
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
@@ -0,0 +1,760 @@
@@ -0,0 +1,765 @@
+// Gale - base thread pool
+
+package org.galemc.gale.executor.thread;
@@ -4853,6 +4862,11 @@ index 0000000000000000000000000000000000000000..76ca22e30b41e4a39ed215cfb03c9a79
+ @YieldFree
+ private @Nullable Runnable pollTaskFromTier(BaseTaskQueueTier tier, boolean tinyOnly) {
+ for (var queue : tier.taskQueues) {
+ // Check whether we can not yield to the queue, if we are yielding
+ boolean canQueueBeYieldedTo = queue.canBeYieldedTo();
+ if (!canQueueBeYieldedTo && this.yieldDepth > 0) {
+ continue;
+ }
+ Runnable task = tinyOnly ? queue.pollTiny(this) : queue.poll(this);
+ if (task != null) {
+ this.lastPolledTaskTier = tier;
@@ -4865,7 +4879,7 @@ index 0000000000000000000000000000000000000000..76ca22e30b41e4a39ed215cfb03c9a79
+ for (int spanI = 0; spanI < TaskSpan.length; spanI++) {
+ TaskSpan span = TaskSpan.VALUES[spanI];
+ if (queue.canHaveTasks(span)) {
+ int oldTasks = BaseThreadActivation.thereMayBeTasks[tier.ordinal][spanI].get();
+ int oldTasks = BaseThreadActivation.thereMayBeTasks[tier.ordinal][spanI][canQueueBeYieldedTo ? 1 : 0].get();
+ if (oldTasks > 0) {
+ if (!queue.hasTasks(span)) {
+ boolean tierHasNoTasksForSpan = true;
@@ -4881,7 +4895,7 @@ index 0000000000000000000000000000000000000000..76ca22e30b41e4a39ed215cfb03c9a79
+ }
+ if (tierHasNoTasksForSpan) {
+ // Set thereMayBeTasks to false, but only if it did not change in the meantime
+ BaseThreadActivation.thereMayBeTasks[tier.ordinal][spanI].compareAndSet(oldTasks, 0);
+ BaseThreadActivation.thereMayBeTasks[tier.ordinal][spanI][canQueueBeYieldedTo ? 1 : 0].compareAndSet(oldTasks, 0);
+ }
+ }
+ }
@@ -5675,10 +5689,10 @@ index 0000000000000000000000000000000000000000..77fe10e51b00115da520cfc211bf84ba
+}
diff --git a/src/main/java/org/galemc/gale/executor/thread/pool/BaseThreadActivation.java b/src/main/java/org/galemc/gale/executor/thread/pool/BaseThreadActivation.java
new file mode 100644
index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddecd335da94
index 0000000000000000000000000000000000000000..65ad5020c5c5953c801fb6c31416e8658720e15f
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/thread/pool/BaseThreadActivation.java
@@ -0,0 +1,648 @@
@@ -0,0 +1,659 @@
+// Gale - base thread pool
+
+package org.galemc.gale.executor.thread.pool;
@@ -5688,6 +5702,7 @@ index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddec
+import org.galemc.gale.executor.annotation.YieldFree;
+import org.galemc.gale.executor.annotation.thread.AnyThreadSafe;
+import org.galemc.gale.executor.lock.YieldingLock;
+import org.galemc.gale.executor.queue.AbstractTaskQueue;
+import org.galemc.gale.executor.queue.BaseTaskQueueTier;
+import org.galemc.gale.executor.thread.BaseThread;
+import org.galemc.gale.executor.thread.ServerThread;
@@ -5780,19 +5795,22 @@ index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddec
+ private static final int[] numberOfThreadsIntendedToBeActiveForTier = new int[BaseTaskQueueTier.length];
+
+ /**
+ * An array indicating per {@link BaseTaskQueueTier} (indexed by their {@link BaseTaskQueueTier#ordinal})
+ * per {@link TaskSpan} (indexed by their {@link TaskSpan#ordinal}) whether there may be tasks
+ * An array indicating, per {@link BaseTaskQueueTier} (indexed by their {@link BaseTaskQueueTier#ordinal})
+ * per {@link TaskSpan} (indexed by their {@link TaskSpan#ordinal}) per whether for queues that allow being
+ * yielded to ({@link AbstractTaskQueue#canBeYieldedTo()}) (1) or not (0), whether there may be tasks
+ * for that tier and span, indicated by whether the value is positive (indicating true) or 0 (indicating false).
+ * It is always incremented before calling {@link #update()} due to new tasks being added.
+ * If it is 0, it is certain that either there are no queued task for the tier, or
+ * a task has just been added to the queue and this value has not yet been set to true, but will be due
+ * to a {@link #newTaskWasAdded} call, which is then followed by a {@link #callForUpdate()} call.
+ */
+ public static final AtomicInteger[][] thereMayBeTasks = new AtomicInteger[BaseTaskQueueTier.length][TaskSpan.length];
+ public static final AtomicInteger[][][] thereMayBeTasks = new AtomicInteger[BaseTaskQueueTier.length][TaskSpan.length][2];
+ static {
+ for (int tierI = 0; tierI < BaseTaskQueueTier.length; tierI++) {
+ for (int spanI = 0; spanI < TaskSpan.length; spanI++) {
+ thereMayBeTasks[tierI][spanI] = new AtomicInteger();
+ for (int canBeYieldedTo = 0; canBeYieldedTo <= 1; canBeYieldedTo++) {
+ thereMayBeTasks[tierI][spanI][canBeYieldedTo] = new AtomicInteger();
+ }
+ }
+ }
+ }
@@ -5811,10 +5829,10 @@ index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddec
+ }
+
+ /**
+ * @see #newTaskWasAdded(BaseTaskQueueTier, TaskSpan, boolean)
+ * @see #newTaskWasAdded(BaseTaskQueueTier, TaskSpan, boolean, boolean)
+ */
+ public static void newTaskWasAdded(BaseTaskQueueTier tier, TaskSpan span) {
+ newTaskWasAdded(tier, span, false);
+ newTaskWasAdded(tier, span, true, false);
+ }
+
+ /**
@@ -5831,9 +5849,9 @@ index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddec
+ * due to tasks it can poll being available, but then upon activation, acquiring a {@link YieldingLock} it was
+ * waiting for instead.
+ */
+ public static void newTaskWasAdded(BaseTaskQueueTier tier, TaskSpan span, boolean onlyIfLastTimeIsTooLongAgo) {
+ public static void newTaskWasAdded(BaseTaskQueueTier tier, TaskSpan span, boolean canBeYieldedTo, boolean onlyIfLastTimeIsTooLongAgo) {
+
+ if (thereMayBeTasks[tier.ordinal][span.ordinal].getAndIncrement() == 0) {
+ if (thereMayBeTasks[tier.ordinal][span.ordinal][canBeYieldedTo ? 1 : 0].getAndIncrement() == 0) {
+ // Always call update() if we just set the thereMayBeTasks value to true
+ onlyIfLastTimeIsTooLongAgo = false;
+ }
@@ -6013,7 +6031,7 @@ index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddec
+ if (isServerThread) {
+ // The server thread can be activated whenever there are any non-yielding tasks
+ for (TaskSpan span : TaskSpan.NON_YIELDING_VALUES) {
+ if (thereMayBeTasks[BaseTaskQueueTier.SERVER.ordinal][span.ordinal].get() > 0) {
+ if (thereMayBeTasks[BaseTaskQueueTier.SERVER.ordinal][span.ordinal][0].get() + thereMayBeTasks[BaseTaskQueueTier.SERVER.ordinal][span.ordinal][1].get() > 0) {
+ return true;
+ }
+ }
@@ -6200,10 +6218,15 @@ index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddec
+ thread), or if there is a thread already at this exact tier that is waiting for a YieldingLock.
+ */
+ boolean thereAreTasks = false;
+ boolean thereAreOnlyTasksThatCanNotBeYieldedTo = false;
+ for (int spanI = 0; spanI < TaskSpan.length; spanI++) {
+ if (thereMayBeTasks[tierI][spanI].get() > 0) {
+ if (thereMayBeTasks[tierI][spanI][1].get() > 0) {
+ thereAreTasks = true;
+ break;
+ } else if (thereMayBeTasks[tierI][spanI][0].get() > 0) {
+ thereAreTasks = true;
+ thereAreOnlyTasksThatCanNotBeYieldedTo = true;
+ break;
+ }
+ }
+ if (thereAreTasks || !threadsWaitingForUnlockedLockForTier[tierI].isEmpty()) {
@@ -6262,28 +6285,30 @@ index 0000000000000000000000000000000000000000..ba9a1ffca55354a35bcc7b45227dddec
+ var highestTierOfTaskOnStackOrdinalOrLength = highestTierOfTaskOnStack == null ? BaseTaskQueueTier.length : highestTierOfTaskOnStack.ordinal;
+ boolean isThreadWaitingForAvailableYieldingLock = lockWaitingFor != null && !lockWaitingFor.isLocked() && lockWaitingFor.canBeSignalledFor;
+ if (isThreadWaitingForAvailableYieldingLock || highestTierOfTaskOnStack == null || highestTierOfTaskOnStack.ordinal >= tierI) {
+ boolean isBestChoice = false;
+ int yieldDepth = thread.yieldDepth;
+ int yieldPotential = thread.maximumYieldDepth - yieldDepth;
+ if (threadIToUpdate == -1) {
+ isBestChoice = true;
+ } else if (isThreadWaitingForAvailableYieldingLock != threadIToUpdateIsWaitingForAvailableYieldingLock) {
+ isBestChoice = isThreadWaitingForAvailableYieldingLock;
+ } else if (threadIToUpdateYieldDepth == 0 && yieldDepth != 0) {
+ isBestChoice = true;
+ } else if (yieldDepth != 0) {
+ if (yieldPotential > threadIToUpdateYieldPotential) {
+ if (!thereAreOnlyTasksThatCanNotBeYieldedTo || yieldDepth == 0) {
+ boolean isBestChoice = false;
+ int yieldPotential = thread.maximumYieldDepth - yieldDepth;
+ if (threadIToUpdate == -1) {
+ isBestChoice = true;
+ } else if (highestTierOfTaskOnStackOrdinalOrLength > threadIToUpdateTierOrdinalOrLength) {
+ } else if (isThreadWaitingForAvailableYieldingLock != threadIToUpdateIsWaitingForAvailableYieldingLock) {
+ isBestChoice = isThreadWaitingForAvailableYieldingLock;
+ } else if (threadIToUpdateYieldDepth == 0 && yieldDepth != 0) {
+ isBestChoice = true;
+ } else if (yieldDepth != 0) {
+ if (yieldPotential > threadIToUpdateYieldPotential) {
+ isBestChoice = true;
+ } else if (highestTierOfTaskOnStackOrdinalOrLength > threadIToUpdateTierOrdinalOrLength) {
+ isBestChoice = true;
+ }
+ }
+ if (isBestChoice) {
+ threadIToUpdate = threadI;
+ threadIToUpdateIsWaitingForAvailableYieldingLock = isThreadWaitingForAvailableYieldingLock;
+ threadIToUpdateYieldDepth = yieldDepth;
+ threadIToUpdateYieldPotential = yieldPotential;
+ threadIToUpdateTierOrdinalOrLength = highestTierOfTaskOnStackOrdinalOrLength;
+ }
+ }
+ if (isBestChoice) {
+ threadIToUpdate = threadI;
+ threadIToUpdateIsWaitingForAvailableYieldingLock = isThreadWaitingForAvailableYieldingLock;
+ threadIToUpdateYieldDepth = yieldDepth;
+ threadIToUpdateYieldPotential = yieldPotential;
+ threadIToUpdateTierOrdinalOrLength = highestTierOfTaskOnStackOrdinalOrLength;
+ }
+ }
+ }

View File

@@ -378,10 +378,10 @@ index 949feba1264bcafb8dc2dcecd0a566fea80a2ba0..628c33ee1693c9c7f441ab4c8881c50a
}
diff --git a/src/main/java/org/galemc/gale/executor/ClosestChunkBlockableEventLoop.java b/src/main/java/org/galemc/gale/executor/ClosestChunkBlockableEventLoop.java
new file mode 100644
index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698808d3a5d
index 0000000000000000000000000000000000000000..43f7b3303140b19fa744f394dab15e8a5dafbaf7
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/ClosestChunkBlockableEventLoop.java
@@ -0,0 +1,448 @@
@@ -0,0 +1,559 @@
+// Gale - base thread pool - chunk-sorted cache tasks
+
+package org.galemc.gale.executor;
@@ -399,6 +399,7 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+import org.galemc.gale.executor.annotation.thread.AnyThreadSafe;
+import org.galemc.gale.executor.thread.ServerThread;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
@@ -486,7 +487,8 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+ /**
+ * The last known distance for a chunk, by their {@linkplain #getTightlyPackedXZ(int, int) chunk key}.
+ * <br>
+ * Only contains values for chunks that have tasks.
+ * Only contains values for chunks that have tasks in {@link #tasksPerChunk}.
+ * Does not contain a value for the {@link #prepolledRunnable}.
+ * For other tasks, the default return value of {@link Long2IntMap#get} is -1.
+ */
+ @Guarded("#lock")
@@ -533,6 +535,26 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+ */
+ private volatile boolean serverThreadWantsLockToAddChunkDistanceUpdates;
+
+ /**
+ * A pre-polled task to increase the speed of {@link #pollTask()} calls made by the server thread.
+ */
+ @Guarded("#lock")
+ private @Nullable R prepolledRunnable;
+
+ /**
+ * The value of {@link #getTightlyPackedXZ(int, int)} for the {@link #prepolledRunnable}, if it is not null.
+ * Otherwise, an arbitrary value.
+ */
+ @Guarded("#lock")
+ private @Nullable long prepolledRunnablePackedXZ;
+
+ /**
+ * The last known distance for the {@link #prepolledRunnable}, if it is not null.
+ * Otherwise, an arbitrary value.
+ */
+ @Guarded("#lock")
+ private @Nullable int prepolledRunnableDistance;
+
+ public ClosestChunkBlockableEventLoop(String name) {
+ super(name);
+ }
@@ -570,11 +592,15 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+ * This method must only be called while {@link #lock} is held.
+ */
+ private void processChunkDistanceUpdates() {
+ boolean madeChangesSinceLastPrepoll = false;
+ boolean isNonServerThread = !(Thread.currentThread() instanceof ServerThread);
+ while (this.chunkDistanceUpdateLength > 0) {
+
+ // Let the server thread add new chunk distance updates
+ if (isNonServerThread && this.serverThreadWantsLockToAddChunkDistanceUpdates) {
+ if (madeChangesSinceLastPrepoll) {
+ this.preparePrepolledRunnable();
+ }
+ this.lock.release();
+ while (this.serverThreadWantsLockToAddChunkDistanceUpdates) {
+ Thread.onSpinWait();
@@ -590,13 +616,26 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+
+ // Apply the change
+ long packedXZ = getTightlyPackedXZ(chunkX, chunkZ);
+ // Apply the change to the pre-polled task
+ if (this.prepolledRunnablePackedXZ == packedXZ && this.prepolledRunnable != null) {
+ this.prepolledRunnableDistance = newDistance;
+ }
+ // If we don't have tasks for this queue, skip applying the change to the queue
+ int oldDistance = this.distancePerChunk.get(packedXZ);
+ if (oldDistance == -1) {
+ return;
+ }
+ long oldPackedXZWithDistance = getTightlyPackedXZWithDistance(packedXZ, oldDistance);
+ long newPackedXZWithDistance = getTightlyPackedXZWithDistance(packedXZ, newDistance);
+ this.distancePerChunk.put(packedXZ, newDistance);
+ this.chunkQueue.remove(oldPackedXZWithDistance);
+ this.chunkQueue.add(newPackedXZWithDistance);
+
+ madeChangesSinceLastPrepoll = true;
+
+ }
+ if (madeChangesSinceLastPrepoll) {
+ this.preparePrepolledRunnable();
+ }
+ }
+
@@ -641,6 +680,35 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+ }
+ }
+
+ /**
+ * Sets {@link #prepolledRunnable} appropriately,
+ * potentially returning the previous value to the {@link #tasksPerChunk}.
+ * <br>
+ * This method must only be called while {@link #lock} is held.
+ */
+ private void preparePrepolledRunnable() {
+ if (this.prepolledRunnable == null) {
+ this.pollFromQueueIntoPrepolled();
+ return;
+ }
+ if (this.chunkQueue.isEmpty()) {
+ return;
+ }
+ long firstPackedXZWithDistanceInQueue = this.chunkQueue.firstLong();
+ int firstDistanceInQueue = unpackTightlyPackedDistance(firstPackedXZWithDistanceInQueue);
+ // Swap the pre-polled task if necessary
+ if (firstDistanceInQueue < this.prepolledRunnableDistance) {
+ // Return the pre-polled task to the queue
+ long packedXZWithDistance = getTightlyPackedXZWithDistance(this.prepolledRunnablePackedXZ, this.prepolledRunnableDistance);
+ this.tasksPerChunk.computeIfAbsent(this.prepolledRunnablePackedXZ, $ -> this.provisionTaskQueue()).add(this.prepolledRunnable);
+ this.chunkQueue.add(packedXZWithDistance);
+ this.distancePerChunk.putIfAbsent(this.prepolledRunnablePackedXZ, this.prepolledRunnableDistance);
+ this.prepolledRunnable = null;
+ // Set a new pre-polled task
+ this.pollFromQueueIntoPrepolled();
+ }
+ }
+
+ public final Executor createExecutorForChunk(int chunkX, int chunkZ) {
+ return command -> this.execute(chunkX, chunkZ, command);
+ }
@@ -741,17 +809,36 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+ int computedDistance = this.areaMap.getNearestObjectDistance(chunkX, chunkZ);
+ int computedDistanceInRange = computedDistance < 0 || computedDistance >= 512 ? 511 : computedDistance;
+ try (var ignored = this.lock.withSpinLock()) {
+ this.processChunkDistanceUpdates();
+ int distance = this.distancePerChunk.get(packedXZ);
+ if (distance == -1) {
+ distance = computedDistanceInRange;
+ this.distancePerChunk.put(packedXZ, computedDistanceInRange);
+ if (this.prepolledRunnable == null && this.chunkQueue.isEmpty()) {
+ // Set the pre-polled runnable right away
+ this.prepolledRunnable = runnable;
+ this.prepolledRunnablePackedXZ = packedXZ;
+ this.prepolledRunnableDistance = computedDistanceInRange;
+ } else {
+ this.processChunkDistanceUpdates();
+ int distance = this.distancePerChunk.get(packedXZ);
+ if (distance == -1 && packedXZ == this.prepolledRunnablePackedXZ && this.prepolledRunnable != null) {
+ // Use the value from the pre-polled task
+ distance = this.prepolledRunnableDistance;
+ // Keep it consistent with the queue
+ this.distancePerChunk.put(packedXZ, computedDistanceInRange);
+ }
+ if (distance == -1) {
+ // Set a known distance
+ distance = computedDistanceInRange;
+ this.distancePerChunk.put(packedXZ, computedDistanceInRange);
+ // Keep it consistent with the pre-polled task
+ if (this.prepolledRunnablePackedXZ == packedXZ) {
+ this.prepolledRunnableDistance = computedDistanceInRange;
+ }
+ }
+ long packedXZWithDistance = getTightlyPackedXZWithDistance(packedXZ, distance);
+ this.tasksPerChunk.computeIfAbsent(packedXZ, $ -> this.provisionTaskQueue()).add(runnable);
+ this.chunkQueue.add(packedXZWithDistance);
+ this.preparePrepolledRunnable();
+ }
+ long packedXZWithDistance = getTightlyPackedXZWithDistance(packedXZ, distance);
+ this.tasksPerChunk.computeIfAbsent(packedXZ, $ -> this.provisionTaskQueue()).add(runnable);
+ //noinspection NonAtomicOperationOnVolatileField
+ this.pendingTaskCount++;
+ this.chunkQueue.add(packedXZWithDistance);
+ }
+ }
+
@@ -792,6 +879,36 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+ this.tasksPerChunk.clear();
+ this.pendingTaskCount = 0;
+ this.chunkQueue.clear();
+ this.prepolledRunnable = null;
+ }
+ }
+
+ /**
+ * Polls from the {@link #tasksPerChunk}, without checking {@link #prepolledRunnable},
+ * and stores the result in the {@link #prepolledRunnable}. If no task is polled, {@link #prepolledRunnable}
+ * is not modified (particularly, it is not cleared),
+ * so this task must only be called while {@link #prepolledRunnable} is null.
+ * <br>
+ * This method will not make a call to {@link #processChunkDistanceUpdates()}: if necessary, such a call
+ * must be made beforehand.
+ * <br>
+ * This method must only be called while {@link #lock} is held.
+ */
+ private void pollFromQueueIntoPrepolled() {
+ if (this.chunkQueue.isEmpty()) {
+ return;
+ }
+ long packedXZWithDistance = this.chunkQueue.firstLong();
+ long packedXZ = stripTightlyPackedDistance(packedXZWithDistance);
+ Queue<R> tasks = this.tasksPerChunk.get(packedXZ);
+ this.prepolledRunnable = tasks.poll();
+ this.prepolledRunnablePackedXZ = packedXZ;
+ this.prepolledRunnableDistance = unpackTightlyPackedDistance(packedXZWithDistance);
+ if (tasks.isEmpty()) {
+ this.distancePerChunk.remove(packedXZ);
+ this.recycleTaskQueue(tasks);
+ this.tasksPerChunk.remove(packedXZ);
+ this.chunkQueue.remove(packedXZWithDistance);
+ }
+ }
+
@@ -803,28 +920,22 @@ index 0000000000000000000000000000000000000000..819618b7d02f739e9423099961864698
+ R runnable;
+ try (var ignored = this.lock.withSpinLock()) {
+ this.processChunkDistanceUpdates();
+ if (this.chunkQueue.isEmpty()) {
+ // Pre-poll a task if necessary
+ if (this.prepolledRunnable == null) {
+ this.pollFromQueueIntoPrepolled();
+ }
+ runnable = this.prepolledRunnable;
+ // If it is still null, there are no tasks
+ if (runnable == null) {
+ return false;
+ }
+ long packedXZWithDistance = this.chunkQueue.firstLong();
+ long packedXZ = stripTightlyPackedDistance(packedXZWithDistance);
+ Queue<R> tasks = this.tasksPerChunk.get(packedXZ);
+ runnable = tasks.peek();
+ //noinspection DataFlowIssue
+ if (this.blockingCount == 0 && !this.shouldRun(runnable)) {
+ return false;
+ }
+ tasks.poll();
+ if (tasks.isEmpty()) {
+ this.distancePerChunk.remove(packedXZ);
+ this.recycleTaskQueue(tasks);
+ this.tasksPerChunk.remove(packedXZ);
+ this.chunkQueue.remove(packedXZWithDistance);
+ }
+ this.prepolledRunnable = null;
+ //noinspection NonAtomicOperationOnVolatileField
+ this.pendingTaskCount--;
+ }
+ //noinspection DataFlowIssue
+ this.doRunTask(runnable);
+ return true;
+ }
@@ -889,7 +1000,7 @@ index 0000000000000000000000000000000000000000..fe2e06a827555d81a30697f8b0866769
+
+}
diff --git a/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
index 657c3663ed54043e7e4e6660d34903ef746fd8e7..c2acd36b3101042f39afe1436836078dcce2100d 100644
index b4172f285fbed1f314891b2f729aa2dc27b9ab9b..ed642b13e95479d0ec98731a3f5b74cf2fb78f81 100644
--- a/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
+++ b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
@@ -12,7 +12,8 @@ import org.galemc.gale.executor.thread.pool.BaseThreadActivation;

View File

@@ -545,17 +545,16 @@ index ed3ccf2e64539363a7be2d507c68c40b5913f75c..a12250e5aaed02995b7bf09a8018a93f
}
diff --git a/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..9c36119b03ba9a20dc6994ef2d2704ebe9cc4800
index 0000000000000000000000000000000000000000..0fac4a8b238636adb826a3d1a8db204042fb8098
--- /dev/null
+++ b/src/main/java/org/galemc/gale/executor/queue/ChunkWorkerTaskQueue.java
@@ -0,0 +1,103 @@
@@ -0,0 +1,111 @@
+// Gale - base thread pool - chunk worker task queue
+
+package org.galemc.gale.executor.queue;
+
+import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedQueueExecutorThread;
+import io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler;
+import net.minecraft.server.level.ServerChunkCache;
+import org.galemc.gale.executor.TaskSpan;
+import org.galemc.gale.executor.annotation.YieldFree;
+import org.galemc.gale.executor.annotation.thread.AnyThreadSafe;
@@ -588,6 +587,15 @@ index 0000000000000000000000000000000000000000..9c36119b03ba9a20dc6994ef2d2704eb
+ }
+
+ @Override
+ public boolean canBeYieldedTo() {
+ /*
+ A single returned chunk worker task typically performs a loop that executes as many smaller tasks as possible,
+ and which may keep running for a long time.
+ */
+ return false;
+ }
+
+ @Override
+ public boolean hasTasks() {
+ var workerThreads = ChunkTaskScheduler.workerThreads;
+ if (workerThreads == null) {
@@ -640,7 +648,7 @@ index 0000000000000000000000000000000000000000..9c36119b03ba9a20dc6994ef2d2704eb
+ * To be called when a new task has been added to the underlying storage of this queue.
+ */
+ public void newTaskWasAdded() {
+ BaseThreadActivation.newTaskWasAdded(this.tier, TaskSpan.YIELDING, true);
+ BaseThreadActivation.newTaskWasAdded(this.tier, TaskSpan.YIELDING, false, true);
+ }
+
+ @Override
@@ -653,7 +661,7 @@ index 0000000000000000000000000000000000000000..9c36119b03ba9a20dc6994ef2d2704eb
+
+}
diff --git a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
index 76ca22e30b41e4a39ed215cfb03c9a793eefcc09..f142f726663686ac475f64a1ffcbac145c08f5ae 100644
index 4ec06cbff045bd42c1da5881cd0f2446cde39a7b..1a97a74e11a8c675980b3ec2e76b72057a203308 100644
--- a/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
+++ b/src/main/java/org/galemc/gale/executor/thread/BaseThread.java
@@ -2,6 +2,8 @@
@@ -673,7 +681,7 @@ index 76ca22e30b41e4a39ed215cfb03c9a793eefcc09..f142f726663686ac475f64a1ffcbac14
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.TimeUnit;
@@ -737,6 +740,20 @@ public abstract class BaseThread extends Thread implements AbstractYieldingThrea
@@ -742,6 +745,20 @@ public abstract class BaseThread extends Thread implements AbstractYieldingThrea
}
}