From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Taiyou06
Date: Fri, 21 Feb 2025 15:52:42 +0100
Subject: [PATCH] Rewrite queue on Connection.flushQueue

When ConnectionFlushQueueRewrite.enabled is true, pendingActions is backed by
a java.util.ArrayDeque instead of a ConcurrentLinkedQueue, and:

* An off-main flush of a pending connection is submitted to the channel's
  event loop instead of running under synchronized (pendingActions); the
  caller gets false back because the work has only been scheduled.
* The queue is drained from its head. A PacketSendAction whose packet is not
  ready stops the drain and is left at the head of the queue, so queued
  actions keep their original order. (An earlier revision polled the entry
  and re-appended it with Queue.add, which moved it to the tail.)
* clearPacketQueue() re-submits itself to the channel's event loop when it is
  called from another thread, then notifies finish listeners from a snapshot
  of the queue and clears it.

When the option is false the original Paper code paths are kept, except that
clearPacketQueue() now iterates and clears under synchronized (pendingActions).

NOTE(review): ArrayDeque is not thread-safe, and the main-thread branch still
calls processQueue() directly rather than going through the event loop.
Confirm that every producer/consumer of pendingActions outside this patch is
confined accordingly before enabling the option.
NOTE(review): the queue implementation is chosen once per Connection from the
static flag; presumably the flag is not expected to change at runtime - verify.

diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
index 7b78c0af4a83bd39a5bc2d6554cc677bd4c0c822..dd3ed93700202e581cabccf4d53f6fd71810008d 100644
--- a/net/minecraft/network/Connection.java
+++ b/net/minecraft/network/Connection.java
@@ -85,7 +85,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
     private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND;
     private final PacketFlow receiving;
     private volatile boolean sendLoginDisconnect = true;
-    private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper - Optimize network
+    private final Queue<WrappedConsumer> pendingActions = org.dreeam.leaf.config.modules.network.ConnectionFlushQueueRewrite.enabled ? new java.util.ArrayDeque<>() : Queues.newConcurrentLinkedQueue(); // Paper - Optimize network // Leaf - Rewrite queue on Connection.flushQueue
     public Channel channel;
     public SocketAddress address;
     // Spigot start
@@ -541,9 +541,17 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
         if (io.papermc.paper.util.MCUtil.isMainThread()) {
             return this.processQueue();
         } else if (this.isPending) {
-            // Should only happen during login/status stages
-            synchronized (this.pendingActions) {
-                return this.processQueue();
+            if (org.dreeam.leaf.config.modules.network.ConnectionFlushQueueRewrite.enabled) {
+                // Leaf start - Rewrite queue on Connection.flushQueue
+                // Submit to the event loop to ensure thread confinement
+                this.channel.eventLoop().execute(this::processQueue);
+                return false;
+                // Leaf end - Rewrite queue on Connection.flushQueue
+            } else {
+                // Original Paper behavior
+                synchronized (this.pendingActions) {
+                    return this.processQueue();
+                }
             }
         }
         return false;
@@ -554,36 +562,56 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
             return true;
         }
 
