From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Taiyou06
Date: Tue, 25 Mar 2025 00:00:36 +0100
Subject: [PATCH] AsyncPacketSending

Adds an optional thread pool (PACKET_EXECUTOR) to Connection. When
AsyncPacketSending.enabled is set, packets sent from outside the Netty
event loop are handed to that pool, which optionally spin-waits for
Packet#isReady() and then schedules the actual write on the event loop.
Keep-alive and disconnect packets, plus movement/chat packets when the
matching config flags are set, skip the pool and go straight to the
event loop. The pending-action queue is drained on the same pool, either
through processQueue() or in batches of AsyncPacketSending.batchSize.

Review fixes folded into this patch:
- processQueueBatched() no longer re-adds an unready packet at the tail
  of the queue (which let later packets overtake it); it stops at the
  unready head and reports that it was blocked.
- processQueueAsync() does not reschedule itself while blocked, so an
  unready packet no longer causes a tight resubmission loop.
- sendPacket() and the queue flush fall back to the non-async path once
  the executor has been shut down; CallerRunsPolicy silently discards
  tasks submitted after shutdown.
- The queue capacity is clamped to at least 1; LinkedBlockingQueue
  rejects non-positive capacities.

NOTE(review): regular packets hop through the pool before reaching the
event loop while high priority packets do not, and the pool may have
more than one thread, so the relative send order of packets is not
guaranteed - confirm this is acceptable.

diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
index 5b46036868b6c9d082e35591e58735e16adaae62..b352523e08e212ca4086904042b921af32cd3172 100644
--- a/net/minecraft/network/Connection.java
+++ b/net/minecraft/network/Connection.java
@@ -66,6 +66,16 @@ import org.slf4j.Logger;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.dreeam.leaf.config.modules.async.AsyncPacketSending;
+
 public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
     private static final float AVERAGE_PACKETS_SMOOTHING = 0.75F;
     private static final Logger LOGGER = LogUtils.getLogger();
@@ -128,6 +138,35 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
         return null;
     }
     // Paper end - add utility methods
+
+    private static final ExecutorService PACKET_EXECUTOR;
+    private static final AtomicInteger PACKET_THREAD_ID = new AtomicInteger(0);
+    private final AtomicBoolean processingScheduled = new AtomicBoolean(false);
+
+    static {
+        // Initialize the packet executor only if async sending is enabled
+        if (AsyncPacketSending.enabled) {
+            ThreadFactory threadFactory = r -> {
+                Thread thread = new Thread(r, "Leaf - Async-Packet-Sender-" + PACKET_THREAD_ID.incrementAndGet());
+                thread.setDaemon(true);
+                thread.setPriority(Thread.NORM_PRIORITY - 1); // Slightly lower priority
+                return thread;
+            };
+
+            // Use bounded queue to prevent memory issues (capacity clamped: the queue rejects values below 1)
+            PACKET_EXECUTOR = new ThreadPoolExecutor(
+                Math.max(1, AsyncPacketSending.threadPoolSize),
+                Math.max(1, AsyncPacketSending.threadPoolSize),
+                60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(Math.max(1, AsyncPacketSending.queueCapacity)),
+                threadFactory,
+                new ThreadPoolExecutor.CallerRunsPolicy() // Fallback to caller thread if queue is full
+            );
+        } else {
+            PACKET_EXECUTOR = null;
+        }
+    }
+
     // Paper start - packet limiter
     protected final Object PACKET_LIMIT_LOCK = new Object();
     protected final @Nullable io.papermc.paper.util.IntervalledCounter allPacketCounts = io.papermc.paper.configuration.GlobalConfiguration.get().packetLimiter.allPackets.isEnabled() ? new io.papermc.paper.util.IntervalledCounter(
@@ -474,11 +513,82 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 
     private void sendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
         this.sentPackets++;
+
+        // Fast path: if we're already in the event loop, execute directly
         if (this.channel.eventLoop().inEventLoop()) {
             this.doSendPacket(packet, sendListener, flush);
-        } else {
+            return;
+        }
+
+        // Early return if async sending is disabled or the executor is missing / shut down (it would discard the task)
+        if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null || PACKET_EXECUTOR.isShutdown()) {
+            this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
+            return;
+        }
+
+        // Handle high priority packets directly on Netty event loop
+        if (isHighPriorityPacket(packet)) {
             this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
+            return;
+        }
+
+        // For regular packets, use our async executor
+        PACKET_EXECUTOR.execute(() -> {
+            try {
+                // Wait for packet to be ready if needed
+                if (AsyncPacketSending.spinWaitForReadyPackets && !packet.isReady()) {
+                    long startTime = System.nanoTime();
+                    while (!packet.isReady() &&
+                        System.nanoTime() - startTime < AsyncPacketSending.spinTimeNanos) {
+                        Thread.onSpinWait();
+                    }
+                }
+
+                // We still need to execute on the event loop for the actual channel operations
+                // as Netty requires channel operations to be on its event loop
+                this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
+            } catch (RejectedExecutionException e) {
+                // If the event loop is shutting down
+                LOGGER.debug("Failed to schedule packet send, event loop may be shutting down", e);
+                if (packet.hasFinishListener()) {
+                    packet.onPacketDispatchFinish(this.getPlayer(), null);
+                }
+            } catch (Exception e) {
+                LOGGER.error("Error in async packet sending", e);
+                if (packet.hasFinishListener()) {
+                    packet.onPacketDispatchFinish(this.getPlayer(), null);
+                }
+            }
+        });
+    }
+
+    // Helper method to determine if a packet should have high priority
+    private boolean isHighPriorityPacket(Packet<?> packet) {
+        // Critical packets that should never be delayed
+        if (packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
+            packet instanceof net.minecraft.network.protocol.common.ClientboundDisconnectPacket) {
+            return true;
         }
+
+        // Movement packets if prioritization is enabled
+        if (AsyncPacketSending.prioritizeMovementPackets && (
+            packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerPositionPacket ||
+            packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerLookAtPacket ||
+            packet instanceof net.minecraft.network.protocol.game.ClientboundTeleportEntityPacket
+        )) {
+            return true;
+        }
+
+        // Chat packets if prioritization is enabled
+        if (AsyncPacketSending.prioritizeChatPackets && (
+            packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
+            packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
+            packet instanceof net.minecraft.network.protocol.game.ClientboundDisguisedChatPacket
+        )) {
+            return true;
+        }
+
+        return false;
     }
 
     private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
