9
0
mirror of https://github.com/Winds-Studio/Leaf.git synced 2025-12-19 15:09:25 +00:00

remove AsyncPacketSending for now (needs some fixes and testing 🙏)

This commit is contained in:
Taiyou06
2025-03-26 08:56:18 +01:00
parent 2c7cb52e6b
commit 1e36d848c2
2 changed files with 0 additions and 331 deletions

View File

@@ -1,281 +0,0 @@
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Taiyou06 <kaandindar21@gmail.com>
Date: Tue, 25 Mar 2025 00:00:36 +0100
Subject: [PATCH] AsyncPacketSending
diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
index 5b46036868b6c9d082e35591e58735e16adaae62..b352523e08e212ca4086904042b921af32cd3172 100644
--- a/net/minecraft/network/Connection.java
+++ b/net/minecraft/network/Connection.java
@@ -66,6 +66,16 @@ import org.slf4j.Logger;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.dreeam.leaf.config.modules.async.AsyncPacketSending;
+
public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
private static final float AVERAGE_PACKETS_SMOOTHING = 0.75F;
private static final Logger LOGGER = LogUtils.getLogger();
@@ -128,6 +138,35 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
return null;
}
// Paper end - add utility methods
+
+ private static final ExecutorService PACKET_EXECUTOR;
+ private static final AtomicInteger PACKET_THREAD_ID = new AtomicInteger(0);
+ private final AtomicBoolean processingScheduled = new AtomicBoolean(false);
+
+ static {
+ // Initialize the packet executor only if async sending is enabled
+ if (AsyncPacketSending.enabled) {
+ ThreadFactory threadFactory = r -> {
+ Thread thread = new Thread(r, "Leaf - Async-Packet-Sender-" + PACKET_THREAD_ID.incrementAndGet());
+ thread.setDaemon(true);
+ thread.setPriority(Thread.NORM_PRIORITY - 1); // Slightly lower priority
+ return thread;
+ };
+
+ // Use bounded queue to prevent memory issues
+ PACKET_EXECUTOR = new ThreadPoolExecutor(
+ Math.max(1, AsyncPacketSending.threadPoolSize),
+ Math.max(1, AsyncPacketSending.threadPoolSize),
+ 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(AsyncPacketSending.queueCapacity),
+ threadFactory,
+ new ThreadPoolExecutor.CallerRunsPolicy() // Fallback to caller thread if queue is full
+ );
+ } else {
+ PACKET_EXECUTOR = null;
+ }
+ }
+
// Paper start - packet limiter
protected final Object PACKET_LIMIT_LOCK = new Object();
protected final @Nullable io.papermc.paper.util.IntervalledCounter allPacketCounts = io.papermc.paper.configuration.GlobalConfiguration.get().packetLimiter.allPackets.isEnabled() ? new io.papermc.paper.util.IntervalledCounter(
@@ -474,11 +513,82 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
private void sendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
this.sentPackets++;
+
+ // Fast path: if we're already in the event loop, execute directly
if (this.channel.eventLoop().inEventLoop()) {
this.doSendPacket(packet, sendListener, flush);
- } else {
+ return;
+ }
+
+ // Early return if async sending is disabled or executor not initialized
+ if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null) {
+ this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
+ return;
+ }
+
+ // Handle high priority packets directly on Netty event loop
+ if (isHighPriorityPacket(packet)) {
this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
+ return;
+ }
+
+ // For regular packets, use our async executor
+ PACKET_EXECUTOR.execute(() -> {
+ try {
+ // Wait for packet to be ready if needed
+ if (AsyncPacketSending.spinWaitForReadyPackets && !packet.isReady()) {
+ long startTime = System.nanoTime();
+ while (!packet.isReady() &&
+ System.nanoTime() - startTime < AsyncPacketSending.spinTimeNanos) {
+ Thread.onSpinWait();
+ }
+ }
+
+ // We still need to execute on the event loop for the actual channel operations
+ // as Netty requires channel operations to be on its event loop
+ this.channel.eventLoop().execute(() -> this.doSendPacket(packet, sendListener, flush));
+ } catch (RejectedExecutionException e) {
+ // If the event loop is shutting down
+ LOGGER.debug("Failed to schedule packet send, event loop may be shutting down", e);
+ if (packet.hasFinishListener()) {
+ packet.onPacketDispatchFinish(this.getPlayer(), null);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error in async packet sending", e);
+ if (packet.hasFinishListener()) {
+ packet.onPacketDispatchFinish(this.getPlayer(), null);
+ }
+ }
+ });
+ }
+
+ // Helper method to determine if a packet should have high priority
+ private boolean isHighPriorityPacket(Packet<?> packet) {
+ // Critical packets that should never be delayed
+ if (packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
+ packet instanceof net.minecraft.network.protocol.common.ClientboundDisconnectPacket) {
+ return true;
}
+
+ // Movement packets if prioritization is enabled
+ if (AsyncPacketSending.prioritizeMovementPackets && (
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerPositionPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerLookAtPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundTeleportEntityPacket
+ )) {
+ return true;
+ }
+
+ // Chat packets if prioritization is enabled
+ if (AsyncPacketSending.prioritizeChatPackets && (
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundDisguisedChatPacket
+ )) {
+ return true;
+ }
+
+ return false;
}
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
@@ -539,15 +649,106 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
if (!this.isConnected()) {
return true;
}
- if (io.papermc.paper.util.MCUtil.isMainThread()) {
- return this.processQueue();
- } else if (this.isPending) {
- // Should only happen during login/status stages
+
+ // If async sending is disabled or we're in a special case, use original logic
+ if (!AsyncPacketSending.enabled || PACKET_EXECUTOR == null || this.pendingActions.isEmpty()) {
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
+ return this.processQueue();
+ } else if (this.isPending) {
+ // Should only happen during login/status stages
+ synchronized (this.pendingActions) {
+ return this.processQueue();
+ }
+ }
+ return false;
+ }
+
+ // For login/status stages, stick with synchronous processing
+ if (this.isPending) {
synchronized (this.pendingActions) {
return this.processQueue();
}
}
- return false;
+
+ // Schedule async processing if not already scheduled
+ if (processingScheduled.compareAndSet(false, true)) {
+ PACKET_EXECUTOR.execute(this::processQueueAsync);
+ }
+
+ return true;
+ }
+
+ private void processQueueAsync() {
+ try {
+ if (AsyncPacketSending.batchProcessing) {
+ processQueueBatched();
+ } else {
+ processQueue();
+ }
+ } finally {
+ // Allow scheduling again
+ processingScheduled.set(false);
+
+ // If there are still items in the queue, schedule another processing round
+ if (!this.pendingActions.isEmpty()) {
+ if (processingScheduled.compareAndSet(false, true)) {
+ PACKET_EXECUTOR.execute(this::processQueueAsync);
+ }
+ }
+ }
+ }
+
+ // Process queue in batches for better efficiency
+ private void processQueueBatched() {
+ int maxToProcess = AsyncPacketSending.batchSize;
+ int processed = 0;
+
+ while (processed < maxToProcess && !this.pendingActions.isEmpty()) {
+ WrappedConsumer action = this.pendingActions.poll();
+ if (action == null) {
+ break;
+ }
+
+ processed++;
+
+ if (action.isConsumed()) {
+ continue;
+ }
+
+ // Check if packet is ready for sending
+ if (action instanceof PacketSendAction packetSendAction) {
+ Packet<?> packet = packetSendAction.packet;
+ if (!packet.isReady()) {
+ // Not ready, put back in queue
+ this.pendingActions.add(action);
+ continue;
+ }
+ }
+
+ // Execute the action
+ if (action.tryMarkConsumed()) {
+ try {
+ action.accept(this);
+ } catch (Exception e) {
+ LOGGER.error("Error processing packet action", e);
+ }
+ }
+ }
+ }
+
+ // Add a shutdown method for clean shutdown
+ public static void shutdownAsyncPacketSender() {
+ if (PACKET_EXECUTOR != null) {
+ PACKET_EXECUTOR.shutdown();
+ try {
+ if (!PACKET_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
+ PACKET_EXECUTOR.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ PACKET_EXECUTOR.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
}
private boolean processQueue() {
diff --git a/net/minecraft/server/MinecraftServer.java b/net/minecraft/server/MinecraftServer.java
index 7739b4955dcb489c6bba9c9db65ba87025f7c669..e8b4be3d9f4838d8bad8bed5270c725141b7368f 100644
--- a/net/minecraft/server/MinecraftServer.java
+++ b/net/minecraft/server/MinecraftServer.java
@@ -70,6 +70,7 @@ import net.minecraft.core.RegistryAccess;
import net.minecraft.core.registries.Registries;
import net.minecraft.data.worldgen.features.MiscOverworldFeatures;
import net.minecraft.gametest.framework.GameTestTicker;
+import net.minecraft.network.Connection;
import net.minecraft.network.chat.ChatDecorator;
import net.minecraft.network.chat.ChatType;
import net.minecraft.network.chat.Component;
@@ -1018,6 +1019,9 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop<TickTa
// CraftBukkit end
if (io.papermc.paper.plugin.PluginInitializerManager.instance().pluginRemapper != null) io.papermc.paper.plugin.PluginInitializerManager.instance().pluginRemapper.shutdown(); // Paper - Plugin remapping
this.getConnection().stop();
+
+ Connection.shutdownAsyncPacketSender();
+
this.isSaving = true;
if (this.playerList != null) {
LOGGER.info("Saving players");

View File

@@ -1,50 +0,0 @@
package org.dreeam.leaf.config.modules.async;
import org.dreeam.leaf.config.ConfigModules;
import org.dreeam.leaf.config.EnumConfigCategory;
import org.dreeam.leaf.config.annotations.Experimental;
public class AsyncPacketSending extends ConfigModules {
public String getBasePath() {
return EnumConfigCategory.ASYNC.getBaseKeyName() + ".async-packet-sending";
}
@Experimental
public static boolean enabled = false;
public static int threadPoolSize = 4;
public static int queueCapacity = 4096;
public static boolean prioritizeMovementPackets = true;
public static boolean prioritizeChatPackets = true;
public static boolean spinWaitForReadyPackets = true;
public static long spinTimeNanos = 1000; // 1 microsecond
public static boolean batchProcessing = true;
public static int batchSize = 128;
private static boolean asyncPacketSendingInitialized;
@Override
public void onLoaded() {
config.addCommentRegionBased(getBasePath(), """
**Experimental feature**
This moves packet sending operations to background threads, reducing main thread load.
Can significantly improve performance on high-player-count servers.""",
"""
这将数据包发送操作移至后台线程,减少主线程负载。
在高玩家数量的服务器上可以显著提高性能。""");
if (!asyncPacketSendingInitialized) {
asyncPacketSendingInitialized = true;
enabled = config.getBoolean(getBasePath() + ".enabled", enabled);
threadPoolSize = config.getInt(getBasePath() + ".thread-pool-size", threadPoolSize);
queueCapacity = config.getInt(getBasePath() + ".queue-capacity", queueCapacity);
prioritizeMovementPackets = config.getBoolean(getBasePath() + ".prioritize-movement-packets", prioritizeMovementPackets);
prioritizeChatPackets = config.getBoolean(getBasePath() + ".prioritize-chat-packets", prioritizeChatPackets);
spinWaitForReadyPackets = config.getBoolean(getBasePath() + ".spin-wait-for-ready-packets", spinWaitForReadyPackets);
spinTimeNanos = config.getLong(getBasePath() + ".spin-time-nanos", spinTimeNanos);
batchProcessing = config.getBoolean(getBasePath() + ".batch-processing", batchProcessing);
batchSize = config.getInt(getBasePath() + ".batch-size", batchSize);
}
}
}