-        // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
-        // But if we are not on main due to login/status, the parent is synchronized on packetQueue
-        final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
-        while (iterator.hasNext()) {
-            final WrappedConsumer queued = iterator.next(); // poll -> peek
+        if (org.dreeam.leaf.config.modules.network.ConnectionFlushQueueRewrite.enabled) {
+            // Leaf start - Rewrite queue on Connection.flushQueue
+            WrappedConsumer queued;
+            while ((queued = this.pendingActions.peek()) != null) {
+                if (queued instanceof PacketSendAction packetSendAction) {
+                    final Packet<?> packet = packetSendAction.packet;
+                    if (!packet.isReady()) {
+                        // Not ready yet: leave it at the head so queued actions keep their order
+                        return false;
+                    }
+                }
 
-            // Fix NPE (Spigot bug caused by handleDisconnection())
-            if (queued == null) {
-                return true;
+                this.pendingActions.poll(); // remove the head only once it can actually be processed
+                if (queued.tryMarkConsumed()) {
+                    queued.accept(this);
+                }
             }
+            // Leaf end - Rewrite queue on Connection.flushQueue
+        } else {
+            // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
+            // But if we are not on main due to login/status, the parent is synchronized on packetQueue
+            final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
+            while (iterator.hasNext()) {
+                final WrappedConsumer queued = iterator.next(); // poll -> peek
+
+                // Fix NPE (Spigot bug caused by handleDisconnection())
+                if (queued == null) {
+                    return true;
+                }
 
-            if (queued.isConsumed()) {
-                continue;
-            }
+                if (queued.isConsumed()) {
+                    continue;
+                }
 
-            if (queued instanceof PacketSendAction packetSendAction) {
-                final Packet<?> packet = packetSendAction.packet;
-                if (!packet.isReady()) {
-                    return false;
+                if (queued instanceof PacketSendAction packetSendAction) {
+                    final Packet<?> packet = packetSendAction.packet;
+                    if (!packet.isReady()) {
+                        return false;
+                    }
                 }
-            }
 
-            iterator.remove();
-            if (queued.tryMarkConsumed()) {
-                queued.accept(this);
+                iterator.remove();
+                if (queued.tryMarkConsumed()) {
+                    queued.accept(this);
+                }
             }
         }
         return true;
     }
     // Paper end - Optimize network
 
     private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
     private static int joinAttemptsThisTick; // Paper - Buffer joins to world
@@ -910,19 +938,44 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
         this.bandwidthDebugMonitor = new BandwidthDebugMonitor(bandwithLogger);
     }
 
-    // Paper start - Optimize network
+    // Paper start - Optimize network - // Leaf start - Rewrite queue on Connection.flushQueue
     public void clearPacketQueue() {
         final net.minecraft.server.level.ServerPlayer player = getPlayer();
-        for (final Consumer<Connection> queuedAction : this.pendingActions) {
-            if (queuedAction instanceof PacketSendAction packetSendAction) {
-                final Packet<?> packet = packetSendAction.packet;
-                if (packet.hasFinishListener()) {
-                    packet.onPacketDispatchFinish(player, null);
+
+        if (org.dreeam.leaf.config.modules.network.ConnectionFlushQueueRewrite.enabled) {
+            // When using Leaf's queue rewrite, ensure thread safety via event loop
+            if (this.channel != null && !this.channel.eventLoop().inEventLoop()) {
+                this.channel.eventLoop().execute(() -> this.clearPacketQueue());
+                return;
+            }
+
+            // Take a snapshot to avoid ConcurrentModificationException
+            java.util.List<Consumer<Connection>> queueSnapshot = new java.util.ArrayList<>(this.pendingActions);
+            for (Consumer<Connection> queuedAction : queueSnapshot) {
+                if (queuedAction instanceof PacketSendAction packetSendAction) {
+                    final Packet<?> packet = packetSendAction.packet;
+                    if (packet.hasFinishListener()) {
+                        packet.onPacketDispatchFinish(player, null);
+                    }
+                }
+            }
+            this.pendingActions.clear();
+        } else {
+            // Original Paper behavior - use synchronization
+            synchronized (this.pendingActions) {
+                for (final Consumer<Connection> queuedAction : this.pendingActions) {
+                    if (queuedAction instanceof PacketSendAction packetSendAction) {
+                        final Packet<?> packet = packetSendAction.packet;
+                        if (packet.hasFinishListener()) {
+                            packet.onPacketDispatchFinish(player, null);
+                        }
+                    }
                 }
+                this.pendingActions.clear();
             }
         }
-        this.pendingActions.clear();
     }
+    // Leaf end - Rewrite queue on Connection.flushQueue
 
     private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
 