9
0
mirror of https://github.com/Winds-Studio/Leaf.git synced 2025-12-19 15:09:25 +00:00

Some fixes for failed to shutdown if some async are disabled

This commit is contained in:
Dreeam
2025-06-11 07:56:22 +08:00
parent bf2158577d
commit 38fe533d32
6 changed files with 97 additions and 51 deletions

View File

@@ -12,20 +12,30 @@ import java.util.concurrent.TimeUnit;
public class AsyncPlayerDataSaving {
public static final ExecutorService IO_POOL = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new com.google.common.util.concurrent.ThreadFactoryBuilder()
.setPriority(Thread.NORM_PRIORITY - 2)
.setNameFormat("Leaf IO Thread")
.setUncaughtExceptionHandler(Util::onThreadException)
.build(),
new ThreadPoolExecutor.DiscardPolicy()
);
public static ExecutorService IO_POOL = null;
private AsyncPlayerDataSaving() {
}
public static void init() {
if (IO_POOL == null) {
IO_POOL = new ThreadPoolExecutor(
1,
1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new com.google.common.util.concurrent.ThreadFactoryBuilder()
.setPriority(Thread.NORM_PRIORITY - 2)
.setNameFormat("Leaf IO Thread")
.setUncaughtExceptionHandler(Util::onThreadException)
.build(),
new ThreadPoolExecutor.DiscardPolicy()
);
} else {
throw new IllegalStateException();
}
}
public static Optional<Future<?>> submit(Runnable runnable) {
if (!AsyncPlayerDataSave.enabled) {
runnable.run();

View File

@@ -10,6 +10,7 @@ import org.dreeam.leaf.async.tracker.MultithreadedTracker;
import java.util.concurrent.TimeUnit;
public class ShutdownExecutors {
public static final Logger LOGGER = LogManager.getLogger("Leaf");
public static void shutdown(MinecraftServer server) {
@@ -48,11 +49,11 @@ public class ShutdownExecutors {
}
}
if (AsyncPathProcessor.pathProcessingExecutor != null) {
if (AsyncPathProcessor.PATH_PROCESSING_EXECUTOR != null) {
LOGGER.info("Waiting for mob pathfinding executor to shutdown...");
AsyncPathProcessor.pathProcessingExecutor.shutdown();
AsyncPathProcessor.PATH_PROCESSING_EXECUTOR.shutdown();
try {
AsyncPathProcessor.pathProcessingExecutor.awaitTermination(10L, TimeUnit.SECONDS);
AsyncPathProcessor.PATH_PROCESSING_EXECUTOR.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
}
}

View File

@@ -1,6 +1,7 @@
package org.dreeam.leaf.async.path;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import net.minecraft.Util;
import net.minecraft.server.MinecraftServer;
import net.minecraft.world.level.pathfinder.Path;
import org.apache.logging.log4j.LogManager;
@@ -15,6 +16,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -28,49 +30,25 @@ public class AsyncPathProcessor {
private static final String THREAD_PREFIX = "Leaf Async Pathfinding";
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
public static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor(
1,
AsyncPathfinding.asyncPathfindingMaxThreads,
AsyncPathfinding.asyncPathfindingKeepalive, TimeUnit.SECONDS,
getQueueImpl(),
new ThreadFactoryBuilder()
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build(),
new RejectedTaskHandler()
);
public static ThreadPoolExecutor PATH_PROCESSING_EXECUTOR = null;
private static class RejectedTaskHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor executor) {
BlockingQueue<Runnable> workQueue = executor.getQueue();
if (!executor.isShutdown()) {
switch (AsyncPathfinding.asyncPathfindingRejectPolicy) {
case FLUSH_ALL -> {
if (!workQueue.isEmpty()) {
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());
workQueue.drainTo(pendingTasks);
for (Runnable pendingTask : pendingTasks) {
pendingTask.run();
}
}
rejectedTask.run();
}
case CALLER_RUNS -> rejectedTask.run();
}
}
if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help.");
lastWarnMillis = System.currentTimeMillis();
}
public static void init() {
if (PATH_PROCESSING_EXECUTOR == null) {
PATH_PROCESSING_EXECUTOR = new ThreadPoolExecutor(
getCorePoolSize(),
getMaxPoolSize(),
getKeepAliveTime(), TimeUnit.SECONDS,
getQueueImpl(),
getThreadFactory(),
getRejectedPolicy()
);
} else {
throw new IllegalStateException();
}
}
protected static CompletableFuture<Void> queue(@NotNull AsyncPath path) {
return CompletableFuture.runAsync(path::process, pathProcessingExecutor)
return CompletableFuture.runAsync(path::process, PATH_PROCESSING_EXECUTOR)
.orTimeout(60L, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException e) {
@@ -98,9 +76,57 @@ public class AsyncPathProcessor {
}
}
private static int getCorePoolSize() {
return 1;
}
private static int getMaxPoolSize() {
return AsyncPathfinding.asyncPathfindingMaxThreads;
}
private static long getKeepAliveTime() {
return AsyncPathfinding.asyncPathfindingKeepalive;
}
private static BlockingQueue<Runnable> getQueueImpl() {
final int queueCapacity = AsyncPathfinding.asyncPathfindingQueueSize;
return new LinkedBlockingQueue<>(queueCapacity);
}
private static @NotNull ThreadFactory getThreadFactory() {
return new ThreadFactoryBuilder()
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.setUncaughtExceptionHandler(Util::onThreadException)
.build();
}
private static @NotNull RejectedExecutionHandler getRejectedPolicy() {
return (Runnable rejectedTask, ThreadPoolExecutor executor) -> {
BlockingQueue<Runnable> workQueue = executor.getQueue();
if (!executor.isShutdown()) {
switch (AsyncPathfinding.asyncPathfindingRejectPolicy) {
case FLUSH_ALL -> {
if (!workQueue.isEmpty()) {
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());
workQueue.drainTo(pendingTasks);
for (Runnable pendingTask : pendingTasks) {
pendingTask.run();
}
}
rejectedTask.run();
}
case CALLER_RUNS -> rejectedTask.run();
}
}
if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help.");
lastWarnMillis = System.currentTimeMillis();
}
};
}
}

View File

@@ -60,5 +60,9 @@ public class AsyncPathfinding extends ConfigModules {
? PathfindTaskRejectPolicy.FLUSH_ALL.toString()
: PathfindTaskRejectPolicy.CALLER_RUNS.toString())
);
if (enabled) {
org.dreeam.leaf.async.path.AsyncPathProcessor.init();
}
}
}

View File

@@ -19,5 +19,9 @@ public class AsyncPlayerDataSave extends ConfigModules {
异步保存玩家数据.""");
enabled = config.getBoolean(getBasePath() + ".enabled", enabled);
if (enabled) {
org.dreeam.leaf.async.AsyncPlayerDataSaving.init();
}
}
}

View File

@@ -57,6 +57,7 @@ public class MultithreadedTracker extends ConfigModules {
if (asyncEntityTrackerQueueSize <= 0)
asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * 384;
if (enabled) {
org.dreeam.leaf.async.tracker.MultithreadedTracker.init();
}