diff --git a/leaf-server/src/main/java/org/dreeam/leaf/async/AsyncPlayerDataSaving.java b/leaf-server/src/main/java/org/dreeam/leaf/async/AsyncPlayerDataSaving.java index 3df32778..b67eda75 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/async/AsyncPlayerDataSaving.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/async/AsyncPlayerDataSaving.java @@ -12,20 +12,30 @@ import java.util.concurrent.TimeUnit; public class AsyncPlayerDataSaving { - public static final ExecutorService IO_POOL = new ThreadPoolExecutor( - 1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - new com.google.common.util.concurrent.ThreadFactoryBuilder() - .setPriority(Thread.NORM_PRIORITY - 2) - .setNameFormat("Leaf IO Thread") - .setUncaughtExceptionHandler(Util::onThreadException) - .build(), - new ThreadPoolExecutor.DiscardPolicy() - ); + public static ExecutorService IO_POOL = null; private AsyncPlayerDataSaving() { } + public static void init() { + if (IO_POOL == null) { + IO_POOL = new ThreadPoolExecutor( + 1, + 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new com.google.common.util.concurrent.ThreadFactoryBuilder() + .setPriority(Thread.NORM_PRIORITY - 2) + .setNameFormat("Leaf IO Thread") + .setUncaughtExceptionHandler(Util::onThreadException) + .build(), + new ThreadPoolExecutor.DiscardPolicy() + ); + } else { + throw new IllegalStateException(); + } + } + public static Optional> submit(Runnable runnable) { if (!AsyncPlayerDataSave.enabled) { runnable.run(); diff --git a/leaf-server/src/main/java/org/dreeam/leaf/async/ShutdownExecutors.java b/leaf-server/src/main/java/org/dreeam/leaf/async/ShutdownExecutors.java index b3d1f658..be79ba70 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/async/ShutdownExecutors.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/async/ShutdownExecutors.java @@ -10,6 +10,7 @@ import org.dreeam.leaf.async.tracker.MultithreadedTracker; import java.util.concurrent.TimeUnit; public class ShutdownExecutors { + public static final Logger LOGGER = LogManager.getLogger("Leaf"); public static void shutdown(MinecraftServer server) { @@ -48,11 +49,11 @@ public class ShutdownExecutors { } } - if (AsyncPathProcessor.pathProcessingExecutor != null) { + if (AsyncPathProcessor.PATH_PROCESSING_EXECUTOR != null) { LOGGER.info("Waiting for mob pathfinding executor to shutdown..."); - AsyncPathProcessor.pathProcessingExecutor.shutdown(); + AsyncPathProcessor.PATH_PROCESSING_EXECUTOR.shutdown(); try { - AsyncPathProcessor.pathProcessingExecutor.awaitTermination(10L, TimeUnit.SECONDS); + AsyncPathProcessor.PATH_PROCESSING_EXECUTOR.awaitTermination(10L, TimeUnit.SECONDS); } catch (InterruptedException ignored) { } } diff --git a/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java b/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java index 3fac5839..a1f2fb95 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java @@ -1,6 +1,7 @@ package org.dreeam.leaf.async.path; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import net.minecraft.Util; import net.minecraft.server.MinecraftServer; import net.minecraft.world.level.pathfinder.Path; import org.apache.logging.log4j.LogManager; @@ -15,6 +16,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -28,49 +30,25 @@ public class AsyncPathProcessor { private static final String THREAD_PREFIX = "Leaf Async Pathfinding"; private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX); private static long lastWarnMillis = System.currentTimeMillis(); - public static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor( - 1, - AsyncPathfinding.asyncPathfindingMaxThreads, - AsyncPathfinding.asyncPathfindingKeepalive, TimeUnit.SECONDS, - getQueueImpl(), - new ThreadFactoryBuilder() - .setNameFormat(THREAD_PREFIX + " Thread - %d") - .setPriority(Thread.NORM_PRIORITY - 2) - .build(), - new RejectedTaskHandler() - ); + public static ThreadPoolExecutor PATH_PROCESSING_EXECUTOR = null; - private static class RejectedTaskHandler implements RejectedExecutionHandler { - @Override - public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor executor) { - BlockingQueue workQueue = executor.getQueue(); - if (!executor.isShutdown()) { - switch (AsyncPathfinding.asyncPathfindingRejectPolicy) { - case FLUSH_ALL -> { - if (!workQueue.isEmpty()) { - List pendingTasks = new ArrayList<>(workQueue.size()); - - workQueue.drainTo(pendingTasks); - - for (Runnable pendingTask : pendingTasks) { - pendingTask.run(); - } - } - rejectedTask.run(); - } - case CALLER_RUNS -> rejectedTask.run(); - } - } - - if (System.currentTimeMillis() - lastWarnMillis > 30000L) { - LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help."); - lastWarnMillis = System.currentTimeMillis(); - } + public static void init() { + if (PATH_PROCESSING_EXECUTOR == null) { + PATH_PROCESSING_EXECUTOR = new ThreadPoolExecutor( + getCorePoolSize(), + getMaxPoolSize(), + getKeepAliveTime(), TimeUnit.SECONDS, + getQueueImpl(), + getThreadFactory(), + getRejectedPolicy() + ); + } else { + throw new IllegalStateException(); } } protected static CompletableFuture queue(@NotNull AsyncPath path) { - return CompletableFuture.runAsync(path::process, pathProcessingExecutor) + return CompletableFuture.runAsync(path::process, PATH_PROCESSING_EXECUTOR) .orTimeout(60L, TimeUnit.SECONDS) .exceptionally(throwable -> { if (throwable instanceof TimeoutException e) { @@ -98,9 +76,57 @@ public class AsyncPathProcessor { } } + private static int getCorePoolSize() { + return 1; + } + + private static int getMaxPoolSize() { + return AsyncPathfinding.asyncPathfindingMaxThreads; + } + + private static long getKeepAliveTime() { + return AsyncPathfinding.asyncPathfindingKeepalive; + } + private static BlockingQueue getQueueImpl() { final int queueCapacity = AsyncPathfinding.asyncPathfindingQueueSize; return new LinkedBlockingQueue<>(queueCapacity); } + + private static @NotNull ThreadFactory getThreadFactory() { + return new ThreadFactoryBuilder() + .setNameFormat(THREAD_PREFIX + " Thread - %d") + .setPriority(Thread.NORM_PRIORITY - 2) + .setUncaughtExceptionHandler(Util::onThreadException) + .build(); + } + + private static @NotNull RejectedExecutionHandler getRejectedPolicy() { + return (Runnable rejectedTask, ThreadPoolExecutor executor) -> { + BlockingQueue workQueue = executor.getQueue(); + if (!executor.isShutdown()) { + switch (AsyncPathfinding.asyncPathfindingRejectPolicy) { + case FLUSH_ALL -> { + if (!workQueue.isEmpty()) { + List pendingTasks = new ArrayList<>(workQueue.size()); + + workQueue.drainTo(pendingTasks); + + for (Runnable pendingTask : pendingTasks) { + pendingTask.run(); + } + } + rejectedTask.run(); + } + case CALLER_RUNS -> rejectedTask.run(); + } + } + + if (System.currentTimeMillis() - lastWarnMillis > 30000L) { + LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help."); + lastWarnMillis = System.currentTimeMillis(); + } + }; + } } diff --git a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java index 003458e5..19eabb55 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java @@ -60,5 +60,9 @@ public class AsyncPathfinding extends ConfigModules { ? PathfindTaskRejectPolicy.FLUSH_ALL.toString() : PathfindTaskRejectPolicy.CALLER_RUNS.toString()) ); + + if (enabled) { + org.dreeam.leaf.async.path.AsyncPathProcessor.init(); + } } } diff --git a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPlayerDataSave.java b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPlayerDataSave.java index 6d555ce0..c87908e2 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPlayerDataSave.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPlayerDataSave.java @@ -19,5 +19,9 @@ public class AsyncPlayerDataSave extends ConfigModules { 异步保存玩家数据."""); enabled = config.getBoolean(getBasePath() + ".enabled", enabled); + + if (enabled) { + org.dreeam.leaf.async.AsyncPlayerDataSaving.init(); + } } } diff --git a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java index 7a01b4b1..8c6c257f 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java @@ -57,6 +57,7 @@ public class MultithreadedTracker extends ConfigModules { if (asyncEntityTrackerQueueSize <= 0) asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * 384; + if (enabled) { org.dreeam.leaf.async.tracker.MultithreadedTracker.init(); }