9
0
mirror of https://github.com/BX-Team/DivineMC.git synced 2025-12-22 08:19:19 +00:00

update Async Pathfinding and Multithreaded Tracker

This commit is contained in:
NONPLAYT
2025-02-24 15:06:22 +03:00
parent 7c4d0ebc0d
commit 1b9c969f1c
9 changed files with 540 additions and 220 deletions

View File

@@ -5,6 +5,7 @@ import org.apache.logging.log4j.Logger;
import org.bukkit.Bukkit;
import org.bukkit.configuration.ConfigurationSection;
import org.bukkit.configuration.MemoryConfiguration;
import org.bxteam.divinemc.entity.pathfinding.PathfindTaskRejectPolicy;
import org.bxteam.divinemc.server.chunk.ChunkSystemAlgorithms;
import org.jetbrains.annotations.Nullable;
import org.simpleyaml.configuration.comments.CommentType;
@@ -316,33 +317,52 @@ public class DivineConfig {
public static boolean asyncPathfinding = true;
public static int asyncPathfindingMaxThreads = 2;
public static int asyncPathfindingKeepalive = 60;
public static int asyncPathfindingQueueSize = 0;
public static PathfindTaskRejectPolicy asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.FLUSH_ALL;
private static void asyncPathfinding() {
asyncPathfinding = getBoolean("settings.async-pathfinding.enable", asyncPathfinding);
asyncPathfindingMaxThreads = getInt("settings.async-pathfinding.max-threads", asyncPathfindingMaxThreads);
asyncPathfindingKeepalive = getInt("settings.async-pathfinding.keepalive", asyncPathfindingKeepalive);
asyncPathfindingQueueSize = getInt("settings.async-pathfinding.queue-size", asyncPathfindingQueueSize);
final int maxThreads = Runtime.getRuntime().availableProcessors();
if (asyncPathfindingMaxThreads < 0) {
asyncPathfindingMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncPathfindingMaxThreads, 1);
asyncPathfindingMaxThreads = Math.max(maxThreads + asyncPathfindingMaxThreads, 1);
} else if (asyncPathfindingMaxThreads == 0) {
asyncPathfindingMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1);
asyncPathfindingMaxThreads = Math.max(maxThreads / 4, 1);
}
if (!asyncPathfinding) {
asyncPathfindingMaxThreads = 0;
} else {
Bukkit.getLogger().log(Level.INFO, "Using " + asyncPathfindingMaxThreads + " threads for Async Pathfinding");
LOGGER.info("Using " + asyncPathfindingMaxThreads + " threads for Async Pathfinding");
}
if (asyncPathfindingQueueSize <= 0) asyncPathfindingQueueSize = asyncPathfindingMaxThreads * 256;
asyncPathfindingRejectPolicy = PathfindTaskRejectPolicy.fromString(getString("settings.async-pathfinding.reject-policy", maxThreads >= 12 && asyncPathfindingQueueSize < 512 ? PathfindTaskRejectPolicy.FLUSH_ALL.toString() : PathfindTaskRejectPolicy.CALLER_RUNS.toString(),
"The policy to use when the queue is full and a new task is submitted.",
"FLUSH_ALL: All pending tasks will be run on server thread.",
"CALLER_RUNS: Newly submitted task will be run on server thread."));
}
public static boolean multithreadedEnabled = true;
public static boolean multithreadedCompatModeEnabled = false;
public static int asyncEntityTrackerMaxThreads = 1;
public static int asyncEntityTrackerKeepalive = 60;
public static int asyncEntityTrackerQueueSize = 0;
private static void multithreadedTracker() {
multithreadedEnabled = getBoolean("settings.multithreaded-tracker.enable", multithreadedEnabled);
multithreadedCompatModeEnabled = getBoolean("settings.multithreaded-tracker.compat-mode", multithreadedCompatModeEnabled);
multithreadedEnabled = getBoolean("settings.multithreaded-tracker.enable", multithreadedEnabled,
"Make entity tracking saving asynchronously, can improve performance significantly,",
"especially in some massive entities in small area situations.");
multithreadedCompatModeEnabled = getBoolean("settings.multithreaded-tracker.compat-mode", multithreadedCompatModeEnabled,
"Enable compat mode ONLY if Citizens or NPC plugins using real entity has installed.",
"Compat mode fixes visible issues with player type NPCs of Citizens.",
"But we recommend to use packet based / virtual entity NPC plugin, e.g. ZNPC Plus, Adyeshach, Fancy NPC and etc.");
asyncEntityTrackerMaxThreads = getInt("settings.multithreaded-tracker.max-threads", asyncEntityTrackerMaxThreads);
asyncEntityTrackerKeepalive = getInt("settings.multithreaded-tracker.keepalive", asyncEntityTrackerKeepalive);
asyncEntityTrackerQueueSize = getInt("settings.multithreaded-tracker.queue-size", asyncEntityTrackerQueueSize);
if (asyncEntityTrackerMaxThreads < 0) {
asyncEntityTrackerMaxThreads = Math.max(Runtime.getRuntime().availableProcessors() + asyncEntityTrackerMaxThreads, 1);
@@ -353,7 +373,9 @@ public class DivineConfig {
if (!multithreadedEnabled) {
asyncEntityTrackerMaxThreads = 0;
} else {
Bukkit.getLogger().log(Level.INFO, "Using " + asyncEntityTrackerMaxThreads + " threads for Async Entity Tracker");
LOGGER.info("Using " + asyncEntityTrackerMaxThreads + " threads for Async Entity Tracker");
}
if (asyncEntityTrackerQueueSize <= 0) asyncEntityTrackerQueueSize = asyncEntityTrackerMaxThreads * 384;
}
}

View File

@@ -3,29 +3,76 @@ package org.bxteam.divinemc.entity.pathfinding;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import net.minecraft.server.MinecraftServer;
import net.minecraft.world.level.pathfinder.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bxteam.divinemc.DivineConfig;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
* used to handle the scheduling of async path processing
* Used to handle the scheduling of async path processing
*/
@SuppressWarnings("DuplicatedCode")
public class AsyncPathProcessor {
private static final Executor pathProcessingExecutor = new ThreadPoolExecutor(
private static final String THREAD_PREFIX = "DivineMC Async Pathfinding";
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor pathProcessingExecutor = new ThreadPoolExecutor(
1,
org.bxteam.divinemc.DivineConfig.asyncPathfindingMaxThreads,
org.bxteam.divinemc.DivineConfig.asyncPathfindingKeepalive, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
DivineConfig.asyncPathfindingMaxThreads,
DivineConfig.asyncPathfindingKeepalive, TimeUnit.SECONDS,
getQueueImpl(),
new ThreadFactoryBuilder()
.setNameFormat("DivineMC Async Pathfinding Thread - %d")
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build()
.build(),
new RejectedTaskHandler()
);
private static class RejectedTaskHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor executor) {
BlockingQueue<Runnable> workQueue = executor.getQueue();
if (!executor.isShutdown()) {
switch (DivineConfig.asyncPathfindingRejectPolicy) {
case FLUSH_ALL -> {
if (!workQueue.isEmpty()) {
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());
workQueue.drainTo(pendingTasks);
for (Runnable pendingTask : pendingTasks) {
pendingTask.run();
}
}
rejectedTask.run();
}
case CALLER_RUNS -> rejectedTask.run();
}
}
if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
LOGGER.warn("Async pathfinding processor is busy! Pathfinding tasks will be treated as policy defined in config. Increasing max-threads in DivineMC config may help.");
lastWarnMillis = System.currentTimeMillis();
}
}
}
protected static CompletableFuture<Void> queue(@NotNull AsyncPath path) {
return CompletableFuture.runAsync(path::process, pathProcessingExecutor);
return CompletableFuture.runAsync(path::process, pathProcessingExecutor)
.orTimeout(60L, TimeUnit.SECONDS)
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException e) {
LOGGER.warn("Async Pathfinding process timed out", e);
} else LOGGER.warn("Error occurred while processing async path", throwable);
return null;
});
}
/**
@@ -45,4 +92,10 @@ public class AsyncPathProcessor {
afterProcessing.accept(path);
}
}
private static BlockingQueue<Runnable> getQueueImpl() {
final int queueCapacity = DivineConfig.asyncPathfindingQueueSize;
return new LinkedBlockingQueue<>(queueCapacity);
}
}

View File

@@ -0,0 +1,18 @@
package org.bxteam.divinemc.entity.pathfinding;
import org.bxteam.divinemc.DivineConfig;
import java.util.Locale;
public enum PathfindTaskRejectPolicy {
FLUSH_ALL,
CALLER_RUNS;
public static PathfindTaskRejectPolicy fromString(String policy) {
try {
return PathfindTaskRejectPolicy.valueOf(policy.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
DivineConfig.LOGGER.warn("Invalid pathfind task reject policy: {}, falling back to {}.", policy, FLUSH_ALL.toString());
return FLUSH_ALL;
}
}
}

View File

@@ -5,7 +5,6 @@ import ca.spottedleaf.moonrise.common.misc.NearbyPlayers;
import ca.spottedleaf.moonrise.patches.chunk_system.level.ChunkSystemServerLevel;
import ca.spottedleaf.moonrise.patches.chunk_system.level.entity.server.ServerEntityLookup;
import ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerEntity;
import ca.spottedleaf.moonrise.patches.entity_tracker.EntityTrackerTrackedEntity;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import net.minecraft.server.level.ChunkMap;
import net.minecraft.server.level.FullChunkStatus;
@@ -13,41 +12,32 @@ import net.minecraft.server.level.ServerLevel;
import net.minecraft.world.entity.Entity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bxteam.divinemc.DivineConfig;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MultithreadedTracker {
private static final Logger LOGGER = LogManager.getLogger("MultithreadedTracker");
private static final String THREAD_PREFIX = "DivineMC Async Tracker";
private static final Logger LOGGER = LogManager.getLogger(THREAD_PREFIX);
public static class MultithreadedTrackerThread extends Thread {
@Override
public void run() {
super.run();
}
}
private static final Executor trackerExecutor = new ThreadPoolExecutor(
1,
org.bxteam.divinemc.DivineConfig.asyncEntityTrackerMaxThreads,
org.bxteam.divinemc.DivineConfig.asyncEntityTrackerKeepalive, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setThreadFactory(
r -> new MultithreadedTrackerThread() {
@Override
public void run() {
r.run();
}
}
)
.setNameFormat("DivineMC Async Tracker Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build());
private MultithreadedTracker() { }
private static long lastWarnMillis = System.currentTimeMillis();
private static final ThreadPoolExecutor trackerExecutor = new ThreadPoolExecutor(
getCorePoolSize(),
getMaxPoolSize(),
getKeepAliveTime(), TimeUnit.SECONDS,
getQueueImpl(),
getThreadFactory(),
getRejectedPolicy()
);
public static Executor getTrackerExecutor() {
return trackerExecutor;
@@ -55,7 +45,7 @@ public class MultithreadedTracker {
public static void tick(ChunkSystemServerLevel level) {
try {
if (!org.bxteam.divinemc.DivineConfig.multithreadedCompatModeEnabled) {
if (!DivineConfig.multithreadedCompatModeEnabled) {
tickAsync(level);
} else {
tickAsyncWithCompatMode(level);
@@ -81,7 +71,7 @@ public class MultithreadedTracker {
if (tracker == null) continue;
((EntityTrackerTrackedEntity) tracker).moonrise$tick(nearbyPlayers.getChunk(entity.chunkPosition()));
tracker.moonrise$tick(nearbyPlayers.getChunk(entity.chunkPosition()));
tracker.serverEntity.sendChanges();
}
});
@@ -103,7 +93,7 @@ public class MultithreadedTracker {
if (tracker == null) continue;
((EntityTrackerTrackedEntity) tracker).moonrise$tick(nearbyPlayers.getChunk(entity.chunkPosition()));
tracker.moonrise$tick(nearbyPlayers.getChunk(entity.chunkPosition()));
sendChangesTasks[index++] = () -> tracker.serverEntity.sendChanges(); // Collect send changes to task array
}
@@ -138,4 +128,61 @@ public class MultithreadedTracker {
}
}
}
private static int getCorePoolSize() {
return 1;
}
private static int getMaxPoolSize() {
return DivineConfig.asyncEntityTrackerMaxThreads;
}
private static long getKeepAliveTime() {
return DivineConfig.asyncEntityTrackerKeepalive;
}
private static BlockingQueue<Runnable> getQueueImpl() {
final int queueCapacity = DivineConfig.asyncEntityTrackerQueueSize;
return new LinkedBlockingQueue<>(queueCapacity);
}
private static @NotNull ThreadFactory getThreadFactory() {
return new ThreadFactoryBuilder()
.setThreadFactory(MultithreadedTrackerThread::new)
.setNameFormat(THREAD_PREFIX + " Thread - %d")
.setPriority(Thread.NORM_PRIORITY - 2)
.build();
}
private static @NotNull RejectedExecutionHandler getRejectedPolicy() {
return (rejectedTask, executor) -> {
BlockingQueue<Runnable> workQueue = executor.getQueue();
if (!executor.isShutdown()) {
if (!workQueue.isEmpty()) {
List<Runnable> pendingTasks = new ArrayList<>(workQueue.size());
workQueue.drainTo(pendingTasks);
for (Runnable pendingTask : pendingTasks) {
pendingTask.run();
}
}
rejectedTask.run();
}
if (System.currentTimeMillis() - lastWarnMillis > 30000L) {
LOGGER.warn("Async entity tracker is busy! Tracking tasks will be done in the server thread. Increasing max-threads in DivineMC config may help.");
lastWarnMillis = System.currentTimeMillis();
}
};
}
public static class MultithreadedTrackerThread extends Thread {
public MultithreadedTrackerThread(Runnable runnable) {
super(runnable);
}
}
}

View File

@@ -0,0 +1,213 @@
package org.bxteam.divinemc.util.map;
import it.unimi.dsi.fastutil.longs.LongCollection;
import it.unimi.dsi.fastutil.longs.LongIterator;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import org.jetbrains.annotations.NotNull;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* A thread-safe implementation of {@link LongOpenHashSet} using ConcurrentHashMap.KeySetView as backing storage.
* This implementation provides concurrent access and high performance for concurrent operations.
*
* @author HaHaWTH at Leaf
*/
@SuppressWarnings({"unused", "deprecation"})
public final class ConcurrentLongHashSet extends LongOpenHashSet implements LongSet {
private final ConcurrentHashMap.KeySetView<Long, Boolean> backing;
/**
* Creates a new empty concurrent long set.
*/
public ConcurrentLongHashSet() {
this.backing = ConcurrentHashMap.newKeySet();
}
@Override
public int size() {
return backing.size();
}
@Override
public boolean isEmpty() {
return backing.isEmpty();
}
@Override
public @NotNull LongIterator iterator() {
return new WrappingLongIterator(backing.iterator());
}
@NotNull
@Override
public Object @NotNull [] toArray() {
return backing.toArray();
}
@NotNull
@Override
public <T> T @NotNull [] toArray(@NotNull T @NotNull [] array) {
Objects.requireNonNull(array, "Array cannot be null");
return backing.toArray(array);
}
@Override
public boolean containsAll(@NotNull Collection<?> collection) {
Objects.requireNonNull(collection, "Collection cannot be null");
return backing.containsAll(collection);
}
@Override
public boolean addAll(@NotNull Collection<? extends Long> collection) {
Objects.requireNonNull(collection, "Collection cannot be null");
return backing.addAll(collection);
}
@Override
public boolean removeAll(@NotNull Collection<?> collection) {
Objects.requireNonNull(collection, "Collection cannot be null");
return backing.removeAll(collection);
}
@Override
public boolean retainAll(@NotNull Collection<?> collection) {
Objects.requireNonNull(collection, "Collection cannot be null");
return backing.retainAll(collection);
}
@Override
public void clear() {
backing.clear();
}
@Override
public boolean add(long key) {
return backing.add(key);
}
@Override
public boolean contains(long key) {
return backing.contains(key);
}
@Override
public long[] toLongArray() {
int size = backing.size();
long[] result = new long[size];
int i = 0;
for (Long value : backing) {
result[i++] = value;
}
return result;
}
@Override
public long[] toArray(long[] array) {
Objects.requireNonNull(array, "Array cannot be null");
long[] result = toLongArray();
if (array.length < result.length) {
return result;
}
System.arraycopy(result, 0, array, 0, result.length);
if (array.length > result.length) {
array[result.length] = 0;
}
return array;
}
@Override
public boolean addAll(LongCollection c) {
Objects.requireNonNull(c, "Collection cannot be null");
boolean modified = false;
LongIterator iterator = c.iterator();
while (iterator.hasNext()) {
modified |= add(iterator.nextLong());
}
return modified;
}
@Override
public boolean containsAll(LongCollection c) {
Objects.requireNonNull(c, "Collection cannot be null");
LongIterator iterator = c.iterator();
while (iterator.hasNext()) {
if (!contains(iterator.nextLong())) {
return false;
}
}
return true;
}
@Override
public boolean removeAll(LongCollection c) {
Objects.requireNonNull(c, "Collection cannot be null");
boolean modified = false;
LongIterator iterator = c.iterator();
while (iterator.hasNext()) {
modified |= remove(iterator.nextLong());
}
return modified;
}
@Override
public boolean retainAll(LongCollection c) {
Objects.requireNonNull(c, "Collection cannot be null");
return backing.retainAll(c);
}
@Override
public boolean remove(long k) {
return backing.remove(k);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof LongSet that)) return false;
if (size() != that.size()) return false;
return containsAll(that);
}
@Override
public int hashCode() {
return backing.hashCode();
}
@Override
public String toString() {
return backing.toString();
}
static class WrappingLongIterator implements LongIterator {
private final Iterator<Long> backing;
WrappingLongIterator(Iterator<Long> backing) {
this.backing = Objects.requireNonNull(backing);
}
@Override
public boolean hasNext() {
return backing.hasNext();
}
@Override
public long nextLong() {
return backing.next();
}
@Override
public Long next() {
return backing.next();
}
@Override
public void remove() {
backing.remove();
}
}
}