From c963bb3315cbddbba9be26b647ecf63034b0a4bf Mon Sep 17 00:00:00 2001 From: Creeam <102713261+HaHaWTH@users.noreply.github.com> Date: Thu, 6 Feb 2025 15:12:44 -0800 Subject: [PATCH] Async task handling improvement (#209) * All threads are core thread * Drop later submitted pathfinding tasks if task before is not started yet * Auto-resize is gone * Refine error handling * Handle rejected execution * Limit size and schedule on EntityScheduler * Allow pr to build * Remove duplicate path handling Since it's a very rare case and Kaiiju has already done something to handle this * Update thread and logger name format * Core pool to 1 * Revert entity scheduler changes * Expose queue size to config * Add reject policy config to pathfinding * [ci/skip] To uppercase * [ci/skip] Add co-authors --------- Co-authored-by: Taiyou06 Co-authored-by: Altiami --- .github/workflows/build-1214.yml | 2 +- .../0026-Petal-Async-Pathfinding.patch | 4 ++ .../leaf/async/path/AsyncPathProcessor.java | 62 +++++++++++++++++-- .../async/path/PathfindTaskRejectPolicy.java | 19 ++++++ .../async/tracker/MultithreadedTracker.java | 15 ++--- .../modules/async/AsyncPathfinding.java | 17 +++++ .../modules/async/MultithreadedTracker.java | 14 ++--- 7 files changed, 108 insertions(+), 25 deletions(-) create mode 100644 leaf-server/src/main/java/org/dreeam/leaf/async/path/PathfindTaskRejectPolicy.java diff --git a/.github/workflows/build-1214.yml b/.github/workflows/build-1214.yml index 97e5ad46..8cddfe07 100644 --- a/.github/workflows/build-1214.yml +++ b/.github/workflows/build-1214.yml @@ -4,7 +4,7 @@ on: push: branches: [ "dev/1.21.4" ] pull_request: - branches: [ "ver/1.21.4" ] + branches: [ "dev/1.21.4" ] jobs: build: diff --git a/leaf-server/minecraft-patches/features/0026-Petal-Async-Pathfinding.patch b/leaf-server/minecraft-patches/features/0026-Petal-Async-Pathfinding.patch index f9c0b103..37807a30 100644 --- a/leaf-server/minecraft-patches/features/0026-Petal-Async-Pathfinding.patch +++ b/leaf-server/minecraft-patches/features/0026-Petal-Async-Pathfinding.patch @@ -10,6 +10,10 @@ Original project: https://github.com/KaiijuMC/Kaiiju Original license: GPLv3 Original project: https://github.com/Bloom-host/Petal +Co-authored-by: HaHaWTH <102713261+HaHaWTH@users.noreply.github.com> +Co-authored-by: Taiyou06 +Co-authored-by: Altiami + This patch was ported downstream from the Petal fork. Makes most pathfinding-related work happen asynchronously diff --git a/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java b/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java index 1070cf69..f9881486 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/async/path/AsyncPathProcessor.java @@ -1,13 +1,18 @@ package org.dreeam.leaf.async.path; +import com.destroystokyo.paper.util.SneakyThrow; import com.google.common.util.concurrent.ThreadFactoryBuilder; import net.minecraft.server.MinecraftServer; import net.minecraft.world.level.pathfinder.Path; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; import java.util.function.Consumer; @@ -16,19 +21,59 @@ import java.util.function.Consumer; */ public class AsyncPathProcessor { - private static final Executor pathProcessingExecutor = new ThreadPoolExecutor( + private static final String THREAD_PREFIX = "Leaf Async Pathfinding"; + private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX); + private static long lastWarnMillis = System.currentTimeMillis(); + private static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor( 1, org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingMaxThreads, org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingKeepalive, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), + getQueueImpl(), new ThreadFactoryBuilder() - .setNameFormat("Leaf Async Pathfinding Thread - %d") + .setNameFormat(THREAD_PREFIX + " Thread - %d") .setPriority(Thread.NORM_PRIORITY - 2) - .build() + .build(), + new RejectedTaskHandler() ); + private static class RejectedTaskHandler implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor executor) { + BlockingQueue workQueue = executor.getQueue(); + if (!executor.isShutdown()) { + switch (org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingRejectPolicy) { + case FLUSH_ALL -> { + if (!workQueue.isEmpty()) { + List pendingTasks = new ArrayList<>(workQueue.size()); + + workQueue.drainTo(pendingTasks); + + for (Runnable pendingTask : pendingTasks) { + pendingTask.run(); + } + } + rejectedTask.run(); + } + case CALLER_RUNS -> rejectedTask.run(); + } + } + + if (System.currentTimeMillis() - lastWarnMillis > 30000L) { + LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help."); + lastWarnMillis = System.currentTimeMillis(); + } + } + } + protected static CompletableFuture queue(@NotNull AsyncPath path) { - return CompletableFuture.runAsync(path::process, pathProcessingExecutor); + return CompletableFuture.runAsync(path::process, pathProcessingExecutor) + .orTimeout(60L, TimeUnit.SECONDS) + .exceptionally(throwable -> { + if (throwable instanceof TimeoutException e) { + LOGGER.warn("Async Pathfinding process timed out", e); + } else SneakyThrow.sneaky(throwable); + return null; + }); } /** @@ -48,4 +93,9 @@ public class AsyncPathProcessor { afterProcessing.accept(path); } } -} + + private static BlockingQueue getQueueImpl() { + final int queueCapacity = org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingQueueSize; + + return new LinkedBlockingQueue<>(queueCapacity); + }} diff --git a/leaf-server/src/main/java/org/dreeam/leaf/async/path/PathfindTaskRejectPolicy.java b/leaf-server/src/main/java/org/dreeam/leaf/async/path/PathfindTaskRejectPolicy.java new file mode 100644 index 00000000..4e195f44 --- /dev/null +++ b/leaf-server/src/main/java/org/dreeam/leaf/async/path/PathfindTaskRejectPolicy.java @@ -0,0 +1,19 @@ +package org.dreeam.leaf.async.path; + +import org.dreeam.leaf.config.LeafConfig; + +import java.util.Locale; + +public enum PathfindTaskRejectPolicy { + FLUSH_ALL, + CALLER_RUNS; + + public static PathfindTaskRejectPolicy fromString(String policy) { + try { + return PathfindTaskRejectPolicy.valueOf(policy.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + LeafConfig.LOGGER.warn("Invalid pathfind task reject policy: {}, falling back to {}.", policy, FLUSH_ALL.toString()); + return FLUSH_ALL; + } + } +} diff --git a/leaf-server/src/main/java/org/dreeam/leaf/async/tracker/MultithreadedTracker.java b/leaf-server/src/main/java/org/dreeam/leaf/async/tracker/MultithreadedTracker.java index d4c23412..91b48ee0 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/async/tracker/MultithreadedTracker.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/async/tracker/MultithreadedTracker.java @@ -20,7 +20,8 @@ import java.util.concurrent.*; public class MultithreadedTracker { - private static final Logger LOGGER = LogManager.getLogger("MultithreadedTracker"); + private static final String THREAD_PREFIX = "Leaf Async Tracker"; + private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX); private static long lastWarnMillis = System.currentTimeMillis(); private static final ThreadPoolExecutor trackerExecutor = new ThreadPoolExecutor( getCorePoolSize(), @@ -128,19 +129,15 @@ public class MultithreadedTracker { } private static int getMaxPoolSize() { - return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? Integer.MAX_VALUE : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads; + return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads; } private static long getKeepAliveTime() { - return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? 30L : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive; + return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive; } private static BlockingQueue getQueueImpl() { - if (org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize) { - return new SynchronousQueue<>(); - } - - final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads * (Math.max(org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads, 4)); + final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerQueueSize; return new LinkedBlockingQueue<>(queueCapacity); } @@ -148,7 +145,7 @@ public class MultithreadedTracker { private static @NotNull ThreadFactory getThreadFactory() { return new ThreadFactoryBuilder() .setThreadFactory(MultithreadedTrackerThread::new) - .setNameFormat("Leaf Async Tracker Thread - %d") + .setNameFormat(THREAD_PREFIX + " Thread - %d") .setPriority(Thread.NORM_PRIORITY - 2) .build(); } diff --git a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java index b45d738f..00d7b9f4 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/AsyncPathfinding.java @@ -1,5 +1,6 @@ package org.dreeam.leaf.config.modules.async; +import org.dreeam.leaf.async.path.PathfindTaskRejectPolicy; import org.dreeam.leaf.config.ConfigModules; import org.dreeam.leaf.config.EnumConfigCategory; import org.dreeam.leaf.config.LeafConfig; @@ -13,12 +14,25 @@ public class AsyncPathfinding extends ConfigModules { public static boolean enabled = false; public static int asyncPathfindingMaxThreads = 0; public static int asyncPathfindingKeepalive = 60; + public static int asyncPathfindingQueueSize = 0; + public static PathfindTaskRejectPolicy asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.FLUSH_ALL; @Override public void onLoaded() { enabled = config.getBoolean(getBasePath() + ".enabled", enabled); asyncPathfindingMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncPathfindingMaxThreads); asyncPathfindingKeepalive = config.getInt(getBasePath() + ".keepalive", asyncPathfindingKeepalive); + asyncPathfindingQueueSize = config.getInt(getBasePath() + ".queue-size", asyncPathfindingQueueSize); + asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.fromString(config.getString(getBasePath() + ".reject-policy", asyncPathfindingRejectPolicy.toString(), config.pickStringRegionBased( + """ + The policy to use when the queue is full and a new task is submitted. + FLUSH_ALL: All pending tasks will be run on server thread. + CALLER_RUNS: Newly submitted task will be run on server thread.""", + """ + 当队列满时, 新提交的任务将使用以下策略处理. + FLUSH_ALL: 所有等待中的任务都将在主线程上运行. + CALLER_RUNS: 新提交的任务将在主线程上运行.""" + ))); if (asyncPathfindingMaxThreads < 0) asyncPathfindingMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncPathfindingMaxThreads, 1); @@ -28,5 +42,8 @@ public class AsyncPathfinding extends ConfigModules { asyncPathfindingMaxThreads = 0; else LeafConfig.LOGGER.info("Using {} threads for Async Pathfinding", asyncPathfindingMaxThreads); + + if (asyncPathfindingQueueSize <= 0) + asyncPathfindingQueueSize = asyncPathfindingMaxThreads * Math.max(asyncPathfindingMaxThreads, 4); } } diff --git a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java index f6d4249e..84d86499 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/async/MultithreadedTracker.java @@ -12,9 +12,9 @@ public class MultithreadedTracker extends ConfigModules { public static boolean enabled = false; public static boolean compatModeEnabled = false; - public static boolean autoResize = false; public static int asyncEntityTrackerMaxThreads = 0; public static int asyncEntityTrackerKeepalive = 60; + public static int asyncEntityTrackerQueueSize = 0; @Override public void onLoaded() { @@ -33,16 +33,9 @@ public class MultithreadedTracker extends ConfigModules { """ 是否启用兼容模式, 如果你的服务器安装了 Citizens 或其他类似非发包 NPC 插件, 请开启此项.""")); - autoResize = config.getBoolean(getBasePath() + ".auto-resize", autoResize, config.pickStringRegionBased(""" - Auto adjust thread pool size based on server load, - This will tweak thread pool size dynamically, - overrides max-threads and keepalive.""", - """ - 根据服务器负载自动调整线程池大小, - 这会使线程池大小动态调整, - 覆盖设置 max-threads 和 keepalive.""")); asyncEntityTrackerMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncEntityTrackerMaxThreads); asyncEntityTrackerKeepalive = config.getInt(getBasePath() + ".keepalive", asyncEntityTrackerKeepalive); + asyncEntityTrackerQueueSize = config.getInt(getBasePath() + ".queue-size", asyncEntityTrackerQueueSize); if (asyncEntityTrackerMaxThreads < 0) asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncEntityTrackerMaxThreads, 1); @@ -53,5 +46,8 @@ public class MultithreadedTracker extends ConfigModules { asyncEntityTrackerMaxThreads = 0; else LeafConfig.LOGGER.info("Using {} threads for Async Entity Tracker", asyncEntityTrackerMaxThreads); + + if (asyncEntityTrackerQueueSize <= 0) + asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * Math.max(asyncEntityTrackerMaxThreads, 4); } }