diff --git a/leaf-server/minecraft-patches/features/0125-Rewrite-queue-on-Connection.flushQueue.patch b/leaf-server/minecraft-patches/features/0125-Rewrite-queue-on-Connection.flushQueue.patch index 0a52495a..507a0596 100644 --- a/leaf-server/minecraft-patches/features/0125-Rewrite-queue-on-Connection.flushQueue.patch +++ b/leaf-server/minecraft-patches/features/0125-Rewrite-queue-on-Connection.flushQueue.patch @@ -5,7 +5,7 @@ Subject: [PATCH] Rewrite queue on Connection.flushQueue diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java -index 7b78c0af4a83bd39a5bc2d6554cc677bd4c0c822..dd3ed93700202e581cabccf4d53f6fd71810008d 100644 +index 7b78c0af4a83bd39a5bc2d6554cc677bd4c0c822..a5c0dd4b150fe85ab7633b1232e211f67de035b6 100644 --- a/net/minecraft/network/Connection.java +++ b/net/minecraft/network/Connection.java @@ -85,7 +85,7 @@ public class Connection extends SimpleChannelInboundHandler> { @@ -24,31 +24,26 @@ index 7b78c0af4a83bd39a5bc2d6554cc677bd4c0c822..dd3ed93700202e581cabccf4d53f6fd7 - // Should only happen during login/status stages - synchronized (this.pendingActions) { - return this.processQueue(); ++ // Leaf start - Rewrite queue on Connection.flushQueue + if (org.dreeam.leaf.config.modules.network.ConnectionFlushQueueRewrite.enabled) { -+ // Leaf start - Rewrite queue on Connection.flushQueue + // Submit to the event loop to ensure thread confinement + this.channel.eventLoop().execute(this::processQueue); + return false; -+ // Leaf end - Rewrite queue on Connection.flushQueue + } else { + // Original Paper behavior + synchronized (this.pendingActions) { + return this.processQueue(); ++ // Leaf end - Rewrite queue on Connection.flushQueue + } } } return false; -@@ -554,36 +562,56 @@ public class Connection extends SimpleChannelInboundHandler> { +@@ -554,6 +562,24 @@ public class Connection extends SimpleChannelInboundHandler> { return true; } -- // If we are on main, we are safe here in that nothing else should be processing queue off main anymore -- // But if we are not on main due to login/status, the parent is synchronized on packetQueue -- final java.util.Iterator iterator = this.pendingActions.iterator(); -- while (iterator.hasNext()) { -- final WrappedConsumer queued = iterator.next(); // poll -> peek ++ // Leaf start - Rewrite queue on Connection.flushQueue + if (org.dreeam.leaf.config.modules.network.ConnectionFlushQueueRewrite.enabled) { -+ // Leaf start - Rewrite queue on Connection.flushQueue + WrappedConsumer queued; + while ((queued = this.pendingActions.poll()) != null) { + if (queued instanceof PacketSendAction packetSendAction) { @@ -59,76 +54,30 @@ index 7b78c0af4a83bd39a5bc2d6554cc677bd4c0c822..dd3ed93700202e581cabccf4d53f6fd7 + return false; + } + } - -- // Fix NPE (Spigot bug caused by handleDisconnection()) -- if (queued == null) { -- return true; -+ if (queued.tryMarkConsumed()) { -+ queued.accept(this); -+ } - } -+ // Leaf end - Rewrite queue on Connection.flushQueue -+ } else { -+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore -+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue -+ final java.util.Iterator iterator = this.pendingActions.iterator(); -+ while (iterator.hasNext()) { -+ final WrappedConsumer queued = iterator.next(); // poll -> peek + -+ // Fix NPE (Spigot bug caused by handleDisconnection()) -+ if (queued == null) { -+ return true; -+ } - -- if (queued.isConsumed()) { -- continue; -- } -+ if (queued.isConsumed()) { -+ continue; -+ } - -- if (queued instanceof PacketSendAction packetSendAction) { -- final Packet packet = packetSendAction.packet; -- if (!packet.isReady()) { -- return false; -+ if (queued instanceof PacketSendAction packetSendAction) { -+ final Packet packet = packetSendAction.packet; -+ if (!packet.isReady()) { -+ return false; -+ } - } -- } - -- iterator.remove(); -- if (queued.tryMarkConsumed()) { -- queued.accept(this); -+ iterator.remove(); + if (queued.tryMarkConsumed()) { + queued.accept(this); + } ++ } ++ } else { + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue + final java.util.Iterator iterator = this.pendingActions.iterator(); +@@ -581,6 +607,8 @@ public class Connection extends SimpleChannelInboundHandler> { + queued.accept(this); } } ++ } ++ // Leaf end - Rewrite queue on Connection.flushQueue return true; } -- // Paper end - Optimize network -+// Paper end - Optimize network - - private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world - private static int joinAttemptsThisTick; // Paper - Buffer joins to world -@@ -910,19 +938,44 @@ public class Connection extends SimpleChannelInboundHandler> { - this.bandwidthDebugMonitor = new BandwidthDebugMonitor(bandwithLogger); - } - -- // Paper start - Optimize network -+ // Paper start - Optimize network - // Leaf start - Rewrite queue on Connection.flushQueue + // Paper end - Optimize network +@@ -913,6 +941,31 @@ public class Connection extends SimpleChannelInboundHandler> { + // Paper start - Optimize network public void clearPacketQueue() { final net.minecraft.server.level.ServerPlayer player = getPlayer(); -- for (final Consumer queuedAction : this.pendingActions) { -- if (queuedAction instanceof PacketSendAction packetSendAction) { -- final Packet packet = packetSendAction.packet; -- if (packet.hasFinishListener()) { -- packet.onPacketDispatchFinish(player, null); + ++ // Leaf start - Rewrite queue on Connection.flushQueue + if (org.dreeam.leaf.config.modules.network.ConnectionFlushQueueRewrite.enabled) { + // When using Leaf's queue rewrite, ensure thread safety via event loop + if (this.channel != null && !this.channel.eventLoop().inEventLoop()) { @@ -138,6 +87,7 @@ index 7b78c0af4a83bd39a5bc2d6554cc677bd4c0c822..dd3ed93700202e581cabccf4d53f6fd7 + + // Take a snapshot to avoid ConcurrentModificationException + java.util.List> queueSnapshot = new java.util.ArrayList<>(this.pendingActions); ++ + for (Consumer queuedAction : queueSnapshot) { + if (queuedAction instanceof PacketSendAction packetSendAction) { + final Packet packet = packetSendAction.packet; @@ -146,24 +96,21 @@ index 7b78c0af4a83bd39a5bc2d6554cc677bd4c0c822..dd3ed93700202e581cabccf4d53f6fd7 + } + } + } ++ + this.pendingActions.clear(); + } else { -+ // Original Paper behavior - use synchronization -+ synchronized (this.pendingActions) { -+ for (final Consumer queuedAction : this.pendingActions) { -+ if (queuedAction instanceof PacketSendAction packetSendAction) { -+ final Packet packet = packetSendAction.packet; -+ if (packet.hasFinishListener()) { -+ packet.onPacketDispatchFinish(player, null); -+ } -+ } - } -+ this.pendingActions.clear(); ++ // Original Paper behavior - use synchronization ++ synchronized (this.pendingActions) { + for (final Consumer queuedAction : this.pendingActions) { + if (queuedAction instanceof PacketSendAction packetSendAction) { + final Packet packet = packetSendAction.packet; +@@ -922,6 +975,9 @@ public class Connection extends SimpleChannelInboundHandler> { } } -- this.pendingActions.clear(); + this.pendingActions.clear(); ++ } ++ } ++ // Leaf end - Rewrite queue on Connection.flushQueue } -+ // Leaf end - Rewrite queue on Connection.flushQueue private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up. - diff --git a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/network/ConnectionFlushQueueRewrite.java b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/network/ConnectionFlushQueueRewrite.java index d3b8fc44..dbeeaa65 100644 --- a/leaf-server/src/main/java/org/dreeam/leaf/config/modules/network/ConnectionFlushQueueRewrite.java +++ b/leaf-server/src/main/java/org/dreeam/leaf/config/modules/network/ConnectionFlushQueueRewrite.java @@ -16,14 +16,14 @@ public class ConnectionFlushQueueRewrite extends ConfigModules { enabled = config.getBoolean(getBasePath() + ".connection-flush-queue-rewrite", enabled, config.pickStringRegionBased(""" This replaces ConcurrentLinkedQueue with ArrayDeque for better performance and uses the Netty event loop to ensure thread safety. - + May increase the Netty thread usage and requires server restart to take effect Default: false """, """ 此选项将 ConcurrentLinkedQueue 替换为 ArrayDeque 以提高性能, 并使用 Netty 事件循环以确保线程安全。 - + 默认值: false """)); }