diff --git a/sources/src/main/java/io/akarin/api/Akari.java b/sources/src/main/java/io/akarin/api/Akari.java index 70e66f4ef..462171ef6 100644 --- a/sources/src/main/java/io/akarin/api/Akari.java +++ b/sources/src/main/java/io/akarin/api/Akari.java @@ -2,6 +2,8 @@ package io.akarin.api; import java.lang.reflect.Method; import java.util.Queue; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.apache.logging.log4j.LogManager; @@ -34,6 +36,11 @@ public abstract class Akari { */ public static final Queue callbackQueue = Queues.newConcurrentLinkedQueue(); + /** + * A common tick pool + */ + public static final ExecutorCompletionService STAGE_TICK = new ExecutorCompletionService(Executors.newFixedThreadPool(1, Akari.STAGE_FACTORY)); + /* * Timings */ diff --git a/sources/src/main/java/io/akarin/api/CheckedConcurrentLinkedQueue.java b/sources/src/main/java/io/akarin/api/CheckedConcurrentLinkedQueue.java deleted file mode 100644 index d053bdf3c..000000000 --- a/sources/src/main/java/io/akarin/api/CheckedConcurrentLinkedQueue.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.akarin.api; - -import java.io.Serializable; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class CheckedConcurrentLinkedQueue extends ConcurrentLinkedQueue implements Queue, Serializable { - -} \ No newline at end of file diff --git a/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java b/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java index 45ed44a18..536d9fcf6 100644 --- a/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java +++ b/sources/src/main/java/io/akarin/server/mixin/core/MixinMinecraftServer.java @@ -1,10 +1,7 @@ package io.akarin.server.mixin.core; -import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Queue; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import org.bukkit.craftbukkit.CraftServer; import org.bukkit.craftbukkit.chunkio.ChunkIOExecutor; @@ -15,11 +12,7 @@ import org.spongepowered.asm.mixin.Mutable; import org.spongepowered.asm.mixin.Overwrite; import org.spongepowered.asm.mixin.Shadow; -import com.google.common.collect.Queues; - import co.aikar.timings.MinecraftTimings; -import co.aikar.timings.Timing; -import co.aikar.timings.Timings; import io.akarin.api.Akari; import net.minecraft.server.CrashReport; import net.minecraft.server.CustomFunctionData; @@ -63,8 +56,6 @@ public class MixinMinecraftServer { @Shadow public ServerConnection an() { return null; } @Shadow public CustomFunctionData aL() { return null; } - private final ExecutorCompletionService STAGE_ENTITY_TICK = new ExecutorCompletionService(Executors.newFixedThreadPool(1, Akari.STAGE_FACTORY)); - private void tickEntities(WorldServer world) { try { world.tickEntities(); @@ -123,7 +114,7 @@ public class MixinMinecraftServer { TileEntityHopper.skipHopperEvents = entityWorld.paperConfig.disableHopperMoveEvents || InventoryMoveItemEvent.getHandlerList().getRegisteredListeners().length == 0; Akari.silentTiming = true; - STAGE_ENTITY_TICK.submit(() -> tickEntities(entityWorld), null); + Akari.STAGE_TICK.submit(() -> tickEntities(entityWorld), null); try { mainWorld.timings.doTick.startTiming(); @@ -141,7 +132,7 @@ public class MixinMinecraftServer { } entityWorld.timings.tickEntities.startTiming(); - STAGE_ENTITY_TICK.take(); + Akari.STAGE_TICK.take(); entityWorld.timings.tickEntities.stopTiming(); entityWorld.getTracker().updatePlayers(); diff --git a/sources/src/main/java/io/akarin/server/mixin/core/MixinNetworkManager.java b/sources/src/main/java/io/akarin/server/mixin/core/MixinNetworkManager.java new file mode 100644 index 000000000..d700900d0 --- /dev/null +++ b/sources/src/main/java/io/akarin/server/mixin/core/MixinNetworkManager.java @@ -0,0 +1,9 @@ +package io.akarin.server.mixin.core; + +import org.spongepowered.asm.mixin.Mixin; +import net.minecraft.server.NetworkManager; + +@Mixin(value = NetworkManager.class, remap = false) +public class MixinNetworkManager { + +} diff --git a/sources/src/main/java/io/akarin/server/mixin/core/NonblockingServerConnection.java b/sources/src/main/java/io/akarin/server/mixin/core/NonblockingServerConnection.java index 6df055516..34b26a5c6 100644 --- a/sources/src/main/java/io/akarin/server/mixin/core/NonblockingServerConnection.java +++ b/sources/src/main/java/io/akarin/server/mixin/core/NonblockingServerConnection.java @@ -5,6 +5,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -15,8 +16,9 @@ import org.spongepowered.asm.mixin.Mutable; import org.spongepowered.asm.mixin.Overwrite; import org.spongepowered.asm.mixin.Shadow; import com.google.common.collect.Lists; + +import io.akarin.api.Akari; import io.akarin.api.LocalAddress; -import io.akarin.api.WrappedCollections; import io.akarin.server.core.AkarinGlobalConfig; import io.akarin.server.core.ChannelAdapter; import io.netty.bootstrap.ServerBootstrap; @@ -47,21 +49,9 @@ public class NonblockingServerConnection { */ @Shadow @Mutable @Final private List h; - private final List pending = WrappedCollections.wrappedList(Collections.synchronizedList(Lists.newLinkedList())); - @Overwrite private void addPending() {} // just keep compatibility - /** - * Removes all pending endpoints from global NetworkManager list - */ - private void removePending() { - synchronized (pending) { - h.removeAll(pending); - pending.clear(); - } - } - @Shadow @Final private MinecraftServer f; /** @@ -72,8 +62,6 @@ public class NonblockingServerConnection { registerChannels(Lists.newArrayList(LocalAddress.create(address, port))); } - private boolean needDeployList = true; - public void registerChannels(Collection data) throws IOException { Class channelClass; EventLoopGroup loopGroup; @@ -88,12 +76,6 @@ public class NonblockingServerConnection { logger.info("Using nio channel type"); } - // Since we cannot overwrite the initializer, here is the best chance to handle it - if (needDeployList) { - h = WrappedCollections.wrappedList(Lists.newCopyOnWriteArrayList()); - needDeployList = false; - } - ServerBootstrap bootstrap = new ServerBootstrap().channel(channelClass).childHandler(ChannelAdapter.create(h)).group(loopGroup); synchronized (g) { data.addAll(Lists.transform(AkarinGlobalConfig.extraAddress, s -> { @@ -125,47 +107,56 @@ public class NonblockingServerConnection { } } + public void processPackets(NetworkManager manager) { + try { + manager.a(); // PAIL: NetworkManager::processReceivedPackets + } catch (Exception ex) { + logger.warn("Failed to handle packet for {}", new Object[] { manager.getSocketAddress(), ex }); + final ChatComponentText kick = new ChatComponentText("Internal server error"); + + manager.sendPacket(new PacketPlayOutKickDisconnect(kick), new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + manager.close(kick); + } + }, new GenericFutureListener[0]); + manager.stopReading(); + } + } + /** * Will try to process the packets received by each NetworkManager, gracefully manage processing failures and cleans up dead connections (tick) */ @Overwrite - public void c() { - // Spigot - This prevents players from 'gaming' the server, and strategically relogging to increase their position in the tick order - if (SpigotConfig.playerShuffle > 0 && MinecraftServer.currentTick % SpigotConfig.playerShuffle == 0) { - Collections.shuffle(h); - } - boolean needRemoval = false; - - for (NetworkManager manager : h) { - if (manager.h()) continue; // PAIL: NetworkManager::hasNoChannel + public void c() throws InterruptedException { + synchronized (h) { + // Spigot - This prevents players from 'gaming' the server, and strategically relogging to increase their position in the tick order + if (SpigotConfig.playerShuffle > 0 && MinecraftServer.currentTick % SpigotConfig.playerShuffle == 0) { + Collections.shuffle(h); + } - if (manager.isConnected()) { - try { - manager.a(); // PAIL: NetworkManager::processReceivedPackets - } catch (Exception ex) { - logger.warn("Failed to handle packet for {}", new Object[] { manager.getSocketAddress(), ex }); - final ChatComponentText kick = new ChatComponentText("Internal server error"); + int submitted = 0; + Iterator it = h.iterator(); + while (it.hasNext()) { + NetworkManager manager = it.next(); + if (manager.h()) continue; // PAIL: NetworkManager::hasNoChannel + + if (manager.isConnected()) { + Akari.STAGE_TICK.submit(() -> processPackets(manager), null); + submitted++; + } else { + // Spigot - Fix a race condition where a NetworkManager could be unregistered just before connection. + if (manager.preparing) continue; - manager.sendPacket(new PacketPlayOutKickDisconnect(kick), new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - manager.close(kick); - } - }, new GenericFutureListener[0]); - manager.stopReading(); + it.remove(); + manager.handleDisconnection(); } - } else { - // Spigot - Fix a race condition where a NetworkManager could be unregistered just before connection. - if (manager.preparing) continue; - - needRemoval = true; - pending.add(manager); - - manager.handleDisconnection(); + } + + for (int i = 0; i < submitted; i++) { + Akari.STAGE_TICK.take(); } } - - if (needRemoval) removePending(); } } diff --git a/sources/src/main/java/net/minecraft/server/NetworkManager.java b/sources/src/main/java/net/minecraft/server/NetworkManager.java new file mode 100644 index 000000000..3d32e0056 --- /dev/null +++ b/sources/src/main/java/net/minecraft/server/NetworkManager.java @@ -0,0 +1,372 @@ +package net.minecraft.server; + +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalEventLoopGroup; +import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.timeout.TimeoutException; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import java.net.SocketAddress; +import java.util.Queue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Nullable; +import javax.crypto.SecretKey; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.Validate; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.Marker; +import org.apache.logging.log4j.MarkerManager; + +public class NetworkManager extends SimpleChannelInboundHandler> { + + private static final Logger g = LogManager.getLogger(); + public static final Marker a = MarkerManager.getMarker("NETWORK"); + public static final Marker b = MarkerManager.getMarker("NETWORK_PACKETS", NetworkManager.a); + public static final AttributeKey c = AttributeKey.valueOf("protocol"); + public static final LazyInitVar d = new LazyInitVar() { + protected NioEventLoopGroup a() { + return new NioEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Client IO #%d").setDaemon(true).build()); + } + + protected Object init() { + return this.a(); + } + }; + public static final LazyInitVar e = new LazyInitVar() { + protected EpollEventLoopGroup a() { + return new EpollEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Epoll Client IO #%d").setDaemon(true).build()); + } + + protected Object init() { + return this.a(); + } + }; + public static final LazyInitVar f = new LazyInitVar() { + protected LocalEventLoopGroup a() { + return new LocalEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build()); + } + + protected Object init() { + return this.a(); + } + }; + private final EnumProtocolDirection h; + private final Queue i = Queues.newConcurrentLinkedQueue(); private final Queue getPacketQueue() { return this.i; } // Paper - Anti-Xray - OBFHELPER + private final ReentrantReadWriteLock j = new ReentrantReadWriteLock(); + public Channel channel; + // Spigot Start // PAIL + public SocketAddress l; + public java.util.UUID spoofedUUID; + public com.mojang.authlib.properties.Property[] spoofedProfile; + public boolean preparing = true; + // Spigot End + private PacketListener m; + private IChatBaseComponent n; + private boolean o; + private boolean p; + // Paper start - NetworkClient implementation + public int protocolVersion; + public java.net.InetSocketAddress virtualHost; + private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); + // Paper end + + public NetworkManager(EnumProtocolDirection enumprotocoldirection) { + this.h = enumprotocoldirection; + } + + public void channelActive(ChannelHandlerContext channelhandlercontext) throws Exception { + super.channelActive(channelhandlercontext); + this.channel = channelhandlercontext.channel(); + this.l = this.channel.remoteAddress(); + // Spigot Start + this.preparing = false; + // Spigot End + + try { + this.setProtocol(EnumProtocol.HANDSHAKING); + } catch (Throwable throwable) { + NetworkManager.g.fatal(throwable); + } + + } + + public void setProtocol(EnumProtocol enumprotocol) { + this.channel.attr(NetworkManager.c).set(enumprotocol); + this.channel.config().setAutoRead(true); + NetworkManager.g.debug("Enabled auto read"); + } + + public void channelInactive(ChannelHandlerContext channelhandlercontext) throws Exception { + this.close(new ChatMessage("disconnect.endOfStream", new Object[0])); + } + + public void exceptionCaught(ChannelHandlerContext channelhandlercontext, Throwable throwable) throws Exception { + ChatMessage chatmessage; + + if (throwable instanceof TimeoutException) { + chatmessage = new ChatMessage("disconnect.timeout", new Object[0]); + } else { + chatmessage = new ChatMessage("disconnect.genericReason", new Object[] { "Internal Exception: " + throwable}); + } + + NetworkManager.g.debug(chatmessage.toPlainText(), throwable); + this.close(chatmessage); + if (MinecraftServer.getServer().isDebugging()) throwable.printStackTrace(); // Spigot + } + + protected void a(ChannelHandlerContext channelhandlercontext, Packet packet) throws Exception { + if (this.channel.isOpen()) { + try { + ((Packet) packet).a(this.m); // CraftBukkit - decompile error + } catch (CancelledPacketHandleException cancelledpackethandleexception) { + ; + } + } + + } + + public void setPacketListener(PacketListener packetlistener) { + Validate.notNull(packetlistener, "packetListener", new Object[0]); + NetworkManager.g.debug("Set listener of {} to {}", this, packetlistener); + this.m = packetlistener; + } + + public void sendPacket(Packet packet) { + if (this.isConnected() && this.trySendQueue() && !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady())) { // Paper - Async-Anti-Xray - Add chunk packets which are not ready or all packets if the queue contains chunk packets which are not ready to the queue and send the packets later in the right order + //this.m(); // Paper - Async-Anti-Xray - Move to if statement (this.trySendQueue()) + this.a(packet, (GenericFutureListener[]) null); + } else { + this.j.writeLock().lock(); + + try { + this.i.add(new NetworkManager.QueuedPacket(packet, new GenericFutureListener[0])); + } finally { + this.j.writeLock().unlock(); + } + } + + } + + public void sendPacket(Packet packet, GenericFutureListener> genericfuturelistener, GenericFutureListener>... agenericfuturelistener) { + if (this.isConnected() && this.trySendQueue() && !(packet instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) packet).isReady())) { // Paper - Async-Anti-Xray - Add chunk packets which are not ready or all packets if the queue contains chunk packets which are not ready to the queue and send the packets later in the right order + //this.m(); // Paper - Async-Anti-Xray - Move to if statement (this.trySendQueue()) + this.a(packet, (GenericFutureListener[]) ArrayUtils.add(agenericfuturelistener, 0, genericfuturelistener)); + } else { + this.j.writeLock().lock(); + + try { + this.i.add(new NetworkManager.QueuedPacket(packet, (GenericFutureListener[]) ArrayUtils.add(agenericfuturelistener, 0, genericfuturelistener))); + } finally { + this.j.writeLock().unlock(); + } + } + + } + + private void dispatchPacket(final Packet packet, @Nullable final GenericFutureListener>[] genericFutureListeners) { this.a(packet, genericFutureListeners); } // Paper - Anti-Xray - OBFHELPER + private void a(final Packet packet, @Nullable final GenericFutureListener>[] agenericfuturelistener) { + final EnumProtocol enumprotocol = EnumProtocol.a(packet); + final EnumProtocol enumprotocol1 = (EnumProtocol) this.channel.attr(NetworkManager.c).get(); + + if (enumprotocol1 != enumprotocol) { + NetworkManager.g.debug("Disabled auto read"); + this.channel.config().setAutoRead(false); + } + + if (this.channel.eventLoop().inEventLoop()) { + if (enumprotocol != enumprotocol1) { + this.setProtocol(enumprotocol); + } + + ChannelFuture channelfuture = this.channel.writeAndFlush(packet); + + if (agenericfuturelistener != null) { + channelfuture.addListeners(agenericfuturelistener); + } + + channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } else { + this.channel.eventLoop().execute(new Runnable() { + public void run() { + if (enumprotocol != enumprotocol1) { + NetworkManager.this.setProtocol(enumprotocol); + } + + ChannelFuture channelfuture = NetworkManager.this.channel.writeAndFlush(packet); + + if (agenericfuturelistener != null) { + channelfuture.addListeners(agenericfuturelistener); + } + + channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } + }); + } + + } + + // Paper start - Async-Anti-Xray - Stop dispatching further packets and return false if the peeked packet is a chunk packet which is not ready + private boolean trySendQueue() { return this.m(); } // OBFHELPER + private boolean m() { // void -> boolean + if (this.channel != null && this.channel.isOpen()) { + if (this.i.isEmpty()) { // return if the packet queue is empty so that the write lock by Anti-Xray doesn't affect the vanilla performance at all + return true; + } + + this.j.writeLock().lock(); // readLock -> writeLock (because of race condition between peek and poll) + + try { + while (!this.i.isEmpty()) { + NetworkManager.QueuedPacket networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.getPacketQueue().peek(); // poll -> peek + + if (networkmanager_queuedpacket != null) { // Fix NPE (Spigot bug caused by handleDisconnection()) + if (networkmanager_queuedpacket.getPacket() instanceof PacketPlayOutMapChunk && !((PacketPlayOutMapChunk) networkmanager_queuedpacket.getPacket()).isReady()) { // Check if the peeked packet is a chunk packet which is not ready + return false; // Return false if the peeked packet is a chunk packet which is not ready + } else { + this.getPacketQueue().poll(); // poll here + this.dispatchPacket(networkmanager_queuedpacket.getPacket(), networkmanager_queuedpacket.getGenericFutureListeners()); // dispatch the packet + } + } + } + } finally { + this.j.writeLock().unlock(); // readLock -> writeLock (because of race condition between peek and poll) + } + + } + + return true; // Return true if all packets were dispatched + } + // Paper end + + public void a() { + this.m(); + if (this.m instanceof ITickable) { + ((ITickable) this.m).e(); + } + + if (this.channel != null) { + if (enableExplicitFlush) this.channel.eventLoop().execute(() -> this.channel.flush()); // Paper - we don't need to explicit flush here, but allow opt in incase issues are found to a better version + } + + } + + public SocketAddress getSocketAddress() { + return this.l; + } + + public void close(IChatBaseComponent ichatbasecomponent) { + // Spigot Start + this.preparing = false; + // Spigot End + if (this.channel.isOpen()) { + this.channel.close(); // We can't wait as this may be called from an event loop. + this.n = ichatbasecomponent; + } + + } + + public boolean isLocal() { + return this.channel instanceof LocalChannel || this.channel instanceof LocalServerChannel; + } + + public void a(SecretKey secretkey) { + this.o = true; + this.channel.pipeline().addBefore("splitter", "decrypt", new PacketDecrypter(MinecraftEncryption.a(2, secretkey))); + this.channel.pipeline().addBefore("prepender", "encrypt", new PacketEncrypter(MinecraftEncryption.a(1, secretkey))); + } + + public boolean isConnected() { + return this.channel != null && this.channel.isOpen(); + } + + public boolean h() { + return this.channel == null; + } + + public PacketListener i() { + return this.m; + } + + public IChatBaseComponent j() { + return this.n; + } + + public void stopReading() { + this.channel.config().setAutoRead(false); + } + + public void setCompressionLevel(int i) { + if (i >= 0) { + if (this.channel.pipeline().get("decompress") instanceof PacketDecompressor) { + ((PacketDecompressor) this.channel.pipeline().get("decompress")).a(i); + } else { + this.channel.pipeline().addBefore("decoder", "decompress", new PacketDecompressor(i)); + } + + if (this.channel.pipeline().get("compress") instanceof PacketCompressor) { + ((PacketCompressor) this.channel.pipeline().get("compress")).a(i); + } else { + this.channel.pipeline().addBefore("encoder", "compress", new PacketCompressor(i)); + } + } else { + if (this.channel.pipeline().get("decompress") instanceof PacketDecompressor) { + this.channel.pipeline().remove("decompress"); + } + + if (this.channel.pipeline().get("compress") instanceof PacketCompressor) { + this.channel.pipeline().remove("compress"); + } + } + + } + + public void handleDisconnection() { + if (this.channel != null && !this.channel.isOpen()) { + if (this.p) { + NetworkManager.g.warn("handleDisconnection() called twice"); + } else { + this.p = true; + if (this.j() != null) { + this.i().a(this.j()); + } else if (this.i() != null) { + this.i().a(new ChatMessage("multiplayer.disconnect.generic", new Object[0])); + } + this.i.clear(); // Free up packet queue. + } + + } + } + + protected void channelRead0(ChannelHandlerContext channelhandlercontext, Packet object) throws Exception { // CraftBukkit - fix decompile error + this.a(channelhandlercontext, (Packet) object); + } + + static class QueuedPacket { + + private final Packet a; private final Packet getPacket() { return this.a; } // Paper - Anti-Xray - OBFHELPER + private final GenericFutureListener>[] b; private final GenericFutureListener>[] getGenericFutureListeners() { return this.b; } // Paper - Anti-Xray - OBFHELPER + + public QueuedPacket(Packet packet, GenericFutureListener>... agenericfuturelistener) { + this.a = packet; + this.b = agenericfuturelistener; + } + } + + // Spigot Start + public SocketAddress getRawAddress() + { + return this.channel.remoteAddress(); + } + // Spigot End +} diff --git a/sources/src/main/resources/mixins.akarin.core.json b/sources/src/main/resources/mixins.akarin.core.json index a9e68d25d..bac9cd1a3 100644 --- a/sources/src/main/resources/mixins.akarin.core.json +++ b/sources/src/main/resources/mixins.akarin.core.json @@ -20,6 +20,7 @@ "MixinVersionCommand", "MixinMinecraftServer", "MixinChunkIOExecutor", + "MixinPlayerConnection", "MixinTileEntityEnchantTable" ] } \ No newline at end of file