9
0
mirror of https://github.com/BX-Team/DivineMC.git synced 2025-12-22 08:19:19 +00:00

Async Join Thread and executors shutdown

This commit is contained in:
NONPLAYT
2025-07-10 17:52:11 +03:00
parent b088629964
commit a97e6c6855
8 changed files with 316 additions and 47 deletions

View File

@@ -11,6 +11,7 @@ import org.bukkit.configuration.ConfigurationSection;
import org.bukkit.configuration.MemoryConfiguration;
import org.bxteam.divinemc.config.annotations.Experimental;
import org.bxteam.divinemc.entity.pathfinding.PathfindTaskRejectPolicy;
import org.bxteam.divinemc.server.network.AsyncJoinHandler;
import org.jetbrains.annotations.Nullable;
import org.simpleyaml.configuration.comments.CommentType;
import org.simpleyaml.configuration.file.YamlFile;
@@ -229,6 +230,11 @@ public class DivineConfig {
public static int asyncEntityTrackerKeepalive = 60;
public static int asyncEntityTrackerQueueSize = 0;
// Async Join Thread settings
public static boolean asyncJoinEnabled = true;
public static int asyncJoinThreadCount = 1;
public static boolean asyncJoinUseVirtualThreads = false;
// Async chunk sending settings
public static boolean asyncChunkSendingEnabled = true;
@@ -240,6 +246,7 @@ public class DivineConfig {
regionizedChunkTicking();
asyncPathfinding();
multithreadedTracker();
asyncJoinSettings();
asyncChunkSending();
asyncMobSpawning();
}
@@ -333,6 +340,18 @@ public class DivineConfig {
if (asyncEntityTrackerQueueSize <= 0) asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * 384;
}
private static void asyncJoinSettings() {
asyncJoinEnabled = getBoolean(ConfigCategory.ASYNC.key("join-thread.enabled"), asyncJoinEnabled,
"Enables async join thread, which offloads player setup and connection tasks to a separate thread",
"This can significantly improve MSPT when multiple players are joining simultaneously");
asyncJoinThreadCount = getInt(ConfigCategory.ASYNC.key("join-thread.thread-count"), asyncJoinThreadCount,
"Number of threads to use for async join operations");
asyncJoinUseVirtualThreads = getBoolean(ConfigCategory.ASYNC.key("join-thread.use-virtual-threads"), asyncJoinUseVirtualThreads,
"Whether to use virtual threads for async join operations (requires Java 21+)");
AsyncJoinHandler.init(asyncJoinEnabled, asyncJoinThreadCount);
}
private static void asyncChunkSending() {
asyncChunkSendingEnabled = getBoolean(ConfigCategory.ASYNC.key("chunk-sending.enable"), asyncChunkSendingEnabled,
"Makes chunk sending asynchronous, which can significantly reduce main thread load when many players are loading chunks.");

View File

@@ -24,7 +24,7 @@ public class AsyncPathProcessor {
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor(
public static final ThreadPoolExecutor PATH_PROCESSING_EXECUTOR = new ThreadPoolExecutor(
1,
DivineConfig.AsyncCategory.asyncPathfindingMaxThreads,
DivineConfig.AsyncCategory.asyncPathfindingKeepalive, TimeUnit.SECONDS,
@@ -63,7 +63,7 @@ public class AsyncPathProcessor {
}
protected static CompletableFuture<Void> queue(@NotNull AsyncPath path) {
return CompletableFuture.runAsync(path::process, pathProcessingExecutor)
return CompletableFuture.runAsync(path::process, PATH_PROCESSING_EXECUTOR)
.orTimeout(60L, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException e) {

View File

@@ -29,7 +29,7 @@ public class MultithreadedTracker {
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor TRACKER_EXECUTOR = new ThreadPoolExecutor(
public static final ThreadPoolExecutor TRACKER_EXECUTOR = new ThreadPoolExecutor(
getCorePoolSize(),
getMaxPoolSize(),
getKeepAliveTime(), TimeUnit.SECONDS,

View File

@@ -0,0 +1,102 @@
package org.bxteam.divinemc.server.network;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import net.minecraft.server.MinecraftServer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bxteam.divinemc.config.DivineConfig;
import org.bxteam.divinemc.spark.ThreadDumperRegistry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
public class AsyncJoinHandler {
private static final String THREAD_PREFIX = "Async Join Thread";
public static final Logger LOGGER = LogManager.getLogger(AsyncJoinHandler.class.getSimpleName());
public static ExecutorService JOIN_EXECUTOR;
private static boolean enabled = false;
private static int threadCount = 2;
/**
* Initialize the AsyncJoinHandler with configuration settings
*/
public static void init(boolean enabled, int threadCount) {
AsyncJoinHandler.enabled = enabled;
AsyncJoinHandler.threadCount = Math.max(1, threadCount);
if (enabled) {
if (JOIN_EXECUTOR != null) {
JOIN_EXECUTOR.shutdown();
}
JOIN_EXECUTOR = org.bxteam.divinemc.config.DivineConfig.PerformanceCategory.virtualThreadsEnabled &&
DivineConfig.AsyncCategory.asyncJoinUseVirtualThreads
? Executors.newVirtualThreadPerTaskExecutor()
: Executors.newFixedThreadPool(
threadCount,
new ThreadFactoryBuilder()
.setNameFormat(THREAD_PREFIX)
.setDaemon(true)
.build()
);
ThreadDumperRegistry.REGISTRY.add(THREAD_PREFIX);
LOGGER.info("Initialized AsyncJoinHandler with {} threads", threadCount);
}
}
/**
* Execute a potentially blocking task asynchronously
*
* @param task The task to run asynchronously
* @param callback The callback to execute on the main thread when the task completes
* @param <T> The return type of the task
*/
public static <T> void runAsync(Supplier<T> task, java.util.function.Consumer<T> callback) {
if (!enabled || JOIN_EXECUTOR == null) {
T result = task.get();
callback.accept(result);
return;
}
CompletableFuture.supplyAsync(task, JOIN_EXECUTOR)
.thenAccept(result -> {
MinecraftServer.getServer().execute(() -> callback.accept(result));
})
.exceptionally(ex -> {
LOGGER.error("Error during async join operation", ex);
return null;
});
}
/**
* Execute a potentially blocking task asynchronously without a result
*
* @param asyncTask The task to run asynchronously
*/
public static void runAsync(Runnable asyncTask) {
if (!enabled || JOIN_EXECUTOR == null) {
asyncTask.run();
return;
}
CompletableFuture.runAsync(asyncTask, JOIN_EXECUTOR)
.thenRun(() -> MinecraftServer.getServer().execute(asyncTask))
.exceptionally(ex -> {
LOGGER.error("Error during async join operation", ex);
return null;
});
}
/**
* Get the executor service for async join operations
*/
public static Executor getExecutor() {
return enabled && JOIN_EXECUTOR != null ? JOIN_EXECUTOR : Runnable::run;
}
}

View File

@@ -1,61 +1,62 @@
package org.bxteam.divinemc.util;
import ca.spottedleaf.moonrise.common.util.TickThread;
import it.unimi.dsi.fastutil.PriorityQueue;
import it.unimi.dsi.fastutil.PriorityQueues;
import it.unimi.dsi.fastutil.objects.ObjectArrayFIFOQueue;
import net.minecraft.Util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bxteam.divinemc.spark.ThreadDumperRegistry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.LockSupport;
public class AsyncProcessor {
public class AsyncProcessor implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(AsyncProcessor.class);
private final BlockingQueue<Runnable> taskQueue;
private final Thread workerThread;
private volatile boolean isRunning;
public final Thread thread;
private final PriorityQueue<Runnable> jobs = PriorityQueues.synchronize(new ObjectArrayFIFOQueue<>());
private volatile boolean killswitch = false;
public AsyncProcessor(String threadName) {
this.taskQueue = new LinkedBlockingQueue<>();
this.isRunning = true;
this.thread = Thread.ofPlatform()
.name(threadName)
.priority(Thread.NORM_PRIORITY - 1)
.daemon(false)
.uncaughtExceptionHandler(Util::onThreadException)
.unstarted(this);
}
this.workerThread = new TickThread(() -> {
while (isRunning || !taskQueue.isEmpty()) {
public void start() {
thread.start();
ThreadDumperRegistry.REGISTRY.add(thread.getName());
}
public void join(long millis) throws InterruptedException {
killswitch = true;
LockSupport.unpark(thread);
thread.join(millis);
}
public void submit(Runnable runnable) {
jobs.enqueue(runnable);
LockSupport.unpark(thread);
}
@Override
public void run() {
while (!killswitch) {
try {
Runnable runnable;
try {
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
LOGGER.error("An unexpected error occurred when running async processor: {}", e.getMessage(), e);
runnable = jobs.dequeue();
} catch (NoSuchElementException e) {
LockSupport.park();
continue;
}
runnable.run();
} catch (Exception e) {
LOGGER.error("Failed to execute async job for thread {}", thread.getName(), e);
}
}, threadName);
ThreadDumperRegistry.REGISTRY.add(threadName);
this.workerThread.start();
}
public void submit(Runnable task) {
if (!isRunning) {
throw new IllegalStateException("AsyncExecutor is not running.");
}
taskQueue.offer(task);
}
public void shutdown() {
isRunning = false;
workerThread.interrupt();
}
public void shutdownNow() {
isRunning = false;
workerThread.interrupt();
taskQueue.clear();
}
public boolean isRunning() {
return isRunning;
}
}

View File

@@ -0,0 +1,51 @@
package org.bxteam.divinemc.util;
import net.minecraft.server.MinecraftServer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bxteam.divinemc.entity.pathfinding.AsyncPathProcessor;
import org.bxteam.divinemc.entity.tracking.MultithreadedTracker;
import org.bxteam.divinemc.server.network.AsyncJoinHandler;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("ConstantValue")
public class ExecutorShutdown {
public static final Logger LOGGER = LogManager.getLogger(ExecutorShutdown.class.getSimpleName());
public static void shutdown(MinecraftServer server) {
if (server.mobSpawnExecutor != null && server.mobSpawnExecutor.thread.isAlive()) {
LOGGER.info("Shutting down mob spawn executor...");
try {
server.mobSpawnExecutor.join(3000L);
} catch (InterruptedException ignored) { }
}
if (MultithreadedTracker.TRACKER_EXECUTOR != null) {
LOGGER.info("Shutting down mob tracker executor...");
MultithreadedTracker.TRACKER_EXECUTOR.shutdown();
try {
MultithreadedTracker.TRACKER_EXECUTOR.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException ignored) { }
}
if (AsyncPathProcessor.PATH_PROCESSING_EXECUTOR != null) {
LOGGER.info("Shutting down mob pathfinding processing executor...");
AsyncPathProcessor.PATH_PROCESSING_EXECUTOR.shutdown();
try {
AsyncPathProcessor.PATH_PROCESSING_EXECUTOR.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException ignored) { }
}
if (AsyncJoinHandler.JOIN_EXECUTOR != null) {
LOGGER.info("Shutting down async join executor...");
AsyncJoinHandler.JOIN_EXECUTOR.shutdown();
try {
AsyncJoinHandler.JOIN_EXECUTOR.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException ignored) { }
}
}
}