From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: wangxyper Date: Sun, 15 Jan 2023 09:56:15 +0800 Subject: [PATCH] Hearse: Hearse base codes Original license: MIT Original project: https://github.com/Era4FunMC/Hearse diff --git a/src/main/java/co/earthme/hearse/Hearse.java b/src/main/java/co/earthme/hearse/Hearse.java new file mode 100644 index 0000000000000000000000000000000000000000..692fef51b2f15dd1ddc28773a381b9da3b42725e --- /dev/null +++ b/src/main/java/co/earthme/hearse/Hearse.java @@ -0,0 +1,27 @@ +package co.earthme.hearse; + +import co.earthme.hearse.commands.EntityCountCommand; +import co.earthme.hearse.commands.WorkerCommand; +import co.earthme.hearse.server.ServerEntityTickHook; +import co.earthme.hearse.workers.WorkerThreadPoolManager; +import net.minecraft.server.MinecraftServer; + +public class Hearse { + private static final WorkerThreadPoolManager workerManager = new WorkerThreadPoolManager(); + + public static void initAll(){ + HearseConfig.init(); + ServerEntityTickHook.init(); + MinecraftServer.getServer().server.getCommandMap().register("workers","hearse",new WorkerCommand()); + MinecraftServer.getServer().server.getCommandMap().register("entitycount","hearse",new EntityCountCommand()); + } + + public static void onServerStop(){ + HearseConfig.save(); + workerManager.shutdownAllNow(); + } + + public static WorkerThreadPoolManager getWorkerManager() { + return workerManager; + } +} diff --git a/src/main/java/co/earthme/hearse/HearseConfig.java b/src/main/java/co/earthme/hearse/HearseConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..73b5e76660b5162a7a0b327ddc7dcc3295b86699 --- /dev/null +++ b/src/main/java/co/earthme/hearse/HearseConfig.java @@ -0,0 +1,49 @@ +package co.earthme.hearse; + +import org.bukkit.configuration.InvalidConfigurationException; +import org.bukkit.configuration.file.YamlConfiguration; +import java.io.File; +import java.io.IOException; + +public class HearseConfig { + private static final YamlConfiguration configEntry = new YamlConfiguration(); + private static final File CONFIG_FILE = new File("hearse.yml"); + + public static void init(){ + try { + configEntry.load(CONFIG_FILE); + }catch (IOException ignored){ + } catch (InvalidConfigurationException e) { + e.printStackTrace(); + } + configEntry.options().copyDefaults(true); + } + + public static void save(){ + try { + configEntry.save(CONFIG_FILE); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static int getInt(String key,int def){ + configEntry.addDefault(key,def); + return configEntry.getInt(key); + } + + public static long getLong(String key,int def){ + configEntry.addDefault(key,def); + return configEntry.getLong(key); + } + + public static String getString(String key,String def){ + configEntry.addDefault(key,def); + return configEntry.getString(key); + } + + public static boolean getBoolean(String key,boolean def){ + configEntry.addDefault(key,def); + return configEntry.getBoolean(key); + } +} diff --git a/src/main/java/co/earthme/hearse/commands/EntityCountCommand.java b/src/main/java/co/earthme/hearse/commands/EntityCountCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..65f23896888ab5fcbc8f62f5048d1fdc1a677bf1 --- /dev/null +++ b/src/main/java/co/earthme/hearse/commands/EntityCountCommand.java @@ -0,0 +1,36 @@ +package co.earthme.hearse.commands; + +import com.google.common.collect.Maps; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.level.ServerLevel; +import net.minecraft.world.entity.Entity; +import org.bukkit.ChatColor; +import org.bukkit.command.Command; +import org.bukkit.command.CommandSender; +import org.jetbrains.annotations.NotNull; +import java.util.Map; + +public class EntityCountCommand extends Command { + public EntityCountCommand() { + super("entitycount"); + } + + @Override + public boolean execute(@NotNull CommandSender sender, @NotNull String commandLabel, @NotNull String[] args) { + final Map counts = Maps.newHashMap(); + for (ServerLevel level : MinecraftServer.getServer().getAllLevels()){ + for (Entity entity : level.entityTickList.entities){ + final String name = entity.getType().getCategory().getName(); + if (!counts.containsKey(name)){ + counts.put(name,0); + } + counts.replace(name,counts.get(name)+1); + } + } + sender.sendMessage("Exists entity Counts:"); + for (Map.Entry entry : counts.entrySet()){ + sender.sendMessage(ChatColor.BLUE+String.format("%s:%s",entry.getKey(),entry.getValue())); + } + return true; + } +} diff --git a/src/main/java/co/earthme/hearse/commands/WorkerCommand.java b/src/main/java/co/earthme/hearse/commands/WorkerCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..1a4a6869a7278beadd97af006f4b5fae578b83ed --- /dev/null +++ b/src/main/java/co/earthme/hearse/commands/WorkerCommand.java @@ -0,0 +1,72 @@ +package co.earthme.hearse.commands; + +import co.earthme.hearse.Hearse; +import co.earthme.hearse.concurrent.WorkerThreadPoolExecutor; +import org.bukkit.ChatColor; +import org.bukkit.command.Command; +import org.bukkit.command.CommandSender; +import org.jetbrains.annotations.NotNull; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class WorkerCommand extends Command { + public WorkerCommand() { + super("workers"); + this.setPermission("hearse.commands.workers"); + this.setDescription("You can see or edit the server workers by using this command"); + this.setUsage("/workers "); + } + + @Override + public @NotNull List tabComplete(@NotNull CommandSender sender, @NotNull String alias, @NotNull String[] args) throws IllegalArgumentException { + final List ret = new ArrayList<>(); + if (args.length == 1){ + ret.add("status"); + ret.add("setThreadCount"); + ret.add("forceStop"); + } + if (args.length == 2){ + for (Map.Entry entry : Hearse.getWorkerManager().getManagedWorkers().entrySet()){ + ret.add(entry.getKey()); + } + } + return ret; + } + + @Override + public boolean execute(@NotNull CommandSender sender, @NotNull String commandLabel, @NotNull String[] args) { + if (args.length >= 2){ + final String action = args[0]; + final String workerName = args[1]; + final WorkerThreadPoolExecutor searchedWorker = Hearse.getWorkerManager().getTargetWorker(workerName); + if (searchedWorker == null){ + sender.sendMessage(ChatColor.RED+"Target worker not found!"); + return true; + } + switch (action){ + case "status": + sender.sendMessage(ChatColor.GREEN+"Worker: "+workerName+" Status:"+ searchedWorker); + break; + case "setThreadCount": + if (args.length == 3){ + try { + searchedWorker.setCorePoolSize(Integer.parseInt(args[2])); + sender.sendMessage(ChatColor.GREEN+"Finished!"); + }catch (NumberFormatException e){ + sender.sendMessage(ChatColor.RED+"Please supply an integer!"); + } + }else{ + sender.sendMessage(ChatColor.RED+"Please supply an integer!"); + } + break; + case "forceStop": + searchedWorker.shutdownNow(); + sender.sendMessage(ChatColor.YELLOW+"Worker "+workerName+" has been stopped!"); + break; + } + return true; + } + return false; + } +} diff --git a/src/main/java/co/earthme/hearse/concurrent/WorkStealingThreadPool.java b/src/main/java/co/earthme/hearse/concurrent/WorkStealingThreadPool.java new file mode 100644 index 0000000000000000000000000000000000000000..42d63f105e35e622517378f07aa75e1951d620c5 --- /dev/null +++ b/src/main/java/co/earthme/hearse/concurrent/WorkStealingThreadPool.java @@ -0,0 +1,244 @@ +package co.earthme.hearse.concurrent; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +public class WorkStealingThreadPool{ + private final Executor packaged = new PackagedExecutor(this); + private final List workers = new ArrayList<>(); + private final StampedLock workersLock = new StampedLock(); + private int lastPostedPos = 0; + private final WorkerThreadFactory factory; + + public WorkStealingThreadPool(int nThreads, WorkerThreadFactory factory){ + this.factory = factory; + this.runWorkers(nThreads); + } + + private void sendStopToAll(){ + final long stamp = this.workersLock.readLock(); + try { + for (WorkerThread workerThread : this.workers){ + workerThread.sendStop(); + } + }finally { + this.workersLock.unlockRead(stamp); + } + } + + @NotNull + public Executor getPackaged() { + return this.packaged; + } + + private void runWorkers(int threadCount){ + for (int i = 0; i < threadCount; i++) { + final WorkerThread workerThread = new WorkerThread(); + final long stamp = this.workersLock.writeLock(); + try { + this.workers.add(workerThread); + }finally { + this.workersLock.unlockWrite(stamp); + } + workerThread.start(); + } + } + + public void awaitTasks(){ + final long stamp = this.workersLock.readLock(); + try { + for (WorkerThread workerThread : this.workers){ + workerThread.awaitTasks(); + } + }finally { + this.workersLock.unlockRead(stamp); + } + } + + public void postTask(@NotNull Runnable task){ + long stamp = this.workersLock.tryOptimisticRead(); + if (this.workersLock.validate(stamp)){ + int workerIndex; + synchronized (this.workers){ + if (this.lastPostedPos+1 >= this.workers.size()){ + this.lastPostedPos = 0; + } + workerIndex = this.lastPostedPos++; + } + final WorkerThread workerThread = this.workers.get(workerIndex); + workerThread.postRunnable(task); + return; + } + stamp = this.workersLock.readLock(); + try { + int workerIndex; + synchronized (this.workers){ + if (this.lastPostedPos+1 >= this.workers.size()){ + this.lastPostedPos = 0; + } + workerIndex = this.lastPostedPos++; + } + final WorkerThread workerThread = this.workers.get(workerIndex); + workerThread.postRunnable(task); + }finally { + this.workersLock.unlockRead(stamp); + } + } + + public Future postTask(@NotNull Callable task){ + final FutureTask newTask = new FutureTask<>(task); + this.postTask(newTask); + return newTask; + } + + private class WorkerThread implements Runnable { + private final Deque tasks = new ArrayDeque<>(); + private final ReadWriteLock taskLock = new ReentrantReadWriteLock(); + private final AtomicBoolean runningTask = new AtomicBoolean(); + private final Thread worker = WorkStealingThreadPool.this.factory.getNewThread(this); + + private volatile boolean shouldRun = false; + private volatile boolean running = false; + + + public void postRunnable(@NotNull Runnable task){ + this.taskLock.writeLock().lock(); + try { + this.tasks.add(task); + }finally { + this.taskLock.writeLock().unlock(); + } + } + + public void sendStop(){ + this.shouldRun = false; + LockSupport.unpark(this.worker); + } + + public void awaitTerminate(){ + while (this.running){ + LockSupport.parkNanos(100000); + } + } + + public void awaitTasks(){ + while (this.runningTask.get()){ + LockSupport.parkNanos(100000); + } + } + + @Override + public void run(){ + Runnable curTask; + Runnable stole; + while (this.shouldRun || !this.tasks.isEmpty()){ + if ((curTask = this.pollTask(false)) != null){ + this.runningTask.set(true); + try { + curTask.run(); + }catch (Exception e){ + e.printStackTrace(); + }finally { + this.runningTask.set(false); + } + continue; + } + + if ((stole = this.steal()) != null){ + try { + stole.run(); + }catch (Exception e){ + e.printStackTrace(); + } + continue; + } + + LockSupport.parkNanos("FREE WAITING",1000000); + } + this.running = false; + } + + public void start(){ + this.running = true; + this.shouldRun = true; + this.worker.start(); + } + + @Nullable + private Runnable steal(){ + long stamp = WorkStealingThreadPool.this.workersLock.tryOptimisticRead(); + if (WorkStealingThreadPool.this.workersLock.validate(stamp)){ + for (WorkerThread workerThread : WorkStealingThreadPool.this.workers){ + if (workerThread.equals(this)){ + continue; + } + if (workerThread.isCurrentThreadRunningTask() && workerThread.getQueuedTaskCount() >= 1){ + return workerThread.pollTask(true); + } + } + return null; + } + + stamp = WorkStealingThreadPool.this.workersLock.readLock(); + try { + for (WorkerThread workerThread : WorkStealingThreadPool.this.workers){ + if (workerThread.equals(this)){ + continue; + } + if (workerThread.isCurrentThreadRunningTask() && workerThread.getQueuedTaskCount() >= 1){ + return workerThread.pollTask(true); + } + } + return null; + }finally { + WorkStealingThreadPool.this.workersLock.unlockRead(stamp); + } + } + + + protected int getQueuedTaskCount(){ + this.taskLock.readLock().lock(); + try { + return this.tasks.size(); + }finally { + this.taskLock.readLock().unlock(); + } + } + + protected boolean isCurrentThreadRunningTask(){ + return this.runningTask.get(); + } + + @Nullable + protected Runnable pollTask(boolean tail){ + Runnable polledRunnable; + + this.taskLock.writeLock().lock(); + try { + polledRunnable = tail ? this.tasks.pollLast() : this.tasks.pollFirst(); + }finally { + this.taskLock.writeLock().unlock(); + } + + return polledRunnable; + } + } + + public static class PackagedExecutor implements Executor{ + private final WorkStealingThreadPool internalPool; + + public PackagedExecutor(@NotNull WorkStealingThreadPool internalPool) { + this.internalPool = internalPool; + } + + @Override + public void execute(@NotNull Runnable command) { + this.internalPool.postTask(command); + } + } +} \ No newline at end of file diff --git a/src/main/java/co/earthme/hearse/concurrent/WorkerThreadFactory.java b/src/main/java/co/earthme/hearse/concurrent/WorkerThreadFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..5ffca8dac85fcbe50a4445ebc375b33d8228d690 --- /dev/null +++ b/src/main/java/co/earthme/hearse/concurrent/WorkerThreadFactory.java @@ -0,0 +1,7 @@ +package co.earthme.hearse.concurrent; + +import co.earthme.hearse.concurrent.thread.WorkerThread; + +public interface WorkerThreadFactory { + WorkerThread getNewThread(Runnable task); +} diff --git a/src/main/java/co/earthme/hearse/concurrent/WorkerThreadPoolExecutor.java b/src/main/java/co/earthme/hearse/concurrent/WorkerThreadPoolExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..7e010bf23c9fc26284212a4388172f5d7d5a4b99 --- /dev/null +++ b/src/main/java/co/earthme/hearse/concurrent/WorkerThreadPoolExecutor.java @@ -0,0 +1,76 @@ +package co.earthme.hearse.concurrent; + +import org.jetbrains.annotations.NotNull; + +import java.util.Queue; +import java.util.concurrent.*; +import java.util.concurrent.locks.LockSupport; + +public class WorkerThreadPoolExecutor extends ThreadPoolExecutor { + private final Queue taskEntries = new ConcurrentLinkedQueue<>(); + + public WorkerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, @NotNull TimeUnit unit, @NotNull BlockingQueue workQueue, @NotNull WorkerThreadFactory workerThreadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, workerThreadFactory::getNewThread); + } + + public WorkerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, @NotNull TimeUnit unit, @NotNull BlockingQueue workQueue, @NotNull WorkerThreadFactory workerThreadFactory, @NotNull RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, workerThreadFactory::getNewThread, handler); + } + + public int getCurrentNotProcessingTasks(){ + return this.getQueue().size(); + } + + public void clearAllTasks(){ + this.getQueue().clear(); + } + + public void executeWithSubTask(Runnable mainTask,Runnable subTask){ + final TaskEntry wrapped = new TaskEntry(subTask,mainTask); + this.taskEntries.offer(wrapped); + this.execute(wrapped); + } + + public void runAllSubTasks(){ + TaskEntry task; + while ((task = this.taskEntries.poll())!=null){ + while (!task.allRunned()){ + LockSupport.parkNanos(this,10000000); + } + } + } + + private static class TaskEntry implements Runnable{ + private final Runnable mainTask; + private final Runnable subTask; + private volatile boolean mainTaskFinished = false; + + public TaskEntry(Runnable subTask,Runnable mainTask){ + this.subTask = subTask; + this.mainTask = mainTask; + } + + public boolean allRunned(){ + if (!this.mainTaskFinished){ + return false; + } + try { + this.subTask.run(); + }catch (Exception e){ + e.printStackTrace(); + } + return true; + } + + @Override + public void run() { + try { + this.mainTask.run(); + }catch(Exception e){ + e.printStackTrace(); + }finally { + this.mainTaskFinished = true; + } + } + } +} diff --git a/src/main/java/co/earthme/hearse/concurrent/thread/Worker.java b/src/main/java/co/earthme/hearse/concurrent/thread/Worker.java new file mode 100644 index 0000000000000000000000000000000000000000..e7a944bd515af644bad37a23e012a5a1997e110d --- /dev/null +++ b/src/main/java/co/earthme/hearse/concurrent/thread/Worker.java @@ -0,0 +1,7 @@ +package co.earthme.hearse.concurrent.thread; + +public interface Worker { + static boolean isWorker(){ + return Thread.currentThread() instanceof Worker; + } +} diff --git a/src/main/java/co/earthme/hearse/concurrent/thread/WorkerForkJoinThread.java b/src/main/java/co/earthme/hearse/concurrent/thread/WorkerForkJoinThread.java new file mode 100644 index 0000000000000000000000000000000000000000..6a2fb0643b6fc5921b24674940c2c3b92b9e4e88 --- /dev/null +++ b/src/main/java/co/earthme/hearse/concurrent/thread/WorkerForkJoinThread.java @@ -0,0 +1,10 @@ +package co.earthme.hearse.concurrent.thread; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; + +public class WorkerForkJoinThread extends ForkJoinWorkerThread implements Worker { + protected WorkerForkJoinThread(ForkJoinPool pool) { + super(pool); + } +} diff --git a/src/main/java/co/earthme/hearse/concurrent/thread/WorkerThread.java b/src/main/java/co/earthme/hearse/concurrent/thread/WorkerThread.java new file mode 100644 index 0000000000000000000000000000000000000000..f27bfd7ab3ce10e6c318de0c6376a80fa9014d2a --- /dev/null +++ b/src/main/java/co/earthme/hearse/concurrent/thread/WorkerThread.java @@ -0,0 +1,14 @@ +package co.earthme.hearse.concurrent.thread; + +import io.papermc.paper.util.TickThread; + +public class WorkerThread extends TickThread implements Worker{ + + public WorkerThread(String name) { + super(name); + } + + public WorkerThread(Runnable run, String name) { + super(run, name); + } +} diff --git a/src/main/java/co/earthme/hearse/concurrent/threadfactory/DefaultWorkerFactory.java b/src/main/java/co/earthme/hearse/concurrent/threadfactory/DefaultWorkerFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..b88b4f2d3df7303252a3c02824be3514c2618673 --- /dev/null +++ b/src/main/java/co/earthme/hearse/concurrent/threadfactory/DefaultWorkerFactory.java @@ -0,0 +1,41 @@ +package co.earthme.hearse.concurrent.threadfactory; + +import co.earthme.hearse.concurrent.thread.WorkerThread; +import co.earthme.hearse.concurrent.WorkerThreadFactory; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectLists; +import net.minecraft.server.MinecraftServer; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class DefaultWorkerFactory implements WorkerThreadFactory { + private static final AtomicInteger poolId = new AtomicInteger(); + private final AtomicInteger threadId = new AtomicInteger(); + private final String bound; + private final List createdThreads = ObjectLists.synchronize(new ObjectArrayList<>()); + + public DefaultWorkerFactory(String bound){ + poolId.getAndIncrement(); + this.bound = bound; + } + + public List getCreatedThreads() { + return this.createdThreads; + } + + @Override + public WorkerThread getNewThread(Runnable task) { + final WorkerThread workerThread = new WorkerThread(()->{ + try { + task.run(); + }finally { + this.createdThreads.remove(Thread.currentThread()); + } + },"pool-"+poolId.get()+"-worker-"+threadId.getAndIncrement()+"-bound-"+this.bound); + this.createdThreads.add(workerThread); + workerThread.setDaemon(true); + workerThread.setPriority(Thread.NORM_PRIORITY - 2); + workerThread.setContextClassLoader(MinecraftServer.class.getClassLoader()); + return workerThread; + } +} diff --git a/src/main/java/co/earthme/hearse/server/ServerEntityTickHook.java b/src/main/java/co/earthme/hearse/server/ServerEntityTickHook.java new file mode 100644 index 0000000000000000000000000000000000000000..343d3b83fe9bffdc6d2bea03354bcf3246a0e0b5 --- /dev/null +++ b/src/main/java/co/earthme/hearse/server/ServerEntityTickHook.java @@ -0,0 +1,82 @@ +package co.earthme.hearse.server; + +import co.earthme.hearse.HearseConfig; +import co.earthme.hearse.concurrent.WorkStealingThreadPool; +import co.earthme.hearse.concurrent.WorkerThreadFactory; +import co.earthme.hearse.concurrent.threadfactory.DefaultWorkerFactory; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.level.ServerLevel; +import net.minecraft.world.entity.Entity; +import net.minecraft.world.entity.player.Player; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +public class ServerEntityTickHook { + private static final Logger logger = LogManager.getLogger(); + private static final WorkerThreadFactory defFactory = new DefaultWorkerFactory("entity"); + private static WorkStealingThreadPool worker; + private static boolean asyncEntityEnabled = true; + private static boolean awaitEntityTasks = true; + private static final AtomicInteger taskCounter = new AtomicInteger(0); + + //To prevent the chunk system error.This is necessary + public static void awaitEntityTasks(){ + while (taskCounter.get() > 0){ + LockSupport.parkNanos("Await entities",1000000); + } + } + + public static void init(){ + asyncEntityEnabled = HearseConfig.getBoolean("optimizations.enable-parallel-entity",asyncEntityEnabled); + awaitEntityTasks = HearseConfig.getBoolean("optimizations.await-parallel-entity-tasks",awaitEntityTasks); + if (asyncEntityEnabled){ + final int workerCount = HearseConfig.getInt("workers.async-entity-worker-count",Runtime.getRuntime().availableProcessors()); + worker = new WorkStealingThreadPool( + workerCount, + defFactory + ); + } + } + + public static void callAsyncEntityTick(Entity entity, ServerLevel level){ + MinecraftServer.getServer().executeMidTickTasks(); + taskCounter.getAndIncrement(); + Runnable task = ()->{ + try { + if (!entity.isRemoved()) { + entity.checkDespawn(); + Entity entity1 = entity.getVehicle(); + if (entity1 != null) { + if (!entity1.isRemoved() && entity1.hasPassenger(entity)) { + return; + } + entity.stopRiding(); + } + try { + level.tickNonPassenger(entity); + } catch (Throwable throwable) { + if (throwable instanceof ThreadDeath) throw throwable; + level.getCraftServer().getPluginManager().callEvent(new com.destroystokyo.paper.event.server.ServerExceptionEvent(new com.destroystokyo.paper.exception.ServerInternalException(throwable.getMessage(), throwable))); + throwable.printStackTrace(); + } + } + }finally { + taskCounter.getAndDecrement(); + } + }; + if (!asyncEntityEnabled || entity instanceof Player){ + task.run(); + return; + } + try { + worker.postTask(task); + }catch (RejectedExecutionException e){ + logger.warn("Worker rejected our task.Falling back to sync entity updating"); + asyncEntityEnabled = false; + } + } +} diff --git a/src/main/java/co/earthme/hearse/server/ServerLevelTickHook.java b/src/main/java/co/earthme/hearse/server/ServerLevelTickHook.java new file mode 100644 index 0000000000000000000000000000000000000000..4c85bf8e4705a781a55a048f0b0541f0d32e2f07 --- /dev/null +++ b/src/main/java/co/earthme/hearse/server/ServerLevelTickHook.java @@ -0,0 +1,77 @@ +package co.earthme.hearse.server; + +import co.earthme.hearse.Hearse; +import co.earthme.hearse.HearseConfig; +import co.earthme.hearse.concurrent.WorkerThreadPoolExecutor; +import co.earthme.hearse.concurrent.threadfactory.DefaultWorkerFactory; +import co.earthme.hearse.util.ArrayListBlockingQueue; +import io.netty.handler.codec.serialization.ObjectEncoder; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.level.ServerLevel; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; +import java.util.function.BooleanSupplier; + +public class ServerLevelTickHook { + private static final DefaultWorkerFactory workerFactory = new DefaultWorkerFactory("world"); + private static WorkerThreadPoolExecutor worker; + private static final AtomicInteger activeTaskCount = new AtomicInteger(); + private static final Logger logger = LogManager.getLogger(); + + public static void initWorker(){ + boolean enabledParaWorld = HearseConfig.getBoolean("optimizations.enableparallelworldtick", true); + if (enabledParaWorld){ + worker = new WorkerThreadPoolExecutor( + MinecraftServer.getServer().levels.size(), + MinecraftServer.getServer().levels.size(), + Long.MAX_VALUE, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(MinecraftServer.getServer().levels.size()), + workerFactory + ); + worker.allowCoreThreadTimeOut(true); + worker.prestartAllCoreThreads(); + Hearse.getWorkerManager().addWorker("world",worker); + for (Thread worker : workerFactory.getCreatedThreads()){ + logger.warn("World worker name:{}.This can help you to slove the lag problems when you using parallel world ticking",worker.getName()); + } + } + } + + public static void callWorldTick(ServerLevel worldserver, BooleanSupplier shouldKeepTicking){ + activeTaskCount.getAndIncrement(); + Runnable task = () -> { + try { + try { + worldserver.tick(shouldKeepTicking); + for (final io.papermc.paper.chunk.SingleThreadChunkRegionManager regionManager : worldserver.getChunkSource().chunkMap.regionManagers) { + regionManager.recalculateRegions(); + } + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + worldserver.explosionDensityCache.clear(); + }finally { + activeTaskCount.getAndDecrement(); + } + }; + + if (worker == null){ + task.run(); + return; + } + + worker.execute(task); + } + + public static void awaitWorldTicKTasks(){ + while (activeTaskCount.get() > 0){ + LockSupport.parkNanos("Await world ticking",1000000); + } + } +} diff --git a/src/main/java/co/earthme/hearse/util/ArrayListBlockingQueue.java b/src/main/java/co/earthme/hearse/util/ArrayListBlockingQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..ec63191dca67c51f387ede9796a039210c8c3f99 --- /dev/null +++ b/src/main/java/co/earthme/hearse/util/ArrayListBlockingQueue.java @@ -0,0 +1,256 @@ +package co.earthme.hearse.util; + +import org.jetbrains.annotations.NotNull; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +public class ArrayListBlockingQueue implements BlockingQueue { + private final List internalList = new ArrayList<>(); + private final StampedLock editLock = new StampedLock(); + + @Override + public boolean add(@NotNull T t) { + final long id = this.editLock.writeLock(); + try { + return this.internalList.add(t); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public boolean offer(@NotNull T t) { + final long id = this.editLock.writeLock(); + try { + return this.internalList.add(t); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public T remove() { + final long id = this.editLock.writeLock(); + try { + return this.internalList.remove(0); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public T poll() { + final long id = this.editLock.writeLock(); + try { + return this.internalList.isEmpty() ? null : this.internalList.remove(0); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public T element() { + long id = this.editLock.readLock(); + try { + if (this.internalList.isEmpty()){ + throw new NoSuchElementException(); + } + return this.internalList.get(0); + }finally { + this.editLock.unlockRead(id); + } + } + + @Override + public T peek() { + long id = this.editLock.readLock(); + try { + if (this.internalList.isEmpty()){ + throw new NoSuchElementException(); + } + return this.internalList.get(0); + }finally { + this.editLock.unlockRead(id); + } + } + + @Override + public void put(@NotNull T t) { + final long id = this.editLock.writeLock(); + try { + this.internalList.add(t); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public boolean offer(T t, long timeout, @NotNull TimeUnit unit) { + final long id = this.editLock.writeLock(); + try { + return this.internalList.add(t); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public T take() throws InterruptedException { + T t; + while ((t = this.poll()) == null){ + synchronized (this){ + this.wait(0,1); + } + } + return t; + } + + @Override + public T poll(long timeout, @NotNull TimeUnit unit) throws InterruptedException { + T t; + long countTime = unit.toNanos(timeout); + while ((t = this.poll()) == null){ + if (countTime == 0){ + break; + } + synchronized (this){ + this.wait(0,1); + } + countTime--; + } + return t; + } + + @Override + public int remainingCapacity() { + throw new UnsupportedOperationException("remainingCapacity"); + } + + @Override + public boolean remove(Object o) { + final long id = this.editLock.writeLock(); + try { + return this.internalList.remove(o); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public boolean containsAll(@NotNull Collection c) { + final long id = this.editLock.writeLock(); + try { + return new HashSet<>(this.internalList).containsAll(c); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public boolean addAll(@NotNull Collection c) { + final long id = this.editLock.writeLock(); + try { + return this.internalList.addAll(c); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public boolean removeAll(@NotNull Collection c) { + final long id = this.editLock.writeLock(); + try { + return this.internalList.removeAll(c); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public boolean retainAll(@NotNull Collection c) { + final long id = this.editLock.writeLock(); + try { + return this.internalList.retainAll(c); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public void clear() { + final long id = this.editLock.writeLock(); + try { + this.internalList.clear(); + }finally { + this.editLock.unlockWrite(id); + } + } + + @Override + public int size() { + long id = this.editLock.readLock(); + try { + return this.internalList.size(); + }finally { + this.editLock.unlockRead(id); + } + } + + @Override + public boolean isEmpty() { + long id = this.editLock.readLock(); + try { + return this.internalList.isEmpty(); + }finally { + this.editLock.unlockRead(id); + } + } + + @Override + public boolean contains(Object o) { + long id = this.editLock.readLock(); + try { + return this.internalList.contains(o); + }finally { + this.editLock.unlockRead(id); + } + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("Iterator"); + } + + @Override + public Object[] toArray() { + long id = this.editLock.readLock(); + try { + return this.internalList.toArray(); + }finally { + this.editLock.unlockRead(id); + } + } + + @Override + public T1[] toArray(T1 @NotNull [] a) { + long id = this.editLock.readLock(); + try { + return this.internalList.toArray(a); + }finally { + this.editLock.unlockRead(id); + } + } + + @Override + public int drainTo(@NotNull Collection c) { + throw new UnsupportedOperationException("drainTo"); + } + + @Override + public int drainTo(@NotNull Collection c, int maxElements) { + throw new UnsupportedOperationException("drainTo"); + } +} + diff --git a/src/main/java/co/earthme/hearse/util/EntityPositionCache.java b/src/main/java/co/earthme/hearse/util/EntityPositionCache.java new file mode 100644 index 0000000000000000000000000000000000000000..6f34233901cf1943694224ab393dea5548cb8e5b --- /dev/null +++ b/src/main/java/co/earthme/hearse/util/EntityPositionCache.java @@ -0,0 +1,60 @@ +package co.earthme.hearse.util; + +import net.minecraft.world.entity.Entity; +import net.minecraft.world.entity.LivingEntity; +import net.minecraft.world.phys.Vec3; +import org.jetbrains.annotations.NotNull; + +public class EntityPositionCache { + private final double x; + private final double y; + private final double z; + private final LivingEntity currentEntity; + + public EntityPositionCache(@NotNull LivingEntity entity){ + this.x = entity.getX(); + this.y = entity.getY(); + this.z = entity.getZ(); + this.currentEntity = entity; + } + + public LivingEntity getCurrentEntity() { + return this.currentEntity; + } + + public double getX() { + return this.x; + } + + public double getY() { + return this.y; + } + + public double getZ() { + return this.z; + } + + public double distanceToSqr(double x, double y, double z) { + double d3 = this.x - x; + double d4 = this.y - y; + double d5 = this.z - z; + + return d3 * d3 + d4 * d4 + d5 * d5; + } + + public double distanceToSqr(Entity entity) { + return this.distanceToSqr(entity.position()); + } + + public double distanceToSqr(Vec3 vector) { + double d0 = this.x - vector.x; + double d1 = this.y - vector.y; + double d2 = this.z - vector.z; + + return d0 * d0 + d1 * d1 + d2 * d2; + } + + public double distanceToSqr(EntityPositionCache entityPositionCache) { + return this.distanceToSqr(entityPositionCache.getX(),entityPositionCache.getY(),entityPositionCache.getZ()); + } +} diff --git a/src/main/java/co/earthme/hearse/workers/WorkerThreadPoolManager.java b/src/main/java/co/earthme/hearse/workers/WorkerThreadPoolManager.java new file mode 100644 index 0000000000000000000000000000000000000000..8cb0d00fb3cd4282873c8b8db88c87e59f8ef9de --- /dev/null +++ b/src/main/java/co/earthme/hearse/workers/WorkerThreadPoolManager.java @@ -0,0 +1,69 @@ +package co.earthme.hearse.workers; + +import co.earthme.hearse.concurrent.WorkerThreadPoolExecutor; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +public class WorkerThreadPoolManager { + private final Map managedWorkers = Maps.newConcurrentMap(); + + public void addWorker(String bound,WorkerThreadPoolExecutor worker){ + this.managedWorkers.put(bound,worker); + } + + public void shutdownAll() throws InterruptedException { + for (WorkerThreadPoolExecutor worker : this.managedWorkers.values()){ + if (!worker.isShutdown()){ + worker.getQueue().clear(); //Clear the tasks.We don't need wait them + worker.shutdown(); + while (worker.awaitTermination(100, TimeUnit.MILLISECONDS)); {} + } + } + } + + @Deprecated + public Map getManagedWorkers() { + return Maps.newHashMap(this.managedWorkers); + } + + @Deprecated + public WorkerThreadPoolExecutor getTargetWorker(String bound){ + return this.managedWorkers.get(bound); + } + + public Map> shutdownAllNow(){ + final Map> ret = Maps.newHashMap(); + for (Map.Entry entry : this.managedWorkers.entrySet()){ + final String workerName = entry.getKey(); + final WorkerThreadPoolExecutor worker = entry.getValue(); + if (!worker.isShutdown()){ + try { + final List taskNotRunned = worker.shutdownNow(); + ret.put(workerName,taskNotRunned); + }catch (Exception e){ + e.printStackTrace(); + } + } + } + return ret; + } + + public void shutdownAll(long singleWorkerAwaitTimeOutCount) throws InterruptedException { + long counter = singleWorkerAwaitTimeOutCount; + for (WorkerThreadPoolExecutor worker : this.managedWorkers.values()){ + if (!worker.isShutdown()){ + worker.shutdown(); + while (worker.awaitTermination(1, TimeUnit.MILLISECONDS)) { + if (counter == 0){ + break; + } + counter--; + } + counter = singleWorkerAwaitTimeOutCount; + } + } + } +}