@@ -539,15 +649,112 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
         if (!this.isConnected()) {
             return true;
         }
-        if (io.papermc.paper.util.MCUtil.isMainThread()) {
-            return this.processQueue();
-        } else if (this.isPending) {
-            // Should only happen during login/status stages
+
+        // If async sending is disabled, shut down, or we're in a special case, use original logic
+        if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null || PACKET_EXECUTOR.isShutdown() || this.pendingActions.isEmpty()) {
+            if (io.papermc.paper.util.MCUtil.isMainThread()) {
+                return this.processQueue();
+            } else if (this.isPending) {
+                // Should only happen during login/status stages
+                synchronized (this.pendingActions) {
+                    return this.processQueue();
+                }
+            }
+            return false;
+        }
+
+        // For login/status stages, stick with synchronous processing
+        if (this.isPending) {
             synchronized (this.pendingActions) {
                 return this.processQueue();
             }
         }
-        return false;
+
+        // Schedule async processing if not already scheduled
+        if (processingScheduled.compareAndSet(false, true)) {
+            PACKET_EXECUTOR.execute(this::processQueueAsync);
+        }
+
+        return true;
+    }
+
+    private void processQueueAsync() {
+        // Set when the batch stopped at a packet that is not ready yet
+        boolean blocked = false;
+        try {
+            if (AsyncPacketSending.batchProcessing) {
+                blocked = !processQueueBatched();
+            } else {
+                processQueue();
+            }
+        } finally {
+            // Allow scheduling again
+            processingScheduled.set(false);
+
+            // If there are still items in the queue, schedule another processing round.
+            // Not when blocked: an immediate re-run would only spin on the unready packet;
+            // the remaining items wait for the next flush attempt instead.
+            if (!blocked && !this.pendingActions.isEmpty()) {
+                if (processingScheduled.compareAndSet(false, true)) {
+                    PACKET_EXECUTOR.execute(this::processQueueAsync);
+                }
+            }
+        }
+    }
+
+    // Process queue in batches for better efficiency.
+    // Returns false if it stopped early because the packet at the head of the queue is not ready.
+    private boolean processQueueBatched() {
+        int maxToProcess = AsyncPacketSending.batchSize;
+        int processed = 0;
+
+        while (processed < maxToProcess) {
+            WrappedConsumer action = this.pendingActions.peek();
+            if (action == null) {
+                break;
+            }
+
+            // An unready packet stays at the head of the queue: re-adding it at the tail
+            // would let later packets overtake it and be sent out of order
+            if (!action.isConsumed()
+                && action instanceof PacketSendAction packetSendAction
+                && !packetSendAction.packet.isReady()) {
+                return false;
+            }
+
+            processed++;
+
+            // Skip if the inspected element is already gone from the queue or was consumed meanwhile
+            if (!this.pendingActions.remove(action) || action.isConsumed()) {
+                continue;
+            }
+
+            // Execute the action
+            if (action.tryMarkConsumed()) {
+                try {
+                    action.accept(this);
+                } catch (Exception e) {
+                    LOGGER.error("Error processing packet action", e);
+                }
+            }
+        }
+
+        return true;
+    }
+
+    // Add a shutdown method for clean shutdown
+    public static void shutdownAsyncPacketSender() {
+        if (PACKET_EXECUTOR != null) {
+            PACKET_EXECUTOR.shutdown();
+            try {
+                if (!PACKET_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
+                    PACKET_EXECUTOR.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                PACKET_EXECUTOR.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     private boolean processQueue() {
diff --git a/net/minecraft/server/MinecraftServer.java b/net/minecraft/server/MinecraftServer.java
index 7739b4955dcb489c6bba9c9db65ba87025f7c669..e8b4be3d9f4838d8bad8bed5270c725141b7368f 100644
--- a/net/minecraft/server/MinecraftServer.java
+++ b/net/minecraft/server/MinecraftServer.java
@@ -70,6 +70,7 @@ import net.minecraft.core.RegistryAccess;
 import net.minecraft.core.registries.Registries;
 import net.minecraft.data.worldgen.features.MiscOverworldFeatures;
 import net.minecraft.gametest.framework.GameTestTicker;
+import net.minecraft.network.Connection;
 import net.minecraft.network.chat.ChatDecorator;
 import net.minecraft.network.chat.ChatType;
 import net.minecraft.network.chat.Component;
@@ -1018,6 +1019,9 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop