diff --git a/leaf-server/minecraft-patches/features/0155-AsyncPacketSending.patch b/leaf-server/minecraft-patches/features/0155-AsyncPacketSending.patch deleted file mode 100644 index 29db1654..00000000 --- a/leaf-server/minecraft-patches/features/0155-AsyncPacketSending.patch +++ /dev/null @@ -1,281 +0,0 @@ -From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 -From: Taiyou06 -Date: Tue, 25 Mar 2025 00:00:36 +0100 -Subject: [PATCH] AsyncPacketSending - - -diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java -index 5b46036868b6c9d082e35591e58735e16adaae62..b352523e08e212ca4086904042b921af32cd3172 100644 ---- a/net/minecraft/network/Connection.java -+++ b/net/minecraft/network/Connection.java -@@ -66,6 +66,16 @@ import org.slf4j.Logger; - import org.slf4j.Marker; - import org.slf4j.MarkerFactory; - -+ -+import java.util.concurrent.ExecutorService; -+import java.util.concurrent.LinkedBlockingQueue; -+import java.util.concurrent.ThreadPoolExecutor; -+import java.util.concurrent.TimeUnit; -+import java.util.concurrent.ThreadFactory; -+import java.util.concurrent.atomic.AtomicInteger; -+import java.util.concurrent.atomic.AtomicBoolean; -+import org.dreeam.leaf.config.modules.async.AsyncPacketSending; -+ - public class Connection extends SimpleChannelInboundHandler> { - private static final float AVERAGE_PACKETS_SMOOTHING = 0.75F; - private static final Logger LOGGER = LogUtils.getLogger(); -@@ -128,6 +138,35 @@ public class Connection extends SimpleChannelInboundHandler> { - return null; - } - // Paper end - add utility methods -+ -+ private static final ExecutorService PACKET_EXECUTOR; -+ private static final AtomicInteger PACKET_THREAD_ID = new AtomicInteger(0); -+ private final AtomicBoolean processingScheduled = new AtomicBoolean(false); -+ -+ static { -+ // Initialize the packet executor only if async sending is enabled -+ if (AsyncPacketSending.enabled) { -+ ThreadFactory threadFactory = r -> { -+ Thread thread = new Thread(r, "Leaf - Async-Packet-Sender-" + PACKET_THREAD_ID.incrementAndGet()); -+ thread.setDaemon(true); -+ thread.setPriority(Thread.NORM_PRIORITY - 1); // Slightly lower priority -+ return thread; -+ }; -+ -+ // Use bounded queue to prevent memory issues -+ PACKET_EXECUTOR = new ThreadPoolExecutor( -+ Math.max(1, AsyncPacketSending.threadPoolSize), -+ Math.max(1, AsyncPacketSending.threadPoolSize), -+ 60L, TimeUnit.SECONDS, -+ new LinkedBlockingQueue<>(AsyncPacketSending.queueCapacity), -+ threadFactory, -+ new ThreadPoolExecutor.CallerRunsPolicy() // Fallback to caller thread if queue is full -+ ); -+ } else { -+ PACKET_EXECUTOR = null; -+ } -+ } -+ - // Paper start - packet limiter - protected final Object PACKET_LIMIT_LOCK = new Object(); - protected final @Nullable io.papermc.paper.util.IntervalledCounter allPacketCounts = io.papermc.paper.configuration.GlobalConfiguration.get().packetLimiter.allPackets.isEnabled() ? new io.papermc.paper.util.IntervalledCounter( -@@ -474,11 +513,82 @@ public class Connection extends SimpleChannelInboundHandler> { - - private void sendPacket(Packet packet, @Nullable PacketSendListener sendListener, boolean flush) { - this.sentPackets++; -+ -+ // Fast path: if we're already in the event loop, execute directly - if (this.channel.eventLoop().inEventLoop()) { - this.doSendPacket(packet, sendListener, flush); -- } else { -+ return; -+ } -+ -+ // Early return if async sending is disabled or executor not initialized -+ if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null) { -+ this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush)); -+ return; -+ } -+ -+ // Handle high priority packets directly on Netty event loop -+ if (isHighPriorityPacket(packet)) { - this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush)); -+ return; -+ } -+ -+ // For regular packets, use our async executor -+ PACKET_EXECUTOR.execute(() -> { -+ try { -+ // Wait for packet to be ready if needed -+ if (AsyncPacketSending.spinWaitForReadyPackets && !packet.isReady()) { -+ long startTime = System.nanoTime(); -+ while (!packet.isReady() && -+ System.nanoTime() - startTime < AsyncPacketSending.spinTimeNanos) { -+ Thread.onSpinWait(); -+ } -+ } -+ -+ // We still need to execute on the event loop for the actual channel operations -+ // as Netty requires channel operations to be on its event loop -+ this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush)); -+ } catch (RejectedExecutionException e) { -+ // If the event loop is shutting down -+ LOGGER.debug("Failed to schedule packet send, event loop may be shutting down", e); -+ if (packet.hasFinishListener()) { -+ packet.onPacketDispatchFinish(this.getPlayer(), null); -+ } -+ } catch (Exception e) { -+ LOGGER.error("Error in async packet sending", e); -+ if (packet.hasFinishListener()) { -+ packet.onPacketDispatchFinish(this.getPlayer(), null); -+ } -+ } -+ }); -+ } -+ -+ // Helper method to determine if a packet should have high priority -+ private boolean isHighPriorityPacket(Packet packet) { -+ // Critical packets that should never be delayed -+ if (packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket || -+ packet instanceof net.minecraft.network.protocol.common.ClientboundDisconnectPacket) { -+ return true; - } -+ -+ // Movement packets if prioritization is enabled -+ if (AsyncPacketSending.prioritizeMovementPackets && ( -+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerPositionPacket || -+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerLookAtPacket || -+ packet instanceof net.minecraft.network.protocol.game.ClientboundTeleportEntityPacket -+ )) { -+ return true; -+ } -+ -+ // Chat packets if prioritization is enabled -+ if (AsyncPacketSending.prioritizeChatPackets && ( -+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket || -+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket || -+ packet instanceof net.minecraft.network.protocol.game.ClientboundDisguisedChatPacket -+ )) { -+ return true; -+ } -+ -+ return false; - } - - private void doSendPacket(Packet packet, @Nullable PacketSendListener sendListener, boolean flush) { -@@ -539,15 +649,106 @@ public class Connection extends SimpleChannelInboundHandler> { - if (!this.isConnected()) { - return true; - } -- if (io.papermc.paper.util.MCUtil.isMainThread()) { -- return this.processQueue(); -- } else if (this.isPending) { -- // Should only happen during login/status stages -+ -+ // If async sending is disabled or we're in a special case, use original logic -+ if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null || this.pendingActions.isEmpty()) { -+ if (io.papermc.paper.util.MCUtil.isMainThread()) { -+ return this.processQueue(); -+ } else if (this.isPending) { -+ // Should only happen during login/status stages -+ synchronized (this.pendingActions) { -+ return this.processQueue(); -+ } -+ } -+ return false; -+ } -+ -+ // For login/status stages, stick with synchronous processing -+ if (this.isPending) { - synchronized (this.pendingActions) { - return this.processQueue(); - } - } -- return false; -+ -+ // Schedule async processing if not already scheduled -+ if (processingScheduled.compareAndSet(false, true)) { -+ PACKET_EXECUTOR.execute(this::processQueueAsync); -+ } -+ -+ return true; -+ } -+ -+ private void processQueueAsync() { -+ try { -+ if (AsyncPacketSending.batchProcessing) { -+ processQueueBatched(); -+ } else { -+ processQueue(); -+ } -+ } finally { -+ // Allow scheduling again -+ processingScheduled.set(false); -+ -+ // If there are still items in the queue, schedule another processing round -+ if (!this.pendingActions.isEmpty()) { -+ if (processingScheduled.compareAndSet(false, true)) { -+ PACKET_EXECUTOR.execute(this::processQueueAsync); -+ } -+ } -+ } -+ } -+ -+ // Process queue in batches for better efficiency -+ private void processQueueBatched() { -+ int maxToProcess = AsyncPacketSending.batchSize; -+ int processed = 0; -+ -+ while (processed < maxToProcess && !this.pendingActions.isEmpty()) { -+ WrappedConsumer action = this.pendingActions.poll(); -+ if (action == null) { -+ break; -+ } -+ -+ processed++; -+ -+ if (action.isConsumed()) { -+ continue; -+ } -+ -+ // Check if packet is ready for sending -+ if (action instanceof PacketSendAction packetSendAction) { -+ Packet packet = packetSendAction.packet; -+ if (!packet.isReady()) { -+ // Not ready, put back in queue -+ this.pendingActions.add(action); -+ continue; -+ } -+ } -+ -+ // Execute the action -+ if (action.tryMarkConsumed()) { -+ try { -+ action.accept(this); -+ } catch (Exception e) { -+ LOGGER.error("Error processing packet action", e); -+ } -+ } -+ } -+ } -+ -+ // Add a shutdown method for clean shutdown -+ public static void shutdownAsyncPacketSender() { -+ if (PACKET_EXECUTOR != null) { -+ PACKET_EXECUTOR.shutdown(); -+ try { -+ if (!PACKET_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) { -+ PACKET_EXECUTOR.shutdownNow(); -+ } -+ } catch (InterruptedException e) { -+ PACKET_EXECUTOR.shutdownNow(); -+ Thread.currentThread().interrupt(); -+ } -+ } - } - - private boolean processQueue() { -diff --git a/net/minecraft/server/MinecraftServer.java b/net/minecraft/server/MinecraftServer.java -index 7739b4955dcb489c6bba9c9db65ba87025f7c669..e8b4be3d9f4838d8bad8bed5270c725141b7368f 100644 ---- a/net/minecraft/server/MinecraftServer.java -+++ b/net/minecraft/server/MinecraftServer.java -@@ -70,6 +70,7 @@ import net.minecraft.core.RegistryAccess; - import net.minecraft.core.registries.Registries; - import net.minecraft.data.worldgen.features.MiscOverworldFeatures; - import net.minecraft.gametest.framework.GameTestTicker; -+import net.minecraft.network.Connection; - import net.minecraft.network.chat.ChatDecorator; - import net.minecraft.network.chat.ChatType; - import net.minecraft.network.chat.Component; -@@ -1018,6 +1019,9 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop