mirror of
https://github.com/Winds-Studio/Leaf.git
synced 2025-12-31 12:56:29 +00:00
* async player packet sending * small cleanup * eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee * holy shit this is fast * some cleanup * change .size to O(1) * rewrite starts (i need to do this OMEGA SAFE) * rebuilt * rebase * Rewritten AsyncPacketSending
282 lines
12 KiB
Diff
282 lines
12 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: Taiyou06 <kaandindar21@gmail.com>
|
|
Date: Tue, 25 Mar 2025 00:00:36 +0100
|
|
Subject: [PATCH] AsyncPacketSending
|
|
|
|
|
|
diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
|
|
index 5b46036868b6c9d082e35591e58735e16adaae62..b352523e08e212ca4086904042b921af32cd3172 100644
|
|
--- a/net/minecraft/network/Connection.java
|
|
+++ b/net/minecraft/network/Connection.java
|
|
@@ -66,6 +66,16 @@ import org.slf4j.Logger;
|
|
import org.slf4j.Marker;
|
|
import org.slf4j.MarkerFactory;
|
|
|
|
+
|
|
+import java.util.concurrent.ExecutorService;
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
+import java.util.concurrent.TimeUnit;
|
|
+import java.util.concurrent.ThreadFactory;
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
+import org.dreeam.leaf.config.modules.async.AsyncPacketSending;
|
|
+
|
|
public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
private static final float AVERAGE_PACKETS_SMOOTHING = 0.75F;
|
|
private static final Logger LOGGER = LogUtils.getLogger();
|
|
@@ -128,6 +138,35 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
return null;
|
|
}
|
|
// Paper end - add utility methods
|
|
+
|
|
+ private static final ExecutorService PACKET_EXECUTOR; // Pool shared by every Connection; null when async sending was disabled at class-init time
|
|
+ private static final AtomicInteger PACKET_THREAD_ID = new AtomicInteger(0); // Suffix counter for sender thread names (first thread gets 1)
|
|
+ private final AtomicBoolean processingScheduled = new AtomicBoolean(false); // Per-connection CAS guard checked before submitting processQueueAsync
|
|
+
|
|
+ static {
|
|
+ // Build the executor once, at class-initialization time, and only when async sending is enabled; PACKET_EXECUTOR is final, so it is never created later
|
|
+ if (AsyncPacketSending.enabled) {
|
|
+ ThreadFactory threadFactory = r -> { // Produces named daemon threads for the pool
|
|
+ Thread thread = new Thread(r, "Leaf - Async-Packet-Sender-" + PACKET_THREAD_ID.incrementAndGet());
|
|
+ thread.setDaemon(true);
|
|
+ thread.setPriority(Thread.NORM_PRIORITY - 1); // One step below normal priority
|
|
+ return thread;
|
|
+ };
|
|
+
|
|
+ // Fixed-size pool (core == max, at least 1 thread) fed by a bounded queue, to cap memory used by pending tasks
|
|
+ PACKET_EXECUTOR = new ThreadPoolExecutor(
|
|
+ Math.max(1, AsyncPacketSending.threadPoolSize),
|
|
+ Math.max(1, AsyncPacketSending.threadPoolSize),
|
|
+ 60L, TimeUnit.SECONDS, // NOTE(review): with core == max and no allowCoreThreadTimeOut(true) call, this keep-alive looks unused - confirm
|
|
+ new LinkedBlockingQueue<>(AsyncPacketSending.queueCapacity), // NOTE(review): capacity <= 0 throws IllegalArgumentException; unlike threadPoolSize it is not clamped here - confirm the config validates it
|
|
+ threadFactory,
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy() // When the queue is full, the submitting thread runs the task itself
|
|
+ );
|
|
+ } else {
|
|
+ PACKET_EXECUTOR = null; // Users of the field null-check it before submitting work
|
|
+ }
|
|
+ }
|
|
+
|
|
// Paper start - packet limiter
|
|
protected final Object PACKET_LIMIT_LOCK = new Object();
|
|
protected final @Nullable io.papermc.paper.util.IntervalledCounter allPacketCounts = io.papermc.paper.configuration.GlobalConfiguration.get().packetLimiter.allPackets.isEnabled() ? new io.papermc.paper.util.IntervalledCounter(
|
|
@@ -474,11 +513,82 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
|
|
private void sendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
|
|
this.sentPackets++;
|
|
+ // Routing: on the event loop -> send inline; async disabled or high-priority packet -> hand to the event loop; otherwise -> PACKET_EXECUTOR, which then hands to the event loop
|
|
+ // Fast path: already on the channel's event loop, so send inline
|
|
if (this.channel.eventLoop().inEventLoop()) {
|
|
this.doSendPacket(packet, sendListener, flush);
|
|
- } else {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ // Async sending disabled (or no executor was created): hand the send straight to the event loop
|
|
+ if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null) {
|
|
+ this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ // High-priority packets skip the executor and go straight to the event loop
|
|
+ if (isHighPriorityPacket(packet)) {
|
|
this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
|
|
+ return;
|
|
+ }
|
|
+ // NOTE(review): packets sent via the pool can reach the event loop after later packets that bypass it (or, with several pool threads, after each other) - confirm ordering is not relied on
|
|
+ // Everything else goes through the shared executor first
|
|
+ PACKET_EXECUTOR.execute(() -> { // Runs on a pool thread, or on the submitting thread when CallerRunsPolicy applies
|
|
+ try {
|
|
+ // Optionally busy-wait, for at most spinTimeNanos, until the packet reports ready
|
|
+ if (AsyncPacketSending.spinWaitForReadyPackets && !packet.isReady()) {
|
|
+ long startTime = System.nanoTime();
|
|
+ while (!packet.isReady() &&
|
|
+ System.nanoTime() - startTime < AsyncPacketSending.spinTimeNanos) {
|
|
+ Thread.onSpinWait();
|
|
+ }
|
|
+ }
|
|
+ // Reached whether or not the packet became ready: the wait above only bounds the delay
|
|
+ // The actual send is still handed to the channel's event loop;
|
|
+ // this task performs no write of its own
|
|
+ this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush)); // Failures inside doSendPacket happen later on the event loop and are not seen by the catch blocks below
|
|
+ } catch (RejectedExecutionException e) {
|
|
+ // execute() was refused, e.g. because the event loop is shutting down
|
|
+ LOGGER.debug("Failed to schedule packet send, event loop may be shutting down", e);
|
|
+ if (packet.hasFinishListener()) { // Still notify the packet's finish listener, passing null as the second argument
|
|
+ packet.onPacketDispatchFinish(this.getPlayer(), null);
|
|
+ }
|
|
+ } catch (Exception e) { // Any other failure while waiting or scheduling: log at error level and notify the listener the same way
|
|
+ LOGGER.error("Error in async packet sending", e);
|
|
+ if (packet.hasFinishListener()) {
|
|
+ packet.onPacketDispatchFinish(this.getPlayer(), null);
|
|
+ }
|
|
+ }
|
|
+ });
|
|
+ }
|
|
+
|
|
+ // Returns true for packets that sendPacket hands straight to the event loop instead of the async executor
|
|
+ private boolean isHighPriorityPacket(Packet<?> packet) {
|
|
+ // Keep-alive and disconnect packets are always high priority, regardless of config
|
|
+ if (packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.common.ClientboundDisconnectPacket) {
|
|
+ return true;
|
|
}
|
|
+
|
|
+ // Player-position, look-at and entity-teleport packets, only when prioritizeMovementPackets is set
|
|
+ if (AsyncPacketSending.prioritizeMovementPackets && (
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerPositionPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerLookAtPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundTeleportEntityPacket
|
|
+ )) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ // Player, system and disguised chat packets, only when prioritizeChatPackets is set
|
|
+ if (AsyncPacketSending.prioritizeChatPackets && (
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundDisguisedChatPacket
|
|
+ )) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ return false; // Every other packet type takes the executor path
|
|
}
|
|
|
|
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
|
|
@@ -539,15 +649,106 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
if (!this.isConnected()) {
|
|
return true;
|
|
}
|
|
- if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
|
- return this.processQueue();
|
|
- } else if (this.isPending) {
|
|
- // Should only happen during login/status stages
|
|
+
|
|
+ // If async sending is disabled or we're in a special case, use original logic
|
|
+ if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null || this.pendingActions.isEmpty()) {
|
|
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
|
+ return this.processQueue();
|
|
+ } else if (this.isPending) {
|
|
+ // Should only happen during login/status stages
|
|
+ synchronized (this.pendingActions) {
|
|
+ return this.processQueue();
|
|
+ }
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ // For login/status stages, stick with synchronous processing
|
|
+ if (this.isPending) {
|
|
synchronized (this.pendingActions) {
|
|
return this.processQueue();
|
|
}
|
|
}
|
|
- return false;
|
|
+
|
|
+ // Schedule async processing if not already scheduled
|
|
+ if (processingScheduled.compareAndSet(false, true)) {
|
|
+ PACKET_EXECUTOR.execute(this::processQueueAsync);
|
|
+ }
|
|
+
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ private void processQueueAsync() { // Executor task: runs one processing pass over pendingActions, then re-submits itself if entries remain
|
|
+ try {
|
|
+ if (AsyncPacketSending.batchProcessing) {
|
|
+ processQueueBatched();
|
|
+ } else {
|
|
+ processQueue(); // NOTE(review): called without the synchronized (pendingActions) block that the isPending path uses - confirm this is safe from this task
|
|
+ }
|
|
+ } finally {
|
|
+ // Clear the guard first so a new pass can be scheduled, by the code below or by another thread
|
|
+ processingScheduled.set(false);
|
|
+
|
|
+ // Entries may remain or have arrived meanwhile: re-submit, using the CAS so only one submitter wins
|
|
+ if (!this.pendingActions.isEmpty()) {
|
|
+ if (processingScheduled.compareAndSet(false, true)) {
|
|
+ PACKET_EXECUTOR.execute(this::processQueueAsync); // NOTE(review): re-submits immediately with no back-off; if the pass left entries behind (e.g. not-ready packets re-queued by processQueueBatched) confirm this cannot busy-loop
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ // Handles at most AsyncPacketSending.batchSize entries polled from pendingActions in one pass
|
|
+ private void processQueueBatched() {
|
|
+ int maxToProcess = AsyncPacketSending.batchSize;
|
|
+ int processed = 0;
|
|
+
|
|
+ while (processed < maxToProcess && !this.pendingActions.isEmpty()) {
|
|
+ WrappedConsumer action = this.pendingActions.poll();
|
|
+ if (action == null) { // Nothing left to poll even though isEmpty() just returned false
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ processed++; // Counts every polled entry, including already-consumed and re-queued ones
|
|
+
|
|
+ if (action.isConsumed()) { // Already marked consumed: drop it
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ // Packet-send actions are only executed once their packet reports ready
|
|
+ if (action instanceof PacketSendAction packetSendAction) {
|
|
+ Packet<?> packet = packetSendAction.packet;
|
|
+ if (!packet.isReady()) {
|
|
+ // Not ready: re-queue it. NOTE(review): add() presumably appends at the tail, which would move this packet behind later entries - confirm ordering
|
|
+ this.pendingActions.add(action);
|
|
+ continue;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ // Run the action only if tryMarkConsumed() succeeds
|
|
+ if (action.tryMarkConsumed()) {
|
|
+ try {
|
|
+ action.accept(this);
|
|
+ } catch (Exception e) { // Log and move on to the next entry; the failed action is not re-queued
|
|
+ LOGGER.error("Error processing packet action", e);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ // Stops the shared packet executor: orderly shutdown(), wait up to 5 seconds, then shutdownNow(); does nothing when the executor was never created
|
|
+ public static void shutdownAsyncPacketSender() {
|
|
+ if (PACKET_EXECUTOR != null) {
|
|
+ PACKET_EXECUTOR.shutdown(); // NOTE(review): per the JDK docs CallerRunsPolicy discards tasks submitted after shutdown, so later sendPacket calls on the executor path would drop packets silently - confirm none are sent after this point
|
|
+ try {
|
|
+ if (!PACKET_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
|
|
+ PACKET_EXECUTOR.shutdownNow();
|
|
+ }
|
|
+ } catch (InterruptedException e) {
|
|
+ PACKET_EXECUTOR.shutdownNow();
|
|
+ Thread.currentThread().interrupt(); // Restore the interrupt flag for the caller
|
|
+ }
|
|
+ }
|
|
}
|
|
|
|
private boolean processQueue() {
|
|
diff --git a/net/minecraft/server/MinecraftServer.java b/net/minecraft/server/MinecraftServer.java
|
|
index 7739b4955dcb489c6bba9c9db65ba87025f7c669..e8b4be3d9f4838d8bad8bed5270c725141b7368f 100644
|
|
--- a/net/minecraft/server/MinecraftServer.java
|
|
+++ b/net/minecraft/server/MinecraftServer.java
|
|
@@ -70,6 +70,7 @@ import net.minecraft.core.RegistryAccess;
|
|
import net.minecraft.core.registries.Registries;
|
|
import net.minecraft.data.worldgen.features.MiscOverworldFeatures;
|
|
import net.minecraft.gametest.framework.GameTestTicker;
|
|
+import net.minecraft.network.Connection;
|
|
import net.minecraft.network.chat.ChatDecorator;
|
|
import net.minecraft.network.chat.ChatType;
|
|
import net.minecraft.network.chat.Component;
|
|
@@ -1018,6 +1019,9 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop<TickTa
|
|
// CraftBukkit end
|
|
if (io.papermc.paper.plugin.PluginInitializerManager.instance().pluginRemapper != null) io.papermc.paper.plugin.PluginInitializerManager.instance().pluginRemapper.shutdown(); // Paper - Plugin remapping
|
|
this.getConnection().stop();
|
|
+
|
|
+ Connection.shutdownAsyncPacketSender();
|
|
+
|
|
this.isSaving = true;
|
|
if (this.playerList != null) {
|
|
LOGGER.info("Saving players");
|