mirror of
https://github.com/Dreeam-qwq/Gale.git
synced 2025-12-22 00:09:25 +00:00
1077 lines
56 KiB
Diff
1077 lines
56 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: Martijn Muijsers <martijnmuijsers@live.nl>
|
|
Date: Sun, 29 Jan 2023 22:37:12 +0100
|
|
Subject: [PATCH] Run chunk cache tasks on base thread pool
|
|
|
|
License: AGPL-3.0 (https://www.gnu.org/licenses/agpl-3.0.html)
|
|
Gale - https://galemc.org
|
|
|
|
diff --git a/src/main/java/com/destroystokyo/paper/util/misc/DistanceTrackingAreaMap.java b/src/main/java/com/destroystokyo/paper/util/misc/DistanceTrackingAreaMap.java
|
|
index 896c3ff7ddb07f1f6f05f90e1e3fe7fb615071d4..1dfee2b857f2a37fa1bb9b8e163809963b408613 100644
|
|
--- a/src/main/java/com/destroystokyo/paper/util/misc/DistanceTrackingAreaMap.java
|
|
+++ b/src/main/java/com/destroystokyo/paper/util/misc/DistanceTrackingAreaMap.java
|
|
@@ -15,7 +15,7 @@ public abstract class DistanceTrackingAreaMap<E> extends AreaMap<E> {
|
|
this.chunkToNearestDistance.defaultReturnValue(-1);
|
|
}
|
|
|
|
- protected final DistanceChangeCallback<E> distanceChangeCallback;
|
|
+ public DistanceChangeCallback<E> distanceChangeCallback; // Gale - base thread pool - chunk-sorted cache tasks - protected -> public, final -> non-final
|
|
|
|
public DistanceTrackingAreaMap() {
|
|
this(new PooledLinkedHashSets<>());
|
|
diff --git a/src/main/java/io/papermc/paper/chunk/PlayerChunkLoader.java b/src/main/java/io/papermc/paper/chunk/PlayerChunkLoader.java
|
|
index e77972c4c264100ffdd824bfa2dac58dbbc6d678..b2e4fb69fd6564484e0ebd120ba87431c5c158e4 100644
|
|
--- a/src/main/java/io/papermc/paper/chunk/PlayerChunkLoader.java
|
|
+++ b/src/main/java/io/papermc/paper/chunk/PlayerChunkLoader.java
|
|
@@ -718,7 +718,7 @@ public final class PlayerChunkLoader {
|
|
this.onChunkSendReady(queuedLoad.chunkX, queuedLoad.chunkZ);
|
|
} else if (this.chunkNeedsPostProcessing(queuedLoad.chunkX, queuedLoad.chunkZ)) {
|
|
// requires post processing
|
|
- this.chunkMap.mainThreadExecutor.execute(() -> {
|
|
+ this.chunkMap.mainThreadExecutor.execute(queuedLoad.chunkX, queuedLoad.chunkZ, () -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
final long key = CoordinateUtils.getChunkKey(queuedLoad.chunkX, queuedLoad.chunkZ);
|
|
final ChunkHolder holder = PlayerChunkLoader.this.chunkMap.getVisibleChunkIfPresent(key);
|
|
|
|
diff --git a/src/main/java/net/minecraft/server/level/ChunkMap.java b/src/main/java/net/minecraft/server/level/ChunkMap.java
|
|
index 0a39aaf5bc54e1fb7fb4bada2a15fea9b73af731..d7bab50d51896c6aeb6015d5c2eb130c88338ede 100644
|
|
--- a/src/main/java/net/minecraft/server/level/ChunkMap.java
|
|
+++ b/src/main/java/net/minecraft/server/level/ChunkMap.java
|
|
@@ -1,6 +1,7 @@
|
|
package net.minecraft.server.level;
|
|
|
|
import co.aikar.timings.Timing; // Paper
|
|
+import com.destroystokyo.paper.util.misc.PooledLinkedHashSets;
|
|
import com.google.common.collect.ImmutableList;
|
|
import com.google.common.collect.ImmutableList.Builder;
|
|
import com.google.common.collect.Iterables;
|
|
@@ -111,6 +112,7 @@ import net.minecraft.world.level.storage.LevelStorageSource;
|
|
import net.minecraft.world.phys.Vec3;
|
|
import org.apache.commons.lang3.mutable.MutableBoolean;
|
|
import org.apache.commons.lang3.mutable.MutableObject;
|
|
+import org.galemc.gale.executor.ClosestChunkBlockableEventLoop;
|
|
import org.slf4j.Logger;
|
|
import org.bukkit.craftbukkit.generator.CustomChunkGenerator;
|
|
import org.bukkit.entity.Player;
|
|
@@ -134,7 +136,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
|
|
// Paper - rewrite chunk system
|
|
public final ServerLevel level;
|
|
private final ThreadedLevelLightEngine lightEngine;
|
|
- public final BlockableEventLoop<Runnable> mainThreadExecutor; // Paper - public
|
|
+ public final ClosestChunkBlockableEventLoop<Runnable> mainThreadExecutor; // Paper - public // Gale - base thread pool - chunk-sorted cache tasks
|
|
public ChunkGenerator generator;
|
|
private final RandomState randomState;
|
|
private final ChunkGeneratorStructureState chunkGeneratorState;
|
|
@@ -178,7 +180,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
|
|
|
|
// these maps are named after spigot's uses
|
|
public final com.destroystokyo.paper.util.misc.PlayerAreaMap playerMobSpawnMap; // this map is absent from updateMaps since it's controlled at the start of the chunkproviderserver tick
|
|
- public final com.destroystokyo.paper.util.misc.PlayerAreaMap playerChunkTickRangeMap;
|
|
+ public final com.destroystokyo.paper.util.misc.PlayerDistanceTrackingAreaMap playerChunkTickRangeMap; // Gale - base thread pool - chunk-sorted cache tasks
|
|
// Paper end - optimise ChunkMap#anyPlayerCloseEnoughForSpawning
|
|
// Paper start - use distance map to optimise tracker
|
|
public static boolean isLegacyTrackingEntity(Entity entity) {
|
|
@@ -293,7 +295,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
|
|
}
|
|
// Paper end
|
|
|
|
- public ChunkMap(ServerLevel world, LevelStorageSource.LevelStorageAccess session, DataFixer dataFixer, StructureTemplateManager structureTemplateManager, Executor executor, BlockableEventLoop<Runnable> mainThreadExecutor, LightChunkGetter chunkProvider, ChunkGenerator chunkGenerator, ChunkProgressListener worldGenerationProgressListener, ChunkStatusUpdateListener chunkStatusChangeListener, Supplier<DimensionDataStorage> persistentStateManagerFactory, int viewDistance, boolean dsync) {
|
|
+ public ChunkMap(ServerLevel world, LevelStorageSource.LevelStorageAccess session, DataFixer dataFixer, StructureTemplateManager structureTemplateManager, Executor executor, ClosestChunkBlockableEventLoop<Runnable> mainThreadExecutor, LightChunkGetter chunkProvider, ChunkGenerator chunkGenerator, ChunkProgressListener worldGenerationProgressListener, ChunkStatusUpdateListener chunkStatusChangeListener, Supplier<DimensionDataStorage> persistentStateManagerFactory, int viewDistance, boolean dsync) {
|
|
super(session.getDimensionPath(world.dimension()).resolve("region"), dataFixer, dsync);
|
|
// Paper - rewrite chunk system
|
|
this.tickingGenerated = new AtomicInteger();
|
|
@@ -325,7 +327,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
|
|
}
|
|
|
|
this.chunkGeneratorState = chunkGenerator.createState(iregistrycustom.lookupOrThrow(Registries.STRUCTURE_SET), this.randomState, j, world.spigotConfig); // Spigot
|
|
- this.mainThreadExecutor = mainThreadExecutor;
|
|
+ this.mainThreadExecutor = mainThreadExecutor; // Gale - base thread pool
|
|
// Paper - rewrite chunk system
|
|
|
|
Objects.requireNonNull(mainThreadExecutor);
|
|
@@ -386,7 +388,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
|
|
}
|
|
// Paper end - use distance map to optimise entity tracker
|
|
// Paper start - optimise ChunkMap#anyPlayerCloseEnoughForSpawning
|
|
- this.playerChunkTickRangeMap = new com.destroystokyo.paper.util.misc.PlayerAreaMap(this.pooledLinkedPlayerHashSets,
|
|
+ this.playerChunkTickRangeMap = new com.destroystokyo.paper.util.misc.PlayerDistanceTrackingAreaMap(this.pooledLinkedPlayerHashSets, // Gale - base thread pool - chunk-sorted cache tasks
|
|
(ServerPlayer player, int rangeX, int rangeZ, int currPosX, int currPosZ, int prevPosX, int prevPosZ,
|
|
com.destroystokyo.paper.util.misc.PooledLinkedHashSets.PooledObjectLinkedOpenHashSet<ServerPlayer> newState) -> {
|
|
ChunkHolder playerChunk = ChunkMap.this.getUpdatingChunkIfPresent(MCUtil.getCoordinateKey(rangeX, rangeZ));
|
|
@@ -400,7 +402,12 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
|
|
if (playerChunk != null) {
|
|
playerChunk.playersInChunkTickRange = newState;
|
|
}
|
|
+ // Gale start - base thread pool - chunk-sorted cache tasks
|
|
+ },
|
|
+ (int posX, int posZ, int oldNearestDistance, int newNearestDistance, PooledLinkedHashSets.PooledObjectLinkedOpenHashSet<ServerPlayer> state) -> {
|
|
+ this.level.chunkSource.mainThreadProcessor.onChunkDistanceChange(posX, posZ, newNearestDistance);
|
|
});
|
|
+ // Gale end - base thread pool - chunk-sorted cache tasks
|
|
this.playerMobSpawnMap = new com.destroystokyo.paper.util.misc.PlayerAreaMap(this.pooledLinkedPlayerHashSets,
|
|
(ServerPlayer player, int rangeX, int rangeZ, int currPosX, int currPosZ, int prevPosX, int prevPosZ,
|
|
com.destroystokyo.paper.util.misc.PooledLinkedHashSets.PooledObjectLinkedOpenHashSet<ServerPlayer> newState) -> {
|
|
@@ -696,7 +703,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider
|
|
}
|
|
|
|
protected void releaseLightTicket(ChunkPos pos) {
|
|
- this.mainThreadExecutor.tell(Util.name(() -> {
|
|
+ this.mainThreadExecutor.tell(pos.x, pos.z, Util.name(() -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
this.distanceManager.removeTicket(TicketType.LIGHT, pos, 33 + ChunkStatus.getDistance(ChunkStatus.LIGHT), pos);
|
|
}, () -> {
|
|
return "release light ticket " + pos;
|
|
diff --git a/src/main/java/net/minecraft/server/level/ServerChunkCache.java b/src/main/java/net/minecraft/server/level/ServerChunkCache.java
|
|
index 83a57b9bc59063ed8299f98bc33e14b57f2ea0de..9ee931a6442d3f18a7521704f39297af0d7af6d7 100644
|
|
--- a/src/main/java/net/minecraft/server/level/ServerChunkCache.java
|
|
+++ b/src/main/java/net/minecraft/server/level/ServerChunkCache.java
|
|
@@ -2,16 +2,12 @@ package net.minecraft.server.level;
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.collect.Lists;
|
|
+import com.google.common.collect.Queues;
|
|
import com.mojang.datafixers.DataFixer;
|
|
import com.mojang.datafixers.util.Either;
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
-import java.util.Arrays;
|
|
-import java.util.Collections;
|
|
-import java.util.Iterator;
|
|
-import java.util.List;
|
|
-import java.util.Objects;
|
|
-import java.util.Optional;
|
|
+import java.util.*;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.function.BooleanSupplier;
|
|
@@ -22,6 +18,7 @@ import net.minecraft.Util;
|
|
import net.minecraft.core.BlockPos;
|
|
import net.minecraft.core.SectionPos;
|
|
import net.minecraft.network.protocol.Packet;
|
|
+import net.minecraft.server.MinecraftServer;
|
|
import net.minecraft.server.level.progress.ChunkProgressListener;
|
|
import net.minecraft.util.VisibleForDebug;
|
|
import net.minecraft.util.thread.BlockableEventLoop;
|
|
@@ -48,6 +45,7 @@ import net.minecraft.world.level.storage.DimensionDataStorage;
|
|
import net.minecraft.world.level.storage.LevelData;
|
|
import net.minecraft.world.level.storage.LevelStorageSource;
|
|
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet; // Paper
|
|
+import org.galemc.gale.executor.ClosestChunkBlockableEventLoop;
|
|
import org.galemc.gale.executor.lock.YieldingLock;
|
|
import org.galemc.gale.executor.queue.BaseTaskQueues;
|
|
import org.galemc.gale.executor.thread.AbstractYieldingThread;
|
|
@@ -308,6 +306,7 @@ public class ServerChunkCache extends ChunkSource {
|
|
file.mkdirs();
|
|
this.dataStorage = new DimensionDataStorage(file, dataFixer);
|
|
this.chunkMap = new ChunkMap(world, session, dataFixer, structureTemplateManager, workerExecutor, this.mainThreadProcessor, this, chunkGenerator, worldGenerationProgressListener, chunkStatusChangeListener, persistentStateManagerFactory, viewDistance, dsync);
|
|
+ this.mainThreadProcessor.setAreaMap(this.chunkMap.playerChunkTickRangeMap); // Gale - base thread pool - chunk-sorted cache tasks
|
|
this.lightEngine = this.chunkMap.getLightEngine();
|
|
this.distanceManager = this.chunkMap.getDistanceManager();
|
|
this.distanceManager.updateSimulationDistance(simulationDistance);
|
|
@@ -422,7 +421,7 @@ public class ServerChunkCache extends ChunkSource {
|
|
if (!io.papermc.paper.util.TickThread.isTickThread()) { // Paper - rewrite chunk system
|
|
return (ChunkAccess) CompletableFuture.supplyAsync(() -> {
|
|
return this.getChunk(x, z, leastStatus, create);
|
|
- }, this.mainThreadProcessor).join();
|
|
+ }, this.mainThreadProcessor.createExecutorForChunk(x, z)).join(); // Gale - base thread pool - chunk-sorted cache tasks
|
|
} else {
|
|
// Paper start - optimise for loaded chunks
|
|
LevelChunk ifLoaded = this.getChunkAtIfLoadedMainThread(x, z);
|
|
@@ -446,7 +445,7 @@ public class ServerChunkCache extends ChunkSource {
|
|
// Paper end
|
|
com.destroystokyo.paper.io.SyncLoadFinder.logSyncLoad(this.level, x1, z1); // Paper - sync load info
|
|
this.level.timings.syncChunkLoad.startTiming(); // Paper
|
|
- chunkproviderserver_b.managedBlock(completablefuture::isDone);
|
|
+ chunkproviderserver_b.managedYield(completablefuture); // Gale - base thread pool
|
|
io.papermc.paper.chunk.system.scheduling.ChunkTaskScheduler.popChunkWait(); // Paper - async chunk debug // Paper - rewrite chunk system
|
|
this.level.timings.syncChunkLoad.stopTiming(); // Paper
|
|
} // Paper
|
|
@@ -489,11 +488,11 @@ public class ServerChunkCache extends ChunkSource {
|
|
ServerChunkCache.MainThreadExecutor chunkproviderserver_b = this.mainThreadProcessor;
|
|
|
|
Objects.requireNonNull(completablefuture);
|
|
- chunkproviderserver_b.managedBlock(completablefuture::isDone);
|
|
+ chunkproviderserver_b.managedYield(completablefuture); // Gale - base thread pool
|
|
} else {
|
|
completablefuture = CompletableFuture.supplyAsync(() -> {
|
|
return this.getChunkFutureMainThread(chunkX, chunkZ, leastStatus, create);
|
|
- }, this.mainThreadProcessor).thenCompose((completablefuture1) -> {
|
|
+ }, this.mainThreadProcessor.createExecutorForChunk(chunkX, chunkZ)).thenCompose((completablefuture1) -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
return completablefuture1;
|
|
});
|
|
}
|
|
@@ -885,7 +884,7 @@ public class ServerChunkCache extends ChunkSource {
|
|
|
|
@Override
|
|
public void onLightUpdate(LightLayer type, SectionPos pos) {
|
|
- this.mainThreadProcessor.execute(() -> {
|
|
+ this.mainThreadProcessor.execute(pos.x(), pos.z(), () -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
ChunkHolder playerchunk = this.getVisibleChunkIfPresent(pos.chunk().toLong());
|
|
|
|
if (playerchunk != null) {
|
|
@@ -971,7 +970,16 @@ public class ServerChunkCache extends ChunkSource {
|
|
this.distanceManager.removeTicketsOnClosing();
|
|
}
|
|
|
|
- public final class MainThreadExecutor extends BlockableEventLoop<Runnable> {
|
|
+ // Gale start - base thread pool
|
|
+ public final class MainThreadExecutor extends ClosestChunkBlockableEventLoop<Runnable> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
+
|
|
+ /**
|
|
+ * The time interval (in nanoseconds) for the server thread to yield when this executor is performing
|
|
+ * a {@link #managedYield} but failed to perform any other tasks from this executor itself.
|
|
+ */
|
|
+ private static final long MANAGED_YIELD_TIMEOUT_TIME = 50_000L;
|
|
+ private static @Nullable YieldingLock yieldingLockToNotifyForNewChunkCacheTasks;
|
|
+ // Gale end - base thread pool
|
|
|
|
MainThreadExecutor(Level world) {
|
|
super("Chunk source main thread executor for " + world.dimension().location());
|
|
@@ -1002,6 +1010,40 @@ public class ServerChunkCache extends ChunkSource {
|
|
super.doRunTask(task);
|
|
}
|
|
|
|
+ // Gale start - base thread pool
|
|
+ @Override
|
|
+ public void tell(int chunkX, int chunkZ, Runnable runnable) {
|
|
+ super.tell(chunkX, chunkZ, runnable);
|
|
+ MinecraftServer.nextTimeAssumeWeMayHaveDelayedTasks = true;
|
|
+ BaseTaskQueues.allLevelsScheduledChunkCache.newTaskWasAdded();
|
|
+ if (yieldingLockToNotifyForNewChunkCacheTasks != null) {
|
|
+ yieldingLockToNotifyForNewChunkCacheTasks.unlock();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public void managedBlock(BooleanSupplier stopCondition) {
|
|
+ throw new UnsupportedOperationException("Cannot call " + this.getClass().getName() + ".managedBlock(BooleanSupplier), call managedYield(CompletableFuture) instead");
|
|
+ }
|
|
+
|
|
+ public void managedYield(CompletableFuture<?> future) {
|
|
+ if (!future.isDone()) {
|
|
+ ++this.blockingCount;
|
|
+ try {
|
|
+ var currentThread = AbstractYieldingThread.currentYieldingThread();
|
|
+ while (!future.isDone()) {
|
|
+ if (!this.pollTask()) {
|
|
+ long timeoutTime = System.nanoTime() + MANAGED_YIELD_TIMEOUT_TIME;
|
|
+ currentThread.yieldUntilFuture(timeoutTime, () -> this.hasPendingTasks(), future, autoCompletingLock -> yieldingLockToNotifyForNewChunkCacheTasks = autoCompletingLock);
|
|
+ }
|
|
+ }
|
|
+ yieldingLockToNotifyForNewChunkCacheTasks = null;
|
|
+ } finally {
|
|
+ --this.blockingCount;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ // Gale end - base thread pool
|
|
+
|
|
@Override
|
|
// CraftBukkit start - process pending Chunk loadCallback() and unloadCallback() after each run task
|
|
public boolean pollTask() {
|
|
diff --git a/src/main/java/net/minecraft/server/level/ServerLevel.java b/src/main/java/net/minecraft/server/level/ServerLevel.java
|
|
index feeca393f960210a62740da053d21fecd62b0252..8a6f903d84eb9259236a63011d66b7246fca28c2 100644
|
|
--- a/src/main/java/net/minecraft/server/level/ServerLevel.java
|
|
+++ b/src/main/java/net/minecraft/server/level/ServerLevel.java
|
|
@@ -275,7 +275,7 @@ public class ServerLevel extends Level implements WorldGenLevel {
|
|
public final void loadChunksForMoveAsync(AABB axisalignedbb, ca.spottedleaf.concurrentutil.executor.standard.PrioritisedExecutor.Priority priority,
|
|
java.util.function.Consumer<List<net.minecraft.world.level.chunk.ChunkAccess>> onLoad) {
|
|
if (Thread.currentThread() != this.thread) {
|
|
- this.getChunkSource().mainThreadProcessor.execute(() -> {
|
|
+ this.getChunkSource().mainThreadProcessor.execute(Mth.floor((axisalignedbb.minX + axisalignedbb.maxX) / 2.0) >> 4, Mth.floor((axisalignedbb.minZ + axisalignedbb.maxZ) / 2.0) >> 4, () -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
this.loadChunksForMoveAsync(axisalignedbb, priority, onLoad);
|
|
});
|
|
return;
|
|
diff --git a/src/main/java/net/minecraft/server/level/ThreadedLevelLightEngine.java b/src/main/java/net/minecraft/server/level/ThreadedLevelLightEngine.java
|
|
index 660693c6dc0ef86f4013df980b6d0c11c03e46cd..236cc920a5943abb249d50a6957d6418fd941501 100644
|
|
--- a/src/main/java/net/minecraft/server/level/ThreadedLevelLightEngine.java
|
|
+++ b/src/main/java/net/minecraft/server/level/ThreadedLevelLightEngine.java
|
|
@@ -98,7 +98,7 @@ public class ThreadedLevelLightEngine extends LevelLightEngine implements AutoCl
|
|
this.chunkMap.level.chunkTaskScheduler.lightExecutor.queueRunnable(() -> { // Paper - rewrite chunk system
|
|
this.theLightEngine.relightChunks(chunks, (ChunkPos chunkPos) -> {
|
|
chunkLightCallback.accept(chunkPos);
|
|
- ((java.util.concurrent.Executor)((ServerLevel)this.theLightEngine.getWorld()).getChunkSource().mainThreadProcessor).execute(() -> {
|
|
+ (((ServerLevel)this.theLightEngine.getWorld()).getChunkSource().mainThreadProcessor).execute(chunkPos.x, chunkPos.z, () -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
((ServerLevel)this.theLightEngine.getWorld()).getChunkSource().chunkMap.getUpdatingChunkIfPresent(chunkPos.toLong()).broadcast(new net.minecraft.network.protocol.game.ClientboundLightUpdatePacket(chunkPos, ThreadedLevelLightEngine.this, null, null, true), false);
|
|
((ServerLevel)this.theLightEngine.getWorld()).getChunkSource().removeTicketAtLevel(TicketType.CHUNK_RELIGHT, chunkPos, io.papermc.paper.util.MCUtil.getTicketLevelFor(ChunkStatus.LIGHT), ticketIds.get(chunkPos));
|
|
});
|
|
@@ -130,7 +130,7 @@ public class ThreadedLevelLightEngine extends LevelLightEngine implements AutoCl
|
|
|
|
if (!world.getChunkSource().chunkMap.mainThreadExecutor.isSameThread()) {
|
|
// ticket logic is not safe to run off-main, re-schedule
|
|
- world.getChunkSource().chunkMap.mainThreadExecutor.execute(() -> {
|
|
+ world.getChunkSource().chunkMap.mainThreadExecutor.execute(chunkX, chunkZ, () -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
this.queueTaskForSection(chunkX, chunkY, chunkZ, runnable);
|
|
});
|
|
return;
|
|
@@ -160,7 +160,7 @@ public class ThreadedLevelLightEngine extends LevelLightEngine implements AutoCl
|
|
} else {
|
|
this.chunksBeingWorkedOn.put(key, newReferences - 1);
|
|
}
|
|
- }, world.getChunkSource().chunkMap.mainThreadExecutor).whenComplete((final Void ignore, final Throwable thr) -> {
|
|
+ }, world.getChunkSource().chunkMap.mainThreadExecutor.createExecutorForChunk(chunkX, chunkZ)).whenComplete((final Void ignore, final Throwable thr) -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
if (thr != null) {
|
|
LOGGER.error("Failed to remove ticket level for post chunk task " + new ChunkPos(chunkX, chunkZ), thr);
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/util/thread/BlockableEventLoop.java b/src/main/java/net/minecraft/util/thread/BlockableEventLoop.java
|
|
index 392e7b4a89669f16b32043b65b69e6593d17f10e..c2378d66bbd65f786a942eba74dd374b551bcbe8 100644
|
|
--- a/src/main/java/net/minecraft/util/thread/BlockableEventLoop.java
|
|
+++ b/src/main/java/net/minecraft/util/thread/BlockableEventLoop.java
|
|
@@ -21,7 +21,7 @@ public abstract class BlockableEventLoop<R extends Runnable> implements Profiler
|
|
private final String name;
|
|
private static final Logger LOGGER = LogUtils.getLogger();
|
|
private final Queue<R> pendingRunnables = Queues.newConcurrentLinkedQueue();
|
|
- private int blockingCount;
|
|
+ protected int blockingCount; // Gale - base thread pool
|
|
|
|
protected BlockableEventLoop(String name) {
|
|
this.name = name;
|
|
@@ -62,7 +62,7 @@ public abstract class BlockableEventLoop<R extends Runnable> implements Profiler
|
|
return this.scheduleExecutables() ? CompletableFuture.supplyAsync(task, this) : CompletableFuture.completedFuture(task.get());
|
|
}
|
|
|
|
- private CompletableFuture<Void> submitAsync(Runnable runnable) {
|
|
+ protected CompletableFuture<Void> submitAsync(Runnable runnable) { // Gale - base thread pool - private -> protected
|
|
return CompletableFuture.supplyAsync(() -> {
|
|
runnable.run();
|
|
return null;
|
|
diff --git a/src/main/java/net/minecraft/world/level/chunk/LevelChunk.java b/src/main/java/net/minecraft/world/level/chunk/LevelChunk.java
|
|
index 18c3f13c523ea62a098f489636b4320c73da4b8b..43c3ff92bad2b812fbad9af65bde17681690a72e 100644
|
|
--- a/src/main/java/net/minecraft/world/level/chunk/LevelChunk.java
|
|
+++ b/src/main/java/net/minecraft/world/level/chunk/LevelChunk.java
|
|
@@ -196,7 +196,7 @@ public class LevelChunk extends ChunkAccess {
|
|
if (!areNeighboursLoaded(bitsetBefore, 2) && areNeighboursLoaded(bitsetAfter, 2)) {
|
|
if (chunkMap.playerChunkManager.tickMap.getObjectsInRange(this.coordinateKey) != null) { // Paper - replace old player chunk loading system
|
|
// now we're ready for entity ticking
|
|
- chunkProviderServer.mainThreadProcessor.execute(() -> {
|
|
+ chunkProviderServer.mainThreadProcessor.execute(this.chunkPos.x, this.chunkPos.z, () -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
// double check that this condition still holds.
|
|
if (LevelChunk.this.areNeighboursLoaded(2) && chunkMap.playerChunkManager.tickMap.getObjectsInRange(LevelChunk.this.coordinateKey) != null) { // Paper - replace old player chunk loading system
|
|
chunkMap.playerChunkManager.onChunkPlayerTickReady(this.chunkPos.x, this.chunkPos.z); // Paper - replace old player chunk
|
|
@@ -212,7 +212,7 @@ public class LevelChunk extends ChunkAccess {
|
|
if (chunkMap.playerChunkManager.isChunkNearPlayers(this.chunkPos.x, this.chunkPos.z)) {
|
|
// the post processing is expensive, so we don't want to run it unless we're actually near
|
|
// a player.
|
|
- chunkProviderServer.mainThreadProcessor.execute(() -> {
|
|
+ chunkProviderServer.mainThreadProcessor.execute(this.chunkPos.x, this.chunkPos.z, () -> { // Gale - base thread pool - chunk-sorted cache tasks
|
|
if (!LevelChunk.this.areNeighboursLoaded(1)) {
|
|
return;
|
|
}
|
|
diff --git a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java
|
|
index 1c2220ff497db66a48ae47cff9103c424ff37bc8..1cdcf0b7ec11c7bda92aeed2f3470e9419e1214e 100644
|
|
--- a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java
|
|
+++ b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java
|
|
@@ -371,7 +371,7 @@ public class CraftWorld extends CraftRegionAccessor implements World {
|
|
if (!Bukkit.isPrimaryThread()) {
|
|
return java.util.concurrent.CompletableFuture.supplyAsync(() -> {
|
|
return CraftWorld.this.isChunkGenerated(x, z);
|
|
- }, world.getChunkSource().mainThreadProcessor).join();
|
|
+ }, world.getChunkSource().mainThreadProcessor.createExecutorForChunk(x, z)).join(); // Gale - base thread pool - chunk-sorted cache tasks
|
|
}
|
|
ChunkAccess chunk = world.getChunkSource().getChunkAtImmediately(x, z);
|
|
if (chunk == null) {
|
|
@@ -479,7 +479,7 @@ public class CraftWorld extends CraftRegionAccessor implements World {
|
|
list,
|
|
true
|
|
);
|
|
- serverChunkCache.mainThreadProcessor.managedBlock(future::isDone);
|
|
+ serverChunkCache.mainThreadProcessor.managedYield(future); // Gale - base thread pool
|
|
if (chunkStatus == ChunkStatus.NOISE) {
|
|
future.join().left().ifPresent(chunk -> net.minecraft.world.level.levelgen.Heightmap.primeHeightmaps(chunk, ChunkStatus.POST_FEATURES));
|
|
}
|
|
diff --git a/src/main/java/org/galemc/gale/executor/ClosestChunkBlockableEventLoop.java b/src/main/java/org/galemc/gale/executor/ClosestChunkBlockableEventLoop.java
|
|
new file mode 100644
|
|
index 0000000000000000000000000000000000000000..5048444f22b2718d5f4b0429fcb65655e0c5a3c0
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/galemc/gale/executor/ClosestChunkBlockableEventLoop.java
|
|
@@ -0,0 +1,590 @@
|
|
+// Gale - base thread pool - chunk-sorted cache tasks
|
|
+
|
|
+package org.galemc.gale.executor;
|
|
+
|
|
+import com.destroystokyo.paper.util.misc.DistanceTrackingAreaMap;
|
|
+import com.destroystokyo.paper.util.misc.PlayerDistanceTrackingAreaMap;
|
|
+import io.papermc.paper.util.IntegerUtil;
|
|
+import io.papermc.paper.util.MCUtil;
|
|
+import it.unimi.dsi.fastutil.longs.*;
|
|
+import net.minecraft.util.thread.BlockableEventLoop;
|
|
+import org.galemc.gale.concurrent.Mutex;
|
|
+import org.galemc.gale.executor.annotation.Access;
|
|
+import org.galemc.gale.executor.annotation.Guarded;
|
|
+import org.galemc.gale.executor.annotation.YieldFree;
|
|
+import org.galemc.gale.executor.annotation.thread.AnyThreadSafe;
|
|
+import org.galemc.gale.executor.thread.ServerThread;
|
|
+import org.jetbrains.annotations.NotNull;
|
|
+import org.jetbrains.annotations.Nullable;
|
|
+
|
|
+import java.util.*;
|
|
+import java.util.concurrent.CompletableFuture;
|
|
+import java.util.concurrent.Executor;
|
|
+import java.util.function.Supplier;
|
|
+
|
|
+/**
|
|
+ * A {@link BlockableEventLoop} for which all tasks relate to a chunk, where the chunks with the smallest
|
|
+ * object distance in a given {@link DistanceTrackingAreaMap} have the highest priority
|
|
+ * (i.e. are executed first).
|
|
+ *
|
|
+ * @author Martijn Muijsers under AGPL-3.0
|
|
+ */
|
|
+@AnyThreadSafe
|
|
+@YieldFree
|
|
+public abstract class ClosestChunkBlockableEventLoop<R extends Runnable> extends BlockableEventLoop<R> {
|
|
+
|
|
+ /**
|
|
+ * @return A packing of the chunk key, similar to {@link MCUtil#getCoordinateKey}, but
|
|
+ * instead of allocating 32 bits for each coordinate, only 27 bits are used.
|
|
+ * Of those 27 bits, 1 bit is used for the sign (1 indicating negative, 0 indicating nonnegative),
|
|
+ * and 26 bits for the absolute value, allowing a signed value in the
|
|
+ * range [-67108863, 67108863] per coordinate (there are two ways to represent 0, of which only the one
|
|
+ * with a nonnegative sign bit (0) is used).
|
|
+ * With 2 coordinates, this leaves 10 unused bits at the most significant end,
|
|
+ * allowing an extra unsigned 9-bit value in the range [0, 511] to be stored,
|
|
+ * and leaving the final (most significant) bit always zero.
|
|
+ */
|
|
+ private static long getTightlyPackedXZ(final int x, final int z) {
|
|
+ return ((z & 0x80000000L) << 22) | ((IntegerUtil.branchlessAbs(z) & 0x03FFFFFFL) << 27) | ((x & 0x80000000L) >> 5) | (IntegerUtil.branchlessAbs(x) & 0x03FFFFFFL);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * A packed long containing chunk coordinates and a distance. Because the distance is stored in the most
|
|
+ * significant bits (and the sign bit of the long is unused), sorting these values in ascending order
|
|
+ * always sorts them primarily by their distance in ascending order (and secondarily by their chunk coordinates,
|
|
+ * in an order that is irrelevant apart from being deterministic).
|
|
+ *
|
|
+ * @param tightlyPackedXZ The return value of {@link #getTightlyPackedXZ(int, int)}.
|
|
+ * @param distance A distance in the range [0, 511].
|
|
+ * @return The same as {@link #getTightlyPackedXZ(int, int)}, but with the additional distance value
|
|
+ * stored in the 9 bits at indices [1..9] (where index 0 indicates the most significant bit)
|
|
+ * as an unsigned integer.
|
|
+ *
|
|
+ * @see #getTightlyPackedXZ(int, int)
|
|
+ */
|
|
+ private static long getTightlyPackedXZWithDistance(final long tightlyPackedXZ, final int distance) {
|
|
+ return tightlyPackedXZ | ((distance & 0x000001FFL) << 54);
|
|
+ }
|
|
+
|
|
+ private static long stripTightlyPackedDistance(final long tightlyPackedXZWithDistance) {
|
|
+ return tightlyPackedXZWithDistance & 0x003FFFFFFFFFFFFFL;
|
|
+ }
|
|
+
|
|
+ private static int unpackTightlyPackedX(final long tightlyPackedXZ) {
|
|
+ long sign = (tightlyPackedXZ >> 26) & 1L;
|
|
+ return (int) (((-sign) ^ (tightlyPackedXZ & 0x03FFFFFFL)) + sign);
|
|
+ }
|
|
+
|
|
+ private static int unpackTightlyPackedZ(final long tightlyPackedXZ) {
|
|
+ long sign = (tightlyPackedXZ >> 53) & 1L;
|
|
+ return (int) (((-sign) ^ ((tightlyPackedXZ >> 27) & 0x03FFFFFFL)) + sign);
|
|
+ }
|
|
+
|
|
+ private static int unpackTightlyPackedDistance(final long tightlyPackedXZWithDistance) {
|
|
+ return (int) (tightlyPackedXZWithDistance >> 54);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * The {@link DistanceTrackingAreaMap} to get distances from.
|
|
+ * This must be set after construction with {@link #setAreaMap}.
|
|
+ */
|
|
+ private PlayerDistanceTrackingAreaMap areaMap;
|
|
+
|
|
+ private final Mutex lock = Mutex.create();
|
|
+
|
|
+ /**
|
|
+ * A pool of re-usable task queues.
|
|
+ * <br>
|
|
+ * This pool is used as a LIFO stack.
|
|
+ */
|
|
+ @Guarded("#lock")
|
|
+ private final ArrayList<Queue<R>> taskQueuePool = new ArrayList<>();
|
|
+
|
|
+ /**
|
|
+ * The last known distance for a chunk, by their {@linkplain #getTightlyPackedXZ(int, int) chunk key}.
|
|
+ * <br>
|
|
+ * Only contains values for chunks that have tasks in {@link #tasksPerChunk}.
|
|
+ * Does not contain a value for the (currently disabled) {@code prepolledRunnable}.
|
|
+ * For other chunks, the default return value of {@link Long2IntMap#get} is -1.
|
|
+ */
|
|
+ @Guarded("#lock")
|
|
+ private final Long2IntMap distancePerChunk = new Long2IntOpenHashMap();
|
|
+ {
|
|
+ distancePerChunk.defaultReturnValue(-1);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * The tasks stored for a specific {@linkplain #getTightlyPackedXZ(int, int) chunk key}.
|
|
+ * Each chunk has its tasks stored as a FIFO queue.
|
|
+ */
|
|
+ @Guarded("#lock")
|
|
+ private final Long2ObjectMap<Queue<R>> tasksPerChunk = new Long2ObjectOpenHashMap<>();
|
|
+
|
|
+ /**
|
|
+ * The chunks with tasks, stored as a
|
|
+ * {@linkplain #getTightlyPackedXZWithDistance(long, int) packed chunk key and distance}
|
|
+ * (where the distance is {@linkplain DistanceTrackingAreaMap#getNearestObjectDistance measured}
|
|
+ * at the time of adding).
|
|
+ */
|
|
+ @Guarded("#lock")
|
|
+ private final LongAVLTreeSet chunkQueue = new LongAVLTreeSet();
|
|
+
|
|
+ @Guarded(value = "#lock", fieldAccess = Access.WRITE)
|
|
+ private volatile int pendingTaskCount = 0;
|
|
+
|
|
+ /**
|
|
+ * An ordered list of chunks to update to a new distance, where each triple of elements in the array
|
|
+ * consists of the chunk x, chunk z, and new distance.
|
|
+ */
|
|
+ @Guarded(value = "#lock")
|
|
+ private int[] chunkDistanceUpdates = new int[0];
|
|
+
|
|
+ /**
|
|
+ * The number of chunk distance updates in {@link #chunkDistanceUpdates}.
|
|
+ */
|
|
+ @Guarded(value = "#lock")
|
|
+ private int chunkDistanceUpdateLength;
|
|
+
|
|
+ /**
|
|
+ * A flag indicating the server thread wants to add new chunk distance updates,
|
|
+ * but is waiting for the {@link #lock}.
|
|
+ */
|
|
+ private volatile boolean serverThreadWantsLockToAddChunkDistanceUpdates;
|
|
+
|
|
+// /**
|
|
+// * A pre-polled task to increase the speed of {@link #pollTask()} calls made by the server thread.
|
|
+// */
|
|
+// @Guarded("#lock")
|
|
+// private @Nullable R prepolledRunnable;
|
|
+
|
|
+// /**
|
|
+// * The value of {@link #getTightlyPackedXZ(int, int)} for the {@link #prepolledRunnable}, if it is not null.
|
|
+// * Otherwise, an arbitrary value.
|
|
+// */
|
|
+// @Guarded("#lock")
|
|
+// private @Nullable long prepolledRunnablePackedXZ;
|
|
+
|
|
+// /**
|
|
+// * The last known distance for the {@link #prepolledRunnable}, if it is not null.
|
|
+// * Otherwise, an arbitrary value.
|
|
+// */
|
|
+// @Guarded("#lock")
|
|
+// private @Nullable int prepolledRunnableDistance;
|
|
+
|
|
+ /**
|
|
+ * Creates the event loop, passing the given name on to the superclass constructor.
|
|
+ * <br>
|
|
+ * The area map is not set here: {@link #setAreaMap} must be called before chunk tasks are scheduled,
|
|
+ * because scheduling a chunk task reads the area map.
|
|
+ */
|
|
+ public ClosestChunkBlockableEventLoop(String name) {
|
|
+ super(name);
|
|
+ }
|
|
+
|
|
+    /**
|
|
+     * Sets the area map that distances are read from. This can only be done once.
|
|
+     *
|
|
+     * @throws IllegalStateException If a non-null area map was already set.
|
|
+     */
|
|
+    public void setAreaMap(PlayerDistanceTrackingAreaMap areaMap) {
|
|
+        final boolean alreadySet = this.areaMap != null;
|
|
+        if (alreadySet) {
|
|
+            final String className = this.getClass().getName();
|
|
+            throw new IllegalStateException("Called " + className + ".setAreaMap(areaMap) but it was already called before");
|
|
+        }
|
|
+        this.areaMap = areaMap;
|
|
+    }
|
|
+
|
|
+    /**
|
|
+     * Hands out a task queue: the most recently recycled one from {@link #taskQueuePool} if the pool
|
|
+     * is not empty, or otherwise a newly created queue.
|
|
+     * <br>
|
|
+     * This method must only be called while {@link #lock} is held.
|
|
+     */
|
|
+    private Queue<R> provisionTaskQueue() {
|
|
+        if (this.taskQueuePool.isEmpty()) {
|
|
+            return new ArrayDeque<>(1);
|
|
+        }
|
|
+        final int lastIndex = this.taskQueuePool.size() - 1;
|
|
+        return this.taskQueuePool.remove(lastIndex);
|
|
+    }
|
|
+
|
|
+ /**
|
|
+ * Returns a task queue to the pool, by appending it to the end of {@link #taskQueuePool}.
|
|
+ * <br>
|
|
+ * This method does not clear the queue itself: the caller is responsible for passing an empty queue.
|
|
+ * <br>
|
|
+ * This method must only be called while {@link #lock} is held.
|
|
+ *
|
|
+ * @param queue An already empty task queue.
|
|
+ */
|
|
+ private void recycleTaskQueue(Queue<R> queue) {
|
|
+ this.taskQueuePool.add(queue);
|
|
+ }
|
|
+
|
|
+    /**
|
|
+     * Applies all pending chunk distance updates in {@link #chunkDistanceUpdates} to
|
|
+     * {@link #distancePerChunk} and {@link #chunkQueue}.
|
|
+     * <br>
|
|
+     * An update for a chunk that has no known distance (i.e. no tasks) is discarded,
|
|
+     * and processing continues with the remaining updates.
|
|
+     * <br>
|
|
+     * When called from a thread that is not a {@link ServerThread}, this method temporarily releases
|
|
+     * {@link #lock} while {@link #serverThreadWantsLockToAddChunkDistanceUpdates} is set,
|
|
+     * and re-acquires it afterwards.
|
|
+     * <br>
|
|
+     * This method must only be called while {@link #lock} is held.
|
|
+     */
|
|
+    private void processChunkDistanceUpdates() {
|
|
+//        boolean madeChangesSinceLastPrepoll = false;
|
|
+        boolean isNonServerThread = !(Thread.currentThread() instanceof ServerThread);
|
|
+        while (this.chunkDistanceUpdateLength > 0) {
|
|
+
|
|
+            // Let the server thread add new chunk distance updates
|
|
+            if (isNonServerThread && this.serverThreadWantsLockToAddChunkDistanceUpdates) {
|
|
+//                if (madeChangesSinceLastPrepoll) {
|
|
+//                    this.preparePrepolledRunnable();
|
|
+//                }
|
|
+                this.lock.release();
|
|
+                while (this.serverThreadWantsLockToAddChunkDistanceUpdates) {
|
|
+                    Thread.onSpinWait();
|
|
+                }
|
|
+                this.lock.spinLock();
|
|
+                // While the lock was released, the pending updates may have been consumed or dropped
|
|
+                // by another thread, so the loop condition must be checked again before reading
|
|
+                continue;
|
|
+            }
|
|
+
|
|
+            // Read the change
|
|
+            // NOTE(review): updates are read from the end of the array, so the most recently added
|
|
+            // update is applied first - confirm this order is intended when a chunk has several updates
|
|
+            int chunkX = this.chunkDistanceUpdates[this.chunkDistanceUpdateLength * 3 - 3];
|
|
+            int chunkZ = this.chunkDistanceUpdates[this.chunkDistanceUpdateLength * 3 - 2];
|
|
+            int newDistance = this.chunkDistanceUpdates[this.chunkDistanceUpdateLength * 3 - 1];
|
|
+            this.chunkDistanceUpdateLength--;
|
|
+
|
|
+            // Apply the change
|
|
+            long packedXZ = getTightlyPackedXZ(chunkX, chunkZ);
|
|
+            // Apply the change to the pre-polled task
|
|
+//            if (this.prepolledRunnablePackedXZ == packedXZ && this.prepolledRunnable != null) {
|
|
+//                this.prepolledRunnableDistance = newDistance;
|
|
+//            }
|
|
+            // If we don't have tasks for this queue, skip applying the change to the queue
|
|
+            int oldDistance = this.distancePerChunk.get(packedXZ);
|
|
+            if (oldDistance == -1) {
|
|
+                // Only this update is skipped: the remaining updates must still be applied
|
|
+                continue;
|
|
+            }
|
|
+            long oldPackedXZWithDistance = getTightlyPackedXZWithDistance(packedXZ, oldDistance);
|
|
+            long newPackedXZWithDistance = getTightlyPackedXZWithDistance(packedXZ, newDistance);
|
|
+            this.distancePerChunk.put(packedXZ, newDistance);
|
|
+            this.chunkQueue.remove(oldPackedXZWithDistance);
|
|
+            this.chunkQueue.add(newPackedXZWithDistance);
|
|
+
|
|
+//            madeChangesSinceLastPrepoll = true;
|
|
+
|
|
+        }
|
|
+//        if (madeChangesSinceLastPrepoll) {
|
|
+//            this.preparePrepolledRunnable();
|
|
+//        }
|
|
+    }
|
|
+
|
|
+ /**
|
|
+ * Schedules a change of the known distance of the given chunk, which is applied by the next
|
|
+ * {@link #processChunkDistanceUpdates()}.
|
|
+ * <br>
|
|
+ * The change is ignored if there is currently no known distance (i.e. no tasks) for the chunk.
|
|
+ * <br>
|
|
+ * If {@link #lock} cannot be acquired right away, {@link #serverThreadWantsLockToAddChunkDistanceUpdates}
|
|
+ * is set while waiting, which makes a non-server thread that is inside
|
|
+ * {@link #processChunkDistanceUpdates()} release the lock.
|
|
+ * NOTE(review): presumably only called from the server thread, as the flag's name suggests - confirm against callers.
|
|
+ */
|
|
+ public void onChunkDistanceChange(int chunkX, int chunkZ, int newDistance) {
|
|
+ /*
|
|
+ Make sure the distance is in the range [0, 511].
|
|
+ A negative or very high value may indicate the chunk is not within the tracking range of the AreaMap,
|
|
+ so those values are replaced by the maximum value that is in range.
|
|
+ */
|
|
+ int newDistanceWithinRange = newDistance < 0 || newDistance >= 512 ? 511 : newDistance;
|
|
+ long packedXZ = getTightlyPackedXZ(chunkX, chunkZ);
|
|
+ boolean setWantsToLock = false;
|
|
+ // Spin until the lock is acquired, raising the flag once after the first failed attempt
|
|
+ while (!this.lock.tryAcquire()) {
|
|
+ if (!setWantsToLock) {
|
|
+ setWantsToLock = true;
|
|
+ this.serverThreadWantsLockToAddChunkDistanceUpdates = true;
|
|
+ }
|
|
+ Thread.onSpinWait();
|
|
+ }
|
|
+ try {
|
|
+ // The lock is held now, so threads waiting on the flag can be let go
|
|
+ if (setWantsToLock) {
|
|
+ this.serverThreadWantsLockToAddChunkDistanceUpdates = false;
|
|
+ }
|
|
+
|
|
+ // If we don't have tasks for this queue, we don't need the change
|
|
+ int oldDistance = this.distancePerChunk.get(packedXZ);
|
|
+ if (oldDistance == -1) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ // Schedule applying the change
|
|
+ this.chunkDistanceUpdateLength++;
|
|
+ // Grow the array to twice the needed length when the new triple does not fit
|
|
+ if (this.chunkDistanceUpdates.length < this.chunkDistanceUpdateLength * 3) {
|
|
+ this.chunkDistanceUpdates = Arrays.copyOf(this.chunkDistanceUpdates, this.chunkDistanceUpdateLength * 6);
|
|
+ }
|
|
+ this.chunkDistanceUpdates[this.chunkDistanceUpdateLength * 3 - 3] = chunkX;
|
|
+ this.chunkDistanceUpdates[this.chunkDistanceUpdateLength * 3 - 2] = chunkZ;
|
|
+ this.chunkDistanceUpdates[this.chunkDistanceUpdateLength * 3 - 1] = newDistanceWithinRange;
|
|
+
|
|
+ } finally {
|
|
+ this.lock.release();
|
|
+ }
|
|
+ }
|
|
+
|
|
+// /**
|
|
+// * Sets {@link #prepolledRunnable} appropriately,
|
|
+// * potentially returning the previous value to the {@link #tasksPerChunk}.
|
|
+// * <br>
|
|
+// * This method must only be called while {@link #lock} is held.
|
|
+// */
|
|
+// private void preparePrepolledRunnable() {
|
|
+// if (this.prepolledRunnable == null) {
|
|
+// this.pollFromQueueIntoPrepolled();
|
|
+// return;
|
|
+// }
|
|
+// if (this.chunkQueue.isEmpty()) {
|
|
+// return;
|
|
+// }
|
|
+// long firstPackedXZWithDistanceInQueue = this.chunkQueue.firstLong();
|
|
+// int firstDistanceInQueue = unpackTightlyPackedDistance(firstPackedXZWithDistanceInQueue);
|
|
+// // Swap the pre-polled task if necessary
|
|
+// if (firstDistanceInQueue < this.prepolledRunnableDistance) {
|
|
+// // Return the pre-polled task to the queue
|
|
+// long packedXZWithDistance = getTightlyPackedXZWithDistance(this.prepolledRunnablePackedXZ, this.prepolledRunnableDistance);
|
|
+// this.tasksPerChunk.computeIfAbsent(this.prepolledRunnablePackedXZ, $ -> this.provisionTaskQueue()).add(this.prepolledRunnable);
|
|
+// this.chunkQueue.add(packedXZWithDistance);
|
|
+// this.distancePerChunk.putIfAbsent(this.prepolledRunnablePackedXZ, this.prepolledRunnableDistance);
|
|
+// this.prepolledRunnable = null;
|
|
+// // Set a new pre-polled task
|
|
+// this.pollFromQueueIntoPrepolled();
|
|
+// }
|
|
+// }
|
|
+
|
|
+    /**
|
|
+     * Creates an {@link Executor} that passes every command it is given to
|
|
+     * {@link #execute(int, int, Runnable)} with the given chunk coordinates.
|
|
+     */
|
|
+    public final Executor createExecutorForChunk(int chunkX, int chunkZ) {
|
|
+        return (Runnable command) -> this.execute(chunkX, chunkZ, command);
|
|
+    }
|
|
+
|
|
+ /**
|
|
+ * Returns the current value of the volatile {@link #pendingTaskCount}, read without acquiring {@link #lock}.
|
|
+ */
|
|
+ @Override
|
|
+ public int getPendingTasksCount() {
|
|
+ return this.pendingTaskCount;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns whether the volatile {@link #pendingTaskCount} is positive, read without acquiring {@link #lock}.
|
|
+ */
|
|
+ @Override
|
|
+ public boolean hasPendingTasks() {
|
|
+ return this.pendingTaskCount > 0;
|
|
+ }
|
|
+
|
|
+    @Override
|
|
+    public <V> @NotNull CompletableFuture<V> submit(@NotNull Supplier<V> task) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".submit(Supplier), use submit(int, int, Supplier) instead");
|
|
+    }
|
|
+
|
|
+    /**
|
|
+     * Runs the given task for the given chunk: through {@link #createExecutorForChunk} if
|
|
+     * {@link #scheduleExecutables()} returns true, or otherwise immediately on the calling thread.
|
|
+     *
|
|
+     * @see #submit(Supplier)
|
|
+     */
|
|
+    public <V> CompletableFuture<V> submit(int chunkX, int chunkZ, Supplier<V> task) {
|
|
+        if (!this.scheduleExecutables()) {
|
|
+            return CompletableFuture.completedFuture(task.get());
|
|
+        }
|
|
+        final Executor chunkExecutor = this.createExecutorForChunk(chunkX, chunkZ);
|
|
+        return CompletableFuture.supplyAsync(task, chunkExecutor);
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    protected @NotNull CompletableFuture<Void> submitAsync(@NotNull Runnable runnable) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".submitAsync(Runnable), use submitAsync(int, int, Runnable) instead");
|
|
+    }
|
|
+
|
|
+    /**
|
|
+     * Schedules the given runnable through {@link #createExecutorForChunk} for the given chunk.
|
|
+     * The returned future completes with null after the runnable has run.
|
|
+     *
|
|
+     * @see #submitAsync(Runnable)
|
|
+     */
|
|
+    private CompletableFuture<Void> submitAsync(int chunkX, int chunkZ, Runnable runnable) {
|
|
+        final Executor chunkExecutor = this.createExecutorForChunk(chunkX, chunkZ);
|
|
+        final Supplier<Void> runAndReturnNothing = () -> {
|
|
+            runnable.run();
|
|
+            return null;
|
|
+        };
|
|
+        return CompletableFuture.supplyAsync(runAndReturnNothing, chunkExecutor);
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    public @NotNull CompletableFuture<Void> submit(@NotNull Runnable task) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".submit(Runnable), use submit(int, int, Runnable) instead");
|
|
+    }
|
|
+
|
|
+    /**
|
|
+     * Runs the given task for the given chunk: scheduled asynchronously if
|
|
+     * {@link #scheduleExecutables()} returns true, or otherwise immediately on the calling thread.
|
|
+     *
|
|
+     * @see #submit(Runnable)
|
|
+     */
|
|
+    public CompletableFuture<Void> submit(int chunkX, int chunkZ, Runnable task) {
|
|
+        if (!this.scheduleExecutables()) {
|
|
+            task.run();
|
|
+            return CompletableFuture.completedFuture((Void)null);
|
|
+        }
|
|
+        return this.submitAsync(chunkX, chunkZ, task);
|
|
+    }
|
|
+
|
|
+    public void executeBlocking(@NotNull Runnable runnable) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".executeBlocking(Runnable), use executeBlocking(int, int, Runnable) instead");
|
|
+    }
|
|
+
|
|
+    /**
|
|
+     * Runs the given runnable for the given chunk and waits for it to finish: directly if
|
|
+     * {@link #isSameThread()} returns true, or otherwise by scheduling it and joining the resulting future.
|
|
+     *
|
|
+     * @see #executeBlocking(Runnable)
|
|
+     */
|
|
+    public void executeBlocking(int chunkX, int chunkZ, Runnable runnable) {
|
|
+        if (this.isSameThread()) {
|
|
+            runnable.run();
|
|
+            return;
|
|
+        }
|
|
+        this.submitAsync(chunkX, chunkZ, runnable).join();
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    public void scheduleOnMain(@NotNull Runnable r0) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".scheduleOnMain(Runnable), use scheduleOnMain(int, int, Runnable) instead");
|
|
+    }
|
|
+
|
|
+    /**
|
|
+     * Wraps the given runnable with {@link #wrapRunnable} and schedules it for the given chunk,
|
|
+     * without checking {@link #scheduleExecutables()}.
|
|
+     *
|
|
+     * @see #scheduleOnMain(Runnable)
|
|
+     */
|
|
+    public void scheduleOnMain(int chunkX, int chunkZ, Runnable r0) {
|
|
+        final R wrapped = this.wrapRunnable(r0);
|
|
+        this.tell(chunkX, chunkZ, wrapped);
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    public void tell(@NotNull R runnable) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".tell(R), use tell(int, int, R) instead");
|
|
+    }
|
|
+
|
|
+ /**
|
|
+ * Schedules a task related to the chunk with the given chunk coordinates.
|
|
+ * <br>
|
|
+ * The task is appended to the FIFO queue of its chunk in {@link #tasksPerChunk}. If the chunk already has a
|
|
+ * known distance in {@link #distancePerChunk}, that distance is kept; otherwise the distance read from
|
|
+ * {@link #areaMap} is stored, with values outside the range [0, 511] replaced by 511.
|
|
+ * <br>
|
|
+ * The area map is read before {@link #lock} is acquired, and must have been set with {@link #setAreaMap}.
|
|
+ *
|
|
+ * @see #tell(R)
|
|
+ */
|
|
+ public void tell(int chunkX, int chunkZ, R runnable) {
|
|
+ long packedXZ = getTightlyPackedXZ(chunkX, chunkZ);
|
|
+ int computedDistance = this.areaMap.getNearestObjectDistance(chunkX, chunkZ);
|
|
+ int computedDistanceInRange = computedDistance < 0 || computedDistance >= 512 ? 511 : computedDistance;
|
|
+ try (var ignored = this.lock.withSpinLock()) {
|
|
+// if (this.prepolledRunnable == null && this.chunkQueue.isEmpty()) {
|
|
+// // Set the pre-polled runnable right away
|
|
+// this.prepolledRunnable = runnable;
|
|
+// this.prepolledRunnablePackedXZ = packedXZ;
|
|
+// this.prepolledRunnableDistance = computedDistanceInRange;
|
|
+// } else {
|
|
+ // Apply pending distance updates first, so that the distance read below is up-to-date
|
|
+ this.processChunkDistanceUpdates();
|
|
+ int distance = this.distancePerChunk.get(packedXZ);
|
|
+// if (distance == -1 && packedXZ == this.prepolledRunnablePackedXZ && this.prepolledRunnable != null) {
|
|
+// // Use the value from the pre-polled task
|
|
+// distance = this.prepolledRunnableDistance;
|
|
+// // Keep it consistent with the queue
|
|
+// this.distancePerChunk.put(packedXZ, computedDistanceInRange);
|
|
+// }
|
|
+ if (distance == -1) {
|
|
+ // Set a known distance
|
|
+ distance = computedDistanceInRange;
|
|
+ this.distancePerChunk.put(packedXZ, computedDistanceInRange);
|
|
+ // Keep it consistent with the pre-polled task
|
|
+// if (this.prepolledRunnablePackedXZ == packedXZ) {
|
|
+// this.prepolledRunnableDistance = computedDistanceInRange;
|
|
+// }
|
|
+ }
|
|
+ long packedXZWithDistance = getTightlyPackedXZWithDistance(packedXZ, distance);
|
|
+ // Use the existing task queue of the chunk, or provision one if the chunk has none yet
|
|
+ this.tasksPerChunk.computeIfAbsent(packedXZ, $ -> this.provisionTaskQueue()).add(runnable);
|
|
+ this.chunkQueue.add(packedXZWithDistance);
|
|
+// this.preparePrepolledRunnable();
|
|
+// }
|
|
+ // The increment is not atomic, but writes only happen while the lock is held
|
|
+ //noinspection NonAtomicOperationOnVolatileField
|
|
+ this.pendingTaskCount++;
|
|
+ }
|
|
+ }
|
|
+
|
|
+    @Override
|
|
+    public void execute(@NotNull Runnable runnable) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".execute(Runnable), use execute(int, int, Runnable) instead");
|
|
+    }
|
|
+
|
|
+    /**
|
|
+     * Executes the given runnable for the given chunk: wrapped and scheduled if
|
|
+     * {@link #scheduleExecutables()} returns true, or otherwise immediately on the calling thread.
|
|
+     *
|
|
+     * @see #execute(Runnable)
|
|
+     */
|
|
+    public void execute(int chunkX, int chunkZ, Runnable runnable) {
|
|
+        if (!this.scheduleExecutables()) {
|
|
+            runnable.run();
|
|
+            return;
|
|
+        }
|
|
+        final R wrapped = this.wrapRunnable(runnable);
|
|
+        this.tell(chunkX, chunkZ, wrapped);
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    public void executeIfPossible(@NotNull Runnable runnable) {
|
|
+        // Always throws: callers must use the overload that takes chunk coordinates
|
|
+        final String className = this.getClass().getName();
|
|
+        throw new UnsupportedOperationException("Called " + className + ".executeIfPossible(Runnable), use executeIfPossible(int, int, Runnable) instead");
|
|
+    }
|
|
+
|
|
+ /**
|
|
+ * Executes the given runnable for the given chunk.
|
|
+ * This simply delegates to {@link #execute(int, int, Runnable)}.
|
|
+ *
|
|
+ * @see #executeIfPossible(Runnable)
|
|
+ */
|
|
+ public void executeIfPossible(int chunkX, int chunkZ, Runnable runnable) {
|
|
+ this.execute(chunkX, chunkZ, runnable);
|
|
+ }
|
|
+
|
|
+    /**
|
|
+     * Discards all pending tasks, all known chunk distances and all pending chunk distance updates,
|
|
+     * while holding {@link #lock}.
|
|
+     * <br>
|
|
+     * The task queues are emptied before they are returned to the pool, so that dropped tasks
|
|
+     * cannot reappear in a queue that is provisioned later.
|
|
+     */
|
|
+    @Override
|
|
+    protected void dropAllTasks() {
|
|
+        try (var ignored = this.lock.withSpinLock()) {
|
|
+            this.chunkDistanceUpdateLength = 0;
|
|
+            this.distancePerChunk.clear();
|
|
+            this.tasksPerChunk.forEach(($, queue) -> {
|
|
+                // The queues stored per chunk still contain tasks, but only empty queues may be recycled
|
|
+                queue.clear();
|
|
+                this.recycleTaskQueue(queue);
|
|
+            });
|
|
+            this.tasksPerChunk.clear();
|
|
+            this.pendingTaskCount = 0;
|
|
+            this.chunkQueue.clear();
|
|
+//            this.prepolledRunnable = null;
|
|
+        }
|
|
+    }
|
|
+
|
|
+// /**
|
|
+// * Polls from the {@link #tasksPerChunk}, without checking {@link #prepolledRunnable},
|
|
+// * and stores the result in the {@link #prepolledRunnable}. If no task is polled, {@link #prepolledRunnable}
|
|
+// * is not modified (particularly, it is not cleared),
|
|
+// * so this task must only be called while {@link #prepolledRunnable} is null.
|
|
+// * <br>
|
|
+// * This method will not make a call to {@link #processChunkDistanceUpdates()}: if necessary, such a call
|
|
+// * must be made beforehand.
|
|
+// * <br>
|
|
+// * This method must only be called while {@link #lock} is held.
|
|
+// */
|
|
+// private void pollFromQueueIntoPrepolled() {
|
|
+// long packedXZWithDistance;
|
|
+// long packedXZ;
|
|
+// Queue<R> tasks;
|
|
+// do {
|
|
+// if (this.chunkQueue.isEmpty()) {
|
|
+// return;
|
|
+// }
|
|
+// packedXZWithDistance = this.chunkQueue.firstLong();
|
|
+// packedXZ = stripTightlyPackedDistance(packedXZWithDistance);
|
|
+// tasks = this.tasksPerChunk.get(packedXZ);
|
|
+// // Hot-fix for when tasks is null, but this *should* not be happening (but it currently happens sometimes)
|
|
+// } while (tasks == null || tasks.isEmpty());
|
|
+// this.prepolledRunnable = tasks.poll();
|
|
+// this.prepolledRunnablePackedXZ = packedXZ;
|
|
+// this.prepolledRunnableDistance = unpackTightlyPackedDistance(packedXZWithDistance);
|
|
+// if (tasks.isEmpty()) {
|
|
+// this.distancePerChunk.remove(packedXZ);
|
|
+// this.recycleTaskQueue(tasks);
|
|
+// this.tasksPerChunk.remove(packedXZ);
|
|
+// this.chunkQueue.remove(packedXZWithDistance);
|
|
+// }
|
|
+// }
|
|
+
|
|
+    /**
|
|
+     * Polls the next task of the chunk that is first in {@link #chunkQueue}.
|
|
+     * <br>
|
|
+     * Entries in {@link #chunkQueue} for which {@link #tasksPerChunk} holds no tasks are removed and skipped,
|
|
+     * instead of causing a {@link NullPointerException}.
|
|
+     * <br>
|
|
+     * This method will not make a call to {@link #processChunkDistanceUpdates()}: if necessary, such a call
|
|
+     * must be made beforehand.
|
|
+     * <br>
|
|
+     * This method must only be called while {@link #lock} is held.
|
|
+     *
|
|
+     * @return The polled task, or null if there are no tasks, or if {@code blockingCount} is 0 and
|
|
+     * {@link #shouldRun} returns false for the first task (which then stays in its queue).
|
|
+     */
|
|
+    private @Nullable R pollFromQueue() {
|
|
+        while (!this.chunkQueue.isEmpty()) {
|
|
+            long packedXZWithDistance = this.chunkQueue.firstLong();
|
|
+            long packedXZ = stripTightlyPackedDistance(packedXZWithDistance);
|
|
+            Queue<R> tasks = this.tasksPerChunk.get(packedXZ);
|
|
+            if (tasks == null || tasks.isEmpty()) {
|
|
+                // This *should* not be happening, but a chunk without tasks must not stay in the queue:
|
|
+                // remove the stale entry and try the next chunk
|
|
+                this.chunkQueue.remove(packedXZWithDistance);
|
|
+                this.distancePerChunk.remove(packedXZ);
|
|
+                if (tasks != null) {
|
|
+                    this.tasksPerChunk.remove(packedXZ);
|
|
+                    this.recycleTaskQueue(tasks);
|
|
+                }
|
|
+                continue;
|
|
+            }
|
|
+            R task = tasks.peek();
|
|
+            if (this.blockingCount == 0 && !this.shouldRun(task)) {
|
|
+                return null;
|
|
+            }
|
|
+            tasks.poll();
|
|
+            // Clean up all bookkeeping for the chunk when its last task was polled
|
|
+            if (tasks.isEmpty()) {
|
|
+                this.distancePerChunk.remove(packedXZ);
|
|
+                this.recycleTaskQueue(tasks);
|
|
+                this.tasksPerChunk.remove(packedXZ);
|
|
+                this.chunkQueue.remove(packedXZWithDistance);
|
|
+            }
|
|
+            return task;
|
|
+        }
|
|
+        return null;
|
|
+    }
|
|
+
|
|
+ /**
|
|
+ * Polls a single task and runs it.
|
|
+ * <br>
|
|
+ * If {@link #pendingTaskCount} is 0, this returns right away without acquiring {@link #lock}.
|
|
+ * Otherwise, while holding the lock, the pending chunk distance updates are applied and a task is polled
|
|
+ * with {@link #pollFromQueue()}. The task itself is run after the lock has been released.
|
|
+ *
|
|
+ * @return Whether a task was polled and run.
|
|
+ */
|
|
+ @Override
|
|
+ public boolean pollTask() {
|
|
+ if (this.pendingTaskCount == 0) {
|
|
+ return false;
|
|
+ }
|
|
+ R runnable;
|
|
+ try (var ignored = this.lock.withSpinLock()) {
|
|
+ this.processChunkDistanceUpdates();
|
|
+ // Pre-poll a task if necessary
|
|
+// if (this.prepolledRunnable == null) {
|
|
+// this.pollFromQueueIntoPrepolled();
|
|
+// }
|
|
+// runnable = this.prepolledRunnable;
|
|
+ runnable = this.pollFromQueue();
|
|
+ // If it is still null, there are no tasks
|
|
+ if (runnable == null) {
|
|
+ return false;
|
|
+ }
|
|
+// if (this.blockingCount == 0 && !this.shouldRun(runnable)) {
|
|
+// return false;
|
|
+// }
|
|
+// this.prepolledRunnable = null;
|
|
+ // The decrement is not atomic, but writes only happen while the lock is held
|
|
+ //noinspection NonAtomicOperationOnVolatileField
|
|
+ this.pendingTaskCount--;
|
|
+ }
|
|
+ this.doRunTask(runnable);
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+}
|
|
diff --git a/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledChunkCacheTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledChunkCacheTaskQueue.java
|
|
new file mode 100644
|
|
index 0000000000000000000000000000000000000000..fe2e06a827555d81a30697f8b08667692a3eeade
|
|
--- /dev/null
|
|
+++ b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledChunkCacheTaskQueue.java
|
|
@@ -0,0 +1,52 @@
|
|
+// Gale - base thread pool
|
|
+
|
|
+package org.galemc.gale.executor.queue;
|
|
+
|
|
+import net.minecraft.server.MinecraftServer;
|
|
+import net.minecraft.server.level.ServerChunkCache;
|
|
+import net.minecraft.server.level.ServerLevel;
|
|
+import org.galemc.gale.executor.TaskSpan;
|
|
+import org.galemc.gale.executor.annotation.thread.AnyThreadSafe;
|
|
+import org.galemc.gale.executor.annotation.YieldFree;
|
|
+import org.galemc.gale.executor.thread.ServerThread;
|
|
+import org.jetbrains.annotations.Nullable;
|
|
+
|
|
+/**
|
|
+ * Exposes, without storing them itself, the main-thread tasks that are scheduled on each world's
|
|
+ * {@link ServerChunkCache#mainThreadProcessor} and normally polled in the respective
|
|
+ * {@link ServerChunkCache.MainThreadExecutor#managedBlock}. Such tasks could normally also be run in the
|
|
+ * server's {@link MinecraftServer#managedBlock} once no global scheduled server thread tasks remain,
|
|
+ * which is why this queue makes them available for polling from a {@link ServerThread}.
|
|
+ * <br>
|
|
+ * All tasks provided by this queue must be yield-free.
|
|
+ * @author Martijn Muijsers under AGPL-3.0
|
|
+ */
|
|
+@AnyThreadSafe
|
|
+@YieldFree
|
|
+public final class AllLevelsScheduledChunkCacheTaskQueue extends AllLevelsScheduledTaskQueue {
|
|
+
|
|
+    AllLevelsScheduledChunkCacheTaskQueue() {
|
|
+        super(TaskSpan.YIELDING, false);
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    public String getName() {
|
|
+        return "AllLevelsScheduledChunkCache";
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    protected boolean hasLevelTasks(ServerLevel level) {
|
|
+        var processor = level.getChunkSource().mainThreadProcessor;
|
|
+        return processor.hasPendingTasks();
|
|
+    }
|
|
+
|
|
+    @Override
|
|
+    protected @Nullable Runnable pollLevel(ServerLevel level) {
|
|
+        var processor = level.getChunkSource().mainThreadProcessor;
|
|
+        if (!processor.hasPendingTasks()) {
|
|
+            return null;
|
|
+        }
|
|
+        return processor::pollTask;
|
|
+    }
|
|
+
|
|
+}
|
|
diff --git a/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
|
|
index b4172f285fbed1f314891b2f729aa2dc27b9ab9b..ed642b13e95479d0ec98731a3f5b74cf2fb78f81 100644
|
|
--- a/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
|
|
+++ b/src/main/java/org/galemc/gale/executor/queue/AllLevelsScheduledTaskQueue.java
|
|
@@ -12,7 +12,8 @@ import org.galemc.gale.executor.thread.pool.BaseThreadActivation;
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
/**
|
|
- * Common implementation for queues with scheduled tasks for all levels.
|
|
+ * Common implementation for queues with scheduled tasks for all levels,
|
|
+ * such as {@link AllLevelsScheduledChunkCacheTaskQueue}.
|
|
* <br>
|
|
* All tasks provided by this queue must be yield-free.
|
|
*
|
|
diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
|
|
index f4adcdcad96b2748c60aecb8f5c25370ee6e8f5b..8465ce8de44d823aac4784fbc5183b9fc49b2825 100644
|
|
--- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
|
|
+++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueueTier.java
|
|
@@ -61,7 +61,8 @@ public enum BaseTaskQueueTier {
|
|
SERVER(new AbstractTaskQueue[]{
|
|
BaseTaskQueues.deferredToServerThread,
|
|
BaseTaskQueues.serverThreadTick,
|
|
- BaseTaskQueues.anyTickScheduledServerThread
|
|
+ BaseTaskQueues.anyTickScheduledServerThread,
|
|
+ BaseTaskQueues.allLevelsScheduledChunkCache
|
|
}, MinecraftServer.SERVER_THREAD_PRIORITY),
|
|
/**
|
|
* A tier for queues that contain tasks that are part of ticking,
|
|
diff --git a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java
|
|
index 92721a51268becb05d708db04e9d6daaa66fb8b2..c608cdfc17e02a37e8f1799af2b26f973a32c839 100644
|
|
--- a/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java
|
|
+++ b/src/main/java/org/galemc/gale/executor/queue/BaseTaskQueues.java
|
|
@@ -90,6 +90,11 @@ public final class BaseTaskQueues {
|
|
*/
|
|
public static final SimpleTaskQueue tickAssist = SimpleTaskQueue.allSpans("TickAssist");
|
|
|
|
+ /**
|
|
+ * @see AllLevelsScheduledChunkCacheTaskQueue
|
|
+ */
|
|
+ public static final AllLevelsScheduledChunkCacheTaskQueue allLevelsScheduledChunkCache = new AllLevelsScheduledChunkCacheTaskQueue();
|
|
+
|
|
/**
|
|
* This queue stores the tasks posted to {@link MCUtil#cleanerExecutor}.
|
|
*/
|