mirror of
https://github.com/Winds-Studio/Leaf.git
synced 2025-12-19 15:09:25 +00:00
Async task handling improvement (#209)
* All threads are core thread * Drop later submitted pathfinding tasks if task before is not started yet * Auto-resize is gone * Refine error handling * Handle rejected execution * Limit size and schedule on EntityScheduler * Allow pr to build * Remove duplicate path handling Since it's a very rare case and Kaiiju has already done something to handle this * Update thread and logger name format * Core pool to 1 * Revert entity scheduler changes * Expose queue size to config * Add reject policy config to pathfinding * [ci/skip] To uppercase * [ci/skip] Add co-authors --------- Co-authored-by: Taiyou06 <kaandindar21@gmail.com> Co-authored-by: Altiami <yoshimo.kristin@gmail.com>
This commit is contained in:
2
.github/workflows/build-1214.yml
vendored
2
.github/workflows/build-1214.yml
vendored
@@ -4,7 +4,7 @@ on:
|
|||||||
push:
|
push:
|
||||||
branches: [ "dev/1.21.4" ]
|
branches: [ "dev/1.21.4" ]
|
||||||
pull_request:
|
pull_request:
|
||||||
branches: [ "ver/1.21.4" ]
|
branches: [ "dev/1.21.4" ]
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
|
|||||||
@@ -10,6 +10,10 @@ Original project: https://github.com/KaiijuMC/Kaiiju
|
|||||||
Original license: GPLv3
|
Original license: GPLv3
|
||||||
Original project: https://github.com/Bloom-host/Petal
|
Original project: https://github.com/Bloom-host/Petal
|
||||||
|
|
||||||
|
Co-authored-by: HaHaWTH <102713261+HaHaWTH@users.noreply.github.com>
|
||||||
|
Co-authored-by: Taiyou06 <kaandindar21@gmail.com>
|
||||||
|
Co-authored-by: Altiami <yoshimo.kristin@gmail.com>
|
||||||
|
|
||||||
This patch was ported downstream from the Petal fork.
|
This patch was ported downstream from the Petal fork.
|
||||||
|
|
||||||
Makes most pathfinding-related work happen asynchronously
|
Makes most pathfinding-related work happen asynchronously
|
||||||
|
|||||||
@@ -1,13 +1,18 @@
|
|||||||
package org.dreeam.leaf.async.path;
|
package org.dreeam.leaf.async.path;
|
||||||
|
|
||||||
|
import com.destroystokyo.paper.util.SneakyThrow;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
import net.minecraft.server.MinecraftServer;
|
import net.minecraft.server.MinecraftServer;
|
||||||
import net.minecraft.world.level.pathfinder.Path;
|
import net.minecraft.world.level.pathfinder.Path;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
@@ -16,19 +21,59 @@ import java.util.function.Consumer;
|
|||||||
*/
|
*/
|
||||||
public class AsyncPathProcessor {
|
public class AsyncPathProcessor {
|
||||||
|
|
||||||
private static final Executor pathProcessingExecutor = new ThreadPoolExecutor(
|
private static final String THREAD_PREFIX = "Leaf Async Pathfinding";
|
||||||
|
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
|
||||||
|
private static long lastWarnMillis = System.currentTimeMillis();
|
||||||
|
private static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor(
|
||||||
1,
|
1,
|
||||||
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingMaxThreads,
|
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingMaxThreads,
|
||||||
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingKeepalive, TimeUnit.SECONDS,
|
org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingKeepalive, TimeUnit.SECONDS,
|
||||||
new LinkedBlockingQueue<>(),
|
getQueueImpl(),
|
||||||
new ThreadFactoryBuilder()
|
new ThreadFactoryBuilder()
|
||||||
.setNameFormat("Leaf Async Pathfinding Thread - %d")
|
.setNameFormat(THREAD_PREFIX + " Thread - %d")
|
||||||
.setPriority(Thread.NORM_PRIORITY - 2)
|
.setPriority(Thread.NORM_PRIORITY - 2)
|
||||||
.build()
|
.build(),
|
||||||
|
new RejectedTaskHandler()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
private static class RejectedTaskHandler implements RejectedExecutionHandler {
|
||||||
|
@Override
|
||||||
|
public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor executor) {
|
||||||
|
BlockingQueue<Runnable> workQueue = executor.getQueue();
|
||||||
|
if (!executor.isShutdown()) {
|
||||||
|
switch (org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingRejectPolicy) {
|
||||||
|
case FLUSH_ALL -> {
|
||||||
|
if (!workQueue.isEmpty()) {
|
||||||
|
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());
|
||||||
|
|
||||||
|
workQueue.drainTo(pendingTasks);
|
||||||
|
|
||||||
|
for (Runnable pendingTask : pendingTasks) {
|
||||||
|
pendingTask.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rejectedTask.run();
|
||||||
|
}
|
||||||
|
case CALLER_RUNS -> rejectedTask.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
|
||||||
|
LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in Leaf config may help.");
|
||||||
|
lastWarnMillis = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected static CompletableFuture<Void> queue(@NotNull AsyncPath path) {
|
protected static CompletableFuture<Void> queue(@NotNull AsyncPath path) {
|
||||||
return CompletableFuture.runAsync(path::process, pathProcessingExecutor);
|
return CompletableFuture.runAsync(path::process, pathProcessingExecutor)
|
||||||
|
.orTimeout(60L, TimeUnit.SECONDS)
|
||||||
|
.exceptionally(throwable -> {
|
||||||
|
if (throwable instanceof TimeoutException e) {
|
||||||
|
LOGGER.warn("Async Pathfinding process timed out", e);
|
||||||
|
} else SneakyThrow.sneaky(throwable);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -48,4 +93,9 @@ public class AsyncPathProcessor {
|
|||||||
afterProcessing.accept(path);
|
afterProcessing.accept(path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
private static BlockingQueue<Runnable> getQueueImpl() {
|
||||||
|
final int queueCapacity = org.dreeam.leaf.config.modules.async.AsyncPathfinding.asyncPathfindingQueueSize;
|
||||||
|
|
||||||
|
return new LinkedBlockingQueue<>(queueCapacity);
|
||||||
|
}}
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package org.dreeam.leaf.async.path;
|
||||||
|
|
||||||
|
import org.dreeam.leaf.config.LeafConfig;
|
||||||
|
|
||||||
|
import java.util.Locale;
|
||||||
|
|
||||||
|
public enum PathfindTaskRejectPolicy {
|
||||||
|
FLUSH_ALL,
|
||||||
|
CALLER_RUNS;
|
||||||
|
|
||||||
|
public static PathfindTaskRejectPolicy fromString(String policy) {
|
||||||
|
try {
|
||||||
|
return PathfindTaskRejectPolicy.valueOf(policy.toUpperCase(Locale.ROOT));
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
LeafConfig.LOGGER.warn("Invalid pathfind task reject policy: {}, falling back to {}.", policy, FLUSH_ALL.toString());
|
||||||
|
return FLUSH_ALL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -20,7 +20,8 @@ import java.util.concurrent.*;
|
|||||||
|
|
||||||
public class MultithreadedTracker {
|
public class MultithreadedTracker {
|
||||||
|
|
||||||
private static final Logger LOGGER = LogManager.getLogger("MultithreadedTracker");
|
private static final String THREAD_PREFIX = "Leaf Async Tracker";
|
||||||
|
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
|
||||||
private static long lastWarnMillis = System.currentTimeMillis();
|
private static long lastWarnMillis = System.currentTimeMillis();
|
||||||
private static final ThreadPoolExecutor trackerExecutor = new ThreadPoolExecutor(
|
private static final ThreadPoolExecutor trackerExecutor = new ThreadPoolExecutor(
|
||||||
getCorePoolSize(),
|
getCorePoolSize(),
|
||||||
@@ -128,19 +129,15 @@ public class MultithreadedTracker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static int getMaxPoolSize() {
|
private static int getMaxPoolSize() {
|
||||||
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? Integer.MAX_VALUE : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads;
|
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long getKeepAliveTime() {
|
private static long getKeepAliveTime() {
|
||||||
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize ? 30L : org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive;
|
return org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerKeepalive;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static BlockingQueue<Runnable> getQueueImpl() {
|
private static BlockingQueue<Runnable> getQueueImpl() {
|
||||||
if (org.dreeam.leaf.config.modules.async.MultithreadedTracker.autoResize) {
|
final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerQueueSize;
|
||||||
return new SynchronousQueue<>();
|
|
||||||
}
|
|
||||||
|
|
||||||
final int queueCapacity = org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads * (Math.max(org.dreeam.leaf.config.modules.async.MultithreadedTracker.asyncEntityTrackerMaxThreads, 4));
|
|
||||||
|
|
||||||
return new LinkedBlockingQueue<>(queueCapacity);
|
return new LinkedBlockingQueue<>(queueCapacity);
|
||||||
}
|
}
|
||||||
@@ -148,7 +145,7 @@ public class MultithreadedTracker {
|
|||||||
private static @NotNull ThreadFactory getThreadFactory() {
|
private static @NotNull ThreadFactory getThreadFactory() {
|
||||||
return new ThreadFactoryBuilder()
|
return new ThreadFactoryBuilder()
|
||||||
.setThreadFactory(MultithreadedTrackerThread::new)
|
.setThreadFactory(MultithreadedTrackerThread::new)
|
||||||
.setNameFormat("Leaf Async Tracker Thread - %d")
|
.setNameFormat(THREAD_PREFIX + " Thread - %d")
|
||||||
.setPriority(Thread.NORM_PRIORITY - 2)
|
.setPriority(Thread.NORM_PRIORITY - 2)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package org.dreeam.leaf.config.modules.async;
|
package org.dreeam.leaf.config.modules.async;
|
||||||
|
|
||||||
|
import org.dreeam.leaf.async.path.PathfindTaskRejectPolicy;
|
||||||
import org.dreeam.leaf.config.ConfigModules;
|
import org.dreeam.leaf.config.ConfigModules;
|
||||||
import org.dreeam.leaf.config.EnumConfigCategory;
|
import org.dreeam.leaf.config.EnumConfigCategory;
|
||||||
import org.dreeam.leaf.config.LeafConfig;
|
import org.dreeam.leaf.config.LeafConfig;
|
||||||
@@ -13,12 +14,25 @@ public class AsyncPathfinding extends ConfigModules {
|
|||||||
public static boolean enabled = false;
|
public static boolean enabled = false;
|
||||||
public static int asyncPathfindingMaxThreads = 0;
|
public static int asyncPathfindingMaxThreads = 0;
|
||||||
public static int asyncPathfindingKeepalive = 60;
|
public static int asyncPathfindingKeepalive = 60;
|
||||||
|
public static int asyncPathfindingQueueSize = 0;
|
||||||
|
public static PathfindTaskRejectPolicy asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.FLUSH_ALL;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onLoaded() {
|
public void onLoaded() {
|
||||||
enabled = config.getBoolean(getBasePath() + ".enabled", enabled);
|
enabled = config.getBoolean(getBasePath() + ".enabled", enabled);
|
||||||
asyncPathfindingMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncPathfindingMaxThreads);
|
asyncPathfindingMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncPathfindingMaxThreads);
|
||||||
asyncPathfindingKeepalive = config.getInt(getBasePath() + ".keepalive", asyncPathfindingKeepalive);
|
asyncPathfindingKeepalive = config.getInt(getBasePath() + ".keepalive", asyncPathfindingKeepalive);
|
||||||
|
asyncPathfindingQueueSize = config.getInt(getBasePath() + ".queue-size", asyncPathfindingQueueSize);
|
||||||
|
asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.fromString(config.getString(getBasePath() + ".reject-policy", asyncPathfindingRejectPolicy.toString(), config.pickStringRegionBased(
|
||||||
|
"""
|
||||||
|
The policy to use when the queue is full and a new task is submitted.
|
||||||
|
FLUSH_ALL: All pending tasks will be run on server thread.
|
||||||
|
CALLER_RUNS: Newly submitted task will be run on server thread.""",
|
||||||
|
"""
|
||||||
|
当队列满时, 新提交的任务将使用以下策略处理.
|
||||||
|
FLUSH_ALL: 所有等待中的任务都将在主线程上运行.
|
||||||
|
CALLER_RUNS: 新提交的任务将在主线程上运行."""
|
||||||
|
)));
|
||||||
|
|
||||||
if (asyncPathfindingMaxThreads < 0)
|
if (asyncPathfindingMaxThreads < 0)
|
||||||
asyncPathfindingMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncPathfindingMaxThreads, 1);
|
asyncPathfindingMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncPathfindingMaxThreads, 1);
|
||||||
@@ -28,5 +42,8 @@ public class AsyncPathfinding extends ConfigModules {
|
|||||||
asyncPathfindingMaxThreads = 0;
|
asyncPathfindingMaxThreads = 0;
|
||||||
else
|
else
|
||||||
LeafConfig.LOGGER.info("Using {} threads for Async Pathfinding", asyncPathfindingMaxThreads);
|
LeafConfig.LOGGER.info("Using {} threads for Async Pathfinding", asyncPathfindingMaxThreads);
|
||||||
|
|
||||||
|
if (asyncPathfindingQueueSize <= 0)
|
||||||
|
asyncPathfindingQueueSize = asyncPathfindingMaxThreads * Math.max(asyncPathfindingMaxThreads, 4);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ public class MultithreadedTracker extends ConfigModules {
|
|||||||
|
|
||||||
public static boolean enabled = false;
|
public static boolean enabled = false;
|
||||||
public static boolean compatModeEnabled = false;
|
public static boolean compatModeEnabled = false;
|
||||||
public static boolean autoResize = false;
|
|
||||||
public static int asyncEntityTrackerMaxThreads = 0;
|
public static int asyncEntityTrackerMaxThreads = 0;
|
||||||
public static int asyncEntityTrackerKeepalive = 60;
|
public static int asyncEntityTrackerKeepalive = 60;
|
||||||
|
public static int asyncEntityTrackerQueueSize = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onLoaded() {
|
public void onLoaded() {
|
||||||
@@ -33,16 +33,9 @@ public class MultithreadedTracker extends ConfigModules {
|
|||||||
"""
|
"""
|
||||||
是否启用兼容模式,
|
是否启用兼容模式,
|
||||||
如果你的服务器安装了 Citizens 或其他类似非发包 NPC 插件, 请开启此项."""));
|
如果你的服务器安装了 Citizens 或其他类似非发包 NPC 插件, 请开启此项."""));
|
||||||
autoResize = config.getBoolean(getBasePath() + ".auto-resize", autoResize, config.pickStringRegionBased("""
|
|
||||||
Auto adjust thread pool size based on server load,
|
|
||||||
This will tweak thread pool size dynamically,
|
|
||||||
overrides max-threads and keepalive.""",
|
|
||||||
"""
|
|
||||||
根据服务器负载自动调整线程池大小,
|
|
||||||
这会使线程池大小动态调整,
|
|
||||||
覆盖设置 max-threads 和 keepalive."""));
|
|
||||||
asyncEntityTrackerMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncEntityTrackerMaxThreads);
|
asyncEntityTrackerMaxThreads = config.getInt(getBasePath() + ".max-threads", asyncEntityTrackerMaxThreads);
|
||||||
asyncEntityTrackerKeepalive = config.getInt(getBasePath() + ".keepalive", asyncEntityTrackerKeepalive);
|
asyncEntityTrackerKeepalive = config.getInt(getBasePath() + ".keepalive", asyncEntityTrackerKeepalive);
|
||||||
|
asyncEntityTrackerQueueSize = config.getInt(getBasePath() + ".queue-size", asyncEntityTrackerQueueSize);
|
||||||
|
|
||||||
if (asyncEntityTrackerMaxThreads < 0)
|
if (asyncEntityTrackerMaxThreads < 0)
|
||||||
asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncEntityTrackerMaxThreads, 1);
|
asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncEntityTrackerMaxThreads, 1);
|
||||||
@@ -53,5 +46,8 @@ public class MultithreadedTracker extends ConfigModules {
|
|||||||
asyncEntityTrackerMaxThreads = 0;
|
asyncEntityTrackerMaxThreads = 0;
|
||||||
else
|
else
|
||||||
LeafConfig.LOGGER.info("Using {} threads for Async Entity Tracker", asyncEntityTrackerMaxThreads);
|
LeafConfig.LOGGER.info("Using {} threads for Async Entity Tracker", asyncEntityTrackerMaxThreads);
|
||||||
|
|
||||||
|
if (asyncEntityTrackerQueueSize <= 0)
|
||||||
|
asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * Math.max(asyncEntityTrackerMaxThreads, 4);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user